Skip to content

Commit 2f052a8

Browse files
authored
feat: replace low level IPC with interprocess (#7922)
1 parent ffa36b7 commit 2f052a8

File tree

5 files changed

+77
-184
lines changed

5 files changed

+77
-184
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/rpc/ipc/Cargo.toml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@ thiserror.workspace = true
3131
futures-util = "0.3.30"
3232
interprocess = { version = "1.2.1", features = ["tokio_support"] }
3333

34-
[target.'cfg(windows)'.dependencies]
35-
windows-sys = { version = "0.52.0", features = ["Win32_Foundation"] }
36-
3734
[dev-dependencies]
3835
tokio-stream = { workspace = true, features = ["sync"] }
3936
reth-tracing.workspace = true

crates/rpc/ipc/src/client/mod.rs

Lines changed: 77 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,85 @@
11
//! [`jsonrpsee`] transport adapter implementation for IPC.
22
3-
use std::{
4-
io,
5-
path::{Path, PathBuf},
6-
};
7-
3+
use crate::stream_codec::StreamCodec;
4+
use futures::StreamExt;
5+
use interprocess::local_socket::tokio::{LocalSocketStream, OwnedReadHalf, OwnedWriteHalf};
86
use jsonrpsee::{
97
async_client::{Client, ClientBuilder},
10-
core::client::{TransportReceiverT, TransportSenderT},
8+
core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT},
9+
};
10+
use std::io;
11+
use tokio::io::AsyncWriteExt;
12+
use tokio_util::{
13+
codec::FramedRead,
14+
compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt},
1115
};
1216

13-
#[cfg(unix)]
14-
use crate::client::unix::IpcTransportClientBuilder;
15-
#[cfg(windows)]
16-
use crate::client::win::IpcTransportClientBuilder;
17+
/// Sending end of IPC transport.
18+
#[derive(Debug)]
19+
pub(crate) struct Sender {
20+
inner: Compat<OwnedWriteHalf>,
21+
}
22+
23+
#[async_trait::async_trait]
24+
impl TransportSenderT for Sender {
25+
type Error = IpcError;
26+
27+
/// Sends out a request. Returns a Future that finishes when the request has been successfully
28+
/// sent.
29+
async fn send(&mut self, msg: String) -> Result<(), Self::Error> {
30+
Ok(self.inner.write_all(msg.as_bytes()).await?)
31+
}
32+
33+
async fn send_ping(&mut self) -> Result<(), Self::Error> {
34+
tracing::trace!("send ping - not implemented");
35+
Err(IpcError::NotSupported)
36+
}
37+
38+
/// Close the connection.
39+
async fn close(&mut self) -> Result<(), Self::Error> {
40+
Ok(())
41+
}
42+
}
43+
44+
/// Receiving end of IPC transport.
45+
#[derive(Debug)]
46+
pub(crate) struct Receiver {
47+
pub(crate) inner: FramedRead<Compat<OwnedReadHalf>, StreamCodec>,
48+
}
49+
50+
#[async_trait::async_trait]
51+
impl TransportReceiverT for Receiver {
52+
type Error = IpcError;
53+
54+
/// Returns a Future resolving when the server sent us something back.
55+
async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
56+
self.inner.next().await.map_or(Err(IpcError::Closed), |val| Ok(ReceivedMessage::Text(val?)))
57+
}
58+
}
59+
60+
/// Builder for IPC transport [`Sender`] and [`Receiver`] pair.
61+
#[derive(Debug, Clone, Default)]
62+
#[non_exhaustive]
63+
pub(crate) struct IpcTransportClientBuilder;
64+
65+
impl IpcTransportClientBuilder {
66+
pub(crate) async fn build(
67+
self,
68+
endpoint: impl AsRef<str>,
69+
) -> Result<(Sender, Receiver), IpcError> {
70+
let endpoint = endpoint.as_ref().to_string();
71+
let conn = LocalSocketStream::connect(endpoint.clone())
72+
.await
73+
.map_err(|err| IpcError::FailedToConnect { path: endpoint, err })?;
1774

18-
#[cfg(unix)]
19-
mod unix;
20-
#[cfg(windows)]
21-
mod win;
75+
let (rhlf, whlf) = conn.into_split();
76+
77+
Ok((
78+
Sender { inner: whlf.compat_write() },
79+
Receiver { inner: FramedRead::new(rhlf.compat(), StreamCodec::stream_incoming()) },
80+
))
81+
}
82+
}
2283

2384
/// Builder type for [`Client`]
2485
#[derive(Clone, Default, Debug)]
@@ -37,7 +98,7 @@ impl IpcClientBuilder {
3798
/// # Ok(())
3899
/// # }
39100
/// ```
40-
pub async fn build(self, path: impl AsRef<Path>) -> Result<Client, IpcError> {
101+
pub async fn build(self, path: impl AsRef<str>) -> Result<Client, IpcError> {
41102
let (tx, rx) = IpcTransportClientBuilder::default().build(path).await?;
42103
Ok(self.build_with_tokio(tx, rx))
43104
}
@@ -66,7 +127,7 @@ pub enum IpcError {
66127
FailedToConnect {
67128
/// The path of the socket.
68129
#[doc(hidden)]
69-
path: PathBuf,
130+
path: String,
70131
/// The error occurred while connecting.
71132
#[doc(hidden)]
72133
err: io::Error,

crates/rpc/ipc/src/client/unix.rs

Lines changed: 0 additions & 82 deletions
This file was deleted.

crates/rpc/ipc/src/client/win.rs

Lines changed: 0 additions & 82 deletions
This file was deleted.

0 commit comments

Comments
 (0)