diff --git a/cdviz-collector/src/sources/mod.rs b/cdviz-collector/src/sources/mod.rs index 1df4768..cc44c99 100644 --- a/cdviz-collector/src/sources/mod.rs +++ b/cdviz-collector/src/sources/mod.rs @@ -65,3 +65,51 @@ pub(crate) fn start(_name: String, config: Config, tx: Sender) -> JoinH Ok(()) }) } + +#[derive(Debug, Deserialize, Serialize)] +pub struct EventPipeline { + metadata: Json::Value, + header: HashMap, + body: Json::Value, +} + +pub struct SourcePipeline { + extractor: Extractor, + transformers: Vec, +} + +trait Extractor { + async fn try_next(&mut self) -> Option>; +} + +impl SourcePipeline { + fn new(extractor: Extractor, transformers: Vec) -> Self { + Self { + extractor, + transformers, + } + } +} + +impl Source for SourcePipeline { + // TODO optimize avoid using vector and wave (eg replace by stream pipeline, rx for rust ? (stream/visitor flow are harder to test) + // TODO avoid crash on Error + // Poll from extractor or provide a function "push" to extractor? + async fn run(&mut self, tx: Sender) -> Result<()> { + while let Some(event) = self.extractor.try_next().await? { + let mut events = vec![event]; + for transformer in transformers { + let mut next_events = vec![]; + for e in events { + next_events.push_all(transformer.process(event)?); + } + events = next_events; + } + for e in events { + let cdevent: CDEvent = serde_json::from_slice::(&e.body)?; + // TODO include headers into message + tx.send(cdevent.into())?; + } + } + } +} diff --git a/cdviz-collector/src/sources/transformer.rs b/cdviz-collector/src/sources/transformer.rs new file mode 100644 index 0000000..de29004 --- /dev/null +++ b/cdviz-collector/src/sources/transformer.rs @@ -0,0 +1,64 @@ +use crate::errors::Result; +use enum_dispatch::enum_dispatch; +use handlebars::Handlebars; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +#[derive(Debug, Deserialize, Serialize)] +#[serde(tag = "format", content = "content")] +pub(crate) enum TransformerConfig { + #[serde(alias = "hbs")] + Hbs(String), + // #[serde(alias = "vrl")] + // Vrl(String), +} + +impl TryFrom for TransformerEnum { + type Error = crate::errors::Error; + + fn try_from(value: TransformerConfig) -> Result { + let out = match value { + TransformerConfig::Hbs(template) => Hbs::new(&template)?.into(), + }; + Ok(out) + } +} + +#[enum_dispatch] +#[derive(Debug)] +pub(crate) enum TransformerEnum { + Hbs, +} + + +#[enum_dispatch(TransformerEnum)] +pub(crate) trait Transformer { + //TODO return a common Iterator or stream type (better than Vec) + // transform the event + // return a list of events, empty if event to remove/filter out, severals event for 1-to-many, + // single for 1-to-1 (and same as input of no transformation to apply) + fn process(&self, e: EventPipeline) -> Result>; +} + +#[derive(Debug)] +pub(crate) struct Hbs { + hbs: Handlebars<'static>, +} + +impl Hbs { + pub(crate) fn new(template: &str) -> Result { + let mut hbs = Handlebars::new(); + hbs.set_dev_mode(false); + hbs.set_strict_mode(true); + hbs.register_template_string("tpl", template)?; + Ok(Self { hbs }) + } +} + +impl Transformer for Hbs { + fn process(&self, e: EventPipeline) -> Result> { + let bytes = self.hbs.render("tpl", &e)?.into_bytes(); + serde_json::from_slice::>(&bytes)?; + Ok() + } +}