Skip to content

Commit

Permalink
wip: sourcepipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
davidB committed Sep 1, 2024
1 parent a93921a commit c2a81d3
Show file tree
Hide file tree
Showing 13 changed files with 346 additions and 199 deletions.
3 changes: 3 additions & 0 deletions cdviz-collector/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod errors;
mod pipes;
mod sinks;
mod sources;

Expand Down Expand Up @@ -29,6 +30,8 @@ pub(crate) struct Cli {
/// Top-level collector configuration.
///
/// Every section is a map keyed by a `String`.
/// NOTE(review): the key is presumably the user-chosen name of the entry in
/// the configuration file — confirm against the config loader.
pub(crate) struct Config {
/// Source definitions, one `sources::Config` per key.
sources: HashMap<String, sources::Config>,
/// Sink definitions, one `sinks::Config` per key.
sinks: HashMap<String, sinks::Config>,
/// Extractor definitions, one `sources::extractors::Config` per key.
extractors: HashMap<String, sources::extractors::Config>,
/// Transformer definitions, one `sources::transformers::Config` per key.
transformers: HashMap<String, sources::transformers::Config>,
}

type Sender<T> = tokio::sync::broadcast::Sender<T>;
Expand Down
36 changes: 36 additions & 0 deletions cdviz-collector/src/pipes/collect_to_vec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use super::Pipe;
use crate::errors::Result;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;

/// Configuration of the `collect_to_vec` pipe.
///
/// The struct has no fields: this pipe currently takes no settings.
#[derive(Debug, Deserialize, Serialize)]
pub(crate) struct Config {}

/// A pipe that stores every input it receives in an in-memory `Vec`.
pub(crate) struct Processor<I> {
/// Inputs received so far; `send` pushes each new input at the end.
buffer: Vec<I>,
}

impl<I> Processor<I> {
    /// Creates a processor whose buffer is empty.
    ///
    /// No bound is placed on `I`: storing inputs does not require `Clone`.
    pub(crate) fn new() -> Self {
        Self { buffer: Vec::new() }
    }

    /// Builds a processor from its configuration.
    ///
    /// The configuration has no fields, so this currently always returns `Ok`.
    pub(crate) fn try_from(_config: Config) -> Result<Self> {
        Ok(Self::new())
    }

    /// Returns a clone of the buffered inputs, in the order they are stored.
    ///
    /// Only this accessor needs `I: Clone`, so the bound lives here rather
    /// than on the whole `impl` block.
    pub(crate) fn collected(&self) -> Vec<I>
    where
        I: Clone,
    {
        self.buffer.clone()
    }
}

impl<I> Pipe for Processor<I> {
    type Input = I;

    /// Appends `input` at the end of the buffer; always returns `Ok`.
    fn send(&mut self, input: Self::Input) -> Result<()> {
        let Self { buffer } = self;
        buffer.push(input);
        Ok(())
    }
}
29 changes: 29 additions & 0 deletions cdviz-collector/src/pipes/discard_all.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use super::Pipe;
use crate::errors::Result;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::marker::PhantomData;

/// Configuration of the `discard_all` pipe.
///
/// The struct has no fields: this pipe currently takes no settings.
#[derive(Debug, Deserialize, Serialize)]
pub(crate) struct Config {}

/// A pipe that accepts inputs of type `I` and stores nothing.
///
/// Visible crate-wide (like `collect_to_vec::Processor`) so that code outside
/// this module can name the type returned by its constructors.
pub(crate) struct Processor<I> {
    /// Ties the processor to its input type without holding a value of it.
    input_type: PhantomData<I>,
}

impl<I> Processor<I> {
pub fn new() -> Self {
Self { input_type: PhantomData }
}

pub fn try_from(_config: Config) -> Result<Self> {
Ok(Self::new())
}
}

impl<I> Pipe for Processor<I> {
    type Input = I;

    /// Drops the input and reports success; nothing is forwarded or stored.
    fn send(&mut self, discarded: Self::Input) -> Result<()> {
        drop(discarded);
        Ok(())
    }
}
38 changes: 38 additions & 0 deletions cdviz-collector/src/pipes/log.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use super::Pipe;
use crate::errors::Result;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::marker::PhantomData;

/// Configuration of the `log` pipe.
#[derive(Debug, Deserialize, Serialize)]
pub(crate) struct Config {
/// Value passed as the `target` key of the `tracing::info!` call made for
/// every input.
/// NOTE(review): written as `target=...` this looks like an ordinary event
/// field, not the tracing target (which uses the `target: ...` form) — confirm intent.
target: String,
}

/// A pipe that logs each input and then forwards it to another pipe.
///
/// Visible crate-wide so that code outside this module can name the type
/// returned by its `pub(crate)` constructors.
pub(crate) struct Processor<I, N> {
    /// Value emitted under the `target` key of each log event.
    target: String,
    /// The pipe that receives every input after it has been logged.
    next: N,
    /// Ties the processor to its input type without holding a value of it.
    input_type: PhantomData<I>,
}

impl<I, N> Processor<I, N> {
pub(crate) fn new(target: String, next: N) -> Self {
Self { target, next, input_type: PhantomData }
}

pub(crate) fn try_from(config: Config, next: N) -> Result<Self> {
Ok(Self::new(config.target, next))
}
}

impl<I, N> Pipe for Processor<I, N>
where
    N: Pipe<Input = I>,
    I: Debug,
{
    type Input = I;

    /// Logs `input` at info level (using its `Debug` form), then hands it
    /// unchanged to the next pipe and returns that pipe's result.
    fn send(&mut self, input: Self::Input) -> Result<()> {
        tracing::info!(target=self.target, input=?input);
        N::send(&mut self.next, input)
    }
}
37 changes: 37 additions & 0 deletions cdviz-collector/src/pipes/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use crate::errors::Result;
pub mod collect_to_vec;
pub mod discard_all;
pub mod log;

/// A pipe is an interface for implementing a processor of inputs.
/// Implementations can:
/// - discard / drop all inputs
/// - filter
/// - transform
/// - split
/// - retry
/// - timeout
/// - ...
///
/// The composition of pipes into a pipeline could be done by configuration,
/// and the behavior of each pipe should be internal,
/// so chaining pipes should not depend on methods such as `map`, `fold`, `filter`,
/// `filter_map`, `drop`,... as is the case for `Iterator`, `Stream`, `RxRust`.
/// Also, being able to return an error to the sender could make handling easier
/// for the sender (vs `Stream`): retry, buffering, forwarding to its caller...
///
/// The approach and goal are similar to the middleware used in some web frameworks
/// or in [tower](https://crates.io/crates/tower), except that it is not async.
/// If async support is ever needed, `Pipe` may become a specialization of tower's middleware,
/// as [axum](https://crates.io/crates/axum), [warp](https://crates.io/crates/warp), [tonic](https://crates.io/crates/tonic),... do.
pub trait Pipe {
/// The type of value this pipe accepts.
type Input;

/// Hands `input` to the pipe; an `Err` reports the failure back to the caller.
fn send(&mut self, input: Self::Input) -> Result<()>;
}

/// A boxed pipe (including a boxed trait object, thanks to `?Sized`) is itself
/// a pipe with the same input type as the value it owns.
impl<T> Pipe for Box<T>
where
    T: Pipe + ?Sized,
{
    type Input = T::Input;

    /// Forwards `input` to the boxed pipe and returns its result.
    fn send(&mut self, input: Self::Input) -> Result<()> {
        // Dereference twice (`&mut Box<T>` -> `Box<T>` -> `T`) so the call
        // resolves to the inner pipe rather than back to this impl.
        (**self).send(input)
    }
}
26 changes: 26 additions & 0 deletions cdviz-collector/src/sources/extractors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use super::{http, opendal, EventSourcePipe, Extractor};
use crate::errors::Result;
use serde::{Deserialize, Serialize};

/// Configuration of an extractor, selected by the `kind` field of the
/// serialized form (serde internally-tagged enum).
///
/// Each variant only exists when its cargo feature is enabled.
#[derive(Debug, Deserialize, Serialize)]
#[serde(tag = "kind")]
pub(crate) enum Config {
/// HTTP extractor settings; the tag `http` is accepted as an alias of `Http`.
#[cfg(feature = "source_http")]
#[serde(alias = "http")]
Http(http::Config),
/// OpenDAL extractor settings; the tag `opendal` is accepted as an alias of `Opendal`.
#[cfg(feature = "source_opendal")]
#[serde(alias = "opendal")]
Opendal(opendal::Config),
}

impl Config {
/// Intended to build the extractor described by this configuration, wired
/// to forward what it produces to `next`.
///
/// NOTE(review): work in progress — every match arm below is commented out,
/// so no extractor is constructed yet and `next` is unused.
/// NOTE(review): the method borrows `self` although its name starts with
/// `into_`, which conventionally consumes `self` — confirm the intended name.
fn into_extractor(&self, next: EventSourcePipe) -> Result<Box<dyn Extractor>> {
// NOTE(review): with either `source_*` feature enabled the enum has
// variants and this arm-less match would presumably be rejected as
// non-exhaustive — confirm once the arms are restored.
let out = match self {
// #[cfg(feature = "source_http")]
// Config::Http(config) => Box::new(http::HttpExtractor::try_from(config, next)?),
// #[cfg(feature = "source_opendal")]
// Config::Opendal(config) => OpendalSource::try_from(config)?.into(),
};
Ok(out)
}
}
39 changes: 39 additions & 0 deletions cdviz-collector/src/sources/hbs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use super::{EventSource, EventSourcePipe};
use crate::errors::Result;
use crate::pipes::Pipe;
use handlebars::Handlebars;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;

/// Configuration of the Handlebars (`hbs`) transformer.
#[derive(Debug, Deserialize, Serialize)]
pub(crate) struct Config {
/// Source text of the Handlebars template, registered by the processor
/// under the name `tpl`.
template: String,
}

/// A pipe that renders each incoming `EventSource` through a Handlebars
/// template and forwards the parsed result to the next pipe.
pub(crate) struct Processor {
/// The pipe that receives the `EventSource` parsed from the rendered text.
next: EventSourcePipe,
/// Handlebars registry holding the template registered as `tpl`.
hbs: Handlebars<'static>,
}

impl Processor {
    /// Creates the processor from the template source and the downstream pipe.
    ///
    /// The registry is configured with dev mode off and strict mode on, and
    /// `template` is registered under the name `tpl`.
    ///
    /// # Errors
    /// Propagates the error returned when registering the template fails.
    pub(crate) fn new(template: &str, next: EventSourcePipe) -> Result<Self> {
        let mut registry = Handlebars::new();
        registry.set_dev_mode(false);
        registry.set_strict_mode(true);
        registry.register_template_string("tpl", template)?;

        let processor = Self { next, hbs: registry };
        Ok(processor)
    }

    /// Builds the processor from its configuration; see [`Processor::new`].
    pub(crate) fn try_from(config: Config, next: EventSourcePipe) -> Result<Self> {
        let template = config.template.as_str();
        Self::new(template, next)
    }
}

impl Pipe for Processor {
    type Input = EventSource;

    /// Renders `input` with the `tpl` template, parses the rendered text as a
    /// JSON `EventSource`, and sends that value to the next pipe.
    ///
    /// # Errors
    /// Propagates rendering errors, JSON parsing errors, and whatever the next
    /// pipe returns.
    fn send(&mut self, input: Self::Input) -> Result<()> {
        let rendered = self.hbs.render("tpl", &input)?;
        let transformed = serde_json::from_str::<EventSource>(&rendered)?;
        self.next.send(transformed)
    }
}
53 changes: 29 additions & 24 deletions cdviz-collector/src/sources/http.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::Source;
use super::{EventSourcePipe, Extractor};
use crate::{
errors::{self, Error},
Message, Sender,
sources::EventSource,
};
use axum::{
extract::State,
Expand All @@ -11,11 +11,14 @@ use axum::{
Json, Router,
};
use axum_tracing_opentelemetry::middleware::{OtelAxumLayer, OtelInResponseLayer};
use cdevents_sdk::CDEvent;
use errors::Result;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::net::{IpAddr, SocketAddr};
use std::{
collections::HashMap,
net::{IpAddr, SocketAddr},
sync::{Arc, Mutex},
};

/// The http server config
#[derive(Clone, Debug, Deserialize, Serialize)]
Expand All @@ -29,26 +32,24 @@ pub(crate) struct Config {
pub(crate) port: u16,
}

impl TryFrom<Config> for HttpSource {
type Error = crate::errors::Error;

fn try_from(value: Config) -> Result<Self> {
Ok(HttpSource { config: value.clone() })
}
}

pub(crate) struct HttpSource {
/// Extractor that receives events over HTTP and pushes them into a pipe.
pub(crate) struct HttpExtractor {
/// The http server config.
config: Config,
/// Downstream pipe, behind `Arc<Mutex<..>>` so it can be shared with the
/// request handler state.
next: Arc<Mutex<EventSourcePipe>>,
}

impl HttpExtractor {
    /// Builds the extractor from a borrowed configuration (cloned into the
    /// extractor) and the pipe that will receive the extracted events.
    ///
    /// This always returns `Ok`.
    pub fn try_from(value: &Config, next: EventSourcePipe) -> Result<Self> {
        let config = value.clone();
        let next = Arc::new(Mutex::new(next));
        Ok(HttpExtractor { config, next })
    }
}
#[derive(Clone)]
struct AppState {
tx: Sender<Message>,
next: Arc<Mutex<EventSourcePipe>>,
}

impl Source for HttpSource {
async fn run(&mut self, tx: Sender<Message>) -> Result<()> {
let app_state = AppState { tx };
impl Extractor for HttpExtractor {
async fn run(&mut self) -> Result<()> {
let app_state = AppState { next: Arc::clone(&self.next) };

let app = app().with_state(app_state);
// run it
Expand All @@ -64,10 +65,11 @@ impl Source for HttpSource {
}
}

//TODO make route per extractor/sources
fn app() -> Router<AppState> {
// build our application with a route
Router::new()
.route("/cdevents", post(cdevents_collect))
.route("/cdevents", post(events_collect))
// include trace context as header into the response
.layer(OtelInResponseLayer)
//start OpenTelemetry trace on incoming request
Expand All @@ -80,17 +82,20 @@ async fn health() -> impl IntoResponse {
http::StatusCode::OK
}

//TODO validate format of cdevents JSON
//TODO support events in cloudevents format (extract info from headers)
//TODO try [deser](https://crates.io/crates/deserr) to return good error
//TODO use cloudevents
#[tracing::instrument(skip(app_state, payload))]
async fn cdevents_collect(
//TODO add metadata & headers info into SourceEvent
//TODO log & convert error
#[tracing::instrument(skip(app_state, body))]
async fn events_collect(
State(app_state): State<AppState>,
Json(payload): Json<CDEvent>,
Json(body): Json<serde_json::Value>,
) -> Result<http::StatusCode> {
tracing::trace!("received cloudevent {:?}", &payload);
app_state.tx.send(Message::from(payload))?;
tracing::trace!("received {:?}", &body);
let event = EventSource { metadata: serde_json::Value::Null, header: HashMap::new(), body };
let mut next = app_state.next.lock().unwrap();
next.as_mut().send(event)?;
Ok(http::StatusCode::CREATED)
}

Expand Down
Loading

0 comments on commit c2a81d3

Please sign in to comment.