
Commit 2dc3f87

Committed Mar 10, 2025
Introduced command "Replay"
1 parent b3767d0 · commit 2dc3f87

File tree

crates/hyperqueue/src/client/commands/journal/mod.rs
crates/hyperqueue/src/dashboard/data/fetch.rs
crates/hyperqueue/src/server/client/mod.rs
crates/hyperqueue/src/transfer/messages.rs
tests/test_events.py

5 files changed: +30 -32 lines

crates/hyperqueue/src/client/commands/journal/mod.rs (+11 -13)

@@ -25,8 +25,13 @@ enum JournalCommand {
     /// Events will be exported to `stdout`, you can redirect it e.g. to a file.
     Export(ExportOpts),
 
-    /// Stream events from a running server.
-    Stream(StreamOpts),
+    /// Stream events from a running server, it first replays old events
+    /// then it waits for new live events.
+    Stream,
+
+    /// Stream events from a running server, it replays old events
+    /// after that it terminates the connection.
+    Replay,
 
     /// Connect to a server and remove completed tasks and non-active workers from journal
     Prune,

@@ -43,29 +48,22 @@ struct ExportOpts {
     journal: PathBuf,
 }
 
-#[derive(Parser)]
-struct StreamOpts {
-    /// If enabled, server terminates the connection when all currents events
-    /// are sent.
-    #[arg(long)]
-    history_only: bool,
-}
-
 pub async fn command_journal(gsettings: &GlobalSettings, opts: JournalOpts) -> anyhow::Result<()> {
     match opts.command {
         JournalCommand::Export(opts) => export_json(opts),
-        JournalCommand::Stream(opts) => stream_json(gsettings, opts).await,
+        JournalCommand::Replay => stream_json(gsettings, false).await,
+        JournalCommand::Stream => stream_json(gsettings, true).await,
         JournalCommand::Prune => prune_journal(gsettings).await,
         JournalCommand::Flush => flush_journal(gsettings).await,
     }
 }
 
-async fn stream_json(gsettings: &GlobalSettings, opts: StreamOpts) -> anyhow::Result<()> {
+async fn stream_json(gsettings: &GlobalSettings, live_events: bool) -> anyhow::Result<()> {
     let mut connection = get_client_session(gsettings.server_directory()).await?;
     connection
         .connection()
         .send(FromClientMessage::StreamEvents(StreamEvents {
-            history_only: opts.history_only,
+            live_events,
         }))
         .await?;
     let stdout = std::io::stdout();
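Replacing `Stream(StreamOpts)` and its `--history-only` flag with two unit variants means the mode is now chosen purely by the subcommand name. Below is a minimal, self-contained sketch of that clap pattern; the names (`Cli`, `Cmd`) are illustrative, not HyperQueue's actual CLI definition.

// Minimal clap sketch of the "subcommand instead of flag" pattern used above.
// Illustrative names only; this is not HyperQueue's real CLI surface.
use clap::{Parser, Subcommand};

#[derive(Parser)]
struct Cli {
    #[command(subcommand)]
    command: Cmd,
}

#[derive(Subcommand)]
enum Cmd {
    /// Replay stored events, then keep following live events.
    Stream,
    /// Replay stored events only, then close the connection.
    Replay,
}

fn main() {
    let cli = Cli::parse();
    // Both modes funnel into one code path that differs only in a boolean,
    // mirroring `stream_json(gsettings, live_events)` in the diff above.
    let live_events = match cli.command {
        Cmd::Stream => true,
        Cmd::Replay => false,
    };
    println!("live_events = {live_events}");
}

The user-facing effect is that no extra flag is needed, which matches how the updated test invokes it: `hq_env.command(["journal", "replay"])`.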

crates/hyperqueue/src/dashboard/data/fetch.rs (+1 -1)

@@ -11,7 +11,7 @@ pub async fn create_data_fetch_process(
     session
         .connection()
         .send(FromClientMessage::StreamEvents(StreamEvents {
-            history_only: false,
+            live_events: true,
         }))
         .await?;
 

crates/hyperqueue/src/server/client/mod.rs (+1 -1)

@@ -175,7 +175,7 @@ pub async fn client_rpc_loop<
     So while historic events are loaded from the file and streamed, live events are already
     collected and sent immediately the historic events are sent */
     let (tx1, rx1) = mpsc::unbounded_channel::<Event>();
-    let live = if !msg.history_only {
+    let live = if msg.live_events {
        let (tx2, rx2) = mpsc::unbounded_channel::<Event>();
        let listener_id = senders.events.register_listener(tx2);
        Some((rx2, listener_id))
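The block comment in this hunk explains why the live listener is registered up front: events that arrive while the historic events are still being replayed are buffered and delivered afterwards, so nothing is dropped. Here is a simplified, self-contained sketch of that buffering pattern with tokio channels; all names are illustrative and this is not HyperQueue's actual code.

use tokio::sync::mpsc;

#[derive(Debug, Clone)]
struct Event(u64);

// Sketch: register the live channel *before* replaying history, so events
// produced during the replay queue up and are delivered right after it.
#[tokio::main]
async fn main() {
    let (live_tx, mut live_rx) = mpsc::unbounded_channel::<Event>();

    // An event arriving while history is still being replayed is not lost;
    // it simply waits in the unbounded channel.
    live_tx.send(Event(100)).unwrap();

    // 1) Replay historic events (a stand-in vector here).
    for event in [Event(1), Event(2)] {
        println!("historic: {event:?}");
    }

    // 2) Then drain and follow the live channel.
    drop(live_tx); // close the sender so this sketch's loop terminates
    while let Some(event) = live_rx.recv().await {
        println!("live: {event:?}");
    }
}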

crates/hyperqueue/src/transfer/messages.rs (+1 -1)

@@ -88,7 +88,7 @@ pub struct TaskKindProgram {
 
 #[derive(Serialize, Deserialize, Debug, Clone)]
 pub struct StreamEvents {
-    pub history_only: bool,
+    pub live_events: bool,
 }
 
 #[derive(Serialize, Deserialize, Debug, Clone)]
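Note that the flag's polarity is inverted along with the rename: the old `history_only: false` corresponds to the new `live_events: true`, as the dashboard change above shows. A tiny round-trip illustration of the renamed message follows; serde_json is used purely for demonstration, and HyperQueue's actual transport encoding may differ.

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct StreamEvents {
    pub live_events: bool,
}

fn main() -> Result<(), serde_json::Error> {
    // "Replay" mode: historic events only, no live follow-up.
    let msg = StreamEvents { live_events: false };
    let encoded = serde_json::to_string(&msg)?;
    assert_eq!(encoded, r#"{"live_events":false}"#);

    let decoded: StreamEvents = serde_json::from_str(&encoded)?;
    assert!(!decoded.live_events);
    Ok(())
}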

tests/test_events.py (+16 -16)

@@ -64,10 +64,10 @@ def test_worker_stream_events2(hq_env: HqEnv, tmp_path):
     assert events[2]["event"]["desc"]["name"] == "uname"
 
 
-def test_worker_stream_history_only(hq_env: HqEnv, tmp_path):
+def test_worker_journal_replay(hq_env: HqEnv, tmp_path):
     journal = tmp_path.joinpath("test.journal")
     hq_env.start_server(args=["--journal", journal])
-    r = hq_env.command(["journal", "stream", "--history-only"])
+    r = hq_env.command(["journal", "replay"])
     msg = json.loads(r)
     assert msg["event"]["type"] == "server-start"
 

@@ -162,21 +162,21 @@ def body():
     with hq_env.mock.mock_program_with_code(
         "rocm-smi",
         """
-import json
-data = {
-"card0": {
-"GPU use (%)": "1.5",
-"GPU memory use (%)": "12.5",
-"PCI Bus": "FOOBAR1"
-},
-"card1": {
-"GPU use (%)": "12.5",
-"GPU memory use (%)": "64.0",
-"PCI Bus": "FOOBAR2"
-}
+import json
+data = {
+"card0": {
+"GPU use (%)": "1.5",
+"GPU memory use (%)": "12.5",
+"PCI Bus": "FOOBAR1"
+},
+"card1": {
+"GPU use (%)": "12.5",
+"GPU memory use (%)": "64.0",
+"PCI Bus": "FOOBAR2"
 }
-print(json.dumps(data))
-""",
+}
+print(json.dumps(data))
+""",
     ):
         hq_env.start_worker(args=["--overview-interval", "10ms", "--resource", "gpus/amd=[0]"])
         wait_for_worker_state(hq_env, 1, "RUNNING")
