Skip to content

Commit

Permalink
Improve signal handling (slightly)
Browse files Browse the repository at this point in the history
  • Loading branch information
sbernauer committed May 14, 2024
1 parent acf7273 commit a924288
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 19 deletions.
59 changes: 48 additions & 11 deletions breakwater/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::{num::TryFromIntError, sync::Arc};
use std::{env, num::TryFromIntError, sync::Arc};

use breakwater_core::framebuffer::FrameBuffer;
use clap::Parser;
use env_logger::Env;
use log::{info, trace};
use prometheus_exporter::PrometheusExporter;
use sinks::ffmpeg::FfmpegSink;
use snafu::{ResultExt, Snafu};
Expand All @@ -17,6 +17,7 @@ use crate::{
#[cfg(feature = "vnc")]
use {
crate::sinks::vnc::{self, VncServer},
log::warn,
thread_priority::{ThreadBuilderExt, ThreadPriority},
};

Expand Down Expand Up @@ -46,6 +47,15 @@ pub enum Error {
network_buffer_size: i64,
},

#[snafu(display("ffmpeg dump thread error"))]
FfmpegDumpThread { source: sinks::ffmpeg::Error },

#[snafu(display("Failed to send ffmpg dump thread termination signal"))]
SendFfmpegDumpTerminationSignal {},

#[snafu(display("Failed to join ffmpg dump thread"))]
JoinFfmpegDumpThread { source: tokio::task::JoinError },

#[cfg(feature = "vnc")]
#[snafu(display("Failed to spawn VNC server thread"))]
SpawnVncServerThread { source: std::io::Error },
Expand All @@ -69,7 +79,11 @@ pub enum Error {

#[tokio::main]
async fn main() -> Result<(), Error> {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
if env::var("RUST_LOG").is_err() {
env::set_var("RUST_LOG", "info")
}
env_logger::init();

let args = CliArgs::parse();

let fb = Arc::new(FrameBuffer::new(args.width, args.height));
Expand Down Expand Up @@ -125,7 +139,12 @@ async fn main() -> Result<(), Error> {

let ffmpeg_sink = FfmpegSink::new(&args, Arc::clone(&fb));
let ffmpeg_thread = ffmpeg_sink.map(|sink| {
tokio::spawn(async move { sink.run(ffmpeg_terminate_signal_rx).await.unwrap() })
tokio::spawn(async move {
sink.run(ffmpeg_terminate_signal_rx)
.await
.context(FfmpegDumpThreadSnafu)?;
Ok::<(), Error>(())
})
});

#[cfg(feature = "vnc")]
Expand Down Expand Up @@ -165,23 +184,41 @@ async fn main() -> Result<(), Error> {

prometheus_exporter_thread.abort();
server_listener_thread.abort();
statistics_thread.abort();

let ffmpeg_thread_present = ffmpeg_thread.is_some();
if let Some(ffmpeg_thread) = ffmpeg_thread {
let _ = ffmpeg_terminate_signal_tx.send(());
ffmpeg_thread.abort();
ffmpeg_terminate_signal_tx
.send(())
.map_err(|_| Error::SendFfmpegDumpTerminationSignal {})?;

trace!("Waiting for thread dumping data into ffmpeg to terminate");
ffmpeg_thread.await.context(JoinFfmpegDumpThreadSnafu)??;
trace!("thread dumping data into ffmpeg terminated");
}

#[cfg(feature = "vnc")]
{
vnc_terminate_signal_tx
.send("bye bye vnc".to_string())
.map_err(|_| Error::SendVncServerShutdownSignal {})?;
trace!("Sending termination signal to vnc thread");
if let Err(err) = vnc_terminate_signal_tx.send(()) {
warn!(
"Failed to send termination signal to vnc thread, it seems to already have terminated: {err:?}",
)
}
trace!("Joining vnc thread");
vnc_server_thread
.join()
.map_err(|_| Error::StopVncServerThread {})??;
trace!("Vnc thread terminated");
}

log::info!("Successfully shut down");
// We need to stop this thread as the last, as others always try to send statistics to it
statistics_thread.abort();

if ffmpeg_thread_present {
info!("Successfully shut down (there might still be a ffmped process running - it's complicated)");
} else {
info!("Successfully shut down");
}

Ok(())
}
45 changes: 39 additions & 6 deletions breakwater/src/sinks/ffmpeg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@ use std::{process::Stdio, sync::Arc, time::Duration};

use breakwater_core::framebuffer::FrameBuffer;
use chrono::Local;
use log::debug;
use snafu::{ResultExt, Snafu};
use tokio::{io::AsyncWriteExt, process::Command, sync::oneshot::Receiver, time};
use tokio::{
io::AsyncWriteExt,
process::Command,
sync::oneshot::Receiver,
time::{self},
};

use crate::cli_args::CliArgs;

Expand All @@ -15,9 +21,6 @@ pub enum Error {
command: String,
},

#[snafu(display("Failed to kill ffmpeg command"))]
KillFfmpeg { source: std::io::Error },

#[snafu(display("Failed to write new data to ffmpeg via stdout"))]
WriteDataToFfmeg { source: std::io::Error },
}
Expand Down Expand Up @@ -99,7 +102,7 @@ impl FfmpegSink {
}

let ffmpeg_command = format!("ffmpeg {}", ffmpeg_args.join(" "));
log::info!("{ffmpeg_command}");
debug!("Executing {ffmpeg_command:?}");
let mut command = Command::new("ffmpeg")
.kill_on_drop(false)
.args(ffmpeg_args.clone())
Expand All @@ -117,7 +120,37 @@ impl FfmpegSink {
let mut interval = time::interval(Duration::from_micros(1_000_000 / 30));
loop {
if terminate_signal_rx.try_recv().is_ok() {
command.kill().await.context(KillFfmpegSnafu)?;
// Normally we would send SIGINT to ffmpeg and let the process shutdown gracefully and afterwards call
// `command.wait().await`. Hopever using the `nix` crate to send a `SIGINT` resulted in ffmpeg
// [2024-05-14T21:35:25Z TRACE breakwater::sinks::ffmpeg] Sending SIGINT to ffmpeg process with pid 58786
// [out#0/mp4 @ 0x1048740] Error writing trailer: Immediate exit requested
//
// As you can see this also corrupted the output mp4 :(
// So instead we let the process running here and let the kernel clean up (?), which seems to work (?)

// trace!("Killing ffmpeg process");

// if cfg!(target_os = "linux") {
// if let Some(pid) = command.id() {
// trace!("Sending SIGINT to ffmpeg process with pid {pid}");
// nix::sys::signal::kill(
// nix::unistd::Pid::from_raw(pid.try_into().unwrap()),
// nix::sys::signal::Signal::SIGINT,
// )
// .unwrap();
// } else {
// error!("The ffmpeg process had no PID, so I could not kill it. Will let tokio kill it instead");
// command.start_kill().unwrap();
// }
// } else {
// trace!("As I'm not on Linux, YOLO-ing it by letting tokio kill it ");
// command.start_kill().unwrap();
// }

// let start = Instant::now();
// command.wait().await.unwrap();
// trace!("Killied ffmpeg process in {:?}", start.elapsed());

return Ok(());
}
let bytes = self.fb.as_bytes();
Expand Down
4 changes: 2 additions & 2 deletions breakwater/src/sinks/vnc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub struct VncServer<'a> {

statistics_tx: Sender<StatisticsEvent>,
statistics_information_rx: broadcast::Receiver<StatisticsInformationEvent>,
terminate_signal_tx: oneshot::Receiver<String>,
terminate_signal_tx: oneshot::Receiver<()>,

text: String,
font: Font<'a>,
Expand All @@ -64,7 +64,7 @@ impl<'a> VncServer<'a> {
target_fps: u32,
statistics_tx: Sender<StatisticsEvent>,
statistics_information_rx: broadcast::Receiver<StatisticsInformationEvent>,
terminate_signal_tx: oneshot::Receiver<String>,
terminate_signal_tx: oneshot::Receiver<()>,
text: String,
font: String,
) -> Result<Self, Error> {
Expand Down

0 comments on commit a924288

Please sign in to comment.