Skip to content

Commit

Permalink
Finish async API
Browse files Browse the repository at this point in the history
  • Loading branch information
evanrittenhouse committed Jun 17, 2024
1 parent 84e3ff4 commit ee36a17
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 90 deletions.
8 changes: 3 additions & 5 deletions apps/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,10 +629,8 @@ impl HttpConn for Http09Conn {
s
);

// let body =
// std::fs::read(path.as_path())
// .unwrap_or_else(|_| b"Not Found!\r\n".to_vec());
let body = vec![0; 1_000_000];
let body = std::fs::read(path.as_path())
.unwrap_or_else(|_| b"Not Found!\r\n".to_vec());

info!(
"{} sending response of size {} on stream {}",
Expand Down Expand Up @@ -1094,7 +1092,7 @@ impl Http3Conn {
match std::fs::read(file_path.as_path()) {
Ok(data) => (200, data),

Err(_) => (404, vec![57; 1_000_000]),
Err(_) => (404, b"Not Found!".to_vec()),
}
},

Expand Down
5 changes: 4 additions & 1 deletion apps/src/recvfrom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ pub fn recv_from(
use dgram::RecvMsgSettings;
use std::os::unix::io::AsRawFd;

let mut recvmsg_cmsg_settings = RecvMsgSettings::default();
let mut recvmsg_cmsg_settings = RecvMsgSettings {
store_cmsgs: false,
cmsg_space: &mut vec![],
};
socket.try_io(|| {
let fd =
unsafe { std::os::fd::BorrowedFd::borrow_raw(socket.as_raw_fd()) };
Expand Down
2 changes: 0 additions & 2 deletions apps/src/sendto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ fn send_to_gso_pacing(
let fd =
unsafe { std::os::fd::BorrowedFd::borrow_raw(socket.as_raw_fd()) };

// TODO: make sure this actually errors properly, pretty sure there
// was some weirdness with the `Blocked` error
dgram::sync::send_to(&fd, buf, sendmsg_settings)
})
}
Expand Down
30 changes: 8 additions & 22 deletions dgram/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,9 @@ pub struct SendMsgSettings {

/// Settings for handling control messages when receiving data.
#[cfg(target_os = "linux")]
#[derive(Clone)]
pub struct RecvMsgSettings {
// TODO(evanrittenhouse): deprecate store_cmsgs and only store based on what
// cmsg_space can handle.
pub struct RecvMsgSettings<'c> {
/// If cmsgs should be stored when receiving a message. If set, cmsgs will
/// be stored in the `cmsg_space` vector.
/// be stored in the [`RecvData`]'s `cmsgs` field.
pub store_cmsgs: bool,
/// The vector where cmsgs will be stored, if store_cmsgs is set.
///
Expand All @@ -45,19 +42,15 @@ pub struct RecvMsgSettings {
/// [`cmsg_space`] macro.
///
/// [`cmsg_space`]: https://docs.rs/nix/latest/nix/macro.cmsg_space.html
pub cmsg_space: Vec<u8>,
/// Flags for [`recvmsg`]. See [MsgFlags] for more.
///
/// [`recvmsg`]: [nix::sys::socket::recvmsg]
pub msg_flags: MsgFlags,
pub cmsg_space: &'c mut Vec<u8>,
}

impl Default for RecvMsgSettings {
fn default() -> Self {
impl<'c> RecvMsgSettings<'c> {
// Convenience to avoid forcing a specific version of nix
pub fn new(store_cmsgs: bool, cmsg_space: &'c mut Vec<u8>) -> Self {
Self {
msg_flags: MsgFlags::empty(),
store_cmsgs: false,
cmsg_space: vec![],
store_cmsgs,
cmsg_space,
}
}
}
Expand Down Expand Up @@ -173,10 +166,3 @@ mod linux_imports {
pub(super) use std::net::SocketAddrV6;
pub(super) use std::os::fd::AsRawFd;
}

#[cfg(feature = "async")]
mod async_imports {
pub(super) use std::io::ErrorKind;
pub(super) use tokio::io::Interest;
pub(super) use tokio::net::UdpSocket;
}
23 changes: 11 additions & 12 deletions dgram/src/syscalls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ pub fn send_msg(
///
/// # Note
///
/// It is the caller's responsibility to create the cmsg space. `nix` recommends
/// that the space be created via the `cmsg_space!()` macro. Calling this
/// function will clear the cmsg buffer. It is also the caller's responsibility
/// to set any relevant socket options.
/// It is the caller's responsibility to create and clear the cmsg space.`nix`
/// recommends that the space be created via the `cmsg_space!()` macro. Calling
/// this function will clear the cmsg buffer. It is also the caller's
/// responsibility to set any relevant socket options.
#[cfg(target_os = "linux")]
pub fn recv_msg(
fd: impl AsFd, read_buf: &mut [u8], recvmsg_settings: &mut RecvMsgSettings,
Expand All @@ -102,7 +102,6 @@ pub fn recv_msg(
let RecvMsgSettings {
store_cmsgs,
ref mut cmsg_space,
msg_flags,
} = recvmsg_settings;

cmsg_space.clear();
Expand All @@ -115,7 +114,7 @@ pub fn recv_msg(
borrowed.as_raw_fd(),
iov_s,
Some(cmsg_space),
*msg_flags,
MsgFlags::empty(),
) {
Ok(r) => {
let bytes = r.bytes;
Expand All @@ -137,7 +136,6 @@ pub fn recv_msg(
};

let mut recv_data = RecvData::new(peer_addr, bytes, cmsg_space_len);

for msg in r.cmsgs() {
match msg {
ControlMessageOwned::ScmTimestampns(time) =>
Expand Down Expand Up @@ -260,8 +258,10 @@ mod tests {
sendmsg(send.as_raw_fd(), &iov, &[], MsgFlags::empty(), Some(&addr))?;

let mut read_buf = [0; 4];
let recv_data =
recv_msg(recv, &mut read_buf, &mut RecvMsgSettings::default())?;
let recv_data = recv_msg(recv, &mut read_buf, &mut RecvMsgSettings {
store_cmsgs: false,
cmsg_space: &mut vec![],
})?;

assert_eq!(recv_data.bytes, 4);
assert_eq!(&read_buf, b"jets");
Expand Down Expand Up @@ -317,11 +317,10 @@ mod tests {
let iov = [IoSlice::new(send_buf)];
sendmsg(send.as_raw_fd(), &iov, &[], MsgFlags::empty(), Some(&addr))?;

let cmsg_space = cmsg_space!(TimeVal);
let mut cmsg_space = cmsg_space!(TimeVal);
let mut recvmsg_settings = RecvMsgSettings {
store_cmsgs: true,
cmsg_space,
msg_flags: MsgFlags::empty(),
cmsg_space: &mut cmsg_space,
};

let mut read_buf = [0; 4];
Expand Down
110 changes: 62 additions & 48 deletions dgram/src/tokio.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use crate::RecvData;
use std::io::ErrorKind;
use std::io::Result;
use std::task::Context;
use std::task::Poll;

use crate::async_imports::*;
use tokio::io::Interest;
use tokio::net::UdpSocket;

#[cfg(target_os = "linux")]
mod linux {
Expand All @@ -13,64 +17,74 @@ mod linux {
use linux::*;

#[cfg(target_os = "linux")]
pub async fn send_to(
socket: &UdpSocket, send_buf: &[u8], send_msg_settings: SendMsgCmsgSettings,
) -> Result<usize> {
pub fn poll_send_to(
socket: &UdpSocket, ctx: &mut Context<'_>, send_buf: &[u8],
sendmsg_settings: SendMsgSettings,
) -> Poll<Result<usize>> {
loop {
// Important to use try_io so that Tokio can clear the socket's readiness
// flag
let res = socket.try_io(Interest::WRITABLE, || {
let fd = socket.as_fd();
send_msg(fd, send_buf, send_msg_settings).map_err(Into::into)
});

match res {
Err(e) if e.kind() == ErrorKind::WouldBlock =>
socket.writable().await?,
res => return res,
match socket.poll_send_ready(ctx) {
Poll::Ready(Ok(())) => {
// Important to use try_io so that Tokio can clear the socket's
// readiness flag
match socket.try_io(Interest::WRITABLE, || {
let fd = socket.as_fd();
send_msg(fd, send_buf, sendmsg_settings).map_err(Into::into)
}) {
Err(e) if e.kind() == ErrorKind::WouldBlock => {},
io_res => break Poll::Ready(io_res),
}
},
Poll::Ready(Err(e)) => break Poll::Ready(Err(e)),
Poll::Pending => break Poll::Pending,
}
}
}

#[cfg(target_os = "linux")]
pub async fn recv_from(
socket: &UdpSocket, read_buf: &mut [u8], msg_flags: Option<MsgFlags>,
store_cmsg_settings: &mut RecvMsgCmsgSettings,
) -> Result<RecvData> {
loop {
// Important to use try_io so that Tokio can clear the socket's readiness
// flag
let res = socket.try_io(Interest::READABLE, || {
let fd = socket.as_fd();
recv_msg(
fd,
read_buf,
msg_flags.unwrap_or(MsgFlags::empty()),
store_cmsg_settings,
)
.map_err(Into::into)
});
pub async fn send_to(
socket: &UdpSocket, send_buf: &[u8], sendmsg_settings: SendMsgSettings,
) -> Result<usize> {
std::future::poll_fn(|mut cx| {
poll_send_to(socket, &mut cx, send_buf, sendmsg_settings)
})
.await
}

match res {
Err(e) if e.kind() == ErrorKind::WouldBlock =>
socket.readable().await?,
_ => return res,
#[cfg(target_os = "linux")]
pub fn poll_recv_from(
socket: &UdpSocket, ctx: &mut Context<'_>, recv_buf: &mut [u8],
recvmsg_settings: &mut RecvMsgSettings,
) -> Poll<Result<RecvData>> {
loop {
match socket.poll_recv_ready(ctx) {
Poll::Ready(Ok(())) => {
// Important to use try_io so that Tokio can clear the socket's
// readiness flag
match socket.try_io(Interest::READABLE, || {
let fd = socket.as_fd();
recv_msg(fd, recv_buf, recvmsg_settings).map_err(Into::into)
}) {
// The `poll_recv_ready` future registers the ctx with Tokio.
// We can only return Pending when that
// future is Pending or we won't wake the
// runtime properly
Err(e) if e.kind() == ErrorKind::WouldBlock => {},
io_res => break Poll::Ready(io_res),
}
},
Poll::Ready(Err(e)) => break Poll::Ready(Err(e)),
Poll::Pending => break Poll::Pending,
}
}
}

#[cfg(not(target_os = "linux"))]
pub async fn send_to(
socket: &UdpSocket, client_addr: SocketAddr,
) -> Result<usize> {
socket.send_to(send_buf, client_addr).await
}

#[cfg(not(target_os = "linux"))]
#[cfg(target_os = "linux")]
pub async fn recv_from(
socket: &UdpSocket, read_buf: &mut [u8],
socket: &UdpSocket, recv_buf: &mut [u8],
recvmsg_settings: &mut RecvMsgSettings<'_>,
) -> Result<RecvData> {
let recv = socket.recv(read_buf).await?;

Ok(RecvData::from_bytes(bytes))
std::future::poll_fn(|mut ctx| {
poll_recv_from(socket, &mut ctx, recv_buf, recvmsg_settings)
})
.await
}

0 comments on commit ee36a17

Please sign in to comment.