Skip to content

Commit 9944cdf

Browse files
committed
feat(fast-down/curl): fix flow control
1 parent b5616bf commit 9944cdf

File tree

4 files changed

+280
-26
lines changed

4 files changed

+280
-26
lines changed

Cargo.lock

Lines changed: 68 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/fast-down/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ kanal.workspace = true
2626
slab = "0.4.11"
2727
oneshot = "0.1.11"
2828
anyhow = "1.0.99"
29+
atomic-wait = { version = "1.1.0", optional = true }
2930

3031
[[bin]]
3132
name = "libcurl-main"
@@ -34,7 +35,7 @@ path = "curl.rs"
3435

3536
[features]
3637
reqwest = ["dep:reqwest"]
37-
curl = ["dep:curl"]
38+
curl = ["dep:curl", "dep:atomic-wait"]
3839

3940
[dev-dependencies]
4041
tokio = { version = "1.47.1", features = [

crates/fast-down/curl.rs

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
use fast_down::curl::worker::{DataSignal, Op, multi, options, SIG_EVENT, STATE_SEND_FAILED};
2+
use kanal::ReceiveError;
3+
use std::sync::Arc;
14
use std::thread;
2-
use fast_down::curl::worker::{multi, Op};
5+
use std::time::Duration;
36

47
pub fn main() {
58
let (tx_ops, rx_ops) = kanal::unbounded();
@@ -8,9 +11,39 @@ pub fn main() {
811
});
912
let (tx_ret, ret) = oneshot::channel();
1013
let (tx_data, rx_data) = kanal::bounded(1);
11-
tx_ops.send(Op::New(tx_data, curl::easy::List::new(), "https://1.1.1.1/cdn-cgi/trace".to_string(), tx_ret)).unwrap();
12-
for data in rx_data.into_iter() {
13-
dbg!(String::from_utf8_lossy(&*data));
14+
let signal: Arc<DataSignal> = Default::default();
15+
tx_ops
16+
.send(Op::New(
17+
tx_data,
18+
options::New {
19+
signal: signal.clone(),
20+
headers: curl::easy::List::new(),
21+
url: "https://github.com/".to_string(),
22+
extra: None,
23+
},
24+
tx_ret,
25+
))
26+
.unwrap();
27+
let th = ret.recv().unwrap();
28+
loop {
29+
if signal.is_send_failed() {
30+
tx_ops.send(Op::UnpauseData(th)).unwrap();
31+
}
32+
33+
match rx_data.try_recv() {
34+
Ok(Some(data)) => {
35+
eprint!("{}", std::str::from_utf8(&data).unwrap());
36+
thread::sleep(Duration::from_millis(100));
37+
}
38+
Ok(None) => {
39+
if signal.is_send_failed() {
40+
tx_ops.send(Op::UnpauseData(th)).unwrap();
41+
}
42+
atomic_wait::wait(signal.signal(), SIG_EVENT);
43+
}
44+
Err(ReceiveError::SendClosed) => break,
45+
Err(ReceiveError::Closed) => unreachable!(),
46+
}
1447
}
1548
handle.join().unwrap();
16-
}
49+
}

0 commit comments

Comments
 (0)