Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(platform): offline banners #197

Open
wants to merge 21 commits into
base: feature/website
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions platform/api/src/api/v1/gql/models/channel.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use async_graphql::{ComplexObject, Context, SimpleObject};
use chrono::Utc;
use jwt_next::SignWithKey;
use ulid::Ulid;

use super::category::Category;
use super::date::DateRFC3339;
use super::image_upload::ImageUpload;
use super::ulid::GqlUlid;
use crate::api::v1::gql::error::ext::*;
use crate::api::v1::gql::error::Result;
Expand All @@ -22,11 +24,15 @@ pub struct Channel<G: ApiGlobal> {
pub description: Option<String>,
pub links: Vec<database::ChannelLink>,
pub custom_thumbnail_id: Option<GqlUlid>,
pub offline_banner_id: Option<GqlUlid>,
pub pending_offline_banner_id: Option<GqlUlid>,
pub category_id: Option<GqlUlid>,
pub live: Option<ChannelLive<G>>,
pub last_live_at: Option<DateRFC3339>,

// Custom resolver
#[graphql(skip)]
pub offline_banner_id_: Option<Ulid>,

// Private fields
#[graphql(skip)]
stream_key_: Option<String>,
Expand All @@ -50,6 +56,23 @@ impl<G: ApiGlobal> Channel<G> {
Ok(category.map(Into::into))
}

async fn offline_banner(&self, ctx: &Context<'_>) -> Result<Option<ImageUpload<G>>> {
let Some(offline_banner_id) = self.offline_banner_id_ else {
return Ok(None);
};

let global = ctx.get_global::<G>();

Ok(global
.uploaded_file_by_id_loader()
.load(offline_banner_id)
.await
.map_err_ignored_gql("failed to fetch offline banner")?
.map(ImageUpload::from_uploaded_file)
.transpose()?
.flatten())
}

async fn stream_key(&self, ctx: &Context<'_>) -> Result<Option<&str>> {
auth_guard::<_, G>(ctx, "streamKey", self.stream_key_.as_deref(), self.id.into()).await
}
Expand Down Expand Up @@ -203,7 +226,7 @@ impl<G: ApiGlobal> From<database::Channel> for Channel<G> {
description: value.description,
links: value.links,
custom_thumbnail_id: value.custom_thumbnail_id.map(Into::into),
offline_banner_id: value.offline_banner_id.map(Into::into),
pending_offline_banner_id: value.pending_offline_banner_id.map(Into::into),
category_id: value.category_id.map(Into::into),
live: value.active_connection_id.map(|_| ChannelLive {
room_id: value.room_id.into(),
Expand All @@ -212,6 +235,7 @@ impl<G: ApiGlobal> From<database::Channel> for Channel<G> {
channel_id: value.id,
_phantom: std::marker::PhantomData,
}),
offline_banner_id_: value.offline_banner_id,
last_live_at: value.last_live_at.map(DateRFC3339),
stream_key_,
}
Expand Down
43 changes: 43 additions & 0 deletions platform/api/src/api/v1/gql/mutations/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,47 @@ impl<G: ApiGlobal> ChannelMutation<G> {

Ok(user.into())
}

async fn remove_offline_banner(&self, ctx: &Context<'_>) -> Result<User<G>> {
let global = ctx.get_global::<G>();
let request_context = ctx.get_req_context();

let auth = request_context
.auth(global)
.await?
.map_err_gql(GqlError::Auth(AuthError::NotLoggedIn))?;

let user: database::User = common::database::query(
r#"
UPDATE users
SET
channel_offline_banner_id = NULL,
channel_pending_offline_banner_id = NULL,
updated_at = NOW()
WHERE
id = $1
RETURNING *
"#,
)
.bind(auth.session.user_id)
.build_query_as()
.fetch_one(global.db())
.await?;

global
.nats()
.publish(
SubscriptionTopic::ChannelOfflineBanner(user.id),
pb::scuffle::platform::internal::events::ChannelOfflineBanner {
channel_id: Some(user.id.into()),
offline_banner_id: None,
}
.encode_to_vec()
.into(),
)
.await
.map_err_gql("failed to publish offline banner event")?;

Ok(user.into())
}
}
2 changes: 1 addition & 1 deletion platform/api/src/api/v1/gql/mutations/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl<G: ApiGlobal> UserMutation<G> {
.into(),
)
.await
.map_err_gql("failed to publish message")?;
.map_err_gql("failed to publish profile picture event")?;

Ok(user.into())
}
Expand Down
81 changes: 81 additions & 0 deletions platform/api/src/api/v1/gql/subscription/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::api::auth::AuthError;
use crate::api::v1::gql::error::ext::*;
use crate::api::v1::gql::error::{GqlError, Result};
use crate::api::v1::gql::ext::ContextExt;
use crate::api::v1::gql::models::image_upload::ImageUpload;
use crate::api::v1::gql::models::ulid::GqlUlid;
use crate::global::ApiGlobal;
use crate::subscription::SubscriptionTopic;
Expand All @@ -33,8 +34,88 @@ struct ChannelLiveStream {
pub live: bool,
}

#[derive(SimpleObject)]
struct ChannelOfflineBannerStream<G: ApiGlobal> {
pub channel_id: GqlUlid,
pub offline_banner: Option<ImageUpload<G>>,
}

#[Subscription]
impl<G: ApiGlobal> ChannelSubscription<G> {
async fn channel_offline_banner<'ctx>(
&self,
ctx: &'ctx Context<'ctx>,
channel_id: GqlUlid,
) -> Result<impl Stream<Item = Result<ChannelOfflineBannerStream<G>>> + 'ctx> {
let global = ctx.get_global::<G>();

let Some(offline_banner_id) = global
.user_by_id_loader()
.load(channel_id.to_ulid())
.await
.map_err_ignored_gql("failed to fetch channel")?
.map(|u| u.channel.offline_banner_id)
else {
return Err(GqlError::InvalidInput {
fields: vec!["channelId"],
message: "channel not found",
}
.into());
};

let mut subscription = global
.subscription_manager()
.subscribe(SubscriptionTopic::ChannelOfflineBanner(channel_id.to_ulid()))
.await
.map_err_gql("failed to subscribe to channel offline banner")?;

let offline_banner = if let Some(offline_banner_id) = offline_banner_id {
global
.uploaded_file_by_id_loader()
.load(offline_banner_id)
.await
.map_err_ignored_gql("failed to fetch offline banner")?
.map(ImageUpload::from_uploaded_file)
.transpose()?
.flatten()
} else {
None
};

Ok(async_stream::stream!({
yield Ok(ChannelOfflineBannerStream {
channel_id,
offline_banner,
});

while let Ok(message) = subscription.recv().await {
let event = pb::scuffle::platform::internal::events::ChannelOfflineBanner::decode(message.payload)
.map_err_ignored_gql("failed to decode channel offline banner event")?;

let channel_id = event.channel_id.into_ulid();
let offline_banner_id = event.offline_banner_id.map(|u| u.into_ulid());

let offline_banner = if let Some(offline_banner_id) = offline_banner_id {
global
.uploaded_file_by_id_loader()
.load(offline_banner_id)
.await
.map_err_ignored_gql("failed to fetch offline banner")?
.map(ImageUpload::from_uploaded_file)
.transpose()?
.flatten()
} else {
None
};

yield Ok(ChannelOfflineBannerStream {
channel_id: channel_id.into(),
offline_banner,
});
}
}))
}

async fn channel_follows<'ctx>(
&self,
ctx: &'ctx Context<'ctx>,
Expand Down
12 changes: 6 additions & 6 deletions platform/api/src/api/v1/gql/subscription/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl<G: ApiGlobal> UserSubscription<G> {

while let Ok(message) = subscription.recv().await {
let event = pb::scuffle::platform::internal::events::UserDisplayName::decode(message.payload)
.map_err_ignored_gql("failed to decode user display name")?;
.map_err_ignored_gql("failed to decode user display name event")?;

let user_id = event.user_id.into_ulid();

Expand Down Expand Up @@ -112,7 +112,7 @@ impl<G: ApiGlobal> UserSubscription<G> {
.subscription_manager()
.subscribe(SubscriptionTopic::UserDisplayColor(user_id.to_ulid()))
.await
.map_err_gql("failed to subscribe to user display name")?;
.map_err_gql("failed to subscribe to user display color")?;

Ok(async_stream::stream!({
yield Ok(UserDisplayColorStream {
Expand All @@ -122,7 +122,7 @@ impl<G: ApiGlobal> UserSubscription<G> {

while let Ok(message) = subscription.recv().await {
let event = pb::scuffle::platform::internal::events::UserDisplayColor::decode(message.payload)
.map_err_ignored_gql("failed to decode user display name")?;
.map_err_ignored_gql("failed to decode user display color event")?;

let user_id = event.user_id.into_ulid();

Expand Down Expand Up @@ -159,7 +159,7 @@ impl<G: ApiGlobal> UserSubscription<G> {
.subscription_manager()
.subscribe(SubscriptionTopic::UserProfilePicture(user_id.to_ulid()))
.await
.map_err_gql("failed to subscribe to user display name")?;
.map_err_gql("failed to subscribe to user profile picture")?;

let profile_picture = if let Some(profile_picture_id) = profile_picture_id {
global
Expand All @@ -182,7 +182,7 @@ impl<G: ApiGlobal> UserSubscription<G> {

while let Ok(message) = subscription.recv().await {
let event = pb::scuffle::platform::internal::events::UserProfilePicture::decode(message.payload)
.map_err_ignored_gql("failed to decode user display name")?;
.map_err_ignored_gql("failed to decode user profile picture event")?;

let user_id = event.user_id.into_ulid();
let profile_picture_id = event.profile_picture_id.map(|u| u.into_ulid());
Expand Down Expand Up @@ -259,7 +259,7 @@ impl<G: ApiGlobal> UserSubscription<G> {

while let Ok(message) = subscription.recv().await {
let event = pb::scuffle::platform::internal::events::UserFollowChannel::decode(message.payload)
.map_err_ignored_gql("failed to decode user follow")?;
.map_err_ignored_gql("failed to decode user follow event")?;

let user_id = event.user_id.into_ulid();

Expand Down
Loading
Loading