Skip to content

Commit 3168580

Browse files
author
share121
committed
chore: 使用 kanel 框架代替 async-channel
1 parent e9f54e5 commit 3168580

File tree

7 files changed

+51
-29
lines changed

7 files changed

+51
-29
lines changed

Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,14 @@ tokio = { version = "1.47.0", features = [
4040
"io-util",
4141
], default-features = false }
4242
urlencoding = "2.1.3"
43+
kanal = "0.1.1"
4344

4445
[dev-dependencies]
4546
mockito = "1.7.0"
4647
tempfile = "3.20.0"
4748

4849
[features]
49-
default = ["file"]
50+
default = ["file", "reqwest"]
5051
file = ["memmap2"]
5152
memmap2 = ["dep:memmap2"]
5253
reqwest = ["common.url", "dep:reqwest", "dep:http"]

src/common/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
#[cfg(feature = "common.url")]
22
mod url;
33

4+
use crate::{FetchResult, Fetcher, ProgressEntry, Puller, RandomPusher};
45
use std::num::NonZeroUsize;
56
use std::time::Duration;
67
#[cfg(feature = "common.url")]
78
pub use url::UrlInfo;
8-
use crate::{FetchResult, Fetcher, ProgressEntry, Puller, RandomPusher};
99

1010
#[derive(Debug, Eq, PartialEq)]
1111
pub struct DownloadOptions {
@@ -15,13 +15,15 @@ pub struct DownloadOptions {
1515
}
1616

1717
pub trait AutoDownload {
18-
async fn download<F, P>(
18+
fn download<F, P>(
1919
&self,
2020
fetcher: F,
2121
pusher: P,
2222
maybe_chunks: Option<Vec<ProgressEntry>>,
2323
options: DownloadOptions,
24-
) -> FetchResult<F::Error, <F::Puller as Puller>::Error, P::Error>
24+
) -> impl std::future::Future<
25+
Output = FetchResult<F::Error, <F::Puller as Puller>::Error, P::Error>,
26+
> + Send
2527
where
2628
F: Fetcher + Send + 'static,
2729
P: RandomPusher + Send + 'static;

src/common/url.rs

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1+
use super::{AutoDownload, DownloadOptions};
2+
use crate::{FetchResult, Fetcher, ProgressEntry, Puller, RandomPusher, multi, single};
13
use std::num::NonZeroUsize;
2-
use std::time::Duration;
34
use url::Url;
4-
use crate::{multi, single, FetchResult, Fetcher, ProgressEntry, Puller, Pusher, RandomPusher};
5-
use super::{AutoDownload, DownloadOptions};
65

76
#[derive(Debug, Clone)]
87
pub struct UrlInfo {
@@ -28,18 +27,30 @@ impl AutoDownload for UrlInfo {
2827
P: RandomPusher + Send + 'static,
2928
{
3029
if self.fast_download {
31-
multi::download_multi(fetcher, pusher, multi::DownloadOptions {
32-
pull_chunks: maybe_chunks.unwrap_or_else(|| vec![0..self.size]),
33-
concurrent: options.concurrent.unwrap_or(const { NonZeroUsize::new(1).unwrap() }),
34-
retry_gap: options.retry_gap,
35-
push_queue_cap: options.push_queue_cap,
36-
}).await
30+
multi::download_multi(
31+
fetcher,
32+
pusher,
33+
multi::DownloadOptions {
34+
#[allow(clippy::single_range_in_vec_init)]
35+
pull_chunks: maybe_chunks.unwrap_or_else(|| vec![0..self.size]),
36+
concurrent: options
37+
.concurrent
38+
.unwrap_or(const { NonZeroUsize::new(1).unwrap() }),
39+
retry_gap: options.retry_gap,
40+
push_queue_cap: options.push_queue_cap,
41+
},
42+
)
43+
.await
3744
} else {
38-
single::download_single(fetcher, pusher, single::DownloadOptions {
39-
retry_gap: options.retry_gap,
40-
push_queue_cap: options.push_queue_cap,
41-
}).await
45+
single::download_single(
46+
fetcher,
47+
pusher,
48+
single::DownloadOptions {
49+
retry_gap: options.retry_gap,
50+
push_queue_cap: options.push_queue_cap,
51+
},
52+
)
53+
.await
4254
}
4355
}
4456
}
45-

src/core/mod.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
use crate::Event;
2-
use crate::base::pusher::Pusher;
3-
use crate::base::source::{Fetcher, Puller};
4-
use async_channel::Receiver;
2+
use kanal::AsyncReceiver;
53
use std::{
64
fmt::Debug,
75
sync::{
@@ -22,15 +20,14 @@ pub mod single;
2220

2321
#[derive(Debug, Clone)]
2422
pub struct FetchResult<FetchError, PullError, PushError> {
25-
pub event_chain: Receiver<Event<FetchError, PullError, PushError>>,
23+
pub event_chain: AsyncReceiver<Event<FetchError, PullError, PushError>>,
2624
handle: Arc<Mutex<Option<JoinHandle<()>>>>,
2725
is_running: Arc<AtomicBool>,
2826
}
2927

30-
impl<FetchError, PullError, PushError> FetchResult<FetchError, PullError, PushError>
31-
{
28+
impl<FetchError, PullError, PushError> FetchResult<FetchError, PullError, PushError> {
3229
pub fn new(
33-
event_chain: Receiver<Event<FetchError, PullError, PushError>>,
30+
event_chain: AsyncReceiver<Event<FetchError, PullError, PushError>>,
3431
handle: JoinHandle<()>,
3532
is_running: Arc<AtomicBool>,
3633
) -> Self {

src/core/multi.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ where
3030
F: Fetcher + Send + 'static,
3131
P: RandomPusher + Send + 'static,
3232
{
33-
let (tx, event_chain) = async_channel::unbounded();
33+
let (tx, event_chain) = kanal::unbounded_async();
3434
let (tx_write, rx_write) =
35-
async_channel::bounded::<(WorkerId, ProgressEntry, Bytes)>(options.push_queue_cap);
35+
kanal::bounded_async::<(WorkerId, ProgressEntry, Bytes)>(options.push_queue_cap);
3636
let tx_clone = tx.clone();
3737
let handle = tokio::spawn(async move {
3838
while let Ok((id, spin, data)) = rx_write.recv().await {

src/core/single.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ where
2525
F: Fetcher + Send + 'static,
2626
P: Pusher + Send + 'static,
2727
{
28-
let (tx, event_chain) = async_channel::unbounded();
28+
let (tx, event_chain) = kanal::unbounded_async();
2929
let (tx_write, rx_write) =
30-
async_channel::bounded::<(ProgressEntry, Bytes)>(options.push_queue_cap);
30+
kanal::bounded_async::<(ProgressEntry, Bytes)>(options.push_queue_cap);
3131
let tx_clone = tx.clone();
3232
const ID: usize = 0;
3333
let handle = tokio::spawn(async move {

0 commit comments

Comments
 (0)