Skip to content

Commit

Permalink
clear pidmap of stale streams (#91)
Browse files Browse the repository at this point in the history
* clear pidmap of stale streams

* update version to 0.7.6

---------

Co-authored-by: Chris Kennedy <[email protected]>
Co-authored-by: Chris Kennedy <[email protected]>
  • Loading branch information
3 people authored Aug 5, 2024
1 parent bd3f8b4 commit 7c881ae
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 18 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license-file = "LICENSE"
homepage = "https://github.com/groovybits/rscap"
repository = "https://github.com/groovybits/rscap"
authors = ["Chris Kennedy"]
version = "0.7.5"
version = "0.7.6"
edition = "2021"

[lib]
Expand Down
2 changes: 1 addition & 1 deletion specs/rsprobe.spec
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Name: rsprobe
Version: 0.7.5
Version: 0.7.6
Release: 1%{?dist}
Summary: MpegTS Stream Analysis Probe with Kafka and GStreamer
License: MIT
Expand Down
11 changes: 5 additions & 6 deletions src/bin/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ use rdkafka::types::RDKafkaErrorCode;
use rsprobe::get_system_stats;
use rsprobe::stream_data::process_mpegts_packet;
use rsprobe::stream_data::{
get_pid_map, identify_video_pid, parse_and_store_pat, process_packet, update_pid_map, Codec,
ImageData, PmtInfo, StreamData, Tr101290Errors, PAT_PID,
cleanup_stale_streams, get_pid_map, identify_video_pid, parse_and_store_pat, process_packet,
update_pid_map, Codec, ImageData, PmtInfo, StreamData, Tr101290Errors, PAT_PID,
};
#[cfg(feature = "gst")]
use rsprobe::stream_data::{initialize_pipeline, process_video_packets, pull_images};
Expand Down Expand Up @@ -494,7 +494,7 @@ async fn send_to_kafka(
#[derive(Parser, Debug)]
#[clap(
author = "Chris Kennedy",
version = "0.7.5",
version = "0.7.6",
about = "MpegTS Stream Analysis Probe with Kafka and GStreamer"
)]
struct Args {
Expand Down Expand Up @@ -1601,9 +1601,7 @@ async fn rsprobe(running: Arc<AtomicBool>) {
&args.image_framerate,
args.extract_images,
) {
Ok((pipeline, appsrc, appsink)) => {
(pipeline, appsrc, appsink)
}
Ok((pipeline, appsrc, appsink)) => (pipeline, appsrc, appsink),
Err(err) => {
eprintln!("Failed to initialize the pipeline: {}", err);
return;
Expand Down Expand Up @@ -1765,6 +1763,7 @@ async fn rsprobe(running: Arc<AtomicBool>) {
pmt_pid = Some(pid);
}
// Update PID_MAP with new stream types
cleanup_stale_streams();
let program_number_result = update_pid_map(
&packet_chunk,
&pmt_info.packet,
Expand Down
44 changes: 34 additions & 10 deletions src/stream_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,10 @@ use std::io::Write;
#[cfg(feature = "gst")]
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::RwLock;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{fmt, sync::Arc, sync::Mutex};
#[cfg(feature = "gst")]
use tokio::sync::mpsc;
#[cfg(feature = "gst")]
use tokio::time::Duration;

const IAT_CAPTURE_WINDOW_SIZE: usize = 100;

Expand Down Expand Up @@ -86,14 +85,7 @@ pub fn initialize_pipeline(
scale: bool,
framerate: &str,
extract_images: bool,
) -> Result<
(
gst::Pipeline,
gst_app::AppSrc,
gst_app::AppSink,
),
anyhow::Error,
> {
) -> Result<(gst::Pipeline, gst_app::AppSrc, gst_app::AppSink), anyhow::Error> {
gst::init()?;

let width = (((height as f32 * 16.0 / 9.0) / 16.0).round() * 16.0) as u32;
Expand Down Expand Up @@ -1426,6 +1418,38 @@ pub fn process_packet(
}
}

pub fn cleanup_stale_streams() {
let current_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64;

let one_minute = 60_000; // 1 minute in milliseconds

let mut pid_map = PID_MAP.write().unwrap();
let stale_pids: Vec<u16> = pid_map
.iter()
.filter_map(|(&pid, stream_data)| {
if current_time.saturating_sub(stream_data.last_arrival_time) > one_minute {
Some(pid)
} else {
None
}
})
.collect();

for pid in stale_pids {
if let Some(removed_stream) = pid_map.remove(&pid) {
info!(
"Removed stale stream: PID {}, Program {}, Last seen {} ms ago",
pid,
removed_stream.program_number,
current_time.saturating_sub(removed_stream.last_arrival_time)
);
}
}
}

// Use the stored PAT packet
pub fn update_pid_map(
pmt_packet: &[u8],
Expand Down

0 comments on commit 7c881ae

Please sign in to comment.