Skip to content

Commit

Permalink
Windowed Beacons (#466)
Browse files Browse the repository at this point in the history
* Add module info to conduit service messages
* Add windowed beacons
* log next beacon time if changed
  • Loading branch information
madninja authored Oct 19, 2023
1 parent 203d9dd commit 6ef7cd0
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 122 deletions.
244 changes: 133 additions & 111 deletions src/beaconer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures::TryFutureExt;
use helium_proto::services::poc_lora::{self, lora_stream_response_v1};
use http::Uri;
use std::sync::Arc;
use time::{Duration, Instant};
use time::{Duration, Instant, OffsetDateTime};
use tracing::{info, warn};

/// Message types that can be sent to `Beaconer`'s inbox.
Expand Down Expand Up @@ -49,7 +49,7 @@ pub struct Beaconer {
/// Beacon interval
interval: Duration,
// Time next beacon attempt is to be made
next_beacon_time: Instant,
next_beacon_time: Option<OffsetDateTime>,
/// Last seen beacons
last_seen: MessageCache<Vec<u8>>,
/// Use for channel plan and FR parameters
Expand All @@ -66,7 +66,11 @@ impl Beaconer {
) -> Self {
let interval = Duration::seconds(settings.poc.interval as i64);
let entropy_uri = settings.poc.entropy_uri.clone();
let service = PocIotService::new(settings.poc.ingest_uri.clone(), settings.keypair.clone());
let service = PocIotService::new(
"beaconer",
settings.poc.ingest_uri.clone(),
settings.keypair.clone(),
);
let reconnect = Reconnect::default();
let region_params = Arc::new(region_watcher::current_value(&region_watch));
let disabled = settings.poc.disable;
Expand All @@ -77,10 +81,7 @@ impl Beaconer {
region_watch,
interval,
last_seen: MessageCache::new(15),
// Set a beacon at least an interval out... arrival of region_params
// will recalculate this time and no arrival of region_params will
// cause the beacon to not occur
next_beacon_time: Instant::now() + interval,
next_beacon_time: None,
region_params,
service,
entropy_uri,
Expand All @@ -97,15 +98,22 @@ impl Beaconer {
"starting"
);

let mut next_beacon_instant = Instant::now() + self.interval;

loop {
tokio::select! {
_ = shutdown.clone() => {
info!("shutting down");
return Ok(())
},
_ = tokio::time::sleep_until(self.next_beacon_time.into_inner().into()) => {
self.handle_beacon_tick().await;
self.next_beacon_time += self.interval;
_ = tokio::time::sleep_until(next_beacon_instant.into_inner().into()) => {
// Check if beaconing is enabled and we have valid region params
if !self.disabled && self.region_params.check_valid().is_ok() {
self.handle_beacon_tick().await;

This comment has been minimized.

Copy link
@ke6jjj

ke6jjj Oct 20, 2023

Did you forget to advance to the next beacon time here? The beacon happens, but it doesn't look like anyone computes the next beacon instant in this branch.

This comment has been minimized.

Copy link
@madninja

madninja Oct 20, 2023

Author Member

Yup fixed in #469

} else {
// otherwise sleep for another interval period
next_beacon_instant = Instant::now() + self.interval;
}
},
message = self.messages.recv() => match message {
Some(Message::ReceivedBeacon(packet)) => self.handle_received_beacon(packet).await,
Expand All @@ -115,31 +123,41 @@ impl Beaconer {
},
region_change = self.region_watch.changed() => match region_change {
Ok(()) => {
// Recalculate beacon time based on if this
// was the first time region params have
// arrived. Ensure that the next beacon
// time is not the full interval if this is
// not the first region change
//
// Do the first time check below before
// region params are assigned
let new_region_params = region_watcher::current_value(&self.region_watch);

if self.region_params.params.is_empty() {
// Calculate a random but deterministic time offset
// for this hotspot's beacons
let offset = mk_beacon_offset(self.service.gateway_key(), self.interval);
// Get a delay for the first beacon based on the
// deterministic offset and the timestamp in the
// first region params. If there's an error
// converting the region param timestamp the
// calculated offset
let delay = mk_first_beacon_delay(new_region_params.timestamp, self.interval, offset).unwrap_or(offset);
info!(delay = delay.whole_seconds(), "first beacon");
self.next_beacon_time = Instant::now() + delay;
// Recalculate a potential next beacon time based on the
// timestamp in the region parameters.
let new_region_params = Arc::new(region_watcher::current_value(&self.region_watch));
// new region params can come back with the unknown
// region and empty region params. We don't accept
// anything but a valid region param before we set a
// beacon time
if new_region_params.check_valid().is_err() {
continue;
}
// If we can't parse the timestamp ignore the region change altogether
let Ok(new_timestamp) = OffsetDateTime::from_unix_timestamp(new_region_params.timestamp as i64) else {
continue;
};

// Calculate next beacon time
let new_beacon_time = mk_next_beacon_time(
new_timestamp,
self.next_beacon_time,
self.interval,
);

// Log next beacom time if changed
if Some(new_beacon_time) != self.next_beacon_time {
info!(beacon_time = %new_beacon_time, "next beacon time");
}
self.region_params = Arc::new(region_watcher::current_value(&self.region_watch));
info!(region = RegionParams::to_string(&self.region_params), "region updated");
self.next_beacon_time = Some(new_beacon_time);
next_beacon_instant = Instant::now() + (new_beacon_time - new_timestamp);

// Reduce noise, log param change if they actually
// changed
if self.region_params != new_region_params {
info!(region = RegionParams::to_string(&new_region_params), "region updated");
}
self.region_params = new_region_params;
},
Err(_) => warn!("region watch disconnected"),
},
Expand Down Expand Up @@ -220,11 +238,10 @@ impl Beaconer {
}

async fn handle_beacon_tick(&mut self) {
if self.disabled {
return;
}

let last_beacon = Self::mk_beacon(self.region_params.clone(), self.entropy_uri.clone())
// Need to clone to allow the subsequence borrow of self for send_beacon.
// The Arc around the region_params makes this a cheap clone
let region_params = self.region_params.clone();
let last_beacon = Self::mk_beacon(&region_params, self.entropy_uri.clone())
.inspect_err(|err| warn!(%err, "construct beacon"))
.and_then(|beacon| self.send_beacon(beacon))
.map_ok_or_else(|_| None, Some)
Expand Down Expand Up @@ -263,7 +280,7 @@ impl Beaconer {
}

pub async fn mk_beacon(
region_params: Arc<RegionParams>,
region_params: &RegionParams,
entropy_uri: Uri,
) -> Result<beacon::Beacon> {
region_params.check_valid()?;
Expand All @@ -272,7 +289,7 @@ impl Beaconer {
let remote_entropy = entropy_service.get_entropy().await?;
let local_entropy = beacon::Entropy::local()?;

let beacon = beacon::Beacon::new(remote_entropy, local_entropy, &region_params)?;
let beacon = beacon::Beacon::new(remote_entropy, local_entropy, region_params)?;
Ok(beacon)
}

Expand Down Expand Up @@ -301,35 +318,36 @@ impl Beaconer {
}
}

/// Construct a random but deterministic offset for beaconing. This is based on
/// the public key as of this hotspot as the seed to a random number generator.
fn mk_beacon_offset(key: &PublicKey, interval: Duration) -> Duration {
use rand::{Rng, SeedableRng};
use sha2::Digest;

let hash = sha2::Sha256::digest(key.to_vec());
let mut rng = rand::rngs::StdRng::from_seed(*hash.as_ref());
Duration::seconds(rng.gen_range(0..interval.whole_seconds()))
fn random_duration(duration: Duration) -> Duration {
use rand::{rngs::OsRng, Rng};
Duration::seconds(OsRng.gen_range(0..duration.whole_seconds()))
}

/// Construct the first beacon time. This positions the given offset in the next
/// interval based wall clock segment. It returns the time to sleep until that
/// determinstic offset in the current or next segment.
fn mk_first_beacon_delay(
current_time: u64,
fn mk_next_beacon_time(
current_time: OffsetDateTime,
beacon_time: Option<OffsetDateTime>,
interval: Duration,
offset: Duration,
) -> Option<Duration> {
time::OffsetDateTime::from_unix_timestamp(current_time as i64)
.map(|now| {
let current_segment = duration_trunc(now, interval);
let mut first_time = current_segment + offset;
if first_time < now {
first_time += interval;
) -> OffsetDateTime {
let current_segment = duration_trunc(current_time, interval);
let next_segment = current_segment + interval;
match beacon_time {
// beacon time is in the future, just use it
Some(beacon_time) if beacon_time > current_time => beacon_time,
// beacon time is in the past, either in the current or previous segment
Some(beacon_time) => {
let beacon_segment = duration_trunc(beacon_time, interval);
if beacon_segment == current_segment {
// current segment: pick a time in the next segment
next_segment + random_duration(interval)
} else {
// previous segment: Pick a time in the remainder of this
// segment
current_time + random_duration(next_segment - current_time)
}
first_time - now
})
.ok()
}
// No next beacon time pick a time in the remainder of this segment
None => current_time + random_duration(next_segment - current_time),
}
}

/// Return a the given time truncated to the nearest duration. Based on
Expand Down Expand Up @@ -385,54 +403,58 @@ mod test {
}

#[test]
fn test_beacon_offset() {
use super::mk_beacon_offset;
use std::str::FromStr;

const PUBKEY_1: &str = "13WvV82S7QN3VMzMSieiGxvuaPKknMtf213E5JwPnboDkUfesKw";
const PUBKEY_2: &str = "14HZVR4bdF9QMowYxWrumcFBNfWnhDdD5XXA5za1fWwUhHxxFS1";
let pubkey_1 = crate::PublicKey::from_str(PUBKEY_1).expect("public key");
let offset_1 = mk_beacon_offset(&pubkey_1, time::Duration::hours(6));
// Same key and interval should always end up at the same offset
assert_eq!(
offset_1,
mk_beacon_offset(&pubkey_1, time::Duration::hours(6))
);
let pubkey_2 = crate::PublicKey::from_str(PUBKEY_2).expect("public key 2");
let offset_2 = mk_beacon_offset(&pubkey_2, time::Duration::hours(6));
assert_eq!(
offset_2,
mk_beacon_offset(&pubkey_2, time::Duration::hours(6))
);
// And two offsets based on different keys should not land at the same
// offset
assert_ne!(offset_1, offset_2);
}

#[test]
fn test_beacon_first_time() {
use super::mk_first_beacon_delay;
fn test_beacon_time() {
use super::{duration_trunc, mk_next_beacon_time};
use time::{macros::datetime, Duration};

let interval = Duration::hours(6);
let early_offset = Duration::minutes(10);
let late_offset = early_offset + Duration::hours(5);

let current_time = datetime!(2023-09-01 09:20 UTC);
let early_sleep =
mk_first_beacon_delay(current_time.unix_timestamp() as u64, interval, early_offset)
.unwrap_or(early_offset);
let late_sleep =
mk_first_beacon_delay(current_time.unix_timestamp() as u64, interval, late_offset)
.unwrap_or(late_offset);

assert_eq!(
datetime!(2023-09-01 12:10:00 UTC),
current_time + early_sleep
);
assert_eq!(
datetime!(2023-09-01 11:10:00 UTC),
current_time + late_sleep
);

let current_segment = duration_trunc(current_time, interval);
let next_segment = current_segment + interval;
assert!(current_time < next_segment);

// No beacon time, should pick a time in the current segment
{
let next_time = mk_next_beacon_time(current_time, None, interval);
assert!(next_time > current_time);
assert!(next_time < next_segment);
assert_eq!(current_segment, duration_trunc(next_time, interval));
}

// Beacon time in the future
{
// In this segment
let beacon_time = current_time + Duration::minutes(10);
assert_eq!(current_segment, duration_trunc(beacon_time, interval));
let next_time = mk_next_beacon_time(current_time, Some(beacon_time), interval);
assert!(next_time > current_time);
assert!(next_time < next_segment);
assert_eq!(current_segment, duration_trunc(next_time, interval));
// In the next segment
let beacon_time = next_segment + Duration::minutes(10);
let next_time = mk_next_beacon_time(current_time, Some(beacon_time), interval);
assert!(next_time > current_time);
assert!(next_time > next_segment);
assert_eq!(next_segment, duration_trunc(next_time, interval));
}

// Beacon time in the past
{
// This segment, should pick a time in the next segment
let beacon_time = current_segment + Duration::minutes(10);
assert!(beacon_time < current_time);
assert_eq!(current_segment, duration_trunc(beacon_time, interval));
let next_time = mk_next_beacon_time(current_time, Some(beacon_time), interval);
assert!(next_time > current_time);
assert_eq!(next_segment, duration_trunc(next_time, interval));

// Previous segment, should pick a time in this segment
let beacon_time = current_segment - Duration::minutes(10);
let next_time = mk_next_beacon_time(current_time, Some(beacon_time), interval);
assert!(next_time > current_time);
assert!(next_time < next_segment);
assert_eq!(current_segment, duration_trunc(next_time, interval));
}
}
}
9 changes: 7 additions & 2 deletions src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,13 @@ impl Gateway {
},
region_change = self.region_watch.changed() => match region_change {
Ok(()) => {
self.region_params = region_watcher::current_value(&self.region_watch);
info!(region = RegionParams::to_string(&self.region_params), "region updated");
let new_region_params = region_watcher::current_value(&self.region_watch);
// Only log if region parameters have changed to avoid
// log noise
if self.region_params != new_region_params {
info!(region = RegionParams::to_string(&new_region_params), "region updated");
}
self.region_params = new_region_params;
}
Err(_) => warn!("region watch disconnected")
},
Expand Down
6 changes: 3 additions & 3 deletions src/region_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ impl RegionWatcher {
Ok(None) => (),
Ok(Some(remote_params)) => {
self.request_retry = REGION_BACKOFF_RETRIES + 1;
if remote_params != *self.watch.borrow() {
_ = self.watch.send_replace(remote_params);
};
// We do not check for a change in params here since we
// want to propagate the timestamp in the remote params
_ = self.watch.send_replace(remote_params);
},
}
}
Expand Down
Loading

0 comments on commit 6ef7cd0

Please sign in to comment.