Skip to content

Commit 11c25ee

Browse files
committed
Don't retain RR loss info when stats are disabled
This prevents unbounded memory growth of the Vec that holds onto packet loss records derived from RTCP Receiver Reports, if the method that would drain that Vec will never get called.
1 parent ecf1e78 commit 11c25ee

File tree

3 files changed

+50
-40
lines changed

3 files changed

+50
-40
lines changed

Diff for: src/session.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ impl Session {
131131
Session {
132132
id,
133133
medias: vec![],
134-
streams: Streams::default(),
134+
streams: Streams::new(config.stats_interval.is_some()),
135135
app: None,
136136
reordering_size_audio: config.reordering_size_audio,
137137
reordering_size_video: config.reordering_size_video,

Diff for: src/streams/mod.rs

+8-5
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,10 @@ pub(crate) struct Streams {
149149
/// Whether nack reports are enabled. This is an optimization to avoid too frequent
150150
/// Session::nack_at() when we don't need to send nacks.
151151
any_nack_active: Option<bool>,
152+
153+
/// Whether periodic statistics reports are expected to be generated. This informs us on
154+
/// whether we should be holding onto data needed for those reports or not.
155+
needs_stat_reports: bool,
152156
}
153157

154158
/// Delay between cleaning up the RxLookup.
@@ -164,8 +168,8 @@ struct RxLookup {
164168
last_used: Instant,
165169
}
166170

167-
impl Default for Streams {
168-
fn default() -> Self {
171+
impl Streams {
172+
pub(crate) fn new(needs_stat_reports: bool) -> Self {
169173
Self {
170174
streams_rx: Default::default(),
171175
rx_lookup: Default::default(),
@@ -174,11 +178,10 @@ impl Default for Streams {
174178
default_ssrc_tx: 0.into(), // this will be changed
175179
mids_to_report: Vec::with_capacity(10),
176180
any_nack_active: None,
181+
needs_stat_reports,
177182
}
178183
}
179-
}
180184

181-
impl Streams {
182185
pub(crate) fn map_dynamic_by_rid(
183186
&mut self,
184187
ssrc: Ssrc,
@@ -328,7 +331,7 @@ impl Streams {
328331
) -> &mut StreamTx {
329332
self.streams_tx
330333
.entry(ssrc)
331-
.or_insert_with(|| StreamTx::new(ssrc, rtx, midrid))
334+
.or_insert_with(|| StreamTx::new(ssrc, rtx, midrid, self.needs_stat_reports))
332335
}
333336

334337
pub fn remove_stream_tx(&mut self, ssrc: Ssrc) -> bool {

Diff for: src/streams/send.rs

+41-34
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ pub(crate) struct StreamTxStats {
153153
/// Can be null in case of missing or bad reports
154154
rtt: Option<f32>,
155155
/// losses collecter from RR (known packets, lost ratio)
156-
losses: Vec<(u64, f32)>,
156+
losses: Option<Vec<(u64, f32)>>,
157157

158158
/// `None` if `rtx_ratio_cap` is `None`.
159159
bytes_transmitted: Option<ValueHistory<u64>>,
@@ -162,26 +162,13 @@ pub(crate) struct StreamTxStats {
162162
bytes_retransmitted: Option<ValueHistory<u64>>,
163163
}
164164

165-
impl Default for StreamTxStats {
166-
fn default() -> Self {
167-
Self {
168-
bytes: 0,
169-
bytes_resent: 0,
170-
packets: 0,
171-
packets_resent: 0,
172-
firs: 0,
173-
plis: 0,
174-
nacks: 0,
175-
rtt: None,
176-
losses: Vec::default(),
177-
bytes_transmitted: Some(Default::default()),
178-
bytes_retransmitted: Some(Default::default()),
179-
}
180-
}
181-
}
182-
183165
impl StreamTx {
184-
pub(crate) fn new(ssrc: Ssrc, rtx: Option<Ssrc>, midrid: MidRid) -> Self {
166+
pub(crate) fn new(
167+
ssrc: Ssrc,
168+
rtx: Option<Ssrc>,
169+
midrid: MidRid,
170+
needs_stat_reports: bool,
171+
) -> Self {
185172
debug!("Create StreamTx for SSRC: {}", ssrc);
186173

187174
StreamTx {
@@ -205,7 +192,7 @@ impl StreamTx {
205192
last_sender_report: already_happened(),
206193
pending_request_keyframe: None,
207194
pending_request_remb: None,
208-
stats: StreamTxStats::default(),
195+
stats: StreamTxStats::new(needs_stat_reports),
209196
rtx_ratio: (0.0, already_happened()),
210197
pt_for_padding: None,
211198
remote_acked_ssrc: false,
@@ -1071,6 +1058,25 @@ impl StreamTx {
10711058
}
10721059

10731060
impl StreamTxStats {
1061+
fn new(needs_stat_reports: bool) -> Self {
1062+
Self {
1063+
bytes: 0,
1064+
bytes_resent: 0,
1065+
packets: 0,
1066+
packets_resent: 0,
1067+
firs: 0,
1068+
plis: 0,
1069+
nacks: 0,
1070+
rtt: None,
1071+
1072+
// This Vec is unbounded, don't collect if we're never going to drain it
1073+
losses: needs_stat_reports.then_some(Vec::default()),
1074+
1075+
bytes_transmitted: Some(Default::default()),
1076+
bytes_retransmitted: Some(Default::default()),
1077+
}
1078+
}
1079+
10741080
fn update_packet_counts(&mut self, bytes: u64, is_resend: bool) {
10751081
self.packets += 1;
10761082
self.bytes += bytes;
@@ -1097,41 +1103,42 @@ impl StreamTxStats {
10971103
let rtt = calculate_rtt_ms(ntp_time, r.last_sr_delay, r.last_sr_time);
10981104
self.rtt = rtt;
10991105

1100-
let ext_seq = {
1101-
let prev = self.losses.last().map(|s| s.0).unwrap_or(r.max_seq as u64);
1102-
let next = (r.max_seq & 0xffff) as u16;
1103-
extend_u16(Some(prev), next)
1104-
};
1106+
if let Some(losses) = self.losses.as_mut() {
1107+
let ext_seq = {
1108+
let prev = losses.last().map(|s| s.0).unwrap_or(r.max_seq as u64);
1109+
let next = (r.max_seq & 0xffff) as u16;
1110+
extend_u16(Some(prev), next)
1111+
};
11051112

1106-
self.losses
1107-
.push((ext_seq, r.fraction_lost as f32 / u8::MAX as f32));
1113+
losses.push((ext_seq, r.fraction_lost as f32 / u8::MAX as f32));
1114+
}
11081115
}
11091116

11101117
pub(crate) fn fill(&mut self, snapshot: &mut StatsSnapshot, midrid: MidRid, now: Instant) {
11111118
if self.bytes == 0 {
11121119
return;
11131120
}
11141121

1115-
let loss = {
1122+
let loss = self.losses.as_mut().and_then(|losses| {
11161123
let mut value = 0_f32;
11171124
let mut total_weight = 0_u64;
11181125

11191126
// just in case we received RRs out of order
1120-
self.losses.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
1127+
losses.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
11211128

11221129
// average known RR losses weighted by their number of packets
1123-
for it in self.losses.windows(2) {
1130+
for it in losses.windows(2) {
11241131
let [prev, next] = it else { continue };
11251132
let weight = next.0.saturating_sub(prev.0);
11261133
value += next.1 * weight as f32;
11271134
total_weight += weight;
11281135
}
11291136

1137+
losses.drain(..losses.len().saturating_sub(1));
1138+
11301139
let result = value / total_weight as f32;
11311140
result.is_finite().then_some(result)
1132-
};
1133-
1134-
self.losses.drain(..self.losses.len().saturating_sub(1));
1141+
});
11351142

11361143
snapshot.egress.insert(
11371144
midrid,

0 commit comments

Comments
 (0)