From 698a8a2517cfac27d026efb5bd8af920b99bdd4b Mon Sep 17 00:00:00 2001 From: Fabrice Desclaux Date: Wed, 1 Mar 2023 07:27:30 +0100 Subject: [PATCH] Update control api Signed-off-by: Fabrice Desclaux --- Cargo.lock | 1 + sanzu/Cargo.toml | 1 + sanzu/src/lib.rs | 1 - sanzu/src/server.rs | 115 ++++++++++++++++++++++++++++++++++++-------- 4 files changed, 96 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d925e86..d96c64b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2306,6 +2306,7 @@ dependencies = [ "sanzu-common", "serde", "serde_derive", + "serde_json", "spin_sleep", "toml 0.7.0", "vsock", diff --git a/sanzu/Cargo.toml b/sanzu/Cargo.toml index 69bd0ce..a21d76f 100644 --- a/sanzu/Cargo.toml +++ b/sanzu/Cargo.toml @@ -35,6 +35,7 @@ rustls-pemfile = "1.0" sanzu-common = { path="../sanzu-common", default-features = false} serde = { version = "1.0", features = ["derive"] } serde_derive = "1.0" +serde_json = "1.0" spin_sleep = "1.1" toml = "0.7" dbus = { version = "0.9", optional = true } diff --git a/sanzu/src/lib.rs b/sanzu/src/lib.rs index fefd661..6d96c58 100644 --- a/sanzu/src/lib.rs +++ b/sanzu/src/lib.rs @@ -1,7 +1,6 @@ #[macro_use] extern crate log; -#[cfg(windows)] #[macro_use] extern crate lazy_static; diff --git a/sanzu/src/server.rs b/sanzu/src/server.rs index 6e4289c..2958f67 100644 --- a/sanzu/src/server.rs +++ b/sanzu/src/server.rs @@ -14,15 +14,21 @@ use sanzu_common::{ ReadWrite, Tunnel, }; +use serde::{Deserialize, Serialize}; + use spin_sleep::LoopHelper; use std::{ net::{self, IpAddr, TcpListener}, + sync::Mutex, time::Instant, }; +#[cfg(unix)] +use std::io::{self, BufRead, Write}; + #[cfg(unix)] use std::{ - sync::mpsc::channel, + sync::mpsc::{channel, Sender}, thread::{self}, }; @@ -47,6 +53,21 @@ use crate::server_x11::init_x11rb; #[cfg(windows)] use crate::server_windows::init_win; +#[derive(Debug, Serialize, Deserialize, Clone, Default)] +struct ServerStats { + fps: u64, + frame_time: u64, + grab: u64, + enc: u64, + send: u64, + recv: u64, + size: (u16, u16), +} + +lazy_static! { + static ref SERVER_STATS: Mutex = Mutex::new(ServerStats::default()); +} + /// Tls auth / Kerberos Auth fn auth_client( config_tls: &ConfigTls, @@ -97,6 +118,57 @@ fn auth_client( Ok((tls_conn, username)) } +#[cfg(unix)] +/// Handle control api +/// restart => restart encoder +/// stats => send encoding stats +fn control_api(control_path: &str, control_sender: Sender<()>) { + let pid = std::process::id(); + let control_path = control_path.replace("%PID%", &format!("{pid}")); + // Try to remove path first + let _ = std::fs::remove_file(&control_path); + let listener = std::os::unix::net::UnixListener::bind(&control_path) + .unwrap_or_else(|_| panic!("Cannot bind {:?}", control_path)); + loop { + let (mut client, addr) = listener.accept().expect("Error in UnixListener accept"); + info!("Client {:?}", addr); + let control_sender_cp = control_sender.clone(); + thread::spawn(move || { + let mut command = String::new(); + if let Ok(length) = io::BufReader::new(&mut client).read_line(&mut command) { + info!("Command: {:?} {}", command, length); + match command.trim_end() { + "restart" => { + info!("Restart encoder requested"); + control_sender_cp.send(()).expect("Cannot send control"); + if client.write_all("Ok".as_bytes()).is_err() { + warn!("Cannot send ok"); + } + } + "stats" => { + info!("Stats requested"); + let stats = SERVER_STATS.lock().unwrap().clone(); + + if let Ok(stats_str) = serde_json::to_string(&stats) { + if client.write_all(stats_str.as_bytes()).is_err() { + warn!("Cannot send stat"); + } + } else { + warn!("Cannot generate stats"); + } + } + _ => { + error!("Unknown command"); + if client.write_all("Unknown command".as_bytes()).is_err() { + warn!("Cannot send stat"); + } + } + } + } + }); + } +} + /// Exec main loop /// pub fn run(config: &ConfigServer, arguments: &ArgumentsSrv) -> Result<()> { @@ -374,34 +446,18 @@ pub fn run_server(config: &ConfigServer, arguments: &ArgumentsSrv) -> Result<()> let mut loop_helper = LoopHelper::builder().build_with_target_rate(config.video.max_fps as f64); // limit FPS if possible let mut new_size = None; - let mut cur_size = None; + let (width, height) = server_info.size(); + let mut cur_size = Some((width as u32, height as u32)); // Do socket control #[cfg(unix)] let (control_sender, control_receiver) = channel(); #[cfg(unix)] { - let control_path = config - .video - .control_path - .as_ref() - .map(|path| path.to_owned()); - if let Some(control_path) = control_path { + if let Some(control_path) = config.video.control_path.as_ref().cloned() { info!("Listening on control path {:?}", control_path); thread::spawn(move || { - let pid = std::process::id(); - let control_path = control_path.replace("%PID%", &format!("{pid}")); - // Try to remove path first - let _ = std::fs::remove_file(&control_path); - let listener = std::os::unix::net::UnixListener::bind(&control_path) - .unwrap_or_else(|_| panic!("Cannot bind {:?}", control_path)); - loop { - let (_, addr) = listener.accept().expect("Error in UnixListener accept"); - info!("Client {:?}", addr); - control_sender - .send("test".to_owned()) - .expect("Cannot send control"); - } + control_api(&control_path.clone(), control_sender); }); } } @@ -563,6 +619,23 @@ pub fn run_server(config: &ConfigServer, arguments: &ArgumentsSrv) -> Result<()> ); debug!("{}", msg); msg_stats = msg; + let fps = match frame_time_micro as u64 { + 0 => 0, + micros => 1_000_000 / micros, + }; + let size = match cur_size.as_ref() { + Some((width, height)) => (*width as u16, *height as u16), + None => (0, 0), + }; + *SERVER_STATS.lock().unwrap() = ServerStats { + fps, + frame_time: (time_start - prev_time_start).as_millis() as u64, + grab: (time_grab - time_start).as_millis() as u64, + enc: (time_encode - time_event).as_millis() as u64, + send: (time_send - time_sound).as_millis() as u64, + recv: (time_stop - time_send).as_millis() as u64, + size, + }; prev_time_start = time_start; loop_helper.loop_sleep(); // sleeps to acheive target FPS rate