Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler multithreading #6821

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions src/libstd/rt/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use ops::Drop;
use kinds::Owned;
use rt::sched::{Scheduler, Coroutine};
use rt::local::Local;
use rt::rtio::EventLoop;
use unstable::intrinsics::{atomic_xchg, atomic_load};
use util::Void;
use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
Expand Down Expand Up @@ -158,7 +159,7 @@ impl<T> PortOne<T> {

// Switch to the scheduler to put the ~Task into the Packet state.
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |task| {
do sched.deschedule_running_task_and_then |sched, task| {
unsafe {
// Atomically swap the task pointer into the Packet state, issuing
// an acquire barrier to prevent reordering of the subsequent read
Expand All @@ -172,9 +173,15 @@ impl<T> PortOne<T> {
}
STATE_ONE => {
// Channel is closed. Switch back and check the data.
// NB: We have to drop back into the scheduler event loop here
// instead of switching immediately back or we could end up
// triggering infinite recursion on the scheduler's stack.
let task: ~Coroutine = cast::transmute(task_as_state);
let sched = Local::take::<Scheduler>();
sched.resume_task_immediately(task);
let task = Cell(task);
do sched.event_loop.callback {
let sched = Local::take::<Scheduler>();
sched.resume_task_immediately(task.take());
}
}
_ => util::unreachable()
}
Expand Down Expand Up @@ -614,5 +621,15 @@ mod test {
}
}
}

#[test]
fn recv_a_lot() {
// Regression test that we don't run out of stack in scheduler context
do run_in_newsched_task {
let (port, chan) = stream();
for 10000.times { chan.send(()) }
for 10000.times { port.recv() }
}
}
}

9 changes: 5 additions & 4 deletions src/libstd/rt/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,30 +85,31 @@ impl Local for IoFactoryObject {

#[cfg(test)]
mod test {
use rt::test::*;
use rt::sched::Scheduler;
use rt::uv::uvio::UvEventLoop;
use super::*;

#[test]
fn thread_local_scheduler_smoke_test() {
let scheduler = ~UvEventLoop::new_scheduler();
let scheduler = ~new_test_uv_sched();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
}

#[test]
fn thread_local_scheduler_two_instances() {
let scheduler = ~UvEventLoop::new_scheduler();
let scheduler = ~new_test_uv_sched();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
let scheduler = ~UvEventLoop::new_scheduler();
let scheduler = ~new_test_uv_sched();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
}

#[test]
fn borrow_smoke_test() {
let scheduler = ~UvEventLoop::new_scheduler();
let scheduler = ~new_test_uv_sched();
Local::put(scheduler);
unsafe {
let _scheduler: *mut Scheduler = Local::unsafe_borrow();
Expand Down
3 changes: 3 additions & 0 deletions src/libstd/rt/message_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

//! A concurrent queue that supports multiple producers and a
//! single consumer.

use container::Container;
use kinds::Owned;
use vec::OwnedVector;
Expand Down
20 changes: 13 additions & 7 deletions src/libstd/rt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ mod work_queue;
/// A parallel queue.
mod message_queue;

/// A parallel data structure for tracking sleeping schedulers.
mod sleeper_list;

/// Stack segments and caching.
mod stack;

Expand Down Expand Up @@ -145,12 +148,17 @@ pub mod thread_local_storage;
pub fn start(_argc: int, _argv: **u8, crate_map: *u8, main: ~fn()) -> int {

use self::sched::{Scheduler, Coroutine};
use self::work_queue::WorkQueue;
use self::uv::uvio::UvEventLoop;
use self::sleeper_list::SleeperList;

init(crate_map);

let loop_ = ~UvEventLoop::new();
let mut sched = ~Scheduler::new(loop_);
let work_queue = WorkQueue::new();
let sleepers = SleeperList::new();
let mut sched = ~Scheduler::new(loop_, work_queue, sleepers);
sched.no_sleep = true;
let main_task = ~Coroutine::new(&mut sched.stack_pool, main);

sched.enqueue_task(main_task);
Expand Down Expand Up @@ -221,20 +229,18 @@ fn test_context() {
use rt::uv::uvio::UvEventLoop;
use cell::Cell;
use rt::local::Local;
use rt::test::new_test_uv_sched;

assert_eq!(context(), OldTaskContext);
do run_in_bare_thread {
assert_eq!(context(), GlobalContext);
let mut sched = ~UvEventLoop::new_scheduler();
let mut sched = ~new_test_uv_sched();
let task = ~do Coroutine::new(&mut sched.stack_pool) {
assert_eq!(context(), TaskContext);
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then() |task| {
do sched.deschedule_running_task_and_then() |sched, task| {
assert_eq!(context(), SchedulerContext);
let task = Cell(task);
do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(task.take());
}
sched.enqueue_task(task);
}
};
sched.enqueue_task(task);
Expand Down
11 changes: 11 additions & 0 deletions src/libstd/rt/rtio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use rt::uv::uvio;
// XXX: ~object doesn't work currently so these are some placeholder
// types to use instead
pub type EventLoopObject = uvio::UvEventLoop;
pub type RemoteCallbackObject = uvio::UvRemoteCallback;
pub type IoFactoryObject = uvio::UvIoFactory;
pub type RtioTcpStreamObject = uvio::UvTcpStream;
pub type RtioTcpListenerObject = uvio::UvTcpListener;
Expand All @@ -26,10 +27,20 @@ pub trait EventLoop {
fn run(&mut self);
fn callback(&mut self, ~fn());
fn callback_ms(&mut self, ms: u64, ~fn());
fn remote_callback(&mut self, ~fn()) -> ~RemoteCallbackObject;
/// The asynchronous I/O services. Not all event loops may provide one
fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject>;
}

pub trait RemoteCallback {
/// Trigger the remote callback. Note that the number of times the callback
/// is run is not guaranteed. All that is guaranteed is that, after calling 'fire',
/// the callback will be called at least once, but multiple callbacks may be coalesced
/// and callbacks may be called more often than requested. Destruction also triggers the
/// callback.
fn fire(&mut self);
}

pub trait IoFactory {
fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError>;
fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError>;
Expand Down
Loading