Skip to content

Commit

Permalink
fix: Signal handling (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
sbernauer committed Aug 6, 2023
1 parent 5ed8ff9 commit 3e659b6
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 15 deletions.
27 changes: 17 additions & 10 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use breakwater::{
use clap::Parser;
use env_logger::Env;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
use tokio::sync::{broadcast, mpsc, oneshot};
#[cfg(feature = "vnc")]
use {
breakwater::sinks::vnc::VncServer,
Expand All @@ -30,6 +30,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (statistics_tx, statistics_rx) = mpsc::channel::<StatisticsEvent>(100);
let (statistics_information_tx, statistics_information_rx_for_prometheus_exporter) =
broadcast::channel::<StatisticsInformationEvent>(2);
let (ffmpeg_terminate_signal_tx, ffmpeg_terminate_signal_rx) = oneshot::channel();
#[cfg(feature = "vnc")]
let (vnc_terminate_signal_tx, vnc_terminate_signal_rx) = oneshot::channel();
#[cfg(feature = "vnc")]
let statistics_information_rx_for_vnc_server = statistics_information_tx.subscribe();

Expand All @@ -53,8 +56,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
});

let ffmpeg_sink = FfmpegSink::new(&args, Arc::clone(&fb));
let ffmpeg_thread =
ffmpeg_sink.map(|sink| tokio::spawn(async move { sink.run().await.unwrap() }));
let ffmpeg_thread = ffmpeg_sink.map(|sink| {
tokio::spawn(async move { sink.run(ffmpeg_terminate_signal_rx).await.unwrap() })
});

#[cfg(feature = "vnc")]
let vnc_server_thread = {
Expand All @@ -73,6 +77,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
args.fps,
statistics_tx,
statistics_information_rx_for_vnc_server,
vnc_terminate_signal_rx,
&args.text,
&args.font,
);
Expand All @@ -93,17 +98,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
prometheus_exporter.run().await;
});

prometheus_exporter_thread.await?;
network_listener_thread.await?;
tokio::signal::ctrl_c().await?;

prometheus_exporter_thread.abort();
network_listener_thread.abort();
statistics_thread.abort();
if let Some(ffmpeg_thread) = ffmpeg_thread {
ffmpeg_thread.await?;
ffmpeg_terminate_signal_tx.send("bye bye ffmpeg")?;
ffmpeg_thread.abort();
}
statistics_thread.await?;
#[cfg(feature = "vnc")]
{
vnc_server_thread
.join()
.expect("Failed to join VNC server thread");
vnc_terminate_signal_tx.send("bye bye vnc")?;
vnc_server_thread.join().unwrap();
}

Ok(())
Expand Down
12 changes: 10 additions & 2 deletions src/sinks/ffmpeg.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{process::Stdio, sync::Arc, time::Duration};

use chrono::Local;
use tokio::{io::AsyncWriteExt, process::Command, time};
use tokio::{io::AsyncWriteExt, process::Command, sync::oneshot::Receiver, time};

use crate::{args::Args, framebuffer::FrameBuffer};

Expand All @@ -26,7 +26,10 @@ impl FfmpegSink {
}
}

pub async fn run(&self) -> tokio::io::Result<()> {
pub async fn run<'a>(
&self,
mut terminate_signal_rx: Receiver<&'a str>,
) -> tokio::io::Result<()> {
let mut ffmpeg_args: Vec<String> = self
.ffmpeg_input_args()
.into_iter()
Expand Down Expand Up @@ -86,6 +89,7 @@ impl FfmpegSink {

log::info!("ffmpeg {}", ffmpeg_args.join(" "));
let mut command = Command::new("ffmpeg")
.kill_on_drop(false)
.args(ffmpeg_args)
.stdin(Stdio::piped())
.spawn()
Expand All @@ -98,6 +102,10 @@ impl FfmpegSink {

let mut interval = time::interval(Duration::from_micros(1_000_000 / 30));
loop {
if terminate_signal_rx.try_recv().is_ok() {
command.kill().await?;
return Ok(());
}
let bytes = self.fb.as_bytes();
stdin.write_all(bytes).await?;
interval.tick().await;
Expand Down
14 changes: 11 additions & 3 deletions src/sinks/vnc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,37 @@ use number_prefix::NumberPrefix;
use rusttype::{point, Font, Scale};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::sync::mpsc::Sender;
use tokio::sync::{broadcast, oneshot};
use vncserver::{
rfb_framebuffer_malloc, rfb_get_screen, rfb_init_server, rfb_mark_rect_as_modified,
rfb_run_event_loop, RfbScreenInfoPtr,
};

const STATS_HEIGHT: usize = 35;

pub struct VncServer<'a> {
pub struct VncServer<'a, 'b> {
fb: Arc<FrameBuffer>,
screen: RfbScreenInfoPtr,
target_fps: u32,

statistics_tx: Sender<StatisticsEvent>,
statistics_information_rx: broadcast::Receiver<StatisticsInformationEvent>,
terminate_signal_tx: oneshot::Receiver<&'b str>,

text: &'a str,
font: Font<'a>,
}

impl<'a> VncServer<'a> {
impl<'a, 'b> VncServer<'a, 'b> {
#[allow(clippy::too_many_arguments)]
pub fn new(
fb: Arc<FrameBuffer>,
port: u32,
target_fps: u32,
statistics_tx: Sender<StatisticsEvent>,
statistics_information_rx: broadcast::Receiver<StatisticsInformationEvent>,
terminate_signal_tx: oneshot::Receiver<&'b str>,
text: &'a str,
font: &'a str,
) -> Self {
Expand Down Expand Up @@ -74,6 +77,7 @@ impl<'a> VncServer<'a> {
target_fps,
statistics_tx,
statistics_information_rx,
terminate_signal_tx,
text,
font,
}
Expand All @@ -92,6 +96,10 @@ impl<'a> VncServer<'a> {
let fb_size_up_to_stats_text = fb.get_width() * height_up_to_stats_text;

loop {
if self.terminate_signal_tx.try_recv().is_ok() {
return;
}

let start = std::time::Instant::now();
vnc_fb_slice[0..fb_size_up_to_stats_text]
.copy_from_slice(&fb_slice[0..fb_size_up_to_stats_text]);
Expand Down

0 comments on commit 3e659b6

Please sign in to comment.