Commit 92138f9

refactor fetch_and_render into components

1 parent 38cfd35 commit 92138f9
16 files changed: +1134 -158 lines

Diff for: .gitignore (+1 line)

@@ -2,6 +2,7 @@
 bazel_events
 travis_events
 results.db
+cache_dir/

 .vscode/
Diff for: ray_ci_tracker/__init__.py

Whitespace-only changes.

Diff for: ray_ci_tracker/common.py (new file, +121 lines)

import asyncio
import functools
import os
from datetime import datetime
from functools import wraps
from itertools import chain
from pathlib import Path
from subprocess import PIPE
from typing import List, Optional, Tuple

import aiofiles
import click
import httpx
import ujson as json
from dotenv import load_dotenv
from tqdm.asyncio import tqdm_asyncio

from ray_ci_tracker.interfaces import (
    BuildkiteArtifact,
    BuildkiteStatus,
    BuildResult,
    GHAJobStat,
    GHCommit,
    TestResult,
    _parse_duration,
)


def retry(func):
    """Retry an async callable up to 3 times, re-raising the last exception."""

    @wraps(func)
    async def wrapper(*args, **kwargs):
        exception = None
        for _ in range(3):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                exception = e
        else:
            raise exception

    return wrapper


def run_as_sync(async_func):
    """Expose an async function as a synchronous one via asyncio.run."""

    @wraps(async_func)
    def wrapper(*args, **kwargs):
        asyncio.run(async_func(*args, **kwargs))

    return wrapper


async def get_or_fetch(
    cache_path: Path, *, use_cached: bool, result_cls, many: bool, async_func
):
    """Load a cached JSON result from cache_path, or call async_func and cache it.

    result_cls and many control (de)serialization: a dataclass type (one
    instance or a list of them), or plain JSON when result_cls is None.
    """
    if not use_cached or not cache_path.exists():
        result = await async_func()
        if result is None:
            return
        cache_path.parent.mkdir(parents=True, exist_ok=True)
        async with aiofiles.open(cache_path, "w") as f:
            if result_cls and many:
                content = json.dumps([r.to_dict() for r in result])
            elif result_cls and not many:
                content = json.dumps(result.to_dict())
            else:
                content = json.dumps(result)
            await f.write(content)
        return result
    else:
        async with aiofiles.open(cache_path) as f:
            content = await f.read()
            if result_cls and many:
                loaded = json.loads(content)
                return [result_cls.from_dict(r) for r in loaded]
            elif result_cls and not many:
                return result_cls.from_json(content)
            else:
                return json.loads(content)


def _yield_test_result(bazel_log_path):
    # Gather the known flaky set
    flaky_tests = set()
    with open(bazel_log_path) as f:
        for line in f:
            loaded = json.loads(line)
            if "targetConfigured" in loaded["id"] and "tag" in loaded["configured"]:
                test_name = loaded["id"]["targetConfigured"]["label"]
                if "flaky" in loaded["configured"]["tag"]:
                    flaky_tests.add(test_name)

    with open(bazel_log_path) as f:
        for line in f:
            loaded = json.loads(line)
            if "testSummary" in loaded:
                test_summary = loaded["testSummary"]

                name = loaded["id"]["testSummary"]["label"]
                status = test_summary["overallStatus"]
                if status in {"FAILED", "TIMEOUT", "NO_STATUS"}:
                    status = "FAILED"
                duration_s = float(test_summary["totalRunDurationMillis"]) / 1e3
                yield TestResult(name, status, duration_s, name in flaky_tests)


def _process_single_build(dir_name) -> BuildResult:
    with open(dir_name / "metadata.json") as f:
        metadata = json.load(f)

    return BuildResult(
        sha=metadata["build_env"]["TRAVIS_COMMIT"],
        job_url=metadata["build_env"]["TRAVIS_JOB_WEB_URL"],
        os=metadata["build_env"]["TRAVIS_OS_NAME"],
        build_env=metadata["build_config"]["config"]["env"],
        job_id=os.path.split(dir_name)[-1],
        results=list(
            chain.from_iterable(
                _yield_test_result(log) for log in dir_name.glob("bazel_log.*")
            )
        ),
    )
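For orientation: _yield_test_result consumes Bazel's Build Event Protocol JSON stream in two passes, first collecting targets tagged flaky, then emitting one TestResult per testSummary event. The two event shapes it reads look roughly like the following (values are invented; only the keys the code above accesses are shown):

{"id": {"targetConfigured": {"label": "//python/ray/tests:test_example"}}, "configured": {"tag": ["flaky"]}}
{"id": {"testSummary": {"label": "//python/ray/tests:test_example"}}, "testSummary": {"overallStatus": "FAILED", "totalRunDurationMillis": 4231}}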

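Taken together, retry, run_as_sync, and get_or_fetch form the fetch-and-cache backbone the data sources below build on. A minimal sketch of how they compose; the fetch_commit_stats coroutine and its URL are hypothetical stand-ins, everything else comes from common.py above:

import functools
from pathlib import Path

import httpx

from ray_ci_tracker.common import get_or_fetch, retry, run_as_sync


@retry  # re-attempt up to 3 times; the last exception is re-raised
async def fetch_commit_stats(sha: str) -> dict:
    # Hypothetical fetcher: any coroutine returning JSON-serializable data works.
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"https://api.example.com/commits/{sha}")
        resp.raise_for_status()
        return resp.json()


@run_as_sync  # callable as a plain function; body runs under asyncio.run
async def main() -> None:
    stats = await get_or_fetch(
        Path("cache_dir") / "commits" / "abc123.json",
        use_cached=True,   # reuse the on-disk copy when it exists
        result_cls=None,   # plain JSON round-trip, no dataclass involved
        many=False,
        async_func=functools.partial(fetch_commit_stats, "abc123"),
    )
    print(stats)


if __name__ == "__main__":
    main()

Note how the cache lands under cache_dir/, which is exactly what the .gitignore change above excludes from version control.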
Diff for: ray_ci_tracker/data_source/__init__.py

Whitespace-only changes.

Diff for: ray_ci_tracker/data_source/buildkite.py (new file, +196 lines)

import asyncio
import functools
import os
from itertools import chain
from pathlib import Path
from typing import List

import aiofiles
import httpx
from tqdm.asyncio import tqdm_asyncio

from ray_ci_tracker.common import _process_single_build, get_or_fetch, retry
from ray_ci_tracker.interfaces import BuildkiteArtifact, BuildkiteStatus, BuildResult

# COMMIT_PLACEHOLDER is swapped for a real commit SHA before each request.
GRAPHQL_QUERY = """
query AllPipelinesQuery {
  pipeline(slug: "ray-project/ray-builders-branch") {
    builds(branch: "master", commit: "COMMIT_PLACEHOLDER") {
      count
      edges {
        node {
          createdAt
          startedAt
          finishedAt
          number
          jobs(first: 100) {
            edges {
              node {
                ... on JobTypeCommand {
                  uuid
                  label
                  passed
                  state
                  url
                  build {
                    commit
                  }
                  createdAt
                  runnableAt
                  startedAt
                  finishedAt
                  artifacts(first: 100) {
                    edges {
                      node {
                        downloadURL
                        path
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}
"""


class BuildkiteSource:
    @staticmethod
    async def fetch_all(cache_path: Path, cached_buildkite, commits):
        print("Downloading Buildkite Status (Jobs)")
        concurrency_limiter = asyncio.Semaphore(20)
        buildkite_jsons = await tqdm_asyncio.gather(
            *[
                get_or_fetch(
                    cache_path / f"bk_jobs/{commit.sha}/http_resp.json",
                    use_cached=cached_buildkite,
                    result_cls=None,
                    many=False,
                    async_func=functools.partial(
                        BuildkiteSource.get_buildkite_job_status,
                        commit_sha=commit.sha,
                        concurrency_limiter=concurrency_limiter,
                    ),
                )
                for commit in commits
            ]
        )
        buildkite_parsed: List[BuildkiteStatus] = await asyncio.gather(
            *[
                get_or_fetch(
                    cache_path / f"bk_jobs/{commit.sha}/parsed.json",
                    use_cached=cached_buildkite,
                    result_cls=BuildkiteStatus,
                    many=True,
                    async_func=functools.partial(
                        BuildkiteSource.parse_buildkite_build_json,
                        resp_json,
                    ),
                )
                for commit, resp_json in zip(commits, buildkite_jsons)
            ]
        )
        print("Downloading Buildkite Status (macOS Bazel Events)")
        # One shared semaphore bounds all artifact downloads; a fresh
        # Semaphore per task would impose no global limit.
        artifact_limiter = asyncio.Semaphore(50)
        macos_bazel_events = await tqdm_asyncio.gather(
            *[
                get_or_fetch(
                    cache_path
                    / f"bazel_cached/{status.commit}/mac_result_{status.job_id}.json",
                    use_cached=cached_buildkite,
                    result_cls=BuildResult,
                    many=False,
                    async_func=functools.partial(
                        BuildkiteSource.get_buildkite_artifact,
                        dir_prefix=cache_path,
                        artifacts=status.artifacts,
                        concurrency_limiter=artifact_limiter,
                    ),
                )
                for status in chain.from_iterable(buildkite_parsed)
                if len(status.artifacts) > 0
            ]
        )
        return list(chain.from_iterable(buildkite_parsed)), macos_bazel_events

    @staticmethod
    @retry
    async def get_buildkite_job_status(
        commit_sha, concurrency_limiter: asyncio.Semaphore
    ):
        http_client = httpx.AsyncClient(timeout=httpx.Timeout(60))
        async with concurrency_limiter, http_client:
            resp = await http_client.post(
                "https://graphql.buildkite.com/v1",
                headers={"Authorization": f"Bearer {os.environ['BUILDKITE_TOKEN']}"},
                json={"query": GRAPHQL_QUERY.replace("COMMIT_PLACEHOLDER", commit_sha)},
            )
            resp.raise_for_status()
            return resp.json()

    @staticmethod
    async def parse_buildkite_build_json(
        resp_json: dict,
    ) -> List[BuildkiteStatus]:
        builds = resp_json["data"]["pipeline"]["builds"]["edges"]

        statuses = []
        for build in builds:
            jobs = build["node"]["jobs"]["edges"]
            for job in jobs:
                actual_job = job["node"]
                job_id = actual_job["uuid"]
                sha = actual_job["build"]["commit"]

                artifacts = []
                for artifact in actual_job["artifacts"]["edges"]:
                    url = artifact["node"]["downloadURL"]
                    path = artifact["node"]["path"]
                    filename = os.path.split(path)[1]
                    on_disk_path = f"bazel_events/master/{sha}/{job_id}/{filename}"
                    artifacts.append(
                        BuildkiteArtifact(url=url, bazel_events_path=on_disk_path)
                    )

                status = BuildkiteStatus(
                    job_id=job_id,
                    label=actual_job["label"],
                    passed=actual_job["passed"],
                    state=actual_job["state"],
                    url=actual_job["url"],
                    commit=sha,
                    startedAt=actual_job["startedAt"],
                    finished_at=actual_job["finishedAt"],
                    artifacts=artifacts,
                )
                statuses.append(status)
        return statuses

    @staticmethod
    @retry
    async def get_buildkite_artifact(
        dir_prefix: Path,
        artifacts: List[BuildkiteArtifact],
        concurrency_limiter: asyncio.Semaphore,
    ) -> BuildResult:
        assert len(artifacts)

        bazel_events_dir = None
        async with concurrency_limiter, httpx.AsyncClient() as client:
            for artifact in artifacts:
                path = dir_prefix / artifact.bazel_events_path
                path.parent.mkdir(exist_ok=True, parents=True)
                bazel_events_dir = path.parent
                async with client.stream(
                    "GET", artifact.url
                ) as response, aiofiles.open(path, "wb") as f:
                    response.raise_for_status()
                    async for chunk in response.aiter_bytes():
                        await f.write(chunk)

        assert bazel_events_dir is not None
        return _process_single_build(bazel_events_dir)
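A sketch of how BuildkiteSource.fetch_all might be driven end to end, assuming BUILDKITE_TOKEN is set in the environment. The stub Commit class is illustrative (fetch_all only reads the .sha attribute); the real commit list comes from elsewhere in the tracker:

import asyncio
from dataclasses import dataclass
from pathlib import Path

from ray_ci_tracker.data_source.buildkite import BuildkiteSource


@dataclass
class Commit:  # illustrative stand-in; fetch_all only uses `.sha`
    sha: str


async def main() -> None:
    statuses, mac_builds = await BuildkiteSource.fetch_all(
        cache_path=Path("cache_dir"),
        cached_buildkite=True,  # prefer cached GraphQL responses on disk
        commits=[Commit("abc123def456")],
    )
    for status in statuses:
        print(status.label, status.state, status.passed)


asyncio.run(main())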
