From ac4d79bf06b45b98530fb977cce478af71a38f83 Mon Sep 17 00:00:00 2001 From: Troy Benson Date: Wed, 14 Jun 2023 20:25:05 +0000 Subject: [PATCH] chore: fix formatting on stream! and try_stream! macros https://github.com/tokio-rs/async-stream/issues/68 pointed out we can use the `stream!({ ... })` macro syntax to force rustfmt to format the inner code block. --- .../api/src/api/v1/gql/subscription/chat.rs | 24 ++++--- .../api/src/api/v1/gql/subscription/user.rs | 9 +-- backend/api/src/grpc/health.rs | 15 +++-- common/src/rmq.rs | 28 ++++---- video/edge/src/grpc/health.rs | 15 +++-- video/ingest/src/grpc/health.rs | 15 +++-- video/ingest/src/grpc/ingest.rs | 48 +++++++------- video/ingest/src/tests/ingest.rs | 11 +++- video/transcoder/src/grpc/health.rs | 15 +++-- video/transcoder/src/transcoder/job/mod.rs | 4 +- .../src/transcoder/job/track_parser.rs | 66 ++++++++++++++----- video/transcoder/src/transcoder/job/utils.rs | 27 ++++---- 12 files changed, 177 insertions(+), 100 deletions(-) diff --git a/backend/api/src/api/v1/gql/subscription/chat.rs b/backend/api/src/api/v1/gql/subscription/chat.rs index d9733a28..24f0461c 100644 --- a/backend/api/src/api/v1/gql/subscription/chat.rs +++ b/backend/api/src/api/v1/gql/subscription/chat.rs @@ -49,22 +49,30 @@ impl ChatSubscription { .await .map_err_gql("failed to subscribe to chat messages")?; - Ok(stream! { + Ok(stream!({ yield Ok(welcome_message); while let Ok(message) = message_stream.recv().await { let event = pb::scuffle::events::ChatMessage::decode( - message.as_bytes().map_err_gql("invalid redis value type")? - ).map_err_gql("failed to decode chat message")?; + message.as_bytes().map_err_gql("invalid redis value type")?, + ) + .map_err_gql("failed to decode chat message")?; yield Ok(ChatMessage { - id: Uuid::parse_str(&event.id).map_err_gql("failed to parse chat message id")?, - author_id: Uuid::parse_str(&event.author_id).map_err_gql("failed to parse chat message author id")?, - channel_id: Uuid::parse_str(&event.channel_id).map_err_gql("failed to parse chat message channel id")?, + id: Uuid::parse_str(&event.id) + .map_err_gql("failed to parse chat message id")?, + author_id: Uuid::parse_str(&event.author_id) + .map_err_gql("failed to parse chat message author id")?, + channel_id: Uuid::parse_str(&event.channel_id) + .map_err_gql("failed to parse chat message channel id")?, content: event.content, - created_at: Utc.timestamp_opt(event.created_at, 0).single().map_err_gql("failed to parse chat message created at")?.into(), + created_at: Utc + .timestamp_opt(event.created_at, 0) + .single() + .map_err_gql("failed to parse chat message created at")? + .into(), r#type: MessageType::User, }); } - }) + })) } } diff --git a/backend/api/src/api/v1/gql/subscription/user.rs b/backend/api/src/api/v1/gql/subscription/user.rs index 523528f8..0c31217e 100644 --- a/backend/api/src/api/v1/gql/subscription/user.rs +++ b/backend/api/src/api/v1/gql/subscription/user.rs @@ -39,7 +39,7 @@ impl UserSubscription { .await .map_err_gql("failed to subscribe to user display name")?; - Ok(async_stream::stream! { + Ok(async_stream::stream!({ yield Ok(DisplayNameStream { display_name: user.display_name.clone(), username: user.username.clone(), @@ -47,8 +47,9 @@ impl UserSubscription { while let Ok(message) = subscription.recv().await { let event = pb::scuffle::events::UserDisplayName::decode( - message.as_bytes().map_err_gql("invalid redis value")? - ).map_err_gql("failed to decode user display name")?; + message.as_bytes().map_err_gql("invalid redis value")?, + ) + .map_err_gql("failed to decode user display name")?; if let Some(username) = event.username { user.username = username; @@ -63,6 +64,6 @@ impl UserSubscription { username: user.username.clone(), }); } - }) + })) } } diff --git a/backend/api/src/grpc/health.rs b/backend/api/src/grpc/health.rs index dc7b63cc..dcfde825 100644 --- a/backend/api/src/grpc/health.rs +++ b/backend/api/src/grpc/health.rs @@ -53,17 +53,24 @@ impl health_server::Health for HealthServer { async fn watch(&self, _: Request) -> Result> { let global = self.global.clone(); - let output = try_stream! { + let output = try_stream!({ loop { tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - let serving = global.upgrade().map(|g| !g.ctx.is_done()).unwrap_or_default(); + let serving = global + .upgrade() + .map(|g| !g.ctx.is_done()) + .unwrap_or_default(); yield HealthCheckResponse { - status: if serving { ServingStatus::Serving.into() } else { ServingStatus::NotServing.into() }, + status: if serving { + ServingStatus::Serving.into() + } else { + ServingStatus::NotServing.into() + }, }; } - }; + }); Ok(Response::new(Box::pin(output))) } diff --git a/common/src/rmq.rs b/common/src/rmq.rs index 84cf9c8a..c8cbc899 100644 --- a/common/src/rmq.rs +++ b/common/src/rmq.rs @@ -127,35 +127,35 @@ impl ConnectionPool { let queue_name = queue_name.to_string(); let connection_name = connection_name.to_string(); - stream! { + stream!({ 'connection_loop: loop { let channel = self.aquire().await?; - let mut consumer = channel.basic_consume(&queue_name, &connection_name, options, table.clone()).await?; + let mut consumer = channel + .basic_consume(&queue_name, &connection_name, options, table.clone()) + .await?; loop { let m = consumer.next().await; match m { Some(Ok(m)) => { yield Ok(m); - }, - Some(Err(e)) => { - match e { - lapin::Error::IOError(e) => { - if e.kind() == std::io::ErrorKind::ConnectionReset { - continue 'connection_loop; - } - }, - _ => { - yield Err(anyhow!("failed to get message: {}", e)); + } + Some(Err(e)) => match e { + lapin::Error::IOError(e) => { + if e.kind() == std::io::ErrorKind::ConnectionReset { + continue 'connection_loop; } } + _ => { + yield Err(anyhow!("failed to get message: {}", e)); + } }, None => { continue 'connection_loop; - }, + } } } } - } + }) } pub async fn aquire(&self) -> Result { diff --git a/video/edge/src/grpc/health.rs b/video/edge/src/grpc/health.rs index 8f63d9ff..ebab92a9 100644 --- a/video/edge/src/grpc/health.rs +++ b/video/edge/src/grpc/health.rs @@ -49,17 +49,24 @@ impl health_server::Health for HealthServer { async fn watch(&self, _: Request) -> Result> { let global = self.global.clone(); - let output = try_stream! { + let output = try_stream!({ loop { tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - let serving = global.upgrade().map(|g| !g.ctx.is_done()).unwrap_or_default(); + let serving = global + .upgrade() + .map(|g| !g.ctx.is_done()) + .unwrap_or_default(); yield HealthCheckResponse { - status: if serving { ServingStatus::Serving.into() } else { ServingStatus::NotServing.into() }, + status: if serving { + ServingStatus::Serving.into() + } else { + ServingStatus::NotServing.into() + }, }; } - }; + }); Ok(Response::new(Box::pin(output))) } diff --git a/video/ingest/src/grpc/health.rs b/video/ingest/src/grpc/health.rs index 8f63d9ff..ebab92a9 100644 --- a/video/ingest/src/grpc/health.rs +++ b/video/ingest/src/grpc/health.rs @@ -49,17 +49,24 @@ impl health_server::Health for HealthServer { async fn watch(&self, _: Request) -> Result> { let global = self.global.clone(); - let output = try_stream! { + let output = try_stream!({ loop { tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - let serving = global.upgrade().map(|g| !g.ctx.is_done()).unwrap_or_default(); + let serving = global + .upgrade() + .map(|g| !g.ctx.is_done()) + .unwrap_or_default(); yield HealthCheckResponse { - status: if serving { ServingStatus::Serving.into() } else { ServingStatus::NotServing.into() }, + status: if serving { + ServingStatus::Serving.into() + } else { + ServingStatus::NotServing.into() + }, }; } - }; + }); Ok(Response::new(Box::pin(output))) } diff --git a/video/ingest/src/grpc/ingest.rs b/video/ingest/src/grpc/ingest.rs index b42c8c23..86f70b7e 100644 --- a/video/ingest/src/grpc/ingest.rs +++ b/video/ingest/src/grpc/ingest.rs @@ -68,39 +68,37 @@ impl ingest_server::Ingest for IngestServer { return Err(Status::not_found("Stream not found")); } - let output = try_stream! { + let output = try_stream!({ while let Some(event) = channel_rx.recv().await { let event = match event { - WatchStreamEvent::InitSegment(data) => { - WatchStreamResponse { - data: Some(watch_stream_response::Data::InitSegment(data)), - } + WatchStreamEvent::InitSegment(data) => WatchStreamResponse { + data: Some(watch_stream_response::Data::InitSegment(data)), }, - WatchStreamEvent::MediaSegment(ms) => { - WatchStreamResponse { - data: Some(watch_stream_response::Data::MediaSegment( - watch_stream_response::MediaSegment { - data: ms.data, - keyframe: ms.keyframe, - timestamp: ms.timestamp, - data_type: match ms.ty { - transmuxer::MediaType::Audio => watch_stream_response::media_segment::DataType::Audio.into(), - transmuxer::MediaType::Video => watch_stream_response::media_segment::DataType::Video.into(), + WatchStreamEvent::MediaSegment(ms) => WatchStreamResponse { + data: Some(watch_stream_response::Data::MediaSegment( + watch_stream_response::MediaSegment { + data: ms.data, + keyframe: ms.keyframe, + timestamp: ms.timestamp, + data_type: match ms.ty { + transmuxer::MediaType::Audio => { + watch_stream_response::media_segment::DataType::Audio.into() } - } - )), - } - } - WatchStreamEvent::ShuttingDown(stream_shutdown) => { - WatchStreamResponse { - data: Some(watch_stream_response::Data::ShuttingDown(stream_shutdown)), - } - } + transmuxer::MediaType::Video => { + watch_stream_response::media_segment::DataType::Video.into() + } + }, + }, + )), + }, + WatchStreamEvent::ShuttingDown(stream_shutdown) => WatchStreamResponse { + data: Some(watch_stream_response::Data::ShuttingDown(stream_shutdown)), + }, }; yield event; } - }; + }); Ok(Response::new(Box::pin(output))) } diff --git a/video/ingest/src/tests/ingest.rs b/video/ingest/src/tests/ingest.rs index fb277e4b..85ae62e5 100644 --- a/video/ingest/src/tests/ingest.rs +++ b/video/ingest/src/tests/ingest.rs @@ -275,8 +275,13 @@ impl TestState { let stream = { let global = global.clone(); - stream! { - let mut stream = pin!(global.rmq.basic_consume(global.config.transcoder.events_subject.clone(), "", Default::default(), Default::default())); + stream!({ + let mut stream = pin!(global.rmq.basic_consume( + global.config.transcoder.events_subject.clone(), + "", + Default::default(), + Default::default() + )); loop { select! { message = stream.next() => { @@ -288,7 +293,7 @@ impl TestState { } } } - } + }) }; Self { diff --git a/video/transcoder/src/grpc/health.rs b/video/transcoder/src/grpc/health.rs index 8f63d9ff..ebab92a9 100644 --- a/video/transcoder/src/grpc/health.rs +++ b/video/transcoder/src/grpc/health.rs @@ -49,17 +49,24 @@ impl health_server::Health for HealthServer { async fn watch(&self, _: Request) -> Result> { let global = self.global.clone(); - let output = try_stream! { + let output = try_stream!({ loop { tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - let serving = global.upgrade().map(|g| !g.ctx.is_done()).unwrap_or_default(); + let serving = global + .upgrade() + .map(|g| !g.ctx.is_done()) + .unwrap_or_default(); yield HealthCheckResponse { - status: if serving { ServingStatus::Serving.into() } else { ServingStatus::NotServing.into() }, + status: if serving { + ServingStatus::Serving.into() + } else { + ServingStatus::NotServing.into() + }, }; } - }; + }); Ok(Response::new(Box::pin(output))) } diff --git a/video/transcoder/src/transcoder/job/mod.rs b/video/transcoder/src/transcoder/job/mod.rs index a1e4a53f..37c9a517 100644 --- a/video/transcoder/src/transcoder/job/mod.rs +++ b/video/transcoder/src/transcoder/job/mod.rs @@ -250,7 +250,7 @@ fn report_to_ingest( mut client: IngestClient, mut channel: mpsc::Receiver, ) -> impl Stream> + Send + 'static { - stream! { + stream!({ loop { select! { msg = channel.recv() => { @@ -276,7 +276,7 @@ fn report_to_ingest( } } } - } + }) } impl Job { diff --git a/video/transcoder/src/transcoder/job/track_parser.rs b/video/transcoder/src/transcoder/job/track_parser.rs index a8f4f7bc..dcf4ed9f 100644 --- a/video/transcoder/src/transcoder/job/track_parser.rs +++ b/video/transcoder/src/transcoder/job/track_parser.rs @@ -29,7 +29,7 @@ pub struct TrackSample { pub fn track_parser( mut input: impl Stream> + Unpin, ) -> impl Stream> { - stream! { + stream!({ let mut buffer = BytesMut::new(); // Main loop for parsing the stream @@ -56,28 +56,50 @@ pub fn track_parser( match b { mp4::DynBox::Moov(moov) => { if moov.traks.len() != 1 { - yield Err(io::Error::new(io::ErrorKind::InvalidData, anyhow!("moov box must have exactly one trak box"))); + yield Err(io::Error::new( + io::ErrorKind::InvalidData, + anyhow!("moov box must have exactly one trak box"), + )); return; } yield Ok(TrackOut::Moov(moov)); - }, + } mp4::DynBox::Moof(moof) => { if moof.traf.len() != 1 { - yield Err(io::Error::new(io::ErrorKind::InvalidData, anyhow!("moof box must have exactly one traf box"))); + yield Err(io::Error::new( + io::ErrorKind::InvalidData, + anyhow!("moof box must have exactly one traf box"), + )); return; } let traf = &moof.traf[0]; - let trun = traf.trun.as_ref().ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, anyhow!("traf box must have a trun box")))?; + let trun = traf.trun.as_ref().ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidData, + anyhow!("traf box must have a trun box"), + ) + })?; let tfhd = &traf.tfhd; let samples = trun.samples.iter().enumerate().map(|(idx, sample)| { let mut sample = sample.clone(); sample.duration = sample.duration.or(tfhd.default_sample_duration); sample.size = sample.size.or(tfhd.default_sample_size); - sample.flags = Some(sample.flags.or(if idx == 0 { trun.first_sample_flags } else { None }).or(tfhd.default_sample_flags).unwrap_or_default()); - sample.composition_time_offset = Some(sample.composition_time_offset.unwrap_or_default()); + sample.flags = Some( + sample + .flags + .or(if idx == 0 { + trun.first_sample_flags + } else { + None + }) + .or(tfhd.default_sample_flags) + .unwrap_or_default(), + ); + sample.composition_time_offset = + Some(sample.composition_time_offset.unwrap_or_default()); sample }); @@ -85,9 +107,12 @@ pub fn track_parser( let mdat = match mp4::DynBox::demux(&mut cursor) { Ok(DynBox::Mdat(mdat)) => mdat, Ok(_) => { - yield Err(io::Error::new(io::ErrorKind::InvalidData, anyhow!("moof box must be followed by an mdat box"))); + yield Err(io::Error::new( + io::ErrorKind::InvalidData, + anyhow!("moof box must be followed by an mdat box"), + )); return; - }, + } Err(e) => { if e.kind() == io::ErrorKind::UnexpectedEof { // We need more data to parse this box @@ -101,31 +126,42 @@ pub fn track_parser( }; if mdat.data.len() != 1 { - yield Err(io::Error::new(io::ErrorKind::InvalidData, anyhow!("mdat box must have exactly one data box"))); + yield Err(io::Error::new( + io::ErrorKind::InvalidData, + anyhow!("mdat box must have exactly one data box"), + )); return; } let mut mdat_cursor = io::Cursor::new(mdat.data[0].clone()); for sample in samples { let data = if let Some(size) = sample.size { - mdat_cursor.read_slice(size as usize).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, anyhow!("mdat data size not big enough for sample: {}", e)))? + mdat_cursor.read_slice(size as usize).map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidData, + anyhow!("mdat data size not big enough for sample: {}", e), + ) + })? } else { mdat_cursor.get_remaining() }; yield Ok(TrackOut::Sample(TrackSample { duration: sample.duration.unwrap_or_default(), - keyframe: sample.flags.map(|f| f.sample_depends_on == 2).unwrap_or_default(), + keyframe: sample + .flags + .map(|f| f.sample_depends_on == 2) + .unwrap_or_default(), sample, data, })); } - }, - _ => {}, + } + _ => {} } } buffer.extend_from_slice(&cursor.get_remaining()); } - } + }) } diff --git a/video/transcoder/src/transcoder/job/utils.rs b/video/transcoder/src/transcoder/job/utils.rs index c3abd761..810c00cd 100644 --- a/video/transcoder/src/transcoder/job/utils.rs +++ b/video/transcoder/src/transcoder/job/utils.rs @@ -16,13 +16,13 @@ pub fn unix_stream( listener: UnixListener, buffer_size: usize, ) -> impl futures::Stream> { - stream! { + stream!({ let (sock, _) = match listener.accept().timeout(Duration::from_secs(1)).await { Ok(Ok(connection)) => connection, Ok(Err(err)) => { yield Err(err); return; - }, + } Err(_) => { // Timeout tracing::debug!("unix stream timeout"); @@ -38,20 +38,21 @@ pub fn unix_stream( match bio.read().await { Ok(bytes) => { yield Ok(bytes.freeze()); - }, - Err(err) => { - match err { - BytesIOError::ClientClosed => { - return; - }, - _ => { - yield Err(io::Error::new(io::ErrorKind::UnexpectedEof, anyhow!("failed to read from socket: {}", err))); - } - } } + Err(err) => match err { + BytesIOError::ClientClosed => { + return; + } + _ => { + yield Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + anyhow!("failed to read from socket: {}", err), + )); + } + }, } } - } + }) } pub struct SharedFuture> {