Skip to content

Commit

Permalink
feat: add a sink folder to write event into a folder (local, S3, ...)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidB committed Oct 5, 2024
1 parent 8573af7 commit ac1f6ad
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 8 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -407,3 +407,4 @@ report/
megalinter-reports/
Chart.lock
.aider*
proptest-regressions/
22 changes: 14 additions & 8 deletions cdviz-collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,25 +72,31 @@ figment = { version = "0.10", features = ["toml", "env", "test"] }
proptest = "1"
rstest = "0.23"
rustainers = "0.12"
tempfile = "3"
test-strategy = "0.4"
tracing-subscriber = "0.3"
uuid = "1.10"

[features]
# default is "full" feature set
default = [
"sink_db",
"sink_folder",
"sink_http",
"source_http",
"source_opendal",
]
sink_db = ["dep:sqlx"]
sink_folder = ["dep:opendal"]
sink_http = [
"dep:reqwest",
"dep:reqwest-middleware",
"dep:reqwest-tracing",
#reqwest is also used as transitive dependencies
"reqwest/charset",
"reqwest/http2",
"reqwest/json",
"reqwest/rustls-tls",
"dep:reqwest",
"dep:reqwest-middleware",
"dep:reqwest-tracing",
#reqwest is also used as transitive dependencies
"reqwest/charset",
"reqwest/http2",
"reqwest/json",
"reqwest/rustls-tls",
]
source_http = ["dep:axum", "dep:axum-tracing-opentelemetry"]
source_opendal = [
Expand Down
80 changes: 80 additions & 0 deletions cdviz-collector/src/sinks/folder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use crate::errors::Result;
use crate::Message;
use opendal::{Operator, Scheme};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::collections::HashMap;

use super::Sink;

#[serde_as]
#[derive(Clone, Debug, Deserialize, Serialize, Default)]
pub(crate) struct Config {
/// Is the sink is enabled?
pub(crate) enabled: bool,
#[serde_as(as = "DisplayFromStr")]
kind: Scheme,
parameters: HashMap<String, String>,
}

impl TryFrom<Config> for FolderSink {
type Error = crate::errors::Error;

fn try_from(value: Config) -> Result<Self> {
let op = Operator::via_iter(value.kind, value.parameters.clone())?;
Ok(Self { op })
}
}

pub(crate) struct FolderSink {
op: Operator,
}

impl Sink for FolderSink {
async fn send(&self, msg: &Message) -> Result<()> {
let id = msg.cdevent.id();
let mut writer = self.op.writer(&format!("{id}.json")).await?;
writer.write(serde_json::to_string(&msg.cdevent)?).await?;
writer.close().await?;
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use assert2::let_assert;
use cdevents_sdk::CDEvent;
use test_strategy::proptest;

#[proptest(async = "tokio", cases = 10)]
//TODO reuse same sink for all tests (but be sure to drop it after)
async fn test_send_valid(#[any] cdevent0: CDEvent) {
// TODO allow any id (not only uuid) or change the policy in cdevents
let cdevent = cdevent0.with_id(uuid::Uuid::new_v4().to_string().try_into().unwrap());
let tmp_dir = tempfile::tempdir().unwrap();
let config = Config {
enabled: true,
kind: Scheme::Fs,
parameters: HashMap::from([(
"root".to_string(),
tmp_dir.path().to_string_lossy().to_string(),
)]),
};
let sink = FolderSink::try_from(config).unwrap();

let id = cdevent.id();
let msg = crate::Message { cdevent: cdevent.clone() };
let file = tmp_dir.path().join(format!("{id}.json"));
assert!(!file.exists());
let_assert!(Ok(()) = sink.send(&msg).await);
assert!(file.exists());
let example_txt = std::fs::read_to_string(file).unwrap();
let example_json: serde_json::Value =
serde_json::from_str(&example_txt).expect("to parse as json");
let example: CDEvent =
serde_json::from_value(example_json.clone()).expect("to parse as cdevent");
assert_eq!(example, cdevent);
proptest::prop_assert!(true);
}
}
15 changes: 15 additions & 0 deletions cdviz-collector/src/sinks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#[cfg(feature = "sink_db")]
pub(crate) mod db;
pub(crate) mod debug;
#[cfg(feature = "sink_folder")]
pub(crate) mod folder;
#[cfg(feature = "sink_http")]
pub(crate) mod http;

use crate::errors::Result;
Expand All @@ -12,6 +15,8 @@ use tokio::task::JoinHandle;
#[cfg(feature = "sink_db")]
use db::DbSink;
use debug::DebugSink;
#[cfg(feature = "sink_folder")]
use folder::FolderSink;
#[cfg(feature = "sink_http")]
use http::HttpSink;

Expand All @@ -26,6 +31,9 @@ pub(crate) enum Config {
#[cfg(feature = "sink_http")]
#[serde(alias = "http")]
Http(http::Config),
#[cfg(feature = "sink_folder")]
#[serde(alias = "folder")]
Folder(folder::Config),
}

impl Default for Config {
Expand All @@ -37,8 +45,11 @@ impl Default for Config {
impl Config {
pub(crate) fn is_enabled(&self) -> bool {
match self {
#[cfg(feature = "sink_db")]
Self::Db(db::Config { enabled, .. }) => *enabled,
Self::Debug(debug::Config { enabled, .. }) => *enabled,
#[cfg(feature = "sink_folder")]
Self::Folder(folder::Config { enabled, .. }) => *enabled,
#[cfg(feature = "sink_http")]
Self::Http(http::Config { enabled, .. }) => *enabled,
}
Expand All @@ -53,6 +64,8 @@ impl TryFrom<Config> for SinkEnum {
#[cfg(feature = "sink_db")]
Config::Db(config) => DbSink::try_from(config)?.into(),
Config::Debug(config) => DebugSink::try_from(config)?.into(),
#[cfg(feature = "sink_folder")]
Config::Folder(config) => FolderSink::try_from(config)?.into(),
#[cfg(feature = "sink_http")]
Config::Http(config) => HttpSink::try_from(config)?.into(),
};
Expand All @@ -66,6 +79,8 @@ enum SinkEnum {
#[cfg(feature = "sink_db")]
DbSink,
DebugSink,
#[cfg(feature = "sink_folder")]
FolderSink,
#[cfg(feature = "sink_http")]
HttpSink,
}
Expand Down

0 comments on commit ac1f6ad

Please sign in to comment.