Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runtime: create reactor per worker #660

Merged
merged 5 commits into from Oct 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ appveyor = { repository = "carllerche/tokio", id = "s83yxhy9qeb58va7" }

[dependencies]
bytes = "0.4"
num_cpus = "1.8.0"
tokio-codec = { version = "0.1.0", path = "tokio-codec" }
tokio-current-thread = { version = "0.1.3", path = "tokio-current-thread" }
tokio-io = { version = "0.1.6", path = "tokio-io" }
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ extern crate bytes;
#[macro_use]
extern crate futures;
extern crate mio;
extern crate num_cpus;
extern crate tokio_current_thread;
extern crate tokio_io;
extern crate tokio_executor;
Expand Down
63 changes: 39 additions & 24 deletions src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ use runtime::{Inner, Runtime};
use reactor::Reactor;

use std::io;
use std::sync::Mutex;
use std::time::Duration;

use num_cpus;
use tokio_reactor;
use tokio_threadpool::Builder as ThreadPoolBuilder;
use tokio_threadpool::park::DefaultPark;
use tokio_timer::clock::{self, Clock};
use tokio_timer::timer::{self, Timer};

Expand Down Expand Up @@ -51,6 +52,9 @@ pub struct Builder {
/// Thread pool specific builder
threadpool_builder: ThreadPoolBuilder,

/// The number of worker threads
core_threads: usize,

/// The clock to use
clock: Clock,
}
Expand All @@ -61,11 +65,15 @@ impl Builder {
///
/// Configuration methods can be chained on the return value.
pub fn new() -> Builder {
let core_threads = num_cpus::get().max(1);

let mut threadpool_builder = ThreadPoolBuilder::new();
threadpool_builder.name_prefix("tokio-runtime-worker-");
threadpool_builder.pool_size(core_threads);

Builder {
threadpool_builder,
core_threads,
clock: Clock::new(),
}
}
Expand Down Expand Up @@ -110,6 +118,7 @@ impl Builder {
/// # }
/// ```
pub fn core_threads(&mut self, val: usize) -> &mut Self {
self.core_threads = val;
self.threadpool_builder.pool_size(val);
self
}
Expand Down Expand Up @@ -243,44 +252,50 @@ impl Builder {
/// # }
/// ```
pub fn build(&mut self) -> io::Result<Runtime> {
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
// TODO(stjepang): Once we remove the `threadpool_builder` method, remove this line too.
self.threadpool_builder.pool_size(self.core_threads);

// Get a handle to the clock for the runtime.
let clock1 = self.clock.clone();
let clock2 = clock1.clone();
let mut reactor_handles = Vec::new();
let mut timer_handles = Vec::new();
let mut timers = Vec::new();

let timers = Arc::new(Mutex::new(HashMap::<_, timer::Handle>::new()));
let t1 = timers.clone();
for _ in 0..self.core_threads {
// Create a new reactor.
let reactor = Reactor::new()?;
reactor_handles.push(reactor.handle());

// Spawn a reactor on a background thread.
let reactor = Reactor::new()?.background()?;
// Create a new timer.
let timer = Timer::new_with_now(reactor, self.clock.clone());
timer_handles.push(timer.handle());
timers.push(Mutex::new(Some(timer)));
}

// Get a handle to the reactor.
let reactor_handle = reactor.handle().clone();
// Get a handle to the clock for the runtime.
let clock = self.clock.clone();

// Get a handle to the first reactor.
let reactor = reactor_handles[0].clone();

let pool = self.threadpool_builder
.around_worker(move |w, enter| {
let timer_handle = t1.lock().unwrap()
.get(w.id()).unwrap()
.clone();
let index = w.id().to_usize();

tokio_reactor::with_default(&reactor_handle, enter, |enter| {
clock::with_default(&clock1, enter, |enter| {
timer::with_default(&timer_handle, enter, |_| {
tokio_reactor::with_default(&reactor_handles[index], enter, |enter| {
clock::with_default(&clock, enter, |enter| {
timer::with_default(&timer_handles[index], enter, |_| {
w.run();
});
})
});
})
.custom_park(move |worker_id| {
// Create a new timer
let timer = Timer::new_with_now(DefaultPark::new(), clock2.clone());

timers.lock().unwrap()
.insert(worker_id.clone(), timer.handle());
let index = worker_id.to_usize();

timer
timers[index]
.lock()
.unwrap()
.take()
.unwrap()
})
.build();

Expand Down
21 changes: 7 additions & 14 deletions src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ pub use self::builder::Builder;
pub use self::shutdown::Shutdown;
pub use self::task_executor::TaskExecutor;

use reactor::{Background, Handle};
use reactor::Handle;

use std::io;

Expand Down Expand Up @@ -152,8 +152,8 @@ pub struct Runtime {

#[derive(Debug)]
struct Inner {
/// Reactor running on a background thread.
reactor: Background,
/// A handle to one of the per-worker reactors.
reactor: Handle,

/// Task execution pool.
pool: threadpool::ThreadPool,
Expand Down Expand Up @@ -254,6 +254,7 @@ impl Runtime {
#[deprecated(since = "0.1.5", note = "use `reactor` instead")]
#[doc(hidden)]
pub fn handle(&self) -> &Handle {
#[allow(deprecated)]
self.reactor()
}

Expand All @@ -275,8 +276,9 @@ impl Runtime {
///
/// // use `reactor_handle`
/// ```
#[deprecated(since = "0.1.11", note = "there is now a reactor per worker thread")]
pub fn reactor(&self) -> &Handle {
self.inner().reactor.handle()
&self.inner().reactor
}

/// Return a handle to the runtime's executor.
Expand Down Expand Up @@ -424,16 +426,7 @@ impl Runtime {
/// [mod]: index.html
pub fn shutdown_on_idle(mut self) -> Shutdown {
let inner = self.inner.take().unwrap();

let inner = Box::new({
let pool = inner.pool;
let reactor = inner.reactor;

pool.shutdown_on_idle().and_then(|_| {
reactor.shutdown_on_idle()
})
});

let inner = inner.pool.shutdown_on_idle();
Shutdown { inner }
}

Expand Down
16 changes: 3 additions & 13 deletions src/runtime/shutdown.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,18 @@
use runtime::Inner;
use tokio_threadpool as threadpool;

use std::fmt;

use futures::{Future, Poll};

/// A future that resolves when the Tokio `Runtime` is shut down.
pub struct Shutdown {
pub(super) inner: Box<Future<Item = (), Error = ()> + Send>,
pub(super) inner: threadpool::Shutdown,
}

impl Shutdown {
pub(super) fn shutdown_now(inner: Inner) -> Self {
let inner = Box::new({
let pool = inner.pool;
let reactor = inner.reactor;

pool.shutdown_now().and_then(|_| {
reactor.shutdown_now()
.then(|_| {
Ok(())
})
})
});

let inner = inner.pool.shutdown_now();
Shutdown { inner }
}
}
Expand Down
8 changes: 8 additions & 0 deletions tokio-threadpool/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -892,4 +892,12 @@ impl WorkerId {
pub(crate) fn new(idx: usize) -> WorkerId {
WorkerId(idx)
}

/// Returns this identifier represented as an integer.
///
/// Worker identifiers in a single thread pool are guaranteed to correspond to integers in the
/// range `0..pool_size`.
pub fn to_usize(&self) -> usize {
self.0
}
}