diff --git a/.gitignore b/.gitignore index 7e746e5..dbaf393 100644 --- a/.gitignore +++ b/.gitignore @@ -407,3 +407,4 @@ report/ megalinter-reports/ Chart.lock .aider* +proptest-regressions/ diff --git a/cdviz-collector/Cargo.toml b/cdviz-collector/Cargo.toml index fdcf7a5..eb7151b 100644 --- a/cdviz-collector/Cargo.toml +++ b/cdviz-collector/Cargo.toml @@ -72,25 +72,31 @@ figment = { version = "0.10", features = ["toml", "env", "test"] } proptest = "1" rstest = "0.23" rustainers = "0.12" +tempfile = "3" +test-strategy = "0.4" tracing-subscriber = "0.3" +uuid = "1.10" [features] +# default is "full" feature set default = [ "sink_db", + "sink_folder", "sink_http", "source_http", "source_opendal", ] sink_db = ["dep:sqlx"] +sink_folder = ["dep:opendal"] sink_http = [ - "dep:reqwest", - "dep:reqwest-middleware", - "dep:reqwest-tracing", - #reqwest is also used as transitive dependencies - "reqwest/charset", - "reqwest/http2", - "reqwest/json", - "reqwest/rustls-tls", + "dep:reqwest", + "dep:reqwest-middleware", + "dep:reqwest-tracing", + #reqwest is also used as transitive dependencies + "reqwest/charset", + "reqwest/http2", + "reqwest/json", + "reqwest/rustls-tls", ] source_http = ["dep:axum", "dep:axum-tracing-opentelemetry"] source_opendal = [ diff --git a/cdviz-collector/src/sinks/folder.rs b/cdviz-collector/src/sinks/folder.rs new file mode 100644 index 0000000..3d6b04d --- /dev/null +++ b/cdviz-collector/src/sinks/folder.rs @@ -0,0 +1,80 @@ +use crate::errors::Result; +use crate::Message; +use opendal::{Operator, Scheme}; +use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DisplayFromStr}; +use std::collections::HashMap; + +use super::Sink; + +#[serde_as] +#[derive(Clone, Debug, Deserialize, Serialize, Default)] +pub(crate) struct Config { + /// Is the sink is enabled? + pub(crate) enabled: bool, + #[serde_as(as = "DisplayFromStr")] + kind: Scheme, + parameters: HashMap, +} + +impl TryFrom for FolderSink { + type Error = crate::errors::Error; + + fn try_from(value: Config) -> Result { + let op = Operator::via_iter(value.kind, value.parameters.clone())?; + Ok(Self { op }) + } +} + +pub(crate) struct FolderSink { + op: Operator, +} + +impl Sink for FolderSink { + async fn send(&self, msg: &Message) -> Result<()> { + let id = msg.cdevent.id(); + let mut writer = self.op.writer(&format!("{id}.json")).await?; + writer.write(serde_json::to_string(&msg.cdevent)?).await?; + writer.close().await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use assert2::let_assert; + use cdevents_sdk::CDEvent; + use test_strategy::proptest; + + #[proptest(async = "tokio", cases = 10)] + //TODO reuse same sink for all tests (but be sure to drop it after) + async fn test_send_valid(#[any] cdevent0: CDEvent) { + // TODO allow any id (not only uuid) or change the policy in cdevents + let cdevent = cdevent0.with_id(uuid::Uuid::new_v4().to_string().try_into().unwrap()); + let tmp_dir = tempfile::tempdir().unwrap(); + let config = Config { + enabled: true, + kind: Scheme::Fs, + parameters: HashMap::from([( + "root".to_string(), + tmp_dir.path().to_string_lossy().to_string(), + )]), + }; + let sink = FolderSink::try_from(config).unwrap(); + + let id = cdevent.id(); + let msg = crate::Message { cdevent: cdevent.clone() }; + let file = tmp_dir.path().join(format!("{id}.json")); + assert!(!file.exists()); + let_assert!(Ok(()) = sink.send(&msg).await); + assert!(file.exists()); + let example_txt = std::fs::read_to_string(file).unwrap(); + let example_json: serde_json::Value = + serde_json::from_str(&example_txt).expect("to parse as json"); + let example: CDEvent = + serde_json::from_value(example_json.clone()).expect("to parse as cdevent"); + assert_eq!(example, cdevent); + proptest::prop_assert!(true); + } +} diff --git a/cdviz-collector/src/sinks/mod.rs b/cdviz-collector/src/sinks/mod.rs index b195707..b3eb047 100644 --- a/cdviz-collector/src/sinks/mod.rs +++ b/cdviz-collector/src/sinks/mod.rs @@ -1,6 +1,9 @@ #[cfg(feature = "sink_db")] pub(crate) mod db; pub(crate) mod debug; +#[cfg(feature = "sink_folder")] +pub(crate) mod folder; +#[cfg(feature = "sink_http")] pub(crate) mod http; use crate::errors::Result; @@ -12,6 +15,8 @@ use tokio::task::JoinHandle; #[cfg(feature = "sink_db")] use db::DbSink; use debug::DebugSink; +#[cfg(feature = "sink_folder")] +use folder::FolderSink; #[cfg(feature = "sink_http")] use http::HttpSink; @@ -26,6 +31,9 @@ pub(crate) enum Config { #[cfg(feature = "sink_http")] #[serde(alias = "http")] Http(http::Config), + #[cfg(feature = "sink_folder")] + #[serde(alias = "folder")] + Folder(folder::Config), } impl Default for Config { @@ -37,8 +45,11 @@ impl Default for Config { impl Config { pub(crate) fn is_enabled(&self) -> bool { match self { + #[cfg(feature = "sink_db")] Self::Db(db::Config { enabled, .. }) => *enabled, Self::Debug(debug::Config { enabled, .. }) => *enabled, + #[cfg(feature = "sink_folder")] + Self::Folder(folder::Config { enabled, .. }) => *enabled, #[cfg(feature = "sink_http")] Self::Http(http::Config { enabled, .. }) => *enabled, } @@ -53,6 +64,8 @@ impl TryFrom for SinkEnum { #[cfg(feature = "sink_db")] Config::Db(config) => DbSink::try_from(config)?.into(), Config::Debug(config) => DebugSink::try_from(config)?.into(), + #[cfg(feature = "sink_folder")] + Config::Folder(config) => FolderSink::try_from(config)?.into(), #[cfg(feature = "sink_http")] Config::Http(config) => HttpSink::try_from(config)?.into(), }; @@ -66,6 +79,8 @@ enum SinkEnum { #[cfg(feature = "sink_db")] DbSink, DebugSink, + #[cfg(feature = "sink_folder")] + FolderSink, #[cfg(feature = "sink_http")] HttpSink, }