Skip to content

Commit

Permalink
fix: linting issues
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel1302 committed Jul 29, 2023
1 parent ca5be5d commit fe42db9
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 38 deletions.
5 changes: 3 additions & 2 deletions conftest.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import os
import logging


# ref: https://pytest-xdist.readthedocs.io/en/latest/how-to.html#creating-one-log-file-for-each-worker
def pytest_configure(config):
path = "test_logs"
isExist = os.path.exists(path)
if not isExist:
os.makedirs(path)

env_log_level = os.getenv('LOG_LEVEL')
env_log_level = os.getenv("LOG_LEVEL")
log_level = config.getini("log_file_level")
if not env_log_level is None:
log_level = env_log_level
Expand All @@ -19,4 +20,4 @@ def pytest_configure(config):
format=config.getini("log_file_format"),
filename=f"{path}/tests_{worker_id}.test.log",
level=log_level,
)
)
1 change: 0 additions & 1 deletion tests/integration/utils/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ def vega_service():
) as vega:
yield vega
logging.debug("vega_service theardown")



@pytest.fixture(scope="function")
Expand Down
38 changes: 30 additions & 8 deletions vega_sim/api/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,21 @@ def wait_for_datanode_sync(
(each attempt waits 0.05 * 1.03^attempt_num seconds).
"""
attempts = 1
core_time = retry(10, 0.5, lambda: core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp)
trading_time = retry(10, 0.5, lambda: trading_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp)
core_time = retry(
10, 0.5, lambda: core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
)
trading_time = retry(
10, 0.5, lambda: trading_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
)
while core_time > trading_time:
logging.debug(f"Sleeping in wait_for_datanode_sync for {0.05 * 1.03**attempts}")
time.sleep(0.05 * 1.03**attempts)
try:
trading_time = retry(10, 2.0, lambda: trading_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp)
trading_time = retry(
10,
2.0,
lambda: trading_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp,
)
except Exception as e:
logging.warn(e)
trading_time = sys.maxsize
Expand All @@ -98,16 +106,28 @@ def wait_for_core_catchup(
in a standard tendermint chain
"""
attempts = 1
core_time = retry(10, 0.5, lambda: core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp)
core_time = retry(
10, 0.5, lambda: core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
)
time.sleep(0.1)
core_time_two = retry(10, 0.5, lambda: core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp)
core_time_two = retry(
10, 0.5, lambda: core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
)

while core_time != core_time_two:
logging.debug(f"Sleeping in wait_for_core_catchup for {0.05 * 1.03**attempts}")

core_time = retry(10, 0.5, lambda: core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp)
core_time = retry(
10,
0.5,
lambda: core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp,
)
time.sleep(0.05 * 1.03**attempts)
core_time_two = retry(10, 0.5, lambda: core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp)
core_time_two = retry(
10,
0.5,
lambda: core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp,
)
attempts += 1
if attempts >= max_retries:
raise DataNodeBehindError(
Expand Down Expand Up @@ -159,4 +179,6 @@ def forward(time: str, vega_node_url: str) -> None:


def statistics(core_data_client: VegaCoreClient):
    """Return the current node statistics from the Vega core node.

    Wraps the gRPC ``Statistics`` call in ``retry`` (10 attempts, 0.5s delay)
    to ride out transient connection failures.

    Args:
        core_data_client: gRPC client connected to the Vega core node.

    Returns:
        The ``statistics`` message from the core node's Statistics endpoint.
    """
    # NOTE(review): the diff span contained both the pre- and post-format copies
    # of this return; exactly one is kept here.
    return retry(
        10, 0.5, lambda: core_data_client.Statistics(StatisticsRequest()).statistics
    )
54 changes: 29 additions & 25 deletions vega_sim/null_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,27 +572,24 @@ def manage_vega_processes(

# Send process pid values for resource monitoring
child_conn.send({name: process.pid for name, process in processes.items()})




# According to https://docs.oracle.com/cd/E19455-01/806-5257/gen-75415/index.html
# There is no guarantee that signal will be caught by this thread. Usually the
# parent process catches the signal and removes it from the list of pending
# There is no guarantee that signal will be caught by this thread. Usually the
# parent process catches the signal and removes it from the list of pending
# signals, this leaves us with a memory leak where we have orphaned vega processes
# and the docker containers. Below is hack to maximize chance by catching the
# signal.
# We call signal.signal method as a workaround to move this thread on top of
# the catch stack, then sigwait waits until the signal is trapped.
# As a last resort we catch the `SIGCHLD` in case the parent process exited
# and the docker containers. Below is hack to maximize chance by catching the
# signal.
# We call signal.signal method as a workaround to move this thread on top of
# the catch stack, then sigwait waits until the signal is trapped.
# As a last resort we catch the `SIGCHLD` in case the parent process exited
# and this is the orphan now.
# But to provide 100% guarantee this should be implemented in another way:
# - Signal should be trapped in the main process, and this should be synced
#   via the shared memory
# - Signal should be trapped in the main process, and this should be synced
#   via the shared memory
# - or this entire process manager should be incorporated in the VegaServiceNull
# and containers/processes should be removed as inline call in the __exit__
#
#
#
#
# Important assumption is that this signal can be caught multiple times as well
def sighandler(signal, frame):
if signal is None:
Expand All @@ -602,19 +599,22 @@ def sighandler(signal, frame):

logger.debug("Received signal from parent process")
if use_docker_postgres:

def kill_docker_container() -> None:
try:
data_node_container.kill()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
logger.debug(f"Container {data_node_container.name} has been already killed")
logger.debug(
f"Container {data_node_container.name} has been already killed"
)
return
else:
raise e

logger.debug(f"Stopping container {data_node_container.name}")
retry(10, 1.0, kill_docker_container)

removed = False
for _ in range(10):
try:
Expand All @@ -625,7 +625,9 @@ def kill_docker_container() -> None:
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
removed = True
logger.debug(f"Data node volume {data_node_docker_volume.name} has been already killed")
logger.debug(
f"Data node volume {data_node_docker_volume.name} has been already killed"
)
break
else:
time.sleep(1)
Expand All @@ -636,7 +638,6 @@ def kill_docker_container() -> None:
"Docker volume failed to cleanup, will require manual cleaning"
)


logger.debug("Starting termination for processes")
for name, process in processes.items():
logger.debug(f"Terminating process {name}(pid: {process.pid})")
Expand Down Expand Up @@ -665,11 +666,15 @@ def kill_docker_container() -> None:
# The below lines are workaround to put the signal listeners on top of the stack, so this process can handle it.
signal.signal(signal.SIGINT, lambda _s, _h: None)
signal.signal(signal.SIGTERM, lambda _s, _h: None)
signal.signal(signal.SIGCHLD, sighandler) # The process had previously created one or more child processes with the fork() function. One or more of these processes has since died.
signal.sigwait([
signal.SIGKILL, # The process was explicitly killed by somebody wielding the kill program.
signal.SIGTERM, # The process was explicitly killed by somebody wielding the terminate program.
])
signal.signal(
signal.SIGCHLD, sighandler
) # The process had previously created one or more child processes with the fork() function. One or more of these processes has since died.
signal.sigwait(
[
signal.SIGKILL, # The process was explicitly killed by somebody wielding the kill program.
signal.SIGTERM, # The process was explicitly killed by somebody wielding the terminate program.
]
)
sighandler(None, None)


Expand Down Expand Up @@ -935,7 +940,6 @@ def stop(self) -> None:
self.wallet.stop()
super().stop()


@property
def wallet_url(self) -> str:
return self._build_url(self.wallet_port)
Expand Down Expand Up @@ -976,4 +980,4 @@ def clone(self) -> VegaServiceNull:
port_config=self._generate_port_config(),
use_full_vega_wallet=self._use_full_vega_wallet,
warn_on_raw_data_access=self.warn_on_raw_data_access,
)
)
5 changes: 3 additions & 2 deletions vega_sim/tools/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
import time
import logging

T = TypeVar('T')
T = TypeVar("T")


def retry(attempts: int, delay: float, func: Callable[[], T]) -> T:
for i in range(attempts):
Expand All @@ -14,5 +15,5 @@ def retry(attempts: int, delay: float, func: Callable[[], T]) -> T:
return result
except Exception as e:
time.sleep(delay)
if i == attempts-1:
if i == attempts - 1:
raise Exception(e)

0 comments on commit fe42db9

Please sign in to comment.