Skip to content

Commit 99463c5

Browse files
committed
fix(fast-steal): 修复任务窃取中 min_chunk_size 含义错误的问题
1 parent 9de58bf commit 99463c5

File tree

4 files changed

+14
-14
lines changed

4 files changed

+14
-14
lines changed

crates/fast-pull/src/core/multi.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::{DownloadResult, Event, ProgressEntry, RandPuller, RandPusher, Total,
44
use alloc::{sync::Arc, vec::Vec};
55
use bytes::Bytes;
66
use core::{
7-
num::{NonZero, NonZeroU64, NonZeroUsize},
7+
num::{NonZeroU64, NonZeroUsize},
88
sync::atomic::{AtomicUsize, Ordering},
99
time::Duration,
1010
};
@@ -98,13 +98,12 @@ where
9898
type Handle = TokioHandle;
9999
fn execute(self: Arc<Self>, task: Arc<Task>, task_list: Arc<TaskList<Self>>) -> Self::Handle {
100100
let id = self.id.fetch_add(1, Ordering::SeqCst);
101-
let steal_min_chunk_size = NonZero::new(2 * self.min_chunk_size.get()).unwrap();
102101
let mut puller = self.puller.clone();
103102
let handle = tokio::spawn(async move {
104103
'steal_task: loop {
105104
let mut start = task.start();
106105
if start >= task.end() {
107-
if task_list.steal(&task, steal_min_chunk_size) {
106+
if task_list.steal(&task, self.min_chunk_size) {
108107
continue;
109108
} else {
110109
break;
@@ -165,6 +164,7 @@ mod tests {
165164
mock::{MockPuller, build_mock_data},
166165
};
167166
use alloc::vec;
167+
use core::num::NonZero;
168168
use std::dbg;
169169

170170
#[tokio::test]

crates/fast-steal/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ impl Executor for TokioExecutor {
4949
println!("task: {i} = {res}");
5050
self.tx.send((i, res)).unwrap();
5151
}
52-
if !task_list.steal(&task, NonZero::new(2).unwrap()) {
52+
if !task_list.steal(&task, NonZero::new(1).unwrap()) {
5353
break;
5454
}
5555
}
@@ -81,8 +81,8 @@ async fn main() {
8181
let (tx, mut rx) = mpsc::unbounded_channel();
8282
let executor = TokioExecutor { tx };
8383
let pre_data = [1..20, 41..48];
84-
let task_list = TaskList::run(NonZero::new(8).unwrap(), NonZero::new(2).unwrap(), &pre_data[..], executor);
85-
let handles: Arc<[_]> = task_list.handles(|it| it.collect());
84+
let task_list = TaskList::run(NonZero::new(8).unwrap(), NonZero::new(1).unwrap(), &pre_data[..], executor);
85+
let handles: Arc<[_]> = task_list.handles(|it| it.map(|h| h.clone()).collect());
8686
drop(task_list);
8787
for handle in handles.iter() {
8888
handle.0.lock().await.take().unwrap().await.unwrap();

crates/fast-steal/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
//! println!("task: {i} = {res}");
5151
//! self.tx.send((i, res)).unwrap();
5252
//! }
53-
//! if !task_list.steal(&task, NonZero::new(2).unwrap()) {
53+
//! if !task_list.steal(&task, NonZero::new(1).unwrap()) {
5454
//! break;
5555
//! }
5656
//! }
@@ -85,8 +85,8 @@
8585
//! let task_list = Arc::new(TaskList::run(&pre_data[..], executor));
8686
//! task_list
8787
//! .clone()
88-
//! .set_threads(NonZero::new(8).unwrap(), NonZero::new(2).unwrap());
89-
//! let handles: Arc<[_]> = task_list.handles(|it| it.map(|task| task.clone()).collect());
88+
//! .set_threads(NonZero::new(8).unwrap(), NonZero::new(1).unwrap());
89+
//! let handles: Arc<[_]> = task_list.handles(|it| it.map(|h| h.clone()).collect());
9090
//! drop(task_list);
9191
//! for handle in handles.iter() {
9292
//! handle.0.lock().await.take().unwrap().await.unwrap();

crates/fast-steal/src/task_list.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ impl<E: Executor> TaskList<E> {
5151
return true;
5252
}
5353
if let Some(steal_task) = guard.running.iter().map(|w| &w.0).max()
54-
&& steal_task.remain() >= min_chunk_size
54+
&& steal_task.remain() >= min_chunk_size * 2
5555
{
5656
let (start, end) = steal_task.split_two();
5757
task.set_end(end);
@@ -78,7 +78,7 @@ impl<E: Executor> TaskList<E> {
7878
guard.running.extend(temp);
7979
while guard.running.len() < threads
8080
&& let Some(steal_task) = guard.running.iter().map(|w| &w.0).max()
81-
&& steal_task.remain() >= min_chunk_size
81+
&& steal_task.remain() >= min_chunk_size * 2
8282
{
8383
let (start, end) = steal_task.split_two();
8484
let task = Arc::new(Task::new(start, end));
@@ -147,7 +147,7 @@ mod tests {
147147
println!("task: {i} = {res}");
148148
self.tx.send((i, res)).unwrap();
149149
}
150-
if !task_list.steal(&task, NonZero::new(2).unwrap()) {
150+
if !task_list.steal(&task, NonZero::new(1).unwrap()) {
151151
break;
152152
}
153153
}
@@ -182,8 +182,8 @@ mod tests {
182182
let task_list = Arc::new(TaskList::run(&pre_data[..], executor));
183183
task_list
184184
.clone()
185-
.set_threads(NonZero::new(8).unwrap(), NonZero::new(2).unwrap());
186-
let handles: Arc<[_]> = task_list.handles(|it| it.map(|task| task.clone()).collect());
185+
.set_threads(NonZero::new(8).unwrap(), NonZero::new(1).unwrap());
186+
let handles: Arc<[_]> = task_list.handles(|it| it.map(|h| h.clone()).collect());
187187
drop(task_list);
188188
for handle in handles.iter() {
189189
handle.0.lock().await.take().unwrap().await.unwrap();

0 commit comments

Comments
 (0)