Skip to content

Commit

Permalink
Change some things around
Browse files Browse the repository at this point in the history
  • Loading branch information
YaLTeR committed Apr 19, 2024
1 parent c462763 commit f852ef3
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 136 deletions.
7 changes: 3 additions & 4 deletions niri-ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,11 @@ use std::str::FromStr;
use serde::{Deserialize, Serialize};

mod socket;

pub use socket::{NiriSocket, SOCKET_PATH_ENV};
pub use socket::{Socket, SOCKET_PATH_ENV};

/// Request from client to niri.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum Request {
/// Always responds with an error. (For testing error handling)
ReturnError,
/// Request the version string for the running niri instance.
Version,
/// Request information about connected outputs.
Expand All @@ -23,6 +20,8 @@ pub enum Request {
FocusedWindow,
/// Perform an action.
Action(Action),
/// Respond with an error (for testing error handling).
ReturnError,
}

/// Reply from niri to client.
Expand Down
78 changes: 34 additions & 44 deletions niri-ipc/src/socket.rs
Original file line number Diff line number Diff line change
@@ -1,73 +1,63 @@
use std::io::{self, Write};
//! Helper for blocking communication over the niri socket.
use std::env;
use std::io::{self, Read, Write};
use std::net::Shutdown;
use std::os::unix::net::UnixStream;
use std::path::Path;

use serde_json::de::IoRead;
use serde_json::StreamDeserializer;

use crate::{Reply, Request};

/// Name of the environment variable containing the niri IPC socket path.
pub const SOCKET_PATH_ENV: &str = "NIRI_SOCKET";

/// A client for the niri IPC server.
/// Helper for blocking communication over the niri socket.
///
/// This struct is used to communicate with the niri IPC server. It handles the socket connection
/// and serialization/deserialization of messages.
pub struct NiriSocket {
pub struct Socket {
stream: UnixStream,
responses: StreamDeserializer<'static, IoRead<UnixStream>, Reply>,
}

impl TryFrom<UnixStream> for NiriSocket {
type Error = io::Error;
fn try_from(stream: UnixStream) -> io::Result<Self> {
let responses = serde_json::Deserializer::from_reader(stream.try_clone()?).into_iter();
Ok(Self { stream, responses })
}
}

impl NiriSocket {
/// Connects to the default niri IPC socket
impl Socket {
/// Connects to the default niri IPC socket.
///
/// This is equivalent to calling [Self::connect] with the value of the [SOCKET_PATH_ENV]
/// environment variable.
pub fn new() -> io::Result<Self> {
let socket_path = std::env::var_os(SOCKET_PATH_ENV).ok_or_else(|| {
/// This is equivalent to calling [`Self::connect_to`] with the path taken from the
/// [`SOCKET_PATH_ENV`] environment variable.
pub fn connect() -> io::Result<Self> {
let socket_path = env::var_os(SOCKET_PATH_ENV).ok_or_else(|| {
io::Error::new(
io::ErrorKind::NotFound,
format!("{SOCKET_PATH_ENV} is not set, are you running this within niri?"),
)
})?;
Self::connect(socket_path)
Self::connect_to(socket_path)
}

/// Connect to the socket at the given path
///
/// See also: [UnixStream::connect]
pub fn connect(path: impl AsRef<Path>) -> io::Result<Self> {
Self::try_from(UnixStream::connect(path.as_ref())?)
/// Connects to the niri IPC socket at the given path.
pub fn connect_to(path: impl AsRef<Path>) -> io::Result<Self> {
let stream = UnixStream::connect(path.as_ref())?;
Ok(Self { stream })
}

/// Handle a request to the niri IPC server
/// Sends a request to niri and returns the response.
///
/// Return values:
///
/// # Returns
/// Ok(Ok([Response](crate::Response))) corresponds to a successful response from the running
/// niri instance. Ok(Err([String])) corresponds to an error received from the running niri
/// instance. Err([std::io::Error]) corresponds to an error in the IPC communication.
pub fn send(mut self, request: Request) -> io::Result<Reply> {
/// * `Ok(Ok(response))`: successful [`Response`](crate::Response) from niri
/// * `Ok(Err(message))`: error message from niri
/// * `Err(error)`: error communicating with niri
pub fn send(self, request: Request) -> io::Result<Reply> {
let Self { mut stream } = self;

let mut buf = serde_json::to_vec(&request).unwrap();
writeln!(buf).unwrap();
self.stream.write_all(&buf)?; // .context("error writing IPC request")?;
self.stream.flush()?;
stream.write_all(&buf)?;
stream.shutdown(Shutdown::Write)?;

buf.clear();
stream.read_to_end(&mut buf)?;

if let Some(next) = self.responses.next() {
next.map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
} else {
Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"no response from server",
))
}
let reply = serde_json::from_slice(&buf)?;
Ok(reply)
}
}
6 changes: 3 additions & 3 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ pub enum Sub {

#[derive(Subcommand)]
pub enum Msg {
/// Print an error message.
Error,
/// Print the version string of the running niri instance.
/// Print the version of the running niri instance.
Version,
/// List connected outputs.
Outputs,
Expand All @@ -65,4 +63,6 @@ pub enum Msg {
#[command(subcommand)]
action: Action,
},
/// Request an error from the running niri instance.
RequestError,
}
118 changes: 57 additions & 61 deletions src/ipc/client.rs
Original file line number Diff line number Diff line change
@@ -1,102 +1,98 @@
use anyhow::{bail, Context};
use niri_ipc::{LogicalOutput, Mode, NiriSocket, Output, Request, Response};
use anyhow::{anyhow, bail, Context};
use niri_ipc::{LogicalOutput, Mode, Output, Request, Response, Socket};
use serde_json::json;

use crate::cli::Msg;
use crate::utils::version;

pub fn handle_msg(msg: Msg, json: bool) -> anyhow::Result<()> {
let client = NiriSocket::new()
.context("a communication error occured while trying to initialize the socket")?;

// Default SIGPIPE so that our prints don't panic on stdout closing.
unsafe {
libc::signal(libc::SIGPIPE, libc::SIG_DFL);
}

let request = match &msg {
Msg::Error => Request::ReturnError,
Msg::Version => Request::Version,
Msg::Outputs => Request::Outputs,
Msg::FocusedWindow => Request::FocusedWindow,
Msg::Action { action } => Request::Action(action.clone()),
Msg::RequestError => Request::ReturnError,
};

let reply = client
let socket = Socket::connect().context("error connecting to the niri socket")?;

let reply = socket
.send(request)
.context("a communication error occurred while sending request to niri")?;
.context("error communicating with niri")?;

let compositor_version = match reply {
Err(_) if !matches!(msg, Msg::Version) => {
// If we got an error, it might be that the CLI is a different version from the running
// niri instance. Request the running instance version to compare and print a message.
Socket::connect()
.and_then(|socket| socket.send(Request::Version))
.ok()
}
_ => None,
};

let response = match reply {
Ok(r) => r,
Err(err_msg) => {
eprintln!("The compositor returned an error:");
eprintln!();
eprintln!("{err_msg}");
// Default SIGPIPE so that our prints don't panic on stdout closing.
unsafe {
libc::signal(libc::SIGPIPE, libc::SIG_DFL);
}

if matches!(msg, Msg::Version) {
eprintln!();
eprintln!("Note: unable to get the compositor's version.");
eprintln!("Did you forget to restart niri after an update?");
} else {
match NiriSocket::new().and_then(|client| client.send(Request::Version)) {
Ok(Ok(Response::Version(server_version))) => {
let my_version = version();
if my_version != server_version {
eprintln!();
eprintln!("Note: niri msg was invoked with a different version of niri than the running compositor.");
eprintln!("niri msg: {my_version}");
eprintln!("compositor: {server_version}");
eprintln!("Did you forget to restart niri after an update?");
}
}
Ok(Ok(_)) => {
// nonsensical response, do not add confusing context
}
Ok(Err(_)) => {
eprintln!();
eprintln!("Note: unable to get the compositor's version.");
eprintln!("Did you forget to restart niri after an update?");
}
Err(_) => {
// communication error, do not add irrelevant context
}
let response = reply.map_err(|err_msg| {
// Check for CLI-server version mismatch to add helpful context.
match compositor_version {
Some(Ok(Response::Version(compositor_version))) => {
let cli_version = version();
if cli_version != compositor_version {
eprintln!("Running niri compositor has a different version from the niri CLI:");
eprintln!("Compositor version: {compositor_version}");
eprintln!("CLI version: {cli_version}");
eprintln!("Did you forget to restart niri after an update?");
eprintln!();
}
}

return Ok(());
Some(_) => {
eprintln!("Unable to get the running niri compositor version.");
eprintln!("Did you forget to restart niri after an update?");
eprintln!();
}
None => {
// Communication error, or the original request was already a version request.
// Don't add irrelevant context.
}
}
};

anyhow!(err_msg).context("niri returned an error")
})?;

match msg {
Msg::Error => {
Msg::RequestError => {
bail!("unexpected response: expected an error, got {response:?}");
}
Msg::Version => {
let Response::Version(server_version) = response else {
let Response::Version(compositor_version) = response else {
bail!("unexpected response: expected Version, got {response:?}");
};

let cli_version = version();

if json {
println!(
"{}",
json!({
"cli": version(),
"compositor": server_version,
"compositor": compositor_version,
"cli": cli_version,
})
);
return Ok(());
}

let client_version = version();

println!("niri msg is {client_version}");
println!("the compositor is {server_version}");
if client_version != server_version {
eprintln!();
eprintln!("These are different");
if cli_version != compositor_version {
eprintln!("Running niri compositor has a different version from the niri CLI.");
eprintln!("Did you forget to restart niri after an update?");
eprintln!();
}
println!();

println!("Compositor version: {compositor_version}");
println!("CLI version: {cli_version}");
}
Msg::Outputs => {
let Response::Outputs(outputs) = response else {
Expand Down
41 changes: 17 additions & 24 deletions src/ipc/server.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::io::Write;
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
Expand All @@ -8,7 +7,7 @@ use anyhow::Context;
use calloop::io::Async;
use directories::BaseDirs;
use futures_util::io::{AsyncReadExt, BufReader};
use futures_util::{AsyncBufReadExt, AsyncWriteExt, StreamExt};
use futures_util::{AsyncBufReadExt, AsyncWriteExt};
use niri_ipc::{Reply, Request, Response};
use smithay::desktop::Window;
use smithay::reexports::calloop::generic::Generic;
Expand Down Expand Up @@ -108,42 +107,36 @@ fn on_new_ipc_client(state: &mut State, stream: UnixStream) {

async fn handle_client(ctx: ClientCtx, stream: Async<'_, UnixStream>) -> anyhow::Result<()> {
let (read, mut write) = stream.split();
let mut buf = String::new();

// note that we can't use the stream json deserializer here
// because the stream is asynchronous and the deserializer doesn't support that
// https://github.com/serde-rs/json/issues/575
// Read a single line to allow extensibility in the future to keep reading.
BufReader::new(read)
.read_line(&mut buf)
.await
.context("error reading request")?;

let mut lines = BufReader::new(read).lines();
let request = serde_json::from_str(&buf)
.context("error parsing request")
.map_err(|err| err.to_string());
let requested_error = matches!(request, Ok(Request::ReturnError));

let line = match lines.next().await.unwrap_or(Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Unreachable; BufReader returned None but when the stream ends, the connection should be reset"))) {
Ok(line) => line,
Err(err) if err.kind() == io::ErrorKind::ConnectionReset => return Ok(()),
Err(err) => return Err(err).context("error reading line"),
};

let reply: Reply = serde_json::from_str(&line)
.map_err(|err| format!("error parsing request: {err}"))
.and_then(|req| process(&ctx, req));
let reply = request.and_then(|request| process(&ctx, request));

if let Err(err) = &reply {
warn!("error processing IPC request: {err:?}");
if !requested_error {
warn!("error processing IPC request: {err:?}");
}
}

let mut buf = serde_json::to_vec(&reply).context("error formatting reply")?;
writeln!(buf).unwrap();
let buf = serde_json::to_vec(&reply).context("error formatting reply")?;
write.write_all(&buf).await.context("error writing reply")?;
write.flush().await.context("error flushing reply")?;

// We do not check for more lines at this moment.
// Dropping the stream will reset the connection before we read them.
// For now, a client should not be sending more than one request per connection.

Ok(())
}

fn process(ctx: &ClientCtx, request: Request) -> Reply {
let response = match request {
Request::ReturnError => return Err("client wanted an error".into()),
Request::ReturnError => return Err(String::from("example compositor error")),
Request::Version => Response::Version(version()),
Request::Outputs => {
let ipc_outputs = ctx.ipc_outputs.lock().unwrap().clone();
Expand Down

0 comments on commit f852ef3

Please sign in to comment.