From d1ecadce3ece362afd183944cdb8fb7d1caeba98 Mon Sep 17 00:00:00 2001 From: Johannes Barthel Date: Wed, 22 May 2024 11:09:37 +0200 Subject: [PATCH] add 'latching' field to connection header --- rosrust/src/tcpros/publisher.rs | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/rosrust/src/tcpros/publisher.rs b/rosrust/src/tcpros/publisher.rs index f54169f..38b1b46 100644 --- a/rosrust/src/tcpros/publisher.rs +++ b/rosrust/src/tcpros/publisher.rs @@ -9,6 +9,7 @@ use error_chain::bail; use log::error; use std::collections::HashMap; use std::net::{TcpListener, TcpStream, ToSocketAddrs}; +use std::sync::atomic::AtomicBool; use std::sync::{atomic, Arc, Mutex}; use std::os::fd::AsRawFd; @@ -20,7 +21,8 @@ pub struct Publisher { queue_size: usize, exists: Arc, raw_fd : i32, - join_handle: Option> + join_handle: Option>, + latching : Arc } impl Drop for Publisher { @@ -62,6 +64,7 @@ fn write_response( caller_id: &str, topic: &str, message_description: &RawMessageDescription, + latching : bool, ) -> Result<()> { let mut fields = HashMap::::new(); fields.insert(String::from("md5sum"), message_description.md5sum.clone()); @@ -72,6 +75,8 @@ fn write_response( String::from("message_definition"), message_description.msg_definition.clone(), ); + + fields.insert(String::from("latching"), String::from(if latching {"1"} else {"0"})); header::encode(&mut stream, &fields)?; Ok(()) } @@ -81,12 +86,13 @@ fn exchange_headers( topic: &str, pub_caller_id: &str, message_description: &RawMessageDescription, + latching : bool, ) -> Result where U: std::io::Write + std::io::Read, { let caller_id = read_request(&mut stream, topic, message_description)?; - write_response(&mut stream, pub_caller_id, topic, message_description)?; + write_response(&mut stream, pub_caller_id, topic, message_description, latching)?; Ok(caller_id) } @@ -97,11 +103,12 @@ fn process_subscriber( last_message: &Mutex>>, pub_caller_id: &str, message_description: &RawMessageDescription, + latching : bool, ) -> tcpconnection::Feedback where U: std::io::Read + std::io::Write + Send, { - let result = exchange_headers(&mut stream, topic, pub_caller_id, message_description) + let result = exchange_headers(&mut stream, topic, pub_caller_id, message_description, latching) .chain_err(|| ErrorKind::TopicConnectionFail(topic.into())); let caller_id = match result { Ok(caller_id) => caller_id, @@ -152,12 +159,15 @@ impl Publisher { let (targets, data) = fork(queue_size); let last_message = Arc::new(Mutex::new(Arc::new(Vec::new()))); + let latching = Arc::new(AtomicBool::new(false)); + let iterate_handler = { let publisher_exists = publisher_exists.clone(); let topic = String::from(topic); let last_message = Arc::clone(&last_message); let caller_id = String::from(caller_id); let message_description = message_description.clone(); + let latching = latching.clone(); move |stream: TcpStream| { if !publisher_exists.load(atomic::Ordering::SeqCst) { @@ -170,6 +180,7 @@ impl Publisher { &last_message, &caller_id, &message_description, + latching.load(atomic::Ordering::SeqCst) ) } }; @@ -190,7 +201,8 @@ impl Publisher { queue_size, exists: publisher_exists, raw_fd, - join_handle + join_handle, + latching }) } @@ -199,7 +211,7 @@ impl Publisher { queue_size: usize, message_description: RawMessageDescription, ) -> Result> { - let mut stream = PublisherStream::new(self, message_description)?; + let mut stream = PublisherStream::new(self, message_description, Arc::clone(&self.latching))?; stream.set_queue_size_max(queue_size); Ok(stream) } @@ -218,13 +230,14 @@ pub struct PublisherStream { stream: DataStream, last_message: Arc>>>, datatype: std::marker::PhantomData, - latching: bool, + latching: Arc, } impl PublisherStream { fn new( publisher: &Publisher, message_description: RawMessageDescription, + latching : Arc ) -> Result> { if publisher.topic.msg_type != message_description.msg_type { bail!(ErrorKind::MessageTypeMismatch( @@ -236,7 +249,7 @@ impl PublisherStream { stream: publisher.subscriptions.clone(), datatype: std::marker::PhantomData, last_message: Arc::clone(&publisher.last_message), - latching: false, + latching, }; stream.set_queue_size_max(publisher.queue_size); Ok(stream) @@ -254,7 +267,7 @@ impl PublisherStream { #[inline] pub fn set_latching(&mut self, latching: bool) { - self.latching = latching; + self.latching.store(latching, atomic::Ordering::SeqCst); } #[inline] @@ -270,7 +283,7 @@ impl PublisherStream { pub fn send(&self, message: &T) -> Result<()> { let bytes = Arc::new(message.encode_vec()?); - if self.latching { + if self.latching.load(atomic::Ordering::SeqCst) { *self.last_message.lock().expect(FAILED_TO_LOCK) = Arc::clone(&bytes); }