Skip to content

Commit

Permalink
[feat] use type CpuSet to wrap cpumask::CpuMask
Browse files Browse the repository at this point in the history
  • Loading branch information
hky1999 committed Sep 28, 2024
1 parent 74111ef commit 3ba0c58
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 67 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions modules/axtask/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ memory_addr = { version = "0.3", optional = true }
timer_list = { version = "0.1", optional = true }
kernel_guard = { version = "0.1", optional = true }
crate_interface = { version = "0.1", optional = true }
# scheduler = { git = "https://github.com/arceos-org/scheduler.git", tag = "v0.1.0", optional = true }
scheduler = { git = "https://github.com/arceos-org/scheduler.git", branch = "num_tasks", optional = true }
scheduler = { git = "https://github.com/arceos-org/scheduler.git", tag = "v0.1.0", optional = true }
cpumask = { git = "https://github.com/arceos-org/cpumask.git", optional = true }

[dev-dependencies]
Expand Down
9 changes: 3 additions & 6 deletions modules/axtask/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ pub use crate::wait_queue::WaitQueue;
/// The reference type of a task.
pub type AxTaskRef = Arc<AxTask>;

pub type CpuSet = cpumask::CpuMask<{ axconfig::SMP }>;

cfg_if::cfg_if! {
if #[cfg(feature = "sched_rr")] {
const MAX_TIME_SLICE: usize = 5;
Expand Down Expand Up @@ -96,12 +98,7 @@ pub fn on_timer_tick() {
/// Adds the given task to the run queue, returns the task reference.
pub fn spawn_task(task: TaskInner) -> AxTaskRef {
let task_ref = task.into_arc();
let _kernel_guard = kernel_guard::NoPreemptIrqSave::new();
crate::select_run_queue::<NoPreemptIrqSave>(
#[cfg(feature = "smp")]
task_ref.clone(),
)
.add_task(task_ref.clone());
crate::select_run_queue::<NoPreemptIrqSave>(task_ref.clone()).add_task(task_ref.clone());
task_ref
}

Expand Down
80 changes: 39 additions & 41 deletions modules/axtask/src/run_queue.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use alloc::collections::VecDeque;
use alloc::sync::Arc;
use core::mem::MaybeUninit;
use core::sync::atomic::{AtomicUsize, Ordering};

use cpumask::CpuMask;
use kernel_guard::BaseGuard;
use kspin::SpinRaw;
use lazyinit::LazyInit;
use scheduler::BaseScheduler;

use axhal::cpu::this_cpu_id;

use crate::task::{CurrentTask, TaskState};
use crate::{AxTaskRef, Scheduler, TaskInner, WaitQueue};
use crate::{AxTaskRef, CpuSet, Scheduler, TaskInner, WaitQueue};

macro_rules! percpu_static {
($($name:ident: $ty:ty = $init:expr),* $(,)?) => {
Expand Down Expand Up @@ -63,10 +65,10 @@ pub(crate) fn current_run_queue<G: BaseGuard>() -> AxRunQueueRef<'static, G> {
}
}

/// Selects the run queue index based on a CPU set bitmap, minimizing the number of tasks.
/// Selects the run queue index based on a CPU set bitmap and load balancing.
///
/// This function filters the available run queues based on the provided `cpumask` and
/// selects the one with the fewest tasks. The selected run queue's index (cpu_id) is returned.
/// selects the run queue index for the next task. The selection is based on a round-robin algorithm.
///
/// ## Arguments
///
Expand All @@ -78,19 +80,22 @@ pub(crate) fn current_run_queue<G: BaseGuard>() -> AxRunQueueRef<'static, G> {
///
/// ## Panics
///
/// This function will panic if there is no available run queue that matches the CPU set.
/// This function will panic if `cpu_mask` is empty, indicating that there are no available CPUs for task execution.
///
#[cfg(feature = "smp")]
#[inline]
fn select_run_queue_index(cpumask: CpuMask<{ axconfig::SMP }>) -> usize {
unsafe {
RUN_QUEUES
.iter()
.filter(|rq| cpumask.get(rq.assume_init_ref().cpu_id()))
.min_by_key(|rq| rq.assume_init_ref().num_tasks())
.expect("No available run queue that matches the CPU set")
.assume_init_ref()
.cpu_id()
fn select_run_queue_index(cpumask: CpuSet) -> usize {
static RUN_QUEUE_INDEX: AtomicUsize = AtomicUsize::new(0);

assert!(!cpumask.is_empty(), "No available CPU for task execution");

// Round-robin selection of the run queue index.
loop {
let index = RUN_QUEUE_INDEX.load(Ordering::SeqCst) % axconfig::SMP;
if cpumask.get(index) {
return index;
}
RUN_QUEUE_INDEX.fetch_add(1, Ordering::SeqCst);
}
}

Expand Down Expand Up @@ -136,9 +141,7 @@ fn get_run_queue(index: usize) -> &'static mut AxRunQueue {
/// 2. Use a more generic load balancing algorithm that can be customized or replaced.
///
#[inline]
pub(crate) fn select_run_queue<G: BaseGuard>(
#[cfg(feature = "smp")] task: AxTaskRef,
) -> AxRunQueueRef<'static, G> {
pub(crate) fn select_run_queue<G: BaseGuard>(task: AxTaskRef) -> AxRunQueueRef<'static, G> {
#[cfg(not(feature = "smp"))]
{
// When SMP is disabled, all tasks are scheduled on the same global run queue.
Expand All @@ -162,7 +165,9 @@ pub(crate) struct AxRunQueue {
/// The ID of the CPU this run queue is associated with.
cpu_id: usize,
/// The core scheduler of this run queue.
scheduler: Scheduler,
/// Since irq and preempt are preserved by the kernel guard hold by `AxRunQueueRef`,
/// we just use a simple raw spin lock here.
scheduler: SpinRaw<Scheduler>,
}

pub(crate) struct AxRunQueueRef<'a, G: BaseGuard> {
Expand All @@ -185,20 +190,10 @@ impl AxRunQueue {

let mut scheduler = Scheduler::new();
scheduler.add_task(gc_task);
Self { cpu_id, scheduler }
}

/// Returns the cpu id of current run queue,
/// which is also its index in `RUN_QUEUES`.
pub fn cpu_id(&self) -> usize {
self.cpu_id
}

/// Returns the number of tasks in current run queue's scheduler,
/// which is used for load balance during scheduling.
#[cfg(feature = "smp")]
pub fn num_tasks(&self) -> usize {
self.scheduler.num_tasks()
Self {
cpu_id,
scheduler: SpinRaw::new(scheduler),
}
}
}

Expand All @@ -207,20 +202,19 @@ impl<'a, G: BaseGuard> AxRunQueueRef<'a, G> {
pub fn add_task(&mut self, task: AxTaskRef) {
debug!("Add {} on run_queue {}", task.id_name(), self.inner.cpu_id);
assert!(task.is_ready());
self.inner.scheduler.add_task(task);
self.inner.scheduler.lock().add_task(task);
}

#[cfg(feature = "irq")]
pub fn scheduler_timer_tick(&mut self) {
let curr = crate::current();
if !curr.is_idle() && self.inner.scheduler.task_tick(curr.as_task_ref()) {
if !curr.is_idle() && self.inner.scheduler.lock().task_tick(curr.as_task_ref()) {
#[cfg(feature = "preempt")]
curr.set_preempt_pending(true);
}
}

pub fn yield_current(&mut self) {
let _kernel_guard = kernel_guard::NoPreemptIrqSave::new();
let curr = crate::current();
trace!("task yield: {}", curr.id_name());
assert!(curr.is_running());
Expand All @@ -230,6 +224,7 @@ impl<'a, G: BaseGuard> AxRunQueueRef<'a, G> {
pub fn set_current_priority(&mut self, prio: isize) -> bool {
self.inner
.scheduler
.lock()
.set_priority(crate::current().as_task_ref(), prio)
}

Expand Down Expand Up @@ -297,7 +292,7 @@ impl<'a, G: BaseGuard> AxRunQueueRef<'a, G> {
let cpu_id = self.inner.cpu_id;
debug!("task unblock: {} on run_queue {}", task.id_name(), cpu_id);
task.set_state(TaskState::Ready);
self.inner.scheduler.add_task(task.clone()); // TODO: priority
self.inner.scheduler.lock().add_task(task.clone()); // TODO: priority

// Note: when the task is unblocked on another CPU's run queue,
// we just ingiore the `resched` flag.
Expand All @@ -310,7 +305,6 @@ impl<'a, G: BaseGuard> AxRunQueueRef<'a, G> {

#[cfg(feature = "irq")]
pub fn sleep_until(&mut self, deadline: axhal::time::TimeValue) {
let _kernel_guard = kernel_guard::NoPreemptIrqSave::new();
let curr = crate::current();
debug!("task sleep: {}, deadline={:?}", curr.id_name(), deadline);
assert!(curr.is_running());
Expand All @@ -333,14 +327,18 @@ impl AxRunQueue {
if prev.is_running() {
prev.set_state(TaskState::Ready);
if !prev.is_idle() {
self.scheduler.put_prev_task(prev.clone(), preempt);
self.scheduler.lock().put_prev_task(prev.clone(), preempt);
}
}

let next = self.scheduler.pick_next_task().unwrap_or_else(|| unsafe {
// Safety: IRQs must be disabled at this time.
IDLE_TASK.current_ref_raw().get_unchecked().clone()
});
let next = self
.scheduler
.lock()
.pick_next_task()
.unwrap_or_else(|| unsafe {
// Safety: IRQs must be disabled at this time.
IDLE_TASK.current_ref_raw().get_unchecked().clone()
});
assert!(
next.is_ready(),
"next {} is not ready: {:?}",
Expand Down
12 changes: 6 additions & 6 deletions modules/axtask/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use axhal::cpu::this_cpu_id;
use axhal::tls::TlsArea;

use crate::task_ext::AxTaskExt;
use crate::{AxTask, AxTaskRef, WaitQueue};
use crate::{AxTask, AxTaskRef, CpuSet, WaitQueue};

/// A unique identifier for a thread.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
Expand Down Expand Up @@ -47,7 +47,7 @@ pub struct TaskInner {
state: AtomicU8,

/// CPU affinity mask.
cpumask: SpinNoIrq<CpuMask<{ axconfig::SMP }>>,
cpumask: SpinNoIrq<CpuSet>,

/// Mark whether the task is in the wait queue.
in_wait_queue: AtomicBool,
Expand Down Expand Up @@ -280,11 +280,11 @@ impl TaskInner {
}

#[inline]
pub(crate) fn cpumask(&self) -> CpuMask<{ axconfig::SMP }> {
pub(crate) fn cpumask(&self) -> CpuSet {
*self.cpumask.lock()
}

pub(crate) fn set_cpumask(&self, cpumask: CpuMask<{ axconfig::SMP }>) {
pub(crate) fn set_cpumask(&self, cpumask: CpuSet) {
*self.cpumask.lock() = cpumask
}

Expand Down Expand Up @@ -385,9 +385,9 @@ impl TaskInner {
fn current_check_preempt_pending() {
let curr = crate::current();
if curr.need_resched.load(Ordering::Acquire) && curr.can_preempt(0) {
let _kernel_guard = kernel_guard::NoPreemptIrqSave::new();
let mut rq = crate::current_run_queue::<kernel_guard::NoPreemptIrqSave>();
if curr.need_resched.load(Ordering::Acquire) {
crate::current_run_queue::<kernel_guard::NoOp>().preempt_resched()
rq.preempt_resched()
}
}
}
Expand Down
6 changes: 1 addition & 5 deletions modules/axtask/src/timers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,7 @@ impl TimerEvent for TaskWakeupEvent {
self.task.set_in_timer_list(false);
// Timer event is triggered, expire the ticket ID.
self.task.timer_ticket_expire_one();
select_run_queue::<NoOp>(
#[cfg(feature = "smp")]
self.task.clone(),
)
.unblock_task(self.task, true)
select_run_queue::<NoOp>(self.task.clone()).unblock_task(self.task, true)
}
}

Expand Down
6 changes: 1 addition & 5 deletions modules/axtask/src/wait_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,5 @@ impl WaitQueue {

pub(crate) fn unblock_one_task(task: AxTaskRef, resched: bool) {
// Select run queue by the CPU set of the task.
select_run_queue::<NoPreemptIrqSave>(
#[cfg(feature = "smp")]
task.clone(),
)
.unblock_task(task, resched)
select_run_queue::<NoPreemptIrqSave>(task.clone()).unblock_task(task, resched)
}

0 comments on commit 3ba0c58

Please sign in to comment.