diff --git a/Cargo.toml b/Cargo.toml index 19d0db6..b35452c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ categories = ["network-programming", "os::linux-apis"] anyhow = "1.0" clap = { version = "4.5", features = ["derive", "std", "help"], default-features = false } configparser = { version = "3.1.0", features = ["indexmap"] } +futures-util = "0.3" indexmap = "2.9" int-enum = "1.1" serde = { version = "1.0", features = ["derive"] } @@ -35,6 +36,7 @@ uzers = "0.12" zbus = { version = "5.11.0", features = ["tokio"], default-features = false } zvariant = "5.7.0" zvariant_derive = "5.7.0" +zlink = "0.4.0" [target.'cfg(target_os = "linux")'.dependencies] procfs = "0.18" diff --git a/README.md b/README.md index af27d04..e7efd35 100644 --- a/README.md +++ b/README.md @@ -439,7 +439,35 @@ always be running / calling all these DBus calls per run. ## Varlink -None yet :(. +monitord supports collecting unit statistics via systemd's [Varlink metrics API](https://github.com/systemd/systemd/pull/39202), +available in systemd v260+. When enabled, monitord connects to the `io.systemd.Metrics` interface +at `/run/systemd/report/io.systemd.Manager` to collect unit counts, active/load states, and restart counts. + +### Enabling Varlink + +Set `enabled = true` in the `[varlink]` section of `monitord.conf`: + +```ini +[varlink] +enabled = true +``` + +When varlink is enabled, monitord will attempt to collect unit stats via the metrics API first. +If the varlink socket is unavailable (e.g., systemd < v260), it automatically falls back to D-Bus collection. 
+ +### Metrics collected via Varlink + +- Unit counts by type (service, mount, socket, target, device, automount, timer, path, slice, scope) +- Unit counts by state (active, failed, inactive) +- Per-unit active state and load state (with allowlist/blocklist filtering) +- Per-unit health status (computed from active + load state) +- Per-service restart counts (`nrestarts`) + +### Containers + +For systemd-nspawn containers, monitord connects to the container's varlink socket via +`/proc/<leader_pid>/root/run/systemd/report/io.systemd.Manager`, similar to how D-Bus uses +the container-scoped bus socket. ### varlink 101 diff --git a/monitord.conf b/monitord.conf index 70a8f5f..15d0ab0 100644 --- a/monitord.conf +++ b/monitord.conf @@ -51,6 +51,9 @@ sshd.service [machines] enabled = true +[varlink] +enabled = false + [machines.allowlist] fedora38 diff --git a/src/config.rs b/src/config.rs index 7d38703..940d689 100644 --- a/src/config.rs +++ b/src/config.rs @@ -192,6 +192,16 @@ pub struct VerifyConfig { pub blocklist: HashSet, } +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct VarlinkConfig { + pub enabled: bool, +} +impl Default for VarlinkConfig { + fn default() -> Self { + VarlinkConfig { enabled: false } + } +} + /// Config struct /// Each section represents an ini file section #[derive(Clone, Debug, Default, Eq, PartialEq)] @@ -207,6 +217,7 @@ pub struct Config { pub dbus_stats: DBusStatsConfig, pub boot_blame: BootBlameConfig, pub verify: VerifyConfig, + pub varlink: VarlinkConfig, } impl TryFrom for Config { @@ -325,6 +336,9 @@ impl TryFrom for Config { config.verify.blocklist = verify_blocklist.keys().map(|s| s.to_string()).collect(); } + // [varlink] section + config.varlink.enabled = read_config_bool(&ini_config, "varlink", "enabled")?; + Ok(config) } } @@ -427,6 +441,9 @@ foo.service [boot.blocklist] bar.service + +[varlink] +enabled = true "###; const MINIMAL_CONFIG: &str = r###" @@ -513,6 +530,7 @@ output_format = json-flat allowlist: HashSet::new(), blocklist: 
HashSet::new(), }, + varlink: VarlinkConfig { enabled: true }, }; let mut monitord_config = NamedTempFile::new().expect("Unable to make named tempfile"); diff --git a/src/lib.rs b/src/lib.rs index e184ddb..445e5f7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -31,7 +31,10 @@ pub mod networkd; pub mod pid1; pub mod system; pub mod timer; +pub mod unit_constants; pub mod units; +pub mod varlink; +pub mod varlink_units; pub mod verify; pub const DEFAULT_DBUS_ADDRESS: &str = "unix:path=/run/dbus/system_bus_socket"; @@ -167,11 +170,20 @@ pub async fn stat_collector( // Run service collectors if there are services listed in config if config.units.enabled { - join_set.spawn(crate::units::update_unit_stats( - Arc::clone(&config), - sdc.clone(), - locked_machine_stats.clone(), - )); + if config.varlink.enabled { + join_set.spawn(crate::varlink_units::update_unit_stats( + Arc::clone(&config), + sdc.clone(), + locked_machine_stats.clone(), + crate::varlink_units::METRICS_SOCKET_PATH.to_string(), + )); + } else { + join_set.spawn(crate::units::update_unit_stats( + Arc::clone(&config), + sdc.clone(), + locked_machine_stats.clone(), + )); + } } if config.machines.enabled { diff --git a/src/machines.rs b/src/machines.rs index 011af5a..deb5383 100644 --- a/src/machines.rs +++ b/src/machines.rs @@ -102,11 +102,25 @@ pub async fn update_machines_stats( )); if config.units.enabled { - join_set.spawn(crate::units::update_unit_stats( - Arc::clone(&config), - sdc.clone(), - locked_machine_stats.clone(), - )); + if config.varlink.enabled { + let container_socket_path = format!( + "/proc/{}/root{}", + leader_pid, + crate::varlink_units::METRICS_SOCKET_PATH + ); + join_set.spawn(crate::varlink_units::update_unit_stats( + Arc::clone(&config), + sdc.clone(), + locked_machine_stats.clone(), + container_socket_path, + )); + } else { + join_set.spawn(crate::units::update_unit_stats( + Arc::clone(&config), + sdc.clone(), + locked_machine_stats.clone(), + )); + } } if config.dbus_stats.enabled { 
diff --git a/src/unit_constants.rs b/src/unit_constants.rs new file mode 100644 index 0000000..3dc03bd --- /dev/null +++ b/src/unit_constants.rs @@ -0,0 +1,559 @@ +//! # Unit Constants Module +//! +//! Shared constants and enums for systemd unit states and operations. +//! Reference: <https://www.freedesktop.org/software/systemd/man/org.freedesktop.systemd1.html> + +use int_enum::IntEnum; +use serde_repr::*; +use strum_macros::EnumIter; +use strum_macros::EnumString; + +/// Possible systemd unit active states enumerated +#[allow(non_camel_case_types)] +#[derive( + Serialize_repr, + Deserialize_repr, + Clone, + Copy, + Debug, + Default, + Eq, + PartialEq, + EnumIter, + EnumString, + IntEnum, + strum_macros::Display, +)] +#[repr(u8)] +pub enum SystemdUnitActiveState { + #[default] + unknown = 0, + active = 1, + reloading = 2, + inactive = 3, + failed = 4, + activating = 5, + deactivating = 6, +} + +/// Possible systemd unit load states enumerated +#[allow(non_camel_case_types)] +#[derive( + Serialize_repr, + Deserialize_repr, + Clone, + Copy, + Debug, + Default, + Eq, + PartialEq, + EnumIter, + EnumString, + IntEnum, + strum_macros::Display, +)] +#[repr(u8)] +pub enum SystemdUnitLoadState { + #[default] + unknown = 0, + loaded = 1, + error = 2, + masked = 3, + not_found = 4, +} + +/// Decide whether a unit should be reported as unhealthy. +/// Loaded units are unhealthy unless they are active; masked units are never unhealthy. +/// Units in any other load state (error, not_found, unknown) are always unhealthy. +pub fn is_unit_unhealthy( + active_state: SystemdUnitActiveState, + load_state: SystemdUnitLoadState, +) -> bool { + match load_state { + // We're loaded so let's see if we're active or not + SystemdUnitLoadState::loaded => !matches!(active_state, SystemdUnitActiveState::active), + // An admin can change a unit to be masked on purpose + // so we are going to ignore all masked units due to that + SystemdUnitLoadState::masked => false, + // Otherwise, we're unhealthy + _ => true, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::str::FromStr; + use strum::IntoEnumIterator; + + #[test] + fn 
test_is_unit_unhealthy() { + // Obvious active/loaded is healthy + assert!(!is_unit_unhealthy( + SystemdUnitActiveState::active, + SystemdUnitLoadState::loaded + )); + // Not active + loaded is not healthy + assert!(is_unit_unhealthy( + SystemdUnitActiveState::activating, + SystemdUnitLoadState::loaded + )); + // Not loaded + anything is just marked healthy as we're not expecting it to ever be healthy + assert!(!is_unit_unhealthy( + SystemdUnitActiveState::activating, + SystemdUnitLoadState::masked + )); + // Make error + not_found unhealthy too + assert!(is_unit_unhealthy( + SystemdUnitActiveState::deactivating, + SystemdUnitLoadState::not_found + )); + assert!(is_unit_unhealthy( + // Can never really be active here with error, but check we ignore it + SystemdUnitActiveState::active, + SystemdUnitLoadState::error, + )); + } + + #[test] + fn test_iterators() { + assert!(SystemdUnitActiveState::iter().collect::>().len() > 0); + assert!(SystemdUnitLoadState::iter().collect::>().len() > 0); + } + + #[test] + fn test_active_state_from_str() { + assert_eq!( + SystemdUnitActiveState::from_str("active").unwrap(), + SystemdUnitActiveState::active + ); + assert_eq!( + SystemdUnitActiveState::from_str("reloading").unwrap(), + SystemdUnitActiveState::reloading + ); + assert_eq!( + SystemdUnitActiveState::from_str("inactive").unwrap(), + SystemdUnitActiveState::inactive + ); + assert_eq!( + SystemdUnitActiveState::from_str("failed").unwrap(), + SystemdUnitActiveState::failed + ); + assert_eq!( + SystemdUnitActiveState::from_str("activating").unwrap(), + SystemdUnitActiveState::activating + ); + assert_eq!( + SystemdUnitActiveState::from_str("deactivating").unwrap(), + SystemdUnitActiveState::deactivating + ); + assert_eq!( + SystemdUnitActiveState::from_str("unknown").unwrap(), + SystemdUnitActiveState::unknown + ); + } + + #[test] + fn test_active_state_from_str_invalid() { + assert!(SystemdUnitActiveState::from_str("invalid").is_err()); + 
assert!(SystemdUnitActiveState::from_str("").is_err()); + assert!(SystemdUnitActiveState::from_str("ACTIVE").is_err()); + } + + #[test] + fn test_load_state_from_str() { + assert_eq!( + SystemdUnitLoadState::from_str("loaded").unwrap(), + SystemdUnitLoadState::loaded + ); + assert_eq!( + SystemdUnitLoadState::from_str("error").unwrap(), + SystemdUnitLoadState::error + ); + assert_eq!( + SystemdUnitLoadState::from_str("masked").unwrap(), + SystemdUnitLoadState::masked + ); + assert_eq!( + SystemdUnitLoadState::from_str("not_found").unwrap(), + SystemdUnitLoadState::not_found + ); + assert_eq!( + SystemdUnitLoadState::from_str("unknown").unwrap(), + SystemdUnitLoadState::unknown + ); + } + + #[test] + fn test_load_state_from_str_invalid() { + assert!(SystemdUnitLoadState::from_str("invalid").is_err()); + assert!(SystemdUnitLoadState::from_str("").is_err()); + assert!(SystemdUnitLoadState::from_str("LOADED").is_err()); + } + + #[test] + fn test_active_state_display() { + assert_eq!(format!("{}", SystemdUnitActiveState::active), "active"); + assert_eq!( + format!("{}", SystemdUnitActiveState::reloading), + "reloading" + ); + assert_eq!(format!("{}", SystemdUnitActiveState::inactive), "inactive"); + assert_eq!(format!("{}", SystemdUnitActiveState::failed), "failed"); + assert_eq!( + format!("{}", SystemdUnitActiveState::activating), + "activating" + ); + assert_eq!( + format!("{}", SystemdUnitActiveState::deactivating), + "deactivating" + ); + assert_eq!(format!("{}", SystemdUnitActiveState::unknown), "unknown"); + } + + #[test] + fn test_load_state_display() { + assert_eq!(format!("{}", SystemdUnitLoadState::loaded), "loaded"); + assert_eq!(format!("{}", SystemdUnitLoadState::error), "error"); + assert_eq!(format!("{}", SystemdUnitLoadState::masked), "masked"); + assert_eq!(format!("{}", SystemdUnitLoadState::not_found), "not_found"); + assert_eq!(format!("{}", SystemdUnitLoadState::unknown), "unknown"); + } + + #[test] + fn test_active_state_default() { + let state: 
SystemdUnitActiveState = Default::default(); + assert_eq!(state, SystemdUnitActiveState::unknown); + } + + #[test] + fn test_load_state_default() { + let state: SystemdUnitLoadState = Default::default(); + assert_eq!(state, SystemdUnitLoadState::unknown); + } + + #[test] + fn test_active_state_clone() { + let state = SystemdUnitActiveState::active; + let cloned = state.clone(); + assert_eq!(state, cloned); + } + + #[test] + fn test_load_state_clone() { + let state = SystemdUnitLoadState::loaded; + let cloned = state.clone(); + assert_eq!(state, cloned); + } + + #[test] + fn test_active_state_debug() { + let state = SystemdUnitActiveState::active; + let debug_str = format!("{:?}", state); + assert!(debug_str.contains("active")); + } + + #[test] + fn test_load_state_debug() { + let state = SystemdUnitLoadState::loaded; + let debug_str = format!("{:?}", state); + assert!(debug_str.contains("loaded")); + } + + #[test] + fn test_active_state_equality() { + assert_eq!( + SystemdUnitActiveState::active, + SystemdUnitActiveState::active + ); + assert_ne!( + SystemdUnitActiveState::active, + SystemdUnitActiveState::inactive + ); + } + + #[test] + fn test_load_state_equality() { + assert_eq!(SystemdUnitLoadState::loaded, SystemdUnitLoadState::loaded); + assert_ne!(SystemdUnitLoadState::loaded, SystemdUnitLoadState::masked); + } + + #[test] + fn test_active_state_int_enum() { + assert_eq!(SystemdUnitActiveState::unknown as u8, 0); + assert_eq!(SystemdUnitActiveState::active as u8, 1); + assert_eq!(SystemdUnitActiveState::reloading as u8, 2); + assert_eq!(SystemdUnitActiveState::inactive as u8, 3); + assert_eq!(SystemdUnitActiveState::failed as u8, 4); + assert_eq!(SystemdUnitActiveState::activating as u8, 5); + assert_eq!(SystemdUnitActiveState::deactivating as u8, 6); + } + + #[test] + fn test_load_state_int_enum() { + assert_eq!(SystemdUnitLoadState::unknown as u8, 0); + assert_eq!(SystemdUnitLoadState::loaded as u8, 1); + assert_eq!(SystemdUnitLoadState::error as u8, 2); + 
assert_eq!(SystemdUnitLoadState::masked as u8, 3); + assert_eq!(SystemdUnitLoadState::not_found as u8, 4); + } + + #[test] + fn test_active_state_from_int() { + assert_eq!( + SystemdUnitActiveState::try_from(0).unwrap(), + SystemdUnitActiveState::unknown + ); + assert_eq!( + SystemdUnitActiveState::try_from(1).unwrap(), + SystemdUnitActiveState::active + ); + assert_eq!( + SystemdUnitActiveState::try_from(2).unwrap(), + SystemdUnitActiveState::reloading + ); + assert_eq!( + SystemdUnitActiveState::try_from(3).unwrap(), + SystemdUnitActiveState::inactive + ); + assert_eq!( + SystemdUnitActiveState::try_from(4).unwrap(), + SystemdUnitActiveState::failed + ); + assert_eq!( + SystemdUnitActiveState::try_from(5).unwrap(), + SystemdUnitActiveState::activating + ); + assert_eq!( + SystemdUnitActiveState::try_from(6).unwrap(), + SystemdUnitActiveState::deactivating + ); + } + + #[test] + fn test_load_state_from_int() { + assert_eq!( + SystemdUnitLoadState::try_from(0).unwrap(), + SystemdUnitLoadState::unknown + ); + assert_eq!( + SystemdUnitLoadState::try_from(1).unwrap(), + SystemdUnitLoadState::loaded + ); + assert_eq!( + SystemdUnitLoadState::try_from(2).unwrap(), + SystemdUnitLoadState::error + ); + assert_eq!( + SystemdUnitLoadState::try_from(3).unwrap(), + SystemdUnitLoadState::masked + ); + assert_eq!( + SystemdUnitLoadState::try_from(4).unwrap(), + SystemdUnitLoadState::not_found + ); + } + + #[test] + fn test_active_state_from_int_invalid() { + assert!(SystemdUnitActiveState::try_from(255).is_err()); + assert!(SystemdUnitActiveState::try_from(100).is_err()); + } + + #[test] + fn test_load_state_from_int_invalid() { + assert!(SystemdUnitLoadState::try_from(255).is_err()); + assert!(SystemdUnitLoadState::try_from(100).is_err()); + } + + #[test] + fn test_is_unit_unhealthy_all_combinations() { + // Test loaded state with all active states + assert!(!is_unit_unhealthy( + SystemdUnitActiveState::active, + SystemdUnitLoadState::loaded + )); + assert!(is_unit_unhealthy( 
+ SystemdUnitActiveState::reloading, + SystemdUnitLoadState::loaded + )); + assert!(is_unit_unhealthy( + SystemdUnitActiveState::inactive, + SystemdUnitLoadState::loaded + )); + assert!(is_unit_unhealthy( + SystemdUnitActiveState::failed, + SystemdUnitLoadState::loaded + )); + assert!(is_unit_unhealthy( + SystemdUnitActiveState::activating, + SystemdUnitLoadState::loaded + )); + assert!(is_unit_unhealthy( + SystemdUnitActiveState::deactivating, + SystemdUnitLoadState::loaded + )); + assert!(is_unit_unhealthy( + SystemdUnitActiveState::unknown, + SystemdUnitLoadState::loaded + )); + + // Test masked state with all active states (always healthy) + assert!(!is_unit_unhealthy( + SystemdUnitActiveState::active, + SystemdUnitLoadState::masked + )); + assert!(!is_unit_unhealthy( + SystemdUnitActiveState::reloading, + SystemdUnitLoadState::masked + )); + assert!(!is_unit_unhealthy( + SystemdUnitActiveState::inactive, + SystemdUnitLoadState::masked + )); + assert!(!is_unit_unhealthy( + SystemdUnitActiveState::failed, + SystemdUnitLoadState::masked + )); + assert!(!is_unit_unhealthy( + SystemdUnitActiveState::activating, + SystemdUnitLoadState::masked + )); + assert!(!is_unit_unhealthy( + SystemdUnitActiveState::deactivating, + SystemdUnitLoadState::masked + )); + assert!(!is_unit_unhealthy( + SystemdUnitActiveState::unknown, + SystemdUnitLoadState::masked + )); + + // Test error state with all active states (always unhealthy) + assert!(is_unit_unhealthy( + SystemdUnitActiveState::active, + SystemdUnitLoadState::error + )); + assert!(is_unit_unhealthy( + SystemdUnitActiveState::reloading, + SystemdUnitLoadState::error + )); + assert!(is_unit_unhealthy( + SystemdUnitActiveState::inactive, + SystemdUnitLoadState::error + )); + assert!(is_unit_unhealthy( + SystemdUnitActiveState::failed, + SystemdUnitLoadState::error + )); + assert!(is_unit_unhealthy( + SystemdUnitActiveState::activating, + SystemdUnitLoadState::error + )); + assert!(is_unit_unhealthy( + 
SystemdUnitActiveState::deactivating, + SystemdUnitLoadState::error + )); + assert!(is_unit_unhealthy( + SystemdUnitActiveState::unknown, + SystemdUnitLoadState::error + )); + + // Test not_found state with all active states (always unhealthy) + assert!(is_unit_unhealthy( + SystemdUnitActiveState::active, + SystemdUnitLoadState::not_found + )); + assert!(is_unit_unhealthy( + SystemdUnitActiveState::reloading, + SystemdUnitLoadState::not_found + )); + assert!(is_unit_unhealthy( + SystemdUnitActiveState::inactive, + SystemdUnitLoadState::not_found + )); + assert!(is_unit_unhealthy( + SystemdUnitActiveState::failed, + SystemdUnitLoadState::not_found + )); + assert!(is_unit_unhealthy( + SystemdUnitActiveState::activating, + SystemdUnitLoadState::not_found + )); + assert!(is_unit_unhealthy( + SystemdUnitActiveState::deactivating, + SystemdUnitLoadState::not_found + )); + assert!(is_unit_unhealthy( + SystemdUnitActiveState::unknown, + SystemdUnitLoadState::not_found + )); + + // Test unknown state with all active states (always unhealthy) + assert!(is_unit_unhealthy( + SystemdUnitActiveState::active, + SystemdUnitLoadState::unknown + )); + assert!(is_unit_unhealthy( + SystemdUnitActiveState::reloading, + SystemdUnitLoadState::unknown + )); + assert!(is_unit_unhealthy( + SystemdUnitActiveState::inactive, + SystemdUnitLoadState::unknown + )); + assert!(is_unit_unhealthy( + SystemdUnitActiveState::failed, + SystemdUnitLoadState::unknown + )); + assert!(is_unit_unhealthy( + SystemdUnitActiveState::activating, + SystemdUnitLoadState::unknown + )); + assert!(is_unit_unhealthy( + SystemdUnitActiveState::deactivating, + SystemdUnitLoadState::unknown + )); + assert!(is_unit_unhealthy( + SystemdUnitActiveState::unknown, + SystemdUnitLoadState::unknown + )); + } + + #[test] + fn test_active_state_serialization() { + let state = SystemdUnitActiveState::active; + let serialized = serde_json::to_string(&state).unwrap(); + assert_eq!(serialized, "1"); + + let deserialized: 
SystemdUnitActiveState = serde_json::from_str(&serialized).unwrap(); + assert_eq!(deserialized, state); + } + + #[test] + fn test_load_state_serialization() { + let state = SystemdUnitLoadState::loaded; + let serialized = serde_json::to_string(&state).unwrap(); + assert_eq!(serialized, "1"); + + let deserialized: SystemdUnitLoadState = serde_json::from_str(&serialized).unwrap(); + assert_eq!(deserialized, state); + } + + #[test] + fn test_active_state_all_variants_serialization() { + for state in SystemdUnitActiveState::iter() { + let serialized = serde_json::to_string(&state).unwrap(); + let deserialized: SystemdUnitActiveState = serde_json::from_str(&serialized).unwrap(); + assert_eq!(deserialized, state); + } + } + + #[test] + fn test_load_state_all_variants_serialization() { + for state in SystemdUnitLoadState::iter() { + let serialized = serde_json::to_string(&state).unwrap(); + let deserialized: SystemdUnitLoadState = serde_json::from_str(&serialized).unwrap(); + assert_eq!(deserialized, state); + } + } +} diff --git a/src/units.rs b/src/units.rs index 347e0c2..10eb613 100644 --- a/src/units.rs +++ b/src/units.rs @@ -9,11 +9,7 @@ use std::sync::Arc; use std::time::SystemTime; use std::time::UNIX_EPOCH; -use int_enum::IntEnum; -use serde_repr::*; use struct_field_names_as_array::FieldNamesAsArray; -use strum_macros::EnumIter; -use strum_macros::EnumString; use thiserror::Error; use tokio::sync::RwLock; use tracing::debug; @@ -34,6 +30,11 @@ pub enum MonitordUnitsError { use crate::timer::TimerStats; use crate::MachineStats; +// Re-export the enums and function from unit_constants for backwards compatibility +pub use crate::unit_constants::is_unit_unhealthy; +pub use crate::unit_constants::SystemdUnitActiveState; +pub use crate::unit_constants::SystemdUnitLoadState; + #[derive( serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, FieldNamesAsArray, PartialEq, )] @@ -153,75 +154,6 @@ pub struct UnitStates { // Reference: 
https://www.freedesktop.org/software/systemd/man/org.freedesktop.systemd1.html // SubState can be unit-type-specific so can't enum -/// Systemd unit active states representing the unit's runtime lifecycle. -/// Ref: -#[allow(non_camel_case_types)] -#[derive( - Serialize_repr, - Deserialize_repr, - Clone, - Copy, - Debug, - Default, - Eq, - PartialEq, - EnumIter, - EnumString, - IntEnum, - strum_macros::Display, -)] -#[repr(u8)] -pub enum SystemdUnitActiveState { - /// State could not be determined - #[default] - unknown = 0, - /// Unit is currently running / started - active = 1, - /// Unit is reloading its configuration - reloading = 2, - /// Unit is not running - inactive = 3, - /// Unit has failed (process exited with error, crashed, or an operation timed out) - failed = 4, - /// Unit is in the process of being started - activating = 5, - /// Unit is in the process of being stopped - deactivating = 6, -} - -/// Systemd unit load states indicating whether the unit file was successfully read. -/// Ref: -#[allow(non_camel_case_types)] -#[derive( - Serialize_repr, - Deserialize_repr, - Clone, - Copy, - Debug, - Default, - Eq, - PartialEq, - EnumIter, - EnumString, - IntEnum, - strum_macros::Display, -)] -#[repr(u8)] -pub enum SystemdUnitLoadState { - /// Load state could not be determined - #[default] - unknown = 0, - /// Unit file was successfully parsed and loaded into memory - loaded = 1, - /// Unit file could not be parsed or an error occurred during loading - error = 2, - /// Unit is masked (symlinked to /dev/null), preventing it from being started - masked = 3, - /// Unit file does not exist on disk - not_found = 4, -} - -/// Representation of the returned Tuple from list_units - Better typing etc. 
#[derive(Debug)] pub struct ListedUnit { pub name: String, // The primary unit name @@ -357,24 +289,6 @@ async fn parse_service( }) } -/// Check if we're a loaded unit and if so evaluate if we're acitive or not -/// If we're not -/// Only potentially mark unhealthy for LOADED units that are not active -pub fn is_unit_unhealthy( - active_state: SystemdUnitActiveState, - load_state: SystemdUnitLoadState, -) -> bool { - match load_state { - // We're loaded so let's see if we're active or not - SystemdUnitLoadState::loaded => !matches!(active_state, SystemdUnitActiveState::active), - // An admin can change a unit to be masked on purpose - // so we are going to ignore all masked units due to that - SystemdUnitLoadState::masked => false, - // Otherwise, we're unhealthy - _ => true, - } -} - async fn get_time_in_state( connection: Option<&zbus::Connection>, unit: &ListedUnit, @@ -598,35 +512,6 @@ mod tests { } } - #[test] - fn test_is_unit_healthy() { - // Obvious active/loaded is healthy - assert!(!is_unit_unhealthy( - SystemdUnitActiveState::active, - SystemdUnitLoadState::loaded - )); - // Not active + loaded is not healthy - assert!(is_unit_unhealthy( - SystemdUnitActiveState::activating, - SystemdUnitLoadState::loaded - )); - // Not loaded + anything is just marked healthy as we're not expecting it to ever be healthy - assert!(!is_unit_unhealthy( - SystemdUnitActiveState::activating, - SystemdUnitLoadState::masked - )); - // Make error + not_found unhealthy too - assert!(is_unit_unhealthy( - SystemdUnitActiveState::deactivating, - SystemdUnitLoadState::not_found - )); - assert!(is_unit_unhealthy( - // Can never really be active here with error, but check we ignore it - SystemdUnitActiveState::active, - SystemdUnitLoadState::error, - )); - } - #[tokio::test] async fn test_state_parse() -> Result<(), MonitordUnitsError> { let test_unit_name = String::from("apport-autoreport.timer"); diff --git a/src/varlink/metrics.rs b/src/varlink/metrics.rs new file mode 100644 index 
0000000..215a9c2 --- /dev/null +++ b/src/varlink/metrics.rs @@ -0,0 +1,409 @@ +//! This code is adapted from the code generated by `zlink-codegen` from Varlink IDL. + +use serde::{Deserialize, Serialize}; +use zlink::{proxy, ReplyError}; + +/// Proxy trait for calling methods on the interface. +#[proxy("io.systemd.Metrics")] +pub trait Metrics { + /// Method to list the metrics and their current values + /// [Requires 'more' flag] + #[zlink(more)] + async fn list( + &mut self, + ) -> zlink::Result< + impl futures_util::Stream>>, + >; + /// Method to get the metric families + /// [Requires 'more' flag] + #[zlink(more)] + async fn describe( + &mut self, + ) -> zlink::Result< + impl futures_util::Stream>>, + >; +} + +/// Output parameters for the List method. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct ListOutput { + pub name: String, + pub value: serde_json::Value, + pub object: Option, + pub fields: Option>, +} + +impl ListOutput { + /// Returns the name of the metric + pub fn name(&self) -> &str { + &self.name + } + + /// Returns the part of the metric name after the last '.', or the whole name if it contains no '.' + pub fn name_suffix(&self) -> &str { + self.name + .rsplit_once('.') + .map(|(_, suffix)| suffix) + .unwrap_or(&self.name) + } + + /// Returns the value of the metric + pub fn value(&self) -> &serde_json::Value { + &self.value + } + + /// Returns the object name if present + pub fn object(&self) -> Option<&str> { + self.object.as_deref() + } + + /// Returns the object name or empty string if not present + pub fn object_name(&self) -> String { + self.object.as_deref().unwrap_or("").to_string() + } + + /// Returns the string value or default_value if not present + pub fn value_as_string<'a>(&'a self, default_value: &'a str) -> &'a str { + self.value.as_str().unwrap_or(default_value) + } + + /// Returns the int value as u64, or default_value if it is not present, not an integer, or negative + pub fn value_as_int(&self, default_value: u64) -> u64 { + self.value + .as_i64() + .filter(|v| *v >= 0) + 
.map(|v| v as u64) + .unwrap_or(default_value) + } + + /// Returns the bool value if present + pub fn value_as_bool(&self) -> Option { + self.value.as_bool() + } + + /// Returns the fields map if present + pub fn fields(&self) -> Option<&std::collections::HashMap> { + self.fields.as_ref() + } + + /// Extract a string field value from the fields map by field name + pub fn get_field_as_str(&self, field_name: &str) -> Option<&str> { + self.fields + .as_ref() + .and_then(|f| f.get(field_name)) + .and_then(|v| v.as_str()) + } +} +/// Output parameters for the Describe method. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct DescribeOutput { + pub name: String, + pub description: String, + pub r#type: MetricFamilyType, +} +/// An enum representing various metric family types +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum MetricFamilyType { + /// A counter metric family type which is a monotonically increasing value + Counter, + /// A gauge metric family type which is a value that can go up and down + Gauge, + /// A string metric family type + String, +} + +/// Errors that can occur in this interface. 
+#[derive(Debug, Clone, PartialEq, ReplyError)]
+#[zlink(interface = "io.systemd.Metrics")]
+pub enum MetricsError {
+    /// No such metric found
+    NoSuchMetric,
+}
+
+impl std::fmt::Display for MetricsError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            MetricsError::NoSuchMetric => write!(f, "No such metric found"),
+        }
+    }
+}
+
+impl std::error::Error for MetricsError {}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_object_name_with_value() {
+        let output = ListOutput {
+            name: "test.metric".to_string(),
+            value: serde_json::Value::Null,
+            object: Some("my-service.service".to_string()),
+            fields: None,
+        };
+
+        assert_eq!(output.object_name(), "my-service.service");
+    }
+
+    #[test]
+    fn test_object_name_without_value() {
+        let output = ListOutput {
+            name: "test.metric".to_string(),
+            value: serde_json::Value::Null,
+            object: None,
+            fields: None,
+        };
+
+        assert_eq!(output.object_name(), "");
+    }
+
+    #[test]
+    fn test_object_name_with_empty_string() {
+        let output = ListOutput {
+            name: "test.metric".to_string(),
+            value: serde_json::Value::Null,
+            object: Some("".to_string()),
+            fields: None,
+        };
+
+        assert_eq!(output.object_name(), "");
+    }
+
+    #[test]
+    fn test_object_returns_option() {
+        let output_with_object = ListOutput {
+            name: "test.metric".to_string(),
+            value: serde_json::Value::Null,
+            object: Some("service.service".to_string()),
+            fields: None,
+        };
+
+        let output_without_object = ListOutput {
+            name: "test.metric".to_string(),
+            value: serde_json::Value::Null,
+            object: None,
+            fields: None,
+        };
+
+        assert_eq!(output_with_object.object(), Some("service.service"));
+        assert_eq!(output_without_object.object(), None);
+    }
+
+    #[test]
+    fn test_get_field_as_str_existing_field() {
+        let mut fields = std::collections::HashMap::new();
+        fields.insert("type".to_string(), serde_json::json!("service"));
+        fields.insert("state".to_string(), serde_json::json!("active"));
+
+        let output = ListOutput {
+            name: "test.metric".to_string(),
+            value: serde_json::Value::Null,
+            object: None,
+            fields: Some(fields),
+        };
+
+        assert_eq!(output.get_field_as_str("type"), Some("service"));
+        assert_eq!(output.get_field_as_str("state"), Some("active"));
+    }
+
+    #[test]
+    fn test_get_field_as_str_missing_field() {
+        let fields = std::collections::HashMap::new();
+
+        let output = ListOutput {
+            name: "test.metric".to_string(),
+            value: serde_json::Value::Null,
+            object: None,
+            fields: Some(fields),
+        };
+
+        assert_eq!(output.get_field_as_str("nonexistent"), None);
+    }
+
+    #[test]
+    fn test_get_field_as_str_no_fields() {
+        let output = ListOutput {
+            name: "test.metric".to_string(),
+            value: serde_json::Value::Null,
+            object: None,
+            fields: None,
+        };
+
+        assert_eq!(output.get_field_as_str("type"), None);
+    }
+
+    #[test]
+    fn test_get_field_as_str_non_string_value() {
+        let mut fields = std::collections::HashMap::new();
+        fields.insert("number".to_string(), serde_json::json!(123));
+        fields.insert("bool".to_string(), serde_json::json!(true));
+
+        let output = ListOutput {
+            name: "test.metric".to_string(),
+            value: serde_json::Value::Null,
+            object: None,
+            fields: Some(fields),
+        };
+
+        assert_eq!(output.get_field_as_str("number"), None);
+        assert_eq!(output.get_field_as_str("bool"), None);
+    }
+
+    #[test]
+    fn test_name_suffix() {
+        let output = ListOutput {
+            name: "io.systemd.unit_active_state".to_string(),
+            value: serde_json::Value::Null,
+            object: None,
+            fields: None,
+        };
+
+        assert_eq!(output.name_suffix(), "unit_active_state");
+    }
+
+    #[test]
+    fn test_name_suffix_no_dots() {
+        let output = ListOutput {
+            name: "simple_name".to_string(),
+            value: serde_json::Value::Null,
+            object: None,
+            fields: None,
+        };
+
+        assert_eq!(output.name_suffix(), "simple_name");
+    }
+
+    #[test]
+    fn test_name_suffix_empty() {
+        let output = ListOutput {
+            name: "".to_string(),
+            value: serde_json::Value::Null,
+            object: None,
+            fields: None,
+        };
+
+        assert_eq!(output.name_suffix(), "");
+    }
+
+    #[test]
+    fn test_value_as_string_with_value() {
+        let output = ListOutput {
+            name: "test.metric".to_string(),
+            value: serde_json::json!("active"),
+            object: None,
+            fields: None,
+        };
+
+        assert_eq!(output.value_as_string("unknown"), "active");
+    }
+
+    #[test]
+    fn test_value_as_string_without_value() {
+        let output = ListOutput {
+            name: "test.metric".to_string(),
+            value: serde_json::Value::Null,
+            object: None,
+            fields: None,
+        };
+
+        assert_eq!(output.value_as_string("unknown"), "unknown");
+    }
+
+    #[test]
+    fn test_value_as_string_empty_string() {
+        let output = ListOutput {
+            name: "test.metric".to_string(),
+            value: serde_json::json!(""),
+            object: None,
+            fields: None,
+        };
+
+        assert_eq!(output.value_as_string("default"), "");
+    }
+
+    #[test]
+    fn test_value_as_int_with_value() {
+        let output = ListOutput {
+            name: "test.metric".to_string(),
+            value: serde_json::json!(42),
+            object: None,
+            fields: None,
+        };
+
+        assert_eq!(output.value_as_int(0), 42);
+    }
+
+    #[test]
+    fn test_value_as_int_without_value() {
+        let output = ListOutput {
+            name: "test.metric".to_string(),
+            value: serde_json::Value::Null,
+            object: None,
+            fields: None,
+        };
+
+        assert_eq!(output.value_as_int(0), 0);
+    }
+
+    #[test]
+    fn test_value_as_int_zero() {
+        let output = ListOutput {
+            name: "test.metric".to_string(),
+            value: serde_json::json!(0),
+            object: None,
+            fields: None,
+        };
+
+        assert_eq!(output.value_as_int(0), 0);
+    }
+
+    #[test]
+    fn test_value_as_int_large_number() {
+        let output = ListOutput {
+            name: "test.metric".to_string(),
+            value: serde_json::json!(9999999999_i64),
+            object: None,
+            fields: None,
+        };
+
+        assert_eq!(output.value_as_int(0), 9999999999);
+    }
+
+    #[test]
+    fn test_value_as_bool_true() {
+        let output = ListOutput {
+            name: "test.metric".to_string(),
+            value: serde_json::json!(true),
+            object: None,
+            fields: None,
+        };
+
+        assert_eq!(output.value_as_bool(), Some(true));
+    }
+
+    #[test]
+    fn test_value_as_bool_false() {
+        let output = ListOutput {
+            name: "test.metric".to_string(),
+            value: serde_json::json!(false),
+            object: None,
+            fields: None,
+        };
+
+        assert_eq!(output.value_as_bool(), Some(false));
+    }
+
+    #[test]
+    fn test_value_as_bool_none() {
+        let output = ListOutput {
+            name: "test.metric".to_string(),
+            value: serde_json::Value::Null,
+            object: None,
+            fields: None,
+        };
+
+        assert_eq!(output.value_as_bool(), None);
+    }
+}
diff --git a/src/varlink/mod.rs b/src/varlink/mod.rs
new file mode 100644
index 0000000..e144883
--- /dev/null
+++ b/src/varlink/mod.rs
@@ -0,0 +1 @@
+pub mod metrics;
diff --git a/src/varlink_units.rs b/src/varlink_units.rs
new file mode 100644
index 0000000..e83813c
--- /dev/null
+++ b/src/varlink_units.rs
@@ -0,0 +1,789 @@
+//! # varlink_units module
+//!
+//! Unit statistics collected over systemd's varlink `io.systemd.Metrics` API:
+//! unit counts by type and state, per-unit active/load state and restart counts.
+
+use std::str::FromStr;
+use std::sync::Arc;
+
+use tokio::sync::RwLock;
+use tracing::debug;
+
+use tracing::warn;
+
+use crate::unit_constants::{is_unit_unhealthy, SystemdUnitActiveState, SystemdUnitLoadState};
+use crate::units::SystemdUnitStats;
+use crate::varlink::metrics::{ListOutput, Metrics};
+use crate::MachineStats;
+use futures_util::stream::TryStreamExt;
+use zlink::unix;
+
+pub const METRICS_SOCKET_PATH: &str = "/run/systemd/report/io.systemd.Manager";
+
+/// Parse a string value from a metric into an enum type with a default fallback
+fn parse_metric_enum<T: FromStr>(metric: &ListOutput, default: T) -> T {
+    let value_str = metric.value_as_string("" /* default_value */);
+    T::from_str(value_str).unwrap_or(default)
+}
+
+/// Check if a unit name should be skipped based on allowlist/blocklist
+///
+/// The blocklist wins over the allowlist; an empty allowlist allows every unit.
+fn should_skip_unit(object_name: &str, config: &crate::config::UnitsConfig) -> bool {
+    if config.state_stats_blocklist.contains(object_name) {
+        debug!("Skipping state stats for {} due to blocklist", object_name);
+        return true;
+    }
+    if !config.state_stats_allowlist.is_empty()
+        && !config.state_stats_allowlist.contains(object_name)
+    {
+        return true;
+    }
+    false
+}
+
+/// Fold a single metric into `stats`, dispatching on the metric name suffix.
+///
+/// Per-unit metrics (`unit_active_state`, `unit_load_state`, `nrestarts`) honour
+/// the allowlist/blocklist in `config`; aggregate counters are always recorded.
+/// Unrecognised metrics, unit types and states are logged at debug level and
+/// skipped, so this currently always returns `Ok(())`.
+pub fn parse_one_metric(
+    stats: &mut SystemdUnitStats,
+    metric: &ListOutput,
+    config: &crate::config::UnitsConfig,
+) -> anyhow::Result<()> {
+    let metric_name_suffix = metric.name_suffix();
+    let object_name = metric.object_name();
+
+    match metric_name_suffix {
+        "unit_active_state" => {
+            if should_skip_unit(&object_name, config) {
+                return Ok(());
+            }
+            let unit_state = stats
+                .unit_states
+                .entry(object_name.to_string())
+                .or_default();
+            unit_state.active_state = parse_metric_enum(metric, SystemdUnitActiveState::unknown);
+            // Health depends on both states, so recompute it whenever either changes.
+            unit_state.unhealthy =
+                is_unit_unhealthy(unit_state.active_state, unit_state.load_state);
+        }
+        "unit_load_state" => {
+            if should_skip_unit(&object_name, config) {
+                return Ok(());
+            }
+            let unit_state = stats
+                .unit_states
+                .entry(object_name.to_string())
+                .or_default();
+            unit_state.load_state = parse_metric_enum(metric, SystemdUnitLoadState::unknown);
+            unit_state.unhealthy =
+                is_unit_unhealthy(unit_state.active_state, unit_state.load_state);
+        }
+        "nrestarts" => {
+            if should_skip_unit(&object_name, config) {
+                return Ok(());
+            }
+            // Saturate instead of using `as u32`, which would silently truncate
+            // counters that do not fit in 32 bits.
+            let nrestarts = u32::try_from(metric.value_as_int(0 /* default_value */));
+            stats
+                .service_stats
+                .entry(object_name.to_string())
+                .or_default()
+                .nrestarts = nrestarts.unwrap_or(u32::MAX);
+        }
+        "units_by_type_total" => {
+            if let Some(type_str) = metric.get_field_as_str("type") {
+                let value = metric.value_as_int(0 /* default_value */);
+                match type_str {
+                    "automount" => stats.automount_units = value,
+                    "device" => stats.device_units = value,
+                    "mount" => stats.mount_units = value,
+                    "path" => stats.path_units = value,
+                    "scope" => stats.scope_units = value,
+                    "service" => stats.service_units = value,
+                    "slice" => stats.slice_units = value,
+                    "socket" => stats.socket_units = value,
+                    "target" => stats.target_units = value,
+                    "timer" => stats.timer_units = value,
+                    _ => debug!("Found unhandled unit type: {:?}", type_str),
+                }
+            }
+        }
+        "units_by_state_total" => {
+            if let Some(state_str) = metric.get_field_as_str("state") {
+                let value = metric.value_as_int(0 /* default_value */);
+                match state_str {
+                    "active" => stats.active_units = value,
+                    "failed" => stats.failed_units = value,
+                    "inactive" => stats.inactive_units = value,
+                    _ => debug!("Found unhandled unit state: {:?}", state_str),
+                }
+            }
+        }
+        _ => debug!("Found unhandled metric: {:?}", metric.name()),
+    }
+
+    Ok(())
+}
+
+/// Collect all metrics from the varlink socket.
+/// Runs on a blocking thread with a dedicated runtime because the zlink
+/// stream is !Send and cannot be held across await points in a Send future.
+async fn collect_metrics(socket_path: String) -> anyhow::Result<Vec<ListOutput>> {
+    tokio::task::spawn_blocking(move || {
+        let rt = tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()?;
+        rt.block_on(async move {
+            let mut conn = unix::connect(&socket_path).await?;
+            let stream = conn.list().await?;
+            futures_util::pin_mut!(stream);
+
+            let mut metrics = Vec::new();
+            let mut count = 0;
+            while let Some(result) = stream.try_next().await? {
+                let result: std::result::Result<ListOutput, _> = result;
+                match result {
+                    Ok(metric) => {
+                        debug!("Metrics {}: {:?}", count, metric);
+                        count += 1;
+                        metrics.push(metric);
+                    }
+                    Err(e) => {
+                        debug!("Error deserializing metric {}: {:?}", count, e);
+                        return Err(anyhow::anyhow!(e));
+                    }
+                }
+            }
+            Ok(metrics)
+        })
+    })
+    .await?
+}
+
+/// Fetch every metric from the varlink socket at `socket_path` and fold each
+/// one into `stats` via [`parse_one_metric`].
+pub async fn parse_metrics(
+    stats: &mut SystemdUnitStats,
+    socket_path: &str,
+    config: &crate::config::UnitsConfig,
+) -> anyhow::Result<()> {
+    let metrics = collect_metrics(socket_path.to_string()).await?;
+
+    for metric in &metrics {
+        parse_one_metric(stats, metric, config)?;
+    }
+
+    Ok(())
+}
+
+/// Build a fresh [`SystemdUnitStats`] from the varlink metrics API.
+///
+/// Nothing is fetched (and default stats are returned) when
+/// `config.units.state_stats` is disabled.
+pub async fn get_unit_stats(
+    config: &crate::config::Config,
+    socket_path: &str,
+) -> std::result::Result<SystemdUnitStats, Box<dyn std::error::Error + Send + Sync>> {
+    if !config.units.state_stats_allowlist.is_empty() {
+        debug!(
+            "Using unit state allowlist: {:?}",
+            config.units.state_stats_allowlist
+        );
+    }
+
+    if !config.units.state_stats_blocklist.is_empty() {
+        debug!(
+            "Using unit state blocklist: {:?}",
+            config.units.state_stats_blocklist,
+        );
+    }
+
+    let mut stats = SystemdUnitStats::default();
+
+    // Collect per unit state stats - ActiveState + LoadState via metrics API
+    if config.units.state_stats {
+        parse_metrics(&mut stats, socket_path, &config.units).await?;
+    }
+
+    debug!("unit stats: {:?}", stats);
+    Ok(stats)
+}
+
+/// Async wrapper that can update unit stats when passed a locked struct.
+/// Falls back to D-Bus collection if varlink fails.
+pub async fn update_unit_stats(
+    config: Arc<crate::config::Config>,
+    connection: zbus::Connection, // consumed only by the D-Bus fallback below
+    locked_machine_stats: Arc<RwLock<MachineStats>>,
+    socket_path: String,
+) -> anyhow::Result<()> {
+    match get_unit_stats(&config, &socket_path).await {
+        Ok(units_stats) => {
+            let mut machine_stats = locked_machine_stats.write().await;
+            machine_stats.units = units_stats;
+        }
+        Err(err) => {
+            warn!(
+                "Varlink units stats failed, falling back to D-Bus: {:?}",
+                err
+            );
+            crate::units::update_unit_stats(Arc::clone(&config), connection, locked_machine_stats)
+                .await?;
+        }
+    }
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::collections::HashSet;
+
+    fn string_value(s: &str) -> serde_json::Value {
+        serde_json::json!(s)
+    }
+
+    fn int_value(i: i64) -> serde_json::Value {
+        serde_json::json!(i)
+    }
+
+    fn empty_value() -> serde_json::Value {
+        serde_json::Value::Null
+    }
+
+    fn default_units_config() -> crate::config::UnitsConfig {
+        crate::config::UnitsConfig {
+            enabled: true,
+            state_stats: true,
+            state_stats_allowlist: HashSet::new(),
+            state_stats_blocklist: HashSet::new(),
+            state_stats_time_in_state: false,
+        }
+    }
+
+    #[test]
+    fn test_parse_one_metric_unit_active_state() {
+        let mut stats = SystemdUnitStats::default();
+        let config = default_units_config();
+
+        let metric = ListOutput {
+            name: "io.systemd.unit_active_state".to_string(),
+            value: string_value("active"),
+            object: Some("my-service.service".to_string()),
+            fields: None,
+        };
+
+        parse_one_metric(&mut stats, &metric, &config).unwrap();
+
+        assert_eq!(
+            stats
+                .unit_states
+                .get("my-service.service")
+                .unwrap()
+                .active_state,
+            SystemdUnitActiveState::active
+        );
+    }
+
+    #[test]
+    fn test_parse_one_metric_unit_load_state() {
+        let mut stats = SystemdUnitStats::default();
+        let config = default_units_config();
+
+        let metric = ListOutput {
+            name: "io.systemd.unit_load_state".to_string(),
+            value: string_value("not_found"), // Enum variant name uses underscore
+            object: Some("missing.service".to_string()),
+            fields: None,
+        };
+
+        parse_one_metric(&mut stats, &metric, &config).unwrap();
+
+        assert_eq!(
+            stats.unit_states.get("missing.service").unwrap().load_state,
+            SystemdUnitLoadState::not_found
+        );
+    }
+
+    #[test]
+    fn test_parse_one_metric_nrestarts() {
+        let mut stats = SystemdUnitStats::default();
+        let config = default_units_config();
+
+        let metric = ListOutput {
+            name: "io.systemd.nrestarts".to_string(),
+            value: int_value(5),
+            object: Some("my-service.service".to_string()),
+            fields: None,
+        };
+
+        parse_one_metric(&mut stats, &metric, &config).unwrap();
+
+        assert_eq!(
+            stats
+                .service_stats
+                .get("my-service.service")
+                .unwrap()
+                .nrestarts,
+            5
+        );
+    }
+
+    #[test]
+    fn test_parse_aggregated_metrics() {
+        let mut stats = SystemdUnitStats::default();
+        let config = default_units_config();
+
+        // Test units_by_type_total
+        let type_metric = ListOutput {
+            name: "io.systemd.units_by_type_total".to_string(),
+            value: int_value(42),
+            object: None,
+            fields: Some(std::collections::HashMap::from([(
+                "type".to_string(),
+                serde_json::json!("service"),
+            )])),
+        };
+        parse_one_metric(&mut stats, &type_metric, &config).unwrap();
+        assert_eq!(stats.service_units, 42);
+
+        // Test units_by_state_total
+        let state_metric = ListOutput {
+            name: "io.systemd.units_by_state_total".to_string(),
+            value: int_value(10),
+            object: None,
+            fields: Some(std::collections::HashMap::from([(
+                "state".to_string(),
+                serde_json::json!("active"),
+            )])),
+        };
+        parse_one_metric(&mut stats, &state_metric, &config).unwrap();
+        assert_eq!(stats.active_units, 10);
+    }
+
+    #[test]
+    fn test_parse_multiple_units() {
+        let mut stats = SystemdUnitStats::default();
+        let config = default_units_config();
+
+        let metrics = vec![
+            ListOutput {
+                name: "io.systemd.unit_active_state".to_string(),
+                value: string_value("active"),
+                object: Some("service1.service".to_string()),
+                fields: None,
+            },
+            ListOutput {
+                name: "io.systemd.unit_load_state".to_string(),
+                value: string_value("loaded"),
+                object: Some("service1.service".to_string()),
+                fields: None,
+            },
+            ListOutput {
+                name: "io.systemd.unit_active_state".to_string(),
+                value: string_value("failed"),
+                object: Some("service-2.service".to_string()),
+                fields: None,
+            },
+        ];
+
+        for metric in metrics {
+            parse_one_metric(&mut stats, &metric, &config).unwrap();
+        }
+
+        assert_eq!(stats.unit_states.len(), 2);
+        assert_eq!(
+            stats
+                .unit_states
+                .get("service1.service")
+                .unwrap()
+                .active_state,
+            SystemdUnitActiveState::active
+        );
+        assert_eq!(
+            stats
+                .unit_states
+                .get("service1.service")
+                .unwrap()
+                .load_state,
+            SystemdUnitLoadState::loaded
+        );
+        assert_eq!(
+            stats
+                .unit_states
+                .get("service-2.service")
+                .unwrap()
+                .active_state,
+            SystemdUnitActiveState::failed
+        );
+    }
+
+    #[test]
+    fn test_parse_unknown_and_missing_values() {
+        let mut stats = SystemdUnitStats::default();
+        let config = default_units_config();
+
+        // Unknown active state defaults to unknown
+        let metric1 = ListOutput {
+            name: "io.systemd.unit_active_state".to_string(),
+            value: string_value("invalid_state"),
+            object: Some("test.service".to_string()),
+            fields: None,
+        };
+        parse_one_metric(&mut stats, &metric1, &config).unwrap();
+        assert_eq!(
+            stats.unit_states.get("test.service").unwrap().active_state,
+            SystemdUnitActiveState::unknown
+        );
+
+        // Missing nrestarts value defaults to 0
+        let metric2 = ListOutput {
+            name: "io.systemd.nrestarts".to_string(),
+            value: empty_value(),
+            object: Some("test2.service".to_string()),
+            fields: None,
+        };
+        parse_one_metric(&mut stats, &metric2, &config).unwrap();
+        assert_eq!(
+            stats.service_stats.get("test2.service").unwrap().nrestarts,
+            0
+        );
+    }
+
+    #[test]
+    fn test_parse_edge_cases() {
+        let mut stats = SystemdUnitStats::default();
+        let config = default_units_config();
+
+        // Unknown unit type is ignored gracefully
+        let metric1 = ListOutput {
+            name: "io.systemd.units_by_type_total".to_string(),
+            value: int_value(999),
+            object: None,
+            fields: Some(std::collections::HashMap::from([(
+                "type".to_string(),
+                serde_json::json!("unknown_type"),
+            )])),
+        };
+        parse_one_metric(&mut stats, &metric1, &config).unwrap();
+        assert_eq!(stats.service_units, 0);
+
+        // Metric with no fields is handled gracefully
+        let metric2 = ListOutput {
+            name: "io.systemd.units_by_type_total".to_string(),
+            value: int_value(42),
+            object: None,
+            fields: None,
+        };
+        parse_one_metric(&mut stats, &metric2, &config).unwrap();
+
+        // Non-string field value is ignored
+        let metric3 = ListOutput {
+            name: "io.systemd.units_by_type_total".to_string(),
+            value: int_value(42),
+            object: None,
+            fields: Some(std::collections::HashMap::from([(
+                "type".to_string(),
+                serde_json::json!(123),
+            )])),
+        };
+        parse_one_metric(&mut stats, &metric3, &config).unwrap();
+
+        // Unhandled metric name is ignored
+        let metric4 = ListOutput {
+            name: "io.systemd.unknown_metric".to_string(),
+            value: int_value(999),
+            object: Some("test.service".to_string()),
+            fields: None,
+        };
+        parse_one_metric(&mut stats, &metric4, &config).unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_get_unit_stats_with_state_stats_disabled() {
+        let config = crate::config::Config {
+            units: crate::config::UnitsConfig {
+                enabled: true,
+                state_stats: false,
+                state_stats_allowlist: HashSet::new(),
+                state_stats_blocklist: HashSet::new(),
+                state_stats_time_in_state: true,
+            },
+            ..Default::default()
+        };
+
+        let result = get_unit_stats(&config, METRICS_SOCKET_PATH).await;
+        assert!(result.is_ok());
+
+        let stats = result.unwrap();
+        assert_eq!(stats.unit_states.len(), 0);
+        assert_eq!(stats.service_stats.len(), 0);
+    }
+
+    #[test]
+    fn test_parse_metric_enum() {
+        let metric_active = ListOutput {
+            name: "io.systemd.unit_active_state".to_string(),
+            value: string_value("active"),
+            object: Some("test.service".to_string()),
+            fields: None,
+        };
+        assert_eq!(
+            parse_metric_enum(&metric_active, SystemdUnitActiveState::unknown),
+            SystemdUnitActiveState::active
+        );
+
+        let metric_loaded = ListOutput {
+            name: "io.systemd.unit_load_state".to_string(),
+            value: string_value("loaded"),
+            object: Some("test.service".to_string()),
+            fields: None,
+        };
+        assert_eq!(
+            parse_metric_enum(&metric_loaded, SystemdUnitLoadState::unknown),
+            SystemdUnitLoadState::loaded
+        );
+
+        // Invalid value uses default
+        let metric_invalid = ListOutput {
+            name: "io.systemd.unit_active_state".to_string(),
+            value: string_value("invalid"),
+            object: Some("test.service".to_string()),
+            fields: None,
+        };
+        assert_eq!(
+            parse_metric_enum(&metric_invalid, SystemdUnitActiveState::unknown),
+            SystemdUnitActiveState::unknown
+        );
+
+        // Empty value uses default
+        let metric_empty = ListOutput {
+            name: "io.systemd.unit_active_state".to_string(),
+            value: empty_value(),
+            object: Some("test.service".to_string()),
+            fields: None,
+        };
+        assert_eq!(
+            parse_metric_enum(&metric_empty, SystemdUnitActiveState::unknown),
+            SystemdUnitActiveState::unknown
+        );
+    }
+
+    #[test]
+    fn test_parse_metric_enum_all_states() {
+        // Test all active states
+        let active_states = vec![
+            ("active", SystemdUnitActiveState::active),
+            ("reloading", SystemdUnitActiveState::reloading),
+            ("inactive", SystemdUnitActiveState::inactive),
+            ("failed", SystemdUnitActiveState::failed),
+            ("activating", SystemdUnitActiveState::activating),
+            ("deactivating", SystemdUnitActiveState::deactivating),
+        ];
+
+        for (state_str, expected) in active_states {
+            let metric = ListOutput {
+                name: "io.systemd.unit_active_state".to_string(),
+                value: string_value(state_str),
+                object: Some("test.service".to_string()),
+                fields: None,
+            };
+            assert_eq!(
+                parse_metric_enum(&metric, SystemdUnitActiveState::unknown),
+                expected
+            );
+        }
+
+        // Test all load states
+        let load_states = vec![
+            ("loaded", SystemdUnitLoadState::loaded),
+            ("error", SystemdUnitLoadState::error),
+            ("masked", SystemdUnitLoadState::masked),
+            ("not_found", SystemdUnitLoadState::not_found),
+        ];
+
+        for (state_str, expected) in load_states {
+            let metric = ListOutput {
+                name: "io.systemd.unit_load_state".to_string(),
+                value: string_value(state_str),
+                object: Some("test.service".to_string()),
+                fields: None,
+            };
+            assert_eq!(
+                parse_metric_enum(&metric, SystemdUnitLoadState::unknown),
+                expected
+            );
+        }
+    }
+
+    #[test]
+    fn test_parse_state_updates() {
+        let mut stats = SystemdUnitStats::default();
+        let config = default_units_config();
+
+        // Parse initial state
+        let metric1 = ListOutput {
+            name: "io.systemd.unit_active_state".to_string(),
+            value: string_value("inactive"),
+            object: Some("test.service".to_string()),
+            fields: None,
+        };
+        parse_one_metric(&mut stats, &metric1, &config).unwrap();
+        assert_eq!(
+            stats.unit_states.get("test.service").unwrap().active_state,
+            SystemdUnitActiveState::inactive
+        );
+
+        // Update to active state
+        let metric2 = ListOutput {
+            name: "io.systemd.unit_active_state".to_string(),
+            value: string_value("active"),
+            object: Some("test.service".to_string()),
+            fields: None,
+        };
+        parse_one_metric(&mut stats, &metric2, &config).unwrap();
+        assert_eq!(
+            stats.unit_states.get("test.service").unwrap().active_state,
+            SystemdUnitActiveState::active
+        );
+    }
+
+    #[test]
+    fn test_unhealthy_computed() {
+        let mut stats = SystemdUnitStats::default();
+        let config = default_units_config();
+
+        // Set active state to failed
+        let metric1 = ListOutput {
+            name: "io.systemd.unit_active_state".to_string(),
+            value: string_value("failed"),
+            object: Some("broken.service".to_string()),
+            fields: None,
+        };
+        parse_one_metric(&mut stats, &metric1, &config).unwrap();
+
+        // Set load state to loaded
+        let metric2 = ListOutput {
+            name: "io.systemd.unit_load_state".to_string(),
+            value: string_value("loaded"),
+            object: Some("broken.service".to_string()),
+            fields: None,
+        };
+        parse_one_metric(&mut stats, &metric2, &config).unwrap();
+
+        // Should be unhealthy: loaded + failed
+        assert!(stats.unit_states.get("broken.service").unwrap().unhealthy);
+
+        // Set active state to active
+        let metric3 = ListOutput {
+            name: "io.systemd.unit_active_state".to_string(),
+            value: string_value("active"),
+            object: Some("healthy.service".to_string()),
+            fields: None,
+        };
+        parse_one_metric(&mut stats, &metric3, &config).unwrap();
+
+        // Set load state to loaded
+        let metric4 = ListOutput {
+            name: "io.systemd.unit_load_state".to_string(),
+            value: string_value("loaded"),
+            object: Some("healthy.service".to_string()),
+            fields: None,
+        };
+        parse_one_metric(&mut stats, &metric4, &config).unwrap();
+
+        // Should be healthy: loaded + active
+        assert!(!stats.unit_states.get("healthy.service").unwrap().unhealthy);
+    }
+
+    #[test]
+    fn test_allowlist_filtering() {
+        let mut stats = SystemdUnitStats::default();
+        let config = crate::config::UnitsConfig {
+            enabled: true,
+            state_stats: true,
+            state_stats_allowlist: HashSet::from(["allowed.service".to_string()]),
+            state_stats_blocklist: HashSet::new(),
+            state_stats_time_in_state: false,
+        };
+
+        // Allowed unit should be tracked
+        let metric1 = ListOutput {
+            name: "io.systemd.unit_active_state".to_string(),
+            value: string_value("active"),
+            object: Some("allowed.service".to_string()),
+            fields: None,
+        };
+        parse_one_metric(&mut stats, &metric1, &config).unwrap();
+        assert!(stats.unit_states.contains_key("allowed.service"));
+
+        // Non-allowed unit should be skipped
+        let metric2 = ListOutput {
+            name: "io.systemd.unit_active_state".to_string(),
+            value: string_value("active"),
+            object: Some("not-allowed.service".to_string()),
+            fields: None,
+        };
+        parse_one_metric(&mut stats, &metric2, &config).unwrap();
+        assert!(!stats.unit_states.contains_key("not-allowed.service"));
+    }
+
+    #[test]
+    fn test_blocklist_filtering() {
+        let mut stats = SystemdUnitStats::default();
+        let config = crate::config::UnitsConfig {
+            enabled: true,
+            state_stats: true,
+            state_stats_allowlist: HashSet::new(),
+            state_stats_blocklist: HashSet::from(["blocked.service".to_string()]),
+            state_stats_time_in_state: false,
+        };
+
+        // Blocked unit should be skipped
+        let metric1 = ListOutput {
+            name: "io.systemd.unit_active_state".to_string(),
+            value: string_value("active"),
+            object: Some("blocked.service".to_string()),
+            fields: None,
+        };
+        parse_one_metric(&mut stats, &metric1, &config).unwrap();
+        assert!(!stats.unit_states.contains_key("blocked.service"));
+
+        // Non-blocked unit should be tracked
+        let metric2 = ListOutput {
+            name: "io.systemd.unit_active_state".to_string(),
+            value: string_value("active"),
+            object: Some("ok.service".to_string()),
+            fields: None,
+        };
+        parse_one_metric(&mut stats, &metric2, &config).unwrap();
+        assert!(stats.unit_states.contains_key("ok.service"));
+    }
+
+    #[test]
+    fn test_blocklist_overrides_allowlist() {
+        let mut stats = SystemdUnitStats::default();
+        let config = crate::config::UnitsConfig {
+            enabled: true,
+            state_stats: true,
+            state_stats_allowlist: HashSet::from(["both.service".to_string()]),
+            state_stats_blocklist: HashSet::from(["both.service".to_string()]),
+            state_stats_time_in_state: false,
+        };
+
+        // Unit in both lists should be blocked (blocklist takes priority)
+        let metric = ListOutput {
+            name: "io.systemd.unit_active_state".to_string(),
+            value: string_value("active"),
+            object: Some("both.service".to_string()),
+            fields: None,
+        };
+        parse_one_metric(&mut stats, &metric, &config).unwrap();
+        assert!(!stats.unit_states.contains_key("both.service"));
+    }
+}