Skip to content

Commit 1afaa60

Browse files
authored
Migrate to ntex-service-1.0 (#27)
* use service 1.0
1 parent ef335bc commit 1afaa60

17 files changed

+202
-257
lines changed

.github/workflows/linux.yml

+5-5
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ jobs:
88
fail-fast: false
99
matrix:
1010
version:
11-
- 1.57.0 # MSRV
11+
- 1.65.0 # MSRV
1212
- stable
1313
- nightly
1414

@@ -34,7 +34,7 @@ jobs:
3434
uses: Swatinem/[email protected]
3535

3636
- name: Cache cargo tarpaulin
37-
if: matrix.version == '1.57.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
37+
if: matrix.version == '1.65.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
3838
uses: actions/cache@v1
3939
with:
4040
path: ~/.cargo/bin
@@ -48,19 +48,19 @@ jobs:
4848
args: --all --no-fail-fast --features=ntex/tokio -- --nocapture
4949

5050
- name: Install tarpaulin
51-
if: matrix.version == '1.57.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
51+
if: matrix.version == '1.65.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
5252
continue-on-error: true
5353
run: |
5454
cargo install cargo-tarpaulin
5555
5656
- name: Generate coverage report
57-
if: matrix.version == '1.57.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
57+
if: matrix.version == '1.65.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
5858
continue-on-error: true
5959
run: |
6060
cargo tarpaulin --out Xml --all --features=ntex/tokio
6161
6262
- name: Upload to Codecov
63-
if: matrix.version == '1.57.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
63+
if: matrix.version == '1.65.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
6464
continue-on-error: true
6565
uses: codecov/codecov-action@v1
6666
with:

CHANGES.md

+4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changes
22

3+
## [0.7.0-beta.0] - 2022-12-28
4+
5+
* Migrate to ntex-service 1.0
6+
37
## [0.6.4] - 2022-08-22
48

59
* Must respond with attach before detach when rejecting links #24

Cargo.toml

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-amqp"
3-
version = "0.6.4"
3+
version = "0.7.0-beta.0"
44
authors = ["ntex contributors <[email protected]>"]
55
description = "AMQP 1.0 Client/Server framework"
66
documentation = "https://docs.rs/ntex-amqp"
@@ -24,7 +24,7 @@ default = []
2424
frame-trace = []
2525

2626
[dependencies]
27-
ntex = "0.5.14"
27+
ntex = "0.6.0-beta.0"
2828
ntex-amqp-codec = "0.8.2"
2929

3030
bitflags = "1.3"
@@ -35,8 +35,8 @@ slab = "0.4"
3535
uuid = { version = "0.8", features = ["v4"] }
3636

3737
[dev-dependencies]
38-
env_logger = "0.9"
39-
ntex = { version = "0.5", features = ["tokio"] }
38+
env_logger = "0.10"
39+
ntex = { version = "0.6.0-beta.0", features = ["tokio"] }
4040

4141
[patch.crates-io]
4242
ntex-amqp = { path = "." }

codec/src/codec/encode.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ impl ArrayEncode for bool {
4545
1
4646
}
4747
fn array_encode(&self, buf: &mut BytesMut) {
48-
buf.put_u8(if *self { 1 } else { 0 });
48+
buf.put_u8(u8::from(*self));
4949
}
5050
}
5151

codec/src/types/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,8 @@ impl From<String> for Str {
139139
impl hash::Hash for Str {
140140
fn hash<H: hash::Hasher>(&self, state: &mut H) {
141141
match self {
142-
Str::String(s) => (&**s).hash(state),
143-
Str::ByteStr(s) => (&**s).hash(state),
142+
Str::String(s) => (**s).hash(state),
143+
Str::ByteStr(s) => (**s).hash(state),
144144
Str::Static(s) => s.hash(state),
145145
}
146146
}

codec/src/types/variant.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ impl Variant {
126126

127127
pub fn as_int(&self) -> Option<i32> {
128128
match self {
129-
Variant::Int(v) => Some(*v as i32),
129+
Variant::Int(v) => Some(*v),
130130
_ => None,
131131
}
132132
}
@@ -140,7 +140,7 @@ impl Variant {
140140
Variant::Byte(v) => Some(*v as i64),
141141
Variant::Short(v) => Some(*v as i64),
142142
Variant::Int(v) => Some(*v as i64),
143-
Variant::Long(v) => Some(*v as i64),
143+
Variant::Long(v) => Some(*v),
144144
_ => None,
145145
}
146146
}

examples/server.rs

+2-13
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,9 @@
1-
use ntex::service::{fn_factory_with_config, Service};
2-
use ntex::util::Ready;
1+
use ntex::service::{boxed::BoxService, fn_factory_with_config};
32
use ntex_amqp::{error::AmqpError, error::LinkError, server};
43

54
async fn server(
65
link: server::Link<()>,
7-
) -> Result<
8-
Box<
9-
dyn Service<
10-
server::Transfer,
11-
Response = server::Outcome,
12-
Error = AmqpError,
13-
Future = Ready<server::Outcome, AmqpError>,
14-
> + 'static,
15-
>,
16-
LinkError,
17-
> {
6+
) -> Result<BoxService<server::Transfer, server::Outcome, AmqpError>, LinkError> {
187
println!("OPEN LINK: {:?}", link);
198
Err(LinkError::force_detach().description("unimplemented"))
209
}

src/client/connector.rs

+21-37
Original file line numberDiff line numberDiff line change
@@ -145,13 +145,11 @@ where
145145
IoBoxed: From<T::Response>,
146146
{
147147
/// Connect to amqp server
148-
pub fn connect(&self, address: A) -> impl Future<Output = Result<Client, ConnectError>> {
148+
pub async fn connect(&self, address: A) -> Result<Client, ConnectError> {
149149
let fut = timeout_checked(self.handshake_timeout, self._connect(address));
150-
async move {
151-
match fut.await {
152-
Ok(res) => res.map_err(From::from),
153-
Err(_) => Err(ConnectError::HandshakeTimeout),
154-
}
150+
match fut.await {
151+
Ok(res) => res.map_err(From::from),
152+
Err(_) => Err(ConnectError::HandshakeTimeout),
155153
}
156154
}
157155

@@ -165,35 +163,27 @@ where
165163
_connect_plain(io, self.config.clone())
166164
}
167165

168-
fn _connect(&self, address: A) -> impl Future<Output = Result<Client, ConnectError>> {
169-
let fut = self.connector.call(Connect::new(address));
166+
async fn _connect(&self, address: A) -> Result<Client, ConnectError> {
167+
let io = self.connector.call(Connect::new(address)).await?;
170168
let config = self.config.clone();
171169
let pool = self.pool;
172170
let disconnect = self.disconnect_timeout;
173171

174-
async move {
175-
trace!("Negotiation client protocol id: Amqp");
172+
trace!("Negotiation client protocol id: Amqp");
176173

177-
let io = IoBoxed::from(fut.await?);
178-
io.set_memory_pool(pool);
179-
io.set_disconnect_timeout(disconnect.into());
174+
let io = IoBoxed::from(io);
175+
io.set_memory_pool(pool);
176+
io.set_disconnect_timeout(disconnect.into());
180177

181-
_connect_plain(io, config).await
182-
}
178+
_connect_plain(io, config).await
183179
}
184180

185181
/// Connect to amqp server
186-
pub fn connect_sasl(
187-
&self,
188-
addr: A,
189-
auth: SaslAuth,
190-
) -> impl Future<Output = Result<Client, ConnectError>> {
182+
pub async fn connect_sasl(&self, addr: A, auth: SaslAuth) -> Result<Client, ConnectError> {
191183
let fut = timeout_checked(self.handshake_timeout, self._connect_sasl(addr, auth));
192-
async move {
193-
match fut.await {
194-
Ok(res) => res.map_err(From::from),
195-
Err(_) => Err(ConnectError::HandshakeTimeout),
196-
}
184+
match fut.await {
185+
Ok(res) => res.map_err(From::from),
186+
Err(_) => Err(ConnectError::HandshakeTimeout),
197187
}
198188
}
199189

@@ -212,23 +202,17 @@ where
212202
_connect_sasl(io, auth, config)
213203
}
214204

215-
fn _connect_sasl(
216-
&self,
217-
addr: A,
218-
auth: SaslAuth,
219-
) -> impl Future<Output = Result<Client, ConnectError>> {
220-
let fut = self.connector.call(Connect::new(addr));
205+
async fn _connect_sasl(&self, addr: A, auth: SaslAuth) -> Result<Client, ConnectError> {
206+
let io = self.connector.call(Connect::new(addr)).await?;
221207
let config = self.config.clone();
222208
let pool = self.pool;
223209
let disconnect = self.disconnect_timeout;
224210

225-
async move {
226-
let io = IoBoxed::from(fut.await?);
227-
io.set_memory_pool(pool);
228-
io.set_disconnect_timeout(disconnect.into());
211+
let io = IoBoxed::from(io);
212+
io.set_memory_pool(pool);
213+
io.set_disconnect_timeout(disconnect.into());
229214

230-
_connect_sasl(io, auth, config).await
231-
}
215+
_connect_sasl(io, auth, config).await
232216
}
233217
}
234218

src/control.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ pub enum ControlFrameKind {
3434
SessionEnded(Vec<Either<SenderLink, ReceiverLink>>),
3535
ProtocolError(AmqpProtocolError),
3636
Disconnected(Option<io::Error>),
37-
Closed(bool),
37+
Closed,
3838
}
3939

4040
impl ControlFrame {

src/default.rs

+9-19
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{marker::PhantomData, task::Context, task::Poll};
1+
use std::marker::PhantomData;
22

33
use ntex::service::{Service, ServiceFactory};
44
use ntex::util::Ready;
@@ -20,25 +20,20 @@ impl<S, E> ServiceFactory<Link<S>, State<S>> for DefaultPublishService<S, E> {
2020
type Error = E;
2121
type InitError = LinkError;
2222
type Service = DefaultPublishService<S, E>;
23-
type Future = Ready<Self::Service, Self::InitError>;
23+
type Future<'f> = Ready<Self::Service, Self::InitError> where Self: 'f;
2424

25-
fn new_service(&self, _: State<S>) -> Self::Future {
25+
fn create(&self, _: State<S>) -> Self::Future<'_> {
2626
Ready::Err(LinkError::force_detach().description("not configured"))
2727
}
2828
}
2929

3030
impl<S, E> Service<Link<S>> for DefaultPublishService<S, E> {
3131
type Response = ();
3232
type Error = E;
33-
type Future = Ready<Self::Response, Self::Error>;
33+
type Future<'f> = Ready<Self::Response, Self::Error> where Self: 'f;
3434

3535
#[inline]
36-
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
37-
Poll::Ready(Ok(()))
38-
}
39-
40-
#[inline]
41-
fn call(&self, _pkt: Link<S>) -> Self::Future {
36+
fn call(&self, _pkt: Link<S>) -> Self::Future<'_> {
4237
log::warn!("AMQP Publish service is not configured");
4338
Ready::Ok(())
4439
}
@@ -58,25 +53,20 @@ impl<S, E> ServiceFactory<ControlFrame, State<S>> for DefaultControlService<S, E
5853
type Error = E;
5954
type InitError = E;
6055
type Service = DefaultControlService<S, E>;
61-
type Future = Ready<Self::Service, Self::InitError>;
56+
type Future<'f> = Ready<Self::Service, Self::InitError> where Self: 'f;
6257

63-
fn new_service(&self, _: State<S>) -> Self::Future {
58+
fn create(&self, _: State<S>) -> Self::Future<'_> {
6459
Ready::Ok(DefaultControlService(PhantomData))
6560
}
6661
}
6762

6863
impl<S, E> Service<ControlFrame> for DefaultControlService<S, E> {
6964
type Response = ();
7065
type Error = E;
71-
type Future = Ready<Self::Response, Self::Error>;
72-
73-
#[inline]
74-
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
75-
Poll::Ready(Ok(()))
76-
}
66+
type Future<'f> = Ready<Self::Response, Self::Error> where Self: 'f;
7767

7868
#[inline]
79-
fn call(&self, _pkt: ControlFrame) -> Self::Future {
69+
fn call(&self, _pkt: ControlFrame) -> Self::Future<'_> {
8070
Ready::Ok(())
8171
}
8272
}

0 commit comments

Comments
 (0)