diff --git a/src/beaconer.rs b/src/beaconer.rs index 3e3a77c1..50716178 100644 --- a/src/beaconer.rs +++ b/src/beaconer.rs @@ -11,7 +11,7 @@ use futures::TryFutureExt; use helium_proto::services::poc_lora::{self, lora_stream_response_v1}; use http::Uri; use std::sync::Arc; -use time::{Duration, Instant}; +use time::{Duration, Instant, OffsetDateTime}; use tracing::{info, warn}; /// Message types that can be sent to `Beaconer`'s inbox. @@ -49,7 +49,7 @@ pub struct Beaconer { /// Beacon interval interval: Duration, // Time next beacon attempt is to be made - next_beacon_time: Instant, + next_beacon_time: Option, /// Last seen beacons last_seen: MessageCache>, /// Use for channel plan and FR parameters @@ -66,7 +66,11 @@ impl Beaconer { ) -> Self { let interval = Duration::seconds(settings.poc.interval as i64); let entropy_uri = settings.poc.entropy_uri.clone(); - let service = PocIotService::new(settings.poc.ingest_uri.clone(), settings.keypair.clone()); + let service = PocIotService::new( + "beaconer", + settings.poc.ingest_uri.clone(), + settings.keypair.clone(), + ); let reconnect = Reconnect::default(); let region_params = Arc::new(region_watcher::current_value(®ion_watch)); let disabled = settings.poc.disable; @@ -77,10 +81,7 @@ impl Beaconer { region_watch, interval, last_seen: MessageCache::new(15), - // Set a beacon at least an interval out... arrival of region_params - // will recalculate this time and no arrival of region_params will - // cause the beacon to not occur - next_beacon_time: Instant::now() + interval, + next_beacon_time: None, region_params, service, entropy_uri, @@ -97,15 +98,22 @@ impl Beaconer { "starting" ); + let mut next_beacon_instant = Instant::now() + self.interval; + loop { tokio::select! { _ = shutdown.clone() => { info!("shutting down"); return Ok(()) }, - _ = tokio::time::sleep_until(self.next_beacon_time.into_inner().into()) => { - self.handle_beacon_tick().await; - self.next_beacon_time += self.interval; + _ = tokio::time::sleep_until(next_beacon_instant.into_inner().into()) => { + // Check if beaconing is enabled and we have valid region params + if !self.disabled && self.region_params.check_valid().is_ok() { + self.handle_beacon_tick().await; + } else { + // otherwise sleep for another interval period + next_beacon_instant = Instant::now() + self.interval; + } }, message = self.messages.recv() => match message { Some(Message::ReceivedBeacon(packet)) => self.handle_received_beacon(packet).await, @@ -115,31 +123,41 @@ impl Beaconer { }, region_change = self.region_watch.changed() => match region_change { Ok(()) => { - // Recalculate beacon time based on if this - // was the first time region params have - // arrived. Ensure that the next beacon - // time is not the full interval if this is - // not the first region change - // - // Do the first time check below before - // region params are assigned - let new_region_params = region_watcher::current_value(&self.region_watch); - - if self.region_params.params.is_empty() { - // Calculate a random but deterministic time offset - // for this hotspot's beacons - let offset = mk_beacon_offset(self.service.gateway_key(), self.interval); - // Get a delay for the first beacon based on the - // deterministic offset and the timestamp in the - // first region params. If there's an error - // converting the region param timestamp the - // calculated offset - let delay = mk_first_beacon_delay(new_region_params.timestamp, self.interval, offset).unwrap_or(offset); - info!(delay = delay.whole_seconds(), "first beacon"); - self.next_beacon_time = Instant::now() + delay; + // Recalculate a potential next beacon time based on the + // timestamp in the region parameters. + let new_region_params = Arc::new(region_watcher::current_value(&self.region_watch)); + // new region params can come back with the unknown + // region and empty region params. We don't accept + // anything but a valid region param before we set a + // beacon time + if new_region_params.check_valid().is_err() { + continue; + } + // If we can't parse the timestamp ignore the region change altogether + let Ok(new_timestamp) = OffsetDateTime::from_unix_timestamp(new_region_params.timestamp as i64) else { + continue; + }; + + // Calculate next beacon time + let new_beacon_time = mk_next_beacon_time( + new_timestamp, + self.next_beacon_time, + self.interval, + ); + + // Log next beacom time if changed + if Some(new_beacon_time) != self.next_beacon_time { + info!(beacon_time = %new_beacon_time, "next beacon time"); } - self.region_params = Arc::new(region_watcher::current_value(&self.region_watch)); - info!(region = RegionParams::to_string(&self.region_params), "region updated"); + self.next_beacon_time = Some(new_beacon_time); + next_beacon_instant = Instant::now() + (new_beacon_time - new_timestamp); + + // Reduce noise, log param change if they actually + // changed + if self.region_params != new_region_params { + info!(region = RegionParams::to_string(&new_region_params), "region updated"); + } + self.region_params = new_region_params; }, Err(_) => warn!("region watch disconnected"), }, @@ -220,11 +238,10 @@ impl Beaconer { } async fn handle_beacon_tick(&mut self) { - if self.disabled { - return; - } - - let last_beacon = Self::mk_beacon(self.region_params.clone(), self.entropy_uri.clone()) + // Need to clone to allow the subsequence borrow of self for send_beacon. + // The Arc around the region_params makes this a cheap clone + let region_params = self.region_params.clone(); + let last_beacon = Self::mk_beacon(®ion_params, self.entropy_uri.clone()) .inspect_err(|err| warn!(%err, "construct beacon")) .and_then(|beacon| self.send_beacon(beacon)) .map_ok_or_else(|_| None, Some) @@ -263,7 +280,7 @@ impl Beaconer { } pub async fn mk_beacon( - region_params: Arc, + region_params: &RegionParams, entropy_uri: Uri, ) -> Result { region_params.check_valid()?; @@ -272,7 +289,7 @@ impl Beaconer { let remote_entropy = entropy_service.get_entropy().await?; let local_entropy = beacon::Entropy::local()?; - let beacon = beacon::Beacon::new(remote_entropy, local_entropy, ®ion_params)?; + let beacon = beacon::Beacon::new(remote_entropy, local_entropy, region_params)?; Ok(beacon) } @@ -301,35 +318,36 @@ impl Beaconer { } } -/// Construct a random but deterministic offset for beaconing. This is based on -/// the public key as of this hotspot as the seed to a random number generator. -fn mk_beacon_offset(key: &PublicKey, interval: Duration) -> Duration { - use rand::{Rng, SeedableRng}; - use sha2::Digest; - - let hash = sha2::Sha256::digest(key.to_vec()); - let mut rng = rand::rngs::StdRng::from_seed(*hash.as_ref()); - Duration::seconds(rng.gen_range(0..interval.whole_seconds())) +fn random_duration(duration: Duration) -> Duration { + use rand::{rngs::OsRng, Rng}; + Duration::seconds(OsRng.gen_range(0..duration.whole_seconds())) } -/// Construct the first beacon time. This positions the given offset in the next -/// interval based wall clock segment. It returns the time to sleep until that -/// determinstic offset in the current or next segment. -fn mk_first_beacon_delay( - current_time: u64, +fn mk_next_beacon_time( + current_time: OffsetDateTime, + beacon_time: Option, interval: Duration, - offset: Duration, -) -> Option { - time::OffsetDateTime::from_unix_timestamp(current_time as i64) - .map(|now| { - let current_segment = duration_trunc(now, interval); - let mut first_time = current_segment + offset; - if first_time < now { - first_time += interval; +) -> OffsetDateTime { + let current_segment = duration_trunc(current_time, interval); + let next_segment = current_segment + interval; + match beacon_time { + // beacon time is in the future, just use it + Some(beacon_time) if beacon_time > current_time => beacon_time, + // beacon time is in the past, either in the current or previous segment + Some(beacon_time) => { + let beacon_segment = duration_trunc(beacon_time, interval); + if beacon_segment == current_segment { + // current segment: pick a time in the next segment + next_segment + random_duration(interval) + } else { + // previous segment: Pick a time in the remainder of this + // segment + current_time + random_duration(next_segment - current_time) } - first_time - now - }) - .ok() + } + // No next beacon time pick a time in the remainder of this segment + None => current_time + random_duration(next_segment - current_time), + } } /// Return a the given time truncated to the nearest duration. Based on @@ -385,54 +403,58 @@ mod test { } #[test] - fn test_beacon_offset() { - use super::mk_beacon_offset; - use std::str::FromStr; - - const PUBKEY_1: &str = "13WvV82S7QN3VMzMSieiGxvuaPKknMtf213E5JwPnboDkUfesKw"; - const PUBKEY_2: &str = "14HZVR4bdF9QMowYxWrumcFBNfWnhDdD5XXA5za1fWwUhHxxFS1"; - let pubkey_1 = crate::PublicKey::from_str(PUBKEY_1).expect("public key"); - let offset_1 = mk_beacon_offset(&pubkey_1, time::Duration::hours(6)); - // Same key and interval should always end up at the same offset - assert_eq!( - offset_1, - mk_beacon_offset(&pubkey_1, time::Duration::hours(6)) - ); - let pubkey_2 = crate::PublicKey::from_str(PUBKEY_2).expect("public key 2"); - let offset_2 = mk_beacon_offset(&pubkey_2, time::Duration::hours(6)); - assert_eq!( - offset_2, - mk_beacon_offset(&pubkey_2, time::Duration::hours(6)) - ); - // And two offsets based on different keys should not land at the same - // offset - assert_ne!(offset_1, offset_2); - } - - #[test] - fn test_beacon_first_time() { - use super::mk_first_beacon_delay; + fn test_beacon_time() { + use super::{duration_trunc, mk_next_beacon_time}; use time::{macros::datetime, Duration}; let interval = Duration::hours(6); - let early_offset = Duration::minutes(10); - let late_offset = early_offset + Duration::hours(5); - let current_time = datetime!(2023-09-01 09:20 UTC); - let early_sleep = - mk_first_beacon_delay(current_time.unix_timestamp() as u64, interval, early_offset) - .unwrap_or(early_offset); - let late_sleep = - mk_first_beacon_delay(current_time.unix_timestamp() as u64, interval, late_offset) - .unwrap_or(late_offset); - - assert_eq!( - datetime!(2023-09-01 12:10:00 UTC), - current_time + early_sleep - ); - assert_eq!( - datetime!(2023-09-01 11:10:00 UTC), - current_time + late_sleep - ); + + let current_segment = duration_trunc(current_time, interval); + let next_segment = current_segment + interval; + assert!(current_time < next_segment); + + // No beacon time, should pick a time in the current segment + { + let next_time = mk_next_beacon_time(current_time, None, interval); + assert!(next_time > current_time); + assert!(next_time < next_segment); + assert_eq!(current_segment, duration_trunc(next_time, interval)); + } + + // Beacon time in the future + { + // In this segment + let beacon_time = current_time + Duration::minutes(10); + assert_eq!(current_segment, duration_trunc(beacon_time, interval)); + let next_time = mk_next_beacon_time(current_time, Some(beacon_time), interval); + assert!(next_time > current_time); + assert!(next_time < next_segment); + assert_eq!(current_segment, duration_trunc(next_time, interval)); + // In the next segment + let beacon_time = next_segment + Duration::minutes(10); + let next_time = mk_next_beacon_time(current_time, Some(beacon_time), interval); + assert!(next_time > current_time); + assert!(next_time > next_segment); + assert_eq!(next_segment, duration_trunc(next_time, interval)); + } + + // Beacon time in the past + { + // This segment, should pick a time in the next segment + let beacon_time = current_segment + Duration::minutes(10); + assert!(beacon_time < current_time); + assert_eq!(current_segment, duration_trunc(beacon_time, interval)); + let next_time = mk_next_beacon_time(current_time, Some(beacon_time), interval); + assert!(next_time > current_time); + assert_eq!(next_segment, duration_trunc(next_time, interval)); + + // Previous segment, should pick a time in this segment + let beacon_time = current_segment - Duration::minutes(10); + let next_time = mk_next_beacon_time(current_time, Some(beacon_time), interval); + assert!(next_time > current_time); + assert!(next_time < next_segment); + assert_eq!(current_segment, duration_trunc(next_time, interval)); + } } } diff --git a/src/gateway.rs b/src/gateway.rs index 114b3316..7fe785e1 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -114,8 +114,13 @@ impl Gateway { }, region_change = self.region_watch.changed() => match region_change { Ok(()) => { - self.region_params = region_watcher::current_value(&self.region_watch); - info!(region = RegionParams::to_string(&self.region_params), "region updated"); + let new_region_params = region_watcher::current_value(&self.region_watch); + // Only log if region parameters have changed to avoid + // log noise + if self.region_params != new_region_params { + info!(region = RegionParams::to_string(&new_region_params), "region updated"); + } + self.region_params = new_region_params; } Err(_) => warn!("region watch disconnected") }, diff --git a/src/region_watcher.rs b/src/region_watcher.rs index 7025cad4..4ab49c65 100644 --- a/src/region_watcher.rs +++ b/src/region_watcher.rs @@ -78,9 +78,9 @@ impl RegionWatcher { Ok(None) => (), Ok(Some(remote_params)) => { self.request_retry = REGION_BACKOFF_RETRIES + 1; - if remote_params != *self.watch.borrow() { - _ = self.watch.send_replace(remote_params); - }; + // We do not check for a change in params here since we + // want to propagate the timestamp in the remote params + _ = self.watch.send_replace(remote_params); }, } } diff --git a/src/service/conduit.rs b/src/service/conduit.rs index e4bf13ea..5e250484 100644 --- a/src/service/conduit.rs +++ b/src/service/conduit.rs @@ -21,6 +21,7 @@ pub const CONDUIT_CAPACITY: usize = 50; #[derive(Debug)] pub struct ConduitService> { pub uri: Uri, + module: &'static str, session_keypair: Option>, conduit: Option>, keypair: Arc, @@ -84,9 +85,10 @@ impl Conduit { } impl> ConduitService { - pub fn new(uri: Uri, client: C, keypair: Arc) -> Self { + pub fn new(module: &'static str, uri: Uri, client: C, keypair: Arc) -> Self { Self { uri, + module, keypair, client, conduit: None, @@ -176,15 +178,16 @@ impl> ConduitService { pub async fn session_init(&mut self, nonce: &[u8]) -> Result { let session_keypair = Arc::new(Keypair::new()); let session_key = session_keypair.public_key(); + let module: &'static str = self.module; let msg = self .client .mk_session_init(nonce, session_key, self.keypair.clone()) .await?; self.send(msg) - .inspect_err(|err| warn!(%err, "failed to initialize session")) + .inspect_err(|err| warn!(module, %err, "failed to initialize session")) .await?; self.session_keypair = Some(session_keypair.clone()); - info!(%session_key, "initialized session"); + info!(module, %session_key, "initialized session"); Ok(()) } } diff --git a/src/service/packet_router.rs b/src/service/packet_router.rs index 394cf05a..18b1be25 100644 --- a/src/service/packet_router.rs +++ b/src/service/packet_router.rs @@ -100,7 +100,7 @@ impl std::ops::DerefMut for PacketRouterService { impl PacketRouterService { pub fn new(uri: Uri, keypair: Arc) -> Self { let client = PacketRouterConduitClient {}; - Self(ConduitService::new(uri, client, keypair)) + Self(ConduitService::new("packet_router", uri, client, keypair)) } pub async fn send_uplink(&mut self, mut msg: PacketRouterPacketUpV1) -> Result { diff --git a/src/service/poc.rs b/src/service/poc.rs index 466eab1a..adb71b47 100644 --- a/src/service/poc.rs +++ b/src/service/poc.rs @@ -81,9 +81,9 @@ impl std::ops::DerefMut for PocIotService { } impl PocIotService { - pub fn new(uri: Uri, keypair: Arc) -> Self { + pub fn new(module: &'static str, uri: Uri, keypair: Arc) -> Self { let client = PocIotConduitClient {}; - Self(ConduitService::new(uri, client, keypair)) + Self(ConduitService::new(module, uri, client, keypair)) } pub async fn send(&mut self, msg: lora_stream_request_v1::Request) -> Result {