Skip to content

Commit

Permalink
wip: sourcepipeline (4)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidB committed Sep 23, 2024
1 parent 4a39ac1 commit 0ba77e3
Show file tree
Hide file tree
Showing 13 changed files with 125 additions and 52 deletions.
29 changes: 18 additions & 11 deletions cdviz-collector/examples/assets/cdviz-collector.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,21 @@ pool_connections_min = 1
pool_connections_max = 10

[sources.cdevents_json]
transformers = []

[sources.cdevents_json.extractor]
type = "opendal"
kind = "fs"
polling_interval = "10s"
# parameters = { root = "../cdevents-spec/examples" }
parameters = { root = "./opendal_fs" }
recursive = true
path_patterns = ["**/*.json"]
transformer = { extractor = "json" }
parser = "json"

[sources.cdevents_csv]
type = "opendal"
kind = "fs"
polling_interval = "10s"
parameters = { root = "./opendal_fs" }
recursive = false
path_patterns = ["cdevents.csv"]
transformer = { extractor = "csv_row", transform = { format = "hbs", content = """
[[sources.cdevents_csv.transformers]]
type = "hbs"
template = """
{
"context": {
"version": "0.4.0-draft",
Expand All @@ -45,9 +43,18 @@ transformer = { extractor = "csv_row", transform = { format = "hbs", content = "
}
}
}
""" } }
"""

[sources.cdevents_csv.extractor]
type = "opendal"
kind = "fs"
polling_interval = "10s"
parameters = { root = "./opendal_fs" }
recursive = false
path_patterns = ["cdevents.csv"]
parser = "csv_row"

[sources.cdevents_webhook]
[sources.cdevents_webhook.extractor]
type = "http"
host = "0.0.0.0"
port = 8080
10 changes: 5 additions & 5 deletions cdviz-collector/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use clap::Parser;
use clap_verbosity_flag::Verbosity;
use errors::{Error, Result};
use figment::{
providers::{Env, Format, Toml},
providers::{Env, Format, Serialized, Toml},
Figment,
};
use futures::future::TryJoinAll;
Expand All @@ -26,12 +26,12 @@ pub(crate) struct Cli {
verbose: clap_verbosity_flag::Verbosity,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize, Default)]
pub(crate) struct Config {
sources: HashMap<String, sources::Config>,
sinks: HashMap<String, sinks::Config>,
extractors: HashMap<String, sources::extractors::Config>,
transformers: HashMap<String, sources::transformers::Config>,
// extractors: HashMap<String, sources::extractors::Config>,
// transformers: HashMap<String, sources::transformers::Config>,
}

type Sender<T> = tokio::sync::broadcast::Sender<T>;
Expand Down Expand Up @@ -88,7 +88,7 @@ async fn main() -> Result<()> {
if let Some(dir) = cli.config.parent() {
std::env::set_current_dir(dir)?;
}
let config: Config = Figment::new()
let config: Config = Figment::from(Serialized::defaults(Config::default()))
.merge(Toml::file(cli.config.as_path()))
.merge(Env::prefixed("CDVIZ_COLLECTOR_"))
.extract()?;
Expand Down
13 changes: 2 additions & 11 deletions cdviz-collector/src/pipes/discard_all.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,15 @@
use super::Pipe;
use crate::errors::Result;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::marker::PhantomData;

#[derive(Debug, Deserialize, Serialize)]
pub(crate) struct Config {}

struct Processor<I> {
pub(crate) struct Processor<I> {
input_type: PhantomData<I>,
}

impl<I> Processor<I> {
pub fn new() -> Self {
pub(crate) fn new() -> Self {
Self { input_type: PhantomData }
}

pub fn try_from(_config: Config) -> Result<Self> {
Ok(Self::new())
}
}

impl<I> Pipe for Processor<I> {
Expand Down
8 changes: 4 additions & 4 deletions cdviz-collector/src/pipes/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::marker::PhantomData;

#[derive(Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub(crate) struct Config {
target: String,
}

struct Processor<I, N> {
pub(crate) struct Processor<I, N> {
target: String,
next: N,
input_type: PhantomData<I>,
Expand All @@ -20,8 +20,8 @@ impl<I, N> Processor<I, N> {
Self { target, next, input_type: PhantomData }
}

pub(crate) fn try_from(config: Config, next: N) -> Result<Self> {
Ok(Self::new(config.target, next))
pub(crate) fn try_from(config: &Config, next: N) -> Result<Self> {
Ok(Self::new(config.target.clone(), next))
}
}

Expand Down
1 change: 1 addition & 0 deletions cdviz-collector/src/pipes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::errors::Result;
pub mod collect_to_vec;
pub mod discard_all;
pub mod log;
pub mod passthrough;

/// A pipe is an interface to implement processor for inputs.
/// The implementations can:
Expand Down
34 changes: 34 additions & 0 deletions cdviz-collector/src/pipes/passthrough.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use super::Pipe;
use crate::errors::Result;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::marker::PhantomData;

#[derive(Debug, Deserialize, Serialize, Default)]
pub(crate) struct Config {}

pub(crate) struct Processor<I, N> {
next: N,
input_type: PhantomData<I>,
}

impl<I, N> Processor<I, N> {
pub(crate) fn new(next: N) -> Self {
Self { next, input_type: PhantomData }
}

pub(crate) fn try_from(_config: &Config, next: N) -> Result<Self> {
Ok(Self::new(next))
}
}

impl<I, N> Pipe for Processor<I, N>
where
I: Debug,
N: Pipe<Input = I>,
{
type Input = I;
fn send(&mut self, input: Self::Input) -> Result<()> {
self.next.send(input)
}
}
2 changes: 1 addition & 1 deletion cdviz-collector/src/sinks/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::Message;

use super::Sink;

#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize, Default)]
pub(crate) struct Config {}

impl TryFrom<Config> for DebugSink {
Expand Down
6 changes: 6 additions & 0 deletions cdviz-collector/src/sinks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ pub(crate) enum Config {
Http(http::Config),
}

impl Default for Config {
fn default() -> Self {
Self::Debug(debug::Config {})
}
}

impl TryFrom<Config> for SinkEnum {
type Error = crate::errors::Error;

Expand Down
21 changes: 19 additions & 2 deletions cdviz-collector/src/sources/extractors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ use super::{http, opendal, EventSourcePipe, Extractor};
use crate::errors::Result;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "kind")]
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
#[serde(tag = "type")]
pub(crate) enum Config {
#[serde(alias = "noop")]
#[default]
Sleep,
#[cfg(feature = "source_http")]
#[serde(alias = "http")]
Http(http::Config),
Expand All @@ -16,6 +19,7 @@ pub(crate) enum Config {
impl Config {
pub(crate) fn into_extractor(&self, next: EventSourcePipe) -> Result<Box<dyn Extractor>> {
let out: Box<dyn Extractor> = match self {
Config::Sleep => Box::new(SleepExtractor {}),
#[cfg(feature = "source_http")]
Config::Http(config) => Box::new(http::HttpExtractor::try_from(config, next)?),
#[cfg(feature = "source_opendal")]
Expand All @@ -24,3 +28,16 @@ impl Config {
Ok(out)
}
}

struct SleepExtractor {}

#[async_trait::async_trait]
impl Extractor for SleepExtractor {
async fn run(&mut self) -> Result<()> {
use std::future;

let future = future::pending();
let () = future.await;
unreachable!()
}
}
11 changes: 0 additions & 11 deletions cdviz-collector/src/sources/hbs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,6 @@ use super::{EventSource, EventSourcePipe};
use crate::errors::Result;
use crate::pipes::Pipe;
use handlebars::Handlebars;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;

#[derive(Debug, Clone, Deserialize, Serialize)]
pub(crate) struct Config {
template: String,
}

pub(crate) struct Processor {
next: EventSourcePipe,
Expand All @@ -23,10 +16,6 @@ impl Processor {
hbs.register_template_string("tpl", template)?;
Ok(Self { next, hbs })
}

pub(crate) fn try_from(config: Config, next: EventSourcePipe) -> Result<Self> {
Self::new(&config.template, next)
}
}

impl Pipe for Processor {
Expand Down
3 changes: 1 addition & 2 deletions cdviz-collector/src/sources/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::{
errors::{self, Error},
sources::EventSource,
};
use async_trait::async_trait;
use axum::{
extract::State,
http,
Expand Down Expand Up @@ -47,7 +46,7 @@ struct AppState {
next: Arc<Mutex<EventSourcePipe>>,
}

#[async_trait]
#[async_trait::async_trait]
impl Extractor for HttpExtractor {
async fn run(&mut self) -> Result<()> {
let app_state = AppState { next: Arc::clone(&self.next) };
Expand Down
4 changes: 3 additions & 1 deletion cdviz-collector/src/sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ use tokio::task::JoinHandle;
// }

// TODO support name/reference for extractor / transformer
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize, Default)]
pub(crate) struct Config {
#[serde(default)]
extractor: extractors::Config,
#[serde(default)]
transformers: Vec<transformers::Config>,
}

Expand Down
35 changes: 31 additions & 4 deletions cdviz-collector/src/sources/transformers.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
use super::{hbs, EventSourcePipe};
use crate::errors::Result;
use crate::{errors::Result, pipes::discard_all, pipes::log, pipes::passthrough};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "kind")]
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
#[serde(tag = "type")]
pub(crate) enum Config {
#[serde(alias = "passthrough")]
#[default]
Passthrough,
#[serde(alias = "log")]
Log(log::Config),
#[serde(alias = "discrad_all")]
DiscardAll,
#[serde(alias = "hbs")]
Hbs { template: String },
// #[serde(alias = "vrl")]
Expand All @@ -13,9 +20,29 @@ pub(crate) enum Config {

impl Config {
pub(crate) fn into_transformer(&self, next: EventSourcePipe) -> Result<EventSourcePipe> {
let out = match &self {
let out: EventSourcePipe = match &self {
Config::Passthrough => Box::new(passthrough::Processor::new(next)),
Config::Log(config) => Box::new(log::Processor::try_from(config, next)?),
Config::DiscardAll => Box::new(discard_all::Processor::new()),
Config::Hbs { template } => Box::new(hbs::Processor::new(&template, next)?),
};
Ok(out)
}
}

// pub struct Identity {
// next: EventSourcePipe,
// }

// impl Identity {
// fn new(next: EventSourcePipe) -> Self {
// Self { next }
// }
// }

// impl Pipe for Identity {
// type Input = EventSource;
// fn send(&mut self, input: Self::Input) -> Result<()> {
// self.next.send(input)
// }
// }

0 comments on commit 0ba77e3

Please sign in to comment.