Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add server side events to notify the clients #346

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ ya-relay-core = { workspace = true }
actix-rt = "2.7"
serde = { version = "1.0.192", features = ["derive"] }
rmp-serde = "1"
serde_json = "1.0.128"
serde_bytes = "0.11.12"
anyhow = "1.0"
chrono = "0.4"
Expand All @@ -30,6 +31,7 @@ lazy_static = "1.4.0"
dashmap = "4.0.2"

tokio = { version = "1", features = ["net", "sync", "macros", "time", "rt", "io-util"] }
tokio-stream = "0.1.16"
tokio-util = { version = "0.7", features = ["codec"] }
hex = "0.4.3"
parking_lot = "0.12.1"
Expand All @@ -38,7 +40,10 @@ quick_cache = "0.4.0"

tiny-keccak = "2"
actix-web = { version = "4.4.0", default-features = false, features = ["macros"] }
actix-web-lab = "0.22.0"
cfg-if = "1.0.0"
futures-util = "0.3.30"
actix-cors = "0.5" # Add this line

[target."cfg(unix)".dependencies]
libc = "0.2"
Expand All @@ -52,7 +57,7 @@ features = [

[dev-dependencies]
tokio = { version = "1", features = ["rt-multi-thread"] }
tokio-stream = "0.1.8"
tokio-stream = "0.1.16"
test-case = "3.1"
ethsign = "0.8.0"
test-log = "0.2.13"
Expand Down
43 changes: 38 additions & 5 deletions server/src/bin/ya-relay-server.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,45 @@
use clap::Parser;
use std::collections::HashMap;
use std::convert::Infallible;
use std::future;
use std::net::SocketAddr;
use std::sync::Arc;

use actix_web::{get, web, Responder};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::mpsc::channel;
use actix_web::{get, web, HttpResponse, Responder};
use actix_web::web::Data;
use actix_web_lab::extract::Path;
use actix_web_lab::sse;
use actix_web_lab::sse::Sse;
use futures_util::StreamExt;
use log::{info, log};
use serde::{Deserialize, Serialize};

use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use ya_relay_core::NodeId;
use ya_relay_server::metrics::register_metrics;
use ya_relay_server::{AddrStatus, Config, Selector, SessionManager};
use ya_relay_server::sse::SseClients;
// Shared state to manage all the sse clients


#[get("/sse")]
async fn new_sse_client(sse_clients: web::Data<Arc<SseClients>>) -> impl Responder {
// Add a new client and get the receiver stream
let sse_stream = sse_clients.add_client().await;

// Map the `Event` stream to `Result<Event, Infallible>`
let result_stream = sse_stream.map(|event| Ok::<_, Infallible>(event));

// Return the SSE stream to the client
Sse::from_stream(result_stream).with_keep_alive(Duration::from_secs(10))
}


#[get("/sessions")]
async fn sessions_list(sm: web::Data<Arc<SessionManager>>) -> impl Responder {
format!("sessions: {}", sm.num_sessions())

}

#[derive(Deserialize)]
Expand Down Expand Up @@ -68,6 +94,7 @@ async fn nodes_list_prefix(
Ok(web::Json(nodes))
}


#[actix_rt::main]
async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok();
Expand All @@ -82,21 +109,27 @@ async fn main() -> anyhow::Result<()> {

let args = Config::parse();

let sse_clients = Arc::new(SseClients::new());

let handle = register_metrics();

let server = ya_relay_server::run(&args).await?;
let server = ya_relay_server::run(&args, sse_clients.clone()).await?;

let sessions = web::Data::new(server.sessions());

let sse_clients_clone = web::Data::new(sse_clients.clone());

let web_server = actix_web::HttpServer::new(move || {
use actix_web::*;

let handle = handle.clone();

App::new()
.app_data(sessions.clone())
.app_data(sse_clients_clone.clone())
.service(nodes_list_prefix)
.service(sessions_list)
.service(new_sse_client)
.route("/", web::get().to(move || future::ready(handle.render())))
})
.workers(1)
Expand Down
1 change: 1 addition & 0 deletions server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod state;
#[cfg(feature = "test-utils")]
pub mod testing;
pub mod udp_server;
pub mod sse;

pub use state::session_manager::*;

Expand Down
72 changes: 64 additions & 8 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::sync::Arc;
use std::time::{Duration, Instant};

use crate::AddrStatus;
use metrics::Counter;
use quick_cache::sync::Cache;
use rand::{thread_rng, Rng};
Expand Down Expand Up @@ -43,6 +43,7 @@ mod ip_checker;

pub use ip_checker::IpCheckerConfig;
pub use session::SessionHandlerConfig;
use crate::sse::{SseClients, SseMessage, NodeInfo};

#[derive(clap::Args)]
/// Ip Checker configuration args
Expand Down Expand Up @@ -109,7 +110,7 @@ impl Drop for Server {
}
}

pub async fn run(config: &Config) -> anyhow::Result<Server> {
pub async fn run(config: &Config, sse_clients: Arc<SseClients>) -> anyhow::Result<Server> {
let bind_addr: SocketAddr = config.server.address;

let slot_manager = config
Expand Down Expand Up @@ -159,10 +160,12 @@ pub async fn run(config: &Config) -> anyhow::Result<Server> {
let server = {
let session_manager = session_manager.clone();
let slot_manager = slot_manager.clone();
let sse_clients = sse_clients.clone();

UdpServerBuilder::new(move |reply: Rc<UdpSocket>| {
let session_manager = session_manager.clone();
let slot_manager = slot_manager.clone();
let sse_clients = sse_clients.clone();
let checker_ip = reply.local_addr()?.ip();

let session_handler = session::SessionHandler::new(&session_manager, &session_handler_config);
Expand Down Expand Up @@ -222,7 +225,37 @@ pub async fn run(config: &Config) -> anyhow::Result<Server> {

match request {
request::Kind::Session(session) => {
session_handler.handle(&clock, src, request_id, session_id, &session)
let response = session_handler.handle(&clock, src, request_id, session_id, &session);
if let Some((_, Packet { session_id, .. })) = &response {
if let Ok(session_id) = SessionId::try_from(session_id.clone()) {
let sse_clients_clone = Arc::clone(&sse_clients);
let session_manager_clone = session_manager.clone();
let broadcast_future = async move {
// Add a small delay to allow the session to be fully registered
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
if let Some(session_ref) = session_manager_clone.session(&session_id) {
let msg = SseMessage {
status: "connected".to_string(),
node: NodeInfo {
id: session_ref.node_id.to_string(),
peer: session_ref.peer.to_string(),
seen: format!("{:?}", session_ref.ts.age()),
addr_status: match &*session_ref.addr_status.lock() {
AddrStatus::Unknown => "Unknown".to_owned(),
AddrStatus::Pending(ts) => format!("pending({:?})", ts.elapsed()),
AddrStatus::Invalid(ts) => format!("invalid({:?})", ts.elapsed()),
AddrStatus::Valid(ts) => format!("valid({:?})", ts.elapsed()),
},
},
};
let json = serde_json::to_string(&msg).unwrap();
sse_clients_clone.broadcast(&json).await;
}
};
tokio::spawn(broadcast_future);
}
}
response
}
request::Kind::Ping(_) => {
session_id.and_then(|session_id| handle_ping(&clock, src, request_id, session_id, &session_manager))
Expand Down Expand Up @@ -257,11 +290,33 @@ pub async fn run(config: &Config) -> anyhow::Result<Server> {
}))
}))
}) => {
let session_id: Option<SessionId> = session_id.try_into().ok();
if let Some(session_id) = session_id {
session_manager.remove_session(&session_id);
log::debug!(target: "request:disconnect", "[{src}] session {session_id} disconnected");
}
let session_id: Option<SessionId> = session_id.try_into().ok();
if let Some(session_id) = session_id {
if let Some(session_ref) = session_manager.session(&session_id) {
let sse_clients_clone = Arc::clone(&sse_clients);
let broadcast_future = async move {
let msg = SseMessage {
status: "disconnected".to_string(),
node: NodeInfo {
id: session_ref.node_id.to_string(),
peer: session_ref.peer.to_string(),
seen: format!("{:?}", session_ref.ts.age()),
addr_status: match &*session_ref.addr_status.lock() {
AddrStatus::Unknown => "Unknown".to_owned(),
AddrStatus::Pending(ts) => format!("pending({:?})", ts.elapsed()),
AddrStatus::Invalid(ts) => format!("invalid({:?})", ts.elapsed()),
AddrStatus::Valid(ts) => format!("valid({:?})", ts.elapsed()),
},
},
};
let json = serde_json::to_string(&msg).unwrap();
sse_clients_clone.broadcast(&json).await;
};
tokio::spawn(broadcast_future);
}
session_manager.remove_session(&session_id);
log::debug!(target: "request:disconnect", "[{src}] session {session_id} disconnected");
}
None
}
PacketKind::Packet(Packet { session_id: _, kind: Some(packet::Kind::Control(Control { kind: Some(control::Kind::ResumeForwarding(_)) })) }) => {
Expand Down Expand Up @@ -375,3 +430,4 @@ fn counter_ack(success: &Counter, error: &Counter) -> CompletionHandler {

Rc::new(CounterAck(success.clone(), error.clone()))
}

63 changes: 63 additions & 0 deletions server/src/sse.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use std::sync::{Arc, Mutex};
use actix_web_lab::sse;
use actix_web_lab::sse::Event;

Check failure on line 3 in server/src/sse.rs

View workflow job for this annotation

GitHub Actions / Tests (windows-latest)

unused import: `actix_web_lab::sse::Event`

Check failure on line 3 in server/src/sse.rs

View workflow job for this annotation

GitHub Actions / Tests (ubuntu-latest)

unused import: `actix_web_lab::sse::Event`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix those unused imports. Changes look fine otherwise, I’ll approve once this is fixed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the build fails without those 3?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, the comment might’ve been placed on the wrong line. See the checks tab — it says which imports are unused (also not all code is formatted with cargo fmt, this too needs to be done)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code formatted and removed unused imports

use futures_util::SinkExt;

Check failure on line 4 in server/src/sse.rs

View workflow job for this annotation

GitHub Actions / Tests (windows-latest)

unused import: `futures_util::SinkExt`

Check failure on line 4 in server/src/sse.rs

View workflow job for this annotation

GitHub Actions / Tests (ubuntu-latest)

unused import: `futures_util::SinkExt`
use log::{info, log};

Check failure on line 5 in server/src/sse.rs

View workflow job for this annotation

GitHub Actions / Tests (windows-latest)

unused import: `log`

Check failure on line 5 in server/src/sse.rs

View workflow job for this annotation

GitHub Actions / Tests (ubuntu-latest)

unused import: `log`
use tokio::sync::mpsc;
use tokio::sync::mpsc::{channel, Receiver};

Check failure on line 7 in server/src/sse.rs

View workflow job for this annotation

GitHub Actions / Tests (windows-latest)

unused imports: `Receiver`, `channel`

Check failure on line 7 in server/src/sse.rs

View workflow job for this annotation

GitHub Actions / Tests (ubuntu-latest)

unused imports: `Receiver`, `channel`
use tokio_stream::wrappers::ReceiverStream;
use serde::Serialize;

#[derive(Serialize)]
pub struct SseMessage {
pub(crate) status: String,
pub(crate) node: NodeInfo,
}

#[derive(Serialize)]
pub struct NodeInfo {
pub(crate) id: String,
pub(crate) peer: String,
pub(crate) seen: String,
#[serde(rename = "addrStatus")]
pub(crate) addr_status: String,
}

#[derive(Debug, Clone, Default)]
pub struct SseClients{
clients: Arc<Mutex<Vec<mpsc::Sender<sse::Event>>>>
}

impl SseClients{
pub fn new() -> Self{
SseClients{
clients: Arc::new(Mutex::new(Vec::new())),
}
}



pub async fn add_client(&self) -> ReceiverStream<sse::Event> {
let (tx, rx) = mpsc::channel(10);

// Send a "connected" message to the new client
tx.send(sse::Data::new("connected").into()).await.unwrap();

// Add the sender to the list of clients
self.clients.lock().unwrap().push(tx);
info!("New SSE connection established");

// Return the receiver stream to be used for SSE
ReceiverStream::new(rx)
}

pub fn get_no_of_clients(&self) -> usize{
self.clients.lock().unwrap().len()
}

pub async fn broadcast(&self, msg:&str){
let clients = self.clients.lock().unwrap().clone();
let send_futures = clients.iter().map(|client| client.send(sse::Data::new(msg).into()));
let _ = futures_util::future::join_all(send_futures).await;
}
}
2 changes: 1 addition & 1 deletion server/src/testing/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
}

pub async fn init_test_server_with_config(config: Config) -> anyhow::Result<ServerWrapper> {
let server = Rc::new(crate::run(&config).await?);
let server = Rc::new(crate::run(&config, ).await?);

Check failure on line 37 in server/src/testing/server.rs

View workflow job for this annotation

GitHub Actions / Tests (windows-latest)

this function takes 2 arguments but 1 argument was supplied

Check failure on line 37 in server/src/testing/server.rs

View workflow job for this annotation

GitHub Actions / Tests (ubuntu-latest)

this function takes 2 arguments but 1 argument was supplied

Ok(ServerWrapper { server })
}
Expand Down
Loading