Skip to content

Commit 2562a12

Browse files
committed
Fix control queue handling
1 parent 6513599 commit 2562a12

File tree

6 files changed

+268
-191
lines changed

6 files changed

+268
-191
lines changed

CHANGES.md

+4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changes
22

3+
## [3.2.0] - 2024-12-03
4+
5+
* Fix control queue handling
6+
37
## [3.1.0] - 2024-12-01
48

59
* Set "next_incoming_id" for Flow frame

Cargo.toml

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-amqp"
3-
version = "3.1.0"
3+
version = "3.2.0"
44
authors = ["ntex contributors <[email protected]>"]
55
description = "AMQP 1.0 Client/Server framework"
66
documentation = "https://docs.rs/ntex-amqp"
@@ -25,7 +25,8 @@ default = []
2525
frame-trace = []
2626

2727
[dependencies]
28-
ntex = "2"
28+
ntex = "2.9"
29+
ntex-util = "2.7"
2930
ntex-amqp-codec = "0.9"
3031

3132
bitflags = "2"

src/connection.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use ntex::util::{HashMap, PoolRef, Ready};
66

77
use crate::codec::protocol::{self as codec, Begin, Close, End, Error, Frame, Role};
88
use crate::codec::{AmqpCodec, AmqpFrame};
9-
use crate::dispatcher::ControlQueue;
9+
use crate::control::ControlQueue;
1010
use crate::session::{Session, SessionInner, INITIAL_NEXT_OUTGOING_ID};
1111
use crate::sndlink::{SenderLink, SenderLinkInner};
1212
use crate::{cell::Cell, error::AmqpProtocolError, types::Action, Configuration};

src/control.rs

+15-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
use std::{fmt, io};
1+
use std::{cell::RefCell, collections::VecDeque, fmt, io};
22

3-
use ntex::util::Either;
3+
use ntex::{task::LocalWaker, util::Either};
44
use ntex_amqp_codec::protocol;
55

66
use crate::cell::Cell;
@@ -72,3 +72,16 @@ impl ControlFrame {
7272
self.0.get_ref().session.clone().map(Session::new)
7373
}
7474
}
75+
76+
#[derive(Default, Debug)]
77+
pub(crate) struct ControlQueue {
78+
pub(crate) pending: RefCell<VecDeque<ControlFrame>>,
79+
pub(crate) waker: LocalWaker,
80+
}
81+
82+
impl ControlQueue {
83+
pub(crate) fn enqueue_frame(&self, frame: ControlFrame) {
84+
self.pending.borrow_mut().push_back(frame);
85+
self.waker.wake();
86+
}
87+
}

0 commit comments

Comments
 (0)