sync: fix notify_waiters notifying sequential awaits #5404
Conversation
tokio/src/sync/notify.rs
Outdated
```rust
// Safety: the waiter is still not inserted
let initial_notify_waiters_calls =
    unsafe { (*waiter.get()).notify_waiters_calls };

// Optimistically check if notify_waiters has been called
// after the future was created.
if get_num_notify_waiters_calls(curr) != initial_notify_waiters_calls {
    *state = Done;
    return Poll::Ready(());
}
```
This additional check is basically free and should help to avoid waiting for the lock.
```
@@ -222,8 +222,11 @@ struct Waiter {
    /// Waiting task's waker.
    waker: Option<Waker>,

    /// `true` if the notification has been assigned to this waiter.
    notified: Option<NotificationType>,
```
Renamed this field because it was confusing to me, given the similarly named future.
It seems like there is a compilation failure with some choice of feature flags:
Yeah, thanks, I fixed it. There is something complicated going on with atomic ordering causing a regression. I will look into this more later.
Turned out to be a logic bug unrelated to atomics. This is ready for a review @Darksonn. Sorry for so many scattered changes, but I found some comments inaccurate and decided to change them as well.
Let's say we have two pending `Notified` futures. Can the assert fail under this change?
If we have only two futures, then this assert cannot fail, because both of them are notified in a single batch. However, this reasoning falls apart if we consider more pending futures. If we have an array of 33 not yet polled `Notified` futures, consider the following order of operations:

```rust
futs[1].enable();
for i in 2..33 {
    futs[i].enable();
}
futs[0].enable();
// Waiters queue: head -> [futs[0], ..., futs[2] | futs[1]]
//                         0 ^             31 ^    32 ^

tokio::spawn(async move {
    notify.notify_waiters();
});

let res1 = futs[0].poll();
let res2 = futs[1].poll();

// this can fail
assert!(res1.is_pending() || res2.is_ready()); // if res1 is ready, then res2 must also be ready
```

It can fail because `futs[1]` lands in the second batch: after the first 32 waiters are woken and the lock is released, `futs[0]` can already be ready while `futs[1]` is still waiting. Under this change the waiters end up in the batches in a different order, so to construct the same scenario the futures have to be enabled like this:

```rust
futs[0].enable();
for i in 2..33 {
    futs[i].enable();
}
futs[1].enable();
// ...
```
Oh, I was thinking of the case where there are also other waiters.
Then it is possible to fail this assertion both under this change and on the master branch. Please let me know if this should be fixed together in this PR; I think it will require more work then.
It seems like an important property to me. I don't care whether it is fixed in this PR or in a separate one.
It should be fixed now. Please take a look @Darksonn.
Did you double-check that the new test fails without the fix?
It does not fail because the code on the current master traverses the queue in reverse order, but it fails under 572db8d, where this test was added without the actual fix.
I've added a test variant which fails under the current master; I double-checked that.
As a side note: the whole reason behind the issue this PR is trying to fix is that we have to wake up waiters in batches to avoid deadlocks. As far as I can tell from previous discussion, a deadlock can happen if a waker has a drop impl which tries to acquire the mutex again, for example by calling one of the `Notify` methods that takes the waiters lock.
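For illustration, here is a minimal sketch of such a waker (my own example, not code from this PR): a waker type whose `Drop` impl calls back into the same `Notify`. If wakers like this were dropped while the waiters lock was still held, the call in `drop` would try to take the lock again.

```rust
use std::sync::Arc;
use std::task::{Wake, Waker};
use tokio::sync::Notify;

// A waker that touches the same `Notify` when it is dropped. If
// `notify_waiters` dropped stale wakers while still holding its
// internal waiters lock, this `Drop` impl would try to take that
// lock again and deadlock.
struct NotifyOnDrop {
    notify: Arc<Notify>,
}

impl Wake for NotifyOnDrop {
    fn wake(self: Arc<Self>) {}
}

impl Drop for NotifyOnDrop {
    fn drop(&mut self) {
        // Re-enters the `Notify` internals (acquires the waiters lock).
        self.notify.notify_one();
    }
}

fn make_waker(notify: Arc<Notify>) -> Waker {
    Waker::from(Arc::new(NotifyOnDrop { notify }))
}
```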
Ok, that sounds good. I don't have time to do a full review right now, but I'm happy with the tests and the claimed behavior. Please file bugs for wakers that are dropped/woken while a mutex is held.
Hmm, something complicated is going on. For example, it seems like one can still observe it not being atomic in certain interleavings. I'm not sure what to do about this. I will need to think about it.
That's a good call. I think one way to fix it would be to make every call to `notify_waiters` move all of the waiters it is going to wake into a separate list first.
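A toy sketch of that idea under simplifying assumptions (the intrusive waiter list is replaced by a `VecDeque` of wakers, and all names are made up), just to show the shape of the approach:

```rust
use std::collections::VecDeque;
use std::sync::Mutex;
use std::task::Waker;

// Illustrative only: tokio's real implementation uses an intrusive
// linked list of waiters, not a VecDeque of wakers.
struct ToyNotify {
    waiters: Mutex<VecDeque<Waker>>,
}

impl ToyNotify {
    fn notify_waiters(&self) {
        // Move every currently queued waiter out in one step while the
        // lock is held. Waiters registered after this point land in the
        // fresh, empty queue and are not affected by this call.
        let to_notify = std::mem::take(&mut *self.waiters.lock().unwrap());

        // Wake outside the lock, so a waker that re-enters ToyNotify
        // (e.g. from a Drop impl) cannot deadlock on the mutex.
        for waker in to_notify {
            waker.wake();
        }
    }
}
```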
Yeah, adding even more special cases to handle the thing I mentioned seems too error prone. Moving all of the to-be-notified waiters to another list seems reasonable. We might be able to do it by giving each waiter a raw pointer to the list it is stored in? (It has to be able to remove itself from the list in case the waiter is destroyed during the `notify_waiters` call.)
Hmm, but to keep those raw pointers valid, we would have to either box the list or set each pointer only after the list has reached its final address. I think it can be avoided by creating a pinned version of a linked list in every `notify_waiters` call. A waiter can then determine whether the list it belongs to is such a pinned list or the `Notify`'s own list.
I imagined that we would traverse the entire list, yes. Some sort of solution where the waiters can remove themselves without knowing which list they are in is an interesting idea. I don't think we have support for that in our current linked list implementation, though.
I went ahead with the idea of giving each waiter a raw pointer. The idea of a pinned linked list is definitely a possibility for future improvement, but the amount of unsafe code it would require is a bit too much for me right now. I think there is still some work to do to make the aliasing correct. I will get back to it later and let you know when it's ready for another review.
```
@@ -222,8 +228,9 @@ struct Waiter {
    /// Waiting task's waker.
    waker: Option<Waker>,

    /// `true` if the notification has been assigned to this waiter.
    notified: Option<NotificationType>,
    notify_waiters_queue: Option<NonNull<WaitList>>,
```
As far as I can tell, the invariant of this field is that it is `None` when the waiter is in the `Notify`'s own list, and `Some` when it is in some other list. Please document that.
True, I've added a note about this.
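For illustration, the documented invariant could look roughly like this; the wording and the `WaitList` stand-in are mine, not the actual note added in the PR:

```rust
use std::ptr::NonNull;

// Stand-in for tokio's intrusive wait list type, only so this
// fragment is self-contained.
struct WaitList;

struct Waiter {
    // ... other fields ...

    /// The list created by `notify_waiters` that this waiter has been
    /// moved into, if any.
    ///
    /// Invariant: `None` while the waiter is queued in the `Notify`'s
    /// own wait list, `Some` once it has been moved into a temporary
    /// list owned by a `notify_waiters` call.
    notify_waiters_queue: Option<NonNull<WaitList>>,
}
```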
```rust
// See if the node was notified but not received. In this case, if
// the notification was triggered via `notify_one`, it must be sent
// to the next waiter.
//
// Safety: with the entry removed from the linked list, there can be
// no concurrent access to the entry
if matches!(
    unsafe { (*waiter.get()).notification },
    Some(NotificationType::OneWaiter)
) {
    if let Some(waker) = notify_locked(&mut waiters, &notify.state, notify_state) {
        drop(waiters);
        waker.wake();
    }
}
```
This can also cause weird things ... I know this was already here, and maybe the answer is to just not change it.

Imagine this sequence of actions:

1. Create two `Notified` futures and poll them.
2. Call `notify_one` twice.
3. Create two new `Notified` futures.
4. Drop the original two `Notified` futures.
5. Both of the futures from step 3 complete.

This is weird since the futures that completed were created after the `notify_one` calls, and the fact that there are two means that we can't just pretend that this is due to the permit being stored.
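For concreteness, here is a sketch of that sequence using `tokio_test::task::spawn`; it is my illustration of the described behavior rather than a test from this PR, and the futures from step 3 are polled once so that they sit in the waiter queue:

```rust
use tokio::sync::Notify;
use tokio_test::task::spawn;

fn weird_permit_handoff() {
    let notify = Notify::new();

    // 1. Create two `Notified` futures and poll them.
    let mut fut1 = spawn(notify.notified());
    let mut fut2 = spawn(notify.notified());
    assert!(fut1.poll().is_pending());
    assert!(fut2.poll().is_pending());

    // 2. Call `notify_one` twice; the permits are assigned to the
    //    queued futures, but they have not been polled again yet.
    notify.notify_one();
    notify.notify_one();

    // 3. Create two new `Notified` futures and queue them up.
    let mut fut3 = spawn(notify.notified());
    let mut fut4 = spawn(notify.notified());
    assert!(fut3.poll().is_pending());
    assert!(fut4.poll().is_pending());

    // 4. Drop the original two futures; each one passes its
    //    unconsumed permit on to the next waiter in the queue.
    drop(fut1);
    drop(fut2);

    // 5. Both futures from step 3 now complete, even though they were
    //    created after the `notify_one` calls.
    assert!(fut3.poll().is_ready());
    assert!(fut4.poll().is_ready());
}
```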
I thought about this for a while and I think that's a tough problem. The issue is that we would like to think about cancelling a `Notified` future as simply removing it from the queue, like it's written in the docs. In reality, a future is removed much earlier, and at drop we only simulate what would have happened if the future had been removed before receiving the permit. In this code, it is done by passing the permit to another waiting future.

However, we can pass the permit from a future A to a waiting future B if and only if the state of `Notify` did not reach `NOTIFIED` at any point between A receiving the permit and B being pushed to the queue. We could check for that by enumerating all events of calling `notify_one` and enabling `Notified` futures, and keeping track of the last event number at which the state was `NOTIFIED`. In `Drop` it would suffice to check whether the future being dropped isn't older than this number.
Unfortunately, the problem is deeper than that, and your example demonstrates this. We can drop multiple futures, and simulating that one of them was removed from the queue before receiving the permit will affect how we simulate the same for others.
I like to think about it the following way: enumerate every event of enabling a `Notified` future and every `notify_one` call. Then we can define the number of waiters after each event, and the events at which the state was `NOTIFIED`. To check whether we can transfer the permit from a dropped future, we should track the last event number at which the state was `NOTIFIED`, which is the greatest such number. The issue is that dropping one future changes these numbers for the others, and we would have to account for that in the `Drop` impl.
To sum up, we would need a complicated data structure to effectively track whether we should transfer the permit from a dropped future, for example a deque or some sort of segment tree, which probably is not feasible here. I think the best we can do is the heuristic I've mentioned earlier, but it doesn't even fix your case, so in my opinion we should probably leave this code as it is.
I believe that we ran into a similar problem during a discussion about adding a condition variable to Tokio. To me, it seems that the best option is to just document that dropping a future that has consumed a permit from a `notify_one` call will pass on its permit, as if `notify_one` was called in the destructor.
Sure, I will be happy to open a PR improving the docs.
tokio/src/sync/notify.rs
Outdated
```rust
let waiters_to_notify = UnsafeCell::new(std::mem::take(&mut *waiters));
pin!(waiters_to_notify);
```
The `LinkedList` type is `Unpin`, so pinning it is a no-op.
Yeah, it felt awkward to use `Pin` here...

My intention was only to shadow the original variable binding to prevent accidentally moving it. I am not sure what the best way is to express that this variable won't change its address. Anyway, I replaced the `pin!` macro with manual shadowing.
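As a tiny standalone illustration of the shadowing trick (names made up): rebinding the variable to a reference to itself makes the original owner unnameable, so the value cannot be moved afterwards and its address stays stable.

```rust
use std::cell::UnsafeCell;

fn shadowing_instead_of_pin() {
    let batch = UnsafeCell::new(vec![1, 2, 3]);
    // Shadow `batch` with a shared reference to it. The original
    // binding can no longer be moved out of, so the address observed
    // through raw pointers below remains stable.
    let batch = &batch;

    let ptr: *mut Vec<i32> = batch.get();
    // Safety: nothing else accesses the UnsafeCell contents here.
    unsafe { (*ptr).push(4) };
    assert_eq!(unsafe { (*ptr).len() }, 4);
}
```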
tokio/src/sync/notify.rs
Outdated
```rust
for mut waiter in unsafe { &*waiters_to_notify.get() } {
    // Safety: we hold the `waiters` lock.
    let waiter = unsafe { waiter.as_mut() };

    // Safety: address of `waiters_to_notify` is not null.
    waiter.notify_waiters_queue =
        unsafe { Some(NonNull::new_unchecked(waiters_to_notify.get())) };
}
```
I would prefer to create a raw pointer once and reuse it. This has clearer semantics aliasing-wise.
Refactored this part.
I've noticed a slight degradation in performance when all waiters fit into a single batch. In such a case there is no need to traverse the list and set all the pointers, as the waiters will be removed immediately, so the pointers are now set only when more than one batch is needed.

Just for the record: although there are many new loom tests, almost all of them complete instantaneously; only one of them takes noticeably longer.
Closing in favor of #5458.
Motivation
Closes: #5396
`tokio::sync::Notify` has an internal waiters queue locked behind a mutex. `Notify::notify_waiters` is a function that notifies all waiters from this queue. It first acquires the lock, then removes waiters from the queue and wakes them up. However, the whole process is done in batches of 32 waiters to avoid deadlocks: the function has to release the lock before waking up a batch and re-acquire it before proceeding to the next one. In this short timespan another thread can acquire the lock and modify the queue and internal state. For example, it is possible to insert a new waiter into the queue, which will then be included in one of the later batches. The current implementation of `notify_waiters` does not account for that, which leads to the unexpected behavior described in #5396.

Solution

`Notify` internally tracks the number of calls to `notify_waiters`, and each time a new `Notified` future is created, the current value of this counter is stored inside the future. The solution is to utilize this counter and compare its value with the number of the current `notify_waiters` call inside that function. This way, futures created during the `notify_waiters` execution can be filtered out and kept in the queue.
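To illustrate the idea, here is a minimal, self-contained model of the counter comparison (toy types and names, without the real locking, batching, and wakers):

```rust
// Each queued waiter remembers the value of the `notify_waiters`
// counter at the time its future was created.
struct ToyWaiter {
    created_at_call: u64,
}

struct ToyQueue {
    notify_waiters_calls: u64,
    waiters: Vec<ToyWaiter>,
}

impl ToyQueue {
    fn create_waiter(&mut self) {
        self.waiters.push(ToyWaiter {
            created_at_call: self.notify_waiters_calls,
        });
    }

    fn notify_waiters(&mut self) {
        let this_call = self.notify_waiters_calls;
        self.notify_waiters_calls += 1;

        // Wake (here: simply remove) only the waiters created before
        // this call. A waiter pushed concurrently stores a counter
        // value greater than `this_call` and stays in the queue.
        self.waiters.retain(|w| w.created_at_call > this_call);
    }
}
```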