Skip to content

Commit

Permalink
inital commit
Browse files Browse the repository at this point in the history
  • Loading branch information
ReactorScram committed Jan 29, 2024
1 parent c6d2b51 commit 0598172
Show file tree
Hide file tree
Showing 8 changed files with 1,076 additions and 2 deletions.
37 changes: 37 additions & 0 deletions .github/workflows/_rust.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
name: Rust

on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]

env:
CARGO_TERM_COLOR: always

jobs:
test:
strategy:
fail-fast: false
matrix:
runs-on:
- windows-2019
- windows-2022
runs-on: ${{ matrix.runs-on }}

steps:
- uses: actions/checkout@v3
- name: cargo doc
env: RUSTDOCFLAGS: "-D warnings"
run: cargo doc --all-features --no-deps --document-private-items
- name: cargo fmt
run: cargo fmt -- --check
- name: cargo clippy
run: cargo clippy --all-targets --all-features -- -D warnings
- name: Single-process test
env: RUST_LOG=debug
run: cargo test
- name: Multi-process test
env: RUST_LOG=debug
run: cargo run

31 changes: 31 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[package]
name = "subzone"
version = "0.1.0"
keywords = ["command", "process", "subprocess", "worker"]
description = "Worker subprocesses with async IPC for Windows"
edition = "2021"

[dependencies]
anyhow = { version = "1.0" }
clap = { version = "4.4", features = ["derive", "env"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = { version = "1.0", default-features = false }
tokio = { version = "1.33.0", features = ["io-util", "net", "process", "rt-multi-thread", "sync", "time"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
uuid = { version = "1.7.0", features = ["v4"] }

[target.'cfg(windows)'.dependencies.windows]
version = "0.52.0"
features = [
# Needed for `CreateJobObjectA`
"Win32_Foundation",
# Needed for `CreateJobObjectA`
"Win32_Security",
# Needed for Windows to automatically kill child processes if the main process crashes
"Win32_System_JobObjects",
# Needed to check process ID of named pipe clients
"Win32_System_Pipes",
"Win32_System_Threading",
]
13 changes: 11 additions & 2 deletions README.md
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,2 +1,11 @@
# subzone
IPC for Windows
# Testing

```bash
cargo test && cargo run && echo good
```

# Git history

This repo split off from <https://github.com/firezone/firezone> at 634a5439b54e459d4d0109b2b1f64c983abac139

See comment <https://github.com/firezone/firezone/pull/3414#issuecomment-1915123675>
89 changes: 89 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use anyhow::Result;
use serde::{de::DeserializeOwned, Serialize};
use std::marker::PhantomData;
use tokio::{
io::AsyncWriteExt,
net::windows::named_pipe::{self, NamedPipeClient},
sync::mpsc,
};

use crate::{read_deserialize, write_serialize, Error, ManagerMsgInternal, WorkerMsgInternal};

/// A client that's connected to a server
///
/// Manual testing shows that if the corresponding Server's process crashes, Windows will
/// be nice and return errors for anything trying to read from the Client
pub struct Client<M, W> {
pipe_writer: tokio::io::WriteHalf<NamedPipeClient>,
/// Needed to make `next` cancel-safe
read_rx: mpsc::Receiver<Vec<u8>>,
/// Needed to make `next` cancel-safe
reader_task: tokio::task::JoinHandle<Result<()>>,
_manager_msg: PhantomData<M>,
_worker_msg: PhantomData<W>,
}

impl<M: DeserializeOwned, W: Serialize> Client<M, W> {
/// Creates a `Client` and echoes the security cookie back to the `Server`
///
/// Doesn't block, fails instantly if the server isn't up.
pub async fn new(server_id: &str) -> Result<Self> {
let mut client = Client::new_unsecured(server_id)?;
let mut cookie = String::new();
std::io::stdin().read_line(&mut cookie)?;
let cookie = WorkerMsgInternal::Cookie(cookie.trim().to_string());
client.send_internal(&cookie).await?;
Ok(client)
}

/// Creates a `Client`. Requires a Tokio context
///
/// Doesn't block, will fail instantly if the server isn't ready
#[tracing::instrument(skip_all)]
pub(crate) fn new_unsecured(server_id: &str) -> Result<Self> {
let pipe = named_pipe::ClientOptions::new().open(server_id)?;
let (mut pipe_reader, pipe_writer) = tokio::io::split(pipe);
let (read_tx, read_rx) = mpsc::channel(1);
let reader_task = tokio::spawn(async move {
loop {
let msg = read_deserialize(&mut pipe_reader).await?;
read_tx.send(msg).await?;
}
});

Ok(Self {
pipe_writer,
read_rx,
reader_task,
_manager_msg: Default::default(),
_worker_msg: Default::default(),
})
}

pub async fn close(mut self) -> Result<()> {
self.pipe_writer.shutdown().await?;
self.reader_task.abort();
tracing::debug!("Client closing gracefully");
Ok(())
}

/// Receives a message from the server
///
/// # Cancel safety
///
/// This method is cancel-safe, internally it calls `tokio::sync::mpsc::Receiver::recv`
pub async fn next(&mut self) -> Result<ManagerMsgInternal<M>, Error> {
let buf = self.read_rx.recv().await.ok_or_else(|| Error::Eof)?;
let buf = std::str::from_utf8(&buf)?;
let msg = serde_json::from_str(buf)?;
Ok(msg)
}

pub async fn send(&mut self, msg: W) -> Result<(), Error> {
self.send_internal(&WorkerMsgInternal::User(msg)).await
}

async fn send_internal(&mut self, msg: &WorkerMsgInternal<W>) -> Result<(), Error> {
write_serialize(&mut self.pipe_writer, msg).await
}
}
Loading

0 comments on commit 0598172

Please sign in to comment.