Skip to content

Commit 3cc277c

Browse files
committed
initial import
0 parents  commit 3cc277c

13 files changed

+1293
-0
lines changed

.gitignore

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
Cargo.lock
2+
target/
3+
guide/build/
4+
/gh-pages
5+
6+
*.so
7+
*.out
8+
*.pyc
9+
*.pid
10+
*.sock
11+
*~
12+
13+
# These are backup files generated by rustfmt
14+
**/*.rs.bk

Cargo.toml

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
[package]
2+
name = "amqp-transport"
3+
version = "0.1.0"
4+
authors = ["Nikolay Kim <[email protected]>"]
5+
edition = "2018"
6+
7+
[dependencies]
8+
amqp = { git = "https://github.com/fafhrd91/rust-amqp.git" }
9+
10+
derive_more = "0.13"
11+
either = "1.5"
12+
futures = "0.1"
13+
log = "0.4"
14+
bytes = "0.4"
15+
uuid = { version = "0.7", features = ["v4"] }
16+
slab = "0.4"
17+
string = "0.1"
18+
19+
actix-codec = "0.1.0"
20+
actix-service = "0.1.2"
21+
actix-connector = "0.1.0"

README.md

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# AMQP 1.0 Protocol Transport in Rust
2+
3+
[![Build Status](https://travis-ci.org/fafhrd91/rust-amqp-transport.svg?branch=master)](https://travis-ci.org/fafhrd91/rust-amqp-transport)

rustfmt.toml

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
max_width = 100

src/cell.rs

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
//! Custom cell impl
2+
use std::{cell::UnsafeCell, fmt, rc::Rc};
3+
4+
pub(crate) struct Cell<T> {
5+
inner: Rc<UnsafeCell<T>>,
6+
}
7+
8+
impl<T> Clone for Cell<T> {
9+
fn clone(&self) -> Self {
10+
Self {
11+
inner: self.inner.clone(),
12+
}
13+
}
14+
}
15+
16+
impl<T: fmt::Debug> fmt::Debug for Cell<T> {
17+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
18+
self.inner.fmt(f)
19+
}
20+
}
21+
22+
impl<T> Cell<T> {
23+
pub(crate) fn new(inner: T) -> Self {
24+
Self {
25+
inner: Rc::new(UnsafeCell::new(inner)),
26+
}
27+
}
28+
29+
pub(crate) fn get_mut(&mut self) -> &mut T {
30+
unsafe { &mut *self.inner.as_ref().get() }
31+
}
32+
}

src/connection.rs

+270
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
use std::collections::VecDeque;
2+
3+
use actix_codec::{AsyncRead, AsyncWrite, Framed};
4+
use bytes::Bytes;
5+
use futures::future::{err, Either};
6+
use futures::task::AtomicTask;
7+
use futures::unsync::oneshot;
8+
use futures::{future, Async, Future, Poll, Sink, Stream};
9+
10+
use amqp::framing::AmqpFrame;
11+
use amqp::protocol::{Begin, Frame};
12+
use amqp::AmqpCodec;
13+
14+
use crate::cell::Cell;
15+
use crate::errors::AmqpTransportError;
16+
use crate::session::{Session, SessionInner};
17+
use crate::Configuration;
18+
19+
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
20+
pub(crate) struct ChannelId(ChannelType);
21+
22+
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
23+
enum ChannelType {
24+
In(u16),
25+
Out(u16),
26+
}
27+
28+
impl ChannelId {
29+
pub fn as_u16(&self) -> u16 {
30+
match self.0 {
31+
ChannelType::In(val) => val,
32+
ChannelType::Out(val) => val,
33+
}
34+
}
35+
}
36+
37+
pub struct Connection<T: AsyncRead + AsyncWrite + 'static> {
38+
inner: Cell<ConnectionInner>,
39+
framed: Framed<T, AmqpCodec<AmqpFrame>>,
40+
}
41+
42+
enum ChannelState {
43+
Opening(oneshot::Sender<Session>, Cell<ConnectionInner>),
44+
Established(Cell<SessionInner>),
45+
Closing(Cell<SessionInner>),
46+
None,
47+
}
48+
49+
impl ChannelState {
50+
fn is_opening(&self) -> bool {
51+
match self {
52+
ChannelState::Opening(_, _) => true,
53+
_ => false,
54+
}
55+
}
56+
}
57+
58+
pub(crate) struct ConnectionInner {
59+
local: Configuration,
60+
remote: Configuration,
61+
write_queue: VecDeque<AmqpFrame>,
62+
write_task: AtomicTask,
63+
channels: slab::Slab<ChannelState>,
64+
}
65+
66+
impl<T: AsyncRead + AsyncWrite> Connection<T> {
67+
pub fn new(
68+
framed: Framed<T, AmqpCodec<AmqpFrame>>,
69+
local: Configuration,
70+
remote: Configuration,
71+
) -> Connection<T> {
72+
Connection {
73+
framed,
74+
inner: Cell::new(ConnectionInner::new(local, remote)),
75+
}
76+
}
77+
78+
pub fn close(&mut self) -> impl Future<Item = (), Error = AmqpTransportError> {
79+
future::ok(())
80+
}
81+
82+
/// Opens the session
83+
pub fn open_session(&mut self) -> impl Future<Item = Session, Error = AmqpTransportError> {
84+
let inner = self.inner.clone();
85+
self.inner.get_mut().open_session(inner)
86+
}
87+
}
88+
89+
impl<T: AsyncRead + AsyncWrite> Future for Connection<T> {
90+
type Item = ();
91+
type Error = ();
92+
93+
fn poll(&mut self) -> Poll<(), ()> {
94+
let inner = self.inner.get_mut();
95+
96+
loop {
97+
match self.framed.poll() {
98+
Ok(Async::Ready(Some(frame))) => inner.handle_frame(frame),
99+
Ok(Async::Ready(None)) => return Err(()),
100+
Ok(Async::NotReady) => break,
101+
Err(e) => {
102+
warn!("AMQP: Error reading: {:?}", e);
103+
return Err(());
104+
}
105+
}
106+
}
107+
108+
loop {
109+
while !self.framed.is_write_buf_full() {
110+
if let Some(frame) = inner.pop_next_frame() {
111+
println!("SEND: {:?} \n", frame);
112+
if let Err(_) = self.framed.force_send(frame) {
113+
return Err(());
114+
}
115+
} else {
116+
break;
117+
}
118+
}
119+
120+
if !self.framed.is_write_buf_empty() {
121+
match self.framed.poll_complete() {
122+
Ok(Async::NotReady) => break,
123+
Err(e) => {
124+
debug!("Error sending data: {}", e);
125+
return Err(());
126+
}
127+
Ok(Async::Ready(_)) => {
128+
inner.write_task.register();
129+
}
130+
}
131+
} else {
132+
break;
133+
}
134+
}
135+
136+
Ok(Async::NotReady)
137+
}
138+
}
139+
140+
pub(crate) struct ConnectionController(Cell<ConnectionInner>);
141+
142+
impl ConnectionController {
143+
pub fn close_session(&mut self) {
144+
unimplemented!()
145+
}
146+
147+
pub fn post_frame(&mut self, frame: AmqpFrame) {
148+
self.0.get_mut().post_frame(frame)
149+
}
150+
151+
pub(crate) fn drop_session_copy(&mut self, id: ChannelId) {}
152+
}
153+
154+
impl ConnectionInner {
155+
pub fn new(local: Configuration, remote: Configuration) -> ConnectionInner {
156+
ConnectionInner {
157+
local,
158+
remote,
159+
write_queue: VecDeque::new(),
160+
write_task: AtomicTask::new(),
161+
channels: slab::Slab::new(),
162+
}
163+
}
164+
165+
fn pop_next_frame(&mut self) -> Option<AmqpFrame> {
166+
self.write_queue.pop_front()
167+
}
168+
169+
fn post_frame(&mut self, frame: AmqpFrame) {
170+
self.write_queue.push_back(frame);
171+
self.write_task.notify();
172+
}
173+
174+
fn handle_frame(&mut self, frame: AmqpFrame) {
175+
println!("FRAME: {:?} \n", frame);
176+
177+
match *frame.performative() {
178+
Frame::Begin(ref begin) if begin.remote_channel().is_some() => {
179+
self.complete_session_creation(frame.channel_id() as usize, begin);
180+
return;
181+
}
182+
// todo: handle Close, End?
183+
Frame::End(_) | Frame::Close(_) => {
184+
println!("todo: unexpected frame: {:#?}", frame);
185+
}
186+
_ => (), // todo: handle unexpected frames
187+
}
188+
189+
if let Some(channel) = self.channels.get_mut(frame.channel_id() as usize) {
190+
match channel {
191+
ChannelState::Established(ref mut session) => {
192+
let s = session.clone();
193+
session.get_mut().handle_frame(frame, s)
194+
}
195+
_ => (),
196+
}
197+
} else {
198+
// todo: missing session
199+
println!("todo: missing session: {}", frame.channel_id());
200+
}
201+
}
202+
203+
fn complete_session_creation(&mut self, channel_id: usize, begin: &Begin) {
204+
println!("COMPLETE: {:?} {:?}", channel_id, begin.remote_channel());
205+
206+
let id = begin.remote_channel().unwrap() as usize;
207+
208+
if let Some(channel) = self.channels.get_mut(id) {
209+
if channel.is_opening() {
210+
let item = std::mem::replace(channel, ChannelState::None);
211+
212+
if let ChannelState::Opening(tx, self_rc) = item {
213+
let session = Cell::new(SessionInner::new(
214+
ChannelId(ChannelType::Out(id as u16)),
215+
ConnectionController(self_rc),
216+
begin.remote_channel().unwrap(),
217+
begin.incoming_window(),
218+
begin.next_outgoing_id(),
219+
begin.outgoing_window(),
220+
));
221+
222+
if tx.send(Session::new(session.clone())).is_err() {
223+
// todo: send end session
224+
}
225+
*channel = ChannelState::Established(session)
226+
}
227+
} else {
228+
// send error response
229+
}
230+
} else {
231+
// todo: rogue begin right now - do nothing. in future might indicate incoming attach
232+
}
233+
}
234+
235+
fn open_session(
236+
&mut self,
237+
inner: Cell<ConnectionInner>,
238+
) -> impl Future<Item = Session, Error = AmqpTransportError> {
239+
let (tx, rx) = oneshot::channel();
240+
241+
let entry = self.channels.vacant_entry();
242+
let token = entry.key();
243+
244+
println!("TOKEN-ID: {:?}", token);
245+
246+
if token >= self.local.channel_max {
247+
Either::A(err(AmqpTransportError::TooManyChannels.into()))
248+
} else {
249+
entry.insert(ChannelState::Opening(tx, inner));
250+
251+
let begin = Begin {
252+
// todo: let user specify settings
253+
remote_channel: None,
254+
next_outgoing_id: 1,
255+
incoming_window: 0,
256+
outgoing_window: ::std::u32::MAX,
257+
handle_max: ::std::u32::MAX,
258+
offered_capabilities: None,
259+
desired_capabilities: None,
260+
properties: None,
261+
};
262+
self.post_frame(AmqpFrame::new(
263+
token as u16,
264+
Frame::Begin(begin),
265+
Bytes::new(),
266+
));
267+
Either::B(rx.map_err(|_e| AmqpTransportError::Disconnected))
268+
}
269+
}
270+
}

src/errors.rs

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
use amqp::errors::{AmqpCodecError, AmqpParseError, ProtocolIdError};
2+
use amqp::protocol::SaslCode;
3+
4+
#[derive(Debug, Display, From)]
5+
pub enum AmqpTransportError {
6+
ParseError(AmqpParseError),
7+
TooManyChannels,
8+
Disconnected,
9+
}
10+
11+
#[derive(Debug, From)]
12+
pub enum SaslConnectError {
13+
Protocol(ProtocolIdError),
14+
AmqpError(AmqpCodecError),
15+
Sasl(SaslCode),
16+
ExpectedOpenFrame,
17+
Disconnected,
18+
}

0 commit comments

Comments
 (0)