Skip to content

Commit

Permalink
provide a non-destructive mechanism to determine if a sink/stream are…
Browse files Browse the repository at this point in the history
… paired
  • Loading branch information
tones111 authored and taiki-e committed Dec 24, 2023
1 parent 40eed5c commit 4910799
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 1 deletion.
7 changes: 6 additions & 1 deletion futures-util/src/lock/bilock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,19 @@ impl<T> BiLock<T> {
BiLockAcquire { bilock: self }
}

/// Returns `true` only if the other `BiLock<T>` originated from the same call to `BiLock::new`.
pub fn is_pair_of(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.arc, &other.arc)
}

/// Attempts to put the two "halves" of a `BiLock<T>` back together and
/// recover the original value. Succeeds only if the two `BiLock<T>`s
/// originated from the same call to `BiLock::new`.
pub fn reunite(self, other: Self) -> Result<T, ReuniteError<T>>
where
T: Unpin,
{
if Arc::ptr_eq(&self.arc, &other.arc) {
if self.is_pair_of(&other) {
drop(other);
let inner = Arc::try_unwrap(self.arc)
.ok()
Expand Down
80 changes: 80 additions & 0 deletions futures-util/src/stream/stream/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ pub struct SplitStream<S>(BiLock<S>);

impl<S> Unpin for SplitStream<S> {}

impl<S> SplitStream<S> {
/// Returns `true` if the `SplitStream<S>` and `SplitSink<S>` originate from the same call to `StreamExt::split`.
pub fn is_pair_of<Item>(&self, other: &SplitSink<S, Item>) -> bool {
other.is_pair_of(&self)
}
}

impl<S: Unpin> SplitStream<S> {
/// Attempts to put the two "halves" of a split `Stream + Sink` back
/// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are
Expand Down Expand Up @@ -60,6 +67,13 @@ impl<S: Sink<Item> + Unpin, Item> SplitSink<S, Item> {
}
}

impl<S, Item> SplitSink<S, Item> {
/// Returns `true` if the `SplitStream<S>` and `SplitSink<S>` originate from the same call to `StreamExt::split`.
pub fn is_pair_of(&self, other: &SplitStream<S>) -> bool {
self.lock.is_pair_of(&other.0)
}
}

impl<S: Sink<Item>, Item> SplitSink<S, Item> {
fn poll_flush_slot(
mut inner: Pin<&mut S>,
Expand Down Expand Up @@ -142,3 +156,69 @@ impl<T, Item> fmt::Display for ReuniteError<T, Item> {

#[cfg(feature = "std")]
impl<T: core::any::Any, Item> std::error::Error for ReuniteError<T, Item> {}

#[cfg(test)]
mod tests {
use super::*;
use crate::{sink::Sink, stream::StreamExt};
use core::marker::PhantomData;

struct NopStream<Item> {
phantom: PhantomData<Item>,
}

impl<Item> Stream for NopStream<Item> {
type Item = Item;

fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
todo!()
}
}

impl<Item> Sink<Item> for NopStream<Item> {
type Error = ();

fn poll_ready(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
todo!()
}

fn start_send(self: Pin<&mut Self>, _item: Item) -> Result<(), Self::Error> {
todo!()
}

fn poll_flush(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
todo!()
}

fn poll_close(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
todo!()
}
}

#[test]
fn test_pairing() {
let s1 = NopStream::<()> { phantom: PhantomData };
let (sink1, stream1) = s1.split();
assert!(sink1.is_pair_of(&stream1));
assert!(stream1.is_pair_of(&sink1));

let s2 = NopStream::<()> { phantom: PhantomData };
let (sink2, stream2) = s2.split();
assert!(sink2.is_pair_of(&stream2));
assert!(stream2.is_pair_of(&sink2));

assert!(!sink1.is_pair_of(&stream2));
assert!(!stream1.is_pair_of(&sink2));
assert!(!sink2.is_pair_of(&stream1));
assert!(!stream2.is_pair_of(&sink1));
}
}

0 comments on commit 4910799

Please sign in to comment.