Skip to content

Commit ee9c5cf

Browse files
author
share121
committed
feat: 成功抽象出 http 模块
1 parent ed31c6c commit ee9c5cf

File tree

3 files changed

+94
-79
lines changed

3 files changed

+94
-79
lines changed

crates/fast-down/src/http/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,5 @@ pub enum HttpError<Client: HttpClient> {
4242
Request(GetRequestError<Client>),
4343
Chunk(GetChunkError<Client>),
4444
GetHeader(GetHeaderError<Client>),
45+
Irrecoverable,
4546
}

crates/fast-down/src/http/puller.rs

Lines changed: 45 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use bytes::Bytes;
55
use fast_pull::{ProgressEntry, RandPuller, SeqPuller};
66
use futures::{Stream, TryFutureExt, TryStream};
77
use std::{
8-
marker::PhantomData,
98
pin::{Pin, pin},
109
task::{Context, Poll},
1110
};
@@ -22,6 +21,14 @@ impl<Client: HttpClient> HttpPuller<Client> {
2221
}
2322
}
2423

24+
type ResponseFut<Client> =
25+
Pin<Box<dyn Future<Output = Result<GetResponse<Client>, GetRequestError<Client>>> + Send>>;
26+
enum ResponseState<Client: HttpClient> {
27+
Pending(ResponseFut<Client>),
28+
Ready(GetResponse<Client>),
29+
None,
30+
}
31+
2532
impl<Client: HttpClient + 'static> RandPuller for HttpPuller<Client> {
2633
type Error = HttpError<Client>;
2734
fn pull(
@@ -37,13 +44,6 @@ impl<Client: HttpClient + 'static> RandPuller for HttpPuller<Client> {
3744
}
3845
}
3946
}
40-
type ResponseFut<Client> =
41-
Pin<Box<dyn Future<Output = Result<GetResponse<Client>, GetRequestError<Client>>> + Send>>;
42-
enum ResponseState<Client: HttpClient> {
43-
Pending(ResponseFut<Client>),
44-
Ready(GetResponse<Client>),
45-
None,
46-
}
4747
struct RandRequestStream<Client: HttpClient + 'static> {
4848
client: Client,
4949
url: Url,
@@ -107,38 +107,50 @@ impl<Client: HttpClient + 'static> SeqPuller for HttpPuller<Client> {
107107
fn pull(&mut self) -> impl TryStream<Ok = Bytes, Error = Self::Error> + Send + Unpin {
108108
let req = self.client.get(self.url.clone(), None).send();
109109
SeqRequestStream {
110-
resp: Box::pin(req),
111-
_client: PhantomData,
110+
state: ResponseState::Pending(Box::pin(req)),
112111
}
113112
}
114113
}
115-
struct SeqRequestStream<Client: HttpClient + 'static, Fut>
116-
where
117-
Fut: Future<Output = Result<GetResponse<Client>, GetRequestError<Client>>> + Unpin,
118-
{
119-
_client: PhantomData<Client>,
120-
resp: Fut,
114+
struct SeqRequestStream<Client: HttpClient + 'static> {
115+
state: ResponseState<Client>,
121116
}
122-
impl<Client: HttpClient, Fut> Stream for SeqRequestStream<Client, Fut>
123-
where
124-
Fut: Future<Output = Result<GetResponse<Client>, GetRequestError<Client>>> + Unpin,
125-
{
117+
impl<Client: HttpClient> Stream for SeqRequestStream<Client> {
126118
type Item = Result<Bytes, HttpError<Client>>;
127119
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
128-
let mut response;
129-
match self.resp.try_poll_unpin(cx) {
130-
Poll::Ready(resp) => match resp {
131-
Ok(resp) => response = resp,
132-
Err(e) => return Poll::Ready(Some(Err(HttpError::Request(e)))),
133-
},
134-
Poll::Pending => return Poll::Pending,
120+
let chunk_global;
121+
match &mut self.state {
122+
ResponseState::Pending(resp) => {
123+
return match resp.try_poll_unpin(cx) {
124+
Poll::Ready(resp) => match resp {
125+
Ok(resp) => {
126+
self.state = ResponseState::Ready(resp);
127+
self.poll_next(cx)
128+
}
129+
Err(e) => {
130+
self.state = ResponseState::None;
131+
Poll::Ready(Some(Err(HttpError::Request(e))))
132+
}
133+
},
134+
Poll::Pending => Poll::Pending,
135+
};
136+
}
137+
ResponseState::None => return Poll::Ready(Some(Err(HttpError::Irrecoverable))),
138+
ResponseState::Ready(resp) => {
139+
let mut chunk = pin!(resp.chunk());
140+
match chunk.try_poll_unpin(cx) {
141+
Poll::Ready(Ok(Some(chunk))) => chunk_global = Ok(chunk),
142+
Poll::Ready(Ok(None)) => return Poll::Ready(None),
143+
Poll::Ready(Err(e)) => chunk_global = Err(e),
144+
Poll::Pending => return Poll::Pending,
145+
};
146+
}
135147
};
136-
let mut chunk = pin!(response.chunk());
137-
match chunk.try_poll_unpin(cx) {
138-
Poll::Ready(Ok(Some(chunk))) => Poll::Ready(Some(Ok(chunk))),
139-
Poll::Ready(Ok(None)) => Poll::Ready(None),
140-
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(HttpError::Chunk(e)))),
141-
Poll::Pending => Poll::Pending,
148+
match chunk_global {
149+
Ok(chunk) => Poll::Ready(Some(Ok(chunk))),
150+
Err(e) => {
151+
self.state = ResponseState::None;
152+
Poll::Ready(Some(Err(HttpError::Chunk(e))))
153+
}
142154
}
143155
}
144156
}

crates/fast-down/src/reqwest/mod.rs

Lines changed: 48 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ mod tests {
7777
mem::MemPusher,
7878
mock::build_mock_data,
7979
multi::{self, download_multi},
80+
single::{self, download_single},
8081
};
8182
use reqwest::{Client, StatusCode};
8283
use std::{num::NonZero, time::Duration};
@@ -164,6 +165,7 @@ mod tests {
164165
HttpError::Request(e) => assert_eq!(e.status(), Some(StatusCode::NOT_FOUND)),
165166
HttpError::Chunk(_) => unreachable!(),
166167
HttpError::GetHeader(_) => unreachable!(),
168+
HttpError::Irrecoverable => unreachable!(),
167169
},
168170
}
169171
mock1.assert_async().await;
@@ -244,52 +246,52 @@ mod tests {
244246
assert_eq!(&**pusher.receive.lock(), mock_data);
245247
}
246248

247-
// #[tokio::test]
248-
// async fn test_sequential_download() {
249-
// let mock_data = build_mock_data(300 * 1024 * 1024);
250-
// let mut server = mockito::Server::new_async().await;
251-
// let _mock = server
252-
// .mock("GET", "/sequential")
253-
// .with_status(200)
254-
// .with_body(mock_data.clone())
255-
// .create_async()
256-
// .await;
257-
// let puller = HttpPuller::new(
258-
// format!("{}/sequential", server.url()).parse().unwrap(),
259-
// Client::new(),
260-
// );
261-
// let pusher = MemPusher::with_capacity(mock_data.len());
262-
// #[allow(clippy::single_range_in_vec_init)]
263-
// let download_chunks = vec![0..mock_data.len() as u64];
264-
// let result = download_single(
265-
// puller,
266-
// pusher.clone(),
267-
// single::DownloadOptions {
268-
// retry_gap: Duration::from_secs(1),
269-
// push_queue_cap: 1024,
270-
// },
271-
// )
272-
// .await;
249+
#[tokio::test]
250+
async fn test_sequential_download() {
251+
let mock_data = build_mock_data(300 * 1024 * 1024);
252+
let mut server = mockito::Server::new_async().await;
253+
let _mock = server
254+
.mock("GET", "/sequential")
255+
.with_status(200)
256+
.with_body(mock_data.clone())
257+
.create_async()
258+
.await;
259+
let puller = HttpPuller::new(
260+
format!("{}/sequential", server.url()).parse().unwrap(),
261+
Client::new(),
262+
);
263+
let pusher = MemPusher::with_capacity(mock_data.len());
264+
#[allow(clippy::single_range_in_vec_init)]
265+
let download_chunks = vec![0..mock_data.len() as u64];
266+
let result = download_single(
267+
puller,
268+
pusher.clone(),
269+
single::DownloadOptions {
270+
retry_gap: Duration::from_secs(1),
271+
push_queue_cap: 1024,
272+
},
273+
)
274+
.await;
273275

274-
// let mut pull_progress: Vec<ProgressEntry> = Vec::new();
275-
// let mut push_progress: Vec<ProgressEntry> = Vec::new();
276-
// while let Ok(e) = result.event_chain.recv().await {
277-
// match e {
278-
// Event::PullProgress(_, p) => {
279-
// pull_progress.merge_progress(p);
280-
// }
281-
// Event::PushProgress(_, p) => {
282-
// push_progress.merge_progress(p);
283-
// }
284-
// _ => {}
285-
// }
286-
// }
287-
// dbg!(&pull_progress);
288-
// dbg!(&push_progress);
289-
// assert_eq!(pull_progress, download_chunks);
290-
// assert_eq!(push_progress, download_chunks);
276+
let mut pull_progress: Vec<ProgressEntry> = Vec::new();
277+
let mut push_progress: Vec<ProgressEntry> = Vec::new();
278+
while let Ok(e) = result.event_chain.recv().await {
279+
match e {
280+
Event::PullProgress(_, p) => {
281+
pull_progress.merge_progress(p);
282+
}
283+
Event::PushProgress(_, p) => {
284+
push_progress.merge_progress(p);
285+
}
286+
_ => {}
287+
}
288+
}
289+
dbg!(&pull_progress);
290+
dbg!(&push_progress);
291+
assert_eq!(pull_progress, download_chunks);
292+
assert_eq!(push_progress, download_chunks);
291293

292-
// result.join().await.unwrap();
293-
// assert_eq!(&**pusher.receive.lock(), mock_data);
294-
// }
294+
result.join().await.unwrap();
295+
assert_eq!(&**pusher.receive.lock(), mock_data);
296+
}
295297
}

0 commit comments

Comments
 (0)