Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PoC Ingest Streams #465

Merged
merged 8 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 107 additions & 107 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub use helium_proto::{
};
pub use server::LocalServer;

use crate::{Error, Result};
use crate::{Error, PublicKey, Result};

impl TryFrom<RouterRes> for crate::packet_router::RouterStatus {
type Error = Error;
Expand All @@ -20,7 +20,7 @@ impl TryFrom<RouterRes> for crate::packet_router::RouterStatus {
Ok(Self {
uri: http::Uri::from_str(&value.uri)?,
connected: value.connected,
session_key: helium_crypto::PublicKey::try_from(value.session_key).ok(),
session_key: PublicKey::try_from(value.session_key).ok(),
})
}
}
204 changes: 112 additions & 92 deletions src/beaconer.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
//! This module provides proof-of-coverage (PoC) beaconing support.

use crate::{
error::DecodeError,
gateway::{self, BeaconResp},
message_cache::MessageCache,
region_watcher,
service::{entropy::EntropyService, poc::PocIotService},
service::{entropy::EntropyService, poc::PocIotService, Reconnect},
settings::Settings,
sign, sync, Base64, Keypair, PacketUp, PublicKey, RegionParams, Result,
sync, Base64, DecodeError, PacketUp, PublicKey, RegionParams, Result,
};
use futures::TryFutureExt;
use helium_proto::{services::poc_lora, Message as ProtoMessage};
use helium_proto::services::poc_lora::{self, lora_stream_response_v1};
use http::Uri;
use std::sync::Arc;
use time::{Duration, Instant};
Expand Down Expand Up @@ -38,23 +36,24 @@ impl MessageSender {
pub struct Beaconer {
/// Beacon/Witness handling disabled
disabled: bool,
/// keypair to sign reports with
keypair: Arc<Keypair>,
/// gateway packet transmit message queue
transmit: gateway::MessageSender,
/// Our receive queue.
messages: MessageReceiver,
/// Service to deliver PoC reports to
service: PocIotService,
/// Service reconnect trigger
reconnect: Reconnect,
/// Region change queue
region_watch: region_watcher::MessageReceiver,
/// Beacon interval
interval: Duration,
// Time next beacon attempt is o be made
// Time next beacon attempt is to be made
next_beacon_time: Instant,
/// Last seen beacons
last_seen: MessageCache<Vec<u8>>,
/// Use for channel plan and FR parameters
region_params: RegionParams,
poc_ingest_uri: Uri,
region_params: Arc<RegionParams>,
entropy_uri: Uri,
}

Expand All @@ -66,14 +65,13 @@ impl Beaconer {
transmit: gateway::MessageSender,
) -> Self {
let interval = Duration::seconds(settings.poc.interval as i64);
let poc_ingest_uri = settings.poc.ingest_uri.clone();
let entropy_uri = settings.poc.entropy_uri.clone();
let keypair = settings.keypair.clone();
let region_params = region_watcher::current_value(&region_watch);
let service = PocIotService::new(settings.poc.ingest_uri.clone(), settings.keypair.clone());
let reconnect = Reconnect::default();
let region_params = Arc::new(region_watcher::current_value(&region_watch));
let disabled = settings.poc.disable;

Self {
keypair,
transmit,
messages,
region_watch,
Expand All @@ -84,16 +82,18 @@ impl Beaconer {
// cause the beacon to not occur
next_beacon_time: Instant::now() + interval,
region_params,
poc_ingest_uri,
service,
entropy_uri,
disabled,
reconnect,
}
}

pub async fn run(&mut self, shutdown: &triggered::Listener) -> Result {
info!(
beacon_interval = self.interval.whole_seconds(),
disabled = self.disabled,
uri = %self.service.uri,
"starting"
);

Expand Down Expand Up @@ -128,7 +128,7 @@ impl Beaconer {
if self.region_params.params.is_empty() {
// Calculate a random but deterministic time offset
// for this hotspot's beacons
let offset = mk_beacon_offset(self.keypair.public_key(), self.interval);
let offset = mk_beacon_offset(self.service.gateway_key(), self.interval);
// Get a delay for the first beacon based on the
// deterministic offset and the timestamp in the
// first region params. If there's an error
Expand All @@ -138,32 +138,42 @@ impl Beaconer {
info!(delay = delay.whole_seconds(), "first beacon");
self.next_beacon_time = Instant::now() + delay;
}
self.region_params = region_watcher::current_value(&self.region_watch);
self.region_params = Arc::new(region_watcher::current_value(&self.region_watch));
info!(region = RegionParams::to_string(&self.region_params), "region updated");
},
Err(_) => warn!("region watch disconnected"),
}

},
service_message = self.service.recv() => match service_message {
Ok(lora_stream_response_v1::Response::Offer(message)) => {
let session_result = self.handle_session_offer(message).await;
if session_result.is_ok() {
// (Re)set retry count to max to maximize time to
// next disconnect from service
self.reconnect.retry_count = self.reconnect.max_retries;
} else {
// Failed to handle session offer, disconnect
self.service.disconnect();
}
self.reconnect.update_next_time(session_result.is_err());
},
Err(err) => {
warn!(?err, "ingest error");
self.reconnect.update_next_time(true);
},
},
_ = self.reconnect.wait() => {
let reconnect_result = self.handle_reconnect().await;
self.reconnect.update_next_time(reconnect_result.is_err());
},

}
}
}

pub async fn mk_beacon(&self) -> Result<beacon::Beacon> {
self.region_params.check_valid()?;

let mut entropy_service = EntropyService::new(self.entropy_uri.clone());
let remote_entropy = entropy_service.get_entropy().await?;
let local_entropy = beacon::Entropy::local()?;

let beacon = beacon::Beacon::new(remote_entropy, local_entropy, &self.region_params)?;
Ok(beacon)
}

/// Sends a gateway-to-gateway packet.
///
/// See [`gateway::MessageSender::transmit_beacon`]
pub async fn send_beacon(&self, beacon: beacon::Beacon) -> Result<beacon::Beacon> {
pub async fn send_beacon(&mut self, beacon: beacon::Beacon) -> Result<beacon::Beacon> {
let beacon_id = beacon
.beacon_data()
.map(|data| data.to_b64())
Expand All @@ -178,59 +188,43 @@ impl Beaconer {
.map_ok(|BeaconResp { powe, tmst }| (powe, tmst))
.await?;

// Construct concurrent futures for connecting to the poc ingester and
// signing the report
let report_fut = self.mk_beacon_report(beacon.clone(), powe, tmst);
let service_fut = PocIotService::connect(self.poc_ingest_uri.clone());

match tokio::try_join!(report_fut, service_fut) {
Ok((report, mut poc_service)) => {
poc_service
.submit_beacon(report)
.inspect_err(|err| warn!(beacon_id, %err, "submit poc beacon report"))
.inspect_ok(|_| info!(beacon_id, "poc beacon report submitted"))
.await?
}
Err(err) => {
warn!(beacon_id, %err, "poc beacon report");
}
}
Self::mk_beacon_report(
beacon.clone(),
powe,
tmst,
self.service.gateway_key().clone(),
)
.and_then(|report| self.service.submit_beacon(report))
.inspect_err(|err| warn!(beacon_id, %err, "submit poc beacon report"))
.inspect_ok(|_| info!(beacon_id, "poc beacon report submitted"))
.await?;

Ok(beacon)
}

async fn mk_beacon_report(
&self,
beacon: beacon::Beacon,
conducted_power: i32,
tmst: u32,
) -> Result<poc_lora::LoraBeaconReportReqV1> {
let mut report = poc_lora::LoraBeaconReportReqV1::try_from(beacon)?;
report.tx_power = conducted_power;
report.tmst = tmst;
report.pub_key = self.keypair.public_key().to_vec();
report.signature = sign(self.keypair.clone(), report.encode_to_vec()).await?;
Ok(report)
async fn handle_session_offer(
&mut self,
message: poc_lora::LoraStreamSessionOfferV1,
) -> Result {
self.service.session_init(&message.nonce).await
}

async fn mk_witness_report(
&self,
packet: PacketUp,
payload: Vec<u8>,
) -> Result<poc_lora::LoraWitnessReportReqV1> {
let mut report = poc_lora::LoraWitnessReportReqV1::try_from(packet)?;
report.data = payload;
report.pub_key = self.keypair.public_key().to_vec();
report.signature = sign(self.keypair.clone(), report.encode_to_vec()).await?;
Ok(report)
async fn handle_reconnect(&mut self) -> Result {
// Do not send waiting reports on ok here since we wait for a session
// offer. Also do not reset the reconnect retry counter since only a
// session key indicates a good connection
self.service
.reconnect()
.inspect_err(|err| warn!(%err, "failed to reconnect"))
.await
}

async fn handle_beacon_tick(&mut self) {
if self.disabled {
return;
}
let last_beacon = self
.mk_beacon()

let last_beacon = Self::mk_beacon(self.region_params.clone(), self.entropy_uri.clone())
.inspect_err(|err| warn!(%err, "construct beacon"))
.and_then(|beacon| self.send_beacon(beacon))
.map_ok_or_else(|_| None, Some)
Expand Down Expand Up @@ -261,23 +255,49 @@ impl Beaconer {
return;
}

// Construct concurrent futures for connecting to the poc ingester and
// signing the report
let report_fut = self.mk_witness_report(packet, beacon_data);
let service_fut = PocIotService::connect(self.poc_ingest_uri.clone());

match tokio::try_join!(report_fut, service_fut) {
Ok((report, mut poc_service)) => {
let _ = poc_service
.submit_witness(report)
.inspect_err(|err| warn!(beacon_id, %err, "submit poc witness report"))
.inspect_ok(|_| info!(beacon_id, "poc witness report submitted"))
.await;
}
Err(err) => {
warn!(%err, "poc witness report");
}
}
let _ = Self::mk_witness_report(packet, beacon_data, self.service.gateway_key().clone())
.and_then(|report| self.service.submit_witness(report))
.inspect_err(|err| warn!(beacon_id, %err, "submit poc witness report"))
.inspect_ok(|_| info!(beacon_id, "poc witness report submitted"))
.await;
}

pub async fn mk_beacon(
region_params: Arc<RegionParams>,
entropy_uri: Uri,
) -> Result<beacon::Beacon> {
region_params.check_valid()?;

let mut entropy_service = EntropyService::new(entropy_uri);
let remote_entropy = entropy_service.get_entropy().await?;
let local_entropy = beacon::Entropy::local()?;

let beacon = beacon::Beacon::new(remote_entropy, local_entropy, &region_params)?;
Ok(beacon)
}

async fn mk_beacon_report(
beacon: beacon::Beacon,
conducted_power: i32,
tmst: u32,
gateway: PublicKey,
) -> Result<poc_lora::LoraBeaconReportReqV1> {
let mut report = poc_lora::LoraBeaconReportReqV1::try_from(beacon)?;
report.pub_key = gateway.to_vec();
report.tx_power = conducted_power;
report.tmst = tmst;
Ok(report)
}

async fn mk_witness_report(
packet: PacketUp,
payload: Vec<u8>,
gateway: PublicKey,
) -> Result<poc_lora::LoraWitnessReportReqV1> {
let mut report = poc_lora::LoraWitnessReportReqV1::try_from(packet)?;
report.pub_key = gateway.to_vec();
report.data = payload;
Ok(report)
}
}

Expand All @@ -287,7 +307,7 @@ fn mk_beacon_offset(key: &PublicKey, interval: Duration) -> Duration {
use rand::{Rng, SeedableRng};
use sha2::Digest;

let hash = sha2::Sha256::digest(&key.to_vec());
let hash = sha2::Sha256::digest(key.to_vec());
let mut rng = rand::rngs::StdRng::from_seed(*hash.as_ref());
Duration::seconds(rng.gen_range(0..interval.whole_seconds()))
}
Expand Down Expand Up @@ -371,14 +391,14 @@ mod test {

const PUBKEY_1: &str = "13WvV82S7QN3VMzMSieiGxvuaPKknMtf213E5JwPnboDkUfesKw";
const PUBKEY_2: &str = "14HZVR4bdF9QMowYxWrumcFBNfWnhDdD5XXA5za1fWwUhHxxFS1";
let pubkey_1 = helium_crypto::PublicKey::from_str(PUBKEY_1).expect("public key");
let pubkey_1 = crate::PublicKey::from_str(PUBKEY_1).expect("public key");
let offset_1 = mk_beacon_offset(&pubkey_1, time::Duration::hours(6));
// Same key and interval should always end up at the same offset
assert_eq!(
offset_1,
mk_beacon_offset(&pubkey_1, time::Duration::hours(6))
);
let pubkey_2 = helium_crypto::PublicKey::from_str(PUBKEY_2).expect("public key 2");
let pubkey_2 = crate::PublicKey::from_str(PUBKEY_2).expect("public key 2");
let offset_2 = mk_beacon_offset(&pubkey_2, time::Duration::hours(6));
assert_eq!(
offset_2,
Expand Down
12 changes: 8 additions & 4 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ pub enum ServiceError {
Stream,
#[error("channel closed")]
Channel,
#[error("no service")]
NoService,
#[error("no active session")]
NoSession,
#[error("age {age}s > {max_age}s")]
Check { age: u64, max_age: u64 },
#[error("Unable to connect to local server. Check that `helium_gateway` is running.")]
Expand Down Expand Up @@ -170,8 +170,12 @@ impl Error {
Error::Service(ServiceError::Channel)
}

pub fn no_service() -> Error {
Error::Service(ServiceError::NoService)
pub fn no_session() -> Error {
Error::Service(ServiceError::NoSession)
}

pub fn no_stream() -> Error {
Error::Service(ServiceError::Stream)
}

pub fn gateway_service_check(age: u64, max_age: u64) -> Error {
Expand Down
4 changes: 2 additions & 2 deletions src/keyed_uri.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl TryFrom<helium_proto::services::local::KeyedUri> for KeyedUri {
fn try_from(v: helium_proto::services::local::KeyedUri) -> Result<Self> {
let result = Self {
uri: http::Uri::from_str(&v.uri)?,
pubkey: Arc::new(helium_crypto::PublicKey::from_bytes(v.address)?),
pubkey: Arc::new(PublicKey::from_bytes(v.address)?),
};
Ok(result)
}
Expand All @@ -63,7 +63,7 @@ impl TryFrom<helium_proto::RoutingAddress> for KeyedUri {
fn try_from(v: helium_proto::RoutingAddress) -> Result<Self> {
let result = Self {
uri: http::Uri::from_str(&String::from_utf8_lossy(&v.uri))?,
pubkey: Arc::new(helium_crypto::PublicKey::from_bytes(v.pub_key)?),
pubkey: Arc::new(PublicKey::from_bytes(v.pub_key)?),
};
Ok(result)
}
Expand Down
Loading