Skip to content

Commit

Permalink
Improve genesis_chunked_stream method
Browse files Browse the repository at this point in the history
  • Loading branch information
ljoss17 committed Jun 27, 2024
1 parent 56e837d commit 52fbc03
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 5 deletions.
28 changes: 26 additions & 2 deletions rpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub use compat::CompatMode;

#[cfg(any(feature = "http-client", feature = "websocket-client"))]
mod subscription;
use futures::Stream;
#[cfg(any(feature = "http-client", feature = "websocket-client"))]
pub use subscription::{Subscription, SubscriptionClient};

Expand All @@ -25,6 +26,7 @@ pub use transport::websocket::{
pub use transport::mock::{MockClient, MockRequestMatcher, MockRequestMethodMatcher};

use core::fmt;
use core::pin::Pin;

use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};
Expand Down Expand Up @@ -271,11 +273,33 @@ pub trait Client {
Ok(self.perform(genesis::Request::default()).await?.genesis)
}

/// `/genesis_chunked`: get genesis file in multiple chunks.
async fn genesis_chunked(&self, chunk: usize) -> Result<genesis_chunked::Response, Error> {
async fn genesis_chunked(&self, chunk: u64) -> Result<genesis_chunked::Response, Error> {
self.perform(genesis_chunked::Request::new(chunk)).await
}

/// `/genesis_chunked`: get genesis file in multiple chunks.
async fn genesis_chunked_stream(
&self,
) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, Error>> + '_>> {
Box::pin(futures::stream::unfold(Some(0), move |chunk| async move {
// Verify if there are more chunks to fetch
let chunk = chunk?;

match self.genesis_chunked(chunk).await {
Ok(response) => {
if response.chunk + 1 >= response.total {
// No more chunks to fetch
Some((Ok(response.data), None))
} else {
// Emit this chunk and fetch the next chunk
Some((Ok(response.data), Some(response.chunk + 1)))
}
},
Err(e) => Some((Err(e), None)), // Abort the stream
}
}))
}

/// `/net_info`: obtain information about P2P and other network connections.
async fn net_info(&self) -> Result<net_info::Response, Error> {
self.perform(net_info::Request).await
Expand Down
6 changes: 3 additions & 3 deletions rpc/src/endpoint/genesis_chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub struct Request {
}

impl Request {
pub fn new(chunk: usize) -> Self {
pub fn new(chunk: u64) -> Self {
Self {
chunk: chunk.to_string(),
}
Expand Down Expand Up @@ -47,9 +47,9 @@ where
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Response {
#[serde(with = "serializers::from_str")]
pub chunk: i32,
pub chunk: u64,
#[serde(with = "serializers::from_str")]
pub total: i32,
pub total: u64,
#[serde(with = "serializers::bytes::base64string")]
pub data: Vec<u8>,
}
Expand Down

0 comments on commit 52fbc03

Please sign in to comment.