From 3e659b6db8d3076965b6a17ab76dd2d0c4807d8e Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Sun, 6 Aug 2023 15:06:23 +0200 Subject: [PATCH] fix: Signal handling (#15) --- src/main.rs | 27 +++++++++++++++++---------- src/sinks/ffmpeg.rs | 12 ++++++++++-- src/sinks/vnc.rs | 14 +++++++++++--- 3 files changed, 38 insertions(+), 15 deletions(-) diff --git a/src/main.rs b/src/main.rs index 14d2339..7336e38 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,7 +9,7 @@ use breakwater::{ use clap::Parser; use env_logger::Env; use std::sync::Arc; -use tokio::sync::{broadcast, mpsc}; +use tokio::sync::{broadcast, mpsc, oneshot}; #[cfg(feature = "vnc")] use { breakwater::sinks::vnc::VncServer, @@ -30,6 +30,9 @@ async fn main() -> Result<(), Box> { let (statistics_tx, statistics_rx) = mpsc::channel::(100); let (statistics_information_tx, statistics_information_rx_for_prometheus_exporter) = broadcast::channel::(2); + let (ffmpeg_terminate_signal_tx, ffmpeg_terminate_signal_rx) = oneshot::channel(); + #[cfg(feature = "vnc")] + let (vnc_terminate_signal_tx, vnc_terminate_signal_rx) = oneshot::channel(); #[cfg(feature = "vnc")] let statistics_information_rx_for_vnc_server = statistics_information_tx.subscribe(); @@ -53,8 +56,9 @@ async fn main() -> Result<(), Box> { }); let ffmpeg_sink = FfmpegSink::new(&args, Arc::clone(&fb)); - let ffmpeg_thread = - ffmpeg_sink.map(|sink| tokio::spawn(async move { sink.run().await.unwrap() })); + let ffmpeg_thread = ffmpeg_sink.map(|sink| { + tokio::spawn(async move { sink.run(ffmpeg_terminate_signal_rx).await.unwrap() }) + }); #[cfg(feature = "vnc")] let vnc_server_thread = { @@ -73,6 +77,7 @@ async fn main() -> Result<(), Box> { args.fps, statistics_tx, statistics_information_rx_for_vnc_server, + vnc_terminate_signal_rx, &args.text, &args.font, ); @@ -93,17 +98,19 @@ async fn main() -> Result<(), Box> { prometheus_exporter.run().await; }); - prometheus_exporter_thread.await?; - network_listener_thread.await?; + tokio::signal::ctrl_c().await?; + + prometheus_exporter_thread.abort(); + network_listener_thread.abort(); + statistics_thread.abort(); if let Some(ffmpeg_thread) = ffmpeg_thread { - ffmpeg_thread.await?; + ffmpeg_terminate_signal_tx.send("bye bye ffmpeg")?; + ffmpeg_thread.abort(); } - statistics_thread.await?; #[cfg(feature = "vnc")] { - vnc_server_thread - .join() - .expect("Failed to join VNC server thread"); + vnc_terminate_signal_tx.send("bye bye vnc")?; + vnc_server_thread.join().unwrap(); } Ok(()) diff --git a/src/sinks/ffmpeg.rs b/src/sinks/ffmpeg.rs index 66819b3..23cffc4 100644 --- a/src/sinks/ffmpeg.rs +++ b/src/sinks/ffmpeg.rs @@ -1,7 +1,7 @@ use std::{process::Stdio, sync::Arc, time::Duration}; use chrono::Local; -use tokio::{io::AsyncWriteExt, process::Command, time}; +use tokio::{io::AsyncWriteExt, process::Command, sync::oneshot::Receiver, time}; use crate::{args::Args, framebuffer::FrameBuffer}; @@ -26,7 +26,10 @@ impl FfmpegSink { } } - pub async fn run(&self) -> tokio::io::Result<()> { + pub async fn run<'a>( + &self, + mut terminate_signal_rx: Receiver<&'a str>, + ) -> tokio::io::Result<()> { let mut ffmpeg_args: Vec = self .ffmpeg_input_args() .into_iter() @@ -86,6 +89,7 @@ impl FfmpegSink { log::info!("ffmpeg {}", ffmpeg_args.join(" ")); let mut command = Command::new("ffmpeg") + .kill_on_drop(false) .args(ffmpeg_args) .stdin(Stdio::piped()) .spawn() @@ -98,6 +102,10 @@ impl FfmpegSink { let mut interval = time::interval(Duration::from_micros(1_000_000 / 30)); loop { + if terminate_signal_rx.try_recv().is_ok() { + command.kill().await?; + return Ok(()); + } let bytes = self.fb.as_bytes(); stdin.write_all(bytes).await?; interval.tick().await; diff --git a/src/sinks/vnc.rs b/src/sinks/vnc.rs index cc6a351..d117241 100644 --- a/src/sinks/vnc.rs +++ b/src/sinks/vnc.rs @@ -5,8 +5,8 @@ use number_prefix::NumberPrefix; use rusttype::{point, Font, Scale}; use std::sync::Arc; use std::time::Duration; -use tokio::sync::broadcast; use tokio::sync::mpsc::Sender; +use tokio::sync::{broadcast, oneshot}; use vncserver::{ rfb_framebuffer_malloc, rfb_get_screen, rfb_init_server, rfb_mark_rect_as_modified, rfb_run_event_loop, RfbScreenInfoPtr, @@ -14,25 +14,28 @@ use vncserver::{ const STATS_HEIGHT: usize = 35; -pub struct VncServer<'a> { +pub struct VncServer<'a, 'b> { fb: Arc, screen: RfbScreenInfoPtr, target_fps: u32, statistics_tx: Sender, statistics_information_rx: broadcast::Receiver, + terminate_signal_tx: oneshot::Receiver<&'b str>, text: &'a str, font: Font<'a>, } -impl<'a> VncServer<'a> { +impl<'a, 'b> VncServer<'a, 'b> { + #[allow(clippy::too_many_arguments)] pub fn new( fb: Arc, port: u32, target_fps: u32, statistics_tx: Sender, statistics_information_rx: broadcast::Receiver, + terminate_signal_tx: oneshot::Receiver<&'b str>, text: &'a str, font: &'a str, ) -> Self { @@ -74,6 +77,7 @@ impl<'a> VncServer<'a> { target_fps, statistics_tx, statistics_information_rx, + terminate_signal_tx, text, font, } @@ -92,6 +96,10 @@ impl<'a> VncServer<'a> { let fb_size_up_to_stats_text = fb.get_width() * height_up_to_stats_text; loop { + if self.terminate_signal_tx.try_recv().is_ok() { + return; + } + let start = std::time::Instant::now(); vnc_fb_slice[0..fb_size_up_to_stats_text] .copy_from_slice(&fb_slice[0..fb_size_up_to_stats_text]);