Skip to content

Commit 65a8463

Browse files
committed
Use ntex-service 3.0
1 parent 766c4a8 commit 65a8463

File tree

5 files changed

+119
-104
lines changed

5 files changed

+119
-104
lines changed

CHANGES.md

+4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changes
22

3+
## [3.0.0] - 2024-05-28
4+
5+
* Use ntex-service 3.0
6+
37
## [2.1.7] - 2024-05-12
48

59
* Cleanup pending transfers and deliveries on link detach

Cargo.toml

+3-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-amqp"
3-
version = "2.1.7"
3+
version = "3.0.0"
44
authors = ["ntex contributors <[email protected]>"]
55
description = "AMQP 1.0 Client/Server framework"
66
documentation = "https://docs.rs/ntex-amqp"
@@ -24,8 +24,7 @@ default = []
2424
frame-trace = []
2525

2626
[dependencies]
27-
ntex = "1"
28-
ntex-util = "1.0.1"
27+
ntex = "2"
2928
ntex-amqp-codec = "0.9"
3029

3130
bitflags = "2"
@@ -38,7 +37,7 @@ uuid = { version = "1", features = ["v4"] }
3837
[dev-dependencies]
3938
env_logger = "0.11"
4039
rand = "0.8"
41-
ntex = { version = "1", features = ["tokio"] }
40+
ntex = { version = "2", features = ["tokio"] }
4241
ntex-amqp = { path = ".", features = ["frame-trace"] }
4342

4443
[patch.crates-io]

src/dispatcher.rs

+71-81
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
use std::collections::VecDeque;
2-
use std::{cell, future::Future, marker, pin::Pin, rc::Rc, task::Context, task::Poll};
2+
use std::{
3+
cell, future::poll_fn, future::Future, marker, pin::Pin, rc::Rc, task::Context, task::Poll,
4+
};
35

4-
use ntex::service::{Pipeline, PipelineCall, Service, ServiceCtx};
6+
use ntex::service::{Pipeline, PipelineBinding, PipelineCall, Service, ServiceCtx};
57
use ntex::time::{sleep, Millis, Sleep};
6-
use ntex::util::{ready, BoxFuture, Either};
8+
use ntex::util::{ready, Either};
79
use ntex::{io::DispatchItem, rt::spawn, task::LocalWaker};
810

911
use crate::codec::{protocol::Frame, AmqpCodec, AmqpFrame};
@@ -24,13 +26,12 @@ impl ControlQueue {
2426
}
2527

2628
/// Amqp server dispatcher service.
27-
pub(crate) struct Dispatcher<Sr, Ctl: Service<ControlFrame>> {
29+
pub(crate) struct Dispatcher<Sr: Service<types::Message>, Ctl: Service<ControlFrame>> {
2830
sink: Connection,
29-
service: Pipeline<Sr>,
30-
ctl_service: Pipeline<Ctl>,
31+
service: PipelineBinding<Sr, types::Message>,
32+
ctl_service: PipelineBinding<Ctl, ControlFrame>,
3133
ctl_fut: cell::RefCell<Vec<(ControlFrame, PipelineCall<Ctl, ControlFrame>)>>,
3234
ctl_queue: Rc<ControlQueue>,
33-
shutdown: cell::RefCell<Option<BoxFuture<'static, ()>>>,
3435
expire: Sleep,
3536
idle_timeout: Millis,
3637
}
@@ -51,17 +52,16 @@ where
5152
Dispatcher {
5253
sink,
5354
idle_timeout,
54-
service,
55-
ctl_service,
5655
ctl_queue,
56+
service: service.bind(),
57+
ctl_service: ctl_service.bind(),
5758
ctl_fut: cell::RefCell::new(Vec::new()),
58-
shutdown: cell::RefCell::new(None),
5959
expire: sleep(idle_timeout),
6060
}
6161
}
6262

6363
fn call_control_service(&self, frame: ControlFrame) {
64-
let fut = self.ctl_service.call_static(frame.clone());
64+
let fut = self.ctl_service.call(frame.clone());
6565
self.ctl_fut.borrow_mut().push((frame, fut));
6666
self.ctl_queue.waker.wake();
6767
}
@@ -152,7 +152,7 @@ where
152152
let frm = frm.clone();
153153
let fut = self
154154
.service
155-
.call_static(types::Message::Attached(frm.clone(), link.clone()));
155+
.call(types::Message::Attached(frm.clone(), link.clone()));
156156
let _ = ntex::rt::spawn(async move {
157157
let result = fut.await;
158158
if let Err(err) = result {
@@ -200,80 +200,72 @@ where
200200
type Response = Option<AmqpFrame>;
201201
type Error = AmqpDispatcherError;
202202

203-
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
204-
self.ctl_queue.waker.register(cx.waker());
203+
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
204+
poll_fn(|cx| {
205+
self.ctl_queue.waker.register(cx.waker());
205206

206-
// idle timeout
207-
self.handle_idle_timeout(cx);
207+
// idle timeout
208+
self.handle_idle_timeout(cx);
208209

209-
// process control frame
210-
let mut control_fut_pending = !self.handle_control_fut(cx)?;
210+
// process control frame
211+
let mut control_fut_pending = !self.handle_control_fut(cx)?;
211212

212-
// check readiness
213-
let service_poll = self.service.poll_ready(cx).map_err(|err| {
214-
let err = Error::from(err);
215-
log::error!(
216-
"{}: Publish service readiness check failed: {:?}",
217-
self.sink.tag(),
218-
err
219-
);
220-
let _ = self.sink.close_with_error(err);
221-
AmqpDispatcherError::Service
222-
})?;
213+
// check readiness
214+
let service_poll = self.service.poll_ready(cx).map_err(|err| {
215+
let err = Error::from(err);
216+
log::error!(
217+
"{}: Publish service readiness check failed: {:?}",
218+
self.sink.tag(),
219+
err
220+
);
221+
let _ = self.sink.close_with_error(err);
222+
AmqpDispatcherError::Service
223+
})?;
223224

224-
let ctl_service_poll = self.ctl_service.poll_ready(cx).map_err(|err| {
225-
let err = Error::from(err);
226-
log::error!(
227-
"{}: Control service readiness check failed: {:?}",
228-
self.sink.tag(),
229-
err
230-
);
231-
let _ = self.sink.close_with_error(err);
232-
AmqpDispatcherError::Service
233-
})?;
225+
let ctl_service_poll = self.ctl_service.poll_ready(cx).map_err(|err| {
226+
let err = Error::from(err);
227+
log::error!(
228+
"{}: Control service readiness check failed: {:?}",
229+
self.sink.tag(),
230+
err
231+
);
232+
let _ = self.sink.close_with_error(err);
233+
AmqpDispatcherError::Service
234+
})?;
234235

235-
// enqueue pending control frames
236-
if ctl_service_poll.is_ready() && !self.ctl_queue.pending.borrow().is_empty() {
237-
self.ctl_queue
238-
.pending
239-
.borrow_mut()
240-
.drain(..)
241-
.for_each(|frame| {
242-
self.call_control_service(frame);
243-
});
244-
control_fut_pending = true;
245-
}
236+
// enqueue pending control frames
237+
if ctl_service_poll.is_ready() && !self.ctl_queue.pending.borrow().is_empty() {
238+
self.ctl_queue
239+
.pending
240+
.borrow_mut()
241+
.drain(..)
242+
.for_each(|frame| {
243+
self.call_control_service(frame);
244+
});
245+
control_fut_pending = true;
246+
}
246247

247-
if control_fut_pending || service_poll.is_pending() || ctl_service_poll.is_pending() {
248-
Poll::Pending
249-
} else {
250-
Poll::Ready(Ok(()))
251-
}
248+
if control_fut_pending || service_poll.is_pending() || ctl_service_poll.is_pending() {
249+
Poll::Pending
250+
} else {
251+
Poll::Ready(Ok(()))
252+
}
253+
})
254+
.await
252255
}
253256

254-
fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> {
255-
let mut shutdown = self.shutdown.borrow_mut();
256-
if !shutdown.is_some() {
257-
self.sink
258-
.0
259-
.get_mut()
260-
.set_error(AmqpProtocolError::Disconnected);
261-
let fut = self
262-
.ctl_service
263-
.call_static(ControlFrame::new_kind(ControlFrameKind::Closed));
264-
*shutdown = Some(Box::pin(async move {
265-
let _ = fut.await;
266-
}));
267-
}
257+
async fn shutdown(&self) {
258+
self.sink
259+
.0
260+
.get_mut()
261+
.set_error(AmqpProtocolError::Disconnected);
262+
let _ = self
263+
.ctl_service
264+
.call(ControlFrame::new_kind(ControlFrameKind::Closed))
265+
.await;
268266

269-
let res0 = shutdown.as_mut().expect("guard above").as_mut().poll(cx);
270-
let res1 = self.service.poll_shutdown(cx);
271-
let res2 = self.ctl_service.poll_shutdown(cx);
272-
if res0.is_pending() || res1.is_pending() || res2.is_pending() {
273-
Poll::Pending
274-
} else {
275-
Poll::Ready(())
276-
}
267+
self.service.shutdown().await;
268+
self.ctl_service.shutdown().await;
277269
}
278270

279271
async fn call(
@@ -334,7 +326,7 @@ where
334326
}
335327
types::Action::DetachReceiver(link, frm) => {
336328
let lnk = link.clone();
337-
let fut = self.service.call_static(types::Message::Detached(lnk));
329+
let fut = self.service.call(types::Message::Detached(lnk));
338330
let _ = spawn(async move {
339331
let _ = fut.await;
340332
});
@@ -352,9 +344,7 @@ where
352344
})
353345
.collect();
354346

355-
let fut = self
356-
.service
357-
.call_static(types::Message::DetachedAll(receivers));
347+
let fut = self.service.call(types::Message::DetachedAll(receivers));
358348
let _ = spawn(async move {
359349
let _ = fut.await;
360350
});

src/router.rs

+22-15
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{future::poll_fn, marker, rc::Rc};
1+
use std::{marker, rc::Rc};
22

33
use ntex::router::{IntoPattern, Router as PatternRouter};
44
use ntex::service::{
@@ -141,34 +141,41 @@ impl<S: 'static> Service<Message> for RouterService<S> {
141141
log::trace!("Releasing handler service for {}", link.name());
142142
let name = link.name().clone();
143143
let _ = ntex::rt::spawn(async move {
144-
poll_fn(move |cx| srv.poll_shutdown(cx)).await;
144+
srv.shutdown().await;
145145
log::trace!("Handler service for {} has shutdown", name);
146146
});
147147
}
148148
Ok(())
149149
}
150150
Message::DetachedAll(links) => {
151-
let futs: Vec<_> = links
151+
let links: Vec<_> = links
152152
.into_iter()
153153
.filter_map(|link| {
154-
self.0.get_mut().handlers.remove(&link).and_then(|srv| {
155-
srv.map(|srv| {
156-
log::trace!(
157-
"Releasing handler service for {} (session ended)",
158-
link.name()
159-
);
160-
poll_fn(move |cx| srv.poll_shutdown(cx))
161-
})
162-
})
154+
self.0
155+
.get_mut()
156+
.handlers
157+
.remove(&link)
158+
.and_then(move |srv| srv.map(|srv| (link, srv)))
163159
})
164160
.collect();
165161

166162
log::trace!(
167163
"Shutting down {} handler services (session ended)",
168-
futs.len()
164+
links.len()
169165
);
170166

171167
let _ = ntex::rt::spawn(async move {
168+
let futs: Vec<_> = links
169+
.iter()
170+
.map(|(link, srv)| {
171+
log::trace!(
172+
"Releasing handler service for {} (session ended)",
173+
link.name()
174+
);
175+
srv.shutdown()
176+
})
177+
.collect();
178+
172179
let len = futs.len();
173180
let _ = join_all(futs).await;
174181
log::trace!(
@@ -279,8 +286,8 @@ where
279286
type Response = Outcome;
280287
type Error = Error;
281288

282-
ntex::forward_poll_ready!(service);
283-
ntex::forward_poll_shutdown!(service);
289+
ntex::forward_ready!(service);
290+
ntex::forward_shutdown!(service);
284291

285292
async fn call(
286293
&self,

src/server/service.rs

+19-4
Original file line numberDiff line numberDiff line change
@@ -231,8 +231,15 @@ where
231231
type Response = ();
232232
type Error = ServerError<H::Error>;
233233

234-
ntex::forward_poll_ready!(handshake, ServerError::Service);
235-
ntex::forward_poll_shutdown!(handshake);
234+
#[inline]
235+
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
236+
self.handshake.ready().await.map_err(ServerError::Service)
237+
}
238+
239+
#[inline]
240+
async fn shutdown(&self) {
241+
self.handshake.shutdown().await
242+
}
236243

237244
async fn call(
238245
&self,
@@ -256,9 +263,17 @@ where
256263
type Response = ();
257264
type Error = ServerError<H::Error>;
258265

259-
ntex::forward_poll_ready!(handshake, ServerError::Service);
260-
ntex::forward_poll_shutdown!(handshake);
266+
#[inline]
267+
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
268+
self.handshake.ready().await.map_err(ServerError::Service)
269+
}
270+
271+
#[inline]
272+
async fn shutdown(&self) {
273+
self.handshake.shutdown().await
274+
}
261275

276+
#[inline]
262277
async fn call(
263278
&self,
264279
req: IoBoxed,

0 commit comments

Comments
 (0)