Skip to content

Commit dc587e9

Browse files
committed
Allow to set Transfer format
1 parent 48236a8 commit dc587e9

File tree

5 files changed

+26
-10
lines changed

5 files changed

+26
-10
lines changed

CHANGES.md

+4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changes
22

3+
## [3.4.0] - 2025-01-08
4+
5+
* Allow to set Transfer format
6+
37
## [3.3.1] - 2025-01-02
48

59
* Fix rcv-settle-mode for sender links

Cargo.toml

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-amqp"
3-
version = "3.3.1"
3+
version = "3.4.0"
44
authors = ["ntex contributors <[email protected]>"]
55
description = "AMQP 1.0 Client/Server framework"
66
documentation = "https://docs.rs/ntex-amqp"
@@ -26,7 +26,6 @@ frame-trace = []
2626

2727
[dependencies]
2828
ntex = "2.10"
29-
ntex-io = "2.9.2"
3029
ntex-amqp-codec = "0.9"
3130

3231
bitflags = "2"

src/delivery.rs

+9-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::cell::Cell as StdCell;
33
use ntex::{channel::pool, util::Bytes};
44
use ntex_amqp_codec::protocol::{
55
DeliveryNumber, DeliveryState, Disposition, DispositionInner, Error, ErrorCondition, Handle,
6-
Rejected, Role, TransferBody,
6+
MessageFormat, Rejected, Role, TransferBody,
77
};
88
use ntex_amqp_codec::types::{Str, Symbol};
99

@@ -292,6 +292,7 @@ pub struct DeliveryBuilder {
292292
tag: Option<Bytes>,
293293
settled: bool,
294294
data: TransferBody,
295+
format: Option<MessageFormat>,
295296
sender: Cell<SenderLinkInner>,
296297
}
297298

@@ -300,6 +301,7 @@ impl DeliveryBuilder {
300301
Self {
301302
tag: None,
302303
settled: false,
304+
format: None,
303305
data,
304306
sender,
305307
}
@@ -315,6 +317,11 @@ impl DeliveryBuilder {
315317
self
316318
}
317319

320+
pub fn format(mut self, fmt: MessageFormat) -> Self {
321+
self.format = Some(fmt);
322+
self
323+
}
324+
318325
pub async fn send(self) -> Result<Delivery, AmqpProtocolError> {
319326
let inner = self.sender.get_ref();
320327

@@ -332,7 +339,7 @@ impl DeliveryBuilder {
332339
let (id, tag) = self
333340
.sender
334341
.get_mut()
335-
.send(self.data, self.tag, self.settled)
342+
.send(self.data, self.tag, self.settled, self.format)
336343
.await?;
337344

338345
Ok(Delivery {

src/session.rs

+8-3
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ use slab::Slab;
66

77
use ntex_amqp_codec::protocol::{
88
self as codec, Accepted, Attach, DeliveryNumber, DeliveryState, Detach, Disposition, End,
9-
Error, Flow, Frame, Handle, ReceiverSettleMode, Role, SenderSettleMode, Source, Transfer,
10-
TransferBody, TransferNumber,
9+
Error, Flow, Frame, Handle, MessageFormat, ReceiverSettleMode, Role, SenderSettleMode, Source,
10+
Transfer, TransferBody, TransferNumber,
1111
};
1212
use ntex_amqp_codec::{AmqpFrame, Encode};
1313

@@ -1217,6 +1217,7 @@ impl SessionInner {
12171217
tag: Bytes,
12181218
body: TransferBody,
12191219
settled: bool,
1220+
format: Option<MessageFormat>,
12201221
) -> Result<DeliveryNumber, AmqpProtocolError> {
12211222
loop {
12221223
if self.remote_incoming_window == 0 {
@@ -1247,7 +1248,11 @@ impl SessionInner {
12471248
} else {
12481249
None
12491250
};
1250-
let message_format = body.message_format();
1251+
let message_format = if format.is_none() {
1252+
body.message_format()
1253+
} else {
1254+
format
1255+
};
12511256

12521257
let max_frame_size = self.max_frame_size();
12531258
let max_frame_size = if max_frame_size > 2048 {

src/sndlink.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ use std::{collections::VecDeque, future::Future};
33
use ntex::channel::{condition, oneshot, pool};
44
use ntex::util::{BufMut, ByteString, Bytes, Either, PoolRef, Ready};
55
use ntex_amqp_codec::protocol::{
6-
self as codec, Attach, DeliveryNumber, Error, Flow, ReceiverSettleMode, Role, SenderSettleMode,
7-
SequenceNo, Target, TerminusDurability, TerminusExpiryPolicy, TransferBody,
6+
self as codec, Attach, DeliveryNumber, Error, Flow, MessageFormat, ReceiverSettleMode, Role,
7+
SenderSettleMode, SequenceNo, Target, TerminusDurability, TerminusExpiryPolicy, TransferBody,
88
};
99

1010
use crate::delivery::DeliveryBuilder;
@@ -321,6 +321,7 @@ impl SenderLinkInner {
321321
body: T,
322322
tag: Option<Bytes>,
323323
settled: bool,
324+
format: Option<MessageFormat>,
324325
) -> Result<(DeliveryNumber, Bytes), AmqpProtocolError> {
325326
if let Some(ref err) = self.error {
326327
Err(err.clone())
@@ -355,7 +356,7 @@ impl SenderLinkInner {
355356
.session
356357
.inner
357358
.get_mut()
358-
.send_transfer(self.id as u32, tag.clone(), body, settled)
359+
.send_transfer(self.id as u32, tag.clone(), body, settled, format)
359360
.await?;
360361

361362
Ok((id, tag))

0 commit comments

Comments
 (0)