diff --git a/server/Cargo.toml b/server/Cargo.toml index f5b237a0..8e8b7a9d 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -14,6 +14,7 @@ ya-relay-core = { workspace = true } actix-rt = "2.7" serde = { version = "1.0.192", features = ["derive"] } rmp-serde = "1" +serde_json = "1.0.128" serde_bytes = "0.11.12" anyhow = "1.0" chrono = "0.4" @@ -30,6 +31,7 @@ lazy_static = "1.4.0" dashmap = "4.0.2" tokio = { version = "1", features = ["net", "sync", "macros", "time", "rt", "io-util"] } +tokio-stream = "0.1.16" tokio-util = { version = "0.7", features = ["codec"] } hex = "0.4.3" parking_lot = "0.12.1" @@ -38,7 +40,10 @@ quick_cache = "0.4.0" tiny-keccak = "2" actix-web = { version = "4.4.0", default-features = false, features = ["macros"] } +actix-web-lab = "0.22.0" cfg-if = "1.0.0" +futures-util = "0.3.30" +actix-cors = "0.5" # Add this line [target."cfg(unix)".dependencies] libc = "0.2" @@ -52,7 +57,7 @@ features = [ [dev-dependencies] tokio = { version = "1", features = ["rt-multi-thread"] } -tokio-stream = "0.1.8" +tokio-stream = "0.1.16" test-case = "3.1" ethsign = "0.8.0" test-log = "0.2.13" diff --git a/server/src/bin/ya-relay-server.rs b/server/src/bin/ya-relay-server.rs index 288abd09..19fd53df 100644 --- a/server/src/bin/ya-relay-server.rs +++ b/server/src/bin/ya-relay-server.rs @@ -1,16 +1,33 @@ +use actix_web::get; +use actix_web::web; +use actix_web::Responder; +use actix_web_lab::sse::Sse; use clap::Parser; +use futures_util::StreamExt; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use std::convert::Infallible; use std::future; use std::net::SocketAddr; use std::sync::Arc; - -use actix_web::{get, web, Responder}; -use serde::{Deserialize, Serialize}; - +use std::time::Duration; use ya_relay_core::NodeId; use ya_relay_server::metrics::register_metrics; +use ya_relay_server::sse::SseClients; use ya_relay_server::{AddrStatus, Config, Selector, SessionManager}; +#[get("/sse")] +async fn new_sse_client(sse_clients: web::Data>) -> impl Responder { + // Add a new client and get the receiver stream + let sse_stream = sse_clients.add_client().await; + + // Map the `Event` stream to `Result` + let result_stream = sse_stream.map(|event| Ok::<_, Infallible>(event)); + + // Return the SSE stream to the client + Sse::from_stream(result_stream).with_keep_alive(Duration::from_secs(10)) +} + #[get("/sessions")] async fn sessions_list(sm: web::Data>) -> impl Responder { format!("sessions: {}", sm.num_sessions()) @@ -82,12 +99,16 @@ async fn main() -> anyhow::Result<()> { let args = Config::parse(); + let sse_clients = Arc::new(SseClients::new()); + let handle = register_metrics(); - let server = ya_relay_server::run(&args).await?; + let server = ya_relay_server::run(&args, sse_clients.clone()).await?; let sessions = web::Data::new(server.sessions()); + let sse_clients_clone = web::Data::new(sse_clients.clone()); + let web_server = actix_web::HttpServer::new(move || { use actix_web::*; @@ -95,8 +116,10 @@ async fn main() -> anyhow::Result<()> { App::new() .app_data(sessions.clone()) + .app_data(sse_clients_clone.clone()) .service(nodes_list_prefix) .service(sessions_list) + .service(new_sse_client) .route("/", web::get().to(move || future::ready(handle.render()))) }) .workers(1) diff --git a/server/src/lib.rs b/server/src/lib.rs index 2736344b..6b154500 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -2,6 +2,7 @@ mod config; pub mod metrics; mod server; +pub mod sse; mod state; #[cfg(feature = "test-utils")] pub mod testing; diff --git a/server/src/server.rs b/server/src/server.rs index 82e6dc28..3df9e381 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -1,12 +1,12 @@ +use crate::AddrStatus; +use metrics::Counter; +use quick_cache::sync::Cache; +use rand::{thread_rng, Rng}; use std::net::SocketAddr; use std::path::{Path, PathBuf}; use std::rc::Rc; use std::sync::Arc; use std::time::{Duration, Instant}; - -use metrics::Counter; -use quick_cache::sync::Cache; -use rand::{thread_rng, Rng}; use tokio_util::codec::Decoder; use ya_relay_core::challenge; @@ -41,6 +41,7 @@ mod state_decoder; mod ip_checker; +use crate::sse::{NodeInfo, SseClients, SseMessage}; pub use ip_checker::IpCheckerConfig; pub use session::SessionHandlerConfig; @@ -109,7 +110,7 @@ impl Drop for Server { } } -pub async fn run(config: &Config) -> anyhow::Result { +pub async fn run(config: &Config, sse_clients: Arc) -> anyhow::Result { let bind_addr: SocketAddr = config.server.address; let slot_manager = config @@ -159,10 +160,12 @@ pub async fn run(config: &Config) -> anyhow::Result { let server = { let session_manager = session_manager.clone(); let slot_manager = slot_manager.clone(); + let sse_clients = sse_clients.clone(); UdpServerBuilder::new(move |reply: Rc| { let session_manager = session_manager.clone(); let slot_manager = slot_manager.clone(); + let sse_clients = sse_clients.clone(); let checker_ip = reply.local_addr()?.ip(); let session_handler = session::SessionHandler::new(&session_manager, &session_handler_config); @@ -222,7 +225,37 @@ pub async fn run(config: &Config) -> anyhow::Result { match request { request::Kind::Session(session) => { - session_handler.handle(&clock, src, request_id, session_id, &session) + let response = session_handler.handle(&clock, src, request_id, session_id, &session); + if let Some((_, Packet { session_id, .. })) = &response { + if let Ok(session_id) = SessionId::try_from(session_id.clone()) { + let sse_clients_clone = Arc::clone(&sse_clients); + let session_manager_clone = session_manager.clone(); + let broadcast_future = async move { + // Add a small delay to allow the session to be fully registered + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + if let Some(session_ref) = session_manager_clone.session(&session_id) { + let msg = SseMessage { + status: "connected".to_string(), + node: NodeInfo { + id: session_ref.node_id.to_string(), + peer: session_ref.peer.to_string(), + seen: format!("{:?}", session_ref.ts.age()), + addr_status: match &*session_ref.addr_status.lock() { + AddrStatus::Unknown => "Unknown".to_owned(), + AddrStatus::Pending(ts) => format!("pending({:?})", ts.elapsed()), + AddrStatus::Invalid(ts) => format!("invalid({:?})", ts.elapsed()), + AddrStatus::Valid(ts) => format!("valid({:?})", ts.elapsed()), + }, + }, + }; + let json = serde_json::to_string(&msg).unwrap(); + sse_clients_clone.broadcast(&json).await; + } + }; + tokio::spawn(broadcast_future); + } + } + response } request::Kind::Ping(_) => { session_id.and_then(|session_id| handle_ping(&clock, src, request_id, session_id, &session_manager)) @@ -257,11 +290,33 @@ pub async fn run(config: &Config) -> anyhow::Result { })) })) }) => { - let session_id: Option = session_id.try_into().ok(); - if let Some(session_id) = session_id { - session_manager.remove_session(&session_id); - log::debug!(target: "request:disconnect", "[{src}] session {session_id} disconnected"); - } + let session_id: Option = session_id.try_into().ok(); + if let Some(session_id) = session_id { + if let Some(session_ref) = session_manager.session(&session_id) { + let sse_clients_clone = Arc::clone(&sse_clients); + let broadcast_future = async move { + let msg = SseMessage { + status: "disconnected".to_string(), + node: NodeInfo { + id: session_ref.node_id.to_string(), + peer: session_ref.peer.to_string(), + seen: format!("{:?}", session_ref.ts.age()), + addr_status: match &*session_ref.addr_status.lock() { + AddrStatus::Unknown => "Unknown".to_owned(), + AddrStatus::Pending(ts) => format!("pending({:?})", ts.elapsed()), + AddrStatus::Invalid(ts) => format!("invalid({:?})", ts.elapsed()), + AddrStatus::Valid(ts) => format!("valid({:?})", ts.elapsed()), + }, + }, + }; + let json = serde_json::to_string(&msg).unwrap(); + sse_clients_clone.broadcast(&json).await; + }; + tokio::spawn(broadcast_future); + } + session_manager.remove_session(&session_id); + log::debug!(target: "request:disconnect", "[{src}] session {session_id} disconnected"); + } None } PacketKind::Packet(Packet { session_id: _, kind: Some(packet::Kind::Control(Control { kind: Some(control::Kind::ResumeForwarding(_)) })) }) => { diff --git a/server/src/sse.rs b/server/src/sse.rs new file mode 100644 index 00000000..7bf27193 --- /dev/null +++ b/server/src/sse.rs @@ -0,0 +1,56 @@ +use actix_web_lab::sse; +use log::info; +use serde::Serialize; +use std::sync::{Arc, Mutex}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; + +#[derive(Serialize)] +pub struct SseMessage { + pub(crate) status: String, + pub(crate) node: NodeInfo, +} + +#[derive(Serialize)] +pub struct NodeInfo { + pub(crate) id: String, + pub(crate) peer: String, + pub(crate) seen: String, + #[serde(rename = "addrStatus")] + pub(crate) addr_status: String, +} + +#[derive(Debug, Clone, Default)] +pub struct SseClients { + clients: Arc>>>, +} + +impl SseClients { + pub fn new() -> Self { + SseClients { + clients: Arc::new(Mutex::new(Vec::new())), + } + } + + pub async fn add_client(&self) -> ReceiverStream { + let (tx, rx) = mpsc::channel(10); + + // Send a "connected" message to the new client + tx.send(sse::Data::new("connected").into()).await.unwrap(); + + // Add the sender to the list of clients + self.clients.lock().unwrap().push(tx); + info!("New SSE connection established"); + + // Return the receiver stream to be used for SSE + ReceiverStream::new(rx) + } + + pub async fn broadcast(&self, msg: &str) { + let clients = self.clients.lock().unwrap().clone(); + let send_futures = clients + .iter() + .map(|client| client.send(sse::Data::new(msg).into())); + let _ = futures_util::future::join_all(send_futures).await; + } +} diff --git a/server/src/testing/server.rs b/server/src/testing/server.rs index f83c2032..ad1f9a5e 100644 --- a/server/src/testing/server.rs +++ b/server/src/testing/server.rs @@ -1,10 +1,12 @@ use crate::config::Config; use crate::server::{IpCheckerConfig, Server, ServerConfig, SessionHandlerConfig}; +use crate::sse::SseClients; use crate::SessionManagerConfig; use futures::future::LocalBoxFuture; use futures::FutureExt; use std::rc::Rc; +use std::sync::Arc; use std::{future, net}; use tokio::time::Duration; use ya_relay_core::testing::TestServerWrapper; @@ -34,7 +36,8 @@ pub async fn init_test_server() -> anyhow::Result { } pub async fn init_test_server_with_config(config: Config) -> anyhow::Result { - let server = Rc::new(crate::run(&config).await?); + let sse_clients = Arc::new(SseClients::new()); + let server = Rc::new(crate::run(&config, sse_clients.clone()).await?); Ok(ServerWrapper { server }) }