Skip to content

Commit

Permalink
♻️ cdviz-collector prepare to have transformer
Browse files Browse the repository at this point in the history
  • Loading branch information
davidB committed Apr 7, 2024
1 parent 6e8e8b3 commit 466dfd8
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 15 deletions.
4 changes: 3 additions & 1 deletion cdviz-collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ cdevents-sdk = { git = "https://github.com/cdevents/sdk-rust" }
chrono = "0.4"
clap = { version = "4", features = ["derive", "env"] }
clap-verbosity-flag = "2.2.0"
csv = { version = "1", optional = true }
enum_dispatch = "0.3"
figment = { version = "0.10", features = ["toml", "env"] }
futures = "0.3"
globset = { version = "0.4", optional = true }
handlebars = { version = "5.1", optional = true }
humantime-serde = "1.1.1"
init-tracing-opentelemetry = { version = "0.18", features = [
"otlp",
Expand Down Expand Up @@ -68,7 +70,7 @@ tracing-subscriber = "0.3"
default = ["source_http", "source_opendal", "sink_db"]
sink_db = ["dep:sqlx"]
source_http = ["dep:axum", "dep:axum-tracing-opentelemetry"]
source_opendal = ["dep:opendal", "dep:globset"]
source_opendal = ["dep:opendal", "dep:globset", "dep:csv", "dep:handlebars"]

[package.metadata.release]
pre-release-commit-message = "🚀 (cargo-release) version {{version}}"
Expand Down
1 change: 1 addition & 0 deletions cdviz-collector/cdviz-collector.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ polling_interval = "10s"
parameters = { root = "../cdevents-spec/examples" }
recursive = true
path_patterns = ["**/*.json"]
transformer = { type = "identity" }

[sources.cdevents_webhook]
type = "http"
Expand Down
44 changes: 30 additions & 14 deletions cdviz-collector/src/sources/opendal/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
//TODO add persistance for state (time window to not reprocess same file after restart)

mod filter;
mod transformers;

use crate::errors::{Error, Result};
use self::filter::{globset_from, Filter};
use self::transformers::{Transformer, TransformerEnum};
use super::Source;
use crate::errors::Result;
use crate::{Message, Sender};
use cdevents_sdk::CDEvent;
use filter::{globset_from, Filter};
use futures::TryStreamExt;
use opendal::Entry;
use opendal::Metakey;
use opendal::Operator;
use opendal::Scheme;
Expand All @@ -17,11 +21,6 @@ use std::time::Duration;
use tokio::time::sleep;
use tracing::instrument;

use super::Source;

//TODO add persistance for state (time window to not reprocess same file after restart)
//TODO add transformer: identity, csv+template -> bunch, jsonl

#[serde_as]
#[derive(Debug, Deserialize, Serialize, Default)]
pub(crate) struct Config {
Expand All @@ -32,6 +31,7 @@ pub(crate) struct Config {
parameters: HashMap<String, String>,
recursive: bool,
path_patterns: Vec<String>,
transformer: transformers::Config,
}

impl TryFrom<Config> for OpendalSource {
Expand All @@ -40,11 +40,13 @@ impl TryFrom<Config> for OpendalSource {
fn try_from(value: Config) -> Result<Self> {
let op: Operator = Operator::via_map(value.kind, value.parameters)?;
let filter = Filter::from_patterns(globset_from(&value.path_patterns)?);
let transformer = value.transformer.try_into()?;
Ok(Self {
op,
polling_interval: value.polling_interval,
recursive: value.recursive,
filter,
transformer,
})
}
}
Expand All @@ -54,12 +56,21 @@ pub(crate) struct OpendalSource {
polling_interval: Duration,
recursive: bool,
filter: Filter,
transformer: TransformerEnum,
}

impl Source for OpendalSource {
async fn run(&mut self, tx: Sender<Message>) -> Result<()> {
loop {
if let Err(err) = run_once(&tx, &self.op, &self.filter, self.recursive).await {
if let Err(err) = run_once(
&tx,
&self.op,
&self.filter,
self.recursive,
&self.transformer,
)
.await
{
tracing::warn!(?err, filter = ?self.filter, scheme =? self.op.info().scheme(), root =? self.op.info().root(), "fail during scanning");
}
sleep(self.polling_interval).await;
Expand All @@ -74,6 +85,7 @@ pub(crate) async fn run_once(
op: &Operator,
filter: &Filter,
recursive: bool,
transformer: &TransformerEnum,
) -> Result<()> {
// TODO convert into arg of instrument
tracing::debug!(filter=? filter, scheme =? op.info().scheme(), root =? op.info().root(), "scanning");
Expand All @@ -85,16 +97,20 @@ pub(crate) async fn run_once(
.await?;
while let Some(entry) = lister.try_next().await? {
if filter.accept(&entry) {
if let Err(err) = process_entry(tx, op, &entry).await {
if let Err(err) = process_entry(tx, transformer.transform(op, &entry).await?) {
tracing::warn!(?err, path = entry.path(), "fail to process, skip")
}
}
}
Ok(())
}

async fn process_entry(tx: &Sender<Message>, op: &Operator, entry: &Entry) -> Result<usize> {
let read = op.read(entry.path()).await?;
let cdevent: CDEvent = serde_json::from_slice::<CDEvent>(&read)?;
tx.send(cdevent.into()).map_err(Error::from)
fn process_entry(tx: &Sender<Message>, provider: impl Iterator<Item = Vec<u8>>) -> Result<usize> {
let mut count = 0;
for json in provider {
let cdevent: CDEvent = serde_json::from_slice::<CDEvent>(&json)?;
tx.send(cdevent.into())?;
count += 1;
}
Ok(count)
}
55 changes: 55 additions & 0 deletions cdviz-collector/src/sources/opendal/transformers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//TODO add transformer: identity, csv+template -> bunch, jsonl

use crate::errors::Result;
use enum_dispatch::enum_dispatch;
use opendal::{Entry, Operator};
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize, Serialize, Default)]
#[serde(tag = "type")]
pub(crate) enum Config {
#[serde(alias = "identity")]
#[default]
Identity,
}

impl TryFrom<Config> for TransformerEnum {
type Error = crate::errors::Error;

fn try_from(value: Config) -> Result<Self> {
let out = match value {
Config::Identity => Identity {}.into(),
};
Ok(out)
}
}

#[enum_dispatch]
#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
pub(crate) enum TransformerEnum {
Identity,
}

#[enum_dispatch(TransformerEnum)]
pub(crate) trait Transformer {
async fn transform(
&self,
op: &Operator,
entry: &Entry,
) -> Result<impl Iterator<Item = Vec<u8>>>;
}

#[derive(Debug)]
pub(crate) struct Identity;

impl Transformer for Identity {
async fn transform(
&self,
op: &Operator,
entry: &Entry,
) -> Result<impl Iterator<Item = Vec<u8>>> {
let input = op.read(entry.path()).await?;
Ok(Some(input).into_iter())
}
}

0 comments on commit 466dfd8

Please sign in to comment.