diff --git a/Cargo.lock b/Cargo.lock
index 2f57e0474..77f99f128 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -414,6 +414,7 @@ checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2"
 dependencies = [
  "iana-time-zone",
  "num-traits",
+ "serde",
  "windows-link",
 ]
 
@@ -777,8 +778,10 @@ dependencies = [
 name = "infra-compass-db"
 version = "0.0.8"
 dependencies = [
+ "chrono",
  "csv",
  "duckdb",
+ "regex",
  "serde",
  "serde_json",
  "sha2",
diff --git a/Cargo.toml b/Cargo.toml
index 8ba59b19c..6149cd563 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,8 +20,10 @@ keywords = ["NLR", "database", "energy", "ordinance"]
 infra-compass-db = { version = "0.0.8", path = "crates/compass" }
 anyhow = { version = "1.0.98" }
 clap = { version = "4.5.40", features = ["cargo"] }
+chrono = { version = "0.4.42", default-features = false, features = ["serde"] }
 csv = { version = "1.3.1" }
 duckdb = { version = "1.4.0", features = ["bundled"] }
+regex = { version = "1.12.2" }
 serde = { version = "1.0.219", features = ["derive"] }
 serde_json = { version = "1.0.140" }
 sha2 = { version = "0.10.8" }
diff --git a/crates/compass/Cargo.toml b/crates/compass/Cargo.toml
index 319832ad9..b1b7dcef4 100644
--- a/crates/compass/Cargo.toml
+++ b/crates/compass/Cargo.toml
@@ -12,8 +12,10 @@ categories.workspace = true
 keywords.workspace = true
 
 [dependencies]
+chrono = { workspace = true }
 csv = { workspace = true }
 duckdb.workspace = true
+regex = { workspace = true }
 serde.workspace = true
 serde_json.workspace = true
 sha2 = { workspace = true }
diff --git a/crates/compass/src/error.rs b/crates/compass/src/error.rs
index 560157106..4c3d9a402 100644
--- a/crates/compass/src/error.rs
+++ b/crates/compass/src/error.rs
@@ -12,6 +12,9 @@ pub enum Error {
     #[error(transparent)]
     DuckDB(#[from] duckdb::Error),
 
+    #[error(transparent)]
+    Regex(#[from] regex::Error),
+
     #[allow(dead_code)]
     #[error("Undefined error")]
     // Used during development while it is not clear a category of error
diff --git a/crates/compass/src/lib.rs b/crates/compass/src/lib.rs
index 2a373f723..aeea7030d 100644
--- a/crates/compass/src/lib.rs
+++ b/crates/compass/src/lib.rs
@@ -118,7 +118,7 @@ pub fn load_ordinance<P: AsRef<std::path::Path> + std::fmt::Debug>(
 
     let ordinance = runtime
         .block_on(scraper::ScrapedOrdinance::open(ordinance_path))
-        .unwrap();
+        .expect("Failed to open ordinance data source");
 
     conn.commit().unwrap();
     tracing::debug!("Transaction committed");
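Note on the error plumbing above: the new `Regex` variant mirrors the existing `DuckDB` one. `#[error(transparent)]` defers `Display` and `source` to the wrapped error, and `#[from]` derives the `From<regex::Error>` impl that lets `?` convert automatically. A minimal standalone sketch of the mechanism, assuming the enum derives `thiserror::Error` (which these attributes imply); the `compile` helper is illustrative only:

```rust
use thiserror::Error;

// Illustrative stand-in for the crate's error enum, not the full definition.
#[derive(Debug, Error)]
pub enum Error {
    #[error(transparent)]
    Regex(#[from] regex::Error),
}

// The derived From<regex::Error> lets `?` lift the library error into the
// crate-level Error without an explicit map_err.
fn compile(pattern: &str) -> Result<regex::Regex, Error> {
    Ok(regex::Regex::new(pattern)?)
}
```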
diff --git a/crates/compass/src/scraper/log/loglevel.rs b/crates/compass/src/scraper/log/loglevel.rs
new file mode 100644
index 000000000..b0e695ef3
--- /dev/null
+++ b/crates/compass/src/scraper/log/loglevel.rs
@@ -0,0 +1,128 @@
+//! Runtime logs
+//!
+//! Parse and record the logs emitted by the runtime to support
+//! post-processing and analysis.
+
+/// Log levels emitted by the (Python) runtime.
+#[derive(Debug, PartialEq, serde::Deserialize)]
+pub(super) enum LogLevel {
+    #[serde(rename = "DEBUG_TO_FILE")]
+    DebugToFile,
+    #[serde(rename = "TRACE")]
+    Trace,
+    #[serde(rename = "DEBUG")]
+    Debug,
+    #[serde(rename = "INFO")]
+    Info,
+    #[serde(rename = "WARNING")]
+    Warning,
+    #[serde(rename = "ERROR")]
+    Error,
+}
+
+#[cfg(test)]
+mod test_loglevel {
+    use super::*;
+
+    #[test]
+    fn deserialize_trace() {
+        let json = r#""TRACE""#;
+        let level: LogLevel = serde_json::from_str(json).unwrap();
+        assert!(matches!(level, LogLevel::Trace));
+    }
+
+    #[test]
+    fn deserialize_debug() {
+        let json = r#""DEBUG""#;
+        let level: LogLevel = serde_json::from_str(json).unwrap();
+        assert!(matches!(level, LogLevel::Debug));
+    }
+
+    #[test]
+    fn deserialize_info() {
+        let json = r#""INFO""#;
+        let level: LogLevel = serde_json::from_str(json).unwrap();
+        assert!(matches!(level, LogLevel::Info));
+    }
+
+    #[test]
+    fn deserialize_warning() {
+        let json = r#""WARNING""#;
+        let level: LogLevel = serde_json::from_str(json).unwrap();
+        assert!(matches!(level, LogLevel::Warning));
+    }
+
+    #[test]
+    fn deserialize_error() {
+        let json = r#""ERROR""#;
+        let level: LogLevel = serde_json::from_str(json).unwrap();
+        assert!(matches!(level, LogLevel::Error));
+    }
+
+    #[test]
+    fn deserialize_invalid_variant() {
+        let json = r#""INVALID""#;
+        let result: std::result::Result<LogLevel, serde_json::Error> = serde_json::from_str(json);
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn deserialize_lowercase_fails() {
+        let json = r#""trace""#;
+        let result: std::result::Result<LogLevel, serde_json::Error> = serde_json::from_str(json);
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn deserialize_mixed_case_fails() {
+        let json = r#""Trace""#;
+        let result: std::result::Result<LogLevel, serde_json::Error> = serde_json::from_str(json);
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn deserialize_in_struct() {
+        #[derive(serde::Deserialize)]
+        struct LogEntry {
+            level: LogLevel,
+            message: String,
+        }
+
+        let json = r#"{"level": "ERROR", "message": "Something went wrong"}"#;
+        let entry: LogEntry = serde_json::from_str(json).unwrap();
+        assert!(matches!(entry.level, LogLevel::Error));
+        assert_eq!(entry.message, "Something went wrong");
+    }
+
+    #[test]
+    fn deserialize_array_of_levels() {
+        let json = r#"["TRACE", "INFO", "ERROR"]"#;
+        let levels: Vec<LogLevel> = serde_json::from_str(json).unwrap();
+        assert_eq!(levels.len(), 3);
+        assert!(matches!(levels[0], LogLevel::Trace));
+        assert!(matches!(levels[1], LogLevel::Info));
+        assert!(matches!(levels[2], LogLevel::Error));
+    }
+
+    #[test]
+    fn deserialize_with_whitespace() {
+        let json = r#" "INFO" "#;
+        let level: LogLevel = serde_json::from_str(json).unwrap();
+        assert!(matches!(level, LogLevel::Info));
+    }
+
+    #[test]
+    fn error_message_contains_valid_options() {
+        let json = r#""FATAL""#;
+        let result: std::result::Result<LogLevel, serde_json::Error> = serde_json::from_str(json);
+
+        match result {
+            Err(e) => {
+                let error_msg = e.to_string();
+                // The error message should mention valid variants
+                assert!(error_msg.contains("unknown variant"));
+            }
+            Ok(_) => panic!("Expected deserialization to fail"),
+        }
+    }
+}
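Because `LogLevel` only implements `Deserialize`, a bare token such as `INFO` is parsed by quoting it into a JSON string first — the same trick `LogRecord::parse` uses in `mod.rs` below. A minimal sketch, assuming `LogLevel` is in scope; `parse_level` is a hypothetical helper:

```rust
// Wrap the bare token in quotes so serde_json sees a JSON string; the
// #[serde(rename = "...")] attributes then select the matching variant.
fn parse_level(token: &str) -> Result<LogLevel, serde_json::Error> {
    serde_json::from_str(&format!(r#""{}""#, token))
}

// parse_level("INFO") -> Ok(LogLevel::Info)
// parse_level("Info") -> Err(...): the renames are case-sensitive.
```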
diff --git a/crates/compass/src/scraper/log/mod.rs b/crates/compass/src/scraper/log/mod.rs
new file mode 100644
index 000000000..936cd617c
--- /dev/null
+++ b/crates/compass/src/scraper/log/mod.rs
@@ -0,0 +1,218 @@
+//! Runtime logs
+//!
+//! Parse and record the logs from the ordinance runtime to support
+//! post-processing and analysis.
+//!
+//! The most verbose levels are ignored to minimize the impact on the
+//! final database size. The outputs are archived, so any forensics
+//! can be done by inspecting the raw log files if needed. The purpose
+//! here is to support questions such as: what is the distribution of
+//! cost and runtime per jurisdiction? Which exceptions were captured
+//! in each jurisdiction?
+
+mod loglevel;
+
+use std::sync::LazyLock;
+
+use chrono::NaiveDateTime;
+use regex::Regex;
+use tracing::{debug, error, trace};
+
+use crate::error::Result;
+use loglevel::LogLevel;
+
+#[derive(Debug)]
+/// A single log record parsed from the runtime logs.
+///
+/// Expects lines such as:
+/// [2025-12-06 15:15:14,272] INFO - Task-1: Running COMPASS
+/// or
+/// [2025-12-06 15:15:14,572] INFO - Jefferson County, Colorado: Running COMPASS
+///
+/// The 'message' field only supports single-line log messages.
+pub(super) struct LogRecord {
+    timestamp: NaiveDateTime,
+    level: LogLevel,
+    subject: String,
+    message: String,
+}
+
+impl LogRecord {
+    /// Parse a single log line into a LogRecord
+    fn parse(line: &str) -> Result<Self> {
+        // Regex pattern: [timestamp] LEVEL - subject: message
+        static RE: LazyLock<Regex> =
+            LazyLock::new(|| Regex::new(r"^\[([^\]]+)\]\s+(\w+)\s+-\s+([^:]+):\s+(.+)$").unwrap());
+
+        let caps = RE.captures(line).ok_or_else(|| {
+            crate::error::Error::Undefined(format!("Failed to parse log line: {}", line))
+        })?;
+
+        let timestamp_str = caps.get(1).unwrap().as_str().to_string();
+        trace!("Parsing timestamp: {}", timestamp_str);
+        let timestamp = NaiveDateTime::parse_from_str(&timestamp_str, "%Y-%m-%d %H:%M:%S,%3f")
+            .map_err(|e| {
+                error!("Failed to parse timestamp: {}, error: {}", timestamp_str, e);
+                crate::error::Error::Undefined(format!(
+                    "Failed to parse timestamp '{}': {}",
+                    timestamp_str, e
+                ))
+            })?;
+
+        let level_str = caps.get(2).unwrap().as_str();
+        // Parse the log level
+        let level = serde_json::from_str(&format!(r#""{}""#, level_str)).map_err(|e| {
+            crate::error::Error::Undefined(format!("Invalid log level '{}': {}", level_str, e))
+        })?;
+
+        let subject = caps.get(3).unwrap().as_str().to_string();
+        let message = caps.get(4).unwrap().as_str().to_string();
+
+        Ok(LogRecord {
+            timestamp,
+            level,
+            subject,
+            message,
+        })
+    }
+
+    fn record(&self, conn: &duckdb::Transaction, bookkeeper_id: usize) -> Result<()> {
+        trace!("Recording log record: {:?}", self);
+        conn.execute(
+            "INSERT INTO logs (bookkeeper_lnk, timestamp, level, subject, message) VALUES (?, ?, ?, ?, ?)",
+            duckdb::params![
+                bookkeeper_id,
+                self.timestamp.format("%Y-%m-%d %H:%M:%S.%3f").to_string(),
+                format!("{:?}", self.level),
+                &self.subject,
+                &self.message,
+            ],
+        )?;
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn parse_single_line() {
+        let line = "[2025-12-06 15:15:14,272] INFO - Task-1: Running COMPASS";
+        let record = LogRecord::parse(line).unwrap();
+
+        assert_eq!(
+            record.timestamp.date(),
+            chrono::NaiveDate::from_ymd_opt(2025, 12, 6).unwrap()
+        );
+        assert_eq!(
+            record.timestamp.time(),
+            chrono::NaiveTime::from_hms_milli_opt(15, 15, 14, 272).unwrap()
+        );
+
+        assert!(matches!(record.level, LogLevel::Info));
+        assert_eq!(record.subject, "Task-1");
+        assert_eq!(record.message, "Running COMPASS");
+    }
+}
+
+#[derive(Debug)]
+/// Collection of runtime log records
+pub(super) struct RuntimeLogs(Vec<LogRecord>);
+
+impl RuntimeLogs {
+    /// Parse runtime logs from text input
+    ///
+    /// # Notes
+    /// - For now, hardcoded to only keep INFO, WARNING, and ERROR levels.
+    ///   These logs are quite long with the more verbose levels. Since
+    ///   everything is collected, it is better to filter and reduce it
+    ///   here. In the future, consider returning an iterator instead.
+    /// - Multi-line messages are ignored.
+    /// - Lines that fail to parse are skipped (logged at trace level).
+    fn parse(input: &str) -> Result<Self> {
+        let records: Vec<LogRecord> = input
+            .lines()
+            .filter(|line| !line.trim().is_empty())
+            .filter_map(|line| match LogRecord::parse(line) {
+                Ok(record) => {
+                    trace!("Parsed log line: {}", line);
+                    Some(record)
+                }
+                Err(e) => {
+                    trace!("Failed to parse log line: {}. Error: {}", line, e);
+                    None
+                }
+            })
+            .filter(|record| {
+                (record.level == LogLevel::Info)
+                    || (record.level == LogLevel::Warning)
+                    || (record.level == LogLevel::Error)
+            })
+            .map(|record| {
+                debug!("Keeping log record: {:?}", record);
+                record
+            })
+            .collect();
+        Ok(RuntimeLogs(records))
+    }
+
+    pub(super) fn init_db(conn: &duckdb::Transaction) -> Result<()> {
+        conn.execute_batch(
+            r"
+            CREATE SEQUENCE IF NOT EXISTS scraper_log_seq START 1;
+            CREATE TABLE IF NOT EXISTS logs (
+                id INTEGER PRIMARY KEY DEFAULT NEXTVAL('scraper_log_seq'),
+                bookkeeper_lnk INTEGER REFERENCES bookkeeper(id) NOT NULL,
+                timestamp TIMESTAMP,
+                level VARCHAR,
+                subject VARCHAR,
+                message VARCHAR
+            );
+            ",
+        )?;
+        Ok(())
+    }
+
+    pub(super) async fn open<P: AsRef<std::path::Path>>(root: P) -> Result<Self> {
+        let path = root.as_ref().join("logs").join("all.log");
+        let content = tokio::fs::read_to_string(path).await?;
+        let records = Self::parse(&content)?;
+        Ok(records)
+    }
+
+    pub(super) fn record(&self, conn: &duckdb::Transaction, commit_id: usize) -> Result<()> {
+        debug!("Recording runtime logs");
+
+        for record in &self.0 {
+            record.record(conn, commit_id)?;
+        }
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+/// Samples of runtime logs for testing purposes
+pub(crate) mod sample {
+    use crate::error::Result;
+    use std::io::Write;
+
+    pub(crate) fn as_text_v1() -> String {
+        r#"
+[2025-12-06 15:15:14,272] INFO - Task-1: Running COMPASS version 0.11.3.dev8+g69a75b7.d20251111
+[2025-12-06 15:15:14,872] INFO - Task-1: Processing 250 jurisdiction(s)
+[2025-12-06 15:15:14,272] INFO - Task-1: Running COMPASS
+[2025-12-06 15:15:14,572] INFO - Jefferson County, Colorado: Running COMPASS
+[2025-12-06 19:48:10,503] INFO - Task-1: Total runtime: 4:32:55
+        "#
+        .to_string()
+    }
+
+    pub(crate) fn as_file<P: AsRef<std::path::Path>>(path: P) -> Result<std::fs::File> {
+        let mut file = std::fs::File::create(path).unwrap();
+        writeln!(file, "{}", as_text_v1()).unwrap();
+        Ok(file)
+    }
+}
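For reference, this is what the regex capture groups and the chrono format string yield on one of the sample lines. A self-contained sketch using the same pattern and `%Y-%m-%d %H:%M:%S,%3f` format as `LogRecord::parse` (note the literal comma before the milliseconds, as emitted by Python's logging default):

```rust
use chrono::NaiveDateTime;
use regex::Regex;

fn main() {
    let re = Regex::new(r"^\[([^\]]+)\]\s+(\w+)\s+-\s+([^:]+):\s+(.+)$").unwrap();
    let line = "[2025-12-06 15:15:14,272] INFO - Task-1: Running COMPASS";

    // Group 1: timestamp, 2: level, 3: subject (no ':' allowed), 4: message.
    let caps = re.captures(line).unwrap();
    assert_eq!(&caps[1], "2025-12-06 15:15:14,272");
    assert_eq!(&caps[2], "INFO");
    assert_eq!(&caps[3], "Task-1");
    assert_eq!(&caps[4], "Running COMPASS");

    // `,%3f` consumes the comma plus exactly three fractional digits.
    let ts = NaiveDateTime::parse_from_str(&caps[1], "%Y-%m-%d %H:%M:%S,%3f").unwrap();
    assert_eq!(ts.to_string(), "2025-12-06 15:15:14.272");
}
```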
diff --git a/crates/compass/src/scraper/mod.rs b/crates/compass/src/scraper/mod.rs
index 71eefc2b7..e8c37184c 100644
--- a/crates/compass/src/scraper/mod.rs
+++ b/crates/compass/src/scraper/mod.rs
@@ -1,5 +1,6 @@
 //! Support for the ordinance scraper output
 
+mod log;
 mod metadata;
 mod ordinance;
 mod source;
@@ -11,6 +12,7 @@ use tracing::{self, debug, trace};
 
 use crate::error;
 use crate::error::Result;
+use log::RuntimeLogs;
 use metadata::Metadata;
 use ordinance::Ordinance;
 #[allow(unused_imports)]
@@ -73,6 +75,8 @@ pub(crate) struct ScrapedOrdinance {
     usage: Usage,
     /// The ordinance section
     ordinance: Ordinance,
+    /// The runtime logs
+    logs: RuntimeLogs,
 }
 
 impl ScrapedOrdinance {
@@ -88,6 +92,7 @@
 
     pub(super) fn init_db(conn: &duckdb::Transaction) -> Result<()> {
         debug!("Initializing ScrapedOrdinance database");
+        log::RuntimeLogs::init_db(conn)?;
        source::Source::init_db(conn)?;
         metadata::Metadata::init_db(conn)?;
         usage::Usage::init_db(conn)?;
@@ -112,11 +117,12 @@
             return Err(error::Error::Undefined("Path does not exist".to_string()));
         }
 
-        let (source, metadata, usage, ordinance) = tokio::try_join!(
+        let (source, metadata, usage, ordinance, logs) = tokio::try_join!(
             source::Source::open(&root),
             metadata::Metadata::open(&root),
             usage::Usage::open(&root),
-            ordinance::Ordinance::open(&root)
+            ordinance::Ordinance::open(&root),
+            log::RuntimeLogs::open(&root),
         )?;
 
         trace!("Scraped ordinance opened successfully");
@@ -127,13 +133,14 @@
             source,
             usage,
             ordinance,
+            logs,
         })
     }
 
     #[allow(dead_code)]
     pub(crate) async fn push(&self, conn: &mut duckdb::Connection, commit_id: usize) -> Result<()> {
         // Load the ordinance into the database
-        tracing::trace!("Pushing scraped ordinance into the database");
+        tracing::info!("Recording scraped ordinance into the database");
 
         let conn = conn.transaction().unwrap();
         tracing::trace!("Transaction started");
@@ -143,6 +150,7 @@
         self.metadata.write(&conn, commit_id).unwrap();
         self.usage().await.unwrap().write(&conn, commit_id).unwrap();
         self.ordinance.write(&conn, commit_id).unwrap();
+        self.logs.record(&conn, commit_id).unwrap();
 
         tracing::trace!("Committing transaction");
         conn.commit()?;
@@ -169,11 +177,7 @@
 
 #[cfg(test)]
 mod tests {
-    use super::ScrapedOrdinance;
-    use super::metadata;
-    use super::ordinance;
-    use super::source;
-    use super::usage;
+    use super::*;
     use std::io::Write;
 
     #[tokio::test]
@@ -203,6 +207,8 @@ mod tests {
 
         let _metadata_file = metadata::sample::as_file(target.path().join("meta.json")).unwrap();
         let _usage_file = usage::sample::as_file(target.path().join("usage.json")).unwrap();
+        std::fs::create_dir(target.path().join("logs")).unwrap();
+        let _log_file = log::sample::as_file(target.path().join("logs").join("all.log")).unwrap();
         ordinance::sample::as_file(target.path()).unwrap();
 
         let demo = ScrapedOrdinance::open(target).await.unwrap();
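The five-way `tokio::try_join!` above polls all of the section readers concurrently on a single task; on the first `Err`, the macro stops polling the remaining futures and returns the error. A reduced sketch of the pattern with two reads (file names follow the test fixture; the `open_all` helper is illustrative):

```rust
use std::path::Path;
use tokio::fs;

// Both reads are in flight at once; if either returns Err, try_join!
// short-circuits and the error propagates to the caller.
async fn open_all(root: &Path) -> std::io::Result<(String, String)> {
    tokio::try_join!(
        fs::read_to_string(root.join("meta.json")),
        fs::read_to_string(root.join("logs").join("all.log")),
    )
}
```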
diff --git a/crates/compass/src/scraper/usage.rs b/crates/compass/src/scraper/usage.rs
index 5032715b1..831f9d1d4 100644
--- a/crates/compass/src/scraper/usage.rs
+++ b/crates/compass/src/scraper/usage.rs
@@ -63,7 +63,7 @@ pub(super) struct UsageValues {
 impl Usage {
     /// Initialize the database for the Usage context
     pub(super) fn init_db(conn: &duckdb::Transaction) -> Result<()> {
-        tracing::trace!("Initializing database for Usage");
+        tracing::debug!("Initializing database for Usage");
         conn.execute_batch(
             r"
             CREATE SEQUENCE usage_sequence START 1;
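Finally, a sketch of the kind of post-processing the new `logs` table enables, using the `duckdb` crate's rusqlite-style API. It assumes an already-populated connection; note that `level` stores the `Debug` rendering of `LogLevel` (`Info`, `Warning`, `Error`), which is what the filter below matches:

```rust
// Count WARNING/ERROR records per subject (i.e. per jurisdiction or task).
fn warnings_per_subject(conn: &duckdb::Connection) -> duckdb::Result<()> {
    let mut stmt = conn.prepare(
        "SELECT subject, level, COUNT(*) AS n
         FROM logs
         WHERE level IN ('Warning', 'Error')
         GROUP BY subject, level
         ORDER BY n DESC",
    )?;
    let rows = stmt.query_map([], |row| {
        Ok((
            row.get::<_, String>(0)?,
            row.get::<_, String>(1)?,
            row.get::<_, i64>(2)?,
        ))
    })?;
    for row in rows {
        let (subject, level, n) = row?;
        println!("{subject}: {n} {level} record(s)");
    }
    Ok(())
}
```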