Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/rpc/ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,9 @@ tracing.workspace = true
bytes.workspace = true
thiserror.workspace = true

[target.'cfg(windows)'.dependencies]
windows-sys = {version = "0.52.0", features = ["Win32_Foundation"]}

[dev-dependencies]
tokio-stream = { workspace = true, features = ["sync"] }

73 changes: 58 additions & 15 deletions crates/rpc/ipc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,26 @@ use std::{
io,
path::{Path, PathBuf},
};
use tokio::{io::AsyncWriteExt, net::UnixStream};
use tokio::io::AsyncWriteExt;
use tokio_util::codec::FramedRead;

#[cfg(unix)]
use tokio::net::{
unix::{OwnedReadHalf, OwnedWriteHalf},
UnixStream,
};

#[cfg(windows)]
use {
std::sync::Arc,
tokio::{
net::windows::named_pipe::{ClientOptions, NamedPipeClient},
time,
time::Duration,
},
windows_sys::Win32::Foundation::ERROR_PIPE_BUSY,
};

/// Builder type for [`Client`]
#[derive(Clone, Default, Debug)]
#[non_exhaustive]
Expand All @@ -38,7 +55,10 @@ impl IpcClientBuilder {
/// Sending end of IPC transport.
#[derive(Debug)]
pub struct Sender {
inner: tokio::net::unix::OwnedWriteHalf,
#[cfg(unix)]
inner: OwnedWriteHalf,
#[cfg(windows)]
inner: Arc<NamedPipeClient>,
}

#[async_trait::async_trait]
Expand All @@ -65,7 +85,10 @@ impl TransportSenderT for Sender {
/// Receiving end of IPC transport.
#[derive(Debug)]
pub struct Receiver {
inner: FramedRead<tokio::net::unix::OwnedReadHalf, StreamCodec>,
#[cfg(unix)]
inner: FramedRead<OwnedReadHalf, StreamCodec>,
#[cfg(windows)]
inner: FramedRead<Arc<NamedPipeClient>, StreamCodec>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is great,

what I would like to do instead is:
convert client into a module with
unix.rs
win.rs

and duplicate the types then client/mod.rs reexports based on windows or UNIX

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

#[async_trait::async_trait]
Expand Down Expand Up @@ -96,18 +119,38 @@ impl IpcTransportClientBuilder {
/// # }
/// ```
pub async fn build(self, path: impl AsRef<Path>) -> Result<(Sender, Receiver), IpcError> {
let path = path.as_ref();

let stream = UnixStream::connect(path)
.await
.map_err(|err| IpcError::FailedToConnect { path: path.to_path_buf(), err })?;

let (rhlf, whlf) = stream.into_split();

Ok((
Sender { inner: whlf },
Receiver { inner: FramedRead::new(rhlf, StreamCodec::stream_incoming()) },
))
#[cfg(unix)]
{
let path = path.as_ref();

let stream = UnixStream::connect(path)
.await
.map_err(|err| IpcError::FailedToConnect { path: path.to_path_buf(), err })?;

let (rhlf, whlf) = stream.into_split();

Ok((
Sender { inner: whlf },
Receiver { inner: FramedRead::new(rhlf, StreamCodec::stream_incoming()) },
))
}
#[cfg(windows)]
{
let addr = path.as_ref().as_os_str();
let client = loop {
match ClientOptions::new().open(addr) {
Ok(client) => break client,
Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
Err(e) => return IpcError::FailedToConnect { path: path.to_path_buf(), err: e },
}
time::sleep(Duration::from_mills(50)).await;
};
let client = Arc::new(client);
Ok((
Sender { inner: client.clone() },
Receiver { inner: FramedRead::new(client, StreamCodec::stream_incoming()) },
))
}
}
}

Expand Down
1 change: 0 additions & 1 deletion crates/rpc/ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

#[cfg(unix)]
pub mod client;
pub mod server;

Expand Down