diff --git a/cdviz-collector/src/main.rs b/cdviz-collector/src/main.rs index 0ccc9b4..3a3e5f8 100644 --- a/cdviz-collector/src/main.rs +++ b/cdviz-collector/src/main.rs @@ -19,11 +19,7 @@ use tokio::sync::broadcast; #[derive(Debug, Clone, clap::Parser)] pub(crate) struct Cli { - #[clap( - long = "config", - env("CDVIZ_COLLECTOR_CONFIG"), - default_value = "cdviz-collector.toml" - )] + #[clap(long = "config", env("CDVIZ_COLLECTOR_CONFIG"), default_value = "cdviz-collector.toml")] config: PathBuf, #[command(flatten)] verbose: clap_verbosity_flag::Verbosity, diff --git a/cdviz-collector/src/sinks/db.rs b/cdviz-collector/src/sinks/db.rs index dba75a2..0177f12 100644 --- a/cdviz-collector/src/sinks/db.rs +++ b/cdviz-collector/src/sinks/db.rs @@ -164,26 +164,15 @@ mod tests { .expect("start container"); let config = Config { - url: pg_container - .url() - .await - .expect("find db url") - .replace("localhost", "127.0.0.1"), // replace localhost by 127.0.0.1 because localhost in ipv6 doesn't work + url: pg_container.url().await.expect("find db url").replace("localhost", "127.0.0.1"), // replace localhost by 127.0.0.1 because localhost in ipv6 doesn't work pool_connections_min: 1, pool_connections_max: 30, }; let dbsink = DbSink::try_from(config).unwrap(); //Basic initialize the db schema //TODO improve the loading, initialisation of the db - for sql in read_to_string("../cdviz-db/src/schema.sql") - .unwrap() - .split(';') - { - sqlx::QueryBuilder::new(sql) - .build() - .execute(&dbsink.pool) - .await - .unwrap(); + for sql in read_to_string("../cdviz-db/src/schema.sql").unwrap().split(';') { + sqlx::QueryBuilder::new(sql).build().execute(&dbsink.pool).await.unwrap(); } // container should be keep, else it is remove on drop (dbsink, pg_container) @@ -199,11 +188,7 @@ mod tests { let _tracing_guard = tracing::subscriber::set_default(subscriber); let (sink, _db_guard) = async_pg.await; - TestContext { - sink, - _db_guard, - _tracing_guard, - } + TestContext { sink, _db_guard, _tracing_guard } } #[rstest()] diff --git a/cdviz-collector/src/sources/http.rs b/cdviz-collector/src/sources/http.rs index 0c04ae6..48b758d 100644 --- a/cdviz-collector/src/sources/http.rs +++ b/cdviz-collector/src/sources/http.rs @@ -33,9 +33,7 @@ impl TryFrom for HttpSource { type Error = crate::errors::Error; fn try_from(value: Config) -> Result { - Ok(HttpSource { - config: value.clone(), - }) + Ok(HttpSource { config: value.clone() }) } } diff --git a/cdviz-collector/src/sources/opendal/filter.rs b/cdviz-collector/src/sources/opendal/filter.rs index 5b89f7c..a879e5e 100644 --- a/cdviz-collector/src/sources/opendal/filter.rs +++ b/cdviz-collector/src/sources/opendal/filter.rs @@ -14,11 +14,7 @@ pub(crate) struct Filter { impl Filter { pub(crate) fn from_patterns(path_patterns: Option) -> Self { - Filter { - ts_after: DateTime::::MIN_UTC, - ts_before: Utc::now(), - path_patterns, - } + Filter { ts_after: DateTime::::MIN_UTC, ts_before: Utc::now(), path_patterns } } pub(crate) fn accept(&self, entry: &Entry) -> bool { @@ -30,10 +26,7 @@ impl Filter { && meta.content_length() > 0 && is_match(&self.path_patterns, entry.path()) } else { - tracing::warn!( - path = entry.path(), - "can not read last modified timestamp, skip" - ); + tracing::warn!(path = entry.path(), "can not read last modified timestamp, skip"); false } } else { @@ -61,9 +54,8 @@ pub(crate) fn globset_from(patterns: &[String]) -> Result> { } else { let mut builder = globset::GlobSetBuilder::new(); for pattern in patterns { - let glob = globset::GlobBuilder::new(pattern.as_str()) - .literal_separator(true) - .build()?; + let glob = + globset::GlobBuilder::new(pattern.as_str()).literal_separator(true).build()?; builder.add(glob); } Ok(Some(builder.build()?)) @@ -84,10 +76,7 @@ mod tests { #[case(vec!["**/*.json"], "foo.json")] #[case(vec!["**/*.json"], "bar/foo.json")] fn test_patterns_accept(#[case] patterns: Vec<&str>, #[case] path: &str) { - let patterns = patterns - .into_iter() - .map(String::from) - .collect::>(); + let patterns = patterns.into_iter().map(String::from).collect::>(); let globset = globset_from(&patterns).unwrap(); assert!(is_match(&globset, path)); } @@ -98,10 +87,7 @@ mod tests { #[case(vec!["*.json"], "bar/foo.json")] #[case(vec!["*.json"], "/foo.json")] fn test_patterns_reject(#[case] patterns: Vec<&str>, #[case] path: &str) { - let patterns = patterns - .into_iter() - .map(String::from) - .collect::>(); + let patterns = patterns.into_iter().map(String::from).collect::>(); let globset = globset_from(&patterns).unwrap(); assert!(!is_match(&globset, path)); } diff --git a/cdviz-collector/src/sources/opendal/mod.rs b/cdviz-collector/src/sources/opendal/mod.rs index b90d175..0706c42 100644 --- a/cdviz-collector/src/sources/opendal/mod.rs +++ b/cdviz-collector/src/sources/opendal/mod.rs @@ -63,14 +63,8 @@ pub(crate) struct OpendalSource { impl Source for OpendalSource { async fn run(&mut self, tx: Sender) -> Result<()> { loop { - if let Err(err) = run_once( - &tx, - &self.op, - &self.filter, - self.recursive, - &self.transformer, - ) - .await + if let Err(err) = + run_once(&tx, &self.op, &self.filter, self.recursive, &self.transformer).await { tracing::warn!(?err, filter = ?self.filter, scheme =? self.op.info().scheme(), root =? self.op.info().root(), "fail during scanning"); } diff --git a/cdviz-collector/src/sources/opendal/transformers.rs b/cdviz-collector/src/sources/opendal/transformers.rs index 2c116c9..62ee21b 100644 --- a/cdviz-collector/src/sources/opendal/transformers.rs +++ b/cdviz-collector/src/sources/opendal/transformers.rs @@ -130,10 +130,7 @@ impl Transformer for CsvRowExtractor { let mut out = Vec::new(); for record in rdr.records() { let record = record?; - let content = headers - .iter() - .zip(record.iter()) - .collect::>(); + let content = headers.iter().zip(record.iter()).collect::>(); let data = json!({ "metadata" : metadata.clone(), "content": content, @@ -190,10 +187,7 @@ mod tests { let_assert!(Some(abs_root) = result["root"].as_str()); check!(abs_root.ends_with("examples/assets/opendal_fs")); let_assert!( - Ok(_) = result["last_modified"] - .as_str() - .unwrap_or_default() - .parse::>() + Ok(_) = result["last_modified"].as_str().unwrap_or_default().parse::>() ); }