Skip to content

Commit

Permalink
wd
Browse files Browse the repository at this point in the history
  • Loading branch information
TroyKomodo committed May 3, 2024
1 parent d9e26a5 commit fc48d0e
Show file tree
Hide file tree
Showing 192 changed files with 1,692 additions and 1,248 deletions.
462 changes: 417 additions & 45 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ h265 = { path = "video/lib/h265" }
mp4 = { path = "video/lib/mp4" }
rtmp = { path = "video/lib/rtmp" }
transmuxer = { path = "video/lib/transmuxer" }
utils = { path = "utils", default-features = false, package = "scuffle-utils" }
scuffle-utils = { path = "utils", default-features = false }
config = { path = "config", package = "scuffle-config" }
pb = { path = "proto" }
video-common = { path = "video/common" }
Expand All @@ -62,7 +62,7 @@ video-edge = { path = "video/edge" }
video-ingest = { path = "video/ingest" }
video-transcoder = { path = "video/transcoder" }
binary-helper = { path = "binary-helper" }
ffmpeg = { path = "ffmpeg" }
scuffle-ffmpeg = { path = "ffmpeg" }

# These patches are pending PRs to the upstream crates
# TODO: Remove these once the PRs are merged
Expand Down
2 changes: 1 addition & 1 deletion binary-helper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ postgres-from-row = { version = "0.5" }
prost = { version = "0.12" }

config = { workspace = true }
utils = { workspace = true, features = ["all"] }
scuffle-utils = { workspace = true, features = ["all"] }
pb = { workspace = true }
32 changes: 16 additions & 16 deletions binary-helper/src/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use fred::interfaces::ClientLike;
use fred::types::ServerConfig;
use hyper::StatusCode;
use rustls::RootCertStore;
use utils::database::deadpool_postgres::{ManagerConfig, PoolConfig, RecyclingMethod, Runtime};
use utils::database::tokio_postgres::NoTls;
use utils::database::Pool;
use utils::http::RouteError;
use scuffle_utils::database::deadpool_postgres::{ManagerConfig, PoolConfig, RecyclingMethod, Runtime};
use scuffle_utils::database::tokio_postgres::NoTls;
use scuffle_utils::database::Pool;
use scuffle_utils::http::RouteError;

use crate::config::{DatabaseConfig, NatsConfig, RedisConfig};

Expand Down Expand Up @@ -40,7 +40,7 @@ macro_rules! impl_global_traits {

impl binary_helper::global::GlobalDb for $struct {
#[inline(always)]
fn db(&self) -> &Arc<utils::database::Pool> {
fn db(&self) -> &Arc<scuffle_utils::database::Pool> {
&self.db
}
}
Expand All @@ -50,7 +50,7 @@ macro_rules! impl_global_traits {
}

pub trait GlobalCtx {
fn ctx(&self) -> &utils::context::Context;
fn ctx(&self) -> &scuffle_utils::context::Context;
}

pub trait GlobalConfig {
Expand Down Expand Up @@ -124,16 +124,16 @@ pub async fn setup_nats(
Ok((nats, jetstream))
}

pub async fn setup_database(config: &DatabaseConfig) -> anyhow::Result<Arc<utils::database::Pool>> {
pub async fn setup_database(config: &DatabaseConfig) -> anyhow::Result<Arc<scuffle_utils::database::Pool>> {
let mut pg_config = config
.uri
.parse::<utils::database::tokio_postgres::Config>()
.parse::<scuffle_utils::database::tokio_postgres::Config>()
.context("invalid database uri")?;

pg_config.ssl_mode(if config.tls.is_some() {
utils::database::tokio_postgres::config::SslMode::Require
scuffle_utils::database::tokio_postgres::config::SslMode::Require
} else {
utils::database::tokio_postgres::config::SslMode::Disable
scuffle_utils::database::tokio_postgres::config::SslMode::Disable
});

let manager = if let Some(tls) = &config.tls {
Expand Down Expand Up @@ -164,15 +164,15 @@ pub async fn setup_database(config: &DatabaseConfig) -> anyhow::Result<Arc<utils
.with_client_auth_cert(certs, key)
.context("failed to create redis tls config")?;

utils::database::deadpool_postgres::Manager::from_config(
scuffle_utils::database::deadpool_postgres::Manager::from_config(
pg_config,
tokio_postgres_rustls::MakeRustlsConnect::new(tls),
ManagerConfig {
recycling_method: RecyclingMethod::Fast,
},
)
} else {
utils::database::deadpool_postgres::Manager::from_config(
scuffle_utils::database::deadpool_postgres::Manager::from_config(
pg_config,
NoTls,
ManagerConfig {
Expand Down Expand Up @@ -241,12 +241,12 @@ pub async fn setup_redis(config: &RedisConfig) -> anyhow::Result<Arc<fred::clien
}

Some(fred::types::TlsConfig::from(fred::types::TlsConnector::Rustls(
tokio_rustls::TlsConnector::from(
Arc::new(tokio_rustls::rustls::ClientConfig::builder()
tokio_rustls::TlsConnector::from(Arc::new(
tokio_rustls::rustls::ClientConfig::builder()
.with_root_certificates(cert_store)
.with_client_auth_cert(certs, key)
.context("failed to create redis tls config")?)
),
.context("failed to create redis tls config")?,
)),
)))
} else {
None
Expand Down
4 changes: 2 additions & 2 deletions binary-helper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::Context as _;
use scuffle_utils::context::Context;
use scuffle_utils::signal;
use tokio::signal::unix::SignalKind;
use tokio::{select, time};
use tonic::transport::{Certificate, Identity, Server, ServerTlsConfig};
pub use traits::{Config, Global};
use utils::context::Context;
use utils::signal;

use self::config::GrpcConfig;

Expand Down
2 changes: 1 addition & 1 deletion binary-helper/src/traits.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use utils::context::Context;
use scuffle_utils::context::Context;

pub trait Config {
fn parse() -> anyhow::Result<Self>
Expand Down
2 changes: 1 addition & 1 deletion config/src/sources/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,6 @@ impl<C: Config> CliSource<C> {

impl<C: Config> Source<C> for CliSource<C> {
fn get_key(&self, path: &KeyPath) -> Result<Option<Value>> {
utils::get_key::<C>(&self.value, path).map_err(|e| e.with_source(ErrorSource::Cli))
scuffle_utilsget_key::<C>(&self.value, path).map_err(|e| e.with_source(ErrorSource::Cli))
}
}
2 changes: 1 addition & 1 deletion config/src/sources/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,6 @@ fn extract_keys(

impl<C: Config> Source<C> for EnvSource<C> {
fn get_key(&self, path: &KeyPath) -> Result<Option<Value>> {
utils::get_key::<C>(&self.value, path).map_err(|e| e.with_source(ErrorSource::Env))
scuffle_utilsget_key::<C>(&self.value, path).map_err(|e| e.with_source(ErrorSource::Env))
}
}
2 changes: 1 addition & 1 deletion config/src/sources/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,6 @@ impl<C: Config> FileSource<C> {

impl<C: Config> Source<C> for FileSource<C> {
fn get_key(&self, path: &KeyPath) -> Result<Option<Value>> {
utils::get_key::<C>(&self.content, path).map_err(|e| e.with_source(ErrorSource::File(self.location.clone())))
scuffle_utilsget_key::<C>(&self.content, path).map_err(|e| e.with_source(ErrorSource::File(self.location.clone())))
}
}
2 changes: 1 addition & 1 deletion config/src/sources/manual.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl<C: Config> ManualSource<C> {
impl<C: Config> Source<C> for ManualSource<C> {
fn get_key(&self, path: &crate::KeyPath) -> crate::Result<Option<Value>> {
match &self.value {
Some(value) => utils::get_key::<C>(value, path).map_err(|e| e.with_source(ErrorSource::Manual)),
Some(value) => scuffle_utilsget_key::<C>(value, path).map_err(|e| e.with_source(ErrorSource::Manual)),
None => Ok(None),
}
}
Expand Down
6 changes: 3 additions & 3 deletions ffmpeg/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "ffmpeg"
name = "scuffle-ffmpeg"
version = "0.1.0"
edition = "2021"
license = "MIT OR Apache-2.0"
Expand All @@ -11,11 +11,11 @@ bytes = { optional = true, version = "1" }
tokio = { optional = true, version = "1" }
crossbeam-channel = { optional = true, version = "0.5" }
tracing = { optional = true, version = "0.1" }
utils = { workspace = true, optional = true }
scuffle-utils = { path = "../utils", version = "*", optional = true, features = ["task"]}

[features]
default = []
task-abort = ["dep:utils"]
task-abort = ["dep:scuffle-utils"]
channel = ["dep:bytes"]
tokio-channel = ["channel", "dep:tokio"]
crossbeam-channel = ["channel", "dep:crossbeam-channel"]
Expand Down
6 changes: 3 additions & 3 deletions ffmpeg/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl GenericDecoder {

pub fn send_packet(&mut self, packet: &Packet) -> Result<(), FfmpegError> {
#[cfg(feature = "task-abort")]
let _guard = utils::task::AbortGuard::new();
let _guard = scuffle_utils::task::AbortGuard::new();

// Safety: `packet` is a valid pointer, and `self.decoder` is a valid pointer.
let ret = unsafe { avcodec_send_packet(self.decoder.as_mut_ptr(), packet.as_ptr()) };
Expand All @@ -166,7 +166,7 @@ impl GenericDecoder {

pub fn send_eof(&mut self) -> Result<(), FfmpegError> {
#[cfg(feature = "task-abort")]
let _guard = utils::task::AbortGuard::new();
let _guard = scuffle_utils::task::AbortGuard::new();

// Safety: `self.decoder` is a valid pointer.
let ret = unsafe { avcodec_send_packet(self.decoder.as_mut_ptr(), std::ptr::null()) };
Expand All @@ -179,7 +179,7 @@ impl GenericDecoder {

pub fn receive_frame(&mut self) -> Result<Option<VideoFrame>, FfmpegError> {
#[cfg(feature = "task-abort")]
let _guard = utils::task::AbortGuard::new();
let _guard = scuffle_utils::task::AbortGuard::new();

let mut frame = Frame::new()?;

Expand Down
10 changes: 5 additions & 5 deletions ffmpeg/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ impl Encoder {
settings: impl Into<EncoderSettings>,
) -> Result<Self, FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();
let _abort_guard = scuffle_utils::task::AbortGuard::new();

if codec.as_ptr().is_null() {
return Err(FfmpegError::NoEncoder);
Expand Down Expand Up @@ -490,7 +490,7 @@ impl Encoder {

pub fn send_eof(&mut self) -> Result<(), FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();
let _abort_guard = scuffle_utils::task::AbortGuard::new();

// Safety: `self.encoder` is a valid pointer.
let ret = unsafe { avcodec_send_frame(self.encoder.as_mut_ptr(), std::ptr::null()) };
Expand All @@ -503,7 +503,7 @@ impl Encoder {

pub fn send_frame(&mut self, frame: &Frame) -> Result<(), FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();
let _abort_guard = scuffle_utils::task::AbortGuard::new();

// Safety: `self.encoder` and `frame` are valid pointers.
let ret = unsafe { avcodec_send_frame(self.encoder.as_mut_ptr(), frame.as_ptr()) };
Expand All @@ -516,7 +516,7 @@ impl Encoder {

pub fn receive_packet(&mut self) -> Result<Option<Packet>, FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();
let _abort_guard = scuffle_utils::task::AbortGuard::new();

let mut packet = Packet::new()?;

Expand Down Expand Up @@ -632,7 +632,7 @@ impl<T: Send + Sync> MuxerEncoder<T> {

pub fn send_eof(&mut self) -> Result<(), FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();
let _abort_guard = scuffle_utils::task::AbortGuard::new();

self.encoder.send_eof()?;
self.handle_packets()?;
Expand Down
12 changes: 6 additions & 6 deletions ffmpeg/src/filter_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ unsafe impl Send for FilterGraph {}
impl FilterGraph {
pub fn new() -> Result<Self, FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();
let _abort_guard = scuffle_utils::task::AbortGuard::new();

// Safety: the pointer returned from avfilter_graph_alloc is valid
unsafe { Self::wrap(avfilter_graph_alloc()) }
Expand All @@ -24,7 +24,7 @@ impl FilterGraph {
/// Safety: `ptr` must be a valid pointer to an `AVFilterGraph`.
unsafe fn wrap(ptr: *mut AVFilterGraph) -> Result<Self, FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();
let _abort_guard = scuffle_utils::task::AbortGuard::new();

Ok(Self(
SmartPtr::wrap_non_null(ptr, |ptr| unsafe { avfilter_graph_free(ptr) }).ok_or(FfmpegError::Alloc)?,
Expand All @@ -41,7 +41,7 @@ impl FilterGraph {

pub fn add(&mut self, filter: Filter, name: &str, args: &str) -> Result<FilterContext<'_>, FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();
let _abort_guard = scuffle_utils::task::AbortGuard::new();

let name = CString::new(name).expect("failed to convert name to CString");
let args = CString::new(args).expect("failed to convert args to CString");
Expand Down Expand Up @@ -239,7 +239,7 @@ unsafe impl Send for FilterContextSource<'_> {}
impl FilterContextSource<'_> {
pub fn send_frame(&mut self, frame: &Frame) -> Result<(), FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();
let _abort_guard = scuffle_utils::task::AbortGuard::new();

// Safety: `frame` is a valid pointer, and `self.0` is a valid pointer.
unsafe {
Expand All @@ -252,7 +252,7 @@ impl FilterContextSource<'_> {

pub fn send_eof(&mut self, pts: Option<i64>) -> Result<(), FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();
let _abort_guard = scuffle_utils::task::AbortGuard::new();

// Safety: `self.0` is a valid pointer.
unsafe {
Expand All @@ -276,7 +276,7 @@ unsafe impl Send for FilterContextSink<'_> {}
impl FilterContextSink<'_> {
pub fn receive_frame(&mut self) -> Result<Option<Frame>, FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();
let _abort_guard = scuffle_utils::task::AbortGuard::new();

let mut frame = Frame::new()?;

Expand Down
4 changes: 2 additions & 2 deletions ffmpeg/src/io/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl Default for InnerOptions {
impl<T: Send + Sync> Inner<T> {
pub fn new(data: T, options: InnerOptions) -> Result<Self, FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();
let _abort_guard = scuffle_utils::task::AbortGuard::new();

// Safety: av_malloc is safe to call
let buffer = unsafe {
Expand Down Expand Up @@ -228,7 +228,7 @@ impl Inner<()> {

pub fn open_output(path: &str) -> Result<Self, FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();
let _abort_guard = scuffle_utils::task::AbortGuard::new();

let path = std::ffi::CString::new(path).expect("Failed to convert path to CString");

Expand Down
14 changes: 7 additions & 7 deletions ffmpeg/src/io/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl<T: Send + Sync> Output<T> {

pub fn add_stream(&mut self, codec: Option<*const AVCodec>) -> Option<Stream<'_>> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();
let _abort_guard = scuffle_utils::task::AbortGuard::new();

// Safety: `avformat_new_stream` is safe to call.
let stream = unsafe { avformat_new_stream(self.as_mut_ptr(), codec.unwrap_or_else(std::ptr::null)) };
Expand All @@ -168,7 +168,7 @@ impl<T: Send + Sync> Output<T> {

pub fn copy_stream<'a>(&'a mut self, stream: &Stream<'_>) -> Option<Stream<'a>> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();
let _abort_guard = scuffle_utils::task::AbortGuard::new();

let codec_param = stream.codec_parameters()?;

Expand Down Expand Up @@ -196,7 +196,7 @@ impl<T: Send + Sync> Output<T> {

pub fn write_header(&mut self) -> Result<(), FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();
let _abort_guard = scuffle_utils::task::AbortGuard::new();

if self.witten_header {
return Err(FfmpegError::Arguments("header already written"));
Expand All @@ -217,7 +217,7 @@ impl<T: Send + Sync> Output<T> {

pub fn write_header_with_options(&mut self, options: &mut Dictionary) -> Result<(), FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();
let _abort_guard = scuffle_utils::task::AbortGuard::new();

if self.witten_header {
return Err(FfmpegError::Arguments("header already written"));
Expand All @@ -238,7 +238,7 @@ impl<T: Send + Sync> Output<T> {

pub fn write_trailer(&mut self) -> Result<(), FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();
let _abort_guard = scuffle_utils::task::AbortGuard::new();

if !self.witten_header {
return Err(FfmpegError::Arguments("header not written"));
Expand All @@ -255,7 +255,7 @@ impl<T: Send + Sync> Output<T> {

pub fn write_interleaved_packet(&mut self, mut packet: Packet) -> Result<(), FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();
let _abort_guard = scuffle_utils::task::AbortGuard::new();

if !self.witten_header {
return Err(FfmpegError::Arguments("header not written"));
Expand All @@ -273,7 +273,7 @@ impl<T: Send + Sync> Output<T> {

pub fn write_packet(&mut self, packet: &Packet) -> Result<(), FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();
let _abort_guard = scuffle_utils::task::AbortGuard::new();

if !self.witten_header {
return Err(FfmpegError::Arguments("header not written"));
Expand Down
Loading

0 comments on commit fc48d0e

Please sign in to comment.