From 110ab9e0cd9676c1cfbf8311dece8f78fa346a28 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Wed, 13 Dec 2023 13:24:29 +0300 Subject: [PATCH] sync yamux Signed-off-by: onur-ozkan --- Cargo.lock | 63 ++++++--- muxers/yamux/CHANGELOG.md | 28 +++- muxers/yamux/Cargo.toml | 11 +- muxers/yamux/src/lib.rs | 282 ++++++++++++++++++++++++-------------- 4 files changed, 258 insertions(+), 126 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cef33101b5fb..22ee35926ad2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1682,9 +1682,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335" dependencies = [ "futures-channel", "futures-core", @@ -1697,9 +1697,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" dependencies = [ "futures-core", "futures-sink", @@ -1707,15 +1707,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" +checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" [[package]] name = "futures-executor" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc" dependencies = [ "futures-core", "futures-task", @@ -1725,9 +1725,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa" [[package]] name = "futures-lite" @@ -1746,9 +1746,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" +checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" dependencies = [ "proc-macro2", "quote", @@ -1778,15 +1778,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" +checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" [[package]] name = "futures-task" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" [[package]] name = "futures-ticker" @@ -1811,9 +1811,9 @@ dependencies = [ [[package]] name = "futures-util" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" +checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" dependencies = [ "futures-channel", "futures-core", @@ -3312,12 +3312,14 @@ name = "libp2p-yamux" version = "0.44.0" dependencies = [ "async-std", + "either", "futures", "libp2p-core", "libp2p-muxer-test-harness", - "log", "thiserror", - "yamux", + "tracing", + "yamux 0.12.1", + "yamux 0.13.1", ] [[package]] @@ -6675,14 +6677,31 @@ dependencies = [ [[package]] name = "yamux" -version = "0.10.2" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5d9ba232399af1783a58d8eb26f6b5006fbefe2dc9ef36bd283324792d03ea5" +checksum = "9ed0164ae619f2dc144909a9f082187ebb5893693d8c0196e8085283ccd4b776" dependencies = [ "futures", "log", "nohash-hasher", "parking_lot", + "pin-project", + "rand 0.8.5", + "static_assertions", +] + +[[package]] +name = "yamux" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad1d0148b89300047e72994bee99ecdabd15a9166a7b70c8b8c37c314dcc9002" +dependencies = [ + "futures", + "instant", + "log", + "nohash-hasher", + "parking_lot", + "pin-project", "rand 0.8.5", "static_assertions", ] diff --git a/muxers/yamux/CHANGELOG.md b/muxers/yamux/CHANGELOG.md index f0055c3fe4f7..de608b195f86 100644 --- a/muxers/yamux/CHANGELOG.md +++ b/muxers/yamux/CHANGELOG.md @@ -1,4 +1,30 @@ -## 0.44.0 +## 0.45.1 + +- Deprecate `WindowUpdateMode::on_receive`. + It does not enforce flow-control, i.e. breaks backpressure. + Use `WindowUpdateMode::on_read` instead. + See `yamux` crate version `v0.12.1` and [Yamux PR #177](https://github.com/libp2p/rust-yamux/pull/177). +- `yamux` `v0.13` enables auto-tuning for the Yamux stream receive window. + While preserving small buffers on low-latency and/or low-bandwidth connections, this change allows for high-latency and/or high-bandwidth connections to exhaust the available bandwidth on a single stream. + Have `libp2p-yamux` use `yamux` `v0.13` (new version) by default and fall back to `yamux` `v0.12` (old version) when setting any configuration options. + Thus default users benefit from the increased performance, while power users with custom configurations maintain the old behavior. + `libp2p-yamux` will switch over to `yamux` `v0.13` entirely with the next breaking release. + See [PR 4970](https://github.com/libp2p/rust-libp2p/pull/4970). + +## 0.45.0 + +- Migrate to `{In,Out}boundConnectionUpgrade` traits. + See [PR 4695](https://github.com/libp2p/rust-libp2p/pull/4695). + +## 0.44.1 + +- Update to `yamux` `v0.12` which brings performance improvements and introduces an ACK backlog of 256 inbound streams. + When interacting with other libp2p nodes that are also running this or a newer version, the creation of inbound streams will be backpressured once the ACK backlog is hit. + See [PR 3013]. + +[PR 3013]: https://github.com/libp2p/rust-libp2p/pull/3013 + +## 0.44.0 - Raise MSRV to 1.65. See [PR 3715]. diff --git a/muxers/yamux/Cargo.toml b/muxers/yamux/Cargo.toml index 50a9e97d1d0f..e8c7c5dacece 100644 --- a/muxers/yamux/Cargo.toml +++ b/muxers/yamux/Cargo.toml @@ -11,11 +11,13 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -futures = "0.3.28" +either = "1" +futures = "0.3.29" libp2p-core = { workspace = true } thiserror = "1.0" -yamux = "0.10.0" -log = "0.4" +yamux012 = { version = "0.12.1", package = "yamux" } +yamux013 = { version = "0.13.1", package = "yamux" } +tracing = "0.1.37" [dev-dependencies] async-std = { version = "1.7.0", features = ["attributes"] } @@ -27,3 +29,6 @@ libp2p-muxer-test-harness = { path = "../test-harness" } all-features = true rustdoc-args = ["--cfg", "docsrs"] rustc-args = ["--cfg", "docsrs"] + +[lints] +workspace = true diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index b24c976ebf24..29dbc32bc767 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -22,29 +22,27 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -use futures::{future, prelude::*, ready, stream::BoxStream}; +use either::Either; +use futures::{future, prelude::*, ready}; use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use std::collections::VecDeque; use std::io::{IoSlice, IoSliceMut}; use std::task::Waker; use std::{ - fmt, io, iter, mem, + io, iter, pin::Pin, task::{Context, Poll}, }; use thiserror::Error; -use yamux::ConnectionError; /// A Yamux connection. +#[derive(Debug)] pub struct Muxer { - /// The [`futures::stream::Stream`] of incoming substreams. - incoming: BoxStream<'static, Result>, - /// Handle to control the connection. - control: yamux::Control, + connection: Either, yamux013::Connection>, /// Temporarily buffers inbound streams in case our node is performing backpressure on the remote. /// - /// The only way how yamux can make progress is by driving the stream. However, the + /// The only way how yamux can make progress is by calling [`yamux013::Connection::poll_next_inbound`]. However, the /// [`StreamMuxer`] interface is designed to allow a caller to selectively make progress via /// [`StreamMuxer::poll_inbound`] and [`StreamMuxer::poll_outbound`] whilst the more general /// [`StreamMuxer::poll`] is designed to make progress on existing streams etc. @@ -54,44 +52,36 @@ pub struct Muxer { inbound_stream_buffer: VecDeque, /// Waker to be called when new inbound streams are available. inbound_stream_waker: Option, - - _phantom: std::marker::PhantomData, } -const MAX_BUFFERED_INBOUND_STREAMS: usize = 25; - -impl fmt::Debug for Muxer { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("Yamux") - } -} +/// How many streams to buffer before we start resetting them. +/// +/// This is equal to the ACK BACKLOG in `rust-yamux`. +/// Thus, for peers running on a recent version of `rust-libp2p`, we should never need to reset streams because they'll voluntarily stop opening them once they hit the ACK backlog. +const MAX_BUFFERED_INBOUND_STREAMS: usize = 256; impl Muxer where C: AsyncRead + AsyncWrite + Send + Unpin + 'static, { /// Create a new Yamux connection. - fn new(io: C, cfg: yamux::Config, mode: yamux::Mode) -> Self { - let conn = yamux::Connection::new(io, cfg, mode); - let ctrl = conn.control(); - - Self { - incoming: yamux::into_stream(conn).err_into().boxed(), - control: ctrl, + fn new(connection: Either, yamux013::Connection>) -> Self { + Muxer { + connection, inbound_stream_buffer: VecDeque::default(), inbound_stream_waker: None, - _phantom: Default::default(), } } } impl StreamMuxer for Muxer where - C: AsyncRead + AsyncWrite + Send + Unpin + 'static, + C: AsyncRead + AsyncWrite + Unpin + 'static, { type Substream = Stream; type Error = Error; + #[tracing::instrument(level = "trace", name = "StreamMuxer::poll_inbound", skip(self, cx))] fn poll_inbound( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -108,16 +98,31 @@ where Poll::Pending } + #[tracing::instrument(level = "trace", name = "StreamMuxer::poll_outbound", skip(self, cx))] fn poll_outbound( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - Pin::new(&mut self.control) - .poll_open_stream(cx) - .map_ok(Stream) - .map_err(Error) + let stream = match self.connection.as_mut() { + Either::Left(c) => ready!(c.poll_new_outbound(cx)) + .map_err(|e| Error(Either::Left(e))) + .map(|s| Stream(Either::Left(s))), + Either::Right(c) => ready!(c.poll_new_outbound(cx)) + .map_err(|e| Error(Either::Right(e))) + .map(|s| Stream(Either::Right(s))), + }?; + Poll::Ready(Ok(stream)) + } + + #[tracing::instrument(level = "trace", name = "StreamMuxer::poll_close", skip(self, cx))] + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.connection.as_mut() { + Either::Left(c) => c.poll_close(cx).map_err(|e| Error(Either::Left(e))), + Either::Right(c) => c.poll_close(cx).map_err(|e| Error(Either::Right(e))), + } } + #[tracing::instrument(level = "trace", name = "StreamMuxer::poll", skip(self, cx))] fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -127,7 +132,10 @@ where let inbound_stream = ready!(this.poll_inner(cx))?; if this.inbound_stream_buffer.len() >= MAX_BUFFERED_INBOUND_STREAMS { - log::warn!("dropping {} because buffer is full", inbound_stream.0); + tracing::warn!( + stream=%inbound_stream.0, + "dropping stream because buffer is full" + ); drop(inbound_stream); } else { this.inbound_stream_buffer.push_back(inbound_stream); @@ -141,28 +149,11 @@ where cx.waker().wake_by_ref(); Poll::Pending } - - fn poll_close(mut self: Pin<&mut Self>, c: &mut Context<'_>) -> Poll> { - if let Poll::Ready(()) = Pin::new(&mut self.control).poll_close(c).map_err(Error)? { - return Poll::Ready(Ok(())); - } - - while let Poll::Ready(maybe_inbound_stream) = - self.incoming.poll_next_unpin(c).map_err(Error)? - { - match maybe_inbound_stream { - Some(inbound_stream) => mem::drop(inbound_stream), - None => return Poll::Ready(Ok(())), - } - } - - Poll::Pending - } } /// A stream produced by the yamux multiplexer. #[derive(Debug)] -pub struct Stream(yamux::Stream); +pub struct Stream(Either); impl AsyncRead for Stream { fn poll_read( @@ -170,7 +161,7 @@ impl AsyncRead for Stream { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - Pin::new(&mut self.0).poll_read(cx, buf) + either::for_both!(self.0.as_mut(), s => Pin::new(s).poll_read(cx, buf)) } fn poll_read_vectored( @@ -178,7 +169,7 @@ impl AsyncRead for Stream { cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll> { - Pin::new(&mut self.0).poll_read_vectored(cx, bufs) + either::for_both!(self.0.as_mut(), s => Pin::new(s).poll_read_vectored(cx, bufs)) } } @@ -188,7 +179,7 @@ impl AsyncWrite for Stream { cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - Pin::new(&mut self.0).poll_write(cx, buf) + either::for_both!(self.0.as_mut(), s => Pin::new(s).poll_write(cx, buf)) } fn poll_write_vectored( @@ -196,45 +187,67 @@ impl AsyncWrite for Stream { cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll> { - Pin::new(&mut self.0).poll_write_vectored(cx, bufs) + either::for_both!(self.0.as_mut(), s => Pin::new(s).poll_write_vectored(cx, bufs)) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.0).poll_flush(cx) + either::for_both!(self.0.as_mut(), s => Pin::new(s).poll_flush(cx)) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.0).poll_close(cx) + either::for_both!(self.0.as_mut(), s => Pin::new(s).poll_close(cx)) } } impl Muxer where - C: AsyncRead + AsyncWrite + Send + Unpin + 'static, + C: AsyncRead + AsyncWrite + Unpin + 'static, { fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll> { - self.incoming.poll_next_unpin(cx).map(|maybe_stream| { - let stream = maybe_stream - .transpose() - .map_err(Error)? - .map(Stream) - .ok_or(Error(ConnectionError::Closed))?; - - Ok(stream) - }) + let stream = match self.connection.as_mut() { + Either::Left(c) => ready!(c.poll_next_inbound(cx)) + .ok_or(Error(Either::Left(yamux012::ConnectionError::Closed)))? + .map_err(|e| Error(Either::Left(e))) + .map(|s| Stream(Either::Left(s)))?, + Either::Right(c) => ready!(c.poll_next_inbound(cx)) + .ok_or(Error(Either::Right(yamux013::ConnectionError::Closed)))? + .map_err(|e| Error(Either::Right(e))) + .map(|s| Stream(Either::Right(s)))?, + }; + + Poll::Ready(Ok(stream)) } } /// The yamux configuration. #[derive(Debug, Clone)] -pub struct Config { - inner: yamux::Config, - mode: Option, +pub struct Config(Either); + +impl Default for Config { + fn default() -> Self { + Self(Either::Right(Config013::default())) + } +} + +#[derive(Debug, Clone)] +struct Config012 { + inner: yamux012::Config, + mode: Option, +} + +impl Default for Config012 { + fn default() -> Self { + let mut inner = yamux012::Config::default(); + // For conformity with mplex, read-after-close on a multiplexed + // connection is never permitted and not configurable. + inner.set_read_after_close(false); + Self { inner, mode: None } + } } /// The window update mode determines when window updates are /// sent to the remote, giving it new credit to send more data. -pub struct WindowUpdateMode(yamux::WindowUpdateMode); +pub struct WindowUpdateMode(yamux012::WindowUpdateMode); impl WindowUpdateMode { /// The window update mode whereby the remote is given @@ -249,8 +262,10 @@ impl WindowUpdateMode { /// > size must be tuned appropriately for the desired /// > throughput and level of tolerance for (temporarily) /// > slow receivers. + #[deprecated(note = "Use `WindowUpdateMode::on_read` instead.")] pub fn on_receive() -> Self { - WindowUpdateMode(yamux::WindowUpdateMode::OnReceive) + #[allow(deprecated)] + WindowUpdateMode(yamux012::WindowUpdateMode::OnReceive) } /// The window update mode whereby the remote is given new @@ -268,62 +283,71 @@ impl WindowUpdateMode { /// > **Note**: With this strategy, there is usually no point in the /// > receive buffer being larger than the window size. pub fn on_read() -> Self { - WindowUpdateMode(yamux::WindowUpdateMode::OnRead) + WindowUpdateMode(yamux012::WindowUpdateMode::OnRead) } } impl Config { /// Creates a new `YamuxConfig` in client mode, regardless of whether /// it will be used for an inbound or outbound upgrade. + #[deprecated(note = "Will be removed with the next breaking release.")] pub fn client() -> Self { - Self { - mode: Some(yamux::Mode::Client), + Self(Either::Left(Config012 { + mode: Some(yamux012::Mode::Client), ..Default::default() - } + })) } /// Creates a new `YamuxConfig` in server mode, regardless of whether /// it will be used for an inbound or outbound upgrade. + #[deprecated(note = "Will be removed with the next breaking release.")] pub fn server() -> Self { - Self { - mode: Some(yamux::Mode::Server), + Self(Either::Left(Config012 { + mode: Some(yamux012::Mode::Server), ..Default::default() - } + })) } /// Sets the size (in bytes) of the receive window per substream. + #[deprecated( + note = "Will be replaced in the next breaking release with a connection receive window size limit." + )] pub fn set_receive_window_size(&mut self, num_bytes: u32) -> &mut Self { - self.inner.set_receive_window(num_bytes); - self + self.set(|cfg| cfg.set_receive_window(num_bytes)) } /// Sets the maximum size (in bytes) of the receive buffer per substream. + #[deprecated(note = "Will be removed with the next breaking release.")] pub fn set_max_buffer_size(&mut self, num_bytes: usize) -> &mut Self { - self.inner.set_max_buffer_size(num_bytes); - self + self.set(|cfg| cfg.set_max_buffer_size(num_bytes)) } /// Sets the maximum number of concurrent substreams. pub fn set_max_num_streams(&mut self, num_streams: usize) -> &mut Self { - self.inner.set_max_num_streams(num_streams); - self + self.set(|cfg| cfg.set_max_num_streams(num_streams)) } /// Sets the window update mode that determines when the remote /// is given new credit for sending more data. + #[deprecated( + note = "`WindowUpdate::OnRead` is the default. `WindowUpdate::OnReceive` breaks backpressure, is thus not recommended, and will be removed in the next breaking release. Thus this method becomes obsolete and will be removed with the next breaking release." + )] pub fn set_window_update_mode(&mut self, mode: WindowUpdateMode) -> &mut Self { - self.inner.set_window_update_mode(mode.0); - self + self.set(|cfg| cfg.set_window_update_mode(mode.0)) } -} -impl Default for Config { - fn default() -> Self { - let mut inner = yamux::Config::default(); - // For conformity with mplex, read-after-close on a multiplexed - // connection is never permitted and not configurable. - inner.set_read_after_close(false); - Config { inner, mode: None } + fn set(&mut self, f: impl FnOnce(&mut yamux012::Config) -> &mut yamux012::Config) -> &mut Self { + let cfg012 = match self.0.as_mut() { + Either::Left(c) => &mut c.inner, + Either::Right(_) => { + self.0 = Either::Left(Config012::default()); + &mut self.0.as_mut().unwrap_left().inner + } + }; + + f(cfg012); + + self } } @@ -345,8 +369,18 @@ where type Future = future::Ready>; fn upgrade_inbound(self, io: C, _: Self::Info) -> Self::Future { - let mode = self.mode.unwrap_or(yamux::Mode::Server); - future::ready(Ok(Muxer::new(io, self.inner, mode))) + let connection = match self.0 { + Either::Left(Config012 { inner, mode }) => Either::Left(yamux012::Connection::new( + io, + inner, + mode.unwrap_or(yamux012::Mode::Server), + )), + Either::Right(Config013(cfg)) => { + Either::Right(yamux013::Connection::new(io, cfg, yamux013::Mode::Server)) + } + }; + + future::ready(Ok(Muxer::new(connection))) } } @@ -359,21 +393,69 @@ where type Future = future::Ready>; fn upgrade_outbound(self, io: C, _: Self::Info) -> Self::Future { - let mode = self.mode.unwrap_or(yamux::Mode::Client); - future::ready(Ok(Muxer::new(io, self.inner, mode))) + let connection = match self.0 { + Either::Left(Config012 { inner, mode }) => Either::Left(yamux012::Connection::new( + io, + inner, + mode.unwrap_or(yamux012::Mode::Client), + )), + Either::Right(Config013(cfg)) => { + Either::Right(yamux013::Connection::new(io, cfg, yamux013::Mode::Client)) + } + }; + + future::ready(Ok(Muxer::new(connection))) + } +} + +#[derive(Debug, Clone)] +struct Config013(yamux013::Config); + +impl Default for Config013 { + fn default() -> Self { + let mut cfg = yamux013::Config::default(); + // For conformity with mplex, read-after-close on a multiplexed + // connection is never permitted and not configurable. + cfg.set_read_after_close(false); + Self(cfg) } } /// The Yamux [`StreamMuxer`] error type. #[derive(Debug, Error)] #[error(transparent)] -pub struct Error(yamux::ConnectionError); +pub struct Error(Either); impl From for io::Error { fn from(err: Error) -> Self { match err.0 { - yamux::ConnectionError::Io(e) => e, - e => io::Error::new(io::ErrorKind::Other, e), + Either::Left(err) => match err { + yamux012::ConnectionError::Io(e) => e, + e => io::Error::new(io::ErrorKind::Other, e), + }, + Either::Right(err) => match err { + yamux013::ConnectionError::Io(e) => e, + e => io::Error::new(io::ErrorKind::Other, e), + }, } } } + +#[cfg(test)] +mod test { + use super::*; + #[test] + fn config_set_switches_to_v012() { + // By default we use yamux v0.13. Thus we provide the benefits of yamux v0.13 to all users + // that do not depend on any of the behaviors (i.e. configuration options) of v0.12. + let mut cfg = Config::default(); + assert!(matches!( + cfg, + Config(Either::Right(Config013(yamux013::Config { .. }))) + )); + + // In case a user makes any configurations, use yamux v0.12 instead. + cfg.set_max_num_streams(42); + assert!(matches!(cfg, Config(Either::Left(Config012 { .. })))); + } +}