diff --git a/src/config.rs b/src/config.rs index a2b1eda..6f525d2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -212,15 +212,10 @@ pub struct VerifyConfig { pub blocklist: HashSet, } -#[derive(Clone, Debug, Eq, PartialEq)] +#[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct VarlinkConfig { pub enabled: bool, } -impl Default for VarlinkConfig { - fn default() -> Self { - VarlinkConfig { enabled: false } - } -} /// Config struct /// Each section represents an ini file section diff --git a/src/lib.rs b/src/lib.rs index 445e5f7..c5bfc32 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -170,20 +170,30 @@ pub async fn stat_collector( // Run service collectors if there are services listed in config if config.units.enabled { - if config.varlink.enabled { - join_set.spawn(crate::varlink_units::update_unit_stats( - Arc::clone(&config), - sdc.clone(), - locked_machine_stats.clone(), - crate::varlink_units::METRICS_SOCKET_PATH.to_string(), - )); - } else { - join_set.spawn(crate::units::update_unit_stats( - Arc::clone(&config), - sdc.clone(), - locked_machine_stats.clone(), - )); - } + let config_clone = Arc::clone(&config); + let sdc_clone = sdc.clone(); + let stats_clone = locked_machine_stats.clone(); + join_set.spawn(async move { + if config_clone.varlink.enabled { + let socket_path = crate::varlink_units::METRICS_SOCKET_PATH.to_string(); + match crate::varlink_units::update_unit_stats( + config_clone, + stats_clone, + socket_path, + ) + .await + { + Ok(()) => return Ok(()), + Err(err) => { + warn!( + "Varlink units stats failed, falling back to D-Bus: {:?}", + err + ); + } + } + } + crate::units::update_unit_stats(config_clone, sdc_clone, stats_clone).await + }); } if config.machines.enabled { diff --git a/src/machines.rs b/src/machines.rs index deb5383..d6d4e78 100644 --- a/src/machines.rs +++ b/src/machines.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use thiserror::Error; use tokio::sync::RwLock; -use tracing::{debug, error}; +use tracing::{debug, error, warn}; use crate::MachineStats; use crate::MonitordStats; @@ -103,17 +103,33 @@ pub async fn update_machines_stats( if config.units.enabled { if config.varlink.enabled { + let config_clone = Arc::clone(&config); + let sdc_clone = sdc.clone(); + let stats_clone = locked_machine_stats.clone(); let container_socket_path = format!( "/proc/{}/root{}", leader_pid, crate::varlink_units::METRICS_SOCKET_PATH ); - join_set.spawn(crate::varlink_units::update_unit_stats( - Arc::clone(&config), - sdc.clone(), - locked_machine_stats.clone(), - container_socket_path, - )); + join_set.spawn(async move { + match crate::varlink_units::update_unit_stats( + Arc::clone(&config_clone), + stats_clone.clone(), + container_socket_path, + ) + .await + { + Ok(()) => Ok(()), + Err(err) => { + warn!( + "Varlink units stats failed, falling back to D-Bus: {:?}", + err + ); + crate::units::update_unit_stats(config_clone, sdc_clone, stats_clone) + .await + } + } + }); } else { join_set.spawn(crate::units::update_unit_stats( Arc::clone(&config), diff --git a/src/varlink/metrics.rs b/src/varlink/metrics.rs index 215a9c2..943dc88 100644 --- a/src/varlink/metrics.rs +++ b/src/varlink/metrics.rs @@ -62,23 +62,25 @@ impl ListOutput { self.object.as_deref().unwrap_or("").to_string() } - /// Returns the string value or default_value if not present - pub fn value_as_string<'a>(&'a self, default_value: &'a str) -> &'a str { - self.value.as_str().unwrap_or(default_value) + /// Returns the string value. Caller must validate value is a string first. + pub fn value_as_string(&self) -> &str { + self.value + .as_str() + .expect("value_as_string called on non-string value; validate metric type first") } - /// Returns the int value as u64 or default_value if not present - pub fn value_as_int(&self, default_value: u64) -> u64 { + /// Returns the int value. Caller must validate value is an integer first. + pub fn value_as_int(&self) -> i64 { self.value .as_i64() - .filter(|v| *v >= 0) - .map(|v| v as u64) - .unwrap_or(default_value) + .expect("value_as_int called on non-integer value; validate metric type first") } - /// Returns the bool value if present - pub fn value_as_bool(&self) -> Option { - self.value.as_bool() + /// Returns the bool value. Caller must validate value is a bool first. + pub fn value_as_bool(&self) -> bool { + self.value + .as_bool() + .expect("value_as_bool called on non-boolean value; validate metric type first") } /// Returns the fields map if present @@ -296,67 +298,68 @@ mod tests { fields: None, }; - assert_eq!(output.value_as_string("unknown"), "active"); + assert_eq!(output.value_as_string(), "active"); } #[test] - fn test_value_as_string_without_value() { + fn test_value_as_string_empty_string() { let output = ListOutput { name: "test.metric".to_string(), - value: serde_json::Value::Null, + value: serde_json::json!(""), object: None, fields: None, }; - assert_eq!(output.value_as_string("unknown"), "unknown"); + assert_eq!(output.value_as_string(), ""); } #[test] - fn test_value_as_string_empty_string() { + fn test_value_as_int_with_value() { let output = ListOutput { name: "test.metric".to_string(), - value: serde_json::json!(""), + value: serde_json::json!(42), object: None, fields: None, }; - assert_eq!(output.value_as_string("default"), ""); + assert_eq!(output.value_as_int(), 42); } #[test] - fn test_value_as_int_with_value() { + #[should_panic(expected = "value_as_int called on non-integer value")] + fn test_value_as_int_without_value() { let output = ListOutput { name: "test.metric".to_string(), - value: serde_json::json!(42), + value: serde_json::Value::Null, object: None, fields: None, }; - assert_eq!(output.value_as_int(0), 42); + output.value_as_int(); } #[test] - fn test_value_as_int_without_value() { + fn test_value_as_int_zero() { let output = ListOutput { name: "test.metric".to_string(), - value: serde_json::Value::Null, + value: serde_json::json!(0), object: None, fields: None, }; - assert_eq!(output.value_as_int(0), 0); + assert_eq!(output.value_as_int(), 0); } #[test] - fn test_value_as_int_zero() { + fn test_value_as_int_negative() { let output = ListOutput { name: "test.metric".to_string(), - value: serde_json::json!(0), + value: serde_json::json!(-5), object: None, fields: None, }; - assert_eq!(output.value_as_int(0), 0); + assert_eq!(output.value_as_int(), -5); } #[test] @@ -368,7 +371,7 @@ mod tests { fields: None, }; - assert_eq!(output.value_as_int(0), 9999999999); + assert_eq!(output.value_as_int(), 9999999999); } #[test] @@ -380,7 +383,7 @@ mod tests { fields: None, }; - assert_eq!(output.value_as_bool(), Some(true)); + assert_eq!(output.value_as_bool(), true); } #[test] @@ -392,10 +395,11 @@ mod tests { fields: None, }; - assert_eq!(output.value_as_bool(), Some(false)); + assert_eq!(output.value_as_bool(), false); } #[test] + #[should_panic(expected = "value_as_bool called on non-boolean value")] fn test_value_as_bool_none() { let output = ListOutput { name: "test.metric".to_string(), @@ -404,6 +408,6 @@ mod tests { fields: None, }; - assert_eq!(output.value_as_bool(), None); + output.value_as_bool(); } } diff --git a/src/varlink_units.rs b/src/varlink_units.rs index e83813c..98ad69f 100644 --- a/src/varlink_units.rs +++ b/src/varlink_units.rs @@ -20,10 +20,28 @@ use zlink::unix; pub const METRICS_SOCKET_PATH: &str = "/run/systemd/report/io.systemd.Manager"; -/// Parse a string value from a metric into an enum type with a default fallback -fn parse_metric_enum(metric: &ListOutput, default: T) -> T { - let value_str = metric.value_as_string("" /* default_value */); - T::from_str(value_str).unwrap_or(default) +/// Parse a string value from a metric into an enum type, warning on failure +fn parse_metric_enum(metric: &ListOutput) -> Option { + if !metric.value().is_string() { + warn!( + "Metric {} has non-string value: {:?}", + metric.name(), + metric.value() + ); + return None; + } + let value_str = metric.value_as_string(); + match T::from_str(value_str) { + Ok(v) => Some(v), + Err(_) => { + warn!( + "Metric {} has unrecognized value: {:?}", + metric.name(), + value_str + ); + None + } + } } /// Check if a unit name should be skipped based on allowlist/blocklist @@ -50,45 +68,102 @@ pub fn parse_one_metric( let object_name = metric.object_name(); match metric_name_suffix { - "unit_active_state" => { + "UnitActiveState" => { if should_skip_unit(&object_name, config) { return Ok(()); } + let active_state: SystemdUnitActiveState = match parse_metric_enum(metric) { + Some(v) => v, + None => return Ok(()), + }; let unit_state = stats .unit_states .entry(object_name.to_string()) .or_default(); - unit_state.active_state = parse_metric_enum(metric, SystemdUnitActiveState::unknown); + unit_state.active_state = active_state; unit_state.unhealthy = is_unit_unhealthy(unit_state.active_state, unit_state.load_state); } - "unit_load_state" => { + "UnitLoadState" => { if should_skip_unit(&object_name, config) { return Ok(()); } - let value = metric.value_as_string("" /* default_value */); + if !metric.value().is_string() { + warn!( + "Metric {} has non-string value: {:?}", + metric.name(), + metric.value() + ); + return Ok(()); + } + let value = metric.value_as_string(); + let load_state = match SystemdUnitLoadState::from_str(value) { + Ok(v) => v, + Err(_) => { + warn!( + "Metric {} has unrecognized value: {:?}", + metric.name(), + value + ); + return Ok(()); + } + }; let unit_state = stats .unit_states .entry(object_name.to_string()) .or_default(); - unit_state.load_state = - SystemdUnitLoadState::from_str(value).unwrap_or(SystemdUnitLoadState::unknown); + unit_state.load_state = load_state; unit_state.unhealthy = is_unit_unhealthy(unit_state.active_state, unit_state.load_state); } - "nrestarts" => { + "NRestarts" => { if should_skip_unit(&object_name, config) { return Ok(()); } + if !metric.value().is_i64() { + warn!( + "Metric {} has non-integer value: {:?}", + metric.name(), + metric.value() + ); + return Ok(()); + } + let value = metric.value_as_int(); + let nrestarts: u32 = match value.try_into() { + Ok(v) => v, + Err(_) => { + warn!( + "Metric {} has out-of-range value for u32: {}", + metric.name(), + value + ); + return Ok(()); + } + }; stats .service_stats .entry(object_name.to_string()) .or_default() - .nrestarts = metric.value_as_int(0 /* default_value */) as u32; + .nrestarts = nrestarts; } - "units_by_type_total" => { + "UnitsByTypeTotal" => { if let Some(type_str) = metric.get_field_as_str("type") { - let value = metric.value_as_int(0 /* default_value */); + if !metric.value().is_i64() { + warn!( + "Metric {} has non-integer value: {:?}", + metric.name(), + metric.value() + ); + return Ok(()); + } + let value = metric.value_as_int(); + let value: u64 = match value.try_into() { + Ok(v) => v, + Err(_) => { + warn!("Metric {} has negative value: {}", metric.name(), value); + return Ok(()); + } + }; match type_str { "automount" => stats.automount_units = value, "device" => stats.device_units = value, @@ -104,9 +179,24 @@ pub fn parse_one_metric( } } } - "units_by_state_total" => { + "UnitsByStateTotal" => { if let Some(state_str) = metric.get_field_as_str("state") { - let value = metric.value_as_int(0 /* default_value */); + if !metric.value().is_i64() { + warn!( + "Metric {} has non-integer value: {:?}", + metric.name(), + metric.value() + ); + return Ok(()); + } + let value = metric.value_as_int(); + let value: u64 = match value.try_into() { + Ok(v) => v, + Err(_) => { + warn!("Metric {} has negative value: {}", metric.name(), value); + return Ok(()); + } + }; match state_str { "active" => stats.active_units = value, "failed" => stats.failed_units = value, @@ -173,7 +263,7 @@ pub async fn parse_metrics( pub async fn get_unit_stats( config: &crate::config::Config, socket_path: &str, -) -> std::result::Result> { +) -> anyhow::Result { if !config.units.state_stats_allowlist.is_empty() { debug!( "Using unit state allowlist: {:?}", @@ -200,27 +290,14 @@ pub async fn get_unit_stats( } /// Async wrapper that can update unit stats when passed a locked struct. -/// Falls back to D-Bus collection if varlink fails. pub async fn update_unit_stats( config: Arc, - connection: zbus::Connection, locked_machine_stats: Arc>, socket_path: String, ) -> anyhow::Result<()> { - match get_unit_stats(&config, &socket_path).await { - Ok(units_stats) => { - let mut machine_stats = locked_machine_stats.write().await; - machine_stats.units = units_stats; - } - Err(err) => { - warn!( - "Varlink units stats failed, falling back to D-Bus: {:?}", - err - ); - crate::units::update_unit_stats(Arc::clone(&config), connection, locked_machine_stats) - .await?; - } - } + let units_stats = get_unit_stats(&config, &socket_path).await?; + let mut machine_stats = locked_machine_stats.write().await; + machine_stats.units = units_stats; Ok(()) } @@ -257,7 +334,7 @@ mod tests { let config = default_units_config(); let metric = ListOutput { - name: "io.systemd.unit_active_state".to_string(), + name: "io.systemd.Manager.UnitActiveState".to_string(), value: string_value("active"), object: Some("my-service.service".to_string()), fields: None, @@ -281,7 +358,7 @@ mod tests { let config = default_units_config(); let metric = ListOutput { - name: "io.systemd.unit_load_state".to_string(), + name: "io.systemd.Manager.UnitLoadState".to_string(), value: string_value("not_found"), // Enum variant name uses underscore object: Some("missing.service".to_string()), fields: None, @@ -301,7 +378,7 @@ mod tests { let config = default_units_config(); let metric = ListOutput { - name: "io.systemd.nrestarts".to_string(), + name: "io.systemd.Manager.NRestarts".to_string(), value: int_value(5), object: Some("my-service.service".to_string()), fields: None, @@ -324,9 +401,9 @@ mod tests { let mut stats = SystemdUnitStats::default(); let config = default_units_config(); - // Test units_by_type_total + // Test UnitsByTypeTotal let type_metric = ListOutput { - name: "io.systemd.units_by_type_total".to_string(), + name: "io.systemd.Manager.UnitsByTypeTotal".to_string(), value: int_value(42), object: None, fields: Some(std::collections::HashMap::from([( @@ -337,9 +414,9 @@ mod tests { parse_one_metric(&mut stats, &type_metric, &config).unwrap(); assert_eq!(stats.service_units, 42); - // Test units_by_state_total + // Test UnitsByStateTotal let state_metric = ListOutput { - name: "io.systemd.units_by_state_total".to_string(), + name: "io.systemd.Manager.UnitsByStateTotal".to_string(), value: int_value(10), object: None, fields: Some(std::collections::HashMap::from([( @@ -358,19 +435,19 @@ mod tests { let metrics = vec![ ListOutput { - name: "io.systemd.unit_active_state".to_string(), + name: "io.systemd.Manager.UnitActiveState".to_string(), value: string_value("active"), object: Some("service1.service".to_string()), fields: None, }, ListOutput { - name: "io.systemd.unit_load_state".to_string(), + name: "io.systemd.Manager.UnitLoadState".to_string(), value: string_value("loaded"), object: Some("service1.service".to_string()), fields: None, }, ListOutput { - name: "io.systemd.unit_active_state".to_string(), + name: "io.systemd.Manager.UnitActiveState".to_string(), value: string_value("failed"), object: Some("service-2.service".to_string()), fields: None, @@ -413,30 +490,30 @@ mod tests { let mut stats = SystemdUnitStats::default(); let config = default_units_config(); - // Unknown active state defaults to unknown + // Unknown active state is skipped (not silently defaulted) let metric1 = ListOutput { - name: "io.systemd.unit_active_state".to_string(), + name: "io.systemd.Manager.UnitActiveState".to_string(), value: string_value("invalid_state"), object: Some("test.service".to_string()), fields: None, }; parse_one_metric(&mut stats, &metric1, &config).unwrap(); - assert_eq!( - stats.unit_states.get("test.service").unwrap().active_state, - SystemdUnitActiveState::unknown + assert!( + !stats.unit_states.contains_key("test.service"), + "invalid state should be skipped" ); - // Missing nrestarts value defaults to 0 + // Missing nrestarts value (null) is skipped let metric2 = ListOutput { - name: "io.systemd.nrestarts".to_string(), + name: "io.systemd.Manager.NRestarts".to_string(), value: empty_value(), object: Some("test2.service".to_string()), fields: None, }; parse_one_metric(&mut stats, &metric2, &config).unwrap(); - assert_eq!( - stats.service_stats.get("test2.service").unwrap().nrestarts, - 0 + assert!( + !stats.service_stats.contains_key("test2.service"), + "null value should be skipped" ); } @@ -447,7 +524,7 @@ mod tests { // Unknown unit type is ignored gracefully let metric1 = ListOutput { - name: "io.systemd.units_by_type_total".to_string(), + name: "io.systemd.Manager.UnitsByTypeTotal".to_string(), value: int_value(999), object: None, fields: Some(std::collections::HashMap::from([( @@ -460,7 +537,7 @@ mod tests { // Metric with no fields is handled gracefully let metric2 = ListOutput { - name: "io.systemd.units_by_type_total".to_string(), + name: "io.systemd.Manager.UnitsByTypeTotal".to_string(), value: int_value(42), object: None, fields: None, @@ -469,7 +546,7 @@ mod tests { // Non-string field value is ignored let metric3 = ListOutput { - name: "io.systemd.units_by_type_total".to_string(), + name: "io.systemd.Manager.UnitsByTypeTotal".to_string(), value: int_value(42), object: None, fields: Some(std::collections::HashMap::from([( @@ -481,7 +558,7 @@ mod tests { // Unhandled metric name is ignored let metric4 = ListOutput { - name: "io.systemd.unknown_metric".to_string(), + name: "io.systemd.Manager.UnknownMetric".to_string(), value: int_value(999), object: Some("test.service".to_string()), fields: None, @@ -513,49 +590,49 @@ mod tests { #[test] fn test_parse_metric_enum() { let metric_active = ListOutput { - name: "io.systemd.unit_active_state".to_string(), + name: "io.systemd.Manager.UnitActiveState".to_string(), value: string_value("active"), object: Some("test.service".to_string()), fields: None, }; assert_eq!( - parse_metric_enum(&metric_active, SystemdUnitActiveState::unknown), - SystemdUnitActiveState::active + parse_metric_enum::(&metric_active), + Some(SystemdUnitActiveState::active) ); let metric_loaded = ListOutput { - name: "io.systemd.unit_load_state".to_string(), + name: "io.systemd.Manager.UnitLoadState".to_string(), value: string_value("loaded"), object: Some("test.service".to_string()), fields: None, }; assert_eq!( - parse_metric_enum(&metric_loaded, SystemdUnitLoadState::unknown), - SystemdUnitLoadState::loaded + parse_metric_enum::(&metric_loaded), + Some(SystemdUnitLoadState::loaded) ); - // Invalid value uses default + // Invalid value returns None let metric_invalid = ListOutput { - name: "io.systemd.unit_active_state".to_string(), + name: "io.systemd.Manager.UnitActiveState".to_string(), value: string_value("invalid"), object: Some("test.service".to_string()), fields: None, }; assert_eq!( - parse_metric_enum(&metric_invalid, SystemdUnitActiveState::unknown), - SystemdUnitActiveState::unknown + parse_metric_enum::(&metric_invalid), + None ); - // Empty value uses default + // Null value returns None let metric_empty = ListOutput { - name: "io.systemd.unit_active_state".to_string(), + name: "io.systemd.Manager.UnitActiveState".to_string(), value: empty_value(), object: Some("test.service".to_string()), fields: None, }; assert_eq!( - parse_metric_enum(&metric_empty, SystemdUnitActiveState::unknown), - SystemdUnitActiveState::unknown + parse_metric_enum::(&metric_empty), + None ); } @@ -573,14 +650,14 @@ mod tests { for (state_str, expected) in active_states { let metric = ListOutput { - name: "io.systemd.unit_active_state".to_string(), + name: "io.systemd.Manager.UnitActiveState".to_string(), value: string_value(state_str), object: Some("test.service".to_string()), fields: None, }; assert_eq!( - parse_metric_enum(&metric, SystemdUnitActiveState::unknown), - expected + parse_metric_enum::(&metric), + Some(expected) ); } @@ -594,14 +671,14 @@ mod tests { for (state_str, expected) in load_states { let metric = ListOutput { - name: "io.systemd.unit_load_state".to_string(), + name: "io.systemd.Manager.UnitLoadState".to_string(), value: string_value(state_str), object: Some("test.service".to_string()), fields: None, }; assert_eq!( - parse_metric_enum(&metric, SystemdUnitLoadState::unknown), - expected + parse_metric_enum::(&metric), + Some(expected) ); } } @@ -613,7 +690,7 @@ mod tests { // Parse initial state let metric1 = ListOutput { - name: "io.systemd.unit_active_state".to_string(), + name: "io.systemd.Manager.UnitActiveState".to_string(), value: string_value("inactive"), object: Some("test.service".to_string()), fields: None, @@ -626,7 +703,7 @@ mod tests { // Update to active state let metric2 = ListOutput { - name: "io.systemd.unit_active_state".to_string(), + name: "io.systemd.Manager.UnitActiveState".to_string(), value: string_value("active"), object: Some("test.service".to_string()), fields: None, @@ -645,7 +722,7 @@ mod tests { // Set active state to failed let metric1 = ListOutput { - name: "io.systemd.unit_active_state".to_string(), + name: "io.systemd.Manager.UnitActiveState".to_string(), value: string_value("failed"), object: Some("broken.service".to_string()), fields: None, @@ -654,7 +731,7 @@ mod tests { // Set load state to loaded let metric2 = ListOutput { - name: "io.systemd.unit_load_state".to_string(), + name: "io.systemd.Manager.UnitLoadState".to_string(), value: string_value("loaded"), object: Some("broken.service".to_string()), fields: None, @@ -666,7 +743,7 @@ mod tests { // Set active state to active let metric3 = ListOutput { - name: "io.systemd.unit_active_state".to_string(), + name: "io.systemd.Manager.UnitActiveState".to_string(), value: string_value("active"), object: Some("healthy.service".to_string()), fields: None, @@ -675,7 +752,7 @@ mod tests { // Set load state to loaded let metric4 = ListOutput { - name: "io.systemd.unit_load_state".to_string(), + name: "io.systemd.Manager.UnitLoadState".to_string(), value: string_value("loaded"), object: Some("healthy.service".to_string()), fields: None, @@ -699,7 +776,7 @@ mod tests { // Allowed unit should be tracked let metric1 = ListOutput { - name: "io.systemd.unit_active_state".to_string(), + name: "io.systemd.Manager.UnitActiveState".to_string(), value: string_value("active"), object: Some("allowed.service".to_string()), fields: None, @@ -709,7 +786,7 @@ mod tests { // Non-allowed unit should be skipped let metric2 = ListOutput { - name: "io.systemd.unit_active_state".to_string(), + name: "io.systemd.Manager.UnitActiveState".to_string(), value: string_value("active"), object: Some("not-allowed.service".to_string()), fields: None, @@ -731,7 +808,7 @@ mod tests { // Blocked unit should be skipped let metric1 = ListOutput { - name: "io.systemd.unit_active_state".to_string(), + name: "io.systemd.Manager.UnitActiveState".to_string(), value: string_value("active"), object: Some("blocked.service".to_string()), fields: None, @@ -741,7 +818,7 @@ mod tests { // Non-blocked unit should be tracked let metric2 = ListOutput { - name: "io.systemd.unit_active_state".to_string(), + name: "io.systemd.Manager.UnitActiveState".to_string(), value: string_value("active"), object: Some("ok.service".to_string()), fields: None, @@ -763,7 +840,7 @@ mod tests { // Unit in both lists should be blocked (blocklist takes priority) let metric = ListOutput { - name: "io.systemd.unit_active_state".to_string(), + name: "io.systemd.Manager.UnitActiveState".to_string(), value: string_value("active"), object: Some("both.service".to_string()), fields: None,