From 4a7486e411c0428fcaf95e7aac511daa6e4ee911 Mon Sep 17 00:00:00 2001 From: Troy Benson Date: Mon, 29 May 2023 00:19:38 +0000 Subject: [PATCH] fix: refactor stream variants Completes #86 bullet point 4 This change refactors how we store the stream variants and also sets up the transcoder for unit tests and cleans up the way transcoder invokes ffmpeg. --- Cargo.lock | 3 + backend/api/src/database/mod.rs | 2 +- backend/api/src/database/protobuf.rs | 64 ++ backend/api/src/database/stream.rs | 5 + backend/api/src/database/stream_variant.rs | 35 - backend/api/src/dataloader/mod.rs | 1 - backend/api/src/dataloader/stream_variant.rs | 47 - backend/api/src/global/mod.rs | 3 - backend/api/src/gql.nocov.rs | 1 + backend/api/src/grpc/api.rs | 113 +-- backend/api/src/grpc/health.rs | 2 +- backend/api/src/grpc/mod.rs | 1 - backend/api/src/main.rs | 1 + backend/api/src/{grpc => }/pb.rs | 0 backend/api/src/tests/dataloader/mod.rs | 1 - .../src/tests/dataloader/stream_variant.rs | 179 ---- backend/api/src/tests/grpc/api.rs | 411 +++------ backend/api/src/tests/grpc/health.rs | 17 +- backend/api/src/tests/grpc/tls.rs | 15 +- ...024406_Initial_Database_Structure.down.sql | 1 - ...17024406_Initial_Database_Structure.up.sql | 25 +- maskfile.md | 10 +- proto/scuffle/backend/api.proto | 22 +- proto/scuffle/events/transcoder.proto | 4 +- proto/scuffle/types/stream_variant.proto | 45 - proto/scuffle/types/stream_variants.proto | 59 ++ proto/scuffle/video/ingest.proto | 9 + video/container/mp4/src/codec.rs | 211 ++++- video/ingest/Cargo.toml | 1 + video/ingest/src/connection_manager.rs | 11 +- video/ingest/src/grpc/ingest.rs | 38 +- video/ingest/src/ingest/connection.rs | 124 +-- video/ingest/src/ingest/variants.rs | 155 +++- video/ingest/src/tests/grpc/ingest.rs | 6 +- video/ingest/src/tests/ingest.rs | 842 +++++++----------- video/transcoder/Cargo.toml | 2 + video/transcoder/src/global.rs | 4 +- video/transcoder/src/main.rs | 2 +- video/transcoder/src/tests/mod.rs | 1 + video/transcoder/src/tests/transcoder/mod.rs | 185 ++++ video/transcoder/src/transcoder/job/mod.rs | 411 ++++++--- 41 files changed, 1537 insertions(+), 1532 deletions(-) create mode 100644 backend/api/src/database/protobuf.rs delete mode 100644 backend/api/src/database/stream_variant.rs delete mode 100644 backend/api/src/dataloader/stream_variant.rs rename backend/api/src/{grpc => }/pb.rs (100%) delete mode 100644 backend/api/src/tests/dataloader/stream_variant.rs delete mode 100644 proto/scuffle/types/stream_variant.proto create mode 100644 proto/scuffle/types/stream_variants.proto create mode 100644 video/transcoder/src/tests/transcoder/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 1b6057fd..d811fe25 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1708,6 +1708,7 @@ dependencies = [ name = "ingest" version = "0.1.0" dependencies = [ + "aac", "anyhow", "async-stream", "async-trait", @@ -3989,6 +3990,7 @@ dependencies = [ name = "transcoder" version = "0.1.0" dependencies = [ + "aac", "anyhow", "async-stream", "async-trait", @@ -4020,6 +4022,7 @@ dependencies = [ "tonic-build", "tracing", "url-parse", + "uuid", ] [[package]] diff --git a/backend/api/src/database/mod.rs b/backend/api/src/database/mod.rs index a77bed5a..1918d3dd 100644 --- a/backend/api/src/database/mod.rs +++ b/backend/api/src/database/mod.rs @@ -2,9 +2,9 @@ pub mod channel_role; pub mod channel_role_grant; pub mod global_role; pub mod global_role_grant; +pub mod protobuf; pub mod session; pub mod stream; pub mod stream_bitrate_update; pub mod stream_event; -pub mod stream_variant; pub mod user; diff --git a/backend/api/src/database/protobuf.rs b/backend/api/src/database/protobuf.rs new file mode 100644 index 00000000..b004118d --- /dev/null +++ b/backend/api/src/database/protobuf.rs @@ -0,0 +1,64 @@ +#[derive(Debug, Clone, Default)] +pub enum ProtobufValue { + #[default] + None, + Some(T), + Err(prost::DecodeError), +} + +impl ProtobufValue { + #[allow(dead_code)] + pub fn unwrap(self) -> Option { + match self { + Self::Some(data) => Some(data), + Self::None => None, + Self::Err(err) => panic!( + "called `ProtobufValue::unwrap()` on a `Err` value: {:?}", + err + ), + } + } +} + +impl From> for ProtobufValue +where + ProtobufValue: From, +{ + fn from(data: Option) -> Self { + match data { + Some(data) => Self::from(data), + None => Self::None, + } + } +} + +impl From> for ProtobufValue { + fn from(data: Vec) -> Self { + match T::decode(data.as_slice()) { + Ok(variants) => Self::Some(variants), + Err(e) => Self::Err(e), + } + } +} + +impl PartialEq for ProtobufValue { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Self::None, Self::None) => true, + (Self::Some(a), Self::Some(b)) => a == b, + _ => false, + } + } +} + +impl PartialEq> + for ProtobufValue +{ + fn eq(&self, other: &Option) -> bool { + match (self, other) { + (Self::None, None) => true, + (Self::Some(a), Some(b)) => a == b, + _ => false, + } + } +} diff --git a/backend/api/src/database/stream.rs b/backend/api/src/database/stream.rs index 9eb553f7..a12f80c6 100644 --- a/backend/api/src/database/stream.rs +++ b/backend/api/src/database/stream.rs @@ -1,6 +1,9 @@ +use crate::pb::scuffle::types::StreamVariants; use chrono::{DateTime, Utc}; use uuid::Uuid; +use super::protobuf::ProtobufValue; + #[derive(Debug, Clone, Default, Copy, Eq, PartialEq)] #[repr(i64)] pub enum State { @@ -62,6 +65,8 @@ pub struct Model { pub ingest_address: String, /// The connection which owns the stream. pub connection_id: Uuid, + /// The Stream Variants + pub variants: ProtobufValue, /// The time the stream was created. pub created_at: DateTime, /// The time the stream was last updated. diff --git a/backend/api/src/database/stream_variant.rs b/backend/api/src/database/stream_variant.rs deleted file mode 100644 index 642e84d7..00000000 --- a/backend/api/src/database/stream_variant.rs +++ /dev/null @@ -1,35 +0,0 @@ -use chrono::{DateTime, Utc}; -use serde_json::Value; -use uuid::Uuid; - -#[derive(Debug, Clone, Default, sqlx::FromRow)] -pub struct Model { - /// The unique identifier for the stream variant. - pub id: Uuid, - /// The unique identifier for the stream. - pub stream_id: Uuid, - /// The name of the stream variant. - pub name: String, - /// The width of the stream variant. (if null then the stream variant is not a video stream) - pub video_width: Option, - /// The height of the stream variant. (if null then the stream variant is not a video stream) - pub video_height: Option, - /// The framerate of the stream variant. (if null then the stream variant is not a video stream) - pub video_framerate: Option, - /// The bandwidth in bits/s of the stream variant. - pub video_bitrate: Option, - /// Video codec of the stream variant. - pub video_codec: Option, - /// The audio sample rate of the stream variant. - pub audio_sample_rate: Option, - /// The number of audio channels of the stream variant. - pub audio_channels: Option, - /// The bandwidth in bits/s of the stream variant. - pub audio_bitrate: Option, - // Audio Codec of the stream variant. - pub audio_codec: Option, - /// Metadata - pub metadata: Value, - /// The time the stream variant was created. - pub created_at: DateTime, -} diff --git a/backend/api/src/dataloader/mod.rs b/backend/api/src/dataloader/mod.rs index 9d57a17d..09eb5b76 100644 --- a/backend/api/src/dataloader/mod.rs +++ b/backend/api/src/dataloader/mod.rs @@ -1,5 +1,4 @@ pub mod session; pub mod stream; -pub mod stream_variant; pub mod user; pub mod user_permissions; diff --git a/backend/api/src/dataloader/stream_variant.rs b/backend/api/src/dataloader/stream_variant.rs deleted file mode 100644 index 2ad99220..00000000 --- a/backend/api/src/dataloader/stream_variant.rs +++ /dev/null @@ -1,47 +0,0 @@ -use crate::database::stream_variant; -use async_graphql::{ - async_trait::async_trait, - dataloader::{DataLoader, Loader}, -}; -use std::{collections::HashMap, sync::Arc}; -use uuid::Uuid; - -pub struct StreamVariantsByStreamIdLoader { - db: Arc, -} - -impl StreamVariantsByStreamIdLoader { - pub fn new(db: Arc) -> DataLoader { - DataLoader::new(Self { db }, tokio::spawn) - } -} - -#[async_trait] -impl Loader for StreamVariantsByStreamIdLoader { - type Value = Vec; - type Error = Arc; - - async fn load(&self, keys: &[Uuid]) -> Result, Self::Error> { - let results = sqlx::query_as!( - stream_variant::Model, - "SELECT * FROM stream_variants WHERE stream_id = ANY($1)", - &keys - ) - .fetch_all(&*self.db) - .await - .map_err(|e| { - tracing::error!("Failed to fetch stream variants: {}", e); - Arc::new(e) - })?; - - let mut map = HashMap::new(); - - for result in results { - map.entry(result.stream_id) - .or_insert_with(Vec::new) - .push(result); - } - - Ok(map) - } -} diff --git a/backend/api/src/global/mod.rs b/backend/api/src/global/mod.rs index f8a29488..16cdc4c0 100644 --- a/backend/api/src/global/mod.rs +++ b/backend/api/src/global/mod.rs @@ -5,7 +5,6 @@ use common::context::Context; use crate::config::AppConfig; use crate::dataloader::stream::StreamByIdLoader; -use crate::dataloader::stream_variant::StreamVariantsByStreamIdLoader; use crate::dataloader::user_permissions::UserPermissionsByIdLoader; use crate::dataloader::{ session::SessionByIdLoader, user::UserByIdLoader, user::UserByUsernameLoader, @@ -22,7 +21,6 @@ pub struct GlobalState { pub session_by_id_loader: DataLoader, pub user_permisions_by_id_loader: DataLoader, pub stream_by_id_loader: DataLoader, - pub stream_variants_by_stream_id_loader: DataLoader, pub rmq: common::rmq::ConnectionPool, } @@ -41,7 +39,6 @@ impl GlobalState { session_by_id_loader: SessionByIdLoader::new(db.clone()), user_permisions_by_id_loader: UserPermissionsByIdLoader::new(db.clone()), stream_by_id_loader: StreamByIdLoader::new(db.clone()), - stream_variants_by_stream_id_loader: StreamVariantsByStreamIdLoader::new(db.clone()), db, rmq, } diff --git a/backend/api/src/gql.nocov.rs b/backend/api/src/gql.nocov.rs index fed4cd02..5b7ba745 100644 --- a/backend/api/src/gql.nocov.rs +++ b/backend/api/src/gql.nocov.rs @@ -7,6 +7,7 @@ mod config; mod database; mod dataloader; mod global; +mod pb; use api::v1::gql::schema; use async_graphql::SDLExportOptions; diff --git a/backend/api/src/grpc/api.rs b/backend/api/src/grpc/api.rs index 24efad50..6ccdbc9c 100644 --- a/backend/api/src/grpc/api.rs +++ b/backend/api/src/grpc/api.rs @@ -4,22 +4,18 @@ use std::sync::{Arc, Weak}; use crate::database::{ global_role, stream::{self, State}, - stream_event, stream_variant, + stream_event, }; use chrono::{Duration, TimeZone, Utc}; -use sqlx::{Executor, Postgres, QueryBuilder}; +use prost::Message; use tonic::{async_trait, Request, Response, Status}; use uuid::Uuid; -use super::pb::scuffle::{ - backend::{ - api_server, - update_live_stream_request::{event::Level, update::Update}, - AuthenticateLiveStreamRequest, AuthenticateLiveStreamResponse, LiveStreamState, - NewLiveStreamRequest, NewLiveStreamResponse, UpdateLiveStreamRequest, - UpdateLiveStreamResponse, - }, - types::StreamVariant, +use crate::pb::scuffle::backend::{ + api_server, + update_live_stream_request::{event::Level, update::Update}, + AuthenticateLiveStreamRequest, AuthenticateLiveStreamResponse, LiveStreamState, + NewLiveStreamRequest, NewLiveStreamResponse, UpdateLiveStreamRequest, UpdateLiveStreamResponse, }; type Result = std::result::Result; @@ -38,88 +34,6 @@ impl ApiServer { pub fn into_service(self) -> api_server::ApiServer { api_server::ApiServer::new(self) } - - async fn insert_stream_variants<'c, T: Executor<'c, Database = Postgres>>( - tx: T, - stream_id: Uuid, - variants: &Vec, - ) -> Result<()> { - // Insert the new stream variants - let mut values = Vec::new(); - - // Unfortunately, we can't use the `sqlx::query!` macro here because it doesn't support - // batch inserts. So we have to build the query manually. This is a bit of a pain, because - // the query is not compile time checked, but it's better than nothing. - let mut query_builder = QueryBuilder::new( - " - INSERT INTO stream_variants ( - id, - stream_id, - name, - video_framerate, - video_height, - video_width, - video_bitrate, - video_codec, - audio_bitrate, - audio_channels, - audio_sample_rate, - audio_codec, - metadata, - created_at - ) ", - ); - - for variant in variants { - let variant_id = variant.id.parse::().map_err(|_| { - Status::invalid_argument("invalid variant ID: must be a valid UUID") - })?; - - values.push(stream_variant::Model { - id: variant_id, - stream_id, - name: variant.name.clone(), - video_framerate: variant.video_settings.as_ref().map(|v| v.framerate as i64), - video_height: variant.video_settings.as_ref().map(|v| v.height as i64), - video_width: variant.video_settings.as_ref().map(|v| v.width as i64), - video_bitrate: variant.video_settings.as_ref().map(|v| v.bitrate as i64), - video_codec: variant.video_settings.as_ref().map(|v| v.codec.clone()), - audio_bitrate: variant.audio_settings.as_ref().map(|a| a.bitrate as i64), - audio_channels: variant.audio_settings.as_ref().map(|a| a.channels as i64), - audio_sample_rate: variant - .audio_settings - .as_ref() - .map(|a| a.sample_rate as i64), - audio_codec: variant.audio_settings.as_ref().map(|a| a.codec.clone()), - metadata: serde_json::from_str(&variant.metadata).unwrap_or_default(), - created_at: Utc::now(), - }) - } - - query_builder.push_values(values, |mut b, variant| { - b.push_bind(variant.id) - .push_bind(variant.stream_id) - .push_bind(variant.name) - .push_bind(variant.video_framerate) - .push_bind(variant.video_height) - .push_bind(variant.video_width) - .push_bind(variant.video_bitrate) - .push_bind(variant.video_codec) - .push_bind(variant.audio_bitrate) - .push_bind(variant.audio_channels) - .push_bind(variant.audio_sample_rate) - .push_bind(variant.audio_codec) - .push_bind(variant.metadata) - .push_bind(variant.created_at); - }); - - query_builder.build().execute(tx).await.map_err(|e| { - tracing::error!("failed to insert stream variants: {}", e); - Status::internal("internal server error") - })?; - - Ok(()) - } } #[async_trait] impl api_server::Api for ApiServer { @@ -230,8 +144,7 @@ impl api_server::Api for ApiServer { stream_id: stream.id.to_string(), record, transcode, - try_resume: false, - variants: vec![], + variants: None, })) } @@ -396,11 +309,10 @@ impl api_server::Api for ApiServer { })?; } Update::Variants(v) => { - ApiServer::insert_stream_variants(&mut *tx, stream_id, &v.variants).await?; - sqlx::query!( - "UPDATE streams SET updated_at = NOW() WHERE id = $1", + "UPDATE streams SET updated_at = NOW(), variants = $2 WHERE id = $1", stream_id, + v.encode_to_vec(), ) .execute(&mut *tx) .await @@ -485,11 +397,10 @@ impl api_server::Api for ApiServer { Status::internal("internal server error") })?; - ApiServer::insert_stream_variants(&mut *tx, stream_id, &request.variants).await?; - sqlx::query!( - "UPDATE streams SET updated_at = NOW() WHERE id = $1", + "UPDATE streams SET updated_at = NOW(), variants = $2 WHERE id = $1", stream_id, + request.variants.unwrap_or_default().encode_to_vec(), ) .execute(&mut *tx) .await diff --git a/backend/api/src/grpc/health.rs b/backend/api/src/grpc/health.rs index 8dc27bd3..dc7b63cc 100644 --- a/backend/api/src/grpc/health.rs +++ b/backend/api/src/grpc/health.rs @@ -8,7 +8,7 @@ use async_stream::try_stream; use futures_util::Stream; use tonic::{async_trait, Request, Response, Status}; -use super::pb::health::{ +use crate::pb::health::{ health_check_response::ServingStatus, health_server, HealthCheckRequest, HealthCheckResponse, }; diff --git a/backend/api/src/grpc/mod.rs b/backend/api/src/grpc/mod.rs index da18f9b3..e2d0c360 100644 --- a/backend/api/src/grpc/mod.rs +++ b/backend/api/src/grpc/mod.rs @@ -6,7 +6,6 @@ use tonic::transport::{Certificate, Identity, Server, ServerTlsConfig}; pub mod api; pub mod health; -pub mod pb; pub async fn run(global: Arc) -> Result<()> { tracing::info!("GRPC Listening on {}", global.config.grpc.bind_address); diff --git a/backend/api/src/main.rs b/backend/api/src/main.rs index 6a6dd82f..58d518ad 100644 --- a/backend/api/src/main.rs +++ b/backend/api/src/main.rs @@ -13,6 +13,7 @@ mod database; mod dataloader; mod global; mod grpc; +mod pb; #[cfg(test)] mod tests; diff --git a/backend/api/src/grpc/pb.rs b/backend/api/src/pb.rs similarity index 100% rename from backend/api/src/grpc/pb.rs rename to backend/api/src/pb.rs diff --git a/backend/api/src/tests/dataloader/mod.rs b/backend/api/src/tests/dataloader/mod.rs index 2fd88670..e52a97e3 100644 --- a/backend/api/src/tests/dataloader/mod.rs +++ b/backend/api/src/tests/dataloader/mod.rs @@ -1,5 +1,4 @@ mod session; -mod stream_variant; mod streams; mod user; mod user_permissions; diff --git a/backend/api/src/tests/dataloader/stream_variant.rs b/backend/api/src/tests/dataloader/stream_variant.rs deleted file mode 100644 index a4243c6b..00000000 --- a/backend/api/src/tests/dataloader/stream_variant.rs +++ /dev/null @@ -1,179 +0,0 @@ -use crate::tests::global::mock_global_state; - -use crate::database::{stream, stream_variant, user}; -use chrono::Utc; -use serde_json::json; -use serial_test::serial; -use uuid::Uuid; - -#[serial] -#[tokio::test] -async fn test_serial_stream_varariants_by_stream_id_loader() { - let (global, _) = mock_global_state(Default::default()).await; - - sqlx::query!("DELETE FROM users") - .execute(&*global.db) - .await - .unwrap(); - sqlx::query!("DELETE FROM streams") - .execute(&*global.db) - .await - .unwrap(); - let user = - sqlx::query_as!(user::Model, - "INSERT INTO users(username, display_name, email, password_hash, stream_key) VALUES ($1, $1, $2, $3, $4) RETURNING *", - "admin", - "admin@admin.com", - user::hash_password("admin"), - user::generate_stream_key(), - ) - .fetch_one(&*global.db) - .await - .unwrap(); - - let conn_id = Uuid::new_v4(); - let s = sqlx::query_as!(stream::Model, - "INSERT INTO streams (channel_id, title, description, recorded, transcoded, ingest_address, connection_id) VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING *", - user.id, - "test", - "test", - false, - false, - "some address", - conn_id, - ).fetch_one(&*global.db).await.unwrap(); - - let variants = vec![ - stream_variant::Model { - id: Uuid::new_v4(), - name: "video-audio".to_string(), - stream_id: s.id, - audio_bitrate: Some(128), - audio_channels: Some(2), - audio_sample_rate: Some(44100), - video_bitrate: Some(12800), - video_framerate: Some(30), - video_height: Some(720), - video_width: Some(1280), - audio_codec: Some("aac".to_string()), - video_codec: Some("h264".to_string()), - created_at: Utc::now(), - metadata: json!({}), - }, - stream_variant::Model { - id: Uuid::new_v4(), - name: "video-only".to_string(), - stream_id: s.id, - audio_bitrate: None, - audio_channels: None, - audio_sample_rate: None, - video_bitrate: Some(12800), - video_framerate: Some(30), - video_height: Some(720), - video_width: Some(1280), - audio_codec: None, - video_codec: Some("h264".to_string()), - created_at: Utc::now(), - metadata: json!({}), - }, - stream_variant::Model { - id: Uuid::new_v4(), - name: "audio-only".to_string(), - stream_id: s.id, - audio_bitrate: Some(128), - audio_channels: Some(2), - audio_sample_rate: Some(44100), - video_bitrate: None, - video_framerate: None, - video_height: None, - video_width: None, - audio_codec: Some("aac".to_string()), - video_codec: None, - created_at: Utc::now(), - metadata: json!({}), - }, - ]; - - for v in &variants { - sqlx::query!("INSERT INTO stream_variants (id, name, stream_id, audio_bitrate, audio_channels, audio_sample_rate, video_bitrate, video_framerate, video_height, video_width, audio_codec, video_codec, created_at, metadata) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11,$12, $13, $14)", - v.id, - v.name, - v.stream_id, - v.audio_bitrate, - v.audio_channels, - v.audio_sample_rate, - v.video_bitrate, - v.video_framerate, - v.video_height, - v.video_width, - v.audio_codec, - v.video_codec, - v.created_at, - v.metadata, - ).execute(&*global.db).await.unwrap(); - } - - let loaded = global - .stream_variants_by_stream_id_loader - .load_one(s.id) - .await - .unwrap(); - - assert!(loaded.is_some()); - - let loaded = loaded.unwrap(); - - let audio_video = loaded.iter().find(|v| v.name == "video-audio").unwrap(); - - assert_eq!(audio_video.audio_bitrate, Some(128)); - assert_eq!(audio_video.audio_channels, Some(2)); - assert_eq!(audio_video.audio_sample_rate, Some(44100)); - assert_eq!(audio_video.video_bitrate, Some(12800)); - assert_eq!(audio_video.video_framerate, Some(30)); - assert_eq!(audio_video.video_height, Some(720)); - assert_eq!(audio_video.video_width, Some(1280)); - assert_eq!(audio_video.audio_codec, Some("aac".to_string())); - assert_eq!(audio_video.video_codec, Some("h264".to_string())); - assert_eq!(audio_video.stream_id, s.id); - assert_eq!( - audio_video.created_at.timestamp(), - variants[0].created_at.timestamp() - ); - assert_eq!(audio_video.metadata, variants[0].metadata); - - let video_only = loaded.iter().find(|v| v.name == "video-only").unwrap(); - - assert_eq!(video_only.audio_bitrate, None); - assert_eq!(video_only.audio_channels, None); - assert_eq!(video_only.audio_sample_rate, None); - assert_eq!(video_only.video_bitrate, Some(12800)); - assert_eq!(video_only.video_framerate, Some(30)); - assert_eq!(video_only.video_height, Some(720)); - assert_eq!(video_only.video_width, Some(1280)); - assert_eq!(video_only.audio_codec, None); - assert_eq!(video_only.video_codec, Some("h264".to_string())); - assert_eq!(video_only.stream_id, s.id); - assert_eq!( - video_only.created_at.timestamp(), - variants[1].created_at.timestamp() - ); - assert_eq!(video_only.metadata, variants[1].metadata); - - let audio_only = loaded.iter().find(|v| v.name == "audio-only").unwrap(); - - assert_eq!(audio_only.audio_bitrate, Some(128)); - assert_eq!(audio_only.audio_channels, Some(2)); - assert_eq!(audio_only.audio_sample_rate, Some(44100)); - assert_eq!(audio_only.video_bitrate, None); - assert_eq!(audio_only.video_framerate, None); - assert_eq!(audio_only.video_height, None); - assert_eq!(audio_only.video_width, None); - assert_eq!(audio_only.audio_codec, Some("aac".to_string())); - assert_eq!(audio_only.video_codec, None); - assert_eq!(audio_only.stream_id, s.id); - assert_eq!( - audio_only.created_at.timestamp(), - variants[2].created_at.timestamp() - ); - assert_eq!(audio_only.metadata, variants[2].metadata); -} diff --git a/backend/api/src/tests/grpc/api.rs b/backend/api/src/tests/grpc/api.rs index 672cb233..d556b3d2 100644 --- a/backend/api/src/tests/grpc/api.rs +++ b/backend/api/src/tests/grpc/api.rs @@ -1,17 +1,17 @@ use crate::config::{AppConfig, GrpcConfig}; use crate::database::{global_role::Permission, user}; -use crate::database::{stream, stream_bitrate_update, stream_event, stream_variant}; -use crate::grpc::pb::scuffle::backend::{ +use crate::database::{stream, stream_bitrate_update, stream_event}; +use crate::grpc::run; +use crate::pb; +use crate::pb::scuffle::backend::{ update_live_stream_request, LiveStreamState, NewLiveStreamRequest, }; -use crate::grpc::pb::scuffle::types::stream_variant::{AudioSettings, VideoSettings}; -use crate::grpc::pb::scuffle::types::StreamVariant; -use crate::grpc::{self, run}; +use crate::pb::scuffle::types::stream_variants::transcode_state::{AudioSettings, VideoSettings}; +use crate::pb::scuffle::types::{stream_variants, StreamVariants}; use crate::tests::global::mock_global_state; use chrono::Utc; use common::grpc::make_channel; use common::prelude::FutureTimeout; -use serde_json::json; use serial_test::serial; use std::time::Duration; use uuid::Uuid; @@ -40,9 +40,9 @@ async fn test_serial_grpc_authenticate_invalid_stream_key() { ) .unwrap(); - let mut client = grpc::pb::scuffle::backend::api_client::ApiClient::new(channel); + let mut client = pb::scuffle::backend::api_client::ApiClient::new(channel); let err = client - .authenticate_live_stream(grpc::pb::scuffle::backend::AuthenticateLiveStreamRequest { + .authenticate_live_stream(pb::scuffle::backend::AuthenticateLiveStreamRequest { app_name: "test".to_string(), stream_key: "test".to_string(), ip_address: "127.0.0.1".to_string(), @@ -124,9 +124,9 @@ async fn test_serial_grpc_authenticate_valid_stream_key() { ) .unwrap(); - let mut client = grpc::pb::scuffle::backend::api_client::ApiClient::new(channel); + let mut client = pb::scuffle::backend::api_client::ApiClient::new(channel); let resp = client - .authenticate_live_stream(grpc::pb::scuffle::backend::AuthenticateLiveStreamRequest { + .authenticate_live_stream(pb::scuffle::backend::AuthenticateLiveStreamRequest { app_name: "test".to_string(), stream_key: user.get_stream_key(), ip_address: "127.0.0.1".to_string(), @@ -149,7 +149,7 @@ async fn test_serial_grpc_authenticate_valid_stream_key() { .unwrap(); let resp = client - .authenticate_live_stream(grpc::pb::scuffle::backend::AuthenticateLiveStreamRequest { + .authenticate_live_stream(pb::scuffle::backend::AuthenticateLiveStreamRequest { app_name: "test".to_string(), stream_key: user.get_stream_key(), ip_address: "127.0.0.1".to_string(), @@ -241,10 +241,10 @@ async fn test_serial_grpc_authenticate_valid_stream_key_ext() { ) .unwrap(); - let mut client = grpc::pb::scuffle::backend::api_client::ApiClient::new(channel); + let mut client = pb::scuffle::backend::api_client::ApiClient::new(channel); let resp = client - .authenticate_live_stream(grpc::pb::scuffle::backend::AuthenticateLiveStreamRequest { + .authenticate_live_stream(pb::scuffle::backend::AuthenticateLiveStreamRequest { app_name: "test".to_string(), stream_key: user.get_stream_key(), ip_address: "127.0.0.1".to_string(), @@ -335,10 +335,10 @@ async fn test_serial_grpc_authenticate_valid_stream_key_ext_2() { ) .unwrap(); - let mut client = grpc::pb::scuffle::backend::api_client::ApiClient::new(channel); + let mut client = pb::scuffle::backend::api_client::ApiClient::new(channel); let resp = client - .authenticate_live_stream(grpc::pb::scuffle::backend::AuthenticateLiveStreamRequest { + .authenticate_live_stream(pb::scuffle::backend::AuthenticateLiveStreamRequest { app_name: "test".to_string(), stream_key: user.get_stream_key(), ip_address: "127.0.0.1".to_string(), @@ -419,13 +419,13 @@ async fn test_serial_grpc_update_live_stream_state() { ) .unwrap(); - let mut client = grpc::pb::scuffle::backend::api_client::ApiClient::new(channel); + let mut client = pb::scuffle::backend::api_client::ApiClient::new(channel); { let timestamp = Utc::now().timestamp() as u64; assert!(client - .update_live_stream(grpc::pb::scuffle::backend::UpdateLiveStreamRequest { + .update_live_stream(pb::scuffle::backend::UpdateLiveStreamRequest { connection_id: conn_id.to_string(), stream_id: s.id.to_string(), updates: vec![update_live_stream_request::Update { @@ -451,7 +451,7 @@ async fn test_serial_grpc_update_live_stream_state() { let timestamp = Utc::now().timestamp() as u64; assert!(client - .update_live_stream(grpc::pb::scuffle::backend::UpdateLiveStreamRequest { + .update_live_stream(pb::scuffle::backend::UpdateLiveStreamRequest { connection_id: conn_id.to_string(), stream_id: s.id.to_string(), updates: vec![update_live_stream_request::Update { @@ -477,7 +477,7 @@ async fn test_serial_grpc_update_live_stream_state() { let timestamp = Utc::now().timestamp() as u64; assert!(client - .update_live_stream(grpc::pb::scuffle::backend::UpdateLiveStreamRequest { + .update_live_stream(pb::scuffle::backend::UpdateLiveStreamRequest { connection_id: conn_id.to_string(), stream_id: s.id.to_string(), updates: vec![update_live_stream_request::Update { @@ -504,7 +504,7 @@ async fn test_serial_grpc_update_live_stream_state() { let timestamp = Utc::now().timestamp() as u64; let res = client - .update_live_stream(grpc::pb::scuffle::backend::UpdateLiveStreamRequest { + .update_live_stream(pb::scuffle::backend::UpdateLiveStreamRequest { connection_id: conn_id.to_string(), stream_id: s.id.to_string(), updates: vec![update_live_stream_request::Update { @@ -543,7 +543,7 @@ async fn test_serial_grpc_update_live_stream_state() { let timestamp = Utc::now().timestamp() as u64; let res = client - .update_live_stream(grpc::pb::scuffle::backend::UpdateLiveStreamRequest { + .update_live_stream(pb::scuffle::backend::UpdateLiveStreamRequest { connection_id: conn_id.to_string(), stream_id: s.id.to_string(), updates: vec![update_live_stream_request::Update { @@ -644,13 +644,13 @@ async fn test_serial_grpc_update_live_stream_bitrate() { ) .unwrap(); - let mut client = grpc::pb::scuffle::backend::api_client::ApiClient::new(channel); + let mut client = pb::scuffle::backend::api_client::ApiClient::new(channel); { let timestamp = Utc::now().timestamp() as u64; assert!(client - .update_live_stream(grpc::pb::scuffle::backend::UpdateLiveStreamRequest { + .update_live_stream(pb::scuffle::backend::UpdateLiveStreamRequest { connection_id: conn_id.to_string(), stream_id: s.id.to_string(), updates: vec![update_live_stream_request::Update { @@ -748,13 +748,13 @@ async fn test_serial_grpc_update_live_stream_event() { ) .unwrap(); - let mut client = grpc::pb::scuffle::backend::api_client::ApiClient::new(channel); + let mut client = pb::scuffle::backend::api_client::ApiClient::new(channel); { let timestamp = Utc::now().timestamp() as u64; assert!(client - .update_live_stream(grpc::pb::scuffle::backend::UpdateLiveStreamRequest { + .update_live_stream(pb::scuffle::backend::UpdateLiveStreamRequest { connection_id: conn_id.to_string(), stream_id: s.id.to_string(), updates: vec![update_live_stream_request::Update { @@ -852,156 +852,76 @@ async fn test_serial_grpc_update_live_stream_variants() { ) .unwrap(); - let mut client = grpc::pb::scuffle::backend::api_client::ApiClient::new(channel); + let mut client = pb::scuffle::backend::api_client::ApiClient::new(channel); { let timestamp = Utc::now().timestamp() as u64; - let variants = vec![ - stream_variant::Model { - id: Uuid::new_v4(), - name: "video-audio".to_string(), - stream_id: s.id, - audio_bitrate: Some(128), - audio_channels: Some(2), - audio_sample_rate: Some(44100), - video_bitrate: Some(12800), - video_framerate: Some(30), - video_height: Some(720), - video_width: Some(1280), - audio_codec: Some("aac".to_string()), - video_codec: Some("h264".to_string()), - created_at: Utc::now(), - metadata: json!({}), - }, - stream_variant::Model { - id: Uuid::new_v4(), - name: "video-only".to_string(), - stream_id: s.id, - audio_bitrate: None, - audio_channels: None, - audio_sample_rate: None, - video_bitrate: Some(12800), - video_framerate: Some(30), - video_height: Some(720), - video_width: Some(1280), - audio_codec: None, - video_codec: Some("h264".to_string()), - created_at: Utc::now(), - metadata: json!({}), - }, - stream_variant::Model { - id: Uuid::new_v4(), - name: "audio-only".to_string(), - stream_id: s.id, - audio_bitrate: Some(128), - audio_channels: Some(2), - audio_sample_rate: Some(44100), - video_bitrate: None, - video_framerate: None, - video_height: None, - video_width: None, - audio_codec: Some("aac".to_string()), - video_codec: None, - created_at: Utc::now(), - metadata: json!({}), - }, - ]; + let source_id = Uuid::new_v4().to_string(); + let audio_id = Uuid::new_v4().to_string(); + + let variants = StreamVariants { + stream_variants: vec![ + stream_variants::StreamVariant { + name: "source".to_string(), + group: "aac".to_string(), + transcode_state_ids: vec![source_id.to_string(), audio_id.to_string()], + }, + stream_variants::StreamVariant { + name: "audio-only".to_string(), + group: "aac".to_string(), + transcode_state_ids: vec![audio_id.to_string()], + }, + ], + transcode_states: vec![ + stream_variants::TranscodeState { + bitrate: 8000 * 1024, + codec: "avc1.640028".to_string(), + id: source_id.to_string(), + copy: true, + settings: Some(stream_variants::transcode_state::Settings::Video( + VideoSettings { + framerate: 60, + height: 1080, + width: 1920, + }, + )), + }, + stream_variants::TranscodeState { + bitrate: 128 * 1024, + codec: "mp4a.40.2".to_string(), + id: audio_id.to_string(), + copy: false, + settings: Some(stream_variants::transcode_state::Settings::Audio( + AudioSettings { + channels: 2, + sample_rate: 48000, + }, + )), + }, + ], + }; assert!(client - .update_live_stream(grpc::pb::scuffle::backend::UpdateLiveStreamRequest { + .update_live_stream(pb::scuffle::backend::UpdateLiveStreamRequest { connection_id: conn_id.to_string(), stream_id: s.id.to_string(), updates: vec![update_live_stream_request::Update { timestamp, update: Some(update_live_stream_request::update::Update::Variants( - update_live_stream_request::Variants { - variants: variants - .iter() - .map(|v| { - let audio_settings = - v.audio_bitrate.map(|audio_bitrate| AudioSettings { - bitrate: audio_bitrate as u32, - channels: v.audio_channels.unwrap() as u32, - sample_rate: v.audio_sample_rate.unwrap() as u32, - codec: v.audio_codec.clone().unwrap(), - }); - - let video_settings = - v.video_bitrate.map(|video_bitrate| VideoSettings { - bitrate: video_bitrate as u32, - framerate: v.video_framerate.unwrap() as u32, - height: v.video_height.unwrap() as u32, - width: v.video_width.unwrap() as u32, - codec: v.video_codec.clone().unwrap(), - }); - - StreamVariant { - name: v.name.clone(), - id: v.id.to_string(), - metadata: v.metadata.to_string(), - audio_settings, - video_settings, - } - }) - .collect(), - } + variants.clone() )), }] }) .await .is_ok()); - let s = sqlx::query_as!( - stream_variant::Model, - "SELECT * FROM stream_variants WHERE stream_id = $1", - s.id, - ) - .fetch_all(&*db) - .await - .unwrap(); + let s = sqlx::query_as!(stream::Model, "SELECT * FROM streams WHERE id = $1", s.id,) + .fetch_one(&*db) + .await + .unwrap(); - let v = s.iter().find(|v| v.name == "video-audio").unwrap(); - assert_eq!(v.id, variants[0].id); - assert_eq!(v.audio_bitrate, Some(128)); - assert_eq!(v.audio_channels, Some(2)); - assert_eq!(v.audio_sample_rate, Some(44100)); - assert_eq!(v.video_bitrate, Some(12800)); - assert_eq!(v.video_framerate, Some(30)); - assert_eq!(v.video_height, Some(720)); - assert_eq!(v.video_width, Some(1280)); - assert_eq!(v.audio_codec, Some("aac".to_string())); - assert_eq!(v.video_codec, Some("h264".to_string())); - assert_eq!(v.metadata, json!({})); - assert_eq!(v.created_at.timestamp(), timestamp as i64); - - let v = s.iter().find(|v| v.name == "video-only").unwrap(); - assert_eq!(v.id, variants[1].id); - assert_eq!(v.audio_bitrate, None); - assert_eq!(v.audio_channels, None); - assert_eq!(v.audio_sample_rate, None); - assert_eq!(v.video_bitrate, Some(12800)); - assert_eq!(v.video_framerate, Some(30)); - assert_eq!(v.video_height, Some(720)); - assert_eq!(v.video_width, Some(1280)); - assert_eq!(v.audio_codec, None); - assert_eq!(v.video_codec, Some("h264".to_string())); - assert_eq!(v.metadata, json!({})); - assert_eq!(v.created_at.timestamp(), timestamp as i64); - - let v = s.iter().find(|v| v.name == "audio-only").unwrap(); - assert_eq!(v.id, variants[2].id); - assert_eq!(v.audio_bitrate, Some(128)); - assert_eq!(v.audio_channels, Some(2)); - assert_eq!(v.audio_sample_rate, Some(44100)); - assert_eq!(v.video_bitrate, None); - assert_eq!(v.video_framerate, None); - assert_eq!(v.video_height, None); - assert_eq!(v.video_width, None); - assert_eq!(v.audio_codec, Some("aac".to_string())); - assert_eq!(v.video_codec, None); - assert_eq!(v.metadata, json!({})); - assert_eq!(v.created_at.timestamp(), timestamp as i64); + assert_eq!(s.variants, Some(variants)); } handler @@ -1070,95 +990,63 @@ async fn test_serial_grpc_new_live_stream() { ) .unwrap(); - let mut client = grpc::pb::scuffle::backend::api_client::ApiClient::new(channel); - - let variants = vec![ - stream_variant::Model { - id: Uuid::new_v4(), - name: "video-audio".to_string(), - stream_id: s.id, - audio_bitrate: Some(128), - audio_channels: Some(2), - audio_sample_rate: Some(44100), - video_bitrate: Some(12800), - video_framerate: Some(30), - video_height: Some(720), - video_width: Some(1280), - audio_codec: Some("aac".to_string()), - video_codec: Some("h264".to_string()), - created_at: Utc::now(), - metadata: json!({}), - }, - stream_variant::Model { - id: Uuid::new_v4(), - name: "video-only".to_string(), - stream_id: s.id, - audio_bitrate: None, - audio_channels: None, - audio_sample_rate: None, - video_bitrate: Some(12800), - video_framerate: Some(30), - video_height: Some(720), - video_width: Some(1280), - audio_codec: None, - video_codec: Some("h264".to_string()), - created_at: Utc::now(), - metadata: json!({}), - }, - stream_variant::Model { - id: Uuid::new_v4(), - name: "audio-only".to_string(), - stream_id: s.id, - audio_bitrate: Some(128), - audio_channels: Some(2), - audio_sample_rate: Some(44100), - video_bitrate: None, - video_framerate: None, - video_height: None, - video_width: None, - audio_codec: Some("aac".to_string()), - video_codec: None, - created_at: Utc::now(), - metadata: json!({}), - }, - ]; + let mut client = pb::scuffle::backend::api_client::ApiClient::new(channel); + + let source_id = Uuid::new_v4().to_string(); + let audio_id = Uuid::new_v4().to_string(); + + let variants = StreamVariants { + stream_variants: vec![ + stream_variants::StreamVariant { + name: "source".to_string(), + group: "aac".to_string(), + transcode_state_ids: vec![source_id.to_string(), audio_id.to_string()], + }, + stream_variants::StreamVariant { + name: "audio-only".to_string(), + group: "aac".to_string(), + transcode_state_ids: vec![audio_id.to_string()], + }, + ], + transcode_states: vec![ + stream_variants::TranscodeState { + bitrate: 8000 * 1024, + codec: "avc1.640028".to_string(), + id: source_id.to_string(), + copy: true, + settings: Some(stream_variants::transcode_state::Settings::Video( + VideoSettings { + framerate: 60, + height: 1080, + width: 1920, + }, + )), + }, + stream_variants::TranscodeState { + bitrate: 128 * 1024, + codec: "mp4a.40.2".to_string(), + id: audio_id.to_string(), + copy: false, + settings: Some(stream_variants::transcode_state::Settings::Audio( + AudioSettings { + channels: 2, + sample_rate: 48000, + }, + )), + }, + ], + }; let response = client .new_live_stream(NewLiveStreamRequest { old_stream_id: s.id.to_string(), - variants: variants - .iter() - .map(|v| { - let audio_settings = v.audio_bitrate.map(|audio_bitrate| AudioSettings { - bitrate: audio_bitrate as u32, - channels: v.audio_channels.unwrap() as u32, - sample_rate: v.audio_sample_rate.unwrap() as u32, - codec: v.audio_codec.clone().unwrap(), - }); - - let video_settings = v.video_bitrate.map(|video_bitrate| VideoSettings { - bitrate: video_bitrate as u32, - framerate: v.video_framerate.unwrap() as u32, - height: v.video_height.unwrap() as u32, - width: v.video_width.unwrap() as u32, - codec: v.video_codec.clone().unwrap(), - }); - - StreamVariant { - name: v.name.clone(), - id: v.id.to_string(), - metadata: v.metadata.to_string(), - audio_settings, - video_settings, - } - }) - .collect(), + variants: Some(variants.clone()), }) .await .unwrap() .into_inner(); - let s = sqlx::query_as!(stream::Model, "SELECT * FROM streams WHERE id = $1", s.id,) + let s = sqlx::query_as!(stream::Model, "SELECT * FROM streams WHERE id = $1", s.id) .fetch_one(&*db) .await .unwrap(); @@ -1191,54 +1079,7 @@ async fn test_serial_grpc_new_live_stream() { assert_eq!(s.ingest_address, "some address"); assert_eq!(s.connection_id, conn_id); assert_eq!(s.state, stream::State::NotReady); - - let s = sqlx::query_as!( - stream_variant::Model, - "SELECT * FROM stream_variants WHERE stream_id = $1", - stream_id, - ) - .fetch_all(&*db) - .await - .unwrap(); - - let v = s.iter().find(|v| v.name == "video-audio").unwrap(); - assert_eq!(v.id, variants[0].id); - assert_eq!(v.audio_bitrate, Some(128)); - assert_eq!(v.audio_channels, Some(2)); - assert_eq!(v.audio_sample_rate, Some(44100)); - assert_eq!(v.video_bitrate, Some(12800)); - assert_eq!(v.video_framerate, Some(30)); - assert_eq!(v.video_height, Some(720)); - assert_eq!(v.video_width, Some(1280)); - assert_eq!(v.audio_codec, Some("aac".to_string())); - assert_eq!(v.video_codec, Some("h264".to_string())); - assert_eq!(v.metadata, json!({})); - - let v = s.iter().find(|v| v.name == "video-only").unwrap(); - assert_eq!(v.id, variants[1].id); - assert_eq!(v.audio_bitrate, None); - assert_eq!(v.audio_channels, None); - assert_eq!(v.audio_sample_rate, None); - assert_eq!(v.video_bitrate, Some(12800)); - assert_eq!(v.video_framerate, Some(30)); - assert_eq!(v.video_height, Some(720)); - assert_eq!(v.video_width, Some(1280)); - assert_eq!(v.audio_codec, None); - assert_eq!(v.video_codec, Some("h264".to_string())); - assert_eq!(v.metadata, json!({})); - - let v = s.iter().find(|v| v.name == "audio-only").unwrap(); - assert_eq!(v.id, variants[2].id); - assert_eq!(v.audio_bitrate, Some(128)); - assert_eq!(v.audio_channels, Some(2)); - assert_eq!(v.audio_sample_rate, Some(44100)); - assert_eq!(v.video_bitrate, None); - assert_eq!(v.video_framerate, None); - assert_eq!(v.video_height, None); - assert_eq!(v.video_width, None); - assert_eq!(v.audio_codec, Some("aac".to_string())); - assert_eq!(v.video_codec, None); - assert_eq!(v.metadata, json!({})); + assert_eq!(s.variants, Some(variants)); handler .cancel() diff --git a/backend/api/src/tests/grpc/health.rs b/backend/api/src/tests/grpc/health.rs index 0d2b62aa..8bb2aa8c 100644 --- a/backend/api/src/tests/grpc/health.rs +++ b/backend/api/src/tests/grpc/health.rs @@ -3,7 +3,8 @@ use common::prelude::FutureTimeout; use std::time::Duration; use crate::config::{AppConfig, GrpcConfig}; -use crate::grpc::{self, run}; +use crate::grpc::run; +use crate::pb; use crate::tests::global::mock_global_state; #[tokio::test] @@ -27,14 +28,14 @@ async fn test_grpc_health_check() { ) .unwrap(); - let mut client = grpc::pb::health::health_client::HealthClient::new(channel); + let mut client = pb::health::health_client::HealthClient::new(channel); let resp = client - .check(grpc::pb::health::HealthCheckRequest::default()) + .check(pb::health::HealthCheckRequest::default()) .await .unwrap(); assert_eq!( resp.into_inner().status, - grpc::pb::health::health_check_response::ServingStatus::Serving as i32 + pb::health::health_check_response::ServingStatus::Serving as i32 ); handler .cancel() @@ -70,10 +71,10 @@ async fn test_grpc_health_watch() { ) .unwrap(); - let mut client = grpc::pb::health::health_client::HealthClient::new(channel); + let mut client = pb::health::health_client::HealthClient::new(channel); let resp = client - .watch(grpc::pb::health::HealthCheckRequest::default()) + .watch(pb::health::HealthCheckRequest::default()) .await .unwrap(); @@ -81,7 +82,7 @@ async fn test_grpc_health_watch() { let resp = stream.message().await.unwrap().unwrap(); assert_eq!( resp.status, - grpc::pb::health::health_check_response::ServingStatus::Serving as i32 + pb::health::health_check_response::ServingStatus::Serving as i32 ); let cancel = handler.cancel(); @@ -89,7 +90,7 @@ async fn test_grpc_health_watch() { let resp = stream.message().await.unwrap().unwrap(); assert_eq!( resp.status, - grpc::pb::health::health_check_response::ServingStatus::NotServing as i32 + pb::health::health_check_response::ServingStatus::NotServing as i32 ); cancel diff --git a/backend/api/src/tests/grpc/tls.rs b/backend/api/src/tests/grpc/tls.rs index 7c631898..4a6927c2 100644 --- a/backend/api/src/tests/grpc/tls.rs +++ b/backend/api/src/tests/grpc/tls.rs @@ -5,7 +5,8 @@ use std::time::Duration; use tonic::transport::{Certificate, Identity}; use crate::config::{AppConfig, GrpcConfig, TlsConfig}; -use crate::grpc::{self, run}; +use crate::grpc::run; +use crate::pb; use crate::tests::global::mock_global_state; #[tokio::test] @@ -55,15 +56,15 @@ async fn test_grpc_tls_rsa() { tokio::time::sleep(Duration::from_millis(500)).await; - let mut client = grpc::pb::health::health_client::HealthClient::new(channel); + let mut client = pb::health::health_client::HealthClient::new(channel); let resp = client - .check(grpc::pb::health::HealthCheckRequest::default()) + .check(pb::health::HealthCheckRequest::default()) .await .unwrap(); assert_eq!( resp.into_inner().status, - grpc::pb::health::health_check_response::ServingStatus::Serving as i32 + pb::health::health_check_response::ServingStatus::Serving as i32 ); handler .cancel() @@ -125,15 +126,15 @@ async fn test_grpc_tls_ec() { tokio::time::sleep(Duration::from_millis(500)).await; - let mut client = grpc::pb::health::health_client::HealthClient::new(channel); + let mut client = pb::health::health_client::HealthClient::new(channel); let resp = client - .check(grpc::pb::health::HealthCheckRequest::default()) + .check(pb::health::HealthCheckRequest::default()) .await .unwrap(); assert_eq!( resp.into_inner().status, - grpc::pb::health::health_check_response::ServingStatus::Serving as i32 + pb::health::health_check_response::ServingStatus::Serving as i32 ); handler .cancel() diff --git a/backend/migrations/20230217024406_Initial_Database_Structure.down.sql b/backend/migrations/20230217024406_Initial_Database_Structure.down.sql index ab824731..87733fde 100644 --- a/backend/migrations/20230217024406_Initial_Database_Structure.down.sql +++ b/backend/migrations/20230217024406_Initial_Database_Structure.down.sql @@ -7,5 +7,4 @@ DROP TABLE IF EXISTS channel_roles CASCADE; DROP TABLE IF EXISTS channel_role_grants CASCADE; DROP TABLE IF EXISTS streams CASCADE; DROP TABLE IF EXISTS stream_bitrate_updates CASCADE; -DROP TABLE IF EXISTS stream_variants CASCADE; DROP TABLE IF EXISTS stream_events CASCADE; diff --git a/backend/migrations/20230217024406_Initial_Database_Structure.up.sql b/backend/migrations/20230217024406_Initial_Database_Structure.up.sql index eac00563..28a603af 100644 --- a/backend/migrations/20230217024406_Initial_Database_Structure.up.sql +++ b/backend/migrations/20230217024406_Initial_Database_Structure.up.sql @@ -82,6 +82,7 @@ CREATE TABLE streams ( state int NOT NULL DEFAULT 0, -- 0 = not ready, 1 = ready, 2 = stopped, 3 = stopped resumable, 4 = failed, 5 = was ready ingest_address varchar(255) NOT NULL, connection_id uuid NOT NULL, + variants bytea, -- Timestamps created_at timestamptz NOT NULL DEFAULT NOW(), updated_at timestamptz DEFAULT NULL, -- NULL = not started (last bitrate is report) @@ -97,24 +98,6 @@ CREATE TABLE stream_bitrate_updates ( created_at timestamptz NOT NULL DEFAULT NOW() ); -CREATE TABLE stream_variants ( - id uuid PRIMARY KEY DEFAULT gen_random_uuid(), - stream_id uuid NOT NULL, -- foreign key to streams(id) - name varchar(255) NOT NULL, - video_framerate int, -- null = audio only - video_width int, -- null = audio only - video_height int, -- null = audio only - video_bitrate int, -- null = audio only - video_codec varchar(255), -- null = audio only - audio_sample_rate int, -- null = video only - audio_channels int, -- null = video only - audio_bitrate int, -- null = video only - audio_codec varchar(255), -- null = video only - metadata jsonb NOT NULL DEFAULT '{}', - -- Timestamps - created_at timestamptz NOT NULL DEFAULT NOW() -); - CREATE TABLE stream_events ( id uuid PRIMARY KEY DEFAULT gen_random_uuid(), stream_id uuid NOT NULL, -- foreign key to streams(id) @@ -142,9 +125,8 @@ CREATE INDEX streams_channel_id_idx ON streams (channel_id); CREATE INDEX stream_bitrate_updates_stream_id_idx ON stream_bitrate_updates (stream_id); CREATE INDEX stream_bitrate_updates_created_at_idx ON stream_bitrate_updates (created_at); -CREATE INDEX stream_variants_stream_id_idx ON stream_variants (stream_id); - CREATE INDEX stream_events_stream_id_idx ON stream_events (stream_id); + -- CONSTRAINTS ALTER TABLE IF EXISTS users ADD CONSTRAINT users_username_unique UNIQUE (username); @@ -155,7 +137,6 @@ ALTER TABLE IF EXISTS global_roles ADD CONSTRAINT global_roles_rank_unique UNIQU ALTER TABLE IF EXISTS channel_roles ADD CONSTRAINT channel_roles_name_unique UNIQUE (channel_id, name); ALTER TABLE IF EXISTS channel_roles ADD CONSTRAINT channel_roles_rank_unique UNIQUE (channel_id, rank); -ALTER TABLE IF EXISTS stream_variants ADD CONSTRAINT stream_variants_name_unique UNIQUE (stream_id, name); -- Foreign keys ALTER TABLE sessions ADD CONSTRAINT sessions_user_id_fkey FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE; @@ -170,6 +151,4 @@ ALTER TABLE streams ADD CONSTRAINT streams_channel_id_fkey FOREIGN KEY (channel_ ALTER TABLE stream_bitrate_updates ADD CONSTRAINT stream_bitrate_updates_stream_id_fkey FOREIGN KEY (stream_id) REFERENCES streams(id) ON DELETE CASCADE; -ALTER TABLE stream_variants ADD CONSTRAINT stream_variants_stream_id_fkey FOREIGN KEY (stream_id) REFERENCES streams(id) ON DELETE CASCADE; - ALTER TABLE stream_events ADD CONSTRAINT stream_events_stream_id_fkey FOREIGN KEY (stream_id) REFERENCES streams(id) ON DELETE CASCADE; diff --git a/maskfile.md b/maskfile.md index 4f9a9b04..3c6ffb91 100644 --- a/maskfile.md +++ b/maskfile.md @@ -159,11 +159,6 @@ if [[ "$verbose" == "true" ]]; then set -x fi -if [ "$no_rust" != "true" ]; then - cargo fmt --all - cargo clippy --fix --allow-dirty --allow-staged -fi - if [ "$no_js" != "true" ]; then pnpm --recursive --parallel --stream run format fi @@ -175,6 +170,11 @@ fi if [ "$no_proto" != "true" ]; then find . -name '*.proto' -exec clang-format -i {} \; fi + +if [ "$no_rust" != "true" ]; then + cargo fmt --all + cargo clippy --fix --allow-dirty --allow-staged +fi ``` ## lint diff --git a/proto/scuffle/backend/api.proto b/proto/scuffle/backend/api.proto index 2d6f9683..e71fa7a9 100644 --- a/proto/scuffle/backend/api.proto +++ b/proto/scuffle/backend/api.proto @@ -2,7 +2,7 @@ syntax = "proto3"; package scuffle.backend; -import "scuffle/types/stream_variant.proto"; +import "scuffle/types/stream_variants.proto"; // This is an internal API for the Scuffle service. // Used for communication between scuffle microservices. @@ -47,13 +47,8 @@ message AuthenticateLiveStreamResponse { bool transcode = 3; // should record the stream bool record = 4; - // Try resume the live stream using the variants from the previous stream. - // If the stream was not stopped properly, this will be true. - // If its not possible to resume the stream, ingest should call NewLiveStream - // to create a new stream. - bool try_resume = 5; - // The variants of the stream. (if the stream was resumed) - repeated scuffle.types.StreamVariant variants = 6; + // The variants of the stream. (if present, try resume the stream) + optional scuffle.types.StreamVariants variants = 5; } // This request is created by the Ingest service when we attempt to resume a @@ -62,7 +57,7 @@ message NewLiveStreamRequest { // The ID of the stream to create. string old_stream_id = 1; // The new variants of the stream. - repeated scuffle.types.StreamVariant variants = 2; + scuffle.types.StreamVariants variants = 2; } // This response is sent back to the Ingest service, generated by the API @@ -85,13 +80,6 @@ message UpdateLiveStreamRequest { string connection_id = 2; - // Once transcoding starts this message will be sent to the API service. - message Variants { - repeated scuffle.types.StreamVariant variants = 1; - } - - // If the stream is ready or has stopped, this message will be sent. - // If the stream failed, this message will be sent. message Event { enum Level { @@ -119,7 +107,7 @@ message UpdateLiveStreamRequest { message Update { uint64 timestamp = 1; oneof update { - Variants variants = 2; + scuffle.types.StreamVariants variants = 2; LiveStreamState state = 3; Bitrate bitrate = 4; Event event = 5; diff --git a/proto/scuffle/events/transcoder.proto b/proto/scuffle/events/transcoder.proto index 0dc1a51f..18a09a04 100644 --- a/proto/scuffle/events/transcoder.proto +++ b/proto/scuffle/events/transcoder.proto @@ -2,7 +2,7 @@ syntax = "proto3"; package scuffle.events; -import "scuffle/types/stream_variant.proto"; +import "scuffle/types/stream_variants.proto"; message TranscoderMessage { string id = 1; @@ -17,5 +17,5 @@ message TranscoderMessageNewStream { string request_id = 1; string stream_id = 2; string ingest_address = 3; - repeated scuffle.types.StreamVariant variants = 4; + scuffle.types.StreamVariants variants = 4; } diff --git a/proto/scuffle/types/stream_variant.proto b/proto/scuffle/types/stream_variant.proto deleted file mode 100644 index ed4402b2..00000000 --- a/proto/scuffle/types/stream_variant.proto +++ /dev/null @@ -1,45 +0,0 @@ -syntax = "proto3"; - -package scuffle.types; - -// A variant is a transcoded version of the stream. -message StreamVariant { - // The id of the variant. - string id = 1; - - // The name of the variant. - string name = 3; - - message VideoSettings { - // The width of the video. - uint32 width = 1; - // The height of the video. - uint32 height = 2; - // The framerate of the video. - uint32 framerate = 3; - // The bitrate of the video. - uint32 bitrate = 4; - // The codec of the video. - string codec = 5; - } - - message AudioSettings { - // The sample rate of the audio. - uint32 sample_rate = 1; - // The number of channels of the audio. - uint32 channels = 2; - // The bitrate of the audio. - uint32 bitrate = 3; - // The codec of the audio. - string codec = 4; - } - - // The video settings of the variant. (If the variant is audio-only, this will - // be empty.) - optional VideoSettings video_settings = 4; - // The audio settings of the variant. - optional AudioSettings audio_settings = 5; - - // The metadata of the variant. (This is a JSON string.) - string metadata = 6; -} diff --git a/proto/scuffle/types/stream_variants.proto b/proto/scuffle/types/stream_variants.proto new file mode 100644 index 00000000..6344bed9 --- /dev/null +++ b/proto/scuffle/types/stream_variants.proto @@ -0,0 +1,59 @@ +syntax = "proto3"; + +package scuffle.types; + +message StreamVariants { + // A variant is a transcoded version of the stream. + // A stream variant is unique by its name and group. + message StreamVariant { + // The name of the variant. + string name = 1; + + // Group the variant belongs to. + string group = 2; + + // The transcode states of the variant. + repeated string transcode_state_ids = 3; + } + + // A state that the transcoder should transcode to. + // A transcode state is unique by its id. + message TranscodeState { + // The id of the variant. + string id = 1; + + message VideoSettings { + // The width of the video. + uint32 width = 1; + // The height of the video. + uint32 height = 2; + // The framerate of the video. + uint32 framerate = 3; + } + + message AudioSettings { + // The sample rate of the audio. + uint32 sample_rate = 1; + // The number of channels of the audio. + uint32 channels = 2; + } + + // The settings for the transcode state (video or audio). + oneof settings { + VideoSettings video = 2; + AudioSettings audio = 3; + } + + // The bitrate of the video. + uint32 bitrate = 4; + + // The codec of the video. + string codec = 5; + + // Copy the stream directly from the source. + bool copy = 6; + } + + repeated StreamVariant stream_variants = 1; + repeated TranscodeState transcode_states = 2; +} diff --git a/proto/scuffle/video/ingest.proto b/proto/scuffle/video/ingest.proto index 74498b72..34dbaf44 100644 --- a/proto/scuffle/video/ingest.proto +++ b/proto/scuffle/video/ingest.proto @@ -14,6 +14,8 @@ service Ingest { // Ingest service. rpc TranscoderEvent(TranscoderEventRequest) returns (TranscoderEventResponse) {} + + rpc ShutdownStream(ShutdownStreamRequest) returns (ShutdownStreamResponse) {} } message WatchStreamRequest { @@ -76,3 +78,10 @@ message TranscoderEventRequest { } message TranscoderEventResponse {} + +message ShutdownStreamRequest { + // Stream id of the stream that is being transcoded. + string stream_id = 1; +} + +message ShutdownStreamResponse {} diff --git a/video/container/mp4/src/codec.rs b/video/container/mp4/src/codec.rs index 1f9a8ee2..d9ff04c8 100644 --- a/video/container/mp4/src/codec.rs +++ b/video/container/mp4/src/codec.rs @@ -1,4 +1,4 @@ -use std::fmt; +use std::{fmt, str::FromStr}; use aac::AudioObjectType; @@ -98,6 +98,186 @@ impl fmt::Display for VideoCodec { } } +impl FromStr for VideoCodec { + type Err = String; + + fn from_str(s: &str) -> Result { + let splits = s.split('.').collect::>(); + if splits.is_empty() { + return Err("invalid codec, empty string".into()); + } + + match splits[0] { + "avc1" => { + if splits.len() < 2 { + return Err("invalid codec, missing profile".into()); + } + + let profile = u8::from_str_radix(&splits[1][..2], 16) + .map_err(|e| format!("invalid codec, invalid profile: {}, {}", splits[1], e))?; + let constraint_set = u8::from_str_radix(&splits[1][2..4], 16).map_err(|e| { + format!( + "invalid codec, invalid constraint set: {}, {}", + splits[1], e + ) + })?; + let level = u8::from_str_radix(&splits[1][4..6], 16) + .map_err(|e| format!("invalid codec, invalid level: {}, {}", splits[1], e))?; + + Ok(VideoCodec::Avc { + profile, + constraint_set, + level, + }) + } + "hev1" => { + if splits.len() < 6 { + return Err("invalid codec, missing profile".into()); + } + + let general_profile_space = match splits[1] { + "A" => 1, + "B" => 2, + "C" => 3, + _ => { + return Err(format!( + "invalid codec, invalid general profile space: {}", + splits[1] + )) + } + }; + + let profile = u8::from_str_radix(splits[2], 16) + .map_err(|e| format!("invalid codec, invalid profile: {}, {}", splits[2], e))?; + + let profile_compatibility = u32::from_str_radix(splits[3], 16).map_err(|e| { + format!( + "invalid codec, invalid profile compatibility: {}, {}", + splits[3], e + ) + })?; + + let tier = match splits[4] { + "H" => true, + "L" => false, + _ => return Err(format!("invalid codec, invalid tier: {}", splits[4])), + }; + + let level = u8::from_str_radix(splits[5], 16) + .map_err(|e| format!("invalid codec, invalid level: {}, {}", splits[5], e))?; + + let constraint_indicator = u64::from_str_radix(splits[6], 16).map_err(|e| { + format!( + "invalid codec, invalid constraint indicator: {}, {}", + splits[6], e + ) + })?; + + Ok(VideoCodec::Hevc { + general_profile_space, + profile, + level, + tier, + profile_compatibility, + constraint_indicator, + }) + } + "av01" => { + if splits.len() < 12 { + return Err("invalid codec, missing profile".into()); + } + + let profile = u8::from_str_radix(splits[1], 16) + .map_err(|e| format!("invalid codec, invalid profile: {}, {}", splits[1], e))?; + + let level = u8::from_str_radix(splits[2], 16) + .map_err(|e| format!("invalid codec, invalid level: {}, {}", splits[2], e))?; + + let tier = match splits[3] { + "H" => true, + "M" => false, + _ => return Err(format!("invalid codec, invalid tier: {}", splits[3])), + }; + + let depth = splits[4] + .parse::() + .map_err(|e| format!("invalid codec, invalid depth: {}, {}", splits[4], e))?; + + let monochrome = match splits[5] { + "1" => true, + "0" => false, + _ => return Err(format!("invalid codec, invalid monochrome: {}", splits[5])), + }; + + let sub_sampling_x = match splits[6] { + "1" => true, + "0" => false, + _ => { + return Err(format!( + "invalid codec, invalid sub_sampling_x: {}", + splits[6] + )) + } + }; + + let sub_sampling_y = match splits[7] { + "1" => true, + "0" => false, + _ => { + return Err(format!( + "invalid codec, invalid sub_sampling_y: {}", + splits[7] + )) + } + }; + + let color_primaries = splits[8].parse::().map_err(|e| { + format!( + "invalid codec, invalid color_primaries: {}, {}", + splits[8], e + ) + })?; + + let transfer_characteristics = splits[9].parse::().map_err(|e| { + format!( + "invalid codec, invalid transfer_characteristics: {}, {}", + splits[9], e + ) + })?; + + let matrix_coefficients = splits[10].parse::().map_err(|e| { + format!( + "invalid codec, invalid matrix_coefficients: {}, {}", + splits[10], e + ) + })?; + + let full_range_flag = splits[11].parse::().map_err(|e| { + format!( + "invalid codec, invalid full_range_flag: {}, {}", + splits[11], e + ) + })? == 1; + + Ok(VideoCodec::Av1 { + profile, + level, + tier, + depth, + monochrome, + sub_sampling_x, + sub_sampling_y, + color_primaries, + transfer_characteristics, + matrix_coefficients, + full_range_flag, + }) + } + r => Err(format!("invalid codec, unknown type: {}", r)), + } + } +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum AudioCodec { Aac { object_type: AudioObjectType }, @@ -112,3 +292,32 @@ impl fmt::Display for AudioCodec { } } } + +impl FromStr for AudioCodec { + type Err = String; + + fn from_str(s: &str) -> Result { + let splits = s.split('.').collect::>(); + if splits.is_empty() { + return Err("invalid codec, empty string".into()); + } + + match splits[0] { + "mp4a" => { + if splits.len() < 3 { + return Err("invalid codec, missing object type".into()); + } + + let object_type = splits[2].parse::().map_err(|e| { + format!("invalid codec, invalid object type: {}, {}", splits[2], e) + })?; + + Ok(AudioCodec::Aac { + object_type: AudioObjectType::from(object_type), + }) + } + "opus" => Ok(AudioCodec::Opus), + r => Err(format!("invalid codec, unknown type: {}", r)), + } + } +} diff --git a/video/ingest/Cargo.toml b/video/ingest/Cargo.toml index 26c720bf..af1600d0 100644 --- a/video/ingest/Cargo.toml +++ b/video/ingest/Cargo.toml @@ -34,6 +34,7 @@ bytesio = { path = "../bytesio" } flv = { path = "../container/flv" } transmuxer = { path = "../transmuxer" } mp4 = { path = "../container/mp4" } +aac = { path = "../codec/aac" } [dev-dependencies] dotenvy = "0" diff --git a/video/ingest/src/connection_manager.rs b/video/ingest/src/connection_manager.rs index 1a09d0e7..f8563323 100644 --- a/video/ingest/src/connection_manager.rs +++ b/video/ingest/src/connection_manager.rs @@ -11,17 +11,18 @@ pub struct StreamConnection { } pub enum GrpcRequest { - Started { - id: Uuid, - }, WatchStream { id: Uuid, channel: mpsc::Sender, }, - ShuttingDown { + ShutdownStream, + TranscoderStarted { + id: Uuid, + }, + TranscoderShuttingDown { id: Uuid, }, - Error { + TranscoderError { id: Uuid, message: String, fatal: bool, diff --git a/video/ingest/src/grpc/ingest.rs b/video/ingest/src/grpc/ingest.rs index 3e9940c5..b42c8c23 100644 --- a/video/ingest/src/grpc/ingest.rs +++ b/video/ingest/src/grpc/ingest.rs @@ -2,8 +2,9 @@ use crate::{ connection_manager::{GrpcRequest, WatchStreamEvent}, global::GlobalState, pb::scuffle::video::{ - ingest_server, transcoder_event_request, watch_stream_response, TranscoderEventRequest, - TranscoderEventResponse, WatchStreamRequest, WatchStreamResponse, + ingest_server, transcoder_event_request, watch_stream_response, ShutdownStreamRequest, + ShutdownStreamResponse, TranscoderEventRequest, TranscoderEventResponse, + WatchStreamRequest, WatchStreamResponse, }, }; use std::{ @@ -122,12 +123,12 @@ impl ingest_server::Ingest for IngestServer { let request = match request.event { Some(transcoder_event_request::Event::Started(_)) => { - GrpcRequest::Started { id: request_id } + GrpcRequest::TranscoderStarted { id: request_id } } Some(transcoder_event_request::Event::ShuttingDown(_)) => { - GrpcRequest::ShuttingDown { id: request_id } + GrpcRequest::TranscoderShuttingDown { id: request_id } } - Some(transcoder_event_request::Event::Error(error)) => GrpcRequest::Error { + Some(transcoder_event_request::Event::Error(error)) => GrpcRequest::TranscoderError { id: request_id, message: error.message, fatal: error.fatal, @@ -145,4 +146,31 @@ impl ingest_server::Ingest for IngestServer { Ok(Response::new(TranscoderEventResponse {})) } + + async fn shutdown_stream( + &self, + request: Request, + ) -> Result> { + let global = self + .global + .upgrade() + .ok_or_else(|| Status::internal("Global state is gone"))?; + + let request = request.into_inner(); + + let stream_id = Uuid::parse_str(&request.stream_id) + .map_err(|_| Status::invalid_argument("Invalid stream ID"))?; + + let request = GrpcRequest::ShutdownStream; + + if !global + .connection_manager + .submit_request(stream_id, request) + .await + { + return Err(Status::not_found("Stream not found")); + } + + Ok(Response::new(ShutdownStreamResponse {})) + } } diff --git a/video/ingest/src/ingest/connection.rs b/video/ingest/src/ingest/connection.rs index a7c3d65c..ef6857e9 100644 --- a/video/ingest/src/ingest/connection.rs +++ b/video/ingest/src/ingest/connection.rs @@ -24,12 +24,15 @@ use crate::{ pb::scuffle::{ backend::{ api_client::ApiClient, - update_live_stream_request::{self, event, update, Bitrate, Event, Update}, + update_live_stream_request::{event, update, Bitrate, Event, Update}, AuthenticateLiveStreamRequest, LiveStreamState, NewLiveStreamRequest, UpdateLiveStreamRequest, }, events::{self, transcoder_message}, - types::StreamVariant, + types::{ + stream_variants::{StreamVariant, TranscodeState}, + StreamVariants, + }, }, }; @@ -69,8 +72,7 @@ struct ApiResponse { id: Uuid, transcode: bool, record: bool, - try_resume: bool, - variants: Vec, + variants: Option, } const BITRATE_UPDATE_INTERVAL: u64 = 5; @@ -237,7 +239,6 @@ impl Connection { id, transcode: response.transcode, record: response.record, - try_resume: response.try_resume, variants: response.variants, }; @@ -466,7 +467,12 @@ impl Connection { }; match req { - GrpcRequest::Started { id } => { + GrpcRequest::ShutdownStream => { + tracing::info!("shutdown stream request"); + self.report_shutdown = false; + return false; + } + GrpcRequest::TranscoderStarted { id } => { tracing::info!("transcoder started: {}", id); if update_channel .try_send(vec![Update { @@ -479,7 +485,7 @@ impl Connection { return false; } } - GrpcRequest::Error { + GrpcRequest::TranscoderError { id, message, fatal: _, @@ -517,7 +523,7 @@ impl Connection { tracing::warn!("transcoder request failure id mismatch"); } } - GrpcRequest::ShuttingDown { id } => { + GrpcRequest::TranscoderShuttingDown { id } => { if self.current_transcoder_id == Some(id) { tracing::info!("transcoder shutting down"); return self.request_transcoder(update_channel, global).await; @@ -604,61 +610,73 @@ impl Connection { audio_settings: &AudioSettings, init_data: Bytes, ) -> bool { - let variants = generate_variants(video_settings, audio_settings, self.api_resp.transcode); + let new_variants = + generate_variants(video_settings, audio_settings, self.api_resp.transcode); // We can now at this point decide what we want to do with the stream. // What variants should be transcoded, ect... - if self.api_resp.try_resume { - // Check if the new variants are the same as the old ones. - let mut old_map = self - .api_resp - .variants - .iter() - .map(|v| (v.name.clone(), v)) - .collect::>(); - - for new_variant in &variants { - if let Some(old_variant) = old_map.remove(&new_variant.name) { - let video_same = if let Some(old_video) = &old_variant.video_settings { - if let Some(new_video) = &new_variant.video_settings { - old_video.codec == new_video.codec - && old_video.bitrate == new_video.bitrate - && old_video.width == new_video.width - && old_video.height == new_video.height - } else { - false - } - } else { - new_variant.video_settings.is_none() - }; - - let audio_same = if let Some(old_audio) = &old_variant.audio_settings { - if let Some(new_audio) = &new_variant.audio_settings { - old_audio.codec == new_audio.codec - && old_audio.bitrate == new_audio.bitrate - && old_audio.channels == new_audio.channels - && old_audio.sample_rate == new_audio.sample_rate + if let Some(old_variants) = self.api_resp.variants.take() { + let mut can_resume = true; + + fn make_map( + variants: &StreamVariants, + ) -> HashMap<(&str, &str), (&StreamVariant, Vec<&TranscodeState>)> { + let transcode_state_map = variants + .transcode_states + .iter() + .map(|s| (s.id.as_str(), s)) + .collect::>(); + + variants + .stream_variants + .iter() + .map(|v| { + let states = v + .transcode_state_ids + .iter() + .map(|id| *transcode_state_map.get(id.as_str()).unwrap()) + .collect::>(); + ((v.group.as_str(), v.name.as_str()), (v, states)) + }) + .collect() + } + + let new_map = make_map(&new_variants); + let mut old_map = make_map(&old_variants); + + for (key, (_, new_transcode_states)) in new_map.iter() { + if let Some((_, mut old_transcode_states)) = old_map.remove(key) { + if new_transcode_states.iter().all(|new| { + let pos = old_transcode_states.iter().position(|old| { + new.copy == old.copy + && new.codec == old.codec + && new.settings == old.settings + }); + + if let Some(pos) = pos { + old_transcode_states.remove(pos); + true } else { false } - } else { - new_variant.audio_settings.is_none() - }; - - if video_same && audio_same && old_variant.metadata == new_variant.metadata { + }) && old_transcode_states.is_empty() + { + // This is resumable, so we can just continue. continue; } } // If we get here, we need to start a new transcode. tracing::info!("new variant detected, starting new transcode"); - self.api_resp.try_resume = false; + can_resume = false; break; } - self.api_resp.try_resume = self.api_resp.try_resume && old_map.is_empty(); + can_resume = can_resume && old_map.is_empty(); - if !self.api_resp.try_resume { + if can_resume { + self.api_resp.variants = Some(old_variants); + } else { // Report to API to get a new stream id. // This is because the variants have changed and therefore the client player wont be able to resume. // We need to get a new stream id so that the player can start a new session. @@ -667,7 +685,7 @@ impl Connection { .api_client .new_live_stream(NewLiveStreamRequest { old_stream_id: self.api_resp.id.to_string(), - variants: variants.clone(), + variants: Some(new_variants.clone()), }) .await { @@ -684,7 +702,7 @@ impl Connection { }; self.api_resp.id = stream_id; - self.api_resp.variants = variants; + self.api_resp.variants = Some(new_variants); } } else if let Err(e) = self .api_client @@ -693,11 +711,7 @@ impl Connection { connection_id: self.id.to_string(), updates: vec![Update { timestamp: Utc::now().timestamp() as u64, - update: Some(update::Update::Variants( - update_live_stream_request::Variants { - variants: variants.clone(), - }, - )), + update: Some(update::Update::Variants(new_variants.clone())), }], }) .await @@ -705,7 +719,7 @@ impl Connection { tracing::error!("Failed to report new stream to API: {}", e); return false; } else { - self.api_resp.variants = variants; + self.api_resp.variants = Some(new_variants); } // At this point now we need to create a new job for a transcoder to pick up and start transcoding. diff --git a/video/ingest/src/ingest/variants.rs b/video/ingest/src/ingest/variants.rs index 2921d404..8ccc0c88 100644 --- a/video/ingest/src/ingest/variants.rs +++ b/video/ingest/src/ingest/variants.rs @@ -1,45 +1,96 @@ +use aac::AudioObjectType; use mp4::codec::{AudioCodec, VideoCodec}; -use serde_json::json; use transmuxer::{AudioSettings, VideoSettings}; use uuid::Uuid; -use crate::pb::scuffle::types::{stream_variant, StreamVariant}; +use crate::pb::scuffle::types::{ + stream_variants::{transcode_state, StreamVariant, TranscodeState}, + StreamVariants, +}; pub fn generate_variants( video_settings: &VideoSettings, - audio_settings: &AudioSettings, + _audio_settings: &AudioSettings, transcode: bool, -) -> Vec { - let mut variants = Vec::new(); - - let audio_settings = stream_variant::AudioSettings { - channels: audio_settings.channels as u32, - bitrate: audio_settings.bitrate, - sample_rate: audio_settings.sample_rate, - codec: AudioCodec::Opus.to_string(), +) -> StreamVariants { + let mut variants = StreamVariants::default(); + + let mut audio_tracks = vec![]; + + if transcode { + let id = Uuid::new_v4().to_string(); + + variants.transcode_states.push(TranscodeState { + id: id.clone(), + settings: Some(transcode_state::Settings::Audio( + transcode_state::AudioSettings { + channels: 2, + sample_rate: 48000, + }, + )), + bitrate: 96 * 1024, + codec: AudioCodec::Opus.to_string(), + copy: false, + }); + + audio_tracks.push((id, "opus")); }; - variants.push(StreamVariant { - id: Uuid::new_v4().to_string(), - name: "source".to_string(), - video_settings: Some(stream_variant::VideoSettings { + { + let id = Uuid::new_v4().to_string(); + + variants.transcode_states.push(TranscodeState { + id: id.clone(), + settings: Some(transcode_state::Settings::Audio( + transcode_state::AudioSettings { + channels: 2, + sample_rate: 48000, + }, + )), + bitrate: 128 * 1024, + codec: AudioCodec::Aac { + object_type: AudioObjectType::AacLowComplexity, + } + .to_string(), + copy: false, + }); + + audio_tracks.push((id, "aac")); + }; + + variants + .stream_variants + .extend(audio_tracks.iter().map(|(id, group)| StreamVariant { + name: "audio-only".to_string(), + group: group.to_string(), + transcode_state_ids: vec![id.clone()], + })); + + { + let id = Uuid::new_v4().to_string(); + + variants.transcode_states.push(TranscodeState { + id: id.clone(), + settings: Some(transcode_state::Settings::Video( + transcode_state::VideoSettings { + framerate: video_settings.framerate as u32, + height: video_settings.height, + width: video_settings.width, + }, + )), bitrate: video_settings.bitrate, codec: video_settings.codec.to_string(), - framerate: video_settings.framerate as u32, - height: video_settings.height, - width: video_settings.width, - }), - audio_settings: Some(audio_settings.clone()), - metadata: json!({}).to_string(), - }); - - variants.push(StreamVariant { - id: Uuid::new_v4().to_string(), - name: "audio".to_string(), - video_settings: None, - audio_settings: Some(audio_settings.clone()), - metadata: json!({}).to_string(), - }); + copy: true, + }); + + variants + .stream_variants + .extend(audio_tracks.iter().map(|(track_id, group)| StreamVariant { + name: "source".to_string(), + group: group.to_string(), + transcode_state_ids: vec![id.clone(), track_id.clone()], + })); + } if transcode { let aspect_ratio = video_settings.width as f64 / video_settings.height as f64; @@ -90,24 +141,34 @@ pub fn generate_variants( continue; } - variants.push(StreamVariant { - id: Uuid::new_v4().to_string(), - name: format!("{}p", res.side), - video_settings: Some(stream_variant::VideoSettings { - width, - height, - bitrate: res.bitrate, - framerate: res.framerate, - codec: VideoCodec::Avc { - profile: 100, // High - level: 51, // 5.1 - constraint_set: 0, - } - .to_string(), - }), - audio_settings: Some(audio_settings.clone()), - metadata: json!({}).to_string(), + let id = Uuid::new_v4().to_string(); + + variants.transcode_states.push(TranscodeState { + id: id.clone(), + bitrate: res.bitrate, + codec: VideoCodec::Avc { + profile: 100, // High + level: 51, // 5.1 + constraint_set: 0, + } + .to_string(), + copy: false, + settings: Some(transcode_state::Settings::Video( + transcode_state::VideoSettings { + framerate: res.framerate, + height, + width, + }, + )), }); + + variants + .stream_variants + .extend(audio_tracks.iter().map(|(track_id, group)| StreamVariant { + name: format!("{}p", res.side), + group: group.to_string(), + transcode_state_ids: vec![id.clone(), track_id.clone()], + })); } } diff --git a/video/ingest/src/tests/grpc/ingest.rs b/video/ingest/src/tests/grpc/ingest.rs index 900f9673..0b5fb1a2 100644 --- a/video/ingest/src/tests/grpc/ingest.rs +++ b/video/ingest/src/tests/grpc/ingest.rs @@ -64,7 +64,7 @@ async fn test_grpc_ingest_transcoder_event() { .expect("failed to receive event"); match event { - GrpcRequest::Started { id } => { + GrpcRequest::TranscoderStarted { id } => { assert_eq!(id, request_id); } _ => panic!("wrong request"), @@ -87,7 +87,7 @@ async fn test_grpc_ingest_transcoder_event() { .expect("failed to receive event"); match event { - GrpcRequest::ShuttingDown { id } => { + GrpcRequest::TranscoderShuttingDown { id } => { assert_eq!(id, request_id); } _ => panic!("wrong request"), @@ -115,7 +115,7 @@ async fn test_grpc_ingest_transcoder_event() { .expect("failed to receive event"); match event { - GrpcRequest::Error { + GrpcRequest::TranscoderError { id, message, fatal: _, diff --git a/video/ingest/src/tests/ingest.rs b/video/ingest/src/tests/ingest.rs index fa6f758e..3e8c56d4 100644 --- a/video/ingest/src/tests/ingest.rs +++ b/video/ingest/src/tests/ingest.rs @@ -6,6 +6,7 @@ use std::time::Duration; use async_stream::stream; use async_trait::async_trait; +use common::prelude::FutureTimeout; use futures::StreamExt; use lapin::options::QueueDeclareOptions; use prost::Message; @@ -28,8 +29,8 @@ use crate::pb::scuffle::backend::{ UpdateLiveStreamRequest, UpdateLiveStreamResponse, }; use crate::pb::scuffle::events::{transcoder_message, TranscoderMessage}; -use crate::pb::scuffle::types::stream_variant::{AudioSettings, VideoSettings}; -use crate::pb::scuffle::types::StreamVariant; +use crate::pb::scuffle::types::stream_variants::{transcode_state, StreamVariant, TranscodeState}; +use crate::pb::scuffle::types::StreamVariants; use crate::tests::global::mock_global_state; #[derive(Debug)] @@ -343,8 +344,7 @@ impl TestState { stream_id: stream_id.to_string(), record, transcode, - try_resume: false, - variants: vec![], + variants: None, })) .await; stream_id @@ -364,36 +364,39 @@ async fn test_ingest_stream() { assert_eq!(request.stream_id, stream_id.to_string()); match &request.updates[0].update { Some(crate::pb::scuffle::backend::update_live_stream_request::update::Update::Variants(v)) => { - assert_eq!(v.variants.len(), 2); // We are not transcoding so this is source and audio only - assert_eq!(v.variants[0].name, "source"); - assert_eq!(v.variants[0].video_settings, Some(VideoSettings { + assert_eq!(v.transcode_states.len(), 2); // We are not transcoding so this is source and audio only + assert_eq!(v.stream_variants.len(), 2); // We are not transcoding so this is source and audio only + + let source_variant = v.stream_variants.iter().find(|v| v.name == "source").unwrap(); + assert_eq!(source_variant.group, "aac"); + assert_eq!(source_variant.transcode_state_ids.len(), 2); + + let audio_only_variant = v.stream_variants.iter().find(|v| v.name == "audio-only").unwrap(); + assert_eq!(audio_only_variant.group, "aac"); + assert_eq!(audio_only_variant.transcode_state_ids.len(), 1); + + let audio_transcode_state = v.transcode_states.iter().find(|s| s.id == audio_only_variant.transcode_state_ids[0]).unwrap(); + + assert_eq!(audio_transcode_state.bitrate, 128 * 1024); + assert_eq!(audio_transcode_state.codec, "mp4a.40.2"); + assert_eq!(audio_transcode_state.settings, Some(transcode_state::Settings::Audio(transcode_state::AudioSettings { + channels: 2, + sample_rate: 48000, + }))); + assert!(!audio_transcode_state.copy); + + let source_transcode_state = v.transcode_states.iter().find(|s| s.id == source_variant.transcode_state_ids[0]).unwrap(); + + assert_eq!(source_transcode_state.codec, "avc1.64001f"); + assert_eq!(source_transcode_state.bitrate, 1276158); + assert_eq!(source_transcode_state.settings, Some(transcode_state::Settings::Video(transcode_state::VideoSettings { width: 468, height: 864, framerate: 30, - bitrate: 1276158, - codec: "avc1.64001f".to_string(), - })); - assert_eq!(v.variants[0].audio_settings, Some(AudioSettings { - sample_rate: 44100, - channels: 2, - bitrate: 69568, - codec: "opus".to_string(), - })); - assert_eq!(v.variants[0].metadata, "{}"); - assert!(!v.variants[0].id.is_empty()); - - assert_eq!(v.variants[1].name, "audio"); - assert_eq!(v.variants[1].video_settings, None); - assert_eq!(v.variants[1].audio_settings, Some(AudioSettings { - sample_rate: 44100, - channels: 2, - bitrate: 69568, - codec: "opus".to_string(), - })); - assert_eq!(v.variants[1].metadata, "{}"); - assert!(!v.variants[1].id.is_empty()); + }))); + assert!(source_transcode_state.copy); - variants = v.variants.clone(); + variants = Some(v.clone()); send.send(Ok(UpdateLiveStreamResponse {})).unwrap(); }, @@ -465,7 +468,10 @@ async fn test_ingest_stream() { state .global .connection_manager - .submit_request(stream_id, GrpcRequest::ShuttingDown { id: request_id }) + .submit_request( + stream_id, + GrpcRequest::TranscoderShuttingDown { id: request_id }, + ) .await; match state.api_recv().await { @@ -588,7 +594,7 @@ async fn test_ingest_stream() { !state .global .connection_manager - .submit_request(stream_id, GrpcRequest::Started { id: request_id }) + .submit_request(stream_id, GrpcRequest::TranscoderStarted { id: request_id }) .await ); @@ -633,53 +639,74 @@ async fn test_ingest_stream_transcoder_disconnect() { assert_eq!(request.stream_id, stream_id.to_string()); match &request.updates[0].update { Some(crate::pb::scuffle::backend::update_live_stream_request::update::Update::Variants(v)) => { - assert_eq!(v.variants.len(), 3); // We are not transcoding so this is source and audio only - assert_eq!(v.variants[0].name, "source"); - assert_eq!(v.variants[0].video_settings, Some(VideoSettings { + assert_eq!(v.transcode_states.len(), 4); // We are not transcoding so this is source and audio only + assert_eq!(v.stream_variants.len(), 6); // We are not transcoding so this is source and audio only + + let audio_only_aac = v.stream_variants.iter().find(|v| v.name == "audio-only" && v.group == "aac").unwrap(); + assert_eq!(audio_only_aac.transcode_state_ids.len(), 1); + + let audio_only_opus = v.stream_variants.iter().find(|v| v.name == "audio-only" && v.group == "opus").unwrap(); + assert_eq!(audio_only_opus.transcode_state_ids.len(), 1); + + let source_aac = v.stream_variants.iter().find(|v| v.name == "source" && v.group == "aac").unwrap(); + assert_eq!(source_aac.transcode_state_ids.len(), 2); + + let source_opus = v.stream_variants.iter().find(|v| v.name == "source" && v.group == "opus").unwrap(); + assert_eq!(source_opus.transcode_state_ids.len(), 2); + + let _360p_aac = v.stream_variants.iter().find(|v| v.name == "360p" && v.group == "aac").unwrap(); + assert_eq!(_360p_aac.transcode_state_ids.len(), 2); + + let _360p_opus = v.stream_variants.iter().find(|v| v.name == "360p" && v.group == "opus").unwrap(); + assert_eq!(_360p_opus.transcode_state_ids.len(), 2); + + let audio_aac_transcode_state = v.transcode_states.iter().find(|s| s.id == audio_only_aac.transcode_state_ids[0]).unwrap(); + assert!(!audio_aac_transcode_state.copy); + assert_eq!(audio_aac_transcode_state.codec, "mp4a.40.2"); + assert_eq!(audio_aac_transcode_state.bitrate, 128 * 1024); + assert_eq!(audio_aac_transcode_state.settings, Some(transcode_state::Settings::Audio(transcode_state::AudioSettings { + channels: 2, + sample_rate: 48000, + }))); + + let audio_opus_transcode_state = v.transcode_states.iter().find(|s| s.id == audio_only_opus.transcode_state_ids[0]).unwrap(); + assert!(!audio_opus_transcode_state.copy); + assert_eq!(audio_opus_transcode_state.codec, "opus"); + assert_eq!(audio_opus_transcode_state.bitrate, 96 * 1024); + assert_eq!(audio_opus_transcode_state.settings, Some(transcode_state::Settings::Audio(transcode_state::AudioSettings { + channels: 2, + sample_rate: 48000, + }))); + + let source_video_transcode_state = v.transcode_states.iter().find(|s| s.id == source_aac.transcode_state_ids[0]).unwrap(); + assert!(source_video_transcode_state.copy); + assert_eq!(source_video_transcode_state.codec, "avc1.64001f"); + assert_eq!(source_video_transcode_state.bitrate, 1276158); + assert_eq!(source_video_transcode_state.settings, Some(transcode_state::Settings::Video(transcode_state::VideoSettings { width: 468, height: 864, framerate: 30, - bitrate: 1276158, - codec: "avc1.64001f".to_string(), - })); - assert_eq!(v.variants[0].audio_settings, Some(AudioSettings { - sample_rate: 44100, - channels: 2, - bitrate: 69568, - codec: "opus".to_string(), - })); - assert_eq!(v.variants[0].metadata, "{}"); - assert!(!v.variants[0].id.is_empty()); - - assert_eq!(v.variants[1].name, "audio"); - assert_eq!(v.variants[1].video_settings, None); - assert_eq!(v.variants[1].audio_settings, Some(AudioSettings { - sample_rate: 44100, - channels: 2, - bitrate: 69568, - codec: "opus".to_string(), - })); - assert_eq!(v.variants[1].metadata, "{}"); - assert!(!v.variants[1].id.is_empty()); - - assert_eq!(v.variants[2].name, "360p"); - assert_eq!(v.variants[2].video_settings, Some(VideoSettings { + }))); + + assert_eq!(source_aac.transcode_state_ids[0], source_opus.transcode_state_ids[0]); + assert_eq!(source_aac.transcode_state_ids[1], audio_aac_transcode_state.id); + assert_eq!(source_opus.transcode_state_ids[1], audio_opus_transcode_state.id); + + let _360p_video_transcode_state = v.transcode_states.iter().find(|s| s.id == _360p_aac.transcode_state_ids[0]).unwrap(); + assert!(!_360p_video_transcode_state.copy); + assert_eq!(_360p_video_transcode_state.codec, "avc1.640033"); + assert_eq!(_360p_video_transcode_state.bitrate, 1024000); + assert_eq!(_360p_video_transcode_state.settings, Some(transcode_state::Settings::Video(transcode_state::VideoSettings { width: 360, height: 665, framerate: 30, - bitrate: 1024000, - codec: "avc1.640033".to_string(), - })); - assert_eq!(v.variants[2].audio_settings, Some(AudioSettings { - sample_rate: 44100, - channels: 2, - bitrate: 69568, - codec: "opus".to_string(), - })); - assert_eq!(v.variants[2].metadata, "{}"); - assert!(!v.variants[2].id.is_empty()); + }))); - variants = v.variants.clone(); + assert_eq!(_360p_aac.transcode_state_ids[0], _360p_opus.transcode_state_ids[0]); + assert_eq!(_360p_aac.transcode_state_ids[1], audio_aac_transcode_state.id); + assert_eq!(_360p_opus.transcode_state_ids[1], audio_opus_transcode_state.id); + + variants = Some(v.clone()); send.send(Ok(UpdateLiveStreamResponse {})).unwrap(); }, @@ -839,7 +866,7 @@ async fn test_ingest_stream_transcoder_disconnect() { !state .global .connection_manager - .submit_request(stream_id, GrpcRequest::Started { id: request_id }) + .submit_request(stream_id, GrpcRequest::TranscoderStarted { id: request_id }) .await ); @@ -881,35 +908,8 @@ async fn test_ingest_stream_shutdown() { assert_eq!(request.stream_id, stream_id.to_string()); match &request.updates[0].update { Some(crate::pb::scuffle::backend::update_live_stream_request::update::Update::Variants(v)) => { - assert_eq!(v.variants.len(), 2); // We are not transcoding so this is source and audio only - assert_eq!(v.variants[0].name, "source"); - assert_eq!(v.variants[0].video_settings, Some(VideoSettings { - width: 468, - height: 864, - framerate: 30, - bitrate: 1276158, - codec: "avc1.64001f".to_string(), - })); - assert_eq!(v.variants[0].audio_settings, Some(AudioSettings { - sample_rate: 44100, - channels: 2, - bitrate: 69568, - codec: "opus".to_string(), - })); - assert_eq!(v.variants[0].metadata, "{}"); - assert!(!v.variants[0].id.is_empty()); - - assert_eq!(v.variants[1].name, "audio"); - assert_eq!(v.variants[1].video_settings, None); - assert_eq!(v.variants[1].audio_settings, Some(AudioSettings { - sample_rate: 44100, - channels: 2, - bitrate: 69568, - codec: "opus".to_string(), - })); - assert_eq!(v.variants[1].metadata, "{}"); - assert!(!v.variants[1].id.is_empty()); - + assert_eq!(v.stream_variants.len(), 2); + assert_eq!(v.transcode_states.len(), 2); send.send(Ok(UpdateLiveStreamResponse {})).unwrap(); }, _ => panic!("unexpected update"), @@ -945,29 +945,18 @@ async fn test_ingest_stream_shutdown() { _ => panic!("unexpected event"), } - // state.global - // .nats - // .publish( - // format!( - // "{}.{}", - // state.global.config.nats.connection_subject_prefix, stream_id - // ), - // IngestMessage { - // timestamp: 0, - // id: Uuid::new_v4().to_string(), - // data: Some(ingest_message::Data::DropStream(IngestMessageDropStream { - // id: stream_id.to_string(), - // })), - // } - // .encode_to_vec() - // .into(), - // ) - // .await - // .unwrap(); + assert!( + state + .global + .connection_manager + .submit_request(stream_id, GrpcRequest::ShutdownStream) + .await + ); - assert!(ffmpeg.wait().await.is_ok()); + tracing::info!("waiting for transcoder to exit"); + + assert!(ffmpeg.wait().timeout(Duration::from_secs(1)).await.is_ok()); - // drop(transcoder_stream); state.finish().await; } @@ -984,87 +973,112 @@ async fn test_ingest_stream_transcoder_full() { assert_eq!(request.stream_id, stream_id.to_string()); match &request.updates[0].update { Some(crate::pb::scuffle::backend::update_live_stream_request::update::Update::Variants(v)) => { - assert_eq!(v.variants.len(), 5); // We are not transcoding so this is source and audio only - assert_eq!(v.variants[0].name, "source"); - assert_eq!(v.variants[0].video_settings, Some(VideoSettings { - width: 3840, - height: 2160, - framerate: 60, - bitrate: 1740285, - codec: "avc1.640034".to_string(), - })); - assert_eq!(v.variants[0].audio_settings, Some(AudioSettings { - sample_rate: 48000, + let aac_audio_only = v.stream_variants.iter().find(|v| v.name == "audio-only" && v.group == "aac").unwrap(); + assert_eq!(aac_audio_only.transcode_state_ids.len(), 1); + + let opus_audio_only = v.stream_variants.iter().find(|v| v.name == "audio-only" && v.group == "opus").unwrap(); + assert_eq!(opus_audio_only.transcode_state_ids.len(), 1); + + let aac_source = v.stream_variants.iter().find(|v| v.name == "source" && v.group == "aac").unwrap(); + assert_eq!(aac_source.transcode_state_ids.len(), 2); + + let opus_source = v.stream_variants.iter().find(|v| v.name == "source" && v.group == "opus").unwrap(); + assert_eq!(opus_source.transcode_state_ids.len(), 2); + + let aac_720p = v.stream_variants.iter().find(|v| v.name == "720p" && v.group == "aac").unwrap(); + assert_eq!(aac_720p.transcode_state_ids.len(), 2); + + let opus_720p = v.stream_variants.iter().find(|v| v.name == "720p" && v.group == "opus").unwrap(); + assert_eq!(opus_720p.transcode_state_ids.len(), 2); + + let aac_480p = v.stream_variants.iter().find(|v| v.name == "480p" && v.group == "aac").unwrap(); + assert_eq!(aac_480p.transcode_state_ids.len(), 2); + + let opus_480p = v.stream_variants.iter().find(|v| v.name == "480p" && v.group == "opus").unwrap(); + assert_eq!(opus_480p.transcode_state_ids.len(), 2); + + let aac_360p = v.stream_variants.iter().find(|v| v.name == "360p" && v.group == "aac").unwrap(); + assert_eq!(aac_360p.transcode_state_ids.len(), 2); + + let opus_360p = v.stream_variants.iter().find(|v| v.name == "360p" && v.group == "opus").unwrap(); + assert_eq!(opus_360p.transcode_state_ids.len(), 2); + + let aac_transcode_state = v.transcode_states.iter().find(|s| s.id == aac_audio_only.transcode_state_ids[0]).unwrap(); + assert_eq!(aac_transcode_state.codec, "mp4a.40.2".to_string()); + assert_eq!(aac_transcode_state.bitrate, 128 * 1024); + assert!(!aac_transcode_state.copy); + assert_eq!(aac_transcode_state.settings, Some(transcode_state::Settings::Audio(transcode_state::AudioSettings { channels: 2, - bitrate: 140304, - codec: "opus".to_string(), - })); - assert_eq!(v.variants[0].metadata, "{}"); - assert!(!v.variants[0].id.is_empty()); - - assert_eq!(v.variants[1].name, "audio"); - assert_eq!(v.variants[1].video_settings, None); - assert_eq!(v.variants[1].audio_settings, Some(AudioSettings { - sample_rate: 48000, + sample_rate: 48_000, + }))); + + let opus_transcode_state = v.transcode_states.iter().find(|s| s.id == opus_audio_only.transcode_state_ids[0]).unwrap(); + assert_eq!(opus_transcode_state.codec, "opus".to_string()); + assert_eq!(opus_transcode_state.bitrate, 96 * 1024); + assert!(!opus_transcode_state.copy); + assert_eq!(opus_transcode_state.settings, Some(transcode_state::Settings::Audio(transcode_state::AudioSettings { channels: 2, - bitrate: 140304, - codec: "opus".to_string(), - })); - assert_eq!(v.variants[1].metadata, "{}"); - assert!(!v.variants[1].id.is_empty()); - - assert_eq!(v.variants[2].name, "720p"); - assert_eq!(v.variants[2].video_settings, Some(VideoSettings { - width: 1280, - height: 720, + sample_rate: 48_000, + }))); + + // Now for the video source + let source_video_transcode_state = v.transcode_states.iter().find(|s| s.id == aac_source.transcode_state_ids[0]).unwrap(); + assert_eq!(source_video_transcode_state.codec, "avc1.640034".to_string()); + assert_eq!(source_video_transcode_state.bitrate, 1740285); + assert!(source_video_transcode_state.copy); + assert_eq!(source_video_transcode_state.settings, Some(transcode_state::Settings::Video(transcode_state::VideoSettings { framerate: 60, - bitrate: 4096000, - codec: "avc1.640033".to_string(), - })); - assert_eq!(v.variants[2].audio_settings, Some(AudioSettings { - sample_rate: 48000, - channels: 2, - bitrate: 140304, - codec: "opus".to_string(), - })); - assert_eq!(v.variants[2].metadata, "{}"); - assert!(!v.variants[2].id.is_empty()); - - assert_eq!(v.variants[3].name, "480p"); - assert_eq!(v.variants[3].video_settings, Some(VideoSettings { - width: 853, + height: 2160, + width: 3840, + }))); + + assert_eq!(aac_source.transcode_state_ids[0], opus_source.transcode_state_ids[0]); + assert_eq!(aac_source.transcode_state_ids[1], aac_transcode_state.id); + assert_eq!(opus_source.transcode_state_ids[1], opus_transcode_state.id); + + let _720p_video_transcode_state = v.transcode_states.iter().find(|s| s.id == aac_720p.transcode_state_ids[0]).unwrap(); + assert_eq!(_720p_video_transcode_state.codec, "avc1.640033".to_string()); + assert_eq!(_720p_video_transcode_state.bitrate, 4000 * 1024); + assert!(!_720p_video_transcode_state.copy); + assert_eq!(_720p_video_transcode_state.settings, Some(transcode_state::Settings::Video(transcode_state::VideoSettings { + framerate: 60, + height: 720, + width: 1280, + }))); + + assert_eq!(aac_720p.transcode_state_ids[0], opus_720p.transcode_state_ids[0]); + assert_eq!(aac_720p.transcode_state_ids[1], aac_transcode_state.id); + assert_eq!(opus_720p.transcode_state_ids[1], opus_transcode_state.id); + + let _480p_video_transcode_state = v.transcode_states.iter().find(|s| s.id == aac_480p.transcode_state_ids[0]).unwrap(); + assert_eq!(_480p_video_transcode_state.codec, "avc1.640033".to_string()); + assert_eq!(_480p_video_transcode_state.bitrate, 2000 * 1024); + assert!(!_480p_video_transcode_state.copy); + assert_eq!(_480p_video_transcode_state.settings, Some(transcode_state::Settings::Video(transcode_state::VideoSettings { + framerate: 30, height: 480, + width: 853, + }))); + + assert_eq!(aac_480p.transcode_state_ids[0], opus_480p.transcode_state_ids[0]); + assert_eq!(aac_480p.transcode_state_ids[1], aac_transcode_state.id); + assert_eq!(opus_480p.transcode_state_ids[1], opus_transcode_state.id); + + let _360p_video_transcode_state = v.transcode_states.iter().find(|s| s.id == aac_360p.transcode_state_ids[0]).unwrap(); + assert_eq!(_360p_video_transcode_state.codec, "avc1.640033".to_string()); + assert_eq!(_360p_video_transcode_state.bitrate, 1000 * 1024); + assert!(!_360p_video_transcode_state.copy); + assert_eq!(_360p_video_transcode_state.settings, Some(transcode_state::Settings::Video(transcode_state::VideoSettings { framerate: 30, - bitrate: 2048000, - codec: "avc1.640033".to_string(), - })); - assert_eq!(v.variants[3].audio_settings, Some(AudioSettings { - sample_rate: 48000, - channels: 2, - bitrate: 140304, - codec: "opus".to_string(), - })); - assert_eq!(v.variants[3].metadata, "{}"); - assert!(!v.variants[3].id.is_empty()); - - assert_eq!(v.variants[4].name, "360p"); - assert_eq!(v.variants[4].video_settings, Some(VideoSettings { - width: 640, height: 360, - framerate: 30, - bitrate: 1024000, - codec: "avc1.640033".to_string(), - })); - assert_eq!(v.variants[4].audio_settings, Some(AudioSettings { - sample_rate: 48000, - channels: 2, - bitrate: 140304, - codec: "opus".to_string(), - })); - assert_eq!(v.variants[4].metadata, "{}"); - assert!(!v.variants[4].id.is_empty()); + width: 640, + }))); + + assert_eq!(aac_360p.transcode_state_ids[0], opus_360p.transcode_state_ids[0]); + assert_eq!(aac_360p.transcode_state_ids[1], aac_transcode_state.id); + assert_eq!(opus_360p.transcode_state_ids[1], opus_transcode_state.id); - variants = v.variants.clone(); + variants = Some(v.clone()); send.send(Ok(UpdateLiveStreamResponse {})).unwrap(); }, @@ -1135,7 +1149,7 @@ async fn test_ingest_stream_transcoder_full() { state .global .connection_manager - .submit_request(stream_id, GrpcRequest::Started { id: request_id }) + .submit_request(stream_id, GrpcRequest::TranscoderStarted { id: request_id }) .await ); @@ -1185,7 +1199,7 @@ async fn test_ingest_stream_transcoder_full() { !state .global .connection_manager - .submit_request(stream_id, GrpcRequest::Started { id: request_id }) + .submit_request(stream_id, GrpcRequest::TranscoderStarted { id: request_id }) .await ); @@ -1238,7 +1252,10 @@ async fn test_ingest_stream_reject() { !state .global .connection_manager - .submit_request(stream_id, GrpcRequest::Started { id: Uuid::new_v4() }) + .submit_request( + stream_id, + GrpcRequest::TranscoderStarted { id: Uuid::new_v4() } + ) .await ); @@ -1258,87 +1275,7 @@ async fn test_ingest_stream_transcoder_error() { assert_eq!(request.stream_id, stream_id.to_string()); match &request.updates[0].update { Some(crate::pb::scuffle::backend::update_live_stream_request::update::Update::Variants(v)) => { - assert_eq!(v.variants.len(), 5); // We are not transcoding so this is source and audio only - assert_eq!(v.variants[0].name, "source"); - assert_eq!(v.variants[0].video_settings, Some(VideoSettings { - width: 3840, - height: 2160, - framerate: 60, - bitrate: 1740285, - codec: "avc1.640034".to_string(), - })); - assert_eq!(v.variants[0].audio_settings, Some(AudioSettings { - sample_rate: 48000, - channels: 2, - bitrate: 140304, - codec: "opus".to_string(), - })); - assert_eq!(v.variants[0].metadata, "{}"); - assert!(!v.variants[0].id.is_empty()); - - assert_eq!(v.variants[1].name, "audio"); - assert_eq!(v.variants[1].video_settings, None); - assert_eq!(v.variants[1].audio_settings, Some(AudioSettings { - sample_rate: 48000, - channels: 2, - bitrate: 140304, - codec: "opus".to_string(), - })); - assert_eq!(v.variants[1].metadata, "{}"); - assert!(!v.variants[1].id.is_empty()); - - assert_eq!(v.variants[2].name, "720p"); - assert_eq!(v.variants[2].video_settings, Some(VideoSettings { - width: 1280, - height: 720, - framerate: 60, - bitrate: 4096000, - codec: "avc1.640033".to_string(), - })); - assert_eq!(v.variants[2].audio_settings, Some(AudioSettings { - sample_rate: 48000, - channels: 2, - bitrate: 140304, - codec: "opus".to_string(), - })); - assert_eq!(v.variants[2].metadata, "{}"); - assert!(!v.variants[2].id.is_empty()); - - assert_eq!(v.variants[3].name, "480p"); - assert_eq!(v.variants[3].video_settings, Some(VideoSettings { - width: 853, - height: 480, - framerate: 30, - bitrate: 2048000, - codec: "avc1.640033".to_string(), - })); - assert_eq!(v.variants[3].audio_settings, Some(AudioSettings { - sample_rate: 48000, - channels: 2, - bitrate: 140304, - codec: "opus".to_string(), - })); - assert_eq!(v.variants[3].metadata, "{}"); - assert!(!v.variants[3].id.is_empty()); - - assert_eq!(v.variants[4].name, "360p"); - assert_eq!(v.variants[4].video_settings, Some(VideoSettings { - width: 640, - height: 360, - framerate: 30, - bitrate: 1024000, - codec: "avc1.640033".to_string(), - })); - assert_eq!(v.variants[4].audio_settings, Some(AudioSettings { - sample_rate: 48000, - channels: 2, - bitrate: 140304, - codec: "opus".to_string(), - })); - assert_eq!(v.variants[4].metadata, "{}"); - assert!(!v.variants[4].id.is_empty()); - - variants = v.variants.clone(); + variants = v.clone(); send.send(Ok(UpdateLiveStreamResponse {})).unwrap(); }, @@ -1385,7 +1322,7 @@ async fn test_ingest_stream_transcoder_error() { assert!(!data.request_id.is_empty()); assert_eq!(data.stream_id, stream_id.to_string()); - assert_eq!(data.variants, variants); + assert_eq!(data.variants, Some(variants)); // We should now be able to join the stream let stream_id = data.stream_id.parse().unwrap(); @@ -1411,7 +1348,7 @@ async fn test_ingest_stream_transcoder_error() { .connection_manager .submit_request( stream_id, - GrpcRequest::Error { + GrpcRequest::TranscoderError { id: request_id, message: "test".to_string(), fatal: false, @@ -1480,7 +1417,7 @@ async fn test_ingest_stream_transcoder_error() { !state .global .connection_manager - .submit_request(stream_id, GrpcRequest::Started { id: request_id }) + .submit_request(stream_id, GrpcRequest::TranscoderStarted { id: request_id }) .await ); @@ -1499,45 +1436,61 @@ async fn test_ingest_stream_try_resume_success() { let mut ffmpeg = stream_with_ffmpeg(state.rtmp_port, "avc_aac_large.mp4"); let stream_id = Uuid::new_v4(); - let variants = vec![ - StreamVariant { - id: Uuid::new_v4().to_string(), - metadata: "{}".to_string(), - name: "source".to_string(), - audio_settings: Some(AudioSettings { - bitrate: 140304, - channels: 2, - sample_rate: 48000, - codec: "opus".to_string(), - }), - video_settings: Some(VideoSettings { - width: 3840, - height: 2160, - framerate: 60, - bitrate: 1740285, + + let audio_transcode_id = Uuid::new_v4(); + let source_transcode_id = Uuid::new_v4(); + + let variants = StreamVariants { + stream_variants: vec![ + StreamVariant { + group: "aac".to_string(), + name: "source".to_string(), + transcode_state_ids: vec![ + source_transcode_id.to_string(), + audio_transcode_id.to_string(), + ], + }, + StreamVariant { + group: "aac".to_string(), + name: "audio-only".to_string(), + transcode_state_ids: vec![audio_transcode_id.to_string()], + }, + ], + transcode_states: vec![ + TranscodeState { + id: source_transcode_id.to_string(), codec: "avc1.640034".to_string(), - }), - }, - StreamVariant { - id: Uuid::new_v4().to_string(), - metadata: "{}".to_string(), - name: "audio".to_string(), - video_settings: None, - audio_settings: Some(AudioSettings { - bitrate: 140304, - channels: 2, - sample_rate: 48000, - codec: "opus".to_string(), - }), - }, - ]; + bitrate: 1740285, + copy: true, + settings: Some(transcode_state::Settings::Video( + transcode_state::VideoSettings { + width: 3840, + height: 2160, + framerate: 60, + }, + )), + }, + TranscodeState { + id: audio_transcode_id.to_string(), + codec: "mp4a.40.2".to_string(), + bitrate: 128 * 1024, + copy: false, + settings: Some(transcode_state::Settings::Audio( + transcode_state::AudioSettings { + channels: 2, + sample_rate: 48000, + }, + )), + }, + ], + }; + state .api_assert_authenticate(Ok(AuthenticateLiveStreamResponse { stream_id: stream_id.to_string(), record: false, transcode: false, - try_resume: true, - variants: variants.clone(), + variants: Some(variants.clone()), })) .await; @@ -1578,7 +1531,7 @@ async fn test_ingest_stream_try_resume_success() { assert!(!data.request_id.is_empty()); assert_eq!(data.stream_id, stream_id.to_string()); - assert_eq!(data.variants, variants); + assert_eq!(data.variants, Some(variants)); // We should now be able to join the stream let stream_id = data.stream_id.parse().unwrap(); @@ -1602,7 +1555,7 @@ async fn test_ingest_stream_try_resume_success() { state .global .connection_manager - .submit_request(stream_id, GrpcRequest::Started { id: request_id }) + .submit_request(stream_id, GrpcRequest::TranscoderStarted { id: request_id }) .await ); @@ -1652,7 +1605,7 @@ async fn test_ingest_stream_try_resume_success() { !state .global .connection_manager - .submit_request(stream_id, GrpcRequest::Started { id: request_id }) + .submit_request(stream_id, GrpcRequest::TranscoderStarted { id: request_id }) .await ); @@ -1688,44 +1641,59 @@ async fn test_ingest_stream_try_resume_failed() { let mut ffmpeg = stream_with_ffmpeg(state.rtmp_port, "avc_aac_large.mp4"); let mut stream_id = Uuid::new_v4(); + + let audio_transcode_id = Uuid::new_v4(); + let source_transcode_id = Uuid::new_v4(); + state .api_assert_authenticate(Ok(AuthenticateLiveStreamResponse { stream_id: stream_id.to_string(), record: false, transcode: false, - try_resume: true, - variants: vec![ - StreamVariant { - id: Uuid::new_v4().to_string(), - metadata: "{}".to_string(), - name: "source".to_string(), - audio_settings: Some(AudioSettings { - bitrate: 140304, - channels: 2, - sample_rate: 48000, - codec: "opus".to_string(), - }), - video_settings: Some(VideoSettings { - width: 1920, - height: 1080, - framerate: 60, - bitrate: 1740285, + variants: Some(StreamVariants { + stream_variants: vec![ + StreamVariant { + group: "aac".to_string(), + name: "source".to_string(), + transcode_state_ids: vec![ + source_transcode_id.to_string(), + audio_transcode_id.to_string(), + ], + }, + StreamVariant { + group: "aac".to_string(), + name: "audio-only".to_string(), + transcode_state_ids: vec![audio_transcode_id.to_string()], + }, + ], + transcode_states: vec![ + TranscodeState { + id: source_transcode_id.to_string(), codec: "avc1.640034".to_string(), - }), - }, - StreamVariant { - id: Uuid::new_v4().to_string(), - metadata: "{}".to_string(), - name: "audio".to_string(), - video_settings: None, - audio_settings: Some(AudioSettings { - bitrate: 140304, - channels: 2, - sample_rate: 48000, - codec: "opus".to_string(), - }), - }, - ], + bitrate: 1740285, + copy: true, + settings: Some(transcode_state::Settings::Video( + transcode_state::VideoSettings { + width: 3840, + height: 2160, + framerate: 30, // Note we changed this to 30fps from 60 so that we could cause the stream to fail + }, + )), + }, + TranscodeState { + id: audio_transcode_id.to_string(), + codec: "mp4a.40.2".to_string(), + bitrate: 128 * 1024, + copy: false, + settings: Some(transcode_state::Settings::Audio( + transcode_state::AudioSettings { + channels: 2, + sample_rate: 48000, + }, + )), + }, + ], + }), })) .await; @@ -1733,59 +1701,8 @@ async fn test_ingest_stream_try_resume_failed() { match state.api_recv().await { IncomingRequest::New((new, response)) => { assert_eq!(new.old_stream_id, stream_id.to_string()); - assert_eq!(new.variants.len(), 2); - - assert_eq!(new.variants[0].name, "source"); - assert_eq!( - new.variants[0].audio_settings.as_ref().unwrap().bitrate, - 140304 - ); - assert_eq!( - new.variants[0].video_settings.as_ref().unwrap().bitrate, - 1740285 - ); - assert_eq!( - new.variants[0].video_settings.as_ref().unwrap().framerate, - 60 - ); - assert_eq!(new.variants[0].video_settings.as_ref().unwrap().width, 3840); - assert_eq!( - new.variants[0].video_settings.as_ref().unwrap().height, - 2160 - ); - assert_eq!( - new.variants[0].video_settings.as_ref().unwrap().codec, - "avc1.640034" - ); - assert_eq!( - new.variants[0].audio_settings.as_ref().unwrap().codec, - "opus" - ); - assert_eq!(new.variants[0].audio_settings.as_ref().unwrap().channels, 2); - assert_eq!( - new.variants[0].audio_settings.as_ref().unwrap().sample_rate, - 48000 - ); - assert_eq!(new.variants[0].metadata, "{}"); - - assert_eq!(new.variants[1].name, "audio"); - assert_eq!( - new.variants[1].audio_settings.as_ref().unwrap().bitrate, - 140304 - ); - assert_eq!(new.variants[1].video_settings, None); - assert_eq!( - new.variants[1].audio_settings.as_ref().unwrap().codec, - "opus" - ); - assert_eq!(new.variants[1].audio_settings.as_ref().unwrap().channels, 2); - assert_eq!( - new.variants[1].audio_settings.as_ref().unwrap().sample_rate, - 48000 - ); - assert_eq!(new.variants[1].metadata, "{}"); - - variants = new.variants; + + variants = Some(new.variants.unwrap()); stream_id = Uuid::new_v4(); @@ -1859,7 +1776,7 @@ async fn test_ingest_stream_try_resume_failed() { state .global .connection_manager - .submit_request(stream_id, GrpcRequest::Started { id: request_id }) + .submit_request(stream_id, GrpcRequest::TranscoderStarted { id: request_id }) .await ); @@ -1909,7 +1826,7 @@ async fn test_ingest_stream_try_resume_failed() { !state .global .connection_manager - .submit_request(stream_id, GrpcRequest::Started { id: request_id }) + .submit_request(stream_id, GrpcRequest::TranscoderStarted { id: request_id }) .await ); @@ -1954,8 +1871,7 @@ async fn test_ingest_stream_transcoder_full_tls(tls_dir: PathBuf) { stream_id: stream_id.to_string(), record: false, transcode: true, - try_resume: false, - variants: vec![], + variants: None, })) .unwrap(); } @@ -1968,88 +1884,7 @@ async fn test_ingest_stream_transcoder_full_tls(tls_dir: PathBuf) { assert_eq!(request.stream_id, stream_id.to_string()); match &request.updates[0].update { Some(crate::pb::scuffle::backend::update_live_stream_request::update::Update::Variants(v)) => { - assert_eq!(v.variants.len(), 5); // We are not transcoding so this is source and audio only - assert_eq!(v.variants[0].name, "source"); - assert_eq!(v.variants[0].video_settings, Some(VideoSettings { - width: 3840, - height: 2160, - framerate: 60, - bitrate: 1740285, - codec: "avc1.640034".to_string(), - })); - assert_eq!(v.variants[0].audio_settings, Some(AudioSettings { - sample_rate: 48000, - channels: 2, - bitrate: 140304, - codec: "opus".to_string(), - })); - assert_eq!(v.variants[0].metadata, "{}"); - assert!(!v.variants[0].id.is_empty()); - - assert_eq!(v.variants[1].name, "audio"); - assert_eq!(v.variants[1].video_settings, None); - assert_eq!(v.variants[1].audio_settings, Some(AudioSettings { - sample_rate: 48000, - channels: 2, - bitrate: 140304, - codec: "opus".to_string(), - })); - assert_eq!(v.variants[1].metadata, "{}"); - assert!(!v.variants[1].id.is_empty()); - - assert_eq!(v.variants[2].name, "720p"); - assert_eq!(v.variants[2].video_settings, Some(VideoSettings { - width: 1280, - height: 720, - framerate: 60, - bitrate: 4096000, - codec: "avc1.640033".to_string(), - })); - assert_eq!(v.variants[2].audio_settings, Some(AudioSettings { - sample_rate: 48000, - channels: 2, - bitrate: 140304, - codec: "opus".to_string(), - })); - assert_eq!(v.variants[2].metadata, "{}"); - assert!(!v.variants[2].id.is_empty()); - - assert_eq!(v.variants[3].name, "480p"); - assert_eq!(v.variants[3].video_settings, Some(VideoSettings { - width: 853, - height: 480, - framerate: 30, - bitrate: 2048000, - codec: "avc1.640033".to_string(), - })); - assert_eq!(v.variants[3].audio_settings, Some(AudioSettings { - sample_rate: 48000, - channels: 2, - bitrate: 140304, - codec: "opus".to_string(), - })); - assert_eq!(v.variants[3].metadata, "{}"); - assert!(!v.variants[3].id.is_empty()); - - assert_eq!(v.variants[4].name, "360p"); - assert_eq!(v.variants[4].video_settings, Some(VideoSettings { - width: 640, - height: 360, - framerate: 30, - bitrate: 1024000, - codec: "avc1.640033".to_string(), - })); - assert_eq!(v.variants[4].audio_settings, Some(AudioSettings { - sample_rate: 48000, - channels: 2, - bitrate: 140304, - codec: "opus".to_string(), - })); - assert_eq!(v.variants[4].metadata, "{}"); - assert!(!v.variants[4].id.is_empty()); - - variants = v.variants.clone(); - + variants = Some(v.clone()); send.send(Ok(UpdateLiveStreamResponse {})).unwrap(); }, _ => panic!("unexpected update"), @@ -2119,7 +1954,7 @@ async fn test_ingest_stream_transcoder_full_tls(tls_dir: PathBuf) { state .global .connection_manager - .submit_request(stream_id, GrpcRequest::Started { id: request_id }) + .submit_request(stream_id, GrpcRequest::TranscoderStarted { id: request_id }) .await ); @@ -2169,7 +2004,7 @@ async fn test_ingest_stream_transcoder_full_tls(tls_dir: PathBuf) { !state .global .connection_manager - .submit_request(stream_id, GrpcRequest::Started { id: request_id }) + .submit_request(stream_id, GrpcRequest::TranscoderStarted { id: request_id }) .await ); @@ -2271,7 +2106,7 @@ async fn test_ingest_stream_transcoder_probe() { state .global .connection_manager - .submit_request(stream_id, GrpcRequest::Started { id: request_id }) + .submit_request(stream_id, GrpcRequest::TranscoderStarted { id: request_id }) .await ); @@ -2341,7 +2176,7 @@ async fn test_ingest_stream_transcoder_probe() { !state .global .connection_manager - .submit_request(stream_id, GrpcRequest::Started { id: request_id }) + .submit_request(stream_id, GrpcRequest::TranscoderStarted { id: request_id }) .await ); @@ -2420,7 +2255,7 @@ async fn test_ingest_stream_transcoder_probe_reconnect() { state .global .connection_manager - .submit_request(stream_id, GrpcRequest::Started { id: request_id }) + .submit_request(stream_id, GrpcRequest::TranscoderStarted { id: request_id }) .await ); @@ -2486,7 +2321,10 @@ async fn test_ingest_stream_transcoder_probe_reconnect() { state .global .connection_manager - .submit_request(stream_id, GrpcRequest::ShuttingDown { id: request_id }) + .submit_request( + stream_id, + GrpcRequest::TranscoderShuttingDown { id: request_id } + ) .await ); @@ -2539,7 +2377,7 @@ async fn test_ingest_stream_transcoder_probe_reconnect() { state .global .connection_manager - .submit_request(stream_id, GrpcRequest::Started { id: request_id }) + .submit_request(stream_id, GrpcRequest::TranscoderStarted { id: request_id }) .await ); @@ -2609,7 +2447,7 @@ async fn test_ingest_stream_transcoder_probe_reconnect() { !state .global .connection_manager - .submit_request(stream_id, GrpcRequest::Started { id: request_id }) + .submit_request(stream_id, GrpcRequest::TranscoderStarted { id: request_id }) .await ); @@ -2688,7 +2526,7 @@ async fn test_ingest_stream_transcoder_probe_reconnect_unexpected() { state .global .connection_manager - .submit_request(stream_id, GrpcRequest::Started { id: request_id }) + .submit_request(stream_id, GrpcRequest::TranscoderStarted { id: request_id }) .await ); @@ -2795,7 +2633,7 @@ async fn test_ingest_stream_transcoder_probe_reconnect_unexpected() { state .global .connection_manager - .submit_request(stream_id, GrpcRequest::Started { id: request_id }) + .submit_request(stream_id, GrpcRequest::TranscoderStarted { id: request_id }) .await ); @@ -2865,7 +2703,7 @@ async fn test_ingest_stream_transcoder_probe_reconnect_unexpected() { !state .global .connection_manager - .submit_request(stream_id, GrpcRequest::Started { id: request_id }) + .submit_request(stream_id, GrpcRequest::TranscoderStarted { id: request_id }) .await ); diff --git a/video/transcoder/Cargo.toml b/video/transcoder/Cargo.toml index ce4744b9..63fbcaf1 100644 --- a/video/transcoder/Cargo.toml +++ b/video/transcoder/Cargo.toml @@ -28,7 +28,9 @@ sha2 = "0" tokio-util = "0" tokio-stream = "0" lapin = { version = "2.0.3", features = ["native-tls"] } +uuid = { version = "1", features = ["v4"] } +aac = { path = "../codec/aac" } mp4 = { path = "../container/mp4" } common = { path = "../../common" } bytesio = { path = "../bytesio" } diff --git a/video/transcoder/src/global.rs b/video/transcoder/src/global.rs index e1a967ed..d93801f5 100644 --- a/video/transcoder/src/global.rs +++ b/video/transcoder/src/global.rs @@ -35,7 +35,7 @@ impl GlobalState { } } -pub async fn init_rmq(global: &Arc) { +pub async fn init_rmq(global: &Arc, durable: bool) { let channel = global.rmq.aquire().await.expect("failed to create channel"); let mut options = FieldTable::default(); @@ -46,7 +46,7 @@ pub async fn init_rmq(global: &Arc) { .queue_declare( &global.config.rmq.transcoder_queue, QueueDeclareOptions { - durable: true, + durable, ..Default::default() }, options, diff --git a/video/transcoder/src/main.rs b/video/transcoder/src/main.rs index 0cc969ed..338f14dd 100644 --- a/video/transcoder/src/main.rs +++ b/video/transcoder/src/main.rs @@ -44,7 +44,7 @@ async fn main() -> Result<()> { let global = Arc::new(global::GlobalState::new(config, ctx, rmq, redis)); - global::init_rmq(&global).await; + global::init_rmq(&global, true).await; tracing::info!("initialized rmq"); let transcoder_future = tokio::spawn(transcoder::run(global.clone())); diff --git a/video/transcoder/src/tests/mod.rs b/video/transcoder/src/tests/mod.rs index 9a32ded3..753fd069 100644 --- a/video/transcoder/src/tests/mod.rs +++ b/video/transcoder/src/tests/mod.rs @@ -1,3 +1,4 @@ mod config; mod global; mod grpc; +mod transcoder; diff --git a/video/transcoder/src/tests/transcoder/mod.rs b/video/transcoder/src/tests/transcoder/mod.rs new file mode 100644 index 00000000..b3f251cc --- /dev/null +++ b/video/transcoder/src/tests/transcoder/mod.rs @@ -0,0 +1,185 @@ +// TODO: This is the test stub for the transcoder service. It is not yet implemented. +#![allow(unused_imports)] +#![allow(dead_code)] + +use std::{net::SocketAddr, pin::Pin, sync::Arc}; + +use async_trait::async_trait; +use chrono::Utc; +use futures_util::Stream; +use lapin::BasicProperties; +use prost::Message; +use tokio::sync::{mpsc, oneshot}; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{Request, Response}; +use uuid::Uuid; + +use crate::{ + config::{AppConfig, RmqConfig}, + global::{self, GlobalState}, + pb::scuffle::{ + events::{self, transcoder_message}, + video::{ + ingest_server::{Ingest, IngestServer}, + ShutdownStreamRequest, ShutdownStreamResponse, TranscoderEventRequest, + TranscoderEventResponse, WatchStreamRequest, WatchStreamResponse, + }, + }, + transcoder, +}; + +struct ImplIngestServer { + tx: mpsc::Sender, +} + +#[derive(Debug)] +enum IngestRequest { + WatchStream { + request: WatchStreamRequest, + tx: mpsc::Sender>, + }, + TranscoderEvent { + request: TranscoderEventRequest, + tx: oneshot::Sender, + }, + Shutdown { + request: ShutdownStreamRequest, + tx: oneshot::Sender, + }, +} + +type Result = std::result::Result; + +#[async_trait] +impl Ingest for ImplIngestServer { + type WatchStreamStream = + Pin> + 'static + Send>>; + + async fn watch_stream( + &self, + request: tonic::Request, + ) -> Result> { + let (tx, rx) = mpsc::channel(256); + let request = IngestRequest::WatchStream { + request: request.into_inner(), + tx, + }; + self.tx.send(request).await.unwrap(); + Ok(Response::new(Box::pin(ReceiverStream::new(rx)))) + } + + async fn transcoder_event( + &self, + request: Request, + ) -> Result> { + let (tx, rx) = oneshot::channel(); + let request = IngestRequest::TranscoderEvent { + request: request.into_inner(), + tx, + }; + + self.tx.send(request).await.unwrap(); + Ok(Response::new(rx.await.unwrap())) + } + + async fn shutdown_stream( + &self, + request: Request, + ) -> Result> { + let (tx, rx) = oneshot::channel(); + let request = IngestRequest::Shutdown { + request: request.into_inner(), + tx, + }; + + self.tx.send(request).await.unwrap(); + Ok(Response::new(rx.await.unwrap())) + } +} + +fn setup_ingest_server( + global: Arc, + bind: impl Into, +) -> mpsc::Receiver { + let (tx, rx) = mpsc::channel(256); + let server = ImplIngestServer { tx }; + let bind = bind.into(); + + tokio::spawn(async move { + tonic::transport::Server::builder() + .add_service(IngestServer::new(server)) + .serve_with_shutdown(bind, async move { + global.ctx.done().await; + }) + .await + .unwrap(); + }); + + rx +} + +// #[tokio::test] +// async fn test_transcode() { +// let port = portpicker::pick_unused_port().unwrap(); + +// let (global, handler) = crate::tests::global::mock_global_state(AppConfig { +// rmq: RmqConfig { +// transcoder_queue: Uuid::new_v4().to_string(), +// uri: "".to_string(), +// }, +// ..Default::default() +// }) +// .await; + +// global::init_rmq(&global, false).await; + +// let addr = SocketAddr::from(([127, 0, 0, 1], port)); + +// let mut rx = setup_ingest_server(global.clone(), addr); + +// let transcoder_run_handle = tokio::spawn(transcoder::run(global.clone())); + +// let channel = global.rmq.aquire().await.unwrap(); + +// let req_id = Uuid::new_v4(); + +// channel +// .basic_publish( +// "", +// &global.config.rmq.transcoder_queue, +// lapin::options::BasicPublishOptions::default(), +// events::TranscoderMessage { +// id: req_id.to_string(), +// timestamp: Utc::now().timestamp() as u64, +// data: Some(transcoder_message::Data::NewStream( +// events::TranscoderMessageNewStream { +// request_id: req_id.to_string(), +// stream_id: req_id.to_string(), +// ingest_address: addr.to_string(), +// variants: None, +// }, +// )), +// } +// .encode_to_vec() +// .as_slice(), +// BasicProperties::default() +// .with_message_id(req_id.to_string().into()) +// .with_content_type("application/octet-stream".into()) +// .with_expiration("60000".into()), +// ) +// .await +// .unwrap(); + +// let watch_stream_req = match rx.recv().await.unwrap() { +// IngestRequest::WatchStream { request, tx } => { +// assert_eq!(request.stream_id, req_id.to_string()); +// assert_eq!(request.request_id, req_id.to_string()); + +// tx +// } +// _ => panic!("unexpected request"), +// }; + +// drop(global); +// handler.cancel().await; +// } diff --git a/video/transcoder/src/transcoder/job/mod.rs b/video/transcoder/src/transcoder/job/mod.rs index 2bd1b76c..33fee0f8 100644 --- a/video/transcoder/src/transcoder/job/mod.rs +++ b/video/transcoder/src/transcoder/job/mod.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::io; use std::process::Output; use std::{ @@ -12,6 +13,7 @@ use fred::types::Expiration; use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; use lapin::message::Delivery; use lapin::options::BasicAckOptions; +use mp4::codec::{AudioCodec, VideoCodec}; use nix::sys::signal; use nix::unistd::Pid; use prost::Message as _; @@ -25,7 +27,7 @@ use tokio::{ use tokio_util::sync::CancellationToken; use tonic::{transport::Channel, Status}; -use crate::pb::scuffle::types::StreamVariant; +use crate::pb::scuffle::types::{stream_variants, StreamVariants}; use crate::transcoder::job::utils::{release_lock, set_lock, SharedFuture}; use crate::{ global::GlobalState, @@ -119,7 +121,7 @@ fn redis_master_playlist_key(stream_id: &str) -> String { fn set_master_playlist( global: Arc, stream_id: &str, - variants: &[StreamVariant], + state: &StreamVariants, lock: CancellationToken, ) -> impl futures::Future> + Send + 'static { let playlist_key = redis_master_playlist_key(stream_id); @@ -128,62 +130,85 @@ fn set_master_playlist( playlist.push_str("#EXTM3U\n"); - // Find audio only variant - let audio_variant = variants - .iter() - .find(|v| v.video_settings.is_none()) - .expect("no audio only variant found"); + let mut state_map = HashMap::new(); - playlist.push_str(&format!("#EXT-X-MEDIA:TYPE=AUDIO,GROUP-ID=\"{}\",NAME=\"{}\",AUTOSELECT=YES,DEFAULT=YES,URI=\"{}/index.m3u8\"\n", audio_variant.name, audio_variant.name, audio_variant.id)); + for transcode_state in state.transcode_states.iter() { + playlist.push_str(format!("#EXT-X-MEDIA:TYPE={},GROUP-ID=\"{}\",NAME=\"{}\",AUTOSELECT=YES,DEFAULT=YES,URI=\"{}/index.m3u8\"\n", match transcode_state.settings.as_ref().unwrap() { + stream_variants::transcode_state::Settings::Video(_) => { + "VIDEO" + }, + stream_variants::transcode_state::Settings::Audio(_) => { + "AUDIO" + }, + }, transcode_state.id, transcode_state.id, transcode_state.id).as_str()); - for variant in variants { - let mut options = vec![ - format!( - "BANDWIDTH={}", - variant - .audio_settings - .as_ref() - .map(|a| a.bitrate) - .unwrap_or_default() - + variant - .video_settings - .as_ref() - .map(|v| v.bitrate) - .unwrap_or_default() - ), - format!( - "CODECS=\"{}\"", - variant - .video_settings - .as_ref() - .map(|v| v.codec.clone()) - .into_iter() - .chain( - variant - .audio_settings - .as_ref() - .map(|a| a.codec.clone()) - .into_iter() - ) - .collect::>() - .join(",") - ), + state_map.insert(transcode_state.id.as_str(), transcode_state); + } + + for stream_variant in state.stream_variants.iter() { + let video_transcode_state = stream_variant.transcode_state_ids.iter().find_map(|id| { + let t = state_map.get(id.as_str()).unwrap(); + if matches!( + t.settings, + Some(stream_variants::transcode_state::Settings::Video(_)) + ) { + Some(t) + } else { + None + } + }); + + let audio_transcode_state = stream_variant.transcode_state_ids.iter().find_map(|id| { + let t = state_map.get(id.as_str()).unwrap(); + if matches!( + t.settings, + Some(stream_variants::transcode_state::Settings::Audio(_)) + ) { + Some(t) + } else { + None + } + }); + + let bandwidth = video_transcode_state.map(|t| t.bitrate).unwrap_or(0) + + audio_transcode_state.map(|t| t.bitrate).unwrap_or(0); + let codecs = video_transcode_state + .iter() + .chain(audio_transcode_state.iter()) + .map(|t| t.codec.as_str()) + .collect::>() + .join(","); + + let mut tags = vec![ + format!("GROUP=\"{}\"", stream_variant.group), + format!("NAME=\"{}\"", stream_variant.name), + format!("BANDWIDTH={}", bandwidth), + format!("CODECS=\"{}\"", codecs), ]; - options.push(format!("AUDIO=\"{}\"", audio_variant.name)); + if let Some(video) = video_transcode_state { + let settings = match video.settings.as_ref() { + Some(stream_variants::transcode_state::Settings::Video(settings)) => settings, + _ => unreachable!(), + }; - if let Some(video_settings) = &variant.video_settings { - options.push(format!( - "RESOLUTION={}x{}", - video_settings.width, video_settings.height - )); - options.push(format!("FRAME-RATE={}", video_settings.framerate)); - options.push(format!("VIDEO=\"{}\"", variant.name)); - playlist.push_str(format!("#EXT-X-MEDIA:TYPE=VIDEO,GROUP-ID=\"{}\",NAME=\"{}\",AUTOSELECT=YES,DEFAULT=YES\n", variant.name, variant.name).as_str()); + tags.push(format!("RESOLUTION={}x{}", settings.width, settings.height)); + tags.push(format!("FRAME-RATE={}", settings.framerate)); + tags.push(format!("VIDEO=\"{}\"", video.id)); + } + + if let Some(audio) = audio_transcode_state { + tags.push(format!("AUDIO=\"{}\"", audio.id)); } - playlist.push_str(&format!("#EXT-X-STREAM-INF:{}\n", options.join(","))); - playlist.push_str(&format!("{}/index.m3u8\n", variant.id)) + playlist.push_str( + format!( + "#EXT-X-STREAM-INF:{}\n{}/index.m3u8\n", + tags.join(","), + video_transcode_state.or(audio_transcode_state).unwrap().id + ) + .as_str(), + ); } async move { @@ -209,6 +234,10 @@ fn set_master_playlist( } impl Job { + fn variants(&self) -> &StreamVariants { + self.req.variants.as_ref().unwrap() + } + async fn run(&mut self, global: Arc, shutdown_token: CancellationToken) { tracing::info!("starting transcode job"); let mut set_lock_fut = pin!(set_lock( @@ -221,7 +250,7 @@ impl Job { let mut update_playlist_fut = pin!(set_master_playlist( global.clone(), &self.req.stream_id, - &self.req.variants, + self.req.variants.as_ref().unwrap(), self.lock_owner.child_token(), )); @@ -236,60 +265,49 @@ impl Job { let mut futures = FuturesUnordered::new(); - for v in &self.req.variants { - let socket = match UnixListener::bind(socket_dir.join(format!("{}.sock", v.id))) { - Ok(s) => s, - Err(err) => { - tracing::error!("failed to bind socket: {}", err); - self.report_error("Failed to bind socket", false).await; - return; - } - }; + let variants = self.variants(); + + for transcode_state in variants.transcode_states.iter() { + let socket = + match UnixListener::bind(socket_dir.join(format!("{}.sock", transcode_state.id))) { + Ok(s) => s, + Err(err) => { + tracing::error!("failed to bind socket: {}", err); + self.report_error("Failed to bind socket", false).await; + return; + } + }; futures.push(variant::handle_variant( global.clone(), self.req.stream_id.clone(), - v.id.clone(), + transcode_state.id.clone(), self.req.request_id.clone(), socket, )); } - let custom_variants = self - .req - .variants + let filter_graph_items = self + .variants() + .transcode_states .iter() - .filter(|v| v.name != "source" && v.video_settings.is_some()) + .filter(|v| { + !v.copy + && matches!( + v.settings, + Some(stream_variants::transcode_state::Settings::Video(_)) + ) + }) .collect::>(); - let Some(source_variant) = self - .req - .variants - .iter() - .find(|v| v.name == "source") else { - self.report_error("no source variant", true).await; - tracing::error!("no source variant"); - return; - }; - - let Some(audio_variant) = self - .req - .variants - .iter() - .find(|v| v.name == "audio") else { - self.report_error("no audio variant", true).await; - tracing::error!("no audio variant"); - return; - }; - - let filter_graph = custom_variants + let filter_graph = filter_graph_items .iter() .enumerate() .map(|(i, v)| { - let video = v - .video_settings - .as_ref() - .expect("video settings checked above"); + let settings = match v.settings.as_ref().unwrap() { + stream_variants::transcode_state::Settings::Video(v) => v, + _ => unreachable!(), + }; let previous = if i == 0 { "[0:v]".to_string() @@ -300,27 +318,18 @@ impl Job { format!( "{}scale={}:{},pad=ceil(iw/2)*2:ceil(ih/2)*2{}", previous, - video.width, - video.height, - if i == custom_variants.len() - 1 { - format!("[{}]", v.name) + settings.width, + settings.height, + if i == filter_graph_items.len() - 1 { + format!("[{}]", v.id) } else { - format!(",split=2[{}][{}_out]", v.name, i) + format!(",split=2[{}][{}_out]", v.id, i) } ) }) .collect::>() .join(";"); - // We need to build a ffmpeg command. - let Some(audio_settings) = audio_variant - .audio_settings - .as_ref() else { - self.report_error("no audio settings", true).await; - tracing::error!("no audio settings"); - return; - }; - const MP4_FLAGS: &str = "+frag_keyframe+empty_moov+default_base_moof"; #[rustfmt::skip] @@ -329,63 +338,169 @@ impl Job { "-i", "-", "-probesize", "250M", "-analyzeduration", "250M", - "-map", "0:v", - "-c:v", "copy", - "-f", "mp4", - "-movflags", MP4_FLAGS, - "-frag_duration", "1", - format!( - "unix://{}", - socket_dir - .join(format!("{}.sock", source_variant.id)) - .display() - ), - "-map", "0:a", - "-c:a", "libopus", - "-b:a", format!("{}", audio_settings.bitrate), - "-ac:a", format!("{}", audio_settings.channels), - "-ar:a", format!("{}", audio_settings.sample_rate), - "-f", "mp4", - "-movflags", MP4_FLAGS, - "-frag_duration", "1", - format!( - "unix://{}", - socket_dir - .join(format!("{}.sock", audio_variant.id)) - .display() - ), ]; if !filter_graph.is_empty() { args.extend(vec_of_strings!["-filter_complex", filter_graph]); } - for v in custom_variants { - let video = v.video_settings.as_ref().expect("video settings"); + for state in variants.transcode_states.iter() { + match state.settings { + Some(stream_variants::transcode_state::Settings::Video(ref video)) => { + if state.copy { + #[rustfmt::skip] + args.extend(vec_of_strings![ + "-map", "0:v", + "-c:v", "copy", + ]); + } else { + let codec: VideoCodec = match state.codec.parse() { + Ok(c) => c, + Err(err) => { + tracing::error!("invalid video codec: {}", err); + self.report_error("Invalid video codec", false).await; + return; + } + }; + + match codec { + VideoCodec::Avc { profile, level, .. } => { + #[rustfmt::skip] + args.extend(vec_of_strings![ + "-map", format!("[{}]", state.id), + "-c:v", "libx264", + "-preset", "medium", + "-b:v", format!("{}", state.bitrate), + "-maxrate", format!("{}", state.bitrate), + "-bufsize", format!("{}", state.bitrate * 2), + "-profile:v", match profile { + 66 => "baseline", + 77 => "main", + 100 => "high", + _ => { + tracing::error!("invalid avc profile: {}", profile); + self.report_error("Invalid avc profile", false).await; + return; + }, + }, + "-level:v", match level { + 30 => "3.0", + 31 => "3.1", + 32 => "3.2", + 40 => "4.0", + 41 => "4.1", + 50 => "5.0", + 51 => "5.1", + 52 => "5.2", + 60 => "6.0", + 61 => "6.1", + 62 => "6.2", + _ => { + tracing::error!("invalid avc level: {}", level); + self.report_error("Invalid avc level", false).await; + return; + }, + }, + "-pix_fmt", "yuv420p", + "-g", format!("{}", video.framerate * 2), + "-keyint_min", format!("{}", video.framerate * 2), + "-sc_threshold", "0", + "-r", format!("{}", video.framerate), + "-crf", "23", + "-tune", "zerolatency", + ]); + } + VideoCodec::Av1 { .. } => { + tracing::error!("av1 is not supported"); + self.report_error("AV1 is not supported", false).await; + return; + } + VideoCodec::Hevc { .. } => { + tracing::error!("hevc is not supported"); + self.report_error("HEVC is not supported", false).await; + return; + } + } + } + } + Some(stream_variants::transcode_state::Settings::Audio(ref audio)) => { + if state.copy { + tracing::error!("audio copy is not supported"); + self.report_error("Audio copy is not supported", false) + .await; + return; + } else { + let codec: AudioCodec = match state.codec.parse() { + Ok(c) => c, + Err(err) => { + tracing::error!("invalid audio codec: {}", err); + self.report_error("Invalid audio codec", false).await; + return; + } + }; + + match codec { + AudioCodec::Aac { object_type } => { + args.extend(vec_of_strings![ + "-map", + "0:a", + "-c:a", + "aac", + "-b:a", + format!("{}", state.bitrate), + "-ar", + format!("{}", audio.sample_rate), + "-ac", + format!("{}", audio.channels), + "-profile:a", + match object_type { + aac::AudioObjectType::AacLowComplexity => { + "aac_low" + } + aac::AudioObjectType::AacMain => { + "aac_main" + } + aac::AudioObjectType::Unknown(profile) => { + tracing::error!("invalid aac profile: {}", profile); + self.report_error("Invalid aac profile", false).await; + return; + } + }, + ]); + } + AudioCodec::Opus => { + args.extend(vec_of_strings![ + "-map", + "0:a", + "-c:a", + "libopus", + "-b:a", + format!("{}", state.bitrate), + "-ar", + format!("{}", audio.sample_rate), + "-ac", + format!("{}", audio.channels), + ]); + } + } + } + } + None => { + tracing::error!("no settings for variant {}", state.id); + self.report_error("No settings for variant", true).await; + return; + } + } + // Common args regardless of copy or transcode mode #[rustfmt::skip] args.extend(vec_of_strings![ - "-map", format!("[{}]", v.name), - "-c:v", "libx264", - "-preset", "medium", - "-b:v", format!("{}", video.bitrate), - "-maxrate", format!("{}", video.bitrate), - "-bufsize", format!("{}", video.bitrate * 2), - "-profile:v", "high", - "-level:v", "5.1", - "-pix_fmt", "yuv420p", - "-g", format!("{}", video.framerate * 2), - "-keyint_min", format!("{}", video.framerate * 2), - "-sc_threshold", "0", - "-r", format!("{}", video.framerate), - "-crf", "23", - "-tune", "zerolatency", "-f", "mp4", "-movflags", MP4_FLAGS, "-frag_duration", "1", format!( "unix://{}", - socket_dir.join(format!("{}.sock", v.id)).display() + socket_dir.join(format!("{}.sock", state.id)).display() ), ]); }