Using sharded locks instead of a global lock for Timers #6534
Conversation
The following code is used for the benchmark:

```rust
use std::{future::pending, time::Duration};
use tokio::time::timeout;

fn main() {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(8)
        .build()
        .unwrap();
    let _r = runtime.block_on(async {
        let mut handles = Vec::with_capacity(1024);
        for _ in 0..1024 {
            handles.push(tokio::spawn(async move {
                loop {
                    let h = timeout(Duration::from_millis(10), never_ready_job(1));
                    let _r = h.await;
                }
            }));
        }
        for handle in handles {
            handle.await.unwrap();
        }
    });
}

// A job that is never ready.
async fn never_ready_job(n: u64) -> u64 {
    pending::<()>().await;
    n * 2
}
```

The flamegraph of the master branch: [flamegraph image]

The flamegraph of this PR: [flamegraph image]

The above shows a significant reduction in lock contention.
tokio/src/runtime/time/entry.rs (outdated diff):

```rust
let shard_id =
    super::rand::thread_rng_n(self.driver.driver().time().inner.get_shard_size());
```
I'm wondering whether we could use the current worker id, when this is called from a runtime thread. What do you think?
It's worth a try; this is the difference between `sleep`s and `task`s. The timer wheel is more like an MPSC, while the `OwnedTasks` I optimized before is MPMC. That is to say, for lock contention in `sleep`, the most severe operation is `insert`, while `clear` contends much less. Therefore, making the create operation thread-local is worth trying for `sleep`s.

However, I still need to point out that this is only useful when the scenario involves creating `sleep`s and `timeout`s on a worker thread. When they must be created on a thread outside the worker threads, the effect is minimal. And if there are enough shards, the performance consumed by locking is already small enough, such as the 0.79% in the above benchmark.
I added the thread-local logic. As expected, creating a `sleep` on a worker thread now has almost no lock contention.
> I added the thread-local logic. As expected, creating a `sleep` on a worker thread now has almost no lock contention.

I assume the same would be said for creating a `timeout` on a worker thread? That it would be an almost lock-free operation?
Yes, it is almost lock-free.
tokio/src/runtime/time/mod.rs (outdated diff):

```rust
// Used by `TimerEntry`.
pub(crate) fn thread_rng_n(n: u32) -> u32 {
    thread_local! {
        static THREAD_RNG: Cell<FastRand> = Cell::new(FastRand::new());
    }
    // ...
}
```
Please do not create new thread locals. Some platforms have small limits on the number of thread locals that each process can have.
There should already be a random number generator in the runtime context somewhere.
I used `context::thread_rng_n` instead. But this introduces unexpected additional `time` feature requirements.

If we are not allowed to access `context::thread_rng_n` under the `time` feature, then our other option may be `std::thread::current().id().as_u64()`. However, the latter is unstable. Maybe the hash value of the thread id can address this problem.
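For illustration, a minimal sketch of the thread-id-hashing fallback mentioned above (the function name is hypothetical; on stable Rust, `ThreadId` implements `Hash`, so the unstable `as_u64()` is not needed):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical fallback: derive a shard id by hashing the current
// thread's `ThreadId`, avoiding the unstable `ThreadId::as_u64()`.
fn shard_id_from_thread_id(shard_size: u32) -> u32 {
    let mut hasher = DefaultHasher::new();
    std::thread::current().id().hash(&mut hasher);
    (hasher.finish() % u64::from(shard_size)) as u32
}
```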
tokio/src/runtime/time/mod.rs (outdated diff):

```rust
next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
// Finds out the min expiration time to park.
let mut next_wake: Option<u64> = None;
for id in 0..rt_handle.time().inner.get_shard_size() {
    // ...
}
```
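Presumably the scan continues along these lines (a sketch reconstructing the truncated diff above; `lock_sharded_wheel` and `next_expiration_time` are assumptions about the PR's internal API):

```rust
// Sketch: take the minimum next-expiration time across all shards.
let mut next_wake: Option<u64> = None;
for id in 0..rt_handle.time().inner.get_shard_size() {
    let lock = rt_handle.time().inner.lock_sharded_wheel(id);
    if let Some(exp) = lock.next_expiration_time() {
        next_wake = Some(next_wake.map_or(exp, |cur| cur.min(exp)));
    }
}
```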
Just a question: what does it mean for the system if `park_internal` came up with a new expiration time that wasn't the minimum? I don't know whether other threads can be running while this function is called, but if one can, couldn't it set a new, smaller timer on a shard the loop had already checked? I would have had the same question about the earlier version too, so this is not an important question. Thanks.
It's a good question.

When `park_internal` is executed, other worker threads may be running or sleeping on a condvar. Our concern is that if a timer with a smaller expiration time is registered, the entire runtime can still process it in a timely manner. Therefore, when `Timer`s are registered, we execute `unpark.unpark()` if necessary to wake up the worker thread; see tokio/src/runtime/time/mod.rs, line 360 in cdf9d99:

```rust
unpark.unpark();
```

Since this is a multi-threaded issue, both the current PR version and the master branch version have unnecessary wake-ups. That is to say, even if the current minimum expiration time has changed, the worker thread may still park on the driver with the older, larger expiration time generated by the last check. In this case, the worker thread will quickly be unparked by `unpark.unpark()`. This is similar to a spurious wakeup, but the cost is not high.
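Schematically, the decision is just a comparison against the promised wake-up time (a simplified sketch, not the PR's exact code; the names follow the discussion above):

```rust
use std::num::NonZeroU64;

// Sketch: decide whether registering a timer with deadline `new_deadline`
// must wake the parked driver. `next_wake` is the earliest time the driver
// promised to wake at; `None` means no promise was made, so always wake.
fn should_unpark(next_wake: Option<NonZeroU64>, new_deadline: u64) -> bool {
    match next_wake {
        Some(promised) => new_deadline < promised.get(),
        None => true,
    }
}
```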
If there are other running worker threads:
- If necessary, one of them will attempt to execute `unpark.unpark()`.
- Alternatively, the worker thread that is responsible for polling the driver does not park with a timeout; if there is no event in the driver, it returns from `park_internal` immediately.
Thanks for pointing me to `reregister`. So any time a task creates a timer entry that happens to become the new earliest time for the timer wheel to fire, the worker parked on the driver is woken up immediately. This PR doesn't change that fact.

And this design favors systems that are busy. The busier the system, the less likely a new timer entry is to be the earliest, as there would likely be other tasks already waiting on similar timeout events created earlier. Only when the system isn't busy is a new entry likely to be the wheel's next to fire, and that is exactly when a little inefficiency doesn't make any difference.
```rust
drop(lock);

waker_list.wake_all();
next_wake_up
```
I was confused that `waker_list.wake_all` is potentially called multiple times before `self.inner.set_next_wake` is called, not afterwards, but I admit I hadn't fully caught up my understanding of the implications in the multi-threaded case. Never mind; this makes sense to me now. Of course the temporary `waker_list` would be flushed.
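For readers following along, the collect-then-wake pattern under discussion looks roughly like this (a sketch under assumed types; the real wheel pops only expired entries rather than draining everything):

```rust
use std::sync::Mutex;
use std::task::Waker;

// Sketch: gather wakers while holding the shard lock, then drop the lock
// before calling `wake()`, so waking (which may schedule tasks) never
// runs under the lock.
fn fire_expired(shard: &Mutex<Vec<Waker>>) {
    let waker_list: Vec<Waker> = {
        let mut wheel = shard.lock().unwrap();
        wheel.drain(..).collect()
    }; // shard lock dropped here
    for waker in waker_list {
        waker.wake();
    }
}
```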
Overall LGTM.
tokio/src/runtime/time/entry.rs (outdated diff):

```rust
// Gets the shard id. If current thread is a worker thread, we use its worker index as a shard id.
// Otherwise, we use a random number generator to obtain the shard id.
cfg_rt! {
    fn get_shard_id(&self) -> u32 {
        let shard_size = self.driver.driver().time().inner.get_shard_size();
        let id = context::with_scheduler(|ctx| match ctx {
            Some(scheduler::Context::CurrentThread(_ctx)) => 0,
            #[cfg(feature = "rt-multi-thread")]
            Some(scheduler::Context::MultiThread(ctx)) => ctx.get_worker_index() as u32,
            #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
            Some(scheduler::Context::MultiThreadAlt(ctx)) => ctx.get_worker_index() as u32,
            _ => context::thread_rng_n(shard_size),
        });
        id % shard_size
    }
}
```
This sounds like a getter for a value that has already been created, but actually it is the logic for figuring out which shard to use for a new timer. Can we rename this, and perhaps move it to a stand-alone function instead of a method on `TimerEntry`? Otherwise I am worried that someone will call it and expect it to return the same value as what the `TimerShared` is using.
Thanks for this CR. I will make it not a method of `TimerEntry`, and rename it to `generate_shard_id`.
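Based on the diff above, the renamed stand-alone function would presumably look something like this (the exact signature is an assumption):

```rust
// Sketch: the same selection logic as above, as a free function that takes
// the shard count directly instead of reaching through a `TimerEntry` method.
cfg_rt! {
    fn generate_shard_id(shard_size: u32) -> u32 {
        let id = context::with_scheduler(|ctx| match ctx {
            Some(scheduler::Context::CurrentThread(_)) => 0,
            #[cfg(feature = "rt-multi-thread")]
            Some(scheduler::Context::MultiThread(ctx)) => ctx.get_worker_index() as u32,
            #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
            Some(scheduler::Context::MultiThreadAlt(ctx)) => ctx.get_worker_index() as u32,
            _ => context::thread_rng_n(shard_size),
        });
        id % shard_size
    }
}
```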
tokio/src/runtime/time/mod.rs (outdated diff):

```rust
/// The earliest time at which we promise to wake up without unparking.
next_wake: AtomicU64,
```
Perhaps it would result in clearer code to wrap this in an `AtomicOptionNonZeroU64` utility type? Or at least we should simplify `NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())`.
Yes, it should be simplified. I have added a new helper type called exactly `AtomicOptionNonZeroU64`. Compared to `unwrap_or_else` and `unwrap`, I prefer `match` here.
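For illustration, a minimal sketch of what such a helper type could look like (the names `AtomicOptionNonZeroU64` and `turn` come from the discussion; the PR's actual implementation may differ):

```rust
use std::num::NonZeroU64;
use std::sync::atomic::{AtomicU64, Ordering};

// Sketch: `0` encodes `None`, so an `Option<NonZeroU64>` fits in one AtomicU64.
struct AtomicOptionNonZeroU64(AtomicU64);

impl AtomicOptionNonZeroU64 {
    fn new(val: Option<NonZeroU64>) -> Self {
        Self(AtomicU64::new(Self::turn(val)))
    }

    fn store(&self, val: Option<NonZeroU64>) {
        self.0.store(Self::turn(val), Ordering::Relaxed);
    }

    fn load(&self) -> Option<NonZeroU64> {
        NonZeroU64::new(self.0.load(Ordering::Relaxed))
    }

    fn turn(val: Option<NonZeroU64>) -> u64 {
        match val {
            Some(v) => v.get(),
            None => 0,
        }
    }
}
```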
tokio/src/runtime/time/mod.rs (outdated diff):

```rust
fn store(&self, val: Option<u64>) {
    self.0.store(Self::turn(val), Ordering::Relaxed);
}
```
This stores `Some(1)` if passed `Some(0)`. Can you add a comment that explains why? Actually, it might be clearer to change this to `Option<NonZeroU64>` and move the call to `turn` to where you call this method.
I added the helper function `next_wake_time` instead, with some explanation about it.
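A sketch of what that helper might look like (reconstructed from the discussion above; the clamp exists because `0` is reserved to encode `None` in the backing atomic, and waking at tick 1 instead of 0 only causes an immediate wakeup):

```rust
use std::num::NonZeroU64;

// Sketch: convert a raw expiration tick to `Option<NonZeroU64>`,
// clamping a tick of 0 up to 1 since 0 encodes `None`.
fn next_wake_time(expiration_time: Option<u64>) -> Option<NonZeroU64> {
    expiration_time.and_then(|v| match NonZeroU64::new(v) {
        some @ Some(_) => some,
        None => NonZeroU64::new(1),
    })
}
```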
Looks good to me. Thanks.
Motivation

As part of addressing #6504, this PR attempts to implement a sharded approach to improve `timeout` and `sleep` performance in specific scenarios. Under high concurrency, when a large number of `timeout`s or `sleep`s are registered with the Timer, contention on the global lock is very severe. By sharding the lock, this performance issue can be significantly reduced.

Solution
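To illustrate the general idea (a schematic sketch, not the PR's actual types):

```rust
use std::sync::{Mutex, MutexGuard};

// Sketch: instead of one Mutex around a single timer wheel, keep one wheel
// per shard and lock only the shard a new timer maps to.
struct ShardedWheels<W> {
    shards: Box<[Mutex<W>]>,
}

impl<W> ShardedWheels<W> {
    fn lock_sharded_wheel(&self, shard_id: u32) -> MutexGuard<'_, W> {
        let index = shard_id as usize % self.shards.len();
        self.shards[index].lock().unwrap()
    }
}
```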