Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Windowed Beacons #466

Merged
merged 6 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 133 additions & 111 deletions src/beaconer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures::TryFutureExt;
use helium_proto::services::poc_lora::{self, lora_stream_response_v1};
use http::Uri;
use std::sync::Arc;
use time::{Duration, Instant};
use time::{Duration, Instant, OffsetDateTime};
use tracing::{info, warn};

/// Message types that can be sent to `Beaconer`'s inbox.
Expand Down Expand Up @@ -49,7 +49,7 @@ pub struct Beaconer {
/// Beacon interval
interval: Duration,
// Time next beacon attempt is to be made
next_beacon_time: Instant,
next_beacon_time: Option<OffsetDateTime>,
/// Last seen beacons
last_seen: MessageCache<Vec<u8>>,
/// Use for channel plan and FR parameters
Expand All @@ -66,7 +66,11 @@ impl Beaconer {
) -> Self {
let interval = Duration::seconds(settings.poc.interval as i64);
let entropy_uri = settings.poc.entropy_uri.clone();
let service = PocIotService::new(settings.poc.ingest_uri.clone(), settings.keypair.clone());
let service = PocIotService::new(
"beaconer",
settings.poc.ingest_uri.clone(),
settings.keypair.clone(),
);
let reconnect = Reconnect::default();
let region_params = Arc::new(region_watcher::current_value(&region_watch));
let disabled = settings.poc.disable;
Expand All @@ -77,10 +81,7 @@ impl Beaconer {
region_watch,
interval,
last_seen: MessageCache::new(15),
// Set a beacon at least an interval out... arrival of region_params
// will recalculate this time and no arrival of region_params will
// cause the beacon to not occur
next_beacon_time: Instant::now() + interval,
next_beacon_time: None,
region_params,
service,
entropy_uri,
Expand All @@ -97,15 +98,22 @@ impl Beaconer {
"starting"
);

let mut next_beacon_instant = Instant::now() + self.interval;

loop {
tokio::select! {
_ = shutdown.clone() => {
info!("shutting down");
return Ok(())
},
_ = tokio::time::sleep_until(self.next_beacon_time.into_inner().into()) => {
self.handle_beacon_tick().await;
self.next_beacon_time += self.interval;
_ = tokio::time::sleep_until(next_beacon_instant.into_inner().into()) => {
// Check if beaconing is enabled and we have valid region params
if !self.disabled && self.region_params.check_valid().is_ok() {
self.handle_beacon_tick().await;
} else {
// otherwise sleep for another interval period
next_beacon_instant = Instant::now() + self.interval;
}
},
message = self.messages.recv() => match message {
Some(Message::ReceivedBeacon(packet)) => self.handle_received_beacon(packet).await,
Expand All @@ -115,31 +123,41 @@ impl Beaconer {
},
region_change = self.region_watch.changed() => match region_change {
Ok(()) => {
// Recalculate beacon time based on if this
// was the first time region params have
// arrived. Ensure that the next beacon
// time is not the full interval if this is
// not the first region change
//
// Do the first time check below before
// region params are assigned
let new_region_params = region_watcher::current_value(&self.region_watch);

if self.region_params.params.is_empty() {
// Calculate a random but deterministic time offset
// for this hotspot's beacons
let offset = mk_beacon_offset(self.service.gateway_key(), self.interval);
// Get a delay for the first beacon based on the
// deterministic offset and the timestamp in the
// first region params. If there's an error
// converting the region param timestamp the
// calculated offset
let delay = mk_first_beacon_delay(new_region_params.timestamp, self.interval, offset).unwrap_or(offset);
info!(delay = delay.whole_seconds(), "first beacon");
self.next_beacon_time = Instant::now() + delay;
// Recalculate a potential next beacon time based on the
// timestamp in the region parameters.
let new_region_params = Arc::new(region_watcher::current_value(&self.region_watch));
// new region params can come back with the unknown
// region and empty region params. We don't accept
// anything but a valid region param before we set a
// beacon time
if new_region_params.check_valid().is_err() {
continue;
}
// If we can't parse the timestamp ignore the region change altogether
let Ok(new_timestamp) = OffsetDateTime::from_unix_timestamp(new_region_params.timestamp as i64) else {
continue;
};

// Calculate next beacon time
let new_beacon_time = mk_next_beacon_time(
new_timestamp,
self.next_beacon_time,
self.interval,
);

// Log next beacom time if changed
if Some(new_beacon_time) != self.next_beacon_time {
info!(beacon_time = %new_beacon_time, "next beacon time");
}
self.region_params = Arc::new(region_watcher::current_value(&self.region_watch));
info!(region = RegionParams::to_string(&self.region_params), "region updated");
self.next_beacon_time = Some(new_beacon_time);
next_beacon_instant = Instant::now() + (new_beacon_time - new_timestamp);

// Reduce noise, log param change if they actually
// changed
if self.region_params != new_region_params {
info!(region = RegionParams::to_string(&new_region_params), "region updated");
}
self.region_params = new_region_params;
},
Err(_) => warn!("region watch disconnected"),
},
Expand Down Expand Up @@ -220,11 +238,10 @@ impl Beaconer {
}

async fn handle_beacon_tick(&mut self) {
if self.disabled {
return;
}

let last_beacon = Self::mk_beacon(self.region_params.clone(), self.entropy_uri.clone())
// Need to clone to allow the subsequence borrow of self for send_beacon.
// The Arc around the region_params makes this a cheap clone
let region_params = self.region_params.clone();
let last_beacon = Self::mk_beacon(&region_params, self.entropy_uri.clone())
madninja marked this conversation as resolved.
Show resolved Hide resolved
.inspect_err(|err| warn!(%err, "construct beacon"))
.and_then(|beacon| self.send_beacon(beacon))
.map_ok_or_else(|_| None, Some)
Expand Down Expand Up @@ -263,7 +280,7 @@ impl Beaconer {
}

pub async fn mk_beacon(
region_params: Arc<RegionParams>,
region_params: &RegionParams,
entropy_uri: Uri,
) -> Result<beacon::Beacon> {
region_params.check_valid()?;
Expand All @@ -272,7 +289,7 @@ impl Beaconer {
let remote_entropy = entropy_service.get_entropy().await?;
let local_entropy = beacon::Entropy::local()?;

let beacon = beacon::Beacon::new(remote_entropy, local_entropy, &region_params)?;
let beacon = beacon::Beacon::new(remote_entropy, local_entropy, region_params)?;
Ok(beacon)
}

Expand Down Expand Up @@ -301,35 +318,36 @@ impl Beaconer {
}
}

/// Construct a random but deterministic offset for beaconing. This is based on
/// the public key as of this hotspot as the seed to a random number generator.
fn mk_beacon_offset(key: &PublicKey, interval: Duration) -> Duration {
use rand::{Rng, SeedableRng};
use sha2::Digest;

let hash = sha2::Sha256::digest(key.to_vec());
let mut rng = rand::rngs::StdRng::from_seed(*hash.as_ref());
Duration::seconds(rng.gen_range(0..interval.whole_seconds()))
fn random_duration(duration: Duration) -> Duration {
use rand::{rngs::OsRng, Rng};
Duration::seconds(OsRng.gen_range(0..duration.whole_seconds()))
}

/// Construct the first beacon time. This positions the given offset in the next
/// interval based wall clock segment. It returns the time to sleep until that
/// determinstic offset in the current or next segment.
fn mk_first_beacon_delay(
current_time: u64,
fn mk_next_beacon_time(
current_time: OffsetDateTime,
beacon_time: Option<OffsetDateTime>,
interval: Duration,
offset: Duration,
) -> Option<Duration> {
time::OffsetDateTime::from_unix_timestamp(current_time as i64)
.map(|now| {
let current_segment = duration_trunc(now, interval);
let mut first_time = current_segment + offset;
if first_time < now {
first_time += interval;
) -> OffsetDateTime {
let current_segment = duration_trunc(current_time, interval);
let next_segment = current_segment + interval;
match beacon_time {
// beacon time is in the future, just use it
Some(beacon_time) if beacon_time > current_time => beacon_time,
// beacon time is in the past, either in the current or previous segment
Some(beacon_time) => {
let beacon_segment = duration_trunc(beacon_time, interval);
if beacon_segment == current_segment {
// current segment: pick a time in the next segment
next_segment + random_duration(interval)
} else {
// previous segment: Pick a time in the remainder of this
// segment
current_time + random_duration(next_segment - current_time)
}
first_time - now
})
.ok()
}
// No next beacon time pick a time in the remainder of this segment
None => current_time + random_duration(next_segment - current_time),
}
}

/// Return a the given time truncated to the nearest duration. Based on
Expand Down Expand Up @@ -385,54 +403,58 @@ mod test {
}

#[test]
fn test_beacon_offset() {
use super::mk_beacon_offset;
use std::str::FromStr;

const PUBKEY_1: &str = "13WvV82S7QN3VMzMSieiGxvuaPKknMtf213E5JwPnboDkUfesKw";
const PUBKEY_2: &str = "14HZVR4bdF9QMowYxWrumcFBNfWnhDdD5XXA5za1fWwUhHxxFS1";
let pubkey_1 = crate::PublicKey::from_str(PUBKEY_1).expect("public key");
let offset_1 = mk_beacon_offset(&pubkey_1, time::Duration::hours(6));
// Same key and interval should always end up at the same offset
assert_eq!(
offset_1,
mk_beacon_offset(&pubkey_1, time::Duration::hours(6))
);
let pubkey_2 = crate::PublicKey::from_str(PUBKEY_2).expect("public key 2");
let offset_2 = mk_beacon_offset(&pubkey_2, time::Duration::hours(6));
assert_eq!(
offset_2,
mk_beacon_offset(&pubkey_2, time::Duration::hours(6))
);
// And two offsets based on different keys should not land at the same
// offset
assert_ne!(offset_1, offset_2);
}

#[test]
fn test_beacon_first_time() {
use super::mk_first_beacon_delay;
fn test_beacon_time() {
use super::{duration_trunc, mk_next_beacon_time};
use time::{macros::datetime, Duration};

let interval = Duration::hours(6);
let early_offset = Duration::minutes(10);
let late_offset = early_offset + Duration::hours(5);

let current_time = datetime!(2023-09-01 09:20 UTC);
let early_sleep =
mk_first_beacon_delay(current_time.unix_timestamp() as u64, interval, early_offset)
.unwrap_or(early_offset);
let late_sleep =
mk_first_beacon_delay(current_time.unix_timestamp() as u64, interval, late_offset)
.unwrap_or(late_offset);

assert_eq!(
datetime!(2023-09-01 12:10:00 UTC),
current_time + early_sleep
);
assert_eq!(
datetime!(2023-09-01 11:10:00 UTC),
current_time + late_sleep
);

let current_segment = duration_trunc(current_time, interval);
let next_segment = current_segment + interval;
assert!(current_time < next_segment);

// No beacon time, should pick a time in the current segment
{
let next_time = mk_next_beacon_time(current_time, None, interval);
assert!(next_time > current_time);
assert!(next_time < next_segment);
assert_eq!(current_segment, duration_trunc(next_time, interval));
}

// Beacon time in the future
{
// In this segment
let beacon_time = current_time + Duration::minutes(10);
assert_eq!(current_segment, duration_trunc(beacon_time, interval));
let next_time = mk_next_beacon_time(current_time, Some(beacon_time), interval);
assert!(next_time > current_time);
assert!(next_time < next_segment);
assert_eq!(current_segment, duration_trunc(next_time, interval));
// In the next segment
let beacon_time = next_segment + Duration::minutes(10);
let next_time = mk_next_beacon_time(current_time, Some(beacon_time), interval);
assert!(next_time > current_time);
assert!(next_time > next_segment);
assert_eq!(next_segment, duration_trunc(next_time, interval));
}

// Beacon time in the past
{
// This segment, should pick a time in the next segment
let beacon_time = current_segment + Duration::minutes(10);
assert!(beacon_time < current_time);
assert_eq!(current_segment, duration_trunc(beacon_time, interval));
let next_time = mk_next_beacon_time(current_time, Some(beacon_time), interval);
assert!(next_time > current_time);
assert_eq!(next_segment, duration_trunc(next_time, interval));

// Previous segment, should pick a time in this segment
let beacon_time = current_segment - Duration::minutes(10);
let next_time = mk_next_beacon_time(current_time, Some(beacon_time), interval);
assert!(next_time > current_time);
assert!(next_time < next_segment);
assert_eq!(current_segment, duration_trunc(next_time, interval));
}
}
}
9 changes: 7 additions & 2 deletions src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,13 @@ impl Gateway {
},
region_change = self.region_watch.changed() => match region_change {
Ok(()) => {
self.region_params = region_watcher::current_value(&self.region_watch);
info!(region = RegionParams::to_string(&self.region_params), "region updated");
let new_region_params = region_watcher::current_value(&self.region_watch);
// Only log if region parameters have changed to avoid
// log noise
if self.region_params != new_region_params {
info!(region = RegionParams::to_string(&new_region_params), "region updated");
}
self.region_params = new_region_params;
}
Err(_) => warn!("region watch disconnected")
},
Expand Down
6 changes: 3 additions & 3 deletions src/region_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ impl RegionWatcher {
Ok(None) => (),
Ok(Some(remote_params)) => {
self.request_retry = REGION_BACKOFF_RETRIES + 1;
if remote_params != *self.watch.borrow() {
_ = self.watch.send_replace(remote_params);
};
// We do not check for a change in params here since we
// want to propagate the timestamp in the remote params
_ = self.watch.send_replace(remote_params);
},
}
}
Expand Down
Loading