Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

steal other workers only before going to sleep #32

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/pool/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,8 @@ impl<T: TaskCell + Send> Local<T> {
&self.core
}

pub(crate) fn pop(&mut self) -> Option<Pop<T>> {
self.local_queue.pop()
pub(crate) fn pop(&mut self, steal_workers: bool) -> Option<Pop<T>> {
self.local_queue.pop(steal_workers)
}

/// Pops a task from the queue.
Expand All @@ -271,7 +271,8 @@ impl<T: TaskCell + Send> Local<T> {
if !self.core.mark_sleep() {
return false;
}
task = self.local_queue.pop();
// Steal other workers so we don't sleep while leaving other workers busy.
task = self.local_queue.pop(true);
task.is_none()
},
|| {},
Expand Down
6 changes: 4 additions & 2 deletions src/pool/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ where
// Wait some time before going to sleep, which is more expensive.
let mut spin = SpinWait::new();
loop {
if let Some(t) = self.local.pop() {
// By default we don't steal other workers to reduce the cost of pop.
// But we will always try to steal other workers before going to sleep.
if let Some(t) = self.local.pop(false) {
return Some(t);
}
if !spin.spin() {
Expand All @@ -50,7 +52,7 @@ where
self.runner.end(&mut self.local);

// Drain all futures in the queue
while self.local.pop().is_some() {}
while self.local.pop(true).is_some() {}
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,10 @@ impl<T: TaskCell + Send> LocalQueue<T> {

/// Gets a task cell from the queue. Returns `None` if there is no task cell
/// available.
pub fn pop(&mut self) -> Option<Pop<T>> {
pub fn pop(&mut self, steal_workers: bool) -> Option<Pop<T>> {
match &mut self.0 {
LocalQueueInner::SingleLevel(q) => q.pop(),
LocalQueueInner::Multilevel(q) => q.pop(),
LocalQueueInner::SingleLevel(q) => q.pop(steal_workers),
LocalQueueInner::Multilevel(q) => q.pop(steal_workers),
}
}

Expand Down
55 changes: 45 additions & 10 deletions src/queue/multilevel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ where
self.local_queue.push(task_cell);
}

pub(super) fn pop(&mut self) -> Option<Pop<T>> {
pub(super) fn pop(&mut self, steal_workers: bool) -> Option<Pop<T>> {
fn into_pop<T>(mut t: T, from_local: bool) -> Pop<T>
where
T: TaskCell,
Expand Down Expand Up @@ -130,7 +130,7 @@ where
Steal::Retry => need_retry = true,
_ => {}
}
if !self.stealers.is_empty() {
if !self.stealers.is_empty() && steal_workers {
let mut found = None;
for (idx, stealer) in self.stealers.iter().enumerate() {
match stealer.steal_batch_and_pop(&self.local_queue) {
Expand Down Expand Up @@ -732,7 +732,7 @@ mod tests {
let (injector, mut locals) = builder.build(1);
injector.push(MockTask::new(0, Extras::multilevel_default()));
thread::sleep(SLEEP_DUR);
let schedule_time = locals[0].pop().unwrap().schedule_time;
let schedule_time = locals[0].pop(true).unwrap().schedule_time;
assert!(schedule_time.elapsed() >= SLEEP_DUR);
}

Expand Down Expand Up @@ -818,14 +818,49 @@ mod tests {
injector.push(MockTask::new(i, Extras::multilevel_default()));
}
let sum: u64 = (0..100)
.map(|_| locals[2].pop().unwrap().task_cell.sleep_ms)
.map(|_| locals[2].pop(false).unwrap().task_cell.sleep_ms)
.sum();
assert_eq!(sum, (0..100).sum());
assert!(locals.iter_mut().all(|c| c.pop().is_none()));
assert!(locals.iter_mut().all(|c| c.pop(true).is_none()));
}

#[test]
fn test_pop_by_steal_others() {
fn test_pop_without_stealing_workers() {
let builder = Builder::new(Config::default());
let (injector, mut locals) = builder.build_raw(3);
for i in 0..50 {
injector.push(MockTask::new(i, Extras::multilevel_default()));
}
assert!(injector.level_injectors[0]
.steal_batch(&locals[0].local_queue)
.is_success());
for i in 50..100 {
injector.push(MockTask::new(i, Extras::multilevel_default()));
}
assert!(injector.level_injectors[0]
.steal_batch(&locals[1].local_queue)
.is_success());

let mut sum = 0;
while let Some(task) = locals[2].pop(false) {
sum += task.task_cell.sleep_ms;
}
assert_ne!(
sum,
(0..100).sum(),
"locals[2] shall not pop all tasks without stealing others"
);

for &i in &[0, 1] {
while let Some(task) = locals[i].pop(false) {
sum += task.task_cell.sleep_ms;
}
}
assert_eq!(sum, (0..100).sum());
}

#[test]
fn test_pop_by_steal_workers() {
let builder = Builder::new(Config::default());
let (injector, mut locals) = builder.build_raw(3);
for i in 0..50 {
Expand All @@ -841,10 +876,10 @@ mod tests {
.steal_batch(&locals[1].local_queue)
.is_success());
let sum: u64 = (0..100)
.map(|_| locals[2].pop().unwrap().task_cell.sleep_ms)
.map(|_| locals[2].pop(true).unwrap().task_cell.sleep_ms)
.sum();
assert_eq!(sum, (0..100).sum());
assert!(locals.iter_mut().all(|c| c.pop().is_none()));
assert!(locals.iter_mut().all(|c| c.pop(true).is_none()));
}

#[test]
Expand All @@ -860,7 +895,7 @@ mod tests {
.map(|mut consumer| {
let sum = sum.clone();
thread::spawn(move || {
while let Some(pop) = consumer.pop() {
while let Some(pop) = consumer.pop(true) {
sum.fetch_add(pop.task_cell.sleep_ms, SeqCst);
}
})
Expand All @@ -881,7 +916,7 @@ mod tests {
let mut runner = runner_builder.build();

remote.spawn(MockTask::new(100, Extras::new_multilevel(1, None)));
if let Some(Pop { task_cell, .. }) = locals[0].pop() {
if let Some(Pop { task_cell, .. }) = locals[0].pop(true) {
assert!(runner.handle(&mut locals[0], task_cell));
}
assert!(
Expand Down
48 changes: 39 additions & 9 deletions src/queue/single_level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ where
self.local_queue.push(task_cell);
}

pub fn pop(&mut self) -> Option<Pop<T>> {
pub fn pop(&mut self, steal_workers: bool) -> Option<Pop<T>> {
fn into_pop<T>(mut t: T, from_local: bool) -> Pop<T>
where
T: TaskCell,
Expand All @@ -81,7 +81,7 @@ where
Steal::Retry => need_retry = true,
_ => {}
}
if !self.stealers.is_empty() {
if !self.stealers.is_empty() && steal_workers {
let mut found = None;
for (idx, stealer) in self.stealers.iter().enumerate() {
match stealer.steal_batch_and_pop(&self.local_queue) {
Expand Down Expand Up @@ -184,7 +184,7 @@ mod tests {
let (injector, mut locals) = super::create(1);
injector.push(MockCell::new(0));
thread::sleep(SLEEP_DUR);
let schedule_time = locals[0].pop().unwrap().schedule_time;
let schedule_time = locals[0].pop(true).unwrap().schedule_time;
assert!(schedule_time.elapsed() >= SLEEP_DUR);
}

Expand All @@ -195,14 +195,44 @@ mod tests {
injector.push(MockCell::new(i));
}
let sum: i32 = (0..100)
.map(|_| locals[2].pop().unwrap().task_cell.value)
.map(|_| locals[2].pop(false).unwrap().task_cell.value)
.sum();
assert_eq!(sum, (0..100).sum());
assert!(locals.iter_mut().all(|c| c.pop().is_none()));
assert!(locals.iter_mut().all(|c| c.pop(true).is_none()));
}

#[test]
fn test_pop_by_steal_others() {
fn test_pop_without_stealing_workers() {
let (injector, mut locals) = super::create(3);
for i in 0..50 {
injector.push(MockCell::new(i));
}
assert!(injector.0.steal_batch(&locals[0].local_queue).is_success());
for i in 50..100 {
injector.push(MockCell::new(i));
}
assert!(injector.0.steal_batch(&locals[1].local_queue).is_success());

let mut sum = 0;
while let Some(task) = locals[2].pop(false) {
sum += task.task_cell.value;
}
assert_ne!(
sum,
(0..100).sum(),
"locals[2] shall not pop all tasks without stealing others"
);

for &i in &[0, 1] {
while let Some(task) = locals[i].pop(false) {
sum += task.task_cell.value;
}
}
assert_eq!(sum, (0..100).sum());
}

#[test]
fn test_pop_by_steal_workers() {
let (injector, mut locals) = super::create(3);
for i in 0..50 {
injector.push(MockCell::new(i));
Expand All @@ -213,10 +243,10 @@ mod tests {
}
assert!(injector.0.steal_batch(&locals[1].local_queue).is_success());
let sum: i32 = (0..100)
.map(|_| locals[2].pop().unwrap().task_cell.value)
.map(|_| locals[2].pop(true).unwrap().task_cell.value)
.sum();
assert_eq!(sum, (0..100).sum());
assert!(locals.iter_mut().all(|c| c.pop().is_none()));
assert!(locals.iter_mut().all(|c| c.pop(true).is_none()));
}

#[test]
Expand All @@ -231,7 +261,7 @@ mod tests {
.map(|mut consumer| {
let sum = sum.clone();
thread::spawn(move || {
while let Some(pop) = consumer.pop() {
while let Some(pop) = consumer.pop(true) {
sum.fetch_add(pop.task_cell.value, Ordering::SeqCst);
}
})
Expand Down
4 changes: 2 additions & 2 deletions src/task/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ mod tests {
);
assert_eq!(rx.recv().unwrap(), 42);
assert_eq!(rx.recv().unwrap(), 42);
assert!(locals[0].pop().is_none());
assert!(locals[0].pop(true).is_none());
assert!(rx.recv().is_err());
}

Expand All @@ -230,7 +230,7 @@ mod tests {
);
assert_eq!(rx.recv().unwrap(), 42);
assert_eq!(rx.recv().unwrap(), 42);
assert!(locals[0].pop().is_some());
assert!(locals[0].pop(true).is_some());
assert!(rx.recv().is_err());
}
}
2 changes: 1 addition & 1 deletion src/task/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ mod tests {

/// Run `Runner::handle` once.
fn handle_once(&mut self) {
if let Some(t) = self.locals[0].pop() {
if let Some(t) = self.locals[0].pop(true) {
let runner = self.runner.clone();
runner.borrow_mut().handle(&mut self.locals[0], t.task_cell);
}
Expand Down