Replace the wait_queue data structure VecQueue with a linked list #184

Open: wants to merge 1 commit into base: main
19 changes: 13 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions api/arceos_posix_api/build.rs
@@ -5,9 +5,9 @@ fn main() {
// TODO: generate size and initial content automatically.
let (mutex_size, mutex_init) = if cfg!(feature = "multitask") {
if cfg!(feature = "smp") {
(6, "{0, 0, 8, 0, 0, 0}") // core::mem::transmute::<_, [usize; 6]>(axsync::Mutex::new(()))
(3, "{0, 0, 0}") // core::mem::transmute::<_, [usize; 3]>(axsync::Mutex::new(()))
} else {
(5, "{0, 8, 0, 0, 0}") // core::mem::transmute::<_, [usize; 5]>(axsync::Mutex::new(()))
(2, "{0, 0}") // core::mem::transmute::<_, [usize; 2]>(axsync::Mutex::new(()))
}
} else {
(1, "{0}")
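The shrunken layouts (2 words without `smp`, 3 with it) presumably follow from replacing the heap-backed `VecQueue` inside the mutex's wait queue with an intrusive linked-list head. A minimal sketch of the layout check the comments above allude to, assuming a build with `multitask` enabled and `smp` disabled (not part of this diff):

```rust
// Sketch only: mirrors the transmute mentioned in the build.rs comment.
// Valid only if size_of::<axsync::Mutex<()>>() == 2 * size_of::<usize>(),
// which is what the (2, "{0, 0}") entry above encodes.
fn check_mutex_layout() {
    let m = axsync::Mutex::new(());
    let words: [usize; 2] = unsafe { core::mem::transmute(m) };
    assert_eq!(words, [0, 0]); // matches the generated "{0, 0}" initializer
}
```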
18 changes: 18 additions & 0 deletions modules/axsync/src/mutex.rs
@@ -249,4 +249,22 @@ mod tests {
assert_eq!(*M.lock(), NUM_ITERS * NUM_TASKS * 3);
println!("Mutex test OK");
}

fn assert_mem_content<T>(val: &T, content: &[usize]) {
let val_ptr = val as *const T as *const usize;
let size = core::mem::size_of::<T>() / core::mem::size_of::<usize>();
assert_eq!(size, content.len());
let usize_slice = unsafe { core::slice::from_raw_parts(val_ptr, size) };
for (i, chunk) in usize_slice.iter().enumerate() {
assert_eq!(*chunk, content[i]);
}
}

#[test]
fn mutex_test_for_posix() {
// Test that the mutex layout matches the size and initializer hard-coded in api/arceos_posix_api/build.rs
let mutex_tuple = axsync::Mutex::new(());
let content: [usize; 2] = [0, 0];
assert_mem_content(&mutex_tuple, &content);
}
}
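The new test only exercises the non-`smp` layout. A sketch of the analogous check for an `smp` build, mirroring the `(3, "{0, 0, 0}")` entry in `build.rs` (the test name and `cfg` gate here are illustrative, not part of this diff):

```rust
// Sketch only: the smp counterpart of mutex_test_for_posix above,
// matching the (3, "{0, 0, 0}") entry in api/arceos_posix_api/build.rs.
#[test]
#[cfg(feature = "smp")]
fn mutex_test_for_posix_smp() {
    let m = axsync::Mutex::new(());
    let content: [usize; 3] = [0, 0, 0];
    assert_mem_content(&m, &content);
}
```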
4 changes: 3 additions & 1 deletion modules/axtask/Cargo.toml
@@ -23,6 +23,7 @@ multitask = [
"kernel_guard",
"dep:crate_interface",
"dep:cpumask",
"dep:linked_list_r4l"
]
irq = []
tls = ["axhal/tls"]
@@ -48,7 +49,8 @@ timer_list = { version = "0.1", optional = true }
kernel_guard = { version = "0.1", optional = true }
crate_interface = { version = "0.1", optional = true }
cpumask = { version = "0.1", optional = true }
scheduler = { git = "https://github.com/arceos-org/scheduler.git", tag = "v0.1.0", optional = true }
scheduler = { git = "https://github.com/arceos-org/scheduler.git", tag = "v0.2.0", optional = true }
linked_list_r4l = { version = "0.2.1", optional = true }

[dev-dependencies]
rand = "0.8"
12 changes: 7 additions & 5 deletions modules/axtask/src/run_queue.rs
@@ -10,7 +10,7 @@ use scheduler::BaseScheduler;
use axhal::cpu::this_cpu_id;

use crate::task::{CurrentTask, TaskState};
use crate::wait_queue::WaitQueueGuard;
use crate::wait_queue::{WaitQueueGuard, WaitTaskNode};
use crate::{AxCpuMask, AxTaskRef, Scheduler, TaskInner, WaitQueue};

macro_rules! percpu_static {
@@ -367,7 +367,7 @@ impl<'a, G: BaseGuard> CurrentRunQueueRef<'a, G> {
/// 2. The caller must ensure that the current task is in the running state.
/// 3. The caller must ensure that the current task is not the idle task.
/// 4. The lock of the wait queue will be released explicitly after the current task is pushed into it.
pub fn blocked_resched(&mut self, mut wq_guard: WaitQueueGuard) {
pub fn blocked_resched(&mut self, mut wq_guard: WaitQueueGuard, curr_waiter: &WaitTaskNode) {
let curr = &self.current_task;
assert!(curr.is_running());
assert!(!curr.is_idle());
@@ -380,9 +380,11 @@ impl<'a, G: BaseGuard> CurrentRunQueueRef<'a, G> {
// Mark the task as blocked; this has to be done before adding it to the wait queue,
// while still holding the lock of the wait queue.
curr.set_state(TaskState::Blocked);
curr.set_in_wait_queue(true);

wq_guard.push_back(curr.clone());
// SAFETY: The waiter node lives on the caller's stack; its lifetime ends
// only after the task is woken and resumes running.
unsafe {
assert!(wq_guard.push_back(curr_waiter));
}
// Drop the lock of the wait queue explicitly.
drop(wq_guard);

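With this signature, the intrusive list node is owned by the caller's stack frame for the whole blocking period, which is what makes the `unsafe` `push_back` above sound. A hypothetical sketch of the caller side in `wait_queue.rs` (not shown in this diff; `WaitTaskNode::new`, `current_run_queue`, `NoPreemptIrqSave`, and the `queue` field are assumed names for illustration):

```rust
// Hypothetical sketch of WaitQueue::wait; names and fields are assumptions.
pub fn wait(&self) {
    let mut rq = current_run_queue::<NoPreemptIrqSave>();
    // The node lives in this stack frame, which is not popped until the
    // task is woken and resumes here, so the reference passed to
    // blocked_resched stays valid for as long as it sits in the list.
    let waiter = WaitTaskNode::new(crate::current().clone());
    let wq_guard = self.queue.lock();
    rq.blocked_resched(wq_guard, &waiter);
    // By the time execution reaches this point, the notifier has already
    // removed `waiter` from the intrusive list.
}
```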
19 changes: 4 additions & 15 deletions modules/axtask/src/task.rs
@@ -1,8 +1,11 @@
use alloc::{boxed::Box, string::String, sync::Arc};
use core::ops::Deref;
use core::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, AtomicU8, Ordering};
use core::sync::atomic::{AtomicI32, AtomicU64, AtomicU8, Ordering};
use core::{alloc::Layout, cell::UnsafeCell, fmt, ptr::NonNull};

#[cfg(any(feature = "smp", feature = "preempt"))]
use core::sync::atomic::AtomicBool;

#[cfg(feature = "smp")]
use alloc::sync::Weak;
#[cfg(feature = "preempt")]
@@ -50,9 +53,6 @@ pub struct TaskInner {
/// CPU affinity mask.
cpumask: SpinNoIrq<AxCpuMask>,

/// Mark whether the task is in the wait queue.
in_wait_queue: AtomicBool,

/// Used to indicate whether the task is running on a CPU.
#[cfg(feature = "smp")]
on_cpu: AtomicBool,
@@ -228,7 +228,6 @@ impl TaskInner {
state: AtomicU8::new(TaskState::Ready as u8),
// By default, the task is allowed to run on all CPUs.
cpumask: SpinNoIrq::new(AxCpuMask::full()),
in_wait_queue: AtomicBool::new(false),
#[cfg(feature = "irq")]
timer_ticket_id: AtomicU64::new(0),
#[cfg(feature = "smp")]
@@ -317,16 +316,6 @@ impl TaskInner {
self.is_idle
}

#[inline]
pub(crate) fn in_wait_queue(&self) -> bool {
self.in_wait_queue.load(Ordering::Acquire)
}

#[inline]
pub(crate) fn set_in_wait_queue(&self, in_wait_queue: bool) {
self.in_wait_queue.store(in_wait_queue, Ordering::Release);
}

/// Returns task's current timer ticket ID.
#[inline]
#[cfg(feature = "irq")]
4 changes: 0 additions & 4 deletions modules/axtask/src/tests.rs
@@ -81,8 +81,6 @@ fn test_wait_queue() {
WQ1.notify_one(true); // WQ1.wait_until()
WQ2.wait();

assert!(!current().in_wait_queue());

COUNTER.fetch_sub(1, Ordering::Relaxed);
println!("wait_queue: task {:?} finished", current().id());
WQ1.notify_one(true); // WQ1.wait_until()
@@ -92,7 +90,6 @@
println!("task {:?} is waiting for tasks to start...", current().id());
WQ1.wait_until(|| COUNTER.load(Ordering::Relaxed) == NUM_TASKS);
assert_eq!(COUNTER.load(Ordering::Relaxed), NUM_TASKS);
assert!(!current().in_wait_queue());
WQ2.notify_all(true); // WQ2.wait()

println!(
@@ -101,7 +98,6 @@
);
WQ1.wait_until(|| COUNTER.load(Ordering::Relaxed) == 0);
assert_eq!(COUNTER.load(Ordering::Relaxed), 0);
assert!(!current().in_wait_queue());
}

#[test]