diff --git a/foundations/src/telemetry/settings.rs b/foundations/src/telemetry/settings.rs index 1f10a0a7..9ccabe08 100644 --- a/foundations/src/telemetry/settings.rs +++ b/foundations/src/telemetry/settings.rs @@ -7,7 +7,6 @@ use std::net::SocketAddr; use opentelemetry_otlp::WithExportConfig; #[cfg(feature = "opentelemetry")] use opentelemetry_sdk::Resource; - #[cfg(feature = "logging")] use tracing_subscriber::fmt::time::{ChronoLocal, ChronoUtc}; diff --git a/image-processor/proto/build.rs b/image-processor/proto/build.rs index 57f757ad..8143c334 100644 --- a/image-processor/proto/build.rs +++ b/image-processor/proto/build.rs @@ -27,12 +27,7 @@ fn main() -> Result<(), Box> { #[cfg(feature = "serde")] pbjson_build::Builder::new() .register_descriptors(&descriptor_set)? - .build( - &[ - ".scuffle.image_processor", - ], - )?; - + .build(&[".scuffle.image_processor"])?; Ok(()) } diff --git a/image-processor/proto/scuffle/image_processor/service.proto b/image-processor/proto/scuffle/image_processor/service.proto index 354acd40..03a532ae 100644 --- a/image-processor/proto/scuffle/image_processor/service.proto +++ b/image-processor/proto/scuffle/image_processor/service.proto @@ -8,6 +8,7 @@ import "scuffle/image_processor/types.proto"; service ImageProcessor { // Submit a task to process an image rpc ProcessImage(ProcessImageRequest) returns (ProcessImageResponse) {} + // Cancel a task rpc CancelTask(CancelTaskRequest) returns (CancelTaskResponse) {} } diff --git a/image-processor/proto/scuffle/image_processor/types.proto b/image-processor/proto/scuffle/image_processor/types.proto index bfa8528d..499a20ee 100644 --- a/image-processor/proto/scuffle/image_processor/types.proto +++ b/image-processor/proto/scuffle/image_processor/types.proto @@ -2,32 +2,30 @@ syntax = "proto3"; package scuffle.image_processor; -// When submitting a task these formats are used to determine what the image processor should do. -// If the image processor is unable to generate a requested format it will not hard fail unless the task is set to hard fail. -// Otherwise it will generate as many formats as it can and return the results with any errors in the response. -enum ImageFormat { - WEBP_ANIM = 0; - AVIF_ANIM = 1; - GIF_ANIM = 2; - WEBP_STATIC = 3; - AVIF_STATIC = 4; - PNG_STATIC = 5; +// The output format type +enum OutputFormat { + // Animated WebP format + WebpAnim = 0; + // Animated AVIF format. + AvifAnim = 1; + // Animated GIF format. + GifAnim = 2; + // Static WebP format. + WebpStatic = 3; + // Static AVIF format. + AvifStatic = 4; + // Static PNG format. + PngStatic = 5; } +// DrivePath is used to determine where the image should be stored. message DrivePath { + // The drive to locate the image. string drive = 1; + // The path in the drive. string path = 2; } -message InputPath { - oneof input_path { - // Drive path to the image. - DrivePath drive_path = 1; - // Public URL to the image. - string public_url = 2; - } -} - // The resize method determines how the image processor should resize the image. enum ResizeMethod { // Fit will resize the image to fit within the desired dimensions without changing the aspect ratio. @@ -87,13 +85,6 @@ message Limits { optional uint32 max_input_duration_ms = 5; } -message Ratio { - // The width of the ratio. - uint32 width = 1; - // The height of the ratio. - uint32 height = 2; -} - // Crop is used to determine what part of the image the image processor should crop. // The processor will crop the image before resizing it. message Crop { @@ -107,13 +98,6 @@ message Crop { uint32 height = 4; } -// Upscale is used to determine if the image processor should upscale the image. -enum Upscale { - Yes = 0; - No = 1; - NoPreserveSource = 2; -} - // Provide extra information about the input to the image processor. message InputMetadata { // If the input is not animated, this will generate a fatal error. If there are not enough frames this will generate a fatal error. @@ -122,93 +106,207 @@ message InputMetadata { // If this is different from the actual frame count the image processor will generate a fatal error. optional uint32 frame_count = 2; // If this is different from the actual width the image processor will generate a fatal error. - optional uint32 width = 3; + uint32 width = 3; // If this is different from the actual height the image processor will generate a fatal error. - optional uint32 height = 4; + uint32 height = 4; } -// Scale is used to determine what the output image size should be. -message Scale { - // The width of the output image. (in pixels, use -1 to keep the aspect ratio) - int32 width = 1; - // The height of the output image. (in pixels, use -1 to keep the aspect ratio) - int32 height = 2; - // Name of the scale. ALlows for template arguments to be passed in. - // For example if the name is "thumbnail_{width}x{height}" and the width is 100 and the height is 200 the name will be "thumbnail_100x200". - // If not set will be "{width}x{height}" - // If multiple scales have the same name the processor will generate a fatal error. - optional string name = 3; - - // Allow upscale for this scale. - // If NoPreserveSource is set and this scale is larger than the input image we will just use the source dimensions. - // If Yes, we will upscale the image. - // If No, we will ignore this scale. - Upscale upscale = 4; +// Confine the aspect ratio of the image to a specific range. +// For example if you want to allow all images that are 3:1 to 1:3 you would set min_ratio to 1/3 and max_ratio to 3. +// Setting the min and max to the same value will restrict the aspect ratio to that value. +// Setting both values to 0 will use the input image's aspect ratio. +// Setting one of the values to 0 will allow any aspect ratio that is less than or greater than the other value. +message AspectRatio { + // The minimum ratio of the image. + // Set to 0 to have no minimum ratio. + // An aspect ratio is the ratio of the width to the height of the image. + double min_ratio = 1; + // The maximum ratio of the image. + // Set to 0 to have no maximum ratio. + // An aspect ratio is the ratio of the width to the height of the image. + double max_ratio = 2; } +// InputUpload is used to upload an image to a drive configured in the image processor config. message InputUpload { // The input image as a binary blob. bytes binary = 1; - // The path to save the image to. + // A prefix to use for the folder the image will be stored in. DrivePath path = 2; } +// Input is used to determine the input image to process. message Input { // The path to the input image. - InputPath path = 1; + oneof path { + // Drive path to the image. + // The image processor will download the image from the drive. + DrivePath drive_path = 1; + // Public URL to the image. + // If public downloads is disabled this will generate a fatal error. + string public_url = 2; + } + // Extra information about the input image. - optional InputMetadata metadata = 2; + optional InputMetadata metadata = 3; +} + +// How to handle scales that are larger than the input image. +enum Upscale { + // Do not upscale the image. + UpscaleNo = 0; + // Upscale the image to the desired dimensions. + UpscaleYes = 1; + // Do not upscale the image, however if not all variants are generated due to the image being too small + // generate a single last variant that has the input image dimensions. + UpscaleNoPreserveSource = 2; } -message OutputFormat { - message Webp { - bool static = 1; +// Scaling is used to specify a linear scaling factor for the various dimensions of the image. +// For example, if you have an image that is 100x100 (and use this as the base) and you want to generate 1x, 2x, and 3x images you would set the scales to [1, 2, 3]. +// The sizes of the output images would be [100x100, 200x200, 300x300]. +message Scaling { + oneof base { + // This is the scale for the input image (after cropping or aspect ratio adjustments are made). + uint32 fixed = 1; + // This is used to automatically determine the scale of the input image based on the width. + // We know what aspect ratio to use based on the aspect ratio adjustments made to the input image. + // We can then use that to determine the (input_width / base_width) scale. + // The scale would be the largest integer that is less than or equal to (input_width / base_width), + // or 1 if the input width is less than base_width. + uint32 base_width = 2; + // Functionally the same as base_width but allows you to specify in terms of height instead. + uint32 base_height = 3; } - message Avif { - bool static = 1; + + // The various scales. + // For example to generate a 1x, 2x, and 3x image you would set scales to [1, 2, 3]. + IntegerList scales = 4; +} + +// A list of integers. +message IntegerList { + repeated uint32 values = 1; +} + +message AnimationConfig { + // Specify an animation loop count for animated images. + // If this is set to -1 the image will loop indefinitely. + // If this is set to 0 the image will not loop. + // If this is set to a positive number the image will loop that many times. + // If this is unset the image will be encoded with the loop value the input image has. + optional int32 loop_count = 1; + + oneof frame_rate { + // Specify the frame duration for every frame in the output image. + // This can be used to specify a constant frame rate for the output image. + // frame_rate = 1000 / frame_duration_ms + uint32 duration_ms = 2; + // Frame durations for each frame in the output image. + // Specify the frame duration for each frame in the output image. + // If this number does not match the number of frames in the output image the processor will generate a fatal error. + IntegerList durations_ms = 3; + // Factor to multiply the frame duration by. + // This can be used to speed up or slow down the animation. + // The frame duration will be multiplied by this value. + // Each frame has a minimum duration of 1ms, if the factor creates some frames that are less than 1ms the processor will, + // drop frames and adjust timings of others to ensure that the total duration of the animation is correctly multiplied. + // This rule only applies for when the factor is greater than 1. + double factor = 4; } - message Gif {} - message Png {} - - // The name is used in the template argument for the output path. - // By default the name is the same as the format. - // Webp (static) -> webp_static - // Webp (animated) -> webp_animated - // Avif (static) -> avif_static - // Avif (animated) -> avif_animated - // Gif -> gif - // Png -> png - string name = 1; - oneof format { - Webp webp = 2; - Avif avif = 3; - Gif gif = 4; - Png png = 5; + // Remove frames idx's from the input image. + // This can be used to reduce the size of the output image. + // If you specify an index that is out of bounds the processor will generate a fatal error. + // If you specify the same index multiple times the processor will ignore the duplicates. + repeated uint32 remove_frame_idxs = 5; +} + +enum OutputQuality { + // Auto quality output. (default) + Auto = 0; + // High quality output. (large file size) + High = 1; + // Medium quality output. (medium file size) + Medium = 2; + // Low quality output. (smaller file size) + Low = 3; + // Lossless output. (very large file size) + Lossless = 4; +} + +message OutputFormatOptions { + // The format of the output image. + OutputFormat format = 1; + // The quality of the output image. + OutputQuality quality = 2; +} + +message OutputVariants { + // A suffix that will be appended to the output.drive_path.path to create the output path. + // This suffix is special as it allows for template arguments. + // Possible template argument values are: + // - {id} - The id of the task. + // - {format} - The format of the output image. (e.g 'webp_anim', 'avif_static', 'png_static', etc.) + // - {scale} - The scale of the output image. (if scaling is used, otherwise empty) + // - {width} - The resulting width of the output image. + // - {height} - The resulting height of the output image. + // - {format_idx} - The index of the output format in the list. + // - {resize_idx} - The index of the resize operation, if the operation is width or height its the index of the value in the list. + // If its scaling its the index of the scale in the list. + // - {static} - '_static' if the input image is static, otherwise empty. + // - {ext} - The extension of the output image. (e.g. 'webp', 'avif', etc.) + string suffix = 1; + + // The desired format to encode the output image. + repeated OutputFormatOptions formats = 2; + + // Allow upscaling if the determined dimensions are larger than the input image. + Upscale upscale = 3; + + // Sometimes we might specify that we want 'WebpAnim' but the input image is a static image. + // In this case we would typically fatally error because we can't generate an animated image from a static image. + // However if this is set to true the processor will ignore these errors and skip the format. + bool skip_impossible_formats = 4; + + // Skips resizing if the resize operation is impossible. + // For example if the resize results in a width or height of less than 1. + // If this is set to true the processor will ignore these errors and skip the resize operation. + bool skip_impossible_resizes = 5; + + // There must be at least one element in the list. + oneof resize { + // Resize to a specific width, the height will be determined by the aspect ratio. + IntegerList width = 6; + // Resize to a specific height, the width will be determined by the aspect ratio. + IntegerList height = 7; + // A scaling config to determine how each dimension should be scaled. + Scaling scaling = 8; } } message Output { - DrivePath path = 1; + // The drive path to store the output image. + // This is a prefix and the processor will append the suffix to this path to determine the final path. + // The suffix is defined in the OutputVariantsConfig. + DrivePath drive_path = 1; + // The desired formats to encode the output image. - repeated OutputFormat formats = 2; + OutputVariants variants = 2; // The resize method used to resize the image. ResizeMethod resize_method = 3; // The resize algorithm used to resize the image. ResizeAlgorithm resize_algorithm = 4; - // The crop used to crop the image before resizing. If the crop settings are not possible the processor will generate a fatal error. - optional Crop crop = 5; - // The minimum and maximum ratios for the scaled image. Used to prevent upscaling too much on wide or tall images. - // If the image does not fit into the min and max ratios the processor will generate a fatal error. If unset the processor will not check the ratios. - // These checks are done after the crop. If the resize method allows for padding or stretching we will use the padded or stretched dimentions to perform the check. - // If scales are provided that are not within the min and max ratios the processor will generate a fatal error. - optional Ratio min_ratio = 6; - optional Ratio max_ratio = 7; - // The target ratio for the scale image, if unset the processor will use the input ratio (after crop but before resize). - // The min and max ratios will be used to detemine if padding or stretching is needed to reach the target ratio. - optional Ratio target_ratio = 8; - // The desired scales of the output image. - repeated Scale scales = 9; + + // The animation configuration for the output image. + optional AnimationConfig animation_config = 5; + + // A crop is applied to the image before resizing and before an aspect ratio change.s + optional Crop crop = 6; + + // Choose one of the following options to determine the aspect ratio of the image. + // An aspect ratio is the ratio of the width to the height of the image. + AspectRatio aspect_ratio = 7; } // Events must be in the format @@ -218,29 +316,37 @@ message Events { // The event to trigger when the task is completed successfully optional EventQueue on_success = 1; // The event to trigger when the task fails - optional EventQueue on_fail = 2; + optional EventQueue on_failure = 2; // The event to trigger when the task is cancelled optional EventQueue on_cancel = 3; // The event to trigger when the task is started optional EventQueue on_start = 4; + + // Metadata to send with the event. + map metadata = 5; } +// EventQueue is used to determine where the image processor should send events. message EventQueue { + // The name of the event queue. string name = 1; + // The topic of the event queue. string topic = 2; } +// A task to process an image. message Task { // The input image to process. Input input = 1; // The output image to generate. - Output output = 2; + Output output = 2; // Result output Events events = 3; // The limits for the image processor. optional Limits limits = 4; } +// A error message. message Error { // The error message. string message = 1; @@ -248,10 +354,12 @@ message Error { ErrorCode code = 2; } +// ErrorCode is used to determine the type of error that occurred. enum ErrorCode { - Unknown = 0; + // Internal error occurred, please file a bug report. + InternalError = 0; + // Invalid input error. Please refer to the error message for more information, and resubmit the task with valid input. InvalidInput = 1; - InternalError = 2; } message EventPayload { diff --git a/image-processor/src/config.rs b/image-processor/src/config.rs index 08bd014d..768ce588 100644 --- a/image-processor/src/config.rs +++ b/image-processor/src/config.rs @@ -119,10 +119,10 @@ pub struct S3DriveConfig { pub endpoint: Option, /// The S3 bucket prefix path #[serde(default)] - pub path: Option, + pub prefix_path: Option, /// Use path style #[serde(default)] - pub path_style: bool, + pub force_path_style: Option, /// The drive mode #[serde(default)] pub mode: DriveMode, diff --git a/image-processor/src/disk/s3.rs b/image-processor/src/disk/s3.rs index aaa9987d..810fb6eb 100644 --- a/image-processor/src/disk/s3.rs +++ b/image-processor/src/disk/s3.rs @@ -1,4 +1,4 @@ -use aws_config::{AppName, Region, SdkConfig}; +use aws_config::{AppName, Region}; use aws_sdk_s3::config::{Credentials, SharedCredentialsProvider}; use aws_sdk_s3::operation::delete_object::DeleteObjectError; use aws_sdk_s3::operation::get_object::GetObjectError; @@ -17,6 +17,8 @@ pub struct S3Drive { mode: DriveMode, client: aws_sdk_s3::Client, bucket: String, + path_prefix: Option, + semaphore: Option, } #[derive(Debug, thiserror::Error)] @@ -38,15 +40,19 @@ impl S3Drive { pub async fn new(config: &S3DriveConfig) -> Result { tracing::debug!("setting up s3 disk"); Ok(Self { - name: config.name.clone(), + name: config.bucket.clone(), mode: config.mode, - client: aws_sdk_s3::Client::new(&{ - let mut builder = SdkConfig::builder(); + client: aws_sdk_s3::Client::from_conf({ + let mut builder = aws_sdk_s3::Config::builder(); + + builder.set_endpoint_url(config.endpoint.clone()); builder.set_app_name(Some(AppName::new(service_info!().name).unwrap())); builder.set_region(Some(Region::new(config.region.clone()))); + builder.set_force_path_style(config.force_path_style); + builder.set_credentials_provider(Some(SharedCredentialsProvider::new(Credentials::new( config.access_key.clone(), config.secret_key.clone(), @@ -57,7 +63,9 @@ impl S3Drive { builder.build() }), + path_prefix: config.prefix_path.clone(), bucket: config.bucket.clone(), + semaphore: config.max_connections.map(tokio::sync::Semaphore::new), }) } } @@ -73,11 +81,22 @@ impl Drive for S3Drive { return Err(DriveError::ReadOnly); } + let _permit = if let Some(semaphore) = &self.semaphore { + Some(semaphore.acquire().await) + } else { + None + }; + + let path = self + .path_prefix + .as_ref() + .map_or_else(|| path.to_string(), |prefix| format!("{}/{}", prefix, path)); + let result = self .client .get_object() .bucket(&self.bucket) - .key(path) + .key(path.trim_start_matches('/')) .send() .await .map_err(S3DriveError::from)?; @@ -93,7 +112,23 @@ impl Drive for S3Drive { return Err(DriveError::WriteOnly); } - let mut req = self.client.put_object().bucket(&self.bucket).key(path).body(data.into()); + let _permit = if let Some(semaphore) = &self.semaphore { + Some(semaphore.acquire().await) + } else { + None + }; + + let path = self + .path_prefix + .as_ref() + .map_or_else(|| path.to_string(), |prefix| format!("{}/{}", prefix, path)); + + let mut req = self + .client + .put_object() + .bucket(&self.bucket) + .key(path.trim_start_matches('/')) + .body(data.into()); if let Some(options) = options { if let Some(cache_control) = &options.cache_control { @@ -121,10 +156,21 @@ impl Drive for S3Drive { return Err(DriveError::WriteOnly); } + let _permit = if let Some(semaphore) = &self.semaphore { + Some(semaphore.acquire().await) + } else { + None + }; + + let path = self + .path_prefix + .as_ref() + .map_or_else(|| path.to_string(), |prefix| format!("{}/{}", prefix, path)); + self.client .delete_object() .bucket(&self.bucket) - .key(path) + .key(path.trim_start_matches('/')) .send() .await .map_err(S3DriveError::from)?; diff --git a/image-processor/src/global.rs b/image-processor/src/global.rs index 1ade9418..b707d728 100644 --- a/image-processor/src/global.rs +++ b/image-processor/src/global.rs @@ -129,7 +129,7 @@ impl HealthCheck for Global { return false; } - for disk in self.drive().values() { + for disk in self.drives().values() { if !disk.healthy().await { tracing::error!(name = %disk.name(), "disk check failed"); return false; diff --git a/image-processor/src/main.rs b/image-processor/src/main.rs index fa4c738b..fbca385e 100644 --- a/image-processor/src/main.rs +++ b/image-processor/src/main.rs @@ -2,8 +2,8 @@ use std::sync::Arc; use anyhow::Context; use scuffle_foundations::bootstrap::{bootstrap, Bootstrap}; -use scuffle_foundations::settings::cli::Matches; use scuffle_foundations::runtime; +use scuffle_foundations::settings::cli::Matches; use scuffle_image_processor_proto::{event_callback, EventCallback}; use tokio::signal::unix::SignalKind; @@ -97,6 +97,5 @@ async fn main(cfg: Matches) { } } - std::process::exit(0); } diff --git a/image-processor/src/management/grpc.rs b/image-processor/src/management/grpc.rs index 5b522dc4..7a83ed83 100644 --- a/image-processor/src/management/grpc.rs +++ b/image-processor/src/management/grpc.rs @@ -17,10 +17,7 @@ impl ManagementServer { #[async_trait::async_trait] impl scuffle_image_processor_proto::image_processor_server::ImageProcessor for ManagementServer { - async fn process_image( - &self, - request: Request, - ) -> tonic::Result> { + async fn process_image(&self, request: Request) -> tonic::Result> { let resp = match self.process_image(request.into_inner()).await { Ok(resp) => resp, Err(err) => ProcessImageResponse { @@ -32,15 +29,10 @@ impl scuffle_image_processor_proto::image_processor_server::ImageProcessor for M Ok(Response::new(resp)) } - async fn cancel_task( - &self, - request: Request, - ) -> tonic::Result> { + async fn cancel_task(&self, request: Request) -> tonic::Result> { let resp = match self.cancel_task(request.into_inner()).await { Ok(resp) => resp, - Err(err) => CancelTaskResponse { - error: Some(err), - }, + Err(err) => CancelTaskResponse { error: Some(err) }, }; Ok(Response::new(resp)) diff --git a/image-processor/src/management/http.rs b/image-processor/src/management/http.rs index 298a590a..54cc2b7e 100644 --- a/image-processor/src/management/http.rs +++ b/image-processor/src/management/http.rs @@ -1,59 +1,66 @@ -use scuffle_foundations::http::server::axum::{extract::State, Json, Router, routing::post}; -use scuffle_image_processor_proto::{CancelTaskRequest, CancelTaskResponse, ErrorCode, ProcessImageRequest, ProcessImageResponse}; +use scuffle_foundations::http::server::axum::extract::State; +use scuffle_foundations::http::server::axum::routing::post; +use scuffle_foundations::http::server::axum::{Json, Router}; +use scuffle_image_processor_proto::{ + CancelTaskRequest, CancelTaskResponse, ErrorCode, ProcessImageRequest, ProcessImageResponse, +}; use super::ManagementServer; impl ManagementServer { pub async fn run_http(&self) -> Result<(), scuffle_foundations::http::server::Error> { - let router = Router::new() - .route("/process_image", post(process_image)) - .route("/cancel_task", post(cancel_task)) - .with_state(self.clone()); - - let addr = self.global.config().management.http.bind; - scuffle_foundations::http::server::Server::builder() - .bind(addr) - .build(router)? - .start_and_wait() - .await - } + let router = Router::new() + .route("/process_image", post(process_image)) + .route("/cancel_task", post(cancel_task)) + .with_state(self.clone()); + + let addr = self.global.config().management.http.bind; + scuffle_foundations::http::server::Server::builder() + .bind(addr) + .build(router)? + .start_and_wait() + .await + } } async fn process_image( - State(server): State, - Json(request): Json, + State(server): State, + Json(request): Json, ) -> (http::StatusCode, Json) { - let resp = match server.process_image(request).await { - Ok(resp) => resp, - Err(err) => ProcessImageResponse { - id: "".to_owned(), - error: Some(err), - } - }; - - let status = resp.error.as_ref().map_or(http::StatusCode::OK, |err| map_error_code(err.code())); - (status, Json(resp)) + let resp = match server.process_image(request).await { + Ok(resp) => resp, + Err(err) => ProcessImageResponse { + id: "".to_owned(), + error: Some(err), + }, + }; + + let status = resp + .error + .as_ref() + .map_or(http::StatusCode::OK, |err| map_error_code(err.code())); + (status, Json(resp)) } async fn cancel_task( - State(server): State, - Json(request): Json, + State(server): State, + Json(request): Json, ) -> (http::StatusCode, Json) { - let resp = match server.cancel_task(request).await { - Ok(resp) => resp, - Err(err) => CancelTaskResponse { - error: Some(err), - } - }; - - let status = resp.error.as_ref().map_or(http::StatusCode::OK, |err| map_error_code(err.code())); - (status, Json(resp)) + let resp = match server.cancel_task(request).await { + Ok(resp) => resp, + Err(err) => CancelTaskResponse { error: Some(err) }, + }; + + let status = resp + .error + .as_ref() + .map_or(http::StatusCode::OK, |err| map_error_code(err.code())); + (status, Json(resp)) } fn map_error_code(code: ErrorCode) -> http::StatusCode { - match code { - ErrorCode::InvalidInput => http::StatusCode::BAD_REQUEST, - ErrorCode::InternalError => http::StatusCode::INTERNAL_SERVER_ERROR, - ErrorCode::Unknown => http::StatusCode::INTERNAL_SERVER_ERROR, - } -} \ No newline at end of file + match code { + ErrorCode::InvalidInput => http::StatusCode::BAD_REQUEST, + ErrorCode::InternalError => http::StatusCode::INTERNAL_SERVER_ERROR, + } +} diff --git a/image-processor/src/management/mod.rs b/image-processor/src/management/mod.rs index 897c0f70..702f6659 100644 --- a/image-processor/src/management/mod.rs +++ b/image-processor/src/management/mod.rs @@ -1,10 +1,13 @@ use std::sync::Arc; use anyhow::Context; -use scuffle_image_processor_proto::{CancelTaskRequest, CancelTaskResponse, Error, ErrorCode, ProcessImageRequest, ProcessImageResponse, input_path::InputPath}; +use scuffle_image_processor_proto::{ + CancelTaskRequest, CancelTaskResponse, Error, ErrorCode, ProcessImageRequest, ProcessImageResponse, +}; use url::Url; use crate::global::Global; +use crate::management::validation::{validate_task, Fragment, FragmentBuf}; pub mod grpc; pub mod http; @@ -13,96 +16,45 @@ mod validation; #[derive(Clone)] struct ManagementServer { - global: Arc, + global: Arc, } impl ManagementServer { - async fn process_image( - &self, - request: ProcessImageRequest, - ) -> Result { - let Some(task) = request.task.as_ref() else { - return Err(Error { - code: ErrorCode::InvalidInput as i32, - message: "task is required".to_string(), - }); - }; + async fn process_image(&self, request: ProcessImageRequest) -> Result { + let mut fragment = FragmentBuf::new(); - let Some(input) = &task.input else { - return Err(Error { - code: ErrorCode::InvalidInput as i32, - message: "input is required".to_string(), - }); - }; + validate_task(&self.global, fragment.push("task"), request.task.as_ref())?; - let Some(input_path) = input.path.as_ref().and_then(|path| path.input_path.as_ref()) else { - return Err(Error { - code: ErrorCode::InvalidInput as i32, - message: "task.input.path is required".to_string(), - }); - }; + // We need to do validation here. + if let Some(input_upload) = request.input_upload.as_ref() {} - + todo!() + } - if let Some(events) = &task.events { - let queues = [ - (&events.on_success, "task.events.on_success"), - (&events.on_fail, "task.events.on_fail"), - (&events.on_start, "task.events.on_start"), - (&events.on_cancel, "task.events.on_cancel"), - ]; - - for (queue, field) in queues { - if let Some(queue) = queue { - if self.global.event_queue(&queue.name).is_none() { - return Err(Error { - code: ErrorCode::InvalidInput as i32, - message: format!("{field}.name: event queue not found"), - }); - } - } - } - } - - // We need to do validation here. - if let Some(image) = request.input_upload.as_ref() { - - } - - todo!() - } - - async fn cancel_task( - &self, - request: CancelTaskRequest, - ) -> Result { - todo!() - } + async fn cancel_task(&self, request: CancelTaskRequest) -> Result { + todo!() + } } - pub async fn start(global: Arc) -> anyhow::Result<()> { - let server = ManagementServer { - global, - }; - - let http = async { - if server.global.config().management.http.enabled { - server.run_http().await.context("http") - } else { - Ok(()) - } - }; - let grpc = async { - if server.global.config().management.grpc.enabled { - server.run_grpc().await.context("grpc") - } else { - Ok(()) - } - }; - - futures::future::try_join(http, grpc).await.context("management")?; - - Ok(()) + let server = ManagementServer { global }; + + let http = async { + if server.global.config().management.http.enabled { + server.run_http().await.context("http") + } else { + Ok(()) + } + }; + let grpc = async { + if server.global.config().management.grpc.enabled { + server.run_grpc().await.context("grpc") + } else { + Ok(()) + } + }; + + futures::future::try_join(http, grpc).await.context("management")?; + + Ok(()) } - diff --git a/image-processor/src/management/validation.rs b/image-processor/src/management/validation.rs index e8bf8150..dc85ea38 100644 --- a/image-processor/src/management/validation.rs +++ b/image-processor/src/management/validation.rs @@ -1,307 +1,427 @@ use std::sync::Arc; -use scuffle_image_processor_proto::{input_path::InputPath, DrivePath, Error, ErrorCode, Input, InputMetadata, Output, Task}; +use scuffle_image_processor_proto::{ + input, DrivePath, Error, ErrorCode, EventQueue, Events, Input, InputMetadata, Limits, Output, OutputVariants, Task, +}; use url::Url; use crate::global::Global; +#[derive(Debug, Clone, Copy)] +pub enum FragmentItem { + Map(&'static str), + Index(usize), +} -#[derive(Debug, Default, Clone)] -pub struct Fragment { - path: Vec<&'static str>, +#[derive(Debug)] +pub struct FragmentBuf { + path: Vec, } -impl std::fmt::Display for Fragment { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.path.join(".")) - } +impl FragmentBuf { + pub fn new() -> Self { + Self { path: Vec::new() } + } + + pub fn push<'a>(&'a mut self, path: impl Into) -> Fragment<'a> { + self.path.push(path.into()); + Fragment::new(&mut self.path) + } + + pub fn as_fagment(&mut self) -> Fragment { + Fragment::new(&mut self.path) + } } -impl Fragment { - pub fn new() -> Self { - Self::default() - } - - pub fn push(&mut self, path: &'static str) { - self.path.push(path); - } - - pub fn pop(&mut self) { - self.path.pop(); - } - - pub fn push_clone(&self, path: &'static str) -> Self { - let mut fragment = self.clone(); - fragment.push(path); - fragment - } - - pub fn push_str(&self, path: &str) -> String { - let mut builder = self.path.join("."); - if !builder.is_empty() { - builder.push('.'); - } - builder.push_str(path); - builder - } +#[derive(Debug)] +pub struct Fragment<'a> { + path: &'a mut Vec, } -pub fn validate_task( - global: &Arc, - mut fragment: Fragment, - task: Option<&Task>, -) -> Result<(), Error> { - let task = task.ok_or_else(|| Error { - code: ErrorCode::InvalidInput as i32, - message: format!("{fragment} is required"), - })?; +impl<'a> Fragment<'a> { + pub fn new(path: &'a mut Vec) -> Self { + Self { path } + } +} - validate_input(global, fragment.push_clone("input"), task.input.as_ref())?; +impl From<&'static str> for FragmentItem { + fn from(value: &'static str) -> Self { + Self::Map(value) + } +} - validate_output(global, fragment.push_clone("output"), task.output.as_ref())?; +impl From for FragmentItem { + fn from(value: usize) -> Self { + Self::Index(value) + } +} - Ok(()) +// This is a bit of a hack to allow us to convert from a reference to a copy. +// &&'static str -> &'static str -> FragmentItem +// &usize -> usize -> FragmentItem +impl From<&T> for FragmentItem + where + T: Copy, + FragmentItem: From, +{ + fn from(value: &T) -> Self { + Self::from(*value) + } } -pub fn validate_output( - global: &Arc, - mut fragment: Fragment, - output: Option<&Output>, -) -> Result<(), Error> { - let output = output.ok_or_else(|| Error { - code: ErrorCode::InvalidInput as i32, - message: format!("{fragment} is required"), - })?; +impl std::fmt::Display for Fragment<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut first = true; + for item in self.path.iter() { + match item { + FragmentItem::Map(value) => { + if first { + write!(f, ".")?; + } + write!(f, "{value}")?; + } + FragmentItem::Index(value) => { + write!(f, "[{value}]")?; + } + } + + first = false; + } + + Ok(()) + } +} - validate_drive_path(global, fragment.push_clone("path"), output.path.as_ref(), false)?; - +impl Fragment<'_> { + pub fn push<'a>(&'a mut self, path: impl Into) -> Fragment<'a> { + self.path.push(path.into()); + Fragment::new(self.path) + } +} - Ok(()) +impl Drop for Fragment<'_> { + fn drop(&mut self) { + self.path.pop(); + } } -pub fn validate_input( - global: &Arc, - mut fragment: Fragment, - input: Option<&Input> -) -> Result<(), Error> { - let input = input.ok_or_else(|| Error { - code: ErrorCode::InvalidInput as i32, - message: format!("{fragment} is required"), - })?; +pub fn validate_task(global: &Arc, mut fragment: Fragment, task: Option<&Task>) -> Result<(), Error> { + let task = task.ok_or_else(|| Error { + code: ErrorCode::InvalidInput as i32, + message: format!("{fragment}: is required"), + })?; + + validate_input(global, fragment.push("input"), task.input.as_ref())?; - let path = input.path.as_ref().ok_or_else(|| Error { - code: ErrorCode::InvalidInput as i32, - message: format!("{} is required", fragment.push_str("path")), - })?; + validate_output(global, fragment.push("output"), task.output.as_ref())?; - validate_input_path(global, fragment.push_clone("path"), path.input_path.as_ref())?; + validate_events(global, fragment.push("events"), task.events.as_ref())?; - // Metadata is optional - if let Some(metadata) = &input.metadata { - validate_input_metadata(global, fragment.push_clone("metadata"), Some(metadata))?; - } + if let Some(limits) = &task.limits { + validate_limits(fragment.push("limits"), Some(limits))?; + } - Ok(()) + Ok(()) } -pub fn validate_input_metadata( - global: &Arc, - mut fragment: Fragment, - metadata: Option<&InputMetadata>, -) -> Result<(), Error> { - let metadata = metadata.ok_or_else(|| Error { - code: ErrorCode::InvalidInput as i32, - message: format!("{} is required", fragment), - })?; - - match (metadata.static_frame_index, metadata.frame_count) { - (None, Some(frame_count)) if frame_count == 0 => { - return Err(Error { - code: ErrorCode::InvalidInput as i32, - message: format!("{}: frame_count must be non 0", fragment), - }); - } - (Some(static_frame_index), Some(frame_count)) if static_frame_index >= frame_count => { - return Err(Error { - code: ErrorCode::InvalidInput as i32, - message: format!("{}: static_frame_index must be less than frame_count, {static_frame_index} >= {frame_count}", fragment), - }); - }, - (Some(_), None) => { - return Err(Error { - code: ErrorCode::InvalidInput as i32, - message: format!("{}: is required when static_frame_index is provided", fragment.push_str("frame_count")), - }); - }, - _ => {}, - } - - match (metadata.width, metadata.height) { - (Some(width), Some(height)) => { - let checks = [ - (width, "width"), - (height, "height"), - ]; - - for (value, field) in checks { - if value == 0 { - return Err(Error { - code: ErrorCode::InvalidInput as i32, - message: format!("{}: must be non 0", fragment.push_str(field)), - }); - } - - if value > u16::MAX as u32 { - return Err(Error { - code: ErrorCode::InvalidInput as i32, - message: format!("{}: must be less than {}", fragment.push_str(field), u16::MAX), - }); - } - } - }, - (None, None) => {}, - (Some(_), None) => { - return Err(Error { - code: ErrorCode::InvalidInput as i32, - message: format!("{}: is required when width is provided", fragment.push_str("height")), - }); - }, - (None, Some(_)) => { - return Err(Error { - code: ErrorCode::InvalidInput as i32, - message: format!("{}: height is required when width is provided", fragment.push_str("width")), - }); - } - } - - Ok(()) +fn validate_limits(mut fragment: Fragment, limits: Option<&Limits>) -> Result<(), Error> { + let limits = limits.ok_or_else(|| Error { + code: ErrorCode::InvalidInput as i32, + message: format!("{fragment}: is required"), + })?; + + let fields = [ + (limits.max_processing_time_ms, "max_processing_time_ms"), + (limits.max_input_frame_count, "max_input_frame_count"), + (limits.max_input_width, "max_input_width"), + (limits.max_input_height, "max_input_height"), + (limits.max_input_duration_ms, "max_input_duration_ms"), + ]; + + for (value, name) in &fields { + if let Some(0) = value { + return Err(Error { + code: ErrorCode::InvalidInput as i32, + message: format!("{}: must be non 0", fragment.push(name)), + }); + } + } + + Ok(()) +} + +fn validate_events(global: &Arc, mut fragment: Fragment, events: Option<&Events>) -> Result<(), Error> { + let events = events.ok_or_else(|| Error { + code: ErrorCode::InvalidInput as i32, + message: format!("{fragment}: is required"), + })?; + + let events = [ + (events.on_success.as_ref(), "on_success"), + (events.on_failure.as_ref(), "on_failure"), + (events.on_cancel.as_ref(), "on_cancel"), + (events.on_start.as_ref(), "on_start"), + ]; + + for (event, name) in &events { + if let Some(event) = event { + validate_event_queue(global, fragment.push(name), Some(event))?; + } + } + + Ok(()) +} + +fn validate_event_queue(global: &Arc, mut fragment: Fragment, event: Option<&EventQueue>) -> Result<(), Error> { + let event_queue = event.ok_or_else(|| Error { + code: ErrorCode::InvalidInput as i32, + message: format!("{fragment}: is required"), + })?; + + if event_queue.name.is_empty() { + return Err(Error { + code: ErrorCode::InvalidInput as i32, + message: format!("{}: is required", fragment.push("name")), + }); + } + + if event_queue.topic.is_empty() { + return Err(Error { + code: ErrorCode::InvalidInput as i32, + message: format!("{}: is required", fragment.push("topic")), + }); + } + + if global.event_queue(&event_queue.name).is_none() { + return Err(Error { + code: ErrorCode::InvalidInput as i32, + message: format!("{fragment}: event queue not found"), + }); + } + + Ok(()) +} + +pub fn validate_output(global: &Arc, mut fragment: Fragment, output: Option<&Output>) -> Result<(), Error> { + let output = output.ok_or_else(|| Error { + code: ErrorCode::InvalidInput as i32, + message: format!("{fragment}: is required"), + })?; + + validate_drive_path(global, fragment.push("path"), output.drive_path.as_ref())?; + + validate_output_variants(fragment.push("variants"), output.variants.as_ref())?; + + Ok(()) +} + +pub fn validate_output_variants(mut fragment: Fragment, variants: Option<&OutputVariants>) -> Result<(), Error> { + let variants = variants.ok_or_else(|| Error { + code: ErrorCode::InvalidInput as i32, + message: format!("{fragment}: is required"), + })?; + + validate_template_string( + fragment.push("suffix"), + &[ + "id", + "format", + "scale", + "width", + "height", + "format_idx", + "resize_idx", + "static", + "ext", + ], + &variants.suffix, + )?; + + if variants.formats.is_empty() { + return Err(Error { + code: ErrorCode::InvalidInput as i32, + message: format!("{}: is required", fragment.push("formats")), + }); + } + + for (idx, format) in variants.formats.iter().enumerate() {} + + Ok(()) +} + +pub fn validate_input(global: &Arc, mut fragment: Fragment, input: Option<&Input>) -> Result<(), Error> { + let input = input.ok_or_else(|| Error { + code: ErrorCode::InvalidInput as i32, + message: format!("{fragment}: is required"), + })?; + + validate_input_path(global, fragment.push("path"), input.path.as_ref())?; + + // Metadata is optional + if let Some(metadata) = &input.metadata { + validate_input_metadata(fragment.push("metadata"), Some(metadata))?; + } + + Ok(()) +} + +pub fn validate_input_metadata(mut fragment: Fragment, metadata: Option<&InputMetadata>) -> Result<(), Error> { + let metadata = metadata.ok_or_else(|| Error { + code: ErrorCode::InvalidInput as i32, + message: format!("{} is required", fragment), + })?; + + match (metadata.static_frame_index, metadata.frame_count) { + (None, Some(frame_count)) if frame_count == 0 => { + return Err(Error { + code: ErrorCode::InvalidInput as i32, + message: format!("{}: frame_count must be non 0", fragment), + }); + } + (Some(static_frame_index), Some(frame_count)) if static_frame_index >= frame_count => { + return Err(Error { + code: ErrorCode::InvalidInput as i32, + message: format!( + "{}: static_frame_index must be less than frame_count, {static_frame_index} >= {frame_count}", + fragment + ), + }); + } + (Some(_), None) => { + return Err(Error { + code: ErrorCode::InvalidInput as i32, + message: format!( + "{}: is required when static_frame_index is provided", + fragment.push("frame_count") + ), + }); + } + _ => {} + } + + if metadata.width == 0 { + return Err(Error { + code: ErrorCode::InvalidInput as i32, + message: format!("{}: width must be non 0", fragment.push("width")), + }); + } + + if metadata.height == 0 { + return Err(Error { + code: ErrorCode::InvalidInput as i32, + message: format!("{}: height must be non 0", fragment.push("height")), + }); + } + + Ok(()) } pub fn validate_input_path( - global: &Arc, - mut fragment: Fragment, - input_path: Option<&InputPath>, + global: &Arc, + mut fragment: Fragment, + input_path: Option<&input::Path>, ) -> Result<(), Error> { - let input_path = input_path.ok_or_else(|| Error { - code: ErrorCode::InvalidInput as i32, - message: format!("{} is required", fragment.push_str("input_path")), - })?; - - match input_path { - InputPath::DrivePath(drive_path) => { - validate_drive_path(global, fragment.push_clone("input_path.drive_path"), Some(drive_path), true)?; - }, - InputPath::PublicUrl(url) => { - validate_public_url(global, fragment.push_clone("input_path.public_url"), url)?; - }, - } - - Ok(()) + let input_path = input_path.ok_or_else(|| Error { + code: ErrorCode::InvalidInput as i32, + message: format!("{} is required", fragment), + })?; + + match input_path { + input::Path::DrivePath(drive_path) => { + validate_drive_path(global, fragment.push("drive_path"), Some(drive_path))?; + } + input::Path::PublicUrl(url) => { + validate_public_url(global, fragment.push("public_url"), url)?; + } + } + + Ok(()) } pub fn validate_drive_path( - global: &Arc, - mut fragment: Fragment, - drive_path: Option<&DrivePath>, - is_input: bool, + global: &Arc, + mut fragment: Fragment, + drive_path: Option<&DrivePath>, ) -> Result<(), Error> { - let drive_path = drive_path.ok_or_else(|| Error { - code: ErrorCode::InvalidInput as i32, - message: format!("{} is required", fragment), - })?; - - if global.drive(&drive_path.drive).is_none() { - return Err(Error { - code: ErrorCode::InvalidInput as i32, - message: format!("{}: drive not found", fragment.push_str("drive")), - }); - } - - const INPUT_PATH_ALLOWED_VARS: &[&str] = &[ - "id", - ]; - - const OUTPUT_PATH_ALLOWED_VARS: &[&str] = &[ - "id", - "scale", - "ext", - "width", - "format", - "height", - ]; - - let allowed_vars = if is_input { - INPUT_PATH_ALLOWED_VARS - } else { - OUTPUT_PATH_ALLOWED_VARS - }; - - validate_template_string(allowed_vars, &drive_path.path).map_err(|err| { - match err { - strfmt::FmtError::KeyError(key) => Error { - code: ErrorCode::InvalidInput as i32, - message: format!("{}: invalid variable '{}' allowed variables {:?}", fragment.push_str("path"), key, allowed_vars), - }, - strfmt::FmtError::TypeError(_) | strfmt::FmtError::Invalid(_) => Error { - code: ErrorCode::InvalidInput as i32, - message: format!("{}: invalid template syntax", fragment.push_str("path")), - }, - } - })?; - - Ok(()) + let drive_path = drive_path.ok_or_else(|| Error { + code: ErrorCode::InvalidInput as i32, + message: format!("{} is required", fragment), + })?; + + if global.drive(&drive_path.drive).is_none() { + return Err(Error { + code: ErrorCode::InvalidInput as i32, + message: format!("{}: drive not found", fragment.push("drive")), + }); + } + + validate_template_string(fragment.push("path"), &["id"], &drive_path.path)?; + + Ok(()) } -pub fn validate_public_url( - global: &Arc, - mut fragment: Fragment, - url: &str, -) -> Result<(), Error> { - if url.is_empty() { - return Err(Error { - code: ErrorCode::InvalidInput as i32, - message: format!("{} is required", fragment), - }); - } else if global.public_http_drive().is_none() { - return Err(Error { - code: ErrorCode::InvalidInput as i32, - message: format!("{}: public http drive not found", fragment), - }); - } - - let url = Url::parse(url).map_err(|e| Error { - code: ErrorCode::InvalidInput as i32, - message: format!("{}: {}", fragment, e), - })?; - - if url.scheme() != "http" && url.scheme() != "https" { - return Err(Error { - code: ErrorCode::InvalidInput as i32, - message: format!("{}: scheme must be http or https", fragment), - }); - } - - if url.host().is_none() { - return Err(Error { - code: ErrorCode::InvalidInput as i32, - message: format!("{}: host is required", fragment), - }); - } - - Ok(()) +pub fn validate_public_url(global: &Arc, fragment: Fragment, url: &str) -> Result<(), Error> { + if url.is_empty() { + return Err(Error { + code: ErrorCode::InvalidInput as i32, + message: format!("{fragment}: is required"), + }); + } else if global.public_http_drive().is_none() { + return Err(Error { + code: ErrorCode::InvalidInput as i32, + message: format!("{fragment}: public http drive not found"), + }); + } + + let url = Url::parse(url).map_err(|e| Error { + code: ErrorCode::InvalidInput as i32, + message: format!("{fragment}: {e}"), + })?; + + if url.scheme() != "http" && url.scheme() != "https" { + return Err(Error { + code: ErrorCode::InvalidInput as i32, + message: format!("{fragment}: scheme must be http or https"), + }); + } + + if url.host().is_none() { + return Err(Error { + code: ErrorCode::InvalidInput as i32, + message: format!("{fragment}: url host is required"), + }); + } + + Ok(()) } -fn validate_template_string( - allowed_vars: &[&str], - template: &str, -) -> Result { - let formatter = |fmt: strfmt::Formatter| { - let k: &str = fmt.key; - if !allowed_vars.contains(&k) { - return Err(strfmt::FmtError::KeyError(k.to_owned())); - } - Ok(()) - }; - - strfmt::strfmt_map(template, formatter) +fn validate_template_string(fragment: Fragment, allowed_vars: &[&str], template: &str) -> Result { + if template.is_empty() { + return Err(Error { + code: ErrorCode::InvalidInput as i32, + message: format!("{fragment}: is required"), + }); + } + + let formatter = |fmt: strfmt::Formatter| { + let k: &str = fmt.key; + if !allowed_vars.contains(&k) { + return Err(strfmt::FmtError::KeyError(k.to_owned())); + } + Ok(()) + }; + + strfmt::strfmt_map(template, formatter).map_err(|err| match err { + strfmt::FmtError::KeyError(key) => Error { + code: ErrorCode::InvalidInput as i32, + message: format!( + "{fragment}: invalid variable '{key}', the allowed variables are {:?}", + allowed_vars + ), + }, + strfmt::FmtError::TypeError(_) | strfmt::FmtError::Invalid(_) => Error { + code: ErrorCode::InvalidInput as i32, + message: format!("{fragment}: invalid template syntax"), + }, + }) } diff --git a/image-processor/src/worker.rs b/image-processor/src/worker.rs index d87f5fd8..dde504b3 100644 --- a/image-processor/src/worker.rs +++ b/image-processor/src/worker.rs @@ -3,6 +3,6 @@ use std::sync::Arc; use crate::global::Global; pub async fn start(global: Arc) -> anyhow::Result<()> { - std::future::pending::<()>().await; + std::future::pending::<()>().await; Ok(()) }