Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions crates/rpc/ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ thiserror.workspace = true
futures-util = "0.3.30"
interprocess = { version = "1.2.1", features = ["tokio_support"] }

[target.'cfg(windows)'.dependencies]
windows-sys = { version = "0.52.0", features = ["Win32_Foundation"] }

[dev-dependencies]
tokio-stream = { workspace = true, features = ["sync"] }
reth-tracing.workspace = true
Expand Down
93 changes: 77 additions & 16 deletions crates/rpc/ipc/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,85 @@
//! [`jsonrpsee`] transport adapter implementation for IPC.

use std::{
io,
path::{Path, PathBuf},
};

use crate::stream_codec::StreamCodec;
use futures::StreamExt;
use interprocess::local_socket::tokio::{LocalSocketStream, OwnedReadHalf, OwnedWriteHalf};
use jsonrpsee::{
async_client::{Client, ClientBuilder},
core::client::{TransportReceiverT, TransportSenderT},
core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT},
};
use std::io;
use tokio::io::AsyncWriteExt;
use tokio_util::{
codec::FramedRead,
compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt},
};

#[cfg(unix)]
use crate::client::unix::IpcTransportClientBuilder;
#[cfg(windows)]
use crate::client::win::IpcTransportClientBuilder;
/// Sending end of IPC transport.
#[derive(Debug)]
pub(crate) struct Sender {
inner: Compat<OwnedWriteHalf>,
}

#[async_trait::async_trait]
impl TransportSenderT for Sender {
type Error = IpcError;

/// Sends out a request. Returns a Future that finishes when the request has been successfully
/// sent.
async fn send(&mut self, msg: String) -> Result<(), Self::Error> {
Ok(self.inner.write_all(msg.as_bytes()).await?)
}

async fn send_ping(&mut self) -> Result<(), Self::Error> {
tracing::trace!("send ping - not implemented");
Err(IpcError::NotSupported)
}

/// Close the connection.
async fn close(&mut self) -> Result<(), Self::Error> {
Ok(())
}
}

/// Receiving end of IPC transport.
#[derive(Debug)]
pub(crate) struct Receiver {
pub(crate) inner: FramedRead<Compat<OwnedReadHalf>, StreamCodec>,
}

#[async_trait::async_trait]
impl TransportReceiverT for Receiver {
type Error = IpcError;

/// Returns a Future resolving when the server sent us something back.
async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
self.inner.next().await.map_or(Err(IpcError::Closed), |val| Ok(ReceivedMessage::Text(val?)))
}
}

/// Builder for IPC transport [`Sender`] and [`Receiver`] pair.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub(crate) struct IpcTransportClientBuilder;

impl IpcTransportClientBuilder {
pub(crate) async fn build(
self,
endpoint: impl AsRef<str>,
) -> Result<(Sender, Receiver), IpcError> {
let endpoint = endpoint.as_ref().to_string();
let conn = LocalSocketStream::connect(endpoint.clone())
.await
.map_err(|err| IpcError::FailedToConnect { path: endpoint, err })?;

#[cfg(unix)]
mod unix;
#[cfg(windows)]
mod win;
let (rhlf, whlf) = conn.into_split();

Ok((
Sender { inner: whlf.compat_write() },
Receiver { inner: FramedRead::new(rhlf.compat(), StreamCodec::stream_incoming()) },
))
}
}

/// Builder type for [`Client`]
#[derive(Clone, Default, Debug)]
Expand All @@ -37,7 +98,7 @@ impl IpcClientBuilder {
/// # Ok(())
/// # }
/// ```
pub async fn build(self, path: impl AsRef<Path>) -> Result<Client, IpcError> {
pub async fn build(self, path: impl AsRef<str>) -> Result<Client, IpcError> {
let (tx, rx) = IpcTransportClientBuilder::default().build(path).await?;
Ok(self.build_with_tokio(tx, rx))
}
Expand Down Expand Up @@ -66,7 +127,7 @@ pub enum IpcError {
FailedToConnect {
/// The path of the socket.
#[doc(hidden)]
path: PathBuf,
path: String,
/// The error occurred while connecting.
#[doc(hidden)]
err: io::Error,
Expand Down
82 changes: 0 additions & 82 deletions crates/rpc/ipc/src/client/unix.rs

This file was deleted.

82 changes: 0 additions & 82 deletions crates/rpc/ipc/src/client/win.rs

This file was deleted.