Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Bytes::from_owner #742

Merged
merged 28 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 143 additions & 1 deletion src/bytes.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use core::iter::FromIterator;
use core::mem::{self, ManuallyDrop};
use core::ops::{Deref, RangeBounds};
use core::ptr::NonNull;
use core::{cmp, fmt, hash, ptr, slice, usize};

use alloc::{
Expand Down Expand Up @@ -116,6 +117,7 @@ pub(crate) struct Vtable {
pub to_mut: unsafe fn(&AtomicPtr<()>, *const u8, usize) -> BytesMut,
/// fn(data)
pub is_unique: unsafe fn(&AtomicPtr<()>) -> bool,
pub cheap_into_mut: unsafe fn(&AtomicPtr<()>) -> bool,
/// fn(data, ptr, len)
pub drop: unsafe fn(&mut AtomicPtr<()>, *const u8, usize),
}
Expand Down Expand Up @@ -200,6 +202,54 @@ impl Bytes {
}
}

/// Create [Bytes] with a buffer whose lifetime is controlled
/// via an explicit owner.
///
/// A common use case is to zero-copy construct from mapped memory.
///
/// ```ignore
amunra marked this conversation as resolved.
Show resolved Hide resolved
/// use bytes::Bytes;
/// use std::fs::File;
/// use memmap2::Map;
///
/// let file = File::open("upload_bundle.tar.gz")?;
/// let mmap = unsafe { Mmap::map(&file) }?;
/// let b = Bytes::from_owner(mmap);
/// ```
///
/// The `owner` will be transferred to the constructed [Bytes] object, which
/// will ensure it is dropped once all remaining clones of the constructed
/// object are dropped. The owner will then be responsible for dropping the
/// specified region of memory as part of its [Drop] implementation.
///
/// Note that converting [Bytes] constructed from an owner into a [BytesMut]
/// will always create a deep copy of the buffer into newly allocated memory.
pub fn from_owner<T>(owner: T) -> Self
where
T: AsRef<[u8]> + Send + 'static,
{
let owned = Box::into_raw(Box::new(Owned {
lifetime: OwnedLifetime {
ref_cnt: AtomicUsize::new(1),
drop: owned_box_and_drop::<T>,
},
owner,
}));

let mut ret = Bytes {
ptr: NonNull::dangling().as_ptr(),
len: 0,
data: AtomicPtr::new(owned.cast()),
vtable: &OWNED_VTABLE,
};

let buf = unsafe { &*owned }.owner.as_ref();
ret.ptr = buf.as_ptr();
ret.len = buf.len();

ret
}

/// Returns the number of bytes contained in this `Bytes`.
///
/// # Examples
Expand Down Expand Up @@ -536,6 +586,9 @@ impl Bytes {
/// If `self` is not unique for the entire original buffer, this will fail
/// and return self.
///
/// This will also always fail if the buffer was constructed via
/// [from_owner](Bytes::from_owner).
///
/// # Examples
///
/// ```
Expand All @@ -545,7 +598,7 @@ impl Bytes {
/// assert_eq!(bytes.try_into_mut(), Ok(BytesMut::from(&b"hello"[..])));
/// ```
pub fn try_into_mut(self) -> Result<BytesMut, Bytes> {
if self.is_unique() {
if unsafe { (self.vtable.cheap_into_mut)(&self.data) } {
amunra marked this conversation as resolved.
Show resolved Hide resolved
Ok(self.into())
} else {
Err(self)
Expand Down Expand Up @@ -986,6 +1039,7 @@ const STATIC_VTABLE: Vtable = Vtable {
to_vec: static_to_vec,
to_mut: static_to_mut,
is_unique: static_is_unique,
cheap_into_mut: static_is_unique,
drop: static_drop,
};

Expand All @@ -1012,13 +1066,99 @@ unsafe fn static_drop(_: &mut AtomicPtr<()>, _: *const u8, _: usize) {
// nothing to drop for &'static [u8]
}

// ===== impl OwnedVtable =====

#[repr(C)]
struct OwnedLifetime {
ref_cnt: AtomicUsize,
drop: unsafe fn(*mut ()),
}

#[repr(C)]
struct Owned<T> {
lifetime: OwnedLifetime,
owner: T,
}

unsafe fn owned_box_and_drop<T>(ptr: *mut ()) {
let b: Box<Owned<T>> = Box::from_raw(ptr as _);
drop(b);
}

unsafe fn owned_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
let owned = data.load(Ordering::Acquire);
amunra marked this conversation as resolved.
Show resolved Hide resolved
let ref_cnt = &(*owned.cast::<OwnedLifetime>()).ref_cnt;
let old_cnt = ref_cnt.fetch_add(1, Ordering::Relaxed);
if old_cnt > usize::MAX >> 1 {
crate::abort()
}

Bytes {
ptr,
len,
data: AtomicPtr::new(owned as _),
vtable: &OWNED_VTABLE,
}
}

unsafe fn owned_to_vec(_data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec<u8> {
let slice = slice::from_raw_parts(ptr, len);
slice.to_vec()
}

unsafe fn owned_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
let bytes_mut = BytesMut::from_vec(owned_to_vec(data, ptr, len));
owned_drop_impl(data.load(Ordering::Acquire));
amunra marked this conversation as resolved.
Show resolved Hide resolved
bytes_mut
}

unsafe fn owned_is_unique(data: &AtomicPtr<()>) -> bool {
let owned = data.load(Ordering::Acquire);
amunra marked this conversation as resolved.
Show resolved Hide resolved
let ref_cnt = &(*owned.cast::<OwnedLifetime>()).ref_cnt;
ref_cnt.load(Ordering::Relaxed) == 1
amunra marked this conversation as resolved.
Show resolved Hide resolved
}

unsafe fn owned_cheap_into_mut(_data: &AtomicPtr<()>) -> bool {
// Since the memory's ownership is tied to an external owner
// it is never zero-copy to create a BytesMut.
false
}

unsafe fn owned_drop_impl(owned: *mut ()) {
let lifetime = owned.cast::<OwnedLifetime>();
let ref_cnt = &(*lifetime).ref_cnt;

let old_cnt = ref_cnt.fetch_sub(1, Ordering::Acquire);
if old_cnt != 1 {
return;
}
amunra marked this conversation as resolved.
Show resolved Hide resolved

let drop = &(*lifetime).drop;
drop(owned)
amunra marked this conversation as resolved.
Show resolved Hide resolved
}

unsafe fn owned_drop(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) {
let owned = data.load(Ordering::Acquire);
amunra marked this conversation as resolved.
Show resolved Hide resolved
owned_drop_impl(owned);
}

static OWNED_VTABLE: Vtable = Vtable {
clone: owned_clone,
to_vec: owned_to_vec,
to_mut: owned_to_mut,
is_unique: owned_is_unique,
cheap_into_mut: owned_cheap_into_mut,
drop: owned_drop,
};

// ===== impl PromotableVtable =====

static PROMOTABLE_EVEN_VTABLE: Vtable = Vtable {
clone: promotable_even_clone,
to_vec: promotable_even_to_vec,
to_mut: promotable_even_to_mut,
is_unique: promotable_is_unique,
cheap_into_mut: promotable_is_unique,
drop: promotable_even_drop,
};

Expand All @@ -1027,6 +1167,7 @@ static PROMOTABLE_ODD_VTABLE: Vtable = Vtable {
to_vec: promotable_odd_to_vec,
to_mut: promotable_odd_to_mut,
is_unique: promotable_is_unique,
cheap_into_mut: promotable_is_unique,
drop: promotable_odd_drop,
};

Expand Down Expand Up @@ -1203,6 +1344,7 @@ static SHARED_VTABLE: Vtable = Vtable {
to_vec: shared_to_vec,
to_mut: shared_to_mut,
is_unique: shared_is_unique,
cheap_into_mut: shared_is_unique,
drop: shared_drop,
};

Expand Down
1 change: 1 addition & 0 deletions src/bytes_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1776,6 +1776,7 @@ static SHARED_VTABLE: Vtable = Vtable {
to_vec: shared_v_to_vec,
to_mut: shared_v_to_mut,
is_unique: shared_v_is_unique,
cheap_into_mut: shared_v_is_unique,
drop: shared_v_drop,
};

Expand Down
91 changes: 91 additions & 0 deletions tests/test_bytes.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#![warn(rust_2018_idioms)]

use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use std::usize;

Expand Down Expand Up @@ -1479,3 +1481,92 @@ fn split_to_empty_addr_mut() {
let _ = &empty_start[..];
let _ = &buf[..];
}

#[derive(Clone)]
struct OwnedTester<const L: usize> {
buf: [u8; L],
drop_count: Arc<AtomicUsize>,
}

impl<const L: usize> OwnedTester<L> {
fn new(buf: [u8; L], drop_count: Arc<AtomicUsize>) -> Self {
Self { buf, drop_count }
}
}

impl<const L: usize> AsRef<[u8]> for OwnedTester<L> {
fn as_ref(&self) -> &[u8] {
self.buf.as_slice()
}
}

impl<const L: usize> Drop for OwnedTester<L> {
fn drop(&mut self) {
self.drop_count.fetch_add(1, Ordering::AcqRel);
amunra marked this conversation as resolved.
Show resolved Hide resolved
}
}

#[test]
fn owned_basic() {
let buf: [u8; 5] = [1, 2, 3, 4, 5];
let drop_counter = Arc::new(AtomicUsize::new(0));
let owner = OwnedTester::new(buf, drop_counter.clone());
let b1 = Bytes::from_owner(owner);
assert_eq!(&buf[..], &b1[..]);
assert!(b1.is_unique());
let b2 = b1.clone();
assert!(!b1.is_unique());
assert!(!b2.is_unique());
assert_eq!(drop_counter.load(Ordering::Acquire), 0);
drop(b1);
assert_eq!(drop_counter.load(Ordering::Acquire), 0);
assert!(b2.is_unique());
let b3 = b2.slice(1..b2.len() - 1);
assert!(!b2.is_unique());
assert!(!b3.is_unique());
drop(b2);
assert_eq!(drop_counter.load(Ordering::Acquire), 0);
assert!(b3.is_unique());
drop(b3);
assert_eq!(drop_counter.load(Ordering::Acquire), 1);
}
Darksonn marked this conversation as resolved.
Show resolved Hide resolved

#[test]
fn owned_to_mut() {
let buf: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let drop_counter = Arc::new(AtomicUsize::new(0));
let owner = OwnedTester::new(buf, drop_counter.clone());
let b1 = Bytes::from_owner(owner);

// Holding an owner will fail converting to a BytesMut,
// even when the bytes instance is unique.
assert!(b1.is_unique());
let b1 = b1.try_into_mut().unwrap_err();
assert_eq!(drop_counter.load(Ordering::Acquire), 0);

// That said, it's still possible, just not cheap.
let bm1: BytesMut = b1.into();
let new_buf = &bm1[..];
assert_eq!(new_buf, &buf[..]);

assert_eq!(drop_counter.load(Ordering::Acquire), 1);
drop(bm1);
assert_eq!(drop_counter.load(Ordering::Acquire), 1);
}

#[test]
fn owned_to_vec() {
let buf: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let drop_counter = Arc::new(AtomicUsize::new(0));
let owner = OwnedTester::new(buf, drop_counter.clone());
let b1 = Bytes::from_owner(owner);
assert!(b1.is_unique());

let v1 = b1.to_vec();
assert!(b1.is_unique());
assert_eq!(&v1[..], &buf[..]);
assert_eq!(&v1[..], &b1[..]);

drop(b1);
assert_eq!(drop_counter.load(Ordering::Acquire), 1);
}