diff --git a/examples/Cargo.toml b/examples/Cargo.toml index d95a32d036d..5fa4f7438c5 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -10,7 +10,7 @@ edition = "2018" tokio = { version = "0.3.0", path = "../tokio", features = ["full", "tracing"] } tracing = "0.1" tracing-subscriber = { version = "0.2.7", default-features = false, features = ["fmt", "ansi", "env-filter", "chrono", "tracing-log"] } -tokio-util = { version = "0.4.0", path = "../tokio-util", features = ["full"] } +tokio-util = { version = "0.5.0", path = "../tokio-util", features = ["full"] } bytes = "0.6" futures = "0.3.0" http = "0.2" diff --git a/tokio-util/CHANGELOG.md b/tokio-util/CHANGELOG.md index e9aa0bac747..48dfe778e26 100644 --- a/tokio-util/CHANGELOG.md +++ b/tokio-util/CHANGELOG.md @@ -1,3 +1,11 @@ +### Added +- io: `poll_read_buf` util fn (#2972). + +# 0.5.0 (October 30, 2020) + +### Changed +- io: update `bytes` to 0.6 (#3071). + # 0.4.0 (October 15, 2020) ### Added diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index c1b2ee66db4..ba2f3a4f1f6 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -7,13 +7,13 @@ name = "tokio-util" # - Cargo.toml # - Update CHANGELOG.md. # - Create "v0.2.x" git tag. -version = "0.4.0" +version = "0.5.0" edition = "2018" authors = ["Tokio Contributors "] license = "MIT" repository = "https://github.com/tokio-rs/tokio" homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio-util/0.4.0/tokio_util" +documentation = "https://docs.rs/tokio-util/0.5.0/tokio_util" description = """ Additional utilities for working with Tokio. """ diff --git a/tokio-util/src/codec/framed_impl.rs b/tokio-util/src/codec/framed_impl.rs index c161808f66e..e8b29999e10 100644 --- a/tokio-util/src/codec/framed_impl.rs +++ b/tokio-util/src/codec/framed_impl.rs @@ -150,7 +150,7 @@ where // got room for at least one byte to read to ensure that we don't // get a spurious 0 that looks like EOF state.buffer.reserve(1); - let bytect = match poll_read_buf(cx, pinned.inner.as_mut(), &mut state.buffer)? { + let bytect = match poll_read_buf(pinned.inner.as_mut(), cx, &mut state.buffer)? { Poll::Ready(ct) => ct, Poll::Pending => return Poll::Pending, }; diff --git a/tokio-util/src/io/mod.rs b/tokio-util/src/io/mod.rs index 6f181ab1703..eefd65a5e83 100644 --- a/tokio-util/src/io/mod.rs +++ b/tokio-util/src/io/mod.rs @@ -13,3 +13,4 @@ mod stream_reader; pub use self::read_buf::read_buf; pub use self::reader_stream::ReaderStream; pub use self::stream_reader::StreamReader; +pub use crate::util::poll_read_buf; diff --git a/tokio-util/src/io/read_buf.rs b/tokio-util/src/io/read_buf.rs index 5bc0d5866f4..cc3c505f934 100644 --- a/tokio-util/src/io/read_buf.rs +++ b/tokio-util/src/io/read_buf.rs @@ -59,7 +59,7 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = &mut *self; - crate::util::poll_read_buf(cx, Pin::new(this.0), this.1) + crate::util::poll_read_buf(Pin::new(this.0), cx, this.1) } } } diff --git a/tokio-util/src/io/reader_stream.rs b/tokio-util/src/io/reader_stream.rs index ab0c22fba73..3e6a05eff4d 100644 --- a/tokio-util/src/io/reader_stream.rs +++ b/tokio-util/src/io/reader_stream.rs @@ -83,7 +83,7 @@ impl Stream for ReaderStream { this.buf.reserve(CAPACITY); } - match poll_read_buf(cx, reader, &mut this.buf) { + match poll_read_buf(reader, cx, &mut this.buf) { Poll::Pending => Poll::Pending, Poll::Ready(Err(err)) => { self.project().reader.set(None); diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index f4cf94704ad..09dd5a1071c 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -1,4 +1,4 @@ -#![doc(html_root_url = "https://docs.rs/tokio-util/0.4.0")] +#![doc(html_root_url = "https://docs.rs/tokio-util/0.5.0")] #![allow(clippy::needless_doctest_main)] #![warn( missing_debug_implementations, @@ -69,10 +69,49 @@ mod util { use std::pin::Pin; use std::task::{Context, Poll}; - pub(crate) fn poll_read_buf( - cx: &mut Context<'_>, + /// Try to read data from an `AsyncRead` into an implementer of the [`Buf`] trait. + /// + /// [`Buf`]: bytes::Buf + /// + /// # Example + /// + /// ``` + /// use bytes::{Bytes, BytesMut}; + /// use tokio::stream; + /// use tokio::io::Result; + /// use tokio_util::io::{StreamReader, poll_read_buf}; + /// use futures::future::poll_fn; + /// use std::pin::Pin; + /// # #[tokio::main] + /// # async fn main() -> std::io::Result<()> { + /// + /// // Create a reader from an iterator. This particular reader will always be + /// // ready. + /// let mut read = StreamReader::new(stream::iter(vec![Result::Ok(Bytes::from_static(&[0, 1, 2, 3]))])); + /// + /// let mut buf = BytesMut::new(); + /// let mut reads = 0; + /// + /// loop { + /// reads += 1; + /// let n = poll_fn(|cx| poll_read_buf(Pin::new(&mut read), cx, &mut buf)).await?; + /// + /// if n == 0 { + /// break; + /// } + /// } + /// + /// // one or more reads might be necessary. + /// assert!(reads >= 1); + /// assert_eq!(&buf[..], &[0, 1, 2, 3]); + /// # Ok(()) + /// # } + /// ``` + #[cfg_attr(not(feature = "io"), allow(unreachable_pub))] + pub fn poll_read_buf( io: Pin<&mut T>, - buf: &mut impl BufMut, + cx: &mut Context<'_>, + buf: &mut B, ) -> Poll> { if !buf.has_remaining_mut() { return Poll::Ready(Ok(0)); diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 6c5de90ddc2..c4825c20650 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -102,7 +102,7 @@ mio = { version = "0.7.3", optional = true } num_cpus = { version = "1.8.0", optional = true } parking_lot = { version = "0.11.0", optional = true } # Not in full slab = { version = "0.4.1", optional = true } -tracing = { version = "0.1.16", default-features = false, features = ["std"], optional = true } # Not in full +tracing = { version = "0.1.21", default-features = false, features = ["std"], optional = true } # Not in full [target.'cfg(unix)'.dependencies] libc = { version = "0.2.42", optional = true } @@ -126,9 +126,12 @@ tempfile = "3.1.0" [target.'cfg(loom)'.dev-dependencies] loom = { version = "0.3.5", features = ["futures", "checkpoint"] } +[build-dependencies] +autocfg = "1" # Needed for conditionally enabling `track-caller` + [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] [package.metadata.playground] -features = ["full"] +features = ["full"] \ No newline at end of file diff --git a/tokio/build.rs b/tokio/build.rs new file mode 100644 index 00000000000..fe5c8300560 --- /dev/null +++ b/tokio/build.rs @@ -0,0 +1,22 @@ +use autocfg::AutoCfg; + +fn main() { + match AutoCfg::new() { + Ok(ac) => { + // The #[track_caller] attribute was stabilized in rustc 1.46.0. + if ac.probe_rustc_version(1, 46) { + autocfg::emit("tokio_track_caller") + } + } + + Err(e) => { + // If we couldn't detect the compiler version and features, just + // print a warning. This isn't a fatal error: we can still build + // Tokio, we just can't enable cfgs automatically. + println!( + "cargo:warning=tokio: failed to detect compiler features: {}", + e + ); + } + } +} diff --git a/tokio/src/net/unix/ucred.rs b/tokio/src/net/unix/ucred.rs index ef214a702fa..7d73ee02ebb 100644 --- a/tokio/src/net/unix/ucred.rs +++ b/tokio/src/net/unix/ucred.rs @@ -1,8 +1,10 @@ -use libc::{gid_t, uid_t}; +use libc::{gid_t, pid_t, uid_t}; /// Credentials of a process #[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] pub struct UCred { + /// PID (process ID) of the process + pid: Option, /// UID (user ID) of the process uid: uid_t, /// GID (group ID) of the process @@ -19,6 +21,13 @@ impl UCred { pub fn gid(&self) -> gid_t { self.gid } + + /// Gets PID (process ID) of the process. + /// + /// This is only implemented under linux, android, IOS and MacOS + pub fn pid(&self) -> Option { + self.pid + } } #[cfg(any(target_os = "linux", target_os = "android"))] @@ -26,12 +35,13 @@ pub(crate) use self::impl_linux::get_peer_cred; #[cfg(any( target_os = "dragonfly", - target_os = "macos", - target_os = "ios", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd" ))] +pub(crate) use self::impl_bsd::get_peer_cred; + +#[cfg(any(target_os = "macos", target_os = "ios"))] pub(crate) use self::impl_macos::get_peer_cred; #[cfg(any(target_os = "solaris", target_os = "illumos"))] @@ -77,6 +87,7 @@ pub(crate) mod impl_linux { Ok(super::UCred { uid: ucred.uid, gid: ucred.gid, + pid: Some(ucred.pid), }) } else { Err(io::Error::last_os_error()) @@ -87,13 +98,11 @@ pub(crate) mod impl_linux { #[cfg(any( target_os = "dragonfly", - target_os = "macos", - target_os = "ios", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd" ))] -pub(crate) mod impl_macos { +pub(crate) mod impl_bsd { use crate::net::unix::UnixStream; use libc::getpeereid; @@ -114,6 +123,54 @@ pub(crate) mod impl_macos { Ok(super::UCred { uid: uid.assume_init(), gid: gid.assume_init(), + pid: None, + }) + } else { + Err(io::Error::last_os_error()) + } + } + } +} + +#[cfg(any(target_os = "macos", target_os = "ios"))] +pub(crate) mod impl_macos { + use crate::net::unix::UnixStream; + + use libc::{c_void, getpeereid, getsockopt, pid_t, LOCAL_PEEREPID, SOL_LOCAL}; + use std::io; + use std::mem::size_of; + use std::mem::MaybeUninit; + use std::os::unix::io::AsRawFd; + + pub(crate) fn get_peer_cred(sock: &UnixStream) -> io::Result { + unsafe { + let raw_fd = sock.as_raw_fd(); + + let mut uid = MaybeUninit::uninit(); + let mut gid = MaybeUninit::uninit(); + let mut pid: MaybeUninit = MaybeUninit::uninit(); + let mut pid_size: MaybeUninit = MaybeUninit::new(size_of::() as u32); + + if getsockopt( + raw_fd, + SOL_LOCAL, + LOCAL_PEEREPID, + pid.as_mut_ptr() as *mut c_void, + pid_size.as_mut_ptr(), + ) != 0 + { + return Err(io::Error::last_os_error()); + } + + assert!(pid_size.assume_init() == (size_of::() as u32)); + + let ret = getpeereid(raw_fd, uid.as_mut_ptr(), gid.as_mut_ptr()); + + if ret == 0 { + Ok(super::UCred { + uid: uid.assume_init(), + gid: gid.assume_init(), + pid: Some(pid.assume_init()), }) } else { Err(io::Error::last_os_error()) @@ -154,7 +211,11 @@ pub(crate) mod impl_solaris { ucred_free(cred); - Ok(super::UCred { uid, gid }) + Ok(super::UCred { + uid, + gid, + pid: None, + }) } else { Err(io::Error::last_os_error()) } diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index d2a69a24ceb..bce5d07d17d 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -125,15 +125,19 @@ impl Handle { /// }); /// # } /// ``` + #[cfg_attr(tokio_track_caller, track_caller)] pub fn spawn(&self, future: F) -> JoinHandle where F: Future + Send + 'static, F::Output: Send + 'static, { + #[cfg(feature = "tracing")] + let future = crate::util::trace::task(future, "task"); self.spawner.spawn(future) } - /// Run the provided function on an executor dedicated to blocking operations. + /// Run the provided function on an executor dedicated to blocking + /// operations. /// /// # Examples /// @@ -151,12 +155,24 @@ impl Handle { /// println!("now running on a worker thread"); /// }); /// # } + #[cfg_attr(tokio_track_caller, track_caller)] pub fn spawn_blocking(&self, func: F) -> JoinHandle where F: FnOnce() -> R + Send + 'static, { #[cfg(feature = "tracing")] let func = { + #[cfg(tokio_track_caller)] + let location = std::panic::Location::caller(); + #[cfg(tokio_track_caller)] + let span = tracing::trace_span!( + target: "tokio::task", + "task", + kind = %"blocking", + function = %std::any::type_name::(), + spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()), + ); + #[cfg(not(tokio_track_caller))] let span = tracing::trace_span!( target: "tokio::task", "task", diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 08da39f4e6d..9419085379e 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -378,6 +378,7 @@ cfg_rt! { /// }); /// # } /// ``` + #[cfg_attr(tokio_track_caller, track_caller)] pub fn spawn(&self, future: F) -> JoinHandle where F: Future + Send + 'static, @@ -402,6 +403,7 @@ cfg_rt! { /// println!("now running on a worker thread"); /// }); /// # } + #[cfg_attr(tokio_track_caller, track_caller)] pub fn spawn_blocking(&self, func: F) -> JoinHandle where F: FnOnce() -> R + Send + 'static, diff --git a/tokio/src/task/blocking.rs b/tokio/src/task/blocking.rs index fc6632be3b7..36bc4579bfe 100644 --- a/tokio/src/task/blocking.rs +++ b/tokio/src/task/blocking.rs @@ -104,6 +104,7 @@ cfg_rt_multi_thread! { /// # Ok(()) /// # } /// ``` +#[cfg_attr(tokio_track_caller, track_caller)] pub fn spawn_blocking(f: F) -> JoinHandle where F: FnOnce() -> R + Send + 'static, diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 5896126c7e5..566b2f2d7f8 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -190,6 +190,7 @@ cfg_rt! { /// }).await; /// } /// ``` + #[cfg_attr(tokio_track_caller, track_caller)] pub fn spawn_local(future: F) -> JoinHandle where F: Future + 'static, @@ -273,6 +274,7 @@ impl LocalSet { /// } /// ``` /// [`spawn_local`]: fn@spawn_local + #[cfg_attr(tokio_track_caller, track_caller)] pub fn spawn_local(&self, future: F) -> JoinHandle where F: Future + 'static, diff --git a/tokio/src/task/spawn.rs b/tokio/src/task/spawn.rs index 77acb579f73..a060852dc23 100644 --- a/tokio/src/task/spawn.rs +++ b/tokio/src/task/spawn.rs @@ -122,6 +122,7 @@ cfg_rt! { /// ```text /// error[E0391]: cycle detected when processing `main` /// ``` + #[cfg_attr(tokio_track_caller, track_caller)] pub fn spawn(task: T) -> JoinHandle where T: Future + Send + 'static, diff --git a/tokio/src/util/trace.rs b/tokio/src/util/trace.rs index 18956a36306..96a9db91d1f 100644 --- a/tokio/src/util/trace.rs +++ b/tokio/src/util/trace.rs @@ -1,47 +1,27 @@ cfg_trace! { cfg_rt! { - use std::future::Future; - use std::pin::Pin; - use std::task::{Context, Poll}; - use pin_project_lite::pin_project; - - use tracing::Span; - - pin_project! { - /// A future that has been instrumented with a `tracing` span. - #[derive(Debug, Clone)] - pub(crate) struct Instrumented { - #[pin] - inner: T, - span: Span, - } - } - - impl Future for Instrumented { - type Output = T::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - let _enter = this.span.enter(); - this.inner.poll(cx) - } - } - - impl Instrumented { - pub(crate) fn new(inner: T, span: Span) -> Self { - Self { inner, span } - } - } + pub(crate) use tracing::instrument::Instrumented; #[inline] + #[cfg_attr(tokio_track_caller, track_caller)] pub(crate) fn task(task: F, kind: &'static str) -> Instrumented { + use tracing::instrument::Instrument; + #[cfg(tokio_track_caller)] + let location = std::panic::Location::caller(); + #[cfg(tokio_track_caller)] + let span = tracing::trace_span!( + target: "tokio::task", + "task", + %kind, + spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()), + ); + #[cfg(not(tokio_track_caller))] let span = tracing::trace_span!( target: "tokio::task", "task", %kind, - future = %std::any::type_name::(), ); - Instrumented::new(task, span) + task.instrument(span) } } }