Skip to content

Commit 8a353c9

Browse files
author
share121
committed
chore(fast-pull,fast-steal): 整理代码
1 parent 1632a97 commit 8a353c9

File tree

13 files changed

+222
-2340
lines changed

13 files changed

+222
-2340
lines changed

Cargo.lock

Lines changed: 173 additions & 1675 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/fast-pull/Cargo.toml

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ homepage.workspace = true
1616
tokio = { version = "1.47.1", features = [
1717
"macros",
1818
"io-util",
19+
"time",
1920
], default-features = false }
2021
kanal.workspace = true
2122
fast-steal.workspace = true
@@ -26,28 +27,12 @@ mmap-io = { version = "0.8.0", features = [
2627
"async",
2728
"hugepages",
2829
], optional = true }
29-
reqwest = { version = "0.12.22", features = [
30-
"stream",
31-
], default-features = false, optional = true }
32-
content_disposition = { version = "0.4.0", optional = true }
33-
sanitize-filename = { version = "0.6.0", optional = true }
34-
urlencoding = { version = "2.1.3", optional = true }
35-
url = { workspace = true, optional = true }
3630

3731
[dev-dependencies]
38-
mockito = "1.7.0"
3932
tempfile = "3.20.0"
4033

4134
[features]
42-
default = ["file", "reqwest"]
35+
default = ["file"]
4336

4437
file = ["mmap-io"]
4538
mmap-io = ["dep:mmap-io"]
46-
47-
reqwest = [
48-
"dep:url",
49-
"dep:reqwest",
50-
"dep:content_disposition",
51-
"dep:sanitize-filename",
52-
"dep:urlencoding",
53-
]

crates/fast-pull/src/base/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ mod progress;
44
mod puller;
55
mod pusher;
66
mod total;
7-
#[cfg(feature = "reqwest")]
8-
pub(crate) mod url;
97

108
pub use event::*;
119
pub use merge_progress::*;

crates/fast-pull/src/base/pusher.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@ use crate::ProgressEntry;
22
use bytes::Bytes;
33
use core::future;
44

5-
/// Random Pusher
6-
///
7-
/// Implementation should only overwrite content in `range`, even if `content.len()` is larger than `range.total()`
85
pub trait RandPusher: Send {
96
type Error: Send;
107
fn push(

crates/fast-pull/src/base/url.rs

Lines changed: 0 additions & 18 deletions
This file was deleted.

crates/fast-pull/src/core/multi.rs

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
extern crate alloc;
22
use super::macros::poll_ok;
3-
use crate::{DownloadResult, Event, ProgressEntry, RandPuller, RandPusher, WorkerId};
3+
use crate::{DownloadResult, Event, ProgressEntry, RandPuller, RandPusher, Total, WorkerId};
44
use alloc::{sync::Arc, vec::Vec};
55
use bytes::Bytes;
66
use core::{
@@ -51,20 +51,18 @@ where
5151
options.retry_gap
5252
);
5353
});
54-
let executor: TokioExecutor<R, W> = TokioExecutor {
54+
let executor: Arc<TokioExecutor<R, W>> = Arc::new(TokioExecutor {
5555
tx,
5656
tx_push,
5757
puller,
5858
retry_gap: options.retry_gap,
5959
id: Arc::new(AtomicUsize::new(0)),
6060
min_chunk_size: options.min_chunk_size,
61-
};
62-
let task_list = TaskList::run(
63-
options.concurrent,
64-
options.min_chunk_size,
65-
&options.download_chunks[..],
66-
executor,
67-
);
61+
});
62+
let task_list = Arc::new(TaskList::run(&options.download_chunks[..], executor));
63+
task_list
64+
.clone()
65+
.set_threads(options.concurrent, options.min_chunk_size);
6866
DownloadResult::new(
6967
event_chain,
7068
push_handle,
@@ -117,7 +115,7 @@ where
117115
let mut stream = puller.pull(&download_range);
118116
loop {
119117
match stream.try_next().await {
120-
Ok(Some(chunk)) => {
118+
Ok(Some(mut chunk)) => {
121119
let len = chunk.len() as u64;
122120
task.fetch_add_start(len);
123121
let range_start = start;
@@ -127,14 +125,12 @@ where
127125
continue 'steal_task;
128126
}
129127
let span = range_start..range_end;
128+
chunk.truncate(span.total() as usize);
130129
self.tx
131130
.send(Event::PullProgress(id, span.clone()))
132131
.await
133132
.unwrap();
134-
self.tx_push
135-
.send((id, span, chunk))
136-
.await
137-
.unwrap();
133+
self.tx_push.send((id, span, chunk)).await.unwrap();
138134
}
139135
Ok(None) => break,
140136
Err(e) => {
@@ -144,8 +140,8 @@ where
144140
}
145141
}
146142
}
147-
self.tx.send(Event::Finished(id)).await.unwrap();
148143
task_list.remove(&task);
144+
self.tx.send(Event::Finished(id)).await.unwrap();
149145
});
150146
TokioHandle(handle.abort_handle())
151147
}

crates/fast-pull/src/file/pusher.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,13 @@ extern crate std;
22
use crate::{ProgressEntry, RandPusher, SeqPusher, Total};
33
use bytes::Bytes;
44
use mmap_io::{MemoryMappedFile, MmapIoError, MmapMode, flush::FlushPolicy};
5-
use std::{path::Path, vec::Vec};
6-
use thiserror::Error;
5+
use std::{collections::VecDeque, path::Path};
76
use tokio::{
87
fs::{File, OpenOptions},
98
io::{self, AsyncSeekExt, AsyncWriteExt, BufWriter, SeekFrom},
109
};
1110

12-
#[derive(Error, Debug)]
11+
#[derive(thiserror::Error, Debug)]
1312
pub enum FilePusherError {
1413
#[error(transparent)]
1514
MmapIo(#[from] MmapIoError),
@@ -76,7 +75,7 @@ impl RandPusher for RandFilePusherMmap {
7675
self.mmap
7776
.as_slice_mut(range.start, range.total())?
7877
.as_mut()
79-
.copy_from_slice(&bytes[0..(range.total() as usize)]);
78+
.copy_from_slice(&bytes);
8079
self.downloaded += bytes.len();
8180
if self.downloaded >= self.buffer_size {
8281
self.mmap.flush_async().await?;
@@ -93,7 +92,7 @@ impl RandPusher for RandFilePusherMmap {
9392
#[derive(Debug)]
9493
pub struct RandFilePusherStd {
9594
buffer: BufWriter<File>,
96-
cache: Vec<(u64, Bytes)>,
95+
cache: VecDeque<(u64, Bytes)>,
9796
p: u64,
9897
cache_size: usize,
9998
buffer_size: usize,
@@ -103,7 +102,7 @@ impl RandFilePusherStd {
103102
file.set_len(size).await?;
104103
Ok(Self {
105104
buffer: BufWriter::with_capacity(buffer_size, file),
106-
cache: Vec::new(),
105+
cache: VecDeque::new(),
107106
p: 0,
108107
cache_size: 0,
109108
buffer_size,
@@ -112,26 +111,26 @@ impl RandFilePusherStd {
112111
}
113112
impl RandPusher for RandFilePusherStd {
114113
type Error = FilePusherError;
115-
async fn push(&mut self, range: ProgressEntry, mut bytes: Bytes) -> Result<(), Self::Error> {
114+
async fn push(&mut self, range: ProgressEntry, bytes: Bytes) -> Result<(), Self::Error> {
116115
let pos = self.cache.partition_point(|(i, _)| i < &range.start);
117116
self.cache_size += bytes.len();
118-
drop(bytes.split_off(range.total() as usize));
119117
self.cache.insert(pos, (range.start, bytes));
120118
if self.cache_size >= self.buffer_size {
121119
self.flush().await?;
122120
}
123121
Ok(())
124122
}
125123
async fn flush(&mut self) -> Result<(), Self::Error> {
126-
for (start, bytes) in self.cache.drain(..) {
124+
while let Some((start, bytes)) = self.cache.front() {
127125
let len = bytes.len();
128126
self.cache_size -= len;
129-
if start != self.p {
130-
self.buffer.seek(SeekFrom::Start(start)).await?;
131-
self.p = start;
127+
if *start != self.p {
128+
self.buffer.seek(SeekFrom::Start(*start)).await?;
129+
self.p = *start;
132130
}
133-
self.buffer.write_all(&bytes).await?;
131+
self.buffer.write_all(bytes).await?;
134132
self.p += len as u64;
133+
self.cache.pop_front();
135134
}
136135
self.buffer.flush().await?;
137136
Ok(())
@@ -142,6 +141,7 @@ impl RandPusher for RandFilePusherStd {
142141
mod tests {
143142
use super::*;
144143
use bytes::Bytes;
144+
use std::vec::Vec;
145145
use tempfile::NamedTempFile;
146146
use tokio::io::AsyncReadExt;
147147

crates/fast-pull/src/lib.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,5 @@ mod base;
33
mod core;
44
#[cfg(feature = "file")]
55
pub mod file;
6-
#[cfg(feature = "reqwest")]
7-
pub mod reqwest;
86
pub use base::*;
97
pub use core::*;

crates/fast-pull/src/reqwest/mod.rs

Lines changed: 0 additions & 10 deletions
This file was deleted.

0 commit comments

Comments
 (0)