diff --git a/cdviz-collector/src/main.rs b/cdviz-collector/src/main.rs index 3a3e5f8..f642bdf 100644 --- a/cdviz-collector/src/main.rs +++ b/cdviz-collector/src/main.rs @@ -1,4 +1,5 @@ mod errors; +mod pipes; mod sinks; mod sources; @@ -29,6 +30,8 @@ pub(crate) struct Cli { pub(crate) struct Config { sources: HashMap, sinks: HashMap, + extractors: HashMap, + transformers: HashMap, } type Sender = tokio::sync::broadcast::Sender; diff --git a/cdviz-collector/src/pipes/collect_to_vec.rs b/cdviz-collector/src/pipes/collect_to_vec.rs new file mode 100644 index 0000000..697aaa6 --- /dev/null +++ b/cdviz-collector/src/pipes/collect_to_vec.rs @@ -0,0 +1,36 @@ +use super::Pipe; +use crate::errors::Result; +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; + +#[derive(Debug, Deserialize, Serialize)] +pub(crate) struct Config {} + +pub(crate) struct Processor { + buffer: Vec, +} + +impl Processor +where + I: Clone, +{ + fn new() -> Self { + Self { buffer: vec![] } + } + + fn try_from(_config: Config) -> Result { + Ok(Self::new()) + } + + fn collected(&self) -> Vec { + self.buffer.clone() + } +} + +impl Pipe for Processor { + type Input = I; + fn send(&mut self, input: Self::Input) -> Result<()> { + self.buffer.push(input); + Ok(()) + } +} diff --git a/cdviz-collector/src/pipes/discard_all.rs b/cdviz-collector/src/pipes/discard_all.rs new file mode 100644 index 0000000..47604e9 --- /dev/null +++ b/cdviz-collector/src/pipes/discard_all.rs @@ -0,0 +1,29 @@ +use super::Pipe; +use crate::errors::Result; +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; +use std::marker::PhantomData; + +#[derive(Debug, Deserialize, Serialize)] +pub(crate) struct Config {} + +struct Processor { + input_type: PhantomData, +} + +impl Processor { + pub fn new() -> Self { + Self { input_type: PhantomData } + } + + pub fn try_from(_config: Config) -> Result { + Ok(Self::new()) + } +} + +impl Pipe for Processor { + type Input = I; + fn send(&mut self, _input: Self::Input) -> Result<()> { + Ok(()) + } +} diff --git a/cdviz-collector/src/pipes/log.rs b/cdviz-collector/src/pipes/log.rs new file mode 100644 index 0000000..1ebdfd5 --- /dev/null +++ b/cdviz-collector/src/pipes/log.rs @@ -0,0 +1,38 @@ +use super::Pipe; +use crate::errors::Result; +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; +use std::marker::PhantomData; + +#[derive(Debug, Deserialize, Serialize)] +pub(crate) struct Config { + target: String, +} + +struct Processor { + target: String, + next: N, + input_type: PhantomData, +} + +impl Processor { + pub(crate) fn new(target: String, next: N) -> Self { + Self { target, next, input_type: PhantomData } + } + + pub(crate) fn try_from(config: Config, next: N) -> Result { + Ok(Self::new(config.target, next)) + } +} + +impl Pipe for Processor +where + I: Debug, + N: Pipe, +{ + type Input = I; + fn send(&mut self, input: Self::Input) -> Result<()> { + tracing::info!(target=self.target, input=?input); + self.next.send(input) + } +} diff --git a/cdviz-collector/src/pipes/mod.rs b/cdviz-collector/src/pipes/mod.rs new file mode 100644 index 0000000..59eae75 --- /dev/null +++ b/cdviz-collector/src/pipes/mod.rs @@ -0,0 +1,37 @@ +use crate::errors::Result; +pub mod collect_to_vec; +pub mod discard_all; +pub mod log; + +/// A pipe is an interface to implement processor for inputs. +/// The implementations can: +/// - discard / drop all inputs +/// - filter +/// - transform +/// - split +/// - retry +/// - timeout +/// - ... +/// The composition of Pipes to create pipeline could be done by configuration, +/// and the behavior of the pipe should be internal, +/// so chaining of pipes should not depends of method `map`, `fold`, `filter`, +/// `filter_map`, `drop`,... like for `Iterator`, `Stream`, `RxRust`. +/// Also being able to return Error to the sender could help the Sender to ease handling (vs `Stream`) +/// like retry, buffering, forward to its caller... +/// +/// The approach and goal is similar to middleware used in some webframework +/// or in [tower](https://crates.io/crates/tower), Except it's not async. +/// Maybe if we need to support async, `Pipe` will become a specialization of tower's middleware, +/// like [axum](https://crates.io/crates/axum), [warp](https://crates.io/crates/warp), [tonic](https://crates.io/crates/tonic),... do. +pub trait Pipe { + type Input; + + fn send(&mut self, input: Self::Input) -> Result<()>; +} + +impl + ?Sized> Pipe for Box { + type Input = I; + fn send(&mut self, input: Self::Input) -> Result<()> { + T::send(self, input) + } +} diff --git a/cdviz-collector/src/sources/extractors.rs b/cdviz-collector/src/sources/extractors.rs new file mode 100644 index 0000000..dc761bf --- /dev/null +++ b/cdviz-collector/src/sources/extractors.rs @@ -0,0 +1,26 @@ +use super::{http, opendal, EventSourcePipe, Extractor}; +use crate::errors::Result; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Deserialize, Serialize)] +#[serde(tag = "kind")] +pub(crate) enum Config { + #[cfg(feature = "source_http")] + #[serde(alias = "http")] + Http(http::Config), + #[cfg(feature = "source_opendal")] + #[serde(alias = "opendal")] + Opendal(opendal::Config), +} + +impl Config { + fn into_extractor(&self, next: EventSourcePipe) -> Result> { + let out = match self { + // #[cfg(feature = "source_http")] + // Config::Http(config) => Box::new(http::HttpExtractor::try_from(config, next)?), + // #[cfg(feature = "source_opendal")] + // Config::Opendal(config) => OpendalSource::try_from(config)?.into(), + }; + Ok(out) + } +} diff --git a/cdviz-collector/src/sources/hbs.rs b/cdviz-collector/src/sources/hbs.rs new file mode 100644 index 0000000..a2e4cdb --- /dev/null +++ b/cdviz-collector/src/sources/hbs.rs @@ -0,0 +1,39 @@ +use super::{EventSource, EventSourcePipe}; +use crate::errors::Result; +use crate::pipes::Pipe; +use handlebars::Handlebars; +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; + +#[derive(Debug, Deserialize, Serialize)] +pub(crate) struct Config { + template: String, +} + +pub(crate) struct Processor { + next: EventSourcePipe, + hbs: Handlebars<'static>, +} + +impl Processor { + pub(crate) fn new(template: &str, next: EventSourcePipe) -> Result { + let mut hbs = Handlebars::new(); + hbs.set_dev_mode(false); + hbs.set_strict_mode(true); + hbs.register_template_string("tpl", template)?; + Ok(Self { next, hbs }) + } + + pub(crate) fn try_from(config: Config, next: EventSourcePipe) -> Result { + Self::new(&config.template, next) + } +} + +impl Pipe for Processor { + type Input = EventSource; + fn send(&mut self, input: Self::Input) -> Result<()> { + let res = self.hbs.render("tpl", &input)?; + let output: EventSource = serde_json::from_str(&res)?; + self.next.send(output) + } +} diff --git a/cdviz-collector/src/sources/http.rs b/cdviz-collector/src/sources/http.rs index 48b758d..535a1ed 100644 --- a/cdviz-collector/src/sources/http.rs +++ b/cdviz-collector/src/sources/http.rs @@ -1,7 +1,7 @@ -use super::Source; +use super::{EventSourcePipe, Extractor}; use crate::{ errors::{self, Error}, - Message, Sender, + sources::EventSource, }; use axum::{ extract::State, @@ -11,11 +11,14 @@ use axum::{ Json, Router, }; use axum_tracing_opentelemetry::middleware::{OtelAxumLayer, OtelInResponseLayer}; -use cdevents_sdk::CDEvent; use errors::Result; use serde::{Deserialize, Serialize}; use serde_json::json; -use std::net::{IpAddr, SocketAddr}; +use std::{ + collections::HashMap, + net::{IpAddr, SocketAddr}, + sync::{Arc, Mutex}, +}; /// The http server config #[derive(Clone, Debug, Deserialize, Serialize)] @@ -29,26 +32,24 @@ pub(crate) struct Config { pub(crate) port: u16, } -impl TryFrom for HttpSource { - type Error = crate::errors::Error; - - fn try_from(value: Config) -> Result { - Ok(HttpSource { config: value.clone() }) - } -} - -pub(crate) struct HttpSource { +pub(crate) struct HttpExtractor { config: Config, + next: Arc>, } +impl HttpExtractor { + pub fn try_from(value: &Config, next: EventSourcePipe) -> Result { + Ok(HttpExtractor { config: value.clone(), next: Arc::new(Mutex::new(next)) }) + } +} #[derive(Clone)] struct AppState { - tx: Sender, + next: Arc>, } -impl Source for HttpSource { - async fn run(&mut self, tx: Sender) -> Result<()> { - let app_state = AppState { tx }; +impl Extractor for HttpExtractor { + async fn run(&mut self) -> Result<()> { + let app_state = AppState { next: Arc::clone(&self.next) }; let app = app().with_state(app_state); // run it @@ -64,10 +65,11 @@ impl Source for HttpSource { } } +//TODO make route per extractor/sources fn app() -> Router { // build our application with a route Router::new() - .route("/cdevents", post(cdevents_collect)) + .route("/cdevents", post(events_collect)) // include trace context as header into the response .layer(OtelInResponseLayer) //start OpenTelemetry trace on incoming request @@ -80,17 +82,20 @@ async fn health() -> impl IntoResponse { http::StatusCode::OK } -//TODO validate format of cdevents JSON //TODO support events in cloudevents format (extract info from headers) //TODO try [deser](https://crates.io/crates/deserr) to return good error //TODO use cloudevents -#[tracing::instrument(skip(app_state, payload))] -async fn cdevents_collect( +//TODO add metadata & headers info into SourceEvent +//TODO log & convert error +#[tracing::instrument(skip(app_state, body))] +async fn events_collect( State(app_state): State, - Json(payload): Json, + Json(body): Json, ) -> Result { - tracing::trace!("received cloudevent {:?}", &payload); - app_state.tx.send(Message::from(payload))?; + tracing::trace!("received {:?}", &body); + let event = EventSource { metadata: serde_json::Value::Null, header: HashMap::new(), body }; + let mut next = app_state.next.lock().unwrap(); + next.as_mut().send(event)?; Ok(http::StatusCode::CREATED) } diff --git a/cdviz-collector/src/sources/mod.rs b/cdviz-collector/src/sources/mod.rs index cc44c99..56a179a 100644 --- a/cdviz-collector/src/sources/mod.rs +++ b/cdviz-collector/src/sources/mod.rs @@ -1,59 +1,34 @@ +pub(crate) mod extractors; +mod hbs; #[cfg(feature = "source_http")] pub(crate) mod http; -pub(crate) mod noop; #[cfg(feature = "source_opendal")] -pub(crate) mod opendal; +mod opendal; +mod send_cdevents; +pub(crate) mod transformers; + +use std::collections::HashMap; use crate::errors::Result; +use crate::pipes::Pipe; use crate::{Message, Sender}; use enum_dispatch::enum_dispatch; -#[cfg(feature = "source_http")] -use http::HttpSource; -use noop::NoopSource; #[cfg(feature = "source_opendal")] use opendal::OpendalSource; use serde::{Deserialize, Serialize}; +use serde_json::Value; use tokio::task::JoinHandle; -#[derive(Debug, Deserialize, Serialize)] -#[serde(tag = "type")] -pub(crate) enum Config { - #[cfg(feature = "source_http")] - #[serde(alias = "http")] - Http(http::Config), - #[serde(alias = "noop")] - Noop(noop::Config), - #[cfg(feature = "source_opendal")] - #[serde(alias = "opendal")] - Opendal(opendal::Config), -} - -impl TryFrom for SourceEnum { - type Error = crate::errors::Error; - - fn try_from(value: Config) -> Result { - let out = match value { - #[cfg(feature = "source_http")] - Config::Http(config) => HttpSource::try_from(config)?.into(), - Config::Noop(config) => NoopSource::try_from(config)?.into(), - #[cfg(feature = "source_opendal")] - Config::Opendal(config) => OpendalSource::try_from(config)?.into(), - }; - Ok(out) - } -} - -#[enum_dispatch] -#[allow(clippy::enum_variant_names, clippy::large_enum_variant)] -enum SourceEnum { - #[cfg(feature = "source_http")] - HttpSource, - NoopSource, - #[cfg(feature = "source_opendal")] - OpendalSource, -} +// #[enum_dispatch] +// #[allow(clippy::enum_variant_names, clippy::large_enum_variant)] +// enum SourceEnum { +// #[cfg(feature = "source_http")] +// HttpSource, +// NoopSource, +// #[cfg(feature = "source_opendal")] +// OpendalSource, +// } -#[enum_dispatch(SourceEnum)] trait Source { async fn run(&mut self, tx: Sender) -> Result<()>; } @@ -67,49 +42,53 @@ pub(crate) fn start(_name: String, config: Config, tx: Sender) -> JoinH } #[derive(Debug, Deserialize, Serialize)] -pub struct EventPipeline { - metadata: Json::Value, +pub struct EventSource { + metadata: Value, header: HashMap, - body: Json::Value, + body: Value, } -pub struct SourcePipeline { - extractor: Extractor, - transformers: Vec, -} +// TODO explore to use enum_dispatch instead of Box(dyn) on EventSourcePipe (a recursive structure) +type EventSourcePipe = Box + Send>; -trait Extractor { - async fn try_next(&mut self) -> Option>; +pub trait Extractor { + async fn run(&mut self) -> Result<()>; } -impl SourcePipeline { - fn new(extractor: Extractor, transformers: Vec) -> Self { - Self { - extractor, - transformers, - } - } -} +// pub struct SourcePipeline { +// extractor: Extractor, +// transformers: Vec, +// } -impl Source for SourcePipeline { - // TODO optimize avoid using vector and wave (eg replace by stream pipeline, rx for rust ? (stream/visitor flow are harder to test) - // TODO avoid crash on Error - // Poll from extractor or provide a function "push" to extractor? - async fn run(&mut self, tx: Sender) -> Result<()> { - while let Some(event) = self.extractor.try_next().await? { - let mut events = vec![event]; - for transformer in transformers { - let mut next_events = vec![]; - for e in events { - next_events.push_all(transformer.process(event)?); - } - events = next_events; - } - for e in events { - let cdevent: CDEvent = serde_json::from_slice::(&e.body)?; - // TODO include headers into message - tx.send(cdevent.into())?; - } - } - } -} +// trait Extractor { +// async fn try_next(&mut self) -> Option>; +// } + +// impl SourcePipeline { +// fn new(extractor: Extractor, transformers: Vec) -> Self { +// Self { extractor, transformers } +// } +// } + +// impl Source for SourcePipeline { +// // TODO optimize avoid using vector and wave (eg replace by stream pipeline, rx for rust ? (stream/visitor flow are harder to test) +// // TODO avoid crash on Error +// // Poll from extractor or provide a function "push" to extractor? +// async fn run(&mut self, tx: Sender) -> Result<()> { +// while let Some(event) = self.extractor.try_next().await? { +// let mut events = vec![event]; +// for transformer in transformers { +// let mut next_events = vec![]; +// for e in events { +// next_events.push_all(transformer.process(event)?); +// } +// events = next_events; +// } +// for e in events { +// let cdevent: CDEvent = serde_json::from_slice::(&e.body)?; +// // TODO include headers into message +// tx.send(cdevent.into())?; +// } +// } +// } +// } diff --git a/cdviz-collector/src/sources/noop.rs b/cdviz-collector/src/sources/noop.rs deleted file mode 100644 index df43be9..0000000 --- a/cdviz-collector/src/sources/noop.rs +++ /dev/null @@ -1,29 +0,0 @@ -use crate::errors::Result; -use crate::{Message, Sender}; -use serde::Deserialize; -use serde::Serialize; -use std::time::Duration; -use tokio::time::sleep; - -use super::Source; - -#[derive(Debug, Deserialize, Serialize)] -pub(crate) struct Config {} - -impl TryFrom for NoopSource { - type Error = crate::errors::Error; - - fn try_from(_value: Config) -> Result { - Ok(Self {}) - } -} - -pub(crate) struct NoopSource {} - -impl Source for NoopSource { - async fn run(&mut self, _tx: Sender) -> Result<()> { - loop { - sleep(Duration::MAX).await; - } - } -} diff --git a/cdviz-collector/src/sources/send_cdevents.rs b/cdviz-collector/src/sources/send_cdevents.rs new file mode 100644 index 0000000..db3710e --- /dev/null +++ b/cdviz-collector/src/sources/send_cdevents.rs @@ -0,0 +1,27 @@ +use crate::errors::Result; +use crate::pipes::Pipe; +use crate::Message; +use cdevents_sdk::CDEvent; +use tokio::sync::broadcast::Sender; + +use super::EventSource; + +struct Processor { + next: Sender, +} + +impl Processor { + pub(crate) fn new(next: Sender) -> Self { + Self { next } + } +} + +impl Pipe for Processor { + type Input = EventSource; + fn send(&mut self, input: Self::Input) -> Result<()> { + let cdevent: CDEvent = serde_json::from_value(input.body)?; + // TODO include headers into message + self.next.send(cdevent.into())?; + Ok(()) + } +} diff --git a/cdviz-collector/src/sources/transformer.rs b/cdviz-collector/src/sources/transformer.rs deleted file mode 100644 index de29004..0000000 --- a/cdviz-collector/src/sources/transformer.rs +++ /dev/null @@ -1,64 +0,0 @@ -use crate::errors::Result; -use enum_dispatch::enum_dispatch; -use handlebars::Handlebars; -use serde::{Deserialize, Serialize}; -use serde_json::Value; - -#[derive(Debug, Deserialize, Serialize)] -#[serde(tag = "format", content = "content")] -pub(crate) enum TransformerConfig { - #[serde(alias = "hbs")] - Hbs(String), - // #[serde(alias = "vrl")] - // Vrl(String), -} - -impl TryFrom for TransformerEnum { - type Error = crate::errors::Error; - - fn try_from(value: TransformerConfig) -> Result { - let out = match value { - TransformerConfig::Hbs(template) => Hbs::new(&template)?.into(), - }; - Ok(out) - } -} - -#[enum_dispatch] -#[derive(Debug)] -pub(crate) enum TransformerEnum { - Hbs, -} - - -#[enum_dispatch(TransformerEnum)] -pub(crate) trait Transformer { - //TODO return a common Iterator or stream type (better than Vec) - // transform the event - // return a list of events, empty if event to remove/filter out, severals event for 1-to-many, - // single for 1-to-1 (and same as input of no transformation to apply) - fn process(&self, e: EventPipeline) -> Result>; -} - -#[derive(Debug)] -pub(crate) struct Hbs { - hbs: Handlebars<'static>, -} - -impl Hbs { - pub(crate) fn new(template: &str) -> Result { - let mut hbs = Handlebars::new(); - hbs.set_dev_mode(false); - hbs.set_strict_mode(true); - hbs.register_template_string("tpl", template)?; - Ok(Self { hbs }) - } -} - -impl Transformer for Hbs { - fn process(&self, e: EventPipeline) -> Result> { - let bytes = self.hbs.render("tpl", &e)?.into_bytes(); - serde_json::from_slice::>(&bytes)?; - Ok() - } -} diff --git a/cdviz-collector/src/sources/transformers.rs b/cdviz-collector/src/sources/transformers.rs new file mode 100644 index 0000000..042fbd2 --- /dev/null +++ b/cdviz-collector/src/sources/transformers.rs @@ -0,0 +1,21 @@ +use super::{hbs, EventSourcePipe}; +use crate::errors::Result; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Deserialize, Serialize)] +#[serde(tag = "kind")] +pub(crate) enum Config { + #[serde(alias = "hbs")] + Hbs { template: String }, + // #[serde(alias = "vrl")] + // Vrl(String), +} + +impl Config { + fn into_transformer(&self, next: EventSourcePipe) -> Result { + let out = match &self { + Config::Hbs { template } => Box::new(hbs::Processor::new(&template, next)?), + }; + Ok(out) + } +}