Skip to content

Commit

Permalink
feat: set of fixes and improvements for vega-market-sim (#471)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Tom McLean <[email protected]>
  • Loading branch information
daniel1302 and TomMcL authored Jul 31, 2023
1 parent 237a86f commit bfc8439
Show file tree
Hide file tree
Showing 10 changed files with 228 additions and 63 deletions.
17 changes: 17 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import os
import logging


# ref: https://pytest-xdist.readthedocs.io/en/latest/how-to.html#creating-one-log-file-for-each-worker
def pytest_configure(config):
path = "test_logs"
if not os.path.exists(path):
os.makedirs(path)

worker_id = os.environ.get("PYTEST_XDIST_WORKER")
if worker_id is not None:
logging.basicConfig(
format=config.getini("log_file_format"),
filename=f"{path}/tests_{worker_id}.test.log",
level=os.getenv("LOG_LEVEL", config.getini("log_file_level")),
)
7 changes: 6 additions & 1 deletion pytest.ini
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
[pytest]
markers =
integration: mark a test as requiring a full vega sim infrastructure with running backend
integration: mark a test as requiring a full vega sim infrastructure with running backend

log_file_format=%(asctime)s.%(msecs)03d %(threadName)s %(processName)s (%(filename)s:%(funcName)s:%(lineno)s):%(message)s
log_file_date_format=%Y-%m-%d %H:%M:%S

log_file_level = INFO
16 changes: 14 additions & 2 deletions scripts/run-integration-test.sh
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
#!/usr/bin/env bash

set -e


# Default values for environment variables. Those variables may be exported with bash `export <ENV_NAME>=value`
# Jenkins provides definition of the below variables
: "${PARALLEL_WORKERS:=0}"
: "${TEST_FUNCTION:=}"
: "${LOG_LEVEL:=INFO}"

WORK_DIR="$(realpath "$(dirname "$0")/..")"
RESULT_DIR="${WORK_DIR}/test_logs/$(date '+%F_%H%M%S')-integration"
mkdir -p "${RESULT_DIR}"

pytest -s -v -m integration --junitxml ${RESULT_DIR}/integration-test-results.xml --log-cli-level INFO

set -x
pytest -s -v -m integration \
--junitxml ${RESULT_DIR}/integration-test-results.xml \
--log-cli-level "${LOG_LEVEL}" \
-n "${PARALLEL_WORKERS}" \
-k "${TEST_FUNCTION}"
1 change: 1 addition & 0 deletions tests/integration/test_reinforcement.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


@pytest.mark.integration
@pytest.mark.skip(reason="We will enable it once job is stable in the Jenkins")
def test_rl_run():
# Simply testing that it doesn't error
import vega_sim.reinforcement.run_rl_agent as rl
Expand Down
11 changes: 7 additions & 4 deletions tests/integration/utils/fixtures.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from collections import namedtuple
import logging
from typing import Optional

import pytest
Expand Down Expand Up @@ -137,7 +138,7 @@ def build_basic_market(
vega.wait_for_total_catchup()


@pytest.fixture
@pytest.fixture(scope="function")
def vega_service():
with VegaServiceNull(
warn_on_raw_data_access=False,
Expand All @@ -147,15 +148,16 @@ def vega_service():
listen_for_high_volume_stream_updates=False,
) as vega:
yield vega
logging.debug("vega_service teardown")


@pytest.fixture
@pytest.fixture(scope="function")
def vega_service_with_market(vega_service):
build_basic_market(vega_service, initial_price=0.3)
return vega_service


@pytest.fixture
@pytest.fixture(scope="function")
def vega_service_with_high_volume():
with VegaServiceNull(
warn_on_raw_data_access=False,
Expand All @@ -165,9 +167,10 @@ def vega_service_with_high_volume():
listen_for_high_volume_stream_updates=True,
) as vega:
yield vega
logging.debug("vega_service_with_high_volume teardown")


@pytest.fixture
@pytest.fixture(scope="function")
def vega_service_with_high_volume_with_market(vega_service_with_high_volume):
build_basic_market(vega_service_with_high_volume, initial_price=0.3)
return vega_service_with_high_volume
69 changes: 49 additions & 20 deletions vega_sim/api/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@
import random
import string
import time
import sys
from decimal import Decimal
from typing import Any, Callable, Optional, TypeVar, Union

import requests

from vega_sim.grpc.client import VegaCoreClient, VegaTradingDataClientV2
from vega_sim.proto.data_node.api.v2.trading_data_pb2 import GetVegaTimeRequest
from vega_sim.proto.vega.api.v1.core_pb2 import StatisticsRequest
from vega_sim.tools.retry import retry

T = TypeVar("T")

TIME_FORWARD_URL = "{base_url}/api/v1/forwardtime"

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -56,7 +56,7 @@ def num_from_padded_int(to_convert: Union[str, int], decimals: int) -> float:
def wait_for_datanode_sync(
trading_data_client: VegaTradingDataClientV2,
core_data_client: VegaCoreClient,
max_retries: int = 650,
max_retries: int = 100,
) -> None:
"""Waits for Datanode to catch up to vega core client.
Note: Will wait for datanode 'latest' time to catch up to core time
Expand All @@ -67,15 +67,28 @@ def wait_for_datanode_sync(
*at the time of call* not necessarily the latest data when the function returns.
Wait time is exponential with increasing retries
(each attempt waits 0.0005 * 1.01^attempt_num seconds).
(each attempt waits 0.05 * 1.03^attempt_num seconds).
"""
attempts = 1
core_time = core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
trading_time = trading_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp

core_time = retry(
10, 0.5, lambda: core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
)
trading_time = retry(
10, 0.5, lambda: trading_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
)
while core_time > trading_time:
time.sleep(0.0005 * 1.01**attempts)
trading_time = trading_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
logging.debug(f"Sleeping in wait_for_datanode_sync for {0.05 * 1.03**attempts}")
time.sleep(0.05 * 1.03**attempts)
try:
trading_time = retry(
10,
2.0,
lambda: trading_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp,
)
except Exception as e:
logging.warn(e)
trading_time = sys.maxsize

attempts += 1
if attempts >= max_retries:
raise DataNodeBehindError(
Expand All @@ -85,22 +98,36 @@ def wait_for_datanode_sync(

def wait_for_core_catchup(
core_data_client: VegaCoreClient,
max_retries: int = 1000,
max_retries: int = 20,
) -> None:
"""Waits for core node to fully execute everything in it's backlog.
Note that this operates by a rough cut of requesting time twice and checking for it
being unchanged, so only works on nullchain where we control time. May wait forever
in a standard tendermint chain
"""
attempts = 1
core_time = core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
time.sleep(0.0001)
core_time_two = core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
core_time = retry(
10, 0.5, lambda: core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
)
time.sleep(0.1)
core_time_two = retry(
10, 0.5, lambda: core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
)

while core_time != core_time_two:
core_time = core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
time.sleep(0.0001)
core_time_two = core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
logging.debug(f"Sleeping in wait_for_core_catchup for {0.05 * 1.03**attempts}")

core_time = retry(
10,
0.5,
lambda: core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp,
)
time.sleep(0.05 * 1.03**attempts)
core_time_two = retry(
10,
0.5,
lambda: core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp,
)
attempts += 1
if attempts >= max_retries:
raise DataNodeBehindError(
Expand All @@ -114,18 +141,18 @@ def wait_for_acceptance(
) -> T:
logger.debug("Waiting for proposal acceptance")
submission_accepted = False
for _ in range(1000):
for i in range(20):
try:
proposal = submission_load_func(submission_ref)
except:
time.sleep(0.001)
time.sleep(0.05 * 1.1**i)
continue

if proposal:
logger.debug("Your proposal has been accepted by the network")
submission_accepted = True
break
time.sleep(0.001)
time.sleep(0.05 * 1.1**i)

if not submission_accepted:
raise ProposalNotAcceptedError(
Expand All @@ -152,4 +179,6 @@ def forward(time: str, vega_node_url: str) -> None:


def statistics(core_data_client: VegaCoreClient):
return core_data_client.Statistics(StatisticsRequest()).statistics
return retry(
10, 0.5, lambda: core_data_client.Statistics(StatisticsRequest()).statistics
)
3 changes: 2 additions & 1 deletion vega_sim/local_data_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import vega_sim.grpc.client as vac
import vega_sim.proto.vega as vega_protos
import vega_sim.proto.vega.events.v1.events_pb2 as events_protos
from vega_sim.tools.retry import retry

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -56,7 +57,7 @@ def _queue_forwarder(
for event in o.events:
if (kill_thread_sig is not None) and kill_thread_sig.is_set():
return
output = handlers[event.type](event)
output = retry(5, 1.0, lambda: handlers[event.type](event))
if isinstance(output, (list, GeneratorType)):
for elem in output:
sink.put(elem)
Expand Down
Loading

0 comments on commit bfc8439

Please sign in to comment.