From ee23cbfdd7d99c88c7cdcd1eb0ca0c8ea4fb885b Mon Sep 17 00:00:00 2001 From: alexlapa Date: Tue, 24 Dec 2024 12:47:13 +0200 Subject: [PATCH 1/5] dedup adapter impl + test --- src/packet/bwe/mod.rs | 140 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 137 insertions(+), 3 deletions(-) diff --git a/src/packet/bwe/mod.rs b/src/packet/bwe/mod.rs index 2c7916f27..29603df79 100644 --- a/src/packet/bwe/mod.rs +++ b/src/packet/bwe/mod.rs @@ -5,6 +5,7 @@ //! not been ported, only a smaller part that corresponds roughly to the IETF draft is implemented. use std::cmp::Ordering; +use std::collections::VecDeque; use std::fmt; use std::time::{Duration, Instant}; @@ -38,6 +39,7 @@ pub struct SendSideBandwithEstimator { loss_controller: Option, acked_bitrate_estimator: AckedBitrateEstimator, started_at: Option, + acked_packets_deduper: HandledPacketsTracker<256> } impl SendSideBandwithEstimator { @@ -55,6 +57,7 @@ impl SendSideBandwithEstimator { BITRATE_WINDOW, ), started_at: None, + acked_packets_deduper: HandledPacketsTracker::default(), } } @@ -66,19 +69,23 @@ impl SendSideBandwithEstimator { ) { let _ = self.started_at.get_or_insert(now); - let send_records: Vec<_> = records.collect(); - let mut acked_packets = vec![]; + let send_records: Vec<_> = records.filter(|r| { + // Skip acked packets that have already been processed before. + !self.acked_packets_deduper.contains(r.seq()) + }).collect(); + let mut acked_packets = Vec::with_capacity(send_records.len()); let mut max_rtt = None; let mut count = 0; let mut lost = 0; for record in send_records.iter() { count += 1; - let Ok(acked_packet) = (*record).try_into() else { + let Ok(acked_packet) = AckedPacket::try_from(*record) else { lost += 1; continue; }; acked_packets.push(acked_packet); + self.acked_packets_deduper.add(acked_packet.seq_no); max_rtt = max_rtt.max(record.rtt()); } acked_packets.sort_by(AckedPacket::order_by_receive_time); @@ -231,3 +238,130 @@ impl fmt::Display for BandwidthUsage { } } } + +/// Sliding window [`SeqNo`]s storage. +/// +/// Only remembers the last [`SIZE`] packets added. +#[derive(Debug)] +struct HandledPacketsTracker { + /// Recently added packets. + history: [Option; SIZE], + + /// Queue that tracks added packets order so older packets are removed. + queue: VecDeque, +} + +impl HandledPacketsTracker { + /// Remembers the give [`SeqNo`]. + /// + /// Expects somewhat sequential data with reordering no more than configured + /// [`SIZE`]. + fn add(&mut self, seq: SeqNo) { + let seq = seq.as_u16(); + let history_idx = seq as usize % SIZE; + + self.queue.push_back(seq); + if self.queue.len() == SIZE { + let to_remove = self.queue.pop_front().unwrap(); + let remove_idx = to_remove as usize % SIZE; + if self.history[remove_idx] == Some(seq) { + self.history[remove_idx] = None; + } + } + self.history[history_idx] = Some(seq); + } + + /// Checks if provided [`SeqNo`] has been seen in the window. + fn contains(&self, mut seq: SeqNo) -> bool { + let seq = seq.as_u16(); + let history_idx = seq as usize % SIZE; + + self.history[history_idx] == Some(seq) + } +} + +impl Default for HandledPacketsTracker { + fn default() -> Self { + Self { + history: [None; SIZE], + queue: VecDeque::with_capacity(SIZE), + } + } +} + +#[cfg(test)] +mod test { + use std::time::{Duration, Instant}; + + use crate::rtp_::{TwccRecvRegister, TwccSendRegister}; + + use super::AckedPacket; + + #[test] + fn libwebrtc_captured() { + // (transport_feedback_adapter.cc:115): ProcessSentPacket packet_id = 1, send_time_ms = 41013423 + // (transport_feedback_adapter.cc:115): ProcessSentPacket packet_id = 2, send_time_ms = 41013443 + // (transport_feedback_adapter.cc:115): ProcessSentPacket packet_id = 3, send_time_ms = 41013464 + // (transport_feedback_adapter.cc:115): ProcessSentPacket packet_id = 4, send_time_ms = 41013484 + // (rtp_transport_controller_send.cc:652): RtpTransportControllerSend::OnTransportFeedback: new TWCC received: base_seq = 1, status_count = 4, feedback_seq = 0, received_packets = [1, 2, 4], receive_time = 41013500160 + // (acknowledged_bitrate_estimator.cc:69): AcknowledgedBitrateEstimator::IncomingPacketFeedbackVector: received_packets = [{ seq = 1, recv_time = 41013547, send_time = 41013423}, { seq = 2, recv_time = 41013568, send_time = 41013443}, { seq = 4, recv_time = 41013608, send_time = 41013484}] + + // (transport_feedback_adapter.cc:115): ProcessSentPacket packet_id = 5, send_time_ms = 41013504 + // (transport_feedback_adapter.cc:115): ProcessSentPacket packet_id = 6, send_time_ms = 41013524 + // (transport_feedback_adapter.cc:115): ProcessSentPacket packet_id = 7, send_time_ms = 41013544 + // (rtp_transport_controller_send.cc:652): RtpTransportControllerSend::OnTransportFeedback: new TWCC received: base_seq = 3, status_count = 5, feedback_seq = 1, received_packets = [3, 4, 7], receive_time = 41013562660 + // (acknowledged_bitrate_estimator.cc:69): AcknowledgedBitrateEstimator::IncomingPacketFeedbackVector: received_packets = [{ seq = 3, recv_time = 41013638, send_time = 41013464}, { seq = 7, recv_time = 41013669, send_time = 41013544}] + + // (transport_feedback_adapter.cc:115): ProcessSentPacket packet_id = 8, send_time_ms = 41013565 + // (transport_feedback_adapter.cc:115): ProcessSentPacket packet_id = 9, send_time_ms = 41013585 + // (transport_feedback_adapter.cc:115): ProcessSentPacket packet_id = 10, send_time_ms = 41013605 + // (transport_feedback_adapter.cc:115): ProcessSentPacket packet_id = 11, send_time_ms = 41013625 + // (rtp_transport_controller_send.cc:652): RtpTransportControllerSend::OnTransportFeedback: new TWCC received: base_seq = 5, status_count = 6, feedback_seq = 2, received_packets = [5, 6, 7, 8, 9, 10], receive_time = 41013639138 + // (acknowledged_bitrate_estimator.cc:69): AcknowledgedBitrateEstimator::IncomingPacketFeedbackVector: received_packets = [{ seq = 5, recv_time = 41013705, send_time = 41013504}, { seq = 9, recv_time = 41013716, send_time = 41013585}, { seq = 6, recv_time = 41013722, send_time = 41013524}, { seq = 8, recv_time = 41013729, send_time = 41013565}, { seq = 10, recv_time = 41013729, send_time = 41013605}] + + let now = Instant::now(); + let mut twcc_gen = TwccRecvRegister::new(1000); + let mut twcc_handler = TwccSendRegister::new(1000); + + twcc_handler.register_seq(1.into(), now + Duration::from_millis(41013423), 0); + twcc_handler.register_seq(2.into(), now + Duration::from_millis(41013443), 0); + twcc_handler.register_seq(3.into(), now + Duration::from_millis(41013464), 0); + twcc_handler.register_seq(4.into(), now + Duration::from_millis(41013484), 0); + + { + let range = twcc_handler.apply_report( + { + twcc_gen.update_seq(1.into(), now + Duration::from_millis(41013423)); + twcc_gen.update_seq(2.into(), now + Duration::from_millis(41013568)); + twcc_gen.update_seq(4.into(), now + Duration::from_millis(41013608)); + twcc_gen.build_report(10_000).unwrap() + }, + now + Duration::from_micros(41013500160)).unwrap(); + + let mut acked_packets = twcc_handler.send_records(range).unwrap().filter_map(|r|AckedPacket::try_from(r).ok()).collect::>(); + acked_packets.sort_by(AckedPacket::order_by_receive_time); + let acked_packets: Vec<_> = acked_packets.into_iter().map(|p|p.seq_no.as_u16()).collect(); + assert_eq!(acked_packets, [1, 2, 4]); + } + + twcc_handler.register_seq(5.into(), now + Duration::from_millis(41013504), 0); + twcc_handler.register_seq(6.into(), now + Duration::from_millis(41013524), 0); + twcc_handler.register_seq(7.into(), now + Duration::from_millis(41013544), 0); + + { + let range = twcc_handler.apply_report( + { + twcc_gen.update_seq(3.into(), now + Duration::from_millis(41013638)); + twcc_gen.update_seq(7.into(), now + Duration::from_millis(41013669)); + twcc_gen.build_report(10_000).unwrap() + }, + now + Duration::from_micros(41013562660)).unwrap(); + + let mut acked_packets = twcc_handler.send_records(range).unwrap().filter_map(|r|AckedPacket::try_from(r).ok()).collect::>(); + acked_packets.sort_by(AckedPacket::order_by_receive_time); + let acked_packets: Vec<_> = acked_packets.into_iter().map(|p|p.seq_no.as_u16()).collect(); + assert_eq!(acked_packets, [3, 7]); + } + } +} + From a0e8c1b6c781261f6ad45acb82c7715d0d5a1a10 Mon Sep 17 00:00:00 2001 From: alexlapa Date: Tue, 24 Dec 2024 13:37:25 +0200 Subject: [PATCH 2/5] fmt and clippy --- src/packet/bwe/mod.rs | 77 +++++++++++++++++++++++++++---------------- 1 file changed, 49 insertions(+), 28 deletions(-) diff --git a/src/packet/bwe/mod.rs b/src/packet/bwe/mod.rs index 29603df79..e82672032 100644 --- a/src/packet/bwe/mod.rs +++ b/src/packet/bwe/mod.rs @@ -39,7 +39,7 @@ pub struct SendSideBandwithEstimator { loss_controller: Option, acked_bitrate_estimator: AckedBitrateEstimator, started_at: Option, - acked_packets_deduper: HandledPacketsTracker<256> + acked_packets_deduper: HandledPacketsTracker<256>, } impl SendSideBandwithEstimator { @@ -69,10 +69,12 @@ impl SendSideBandwithEstimator { ) { let _ = self.started_at.get_or_insert(now); - let send_records: Vec<_> = records.filter(|r| { - // Skip acked packets that have already been processed before. - !self.acked_packets_deduper.contains(r.seq()) - }).collect(); + let send_records: Vec<_> = records + .filter(|r| { + // Skip acked packets that have already been processed before. + !self.acked_packets_deduper.contains(r.seq()) + }) + .collect(); let mut acked_packets = Vec::with_capacity(send_records.len()); let mut max_rtt = None; @@ -272,7 +274,7 @@ impl HandledPacketsTracker { } /// Checks if provided [`SeqNo`] has been seen in the window. - fn contains(&self, mut seq: SeqNo) -> bool { + fn contains(&self, seq: SeqNo) -> bool { let seq = seq.as_u16(); let history_idx = seq as usize % SIZE; @@ -329,18 +331,28 @@ mod test { twcc_handler.register_seq(4.into(), now + Duration::from_millis(41013484), 0); { - let range = twcc_handler.apply_report( - { - twcc_gen.update_seq(1.into(), now + Duration::from_millis(41013423)); - twcc_gen.update_seq(2.into(), now + Duration::from_millis(41013568)); - twcc_gen.update_seq(4.into(), now + Duration::from_millis(41013608)); - twcc_gen.build_report(10_000).unwrap() - }, - now + Duration::from_micros(41013500160)).unwrap(); - - let mut acked_packets = twcc_handler.send_records(range).unwrap().filter_map(|r|AckedPacket::try_from(r).ok()).collect::>(); + let range = twcc_handler + .apply_report( + { + twcc_gen.update_seq(1.into(), now + Duration::from_millis(41013423)); + twcc_gen.update_seq(2.into(), now + Duration::from_millis(41013568)); + twcc_gen.update_seq(4.into(), now + Duration::from_millis(41013608)); + twcc_gen.build_report(10_000).unwrap() + }, + now + Duration::from_micros(41013500160), + ) + .unwrap(); + + let mut acked_packets = twcc_handler + .send_records(range) + .unwrap() + .filter_map(|r| AckedPacket::try_from(r).ok()) + .collect::>(); acked_packets.sort_by(AckedPacket::order_by_receive_time); - let acked_packets: Vec<_> = acked_packets.into_iter().map(|p|p.seq_no.as_u16()).collect(); + let acked_packets: Vec<_> = acked_packets + .into_iter() + .map(|p| p.seq_no.as_u16()) + .collect(); assert_eq!(acked_packets, [1, 2, 4]); } @@ -349,19 +361,28 @@ mod test { twcc_handler.register_seq(7.into(), now + Duration::from_millis(41013544), 0); { - let range = twcc_handler.apply_report( - { - twcc_gen.update_seq(3.into(), now + Duration::from_millis(41013638)); - twcc_gen.update_seq(7.into(), now + Duration::from_millis(41013669)); - twcc_gen.build_report(10_000).unwrap() - }, - now + Duration::from_micros(41013562660)).unwrap(); - - let mut acked_packets = twcc_handler.send_records(range).unwrap().filter_map(|r|AckedPacket::try_from(r).ok()).collect::>(); + let range = twcc_handler + .apply_report( + { + twcc_gen.update_seq(3.into(), now + Duration::from_millis(41013638)); + twcc_gen.update_seq(7.into(), now + Duration::from_millis(41013669)); + twcc_gen.build_report(10_000).unwrap() + }, + now + Duration::from_micros(41013562660), + ) + .unwrap(); + + let mut acked_packets = twcc_handler + .send_records(range) + .unwrap() + .filter_map(|r| AckedPacket::try_from(r).ok()) + .collect::>(); acked_packets.sort_by(AckedPacket::order_by_receive_time); - let acked_packets: Vec<_> = acked_packets.into_iter().map(|p|p.seq_no.as_u16()).collect(); + let acked_packets: Vec<_> = acked_packets + .into_iter() + .map(|p| p.seq_no.as_u16()) + .collect(); assert_eq!(acked_packets, [3, 7]); } } } - From 65c544620c40aa4e2d534feb80ea6afaebec1180 Mon Sep 17 00:00:00 2001 From: alexlapa Date: Mon, 30 Dec 2024 16:52:46 +0200 Subject: [PATCH 3/5] rework to use bitvec --- src/packet/bwe/mod.rs | 79 +++++++++++++++++++++++++++---------------- 1 file changed, 49 insertions(+), 30 deletions(-) diff --git a/src/packet/bwe/mod.rs b/src/packet/bwe/mod.rs index e82672032..d87105942 100644 --- a/src/packet/bwe/mod.rs +++ b/src/packet/bwe/mod.rs @@ -5,8 +5,8 @@ //! not been ported, only a smaller part that corresponds roughly to the IETF draft is implemented. use std::cmp::Ordering; -use std::collections::VecDeque; use std::fmt; +use std::ops::{Deref, RangeInclusive}; use std::time::{Duration, Instant}; use crate::rtp_::{Bitrate, DataSize, SeqNo, TwccSendRecord}; @@ -39,7 +39,7 @@ pub struct SendSideBandwithEstimator { loss_controller: Option, acked_bitrate_estimator: AckedBitrateEstimator, started_at: Option, - acked_packets_deduper: HandledPacketsTracker<256>, + acked_packets_deduper: HandledPacketsTracker<64>, } impl SendSideBandwithEstimator { @@ -241,52 +241,71 @@ impl fmt::Display for BandwidthUsage { } } -/// Sliding window [`SeqNo`]s storage. +/// Sliding window [`SeqNo`]s tracker. /// -/// Only remembers the last [`SIZE`] packets added. -#[derive(Debug)] +/// [`SIZE`] is the number of bytes in underling bitvector, so the actual window +/// size is [`SIZE`] * 8. struct HandledPacketsTracker { - /// Recently added packets. - history: [Option; SIZE], + /// Range of currently tracked [`SeqNo`]s. + window: RangeInclusive, - /// Queue that tracks added packets order so older packets are removed. - queue: VecDeque, + /// Bit vector of recently added packets. + history: [u8; SIZE], } impl HandledPacketsTracker { + /// Tracked [`SeqNo`]s window size. + const WINDOW_SIZE: usize = SIZE * 8; + /// Remembers the give [`SeqNo`]. /// - /// Expects somewhat sequential data with reordering no more than configured - /// [`SIZE`]. - fn add(&mut self, seq: SeqNo) { - let seq = seq.as_u16(); - let history_idx = seq as usize % SIZE; - - self.queue.push_back(seq); - if self.queue.len() == SIZE { - let to_remove = self.queue.pop_front().unwrap(); - let remove_idx = to_remove as usize % SIZE; - if self.history[remove_idx] == Some(seq) { - self.history[remove_idx] = None; - } + /// Expects somewhat sequential data since window always advances to hold + /// latest added value forgetting older ones. + pub fn add(&mut self, seq: SeqNo) { + self.maybe_advance_window(seq); + + let (byte_idx, bit_idx) = self.pos_of_seq(seq); + self.history[byte_idx] |= 1 << bit_idx; + } + + /// Checks if the provided [`SeqNo`] has been seen in the window. + pub fn contains(&self, seq: SeqNo) -> bool { + if self.window.contains(&seq) { + let (byte_idx, bit_idx) = self.pos_of_seq(seq); + (self.history[byte_idx] & (1 << bit_idx)) != 0 + } else { + false + } + } + + /// Advances the window to include the given [`SeqNo`]. + fn maybe_advance_window(&mut self, new_max_seq: SeqNo) { + if new_max_seq <= *self.window.end() { + return; + } + // Clear newly included bits + for i in **self.window.end() + 1..*new_max_seq { + let (byte_idx, bit_idx) = self.pos_of_seq(&i); + self.history[byte_idx] &= !(1 << bit_idx); } - self.history[history_idx] = Some(seq); + let new_start = new_max_seq.saturating_sub(Self::WINDOW_SIZE as u64); + self.window = RangeInclusive::new(SeqNo::from(new_start), new_max_seq); } - /// Checks if provided [`SeqNo`] has been seen in the window. - fn contains(&self, seq: SeqNo) -> bool { - let seq = seq.as_u16(); - let history_idx = seq as usize % SIZE; + /// Maps a given sequence number to its position in the bit vector. + fn pos_of_seq(&self, seq: impl Deref) -> (usize, u8) { + let byte_idx = (*seq / 8) as usize % self.history.len(); + let bit_idx = (*seq % 8) as u8; - self.history[history_idx] == Some(seq) + (byte_idx, bit_idx) } } impl Default for HandledPacketsTracker { fn default() -> Self { Self { - history: [None; SIZE], - queue: VecDeque::with_capacity(SIZE), + window: RangeInclusive::new(SeqNo::from(0), SeqNo::from(Self::WINDOW_SIZE as u64)), + history: [0; SIZE], } } } From 4e02b3881acad24eb439e09c65fafd4b4d4b6b76 Mon Sep 17 00:00:00 2001 From: alexlapa Date: Mon, 6 Jan 2025 10:29:39 +0200 Subject: [PATCH 4/5] sync with mainline, review corrections --- src/packet/bwe/mod.rs | 124 ++++-------------------------------------- 1 file changed, 12 insertions(+), 112 deletions(-) diff --git a/src/packet/bwe/mod.rs b/src/packet/bwe/mod.rs index d87105942..7f2a469bd 100644 --- a/src/packet/bwe/mod.rs +++ b/src/packet/bwe/mod.rs @@ -39,7 +39,7 @@ pub struct SendSideBandwithEstimator { loss_controller: Option, acked_bitrate_estimator: AckedBitrateEstimator, started_at: Option, - acked_packets_deduper: HandledPacketsTracker<64>, + acked_packets_deduper: HandledPacketsTracker, } impl SendSideBandwithEstimator { @@ -75,7 +75,7 @@ impl SendSideBandwithEstimator { !self.acked_packets_deduper.contains(r.seq()) }) .collect(); - let mut acked_packets = Vec::with_capacity(send_records.len()); + let mut acked_packets = vec![]; let mut max_rtt = None; let mut count = 0; @@ -242,21 +242,15 @@ impl fmt::Display for BandwidthUsage { } /// Sliding window [`SeqNo`]s tracker. -/// -/// [`SIZE`] is the number of bytes in underling bitvector, so the actual window -/// size is [`SIZE`] * 8. -struct HandledPacketsTracker { +struct HandledPacketsTracker { /// Range of currently tracked [`SeqNo`]s. window: RangeInclusive, - /// Bit vector of recently added packets. - history: [u8; SIZE], + /// Bit array of recently added packets. + history: [u8; 64], } -impl HandledPacketsTracker { - /// Tracked [`SeqNo`]s window size. - const WINDOW_SIZE: usize = SIZE * 8; - +impl HandledPacketsTracker { /// Remembers the give [`SeqNo`]. /// /// Expects somewhat sequential data since window always advances to hold @@ -288,7 +282,7 @@ impl HandledPacketsTracker { let (byte_idx, bit_idx) = self.pos_of_seq(&i); self.history[byte_idx] &= !(1 << bit_idx); } - let new_start = new_max_seq.saturating_sub(Self::WINDOW_SIZE as u64); + let new_start = new_max_seq.saturating_sub(self.history.len() as u64 * 8); self.window = RangeInclusive::new(SeqNo::from(new_start), new_max_seq); } @@ -301,107 +295,13 @@ impl HandledPacketsTracker { } } -impl Default for HandledPacketsTracker { +impl Default for HandledPacketsTracker { fn default() -> Self { - Self { - window: RangeInclusive::new(SeqNo::from(0), SeqNo::from(Self::WINDOW_SIZE as u64)), - history: [0; SIZE], - } - } -} - -#[cfg(test)] -mod test { - use std::time::{Duration, Instant}; - - use crate::rtp_::{TwccRecvRegister, TwccSendRegister}; - - use super::AckedPacket; - - #[test] - fn libwebrtc_captured() { - // (transport_feedback_adapter.cc:115): ProcessSentPacket packet_id = 1, send_time_ms = 41013423 - // (transport_feedback_adapter.cc:115): ProcessSentPacket packet_id = 2, send_time_ms = 41013443 - // (transport_feedback_adapter.cc:115): ProcessSentPacket packet_id = 3, send_time_ms = 41013464 - // (transport_feedback_adapter.cc:115): ProcessSentPacket packet_id = 4, send_time_ms = 41013484 - // (rtp_transport_controller_send.cc:652): RtpTransportControllerSend::OnTransportFeedback: new TWCC received: base_seq = 1, status_count = 4, feedback_seq = 0, received_packets = [1, 2, 4], receive_time = 41013500160 - // (acknowledged_bitrate_estimator.cc:69): AcknowledgedBitrateEstimator::IncomingPacketFeedbackVector: received_packets = [{ seq = 1, recv_time = 41013547, send_time = 41013423}, { seq = 2, recv_time = 41013568, send_time = 41013443}, { seq = 4, recv_time = 41013608, send_time = 41013484}] - - // (transport_feedback_adapter.cc:115): ProcessSentPacket packet_id = 5, send_time_ms = 41013504 - // (transport_feedback_adapter.cc:115): ProcessSentPacket packet_id = 6, send_time_ms = 41013524 - // (transport_feedback_adapter.cc:115): ProcessSentPacket packet_id = 7, send_time_ms = 41013544 - // (rtp_transport_controller_send.cc:652): RtpTransportControllerSend::OnTransportFeedback: new TWCC received: base_seq = 3, status_count = 5, feedback_seq = 1, received_packets = [3, 4, 7], receive_time = 41013562660 - // (acknowledged_bitrate_estimator.cc:69): AcknowledgedBitrateEstimator::IncomingPacketFeedbackVector: received_packets = [{ seq = 3, recv_time = 41013638, send_time = 41013464}, { seq = 7, recv_time = 41013669, send_time = 41013544}] - - // (transport_feedback_adapter.cc:115): ProcessSentPacket packet_id = 8, send_time_ms = 41013565 - // (transport_feedback_adapter.cc:115): ProcessSentPacket packet_id = 9, send_time_ms = 41013585 - // (transport_feedback_adapter.cc:115): ProcessSentPacket packet_id = 10, send_time_ms = 41013605 - // (transport_feedback_adapter.cc:115): ProcessSentPacket packet_id = 11, send_time_ms = 41013625 - // (rtp_transport_controller_send.cc:652): RtpTransportControllerSend::OnTransportFeedback: new TWCC received: base_seq = 5, status_count = 6, feedback_seq = 2, received_packets = [5, 6, 7, 8, 9, 10], receive_time = 41013639138 - // (acknowledged_bitrate_estimator.cc:69): AcknowledgedBitrateEstimator::IncomingPacketFeedbackVector: received_packets = [{ seq = 5, recv_time = 41013705, send_time = 41013504}, { seq = 9, recv_time = 41013716, send_time = 41013585}, { seq = 6, recv_time = 41013722, send_time = 41013524}, { seq = 8, recv_time = 41013729, send_time = 41013565}, { seq = 10, recv_time = 41013729, send_time = 41013605}] - - let now = Instant::now(); - let mut twcc_gen = TwccRecvRegister::new(1000); - let mut twcc_handler = TwccSendRegister::new(1000); - - twcc_handler.register_seq(1.into(), now + Duration::from_millis(41013423), 0); - twcc_handler.register_seq(2.into(), now + Duration::from_millis(41013443), 0); - twcc_handler.register_seq(3.into(), now + Duration::from_millis(41013464), 0); - twcc_handler.register_seq(4.into(), now + Duration::from_millis(41013484), 0); - - { - let range = twcc_handler - .apply_report( - { - twcc_gen.update_seq(1.into(), now + Duration::from_millis(41013423)); - twcc_gen.update_seq(2.into(), now + Duration::from_millis(41013568)); - twcc_gen.update_seq(4.into(), now + Duration::from_millis(41013608)); - twcc_gen.build_report(10_000).unwrap() - }, - now + Duration::from_micros(41013500160), - ) - .unwrap(); - - let mut acked_packets = twcc_handler - .send_records(range) - .unwrap() - .filter_map(|r| AckedPacket::try_from(r).ok()) - .collect::>(); - acked_packets.sort_by(AckedPacket::order_by_receive_time); - let acked_packets: Vec<_> = acked_packets - .into_iter() - .map(|p| p.seq_no.as_u16()) - .collect(); - assert_eq!(acked_packets, [1, 2, 4]); - } + let history = [0; 64]; - twcc_handler.register_seq(5.into(), now + Duration::from_millis(41013504), 0); - twcc_handler.register_seq(6.into(), now + Duration::from_millis(41013524), 0); - twcc_handler.register_seq(7.into(), now + Duration::from_millis(41013544), 0); - - { - let range = twcc_handler - .apply_report( - { - twcc_gen.update_seq(3.into(), now + Duration::from_millis(41013638)); - twcc_gen.update_seq(7.into(), now + Duration::from_millis(41013669)); - twcc_gen.build_report(10_000).unwrap() - }, - now + Duration::from_micros(41013562660), - ) - .unwrap(); - - let mut acked_packets = twcc_handler - .send_records(range) - .unwrap() - .filter_map(|r| AckedPacket::try_from(r).ok()) - .collect::>(); - acked_packets.sort_by(AckedPacket::order_by_receive_time); - let acked_packets: Vec<_> = acked_packets - .into_iter() - .map(|p| p.seq_no.as_u16()) - .collect(); - assert_eq!(acked_packets, [3, 7]); + Self { + window: RangeInclusive::new(SeqNo::from(0), SeqNo::from(history.len() as u64 * 8)), + history, } } } From e9b0bccf71014afc754aee9ed4be01c480c5f402 Mon Sep 17 00:00:00 2001 From: alexlapa Date: Wed, 8 Jan 2025 11:22:16 +0200 Subject: [PATCH 5/5] #605 based solution --- CHANGELOG.md | 1 + src/packet/bwe/mod.rs | 78 +----------------- src/rtp/rtcp/twcc.rs | 181 ++++++++++++++++++++++++++++++------------ src/session.rs | 10 +-- 4 files changed, 138 insertions(+), 132 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 31c107cdd..ec7b532bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ * Do not disconnect whilst we still check new candidates #489 * Ensure lexical ordering of SDP-formatted candidates follows priority #557 * Limit TWCC iteration with packet status count #606 + * Dedupe acked packets from `TwccSendRegister::apply_report()` #601, #605 # 0.6.2 diff --git a/src/packet/bwe/mod.rs b/src/packet/bwe/mod.rs index 7f2a469bd..2c7916f27 100644 --- a/src/packet/bwe/mod.rs +++ b/src/packet/bwe/mod.rs @@ -6,7 +6,6 @@ use std::cmp::Ordering; use std::fmt; -use std::ops::{Deref, RangeInclusive}; use std::time::{Duration, Instant}; use crate::rtp_::{Bitrate, DataSize, SeqNo, TwccSendRecord}; @@ -39,7 +38,6 @@ pub struct SendSideBandwithEstimator { loss_controller: Option, acked_bitrate_estimator: AckedBitrateEstimator, started_at: Option, - acked_packets_deduper: HandledPacketsTracker, } impl SendSideBandwithEstimator { @@ -57,7 +55,6 @@ impl SendSideBandwithEstimator { BITRATE_WINDOW, ), started_at: None, - acked_packets_deduper: HandledPacketsTracker::default(), } } @@ -69,12 +66,7 @@ impl SendSideBandwithEstimator { ) { let _ = self.started_at.get_or_insert(now); - let send_records: Vec<_> = records - .filter(|r| { - // Skip acked packets that have already been processed before. - !self.acked_packets_deduper.contains(r.seq()) - }) - .collect(); + let send_records: Vec<_> = records.collect(); let mut acked_packets = vec![]; let mut max_rtt = None; @@ -82,12 +74,11 @@ impl SendSideBandwithEstimator { let mut lost = 0; for record in send_records.iter() { count += 1; - let Ok(acked_packet) = AckedPacket::try_from(*record) else { + let Ok(acked_packet) = (*record).try_into() else { lost += 1; continue; }; acked_packets.push(acked_packet); - self.acked_packets_deduper.add(acked_packet.seq_no); max_rtt = max_rtt.max(record.rtt()); } acked_packets.sort_by(AckedPacket::order_by_receive_time); @@ -240,68 +231,3 @@ impl fmt::Display for BandwidthUsage { } } } - -/// Sliding window [`SeqNo`]s tracker. -struct HandledPacketsTracker { - /// Range of currently tracked [`SeqNo`]s. - window: RangeInclusive, - - /// Bit array of recently added packets. - history: [u8; 64], -} - -impl HandledPacketsTracker { - /// Remembers the give [`SeqNo`]. - /// - /// Expects somewhat sequential data since window always advances to hold - /// latest added value forgetting older ones. - pub fn add(&mut self, seq: SeqNo) { - self.maybe_advance_window(seq); - - let (byte_idx, bit_idx) = self.pos_of_seq(seq); - self.history[byte_idx] |= 1 << bit_idx; - } - - /// Checks if the provided [`SeqNo`] has been seen in the window. - pub fn contains(&self, seq: SeqNo) -> bool { - if self.window.contains(&seq) { - let (byte_idx, bit_idx) = self.pos_of_seq(seq); - (self.history[byte_idx] & (1 << bit_idx)) != 0 - } else { - false - } - } - - /// Advances the window to include the given [`SeqNo`]. - fn maybe_advance_window(&mut self, new_max_seq: SeqNo) { - if new_max_seq <= *self.window.end() { - return; - } - // Clear newly included bits - for i in **self.window.end() + 1..*new_max_seq { - let (byte_idx, bit_idx) = self.pos_of_seq(&i); - self.history[byte_idx] &= !(1 << bit_idx); - } - let new_start = new_max_seq.saturating_sub(self.history.len() as u64 * 8); - self.window = RangeInclusive::new(SeqNo::from(new_start), new_max_seq); - } - - /// Maps a given sequence number to its position in the bit vector. - fn pos_of_seq(&self, seq: impl Deref) -> (usize, u8) { - let byte_idx = (*seq / 8) as usize % self.history.len(); - let bit_idx = (*seq % 8) as u8; - - (byte_idx, bit_idx) - } -} - -impl Default for HandledPacketsTracker { - fn default() -> Self { - let history = [0; 64]; - - Self { - window: RangeInclusive::new(SeqNo::from(0), SeqNo::from(history.len() as u64 * 8)), - history, - } - } -} diff --git a/src/rtp/rtcp/twcc.rs b/src/rtp/rtcp/twcc.rs index a61f71c76..f5947b774 100644 --- a/src/rtp/rtcp/twcc.rs +++ b/src/rtp/rtcp/twcc.rs @@ -227,7 +227,7 @@ pub struct TwccRecvRegister { /// /// Once the queue has some content, we will always keep at least one entry to "remember" for the /// next report. - queue: VecDeque, + queue: VecDeque, /// Index into queue from where we start reporting on next build_report(). report_from: usize, @@ -252,7 +252,7 @@ pub struct TwccRecvRegister { } #[derive(Debug, Clone, Copy, PartialEq, Eq)] -struct Receiption { +struct Receipt { seq: SeqNo, time: Instant, } @@ -294,7 +294,7 @@ impl TwccRecvRegister { } } - self.queue.insert(idx, Receiption { seq, time }); + self.queue.insert(idx, Receipt { seq, time }); if idx < self.report_from { self.report_from = idx; @@ -500,7 +500,7 @@ impl TwccRecvRegister { /// Interims are deltas between `Receiption` which is an intermediary format before /// we populate the Twcc report. fn build_interims( - queue: &VecDeque, + queue: &VecDeque, report_from: usize, base_seq: SeqNo, base_time: Instant, @@ -1005,6 +1005,10 @@ pub struct TwccSendRegister { /// 0 offset for remote time in Twcc structs. time_zero: Option, + /// Counter of invocations of apply_report. Used to identify + /// which TwccSendRecord resulted from each invocation. + apply_report_counter: u64, + /// Last registered Twcc number. last_registered: SeqNo, } @@ -1072,6 +1076,9 @@ pub struct TwccRecvReport { /// The remote time the other side received the seq. remote_recv_time: Option, + + /// The invocation count of apply_report(). Used for filtering. + apply_report_counter: u64, } impl TwccSendRegister { @@ -1080,6 +1087,7 @@ impl TwccSendRegister { keep, queue: VecDeque::new(), time_zero: None, + apply_report_counter: 0, last_registered: 0.into(), } } @@ -1102,13 +1110,20 @@ impl TwccSendRegister { /// Apply a TWCC RTCP report. /// - /// Returns a range of the sequence numbers for the applied packets if the report was - /// successfully applied. - pub fn apply_report(&mut self, twcc: Twcc, now: Instant) -> Option> { + /// Returns iterator over [`TwccSendRecord`]s included in the given [`Twcc`] + /// except for ones that was already acked and returned before. + pub fn apply_report( + &mut self, + twcc: Twcc, + now: Instant, + ) -> Option> { if self.time_zero.is_none() { self.time_zero = Some(now); } + self.apply_report_counter += 1; + let apply_report_counter = self.apply_report_counter; + let time_zero = self.time_zero.unwrap(); let head_seq = self.queue.front().map(|r| r.seq)?; @@ -1124,14 +1139,30 @@ impl TwccSendRegister { now: Instant, r: &mut TwccSendRecord, seq: SeqNo, - instant: Option, + remote_recv_time: Option, + apply_report_counter: u64, ) -> bool { if r.seq != seq { return false; } + + let apply_report_counter = if let Some(rr) = r.recv_report { + // This packed was already acked and handled before so carry + // over previous apply_report_counter, so it won't be included + // in the current apply_report() call result. + rr.remote_recv_time + .map(|_| rr.apply_report_counter) + .unwrap_or_else(|| apply_report_counter) + } else { + apply_report_counter + }; + + // Carry over remote recv time if this packet was acked before. + let remote_recv_time = r.remote_recv_time().or(remote_recv_time); let recv_report = TwccRecvReport { local_recv_time: now, - remote_recv_time: instant, + remote_recv_time, + apply_report_counter, }; r.recv_report = Some(recv_report); @@ -1145,7 +1176,13 @@ impl TwccSendRegister { let mut problematic_seq = None; - if !update(now, first_record, first_seq_no, first_instant) { + if !update( + now, + first_record, + first_seq_no, + first_instant, + apply_report_counter, + ) { problematic_seq = Some((first_record.seq, first_seq_no)); } @@ -1155,7 +1192,7 @@ impl TwccSendRegister { break; } - if !update(now, record, seq, instant) { + if !update(now, record, seq, instant, apply_report_counter) { problematic_seq = Some((record.seq, seq)); } last_seq_no = seq; @@ -1170,13 +1207,29 @@ impl TwccSendRegister { ); } - Some(first_seq_no..=last_seq_no) - } + let first_index = self + .queue + .binary_search_by_key(&first_seq_no, |r| r.seq) + .expect("first_seq_no to be registered"); - pub fn send_record(&self, seq: SeqNo) -> Option<&TwccSendRecord> { - let index = self.queue.binary_search_by_key(&seq, |r| r.seq).ok()?; + let range = first_seq_no..=last_seq_no; - Some(&self.queue[index]) + Some( + TwccSendRecordsIter { + range: range.clone(), + index: first_index, + current: first_seq_no, + queue: &self.queue, + } + // We only want the records that were registered in this invocation of + // apply_report_counter(). This is to not double count in the BWE, + // which is the consumer of this returned iterator. + .filter(move |s| { + s.recv_report + .map(|r| r.apply_report_counter == apply_report_counter) + .unwrap_or_default() + }), + ) } /// Calculate the egress loss for given time window. @@ -1214,26 +1267,6 @@ impl TwccSendRegister { Some((lost as f32) / (total as f32)) } - - /// Get all send records in a range. - pub fn send_records( - &self, - range: RangeInclusive, - ) -> Option> { - let first_index = self - .queue - .binary_search_by_key(range.start(), |r| r.seq) - .ok()?; - - let current = *range.start(); - - Some(TwccSendRecordsIter { - range, - index: first_index, - current, - queue: &self.queue, - }) - } } #[derive()] @@ -1806,7 +1839,7 @@ mod test { ); now = now + Duration::from_millis(35); - reg.apply_report( + let iter = reg.apply_report( Twcc { sender_ssrc: Ssrc::new(), ssrc: Ssrc::new(), @@ -1829,15 +1862,12 @@ mod test { }, now, ); + let iter = iter.unwrap(); - for seq in 25..=27 { - let record = reg - .send_record(seq.into()) - .unwrap_or_else(|| panic!("Should have send record for seq {seq}")); - + for record in iter { assert!( record.recv_report.is_some(), - "Report should have recorded recv_report for {seq}" + "Report should have recorded recv_report" ); } } @@ -1962,7 +1992,7 @@ mod test { now = now + Duration::from_micros(15); } - let range = reg + let iter = reg .apply_report( Twcc { sender_ssrc: Ssrc::new(), @@ -1988,11 +2018,6 @@ mod test { ) .expect("apply_report to return Some(_)"); - assert_eq!(range, 0.into()..=7.into()); - - let iter = reg - .send_records(range) - .expect("send_records to return Some(_)"); assert_eq!( iter.map(|r| *r.seq).collect::>(), vec![0, 1, 2, 3, 4, 5, 6, 7] @@ -2009,6 +2034,7 @@ mod test { } now = now + Duration::from_millis(5); + #[allow(unused_must_use)] reg.apply_report( Twcc { sender_ssrc: Ssrc::new(), @@ -2077,4 +2103,61 @@ mod test { assert_eq!(reg.loss(), Some(4.0 / 10.0)); } + + #[test] + fn no_acked_duplicates_when_reordered() { + let now = Instant::now(); + let mut twcc_gen = TwccRecvRegister::new(1000); + let mut twcc_handler = TwccSendRegister::new(1000); + + twcc_handler.register_seq(1.into(), now + Duration::from_millis(1), 0); + twcc_handler.register_seq(2.into(), now + Duration::from_millis(2), 0); + twcc_handler.register_seq(3.into(), now + Duration::from_millis(3), 0); + twcc_handler.register_seq(4.into(), now + Duration::from_millis(4), 0); + + { + let acked_packets = twcc_handler + .apply_report( + { + // 3rd packet is delayed + twcc_gen.update_seq(1.into(), now + Duration::from_millis(5)); + twcc_gen.update_seq(2.into(), now + Duration::from_millis(6)); + twcc_gen.update_seq(4.into(), now + Duration::from_millis(7)); + twcc_gen.build_report(10_000).unwrap() + }, + now + Duration::from_millis(8), + ) + .unwrap() + .filter_map(|sr| sr.remote_recv_time().map(|_| sr.seq.as_u16())) + .collect::>(); + + assert_eq!(acked_packets, [1, 2, 4]); + } + + twcc_handler.register_seq(5.into(), now + Duration::from_millis(9), 0); + twcc_handler.register_seq(6.into(), now + Duration::from_millis(10), 0); + twcc_handler.register_seq(7.into(), now + Duration::from_millis(11), 0); + + { + let acked_packets = twcc_handler + .apply_report( + { + // So the receipt order is 1, 2, 4, 3, 7 + twcc_gen.update_seq(3.into(), now + Duration::from_millis(12)); + twcc_gen.update_seq(7.into(), now + Duration::from_millis(13)); + twcc_gen.build_report(10_000).unwrap() + }, + now + Duration::from_millis(14), + ) + .unwrap() + .filter_map(|sr| sr.remote_recv_time().map(|_| sr.seq.as_u16())) + .collect::>(); + + // 3 was delayed before and is acked now + // 4 is excluded since it was already returned from the previous call + // [5, 6] are delayed/lost + // 7 is acked in the last report + assert_eq!(acked_packets, [3, 7]); + } + } } diff --git a/src/session.rs b/src/session.rs index a78c69ee5..21435d6a8 100644 --- a/src/session.rs +++ b/src/session.rs @@ -517,14 +517,10 @@ impl Session { for fb in RtcpFb::from_rtcp(self.feedback_rx.drain(..)) { if let RtcpFb::Twcc(twcc) = fb { trace!("Handle TWCC: {:?}", twcc); - let range = self.twcc_tx_register.apply_report(twcc, now); + let maybe_records = self.twcc_tx_register.apply_report(twcc, now); - if let Some(bwe) = &mut self.bwe { - let records = range.and_then(|range| self.twcc_tx_register.send_records(range)); - - if let Some(records) = records { - bwe.update(records, now); - } + if let (Some(maybe_records), Some(bwe)) = (maybe_records, &mut self.bwe) { + bwe.update(maybe_records, now); } need_configure_pacer = true;