diff --git a/cdviz-collector/.mise.toml b/cdviz-collector/.mise.toml index 69ed033..bfb9187 100644 --- a/cdviz-collector/.mise.toml +++ b/cdviz-collector/.mise.toml @@ -7,7 +7,7 @@ OTEL_TRACES_SAMPLER = "always_off" [tools] task = '3' # to have a set of simple commands for repetive task (and CI) -rust = '1.80.1' # the rust tool stack (with cargo, fmt, clippy) to build source +rust = '1.81.0' # the rust tool stack (with cargo, fmt, clippy) to build source binstall = '1.7.4' # do not use cargo-binstall (it's a special name used by mise) [plugins] diff --git a/cdviz-collector/Cargo.toml b/cdviz-collector/Cargo.toml index 2f89888..73534a6 100644 --- a/cdviz-collector/Cargo.toml +++ b/cdviz-collector/Cargo.toml @@ -8,10 +8,11 @@ version = "0.1.0" authors = ["David Bernard"] license = "Apache-2.0" repository = "https://github.com/davidB/cdviz" -rust-version = "1.80.1" +rust-version = "1.81.0" publish = false [dependencies] +async-trait = "0.1" axum = { version = "0.7", optional = true } axum-tracing-opentelemetry = { version = "0.21", optional = true } bytes = { version = "1.7", optional = true } diff --git a/cdviz-collector/Dockerfile b/cdviz-collector/Dockerfile index afd7a06..86d4ff8 100644 --- a/cdviz-collector/Dockerfile +++ b/cdviz-collector/Dockerfile @@ -4,7 +4,7 @@ # see https://edu.chainguard.dev/chainguard/chainguard-images/reference/rust/image_specs/ # checkov:skip=CKV_DOCKER_7:Ensure the base image uses a non latest version tag # trivy:ignore:AVD-DS-0001 -FROM cgr.dev/chainguard/rust:1.80.1 as build +FROM cgr.dev/chainguard/rust:1.81.0 as build ARG PROFILE=release USER nonroot WORKDIR /work diff --git a/cdviz-collector/examples/assets/cdviz-collector.toml b/cdviz-collector/examples/assets/cdviz-collector.toml index 6c815f9..1e864c2 100644 --- a/cdviz-collector/examples/assets/cdviz-collector.toml +++ b/cdviz-collector/examples/assets/cdviz-collector.toml @@ -8,6 +8,9 @@ pool_connections_min = 1 pool_connections_max = 10 [sources.cdevents_json] +transformers 
= [] + +[sources.cdevents_json.extractor] type = "opendal" kind = "fs" polling_interval = "10s" @@ -15,16 +18,11 @@ polling_interval = "10s" parameters = { root = "./opendal_fs" } recursive = true path_patterns = ["**/*.json"] -transformer = { extractor = "json" } +parser = "json" -[sources.cdevents_csv] -type = "opendal" -kind = "fs" -polling_interval = "10s" -parameters = { root = "./opendal_fs" } -recursive = false -path_patterns = ["cdevents.csv"] -transformer = { extractor = "csv_row", transform = { format = "hbs", content = """ +[[sources.cdevents_csv.transformers]] +type = "hbs" +template = """ { "context": { "version": "0.4.0-draft", @@ -45,9 +43,18 @@ transformer = { extractor = "csv_row", transform = { format = "hbs", content = " } } } -""" } } +""" + +[sources.cdevents_csv.extractor] +type = "opendal" +kind = "fs" +polling_interval = "10s" +parameters = { root = "./opendal_fs" } +recursive = false +path_patterns = ["cdevents.csv"] +parser = "csv_row" -[sources.cdevents_webhook] +[sources.cdevents_webhook.extractor] type = "http" host = "0.0.0.0" port = 8080 diff --git a/cdviz-collector/rust-toolchain.toml b/cdviz-collector/rust-toolchain.toml index a56a283..1de01fa 100644 --- a/cdviz-collector/rust-toolchain.toml +++ b/cdviz-collector/rust-toolchain.toml @@ -1,2 +1,2 @@ [toolchain] -channel = "1.80.1" +channel = "1.81.0" diff --git a/cdviz-collector/src/main.rs b/cdviz-collector/src/main.rs index 3a3e5f8..487a913 100644 --- a/cdviz-collector/src/main.rs +++ b/cdviz-collector/src/main.rs @@ -1,4 +1,5 @@ mod errors; +mod pipes; mod sinks; mod sources; @@ -9,7 +10,7 @@ use clap::Parser; use clap_verbosity_flag::Verbosity; use errors::{Error, Result}; use figment::{ - providers::{Env, Format, Toml}, + providers::{Env, Format, Serialized, Toml}, Figment, }; use futures::future::TryJoinAll; @@ -25,10 +26,12 @@ pub(crate) struct Cli { verbose: clap_verbosity_flag::Verbosity, } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, 
Serialize, Default)] pub(crate) struct Config { sources: HashMap, sinks: HashMap, + // extractors: HashMap, + // transformers: HashMap, } type Sender = tokio::sync::broadcast::Sender; @@ -85,7 +88,7 @@ async fn main() -> Result<()> { if let Some(dir) = cli.config.parent() { std::env::set_current_dir(dir)?; } - let config: Config = Figment::new() + let config: Config = Figment::from(Serialized::defaults(Config::default())) .merge(Toml::file(cli.config.as_path())) .merge(Env::prefixed("CDVIZ_COLLECTOR_")) .extract()?; diff --git a/cdviz-collector/src/pipes/discard_all.rs b/cdviz-collector/src/pipes/discard_all.rs new file mode 100644 index 0000000..6035ae6 --- /dev/null +++ b/cdviz-collector/src/pipes/discard_all.rs @@ -0,0 +1,20 @@ +use super::Pipe; +use crate::errors::Result; +use std::marker::PhantomData; + +pub(crate) struct Processor { + input_type: PhantomData, +} + +impl Processor { + pub(crate) fn new() -> Self { + Self { input_type: PhantomData } + } +} + +impl Pipe for Processor { + type Input = I; + fn send(&mut self, _input: Self::Input) -> Result<()> { + Ok(()) + } +} diff --git a/cdviz-collector/src/pipes/log.rs b/cdviz-collector/src/pipes/log.rs new file mode 100644 index 0000000..8cce10f --- /dev/null +++ b/cdviz-collector/src/pipes/log.rs @@ -0,0 +1,38 @@ +use super::Pipe; +use crate::errors::Result; +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; +use std::marker::PhantomData; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub(crate) struct Config { + target: String, +} + +pub(crate) struct Processor { + target: String, + next: N, + input_type: PhantomData, +} + +impl Processor { + pub(crate) fn new(target: String, next: N) -> Self { + Self { target, next, input_type: PhantomData } + } + + pub(crate) fn try_from(config: &Config, next: N) -> Result { + Ok(Self::new(config.target.clone(), next)) + } +} + +impl Pipe for Processor +where + I: Debug, + N: Pipe, +{ + type Input = I; + fn send(&mut self, input: Self::Input) -> Result<()> { 
+ tracing::info!(target=self.target, input=?input); + self.next.send(input) + } +} diff --git a/cdviz-collector/src/pipes/mod.rs b/cdviz-collector/src/pipes/mod.rs new file mode 100644 index 0000000..5fcc351 --- /dev/null +++ b/cdviz-collector/src/pipes/mod.rs @@ -0,0 +1,39 @@ +use crate::errors::Result; +pub mod discard_all; +pub mod log; +pub mod passthrough; + +/// A pipe is an interface to implement processor for inputs. +/// The implementations can: +/// +/// - discard / drop all inputs +/// - filter +/// - transform +/// - split +/// - retry +/// - timeout +/// - ... +/// +/// The composition of Pipes to create pipeline could be done by configuration, +/// and the behavior of the pipe should be internal, +/// so chaining of pipes should not depends of method `map`, `fold`, `filter`, +/// `filter_map`, `drop`,... like for `Iterator`, `Stream`, `RxRust`. +/// Also being able to return Error to the sender could help the Sender to ease handling (vs `Stream`) +/// like retry, buffering, forward to its caller... +/// +/// The approach and goal is similar to middleware used in some webframework +/// or in [tower](https://crates.io/crates/tower), Except it's not async. +/// Maybe if we need to support async, `Pipe` will become a specialization of tower's middleware, +/// like [axum](https://crates.io/crates/axum), [warp](https://crates.io/crates/warp), [tonic](https://crates.io/crates/tonic),... do. 
+pub trait Pipe { + type Input; + + fn send(&mut self, input: Self::Input) -> Result<()>; +} + +impl + ?Sized> Pipe for Box { + type Input = I; + fn send(&mut self, input: Self::Input) -> Result<()> { + T::send(self, input) + } +} diff --git a/cdviz-collector/src/pipes/passthrough.rs b/cdviz-collector/src/pipes/passthrough.rs new file mode 100644 index 0000000..50d6f10 --- /dev/null +++ b/cdviz-collector/src/pipes/passthrough.rs @@ -0,0 +1,30 @@ +use super::Pipe; +use crate::errors::Result; +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; +use std::marker::PhantomData; + +#[derive(Debug, Deserialize, Serialize, Default)] +pub(crate) struct Config {} + +pub(crate) struct Processor { + next: N, + input_type: PhantomData, +} + +impl Processor { + pub(crate) fn new(next: N) -> Self { + Self { next, input_type: PhantomData } + } +} + +impl Pipe for Processor +where + I: Debug, + N: Pipe, +{ + type Input = I; + fn send(&mut self, input: Self::Input) -> Result<()> { + self.next.send(input) + } +} diff --git a/cdviz-collector/src/sinks/db.rs b/cdviz-collector/src/sinks/db.rs index 0177f12..676bf5d 100644 --- a/cdviz-collector/src/sinks/db.rs +++ b/cdviz-collector/src/sinks/db.rs @@ -9,7 +9,7 @@ use tracing::Instrument; use super::Sink; /// The database client config -#[derive(Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] pub(crate) struct Config { /// The database url (with username, password and the database) url: String, diff --git a/cdviz-collector/src/sinks/debug.rs b/cdviz-collector/src/sinks/debug.rs index a3c1a22..b79478d 100644 --- a/cdviz-collector/src/sinks/debug.rs +++ b/cdviz-collector/src/sinks/debug.rs @@ -5,7 +5,7 @@ use crate::Message; use super::Sink; -#[derive(Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize, Default)] pub(crate) struct Config {} impl TryFrom for DebugSink { diff --git a/cdviz-collector/src/sinks/http.rs b/cdviz-collector/src/sinks/http.rs index fd12035..9e5c487 
100644 --- a/cdviz-collector/src/sinks/http.rs +++ b/cdviz-collector/src/sinks/http.rs @@ -11,7 +11,7 @@ use reqwest_tracing::TracingMiddleware; use super::Sink; -#[derive(Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] pub(crate) struct Config { destination: Url, } diff --git a/cdviz-collector/src/sinks/mod.rs b/cdviz-collector/src/sinks/mod.rs index 912ee3b..d7e7bf9 100644 --- a/cdviz-collector/src/sinks/mod.rs +++ b/cdviz-collector/src/sinks/mod.rs @@ -14,7 +14,7 @@ use db::DbSink; use debug::DebugSink; use http::HttpSink; -#[derive(Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] #[serde(tag = "type")] pub(crate) enum Config { #[cfg(feature = "sink_db")] @@ -26,6 +26,12 @@ pub(crate) enum Config { Http(http::Config), } +impl Default for Config { + fn default() -> Self { + Self::Debug(debug::Config {}) + } +} + impl TryFrom for SinkEnum { type Error = crate::errors::Error; diff --git a/cdviz-collector/src/sources/extractors.rs b/cdviz-collector/src/sources/extractors.rs new file mode 100644 index 0000000..ec01f25 --- /dev/null +++ b/cdviz-collector/src/sources/extractors.rs @@ -0,0 +1,43 @@ +use super::{http, opendal, EventSourcePipe, Extractor}; +use crate::errors::Result; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Deserialize, Serialize, Default)] +#[serde(tag = "type")] +pub(crate) enum Config { + #[serde(alias = "noop")] + #[default] + Sleep, + #[cfg(feature = "source_http")] + #[serde(alias = "http")] + Http(http::Config), + #[cfg(feature = "source_opendal")] + #[serde(alias = "opendal")] + Opendal(opendal::Config), +} + +impl Config { + pub(crate) fn make_extractor(&self, next: EventSourcePipe) -> Result> { + let out: Box = match self { + Config::Sleep => Box::new(SleepExtractor {}), + #[cfg(feature = "source_http")] + Config::Http(config) => Box::new(http::HttpExtractor::try_from(config, next)?), + #[cfg(feature = "source_opendal")] + Config::Opendal(config) => 
Box::new(opendal::OpendalExtractor::try_from(config, next)?), + }; + Ok(out) + } +} + +struct SleepExtractor {} + +#[async_trait::async_trait] +impl Extractor for SleepExtractor { + async fn run(&mut self) -> Result<()> { + use std::future; + + let future = future::pending(); + let () = future.await; + unreachable!() + } +} diff --git a/cdviz-collector/src/sources/hbs.rs b/cdviz-collector/src/sources/hbs.rs new file mode 100644 index 0000000..f4a39c8 --- /dev/null +++ b/cdviz-collector/src/sources/hbs.rs @@ -0,0 +1,28 @@ +use super::{EventSource, EventSourcePipe}; +use crate::errors::Result; +use crate::pipes::Pipe; +use handlebars::Handlebars; + +pub(crate) struct Processor { + next: EventSourcePipe, + hbs: Handlebars<'static>, +} + +impl Processor { + pub(crate) fn new(template: &str, next: EventSourcePipe) -> Result { + let mut hbs = Handlebars::new(); + hbs.set_dev_mode(false); + hbs.set_strict_mode(true); + hbs.register_template_string("tpl", template)?; + Ok(Self { next, hbs }) + } +} + +impl Pipe for Processor { + type Input = EventSource; + fn send(&mut self, input: Self::Input) -> Result<()> { + let res = self.hbs.render("tpl", &input)?; + let output: EventSource = serde_json::from_str(&res)?; + self.next.send(output) + } +} diff --git a/cdviz-collector/src/sources/http.rs b/cdviz-collector/src/sources/http.rs index 48b758d..6c79183 100644 --- a/cdviz-collector/src/sources/http.rs +++ b/cdviz-collector/src/sources/http.rs @@ -1,7 +1,7 @@ -use super::Source; +use super::{EventSourcePipe, Extractor}; use crate::{ errors::{self, Error}, - Message, Sender, + sources::EventSource, }; use axum::{ extract::State, @@ -11,11 +11,13 @@ use axum::{ Json, Router, }; use axum_tracing_opentelemetry::middleware::{OtelAxumLayer, OtelInResponseLayer}; -use cdevents_sdk::CDEvent; use errors::Result; use serde::{Deserialize, Serialize}; use serde_json::json; -use std::net::{IpAddr, SocketAddr}; +use std::{ + net::{IpAddr, SocketAddr}, + sync::{Arc, Mutex}, +}; /// The http 
server config #[derive(Clone, Debug, Deserialize, Serialize)] @@ -29,26 +31,25 @@ pub(crate) struct Config { pub(crate) port: u16, } -impl TryFrom for HttpSource { - type Error = crate::errors::Error; - - fn try_from(value: Config) -> Result { - Ok(HttpSource { config: value.clone() }) - } -} - -pub(crate) struct HttpSource { +pub(crate) struct HttpExtractor { config: Config, + next: Arc>, } +impl HttpExtractor { + pub fn try_from(value: &Config, next: EventSourcePipe) -> Result { + Ok(HttpExtractor { config: value.clone(), next: Arc::new(Mutex::new(next)) }) + } +} #[derive(Clone)] struct AppState { - tx: Sender, + next: Arc>, } -impl Source for HttpSource { - async fn run(&mut self, tx: Sender) -> Result<()> { - let app_state = AppState { tx }; +#[async_trait::async_trait] +impl Extractor for HttpExtractor { + async fn run(&mut self) -> Result<()> { + let app_state = AppState { next: Arc::clone(&self.next) }; let app = app().with_state(app_state); // run it @@ -64,10 +65,11 @@ impl Source for HttpSource { } } +//TODO make route per extractor/sources fn app() -> Router { // build our application with a route Router::new() - .route("/cdevents", post(cdevents_collect)) + .route("/cdevents", post(events_collect)) // include trace context as header into the response .layer(OtelInResponseLayer) //start OpenTelemetry trace on incoming request @@ -80,17 +82,20 @@ async fn health() -> impl IntoResponse { http::StatusCode::OK } -//TODO validate format of cdevents JSON //TODO support events in cloudevents format (extract info from headers) //TODO try [deser](https://crates.io/crates/deserr) to return good error //TODO use cloudevents -#[tracing::instrument(skip(app_state, payload))] -async fn cdevents_collect( +//TODO add metadata & headers info into SourceEvent +//TODO log & convert error +#[tracing::instrument(skip(app_state, body))] +async fn events_collect( State(app_state): State, - Json(payload): Json, + Json(body): Json, ) -> Result { - tracing::trace!("received 
cloudevent {:?}", &payload); - app_state.tx.send(Message::from(payload))?; + tracing::trace!("received {:?}", &body); + let event = EventSource { body, ..Default::default() }; + let mut next = app_state.next.lock().unwrap(); + next.as_mut().send(event)?; Ok(http::StatusCode::CREATED) } diff --git a/cdviz-collector/src/sources/mod.rs b/cdviz-collector/src/sources/mod.rs index 1df4768..a4590b5 100644 --- a/cdviz-collector/src/sources/mod.rs +++ b/cdviz-collector/src/sources/mod.rs @@ -1,67 +1,65 @@ +pub(crate) mod extractors; +mod hbs; #[cfg(feature = "source_http")] pub(crate) mod http; -pub(crate) mod noop; #[cfg(feature = "source_opendal")] -pub(crate) mod opendal; +mod opendal; +mod send_cdevents; +pub(crate) mod transformers; use crate::errors::Result; +use crate::pipes::Pipe; use crate::{Message, Sender}; -use enum_dispatch::enum_dispatch; -#[cfg(feature = "source_http")] -use http::HttpSource; -use noop::NoopSource; -#[cfg(feature = "source_opendal")] -use opendal::OpendalSource; +use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::collections::HashMap; use tokio::task::JoinHandle; -#[derive(Debug, Deserialize, Serialize)] -#[serde(tag = "type")] -pub(crate) enum Config { - #[cfg(feature = "source_http")] - #[serde(alias = "http")] - Http(http::Config), - #[serde(alias = "noop")] - Noop(noop::Config), - #[cfg(feature = "source_opendal")] - #[serde(alias = "opendal")] - Opendal(opendal::Config), -} - -impl TryFrom for SourceEnum { - type Error = crate::errors::Error; +// #[enum_dispatch] +// #[allow(clippy::enum_variant_names, clippy::large_enum_variant)] +// enum SourceEnum { +// #[cfg(feature = "source_http")] +// HttpSource, +// NoopSource, +// #[cfg(feature = "source_opendal")] +// OpendalSource, +// } - fn try_from(value: Config) -> Result { - let out = match value { - #[cfg(feature = "source_http")] - Config::Http(config) => HttpSource::try_from(config)?.into(), - Config::Noop(config) => 
NoopSource::try_from(config)?.into(), - #[cfg(feature = "source_opendal")] - Config::Opendal(config) => OpendalSource::try_from(config)?.into(), - }; - Ok(out) - } -} - -#[enum_dispatch] -#[allow(clippy::enum_variant_names, clippy::large_enum_variant)] -enum SourceEnum { - #[cfg(feature = "source_http")] - HttpSource, - NoopSource, - #[cfg(feature = "source_opendal")] - OpendalSource, -} - -#[enum_dispatch(SourceEnum)] -trait Source { - async fn run(&mut self, tx: Sender) -> Result<()>; +// TODO support name/reference for extractor / transformer +#[derive(Clone, Debug, Deserialize, Serialize, Default)] +pub(crate) struct Config { + #[serde(default)] + extractor: extractors::Config, + #[serde(default)] + transformers: Vec, } pub(crate) fn start(_name: String, config: Config, tx: Sender) -> JoinHandle> { tokio::spawn(async move { - let mut source = SourceEnum::try_from(config)?; - source.run(tx).await?; + let mut pipe: EventSourcePipe = Box::new(send_cdevents::Processor::new(tx)); + let mut tconfigs = config.transformers.clone(); + tconfigs.reverse(); + for tconfig in tconfigs { + pipe = tconfig.make_transformer(pipe)? 
+ } + let mut extractor = config.extractor.make_extractor(pipe)?; + extractor.run().await?; Ok(()) }) } + +#[derive(Debug, Deserialize, Serialize, Default)] +pub struct EventSource { + metadata: Value, + header: HashMap, + body: Value, +} + +// TODO explore to use enum_dispatch instead of Box(dyn) on EventSourcePipe (a recursive structure) +type EventSourcePipe = Box + Send + Sync>; + +#[async_trait] +pub trait Extractor: Send + Sync { + async fn run(&mut self) -> Result<()>; +} diff --git a/cdviz-collector/src/sources/noop.rs b/cdviz-collector/src/sources/noop.rs deleted file mode 100644 index df43be9..0000000 --- a/cdviz-collector/src/sources/noop.rs +++ /dev/null @@ -1,29 +0,0 @@ -use crate::errors::Result; -use crate::{Message, Sender}; -use serde::Deserialize; -use serde::Serialize; -use std::time::Duration; -use tokio::time::sleep; - -use super::Source; - -#[derive(Debug, Deserialize, Serialize)] -pub(crate) struct Config {} - -impl TryFrom for NoopSource { - type Error = crate::errors::Error; - - fn try_from(_value: Config) -> Result { - Ok(Self {}) - } -} - -pub(crate) struct NoopSource {} - -impl Source for NoopSource { - async fn run(&mut self, _tx: Sender) -> Result<()> { - loop { - sleep(Duration::MAX).await; - } - } -} diff --git a/cdviz-collector/src/sources/opendal/mod.rs b/cdviz-collector/src/sources/opendal/mod.rs index 0706c42..9962e2a 100644 --- a/cdviz-collector/src/sources/opendal/mod.rs +++ b/cdviz-collector/src/sources/opendal/mod.rs @@ -1,15 +1,13 @@ //TODO add persistance for state (time window to not reprocess same file after restart) mod filter; -mod texecutors; -mod transformers; +mod parsers; use self::filter::{globset_from, Filter}; -use self::transformers::{Transformer, TransformerEnum}; -use super::Source; +use self::parsers::{Parser, ParserEnum}; +use super::{EventSourcePipe, Extractor}; use crate::errors::Result; -use crate::{Message, Sender}; -use cdevents_sdk::CDEvent; +use async_trait::async_trait; use futures::TryStreamExt; use 
opendal::Metakey; use opendal::Operator; @@ -23,7 +21,7 @@ use tokio::time::sleep; use tracing::instrument; #[serde_as] -#[derive(Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] pub(crate) struct Config { #[serde(with = "humantime_serde")] polling_interval: Duration, @@ -32,39 +30,38 @@ pub(crate) struct Config { parameters: HashMap, recursive: bool, path_patterns: Vec, - transformer: transformers::Config, + parser: parsers::Config, } -impl TryFrom for OpendalSource { - type Error = crate::errors::Error; +pub(crate) struct OpendalExtractor { + op: Operator, + polling_interval: Duration, + recursive: bool, + filter: Filter, + parser: ParserEnum, +} - fn try_from(value: Config) -> Result { - let op: Operator = Operator::via_iter(value.kind, value.parameters)?; +impl OpendalExtractor { + pub(crate) fn try_from(value: &Config, next: EventSourcePipe) -> Result { + let op: Operator = Operator::via_iter(value.kind, value.parameters.clone())?; let filter = Filter::from_patterns(globset_from(&value.path_patterns)?); - let transformer = value.transformer.try_into()?; + let parser = value.parser.make_parser(next)?; Ok(Self { op, polling_interval: value.polling_interval, recursive: value.recursive, filter, - transformer, + parser, }) } } -pub(crate) struct OpendalSource { - op: Operator, - polling_interval: Duration, - recursive: bool, - filter: Filter, - transformer: TransformerEnum, -} - -impl Source for OpendalSource { - async fn run(&mut self, tx: Sender) -> Result<()> { +#[async_trait] +impl Extractor for OpendalExtractor { + async fn run(&mut self) -> Result<()> { loop { if let Err(err) = - run_once(&tx, &self.op, &self.filter, self.recursive, &self.transformer).await + run_once(&self.op, &self.filter, self.recursive, &mut self.parser).await { tracing::warn!(?err, filter = ?self.filter, scheme =? self.op.info().scheme(), root =? 
self.op.info().root(), "fail during scanning"); } @@ -74,13 +71,12 @@ impl Source for OpendalSource { } } -#[instrument] +#[instrument(skip(parser))] pub(crate) async fn run_once( - tx: &Sender, op: &Operator, filter: &Filter, recursive: bool, - transformer: &TransformerEnum, + parser: &mut ParserEnum, ) -> Result<()> { // TODO convert into arg of instrument tracing::debug!(filter=? filter, scheme =? op.info().scheme(), root =? op.info().root(), "scanning"); @@ -92,22 +88,10 @@ pub(crate) async fn run_once( .await?; while let Some(entry) = lister.try_next().await? { if filter.accept(&entry) { - if let Err(err) = - process_entry(tx, transformer.transform(op, &entry).await?.into_iter()) - { + if let Err(err) = parser.parse(op, &entry).await { tracing::warn!(?err, path = entry.path(), "fail to process, skip") } } } Ok(()) } - -fn process_entry(tx: &Sender, provider: impl Iterator>) -> Result { - let mut count = 0; - for json in provider { - let cdevent: CDEvent = serde_json::from_slice::(&json)?; - tx.send(cdevent.into())?; - count += 1; - } - Ok(count) -} diff --git a/cdviz-collector/src/sources/opendal/parsers.rs b/cdviz-collector/src/sources/opendal/parsers.rs new file mode 100644 index 0000000..3fe3f5e --- /dev/null +++ b/cdviz-collector/src/sources/opendal/parsers.rs @@ -0,0 +1,170 @@ +use std::collections::HashMap; + +use crate::{ + errors::Result, + sources::{EventSource, EventSourcePipe}, +}; +use bytes::Buf; +use enum_dispatch::enum_dispatch; +use opendal::{Entry, Operator}; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub(crate) enum Config { + #[serde(alias = "json")] + Json, + #[serde(alias = "metadata")] + Metadata, + #[serde(alias = "csv_row")] + CsvRow, +} + +impl Config { + pub(crate) fn make_parser(&self, next: EventSourcePipe) -> Result { + let out = match self { + Config::Json => JsonParser::new(next).into(), + Config::Metadata => MetadataParser::new(next).into(), + 
Config::CsvRow => CsvRowParser::new(next).into(), + }; + Ok(out) + } +} + +#[enum_dispatch] +#[allow(clippy::enum_variant_names)] +pub(crate) enum ParserEnum { + JsonParser, + MetadataParser, + CsvRowParser, +} + +#[enum_dispatch(ParserEnum)] +pub(crate) trait Parser { + async fn parse(&mut self, op: &Operator, entry: &Entry) -> Result<()>; +} + +pub(crate) struct MetadataParser { + next: EventSourcePipe, +} + +impl MetadataParser { + fn new(next: EventSourcePipe) -> Self { + Self { next } + } +} + +impl Parser for MetadataParser { + async fn parse(&mut self, op: &Operator, entry: &Entry) -> Result<()> { + let metadata = extract_metadata(op, entry); + let event = EventSource { metadata, ..Default::default() }; + self.next.send(event) + } +} + +pub(crate) struct JsonParser { + next: EventSourcePipe, +} + +impl JsonParser { + fn new(next: EventSourcePipe) -> Self { + Self { next } + } +} + +impl Parser for JsonParser { + async fn parse(&mut self, op: &Operator, entry: &Entry) -> Result<()> { + let bytes = op.read(entry.path()).await?; + let metadata = extract_metadata(op, entry); + let body: serde_json::Value = serde_json::from_reader(bytes.reader())?; + let event = EventSource { metadata, body, ..Default::default() }; + self.next.send(event) + } +} + +pub(crate) struct CsvRowParser { + next: EventSourcePipe, +} + +impl CsvRowParser { + fn new(next: EventSourcePipe) -> Self { + Self { next } + } +} + +impl Parser for CsvRowParser { + async fn parse(&mut self, op: &Operator, entry: &Entry) -> Result<()> { + use csv::Reader; + + let bytes = op.read(entry.path()).await?; + let mut rdr = Reader::from_reader(bytes.reader()); + let headers = rdr.headers()?.clone(); + let metadata = extract_metadata(op, entry); + for record in rdr.records() { + let record = record?; + let body = json!(headers.iter().zip(record.iter()).collect::>()); + let event = EventSource { metadata: metadata.clone(), body, ..Default::default() }; + self.next.send(event)?; + } + Ok(()) + } +} + +fn 
extract_metadata(op: &Operator, entry: &Entry) -> Value { + json!({ + "name": entry.name(), + "path": entry.path(), + "root": op.info().root(), + "last_modified": entry.metadata().last_modified().map(|dt| dt.to_rfc3339()), + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use assert2::{check, let_assert}; + use chrono::prelude::*; + use futures::TryStreamExt; + use opendal::Metakey; + use std::path::Path; + + async fn provide_op_entry(prefix: &str) -> (Operator, Entry) { + // Create fs backend builder. + let root = Path::new("examples/assets/opendal_fs"); + let builder = opendal::services::Fs::default().root(&root.to_string_lossy()); + let op: Operator = Operator::new(builder).unwrap().finish(); + let mut entries = op + .lister_with(prefix) + .metakey(Metakey::ContentLength | Metakey::LastModified) + .await + .unwrap(); + let_assert!(Ok(Some(entry)) = entries.try_next().await); + (op, entry) + } + + #[tokio::test] + async fn extract_metadata_works() { + let (op, entry) = provide_op_entry("dir1/file").await; + // Extract the metadata and check that it's what we expect + let result = extract_metadata(&op, &entry); + check!(result["name"] == "file01.txt"); + check!(result["path"] == "dir1/file01.txt"); + let_assert!(Some(abs_root) = result["root"].as_str()); + check!(abs_root.ends_with("examples/assets/opendal_fs")); + let_assert!( + Ok(_) = result["last_modified"].as_str().unwrap_or_default().parse::>() + ); + } + + // TODO + // #[tokio::test] + // async fn csv_row_via_template_works() { + // let (op, entry) = provide_op_entry("cdevents.").await; + // let dest = collect_to_vec::Processor::new(); + // let collector = dest.collector(); + // let sut = CsvRowParser::new(Box::new(collector)); + // assert!(Ok(()) == sut.parse(&op, &entry).await); + // check!(collector.len() == 3); + // // TODO check!(collector[0]. 
== "dev".as_bytes()); + // } +} diff --git a/cdviz-collector/src/sources/opendal/texecutors.rs b/cdviz-collector/src/sources/opendal/texecutors.rs deleted file mode 100644 index b336a83..0000000 --- a/cdviz-collector/src/sources/opendal/texecutors.rs +++ /dev/null @@ -1,58 +0,0 @@ -use crate::errors::Result; -use enum_dispatch::enum_dispatch; -use handlebars::Handlebars; -use serde::{Deserialize, Serialize}; -use serde_json::Value; - -#[derive(Debug, Deserialize, Serialize)] -#[serde(tag = "format", content = "content")] -pub(crate) enum TExecutorConfig { - #[serde(alias = "hbs")] - Hbs(String), - // #[serde(alias = "vrl")] - // Vrl(String), -} - -impl TryFrom for TExecutorEnum { - type Error = crate::errors::Error; - - fn try_from(value: TExecutorConfig) -> Result { - let out = match value { - TExecutorConfig::Hbs(template) => Hbs::new(&template)?.into(), - }; - Ok(out) - } -} - -#[enum_dispatch] -#[derive(Debug)] -pub(crate) enum TExecutorEnum { - Hbs, -} - -#[enum_dispatch(TExecutorEnum)] -pub(crate) trait TExecutor { - //TODO return a common Iterator or stream type (better than Vec) - fn execute(&self, json: Value) -> Result>; -} - -#[derive(Debug)] -pub(crate) struct Hbs { - hbs: Handlebars<'static>, -} - -impl Hbs { - pub(crate) fn new(template: &str) -> Result { - let mut hbs = Handlebars::new(); - hbs.set_dev_mode(false); - hbs.set_strict_mode(true); - hbs.register_template_string("tpl", template)?; - Ok(Self { hbs }) - } -} - -impl TExecutor for Hbs { - fn execute(&self, data: Value) -> Result> { - Ok(self.hbs.render("tpl", &data)?.into_bytes()) - } -} diff --git a/cdviz-collector/src/sources/opendal/transformers.rs b/cdviz-collector/src/sources/opendal/transformers.rs deleted file mode 100644 index 62ee21b..0000000 --- a/cdviz-collector/src/sources/opendal/transformers.rs +++ /dev/null @@ -1,203 +0,0 @@ -use std::collections::HashMap; - -use crate::errors::Result; -use bytes::Buf; -use enum_dispatch::enum_dispatch; -use opendal::{Entry, Operator}; -use 
serde::{Deserialize, Serialize}; -use serde_json::{json, Value}; - -use super::texecutors::{TExecutor, TExecutorConfig, TExecutorEnum}; - -#[derive(Debug, Deserialize, Serialize)] -#[serde(tag = "extractor")] -pub(crate) enum Config { - #[serde(alias = "json")] - Json { transform: Option }, - #[serde(alias = "metadata")] - Metadata { transform: TExecutorConfig }, - #[serde(alias = "csv_row")] - CsvRow { transform: TExecutorConfig }, -} - -impl TryFrom for TransformerEnum { - type Error = crate::errors::Error; - - fn try_from(value: Config) -> Result { - let out = match value { - Config::Json { transform } => match transform { - None => JsonContentAsIs {}.into(), - Some(te) => JsonExtractor::new(te.try_into()?).into(), - }, - Config::Metadata { transform } => MetadataExtractor::new(transform.try_into()?).into(), - Config::CsvRow { transform } => CsvRowExtractor::new(transform.try_into()?).into(), - }; - Ok(out) - } -} - -#[enum_dispatch] -#[derive(Debug)] -pub(crate) enum TransformerEnum { - JsonContentAsIs, - JsonExtractor, - MetadataExtractor, - CsvRowExtractor, -} - -#[enum_dispatch(TransformerEnum)] -pub(crate) trait Transformer { - //TODO return a common Iterator type (better than Vec) - async fn transform(&self, op: &Operator, entry: &Entry) -> Result>>; -} - -#[derive(Debug)] -pub(crate) struct JsonContentAsIs; - -impl Transformer for JsonContentAsIs { - async fn transform(&self, op: &Operator, entry: &Entry) -> Result>> { - let input = op.read(entry.path()).await?; - Ok(vec![input.to_vec()]) - } -} - -#[derive(Debug)] -pub(crate) struct MetadataExtractor { - texecutor: TExecutorEnum, -} - -impl MetadataExtractor { - fn new(texecutor: TExecutorEnum) -> Self { - Self { texecutor } - } -} - -impl Transformer for MetadataExtractor { - async fn transform(&self, op: &Operator, entry: &Entry) -> Result>> { - let metadata = extract_metadata(op, entry); - let data = json!({ - "metadata" : metadata, - }); - let output = self.texecutor.execute(data)?; - 
Ok(vec![output]) - } -} - -#[derive(Debug)] -pub(crate) struct JsonExtractor { - texecutor: TExecutorEnum, -} - -impl JsonExtractor { - fn new(texecutor: TExecutorEnum) -> Self { - Self { texecutor } - } -} - -impl Transformer for JsonExtractor { - async fn transform(&self, op: &Operator, entry: &Entry) -> Result<Vec<Vec<u8>>> { - let bytes = op.read(entry.path()).await?; - let metadata = extract_metadata(op, entry); - let content: serde_json::Value = serde_json::from_reader(bytes.reader())?; - let data = json!({ - "metadata" : metadata, - "content": content, - }); - let output = self.texecutor.execute(data)?; - Ok(vec![output]) - } -} - -#[derive(Debug)] -pub(crate) struct CsvRowExtractor { - texecutor: TExecutorEnum, -} - -impl CsvRowExtractor { - fn new(texecutor: TExecutorEnum) -> Self { - Self { texecutor } - } -} - -impl Transformer for CsvRowExtractor { - async fn transform(&self, op: &Operator, entry: &Entry) -> Result<Vec<Vec<u8>>> { - use csv::Reader; - - let bytes = op.read(entry.path()).await?; - let mut rdr = Reader::from_reader(bytes.reader()); - let headers = rdr.headers()?.clone(); - let metadata = extract_metadata(op, entry); - let mut out = Vec::new(); - for record in rdr.records() { - let record = record?; - let content = headers.iter().zip(record.iter()).collect::<HashMap<_, _>>(); - let data = json!({ - "metadata" : metadata.clone(), - "content": content, - }); - let output = self.texecutor.execute(data)?; - out.push(output); - } - Ok(out) - } -} - -fn extract_metadata(op: &Operator, entry: &Entry) -> Value { - json!({ - "name": entry.name(), - "path": entry.path(), - "root": op.info().root(), - "last_modified": entry.metadata().last_modified().map(|dt| dt.to_rfc3339()), - }) -} - -#[cfg(test)] -mod tests { - use std::path::Path; - - use crate::sources::opendal::texecutors::Hbs; - - use super::*; - use assert2::{check, let_assert}; - use chrono::prelude::*; - use futures::TryStreamExt; - use opendal::Metakey; - - async fn provide_op_entry(prefix: &str) -> (Operator, Entry) { - //
Create fs backend builder. - let root = Path::new("examples/assets/opendal_fs"); - let builder = opendal::services::Fs::default().root(&root.to_string_lossy()); - let op: Operator = Operator::new(builder).unwrap().finish(); - let mut entries = op - .lister_with(prefix) - .metakey(Metakey::ContentLength | Metakey::LastModified) - .await - .unwrap(); - let_assert!(Ok(Some(entry)) = entries.try_next().await); - (op, entry) - } - - #[tokio::test] - async fn extract_metadata_works() { - let (op, entry) = provide_op_entry("dir1/file").await; - // Extract the metadata and check that it's what we expect - let result = extract_metadata(&op, &entry); - check!(result["name"] == "file01.txt"); - check!(result["path"] == "dir1/file01.txt"); - let_assert!(Some(abs_root) = result["root"].as_str()); - check!(abs_root.ends_with("examples/assets/opendal_fs")); - let_assert!( - Ok(_) = result["last_modified"].as_str().unwrap_or_default().parse::<DateTime<FixedOffset>>() - ); - } - - #[tokio::test] - async fn csv_row_via_template_works() { - let (op, entry) = provide_op_entry("cdevents.").await; - let sut = - CsvRowExtractor::new(TExecutorEnum::from(Hbs::new(r#"{{content.env}}"#).unwrap())); - let_assert!(Ok(actual) = sut.transform(&op, &entry).await); - check!(actual.len() == 3); - check!(actual[0] == "dev".as_bytes()); - } -} diff --git a/cdviz-collector/src/sources/send_cdevents.rs b/cdviz-collector/src/sources/send_cdevents.rs new file mode 100644 index 0000000..1b0a1fa --- /dev/null +++ b/cdviz-collector/src/sources/send_cdevents.rs @@ -0,0 +1,27 @@ +use crate::errors::Result; +use crate::pipes::Pipe; +use crate::Message; +use cdevents_sdk::CDEvent; +use tokio::sync::broadcast::Sender; + +use super::EventSource; + +pub(crate) struct Processor { + next: Sender<Message>, +} + +impl Processor { + pub(crate) fn new(next: Sender<Message>) -> Self { + Self { next } + } +} + +impl Pipe for Processor { + type Input = EventSource; + fn send(&mut self, input: Self::Input) -> Result<()> { + let cdevent: CDEvent =
serde_json::from_value(input.body)?; + // TODO include headers into message + self.next.send(cdevent.into())?; + Ok(()) + } +} diff --git a/cdviz-collector/src/sources/transformers.rs b/cdviz-collector/src/sources/transformers.rs new file mode 100644 index 0000000..f789526 --- /dev/null +++ b/cdviz-collector/src/sources/transformers.rs @@ -0,0 +1,48 @@ +use super::{hbs, EventSourcePipe}; +use crate::{errors::Result, pipes::discard_all, pipes::log, pipes::passthrough}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Deserialize, Serialize, Default)] +#[serde(tag = "type")] +pub(crate) enum Config { + #[serde(alias = "passthrough")] + #[default] + Passthrough, + #[serde(alias = "log")] + Log(log::Config), + #[serde(alias = "discard_all")] + DiscardAll, + #[serde(alias = "hbs")] + Hbs { template: String }, + // #[serde(alias = "vrl")] + // Vrl(String), +} + +impl Config { + pub(crate) fn make_transformer(&self, next: EventSourcePipe) -> Result<EventSourcePipe> { + let out: EventSourcePipe = match &self { + Config::Passthrough => Box::new(passthrough::Processor::new(next)), + Config::Log(config) => Box::new(log::Processor::try_from(config, next)?), + Config::DiscardAll => Box::new(discard_all::Processor::new()), + Config::Hbs { template } => Box::new(hbs::Processor::new(template, next)?), + }; + Ok(out) + } +} + +// pub struct Identity { +// next: EventSourcePipe, +// } + +// impl Identity { +// fn new(next: EventSourcePipe) -> Self { +// Self { next } +// } +// } + +// impl Pipe for Identity { +// type Input = EventSource; +// fn send(&mut self, input: Self::Input) -> Result<()> { +// self.next.send(input) +// } +// }