Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: reformat using nightly fmt #101

Merged
merged 1 commit into from
Jul 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions backend/api/src/api/middleware/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub fn auth_middleware(_: &Arc<GlobalState>) -> Middleware<Body, RouteError> {

let global = req.get_global()?;
let Ok(token) = token.to_str() else {
fail_fast!(mode, req);
fail_fast!(mode, req);
};

// Token's will start with "Bearer " so we need to remove that
Expand All @@ -100,7 +100,8 @@ pub fn auth_middleware(_: &Arc<GlobalState>) -> Middleware<Body, RouteError> {
.session_by_id_loader
.load_one(jwt.session_id)
.await
.map_err_route("failed to fetch session")? else {
.map_err_route("failed to fetch session")?
else {
fail_fast!(mode, req);
};

Expand Down
22 changes: 17 additions & 5 deletions backend/api/src/api/v1/gql/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,22 +82,34 @@ async fn websocket_handler(
// We silently ignore invalid tokens since we don't want to force the user to login
// if the token is invalid when they make a request which requires authentication, it will fail.
let Some(jwt) = JwtState::verify(&global, token) else {
return Err(GqlError::InvalidSession.with_message("invalid session token").extend());
return Err(GqlError::InvalidSession
.with_message("invalid session token")
.extend());
};

let Some(session) = global.session_by_id_loader.load_one(jwt.session_id).await.map_err_gql("failed to fetch session")? else {
return Err(GqlError::InvalidSession.with_message("invalid session").extend());
let Some(session) = global
.session_by_id_loader
.load_one(jwt.session_id)
.await
.map_err_gql("failed to fetch session")?
else {
return Err(GqlError::InvalidSession
.with_message("invalid session")
.extend());
};

if !session.is_valid() {
return Err(GqlError::InvalidSession.with_message("session has invalidated").extend());
return Err(GqlError::InvalidSession
.with_message("session has invalidated")
.extend());
}

let permissions = global
.user_permisions_by_id_loader
.load_one(session.id)
.await
.map_err_gql("failed to fetch permissions")?.unwrap_or_default();
.map_err_gql("failed to fetch permissions")?
.unwrap_or_default();

request_context.set_session(Some((session, permissions)));
}
Expand Down
2 changes: 1 addition & 1 deletion backend/api/src/api/v1/gql/request_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl RequestContext {
) -> Result<Option<(session::Model, UserPermission)>> {
let guard = self.session.load();
let Some(session) = guard.as_ref() else {
return Ok(None)
return Ok(None);
};

if !self.is_websocket {
Expand Down
11 changes: 9 additions & 2 deletions backend/api/src/api/v1/gql/subscription/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,15 @@ impl UserSubscription {
) -> Result<impl Stream<Item = Result<DisplayNameStream>> + 'ctx> {
let global = ctx.get_global();

let Some(mut user) = global.user_by_id_loader.load_one(user_id).await.map_err_gql("failed to fetch user")? else {
return Err(GqlError::NotFound.with_message("user not found").with_field(vec!["user_id"]));
let Some(mut user) = global
.user_by_id_loader
.load_one(user_id)
.await
.map_err_gql("failed to fetch user")?
else {
return Err(GqlError::NotFound
.with_message("user not found")
.with_field(vec!["user_id"]));
};

let mut subscription = global
Expand Down
23 changes: 17 additions & 6 deletions backend/api/src/grpc/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,18 @@ impl api_server::Api for ApiServer {
}

// Check user permissions
let Ok(permissions) = global.user_permisions_by_id_loader.load_one(channel_id).await else {
let Ok(permissions) = global
.user_permisions_by_id_loader
.load_one(channel_id)
.await
else {
return Err(Status::internal("failed to query database"));
};

let Some(user_permissions) = permissions else {
return Err(Status::permission_denied("user has no permission to go live"));
return Err(Status::permission_denied(
"user has no permission to go live",
));
};

if !user_permissions
Expand Down Expand Up @@ -348,10 +354,15 @@ impl api_server::Api for ApiServer {
.parse::<Uuid>()
.map_err(|_| Status::invalid_argument("invalid old stream ID: must be a valid UUID"))?;

let Some(old_stream) = global.stream_by_id_loader.load_one(old_stream_id).await.map_err(|e| {
tracing::error!("failed to load stream by ID: {}", e);
Status::internal("internal server error")
})? else {
let Some(old_stream) = global
.stream_by_id_loader
.load_one(old_stream_id)
.await
.map_err(|e| {
tracing::error!("failed to load stream by ID: {}", e);
Status::internal("internal server error")
})?
else {
return Err(Status::not_found("stream not found"));
};

Expand Down
14 changes: 12 additions & 2 deletions video/container/flv/src/tests/demuxer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -732,12 +732,22 @@ fn test_demux_flv_hevc_aac() {
assert_eq!(config.num_temporal_layers, 1);

// We should be able to find a SPS NAL unit in the sequence header
let Some(sps) = config.arrays.iter().find(|a| a.nal_unit_type == h265::NaluType::Sps).and_then(|v| v.nalus.get(0)) else {
let Some(sps) = config
.arrays
.iter()
.find(|a| a.nal_unit_type == h265::NaluType::Sps)
.and_then(|v| v.nalus.get(0))
else {
panic!("expected sps");
};

// We should be able to find a PPS NAL unit in the sequence header
let Some(_) = config.arrays.iter().find(|a| a.nal_unit_type == h265::NaluType::Pps).and_then(|v| v.nalus.get(0)) else {
let Some(_) = config
.arrays
.iter()
.find(|a| a.nal_unit_type == h265::NaluType::Pps)
.and_then(|v| v.nalus.get(0))
else {
panic!("expected pps");
};

Expand Down
46 changes: 33 additions & 13 deletions video/transcoder/src/transcoder/job/variant/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,11 @@ impl Variant {
// Discontinuities are a bit of a special case.
// Any samples before the keyframe on track 1 are discarded.
// Samples on other tracks will be added to the next fragment.
let Some(idx) = self.tracks[0].samples.iter().position(|(_, sample)| sample.keyframe) else {
let Some(idx) = self.tracks[0]
.samples
.iter()
.position(|(_, sample)| sample.keyframe)
else {
// We dont have a place to create a discontinuity yet, so we need to wait a little bit.
return Ok(());
};
Expand Down Expand Up @@ -533,7 +537,10 @@ impl Variant {

// If we have an index we can cut a new segment.
if let Some(idx) = idx {
let Some((segment, _)) = self.segment_state.get_mut(&self.redis_state.current_segment_idx()) else {
let Some((segment, _)) = self
.segment_state
.get_mut(&self.redis_state.current_segment_idx())
else {
// This should never happen, but just in case.
return Err(anyhow!("failed to get current segment state"));
};
Expand Down Expand Up @@ -579,22 +586,32 @@ impl Variant {
}

// We want to find out if we have enough samples to create a fragment.
let Some(idx) = sample_durations.iter().enumerate().find_map(|(idx, duration)| {
if *duration >= consts::FRAGMENT_CUT_TARGET_DURATION && (*duration * 1000.0).fract() == 0.0 {
return Some(Some(idx));
}
let Some(idx) = sample_durations
.iter()
.enumerate()
.find_map(|(idx, duration)| {
if *duration >= consts::FRAGMENT_CUT_TARGET_DURATION
&& (*duration * 1000.0).fract() == 0.0
{
return Some(Some(idx));
}

if *duration >= consts::FRAGMENT_CUT_MAX_DURATION {
return Some(None);
}
if *duration >= consts::FRAGMENT_CUT_MAX_DURATION {
return Some(None);
}

None
}) else {
None
})
else {
// We dont have a place to create a fragment yet, so we need to wait a little bit.
return Ok(());
};

let Some(idx) = idx.or_else(|| sample_durations.iter().position(|d| *d >= consts::FRAGMENT_CUT_TARGET_DURATION)) else {
let Some(idx) = idx.or_else(|| {
sample_durations
.iter()
.position(|d| *d >= consts::FRAGMENT_CUT_TARGET_DURATION)
}) else {
// We dont have a place to create a fragment yet, so we need to wait a little bit.
return Ok(());
};
Expand Down Expand Up @@ -628,7 +645,10 @@ impl Variant {

fn create_fragment(&mut self, samples: Vec<Vec<TrackSample>>) -> Result<()> {
// Get the current segment
let Some((segment, segment_data_state)) = self.segment_state.get_mut(&self.redis_state.current_segment_idx()) else {
let Some((segment, segment_data_state)) = self
.segment_state
.get_mut(&self.redis_state.current_segment_idx())
else {
return Err(anyhow!("failed to get current segment"));
};

Expand Down
7 changes: 6 additions & 1 deletion video/transmuxer/src/codecs/hevc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ use mp4::{
use crate::TransmuxError;

pub fn stsd_entry(config: HEVCDecoderConfigurationRecord) -> Result<(DynBox, Sps), TransmuxError> {
let Some(sps) = config.arrays.iter().find(|a| a.nal_unit_type == h265::NaluType::Sps).and_then(|v| v.nalus.get(0)) else {
let Some(sps) = config
.arrays
.iter()
.find(|a| a.nal_unit_type == h265::NaluType::Sps)
.and_then(|v| v.nalus.get(0))
else {
return Err(TransmuxError::InvalidHEVCDecoderConfigurationRecord);
};

Expand Down
2 changes: 1 addition & 1 deletion video/transmuxer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl Transmuxer {
let mut writer = BytesWriter::default();

let Some((video_settings, _)) = &self.settings else {
let Some((video_settings, audio_settings)) = self.init_sequence(&mut writer)? else {
let Some((video_settings, audio_settings)) = self.init_sequence(&mut writer)? else {
if self.tags.len() > 30 {
// We are clearly not getting any sequence headers, so we should just give up
return Err(TransmuxError::NoSequenceHeaders);
Expand Down
Loading