Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Bytes::from_owner #742

Merged
merged 28 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 141 additions & 1 deletion src/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ pub(crate) struct Vtable {
pub to_mut: unsafe fn(&AtomicPtr<()>, *const u8, usize) -> BytesMut,
/// fn(data)
pub is_unique: unsafe fn(&AtomicPtr<()>) -> bool,
pub cheap_into_mut: unsafe fn(&AtomicPtr<()>) -> bool,
/// fn(data, ptr, len)
pub drop: unsafe fn(&mut AtomicPtr<()>, *const u8, usize),
}
Expand Down Expand Up @@ -200,6 +201,53 @@ impl Bytes {
}
}

/// Create [Bytes] with a buffer whose lifetime is controlled
/// via an explicit owner.
///
/// A common use case is to zero-copy construct from mapped memory.
///
/// ```ignore
amunra marked this conversation as resolved.
Show resolved Hide resolved
/// use bytes::Bytes;
/// use std::fs::File;
/// use memmap2::Map;
///
/// let file = File::open("upload_bundle.tar.gz")?;
/// let mmap = unsafe { Mmap::map(&file) }?;
/// let b = Bytes::from_owner(mmap);
/// ```
///
/// The `owner` will be transferred to the constructed [Bytes] object, which
/// will ensure it is dropped once all remaining clones of the constructed
/// object are dropped. The owner will then be responsible for dropping the
/// specified region of memory as part of its [Drop] implementation.
///
/// Note that converting [Bytes] constructed from an owner into a [BytesMut]
/// will always create a deep copy of the buffer into newly allocated memory.
pub fn from_owner<T: AsRef<[u8]>>(owner: T) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this need some kind of StableDeref bound to protect against weird edge cases where the referenced region changes over time?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The string crate had a similar issue: carllerche/string#9

I do wonder if Sync would be sufficient here... In general, we probably do need a Send bound as well since Bytes is Send

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sync wouldn't cover it since you can have a thread-safe implementation that still mutates internally.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As for StableDeref isn't calling as_ref just once, never moving the memory afterwards because it's Boxed and never calling any other method already guaranteeing the Bytes invariants and soundness in general?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm yeah only calling as_ref once may actually make this fine.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a safe way for it to still be UB, even with only calling it once. If I pass in a form of Arc<Mutex<_>>, and then afterwards change the internal buffer completely (maybe a reallocation), the captured pointer could be referring to invalid memory.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reference we get back from the as_ref needs to be valid for the remaining lifetime of the object though - it can't be swapped out without a call to an &mut method that would ensure the initial borrow is gone.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's sound as-is. You cannot write a safe thing where this breaks even with Arc/Mutex because if the buffer is inside the mutex, the implementer of AsRef can't return the buffer from as_ref since it borrows from the mutex guard.

As written, the as_ref call essentially borrows the owner field until we run the destructor. As long as we never make any mutable function calls on owner during that time, it's okay.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use std::sync::{Arc, Mutex};

struct Evil {
    within: Arc<Mutex<Vec<u8>>>,
}

impl AsRef<[u8]> for Evil {
    fn as_ref(&self) -> &[u8] {
        // error[E0515]: cannot return value referencing temporary value
        self.within.try_lock().unwrap().as_ref()
    }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm not mistaken the current API should guard against Owner exposing a thread local as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we may want our own trait instead of AsRef<[u8]> so we can add more methods in the future.

Copy link
Contributor Author

@amunra amunra Oct 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<opinionated>

A custom trait is how I had originally started implementing this, but gave up early on for a few reasons:

  • Worse ergonomics for custom trait:
    • Many things already implement AsRef<[u8]>, so Bytes::from_owner(mmap); is easy and straight-forward.
    • Conversely, a custom trait would require Bytes::from_custom(MyMmapWrapper::new(mmap)).
  • Safety:
    • AsRef<[u8]> can be called during construction and only once. It's a trick that makes the from_owner API safe.
    • Any additional APIs would - in practice - need to be called during the lifetime of bytes. As an example trait CustomOwner { fn try_into_mut(self) -> Result<impl CustomOwnerMut, Self>; } (unless I'm wrong) would make it into an unsafe trait.
    • What's missing from the current approach is zero-copy conversion to BytesMut.
      The implementor of a trait that that exposed this would need to be very careful here to respect the Vec<u8> semantics. Context: If Bytes is not unique, the existing code creates a newly allocated Vec<u8> copy of the referenced memory. Unless due care is taken - in the specific case of an Mmap - this would likely lead an implementor to erroneously assume that they should create an MmapMut. This would be wrong, especially if the mmapped section is backed by a file as the resulting behaviour would sometimes update the contents of a file and sometimes not, depending on Bytes's internal refcount.
    • In general, also allowing any other calls to the object during its lifetime also allows for scope for the object to invalidate the slice returned from .as_ref() rendering that initial call unsafe too, since there's no borrow checker here to police usage.
  • Feature creep:
    • By all means, I think a custom trait approach might work in the future for a fn from_custom <T: CustomOwner> and as a good way of exposing more of the vtable functionality, but it is not where the current pain point in the API is and there are many hurdles to get past that as has been shown in previous efforts to tackle meta: Expose Bytes vtable #437.

The full-fat custom use case would mostly cover - as far as I can tell - mixing Bytes created and managed via multiple custom memory allocators (QuestDB would certainly care!), but I suspect there's a more elegant way to handle those anyway.

</opinionated>

.. that said, if you guys prefer that route I'm happy to amend my PR in that direction.

amunra marked this conversation as resolved.
Show resolved Hide resolved
let owned = Box::into_raw(Box::new(Owned {
lifetime: OwnedLifetime {
ref_cnt: AtomicUsize::new(1),
drop: owned_box_and_drop::<T>,
},
owner,
}));

// Now that the ownership is moved to the Box its memory location is pinned.
// It's therefore safe to access the memory region, which will remain valid,
// even if the slice returned refers to memory within the owner object itself.
let owned_ref = unsafe { &(*owned) };
let buf = owned_ref.owner.as_ref();
amunra marked this conversation as resolved.
Show resolved Hide resolved
let ptr = buf.as_ptr();
let len = buf.len();

Bytes {
ptr,
len,
data: AtomicPtr::new(owned as _),
vtable: &OWNED_VTABLE,
}
}

/// Returns the number of bytes contained in this `Bytes`.
///
/// # Examples
Expand Down Expand Up @@ -536,6 +584,9 @@ impl Bytes {
/// If `self` is not unique for the entire original buffer, this will fail
/// and return self.
///
/// This will also always fail if the buffer was constructed via
/// [from_owner](Bytes::from_owner).
///
/// # Examples
///
/// ```
Expand All @@ -545,7 +596,7 @@ impl Bytes {
/// assert_eq!(bytes.try_into_mut(), Ok(BytesMut::from(&b"hello"[..])));
/// ```
pub fn try_into_mut(self) -> Result<BytesMut, Bytes> {
if self.is_unique() {
if unsafe { (self.vtable.cheap_into_mut)(&self.data) } {
amunra marked this conversation as resolved.
Show resolved Hide resolved
Ok(self.into())
} else {
Err(self)
Expand Down Expand Up @@ -986,6 +1037,7 @@ const STATIC_VTABLE: Vtable = Vtable {
to_vec: static_to_vec,
to_mut: static_to_mut,
is_unique: static_is_unique,
cheap_into_mut: static_is_unique,
drop: static_drop,
};

Expand All @@ -1012,13 +1064,99 @@ unsafe fn static_drop(_: &mut AtomicPtr<()>, _: *const u8, _: usize) {
// nothing to drop for &'static [u8]
}

// ===== impl OwnedVtable =====

#[repr(C)]
struct OwnedLifetime {
ref_cnt: AtomicUsize,
drop: unsafe fn(*mut ()),
}

#[repr(C)]
struct Owned<T> {
lifetime: OwnedLifetime,
owner: T,
}

unsafe fn owned_box_and_drop<T>(ptr: *mut ()) {
let b: Box<Owned<T>> = Box::from_raw(ptr as _);
drop(b);
}

unsafe fn owned_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
let owned = data.load(Ordering::Acquire);
amunra marked this conversation as resolved.
Show resolved Hide resolved
let ref_cnt = &(*owned.cast::<OwnedLifetime>()).ref_cnt;
let old_cnt = ref_cnt.fetch_add(1, Ordering::Relaxed);
if old_cnt > usize::MAX >> 1 {
crate::abort()
}

Bytes {
ptr,
len,
data: AtomicPtr::new(owned as _),
vtable: &OWNED_VTABLE,
}
}

unsafe fn owned_to_vec(_data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec<u8> {
let slice = slice::from_raw_parts(ptr, len);
slice.to_vec()
}

unsafe fn owned_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
let bytes_mut = BytesMut::from_vec(owned_to_vec(data, ptr, len));
owned_drop_impl(data.load(Ordering::Acquire));
amunra marked this conversation as resolved.
Show resolved Hide resolved
bytes_mut
}

unsafe fn owned_is_unique(data: &AtomicPtr<()>) -> bool {
let owned = data.load(Ordering::Acquire);
amunra marked this conversation as resolved.
Show resolved Hide resolved
let ref_cnt = &(*owned.cast::<OwnedLifetime>()).ref_cnt;
ref_cnt.load(Ordering::Relaxed) == 1
amunra marked this conversation as resolved.
Show resolved Hide resolved
}

unsafe fn owned_cheap_into_mut(_data: &AtomicPtr<()>) -> bool {
// Since the memory's ownership is tied to an external owner
// it is never zero-copy to create a BytesMut.
false
}

unsafe fn owned_drop_impl(owned: *mut ()) {
let lifetime = owned.cast::<OwnedLifetime>();
let ref_cnt = &(*lifetime).ref_cnt;

let old_cnt = ref_cnt.fetch_sub(1, Ordering::Acquire);
if old_cnt != 1 {
return;
}
amunra marked this conversation as resolved.
Show resolved Hide resolved

let drop = &(*lifetime).drop;
drop(owned)
amunra marked this conversation as resolved.
Show resolved Hide resolved
}

unsafe fn owned_drop(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) {
let owned = data.load(Ordering::Acquire);
amunra marked this conversation as resolved.
Show resolved Hide resolved
owned_drop_impl(owned);
}

static OWNED_VTABLE: Vtable = Vtable {
clone: owned_clone,
to_vec: owned_to_vec,
to_mut: owned_to_mut,
is_unique: owned_is_unique,
cheap_into_mut: owned_cheap_into_mut,
drop: owned_drop,
};

// ===== impl PromotableVtable =====

static PROMOTABLE_EVEN_VTABLE: Vtable = Vtable {
clone: promotable_even_clone,
to_vec: promotable_even_to_vec,
to_mut: promotable_even_to_mut,
is_unique: promotable_is_unique,
cheap_into_mut: promotable_is_unique,
drop: promotable_even_drop,
};

Expand All @@ -1027,6 +1165,7 @@ static PROMOTABLE_ODD_VTABLE: Vtable = Vtable {
to_vec: promotable_odd_to_vec,
to_mut: promotable_odd_to_mut,
is_unique: promotable_is_unique,
cheap_into_mut: promotable_is_unique,
drop: promotable_odd_drop,
};

Expand Down Expand Up @@ -1203,6 +1342,7 @@ static SHARED_VTABLE: Vtable = Vtable {
to_vec: shared_to_vec,
to_mut: shared_to_mut,
is_unique: shared_is_unique,
cheap_into_mut: shared_is_unique,
drop: shared_drop,
};

Expand Down
1 change: 1 addition & 0 deletions src/bytes_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1776,6 +1776,7 @@ static SHARED_VTABLE: Vtable = Vtable {
to_vec: shared_v_to_vec,
to_mut: shared_v_to_mut,
is_unique: shared_v_is_unique,
cheap_into_mut: shared_v_is_unique,
drop: shared_v_drop,
};

Expand Down
95 changes: 95 additions & 0 deletions tests/test_bytes.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#![warn(rust_2018_idioms)]

use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::cell::Cell;
use std::rc::Rc;

use std::usize;

Expand Down Expand Up @@ -1479,3 +1481,96 @@ fn split_to_empty_addr_mut() {
let _ = &empty_start[..];
let _ = &buf[..];
}

// N.B.: Using `Rc` and `Cell` rather than `Arc` and `usize`
// since this forces the type to be !Send + !Sync, ensuring
// in this test that the owner remains a generic T.
#[derive(Clone)]
struct OwnedTester<const L: usize> {
buf: [u8; L],
drop_count: Rc<Cell<usize>>,
}

impl<const L: usize> OwnedTester<L> {
fn new(buf: [u8; L], drop_count: Rc<Cell<usize>>) -> Self {
Self { buf, drop_count }
}
}

impl<const L: usize> AsRef<[u8]> for OwnedTester<L> {
fn as_ref(&self) -> &[u8] {
self.buf.as_slice()
}
}

impl<const L: usize> Drop for OwnedTester<L> {
fn drop(&mut self) {
let current = self.drop_count.get();
self.drop_count.set(current + 1)
}
}

#[test]
fn owned_basic() {
let buf: [u8; 5] = [1, 2, 3, 4, 5];
let drop_counter = Rc::new(Cell::new(0));
let owner = OwnedTester::new(buf, drop_counter.clone());
let b1 = Bytes::from_owner(owner);
assert_eq!(&buf[..], &b1[..]);
assert!(b1.is_unique());
let b2 = b1.clone();
assert!(!b1.is_unique());
assert!(!b2.is_unique());
assert_eq!(drop_counter.get(), 0);
drop(b1);
assert_eq!(drop_counter.get(), 0);
assert!(b2.is_unique());
let b3 = b2.slice(1..b2.len() - 1);
assert!(!b2.is_unique());
assert!(!b3.is_unique());
drop(b2);
assert_eq!(drop_counter.get(), 0);
assert!(b3.is_unique());
drop(b3);
assert_eq!(drop_counter.get(), 1);
}

#[test]
fn owned_to_mut() {
let buf: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let drop_counter = Rc::new(Cell::new(0));
let owner = OwnedTester::new(buf, drop_counter.clone());
let b1 = Bytes::from_owner(owner);

// Holding an owner will fail converting to a BytesMut,
// even when the bytes instance is unique.
assert!(b1.is_unique());
let b1 = b1.try_into_mut().unwrap_err();
assert_eq!(drop_counter.get(), 0);

// That said, it's still possible, just not cheap.
let bm1: BytesMut = b1.into();
let new_buf = &bm1[..];
assert_eq!(new_buf, &buf[..]);

assert_eq!(drop_counter.get(), 1);
drop(bm1);
assert_eq!(drop_counter.get(), 1);
}

#[test]
fn owned_to_vec() {
let buf: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let drop_counter = Rc::new(Cell::new(0));
let owner = OwnedTester::new(buf, drop_counter.clone());
let b1 = Bytes::from_owner(owner);
assert!(b1.is_unique());

let v1 = b1.to_vec();
assert!(b1.is_unique());
assert_eq!(&v1[..], &buf[..]);
assert_eq!(&v1[..], &b1[..]);

drop(b1);
assert_eq!(drop_counter.get(), 1);
}