Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap Instants to support distant past and future #617

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions src/_internal_test_exports/fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,21 @@ pub fn rtx_buffer(data: &[u8]) {
let buf_size = u16::from_be_bytes([data[0], data[1]]);
let max_age = data[2] as u64;
let max_size = data[3] as usize;
let mut buf = EvictingBuffer::new(buf_size as usize, Duration::from_secs(max_age), max_size);
let mut buf = EvictingBuffer::new(
buf_size as usize,
Duration::from_secs(max_age).into(),
max_size,
);
let mut now = Instant::now();
let mut pos = 0;

for d in &data[4..] {
now += Duration::from_millis(*d as u64);
if d % 2 == 0 {
buf.maybe_evict(now)
buf.maybe_evict(now.into())
} else {
pos += *d as u64;
buf.push(pos, now, d);
buf.push(pos, now.into(), d);
}
}
}
Expand Down Expand Up @@ -69,7 +73,7 @@ pub fn rtp_packet(data: &[u8]) -> Option<()> {
let header = RtpHeader::_parse(rng.slice(len)?, &session.exts)?;
let pkt_len = rng.usize(1500)?;
let data = rng.slice(pkt_len)?;
session.handle_rtp(now, header, data);
session.handle_rtp(now.into(), header, data);
}
}

Expand Down Expand Up @@ -110,7 +114,7 @@ pub fn depack(data: &[u8]) -> Option<()> {
let hlen = rng.usize(76)?;
let header = RtpHeader::_parse(rng.slice(hlen)?, &exts)?;
let meta = RtpMeta {
received: start + Duration::from_millis(rng.u64(10000)?),
received: (start + Duration::from_millis(rng.u64(10000)?)).into(),
time: MediaTime::new(rng.u64(u64::MAX)?, Frequency::MICROS),
seq_no: rng.u64(u64::MAX)?.into(),
header,
Expand All @@ -136,7 +140,7 @@ pub fn receive_register(data: &[u8]) -> Option<()> {
let arrival = start + Duration::from_micros(rng.u64(u64::MAX / 100)?);
let rtp_time = rng.u32(u32::MAX / 2)?;
let clock_rate = rng.u32(u32::MAX / 2)?;
rr.update(seq.into(), arrival, rtp_time, clock_rate);
rr.update(seq.into(), arrival.into(), rtp_time, clock_rate);
}
1 => {
rr.nack_report();
Expand Down
6 changes: 5 additions & 1 deletion src/change/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,11 @@ impl<'a> DirectApi<'a> {
self.rtc.session.send_buffer_video
};

stream.set_rtx_cache(size, DEFAULT_RTX_CACHE_DURATION, DEFAULT_RTX_RATIO_CAP);
stream.set_rtx_cache(
size,
DEFAULT_RTX_CACHE_DURATION.as_std().unwrap(),
DEFAULT_RTX_RATIO_CAP,
);

stream
}
Expand Down
12 changes: 10 additions & 2 deletions src/change/sdp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,11 @@ fn ensure_stream_tx(session: &mut Session) {
session.send_buffer_video
};

stream.set_rtx_cache(size, DEFAULT_RTX_CACHE_DURATION, DEFAULT_RTX_RATIO_CAP);
stream.set_rtx_cache(
size,
DEFAULT_RTX_CACHE_DURATION.as_std().unwrap(),
DEFAULT_RTX_RATIO_CAP,
);
}
}
}
Expand Down Expand Up @@ -919,7 +923,11 @@ fn add_pending_changes(session: &mut Session, pending: Changes) {
session.send_buffer_video
};

stream.set_rtx_cache(size, DEFAULT_RTX_CACHE_DURATION, DEFAULT_RTX_RATIO_CAP);
stream.set_rtx_cache(
size,
DEFAULT_RTX_CACHE_DURATION.as_std().unwrap(),
DEFAULT_RTX_RATIO_CAP,
);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/channel.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! Data channel related types.

use std::{fmt, str, time::Instant};
use std::{fmt, str};

use crate::sctp::RtcSctp;
use crate::util::already_happened;
use crate::util::Instant;
use crate::{Rtc, RtcError};

pub use crate::sctp::ChannelConfig;
Expand Down Expand Up @@ -251,7 +251,7 @@ impl ChannelHandler {

pub fn poll_timeout(&self, sctp: &RtcSctp) -> Option<Instant> {
if sctp.is_inited() && (self.need_allocation() || self.need_open()) {
Some(already_happened())
Some(Instant::DistantPast)
} else {
None
}
Expand Down
4 changes: 2 additions & 2 deletions src/crypto/dtls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

use std::collections::VecDeque;
use std::fmt;
use std::time::Instant;

use crate::net::DatagramSend;
use crate::util::Instant;

use super::CryptoProvider;
use super::{CryptoError, Fingerprint, KeyingMaterial, SrtpProfile};
Expand Down Expand Up @@ -154,7 +154,7 @@ impl fmt::Debug for DtlsCert {
}
}

pub trait DtlsInner: Sized {
pub(crate) trait DtlsInner: Sized {
/// Set whether this instance is active or passive.
///
/// i.e. initiating the client hello or not. This must be called
Expand Down
2 changes: 1 addition & 1 deletion src/crypto/ossl/dtls.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::collections::VecDeque;
use std::io::{self, Read, Write};
use std::time::{Duration, Instant};

use openssl::ec::EcKey;
use openssl::nid::Nid;
Expand All @@ -9,6 +8,7 @@ use openssl::ssl::{Ssl, SslContext, SslContextBuilder, SslMethod, SslOptions, Ss
use crate::crypto::dtls::DtlsInner;
use crate::crypto::{DtlsEvent, SrtpProfile};
use crate::io::{DATAGRAM_MTU, DATAGRAM_MTU_WARN};
use crate::util::{Duration, Instant};

use super::cert::OsslDtlsCert;
use super::io_buf::IoBuffer;
Expand Down
2 changes: 1 addition & 1 deletion src/crypto/wincrypto/dtls.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::collections::VecDeque;
use std::time::Instant;

use crate::crypto::dtls::DtlsInner;
use crate::crypto::CryptoError;
use crate::crypto::DtlsEvent;
use crate::crypto::{KeyingMaterial, SrtpProfile};
use crate::io::DATAGRAM_MTU_WARN;
use crate::util::{Duration, Instant};

Check warning on line 8 in src/crypto/wincrypto/dtls.rs

View workflow job for this annotation

GitHub Actions / test_wincrypto (windows-latest, 1.71.1)

unused import: `Duration`

use super::cert::{create_sha256_fingerprint, WinCryptoDtlsCert};

Expand Down Expand Up @@ -77,7 +77,7 @@
}

fn poll_timeout(&mut self, now: Instant) -> Option<Instant> {
self.0.next_timeout(now)

Check failure on line 80 in src/crypto/wincrypto/dtls.rs

View workflow job for this annotation

GitHub Actions / test_wincrypto (windows-latest, 1.71.1)

mismatched types

Check failure on line 80 in src/crypto/wincrypto/dtls.rs

View workflow job for this annotation

GitHub Actions / test_wincrypto (windows-latest, 1.71.1)

mismatched types
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/dtls.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::collections::VecDeque;
use std::time::Instant;
use std::{fmt, io};
use thiserror::Error;

use crate::crypto::{CryptoError, DtlsImpl, Fingerprint};
use crate::util::Instant;

pub use crate::crypto::{DtlsCert, DtlsCertOptions, DtlsEvent};
use crate::net::DatagramSend;
Expand Down Expand Up @@ -32,7 +32,7 @@ impl DtlsError {
}

/// Encapsulation of DTLS.
pub struct Dtls {
pub(crate) struct Dtls {
dtls_impl: DtlsImpl,

/// The fingerprint of the certificate.
Expand Down
41 changes: 22 additions & 19 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,11 +615,11 @@ use change::{DirectApi, SdpApi};
use rtp::RawPacket;
use std::fmt;
use std::net::SocketAddr;
use std::time::{Duration, Instant};
use std::time as std_time;
use streams::RtpPacket;
use streams::StreamPaused;
use thiserror::Error;
use util::InstantExt;
use util::{Duration, Instant};

mod crypto;
use crypto::CryptoProvider;
Expand Down Expand Up @@ -719,7 +719,7 @@ use media::{MediaAdded, MediaChanged, MediaData};
pub mod change;

mod util;
use util::{already_happened, not_happening, Soonest};
use util::Soonest;

mod session;
use session::Session;
Expand Down Expand Up @@ -999,17 +999,17 @@ impl Event {
#[allow(clippy::large_enum_variant)] // We purposely don't want to allocate.
pub enum Input<'a> {
/// A timeout without any network input.
Timeout(Instant),
Timeout(std_time::Instant),
/// Network input.
Receive(Instant, net::Receive<'a>),
Receive(std_time::Instant, net::Receive<'a>),
}

/// Output produced by [`Rtc::poll_output()`]
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Output {
/// When the [`Rtc`] instance expects an [`Input::Timeout`].
Timeout(Instant),
Timeout(std_time::Instant),

/// Network data that is to be sent.
Transmit(net::Transmit),
Expand Down Expand Up @@ -1159,7 +1159,7 @@ impl Rtc {
remote_addrs: vec![],
send_addr: None,
need_init_time: true,
last_now: already_happened(),
last_now: Instant::DistantPast,
peer_bytes_rx: 0,
peer_bytes_tx: 0,
change_counter: 0,
Expand Down Expand Up @@ -1422,7 +1422,7 @@ impl Rtc {
fn do_poll_output(&mut self) -> Result<Output, RtcError> {
if !self.alive {
self.last_timeout_reason = Reason::NotHappening;
return Ok(Output::Timeout(not_happening()));
return Ok(Output::Timeout(Instant::DistantFuture.as_std()));
}

while let Some(e) = self.ice.poll_event() {
Expand Down Expand Up @@ -1585,15 +1585,16 @@ impl Rtc {

let time_and_reason = (None, Reason::NotHappening)
.soonest((self.dtls.poll_timeout(self.last_now), Reason::DTLS))
.soonest((self.ice.poll_timeout(), Reason::Ice))
.soonest((self.ice.poll_timeout().map(Instant::from), Reason::Ice))
.soonest(self.session.poll_timeout())
.soonest((self.sctp.poll_timeout(), Reason::Sctp))
.soonest((self.chan.poll_timeout(&self.sctp), Reason::Channel))
.soonest((stats.and_then(|s| s.poll_timeout()), Reason::Stats));

// TODO: uncomment or remove?
// trace!("poll_output timeout reason: {}", time_and_reason.1);

let time = time_and_reason.0.unwrap_or_else(not_happening);
let time = time_and_reason.0.unwrap_or(Instant::DistantFuture);
let reason = time_and_reason.1;

// We want to guarantee time doesn't go backwards.
Expand All @@ -1605,7 +1606,7 @@ impl Rtc {

self.last_timeout_reason = reason;

Ok(Output::Timeout(next))
Ok(Output::Timeout(next.as_std()))
}

/// The reason for the last [`Output::Timeout`]
Expand Down Expand Up @@ -1722,10 +1723,10 @@ impl Rtc {
}

match input {
Input::Timeout(now) => self.do_handle_timeout(now)?,
Input::Timeout(now) => self.do_handle_timeout(now.into())?,
Input::Receive(now, r) => {
self.do_handle_receive(now, r)?;
self.do_handle_timeout(now)?;
self.do_handle_receive(now.into(), r)?;
self.do_handle_timeout(now.into())?;
}
}
Ok(())
Expand All @@ -1748,7 +1749,7 @@ impl Rtc {
self.init_time(now);

self.last_now = now;
self.ice.handle_timeout(now);
self.ice.handle_timeout(now.as_std());
self.sctp.handle_timeout(now);
self.chan.handle_timeout(now, &mut self.sctp);
self.session.handle_timeout(now)?;
Expand Down Expand Up @@ -1789,7 +1790,7 @@ impl Rtc {
destination: r.destination,
message: stun,
};
self.ice.handle_packet(now, packet);
self.ice.handle_packet(now.as_std(), packet);
}
Dtls(dtls) => self.dtls.handle_receive(dtls)?,
Rtp(rtp) => self.session.handle_rtp_receive(now, rtp),
Expand Down Expand Up @@ -2164,8 +2165,8 @@ impl RtcConfig {
/// None turns off the stats events.
///
/// This includes [`MediaEgressStats`], [`MediaIngressStats`], [`MediaEgressStats`]
pub fn set_stats_interval(mut self, interval: Option<Duration>) -> Self {
self.stats_interval = interval;
pub fn set_stats_interval(mut self, interval: Option<std_time::Duration>) -> Self {
self.stats_interval = interval.map(Into::into);
self
}

Expand All @@ -2181,8 +2182,10 @@ impl RtcConfig {
/// // Defaults to None.
/// assert_eq!(config.stats_interval(), None);
/// ```
pub fn stats_interval(&self) -> Option<Duration> {
pub fn stats_interval(&self) -> Option<std_time::Duration> {
// Stats interval is set externally so its always safe to convert to std Duration
self.stats_interval
.map(|i| i.as_std().expect("invalid stats interval"))
}

/// Enables estimation of available bandwidth (BWE).
Expand Down
4 changes: 2 additions & 2 deletions src/media/event.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt;
use std::ops::RangeInclusive;
use std::time::Instant;
use std::time as std_time;

use crate::packet::MediaKind;
use crate::rtp_::{Direction, ExtensionValues, MediaTime, Mid, Pt, Rid, SenderInfo, SeqNo};
Expand Down Expand Up @@ -120,7 +120,7 @@ pub struct MediaData {
/// The time of the [`Input::Receive`][crate::Input::Receive] of the first packet that caused this MediaData.
///
/// In simple SFU setups this can be used as wallclock for [`Writer::write`][crate::media::Writer].
pub network_time: Instant,
pub network_time: std_time::Instant,

/// The (RTP) sequence numbers that made up this data.
pub seq_range: RangeInclusive<SeqNo>,
Expand Down
9 changes: 4 additions & 5 deletions src/media/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Media (audio/video) related content.

use std::collections::{HashMap, VecDeque};
use std::time::Instant;

use crate::change::AddMedia;
use crate::format::CodecConfig;
Expand All @@ -17,7 +16,7 @@ use crate::format::PayloadParams;
use crate::sdp::Simulcast as SdpSimulcast;
use crate::sdp::{MediaLine, Msid};
use crate::streams::{RtpPacket, Streams};
use crate::util::already_happened;
use crate::util::Instant;

mod event;
pub use event::*;
Expand Down Expand Up @@ -285,7 +284,7 @@ impl Media {
rid: *rid,
params: *codec,
time: dep.time,
network_time: dep.first_network_time(),
network_time: dep.first_network_time().as_std(),
seq_range: dep.seq_range(),
contiguous: dep.contiguous,
ext_vals: dep.ext_vals().clone(),
Expand Down Expand Up @@ -341,7 +340,7 @@ impl Media {
let buffer = self.depayloaders.get_mut(&key).unwrap();

let meta = RtpMeta {
received: packet.timestamp,
received: packet.timestamp.into(),
time: packet.time,
seq_no: packet.seq_no,
header: packet.header.clone(),
Expand Down Expand Up @@ -394,7 +393,7 @@ impl Media {

pub(crate) fn poll_timeout(&self) -> Option<Instant> {
if !self.to_payload.is_empty() {
Some(already_happened())
Some(Instant::DistantPast)
} else {
None
}
Expand Down
Loading
Loading