From fa42bc43dc4f29d1f64def45c33e5b1d212f6d49 Mon Sep 17 00:00:00 2001 From: Marc Nijdam Date: Tue, 10 Oct 2023 13:54:02 -0700 Subject: [PATCH] Remove most sharing of keypairs And move keypair and session keypair management to the conduit itself. --- src/api/mod.rs | 4 +- src/beaconer.rs | 91 +++++++++--------------------------- src/error.rs | 8 ++-- src/keyed_uri.rs | 4 +- src/keypair.rs | 66 +++++++++++++++++--------- src/lib.rs | 53 +++++++++++++++++---- src/packet_router/mod.rs | 85 +++++++-------------------------- src/service/conduit.rs | 52 ++++++++++++++++++++- src/service/config.rs | 17 +++++-- src/service/packet_router.rs | 36 ++++++++++++-- src/service/poc.rs | 45 ++++++++++++++---- 11 files changed, 264 insertions(+), 197 deletions(-) diff --git a/src/api/mod.rs b/src/api/mod.rs index 60f5e7fe..4509fb13 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -11,7 +11,7 @@ pub use helium_proto::{ }; pub use server::LocalServer; -use crate::{Error, Result}; +use crate::{Error, PublicKey, Result}; impl TryFrom for crate::packet_router::RouterStatus { type Error = Error; @@ -20,7 +20,7 @@ impl TryFrom for crate::packet_router::RouterStatus { Ok(Self { uri: http::Uri::from_str(&value.uri)?, connected: value.connected, - session_key: helium_crypto::PublicKey::try_from(value.session_key).ok(), + session_key: PublicKey::try_from(value.session_key).ok(), }) } } diff --git a/src/beaconer.rs b/src/beaconer.rs index 6a8872fb..2006f2d1 100644 --- a/src/beaconer.rs +++ b/src/beaconer.rs @@ -1,20 +1,15 @@ //! This module provides proof-of-coverage (PoC) beaconing support. - use crate::{ error::DecodeError, gateway::{self, BeaconResp}, - keypair::mk_session_keypair, message_cache::MessageCache, region_watcher, service::{entropy::EntropyService, poc::PocIotService, Reconnect}, settings::Settings, - sign, sync, Base64, Error, Keypair, PacketUp, PublicKey, RegionParams, Result, + sync, Base64, PacketUp, PublicKey, RegionParams, Result, }; use futures::TryFutureExt; -use helium_proto::{ - services::poc_lora::{self, lora_stream_request_v1, lora_stream_response_v1}, - Message as ProtoMessage, -}; +use helium_proto::services::poc_lora::{self, lora_stream_response_v1}; use http::Uri; use std::sync::Arc; use time::{Duration, Instant}; @@ -42,10 +37,6 @@ impl MessageSender { pub struct Beaconer { /// Beacon/Witness handling disabled disabled: bool, - /// keypair to sign session init with - keypair: Arc, - /// session keypair to use for reports - session_key: Option>, /// gateway packet transmit message queue transmit: gateway::MessageSender, /// Our receive queue. @@ -77,14 +68,11 @@ impl Beaconer { let interval = Duration::seconds(settings.poc.interval as i64); let entropy_uri = settings.poc.entropy_uri.clone(); let service = PocIotService::new(settings.poc.ingest_uri.clone(), settings.keypair.clone()); - let keypair = settings.keypair.clone(); let reconnect = Reconnect::default(); let region_params = Arc::new(region_watcher::current_value(®ion_watch)); let disabled = settings.poc.disable; Self { - keypair, - session_key: None, transmit, messages, region_watch, @@ -106,6 +94,7 @@ impl Beaconer { info!( beacon_interval = self.interval.whole_seconds(), disabled = self.disabled, + uri = %self.service.uri, "starting" ); @@ -140,7 +129,7 @@ impl Beaconer { if self.region_params.params.is_empty() { // Calculate a random but deterministic time offset // for this hotspot's beacons - let offset = mk_beacon_offset(self.keypair.public_key(), self.interval); + let offset = mk_beacon_offset(self.service.gateway_key(), self.interval); // Get a delay for the first beacon based on the // deterministic offset and the timestamp in the // first region params. If there's an error @@ -204,17 +193,16 @@ impl Beaconer { .map_ok(|BeaconResp { powe, tmst }| (powe, tmst)) .await?; - // Check if a session key is available to sign the report - let Some(session_key) = self.session_key.clone() else { - warn!(%beacon_id, "no session key for beacon report"); - return Err(Error::no_service()); - }; - - Self::mk_beacon_report(beacon.clone(), powe, tmst, session_key) - .and_then(|report| self.service.submit_beacon(report)) - .inspect_err(|err| warn!(beacon_id, %err, "submit poc beacon report")) - .inspect_ok(|_| info!(beacon_id, "poc beacon report submitted")) - .await?; + Self::mk_beacon_report( + beacon.clone(), + powe, + tmst, + self.service.gateway_key().clone(), + ) + .and_then(|report| self.service.submit_beacon(report)) + .inspect_err(|err| warn!(beacon_id, %err, "submit poc beacon report")) + .inspect_ok(|_| info!(beacon_id, "poc beacon report submitted")) + .await?; Ok(beacon) } @@ -223,15 +211,7 @@ impl Beaconer { &mut self, message: poc_lora::LoraStreamSessionOfferV1, ) -> Result { - let session_key = mk_session_key_init(self.keypair.clone(), &message) - .and_then(|(session_key, session_init)| { - self.service.send(session_init).map_ok(|_| session_key) - }) - .inspect_err(|err| warn!(%err, "failed to initialize session")) - .await?; - self.session_key = Some(session_key.clone()); - info!(session_key = %session_key.public_key(),"initialized session"); - Ok(()) + self.service.session_init(&message.nonce).await } async fn handle_reconnect(&mut self) -> Result { @@ -280,13 +260,7 @@ impl Beaconer { return; } - // Check if a session key is available to sign the report - let Some(session_key) = self.session_key.clone() else { - warn!(%beacon_id, "no session key for witness report"); - return; - }; - - let _ = Self::mk_witness_report(packet, beacon_data, session_key) + let _ = Self::mk_witness_report(packet, beacon_data, self.service.gateway_key().clone()) .and_then(|report| self.service.submit_witness(report)) .inspect_err(|err| warn!(beacon_id, %err, "submit poc witness report")) .inspect_ok(|_| info!(beacon_id, "poc witness report submitted")) @@ -295,7 +269,6 @@ impl Beaconer { fn disconnect(&mut self) { self.service.disconnect(); - self.session_key = None; } pub async fn mk_beacon( @@ -316,47 +289,27 @@ impl Beaconer { beacon: beacon::Beacon, conducted_power: i32, tmst: u32, - keypair: Arc, + gateway: PublicKey, ) -> Result { let mut report = poc_lora::LoraBeaconReportReqV1::try_from(beacon)?; + report.pub_key = gateway.to_vec(); report.tx_power = conducted_power; report.tmst = tmst; - report.pub_key = keypair.public_key().to_vec(); - report.signature = sign(keypair.clone(), report.encode_to_vec()).await?; Ok(report) } async fn mk_witness_report( packet: PacketUp, payload: Vec, - keypair: Arc, + gateway: PublicKey, ) -> Result { let mut report = poc_lora::LoraWitnessReportReqV1::try_from(packet)?; + report.pub_key = gateway.to_vec(); report.data = payload; - report.pub_key = keypair.public_key().to_vec(); - report.signature = sign(keypair.clone(), report.encode_to_vec()).await?; Ok(report) } } -pub async fn mk_session_key_init( - keypair: Arc, - offer: &poc_lora::LoraStreamSessionOfferV1, -) -> Result<(Arc, lora_stream_request_v1::Request)> { - let session_keypair = Arc::new(mk_session_keypair()); - let session_key = session_keypair.public_key(); - - let mut session_init = poc_lora::LoraStreamSessionInitV1 { - pub_key: keypair.public_key().into(), - session_key: session_key.into(), - nonce: offer.nonce.clone(), - signature: vec![], - }; - session_init.signature = sign(keypair, session_init.encode_to_vec()).await?; - let envelope = lora_stream_request_v1::Request::SessionInit(session_init); - Ok((session_keypair, envelope)) -} - /// Construct a random but deterministic offset for beaconing. This is based on /// the public key as of this hotspot as the seed to a random number generator. fn mk_beacon_offset(key: &PublicKey, interval: Duration) -> Duration { @@ -447,14 +400,14 @@ mod test { const PUBKEY_1: &str = "13WvV82S7QN3VMzMSieiGxvuaPKknMtf213E5JwPnboDkUfesKw"; const PUBKEY_2: &str = "14HZVR4bdF9QMowYxWrumcFBNfWnhDdD5XXA5za1fWwUhHxxFS1"; - let pubkey_1 = helium_crypto::PublicKey::from_str(PUBKEY_1).expect("public key"); + let pubkey_1 = crate::PublicKey::from_str(PUBKEY_1).expect("public key"); let offset_1 = mk_beacon_offset(&pubkey_1, time::Duration::hours(6)); // Same key and interval should always end up at the same offset assert_eq!( offset_1, mk_beacon_offset(&pubkey_1, time::Duration::hours(6)) ); - let pubkey_2 = helium_crypto::PublicKey::from_str(PUBKEY_2).expect("public key 2"); + let pubkey_2 = crate::PublicKey::from_str(PUBKEY_2).expect("public key 2"); let offset_2 = mk_beacon_offset(&pubkey_2, time::Duration::hours(6)); assert_eq!( offset_2, diff --git a/src/error.rs b/src/error.rs index 55671b08..e389bf06 100644 --- a/src/error.rs +++ b/src/error.rs @@ -77,8 +77,8 @@ pub enum ServiceError { Stream, #[error("channel closed")] Channel, - #[error("no service")] - NoService, + #[error("no active session")] + NoSession, #[error("age {age}s > {max_age}s")] Check { age: u64, max_age: u64 }, #[error("Unable to connect to local server. Check that `helium_gateway` is running.")] @@ -170,8 +170,8 @@ impl Error { Error::Service(ServiceError::Channel) } - pub fn no_service() -> Error { - Error::Service(ServiceError::NoService) + pub fn no_session() -> Error { + Error::Service(ServiceError::NoSession) } pub fn gateway_service_check(age: u64, max_age: u64) -> Error { diff --git a/src/keyed_uri.rs b/src/keyed_uri.rs index c929f5d8..50b5746a 100644 --- a/src/keyed_uri.rs +++ b/src/keyed_uri.rs @@ -43,7 +43,7 @@ impl TryFrom for KeyedUri { fn try_from(v: helium_proto::services::local::KeyedUri) -> Result { let result = Self { uri: http::Uri::from_str(&v.uri)?, - pubkey: Arc::new(helium_crypto::PublicKey::from_bytes(v.address)?), + pubkey: Arc::new(PublicKey::from_bytes(v.address)?), }; Ok(result) } @@ -63,7 +63,7 @@ impl TryFrom for KeyedUri { fn try_from(v: helium_proto::RoutingAddress) -> Result { let result = Self { uri: http::Uri::from_str(&String::from_utf8_lossy(&v.uri))?, - pubkey: Arc::new(helium_crypto::PublicKey::from_bytes(v.pub_key)?), + pubkey: Arc::new(PublicKey::from_bytes(v.pub_key)?), }; Ok(result) } diff --git a/src/keypair.rs b/src/keypair.rs index 97bae71e..39e75125 100644 --- a/src/keypair.rs +++ b/src/keypair.rs @@ -1,4 +1,4 @@ -use crate::*; +use crate::{error, Error, Result}; #[cfg(feature = "ecc608")] use helium_crypto::ecc608; #[cfg(feature = "tpm")] @@ -10,33 +10,21 @@ use serde::{de, Deserializer}; #[cfg(feature = "ecc608")] use std::path::Path; use std::{collections::HashMap, convert::TryFrom, fmt, fs, io, path, str::FromStr}; +use tonic::async_trait; #[derive(Debug)] pub struct Keypair(helium_crypto::Keypair); pub type PublicKey = helium_crypto::PublicKey; -pub fn load_from_file(path: &str) -> error::Result { - let data = fs::read(path)?; - Ok(helium_crypto::Keypair::try_from(&data[..])?.into()) -} - -pub fn save_to_file(keypair: &Keypair, path: &str) -> io::Result<()> { - if let Some(parent) = path::PathBuf::from(path).parent() { - fs::create_dir_all(parent)?; - }; - fs::write(path, keypair.0.to_vec())?; - Ok(()) +#[async_trait] +pub trait Sign { + async fn sign(&mut self, keypair: K) -> Result + where + K: AsRef + std::marker::Send + 'static; } -pub fn mk_session_keypair() -> Keypair { - let keypair = helium_crypto::Keypair::generate( - KeyTag { - network: Network::MainNet, - key_type: KeyType::Ed25519, - }, - &mut OsRng, - ); - keypair.into() +pub trait Verify { + fn verify(&self, pub_key: &crate::PublicKey) -> Result; } macro_rules! uri_error { @@ -61,7 +49,7 @@ impl FromStr for Keypair { .parse() .map_err(|err| uri_error!("invalid keypair url \"{str}\": {err:?}"))?; match url.scheme_str() { - Some("file") | None => match load_from_file(url.path()) { + Some("file") | None => match Self::load_from_file(url.path()) { Ok(k) => Ok(k), Err(Error::IO(io_error)) if io_error.kind() == std::io::ErrorKind::NotFound => { let args = KeypairArgs::from_uri(&url)?; @@ -74,7 +62,7 @@ impl FromStr for Keypair { &mut OsRng, ) .into(); - save_to_file(&new_key, url.path()).map_err(|err| { + new_key.save_to_file(url.path()).map_err(|err| { uri_error!("unable to save key file \"{}\": {err:?}", url.path()) })?; Ok(new_key) @@ -137,6 +125,38 @@ impl std::ops::Deref for Keypair { } } +impl Keypair { + pub fn new() -> Self { + let keypair = helium_crypto::Keypair::generate( + KeyTag { + network: Network::MainNet, + key_type: KeyType::Ed25519, + }, + &mut OsRng, + ); + keypair.into() + } + + pub fn load_from_file(path: &str) -> Result { + let data = fs::read(path)?; + Ok(helium_crypto::Keypair::try_from(&data[..])?.into()) + } + + pub fn save_to_file(&self, path: &str) -> io::Result<()> { + if let Some(parent) = path::PathBuf::from(path).parent() { + fs::create_dir_all(parent)?; + }; + fs::write(path, self.0.to_vec())?; + Ok(()) + } +} + +impl Default for Keypair { + fn default() -> Self { + Self::new() + } +} + #[derive(Debug)] struct KeypairArgs(HashMap); diff --git a/src/lib.rs b/src/lib.rs index ac59cf19..db491cd2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,7 +21,7 @@ pub(crate) use crate::base64::Base64; pub use beacon::{Region, RegionParams}; pub use error::{Error, Result}; pub use keyed_uri::KeyedUri; -pub use keypair::{Keypair, PublicKey}; +pub use keypair::{Keypair, PublicKey, Sign, Verify}; pub use packet::{PacketDown, PacketUp}; pub use settings::Settings; @@ -34,7 +34,7 @@ pub type Future = Pin> + Send>>; /// A type alias for `Stream` that may result in `crate::error::Error` pub type Stream = Pin> + Send>>; -pub async fn sign(keypair: K, data: Vec) -> Result> +async fn sign(keypair: K, data: Vec) -> Result> where K: AsRef + std::marker::Send + 'static, { @@ -49,13 +49,46 @@ where .await? } -macro_rules! verify { - ($key: expr, $msg: expr, $sig: ident) => {{ - let mut _msg = $msg.clone(); - _msg.$sig = vec![]; - let buf = _msg.encode_to_vec(); - $key.verify(&buf, &$msg.$sig).map_err(Error::from) - }}; +macro_rules! impl_sign { + ($type: ty) => { + #[tonic::async_trait] + impl Sign for $type { + async fn sign(&mut self, keypair: K) -> Result + where + K: AsRef + std::marker::Send + 'static, + { + self.signature = crate::sign(keypair, self.encode_to_vec()).await?; + Ok(()) + } + } + }; } +pub(crate) use impl_sign; -pub(crate) use verify; +macro_rules! impl_verify { + ($type: ty) => { + impl crate::Verify for $type { + fn verify(&self, pub_key: &crate::PublicKey) -> Result { + use helium_crypto::Verify as _; + let mut _msg = self.clone(); + _msg.signature = vec![]; + let buf = _msg.encode_to_vec(); + pub_key + .verify(&buf, &self.signature) + .map_err(crate::Error::from) + } + } + }; +} +pub(crate) use impl_verify; + +// macro_rules! verify { +// ($key: expr, $msg: expr, $sig: ident) => {{ +// let mut _msg = $msg.clone(); +// _msg.$sig = vec![]; +// let buf = _msg.encode_to_vec(); +// $key.verify(&buf, &$msg.$sig).map_err(Error::from) +// }}; +// } + +// pub(crate) use verify; diff --git a/src/packet_router/mod.rs b/src/packet_router/mod.rs index 1afeeafe..1e156861 100644 --- a/src/packet_router/mod.rs +++ b/src/packet_router/mod.rs @@ -1,20 +1,15 @@ use crate::{ gateway, - keypair::mk_session_keypair, message_cache::{CacheMessage, MessageCache}, service::{packet_router::PacketRouterService, Reconnect}, - sign, sync, Base64, Keypair, PacketUp, Result, Settings, + sync, Base64, PacketUp, PublicKey, Result, Settings, }; use futures::TryFutureExt; -use helium_proto::{ - services::router::{ - envelope_down_v1, envelope_up_v1, PacketRouterPacketDownV1, PacketRouterPacketUpV1, - PacketRouterSessionInitV1, PacketRouterSessionOfferV1, - }, - Message as ProtoMessage, +use helium_proto::services::router::{ + envelope_down_v1, PacketRouterPacketDownV1, PacketRouterPacketUpV1, PacketRouterSessionOfferV1, }; use serde::Serialize; -use std::{sync::Arc, time::Instant as StdInstant}; +use std::{ops::Deref, time::Instant as StdInstant}; use tokio::time::Duration; use tracing::{debug, info, warn}; @@ -35,7 +30,7 @@ pub struct RouterStatus { #[serde(with = "http_serde::uri")] pub uri: http::Uri, pub connected: bool, - pub session_key: Option, + pub session_key: Option, } pub type MessageSender = sync::MessageSender; @@ -60,8 +55,6 @@ pub struct PacketRouter { transmit: gateway::MessageSender, service: PacketRouterService, reconnect: Reconnect, - session_key: Option>, - keypair: Arc, store: MessageCache, } @@ -78,8 +71,6 @@ impl PacketRouter { let reconnect = Reconnect::default(); Self { service, - keypair: settings.keypair.clone(), - session_key: None, transmit, messages, store, @@ -108,7 +99,7 @@ impl PacketRouter { let status = RouterStatus { uri: self.service.uri.clone(), connected: self.service.is_connected(), - session_key: self.session_key.as_ref().map(|keypair| keypair.public_key().to_owned()), + session_key: self.service.session_key().cloned(), }; tx_resp.send(status) } @@ -158,9 +149,7 @@ impl PacketRouter { async fn handle_uplink(&mut self, uplink: PacketUp, received: StdInstant) -> Result { self.store.push_back(uplink, received); if self.service.is_connected() { - if let Some(session_key) = &self.session_key { - self.send_waiting_packets(session_key.clone()).await?; - } + self.send_waiting_packets().await?; } Ok(()) } @@ -170,31 +159,22 @@ impl PacketRouter { } async fn handle_session_offer(&mut self, message: PacketRouterSessionOfferV1) -> Result { - let session_key = mk_session_key_init(self.keypair.clone(), &message) - .and_then(|(session_key, session_init)| { - self.service.send(session_init).map_ok(|_| session_key) - }) - .inspect_err(|err| warn!(%err, "failed to initialize session")) - .await?; - self.session_key = Some(session_key.clone()); - info!(session_key = %session_key.public_key(),"initialized session"); - self.send_waiting_packets(session_key.clone()) + self.service.session_init(&message.nonce).await?; + self.send_waiting_packets() .inspect_err(|err| warn!(%err, "failed to send queued packets")) - .await?; - Ok(()) + .await } fn disconnect(&mut self) { self.service.disconnect(); - self.session_key = None; } - async fn send_waiting_packets(&mut self, keypair: Arc) -> Result { + async fn send_waiting_packets(&mut self) -> Result { while let (removed, Some(packet)) = self.store.pop_front(STORE_GC_INTERVAL) { if removed > 0 { info!(removed, "discarded queued packets"); } - if let Err(err) = self.send_packet(&packet, keypair.clone()).await { + if let Err(err) = self.send_packet(&packet).await { warn!(%err, "failed to send uplink"); self.store.push_front(packet); return Err(err); @@ -203,44 +183,11 @@ impl PacketRouter { Ok(()) } - async fn send_packet( - &mut self, - packet: &CacheMessage, - keypair: Arc, - ) -> Result { + async fn send_packet(&mut self, packet: &CacheMessage) -> Result { debug!(packet_hash = packet.hash().to_b64(), "sending packet"); - let uplink = mk_uplink(packet, keypair).await?; - self.service.send(uplink).await + let mut uplink: PacketRouterPacketUpV1 = packet.deref().into(); + uplink.hold_time = packet.hold_time().as_millis() as u64; + self.service.send_uplink(uplink).await } } - -pub async fn mk_uplink( - packet: &CacheMessage, - keypair: Arc, -) -> Result { - use std::ops::Deref; - let mut uplink: PacketRouterPacketUpV1 = packet.deref().into(); - uplink.hold_time = packet.hold_time().as_millis() as u64; - uplink.signature = sign(keypair, uplink.encode_to_vec()).await?; - let envelope = envelope_up_v1::Data::Packet(uplink); - Ok(envelope) -} - -pub async fn mk_session_key_init( - keypair: Arc, - offer: &PacketRouterSessionOfferV1, -) -> Result<(Arc, envelope_up_v1::Data)> { - let session_keypair = Arc::new(mk_session_keypair()); - let session_key = session_keypair.public_key(); - - let mut session_init = PacketRouterSessionInitV1 { - gateway: keypair.public_key().into(), - session_key: session_key.into(), - nonce: offer.nonce.clone(), - signature: vec![], - }; - session_init.signature = sign(keypair, session_init.encode_to_vec()).await?; - let envelope = envelope_up_v1::Data::SessionInit(session_init); - Ok((session_keypair, envelope)) -} diff --git a/src/service/conduit.rs b/src/service/conduit.rs index 46c77229..4d660286 100644 --- a/src/service/conduit.rs +++ b/src/service/conduit.rs @@ -1,12 +1,14 @@ use crate::{ service::{CONNECT_TIMEOUT, RPC_TIMEOUT}, - Keypair, Result, + Error, Keypair, PublicKey, Result, Sign, }; +use futures::TryFutureExt; use helium_proto::services::{Channel, Endpoint}; use http::Uri; use std::sync::Arc; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; +use tracing::{info, warn}; /// The time between TCP keepalive messages to keep the connection to the packet /// router open. Some load balancer disconnect after a number of seconds. AWS @@ -19,6 +21,7 @@ pub const CONDUIT_CAPACITY: usize = 50; #[derive(Debug)] pub struct ConduitService> { pub uri: Uri, + session_keypair: Option>, conduit: Option>, keypair: Arc, client: C, @@ -39,6 +42,13 @@ pub trait ConduitClient { client_rx: ReceiverStream, keypair: Arc, ) -> Result>; + + async fn mk_session_init( + &self, + nonce: &[u8], + session_key: &PublicKey, + keypair: Arc, + ) -> Result; } impl Conduit { @@ -77,9 +87,10 @@ impl> ConduitService { pub fn new(uri: Uri, client: C, keypair: Arc) -> Self { Self { uri, - conduit: None, keypair, client, + conduit: None, + session_keypair: None, } } @@ -118,6 +129,7 @@ impl> ConduitService { pub fn disconnect(&mut self) { self.conduit = None; + self.session_keypair = None; } pub async fn connect(&mut self) -> Result { @@ -135,4 +147,40 @@ impl> ConduitService { pub fn is_connected(&self) -> bool { self.conduit.is_some() } + + pub fn gateway_key(&self) -> &PublicKey { + self.keypair.public_key() + } + + pub fn session_key(&self) -> Option<&PublicKey> { + self.session_keypair.as_ref().map(|k| k.public_key()) + } + + pub fn session_keypair(&self) -> Option> { + self.session_keypair.clone() + } + + pub async fn session_sign(&self, msg: &mut M) -> Result { + if let Some(keypair) = self.session_keypair.as_ref() { + msg.sign(keypair.clone()).await?; + Ok(()) + } else { + Err(Error::no_session()) + } + } + + pub async fn session_init(&mut self, nonce: &[u8]) -> Result { + let session_keypair = Arc::new(Keypair::new()); + let session_key = session_keypair.public_key(); + let msg = self + .client + .mk_session_init(nonce, session_key, self.keypair.clone()) + .await?; + self.send(msg) + .inspect_err(|err| warn!(%err, "failed to initialize session")) + .await?; + self.session_keypair = Some(session_keypair.clone()); + info!(%session_key, "initialized session"); + Ok(()) + } } diff --git a/src/service/config.rs b/src/service/config.rs index eaeaaae8..d9fcb32e 100644 --- a/src/service/config.rs +++ b/src/service/config.rs @@ -1,10 +1,14 @@ use crate::{ + impl_sign, impl_verify, service::{CONNECT_TIMEOUT, RPC_TIMEOUT}, - sign, verify, Error, KeyedUri, Keypair, Region, RegionParams, Result, + KeyedUri, Keypair, Region, RegionParams, Result, Sign, Verify, }; -use helium_crypto::Verify; use helium_proto::{ - services::{self, iot_config::GatewayRegionParamsReqV1, Channel, Endpoint}, + services::{ + self, + iot_config::{GatewayRegionParamsReqV1, GatewayRegionParamsResV1}, + Channel, Endpoint, + }, Message, }; use std::sync::Arc; @@ -39,10 +43,13 @@ impl ConfigService { address: keypair.public_key().to_vec(), signature: vec![], }; - req.signature = sign(keypair, req.encode_to_vec()).await?; + req.sign(keypair).await?; let resp = self.client.region_params(req).await?.into_inner(); - verify!(&self.uri.pubkey, resp, signature)?; + resp.verify(&self.uri.pubkey)?; Ok(RegionParams::try_from(resp)?) } } + +impl_sign!(GatewayRegionParamsReqV1); +impl_verify!(GatewayRegionParamsResV1); diff --git a/src/service/packet_router.rs b/src/service/packet_router.rs index 571e3dee..80482fe2 100644 --- a/src/service/packet_router.rs +++ b/src/service/packet_router.rs @@ -1,13 +1,14 @@ use crate::{ error::DecodeError, + impl_sign, service::conduit::{ConduitClient, ConduitService}, - sign, Error, Keypair, Result, + Error, Keypair, PublicKey, Result, Sign, }; use helium_proto::{ services::{ router::{ envelope_down_v1, envelope_up_v1, EnvelopeDownV1, EnvelopeUpV1, PacketRouterClient, - PacketRouterRegisterV1, + PacketRouterPacketUpV1, PacketRouterRegisterV1, PacketRouterSessionInitV1, }, Channel, }, @@ -52,15 +53,38 @@ impl ConduitClient for PacketRouterConduitClient { signature: vec![], session_capable: true, }; - msg.signature = sign(keypair.clone(), msg.encode_to_vec()).await?; + msg.sign(keypair.clone()).await?; let msg = EnvelopeUpV1 { data: Some(envelope_up_v1::Data::Register(msg)), }; tx.send(msg).await.map_err(|_| Error::channel())?; Ok(rx) } + + async fn mk_session_init( + &self, + nonce: &[u8], + session_key: &PublicKey, + keypair: Arc, + ) -> Result { + let mut session_init = PacketRouterSessionInitV1 { + gateway: keypair.public_key().into(), + session_key: session_key.into(), + nonce: nonce.to_vec(), + signature: vec![], + }; + session_init.sign(keypair).await?; + let envelope = EnvelopeUpV1 { + data: Some(envelope_up_v1::Data::SessionInit(session_init)), + }; + Ok(envelope) + } } +impl_sign!(PacketRouterRegisterV1); +impl_sign!(PacketRouterPacketUpV1); +impl_sign!(PacketRouterSessionInitV1); + impl std::ops::Deref for PacketRouterService { type Target = ConduitService; fn deref(&self) -> &Self::Target { @@ -85,6 +109,12 @@ impl PacketRouterService { self.0.send(msg).await } + pub async fn send_uplink(&mut self, mut msg: PacketRouterPacketUpV1) -> Result { + self.session_sign(&mut msg).await?; + let msg = envelope_up_v1::Data::Packet(msg); + self.send(msg).await + } + pub async fn recv(&mut self) -> Result> { match self.0.recv().await { Ok(Some(msg)) => match msg.data { diff --git a/src/service/poc.rs b/src/service/poc.rs index 1804763d..5a56478a 100644 --- a/src/service/poc.rs +++ b/src/service/poc.rs @@ -1,14 +1,18 @@ use crate::{ error::DecodeError, + impl_sign, service::conduit::{ConduitClient, ConduitService}, - Keypair, Result, + Keypair, PublicKey, Result, Sign, }; -use helium_proto::services::{ - poc_lora::{ - self, lora_stream_request_v1, lora_stream_response_v1, LoraBeaconReportReqV1, - LoraStreamRequestV1, LoraStreamResponseV1, LoraWitnessReportReqV1, +use helium_proto::{ + services::{ + poc_lora::{ + self, lora_stream_request_v1, lora_stream_response_v1, LoraBeaconReportReqV1, + LoraStreamRequestV1, LoraStreamResponseV1, LoraWitnessReportReqV1, + }, + Channel, }, - Channel, + Message as ProtoMessage, }; use http::Uri; use std::sync::Arc; @@ -39,8 +43,31 @@ impl ConduitClient for PocIotConduitC let rx = client.stream_requests(client_rx).await?.into_inner(); Ok(rx) } + + async fn mk_session_init( + &self, + nonce: &[u8], + session_key: &PublicKey, + keypair: Arc, + ) -> Result { + let mut session_init = poc_lora::LoraStreamSessionInitV1 { + pub_key: keypair.public_key().into(), + session_key: session_key.into(), + nonce: nonce.to_vec(), + signature: vec![], + }; + session_init.sign(keypair).await?; + let envelope = LoraStreamRequestV1 { + request: Some(lora_stream_request_v1::Request::SessionInit(session_init)), + }; + Ok(envelope) + } } +impl_sign!(poc_lora::LoraStreamSessionInitV1); +impl_sign!(poc_lora::LoraBeaconReportReqV1); +impl_sign!(poc_lora::LoraWitnessReportReqV1); + impl std::ops::Deref for PocIotService { type Target = ConduitService; fn deref(&self) -> &Self::Target { @@ -76,12 +103,14 @@ impl PocIotService { } } - pub async fn submit_beacon(&mut self, req: LoraBeaconReportReqV1) -> Result { + pub async fn submit_beacon(&mut self, mut req: LoraBeaconReportReqV1) -> Result { + self.0.session_sign(&mut req).await?; let msg = lora_stream_request_v1::Request::BeaconReport(req); self.send(msg).await } - pub async fn submit_witness(&mut self, req: LoraWitnessReportReqV1) -> Result { + pub async fn submit_witness(&mut self, mut req: LoraWitnessReportReqV1) -> Result { + self.0.session_sign(&mut req).await?; let msg = lora_stream_request_v1::Request::WitnessReport(req); self.send(msg).await }