Skip to content

Commit b6bef88

Browse files
author
share121
committed
fix: 修复多线程下载的问题
1 parent 7301967 commit b6bef88

File tree

1 file changed

+54
-34
lines changed

1 file changed

+54
-34
lines changed

src/reqwest/reader.rs

Lines changed: 54 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -31,62 +31,82 @@ impl RandReader for ReqwestReader {
3131
url: self.url.clone(),
3232
start: range.start,
3333
end: range.end,
34-
resp: None,
34+
resp: ResponseState::None,
3535
max_retries: 3,
3636
curr_retries: 0,
3737
}
3838
}
3939
}
4040
type ResponseFut = Pin<Box<dyn Future<Output = Result<Response, reqwest::Error>> + Send>>;
41+
enum ResponseState {
42+
Pending(ResponseFut),
43+
Ready(Response),
44+
None,
45+
}
4146
struct ReqwestStream {
4247
client: Client,
4348
url: Url,
4449
start: u64,
4550
end: u64,
46-
resp: Option<ResponseFut>,
51+
resp: ResponseState,
4752
max_retries: usize,
4853
curr_retries: usize,
4954
}
5055
impl Stream for ReqwestStream {
5156
type Item = Result<Bytes, reqwest::Error>;
5257
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
53-
if self.resp.is_none() {
54-
let resp = self
55-
.client
56-
.get(self.url.clone())
57-
.header(
58-
header::RANGE,
59-
format!("bytes={}-{}", self.start, self.end - 1),
60-
)
61-
.send();
62-
self.resp.replace(Box::pin(resp));
63-
}
64-
let resp = self.resp.as_mut().unwrap();
65-
match resp.try_poll_unpin(cx) {
66-
Poll::Ready(Ok(mut resp)) => {
67-
let mut chunk = pin!(resp.chunk());
68-
match chunk.try_poll_unpin(cx) {
69-
Poll::Ready(Ok(Some(chunk))) => {
70-
self.start += chunk.len() as u64;
71-
Poll::Ready(Some(Ok(chunk)))
72-
}
73-
Poll::Ready(Ok(None)) => Poll::Ready(None),
74-
Poll::Ready(Err(e)) => {
75-
self.curr_retries += 1;
76-
if self.curr_retries >= self.max_retries {
77-
self.curr_retries = 0;
78-
self.resp = None;
58+
let chunk_global;
59+
match &mut self.resp {
60+
ResponseState::Pending(resp) => match resp.try_poll_unpin(cx) {
61+
Poll::Ready(resp) => {
62+
match resp {
63+
Ok(resp) => {
64+
self.resp = ResponseState::Ready(resp);
65+
return self.poll_next(cx);
66+
}
67+
Err(e) => {
68+
self.resp = ResponseState::None;
69+
return Poll::Ready(Some(Err(e)));
7970
}
80-
Poll::Ready(Some(Err(e)))
81-
}
82-
Poll::Pending => Poll::Pending,
71+
};
8372
}
73+
Poll::Pending => return Poll::Pending,
74+
},
75+
ResponseState::None => {
76+
let resp = self
77+
.client
78+
.get(self.url.clone())
79+
.header(
80+
header::RANGE,
81+
format!("bytes={}-{}", self.start, self.end - 1),
82+
)
83+
.send();
84+
self.resp = ResponseState::Pending(Box::pin(resp));
85+
return self.poll_next(cx);
8486
}
85-
Poll::Ready(Err(e)) => {
86-
self.resp = None;
87+
ResponseState::Ready(resp) => {
88+
let mut chunk = pin!(resp.chunk());
89+
match chunk.try_poll_unpin(cx) {
90+
Poll::Ready(Ok(Some(chunk))) => chunk_global = Ok(chunk),
91+
Poll::Ready(Ok(None)) => return Poll::Ready(None),
92+
Poll::Ready(Err(e)) => chunk_global = Err(e),
93+
Poll::Pending => return Poll::Pending,
94+
};
95+
}
96+
};
97+
match chunk_global {
98+
Ok(chunk) => {
99+
self.start += chunk.len() as u64;
100+
Poll::Ready(Some(Ok(chunk)))
101+
}
102+
Err(e) => {
103+
self.curr_retries += 1;
104+
if self.curr_retries >= self.max_retries {
105+
self.curr_retries = 0;
106+
self.resp = ResponseState::None;
107+
}
87108
Poll::Ready(Some(Err(e)))
88109
}
89-
Poll::Pending => Poll::Pending,
90110
}
91111
}
92112
}

0 commit comments

Comments
 (0)