Skip to content

Commit 0c24589

Browse files
authored
Merge pull request #3622 from bugadani/next
Allow polling exited tasks
2 parents c1120c7 + 76d8a89 commit 0c24589

File tree

6 files changed

+45
-47
lines changed

6 files changed

+45
-47
lines changed

embassy-executor/src/raw/mod.rs

+35-32
Original file line numberDiff line numberDiff line change
@@ -50,33 +50,32 @@ use super::SpawnToken;
5050
/// A task's complete life cycle is as follows:
5151
///
5252
/// ```text
53-
/// ┌────────────┐ ┌────────────────────────┐
54-
/// ┌─►│Not spawned │◄─6┤Not spawned|Run enqueued│
55-
/// │ ├7─►│ │
56-
/// └─────┬──────┘ └──────▲─────────────────┘
57-
/// 1 │
58-
/// │ ┌────────────┘
59-
/// 5
60-
/// ┌─────▼────┴─────────┐
61-
/// │Spawned|Run enqueued│
62-
/// │
63-
/// └─────┬▲─────────────┘
64-
/// 2│
65-
/// │3
66-
/// ┌─────▼┴─────┐
67-
/// └─4┤ Spawned │
68-
/// │ │
69-
/// └────────────┘
53+
/// ┌────────────┐ ┌────────────────────────┐
54+
/// │Not spawned │◄─5┤Not spawned|Run enqueued│
55+
/// │ ├6─►│ │
56+
/// └─────┬──────┘ └──────▲─────────────────┘
57+
/// 1 │
58+
/// │ ┌────────────┘
59+
/// │ 4
60+
/// ┌─────▼────┴─────────┐
61+
/// │Spawned|Run enqueued│
62+
/// │ │
63+
/// └─────┬▲─────────────┘
64+
/// 2│
65+
/// │3
66+
/// ┌─────▼┴─────┐
67+
/// Spawned │
68+
/// │ │
69+
/// └────────────┘
7070
/// ```
7171
///
7272
/// Transitions:
7373
/// - 1: Task is spawned - `AvailableTask::claim -> Executor::spawn`
7474
/// - 2: During poll - `RunQueue::dequeue_all -> State::run_dequeue`
75-
/// - 3: Task wakes itself, waker wakes task - `Waker::wake -> wake_task -> State::run_enqueue`
76-
/// - 4: Task exits - `TaskStorage::poll -> Poll::Ready`
77-
/// - 5: A run-queued task exits - `TaskStorage::poll -> Poll::Ready`
78-
/// - 6: Task is dequeued and then ignored via `State::run_dequeue`
79-
/// - 7: A task is waken when it is not spawned - `wake_task -> State::run_enqueue`
75+
/// - 3: Task wakes itself, waker wakes task, or task exits - `Waker::wake -> wake_task -> State::run_enqueue`
76+
/// - 4: A run-queued task exits - `TaskStorage::poll -> Poll::Ready`
77+
/// - 5: Task is dequeued. The task's future is not polled, because exiting the task replaces its `poll_fn`.
78+
/// - 6: A task is waken when it is not spawned - `wake_task -> State::run_enqueue`
8079
pub(crate) struct TaskHeader {
8180
pub(crate) state: State,
8281
pub(crate) run_queue_item: RunQueueItem,
@@ -162,6 +161,10 @@ pub struct TaskStorage<F: Future + 'static> {
162161
future: UninitCell<F>, // Valid if STATE_SPAWNED
163162
}
164163

164+
unsafe fn poll_exited(_p: TaskRef) {
165+
// Nothing to do, the task is already !SPAWNED and dequeued.
166+
}
167+
165168
impl<F: Future + 'static> TaskStorage<F> {
166169
const NEW: Self = Self::new();
167170

@@ -203,14 +206,23 @@ impl<F: Future + 'static> TaskStorage<F> {
203206
}
204207

205208
unsafe fn poll(p: TaskRef) {
206-
let this = &*(p.as_ptr() as *const TaskStorage<F>);
209+
let this = &*p.as_ptr().cast::<TaskStorage<F>>();
207210

208211
let future = Pin::new_unchecked(this.future.as_mut());
209212
let waker = waker::from_task(p);
210213
let mut cx = Context::from_waker(&waker);
211214
match future.poll(&mut cx) {
212215
Poll::Ready(_) => {
216+
// As the future has finished and this function will not be called
217+
// again, we can safely drop the future here.
213218
this.future.drop_in_place();
219+
220+
// We replace the poll_fn with a despawn function, so that the task is cleaned up
221+
// when the executor polls it next.
222+
this.raw.poll_fn.set(Some(poll_exited));
223+
224+
// Make sure we despawn last, so that other threads can only spawn the task
225+
// after we're done with it.
214226
this.raw.state.despawn();
215227
}
216228
Poll::Pending => {}
@@ -411,15 +423,6 @@ impl SyncExecutor {
411423
self.run_queue.dequeue_all(|p| {
412424
let task = p.header();
413425

414-
if !task.state.run_dequeue() {
415-
// If task is not running, ignore it. This can happen in the following scenario:
416-
// - Task gets dequeued, poll starts
417-
// - While task is being polled, it gets woken. It gets placed in the queue.
418-
// - Task poll finishes, returning done=true
419-
// - RUNNING bit is cleared, but the task is already in the queue.
420-
return;
421-
}
422-
423426
#[cfg(feature = "trace")]
424427
trace::task_exec_begin(self, &p);
425428

embassy-executor/src/raw/run_queue_atomics.rs

+1
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ impl RunQueue {
8181
// safety: there are no concurrent accesses to `next`
8282
next = unsafe { task.header().run_queue_item.next.get() };
8383

84+
task.header().state.run_dequeue();
8485
on_task(task);
8586
}
8687
}

embassy-executor/src/raw/run_queue_critical_section.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,10 @@ impl RunQueue {
6363
// If the task re-enqueues itself, the `next` pointer will get overwritten.
6464
// Therefore, first read the next pointer, and only then process the task.
6565

66-
// safety: we know if the task is enqueued, no one else will touch the `next` pointer.
67-
let cs = unsafe { CriticalSection::new() };
68-
next = task.header().run_queue_item.next.borrow(cs).get();
66+
critical_section::with(|cs| {
67+
next = task.header().run_queue_item.next.borrow(cs).get();
68+
task.header().state.run_dequeue(cs);
69+
});
6970

7071
on_task(task);
7172
}

embassy-executor/src/raw/state_atomics.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,7 @@ impl State {
5252

5353
/// Unmark the task as run-queued. Return whether the task is spawned.
5454
#[inline(always)]
55-
pub fn run_dequeue(&self) -> bool {
56-
let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
57-
state & STATE_SPAWNED != 0
55+
pub fn run_dequeue(&self) {
56+
self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
5857
}
5958
}

embassy-executor/src/raw/state_atomics_arm.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,9 @@ impl State {
7575

7676
/// Unmark the task as run-queued. Return whether the task is spawned.
7777
#[inline(always)]
78-
pub fn run_dequeue(&self) -> bool {
78+
pub fn run_dequeue(&self) {
7979
compiler_fence(Ordering::Release);
8080

81-
let r = self.spawned.load(Ordering::Relaxed);
8281
self.run_queued.store(false, Ordering::Relaxed);
83-
r
8482
}
8583
}

embassy-executor/src/raw/state_critical_section.rs

+2-6
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,7 @@ impl State {
6767

6868
/// Unmark the task as run-queued. Return whether the task is spawned.
6969
#[inline(always)]
70-
pub fn run_dequeue(&self) -> bool {
71-
self.update(|s| {
72-
let ok = *s & STATE_SPAWNED != 0;
73-
*s &= !STATE_RUN_QUEUED;
74-
ok
75-
})
70+
pub fn run_dequeue(&self, cs: CriticalSection<'_>) {
71+
self.update_with_cs(cs, |s| *s &= !STATE_RUN_QUEUED)
7672
}
7773
}

0 commit comments

Comments
 (0)