Skip to content

Commit

Permalink
Update control api
Browse files Browse the repository at this point in the history
Signed-off-by: Fabrice Desclaux <[email protected]>
  • Loading branch information
serpilliere committed Mar 15, 2023
1 parent db095c2 commit 698a8a2
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 22 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sanzu/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ rustls-pemfile = "1.0"
sanzu-common = { path="../sanzu-common", default-features = false}
serde = { version = "1.0", features = ["derive"] }
serde_derive = "1.0"
serde_json = "1.0"
spin_sleep = "1.1"
toml = "0.7"
dbus = { version = "0.9", optional = true }
Expand Down
1 change: 0 additions & 1 deletion sanzu/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#[macro_use]
extern crate log;

#[cfg(windows)]
#[macro_use]
extern crate lazy_static;

Expand Down
115 changes: 94 additions & 21 deletions sanzu/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,21 @@ use sanzu_common::{
ReadWrite, Tunnel,
};

use serde::{Deserialize, Serialize};

use spin_sleep::LoopHelper;
use std::{
net::{self, IpAddr, TcpListener},
sync::Mutex,
time::Instant,
};

#[cfg(unix)]
use std::io::{self, BufRead, Write};

#[cfg(unix)]
use std::{
sync::mpsc::channel,
sync::mpsc::{channel, Sender},
thread::{self},
};

Expand All @@ -47,6 +53,21 @@ use crate::server_x11::init_x11rb;
#[cfg(windows)]
use crate::server_windows::init_win;

#[derive(Debug, Serialize, Deserialize, Clone, Default)]
struct ServerStats {
fps: u64,
frame_time: u64,
grab: u64,
enc: u64,
send: u64,
recv: u64,
size: (u16, u16),
}

lazy_static! {
static ref SERVER_STATS: Mutex<ServerStats> = Mutex::new(ServerStats::default());
}

/// Tls auth / Kerberos Auth
fn auth_client(
config_tls: &ConfigTls,
Expand Down Expand Up @@ -97,6 +118,57 @@ fn auth_client(
Ok((tls_conn, username))
}

#[cfg(unix)]
/// Handle control api
/// restart => restart encoder
/// stats => send encoding stats
fn control_api(control_path: &str, control_sender: Sender<()>) {
let pid = std::process::id();
let control_path = control_path.replace("%PID%", &format!("{pid}"));
// Try to remove path first
let _ = std::fs::remove_file(&control_path);
let listener = std::os::unix::net::UnixListener::bind(&control_path)
.unwrap_or_else(|_| panic!("Cannot bind {:?}", control_path));
loop {
let (mut client, addr) = listener.accept().expect("Error in UnixListener accept");
info!("Client {:?}", addr);
let control_sender_cp = control_sender.clone();
thread::spawn(move || {
let mut command = String::new();
if let Ok(length) = io::BufReader::new(&mut client).read_line(&mut command) {
info!("Command: {:?} {}", command, length);
match command.trim_end() {
"restart" => {
info!("Restart encoder requested");
control_sender_cp.send(()).expect("Cannot send control");
if client.write_all("Ok".as_bytes()).is_err() {
warn!("Cannot send ok");
}
}
"stats" => {
info!("Stats requested");
let stats = SERVER_STATS.lock().unwrap().clone();

if let Ok(stats_str) = serde_json::to_string(&stats) {
if client.write_all(stats_str.as_bytes()).is_err() {
warn!("Cannot send stat");
}
} else {
warn!("Cannot generate stats");
}
}
_ => {
error!("Unknown command");
if client.write_all("Unknown command".as_bytes()).is_err() {
warn!("Cannot send stat");
}
}
}
}
});
}
}

/// Exec main loop
///
pub fn run(config: &ConfigServer, arguments: &ArgumentsSrv) -> Result<()> {
Expand Down Expand Up @@ -374,34 +446,18 @@ pub fn run_server(config: &ConfigServer, arguments: &ArgumentsSrv) -> Result<()>
let mut loop_helper = LoopHelper::builder().build_with_target_rate(config.video.max_fps as f64); // limit FPS if possible

let mut new_size = None;
let mut cur_size = None;
let (width, height) = server_info.size();
let mut cur_size = Some((width as u32, height as u32));

// Do socket control
#[cfg(unix)]
let (control_sender, control_receiver) = channel();
#[cfg(unix)]
{
let control_path = config
.video
.control_path
.as_ref()
.map(|path| path.to_owned());
if let Some(control_path) = control_path {
if let Some(control_path) = config.video.control_path.as_ref().cloned() {
info!("Listening on control path {:?}", control_path);
thread::spawn(move || {
let pid = std::process::id();
let control_path = control_path.replace("%PID%", &format!("{pid}"));
// Try to remove path first
let _ = std::fs::remove_file(&control_path);
let listener = std::os::unix::net::UnixListener::bind(&control_path)
.unwrap_or_else(|_| panic!("Cannot bind {:?}", control_path));
loop {
let (_, addr) = listener.accept().expect("Error in UnixListener accept");
info!("Client {:?}", addr);
control_sender
.send("test".to_owned())
.expect("Cannot send control");
}
control_api(&control_path.clone(), control_sender);
});
}
}
Expand Down Expand Up @@ -563,6 +619,23 @@ pub fn run_server(config: &ConfigServer, arguments: &ArgumentsSrv) -> Result<()>
);
debug!("{}", msg);
msg_stats = msg;
let fps = match frame_time_micro as u64 {
0 => 0,
micros => 1_000_000 / micros,
};
let size = match cur_size.as_ref() {
Some((width, height)) => (*width as u16, *height as u16),
None => (0, 0),
};
*SERVER_STATS.lock().unwrap() = ServerStats {
fps,
frame_time: (time_start - prev_time_start).as_millis() as u64,
grab: (time_grab - time_start).as_millis() as u64,
enc: (time_encode - time_event).as_millis() as u64,
send: (time_send - time_sound).as_millis() as u64,
recv: (time_stop - time_send).as_millis() as u64,
size,
};

prev_time_start = time_start;
loop_helper.loop_sleep(); // sleeps to acheive target FPS rate
Expand Down

0 comments on commit 698a8a2

Please sign in to comment.