Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Respect event boundary in dashboard #846

Merged
merged 1 commit into from
Mar 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 4 additions & 18 deletions crates/hyperqueue/src/bin/hq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,26 +298,12 @@ async fn command_dashboard_start(
gsettings: &GlobalSettings,
opts: hyperqueue::common::cli::DashboardOpts,
) -> anyhow::Result<()> {
use hyperqueue::common::cli::DashboardCommand;
use hyperqueue::dashboard::preload_dashboard_events;
use hyperqueue::dashboard::start_ui_loop;
use hyperqueue::server::event::Event;
use hyperqueue::server::event::journal::JournalReader;

match opts.subcmd.unwrap_or_default() {
DashboardCommand::Replay { journal } => {
println!("Loading journal {}", journal.display());
let mut journal = JournalReader::open(&journal)?;
let events: Vec<Event> = journal.collect::<Result<_, _>>()?;
println!("Loaded {} events", events.len());

start_ui_loop(gsettings, Some(events)).await?;
}
DashboardCommand::Stream => {
start_ui_loop(gsettings, None).await?;
}
}

Ok(())
let cmd = opts.subcmd.unwrap_or_default();
let events = preload_dashboard_events(gsettings, cmd).await?;
start_ui_loop(events).await
}

fn make_global_settings(opts: CommonOpts) -> GlobalSettings {
Expand Down
14 changes: 3 additions & 11 deletions crates/hyperqueue/src/dashboard/data/fetch.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
use crate::server::event::Event;
use crate::transfer::connection::ClientSession;
use crate::transfer::messages::{FromClientMessage, StreamEvents, ToClientMessage};
use crate::transfer::messages::ToClientMessage;
use std::time::Duration;
use tokio::sync::mpsc::Sender;

/// Create an async process that fetches new events from the server and sends them to `sender`.
/// We assume that `session` has already been configured to receive streamed events from the server.
pub async fn create_data_fetch_process(
mut session: ClientSession,
sender: Sender<Vec<Event>>,
) -> anyhow::Result<()> {
session
.connection()
.send(FromClientMessage::StreamEvents(StreamEvents {
live_events: true,
}))
.await?;

const CAPACITY: usize = 1024;

let mut events = Vec::with_capacity(CAPACITY);
Expand Down Expand Up @@ -43,9 +38,6 @@ pub async fn create_data_fetch_process(
events = Vec::with_capacity(CAPACITY);
}
},
ToClientMessage::EventLiveBoundary => {
/* Do nothing */
}
_ => {
return Err(anyhow::anyhow!("Dashboard received unexpected message {message:?}"));
}
Expand Down
74 changes: 74 additions & 0 deletions crates/hyperqueue/src/dashboard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,81 @@ mod utils;

pub use ui_loop::start_ui_loop;

use crate::client::globalsettings::GlobalSettings;
use crate::common::cli::DashboardCommand;
use crate::server::bootstrap::get_client_session;
use crate::server::event::Event;
use crate::server::event::journal::JournalReader;
use crate::transfer::connection::ClientSession;
use crate::transfer::messages::{FromClientMessage, StreamEvents, ToClientMessage};
use std::time::Duration;

// The time range in which the live timeline is display ([now() - duration, now()])
const DEFAULT_LIVE_DURATION: Duration = Duration::from_secs(60 * 10);

pub enum PreloadedEvents {
FromJournal(Vec<Event>),
FromServer {
events: Vec<Event>,
connection: ClientSession,
},
}

/// Preload initial events for the dashboard.
/// Either loads them from a journal on disk, or streams past events from the server,
/// until a boundary is hit.
pub async fn preload_dashboard_events(
gsettings: &GlobalSettings,
cmd: DashboardCommand,
) -> anyhow::Result<PreloadedEvents> {
match cmd {
DashboardCommand::Stream => {
let mut session = get_client_session(gsettings.server_directory()).await?;
let connection = session.connection();
let mut events = Vec::new();

// Start streaming events
connection
.send(FromClientMessage::StreamEvents(StreamEvents {
live_events: true,
}))
.await?;

println!("Streaming events from the server");
while let Some(message) = connection.receive().await {
let message = message?;
match message {
ToClientMessage::Event(event) => {
events.push(event);
}
ToClientMessage::EventLiveBoundary => {
// All past events have been received, finish preloading
println!("Loaded {} events", events.len());
return Ok(PreloadedEvents::FromServer {
events,
connection: session,
});
}
_ => {
return Err(anyhow::anyhow!(
"Dashboard received unexpected message {message:?}"
));
}
};
}
Err(anyhow::anyhow!("Server connection ended unexpectedly"))
}
DashboardCommand::Replay { journal } => {
// In theory, we could also load the events from the server using `live_events: false`,
// but loading it from the journal file directly allows us to run dashboard post-mortem,
// without the server having to run.
// It also means that the journal file has to be accessible from the client though,
// but that should usually be the case.
println!("Loading journal {}", journal.display());
let mut journal = JournalReader::open(&journal)?;
let events: Vec<Event> = journal.collect::<Result<_, _>>()?;
println!("Loaded {} events", events.len());
Ok(PreloadedEvents::FromJournal(events))
}
}
}
39 changes: 19 additions & 20 deletions crates/hyperqueue/src/dashboard/ui_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,47 +7,46 @@ use std::pin::Pin;
use std::time::SystemTime;
use tokio::time::Duration;

use crate::client::globalsettings::GlobalSettings;
use crate::dashboard::DEFAULT_LIVE_DURATION;
use crate::dashboard::data::DashboardData;
use crate::dashboard::data::{TimeMode, TimeRange, create_data_fetch_process};
use crate::dashboard::ui::screens::root_screen::RootScreen;
use crate::dashboard::ui::terminal::initialize_terminal;
use crate::server::bootstrap::get_client_session;
use crate::server::event::Event;
use crate::dashboard::{DEFAULT_LIVE_DURATION, PreloadedEvents};

/// Starts the dashboard UI with a keyboard listener and tick provider
pub async fn start_ui_loop(
gsettings: &GlobalSettings,
events: Option<Vec<Event>>,
) -> anyhow::Result<()> {
let stream = events.is_none();
pub async fn start_ui_loop(events: PreloadedEvents) -> anyhow::Result<()> {
let time_mode = match &events {
Some(events) => {
PreloadedEvents::FromJournal(events) => {
let end = match events.last() {
Some(event) => event.time.into(),
None => SystemTime::now(),
};
TimeMode::Fixed(TimeRange::new(end - Duration::from_secs(60 * 5), end))
}
None => TimeMode::Live(DEFAULT_LIVE_DURATION),
PreloadedEvents::FromServer { .. } => TimeMode::Live(DEFAULT_LIVE_DURATION),
};

let stream = match &events {
PreloadedEvents::FromJournal(_) => false,
PreloadedEvents::FromServer { .. } => true,
};
let mut dashboard_data = DashboardData::new(time_mode, stream);
if let Some(events) = events {
dashboard_data.push_new_events(events);
}
let (events, session) = match events {
PreloadedEvents::FromJournal(events) => (events, None),
PreloadedEvents::FromServer { events, connection } => (events, Some(connection)),
};
dashboard_data.push_new_events(events);

let mut root_screen = RootScreen::default();

let (tx, mut rx) = tokio::sync::mpsc::channel(1024);

let mut data_fetch_process: Pin<Box<dyn Future<Output = anyhow::Result<()>>>> = if stream {
let connection = get_client_session(gsettings.server_directory()).await?;
Box::pin(create_data_fetch_process(connection, tx))
} else {
Box::pin(std::future::pending())
};
let mut data_fetch_process: Pin<Box<dyn Future<Output = anyhow::Result<()>>>> =
if let Some(session) = session {
Box::pin(create_data_fetch_process(session, tx))
} else {
Box::pin(std::future::pending())
};

let mut terminal = initialize_terminal()?;
let mut reader = event::EventStream::new();
Expand Down
Loading