diff --git a/srt-transmit/Cargo.toml b/srt-transmit/Cargo.toml index 92f47676..d1337d25 100644 --- a/srt-transmit/Cargo.toml +++ b/srt-transmit/Cargo.toml @@ -20,6 +20,7 @@ bytes = "1.0" anyhow = "1" pretty_env_logger = { version = "0.4", default-features = false } futures = { version = "0.3", default-features = false, features = ["std", "async-await"] } +signal-hook = "0.3.12" [dependencies.tokio] version = "1" diff --git a/srt-transmit/src/main.rs b/srt-transmit/src/main.rs index b571b5a0..dab2bc77 100644 --- a/srt-transmit/src/main.rs +++ b/srt-transmit/src/main.rs @@ -10,6 +10,7 @@ use std::{ process::exit, task::{Context, Poll}, time::{Duration, Instant}, + sync::{Arc, atomic::{AtomicBool, Ordering}}, }; use anyhow::{anyhow, bail, format_err, Error}; @@ -23,13 +24,15 @@ use futures::{ prelude::*, ready, stream::{self, once, unfold, BoxStream}, - try_join, }; use tokio::{ io::{AsyncRead, AsyncReadExt}, net::TcpListener, net::TcpStream, net::UdpSocket, + time::interval, + select, + try_join, }; use tokio_util::{codec::BytesCodec, codec::Framed, codec::FramedWrite, udp::UdpFramed}; @@ -40,6 +43,8 @@ use srt_tokio::{ use streamer_server::*; +use signal_hook::{flag::register, consts::SIGINT}; + const AFTER_HELPTEXT: &str = include_str!("helptext.txt"); // boxed() combinator for sink, which somehow doesn't exist @@ -637,13 +642,36 @@ async fn run() -> Result<(), Error> { let mut sinks = MultiSinkFlatten::new(sink_streams.drain(..)); - // poll sink and stream in parallel, only yielding when there is something ready for the sink and the stream is good. - while let (_, Some(stream)) = try_join!( - future::poll_fn(|cx| Pin::new(&mut sinks).poll_ready(cx)), - stream_stream.try_next() - )? { - // let a: () = &mut *stream; - sinks.send_all(&mut stream.map(Ok)).await?; + // setup SIGINT handling prerequisites. + let mut sig_intval = interval(Duration::from_millis(500)); + let sig_flg = Arc::new(AtomicBool::new(false)); + register(SIGINT, sig_flg.clone()).unwrap(); + + loop { + select! { + // we need to ensure the main future is driven to completion so we poll a future that only ever completes + // if we've received a shutdown signal. allows use to use select! without issue. + _ = async { + while !sig_flg.load(Ordering::Relaxed) { + sig_intval.tick().await; + } + } => break, + // poll sink and stream in parallel, only yielding when there is something ready for the sink and the + // stream is good. + stream_result = async { + try_join!( + future::poll_fn(|cx| Pin::new(&mut sinks).poll_ready(cx)), + stream_stream.try_next(), + ) + } => { + if let (_, Some(stream)) = stream_result? { + // let a: () = &mut *stream; + sinks.send_all(&mut stream.map(Ok)).await?; + } else { + break; + } + } + } } sinks.close().await?;