diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d3c1de36f1..1eedadff69 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -5,7 +5,7 @@ on: [push, pull_request] env: qemu-version: 8.2.0 rust-toolchain: nightly-2024-05-02 - arceos-apps: '68054e8' + arceos-apps: 'b25b7e2' jobs: unit-test: diff --git a/Cargo.lock b/Cargo.lock index eb9a02423d..8db9882c99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -510,6 +510,7 @@ dependencies = [ "axconfig", "axhal", "axtask", + "bitmaps", "cfg-if", "crate_interface", "kernel_guard", @@ -1022,9 +1023,9 @@ checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" [[package]] name = "memory_addr" -version = "0.3.1" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f769efcf10b9dfb4c913bebb409cda77b1a3f072b249bf5465e250bcb30eb49" +checksum = "2ca25419c2b34080d526d6836a53dcab129767ab6a9904587c87265ae6d41aa9" [[package]] name = "minimal-lexical" @@ -1314,8 +1315,9 @@ checksum = "e6e36312fb5ddc10d08ecdc65187402baba4ac34585cb9d1b78522ae2358d890" [[package]] name = "scheduler" version = "0.1.0" -source = "git+https://github.com/arceos-org/scheduler.git?tag=v0.1.0#c8d25d9aed146dca28dc8987afd229b52c20361a" +source = "git+https://github.com/arceos-org/scheduler.git?branch=num_tasks#415a620347722cb734fa440d9103065690a5853b" dependencies = [ + "kspin", "linked_list", ] diff --git a/api/axfeat/Cargo.toml b/api/axfeat/Cargo.toml index 6e246b36df..17121025f8 100644 --- a/api/axfeat/Cargo.toml +++ b/api/axfeat/Cargo.toml @@ -13,7 +13,7 @@ documentation = "https://arceos-org.github.io/arceos/axfeat/index.html" default = [] # Multicore -smp = ["axhal/smp", "axruntime/smp", "kspin/smp"] +smp = ["axhal/smp", "axruntime/smp", "axtask/smp", "kspin/smp"] # Floating point/SIMD fp_simd = ["axhal/fp_simd"] diff --git a/modules/axruntime/Cargo.toml b/modules/axruntime/Cargo.toml index b904fe6b89..c26237b718 100644 --- a/modules/axruntime/Cargo.toml +++ b/modules/axruntime/Cargo.toml @@ -12,7 +12,7 @@ documentation = "https://arceos-org.github.io/arceos/axruntime/index.html" [features] default = [] -smp = ["axhal/smp"] +smp = ["axhal/smp", "axtask?/smp"] irq = ["axhal/irq", "axtask?/irq", "percpu", "kernel_guard"] tls = ["axhal/tls", "axtask?/tls"] alloc = ["axalloc"] diff --git a/modules/axtask/Cargo.toml b/modules/axtask/Cargo.toml index 25fe7fb371..ec5a16ab3e 100644 --- a/modules/axtask/Cargo.toml +++ b/modules/axtask/Cargo.toml @@ -13,12 +13,20 @@ documentation = "https://arceos-org.github.io/arceos/axtask/index.html" default = [] multitask = [ - "dep:axconfig", "dep:percpu", "dep:kspin", "dep:lazyinit", "dep:memory_addr", - "dep:scheduler", "dep:timer_list", "kernel_guard", "dep:crate_interface", + "dep:axconfig", + "dep:percpu", + "dep:kspin", + "dep:lazyinit", + "dep:memory_addr", + "dep:scheduler", + "dep:timer_list", + "kernel_guard", + "dep:crate_interface", ] irq = [] tls = ["axhal/tls"] preempt = ["irq", "percpu?/preempt", "kernel_guard/preempt"] +smp = ["kspin/smp"] sched_fifo = ["multitask"] sched_rr = ["multitask", "preempt"] @@ -29,6 +37,7 @@ test = ["percpu?/sp-naive"] [dependencies] cfg-if = "1.0" log = "0.4.21" +bitmaps = { version = "3.2.1", default-features = false } axhal = { workspace = true } axconfig = { workspace = true, optional = true } percpu = { version = "0.1", optional = true } @@ -38,7 +47,8 @@ memory_addr = { version = "0.3", optional = true } timer_list = { version = "0.1", optional = true } kernel_guard = { version = "0.1", optional = 
true } crate_interface = { version = "0.1", optional = true } -scheduler = { git = "https://github.com/arceos-org/scheduler.git", tag = "v0.1.0", optional = true } +# scheduler = { git = "https://github.com/arceos-org/scheduler.git", tag = "v0.1.0", optional = true } +scheduler = { git = "https://github.com/arceos-org/scheduler.git", branch = "num_tasks", optional = true } [dev-dependencies] rand = "0.8" diff --git a/modules/axtask/src/api.rs b/modules/axtask/src/api.rs index 156ba9d3b6..cd99702dc2 100644 --- a/modules/axtask/src/api.rs +++ b/modules/axtask/src/api.rs @@ -2,7 +2,7 @@ use alloc::{string::String, sync::Arc}; -pub(crate) use crate::run_queue::{AxRunQueue, RUN_QUEUE}; +pub(crate) use crate::run_queue::{current_run_queue, select_run_queue}; #[doc(cfg(feature = "multitask"))] pub use crate::task::{CurrentTask, TaskId, TaskInner}; @@ -77,6 +77,8 @@ pub fn init_scheduler() { /// Initializes the task scheduler for secondary CPUs. pub fn init_scheduler_secondary() { crate::run_queue::init_secondary(); + #[cfg(feature = "irq")] + crate::timers::init(); } /// Handles periodic timer ticks for the task manager. @@ -86,13 +88,18 @@ pub fn init_scheduler_secondary() { #[doc(cfg(feature = "irq"))] pub fn on_timer_tick() { crate::timers::check_events(); - RUN_QUEUE.lock().scheduler_timer_tick(); + current_run_queue().scheduler_timer_tick(); } /// Adds the given task to the run queue, returns the task reference. pub fn spawn_task(task: TaskInner) -> AxTaskRef { let task_ref = task.into_arc(); - RUN_QUEUE.lock().add_task(task_ref.clone()); + let _kernel_guard = kernel_guard::NoPreemptIrqSave::new(); + crate::select_run_queue( + #[cfg(feature = "smp")] + task_ref.clone(), + ) + .add_task(task_ref.clone()); task_ref } @@ -103,7 +110,7 @@ pub fn spawn_raw(f: F, name: String, stack_size: usize) -> AxTaskRef where F: FnOnce() + Send + 'static, { - spawn_task(TaskInner::new(f, name, stack_size)) + spawn_task(TaskInner::new(f, name, stack_size, None)) } /// Spawns a new task with the default parameters. @@ -129,13 +136,13 @@ where /// /// [CFS]: https://en.wikipedia.org/wiki/Completely_Fair_Scheduler pub fn set_priority(prio: isize) -> bool { - RUN_QUEUE.lock().set_current_priority(prio) + current_run_queue().set_current_priority(prio) } /// Current task gives up the CPU time voluntarily, and switches to another /// ready task. pub fn yield_now() { - RUN_QUEUE.lock().yield_current(); + current_run_queue().yield_current() } /// Current task is going to sleep for the given duration. @@ -150,14 +157,14 @@ pub fn sleep(dur: core::time::Duration) { /// If the feature `irq` is not enabled, it uses busy-wait instead. pub fn sleep_until(deadline: axhal::time::TimeValue) { #[cfg(feature = "irq")] - RUN_QUEUE.lock().sleep_until(deadline); + current_run_queue().sleep_until(deadline); #[cfg(not(feature = "irq"))] axhal::time::busy_wait_until(deadline); } /// Exits the current task. pub fn exit(exit_code: i32) -> ! { - RUN_QUEUE.lock().exit_current(exit_code) + current_run_queue().exit_current(exit_code) } /// The idle task routine. diff --git a/modules/axtask/src/lib.rs b/modules/axtask/src/lib.rs index 64702634f5..489ac0a95a 100644 --- a/modules/axtask/src/lib.rs +++ b/modules/axtask/src/lib.rs @@ -42,6 +42,7 @@ cfg_if::cfg_if! 
{ extern crate log; extern crate alloc; + #[macro_use] mod run_queue; mod task; mod task_ext; diff --git a/modules/axtask/src/run_queue.rs b/modules/axtask/src/run_queue.rs index 1592b7637e..231a8adfd1 100644 --- a/modules/axtask/src/run_queue.rs +++ b/modules/axtask/src/run_queue.rs @@ -1,37 +1,190 @@ use alloc::collections::VecDeque; use alloc::sync::Arc; -use kspin::SpinNoIrq; +use core::mem::MaybeUninit; + +use bitmaps::Bitmap; use lazyinit::LazyInit; use scheduler::BaseScheduler; +use axhal::cpu::this_cpu_id; + use crate::task::{CurrentTask, TaskState}; use crate::{AxTaskRef, Scheduler, TaskInner, WaitQueue}; -// TODO: per-CPU -pub(crate) static RUN_QUEUE: LazyInit> = LazyInit::new(); +macro_rules! percpu_static { + ($($name:ident: $ty:ty = $init:expr),* $(,)?) => { + $( + #[percpu::def_percpu] + static $name: $ty = $init; + )* + }; +} -// TODO: per-CPU -static EXITED_TASKS: SpinNoIrq> = SpinNoIrq::new(VecDeque::new()); +percpu_static! { + RUN_QUEUE: LazyInit = LazyInit::new(), + EXITED_TASKS: VecDeque = VecDeque::new(), + WAIT_FOR_EXIT: WaitQueue = WaitQueue::new(), + IDLE_TASK: LazyInit = LazyInit::new(), +} -static WAIT_FOR_EXIT: WaitQueue = WaitQueue::new(); +/// An array of references to run queues, one for each CPU, indexed by cpu_id. +/// +/// This static variable holds references to the run queues for each CPU in the system. +/// +/// # Safety +/// +/// Access to this variable is marked as `unsafe` because it contains `MaybeUninit` references, +/// which require careful handling to avoid undefined behavior. The array should be fully +/// initialized before being accessed to ensure safe usage. +static mut RUN_QUEUES: [MaybeUninit<&'static mut AxRunQueue>; axconfig::SMP] = + [ARRAY_REPEAT_VALUE; axconfig::SMP]; +const ARRAY_REPEAT_VALUE: MaybeUninit<&'static mut AxRunQueue> = MaybeUninit::uninit(); +/// Returns a reference to the current run queue. +/// +/// ## Safety +/// +/// This function returns a static reference to the current run queue, which +/// is inherently unsafe. It assumes that the `RUN_QUEUE` has been properly +/// initialized and is not accessed concurrently in a way that could cause +/// data races or undefined behavior. +/// +/// ## Returns +/// +/// A static reference to the current run queue. +#[inline] +pub(crate) fn current_run_queue() -> &'static mut AxRunQueue { + unsafe { RUN_QUEUE.current_ref_mut_raw() } +} -#[percpu::def_percpu] -static IDLE_TASK: LazyInit = LazyInit::new(); +/// Selects the run queue index based on a CPU set bitmap, minimizing the number of tasks. +/// +/// This function filters the available run queues based on the provided `cpu_set` and +/// selects the one with the fewest tasks. The selected run queue's index (cpu_id) is returned. +/// +/// ## Arguments +/// +/// * `cpu_set` - A bitmap representing the CPUs that are eligible for task execution. +/// +/// ## Returns +/// +/// The index (cpu_id) of the selected run queue. +/// +/// ## Panics +/// +/// This function will panic if there is no available run queue that matches the CPU set. +/// +#[cfg(feature = "smp")] +#[inline] +fn select_run_queue_index(cpu_set: Bitmap<{ axconfig::SMP }>) -> usize { + unsafe { + RUN_QUEUES + .iter() + .filter(|rq| cpu_set.get(rq.assume_init_ref().cpu_id())) + .min_by_key(|rq| rq.assume_init_ref().num_tasks()) + .expect("No available run queue that matches the CPU set") + .assume_init_ref() + .cpu_id() + } +} +/// Retrieves a `'static` reference to the run queue corresponding to the given index. 
+/// +/// This function asserts that the provided index is within the range of available CPUs +/// and returns a reference to the corresponding run queue. +/// +/// ## Arguments +/// +/// * `index` - The index of the run queue to retrieve. +/// +/// ## Returns +/// +/// A reference to the `AxRunQueue` corresponding to the provided index. +/// +/// ## Panics +/// +/// This function will panic if the index is out of bounds. +/// +#[inline] +fn get_run_queue(index: usize) -> &'static mut AxRunQueue { + assert!(index < axconfig::SMP); + unsafe { RUN_QUEUES[index].assume_init_mut() } +} + +/// Selects the appropriate run queue for the provided task. +/// +/// * In a single-core system, this function always returns a reference to the global run queue. +/// * In a multi-core system, this function selects the run queue based on the task's CPU affinity and load balance. +/// +/// ## Arguments +/// +/// * `task` - A reference to the task for which a run queue is being selected. +/// +/// ## Returns +/// +/// A reference to the selected `AxRunQueue`. +/// +/// ## TODO +/// +/// 1. Implement better load balancing across CPUs for more efficient task distribution. +/// 2. Use a more generic load balancing algorithm that can be customized or replaced. +/// +#[inline] +pub(crate) fn select_run_queue(#[cfg(feature = "smp")] task: AxTaskRef) -> &'static mut AxRunQueue { + #[cfg(not(feature = "smp"))] + { + // When SMP is disabled, all tasks are scheduled on the same global run queue. + current_run_queue() + } + #[cfg(feature = "smp")] + { + // When SMP is enabled, select the run queue based on the task's CPU affinity and load balance. + let index = select_run_queue_index(task.cpu_set()); + get_run_queue(index) + } +} + +/// AxRunQueue represents a run queue for the global system or a specific CPU. pub(crate) struct AxRunQueue { + /// The ID of the CPU this run queue is associated with. + cpu_id: usize, + /// The core scheduler of this run queue. scheduler: Scheduler, } impl AxRunQueue { - pub fn new() -> SpinNoIrq { - let gc_task = TaskInner::new(gc_entry, "gc".into(), axconfig::TASK_STACK_SIZE).into_arc(); + pub fn new(cpu_id: usize) -> Self { + let gc_task = TaskInner::new( + gc_entry, + "gc".into(), + axconfig::TASK_STACK_SIZE, + // gc task should be pinned to the current CPU. + Some(1 << cpu_id), + ) + .into_arc(); + let mut scheduler = Scheduler::new(); scheduler.add_task(gc_task); - SpinNoIrq::new(Self { scheduler }) + Self { cpu_id, scheduler } + } + + /// Returns the CPU ID of the current run queue, + /// which is also its index in `RUN_QUEUES`. + pub fn cpu_id(&self) -> usize { + self.cpu_id + } + + /// Returns the number of tasks in the current run queue's scheduler, + /// which is used for load balancing during scheduling. + #[cfg(feature = "smp")] + pub fn num_tasks(&self) -> usize { + self.scheduler.num_tasks() } +} +/// Core functions of run queue.
+impl AxRunQueue { pub fn add_task(&mut self, task: AxTaskRef) { - debug!("task spawn: {}", task.id_name()); + debug!("Add {} on run_queue {}", task.id_name(), self.cpu_id); assert!(task.is_ready()); self.scheduler.add_task(task); } @@ -46,6 +199,7 @@ impl AxRunQueue { } pub fn yield_current(&mut self) { + let _kernel_guard = kernel_guard::NoPreemptIrqSave::new(); let curr = crate::current(); trace!("task yield: {}", curr.id_name()); assert!(curr.is_running()); @@ -59,14 +213,15 @@ impl AxRunQueue { #[cfg(feature = "preempt")] pub fn preempt_resched(&mut self) { + // There is no need to disable IRQ and preemption here, because + // they both have been disabled in `current_check_preempt_pending`. let curr = crate::current(); assert!(curr.is_running()); - // When we get the mutable reference of the run queue, we must - // have held the `SpinNoIrq` lock with both IRQs and preemption - // disabled. So we need to set `current_disable_count` to 1 in - // `can_preempt()` to obtain the preemption permission before - // locking the run queue. + // When we call `preempt_resched()`, both IRQs and preemption must + // have been disabled by `kernel_guard::NoPreemptIrqSave`. So we need + // to set `current_disable_count` to 1 in `can_preempt()` to obtain + // the preemption permission. let can_preempt = curr.can_preempt(1); debug!( @@ -82,55 +237,65 @@ impl AxRunQueue { } pub fn exit_current(&mut self, exit_code: i32) -> ! { + let _kernel_guard = kernel_guard::NoPreemptIrqSave::new(); + let curr = crate::current(); debug!("task exit: {}, exit_code={}", curr.id_name(), exit_code); - assert!(curr.is_running()); + assert!(curr.is_running(), "task is not running: {:?}", curr.state()); assert!(!curr.is_idle()); if curr.is_init() { - EXITED_TASKS.lock().clear(); + EXITED_TASKS.with_current(|exited_tasks| exited_tasks.clear()); axhal::misc::terminate(); } else { curr.set_state(TaskState::Exited); - curr.notify_exit(exit_code, self); - EXITED_TASKS.lock().push_back(curr.clone()); - WAIT_FOR_EXIT.notify_one_locked(false, self); + + // Notify the joiner task. + curr.notify_exit(exit_code); + + // Push current task to the `EXITED_TASKS` list, which will be consumed by the GC task. + EXITED_TASKS.with_current(|exited_tasks| exited_tasks.push_back(curr.clone())); + // Wake up the GC task to drop the exited tasks. + WAIT_FOR_EXIT.with_current(|wq| wq.notify_one(false)); + // Schedule to next task. self.resched(false); } + drop(_kernel_guard); unreachable!("task exited!"); } - pub fn block_current(&mut self, wait_queue_push: F) - where - F: FnOnce(AxTaskRef), - { + pub fn blocked_resched(&mut self) { let curr = crate::current(); - debug!("task block: {}", curr.id_name()); - assert!(curr.is_running()); - assert!(!curr.is_idle()); - - // we must not block current task with preemption disabled. - #[cfg(feature = "preempt")] - assert!(curr.can_preempt(1)); + assert!( + curr.is_blocking(), + "task is not blocking, {:?}", + curr.state() + ); - curr.set_state(TaskState::Blocked); - wait_queue_push(curr.clone()); + debug!("task block: {}", curr.id_name()); self.resched(false); } + /// Unblock one task by inserting it into the run queue. + /// If task state is `BLOCKING`, it will enter a loop until the task is in `BLOCKED` state. 
pub fn unblock_task(&mut self, task: AxTaskRef, resched: bool) { - debug!("task unblock: {}", task.id_name()); - if task.is_blocked() { + task.clone().unblock_locked(|| { + let cpu_id = self.cpu_id; + debug!("task unblock: {} on run_queue {}", task.id_name(), cpu_id); task.set_state(TaskState::Ready); - self.scheduler.add_task(task); // TODO: priority - if resched { + self.scheduler.add_task(task.clone()); // TODO: priority + + // Note: when the task is unblocked on another CPU's run queue, + // we just ignore the `resched` flag. + if resched && cpu_id == this_cpu_id() { #[cfg(feature = "preempt")] crate::current().set_preempt_pending(true); } - } + }) } #[cfg(feature = "irq")] pub fn sleep_until(&mut self, deadline: axhal::time::TimeValue) { + let kernel_guard = kernel_guard::NoPreemptIrqSave::new(); let curr = crate::current(); debug!("task sleep: {}, deadline={:?}", curr.id_name(), deadline); assert!(curr.is_running()); @@ -139,9 +304,10 @@ impl AxRunQueue { let now = axhal::time::wall_time(); if now < deadline { crate::timers::set_alarm_wakeup(deadline, curr.clone()); - curr.set_state(TaskState::Blocked); + curr.set_state(TaskState::Blocking); self.resched(false); } + drop(kernel_guard) } } @@ -156,19 +322,32 @@ impl AxRunQueue { self.scheduler.put_prev_task(prev.clone(), preempt); } } + + if prev.is_blocking() { + prev.set_state(TaskState::Blocked); + } + let next = self.scheduler.pick_next_task().unwrap_or_else(|| unsafe { // Safety: IRQs must be disabled at this time. IDLE_TASK.current_ref_raw().get_unchecked().clone() }); + assert!( + next.is_ready(), + "next {} is not ready: {:?}", + next.id_name(), + next.state() + ); self.switch_to(prev, next); } fn switch_to(&mut self, prev_task: CurrentTask, next_task: AxTaskRef) { - trace!( - "context switch: {} -> {}", - prev_task.id_name(), - next_task.id_name() - ); + if !prev_task.is_idle() || !next_task.is_idle() { + debug!( + "context switch: {} -> {}", + prev_task.id_name(), + next_task.id_name() + ); + } #[cfg(feature = "preempt")] next_task.set_preempt_pending(false); next_task.set_state(TaskState::Running); @@ -186,6 +365,7 @@ impl AxRunQueue { assert!(Arc::strong_count(&next_task) >= 1); CurrentTask::set_current(prev_task, next_task); + (*prev_ctx_ptr).switch_to(&*next_ctx_ptr); } } @@ -194,10 +374,10 @@ impl AxRunQueue { fn gc_entry() { loop { // Drop all exited tasks and recycle resources. - let n = EXITED_TASKS.lock().len(); + let n = EXITED_TASKS.with_current(|exited_tasks| exited_tasks.len()); for _ in 0..n { // Do not do the slow drops in the critical section. - let task = EXITED_TASKS.lock().pop_front(); + let task = EXITED_TASKS.with_current(|exited_tasks| exited_tasks.pop_front()); if let Some(task) = task { if Arc::strong_count(&task) == 1 { // If I'm the last holder of the task, drop it immediately. @@ -205,18 +385,25 @@ fn gc_entry() { } else { // Otherwise (e.g, `switch_to` is not compeleted, held by the // joiner, etc), push it back and wait for them to drop first. - EXITED_TASKS.lock().push_back(task); + EXITED_TASKS.with_current(|exited_tasks| exited_tasks.push_back(task)); } } } - WAIT_FOR_EXIT.wait(); + unsafe { WAIT_FOR_EXIT.current_ref_raw() }.wait(); } } pub(crate) fn init() { + let cpu_id = this_cpu_id(); + // Create the `idle` task (not current task).
const IDLE_TASK_STACK_SIZE: usize = 4096; - let idle_task = TaskInner::new(|| crate::run_idle(), "idle".into(), IDLE_TASK_STACK_SIZE); + let idle_task = TaskInner::new( + || crate::run_idle(), + "idle".into(), + IDLE_TASK_STACK_SIZE, + Some(1 << cpu_id), + ); IDLE_TASK.with_current(|i| { i.init_once(idle_task.into_arc()); }); @@ -224,12 +411,20 @@ pub(crate) fn init() { // Put the subsequent execution into the `main` task. let main_task = TaskInner::new_init("main".into()).into_arc(); main_task.set_state(TaskState::Running); - unsafe { CurrentTask::init_current(main_task) }; + unsafe { CurrentTask::init_current(main_task) } - RUN_QUEUE.init_once(AxRunQueue::new()); + info!("Initialize RUN_QUEUES"); + RUN_QUEUE.with_current(|rq| { + rq.init_once(AxRunQueue::new(cpu_id)); + }); + unsafe { + RUN_QUEUES[cpu_id].write(RUN_QUEUE.current_ref_mut_raw()); + } } pub(crate) fn init_secondary() { + let cpu_id = this_cpu_id(); + // Put the subsequent execution into the `idle` task. let idle_task = TaskInner::new_init("idle".into()).into_arc(); idle_task.set_state(TaskState::Running); @@ -237,4 +432,10 @@ pub(crate) fn init_secondary() { i.init_once(idle_task.clone()); }); unsafe { CurrentTask::init_current(idle_task) } + RUN_QUEUE.with_current(|rq| { + rq.init_once(AxRunQueue::new(cpu_id)); + }); + unsafe { + RUN_QUEUES[cpu_id].write(RUN_QUEUE.current_ref_mut_raw()); + } } diff --git a/modules/axtask/src/task.rs b/modules/axtask/src/task.rs index a916551eb5..d1d8201ab7 100644 --- a/modules/axtask/src/task.rs +++ b/modules/axtask/src/task.rs @@ -1,19 +1,21 @@ use alloc::{boxed::Box, string::String, sync::Arc}; use core::ops::Deref; +#[cfg(any(feature = "preempt", feature = "irq"))] +use core::sync::atomic::AtomicUsize; use core::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, AtomicU8, Ordering}; use core::{alloc::Layout, cell::UnsafeCell, fmt, ptr::NonNull}; -#[cfg(feature = "preempt")] -use core::sync::atomic::AtomicUsize; +use bitmaps::Bitmap; +use kspin::SpinRaw; +use memory_addr::{align_up_4k, VirtAddr}; +use axhal::arch::TaskContext; +use axhal::cpu::this_cpu_id; #[cfg(feature = "tls")] use axhal::tls::TlsArea; -use axhal::arch::TaskContext; -use memory_addr::{align_up_4k, VirtAddr}; - use crate::task_ext::AxTaskExt; -use crate::{AxRunQueue, AxTask, AxTaskRef, WaitQueue}; +use crate::{AxTask, AxTaskRef, WaitQueue}; /// A unique identifier for a thread. #[derive(Debug, Clone, Copy, Eq, PartialEq)] @@ -23,10 +25,18 @@ pub struct TaskId(u64); #[repr(u8)] #[derive(Debug, Clone, Copy, Eq, PartialEq)] pub(crate) enum TaskState { + /// Task is running on some CPU. Running = 1, + /// Task is ready to run on some scheduler's ready queue. Ready = 2, - Blocked = 3, - Exited = 4, + /// Task has just been blocked and inserted into the wait queue, + /// but has **NOT yet finished** its scheduling process. + Blocking = 3, + /// Task is blocked (in the wait queue or timer list), + /// and it has finished its scheduling process; it can be woken up by `notify()` on any run queue safely. + Blocked = 4, + /// Task has exited and is waiting to be dropped. + Exited = 5, } /// The inner task structure. @@ -39,10 +49,19 @@ pub struct TaskInner { entry: Option<*mut dyn FnOnce()>, state: AtomicU8, + /// CPU affinity mask. + cpu_set: Bitmap<{ axconfig::SMP }>, + in_wait_queue: AtomicBool, #[cfg(feature = "irq")] in_timer_list: AtomicBool, + /// Used to protect the task from being unblocked by timer and `notify()` at the same time.
+ /// It is used in `unblock_task()`, which is called by wait queue's `notify()` and timer's callback. + /// Since preemption and IRQs are both disabled during `unblock_task()`, we can simply use a raw spin lock here. + #[cfg(feature = "irq")] + unblock_lock: SpinRaw<()>, + #[cfg(feature = "preempt")] need_resched: AtomicBool, #[cfg(feature = "preempt")] @@ -77,8 +96,9 @@ impl From for TaskState { match state { 1 => Self::Running, 2 => Self::Ready, - 3 => Self::Blocked, - 4 => Self::Exited, + 3 => Self::Blocking, + 4 => Self::Blocked, + 5 => Self::Exited, _ => unreachable!(), } } @@ -89,7 +109,16 @@ unsafe impl Sync for TaskInner {} impl TaskInner { /// Create a new task with the given entry function and stack size. - pub fn new(entry: F, name: String, stack_size: usize) -> Self + /// + /// When "smp" feature is enabled: + /// `cpu_set` represents a set of physical CPUs, which is implemented as a bit mask, + /// referring to `cpu_set_t` in Linux. + /// The task will only be scheduled on the specified CPUs if `cpu_set` is set as `Some(cpu_mask)`; + /// otherwise, the task will be scheduled on all CPUs under the specific load balancing policy. + /// Reference: + /// * https://man7.org/linux/man-pages/man2/sched_setaffinity.2.html + /// * https://man7.org/linux/man-pages/man3/CPU_SET.3.html + pub fn new(entry: F, name: String, stack_size: usize, cpu_set: Option) -> Self where F: FnOnce() + Send + 'static, { @@ -108,6 +137,21 @@ impl TaskInner { if t.name == "idle" { t.is_idle = true; } + t.cpu_set = match cpu_set { + Some(cpu_set) => { + let mut bit_map = Bitmap::new(); + let mut i = 0; + while i < axconfig::SMP { + if cpu_set & (1 << i) != 0 { + bit_map.set(i, true); + } + i += 1; + } + bit_map + } + // This task can be scheduled on all CPUs by default. + None => Bitmap::mask(axconfig::SMP), + }; t } @@ -171,9 +215,12 @@ impl TaskInner { is_init: false, entry: None, state: AtomicU8::new(TaskState::Ready as u8), + cpu_set: Bitmap::new(), in_wait_queue: AtomicBool::new(false), #[cfg(feature = "irq")] in_timer_list: AtomicBool::new(false), + #[cfg(feature = "irq")] + unblock_lock: SpinRaw::new(()), #[cfg(feature = "preempt")] need_resched: AtomicBool::new(false), #[cfg(feature = "preempt")] @@ -199,6 +246,7 @@ impl TaskInner { pub(crate) fn new_init(name: String) -> Self { let mut t = Self::new_common(TaskId::new(), name); t.is_init = true; + t.cpu_set.set(this_cpu_id(), true); if t.name == "idle" { t.is_idle = true; } @@ -234,6 +282,11 @@ impl TaskInner { matches!(self.state(), TaskState::Blocked) } + #[inline] + pub(crate) fn is_blocking(&self) -> bool { + matches!(self.state(), TaskState::Blocking) + } + #[inline] pub(crate) const fn is_init(&self) -> bool { self.is_init @@ -244,6 +297,11 @@ impl TaskInner { self.is_idle } + #[inline] + pub(crate) const fn cpu_set(&self) -> Bitmap<{ axconfig::SMP }> { + self.cpu_set + } + #[inline] pub(crate) fn in_wait_queue(&self) -> bool { self.in_wait_queue.load(Ordering::Acquire) @@ -266,6 +324,27 @@ impl TaskInner { self.in_timer_list.store(in_timer_list, Ordering::Release); } + pub(crate) fn unblock_locked(&self, mut run_queue_push: F) + where + F: FnMut(), + { + // When task's state is Blocking, it has not finished its scheduling process. + if self.is_blocking() { + while self.is_blocking() { + // Wait for the task to finish its scheduling process. + core::hint::spin_loop(); + } + assert!(self.is_blocked()) + } + + // When irq is enabled, use `unblock_lock` to protect the task from being unblocked by timer and `notify()` at the same time.
+ #[cfg(feature = "irq")] + let _lock = self.unblock_lock.lock(); + if self.is_blocked() { + run_queue_push(); + } + } + #[inline] #[cfg(feature = "preempt")] pub(crate) fn set_preempt_pending(&self, pending: bool) { @@ -297,16 +376,17 @@ impl TaskInner { fn current_check_preempt_pending() { let curr = crate::current(); if curr.need_resched.load(Ordering::Acquire) && curr.can_preempt(0) { - let mut rq = crate::RUN_QUEUE.lock(); + let _kernel_guard = kernel_guard::NoPreemptIrqSave::new(); if curr.need_resched.load(Ordering::Acquire) { - rq.preempt_resched(); + crate::current_run_queue().preempt_resched() } } } - pub(crate) fn notify_exit(&self, exit_code: i32, rq: &mut AxRunQueue) { + /// Notify all tasks that join on this task. + pub(crate) fn notify_exit(&self, exit_code: i32) { self.exit_code.store(exit_code, Ordering::Release); - self.wait_for_exit.notify_all_locked(false, rq); + self.wait_for_exit.notify_all(false); } #[inline] @@ -429,8 +509,7 @@ impl Deref for CurrentTask { } extern "C" fn task_entry() -> ! { - // release the lock that was implicitly held across the reschedule - unsafe { crate::RUN_QUEUE.force_unlock() }; + // Enable irq (if feature "irq" is enabled) before running the task entry function. #[cfg(feature = "irq")] axhal::arch::enable_irqs(); let task = crate::current(); diff --git a/modules/axtask/src/task_ext.rs b/modules/axtask/src/task_ext.rs index bacd56d0a0..f64ddbdcc9 100644 --- a/modules/axtask/src/task_ext.rs +++ b/modules/axtask/src/task_ext.rs @@ -145,7 +145,7 @@ pub trait TaskExtMut { /// /// axtask::init_scheduler(); /// -/// let mut inner = TaskInner::new(|| {}, "".into(), 0x1000); +/// let mut inner = TaskInner::new(|| {}, "".into(), 0x1000, None); /// assert!(inner.init_task_ext(TaskExtImpl { proc_id: 233 }).is_some()); /// // cannot initialize twice /// assert!(inner.init_task_ext(TaskExtImpl { proc_id: 0xdead }).is_none()); diff --git a/modules/axtask/src/timers.rs b/modules/axtask/src/timers.rs index 1c4a8eed05..cb55dde24c 100644 --- a/modules/axtask/src/timers.rs +++ b/modules/axtask/src/timers.rs @@ -1,40 +1,47 @@ use alloc::sync::Arc; -use axhal::time::wall_time; -use kspin::SpinNoIrq; + use lazyinit::LazyInit; use timer_list::{TimeValue, TimerEvent, TimerList}; -use crate::{AxTaskRef, RUN_QUEUE}; +use axhal::time::wall_time; -// TODO: per-CPU -static TIMER_LIST: LazyInit>> = LazyInit::new(); +use crate::{select_run_queue, AxTaskRef}; + +percpu_static! 
{ + TIMER_LIST: LazyInit> = LazyInit::new(), +} struct TaskWakeupEvent(AxTaskRef); impl TimerEvent for TaskWakeupEvent { fn callback(self, _now: TimeValue) { - let mut rq = RUN_QUEUE.lock(); self.0.set_in_timer_list(false); - rq.unblock_task(self.0, true); + select_run_queue( + #[cfg(feature = "smp")] + self.0.clone(), + ) + .unblock_task(self.0, true) } } pub fn set_alarm_wakeup(deadline: TimeValue, task: AxTaskRef) { - let mut timers = TIMER_LIST.lock(); - task.set_in_timer_list(true); - timers.set(deadline, TaskWakeupEvent(task)); + TIMER_LIST.with_current(|timer_list| { + task.set_in_timer_list(true); + timer_list.set(deadline, TaskWakeupEvent(task)); + }) } pub fn cancel_alarm(task: &AxTaskRef) { - let mut timers = TIMER_LIST.lock(); - task.set_in_timer_list(false); - timers.cancel(|t| Arc::ptr_eq(&t.0, task)); + TIMER_LIST.with_current(|timer_list| { + task.set_in_timer_list(false); + timer_list.cancel(|t| Arc::ptr_eq(&t.0, task)); + }) } pub fn check_events() { loop { let now = wall_time(); - let event = TIMER_LIST.lock().expire_one(now); + let event = TIMER_LIST.with_current(|timer_list| timer_list.expire_one(now)); if let Some((_deadline, event)) = event { event.callback(now); } else { @@ -44,5 +51,7 @@ pub fn check_events() { } pub fn init() { - TIMER_LIST.init_once(SpinNoIrq::new(TimerList::new())); + TIMER_LIST.with_current(|timer_list| { + timer_list.init_once(TimerList::new()); + }); } diff --git a/modules/axtask/src/wait_queue.rs b/modules/axtask/src/wait_queue.rs index d13628ad6f..ba3fc31209 100644 --- a/modules/axtask/src/wait_queue.rs +++ b/modules/axtask/src/wait_queue.rs @@ -1,8 +1,8 @@ use alloc::collections::VecDeque; use alloc::sync::Arc; -use kspin::SpinRaw; +use kspin::SpinNoIrq; -use crate::{AxRunQueue, AxTaskRef, CurrentTask, RUN_QUEUE}; +use crate::{current_run_queue, select_run_queue, task::TaskState, AxTaskRef, CurrentTask}; /// A queue to store sleeping tasks. /// @@ -27,21 +27,21 @@ use crate::{AxRunQueue, AxTaskRef, CurrentTask, RUN_QUEUE}; /// assert_eq!(VALUE.load(Ordering::Relaxed), 1); /// ``` pub struct WaitQueue { - queue: SpinRaw>, // we already disabled IRQs when lock the `RUN_QUEUE` + queue: SpinNoIrq>, } impl WaitQueue { /// Creates an empty wait queue. pub const fn new() -> Self { Self { - queue: SpinRaw::new(VecDeque::new()), + queue: SpinNoIrq::new(VecDeque::new()), } } /// Creates an empty wait queue with space for at least `capacity` elements. pub fn with_capacity(capacity: usize) -> Self { Self { - queue: SpinRaw::new(VecDeque::with_capacity(capacity)), + queue: SpinNoIrq::new(VecDeque::with_capacity(capacity)), } } @@ -50,9 +50,8 @@ impl WaitQueue { // the event from another queue. if curr.in_wait_queue() { // wake up by timer (timeout). - // `RUN_QUEUE` is not locked here, so disable IRQs. - let _guard = kernel_guard::IrqSave::new(); - self.queue.lock().retain(|t| !curr.ptr_eq(t)); + let mut wq_locked = self.queue.lock(); + wq_locked.retain(|t| !curr.ptr_eq(t)); curr.set_in_wait_queue(false); } #[cfg(feature = "irq")] @@ -62,14 +61,42 @@ impl WaitQueue { } } + fn push_to_wait_queue(&self) { + let mut wq = self.queue.lock(); + let curr = crate::current(); + assert!(curr.is_running()); + assert!(!curr.is_idle()); + // we must not block current task with preemption disabled. + // Current expected preempt count is 2. + // 1 for `NoPreemptIrqSave`, 1 for wait queue's `SpinNoIrq`. 
+ #[cfg(feature = "preempt")] + assert!(curr.can_preempt(2)); + + // We set task state as `Blocking` to clarify that the task is blocked + // but **still NOT** finished its scheduling process. + // + // When another task (generally on another run queue) try to unblock this task, + // * if this task's state is still `Blocking`: + // it needs to wait for this task's state to be changed to `Blocked`, which means it has finished its scheduling process. + // * if this task's state is `Blocked`: + // it means this task is blocked and finished its scheduling process, in another word, it has left current run queue, + // so this task can be scheduled on any run queue. + curr.set_state(TaskState::Blocking); + curr.set_in_wait_queue(true); + + debug!("{} push to wait queue", curr.id_name()); + + wq.push_back(curr.clone()); + } + /// Blocks the current task and put it into the wait queue, until other task /// notifies it. pub fn wait(&self) { - RUN_QUEUE.lock().block_current(|task| { - task.set_in_wait_queue(true); - self.queue.lock().push_back(task) - }); + let kernel_guard = kernel_guard::NoPreemptIrqSave::new(); + self.push_to_wait_queue(); + current_run_queue().blocked_resched(); self.cancel_events(crate::current()); + drop(kernel_guard); } /// Blocks the current task and put it into the wait queue, until the given @@ -81,23 +108,40 @@ impl WaitQueue { where F: Fn() -> bool, { + let kernel_guard = kernel_guard::NoPreemptIrqSave::new(); loop { - let mut rq = RUN_QUEUE.lock(); + let mut wq = self.queue.lock(); if condition() { break; } - rq.block_current(|task| { - task.set_in_wait_queue(true); - self.queue.lock().push_back(task); - }); + let curr = crate::current(); + assert!(curr.is_running()); + assert!(!curr.is_idle()); + + debug!("{} push to wait queue on wait_until", curr.id_name()); + + // we must not block current task with preemption disabled. + // Current expected preempt count is 2. + // 1 for `NoPreemptIrqSave`, 1 for wait queue's `SpinNoIrq`. + #[cfg(feature = "preempt")] + assert!(curr.can_preempt(2)); + wq.push_back(curr.clone()); + + curr.set_state(TaskState::Blocking); + curr.set_in_wait_queue(true); + drop(wq); + + current_run_queue().blocked_resched(); } self.cancel_events(crate::current()); + drop(kernel_guard); } /// Blocks the current task and put it into the wait queue, until other tasks /// notify it, or the given duration has elapsed. 
#[cfg(feature = "irq")] pub fn wait_timeout(&self, dur: core::time::Duration) -> bool { + let kernel_guard = kernel_guard::NoPreemptIrqSave::new(); let curr = crate::current(); let deadline = axhal::time::wall_time() + dur; debug!( @@ -107,12 +151,12 @@ impl WaitQueue { ); crate::timers::set_alarm_wakeup(deadline, curr.clone()); - RUN_QUEUE.lock().block_current(|task| { - task.set_in_wait_queue(true); - self.queue.lock().push_back(task) - }); + self.push_to_wait_queue(); + current_run_queue().blocked_resched(); + let timeout = curr.in_wait_queue(); // still in the wait queue, must have timed out self.cancel_events(curr); + drop(kernel_guard); timeout } @@ -126,6 +170,7 @@ impl WaitQueue { where F: Fn() -> bool, { + let kernel_guard = kernel_guard::NoPreemptIrqSave::new(); let curr = crate::current(); let deadline = axhal::time::wall_time() + dur; debug!( @@ -137,17 +182,29 @@ impl WaitQueue { let mut timeout = true; while axhal::time::wall_time() < deadline { - let mut rq = RUN_QUEUE.lock(); + let mut wq = self.queue.lock(); if condition() { timeout = false; break; } - rq.block_current(|task| { - task.set_in_wait_queue(true); - self.queue.lock().push_back(task); - }); + assert!(curr.is_running()); + assert!(!curr.is_idle()); + + // we must not block current task with preemption disabled. + // Current expected preempt count is 2. + // 1 for `NoPreemptIrqSave`, 1 for wait queue's `SpinNoIrq`. + #[cfg(feature = "preempt")] + assert!(curr.can_preempt(2)); + wq.push_back(curr.clone()); + + curr.set_state(TaskState::Blocking); + curr.set_in_wait_queue(true); + drop(wq); + + current_run_queue().blocked_resched() } self.cancel_events(curr); + drop(kernel_guard); timeout } @@ -156,9 +213,12 @@ impl WaitQueue { /// If `resched` is true, the current task will be preempted when the /// preemption is enabled. pub fn notify_one(&self, resched: bool) -> bool { - let mut rq = RUN_QUEUE.lock(); - if !self.queue.lock().is_empty() { - self.notify_one_locked(resched, &mut rq) + let mut wq = self.queue.lock(); + if let Some(task) = wq.pop_front() { + task.set_in_wait_queue(false); + unblock_one_task(task, resched); + drop(wq); + true } else { false } @@ -170,14 +230,14 @@ impl WaitQueue { /// preemption is enabled. pub fn notify_all(&self, resched: bool) { loop { - let mut rq = RUN_QUEUE.lock(); - if let Some(task) = self.queue.lock().pop_front() { + let mut wq = self.queue.lock(); + if let Some(task) = wq.pop_front() { task.set_in_wait_queue(false); - rq.unblock_task(task, resched); + unblock_one_task(task, resched); } else { break; } - drop(rq); // we must unlock `RUN_QUEUE` after unlocking `self.queue`. + drop(wq); } } @@ -186,31 +246,31 @@ impl WaitQueue { /// If `resched` is true, the current task will be preempted when the /// preemption is enabled. pub fn notify_task(&mut self, resched: bool, task: &AxTaskRef) -> bool { - let mut rq = RUN_QUEUE.lock(); let mut wq = self.queue.lock(); - if let Some(index) = wq.iter().position(|t| Arc::ptr_eq(t, task)) { - task.set_in_wait_queue(false); - rq.unblock_task(wq.remove(index).unwrap(), resched); - true - } else { - false - } - } - - pub(crate) fn notify_one_locked(&self, resched: bool, rq: &mut AxRunQueue) -> bool { - if let Some(task) = self.queue.lock().pop_front() { + let task_to_be_notify = { + if let Some(index) = wq.iter().position(|t| Arc::ptr_eq(t, task)) { + wq.remove(index) + } else { + None + } + }; + if let Some(task) = task_to_be_notify { + // Mark task as not in wait queue. 
task.set_in_wait_queue(false); - rq.unblock_task(task, resched); + unblock_one_task(task, resched); + drop(wq); true } else { false } } +} - pub(crate) fn notify_all_locked(&self, resched: bool, rq: &mut AxRunQueue) { - while let Some(task) = self.queue.lock().pop_front() { - task.set_in_wait_queue(false); - rq.unblock_task(task, resched); - } - } +pub(crate) fn unblock_one_task(task: AxTaskRef, resched: bool) { + // Select run queue by the CPU set of the task. + select_run_queue( + #[cfg(feature = "smp")] + task.clone(), + ) + .unblock_task(task, resched) }
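Note on the selection policy added in `run_queue.rs`: `select_run_queue_index` filters the per-CPU run queues by the task's affinity mask and picks the one with the fewest queued tasks. Below is a minimal, self-contained sketch of that policy; the `RunQueue` struct, the fixed `SMP` constant, and the `[bool; SMP]` affinity set are stand-ins for the real `AxRunQueue`, `axconfig::SMP`, and `bitmaps::Bitmap` used in the patch.

```rust
// Sketch of the load-balance policy: among the run queues whose CPU bit is
// set in the task's affinity mask, pick the one with the fewest queued tasks.
const SMP: usize = 4;

struct RunQueue {
    cpu_id: usize,
    num_tasks: usize,
}

fn select_run_queue_index(cpu_set: [bool; SMP], rqs: &[RunQueue]) -> usize {
    rqs.iter()
        .filter(|rq| cpu_set[rq.cpu_id])
        .min_by_key(|rq| rq.num_tasks)
        .expect("No available run queue that matches the CPU set")
        .cpu_id
}

fn main() {
    let rqs = [
        RunQueue { cpu_id: 0, num_tasks: 3 },
        RunQueue { cpu_id: 1, num_tasks: 1 },
        RunQueue { cpu_id: 2, num_tasks: 0 },
        RunQueue { cpu_id: 3, num_tasks: 2 },
    ];
    // Task is allowed on CPUs 0, 1 and 3 only; CPU 1 has the lightest load.
    let cpu_set = [true, true, false, true];
    assert_eq!(select_run_queue_index(cpu_set, &rqs), 1);
    println!("selected run queue: {}", select_run_queue_index(cpu_set, &rqs));
}
```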
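The new `Blocking` state exists to close a race: a task can be pushed to a wait queue on one CPU while another CPU tries to wake it before the first CPU has finished switching away from it. `unblock_locked` therefore spins until the state has moved from `Blocking` to `Blocked` before re-enqueuing the task. A rough illustration of that hand-off, using `std` threads and an `AtomicU8` in place of the real task state and run queues:

```rust
// Illustrative only: the real code runs inside the run queue and scheduler,
// not on std threads.
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use std::thread;

const READY: u8 = 2;
const BLOCKING: u8 = 3;
const BLOCKED: u8 = 4;

fn main() {
    let state = Arc::new(AtomicU8::new(BLOCKING));

    // "Blocking" CPU: finishes the context switch, then publishes `Blocked`.
    let publisher = {
        let state = Arc::clone(&state);
        thread::spawn(move || {
            // ... context switch happens here ...
            state.store(BLOCKED, Ordering::Release);
        })
    };

    // Waker on another CPU: must not re-enqueue the task while it is still
    // mid-schedule, so it waits for the `Blocked` state first.
    while state.load(Ordering::Acquire) == BLOCKING {
        std::hint::spin_loop();
    }
    assert_eq!(state.load(Ordering::Acquire), BLOCKED);
    state.store(READY, Ordering::Release); // safe to put back on a run queue
    println!("task re-enqueued as READY");

    publisher.join().unwrap();
}
```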
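For reference, the new `cpu_set: Option<usize>` argument of `TaskInner::new` is a plain bit mask in the style of Linux `cpu_set_t`: `Some(1 << cpu_id)` pins a task (e.g. the per-CPU gc and idle tasks) to a single CPU, while `None` leaves it eligible for every CPU. A small sketch of that interpretation, with `SMP` and a `[bool; SMP]` array standing in for `axconfig::SMP` and the `bitmaps::Bitmap` affinity set:

```rust
const SMP: usize = 4;

// Turn an affinity mask into a per-CPU allowed/forbidden set.
fn mask_to_cpu_set(cpu_set: Option<usize>) -> [bool; SMP] {
    match cpu_set {
        Some(mask) => {
            let mut set = [false; SMP];
            for i in 0..SMP {
                if mask & (1 << i) != 0 {
                    set[i] = true;
                }
            }
            set
        }
        // By default the task can be scheduled on all CPUs.
        None => [true; SMP],
    }
}

fn main() {
    // A gc or idle task pinned to CPU 2, as done with `Some(1 << cpu_id)`.
    assert_eq!(mask_to_cpu_set(Some(1 << 2)), [false, false, true, false]);
    // A task with no affinity constraint.
    assert_eq!(mask_to_cpu_set(None), [true; SMP]);
    println!("ok");
}
```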