Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 010a340

Browse files
committedJan 29, 2025·
Rename DeliveryBuilder to TransferBuilder
1 parent 009407d commit 010a340

File tree

8 files changed

+45
-34
lines changed

8 files changed

+45
-34
lines changed
 

‎CHANGES.md

+4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changes
22

3+
## [3.5.0] - 2025-01-29
4+
5+
* Rename DeliveryBuilder to TransferBuilder
6+
37
## [3.4.0] - 2025-01-08
48

59
* Allow to set Transfer format

‎Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-amqp"
3-
version = "3.4.0"
3+
version = "3.5.0"
44
authors = ["ntex contributors <team@ntex.rs>"]
55
description = "AMQP 1.0 Client/Server framework"
66
documentation = "https://docs.rs/ntex-amqp"

‎src/client/connector.rs

+9-11
Original file line numberDiff line numberDiff line change
@@ -140,11 +140,10 @@ where
140140
{
141141
/// Connect to amqp server
142142
pub async fn connect(&self, address: A) -> Result<Client, ConnectError> {
143-
let fut = timeout_checked(self.config.handshake_timeout, self._connect(address));
144-
match fut.await {
145-
Ok(res) => res.map_err(From::from),
146-
Err(_) => Err(ConnectError::HandshakeTimeout),
147-
}
143+
timeout_checked(self.config.handshake_timeout, self._connect(address))
144+
.await
145+
.map_err(|_| ConnectError::HandshakeTimeout)
146+
.and_then(|res| res)
148147
}
149148

150149
/// Negotiate amqp protocol over opened socket
@@ -168,14 +167,13 @@ where
168167

169168
/// Connect to amqp server
170169
pub async fn connect_sasl(&self, addr: A, auth: SaslAuth) -> Result<Client, ConnectError> {
171-
let fut = timeout_checked(
170+
timeout_checked(
172171
self.config.handshake_timeout,
173172
self._connect_sasl(addr, auth),
174-
);
175-
match fut.await {
176-
Ok(res) => res.map_err(From::from),
177-
Err(_) => Err(ConnectError::HandshakeTimeout),
178-
}
173+
)
174+
.await
175+
.map_err(|_| ConnectError::HandshakeTimeout)
176+
.and_then(|res| res)
179177
}
180178

181179
/// Negotiate amqp sasl protocol over opened socket

‎src/delivery.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -288,15 +288,15 @@ impl Drop for DeliveryInner {
288288
}
289289
}
290290

291-
pub struct DeliveryBuilder {
291+
pub struct TransferBuilder {
292292
tag: Option<Bytes>,
293293
settled: bool,
294294
data: TransferBody,
295295
format: Option<MessageFormat>,
296296
sender: Cell<SenderLinkInner>,
297297
}
298298

299-
impl DeliveryBuilder {
299+
impl TransferBuilder {
300300
pub(crate) fn new(data: TransferBody, sender: Cell<SenderLinkInner>) -> Self {
301301
Self {
302302
tag: None,

‎src/dispatcher.rs

+9-11
Original file line numberDiff line numberDiff line change
@@ -139,16 +139,14 @@ where
139139
}
140140

141141
// handle idle timeout
142-
if self.idle_timeout.non_zero() {
143-
if self.idle_sleep.poll_elapsed(cx).is_ready() {
144-
log::trace!(
145-
"{}: Send keep-alive ping, timeout: {:?} secs",
146-
self.sink.tag(),
147-
self.idle_timeout
148-
);
149-
self.sink.post_frame(AmqpFrame::new(0, Frame::Empty));
150-
self.idle_sleep.reset(self.idle_timeout);
151-
}
142+
if self.idle_timeout.non_zero() && self.idle_sleep.poll_elapsed(cx).is_ready() {
143+
log::trace!(
144+
"{}: Send keep-alive ping, timeout: {:?} secs",
145+
self.sink.tag(),
146+
self.idle_timeout
147+
);
148+
self.sink.post_frame(AmqpFrame::new(0, Frame::Empty));
149+
self.idle_sleep.reset(self.idle_timeout);
152150
}
153151

154152
Ok(())
@@ -391,7 +389,7 @@ pin_project_lite::pin_project! {
391389
}
392390
}
393391

394-
impl<'f, F, E> Future for ServiceResult<'f, F, E>
392+
impl<F, E> Future for ServiceResult<'_, F, E>
395393
where
396394
F: Future<Output = Result<(), E>>,
397395
E: Into<Error>,

‎src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ pub mod types;
2727

2828
pub use self::connection::{Connection, ConnectionRef};
2929
pub use self::control::{ControlFrame, ControlFrameKind};
30-
pub use self::delivery::{Delivery, DeliveryBuilder};
30+
pub use self::delivery::{Delivery, TransferBuilder};
3131
pub use self::rcvlink::{ReceiverLink, ReceiverLinkBuilder};
3232
pub use self::session::Session;
3333
pub use self::sndlink::{SenderLink, SenderLinkBuilder};

‎src/sndlink.rs

+13-3
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use ntex_amqp_codec::protocol::{
77
SenderSettleMode, SequenceNo, Target, TerminusDurability, TerminusExpiryPolicy, TransferBody,
88
};
99

10-
use crate::delivery::DeliveryBuilder;
10+
use crate::delivery::TransferBuilder;
1111
use crate::session::{Session, SessionInner};
1212
use crate::{cell::Cell, error::AmqpProtocolError, Handle};
1313

@@ -120,12 +120,22 @@ impl SenderLink {
120120
self.inner.get_ref().error.as_ref()
121121
}
122122

123+
#[doc(hidden)]
124+
#[deprecated]
123125
/// Start delivery process
124-
pub fn delivery<T>(&self, body: T) -> DeliveryBuilder
126+
pub fn delivery<T>(&self, body: T) -> TransferBuilder
125127
where
126128
T: Into<TransferBody>,
127129
{
128-
DeliveryBuilder::new(body.into(), self.inner.clone())
130+
self.transfer(body)
131+
}
132+
133+
/// Start delivery process
134+
pub fn transfer<T>(&self, body: T) -> TransferBuilder
135+
where
136+
T: Into<TransferBody>,
137+
{
138+
TransferBuilder::new(body.into(), self.inner.clone())
129139
}
130140

131141
/// Close sender link

‎tests/test_server.rs

+6-5
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,15 @@ async fn test_simple() -> std::io::Result<()> {
7171
.await
7272
.unwrap();
7373
let delivery = link
74-
.delivery(Bytes::from(b"test".as_ref()))
74+
.transfer(Bytes::from(b"test".as_ref()))
7575
.send()
7676
.await
7777
.unwrap();
7878
let st = delivery.wait().await.unwrap().unwrap();
7979
assert_eq!(st, protocol::DeliveryState::Accepted(protocol::Accepted {}));
8080

8181
let delivery = link
82-
.delivery(Bytes::from(b"test".as_ref()))
82+
.transfer(Bytes::from(b"test".as_ref()))
8383
.settled()
8484
.send()
8585
.await
@@ -146,7 +146,7 @@ async fn test_large_transfer() -> std::io::Result<()> {
146146
.unwrap();
147147

148148
let delivery = link
149-
.delivery(Bytes::from(data.clone()))
149+
.transfer(Bytes::from(data.clone()))
150150
.send()
151151
.await
152152
.unwrap();
@@ -274,7 +274,7 @@ async fn test_session_end() -> std::io::Result<()> {
274274
.await
275275
.unwrap();
276276
let _delivery = link
277-
.delivery(Bytes::from(b"test".as_ref()))
277+
.transfer(Bytes::from(b"test".as_ref()))
278278
.send()
279279
.await
280280
.unwrap();
@@ -512,7 +512,8 @@ async fn test_drop_delivery_on_link_detach() -> std::io::Result<()> {
512512
.unwrap();
513513

514514
let delivery = link
515-
.delivery(Bytes::from(b"test".as_ref()))
515+
.transfer(Bytes::from(b"test".as_ref()))
516+
.format(1)
516517
.send()
517518
.await
518519
.unwrap();

0 commit comments

Comments
 (0)
Please sign in to comment.