Skip to content

Commit

Permalink
0.7.7 fix various stream identification issues
Browse files Browse the repository at this point in the history
pmt parsing improvements, adaption field is checked now :)
  • Loading branch information
ltn-chriskennedy committed Aug 19, 2024
1 parent 7c881ae commit fc79bef
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 51 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license-file = "LICENSE"
homepage = "https://github.com/groovybits/rscap"
repository = "https://github.com/groovybits/rscap"
authors = ["Chris Kennedy"]
version = "0.7.6"
version = "0.7.7"
edition = "2021"

[lib]
Expand Down
2 changes: 1 addition & 1 deletion specs/rsprobe.spec
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Name: rsprobe
Version: 0.7.6
Version: 0.7.7
Release: 1%{?dist}
Summary: MpegTS Stream Analysis Probe with Kafka and GStreamer
License: MIT
Expand Down
3 changes: 2 additions & 1 deletion src/bin/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ async fn send_to_kafka(
#[derive(Parser, Debug)]
#[clap(
author = "Chris Kennedy",
version = "0.7.6",
version = "0.7.7",
about = "MpegTS Stream Analysis Probe with Kafka and GStreamer"
)]
struct Args {
Expand Down Expand Up @@ -1677,6 +1677,7 @@ async fn rsprobe(running: Arc<AtomicBool>) {
let mut pmt_info: PmtInfo = PmtInfo {
pid: 0xFFFF,
packet: Vec::new(),
program_number: 0,
};
let mut pmt_pid: Option<u16> = Some(0xFFFF);
let mut program_number: Option<u16> = Some(0xFFFF);
Expand Down
179 changes: 131 additions & 48 deletions src/stream_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ pub fn pull_images(
pub const PAT_PID: u16 = 0;
pub const TS_PACKET_SIZE: usize = 188;

#[derive(Debug)]
pub struct PatEntry {
pub program_number: u16,
pub pmt_pid: u16,
Expand All @@ -412,10 +413,18 @@ pub struct PmtEntry {
pub stream_pid: u16,
pub stream_type: u8, // Stream type (e.g., 0x02 for MPEG video)
pub program_number: u16,
pub descriptors: Vec<Descriptor>,
}

pub struct Pmt {
pub entries: Vec<PmtEntry>,
pub descriptors: Vec<Descriptor>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Descriptor {
pub tag: u8,
pub data: Vec<u8>,
}

#[derive(Clone, PartialEq)]
Expand Down Expand Up @@ -1167,6 +1176,7 @@ pub fn extract_pid(packet: &[u8]) -> u16 {
pub struct PmtInfo {
pub pid: u16,
pub packet: Vec<u8>,
pub program_number: u16,
}

// Helper function to parse PAT and update global PAT packet storage
Expand All @@ -1175,104 +1185,177 @@ pub fn parse_and_store_pat(packet: &[u8]) -> PmtInfo {
let mut pmt_info = PmtInfo {
pid: 0xFFFF,
packet: Vec::new(),
program_number: 0,
};
pmt_info.packet = packet.to_vec();

// loook for the program that is non zero and below 0x1FFF
debug!("ParseAndStorePAT: Found {} PAT entries {:?}", pat_entries.len(), pat_entries);

let mut found_pmt = false;

// look for the PMT Pid and Program Number that are greater zero and less than 0x1FFF for PMT PID
for entry in pat_entries {
if entry.pmt_pid != 0 && entry.pmt_pid < 0x1FFF {
pmt_info.pid = entry.pmt_pid;
break;
if entry.pmt_pid > 0 && entry.pmt_pid <= 0x1FFF {
if entry.pmt_pid > 0 && entry.pmt_pid <= 0x1FFF && entry.program_number > 0 {
// TODO: return an array of all valid PMT PIDs and Program Numbers
pmt_info.pid = entry.pmt_pid;
pmt_info.program_number = entry.program_number;

debug!(
"ParseAndStorePAT: Found Program Number: {} PMT PID: {}",
entry.program_number, entry.pmt_pid
);
found_pmt = true;
} else {
log::warn!(
"ParseAndStorePAT: PMT Pid OUT OF RANGE, Skipping Program Number: {} PMT PID: {}",
entry.program_number, entry.pmt_pid
);
}
}
}
// print info on if we found a valid PMT PID or not
if found_pmt {
debug!("ParseAndStorePAT: Found PMT PID: {} Program Number: {}", pmt_info.pid, pmt_info.program_number);
}
pmt_info
}

// Helper function to extract descriptors with bounds checking
fn parse_descriptors(packet: &[u8], offset: usize, length: usize) -> Vec<Descriptor> {
let mut descriptors = Vec::new();
let mut i = offset;

// Ensure that the descriptor parsing does not exceed packet bounds
while i < offset + length && i + 2 <= packet.len() {
let tag = packet[i];
let len = packet[i + 1] as usize;

// Check if the remaining data is sufficient to read the descriptor
if i + 2 + len <= packet.len() {
let data = packet[i + 2..i + 2 + len].to_vec();
descriptors.push(Descriptor { tag, data });
} else {
// If there's not enough data for the descriptor, break the loop
break;
}

i += 2 + len;
}

descriptors
}

// Parse PAT packets
pub fn parse_pat(packet: &[u8]) -> Vec<PatEntry> {
let mut entries = Vec::new();

// Check for minimum packet size
if packet.len() < TS_PACKET_SIZE {
return entries;
}

// Check if Payload Unit Start Indicator (PUSI) is set
let pusi = (packet[1] & 0x40) != 0;
if !pusi {
// If Payload Unit Start Indicator is not set, this packet does not start a new PAT
return entries;
}

let adaptation_field_control = (packet[3] & 0x30) >> 4;
let mut offset = 4; // start after TS header
let mut offset = 4;

// Check for adaptation field and skip it
if adaptation_field_control == 0x02 || adaptation_field_control == 0x03 {
let adaptation_field_length = packet[4] as usize;
offset += 1 + adaptation_field_length; // +1 for the length byte itself
offset += 1 + adaptation_field_length;
}

// Pointer field indicates the start of the PAT section
let pointer_field = packet[offset] as usize;
offset += 1 + pointer_field; // Skip pointer field
offset += 1 + pointer_field;

// Now, 'offset' points to the start of the PAT section
while offset + 4 <= packet.len() {
let program_number = ((packet[offset] as u16) << 8) | (packet[offset + 1] as u16);
let pmt_pid = (((packet[offset + 2] as u16) & 0x1F) << 8) | (packet[offset + 3] as u16);

// Only add valid entries (non-zero program_number and pmt_pid)
if program_number != 0 && pmt_pid != 0 && pmt_pid < 0x1FFF && program_number < 100 {
if program_number > 0 && pmt_pid > 0 && pmt_pid <= 0x1FFF && program_number < 5000 {
entries.push(PatEntry {
program_number,
pmt_pid,
});
debug!(
"ParsePAT: Found Program Number: {} PMT PID: {}",
program_number, pmt_pid
);
} else {
if program_number <= 0 {
debug!(
"ParsePAT: Skipping Program Number <= 0: {} PMT PID: {}",
program_number, pmt_pid
);
} else if pmt_pid <= 0 {
debug!(
"ParsePAT: Skipping PMT PID <= 0: {} Program Number: {}",
pmt_pid, program_number
);
} else if pmt_pid > 0x1FFF {
debug!(
"ParsePAT: Skipping PMT PID >= 0x1FFF: {} Program Number: {}",
pmt_pid, program_number
);
}
}

debug!(
"ParsePAT: Program Number: {} PMT PID: {}",
program_number, pmt_pid
);

offset += 4; // Move to the next PAT entry
offset += 4;
}

entries
}

// Parse PMT packets
pub fn parse_pmt(packet: &[u8]) -> Pmt {
let mut entries = Vec::new();
let program_number = ((packet[8] as u16) << 8) | (packet[9] as u16);

// Calculate the starting position for stream entries
let section_length = (((packet[6] as usize) & 0x0F) << 8) | packet[7] as usize;
let program_info_length = (((packet[15] as usize) & 0x0F) << 8) | packet[16] as usize;
let mut i = 17 + program_info_length; // Starting index of the first stream in the PMT

debug!(
"ParsePMT: Program Number: {} PMT PID: {} starting at position {}",
program_number,
extract_pid(packet),
i
);
while i + 5 <= packet.len() && i < 17 + section_length - 4 {
let stream_type = packet[i];
let stream_pid = (((packet[i + 1] as u16) & 0x1F) << 8) | (packet[i + 2] as u16);
let es_info_length = (((packet[i + 3] as usize) & 0x0F) << 8) | packet[i + 4] as usize;
i += 5 + es_info_length; // Update index to point to next stream's info

let adaptation_field_control = (packet[3] & 0x30) >> 4;
let mut offset = 0;

if adaptation_field_control == 0x02 || adaptation_field_control == 0x03 {
let adaptation_field_length = packet[4] as usize;
offset += 1 + adaptation_field_length;
}

if offset + 4 > packet.len() {
error!("ParsePMT: Packet size is incorrect: {}", packet.len());
return Pmt {
entries,
descriptors: Vec::new(),
};
}

let program_number = ((packet[8 + offset] as u16) << 8) | (packet[9 + offset] as u16);

let section_length = (((packet[6+ offset] as usize) & 0x0F) << 8) | packet[7 + offset] as usize;
let program_info_length = (((packet[15 + offset] as usize) & 0x0F) << 8) | packet[16 + offset] as usize;
let mut i = 17 + program_info_length;

// Parse program descriptors
let descriptors = parse_descriptors(packet, 17 + offset, program_info_length);

while i + 5 + offset <= packet.len() && i < 17 + section_length - 4 {
let stream_type = packet[i + offset];
let stream_pid = (((packet[i + 1 + offset] as u16) & 0x1F) << 8) | (packet[i + 2 + offset] as u16);
let es_info_length = (((packet[i + 3 + offset] as usize) & 0x0F) << 8) | packet[i + 4 + offset] as usize;

// Parse ES descriptors
let es_descriptors = parse_descriptors(packet, i + 5 + offset, es_info_length);

entries.push(PmtEntry {
stream_pid,
stream_type,
program_number,
descriptors: es_descriptors,
});
debug!(
"ParsePMT: ProgramNumber: {}, Stream PID: {}, Stream Type: {}",
program_number, stream_pid, stream_type
);

i += 5 + es_info_length;
}

Pmt { entries }
Pmt { entries, descriptors }
}

// Invoke this function for each MPEG-TS packet
Expand Down Expand Up @@ -1481,7 +1564,7 @@ pub fn update_pid_map(
program_number_result = program_number;

// Ensure the current PMT packet matches the PMT PID from the PAT
if extract_pid(pmt_packet) == pmt_pid {
//if extract_pid(pmt_packet) == pmt_pid {
let pmt = parse_pmt(pmt_packet);

for pmt_entry in pmt.entries.iter() {
Expand Down Expand Up @@ -1584,9 +1667,9 @@ pub fn update_pid_map(
pid_map.insert(stream_pid, stream_data);
}
}
} else {
error!("UpdatePIDmap: Skipping PMT PID: {} as it does not match with current PMT packet PID", pmt_pid);
}
//} else {
// error!("UpdatePIDmap: Skipping PMT PID: {} as it does not match with current PMT packet PID", pmt_pid);
//}
}
program_number_result
}
Expand Down

0 comments on commit fc79bef

Please sign in to comment.