Skip to content

Commit c811434

Browse files
committed
Add example how to add compression to the entire blobs protocol.
1 parent 4e8387a commit c811434

File tree

6 files changed

+331
-10
lines changed

6 files changed

+331
-10
lines changed

Cargo.lock

Lines changed: 97 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ iroh-base = "0.91.1"
4444
reflink-copy = "0.1.24"
4545
irpc = { version = "0.7.0", features = ["rpc", "quinn_endpoint_setup", "spans", "stream", "derive"], default-features = false }
4646
iroh-metrics = { version = "0.35" }
47+
async-compression = { version = "0.4.30", features = ["lz4", "tokio"] }
4748

4849
[dev-dependencies]
4950
clap = { version = "4.5.31", features = ["derive"] }
@@ -60,6 +61,7 @@ tracing-test = "0.2.5"
6061
walkdir = "2.5.0"
6162
atomic_refcell = "0.1.13"
6263
iroh = { version = "0.91.1", features = ["discovery-local-network"]}
64+
async-compression = { version = "0.4.30", features = ["zstd", "tokio"] }
6365

6466
[features]
6567
hide-proto-docs = []
@@ -68,4 +70,4 @@ default = ["hide-proto-docs"]
6870

6971
[patch.crates-io]
7072
iroh = { git = "https://github.com/n0-computer/iroh", branch = "main" }
71-
iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" }
73+
iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" }

examples/compression.rs

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/// Example how to limit blob requests by hash and node id, and to add
2+
/// throttling or limiting the maximum number of connections.
3+
///
4+
/// Limiting is done via a fn that returns an EventSender and internally
5+
/// makes liberal use of spawn to spawn background tasks.
6+
///
7+
/// This is fine, since the tasks will terminate as soon as the [BlobsProtocol]
8+
/// instance holding the [EventSender] will be dropped. But for production
9+
/// grade code you might nevertheless put the tasks into a [tokio::task::JoinSet] or
10+
/// [n0_future::FuturesUnordered].
11+
mod common;
12+
use std::{path::PathBuf, time::Instant};
13+
14+
use anyhow::Result;
15+
use async_compression::tokio::{bufread::Lz4Decoder, write::Lz4Encoder};
16+
use bao_tree::blake3;
17+
use clap::Parser;
18+
use common::setup_logging;
19+
use iroh::protocol::ProtocolHandler;
20+
use iroh_blobs::{
21+
api::Store,
22+
get::fsm::{AtConnected, ConnectedNext, EndBlobNext},
23+
protocol::{ChunkRangesSeq, GetRequest, Request},
24+
provider::{
25+
events::{ClientConnected, EventSender, HasErrorCode},
26+
handle_get, ErrorHandler, StreamPair,
27+
},
28+
store::mem::MemStore,
29+
ticket::BlobTicket,
30+
};
31+
use iroh_io::{TokioStreamReader, TokioStreamWriter};
32+
use tokio::io::BufReader;
33+
use tracing::debug;
34+
35+
use crate::common::get_or_generate_secret_key;
36+
37+
#[derive(Debug, Parser)]
38+
#[command(version, about)]
39+
pub enum Args {
40+
/// Limit requests by node id
41+
Provide {
42+
/// Path for files to add.
43+
path: PathBuf,
44+
},
45+
/// Get a blob. Just for completeness sake.
46+
Get {
47+
/// Ticket for the blob to download
48+
ticket: BlobTicket,
49+
/// Path to save the blob to
50+
#[clap(long)]
51+
target: Option<PathBuf>,
52+
},
53+
}
54+
55+
type CompressedWriter =
56+
TokioStreamWriter<async_compression::tokio::write::Lz4Encoder<iroh::endpoint::SendStream>>;
57+
type CompressedReader = TokioStreamReader<
58+
async_compression::tokio::bufread::Lz4Decoder<BufReader<iroh::endpoint::RecvStream>>,
59+
>;
60+
61+
#[derive(Debug, Clone)]
62+
struct CompressedBlobsProtocol {
63+
store: Store,
64+
events: EventSender,
65+
}
66+
67+
impl CompressedBlobsProtocol {
68+
fn new(store: &Store, events: EventSender) -> Self {
69+
Self {
70+
store: store.clone(),
71+
events,
72+
}
73+
}
74+
}
75+
76+
struct CompressedErrorHandler;
77+
78+
impl ErrorHandler for CompressedErrorHandler {
79+
type W = CompressedWriter;
80+
81+
type R = CompressedReader;
82+
83+
async fn stop(reader: &mut Self::R, code: quinn::VarInt) {
84+
reader.0.get_mut().get_mut().stop(code).ok();
85+
}
86+
87+
async fn reset(writer: &mut Self::W, code: quinn::VarInt) {
88+
writer.0.get_mut().reset(code).ok();
89+
}
90+
}
91+
92+
impl ProtocolHandler for CompressedBlobsProtocol {
93+
async fn accept(
94+
&self,
95+
connection: iroh::endpoint::Connection,
96+
) -> std::result::Result<(), iroh::protocol::AcceptError> {
97+
let connection_id = connection.stable_id() as u64;
98+
let node_id = connection.remote_node_id()?;
99+
if let Err(cause) = self
100+
.events
101+
.client_connected(|| ClientConnected {
102+
connection_id,
103+
node_id,
104+
})
105+
.await
106+
{
107+
connection.close(cause.code(), cause.reason());
108+
debug!("closing connection: {cause}");
109+
return Ok(());
110+
}
111+
while let Ok((send, recv)) = connection.accept_bi().await {
112+
println!("Accepted new stream");
113+
let stream_id = send.id().index();
114+
let send = TokioStreamWriter(Lz4Encoder::new(send));
115+
let recv = TokioStreamReader(Lz4Decoder::new(BufReader::new(recv)));
116+
let store = self.store.clone();
117+
let mut pair =
118+
StreamPair::new(connection_id, stream_id, recv, send, self.events.clone());
119+
tokio::spawn(async move {
120+
println!("Handling stream");
121+
let request = pair.read_request().await?;
122+
println!("Received request: {request:?}");
123+
if let Request::Get(request) = request {
124+
handle_get::<CompressedErrorHandler>(pair, store, request).await?;
125+
}
126+
anyhow::Ok(())
127+
});
128+
}
129+
Ok(())
130+
}
131+
}
132+
133+
const ALPN: &[u8] = b"iroh-blobs-compressed/0.1.0";
134+
135+
#[tokio::main]
136+
async fn main() -> Result<()> {
137+
setup_logging();
138+
let args = Args::parse();
139+
let secret = get_or_generate_secret_key()?;
140+
let endpoint = iroh::Endpoint::builder()
141+
.secret_key(secret)
142+
.discovery_n0()
143+
.bind()
144+
.await?;
145+
match args {
146+
Args::Provide { path } => {
147+
let store = MemStore::new();
148+
let tag = store.add_path(path).await?;
149+
let blobs = CompressedBlobsProtocol::new(&store, EventSender::DEFAULT);
150+
let router = iroh::protocol::Router::builder(endpoint.clone())
151+
.accept(ALPN, blobs)
152+
.spawn();
153+
let ticket = BlobTicket::new(endpoint.node_id().into(), tag.hash, tag.format);
154+
println!("Serving blob with hash {}", tag.hash);
155+
println!("Ticket: {ticket}");
156+
println!("Node is running. Press Ctrl-C to exit.");
157+
tokio::signal::ctrl_c().await?;
158+
println!("Shutting down.");
159+
router.shutdown().await?;
160+
}
161+
Args::Get { ticket, target } => {
162+
let conn = endpoint.connect(ticket.node_addr().clone(), ALPN).await?;
163+
let (send, recv) = conn.open_bi().await?;
164+
let send = TokioStreamWriter(Lz4Encoder::new(send));
165+
let recv = TokioStreamReader(Lz4Decoder::new(BufReader::new(recv)));
166+
let request = GetRequest {
167+
hash: ticket.hash(),
168+
ranges: ChunkRangesSeq::root(),
169+
};
170+
let connected =
171+
AtConnected::new(Instant::now(), recv, send, request, Default::default());
172+
let ConnectedNext::StartRoot(start) = connected.next().await? else {
173+
unreachable!("expected start root");
174+
};
175+
let (end, data) = start.next().concatenate_into_vec().await?;
176+
let EndBlobNext::Closing(closing) = end.next() else {
177+
unreachable!("expected closing");
178+
};
179+
let stats = closing.next().await?;
180+
if let Some(target) = target {
181+
tokio::fs::write(&target, &data).await?;
182+
println!(
183+
"Wrote {} bytes to {}",
184+
stats.payload_bytes_read,
185+
target.display()
186+
);
187+
} else {
188+
let hash = blake3::hash(&data);
189+
println!("Hash: {hash}");
190+
}
191+
}
192+
}
193+
Ok(())
194+
}

0 commit comments

Comments
 (0)