Skip to content

Commit

Permalink
exit on gstreamer failure
Browse files Browse the repository at this point in the history
restart and avoid issues
  • Loading branch information
ltn-chriskennedy committed Aug 26, 2024
1 parent 4ea0e3d commit 82ee00b
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license-file = "LICENSE"
homepage = "https://github.com/groovybits/rscap"
repository = "https://github.com/groovybits/rscap"
authors = ["Chris Kennedy"]
version = "0.7.8"
version = "0.7.9"
edition = "2021"

[lib]
Expand Down
2 changes: 1 addition & 1 deletion specs/rsprobe.spec
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Name: rsprobe
Version: 0.7.8
Version: 0.7.9
Release: 1%{?dist}
Summary: MpegTS Stream Analysis Probe with Kafka and GStreamer
License: MIT
Expand Down
19 changes: 12 additions & 7 deletions src/bin/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ async fn send_to_kafka(
#[derive(Parser, Debug)]
#[clap(
author = "Chris Kennedy",
version = "0.7.8",
version = "0.7.9",
about = "MpegTS Stream Analysis Probe with Kafka and GStreamer"
)]
struct Args {
Expand Down Expand Up @@ -660,7 +660,7 @@ struct Args {
image_buffer_size: usize,

/// Video buffer size - Size of the buffer for the video to gstreamer
#[clap(long, env = "VIDEO_BUFFER_SIZE", default_value_t = 100000)]
#[clap(long, env = "VIDEO_BUFFER_SIZE", default_value_t = 1000)]
video_buffer_size: usize,

/// Kafka Broker
Expand Down Expand Up @@ -1893,16 +1893,17 @@ async fn rsprobe(running: Arc<AtomicBool>) {
);

// Send the video packet to the processing task
let mut exit_now = false;
if let Err(_) = video_packet_sender
.try_send(Arc::try_unwrap(video_packet).unwrap_or_default())
{
// If the channel is full, drop the packet
eprintln!("Video packet channel is full. Dropping packet. {} errors so far.", video_packet_errors);
video_packet_errors += 1;
if video_packet_errors > 32 {
if video_packet_errors > 300 {
eprintln!("Probe: Video packet channel has {} errors, restarting Gstreamer.", video_packet_errors);
//running.store(false, Ordering::SeqCst);
//return;
running.store(false, Ordering::SeqCst);
exit_now = true;

// Pause Gstreamer
if gstreamer_playing == true {
Expand All @@ -1912,15 +1913,15 @@ async fn rsprobe(running: Arc<AtomicBool>) {
Err(err) => {
eprintln!("Failed to set the pipeline state to Paused: {}", err);
running.store(false, Ordering::SeqCst);
return;
exit_now = true;
}
}
match pipeline.set_state(gst::State::Playing) {
Ok(_) => (),
Err(err) => {
eprintln!("Failed to set the pipeline state to Playing: {}", err);
running.store(false, Ordering::SeqCst);
return;
exit_now = true;
}
}
}
Expand All @@ -1929,6 +1930,10 @@ async fn rsprobe(running: Arc<AtomicBool>) {
video_packet_errors = 0;
}

if exit_now {
break;
}

// Receive and process images
#[cfg(feature = "gst")]
if let Ok((image_data, pts, duplicates, hash, hamming)) =
Expand Down
6 changes: 3 additions & 3 deletions src/stream_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,10 @@ pub fn process_video_packets(

// Push buffer only if not full
if let Err(err) = appsrc.push_buffer(buffer) {
eprintln!("Buffer full with {} errors, dropping packet: {}", errors, err);
eprintln!("process_video_packets: Gstreamer Buffer full with {} errors, dropping packet: {}", errors, err);
errors += 1;
if errors > 1000 {
break;
if errors > 1000 && errors < 1010 {
eprintln!("process_video_packerts: Too many errors in a row with gstreamer {}!", errors);
}
} else {
errors = 0;
Expand Down

0 comments on commit 82ee00b

Please sign in to comment.