Skip to content

Commit

Permalink
refactor: source flow with extractor and transformer (#131)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidB authored Sep 23, 2024
1 parent 4453113 commit 767ff57
Show file tree
Hide file tree
Showing 25 changed files with 584 additions and 427 deletions.
2 changes: 1 addition & 1 deletion cdviz-collector/.mise.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ OTEL_TRACES_SAMPLER = "always_off"

[tools]
task = '3' # to have a set of simple commands for repetitive tasks (and CI)
rust = '1.80.1' # the rust tool stack (with cargo, fmt, clippy) to build source
rust = '1.81.0' # the rust tool stack (with cargo, fmt, clippy) to build source
binstall = '1.7.4' # do not use cargo-binstall (it's a special name used by mise)

[plugins]
Expand Down
3 changes: 2 additions & 1 deletion cdviz-collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ version = "0.1.0"
authors = ["David Bernard"]
license = "Apache-2.0"
repository = "https://github.com/davidB/cdviz"
rust-version = "1.80.1"
rust-version = "1.81.0"
publish = false

[dependencies]
async-trait = "0.1"
axum = { version = "0.7", optional = true }
axum-tracing-opentelemetry = { version = "0.21", optional = true }
bytes = { version = "1.7", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion cdviz-collector/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# see https://edu.chainguard.dev/chainguard/chainguard-images/reference/rust/image_specs/
# checkov:skip=CKV_DOCKER_7:Ensure the base image uses a non latest version tag
# trivy:ignore:AVD-DS-0001
FROM cgr.dev/chainguard/rust:1.80.1 as build
FROM cgr.dev/chainguard/rust:1.81.0 as build
ARG PROFILE=release
USER nonroot
WORKDIR /work
Expand Down
29 changes: 18 additions & 11 deletions cdviz-collector/examples/assets/cdviz-collector.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,21 @@ pool_connections_min = 1
pool_connections_max = 10

[sources.cdevents_json]
transformers = []

[sources.cdevents_json.extractor]
type = "opendal"
kind = "fs"
polling_interval = "10s"
# parameters = { root = "../cdevents-spec/examples" }
parameters = { root = "./opendal_fs" }
recursive = true
path_patterns = ["**/*.json"]
transformer = { extractor = "json" }
parser = "json"

[sources.cdevents_csv]
type = "opendal"
kind = "fs"
polling_interval = "10s"
parameters = { root = "./opendal_fs" }
recursive = false
path_patterns = ["cdevents.csv"]
transformer = { extractor = "csv_row", transform = { format = "hbs", content = """
[[sources.cdevents_csv.transformers]]
type = "hbs"
template = """
{
"context": {
"version": "0.4.0-draft",
Expand All @@ -45,9 +43,18 @@ transformer = { extractor = "csv_row", transform = { format = "hbs", content = "
}
}
}
""" } }
"""

[sources.cdevents_csv.extractor]
type = "opendal"
kind = "fs"
polling_interval = "10s"
parameters = { root = "./opendal_fs" }
recursive = false
path_patterns = ["cdevents.csv"]
parser = "csv_row"

[sources.cdevents_webhook]
[sources.cdevents_webhook.extractor]
type = "http"
host = "0.0.0.0"
port = 8080
2 changes: 1 addition & 1 deletion cdviz-collector/rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[toolchain]
channel = "1.80.1"
channel = "1.81.0"
9 changes: 6 additions & 3 deletions cdviz-collector/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod errors;
mod pipes;
mod sinks;
mod sources;

Expand All @@ -9,7 +10,7 @@ use clap::Parser;
use clap_verbosity_flag::Verbosity;
use errors::{Error, Result};
use figment::{
providers::{Env, Format, Toml},
providers::{Env, Format, Serialized, Toml},
Figment,
};
use futures::future::TryJoinAll;
Expand All @@ -25,10 +26,12 @@ pub(crate) struct Cli {
verbose: clap_verbosity_flag::Verbosity,
}

#[derive(Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize, Default)]
pub(crate) struct Config {
sources: HashMap<String, sources::Config>,
sinks: HashMap<String, sinks::Config>,
// extractors: HashMap<String, sources::extractors::Config>,
// transformers: HashMap<String, sources::transformers::Config>,
}

type Sender<T> = tokio::sync::broadcast::Sender<T>;
Expand Down Expand Up @@ -85,7 +88,7 @@ async fn main() -> Result<()> {
if let Some(dir) = cli.config.parent() {
std::env::set_current_dir(dir)?;
}
let config: Config = Figment::new()
let config: Config = Figment::from(Serialized::defaults(Config::default()))
.merge(Toml::file(cli.config.as_path()))
.merge(Env::prefixed("CDVIZ_COLLECTOR_"))
.extract()?;
Expand Down
20 changes: 20 additions & 0 deletions cdviz-collector/src/pipes/discard_all.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use super::Pipe;
use crate::errors::Result;
use std::marker::PhantomData;

/// Terminal pipe that accepts every input and drops it without forwarding.
pub(crate) struct Processor<I> {
    /// Zero-sized marker binding the processor to its input type `I`.
    input_type: PhantomData<I>,
}

impl<I> Processor<I> {
    /// Builds a processor; it carries no state besides the type marker.
    pub(crate) fn new() -> Self {
        let input_type = PhantomData;
        Processor { input_type }
    }
}

impl<I> Pipe for Processor<I> {
    type Input = I;

    /// Takes ownership of `input`, releases it, and always reports success.
    fn send(&mut self, input: Self::Input) -> Result<()> {
        drop(input);
        Ok(())
    }
}
38 changes: 38 additions & 0 deletions cdviz-collector/src/pipes/log.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use super::Pipe;
use crate::errors::Result;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::marker::PhantomData;

/// Configuration of the logging pipe.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub(crate) struct Config {
/// Value recorded as the `target` field of every event logged by the pipe.
target: String,
}

/// Pipe stage that logs every input through `tracing` and then forwards it to `next`.
pub(crate) struct Processor<I, N> {
    /// Value recorded as the `target` field of each logged event.
    target: String,
    /// Downstream pipe that receives the input once it has been logged.
    next: N,
    /// Zero-sized marker for the input type `I`.
    input_type: PhantomData<I>,
}

impl<I, N> Processor<I, N> {
    /// Builds a logging stage in front of `next`.
    pub(crate) fn new(target: String, next: N) -> Self {
        Processor { input_type: PhantomData, target, next }
    }

    /// Builds a logging stage from its configuration; the current implementation never fails.
    pub(crate) fn try_from(config: &Config, next: N) -> Result<Self> {
        let target = config.target.clone();
        Ok(Self::new(target, next))
    }
}

impl<I, N> Pipe for Processor<I, N>
where
    N: Pipe<Input = I>,
    I: Debug,
{
    type Input = I;

    /// Emits an info-level event with the `Debug` form of `input`, then returns
    /// whatever the downstream pipe returns.
    fn send(&mut self, input: Self::Input) -> Result<()> {
        tracing::info!(target = self.target, input = ?input);
        self.next.send(input)
    }
}
39 changes: 39 additions & 0 deletions cdviz-collector/src/pipes/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use crate::errors::Result;
pub mod discard_all;
pub mod log;
pub mod passthrough;

/// A pipe is an interface for implementing processors of inputs.
/// Implementations can:
///
/// - discard / drop all inputs
/// - filter
/// - transform
/// - split
/// - retry
/// - timeout
/// - ...
///
/// The composition of pipes into a pipeline can be driven by configuration,
/// and the behavior of each pipe should stay internal to it,
/// so chaining pipes should not depend on methods such as `map`, `fold`, `filter`,
/// `filter_map`, `drop`, ... as is the case for `Iterator`, `Stream`, or `RxRust`.
/// Being able to return an error to the sender (unlike `Stream`) also makes it easier
/// for the sender to handle failures: retry, buffering, forwarding to its own caller, ...
///
/// The approach and goal are similar to the middleware used in some web frameworks
/// or in [tower](https://crates.io/crates/tower), except that it is not async.
/// If async support is ever needed, `Pipe` may become a specialization of tower's middleware,
/// as done by [axum](https://crates.io/crates/axum), [warp](https://crates.io/crates/warp), [tonic](https://crates.io/crates/tonic), ...
pub trait Pipe {
/// Type of the values accepted by this pipe.
type Input;

/// Hands `input` over to the pipe; an `Err` reports the failure back to the caller.
fn send(&mut self, input: Self::Input) -> Result<()>;
}

/// Lets a boxed pipe (including `Box<dyn Pipe<Input = I>>`) be used wherever a `Pipe` is expected.
impl<I, T> Pipe for Box<T>
where
    T: Pipe<Input = I> + ?Sized,
{
    type Input = I;

    /// Delegates to the pipe stored inside the box.
    fn send(&mut self, input: Self::Input) -> Result<()> {
        (**self).send(input)
    }
}
30 changes: 30 additions & 0 deletions cdviz-collector/src/pipes/passthrough.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use super::Pipe;
use crate::errors::Result;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::marker::PhantomData;

/// Configuration of the passthrough pipe; it has no options.
///
/// `Clone` is derived so the type can be embedded in configuration types that are
/// themselves `Clone`.
#[derive(Clone, Debug, Deserialize, Serialize, Default)]
pub(crate) struct Config {}

/// Pipe stage that forwards every input, unchanged, to `next`.
pub(crate) struct Processor<I, N> {
    /// Downstream pipe that receives every input.
    next: N,
    /// Zero-sized marker for the input type `I`.
    input_type: PhantomData<I>,
}

impl<I, N> Processor<I, N> {
    /// Builds a passthrough stage in front of `next`.
    pub(crate) fn new(next: N) -> Self {
        Processor { input_type: PhantomData, next }
    }
}

// NOTE(review): the `Debug` bound is not exercised by this implementation.
impl<I, N> Pipe for Processor<I, N>
where
    N: Pipe<Input = I>,
    I: Debug,
{
    type Input = I;

    /// Hands `input` to the downstream pipe and returns its result as-is.
    fn send(&mut self, input: Self::Input) -> Result<()> {
        let Self { next, .. } = self;
        next.send(input)
    }
}
2 changes: 1 addition & 1 deletion cdviz-collector/src/sinks/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tracing::Instrument;
use super::Sink;

/// The database client config
#[derive(Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub(crate) struct Config {
/// The database url (with username, password and the database)
url: String,
Expand Down
2 changes: 1 addition & 1 deletion cdviz-collector/src/sinks/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::Message;

use super::Sink;

#[derive(Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize, Default)]
pub(crate) struct Config {}

impl TryFrom<Config> for DebugSink {
Expand Down
2 changes: 1 addition & 1 deletion cdviz-collector/src/sinks/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use reqwest_tracing::TracingMiddleware;

use super::Sink;

#[derive(Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub(crate) struct Config {
destination: Url,
}
Expand Down
8 changes: 7 additions & 1 deletion cdviz-collector/src/sinks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use db::DbSink;
use debug::DebugSink;
use http::HttpSink;

#[derive(Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(tag = "type")]
pub(crate) enum Config {
#[cfg(feature = "sink_db")]
Expand All @@ -26,6 +26,12 @@ pub(crate) enum Config {
Http(http::Config),
}

impl Default for Config {
fn default() -> Self {
Self::Debug(debug::Config {})
}
}

impl TryFrom<Config> for SinkEnum {
type Error = crate::errors::Error;

Expand Down
43 changes: 43 additions & 0 deletions cdviz-collector/src/sources/extractors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use super::{http, opendal, EventSourcePipe, Extractor};
use crate::errors::Result;
use serde::{Deserialize, Serialize};

/// Configuration of an extractor; the variant is selected by the `type` field.
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
#[serde(tag = "type")]
pub(crate) enum Config {
/// Default variant (also accepted as `noop`); `make_extractor` maps it to `SleepExtractor`.
#[serde(alias = "noop")]
#[default]
Sleep,
/// HTTP extractor settings; only compiled with the `source_http` feature.
#[cfg(feature = "source_http")]
#[serde(alias = "http")]
Http(http::Config),
/// OpenDAL extractor settings; only compiled with the `source_opendal` feature.
#[cfg(feature = "source_opendal")]
#[serde(alias = "opendal")]
Opendal(opendal::Config),
}

impl Config {
    /// Instantiates the extractor described by this configuration.
    ///
    /// `next` is handed to the HTTP and OpenDAL extractors; the `Sleep` variant does not use it.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the selected extractor's `try_from`.
    pub(crate) fn make_extractor(&self, next: EventSourcePipe) -> Result<Box<dyn Extractor>> {
        match self {
            Self::Sleep => Ok(Box::new(SleepExtractor {})),
            #[cfg(feature = "source_http")]
            Self::Http(http_config) => {
                let extractor = http::HttpExtractor::try_from(http_config, next)?;
                Ok(Box::new(extractor))
            }
            #[cfg(feature = "source_opendal")]
            Self::Opendal(opendal_config) => {
                let extractor = opendal::OpendalExtractor::try_from(opendal_config, next)?;
                Ok(Box::new(extractor))
            }
        }
    }
}

/// Extractor that produces nothing: its `run` future never completes.
struct SleepExtractor {}

#[async_trait::async_trait]
impl Extractor for SleepExtractor {
    /// Suspends forever; the function never returns a value.
    async fn run(&mut self) -> Result<()> {
        use std::future::pending;

        // `pending` yields a future that never resolves, so the line after the
        // await can never be reached.
        pending::<()>().await;
        unreachable!()
    }
}
28 changes: 28 additions & 0 deletions cdviz-collector/src/sources/hbs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use super::{EventSource, EventSourcePipe};
use crate::errors::Result;
use crate::pipes::Pipe;
use handlebars::Handlebars;

/// Pipe stage that rewrites each `EventSource` through a Handlebars template.
pub(crate) struct Processor {
    /// Downstream pipe that receives the transformed event.
    next: EventSourcePipe,
    /// Registry holding the single template registered under the name `tpl`.
    hbs: Handlebars<'static>,
}

impl Processor {
    /// Compiles `template` into a registry with dev mode off and strict mode on.
    ///
    /// # Errors
    ///
    /// Returns the error raised when the template cannot be registered.
    pub(crate) fn new(template: &str, next: EventSourcePipe) -> Result<Self> {
        let mut registry = Handlebars::new();
        registry.set_dev_mode(false);
        registry.set_strict_mode(true);
        registry.register_template_string("tpl", template)?;
        Ok(Processor { hbs: registry, next })
    }
}

impl Pipe for Processor {
    type Input = EventSource;

    /// Renders the template with `input` as data, parses the rendered text as a JSON
    /// `EventSource`, and forwards that value to the downstream pipe.
    ///
    /// # Errors
    ///
    /// Fails when rendering fails, when the rendered text does not deserialize into an
    /// `EventSource`, or when the downstream pipe returns an error.
    fn send(&mut self, input: Self::Input) -> Result<()> {
        let rendered = self.hbs.render("tpl", &input)?;
        let event = serde_json::from_str::<EventSource>(&rendered)?;
        self.next.send(event)
    }
}
Loading

0 comments on commit 767ff57

Please sign in to comment.