feat: set of fixes and improvements for vega-market-sim #471

Merged 7 commits on Jul 31, 2023
17 changes: 17 additions & 0 deletions conftest.py
@@ -0,0 +1,17 @@
import os
import logging


# ref: https://pytest-xdist.readthedocs.io/en/latest/how-to.html#creating-one-log-file-for-each-worker
def pytest_configure(config):
path = "test_logs"
if not os.path.exists(path):
os.makedirs(path)

worker_id = os.environ.get("PYTEST_XDIST_WORKER")
if worker_id is not None:
logging.basicConfig(
format=config.getini("log_file_format"),
filename=f"{path}/tests_{worker_id}.test.log",
level=os.getenv("LOG_LEVEL", config.getini("log_file_level")),
)
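
Note (illustration, not part of the diff): when the suite runs under pytest-xdist (e.g. `pytest -n 2`), PYTEST_XDIST_WORKER is set to worker ids such as gw0 and gw1, so each worker writes its own log file under test_logs/. A quick sketch of the resulting paths:

import os

for worker_id in ("gw0", "gw1"):  # hypothetical worker ids assigned by pytest-xdist
    print(os.path.join("test_logs", f"tests_{worker_id}.test.log"))
# test_logs/tests_gw0.test.log
# test_logs/tests_gw1.test.log
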
7 changes: 6 additions & 1 deletion pytest.ini
@@ -1,3 +1,8 @@
[pytest]
markers =
integration: mark a test as requiring a full vega sim infrastructure with running backend
integration: mark a test as requiring a full vega sim infrastructure with running backend

log_file_format=%(asctime)s.%(msecs)03d %(threadName)s %(processName)s (%(filename)s:%(funcName)s:%(lineno)s):%(message)s
log_file_date_format=%Y-%m-%d %H:%M:%S

log_file_level = INFO
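
Note (illustration, not part of the diff): the log_file_format and log_file_date_format options above produce log lines roughly like the one printed by this standalone sketch (the logger call and message are made up):

import logging

logging.basicConfig(
    format="%(asctime)s.%(msecs)03d %(threadName)s %(processName)s "
    "(%(filename)s:%(funcName)s:%(lineno)s):%(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)
logging.info("datanode caught up")
# e.g. 2023-07-31 14:00:00.123 MainThread MainProcess (sketch.py:<module>:10):datanode caught up
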
16 changes: 14 additions & 2 deletions scripts/run-integration-test.sh
@@ -1,9 +1,21 @@
#!/usr/bin/env bash

set -e


# Default values for environment variables; they can be overridden with `export <ENV_NAME>=value`.
# Jenkins provides definitions for the variables below.
: "${PARALLEL_WORKERS:=0}"
: "${TEST_FUNCTION:=}"
: "${LOG_LEVEL:=INFO}"

WORK_DIR="$(realpath "$(dirname "$0")/..")"
RESULT_DIR="${WORK_DIR}/test_logs/$(date '+%F_%H%M%S')-integration"
mkdir -p "${RESULT_DIR}"

pytest -s -v -m integration --junitxml ${RESULT_DIR}/integration-test-results.xml --log-cli-level INFO

set -x
pytest -s -v -m integration \
--junitxml ${RESULT_DIR}/integration-test-results.xml \
--log-cli-level "${LOG_LEVEL}" \
-n "${PARALLEL_WORKERS}" \
-k "${TEST_FUNCTION}"
1 change: 1 addition & 0 deletions tests/integration/test_reinforcement.py
@@ -2,6 +2,7 @@


@pytest.mark.integration
@pytest.mark.skip(reason="We will enable this once the job is stable in Jenkins")
def test_rl_run():
# Simply testing that it doesn't error
import vega_sim.reinforcement.run_rl_agent as rl
11 changes: 7 additions & 4 deletions tests/integration/utils/fixtures.py
@@ -1,4 +1,5 @@
from collections import namedtuple
import logging
from typing import Optional

import pytest
@@ -137,7 +138,7 @@ def build_basic_market(
vega.wait_for_total_catchup()


@pytest.fixture
@pytest.fixture(scope="function")
def vega_service():
with VegaServiceNull(
warn_on_raw_data_access=False,
@@ -147,15 +148,16 @@ def vega_service():
listen_for_high_volume_stream_updates=False,
) as vega:
yield vega
logging.debug("vega_service teardown")


@pytest.fixture
@pytest.fixture(scope="function")
def vega_service_with_market(vega_service):
build_basic_market(vega_service, initial_price=0.3)
return vega_service


@pytest.fixture
@pytest.fixture(scope="function")
def vega_service_with_high_volume():
with VegaServiceNull(
warn_on_raw_data_access=False,
@@ -165,9 +167,10 @@ def vega_service_with_high_volume():
listen_for_high_volume_stream_updates=True,
) as vega:
yield vega
logging.debug("vega_service_with_high_volume teardown")


@pytest.fixture
@pytest.fixture(scope="function")
def vega_service_with_high_volume_with_market(vega_service_with_high_volume):
build_basic_market(vega_service_with_high_volume, initial_price=0.3)
return vega_service_with_high_volume
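
Note (illustration, not part of the diff): with scope="function" each test receives a fresh VegaServiceNull that is torn down afterwards. A hypothetical test consuming one of these fixtures could look like this (the test name and body are made up; wait_for_total_catchup is the call used by build_basic_market above):

import pytest


@pytest.mark.integration
def test_smoke(vega_service_with_market):  # hypothetical test, not part of this PR
    vega = vega_service_with_market  # fresh VegaServiceNull per test (scope="function")
    vega.wait_for_total_catchup()  # same call build_basic_market uses above
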
69 changes: 49 additions & 20 deletions vega_sim/api/helpers.py
@@ -2,17 +2,17 @@
import random
import string
import time
import sys
from decimal import Decimal
from typing import Any, Callable, Optional, TypeVar, Union

import requests

from vega_sim.grpc.client import VegaCoreClient, VegaTradingDataClientV2
from vega_sim.proto.data_node.api.v2.trading_data_pb2 import GetVegaTimeRequest
from vega_sim.proto.vega.api.v1.core_pb2 import StatisticsRequest
from vega_sim.tools.retry import retry

T = TypeVar("T")

TIME_FORWARD_URL = "{base_url}/api/v1/forwardtime"

logger = logging.getLogger(__name__)
@@ -56,7 +56,7 @@ def num_from_padded_int(to_convert: Union[str, int], decimals: int) -> float:
def wait_for_datanode_sync(
trading_data_client: VegaTradingDataClientV2,
core_data_client: VegaCoreClient,
max_retries: int = 650,
max_retries: int = 100,
) -> None:
"""Waits for Datanode to catch up to vega core client.
Note: Will wait for datanode 'latest' time to catch up to core time
@@ -67,15 +68,28 @@
*at the time of call* not necessarily the latest data when the function returns.

Wait time is exponential with increasing retries
(each attempt waits 0.0005 * 1.01^attempt_num seconds).
(each attempt waits 0.05 * 1.03^attempt_num seconds).
"""
attempts = 1
core_time = core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
trading_time = trading_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp

core_time = retry(
10, 0.5, lambda: core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
)
trading_time = retry(
10, 0.5, lambda: trading_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
)
while core_time > trading_time:
time.sleep(0.0005 * 1.01**attempts)
trading_time = trading_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
logging.debug(f"Sleeping in wait_for_datanode_sync for {0.05 * 1.03**attempts}")
time.sleep(0.05 * 1.03**attempts)
try:
trading_time = retry(
10,
2.0,
lambda: trading_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp,
)
except Exception as e:
logging.warning(e)
trading_time = sys.maxsize

attempts += 1
if attempts >= max_retries:
raise DataNodeBehindError(
@@ -85,22 +98,36 @@

def wait_for_core_catchup(
core_data_client: VegaCoreClient,
max_retries: int = 1000,
max_retries: int = 20,
) -> None:
"""Waits for core node to fully execute everything in it's backlog.
Note that this operates by a rough cut of requesting time twice and checking for it
being unchanged, so only works on nullchain where we control time. May wait forever
in a standard tendermint chain
"""
attempts = 1
core_time = core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
time.sleep(0.0001)
core_time_two = core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
core_time = retry(
10, 0.5, lambda: core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
)
time.sleep(0.1)
core_time_two = retry(
10, 0.5, lambda: core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
)

while core_time != core_time_two:
core_time = core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
time.sleep(0.0001)
core_time_two = core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
logging.debug(f"Sleeping in wait_for_core_catchup for {0.05 * 1.03**attempts}")

core_time = retry(
10,
0.5,
lambda: core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp,
)
time.sleep(0.05 * 1.03**attempts)
core_time_two = retry(
10,
0.5,
lambda: core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp,
)
attempts += 1
if attempts >= max_retries:
raise DataNodeBehindError(
@@ -114,18 +141,18 @@ def wait_for_acceptance(
) -> T:
logger.debug("Waiting for proposal acceptance")
submission_accepted = False
for _ in range(1000):
for i in range(20):
try:
proposal = submission_load_func(submission_ref)
except Exception:
time.sleep(0.001)
time.sleep(0.05 * 1.1**i)
continue

if proposal:
logger.debug("Your proposal has been accepted by the network")
submission_accepted = True
break
time.sleep(0.001)
time.sleep(0.05 * 1.1**i)

if not submission_accepted:
raise ProposalNotAcceptedError(
@@ -152,4 +179,6 @@ def forward(time: str, vega_node_url: str) -> None:


def statistics(core_data_client: VegaCoreClient):
return core_data_client.Statistics(StatisticsRequest()).statistics
return retry(
10, 0.5, lambda: core_data_client.Statistics(StatisticsRequest()).statistics
)
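
Note (back-of-the-envelope, not part of the diff): the new backoff in wait_for_datanode_sync sleeps 0.05 * 1.03**attempt seconds per attempt, so with max_retries = 100 the total sleep before DataNodeBehindError is raised stays around 30 seconds, roughly the same worst case as the old 0.0005 * 1.01**attempt schedule over 650 retries:

# Cumulative sleep of the old and new schedules (excludes RPC and retry-helper time).
total_new = sum(0.05 * 1.03**attempt for attempt in range(1, 100))
total_old = sum(0.0005 * 1.01**attempt for attempt in range(1, 650))
print(f"new: ~{total_new:.0f}s, old: ~{total_old:.0f}s")  # new: ~30s, old: ~32s
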
3 changes: 2 additions & 1 deletion vega_sim/local_data_cache.py
@@ -17,6 +17,7 @@
import vega_sim.grpc.client as vac
import vega_sim.proto.vega as vega_protos
import vega_sim.proto.vega.events.v1.events_pb2 as events_protos
from vega_sim.tools.retry import retry

logger = logging.getLogger(__name__)

@@ -56,7 +57,7 @@ def _queue_forwarder(
for event in o.events:
if (kill_thread_sig is not None) and kill_thread_sig.is_set():
return
output = handlers[event.type](event)
output = retry(5, 1.0, lambda: handlers[event.type](event))
if isinstance(output, (list, GeneratorType)):
for elem in output:
sink.put(elem)
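
Note (sketch, not part of the diff): the retry helper imported from vega_sim.tools.retry is not shown in this PR. Judging from call sites such as retry(10, 0.5, lambda: ...), it presumably invokes the callable up to the given number of times, sleeping the given delay between failed attempts. A minimal stand-in with that assumed behaviour, for orientation only:

import logging
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_sketch(attempts: int, delay: float, fn: Callable[[], T]) -> T:
    """Assumed behaviour of vega_sim.tools.retry.retry; the real implementation may differ."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception as e:
            if attempt == attempts:
                raise
            logging.debug("attempt %d/%d failed (%s), retrying", attempt, attempts, e)
            time.sleep(delay)
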