Skip to content

Commit

Permalink
threadpool: refactor pool shutdown (#769)
Browse files Browse the repository at this point in the history
  • Loading branch information
Stjepan Glavina authored Nov 20, 2018
1 parent 9c03704 commit 3235749
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 125 deletions.
47 changes: 27 additions & 20 deletions tokio-threadpool/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use callback::Callback;
use config::{Config, MAX_WORKERS};
use park::{BoxPark, BoxedPark, DefaultPark};
use sender::Sender;
use shutdown::ShutdownTrigger;
use pool::{Pool, MAX_BACKUP};
use thread_pool::ThreadPool;
use worker::{self, Worker, WorkerId};
Expand Down Expand Up @@ -396,31 +396,38 @@ impl Builder {
/// # }
/// ```
pub fn build(&self) -> ThreadPool {
let mut workers = vec![];

trace!("build; num-workers={}", self.pool_size);

for i in 0..self.pool_size {
let id = WorkerId::new(i);
let park = (self.new_park)(&id);
let unpark = park.unpark();
// Create the worker entry list
let workers: Arc<[worker::Entry]> = {
let mut workers = vec![];

workers.push(worker::Entry::new(park, unpark));
}
for i in 0..self.pool_size {
let id = WorkerId::new(i);
let park = (self.new_park)(&id);
let unpark = park.unpark();

// Create the pool
let pool = Arc::new(
Pool::new(
workers.into_boxed_slice(),
self.max_blocking,
self.config.clone()));
workers.push(worker::Entry::new(park, unpark));
}

// Wrap with `Sender`
let sender = Some(Sender {
pool
});
workers.into()
};

// Create a trigger that will clean up resources on shutdown.
//
// The `Pool` contains a weak reference to it, while `Worker`s and the `ThreadPool` contain
// strong references.
let trigger = Arc::new(ShutdownTrigger::new(workers.clone()));

// Create the pool
let pool = Arc::new(Pool::new(
workers,
Arc::downgrade(&trigger),
self.max_blocking,
self.config.clone(),
));

ThreadPool { sender }
ThreadPool::new2(pool, trigger)
}
}

Expand Down
1 change: 0 additions & 1 deletion tokio-threadpool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ mod notifier;
mod pool;
mod sender;
mod shutdown;
mod shutdown_task;
mod task;
mod thread_pool;
mod worker;
Expand Down
89 changes: 25 additions & 64 deletions tokio-threadpool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,17 @@ use self::backup::Handoff;
use self::backup_stack::BackupStack;

use config::Config;
use shutdown_task::ShutdownTask;
use shutdown::ShutdownTrigger;
use task::{Task, Blocking};
use worker::{self, Worker, WorkerId};

use futures::Poll;
use futures::task::AtomicTask;

use std::cell::Cell;
use std::num::Wrapping;
use std::sync::atomic::Ordering::{Acquire, AcqRel};
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::sync::{Arc, Weak};
use std::thread;

use crossbeam_utils::CachePadded;
Expand Down Expand Up @@ -57,8 +56,14 @@ pub(crate) struct Pool {
// A worker is a thread that is processing the work queue and polling
// futures.
//
// This will *usually* be a small number.
pub workers: Box<[worker::Entry]>,
// The number of workers will *usually* be small.
pub workers: Arc<[worker::Entry]>,

// Completes the shutdown process when the `ThreadPool` and all `Worker`s get dropped.
//
// When spawning a new `Worker`, this weak reference is upgraded and handed out to the new
// thread.
pub trigger: Weak<ShutdownTrigger>,

// Backup thread state
//
Expand All @@ -74,18 +79,18 @@ pub(crate) struct Pool {
// are pending blocking capacity.
blocking: Blocking,

// Task notified when the worker shuts down
pub shutdown_task: ShutdownTask,

// Configuration
pub config: Config,
}

const TERMINATED: usize = 1;

impl Pool {
/// Create a new `Pool`
pub fn new(workers: Box<[worker::Entry]>, max_blocking: usize, config: Config) -> Pool {
pub fn new(
workers: Arc<[worker::Entry]>,
trigger: Weak<ShutdownTrigger>,
max_blocking: usize,
config: Config,
) -> Pool {
let pool_size = workers.len();
let total_size = max_blocking + pool_size;

Expand All @@ -112,12 +117,10 @@ impl Pool {
sleep_stack: CachePadded::new(worker::Stack::new()),
num_workers: AtomicUsize::new(0),
workers,
trigger,
backup,
backup_stack,
blocking,
shutdown_task: ShutdownTask {
task: AtomicTask::new(),
},
config,
};

Expand Down Expand Up @@ -191,10 +194,6 @@ impl Pool {
self.terminate_sleeping_workers();
}

pub fn is_shutdown(&self) -> bool {
self.num_workers.load(Acquire) == TERMINATED
}

/// Called by `Worker` as it tries to enter a sleeping state. Before it
/// sleeps, it must push itself onto the sleep stack. This enables other
/// threads to see it when signaling work.
Expand All @@ -205,12 +204,6 @@ impl Pool {
pub fn terminate_sleeping_workers(&self) {
use worker::Lifecycle::Signaled;

// First, set the TERMINATED flag on `num_workers`. This signals that
// whichever thread transitions the count to zero must notify the
// shutdown task.
let prev = self.num_workers.fetch_or(TERMINATED, AcqRel);
let notify = prev == 0;

trace!(" -> shutting down workers");
// Wakeup all sleeping workers. They will wake up, see the state
// transition, and terminate.
Expand All @@ -226,40 +219,6 @@ impl Pool {
while let Ok(Some(backup_id)) = self.backup_stack.pop(&self.backup, true) {
self.backup[backup_id.0].signal_stop();
}

if notify {
self.shutdown_task.notify();
}
}

/// Track that a worker thread has started
///
/// If `Err` is returned, then the thread is not permitted to started.
fn thread_started(&self) -> Result<(), ()> {
let mut curr = self.num_workers.load(Acquire);

loop {
if curr & TERMINATED == TERMINATED {
return Err(());
}

let actual = self.num_workers.compare_and_swap(
curr, curr + 2, AcqRel);

if curr == actual {
return Ok(());
}

curr = actual;
}
}

fn thread_stopped(&self) {
let prev = self.num_workers.fetch_sub(2, AcqRel);

if prev == TERMINATED | 2 {
self.shutdown_task.notify();
}
}

pub fn poll_blocking_capacity(&self, task: &Arc<Task>) -> Poll<(), ::BlockingError> {
Expand Down Expand Up @@ -385,10 +344,14 @@ impl Pool {
return;
}

if self.thread_started().is_err() {
let trigger = match self.trigger.upgrade() {
// The pool is shutting down.
return;
}
None => {
// The pool is shutting down.
return;
}
Some(t) => t,
};

let mut th = thread::Builder::new();

Expand Down Expand Up @@ -416,7 +379,7 @@ impl Pool {
debug_assert!(pool.backup[backup_id.0].is_running());

// TODO: Avoid always cloning
let worker = Worker::new(worker_id, backup_id, pool.clone());
let worker = Worker::new(worker_id, backup_id, pool.clone(), trigger.clone());

// Run the worker. If the worker transitioned to a "blocking"
// state, then `is_blocking` will be true.
Expand Down Expand Up @@ -463,8 +426,6 @@ impl Pool {
if let Some(ref f) = pool.config.before_stop {
f();
}

pool.thread_stopped();
});

if let Err(e) = res {
Expand Down
65 changes: 55 additions & 10 deletions tokio-threadpool/src/shutdown.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use pool::Pool;
use sender::Sender;
use worker;

use futures::{Future, Poll, Async};
use futures::task::AtomicTask;

use std::sync::{Arc, Mutex};

/// Future that resolves when the thread pool is shutdown.
///
Expand All @@ -16,12 +18,25 @@ use futures::{Future, Poll, Async};
/// [`shutdown_now`]: struct.ThreadPool.html#method.shutdown_now
#[derive(Debug)]
pub struct Shutdown {
pub(crate) sender: Sender,
inner: Arc<Mutex<Inner>>,
}

/// Shared state between `Shutdown` and `ShutdownTrigger`.
///
/// This is used for notifying the `Shutdown` future when `ShutdownTrigger` gets dropped.
#[derive(Debug)]
struct Inner {
/// The task to notify when the threadpool completes the shutdown process.
task: AtomicTask,
/// `true` if the threadpool has been shut down.
completed: bool,
}

impl Shutdown {
fn pool(&self) -> &Pool {
&*self.sender.pool
pub(crate) fn new(trigger: &ShutdownTrigger) -> Shutdown {
Shutdown {
inner: trigger.inner.clone(),
}
}
}

Expand All @@ -30,14 +45,44 @@ impl Future for Shutdown {
type Error = ();

fn poll(&mut self) -> Poll<(), ()> {
use futures::task;
let inner = self.inner.lock().unwrap();

self.pool().shutdown_task.task.register_task(task::current());
if !inner.completed {
inner.task.register();
Ok(Async::NotReady)
} else {
Ok(().into())
}
}
}

/// When dropped, cleans up threadpool's resources and completes the shutdown process.
#[derive(Debug)]
pub(crate) struct ShutdownTrigger {
inner: Arc<Mutex<Inner>>,
workers: Arc<[worker::Entry]>,
}

if !self.pool().is_shutdown() {
return Ok(Async::NotReady);
unsafe impl Send for ShutdownTrigger {}
unsafe impl Sync for ShutdownTrigger {}

impl ShutdownTrigger {
pub(crate) fn new(workers: Arc<[worker::Entry]>) -> ShutdownTrigger {
ShutdownTrigger {
inner: Arc::new(Mutex::new(Inner {
task: AtomicTask::new(),
completed: false,
})),
workers,
}
}
}

Ok(().into())
impl Drop for ShutdownTrigger {
fn drop(&mut self) {
// Notify the task interested in shutdown.
let mut inner = self.inner.lock().unwrap();
inner.completed = true;
inner.task.notify();
}
}
12 changes: 0 additions & 12 deletions tokio-threadpool/src/shutdown_task.rs

This file was deleted.

Loading

0 comments on commit 3235749

Please sign in to comment.