diff --git a/CHANGELOG.md b/CHANGELOG.md index 0458fb07..424a7984 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.11.0 + +- Remove `ConnectionError` from public API in favor `std::io::Error`. See [PR ] + # 0.10.1 - Update `parking_lot` dependency. See [PR 126]. diff --git a/Cargo.toml b/Cargo.toml index 0b53bbd1..c8eddc70 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yamux" -version = "0.10.1" +version = "0.11.0" authors = ["Parity Technologies "] license = "Apache-2.0 OR MIT" description = "Multiplexer over reliable, ordered connections" diff --git a/src/connection/control.rs b/src/connection/control.rs index cf260350..d80e3044 100644 --- a/src/connection/control.rs +++ b/src/connection/control.rs @@ -9,19 +9,19 @@ // at https://opensource.org/licenses/MIT. use super::ControlCommand; -use crate::{error::ConnectionError, Stream}; +use crate::error::{into_io_error, ConnectionError}; +use crate::Stream; use futures::{ channel::{mpsc, oneshot}, prelude::*, ready, }; use std::{ + io, pin::Pin, task::{Context, Poll}, }; -type Result = std::result::Result; - /// The Yamux `Connection` controller. /// /// While a Yamux connection makes progress via its `next_stream` method, @@ -36,7 +36,7 @@ pub struct Control { /// Command channel to `Connection`. sender: mpsc::Sender, /// Pending state of `poll_open_stream`. - pending_open: Option>>, + pending_open: Option>>, /// Pending state of `poll_close`. pending_close: Option>, } @@ -61,14 +61,19 @@ impl Control { } /// Open a new stream to the remote. - pub async fn open_stream(&mut self) -> Result { + pub async fn open_stream(&mut self) -> io::Result { let (tx, rx) = oneshot::channel(); - self.sender.send(ControlCommand::OpenStream(tx)).await?; - rx.await? + self.sender + .send(ControlCommand::OpenStream(tx)) + .await + .map_err(into_io_error)?; + let stream = rx.await.map_err(into_io_error)?.map_err(into_io_error)?; + + Ok(stream) } /// Close the connection. - pub async fn close(&mut self) -> Result<()> { + pub async fn close(&mut self) -> io::Result<()> { let (tx, rx) = oneshot::channel(); if self .sender @@ -86,17 +91,22 @@ impl Control { } /// [`Poll`] based alternative to [`Control::open_stream`]. - pub fn poll_open_stream(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + pub fn poll_open_stream( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll> { loop { match self.pending_open.take() { None => { - ready!(self.sender.poll_ready(cx)?); + ready!(self.sender.poll_ready(cx).map_err(into_io_error)?); let (tx, rx) = oneshot::channel(); - self.sender.start_send(ControlCommand::OpenStream(tx))?; + self.sender + .start_send(ControlCommand::OpenStream(tx)) + .map_err(into_io_error)?; self.pending_open = Some(rx) } - Some(mut rx) => match rx.poll_unpin(cx)? { - Poll::Ready(result) => return Poll::Ready(result), + Some(mut rx) => match rx.poll_unpin(cx).map_err(into_io_error)? { + Poll::Ready(result) => return Poll::Ready(result.map_err(into_io_error)), Poll::Pending => { self.pending_open = Some(rx); return Poll::Pending; @@ -112,7 +122,7 @@ impl Control { } /// [`Poll`] based alternative to [`Control::close`]. - pub fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + pub fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { loop { match self.pending_close.take() { None => { diff --git a/src/error.rs b/src/error.rs index f9a20c21..7803851f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -9,6 +9,7 @@ // at https://opensource.org/licenses/MIT. use crate::frame::FrameDecodeError; +use std::io; /// The various error cases a connection may encounter. #[non_exhaustive] @@ -63,6 +64,15 @@ impl std::error::Error for ConnectionError { } } +pub fn into_io_error>(error: E) -> std::io::Error { + let connection_error = error.into(); + + match connection_error { + ConnectionError::Io(io) => io, + other => io::Error::new(io::ErrorKind::Other, other), + } +} + impl From for ConnectionError { fn from(e: std::io::Error) -> Self { ConnectionError::Io(e) diff --git a/src/lib.rs b/src/lib.rs index a5e2eb4c..5c7d8cc3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,7 +35,6 @@ mod tests; pub(crate) mod connection; pub use crate::connection::{into_stream, Connection, Control, Mode, Packet, Stream}; -pub use crate::error::ConnectionError; pub use crate::frame::{ header::{HeaderDecodeError, StreamId}, FrameDecodeError, diff --git a/src/tests.rs b/src/tests.rs index bdfc15e0..b626aadc 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -9,7 +9,7 @@ // at https://opensource.org/licenses/MIT. use crate::WindowUpdateMode; -use crate::{connection::State, Config, Connection, ConnectionError, Control, Mode}; +use crate::{connection::State, error::ConnectionError, Config, Connection, Control, Mode}; use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures::executor::LocalPool; use futures::future::join; @@ -164,7 +164,12 @@ fn prop_max_streams() { for _ in 0..max_streams { v.push(control.open_stream().await.expect("open_stream")) } - if let Err(ConnectionError::TooManyStreams) = control.open_stream().await { + if let Err(Some(Ok(ConnectionError::TooManyStreams))) = + control.open_stream().await.map_err(|e| { + e.into_inner() + .map(|inner| inner.downcast::().map(|b| *b)) + }) + { true } else { false diff --git a/tests/concurrent.rs b/tests/concurrent.rs index d816eb87..e364beda 100644 --- a/tests/concurrent.rs +++ b/tests/concurrent.rs @@ -11,6 +11,7 @@ use futures::{channel::mpsc, prelude::*}; use quickcheck::{Arbitrary, Gen, QuickCheck}; use std::{ + io, net::{Ipv4Addr, SocketAddr, SocketAddrV4}, sync::Arc, }; @@ -106,7 +107,7 @@ async fn roundtrip( log::debug!("C: {}: read {} bytes", stream.id(), frame.len()); assert_eq!(&data[..], &frame[..]); tx.unbounded_send(1).expect("unbounded_send"); - Ok::<(), yamux::ConnectionError>(()) + Ok::<(), io::Error>(()) }); } let n = rx