Skip to content

Commit

Permalink
Use an lru cache to drop duplicate beacons (#453)
Browse files Browse the repository at this point in the history
* Use an lru cache to drop duplicate beacons - default
  • Loading branch information
madninja authored Aug 31, 2023
1 parent d5801ab commit 5017a30
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 39 deletions.
1 change: 1 addition & 0 deletions lorawan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use bytes::{Buf, BufMut, Bytes};
use std::{convert::From, fmt, mem::size_of, result};

pub mod error;
pub use bytes;
pub use error::LoraWanError;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down
86 changes: 67 additions & 19 deletions src/beaconer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
//! This module provides proof-of-coverage (PoC) beaconing support.

use crate::{
error::DecodeError,
gateway::{self, BeaconResp},
message_cache::MessageCache,
region_watcher,
service::{entropy::EntropyService, poc::PocIotService},
settings::Settings,
Expand Down Expand Up @@ -55,8 +57,8 @@ pub struct Beaconer {
interval: Duration,
// Time next beacon attempt is o be made
next_beacon_time: Instant,
/// The last beacon that was transitted
last_beacon: Option<beacon::Beacon>,
/// Last seen beacons
last_seen: MessageCache<Vec<u8>>,
/// Use for channel plan and FR parameters
region_params: RegionParams,
poc_ingest_uri: Uri,
Expand All @@ -83,7 +85,7 @@ impl Beaconer {
messages,
region_watch,
interval,
last_beacon: None,
last_seen: MessageCache::new(15),
// Set a beacon at least an interval out... arrival of region_params
// will recalculate this time and no arrival of region_params will
// cause the beacon to not occur
Expand Down Expand Up @@ -157,7 +159,11 @@ impl Beaconer {
///
/// See [`gateway::MessageSender::transmit_beacon`]
pub async fn send_beacon(&self, beacon: beacon::Beacon) -> Result<beacon::Beacon> {
let beacon_id = beacon.beacon_id();
let beacon_id = beacon
.beacon_data()
.map(|data| data.to_b64())
.ok_or_else(DecodeError::not_beacon)?;

info!(beacon_id, "transmitting beacon");

let (powe, tmst) = self
Expand Down Expand Up @@ -205,8 +211,10 @@ impl Beaconer {
async fn mk_witness_report(
&self,
packet: PacketUp,
payload: Vec<u8>,
) -> Result<poc_lora::LoraWitnessReportReqV1> {
let mut report = poc_lora::LoraWitnessReportReqV1::try_from(packet)?;
report.data = payload;
report.pub_key = self.keypair.public_key().to_vec();
report.signature = sign(self.keypair.clone(), report.encode_to_vec()).await?;
Ok(report)
Expand All @@ -230,28 +238,39 @@ impl Beaconer {
.await;

self.next_beacon_time = next_beacon_time;
self.last_beacon = last_beacon;

if let Some(data) = last_beacon.beacon_data() {
self.last_seen.tag_now(data);
}
}

async fn handle_received_beacon(&mut self, packet: PacketUp) {
// Check if poc reporting is disabled
if self.disabled {
return;
}
if let Some(last_beacon) = &self.last_beacon {
if packet.payload() == last_beacon.data {
info!("ignoring last self beacon witness");
return;
}

// Check that there is beacon data present
let Some(beacon_data) = packet.beacon_data() else {
warn!("ignoring invalid received beacon");
return;
};

let beacon_id = beacon_data.to_b64();

// Check if we've seen this beacon before
if self.last_seen.tag_now(beacon_data.clone()) {
info!(%beacon_id, "ignoring duplicate or self beacon witness");
return;
}

// Construct concurrent futures for connecting to the poc ingester and
// signing the report
let report_fut = self.mk_witness_report(packet);
let report_fut = self.mk_witness_report(packet, beacon_data);
let service_fut = PocIotService::connect(self.poc_ingest_uri.clone());

match tokio::try_join!(report_fut, service_fut) {
Ok((report, mut poc_service)) => {
let beacon_id = report.data.to_b64();
let _ = poc_service
.submit_witness(report)
.inspect_err(|err| warn!(beacon_id, %err, "submit poc witness report"))
Expand Down Expand Up @@ -279,12 +298,41 @@ impl Beaconer {
}
}

#[test]
fn test_beacon_roundtrip() {
use lorawan::PHYPayload;
trait BeaconData {
fn beacon_data(&self) -> Option<Vec<u8>>;
}

let phy_payload_a = PHYPayload::proprietary(b"poc_beacon_data");
let payload: Vec<u8> = phy_payload_a.clone().try_into().expect("beacon packet");
let phy_payload_b = PHYPayload::read(lorawan::Direction::Uplink, &mut &payload[..]).unwrap();
assert_eq!(phy_payload_a, phy_payload_b);
impl BeaconData for PacketUp {
fn beacon_data(&self) -> Option<Vec<u8>> {
match PacketUp::parse_frame(lorawan::Direction::Uplink, self.payload()) {
Ok(lorawan::PHYPayloadFrame::Proprietary(payload)) => Some(payload.into()),
_ => None,
}
}
}

impl BeaconData for beacon::Beacon {
fn beacon_data(&self) -> Option<Vec<u8>> {
Some(self.data.clone())
}
}

impl BeaconData for Option<beacon::Beacon> {
fn beacon_data(&self) -> Option<Vec<u8>> {
self.as_ref().and_then(|beacon| beacon.beacon_data())
}
}

#[cfg(test)]
mod test {
#[test]
fn test_beacon_roundtrip() {
use lorawan::PHYPayload;

let phy_payload_a = PHYPayload::proprietary(b"poc_beacon_data");
let payload: Vec<u8> = phy_payload_a.clone().try_into().expect("beacon packet");
let phy_payload_b =
PHYPayload::read(lorawan::Direction::Uplink, &mut &payload[..]).unwrap();
assert_eq!(phy_payload_a, phy_payload_b);
}
}
86 changes: 72 additions & 14 deletions src/message_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@ use std::{
time::{Duration, Instant},
};

pub struct MessageCache<T> {
waiting: VecDeque<CacheMessage<T>>,
#[derive(Debug)]
pub struct MessageCache<T: PartialEq> {
cache: VecDeque<CacheMessage<T>>,
max_messages: u16,
}

#[derive(Debug, Clone)]
pub struct CacheMessage<T> {
pub struct CacheMessage<T: PartialEq> {
received: Instant,
message: T,
}

impl<T> CacheMessage<T> {
impl<T: PartialEq> CacheMessage<T> {
pub fn new(message: T, received: Instant) -> Self {
Self { message, received }
}
Expand All @@ -25,19 +26,19 @@ impl<T> CacheMessage<T> {
}
}

impl<T> Deref for CacheMessage<T> {
impl<T: PartialEq> Deref for CacheMessage<T> {
type Target = T;

fn deref(&self) -> &Self::Target {
&self.message
}
}

impl<T> MessageCache<T> {
impl<T: PartialEq> MessageCache<T> {
pub fn new(max_messages: u16) -> Self {
let waiting = VecDeque::new();
Self {
waiting,
cache: waiting,
max_messages,
}
}
Expand All @@ -48,13 +49,36 @@ impl<T> MessageCache<T> {
///
/// Pushing a packet onto the back of a full cache will cause the oldest
/// (first) message in the cache to be dropped.
pub fn push_back(&mut self, message: T, received: Instant) {
self.waiting.push_back(CacheMessage::new(message, received));
pub fn push_back(&mut self, message: T, received: Instant) -> Option<CacheMessage<T>> {
self.cache.push_back(CacheMessage::new(message, received));
if self.len() > self.max_messages as usize {
self.waiting.pop_front();
self.cache.pop_front()
} else {
None
}
}

/// Returns the index of the first matching message in the cache or None if
/// not present
pub fn index_of(&self, message: &T) -> Option<usize> {
self.cache.iter().position(|m| m.message == *message)
}

/// Promotes the given message to the back of the queue, effectively
/// recreating an LRU cache. Returns true if a cache hit was found
pub fn tag(&mut self, message: T, received: Instant) -> bool {
let result = self
.index_of(&message)
.and_then(|index| self.cache.remove(index))
.is_some();
self.push_back(message, received);
result
}

pub fn tag_now(&mut self, message: T) -> bool {
self.tag(message, Instant::now())
}

/// Pushes a CacheMessage back on the front of the queue. This is useful to
/// push a packet back at the front after a failed delivery attempt.
///
Expand All @@ -64,13 +88,13 @@ impl<T> MessageCache<T> {
if self.len() > self.max_messages as usize {
return;
}
self.waiting.push_front(cache_message);
self.cache.push_front(cache_message);
}

pub fn pop_front(&mut self, duration: Duration) -> (usize, Option<CacheMessage<T>>) {
let mut dropped = 0;
let mut front = None;
while let Some(msg) = self.waiting.pop_front() {
while let Some(msg) = self.cache.pop_front() {
if msg.hold_time() <= duration {
front = Some(msg);
break;
Expand All @@ -81,11 +105,45 @@ impl<T> MessageCache<T> {
(dropped, front)
}

/// Returns a reference to the first (and oldest/first to be removed)
/// message in the cache
pub fn peek_front(&self) -> Option<&CacheMessage<T>> {
self.cache.front()
}

pub fn len(&self) -> usize {
self.waiting.len()
self.cache.len()
}

pub fn is_empty(&self) -> bool {
self.waiting.is_empty()
self.cache.is_empty()
}
}

#[cfg(test)]
mod test {
use super::MessageCache;

#[test]
fn test_cache_tagging() {
let mut cache = MessageCache::<Vec<u8>>::new(2);

// First should trigger a "not in cache"
assert!(!cache.tag_now(vec![1]));
// Second should trigger a "not in cache" and make the first least
// recently used
assert!(!cache.tag_now(vec![2]));
// Second tag should promote the old entry but remove none
assert!(cache.tag_now(vec![1]));
assert_eq!(
cache.peek_front().map(|entry| entry.message.as_ref()),
Some([2u8].as_ref())
);

// Third tag should evict the least recently used entry (2)
assert!(!cache.tag_now(vec![3]));
assert_eq!(Some(0), cache.index_of(&vec![1u8]));
assert_eq!(Some(1), cache.index_of(&vec![3u8]));
assert!(cache.index_of(&vec![2u8]).is_none());
}
}
8 changes: 2 additions & 6 deletions src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::{
time::{SystemTime, UNIX_EPOCH},
};

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct PacketUp(PacketRouterPacketUpV1);

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -65,12 +65,8 @@ impl fmt::Display for PacketUp {
impl TryFrom<PacketUp> for poc_lora::LoraWitnessReportReqV1 {
type Error = Error;
fn try_from(value: PacketUp) -> Result<Self> {
let payload = match PacketUp::parse_frame(Direction::Uplink, value.payload()) {
Ok(PHYPayloadFrame::Proprietary(payload)) => payload,
_ => return Err(DecodeError::not_beacon()),
};
let report = poc_lora::LoraWitnessReportReqV1 {
data: payload.to_vec(),
data: vec![],
tmst: value.0.timestamp as u32,
timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
Expand Down

0 comments on commit 5017a30

Please sign in to comment.