Skip to content

Commit a342ffa

Browse files
committed
Add documentation ot the ...Extra adapter traits.
Also remove const generics and make the read and write fns a bit more "traditional". Makes usage a bit more inconvenient, but 🤷
1 parent d1b23ab commit a342ffa

File tree

3 files changed

+47
-30
lines changed

3 files changed

+47
-30
lines changed

src/api/blobs.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,12 @@ impl Blobs {
437437
ranges: ChunkRanges,
438438
mut reader: R,
439439
) -> RequestResult<R> {
440-
let size = u64::from_le_bytes(reader.recv::<8>().await.map_err(super::Error::other)?);
440+
let mut size = [0; 8];
441+
reader
442+
.recv_exact(&mut size)
443+
.await
444+
.map_err(super::Error::other)?;
445+
let size = u64::from_le_bytes(size);
441446
let Some(size) = NonZeroU64::new(size) else {
442447
return if hash == Hash::EMPTY {
443448
Ok(reader)

src/get.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -535,7 +535,8 @@ pub mod fsm {
535535
impl<R: RecvStream> AtBlobHeader<R> {
536536
/// Read the size header, returning it and going into the `Content` state.
537537
pub async fn next(mut self) -> Result<(AtBlobContent<R>, u64), AtBlobHeaderNextError> {
538-
let size = self.reader.recv::<8>().await.map_err(|cause| {
538+
let mut size = [0; 8];
539+
self.reader.recv_exact(&mut size).await.map_err(|cause| {
539540
if cause.kind() == io::ErrorKind::UnexpectedEof {
540541
at_blob_header_next_error::NotFoundSnafu.build()
541542
} else {

src/util/stream.rs

Lines changed: 39 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,7 @@ pub trait SendStream: Send {
1717
/// This method is not cancellation safe. Even if this does not resolve, some bytes may have been written when previously polled.
1818
fn send_bytes(&mut self, bytes: Bytes) -> impl Future<Output = io::Result<()>> + Send;
1919
/// Send that sends a fixed sized buffer.
20-
fn send<const L: usize>(
21-
&mut self,
22-
buf: &[u8; L],
23-
) -> impl Future<Output = io::Result<()>> + Send;
20+
fn send(&mut self, buf: &[u8]) -> impl Future<Output = io::Result<()>> + Send;
2421
/// Sync the stream. Not needed for iroh, but needed for intermediate buffered streams such as compression.
2522
fn sync(&mut self) -> impl Future<Output = io::Result<()>> + Send;
2623
/// Reset the stream with the given error code.
@@ -41,8 +38,8 @@ pub trait RecvStream: Send {
4138
///
4239
/// Note that this is different from `recv_bytes`, which will return fewer bytes if the stream ends.
4340
fn recv_bytes_exact(&mut self, len: usize) -> impl Future<Output = io::Result<Bytes>> + Send;
44-
/// Receive exactly `L` bytes from the stream, directly into a `[u8; L]`.
45-
fn recv<const L: usize>(&mut self) -> impl Future<Output = io::Result<[u8; L]>> + Send;
41+
/// Receive exactly `target.len()` bytes from the stream.
42+
fn recv_exact(&mut self, target: &mut [u8]) -> impl Future<Output = io::Result<()>> + Send;
4643
/// Stop the stream with the given error code.
4744
fn stop(&mut self, code: VarInt) -> io::Result<()>;
4845
/// Get the stream id.
@@ -54,7 +51,7 @@ impl SendStream for iroh::endpoint::SendStream {
5451
Ok(self.write_chunk(bytes).await?)
5552
}
5653

57-
async fn send<const L: usize>(&mut self, buf: &[u8; L]) -> io::Result<()> {
54+
async fn send(&mut self, buf: &[u8]) -> io::Result<()> {
5855
Ok(self.write_all(buf).await?)
5956
}
6057

@@ -100,14 +97,12 @@ impl RecvStream for iroh::endpoint::RecvStream {
10097
Ok(buf.into())
10198
}
10299

103-
async fn recv<const L: usize>(&mut self) -> io::Result<[u8; L]> {
104-
let mut buf = [0; L];
105-
self.read_exact(&mut buf).await.map_err(|e| match e {
100+
async fn recv_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
101+
self.read_exact(buf).await.map_err(|e| match e {
106102
ReadExactError::FinishedEarly(0) => io::Error::new(io::ErrorKind::UnexpectedEof, ""),
107103
ReadExactError::FinishedEarly(_) => io::Error::new(io::ErrorKind::InvalidData, ""),
108104
ReadExactError::ReadError(e) => e.into(),
109-
})?;
110-
Ok(buf)
105+
})
111106
}
112107

113108
fn stop(&mut self, code: VarInt) -> io::Result<()> {
@@ -128,8 +123,8 @@ impl<R: RecvStream> RecvStream for &mut R {
128123
self.deref_mut().recv_bytes_exact(len).await
129124
}
130125

131-
async fn recv<const L: usize>(&mut self) -> io::Result<[u8; L]> {
132-
self.deref_mut().recv::<L>().await
126+
async fn recv_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
127+
self.deref_mut().recv_exact(buf).await
133128
}
134129

135130
fn stop(&mut self, code: VarInt) -> io::Result<()> {
@@ -146,7 +141,7 @@ impl<W: SendStream> SendStream for &mut W {
146141
self.deref_mut().send_bytes(bytes).await
147142
}
148143

149-
async fn send<const L: usize>(&mut self, buf: &[u8; L]) -> io::Result<()> {
144+
async fn send(&mut self, buf: &[u8]) -> io::Result<()> {
150145
self.deref_mut().send(buf).await
151146
}
152147

@@ -174,8 +169,15 @@ pub struct AsyncReadRecvStream<R>(R);
174169
/// `AsyncRead + Unpin + Send`, you can implement these additional methods and wrap the result
175170
/// in an `AsyncReadRecvStream` to get a `RecvStream` that reads from the underlying `AsyncRead`.
176171
pub trait AsyncReadRecvStreamExtra: Send {
172+
/// Get a mutable reference to the inner `AsyncRead`.
173+
///
174+
/// Getting a reference is easier than implementing all methods on `AsyncWrite` with forwarders to the inner instance.
177175
fn inner(&mut self) -> &mut (impl AsyncRead + Unpin + Send);
176+
/// Stop the stream with the given error code.
178177
fn stop(&mut self, code: VarInt) -> io::Result<()>;
178+
/// A local unique identifier for the stream.
179+
///
180+
/// This allows distinguishing between streams, but once the stream is closed, the id may be reused.
179181
fn id(&self) -> u64;
180182
}
181183

@@ -209,10 +211,9 @@ impl<R: AsyncReadRecvStreamExtra> RecvStream for AsyncReadRecvStream<R> {
209211
Ok(res.into())
210212
}
211213

212-
async fn recv<const L: usize>(&mut self) -> io::Result<[u8; L]> {
213-
let mut res = [0; L];
214-
self.0.inner().read_exact(&mut res).await?;
215-
Ok(res)
214+
async fn recv_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
215+
self.0.inner().read_exact(buf).await?;
216+
Ok(())
216217
}
217218

218219
fn stop(&mut self, code: VarInt) -> io::Result<()> {
@@ -241,14 +242,13 @@ impl RecvStream for Bytes {
241242
Ok(res)
242243
}
243244

244-
async fn recv<const L: usize>(&mut self) -> io::Result<[u8; L]> {
245-
if self.len() < L {
245+
async fn recv_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
246+
if self.len() < buf.len() {
246247
return Err(io::ErrorKind::UnexpectedEof.into());
247248
}
248-
let mut res = [0; L];
249-
res.copy_from_slice(&self[..L]);
250-
*self = self.slice(L..);
251-
Ok(res)
249+
buf.copy_from_slice(&self[..buf.len()]);
250+
*self = self.slice(buf.len()..);
251+
Ok(())
252252
}
253253

254254
fn stop(&mut self, _code: VarInt) -> io::Result<()> {
@@ -270,9 +270,17 @@ pub struct AsyncWriteSendStream<W>(W);
270270
/// methods and wrap the result in an `AsyncWriteSendStream` to get a `SendStream`
271271
/// that writes to the underlying `AsyncWrite`.
272272
pub trait AsyncWriteSendStreamExtra: Send {
273+
/// Get a mutable reference to the inner `AsyncWrite`.
274+
///
275+
/// Getting a reference is easier than implementing all methods on `AsyncWrite` with forwarders to the inner instance.
273276
fn inner(&mut self) -> &mut (impl AsyncWrite + Unpin + Send);
277+
/// Reset the stream with the given error code.
274278
fn reset(&mut self, code: VarInt) -> io::Result<()>;
279+
/// Wait for the stream to be stopped, returning the optional error code if it was.
275280
fn stopped(&mut self) -> impl Future<Output = io::Result<Option<VarInt>>> + Send;
281+
/// A local unique identifier for the stream.
282+
///
283+
/// This allows distinguishing between streams, but once the stream is closed, the id may be reused.
276284
fn id(&self) -> u64;
277285
}
278286

@@ -293,7 +301,7 @@ impl<W: AsyncWriteSendStreamExtra> SendStream for AsyncWriteSendStream<W> {
293301
self.0.inner().write_all(&bytes).await
294302
}
295303

296-
async fn send<const L: usize>(&mut self, buf: &[u8; L]) -> io::Result<()> {
304+
async fn send(&mut self, buf: &[u8]) -> io::Result<()> {
297305
self.0.inner().write_all(buf).await
298306
}
299307

@@ -335,7 +343,9 @@ impl<R: RecvStream> AsyncStreamReader for RecvStreamAsyncStreamReader<R> {
335343
}
336344

337345
async fn read<const L: usize>(&mut self) -> io::Result<[u8; L]> {
338-
self.0.recv::<L>().await
346+
let mut buf = [0; L];
347+
self.0.recv_exact(&mut buf).await?;
348+
Ok(buf)
339349
}
340350
}
341351

@@ -352,7 +362,8 @@ pub(crate) trait RecvStreamExt: RecvStream {
352362
}
353363

354364
async fn read_u8(&mut self) -> io::Result<u8> {
355-
let buf = self.recv::<1>().await?;
365+
let mut buf = [0; 1];
366+
self.recv_exact(&mut buf).await?;
356367
Ok(buf[0])
357368
}
358369

0 commit comments

Comments
 (0)