Skip to content

Commit 8db3679

Browse files
authored
Bugfix/fix interceptor/webrtc unit test memory leak (#629)
* Fix MockStream cyclic dependency to itself - it often binds itself into Interceptor and owns the returned sender/writer. The returned sender/writer might contain the Arc<> to MockStream itself. Thus, the cyclic dependency is formed. We create an internal struct to avoid this. * Fix cyclic dependency between PeerConnectionInternal and StatsInterceptor - PeerConnectionInternal should not own StatsInterceptor. Make it Weak<> * Fix cyclic dependency for dtls_transport - IceTransport holds OnConnectionStateChangeFn, which hods DtlsTransport and DtlsTransport holds IceTransport. In the callback we should use Weak<> * Fix self reference test case in data_channel_test.rs * Fix formatting
1 parent 03237d0 commit 8db3679

File tree

4 files changed

+250
-230
lines changed

4 files changed

+250
-230
lines changed

Diff for: interceptor/src/mock/mock_stream.rs

+38-34
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ pub struct MockStream {
1717
rtcp_writer: Mutex<Option<Arc<dyn RTCPWriter + Send + Sync>>>,
1818
rtp_writer: Mutex<Option<Arc<dyn RTPWriter + Send + Sync>>>,
1919

20+
internal: Arc<MockStreamInternal>,
21+
}
22+
23+
struct MockStreamInternal {
2024
rtcp_out_modified_tx: mpsc::Sender<RTCPPackets>,
2125
rtp_out_modified_tx: mpsc::Sender<rtp::packet::Packet>,
2226
rtcp_in_rx: Mutex<mpsc::Receiver<RTCPPackets>>,
@@ -46,44 +50,44 @@ impl MockStream {
4650

4751
let stream = Arc::new(MockStream {
4852
interceptor: Arc::clone(&interceptor),
49-
5053
rtcp_writer: Mutex::new(None),
5154
rtp_writer: Mutex::new(None),
52-
53-
rtcp_in_tx: Mutex::new(Some(rtcp_in_tx)),
54-
rtp_in_tx: Mutex::new(Some(rtp_in_tx)),
55-
rtcp_in_rx: Mutex::new(rtcp_in_rx),
56-
rtp_in_rx: Mutex::new(rtp_in_rx),
57-
58-
rtcp_out_modified_tx,
59-
rtp_out_modified_tx,
60-
rtcp_out_modified_rx: Mutex::new(rtcp_out_modified_rx),
61-
rtp_out_modified_rx: Mutex::new(rtp_out_modified_rx),
62-
63-
rtcp_in_modified_rx: Mutex::new(rtcp_in_modified_rx),
64-
rtp_in_modified_rx: Mutex::new(rtp_in_modified_rx),
55+
internal: Arc::new(MockStreamInternal {
56+
rtcp_in_tx: Mutex::new(Some(rtcp_in_tx)),
57+
rtp_in_tx: Mutex::new(Some(rtp_in_tx)),
58+
rtcp_in_rx: Mutex::new(rtcp_in_rx),
59+
rtp_in_rx: Mutex::new(rtp_in_rx),
60+
61+
rtcp_out_modified_tx,
62+
rtp_out_modified_tx,
63+
rtcp_out_modified_rx: Mutex::new(rtcp_out_modified_rx),
64+
rtp_out_modified_rx: Mutex::new(rtp_out_modified_rx),
65+
66+
rtcp_in_modified_rx: Mutex::new(rtcp_in_modified_rx),
67+
rtp_in_modified_rx: Mutex::new(rtp_in_modified_rx),
68+
}),
6569
});
6670

6771
let rtcp_writer = interceptor
68-
.bind_rtcp_writer(Arc::clone(&stream) as Arc<dyn RTCPWriter + Send + Sync>)
72+
.bind_rtcp_writer(Arc::clone(&stream.internal) as Arc<dyn RTCPWriter + Send + Sync>)
6973
.await;
7074
{
7175
let mut rw = stream.rtcp_writer.lock().await;
72-
*rw = Some(rtcp_writer);
76+
*rw = Some(Arc::clone(&rtcp_writer));
7377
}
7478
let rtp_writer = interceptor
7579
.bind_local_stream(
7680
info,
77-
Arc::clone(&stream) as Arc<dyn RTPWriter + Send + Sync>,
81+
Arc::clone(&stream.internal) as Arc<dyn RTPWriter + Send + Sync>,
7882
)
7983
.await;
8084
{
8185
let mut rw = stream.rtp_writer.lock().await;
82-
*rw = Some(rtp_writer);
86+
*rw = Some(Arc::clone(&rtp_writer));
8387
}
8488

8589
let rtcp_reader = interceptor
86-
.bind_rtcp_reader(Arc::clone(&stream) as Arc<dyn RTCPReader + Send + Sync>)
90+
.bind_rtcp_reader(Arc::clone(&stream.internal) as Arc<dyn RTCPReader + Send + Sync>)
8791
.await;
8892
tokio::spawn(async move {
8993
let mut buf = vec![0u8; 1500];
@@ -104,7 +108,7 @@ impl MockStream {
104108
let rtp_reader = interceptor
105109
.bind_remote_stream(
106110
info,
107-
Arc::clone(&stream) as Arc<dyn RTPReader + Send + Sync>,
111+
Arc::clone(&stream.internal) as Arc<dyn RTPReader + Send + Sync>,
108112
)
109113
.await;
110114
tokio::spawn(async move {
@@ -153,23 +157,23 @@ impl MockStream {
153157

154158
/// receive_rtcp schedules a new rtcp batch, so it can be read be the stream
155159
pub async fn receive_rtcp(&self, pkts: Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>) {
156-
let rtcp_in_tx = self.rtcp_in_tx.lock().await;
160+
let rtcp_in_tx = self.internal.rtcp_in_tx.lock().await;
157161
if let Some(tx) = &*rtcp_in_tx {
158162
let _ = tx.send(pkts).await;
159163
}
160164
}
161165

162166
/// receive_rtp schedules a rtp packet, so it can be read be the stream
163167
pub async fn receive_rtp(&self, pkt: rtp::packet::Packet) {
164-
let rtp_in_tx = self.rtp_in_tx.lock().await;
168+
let rtp_in_tx = self.internal.rtp_in_tx.lock().await;
165169
if let Some(tx) = &*rtp_in_tx {
166170
let _ = tx.send(pkt).await;
167171
}
168172
}
169173

170174
/// written_rtcp returns a channel containing the rtcp batches written, modified by the interceptor
171175
pub async fn written_rtcp(&self) -> Option<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>> {
172-
let mut rtcp_out_modified_rx = self.rtcp_out_modified_rx.lock().await;
176+
let mut rtcp_out_modified_rx = self.internal.rtcp_out_modified_rx.lock().await;
173177
rtcp_out_modified_rx.recv().await
174178
}
175179

@@ -180,7 +184,7 @@ impl MockStream {
180184
&self,
181185
) -> Option<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>> {
182186
let mut last = None;
183-
let mut rtcp_out_modified_rx = self.rtcp_out_modified_rx.lock().await;
187+
let mut rtcp_out_modified_rx = self.internal.rtcp_out_modified_rx.lock().await;
184188

185189
while let Ok(v) = rtcp_out_modified_rx.try_recv() {
186190
last = Some(v);
@@ -191,40 +195,40 @@ impl MockStream {
191195

192196
/// written_rtp returns a channel containing rtp packets written, modified by the interceptor
193197
pub async fn written_rtp(&self) -> Option<rtp::packet::Packet> {
194-
let mut rtp_out_modified_rx = self.rtp_out_modified_rx.lock().await;
198+
let mut rtp_out_modified_rx = self.internal.rtp_out_modified_rx.lock().await;
195199
rtp_out_modified_rx.recv().await
196200
}
197201

198202
/// read_rtcp returns a channel containing the rtcp batched read, modified by the interceptor
199203
pub async fn read_rtcp(
200204
&self,
201205
) -> Option<Result<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>>> {
202-
let mut rtcp_in_modified_rx = self.rtcp_in_modified_rx.lock().await;
206+
let mut rtcp_in_modified_rx = self.internal.rtcp_in_modified_rx.lock().await;
203207
rtcp_in_modified_rx.recv().await
204208
}
205209

206210
/// read_rtp returns a channel containing the rtp packets read, modified by the interceptor
207211
pub async fn read_rtp(&self) -> Option<Result<rtp::packet::Packet>> {
208-
let mut rtp_in_modified_rx = self.rtp_in_modified_rx.lock().await;
212+
let mut rtp_in_modified_rx = self.internal.rtp_in_modified_rx.lock().await;
209213
rtp_in_modified_rx.recv().await
210214
}
211215

212-
/// close closes the stream and the underlying interceptor
216+
/// close closes the stream
213217
pub async fn close(&self) -> Result<()> {
214218
{
215-
let mut rtcp_in_tx = self.rtcp_in_tx.lock().await;
219+
let mut rtcp_in_tx = self.internal.rtcp_in_tx.lock().await;
216220
rtcp_in_tx.take();
217221
}
218222
{
219-
let mut rtp_in_tx = self.rtp_in_tx.lock().await;
223+
let mut rtp_in_tx = self.internal.rtp_in_tx.lock().await;
220224
rtp_in_tx.take();
221225
}
222226
self.interceptor.close().await
223227
}
224228
}
225229

226230
#[async_trait]
227-
impl RTCPWriter for MockStream {
231+
impl RTCPWriter for MockStreamInternal {
228232
async fn write(
229233
&self,
230234
pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>],
@@ -237,7 +241,7 @@ impl RTCPWriter for MockStream {
237241
}
238242

239243
#[async_trait]
240-
impl RTCPReader for MockStream {
244+
impl RTCPReader for MockStreamInternal {
241245
async fn read(
242246
&self,
243247
buf: &mut [u8],
@@ -260,15 +264,15 @@ impl RTCPReader for MockStream {
260264
}
261265

262266
#[async_trait]
263-
impl RTPWriter for MockStream {
267+
impl RTPWriter for MockStreamInternal {
264268
async fn write(&self, pkt: &rtp::packet::Packet, _a: &Attributes) -> Result<usize> {
265269
let _ = self.rtp_out_modified_tx.send(pkt.clone()).await;
266270
Ok(0)
267271
}
268272
}
269273

270274
#[async_trait]
271-
impl RTPReader for MockStream {
275+
impl RTPReader for MockStreamInternal {
272276
async fn read(
273277
&self,
274278
buf: &mut [u8],

Diff for: webrtc/src/data_channel/data_channel_test.rs

+17-9
Original file line numberDiff line numberDiff line change
@@ -202,11 +202,15 @@ async fn test_data_channel_send_before_signaling() -> Result<()> {
202202
return Box::pin(async {});
203203
}
204204
Box::pin(async move {
205-
let d2 = Arc::clone(&d);
205+
let d2 = Arc::downgrade(&d);
206206
d.on_message(Box::new(move |_: DataChannelMessage| {
207-
let d3 = Arc::clone(&d2);
207+
let d3 = d2.clone();
208208
Box::pin(async move {
209-
let result = d3.send(&Bytes::from(b"Pong".to_vec())).await;
209+
let result = d3
210+
.upgrade()
211+
.unwrap()
212+
.send(&Bytes::from(b"Pong".to_vec()))
213+
.await;
210214
assert!(result.is_ok(), "Failed to send string on data channel");
211215
})
212216
}));
@@ -218,11 +222,11 @@ async fn test_data_channel_send_before_signaling() -> Result<()> {
218222

219223
assert!(dc.ordered(), "Ordered should be set to true");
220224

221-
let dc2 = Arc::clone(&dc);
225+
let dc2 = Arc::downgrade(&dc);
222226
dc.on_open(Box::new(move || {
223-
let dc3 = Arc::clone(&dc2);
227+
let dc3 = dc2.clone();
224228
Box::pin(async move {
225-
let result = dc3.send_text("Ping".to_owned()).await;
229+
let result = dc3.upgrade().unwrap().send_text("Ping".to_owned()).await;
226230
assert!(result.is_ok(), "Failed to send string on data channel");
227231
})
228232
}));
@@ -258,12 +262,16 @@ async fn test_data_channel_send_after_connected() -> Result<()> {
258262
return Box::pin(async {});
259263
}
260264
Box::pin(async move {
261-
let d2 = Arc::clone(&d);
265+
let d2 = Arc::downgrade(&d);
262266
d.on_message(Box::new(move |_: DataChannelMessage| {
263-
let d3 = Arc::clone(&d2);
267+
let d3 = d2.clone();
264268

265269
Box::pin(async move {
266-
let result = d3.send(&Bytes::from(b"Pong".to_vec())).await;
270+
let result = d3
271+
.upgrade()
272+
.unwrap()
273+
.send(&Bytes::from(b"Pong".to_vec()))
274+
.await;
267275
assert!(result.is_ok(), "Failed to send string on data channel");
268276
})
269277
}));

Diff for: webrtc/src/peer_connection/mod.rs

+7-3
Original file line numberDiff line numberDiff line change
@@ -236,9 +236,13 @@ impl RTCPeerConnection {
236236
};
237237

238238
let weak_interceptor = Arc::downgrade(&interceptor);
239-
let (internal, configuration) =
240-
PeerConnectionInternal::new(api, weak_interceptor, stats_interceptor, configuration)
241-
.await?;
239+
let (internal, configuration) = PeerConnectionInternal::new(
240+
api,
241+
weak_interceptor,
242+
Arc::downgrade(&stats_interceptor),
243+
configuration,
244+
)
245+
.await?;
242246
let internal_rtcp_writer = Arc::clone(&internal) as Arc<dyn RTCPWriter + Send + Sync>;
243247
let interceptor_rtcp_writer = interceptor.bind_rtcp_writer(internal_rtcp_writer).await;
244248

0 commit comments

Comments
 (0)