Skip to content

Commit 804333e

Browse files
committed
refactor: 修复大量问题
1 parent f190661 commit 804333e

File tree

15 files changed

+112
-77
lines changed

15 files changed

+112
-77
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ license = "MIT"
1010
authors = ["share121 <[email protected]>", "Cyan Changes <[email protected]>"]
1111

1212
[workspace.dependencies]
13-
fast-pull = { version = "3.2.3", path = "crates/fast-pull" }
13+
fast-pull = { version = "3.3.0", path = "crates/fast-pull" }
1414
fast-steal = { version = "6.0.10", path = "crates/fast-steal" }
1515
kanal = "0.1.1"
1616
bytes = { version = "1.10.1", default-features = false }

crates/fast-down/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "fast-down"
33
description = "Download everything fast"
4-
version = "3.2.7"
4+
version = "3.3.0"
55
readme = "README.md"
66
documentation = "https://docs.rs/fast-down"
77
keywords = ["download", "fast", "parallel", "concurrency"]

crates/fast-down/src/http/mod.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ pub use puller::*;
66
use crate::url_info::FileId;
77
use bytes::Bytes;
88
use fast_pull::ProgressEntry;
9-
use std::{fmt::Debug, future::Future};
9+
use std::{fmt::Debug, future::Future, time::Duration};
1010
use url::Url;
1111

1212
pub trait HttpClient: Clone + Send + Sync + Unpin {
@@ -16,7 +16,9 @@ pub trait HttpClient: Clone + Send + Sync + Unpin {
1616
pub trait HttpRequestBuilder {
1717
type Response: HttpResponse;
1818
type RequestError: Send + Debug;
19-
fn send(self) -> impl Future<Output = Result<Self::Response, Self::RequestError>> + Send;
19+
fn send(
20+
self,
21+
) -> impl Future<Output = Result<Self::Response, (Self::RequestError, Option<Duration>)>> + Send;
2022
}
2123
pub trait HttpResponse: Send + Unpin {
2224
type Headers: HttpHeaders;
@@ -30,10 +32,9 @@ pub trait HttpHeaders {
3032
fn get(&self, header: &str) -> Result<&str, Self::GetHeaderError>;
3133
}
3234

33-
pub type GetResponse<Client> =
34-
<<Client as HttpClient>::RequestBuilder as HttpRequestBuilder>::Response;
35-
pub type GetRequestError<Client> =
36-
<<Client as HttpClient>::RequestBuilder as HttpRequestBuilder>::RequestError;
35+
pub type GetRequestBuilder<Client> = <Client as HttpClient>::RequestBuilder;
36+
pub type GetResponse<Client> = <GetRequestBuilder<Client> as HttpRequestBuilder>::Response;
37+
pub type GetRequestError<Client> = <GetRequestBuilder<Client> as HttpRequestBuilder>::RequestError;
3738
pub type GetChunkError<Client> = <GetResponse<Client> as HttpResponse>::ChunkError;
3839
pub type GetHeader<Client> = <GetResponse<Client> as HttpResponse>::Headers;
3940
pub type GetHeaderError<Client> = <GetHeader<Client> as HttpHeaders>::GetHeaderError;

crates/fast-down/src/http/prefetch.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,22 @@ use crate::{
44
url_info::FileId,
55
};
66
use content_disposition;
7-
use std::{borrow::Borrow, future::Future};
7+
use std::{borrow::Borrow, future::Future, time::Duration};
88
use url::Url;
99

10+
pub type PrefetchResult<Client, E> = Result<(UrlInfo, GetResponse<Client>), (E, Option<Duration>)>;
11+
1012
pub trait Prefetch<Client: HttpClient> {
1113
type Error;
1214
fn prefetch(
1315
&self,
1416
url: Url,
15-
) -> impl Future<Output = Result<(UrlInfo, GetResponse<Client>), Self::Error>> + Send;
17+
) -> impl Future<Output = PrefetchResult<Client, Self::Error>> + Send;
1618
}
1719

1820
impl<Client: HttpClient, BorrowClient: Borrow<Client> + Sync> Prefetch<Client> for BorrowClient {
1921
type Error = HttpError<Client>;
20-
async fn prefetch(&self, url: Url) -> Result<(UrlInfo, GetResponse<Client>), Self::Error> {
22+
async fn prefetch(&self, url: Url) -> PrefetchResult<Client, Self::Error> {
2123
prefetch(self.borrow(), url).await
2224
}
2325
}
@@ -36,18 +38,19 @@ fn get_filename(headers: &impl HttpHeaders, url: &Url) -> String {
3638
.filter(|s| !s.trim().is_empty())
3739
.map(|s| s.to_string())
3840
})
41+
.or_else(|| url.domain().map(|s| s.to_string()))
3942
.unwrap_or_else(|| url.to_string())
4043
}
4144

4245
async fn prefetch<Client: HttpClient>(
4346
client: &Client,
4447
url: Url,
45-
) -> Result<(UrlInfo, GetResponse<Client>), HttpError<Client>> {
48+
) -> PrefetchResult<Client, HttpError<Client>> {
4649
let resp = client
4750
.get(url, None)
4851
.send()
4952
.await
50-
.map_err(HttpError::Request)?;
53+
.map_err(|(e, d)| (HttpError::Request(e), d))?;
5154
let headers = resp.headers();
5255
let supports_range = headers
5356
.get("accept-ranges")

crates/fast-down/src/http/puller.rs

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use std::{
1010
pin::{Pin, pin},
1111
sync::Arc,
1212
task::{Context, Poll},
13+
time::Duration,
1314
};
1415
use url::Url;
1516

@@ -36,8 +37,13 @@ impl<Client: HttpClient> HttpPuller<Client> {
3637
}
3738
}
3839

39-
type ResponseFut<Client> =
40-
Pin<Box<dyn Future<Output = Result<GetResponse<Client>, GetRequestError<Client>>> + Send>>;
40+
type ResponseFut<Client> = Pin<
41+
Box<
42+
dyn Future<
43+
Output = Result<GetResponse<Client>, (GetRequestError<Client>, Option<Duration>)>,
44+
> + Send,
45+
>,
46+
>;
4147
enum ResponseState<Client: HttpClient> {
4248
Pending(ResponseFut<Client>),
4349
Ready(GetResponse<Client>),
@@ -49,7 +55,7 @@ impl<Client: HttpClient + 'static> RandPuller for HttpPuller<Client> {
4955
fn pull(
5056
&mut self,
5157
range: &ProgressEntry,
52-
) -> impl TryStream<Ok = Bytes, Error = Self::Error> + Send + Unpin {
58+
) -> impl TryStream<Ok = Bytes, Error = (Self::Error, Option<Duration>)> + Send + Unpin {
5359
RandRequestStream {
5460
client: self.client.clone(),
5561
url: self.url.clone(),
@@ -76,7 +82,7 @@ struct RandRequestStream<Client: HttpClient + 'static> {
7682
file_id: FileId,
7783
}
7884
impl<Client: HttpClient> Stream for RandRequestStream<Client> {
79-
type Item = Result<Bytes, HttpError<Client>>;
85+
type Item = Result<Bytes, (HttpError<Client>, Option<Duration>)>;
8086
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
8187
let chunk_global;
8288
match &mut self.state {
@@ -89,15 +95,18 @@ impl<Client: HttpClient> Stream for RandRequestStream<Client> {
8995
let new_file_id = FileId::new(etag, last_modified);
9096
if new_file_id != self.file_id {
9197
self.state = ResponseState::None;
92-
Poll::Ready(Some(Err(HttpError::MismatchedBody(new_file_id))))
98+
Poll::Ready(Some(Err((
99+
HttpError::MismatchedBody(new_file_id),
100+
None,
101+
))))
93102
} else {
94103
self.state = ResponseState::Ready(resp);
95104
self.poll_next(cx)
96105
}
97106
}
98-
Err(e) => {
107+
Err((e, d)) => {
99108
self.state = ResponseState::None;
100-
Poll::Ready(Some(Err(HttpError::Request(e))))
109+
Poll::Ready(Some(Err((HttpError::Request(e), d))))
101110
}
102111
},
103112
Poll::Pending => Poll::Pending,
@@ -128,15 +137,17 @@ impl<Client: HttpClient> Stream for RandRequestStream<Client> {
128137
}
129138
Err(e) => {
130139
self.state = ResponseState::None;
131-
Poll::Ready(Some(Err(HttpError::Chunk(e))))
140+
Poll::Ready(Some(Err((HttpError::Chunk(e), None))))
132141
}
133142
}
134143
}
135144
}
136145

137146
impl<Client: HttpClient + 'static> SeqPuller for HttpPuller<Client> {
138147
type Error = HttpError<Client>;
139-
fn pull(&mut self) -> impl TryStream<Ok = Bytes, Error = Self::Error> + Send + Unpin {
148+
fn pull(
149+
&mut self,
150+
) -> impl TryStream<Ok = Bytes, Error = (Self::Error, Option<Duration>)> + Send + Unpin {
140151
SeqRequestStream {
141152
state: if let Some(resp) = &self.resp
142153
&& let Some(resp) = resp.lock().take()
@@ -155,7 +166,7 @@ struct SeqRequestStream<Client: HttpClient + 'static> {
155166
file_id: FileId,
156167
}
157168
impl<Client: HttpClient> Stream for SeqRequestStream<Client> {
158-
type Item = Result<Bytes, HttpError<Client>>;
169+
type Item = Result<Bytes, (HttpError<Client>, Option<Duration>)>;
159170
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
160171
let chunk_global;
161172
match &mut self.state {
@@ -168,21 +179,24 @@ impl<Client: HttpClient> Stream for SeqRequestStream<Client> {
168179
let new_file_id = FileId::new(etag, last_modified);
169180
if new_file_id != self.file_id {
170181
self.state = ResponseState::None;
171-
Poll::Ready(Some(Err(HttpError::MismatchedBody(new_file_id))))
182+
Poll::Ready(Some(Err((
183+
HttpError::MismatchedBody(new_file_id),
184+
None,
185+
))))
172186
} else {
173187
self.state = ResponseState::Ready(resp);
174188
self.poll_next(cx)
175189
}
176190
}
177-
Err(e) => {
191+
Err((e, d)) => {
178192
self.state = ResponseState::None;
179-
Poll::Ready(Some(Err(HttpError::Request(e))))
193+
Poll::Ready(Some(Err((HttpError::Request(e), d))))
180194
}
181195
},
182196
Poll::Pending => Poll::Pending,
183197
};
184198
}
185-
ResponseState::None => return Poll::Ready(Some(Err(HttpError::Irrecoverable))),
199+
ResponseState::None => return Poll::Ready(Some(Err((HttpError::Irrecoverable, None)))),
186200
ResponseState::Ready(resp) => {
187201
let mut chunk = pin!(resp.chunk());
188202
match chunk.try_poll_unpin(cx) {
@@ -197,7 +211,7 @@ impl<Client: HttpClient> Stream for SeqRequestStream<Client> {
197211
Ok(chunk) => Poll::Ready(Some(Ok(chunk))),
198212
Err(e) => {
199213
self.state = ResponseState::None;
200-
Poll::Ready(Some(Err(HttpError::Chunk(e))))
214+
Poll::Ready(Some(Err((HttpError::Chunk(e), None))))
201215
}
202216
}
203217
}

crates/fast-down/src/reqwest/mod.rs

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -25,30 +25,27 @@ impl HttpClient for Client {
2525
impl HttpRequestBuilder for RequestBuilder {
2626
type Response = Response;
2727
type RequestError = ReqwestResponseError;
28-
async fn send(self) -> Result<Self::Response, Self::RequestError> {
29-
let res = self.send().await.map_err(ReqwestResponseError::Reqwest)?;
30-
let retry_after = res.headers().get(header::RETRY_AFTER);
31-
if let Some(retry_after) = retry_after
32-
&& let Ok(retry_after) = retry_after.to_str()
33-
{
34-
let retry_after = match retry_after.parse() {
35-
Ok(retry_after) => Some(Duration::from_secs(retry_after)),
36-
Err(_) => match parse_http_date(retry_after) {
37-
Ok(target_time) => target_time.duration_since(SystemTime::now()).ok(),
38-
Err(_) => None,
39-
},
40-
};
41-
if let Some(retry_after) = retry_after
42-
&& retry_after > Duration::ZERO
43-
{
44-
tokio::time::sleep(retry_after).await;
45-
}
46-
}
28+
async fn send(self) -> Result<Self::Response, (Self::RequestError, Option<Duration>)> {
29+
let res = self
30+
.send()
31+
.await
32+
.map_err(|e| (ReqwestResponseError::Reqwest(e), None))?;
4733
let status = res.status();
4834
if status.is_success() {
4935
Ok(res)
5036
} else {
51-
Err(ReqwestResponseError::StatusCode(status))
37+
let retry_after = res
38+
.headers()
39+
.get(header::RETRY_AFTER)
40+
.and_then(|r| r.to_str().ok())
41+
.and_then(|r| match r.parse() {
42+
Ok(r) => Some(Duration::from_secs(r)),
43+
Err(_) => match parse_http_date(r) {
44+
Ok(target_time) => target_time.duration_since(SystemTime::now()).ok(),
45+
Err(_) => None,
46+
},
47+
});
48+
Err((ReqwestResponseError::StatusCode(status), retry_after))
5249
}
5350
}
5451
}
@@ -196,7 +193,7 @@ mod tests {
196193
let url = Url::parse(&format!("{}/404", server.url())).unwrap();
197194
match client.prefetch(url).await {
198195
Ok(info) => unreachable!("404 status code should not success: {info:?}"),
199-
Err(err) => match err {
196+
Err((err, _)) => match err {
200197
HttpError::Request(e) => match e {
201198
ReqwestResponseError::Reqwest(error) => unreachable!("{error:?}"),
202199
ReqwestResponseError::StatusCode(status_code) => {

crates/fast-pull/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "fast-pull"
33
description = "Pull everything fast"
4-
version = "3.2.3"
4+
version = "3.3.0"
55
readme = "README.md"
66
documentation = "https://docs.rs/fast-pull"
77
keywords = ["performance", "multiplex", "fast", "parallel", "concurrency"]
Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
use crate::ProgressEntry;
22
use bytes::Bytes;
3+
use core::time::Duration;
34
use futures::TryStream;
45

56
pub trait RandPuller: Send + Clone {
67
type Error: Send;
78
fn pull(
89
&mut self,
910
range: &ProgressEntry,
10-
) -> impl TryStream<Ok = Bytes, Error = Self::Error> + Send + Unpin;
11+
) -> impl TryStream<Ok = Bytes, Error = (Self::Error, Option<Duration>)> + Send + Unpin;
1112
}
1213

1314
pub trait SeqPuller: Send {
1415
type Error: Send;
15-
fn pull(&mut self) -> impl TryStream<Ok = Bytes, Error = Self::Error> + Send + Unpin;
16+
fn pull(
17+
&mut self,
18+
) -> impl TryStream<Ok = Bytes, Error = (Self::Error, Option<Duration>)> + Send + Unpin;
1619
}

crates/fast-pull/src/core/mock.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
extern crate alloc;
2+
use core::time::Duration;
3+
24
use crate::{ProgressEntry, RandPuller, SeqPuller};
35
use alloc::{sync::Arc, vec::Vec};
46
use bytes::Bytes;
@@ -20,14 +22,16 @@ impl RandPuller for MockPuller {
2022
fn pull(
2123
&mut self,
2224
range: &ProgressEntry,
23-
) -> impl TryStream<Ok = Bytes, Error = Self::Error> + Send + Unpin {
25+
) -> impl TryStream<Ok = Bytes, Error = (Self::Error, Option<Duration>)> + Send + Unpin {
2426
let data = &self.0[range.start as usize..range.end as usize];
2527
stream::iter(data.iter().map(|e| Ok(Bytes::from_iter([*e]))))
2628
}
2729
}
2830
impl SeqPuller for MockPuller {
2931
type Error = ();
30-
fn pull(&mut self) -> impl TryStream<Ok = Bytes, Error = Self::Error> + Send + Unpin {
32+
fn pull(
33+
&mut self,
34+
) -> impl TryStream<Ok = Bytes, Error = (Self::Error, Option<Duration>)> + Send + Unpin {
3135
stream::iter(self.0.iter().map(|e| Ok(Bytes::from_iter([*e]))))
3236
}
3337
}

0 commit comments

Comments
 (0)