Skip to content

Commit afe3c24

Browse files
committed
Add support for sendmmsg(2) on linux
https://man7.org/linux/man-pages/man2/sendmmsg.2.html Partially addresses bytecodealliance#1156. Signed-off-by: Colin Marc <[email protected]>
1 parent 666f5bd commit afe3c24

File tree

9 files changed

+343
-10
lines changed

9 files changed

+343
-10
lines changed

Diff for: src/backend/libc/net/msghdr.rs

+21-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ use crate::backend::net::write_sockaddr::{encode_sockaddr_v4, encode_sockaddr_v6
1212
use crate::io::{self, IoSlice, IoSliceMut};
1313
#[cfg(target_os = "linux")]
1414
use crate::net::xdp::SocketAddrXdp;
15-
use crate::net::{RecvAncillaryBuffer, SendAncillaryBuffer, SocketAddrV4, SocketAddrV6};
15+
use crate::net::{
16+
RawSocketAddr, RecvAncillaryBuffer, SendAncillaryBuffer, SocketAddrV4, SocketAddrV6,
17+
};
1618
use crate::utils::as_ptr;
1719

1820
use core::mem::{size_of, zeroed, MaybeUninit};
@@ -66,6 +68,24 @@ pub(crate) fn with_noaddr_msghdr<R>(
6668
})
6769
}
6870

71+
/// Create a message header with a pre-encoded address.
72+
pub(crate) fn with_addr_msghdr<R>(
73+
addr: &RawSocketAddr,
74+
iov: &[IoSlice<'_>],
75+
control: &mut SendAncillaryBuffer<'_, '_, '_>,
76+
f: impl FnOnce(c::msghdr) -> R,
77+
) -> R {
78+
f(c::msghdr {
79+
msg_name: addr.as_ptr() as _,
80+
msg_namelen: addr.namelen() as _,
81+
msg_iov: iov.as_ptr() as _,
82+
msg_iovlen: msg_iov_len(iov.len()),
83+
msg_control: control.as_control_ptr().cast(),
84+
msg_controllen: msg_control_len(control.control_len()),
85+
msg_flags: 0,
86+
})
87+
}
88+
6989
/// Create a message header intended to send with an IPv4 address.
7090
pub(crate) fn with_v4_msghdr<R>(
7191
addr: &SocketAddrV4,

Diff for: src/backend/libc/net/syscalls.rs

+21
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,15 @@ use super::msghdr::with_xdp_msghdr;
77
#[cfg(target_os = "linux")]
88
use super::write_sockaddr::encode_sockaddr_xdp;
99
use crate::backend::c;
10+
#[cfg(target_os = "linux")]
11+
use crate::backend::conv::ret_u32;
1012
use crate::backend::conv::{borrowed_fd, ret, ret_owned_fd, ret_send_recv, send_recv_len};
1113
use crate::fd::{BorrowedFd, OwnedFd};
1214
use crate::io;
1315
#[cfg(target_os = "linux")]
1416
use crate::net::xdp::SocketAddrXdp;
17+
#[cfg(target_os = "linux")]
18+
use crate::net::MMsgHdr;
1519
use crate::net::{SocketAddrAny, SocketAddrV4, SocketAddrV6};
1620
use crate::utils::as_ptr;
1721
use core::mem::{size_of, MaybeUninit};
@@ -455,6 +459,23 @@ pub(crate) fn sendmsg_xdp(
455459
})
456460
}
457461

462+
#[cfg(target_os = "linux")]
463+
pub(crate) fn sendmmsg(
464+
sockfd: BorrowedFd<'_>,
465+
msgs: &mut [MMsgHdr<'_>],
466+
flags: SendFlags,
467+
) -> io::Result<usize> {
468+
unsafe {
469+
ret_u32(c::sendmmsg(
470+
borrowed_fd(sockfd),
471+
msgs.as_mut_ptr() as _,
472+
msgs.len().try_into().unwrap_or(c::c_uint::MAX),
473+
bitflags_bits!(flags),
474+
))
475+
.map(|ret| ret as usize)
476+
}
477+
}
478+
458479
#[cfg(not(any(
459480
apple,
460481
windows,

Diff for: src/backend/linux_raw/c.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,12 @@ pub(crate) use linux_raw_sys::{
5656
general::{O_CLOEXEC as SOCK_CLOEXEC, O_NONBLOCK as SOCK_NONBLOCK},
5757
if_ether::*,
5858
net::{
59-
linger, msghdr, sockaddr, sockaddr_in, sockaddr_in6, sockaddr_un, socklen_t, AF_DECnet,
6059
__kernel_sa_family_t as sa_family_t, __kernel_sockaddr_storage as sockaddr_storage,
61-
cmsghdr, in6_addr, in_addr, ip_mreq, ip_mreq_source, ip_mreqn, ipv6_mreq, AF_APPLETALK,
62-
AF_ASH, AF_ATMPVC, AF_ATMSVC, AF_AX25, AF_BLUETOOTH, AF_BRIDGE, AF_CAN, AF_ECONET,
63-
AF_IEEE802154, AF_INET, AF_INET6, AF_IPX, AF_IRDA, AF_ISDN, AF_IUCV, AF_KEY, AF_LLC,
64-
AF_NETBEUI, AF_NETLINK, AF_NETROM, AF_PACKET, AF_PHONET, AF_PPPOX, AF_RDS, AF_ROSE,
60+
cmsghdr, in6_addr, in_addr, ip_mreq, ip_mreq_source, ip_mreqn, ipv6_mreq, linger, mmsghdr,
61+
msghdr, sockaddr, sockaddr_in, sockaddr_in6, sockaddr_un, socklen_t, AF_DECnet,
62+
AF_APPLETALK, AF_ASH, AF_ATMPVC, AF_ATMSVC, AF_AX25, AF_BLUETOOTH, AF_BRIDGE, AF_CAN,
63+
AF_ECONET, AF_IEEE802154, AF_INET, AF_INET6, AF_IPX, AF_IRDA, AF_ISDN, AF_IUCV, AF_KEY,
64+
AF_LLC, AF_NETBEUI, AF_NETLINK, AF_NETROM, AF_PACKET, AF_PHONET, AF_PPPOX, AF_RDS, AF_ROSE,
6565
AF_RXRPC, AF_SECURITY, AF_SNA, AF_TIPC, AF_UNIX, AF_UNSPEC, AF_WANPIPE, AF_X25, AF_XDP,
6666
IP6T_SO_ORIGINAL_DST, IPPROTO_FRAGMENT, IPPROTO_ICMPV6, IPPROTO_MH, IPPROTO_ROUTING,
6767
IPV6_ADD_MEMBERSHIP, IPV6_DROP_MEMBERSHIP, IPV6_FREEBIND, IPV6_MULTICAST_HOPS,

Diff for: src/backend/linux_raw/net/msghdr.rs

+20-1
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@ use crate::backend::c;
99
#[cfg(target_os = "linux")]
1010
use crate::backend::net::write_sockaddr::encode_sockaddr_xdp;
1111
use crate::backend::net::write_sockaddr::{encode_sockaddr_v4, encode_sockaddr_v6};
12-
1312
use crate::io::{self, IoSlice, IoSliceMut};
1413
#[cfg(target_os = "linux")]
1514
use crate::net::xdp::SocketAddrXdp;
15+
#[cfg(target_os = "linux")]
16+
use crate::net::RawSocketAddr;
1617
use crate::net::{RecvAncillaryBuffer, SendAncillaryBuffer, SocketAddrV4, SocketAddrV6};
1718
use crate::utils::as_ptr;
1819

@@ -78,6 +79,24 @@ pub(crate) fn with_noaddr_msghdr<R>(
7879
})
7980
}
8081

82+
/// Create a message header with a pre-encoded address.
83+
pub(crate) fn with_addr_msghdr<R>(
84+
addr: &RawSocketAddr,
85+
iov: &[IoSlice<'_>],
86+
control: &mut SendAncillaryBuffer<'_, '_, '_>,
87+
f: impl FnOnce(c::msghdr) -> R,
88+
) -> R {
89+
f(c::msghdr {
90+
msg_name: addr.as_ptr() as _,
91+
msg_namelen: addr.namelen() as _,
92+
msg_iov: iov.as_ptr() as _,
93+
msg_iovlen: msg_iov_len(iov.len()),
94+
msg_control: control.as_control_ptr().cast(),
95+
msg_controllen: msg_control_len(control.control_len()),
96+
msg_flags: 0,
97+
})
98+
}
99+
81100
/// Create a message header intended to send with an IPv4 address.
82101
pub(crate) fn with_v4_msghdr<R>(
83102
addr: &SocketAddrV4,

Diff for: src/backend/linux_raw/net/syscalls.rs

+30-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ use super::send_recv::{RecvFlags, SendFlags};
1616
use super::write_sockaddr::encode_sockaddr_xdp;
1717
use super::write_sockaddr::{encode_sockaddr_v4, encode_sockaddr_v6};
1818
use crate::backend::c;
19+
#[cfg(target_os = "linux")]
20+
use crate::backend::conv::slice_mut;
1921
use crate::backend::conv::{
2022
by_mut, by_ref, c_int, c_uint, pass_usize, ret, ret_owned_fd, ret_usize, size_of, slice,
2123
socklen_t, zero,
@@ -24,6 +26,8 @@ use crate::fd::{BorrowedFd, OwnedFd};
2426
use crate::io::{self, IoSlice, IoSliceMut};
2527
#[cfg(target_os = "linux")]
2628
use crate::net::xdp::SocketAddrXdp;
29+
#[cfg(target_os = "linux")]
30+
use crate::net::MMsgHdr;
2731
use crate::net::{
2832
AddressFamily, Protocol, RecvAncillaryBuffer, RecvMsgReturn, SendAncillaryBuffer, Shutdown,
2933
SocketAddrAny, SocketAddrUnix, SocketAddrV4, SocketAddrV6, SocketFlags, SocketType,
@@ -36,8 +40,8 @@ use {
3640
crate::backend::reg::{ArgReg, SocketArg},
3741
linux_raw_sys::net::{
3842
SYS_ACCEPT, SYS_ACCEPT4, SYS_BIND, SYS_CONNECT, SYS_GETPEERNAME, SYS_GETSOCKNAME,
39-
SYS_LISTEN, SYS_RECV, SYS_RECVFROM, SYS_RECVMSG, SYS_SEND, SYS_SENDMSG, SYS_SENDTO,
40-
SYS_SHUTDOWN, SYS_SOCKET, SYS_SOCKETPAIR,
43+
SYS_LISTEN, SYS_RECV, SYS_RECVFROM, SYS_RECVMSG, SYS_SEND, SYS_SENDMMSG, SYS_SENDMSG,
44+
SYS_SENDTO, SYS_SHUTDOWN, SYS_SOCKET, SYS_SOCKETPAIR,
4145
},
4246
};
4347

@@ -439,6 +443,30 @@ pub(crate) fn sendmsg_xdp(
439443
})
440444
}
441445

446+
#[cfg(target_os = "linux")]
447+
#[inline]
448+
pub(crate) fn sendmmsg(
449+
sockfd: BorrowedFd<'_>,
450+
msgs: &mut [MMsgHdr<'_>],
451+
flags: SendFlags,
452+
) -> io::Result<usize> {
453+
let (msgs, len) = slice_mut(msgs);
454+
455+
#[cfg(not(target_arch = "x86"))]
456+
let result = unsafe { ret_usize(syscall!(__NR_sendmmsg, sockfd, msgs, len, flags)) };
457+
458+
#[cfg(target_arch = "x86")]
459+
let result = unsafe {
460+
ret_usize(syscall!(
461+
__NR_socketcall,
462+
x86_sys(SYS_SENDMMSG),
463+
slice_just_addr::<ArgReg<'_, SocketArg>, _>(&[sockfd.into(), msgs, len, flags.into()])
464+
))
465+
};
466+
467+
result
468+
}
469+
442470
#[inline]
443471
pub(crate) fn shutdown(fd: BorrowedFd<'_>, how: Shutdown) -> io::Result<()> {
444472
#[cfg(not(target_arch = "x86"))]

Diff for: src/net/send_recv/msg.rs

+62
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,13 @@
22
33
#![allow(unsafe_code)]
44

5+
#[cfg(target_os = "linux")]
6+
use crate::backend::net::msghdr::{with_addr_msghdr, with_noaddr_msghdr};
57
use crate::backend::{self, c};
68
use crate::fd::{AsFd, BorrowedFd, OwnedFd};
79
use crate::io::{self, IoSlice, IoSliceMut};
10+
#[cfg(target_os = "linux")]
11+
use crate::net::RawSocketAddr;
812
#[cfg(linux_kernel)]
913
use crate::net::UCred;
1014

@@ -591,6 +595,48 @@ impl<'buf> Iterator for AncillaryDrain<'buf> {
591595

592596
impl FusedIterator for AncillaryDrain<'_> {}
593597

598+
/// An ABI-compatible wrapper for `mmsghdr`, for sending multiple messages with
599+
/// [sendmmsg].
600+
#[cfg(target_os = "linux")]
601+
#[repr(transparent)]
602+
pub struct MMsgHdr<'a> {
603+
raw: c::mmsghdr,
604+
_phantom: PhantomData<&'a mut ()>,
605+
}
606+
607+
#[cfg(target_os = "linux")]
608+
impl<'a> MMsgHdr<'a> {
609+
/// Constructs a new message with no destination address.
610+
pub fn new(iov: &'a [IoSlice<'_>], control: &'a mut SendAncillaryBuffer<'_, '_, '_>) -> Self {
611+
with_noaddr_msghdr(iov, control, Self::wrap)
612+
}
613+
614+
/// Constructs a new message to a specific address.
615+
pub fn new_with_addr(
616+
addr: &'a RawSocketAddr,
617+
iov: &'a [IoSlice<'_>],
618+
control: &'a mut SendAncillaryBuffer<'_, '_, '_>,
619+
) -> MMsgHdr<'a> {
620+
with_addr_msghdr(addr, iov, control, Self::wrap)
621+
}
622+
623+
fn wrap(msg_hdr: c::msghdr) -> Self {
624+
Self {
625+
raw: c::mmsghdr {
626+
msg_hdr,
627+
msg_len: 0,
628+
},
629+
_phantom: PhantomData,
630+
}
631+
}
632+
633+
/// Returns the number of bytes sent. This will return 0 until after a
634+
/// successful call to [sendmmsg].
635+
pub fn bytes_sent(&self) -> usize {
636+
self.raw.msg_len as _
637+
}
638+
}
639+
594640
/// `sendmsg(msghdr)`—Sends a message on a socket.
595641
///
596642
/// # References
@@ -781,6 +827,22 @@ pub fn sendmsg_any(
781827
}
782828
}
783829

830+
/// `sendmmsg(msghdr)`—Sends multiple messages on a socket.
831+
///
832+
/// # References
833+
/// - [Linux]
834+
///
835+
/// [Linux]: https://man7.org/linux/man-pages/man2/sendmmsg.2.html
836+
#[inline]
837+
#[cfg(target_os = "linux")]
838+
pub fn sendmmsg(
839+
socket: impl AsFd,
840+
msgs: &mut [MMsgHdr<'_>],
841+
flags: SendFlags,
842+
) -> io::Result<usize> {
843+
backend::net::syscalls::sendmmsg(socket.as_fd(), msgs, flags)
844+
}
845+
784846
/// `recvmsg(msghdr)`—Receives a message from a socket.
785847
///
786848
/// # References

Diff for: src/net/socket_addr_any.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use crate::utils::{as_mut_ptr, as_ptr};
1818
use crate::{backend, io};
1919
#[cfg(feature = "std")]
2020
use core::fmt;
21+
use core::mem::zeroed;
2122

2223
pub use backend::net::addr::SocketAddrStorage;
2324

@@ -88,7 +89,7 @@ impl SocketAddrAny {
8889
/// and returns it.
8990
pub fn to_raw(&self) -> RawSocketAddr {
9091
let mut raw = RawSocketAddr {
91-
storage: unsafe { std::mem::zeroed() },
92+
storage: unsafe { zeroed() },
9293
len: 0,
9394
};
9495

Diff for: tests/net/v4.rs

+88
Original file line numberDiff line numberDiff line change
@@ -194,3 +194,91 @@ fn test_v4_msg() {
194194
client.join().unwrap();
195195
server.join().unwrap();
196196
}
197+
198+
#[test]
199+
#[cfg(target_os = "linux")]
200+
fn test_v4_sendmmsg() {
201+
crate::init();
202+
203+
use std::net::TcpStream;
204+
205+
use rustix::io::IoSlice;
206+
use rustix::net::{sendmmsg, MMsgHdr};
207+
208+
fn server(ready: Arc<(Mutex<u16>, Condvar)>) {
209+
let connection_socket = socket(AddressFamily::INET, SocketType::STREAM, None).unwrap();
210+
211+
let name = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0);
212+
bind_v4(&connection_socket, &name).unwrap();
213+
214+
let who = match getsockname(&connection_socket).unwrap() {
215+
SocketAddrAny::V4(addr) => addr,
216+
_ => panic!(),
217+
};
218+
219+
listen(&connection_socket, 1).unwrap();
220+
221+
{
222+
let (lock, cvar) = &*ready;
223+
let mut port = lock.lock().unwrap();
224+
*port = who.port();
225+
cvar.notify_all();
226+
}
227+
228+
let mut buffer = vec![0; 13];
229+
let mut data_socket: TcpStream = accept(&connection_socket).unwrap().into();
230+
231+
std::io::Read::read_exact(&mut data_socket, &mut buffer).unwrap();
232+
assert_eq!(String::from_utf8_lossy(&buffer), "hello...world");
233+
}
234+
235+
fn client(ready: Arc<(Mutex<u16>, Condvar)>) {
236+
let port = {
237+
let (lock, cvar) = &*ready;
238+
let mut port = lock.lock().unwrap();
239+
while *port == 0 {
240+
port = cvar.wait(port).unwrap();
241+
}
242+
*port
243+
};
244+
245+
let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), port);
246+
let data_socket = socket(AddressFamily::INET, SocketType::STREAM, None).unwrap();
247+
connect_v4(&data_socket, &addr).unwrap();
248+
249+
let mut off = 0;
250+
while off < 2 {
251+
let sent = sendmmsg(
252+
&data_socket,
253+
&mut [
254+
MMsgHdr::new(&[IoSlice::new(b"hello")], &mut Default::default()),
255+
MMsgHdr::new(&[IoSlice::new(b"...world")], &mut Default::default()),
256+
][off..],
257+
SendFlags::empty(),
258+
)
259+
.unwrap();
260+
261+
off += sent;
262+
}
263+
}
264+
265+
let ready = Arc::new((Mutex::new(0_u16), Condvar::new()));
266+
let ready_clone = Arc::clone(&ready);
267+
268+
let server = thread::Builder::new()
269+
.name("server".to_string())
270+
.spawn(move || {
271+
server(ready);
272+
})
273+
.unwrap();
274+
275+
let client = thread::Builder::new()
276+
.name("client".to_string())
277+
.spawn(move || {
278+
client(ready_clone);
279+
})
280+
.unwrap();
281+
282+
client.join().unwrap();
283+
server.join().unwrap();
284+
}

0 commit comments

Comments
 (0)