Skip to content

Commit 681bb77

Browse files
committed
Use updated Service trait
1 parent 15008af commit 681bb77

11 files changed

+133
-135
lines changed

CHANGES.md

+4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changes
22

3+
## [4.5.0] - 2024-12-04
4+
5+
* Use updated Service trait
6+
37
## [4.4.0] - 2024-11-10
48

59
* Check service readiness once per decoded item

Cargo.toml

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-mqtt"
3-
version = "4.4.0"
3+
version = "4.5.0"
44
authors = ["ntex contributors <[email protected]>"]
55
description = "Client and Server framework for MQTT v5 and v3.1.1 protocols"
66
documentation = "https://docs.rs/ntex-mqtt"
@@ -17,8 +17,8 @@ features = ["ntex/tokio"]
1717
[dependencies]
1818
ntex-io = "2"
1919
ntex-net = "2"
20-
ntex-util = "2.5"
21-
ntex-service = "3.3.3"
20+
ntex-util = "2.8"
21+
ntex-service = "3.4"
2222
ntex-bytes = "0.1"
2323
ntex-codec = "0.6"
2424
ntex-router = "0.5"

src/inflight.rs

+2-13
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
use std::{cell::Cell, future::poll_fn, rc::Rc, task::Context, task::Poll};
33

44
use ntex_service::{Middleware, Service, ServiceCtx};
5-
use ntex_util::{future::join, future::select, task::LocalWaker};
5+
use ntex_util::{future::join, task::LocalWaker};
66

77
/// Trait for types that could be sized
88
pub trait SizedRequest {
@@ -83,13 +83,6 @@ where
8383
}
8484
}
8585

86-
#[inline]
87-
async fn not_ready(&self) {
88-
if self.count.is_available() {
89-
select(self.count.unavailable(), self.service.not_ready()).await;
90-
}
91-
}
92-
9386
#[inline]
9487
async fn call(&self, req: R, ctx: ServiceCtx<'_, Self>) -> Result<S::Response, S::Error> {
9588
let size = if self.count.0.max_size > 0 { req.size() } else { 0 };
@@ -99,6 +92,7 @@ where
9992
result
10093
}
10194

95+
ntex_service::forward_poll!(service);
10296
ntex_service::forward_shutdown!(service);
10397
}
10498

@@ -135,10 +129,6 @@ impl Counter {
135129
async fn available(&self) {
136130
poll_fn(|cx| if self.0.available(cx) { Poll::Ready(()) } else { Poll::Pending }).await
137131
}
138-
139-
async fn unavailable(&self) {
140-
poll_fn(|cx| if self.0.available(cx) { Poll::Pending } else { Poll::Ready(()) }).await
141-
}
142132
}
143133

144134
struct CounterGuard(u32, Rc<CounterInner>);
@@ -305,7 +295,6 @@ mod tests {
305295
}))
306296
.bind();
307297
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
308-
assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Pending);
309298

310299
let srv2 = srv.clone();
311300
ntex_util::spawn(async move {

src/io.rs

+1-32
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
//! Framed transport dispatcher
2-
use std::future::{poll_fn, Future};
32
use std::task::{ready, Context, Poll};
4-
use std::{cell::Cell, cell::RefCell, collections::VecDeque, pin::Pin, rc::Rc};
3+
use std::{cell::Cell, cell::RefCell, collections::VecDeque, future::Future, pin::Pin, rc::Rc};
54

65
use ntex_codec::{Decoder, Encoder};
76
use ntex_io::{
@@ -255,12 +254,6 @@ where
255254
}
256255
}
257256

258-
// start ready task
259-
if inner.flags.contains(Flags::READY_TASK) {
260-
inner.flags.insert(Flags::READY_TASK);
261-
ntex_rt::spawn(not_ready(inner.state.clone(), inner.service.clone()));
262-
}
263-
264257
loop {
265258
match inner.st {
266259
IoDispatcherState::Processing => {
@@ -595,30 +588,6 @@ where
595588
}
596589
}
597590

598-
async fn not_ready<S, U>(
599-
slf: Rc<DispatcherState<S, U>>,
600-
pl: PipelineBinding<S, DispatchItem<U>>,
601-
) where
602-
S: Service<DispatchItem<U>, Response = Option<Response<U>>> + 'static,
603-
U: Encoder + Decoder + 'static,
604-
{
605-
loop {
606-
if !pl.is_shutdown() {
607-
if let Err(err) = poll_fn(|cx| pl.poll_ready(cx)).await {
608-
slf.error.set(Some(IoDispatcherError::Service(err)));
609-
break;
610-
}
611-
if !pl.is_shutdown() {
612-
poll_fn(|cx| pl.poll_not_ready(cx)).await;
613-
slf.ready.set(false);
614-
slf.waker.wake();
615-
continue;
616-
}
617-
}
618-
break;
619-
}
620-
}
621-
622591
#[cfg(test)]
623592
mod tests {
624593
use std::{cell::Cell, io, sync::Arc, sync::Mutex};

src/server.rs

+6-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{fmt, io, marker};
1+
use std::{fmt, io, marker, task::Context};
22

33
use ntex_codec::{Decoder, Encoder};
44
use ntex_io::{DispatchItem, Filter, Io, IoBoxed};
@@ -234,8 +234,9 @@ where
234234
}
235235

236236
#[inline]
237-
async fn not_ready(&self) {
238-
select(self.handlers.0.not_ready(), self.handlers.1.not_ready()).await;
237+
fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> {
238+
self.handlers.0.poll(cx)?;
239+
self.handlers.1.poll(cx)
239240
}
240241

241242
#[inline]
@@ -302,8 +303,8 @@ where
302303
}
303304

304305
#[inline]
305-
async fn not_ready(&self) {
306-
Service::<IoBoxed>::not_ready(self).await
306+
fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> {
307+
Service::<IoBoxed>::poll(self, cx)
307308
}
308309

309310
#[inline]

src/v3/client/dispatcher.rs

+24-12
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
use std::{cell::RefCell, marker::PhantomData, num::NonZeroU16, rc::Rc};
1+
use std::{cell::RefCell, marker::PhantomData, num::NonZeroU16, rc::Rc, task::Context};
22

33
use ntex_io::DispatchItem;
44
use ntex_service::{Pipeline, Service, ServiceCtx};
5-
use ntex_util::future::{join, select, Either};
5+
use ntex_util::future::{join, Either};
66
use ntex_util::{services::inflight::InFlightService, HashSet};
77

88
use crate::error::{HandshakeError, MqttError, ProtocolError};
@@ -38,7 +38,7 @@ pub(crate) struct Dispatcher<T, C: Service<Control<E>>, E> {
3838
}
3939

4040
struct Inner<C> {
41-
control: C,
41+
control: Pipeline<C>,
4242
sink: Rc<MqttShared>,
4343
inflight: RefCell<HashSet<NonZeroU16>>,
4444
}
@@ -51,15 +51,19 @@ where
5151
pub(crate) fn new(sink: Rc<MqttShared>, publish: T, control: C) -> Self {
5252
Self {
5353
publish,
54-
inner: Rc::new(Inner { sink, control, inflight: RefCell::new(HashSet::default()) }),
54+
inner: Rc::new(Inner {
55+
sink,
56+
control: Pipeline::new(control),
57+
inflight: RefCell::new(HashSet::default()),
58+
}),
5559
_t: PhantomData,
5660
}
5761
}
5862
}
5963

6064
impl<T, C, E> Service<DispatchItem<Rc<MqttShared>>> for Dispatcher<T, C, E>
6165
where
62-
T: Service<Publish, Response = Either<(), Publish>, Error = E>,
66+
T: Service<Publish, Response = Either<(), Publish>, Error = E> + 'static,
6367
C: Service<Control<E>, Response = ControlAck, Error = MqttError<E>> + 'static,
6468
E: 'static,
6569
{
@@ -68,12 +72,13 @@ where
6872

6973
#[inline]
7074
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
71-
let (res1, res2) = join(ctx.ready(&self.publish), ctx.ready(&self.inner.control)).await;
75+
let (res1, res2) =
76+
join(ctx.ready(&self.publish), ctx.ready(self.inner.control.get_ref())).await;
7277
if let Err(e) = res1 {
7378
if res2.is_err() {
7479
Err(MqttError::Service(e))
7580
} else {
76-
match ctx.call_nowait(&self.inner.control, Control::error(e)).await {
81+
match ctx.call_nowait(self.inner.control.get_ref(), Control::error(e)).await {
7782
Ok(_) => {
7883
self.inner.sink.close();
7984
Ok(())
@@ -86,14 +91,21 @@ where
8691
}
8792
}
8893

89-
#[inline]
90-
async fn not_ready(&self) {
91-
select(self.publish.not_ready(), self.inner.control.not_ready()).await;
94+
fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> {
95+
if let Err(e) = self.publish.poll(cx) {
96+
let inner = self.inner.clone();
97+
ntex_rt::spawn(async move {
98+
if inner.control.call_nowait(Control::error(e.into())).await.is_ok() {
99+
inner.sink.close();
100+
}
101+
});
102+
}
103+
self.inner.control.poll(cx)
92104
}
93105

94106
async fn shutdown(&self) {
95107
self.inner.sink.close();
96-
let _ = Pipeline::new(&self.inner.control).call(Control::closed()).await;
108+
let _ = self.inner.control.call(Control::closed()).await;
97109

98110
self.publish.shutdown().await;
99111
self.inner.control.shutdown().await
@@ -231,7 +243,7 @@ async fn control<'f, T, C, E>(
231243
where
232244
C: Service<Control<E>, Response = ControlAck, Error = MqttError<E>>,
233245
{
234-
let packet = match ctx.call(&inner.control, msg).await?.result {
246+
let packet = match ctx.call(inner.control.get_ref(), msg).await?.result {
235247
ControlAckKind::Ping => Some(codec::Packet::PingResponse),
236248
ControlAckKind::PublishAck(id) => {
237249
inner.inflight.borrow_mut().remove(&id);

src/v3/dispatcher.rs

+27-12
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
use std::{cell::RefCell, marker::PhantomData, num::NonZeroU16, rc::Rc};
1+
use std::{cell::RefCell, marker::PhantomData, num::NonZeroU16, rc::Rc, task::Context};
22

33
use ntex_io::DispatchItem;
44
use ntex_service::{Pipeline, Service, ServiceCtx, ServiceFactory};
55
use ntex_util::services::buffer::{BufferService, BufferServiceError};
66
use ntex_util::services::inflight::InFlightService;
7-
use ntex_util::{future::join, future::select, HashSet};
7+
use ntex_util::{future::join, HashSet};
88

99
use crate::error::{HandshakeError, MqttError, ProtocolError};
1010
use crate::types::QoS;
@@ -88,7 +88,7 @@ pub(crate) struct Dispatcher<T, C: Service<Control<E>>, E> {
8888
}
8989

9090
struct Inner<C> {
91-
control: C,
91+
control: Pipeline<C>,
9292
sink: Rc<MqttShared>,
9393
inflight: RefCell<HashSet<NonZeroU16>>,
9494
}
@@ -110,7 +110,11 @@ where
110110
publish,
111111
max_qos,
112112
handle_qos_after_disconnect,
113-
inner: Rc::new(Inner { sink, control, inflight: RefCell::new(HashSet::default()) }),
113+
inner: Rc::new(Inner {
114+
sink,
115+
control: Pipeline::new(control),
116+
inflight: RefCell::new(HashSet::default()),
117+
}),
114118
_t: PhantomData,
115119
}
116120
}
@@ -119,20 +123,24 @@ where
119123
impl<T, C, E> Service<DispatchItem<Rc<MqttShared>>> for Dispatcher<T, C, E>
120124
where
121125
E: From<T::Error> + 'static,
122-
T: Service<Publish, Response = ()>,
126+
T: Service<Publish, Response = ()> + 'static,
123127
C: Service<Control<E>, Response = ControlAck, Error = MqttError<E>> + 'static,
124128
{
125129
type Response = Option<codec::Packet>;
126130
type Error = MqttError<E>;
127131

128132
#[inline]
129133
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
130-
let (res1, res2) = join(ctx.ready(&self.publish), ctx.ready(&self.inner.control)).await;
134+
let (res1, res2) =
135+
join(ctx.ready(&self.publish), ctx.ready(self.inner.control.get_ref())).await;
131136
if let Err(e) = res1 {
132137
if res2.is_err() {
133138
Err(MqttError::Service(e.into()))
134139
} else {
135-
match ctx.call_nowait(&self.inner.control, Control::error(e.into())).await {
140+
match ctx
141+
.call_nowait(self.inner.control.get_ref(), Control::error(e.into()))
142+
.await
143+
{
136144
Ok(_) => {
137145
self.inner.sink.close();
138146
Ok(())
@@ -145,14 +153,21 @@ where
145153
}
146154
}
147155

148-
#[inline]
149-
async fn not_ready(&self) {
150-
select(self.publish.not_ready(), self.inner.control.not_ready()).await;
156+
fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> {
157+
if let Err(e) = self.publish.poll(cx) {
158+
let inner = self.inner.clone();
159+
ntex_rt::spawn(async move {
160+
if inner.control.call_nowait(Control::error(e.into())).await.is_ok() {
161+
inner.sink.close();
162+
}
163+
});
164+
}
165+
self.inner.control.poll(cx)
151166
}
152167

153168
async fn shutdown(&self) {
154169
self.inner.sink.close();
155-
let _ = Pipeline::new(&self.inner.control).call(Control::closed()).await;
170+
let _ = self.inner.control.call(Control::closed()).await;
156171

157172
self.publish.shutdown().await;
158173
self.inner.control.shutdown().await;
@@ -387,7 +402,7 @@ where
387402
let mut error = matches!(pkt, Control::Error(_) | Control::ProtocolError(_));
388403

389404
loop {
390-
match ctx.call(&inner.control, pkt).await {
405+
match ctx.call(inner.control.get_ref(), pkt).await {
391406
Ok(item) => {
392407
let packet = match item.result {
393408
ControlAckKind::Ping => Some(codec::Packet::PingResponse),

src/v3/router.rs

+5-15
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{future::poll_fn, future::Future, pin::Pin, rc::Rc, task::Poll};
1+
use std::{rc::Rc, task::Context};
22

33
use ntex_router::{IntoPattern, RouterBuilder};
44
use ntex_service::boxed::{self, BoxService, BoxServiceFactory};
@@ -117,24 +117,14 @@ impl<Err> Service<Publish> for RouterService<Err> {
117117
}
118118

119119
#[inline]
120-
async fn not_ready(&self) {
121-
let mut futs = Vec::with_capacity(self.handlers.len() + 1);
120+
fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> {
122121
for hnd in &self.handlers {
123-
futs.push(Box::pin(hnd.not_ready()));
122+
hnd.poll(cx)?;
124123
}
125-
futs.push(Box::pin(self.default.not_ready()));
126-
127-
poll_fn(|cx| {
128-
for hnd in &mut futs {
129-
if Pin::new(hnd).poll(cx).is_ready() {
130-
return Poll::Ready(());
131-
}
132-
}
133-
Poll::Pending
134-
})
135-
.await;
124+
self.default.poll(cx)
136125
}
137126

127+
#[inline]
138128
async fn call(
139129
&self,
140130
mut req: Publish,

0 commit comments

Comments
 (0)