From 2ce984cb790f4e6f68ac54e39afe85d678b2bce6 Mon Sep 17 00:00:00 2001 From: Harry Date: Fri, 28 Jan 2022 01:24:19 +0000 Subject: [PATCH 1/3] Add `signal-hook` as a dependency for transmit --- srt-transmit/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/srt-transmit/Cargo.toml b/srt-transmit/Cargo.toml index 92f47676..d1337d25 100644 --- a/srt-transmit/Cargo.toml +++ b/srt-transmit/Cargo.toml @@ -20,6 +20,7 @@ bytes = "1.0" anyhow = "1" pretty_env_logger = { version = "0.4", default-features = false } futures = { version = "0.3", default-features = false, features = ["std", "async-await"] } +signal-hook = "0.3.12" [dependencies.tokio] version = "1" From de575e9fc9983ef0570d99f02f4cdddf8a7e9905 Mon Sep 17 00:00:00 2001 From: Harry Date: Fri, 28 Jan 2022 01:25:51 +0000 Subject: [PATCH 2/3] Alter piping loop in transmit to break if SIGINT is set --- srt-transmit/src/main.rs | 42 +++++++++++++++++++++++++++++++++------- 1 file changed, 35 insertions(+), 7 deletions(-) diff --git a/srt-transmit/src/main.rs b/srt-transmit/src/main.rs index b571b5a0..bb72ac67 100644 --- a/srt-transmit/src/main.rs +++ b/srt-transmit/src/main.rs @@ -10,6 +10,7 @@ use std::{ process::exit, task::{Context, Poll}, time::{Duration, Instant}, + sync::{Arc, atomic::{AtomicBool, Ordering}}, }; use anyhow::{anyhow, bail, format_err, Error}; @@ -30,6 +31,8 @@ use tokio::{ net::TcpListener, net::TcpStream, net::UdpSocket, + time::interval, + select, }; use tokio_util::{codec::BytesCodec, codec::Framed, codec::FramedWrite, udp::UdpFramed}; @@ -40,6 +43,8 @@ use srt_tokio::{ use streamer_server::*; +use signal_hook::{flag::register, consts::SIGINT}; + const AFTER_HELPTEXT: &str = include_str!("helptext.txt"); // boxed() combinator for sink, which somehow doesn't exist @@ -637,13 +642,36 @@ async fn run() -> Result<(), Error> { let mut sinks = MultiSinkFlatten::new(sink_streams.drain(..)); - // poll sink and stream in parallel, only yielding when there is something ready for the sink and the stream is good. - while let (_, Some(stream)) = try_join!( - future::poll_fn(|cx| Pin::new(&mut sinks).poll_ready(cx)), - stream_stream.try_next() - )? { - // let a: () = &mut *stream; - sinks.send_all(&mut stream.map(Ok)).await?; + // setup SIGINT handling prerequisites. + let mut sig_intval = interval(Duration::from_millis(500)); + let sig_flg = Arc::new(AtomicBool::new(false)); + register(SIGINT, sig_flg.clone()).unwrap(); + + loop { + select! { + // we need to ensure the main future is driven to completion so we poll a future that only ever completes + // if we've received a shutdown signal. allows use to use select! without issue. + _ = async { + while !sig_flg.load(Ordering::Relaxed) { + sig_intval.tick().await; + } + } => break, + // poll sink and stream in parallel, only yielding when there is something ready for the sink and the + // stream is good. + stream_result = async { + try_join!( + future::poll_fn(|cx| Pin::new(&mut sinks).poll_ready(cx)), + stream_stream.try_next(), + ) + } => { + if let (_, Some(stream)) = stream_result? { + // let a: () = &mut *stream; + sinks.send_all(&mut stream.map(Ok)).await?; + } else { + break; + } + } + } } sinks.close().await?; From d368e85760ce4bfd550d67352ec57fed5729feb3 Mon Sep 17 00:00:00 2001 From: Harry Date: Fri, 28 Jan 2022 01:26:41 +0000 Subject: [PATCH 3/3] Prefer tokio's implementation of `try_join` in transmit --- srt-transmit/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/srt-transmit/src/main.rs b/srt-transmit/src/main.rs index bb72ac67..dab2bc77 100644 --- a/srt-transmit/src/main.rs +++ b/srt-transmit/src/main.rs @@ -24,7 +24,6 @@ use futures::{ prelude::*, ready, stream::{self, once, unfold, BoxStream}, - try_join, }; use tokio::{ io::{AsyncRead, AsyncReadExt}, @@ -33,6 +32,7 @@ use tokio::{ net::UdpSocket, time::interval, select, + try_join, }; use tokio_util::{codec::BytesCodec, codec::Framed, codec::FramedWrite, udp::UdpFramed};