
Commit

style: reformat code
davidB committed Sep 1, 2024
1 parent 3cdec8f commit 1ad9b00
Showing 6 changed files with 16 additions and 63 deletions.
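Every change in this commit collapses a multi-line expression, struct literal, or attribute into a single line. A minimal rustfmt.toml sketch that would produce this style, assuming rustfmt is the formatter (the project's actual configuration is not part of this diff):

# rustfmt.toml — hypothetical sketch, not taken from this commit
# "Max" folds calls, struct literals, and attribute arguments onto one
# line whenever the result stays within max_width.
use_small_heuristics = "Max"
max_width = 100

With such a config, running `cargo fmt` rewrites the removed (-) multi-line forms in the hunks below into the added (+) single-line forms.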
6 changes: 1 addition & 5 deletions cdviz-collector/src/main.rs
@@ -19,11 +19,7 @@ use tokio::sync::broadcast;
 
 #[derive(Debug, Clone, clap::Parser)]
 pub(crate) struct Cli {
-    #[clap(
-        long = "config",
-        env("CDVIZ_COLLECTOR_CONFIG"),
-        default_value = "cdviz-collector.toml"
-    )]
+    #[clap(long = "config", env("CDVIZ_COLLECTOR_CONFIG"), default_value = "cdviz-collector.toml")]
     config: PathBuf,
     #[command(flatten)]
     verbose: clap_verbosity_flag::Verbosity,
23 changes: 4 additions & 19 deletions cdviz-collector/src/sinks/db.rs
@@ -164,26 +164,15 @@ mod tests {
             .expect("start container");
 
         let config = Config {
-            url: pg_container
-                .url()
-                .await
-                .expect("find db url")
-                .replace("localhost", "127.0.0.1"), // replace localhost by 127.0.0.1 because localhost in ipv6 doesn't work
+            url: pg_container.url().await.expect("find db url").replace("localhost", "127.0.0.1"), // replace localhost by 127.0.0.1 because localhost in ipv6 doesn't work
             pool_connections_min: 1,
             pool_connections_max: 30,
         };
         let dbsink = DbSink::try_from(config).unwrap();
         //Basic initialize the db schema
         //TODO improve the loading, initialisation of the db
-        for sql in read_to_string("../cdviz-db/src/schema.sql")
-            .unwrap()
-            .split(';')
-        {
-            sqlx::QueryBuilder::new(sql)
-                .build()
-                .execute(&dbsink.pool)
-                .await
-                .unwrap();
+        for sql in read_to_string("../cdviz-db/src/schema.sql").unwrap().split(';') {
+            sqlx::QueryBuilder::new(sql).build().execute(&dbsink.pool).await.unwrap();
         }
         // container should be keep, else it is remove on drop
         (dbsink, pg_container)
@@ -199,11 +188,7 @@ mod tests {
         let _tracing_guard = tracing::subscriber::set_default(subscriber);
 
         let (sink, _db_guard) = async_pg.await;
-        TestContext {
-            sink,
-            _db_guard,
-            _tracing_guard,
-        }
+        TestContext { sink, _db_guard, _tracing_guard }
     }
 
     #[rstest()]
4 changes: 1 addition & 3 deletions cdviz-collector/src/sources/http.rs
@@ -33,9 +33,7 @@ impl TryFrom<Config> for HttpSource {
     type Error = crate::errors::Error;
 
     fn try_from(value: Config) -> Result<Self> {
-        Ok(HttpSource {
-            config: value.clone(),
-        })
+        Ok(HttpSource { config: value.clone() })
     }
 }
 
26 changes: 6 additions & 20 deletions cdviz-collector/src/sources/opendal/filter.rs
@@ -14,11 +14,7 @@ pub(crate) struct Filter {
 
 impl Filter {
     pub(crate) fn from_patterns(path_patterns: Option<GlobSet>) -> Self {
-        Filter {
-            ts_after: DateTime::<Utc>::MIN_UTC,
-            ts_before: Utc::now(),
-            path_patterns,
-        }
+        Filter { ts_after: DateTime::<Utc>::MIN_UTC, ts_before: Utc::now(), path_patterns }
     }
 
     pub(crate) fn accept(&self, entry: &Entry) -> bool {
@@ -30,10 +26,7 @@ impl Filter {
                     && meta.content_length() > 0
                     && is_match(&self.path_patterns, entry.path())
             } else {
-                tracing::warn!(
-                    path = entry.path(),
-                    "can not read last modified timestamp, skip"
-                );
+                tracing::warn!(path = entry.path(), "can not read last modified timestamp, skip");
                 false
             }
         } else {
@@ -61,9 +54,8 @@ pub(crate) fn globset_from(patterns: &[String]) -> Result<Option<GlobSet>> {
     } else {
         let mut builder = globset::GlobSetBuilder::new();
         for pattern in patterns {
-            let glob = globset::GlobBuilder::new(pattern.as_str())
-                .literal_separator(true)
-                .build()?;
+            let glob =
+                globset::GlobBuilder::new(pattern.as_str()).literal_separator(true).build()?;
             builder.add(glob);
         }
         Ok(Some(builder.build()?))
@@ -84,10 +76,7 @@ mod tests {
     #[case(vec!["**/*.json"], "foo.json")]
     #[case(vec!["**/*.json"], "bar/foo.json")]
     fn test_patterns_accept(#[case] patterns: Vec<&str>, #[case] path: &str) {
-        let patterns = patterns
-            .into_iter()
-            .map(String::from)
-            .collect::<Vec<String>>();
+        let patterns = patterns.into_iter().map(String::from).collect::<Vec<String>>();
         let globset = globset_from(&patterns).unwrap();
         assert!(is_match(&globset, path));
     }
@@ -98,10 +87,7 @@ mod tests {
     #[case(vec!["*.json"], "bar/foo.json")]
     #[case(vec!["*.json"], "/foo.json")]
     fn test_patterns_reject(#[case] patterns: Vec<&str>, #[case] path: &str) {
-        let patterns = patterns
-            .into_iter()
-            .map(String::from)
-            .collect::<Vec<String>>();
+        let patterns = patterns.into_iter().map(String::from).collect::<Vec<String>>();
         let globset = globset_from(&patterns).unwrap();
         assert!(!is_match(&globset, path));
     }
10 changes: 2 additions & 8 deletions cdviz-collector/src/sources/opendal/mod.rs
@@ -63,14 +63,8 @@ pub(crate) struct OpendalSource {
 impl Source for OpendalSource {
     async fn run(&mut self, tx: Sender<Message>) -> Result<()> {
         loop {
-            if let Err(err) = run_once(
-                &tx,
-                &self.op,
-                &self.filter,
-                self.recursive,
-                &self.transformer,
-            )
-            .await
+            if let Err(err) =
+                run_once(&tx, &self.op, &self.filter, self.recursive, &self.transformer).await
             {
                 tracing::warn!(?err, filter = ?self.filter, scheme =? self.op.info().scheme(), root =? self.op.info().root(), "fail during scanning");
             }
10 changes: 2 additions & 8 deletions cdviz-collector/src/sources/opendal/transformers.rs
@@ -130,10 +130,7 @@ impl Transformer for CsvRowExtractor {
         let mut out = Vec::new();
         for record in rdr.records() {
             let record = record?;
-            let content = headers
-                .iter()
-                .zip(record.iter())
-                .collect::<HashMap<&str, &str>>();
+            let content = headers.iter().zip(record.iter()).collect::<HashMap<&str, &str>>();
             let data = json!({
                 "metadata" : metadata.clone(),
                 "content": content,
@@ -190,10 +187,7 @@ mod tests {
         let_assert!(Some(abs_root) = result["root"].as_str());
         check!(abs_root.ends_with("examples/assets/opendal_fs"));
         let_assert!(
-            Ok(_) = result["last_modified"]
-                .as_str()
-                .unwrap_or_default()
-                .parse::<DateTime<Utc>>()
+            Ok(_) = result["last_modified"].as_str().unwrap_or_default().parse::<DateTime<Utc>>()
         );
     }
 
