Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: fix formatting #96

Merged
merged 1 commit into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions backend/api/src/api/v1/gql/subscription/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,30 @@ impl ChatSubscription {
.await
.map_err_gql("failed to subscribe to chat messages")?;

Ok(stream! {
Ok(stream!({
yield Ok(welcome_message);
while let Ok(message) = message_stream.recv().await {
let event = pb::scuffle::events::ChatMessage::decode(
message.as_bytes().map_err_gql("invalid redis value type")?
).map_err_gql("failed to decode chat message")?;
message.as_bytes().map_err_gql("invalid redis value type")?,
)
.map_err_gql("failed to decode chat message")?;

yield Ok(ChatMessage {
id: Uuid::parse_str(&event.id).map_err_gql("failed to parse chat message id")?,
author_id: Uuid::parse_str(&event.author_id).map_err_gql("failed to parse chat message author id")?,
channel_id: Uuid::parse_str(&event.channel_id).map_err_gql("failed to parse chat message channel id")?,
id: Uuid::parse_str(&event.id)
.map_err_gql("failed to parse chat message id")?,
author_id: Uuid::parse_str(&event.author_id)
.map_err_gql("failed to parse chat message author id")?,
channel_id: Uuid::parse_str(&event.channel_id)
.map_err_gql("failed to parse chat message channel id")?,
content: event.content,
created_at: Utc.timestamp_opt(event.created_at, 0).single().map_err_gql("failed to parse chat message created at")?.into(),
created_at: Utc
.timestamp_opt(event.created_at, 0)
.single()
.map_err_gql("failed to parse chat message created at")?
.into(),
r#type: MessageType::User,
});
}
})
}))
}
}
9 changes: 5 additions & 4 deletions backend/api/src/api/v1/gql/subscription/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,17 @@ impl UserSubscription {
.await
.map_err_gql("failed to subscribe to user display name")?;

Ok(async_stream::stream! {
Ok(async_stream::stream!({
yield Ok(DisplayNameStream {
display_name: user.display_name.clone(),
username: user.username.clone(),
});

while let Ok(message) = subscription.recv().await {
let event = pb::scuffle::events::UserDisplayName::decode(
message.as_bytes().map_err_gql("invalid redis value")?
).map_err_gql("failed to decode user display name")?;
message.as_bytes().map_err_gql("invalid redis value")?,
)
.map_err_gql("failed to decode user display name")?;

if let Some(username) = event.username {
user.username = username;
Expand All @@ -63,6 +64,6 @@ impl UserSubscription {
username: user.username.clone(),
});
}
})
}))
}
}
15 changes: 11 additions & 4 deletions backend/api/src/grpc/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,24 @@ impl health_server::Health for HealthServer {
async fn watch(&self, _: Request<HealthCheckRequest>) -> Result<Response<Self::WatchStream>> {
let global = self.global.clone();

let output = try_stream! {
let output = try_stream!({
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

let serving = global.upgrade().map(|g| !g.ctx.is_done()).unwrap_or_default();
let serving = global
.upgrade()
.map(|g| !g.ctx.is_done())
.unwrap_or_default();

yield HealthCheckResponse {
status: if serving { ServingStatus::Serving.into() } else { ServingStatus::NotServing.into() },
status: if serving {
ServingStatus::Serving.into()
} else {
ServingStatus::NotServing.into()
},
};
}
};
});

Ok(Response::new(Box::pin(output)))
}
Expand Down
28 changes: 14 additions & 14 deletions common/src/rmq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,35 +127,35 @@ impl ConnectionPool {
let queue_name = queue_name.to_string();
let connection_name = connection_name.to_string();

stream! {
stream!({
'connection_loop: loop {
let channel = self.aquire().await?;
let mut consumer = channel.basic_consume(&queue_name, &connection_name, options, table.clone()).await?;
let mut consumer = channel
.basic_consume(&queue_name, &connection_name, options, table.clone())
.await?;
loop {
let m = consumer.next().await;
match m {
Some(Ok(m)) => {
yield Ok(m);
},
Some(Err(e)) => {
match e {
lapin::Error::IOError(e) => {
if e.kind() == std::io::ErrorKind::ConnectionReset {
continue 'connection_loop;
}
},
_ => {
yield Err(anyhow!("failed to get message: {}", e));
}
Some(Err(e)) => match e {
lapin::Error::IOError(e) => {
if e.kind() == std::io::ErrorKind::ConnectionReset {
continue 'connection_loop;
}
}
_ => {
yield Err(anyhow!("failed to get message: {}", e));
}
},
None => {
continue 'connection_loop;
},
}
}
}
}
}
})
}

pub async fn aquire(&self) -> Result<Channel> {
Expand Down
15 changes: 11 additions & 4 deletions video/edge/src/grpc/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,24 @@ impl health_server::Health for HealthServer {
async fn watch(&self, _: Request<HealthCheckRequest>) -> Result<Response<Self::WatchStream>> {
let global = self.global.clone();

let output = try_stream! {
let output = try_stream!({
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

let serving = global.upgrade().map(|g| !g.ctx.is_done()).unwrap_or_default();
let serving = global
.upgrade()
.map(|g| !g.ctx.is_done())
.unwrap_or_default();

yield HealthCheckResponse {
status: if serving { ServingStatus::Serving.into() } else { ServingStatus::NotServing.into() },
status: if serving {
ServingStatus::Serving.into()
} else {
ServingStatus::NotServing.into()
},
};
}
};
});

Ok(Response::new(Box::pin(output)))
}
Expand Down
15 changes: 11 additions & 4 deletions video/ingest/src/grpc/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,24 @@ impl health_server::Health for HealthServer {
async fn watch(&self, _: Request<HealthCheckRequest>) -> Result<Response<Self::WatchStream>> {
let global = self.global.clone();

let output = try_stream! {
let output = try_stream!({
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

let serving = global.upgrade().map(|g| !g.ctx.is_done()).unwrap_or_default();
let serving = global
.upgrade()
.map(|g| !g.ctx.is_done())
.unwrap_or_default();

yield HealthCheckResponse {
status: if serving { ServingStatus::Serving.into() } else { ServingStatus::NotServing.into() },
status: if serving {
ServingStatus::Serving.into()
} else {
ServingStatus::NotServing.into()
},
};
}
};
});

Ok(Response::new(Box::pin(output)))
}
Expand Down
48 changes: 23 additions & 25 deletions video/ingest/src/grpc/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,39 +68,37 @@ impl ingest_server::Ingest for IngestServer {
return Err(Status::not_found("Stream not found"));
}

let output = try_stream! {
let output = try_stream!({
while let Some(event) = channel_rx.recv().await {
let event = match event {
WatchStreamEvent::InitSegment(data) => {
WatchStreamResponse {
data: Some(watch_stream_response::Data::InitSegment(data)),
}
WatchStreamEvent::InitSegment(data) => WatchStreamResponse {
data: Some(watch_stream_response::Data::InitSegment(data)),
},
WatchStreamEvent::MediaSegment(ms) => {
WatchStreamResponse {
data: Some(watch_stream_response::Data::MediaSegment(
watch_stream_response::MediaSegment {
data: ms.data,
keyframe: ms.keyframe,
timestamp: ms.timestamp,
data_type: match ms.ty {
transmuxer::MediaType::Audio => watch_stream_response::media_segment::DataType::Audio.into(),
transmuxer::MediaType::Video => watch_stream_response::media_segment::DataType::Video.into(),
WatchStreamEvent::MediaSegment(ms) => WatchStreamResponse {
data: Some(watch_stream_response::Data::MediaSegment(
watch_stream_response::MediaSegment {
data: ms.data,
keyframe: ms.keyframe,
timestamp: ms.timestamp,
data_type: match ms.ty {
transmuxer::MediaType::Audio => {
watch_stream_response::media_segment::DataType::Audio.into()
}
}
)),
}
}
WatchStreamEvent::ShuttingDown(stream_shutdown) => {
WatchStreamResponse {
data: Some(watch_stream_response::Data::ShuttingDown(stream_shutdown)),
}
}
transmuxer::MediaType::Video => {
watch_stream_response::media_segment::DataType::Video.into()
}
},
},
)),
},
WatchStreamEvent::ShuttingDown(stream_shutdown) => WatchStreamResponse {
data: Some(watch_stream_response::Data::ShuttingDown(stream_shutdown)),
},
};

yield event;
}
};
});

Ok(Response::new(Box::pin(output)))
}
Expand Down
11 changes: 8 additions & 3 deletions video/ingest/src/tests/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,13 @@ impl TestState {

let stream = {
let global = global.clone();
stream! {
let mut stream = pin!(global.rmq.basic_consume(global.config.transcoder.events_subject.clone(), "", Default::default(), Default::default()));
stream!({
let mut stream = pin!(global.rmq.basic_consume(
global.config.transcoder.events_subject.clone(),
"",
Default::default(),
Default::default()
));
loop {
select! {
message = stream.next() => {
Expand All @@ -288,7 +293,7 @@ impl TestState {
}
}
}
}
})
};

Self {
Expand Down
15 changes: 11 additions & 4 deletions video/transcoder/src/grpc/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,24 @@ impl health_server::Health for HealthServer {
async fn watch(&self, _: Request<HealthCheckRequest>) -> Result<Response<Self::WatchStream>> {
let global = self.global.clone();

let output = try_stream! {
let output = try_stream!({
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

let serving = global.upgrade().map(|g| !g.ctx.is_done()).unwrap_or_default();
let serving = global
.upgrade()
.map(|g| !g.ctx.is_done())
.unwrap_or_default();

yield HealthCheckResponse {
status: if serving { ServingStatus::Serving.into() } else { ServingStatus::NotServing.into() },
status: if serving {
ServingStatus::Serving.into()
} else {
ServingStatus::NotServing.into()
},
};
}
};
});

Ok(Response::new(Box::pin(output)))
}
Expand Down
4 changes: 2 additions & 2 deletions video/transcoder/src/transcoder/job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ fn report_to_ingest(
mut client: IngestClient<Channel>,
mut channel: mpsc::Receiver<TranscoderEventRequest>,
) -> impl Stream<Item = Result<()>> + Send + 'static {
stream! {
stream!({
loop {
select! {
msg = channel.recv() => {
Expand All @@ -276,7 +276,7 @@ fn report_to_ingest(
}
}
}
}
})
}

impl Job {
Expand Down
Loading
Loading