diff --git a/src/packet/mod.rs b/src/packet/mod.rs index 768fbca6..b34a88e1 100644 --- a/src/packet/mod.rs +++ b/src/packet/mod.rs @@ -5,7 +5,7 @@ use std::fmt; use std::panic::UnwindSafe; use thiserror::Error; -use crate::format::Codec; +use crate::format::{Codec, CodecSpec}; use crate::sdp::MediaType; mod g7xx; @@ -206,10 +206,12 @@ pub(crate) enum CodecDepacketizer { Boxed(Box), } -impl From for CodecPacketizer { - fn from(c: Codec) -> Self { - match c { - Codec::Opus => CodecPacketizer::Opus(OpusPacketizer), +impl From for CodecPacketizer { + fn from(c: CodecSpec) -> Self { + match c.codec { + Codec::Opus => { + CodecPacketizer::Opus(OpusPacketizer::new(c.format.use_dtx.unwrap_or_default())) + } Codec::H264 => CodecPacketizer::H264(H264Packetizer::default()), Codec::H265 => unimplemented!("Missing packetizer for H265"), Codec::Vp8 => CodecPacketizer::Vp8(Vp8Packetizer::default()), diff --git a/src/packet/opus.rs b/src/packet/opus.rs index 365da501..df108db6 100644 --- a/src/packet/opus.rs +++ b/src/packet/opus.rs @@ -1,8 +1,21 @@ use super::{CodecExtra, Depacketizer, MediaKind, PacketError, Packetizer}; /// Packetizes Opus RTP packets. -#[derive(Default, Debug, Copy, Clone)] -pub struct OpusPacketizer; +#[derive(Debug, Copy, Clone)] +pub struct OpusPacketizer { + // stores if a marker was previously set + marker: bool, + use_dtx: bool, +} + +impl OpusPacketizer { + pub fn new(use_dtx: bool) -> Self { + Self { + marker: false, + use_dtx, + } + } +} impl Packetizer for OpusPacketizer { fn packetize(&mut self, mtu: usize, payload: &[u8]) -> Result>, PacketError> { @@ -23,8 +36,26 @@ impl Packetizer for OpusPacketizer { } fn is_marker(&mut self, data: &[u8], previous: Option<&[u8]>, last: bool) -> bool { - // TODO: dtx - false + if !self.use_dtx { + return false; + } + // any non silenced packet would generally have more than 2 byts + let mut is_marker = data.len() > 2; + + match self.marker { + true => { + if !is_marker { + self.marker = false; + } + is_marker = false; + } + false => { + if is_marker { + self.marker = true; + } + } + } + is_marker } } @@ -84,7 +115,7 @@ mod test { #[test] fn test_opus_payload() -> Result<(), PacketError> { - let mut pck = OpusPacketizer; + let mut pck = OpusPacketizer::new(true); let empty = &[]; let payload = &[0x90, 0x90, 0x90]; @@ -103,6 +134,43 @@ mod test { Ok(()) } + #[test] + fn test_opus_packetizer_dtx() -> Result<(), PacketError> { + // packetizer with dtx on + let mut pck = OpusPacketizer::new(true); + + let payload = &[0x90, 0x90, 0x90]; + + // Start of talking spurt, marker bit is set. + let is_marker = pck.is_marker(payload, None, false); + assert!(is_marker); + assert!(pck.marker); + + // More talking so is_marker should be false. + let is_marker = pck.is_marker(payload, None, false); + assert!(!is_marker); + assert!(pck.marker); + + // silence packet inserted, internal packetizer state should be reset. + let is_marker = pck.is_marker(&[], None, false); + assert!(!is_marker); + assert!(!pck.marker); + + // talking start again, marker bit is set. + let is_marker = pck.is_marker(payload, None, false); + assert!(is_marker); + assert!(pck.marker); + + let mut pck = OpusPacketizer::new(false); + + // Start of talking spurt, since dtx is false marker should be false + let is_marker = pck.is_marker(payload, None, false); + assert!(!is_marker); + assert!(!pck.marker); + + Ok(()) + } + #[test] fn test_opus_is_partition_head() -> Result<(), PacketError> { let opus = OpusDepacketizer; diff --git a/src/packet/payload.rs b/src/packet/payload.rs index c9244d0b..565139f6 100644 --- a/src/packet/payload.rs +++ b/src/packet/payload.rs @@ -21,7 +21,7 @@ pub struct Payloader { impl Payloader { pub(crate) fn new(spec: CodecSpec) -> Self { Payloader { - pack: spec.codec.into(), + pack: spec.into(), clock_rate: spec.clock_rate, } } diff --git a/tests/rtp_to_frame.rs b/tests/rtp_to_frame.rs deleted file mode 100644 index 62f3af14..00000000 --- a/tests/rtp_to_frame.rs +++ /dev/null @@ -1,110 +0,0 @@ -use std::collections::VecDeque; -use std::time::Duration; - -use str0m::format::Codec; -use str0m::media::MediaKind; -use str0m::rtp::{ExtensionValues, Ssrc}; -use str0m::{Event, Rtc, RtcError}; - -mod common; -use common::{connect_l_r_with_rtc, init_log, progress}; - -#[test] -pub fn audio_start_of_talk_spurt() -> Result<(), RtcError> { - init_log(); - - let rtc1 = Rtc::builder().set_rtp_mode(true).build(); - let rtc2 = Rtc::builder().set_reordering_size_audio(0).build(); - - let (mut l, mut r) = connect_l_r_with_rtc(rtc1, rtc2); - - let mid = "audio".into(); - let ssrc_tx: Ssrc = 1337.into(); - - l.direct_api().declare_media(mid, MediaKind::Audio); - l.direct_api().declare_stream_tx(ssrc_tx, None, mid, None); - r.direct_api().declare_media(mid, MediaKind::Audio); - - let max = l.last.max(r.last); - l.last = max; - r.last = max; - - let params = l.params_opus(); - let ssrc = l.direct_api().stream_tx_by_mid(mid, None).unwrap().ssrc(); - assert_eq!(params.spec().codec, Codec::Opus); - let pt = params.pt(); - - let to_write: Vec<&[u8]> = vec![ - // 1 - &[0x1, 0x2, 0x3, 0x4], - // 3 - &[0x9, 0xa, 0xb, 0xc], - // 2 - &[0x5, 0x6, 0x7, 0x8], - ]; - - let mut to_write: VecDeque<_> = to_write.into(); - - let mut write_at = l.last + Duration::from_millis(300); - - let mut counts: Vec = vec![0, 3, 1]; - - loop { - if l.start + l.duration() > write_at { - write_at = l.last + Duration::from_millis(300); - if let Some(packet) = to_write.pop_front() { - let wallclock = l.start + l.duration(); - - let mut direct = l.direct_api(); - let stream = direct.stream_tx(&ssrc).unwrap(); - - let count = counts.remove(0); - let time = (count * 1000 + 47_000_000) as u32; - let seq_no = (47_000 + count).into(); - - let exts = ExtensionValues { - audio_level: Some(-42 - count as i8), - voice_activity: Some(false), - ..Default::default() - }; - - stream - .write_rtp( - pt, - seq_no, - time, - wallclock, - *seq_no % 2 == 0, // set marker bit on every second packet - exts, - false, - packet.to_vec(), - ) - .expect("clean write"); - } - } - - progress(&mut l, &mut r)?; - - if l.duration() > Duration::from_secs(10) { - break; - } - } - - let media: Vec<_> = r - .events - .iter() - .filter_map(|(_, e)| { - if let Event::MediaData(v) = e { - Some(v) - } else { - None - } - }) - .collect(); - - for m in media { - assert!(m.audio_start_of_talk_spurt == (**m.seq_range.start() % 2 == 0)); - } - - Ok(()) -} diff --git a/tests/talk_start_spurt.rs b/tests/talk_start_spurt.rs new file mode 100644 index 00000000..012d7a01 --- /dev/null +++ b/tests/talk_start_spurt.rs @@ -0,0 +1,229 @@ +use std::collections::VecDeque; +use std::time::Duration; + +use str0m::format::{Codec, FormatParams}; +use str0m::media::{Frequency, MediaKind, MediaTime}; +use str0m::rtp::{ExtensionValues, Ssrc}; +use str0m::{Event, Rtc, RtcError}; + +mod common; +use common::{connect_l_r_with_rtc, init_log, progress}; + +#[test] +pub fn audio_start_of_talk_spurt_frame() -> Result<(), RtcError> { + init_log(); + + let rtc1 = Rtc::builder().set_rtp_mode(true).build(); + let rtc2 = Rtc::builder().set_reordering_size_audio(0).build(); + + let (mut l, mut r) = connect_l_r_with_rtc(rtc1, rtc2); + + let mid = "audio".into(); + let ssrc_tx: Ssrc = 1337.into(); + + l.direct_api().declare_media(mid, MediaKind::Audio); + l.direct_api().declare_stream_tx(ssrc_tx, None, mid, None); + r.direct_api().declare_media(mid, MediaKind::Audio); + + let max = l.last.max(r.last); + l.last = max; + r.last = max; + + let params = l.params_opus(); + let ssrc = l.direct_api().stream_tx_by_mid(mid, None).unwrap().ssrc(); + assert_eq!(params.spec().codec, Codec::Opus); + let pt = params.pt(); + + let to_write: Vec<&[u8]> = vec![ + // 1 + &[0x1, 0x2, 0x3, 0x4], + // 3 + &[0x9, 0xa, 0xb, 0xc], + // 2 + &[0x5, 0x6, 0x7, 0x8], + ]; + + let mut to_write: VecDeque<_> = to_write.into(); + + let mut write_at = l.last + Duration::from_millis(300); + + let mut counts: Vec = vec![0, 3, 1]; + + loop { + if l.start + l.duration() > write_at { + write_at = l.last + Duration::from_millis(300); + if let Some(packet) = to_write.pop_front() { + let wallclock = l.start + l.duration(); + + let mut direct = l.direct_api(); + let stream = direct.stream_tx(&ssrc).unwrap(); + + let count = counts.remove(0); + let time = (count * 1000 + 47_000_000) as u32; + let seq_no = (47_000 + count).into(); + + let exts = ExtensionValues { + audio_level: Some(-42 - count as i8), + voice_activity: Some(false), + ..Default::default() + }; + + stream + .write_rtp( + pt, + seq_no, + time, + wallclock, + *seq_no % 2 == 0, // set marker bit on every second packet + exts, + false, + packet.to_vec(), + ) + .expect("clean write"); + } + } + + progress(&mut l, &mut r)?; + + if l.duration() > Duration::from_secs(10) { + break; + } + } + + let media: Vec<_> = r + .events + .iter() + .filter_map(|(_, e)| { + if let Event::MediaData(v) = e { + Some(v) + } else { + None + } + }) + .collect(); + + for m in media { + assert!(m.audio_start_of_talk_spurt == (**m.seq_range.start() % 2 == 0)); + } + + Ok(()) +} + +#[test] +pub fn audio_start_of_talk_spurt_rtp() -> Result<(), RtcError> { + init_log(); + + let mut rtc1_config = Rtc::builder().clear_codecs(); + let codec_config = rtc1_config.codec_config(); + + // add a custom opus config with dtx flag set to true + codec_config.add_config( + 111.into(), + None, + Codec::Opus, + Frequency::FORTY_EIGHT_KHZ, + Some(2), + FormatParams { + min_p_time: Some(10), + use_inband_fec: Some(true), + use_dtx: Some(true), + ..Default::default() + }, + ); + + let rtc1 = rtc1_config.build(); + + let rtc2 = Rtc::builder() + .set_reordering_size_audio(0) + .set_rtp_mode(true) + .build(); + + let (mut l, mut r) = connect_l_r_with_rtc(rtc1, rtc2); + + let mid = "audio".into(); + let ssrc_tx: Ssrc = 1337.into(); + + l.direct_api().declare_media(mid, MediaKind::Audio); + l.direct_api().declare_stream_tx(ssrc_tx, None, mid, None); + r.direct_api().declare_media(mid, MediaKind::Audio); + + let max = l.last.max(r.last); + l.last = max; + r.last = max; + + let params = l.params_opus(); + + assert_eq!(params.spec().codec, Codec::Opus); + let pt = params.pt(); + + let to_write: Vec<&[u8]> = vec![ + // 1 + &[0x1], + // 2 + &[0x1, 0x2, 0x3, 0x4], + // 4 + &[0x9, 0xa, 0xb, 0xc], + // 3 + &[0x5, 0x6, 0x7, 0x8], + // 5 + &[0x1], + // 6 + &[0x9, 0xa, 0xb, 0xc], + ]; + + let mut to_write: VecDeque<_> = to_write.into(); + + let mut write_at = l.last + Duration::from_millis(300); + + let mut counts: Vec = vec![0, 1, 2, 4, 3, 5, 6]; + + loop { + if l.start + l.duration() > write_at { + write_at = l.last + Duration::from_millis(300); + if let Some(packet) = to_write.pop_front() { + let wallclock = l.start + l.duration(); + + let count = counts.remove(0); + let time = count * 1000 + 47_000_000; + + l.writer(mid) + .unwrap() + .write( + pt, + wallclock, + MediaTime::new(time, Frequency::FORTY_EIGHT_KHZ), + packet.to_vec(), + ) + .unwrap(); + } + } + + progress(&mut l, &mut r)?; + + if l.duration() > Duration::from_secs(10) { + break; + } + } + + let rtp_packets: Vec<_> = r + .events + .iter() + .filter_map(|(_, e)| { + if let Event::RtpPacket(p) = e { + Some(p) + } else { + None + } + }) + .collect(); + + assert_eq!(rtp_packets.len(), 6); + let is_marker = [false, true, false, false, false, true]; + + rtp_packets + .iter() + .enumerate() + .for_each(|(i, r)| assert_eq!(r.header.marker, is_marker[i])); + + Ok(()) +}