Skip to content

Commit

Permalink
wip: sourcepipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
davidB committed Aug 20, 2024
1 parent 487703b commit 86a1e6a
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 0 deletions.
48 changes: 48 additions & 0 deletions cdviz-collector/src/sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,51 @@ pub(crate) fn start(_name: String, config: Config, tx: Sender<Message>) -> JoinH
Ok(())
})
}

#[derive(Debug, Deserialize, Serialize)]
pub struct EventPipeline {
metadata: Json::Value,
header: HashMap<String, String>,
body: Json::Value,
}

pub struct SourcePipeline {
extractor: Extractor,
transformers: Vec<TransformerEnum>,
}

trait Extractor {
async fn try_next(&mut self) -> Option<Vec<EventPipeline>>;
}

impl SourcePipeline {
fn new(extractor: Extractor, transformers: Vec<Transformer>) -> Self {
Self {
extractor,
transformers,
}
}
}

impl Source for SourcePipeline {
// TODO optimize avoid using vector and wave (eg replace by stream pipeline, rx for rust ? (stream/visitor flow are harder to test)
// TODO avoid crash on Error
// Poll from extractor or provide a function "push" to extractor?
async fn run(&mut self, tx: Sender<Message>) -> Result<()> {
while let Some(event) = self.extractor.try_next().await? {
let mut events = vec![event];
for transformer in transformers {
let mut next_events = vec![];
for e in events {
next_events.push_all(transformer.process(event)?);
}
events = next_events;
}
for e in events {
let cdevent: CDEvent = serde_json::from_slice::<CDEvent>(&e.body)?;
// TODO include headers into message
tx.send(cdevent.into())?;
}
}
}
}
64 changes: 64 additions & 0 deletions cdviz-collector/src/sources/transformer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use crate::errors::Result;
use enum_dispatch::enum_dispatch;
use handlebars::Handlebars;
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Debug, Deserialize, Serialize)]
#[serde(tag = "format", content = "content")]
pub(crate) enum TransformerConfig {
#[serde(alias = "hbs")]
Hbs(String),
// #[serde(alias = "vrl")]
// Vrl(String),
}

impl TryFrom<TransformerConfig> for TransformerEnum {
type Error = crate::errors::Error;

fn try_from(value: TransformerConfig) -> Result<Self> {
let out = match value {
TransformerConfig::Hbs(template) => Hbs::new(&template)?.into(),
};
Ok(out)
}
}

#[enum_dispatch]
#[derive(Debug)]
pub(crate) enum TransformerEnum {
Hbs,
}


#[enum_dispatch(TransformerEnum)]
pub(crate) trait Transformer {
//TODO return a common Iterator or stream type (better than Vec)
// transform the event
// return a list of events, empty if event to remove/filter out, severals event for 1-to-many,
// single for 1-to-1 (and same as input of no transformation to apply)
fn process(&self, e: EventPipeline) -> Result<Vec<EventPipeline>>;
}

#[derive(Debug)]
pub(crate) struct Hbs {
hbs: Handlebars<'static>,
}

impl Hbs {
pub(crate) fn new(template: &str) -> Result<Self> {
let mut hbs = Handlebars::new();
hbs.set_dev_mode(false);
hbs.set_strict_mode(true);
hbs.register_template_string("tpl", template)?;
Ok(Self { hbs })
}
}

impl Transformer for Hbs {
fn process(&self, e: EventPipeline) -> Result<Vec<EventPipeline>> {
let bytes = self.hbs.render("tpl", &e)?.into_bytes();
serde_json::from_slice::<Vec<EventPipeline>>(&bytes)?;
Ok()
}
}

0 comments on commit 86a1e6a

Please sign in to comment.