
feat(oneshot channel): ensure msg won't be dropped on sender side when send returns ok #6558

Merged


@wenym1 wenym1 commented May 14, 2024

Motivation

Related: risingwavelabs/risingwave#6617.

With oneshot::Sender, it is natural to assume that once tx.send(msg) returns Ok(()), msg has been stored in the channel and will either be consumed by rx.await or dropped together with rx. Either way, msg should not be dropped on the tx side.

However, in the current implementation, the assumption does not hold. It might happen that, even though tx.send(msg) returns with Ok(()), the msg is still dropped on the sender side.

Sender is currently defined as:

struct Sender<T> {
    inner: Option<Arc<Inner<T>>>,
}

In the current implementation, tx.send(msg) stores msg in inner. Until rx consumes it, Inner owns msg. Inner is shared between the two halves through a reference-counted Arc, so when the reference count of Arc<Inner> reaches 0, msg is dropped along with Inner. In the following execution flow, however, msg is dropped on the sender side even though tx.send(msg) returns Ok(()).

pub fn send(mut self, t: T) -> Result<(), T> {
    let inner = self.inner.take().unwrap();

    inner.value.with_mut(|ptr| unsafe {
        *ptr = Some(t);
    });

    if !inner.complete() {
        unsafe {
            return Err(inner.consume_value().unwrap());
        }
    }

    // Assume rx has not been dropped yet: the ref count of `Arc<Inner>` is 2.
    // -> rx is dropped here (on another thread): the ref count becomes 1.

    Ok(())
    // -> `inner` goes out of scope here: the ref count drops to 0 and `msg`
    //    is dropped together with `Inner`, even though we return `Ok(())`.
}

Solution

The problem is caused by the implicit drop of the variable inner. We cannot tell whether dropping inner decrements the Arc ref count to 0 and therefore drops Inner.

Therefore, this PR uses Arc::into_inner to consume the variable inner explicitly, so that we atomically decrement the ref count and learn whether Inner is about to be dropped. If it is, and msg has not been consumed by rx yet, we take msg out and return Err(msg).
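
As a rough illustration of the Arc::into_inner idea described above, here is a minimal std-only sketch. The `Inner` struct and the `release_handle` helper are hypothetical stand-ins invented for this example; they are not tokio's internals and not the code in this PR.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for the channel's shared state; it only holds the message.
struct Inner<T> {
    value: Option<T>,
}

/// Gives up one `Arc` handle. Returns the message only if this was the last
/// handle and the message is still stored, so a caller could return `Err(msg)`.
fn release_handle<T>(handle: Arc<Inner<T>>) -> Option<T> {
    // `Arc::into_inner` decrements the count and, for the last handle only,
    // returns the inner value instead of dropping it.
    Arc::into_inner(handle).and_then(|inner| inner.value)
}

fn main() {
    let tx_side = Arc::new(Inner { value: Some(String::from("msg")) });
    let rx_side = Arc::clone(&tx_side);

    // The receiver-side handle goes first: another handle is alive, nothing is returned.
    assert!(release_handle(rx_side).is_none());
    // The sender-side handle is the last one: the message is recovered, not dropped.
    assert_eq!(release_handle(tx_side).as_deref(), Some("msg"));
}
```

Unlike an implicit drop at the end of a scope, the caller learns from the return value whether it was the last owner.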

@github-actions github-actions bot added the R-loom-sync Run loom sync tests on this PR label May 14, 2024
@Darksonn Darksonn added A-tokio Area: The main tokio crate M-sync Module: tokio/sync labels May 14, 2024
@Darksonn
Contributor

Can you please add a loom test that catches the issue you have mentioned? It would need to go somewhere in this folder. Preferably the test should be as simple as possible.

tokio/Cargo.toml (outdated, resolved)
@wenym1
Contributor Author

wenym1 commented May 14, 2024

> Can you please add a loom test that catches the issue you have mentioned? It would need to go somewhere in this folder. Preferably the test should be as simple as possible.

I have added a loom test. The test logic is as simple as follows.

let (tx, rx) = oneshot::channel();

// thread 1
if let Err(msg) = tx.send(msg) {
    std::mem::forget(msg);
}

// thread 2
drop(rx);

We can assert that msg can only be dropped in thread 2: in thread 1, tx.send(msg) either returns Ok(()) or returns Err(msg), in which case we call std::mem::forget(msg), and drop must not be called when it returns Ok(()).
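
The detection mechanism behind that assertion can be sketched with the standard library alone. This is only an illustration, assuming a thread-local role flag and a global counter; `IS_RX`, `TX_SIDE_DROPS` and `Msg` are names made up for this sketch, and the real test runs under loom against the actual channel.

```rust
use std::cell::Cell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

thread_local! {
    // Each thread records whether it plays the receiver role; the default is true.
    static IS_RX: Cell<bool> = Cell::new(true);
}

// Counts drops of `Msg` that happened on a thread that is not the receiver.
static TX_SIDE_DROPS: AtomicUsize = AtomicUsize::new(0);

struct Msg;

impl Drop for Msg {
    fn drop(&mut self) {
        if !IS_RX.with(|is_rx| is_rx.get()) {
            TX_SIDE_DROPS.fetch_add(1, Ordering::SeqCst);
        }
    }
}

fn main() {
    // Sender-role thread: a message handed back to it is forgotten, so `Drop` never runs.
    thread::spawn(|| {
        IS_RX.with(|is_rx| is_rx.set(false));
        std::mem::forget(Msg);
    })
    .join()
    .unwrap();
    assert_eq!(TX_SIDE_DROPS.load(Ordering::SeqCst), 0);

    // Dropping on the main (receiver-role) thread is the allowed case.
    drop(Msg);
    assert_eq!(TX_SIDE_DROPS.load(Ordering::SeqCst), 0);
}
```

Any drop that runs on the sender-role thread would bump the counter, which is the situation the loom test is meant to rule out.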

// we call `std::mem::forget(msg)`, so that
// `drop` is not expected to be called in the
// tx thread.
assert!(*is_rx.borrow());
Contributor Author

If the newly added code in oneshot.rs is removed, we will hit this assertion.

@wenym1 wenym1 requested a review from Darksonn May 14, 2024 11:40
@Darksonn
Contributor

I suspect this could be solved by having impl Drop for Receiver call consume_value if the call to self.close() observes the VALUE_SET bit.

@wenym1
Contributor Author

wenym1 commented May 14, 2024

> I suspect this could be solved by having impl Drop for Receiver call consume_value if the call to self.close() observes the VALUE_SET bit.

It's feasible. I have updated the code. Thanks for the suggestion! This PR no longer adds any dependency. @Darksonn
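
For readers following along, the receiver-side approach can be modelled roughly as below. This is a simplified sketch under stated assumptions, not tokio's implementation: it uses a Mutex instead of an unsafe cell, plain fetch_or instead of tokio's state transitions, and the bit values and type layout are invented here. Only the names VALUE_SET and the idea of consuming the value in Receiver's Drop come from the discussion above.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

// State bits for this model; the values are assumptions made for the sketch.
const VALUE_SET: usize = 0b01;
const CLOSED: usize = 0b10;

struct Inner<T> {
    state: AtomicUsize,
    value: Mutex<Option<T>>,
}

struct Sender<T>(Arc<Inner<T>>);
struct Receiver<T>(Arc<Inner<T>>);

fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let inner = Arc::new(Inner { state: AtomicUsize::new(0), value: Mutex::new(None) });
    (Sender(Arc::clone(&inner)), Receiver(inner))
}

impl<T> Sender<T> {
    fn send(self, t: T) -> Result<(), T> {
        *self.0.value.lock().unwrap() = Some(t);
        // Publish the value; the previous state says whether the receiver closed first.
        let prev = self.0.state.fetch_or(VALUE_SET, Ordering::AcqRel);
        if prev & CLOSED != 0 {
            // The receiver closed before the value was published: hand the message back.
            let returned = self.0.value.lock().unwrap().take();
            return Err(returned.expect("value was just stored"));
        }
        Ok(())
    }
}

impl<T> Drop for Receiver<T> {
    fn drop(&mut self) {
        let prev = self.0.state.fetch_or(CLOSED, Ordering::AcqRel);
        if prev & VALUE_SET != 0 {
            // A value was sent but never received: drop it here, on the receiver side.
            drop(self.0.value.lock().unwrap().take());
        }
    }
}

fn main() {
    // Receiver dropped after a successful send: the receiver's `Drop` disposes of the value.
    let (tx, rx) = channel();
    assert!(tx.send(1u8).is_ok());
    drop(rx);

    // Receiver dropped first: the message comes back to the sender as `Err`.
    let (tx, rx) = channel();
    drop(rx);
    assert_eq!(tx.send(2u8), Err(2));
}
```

In this model the value is already gone by the time the last Arc handle is released, so it no longer matters which side releases it.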

tokio/src/sync/oneshot.rs (outdated, resolved)
tokio/src/sync/tests/loom_oneshot.rs (outdated, resolved)
Comment on lines 180 to 182
let rx_thread_join_handle = thread::spawn(move || {
    drop(rx);
});
Contributor

You can just call drop(rx) on the main thread. Spawning threads is very expensive in loom tests.

Contributor Author

I gave it a try.

If I simply call drop(rx) on the main thread and revert the changes in oneshot.rs, the assertion is not hit; if I drop it in a spawned thread, the assertion is hit.

I suspect that only dropping rx in a separate thread covers this case in loom.

Contributor

Do you hit the assertion with LOOM_MAX_PREEMPTIONS=2? That's what we run with in CI.

Contributor Author

Oops, with it set to 2, the assertion is hit.

Should CONTRIBUTING.md be updated? I ran the loom test following its guide, which uses:

LOOM_MAX_PREEMPTIONS=1 LOOM_MAX_BRANCHES=10000 RUSTFLAGS="--cfg loom -C debug_assertions" \

Contributor

It takes a lot longer to run with the option set to 2, and most bugs can be caught with the value set to 1. It may be better to have your test use the loom test builder to explicitly set the max preemptions to 2.

Contributor Author

LGTM. Have updated the code.

@wenym1 wenym1 requested a review from Darksonn May 15, 2024 10:26
Contributor

@Darksonn Darksonn left a comment

Thank you.

@Darksonn Darksonn merged commit 18e048d into tokio-rs:master May 15, 2024
77 checks passed