Skip to content

Commit ddbeb7d

Browse files
committedMar 14, 2025·
Respect event boundary in dashboard
The dashboard now explicitly preload past events and only displays GUI once it starts receiving new data.
1 parent c10bf72 commit ddbeb7d

File tree

4 files changed

+100
-49
lines changed

4 files changed

+100
-49
lines changed
 

‎crates/hyperqueue/src/bin/hq.rs

+4-18
Original file line numberDiff line numberDiff line change
@@ -298,26 +298,12 @@ async fn command_dashboard_start(
298298
gsettings: &GlobalSettings,
299299
opts: hyperqueue::common::cli::DashboardOpts,
300300
) -> anyhow::Result<()> {
301-
use hyperqueue::common::cli::DashboardCommand;
301+
use hyperqueue::dashboard::preload_dashboard_events;
302302
use hyperqueue::dashboard::start_ui_loop;
303-
use hyperqueue::server::event::Event;
304-
use hyperqueue::server::event::journal::JournalReader;
305303

306-
match opts.subcmd.unwrap_or_default() {
307-
DashboardCommand::Replay { journal } => {
308-
println!("Loading journal {}", journal.display());
309-
let mut journal = JournalReader::open(&journal)?;
310-
let events: Vec<Event> = journal.collect::<Result<_, _>>()?;
311-
println!("Loaded {} events", events.len());
312-
313-
start_ui_loop(gsettings, Some(events)).await?;
314-
}
315-
DashboardCommand::Stream => {
316-
start_ui_loop(gsettings, None).await?;
317-
}
318-
}
319-
320-
Ok(())
304+
let cmd = opts.subcmd.unwrap_or_default();
305+
let events = preload_dashboard_events(gsettings, cmd).await?;
306+
start_ui_loop(events).await
321307
}
322308

323309
fn make_global_settings(opts: CommonOpts) -> GlobalSettings {

‎crates/hyperqueue/src/dashboard/data/fetch.rs

+3-11
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,15 @@
11
use crate::server::event::Event;
22
use crate::transfer::connection::ClientSession;
3-
use crate::transfer::messages::{FromClientMessage, StreamEvents, ToClientMessage};
3+
use crate::transfer::messages::ToClientMessage;
44
use std::time::Duration;
55
use tokio::sync::mpsc::Sender;
66

7+
/// Create an async process that fetches new events from the server and sends them to `sender`.
8+
/// We assume that `session` has already been configured to receive streamed events from the server.
79
pub async fn create_data_fetch_process(
810
mut session: ClientSession,
911
sender: Sender<Vec<Event>>,
1012
) -> anyhow::Result<()> {
11-
session
12-
.connection()
13-
.send(FromClientMessage::StreamEvents(StreamEvents {
14-
live_events: true,
15-
}))
16-
.await?;
17-
1813
const CAPACITY: usize = 1024;
1914

2015
let mut events = Vec::with_capacity(CAPACITY);
@@ -43,9 +38,6 @@ pub async fn create_data_fetch_process(
4338
events = Vec::with_capacity(CAPACITY);
4439
}
4540
},
46-
ToClientMessage::EventLiveBoundary => {
47-
/* Do nothing */
48-
}
4941
_ => {
5042
return Err(anyhow::anyhow!("Dashboard received unexpected message {message:?}"));
5143
}

‎crates/hyperqueue/src/dashboard/mod.rs

+74
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,81 @@ mod utils;
55

66
pub use ui_loop::start_ui_loop;
77

8+
use crate::client::globalsettings::GlobalSettings;
9+
use crate::common::cli::DashboardCommand;
10+
use crate::server::bootstrap::get_client_session;
11+
use crate::server::event::Event;
12+
use crate::server::event::journal::JournalReader;
13+
use crate::transfer::connection::ClientSession;
14+
use crate::transfer::messages::{FromClientMessage, StreamEvents, ToClientMessage};
815
use std::time::Duration;
916

1017
// The time range in which the live timeline is display ([now() - duration, now()])
1118
const DEFAULT_LIVE_DURATION: Duration = Duration::from_secs(60 * 10);
19+
20+
pub enum PreloadedEvents {
21+
FromJournal(Vec<Event>),
22+
FromServer {
23+
events: Vec<Event>,
24+
connection: ClientSession,
25+
},
26+
}
27+
28+
/// Preload initial events for the dashboard.
29+
/// Either loads them from a journal on disk, or streams past events from the server,
30+
/// until a boundary is hit.
31+
pub async fn preload_dashboard_events(
32+
gsettings: &GlobalSettings,
33+
cmd: DashboardCommand,
34+
) -> anyhow::Result<PreloadedEvents> {
35+
match cmd {
36+
DashboardCommand::Stream => {
37+
let mut session = get_client_session(gsettings.server_directory()).await?;
38+
let connection = session.connection();
39+
let mut events = Vec::new();
40+
41+
// Start streaming events
42+
connection
43+
.send(FromClientMessage::StreamEvents(StreamEvents {
44+
live_events: true,
45+
}))
46+
.await?;
47+
48+
println!("Streaming events from the server");
49+
while let Some(message) = connection.receive().await {
50+
let message = message?;
51+
match message {
52+
ToClientMessage::Event(event) => {
53+
events.push(event);
54+
}
55+
ToClientMessage::EventLiveBoundary => {
56+
// All past events have been received, finish preloading
57+
println!("Loaded {} events", events.len());
58+
return Ok(PreloadedEvents::FromServer {
59+
events,
60+
connection: session,
61+
});
62+
}
63+
_ => {
64+
return Err(anyhow::anyhow!(
65+
"Dashboard received unexpected message {message:?}"
66+
));
67+
}
68+
};
69+
}
70+
Err(anyhow::anyhow!("Server connection ended unexpectedly"))
71+
}
72+
DashboardCommand::Replay { journal } => {
73+
// In theory, we could also load the events from the server using `live_events: false`,
74+
// but loading it from the journal file directly allows us to run dashboard post-mortem,
75+
// without the server having to run.
76+
// It also means that the journal file has to be accessible from the client though,
77+
// but that should usually be the case.
78+
println!("Loading journal {}", journal.display());
79+
let mut journal = JournalReader::open(&journal)?;
80+
let events: Vec<Event> = journal.collect::<Result<_, _>>()?;
81+
println!("Loaded {} events", events.len());
82+
Ok(PreloadedEvents::FromJournal(events))
83+
}
84+
}
85+
}

‎crates/hyperqueue/src/dashboard/ui_loop.rs

+19-20
Original file line numberDiff line numberDiff line change
@@ -7,47 +7,46 @@ use std::pin::Pin;
77
use std::time::SystemTime;
88
use tokio::time::Duration;
99

10-
use crate::client::globalsettings::GlobalSettings;
11-
use crate::dashboard::DEFAULT_LIVE_DURATION;
1210
use crate::dashboard::data::DashboardData;
1311
use crate::dashboard::data::{TimeMode, TimeRange, create_data_fetch_process};
1412
use crate::dashboard::ui::screens::root_screen::RootScreen;
1513
use crate::dashboard::ui::terminal::initialize_terminal;
16-
use crate::server::bootstrap::get_client_session;
17-
use crate::server::event::Event;
14+
use crate::dashboard::{DEFAULT_LIVE_DURATION, PreloadedEvents};
1815

1916
/// Starts the dashboard UI with a keyboard listener and tick provider
20-
pub async fn start_ui_loop(
21-
gsettings: &GlobalSettings,
22-
events: Option<Vec<Event>>,
23-
) -> anyhow::Result<()> {
24-
let stream = events.is_none();
17+
pub async fn start_ui_loop(events: PreloadedEvents) -> anyhow::Result<()> {
2518
let time_mode = match &events {
26-
Some(events) => {
19+
PreloadedEvents::FromJournal(events) => {
2720
let end = match events.last() {
2821
Some(event) => event.time.into(),
2922
None => SystemTime::now(),
3023
};
3124
TimeMode::Fixed(TimeRange::new(end - Duration::from_secs(60 * 5), end))
3225
}
33-
None => TimeMode::Live(DEFAULT_LIVE_DURATION),
26+
PreloadedEvents::FromServer { .. } => TimeMode::Live(DEFAULT_LIVE_DURATION),
3427
};
3528

29+
let stream = match &events {
30+
PreloadedEvents::FromJournal(_) => false,
31+
PreloadedEvents::FromServer { .. } => true,
32+
};
3633
let mut dashboard_data = DashboardData::new(time_mode, stream);
37-
if let Some(events) = events {
38-
dashboard_data.push_new_events(events);
39-
}
34+
let (events, session) = match events {
35+
PreloadedEvents::FromJournal(events) => (events, None),
36+
PreloadedEvents::FromServer { events, connection } => (events, Some(connection)),
37+
};
38+
dashboard_data.push_new_events(events);
4039

4140
let mut root_screen = RootScreen::default();
4241

4342
let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
4443

45-
let mut data_fetch_process: Pin<Box<dyn Future<Output = anyhow::Result<()>>>> = if stream {
46-
let connection = get_client_session(gsettings.server_directory()).await?;
47-
Box::pin(create_data_fetch_process(connection, tx))
48-
} else {
49-
Box::pin(std::future::pending())
50-
};
44+
let mut data_fetch_process: Pin<Box<dyn Future<Output = anyhow::Result<()>>>> =
45+
if let Some(session) = session {
46+
Box::pin(create_data_fetch_process(session, tx))
47+
} else {
48+
Box::pin(std::future::pending())
49+
};
5150

5251
let mut terminal = initialize_terminal()?;
5352
let mut reader = event::EventStream::new();

0 commit comments

Comments
 (0)
Please sign in to comment.