diff --git a/uplink/src/base/mod.rs b/uplink/src/base/mod.rs index 765f8049d..3257e9890 100644 --- a/uplink/src/base/mod.rs +++ b/uplink/src/base/mod.rs @@ -192,7 +192,8 @@ impl Default for DeviceShadowConfig { } #[derive(Clone, Debug, Deserialize)] -pub struct StdoutConfig { +pub struct LogReaderConfig { + pub paths: Vec, pub stream_name: String, pub log_template: String, pub timestamp_template: String, @@ -231,7 +232,7 @@ pub struct Config { pub system_stats: Stats, pub simulator: Option, pub ota_installer: Option, - pub stdout: Option, + pub log_reader: Option, pub prometheus: Option, #[serde(default)] pub device_shadow: DeviceShadowConfig, diff --git a/uplink/src/collector/log_reader.rs b/uplink/src/collector/log_reader.rs new file mode 100644 index 000000000..984fcb147 --- /dev/null +++ b/uplink/src/collector/log_reader.rs @@ -0,0 +1,365 @@ +use log::error; +use regex::{Match, Regex}; +use time::{Month, OffsetDateTime}; +use tokio::fs::File; +use tokio::io::{AsyncBufReadExt, BufReader, Lines}; + +use serde::Serialize; +use serde_json::json; +use tokio::task::JoinSet; + +use crate::base::bridge::{BridgeTx, Payload}; +use crate::base::LogReaderConfig; + +#[derive(Debug, Serialize, Clone, PartialEq)] +struct LogEntry { + pub line: String, + pub tag: Option, + pub level: Option, + #[serde(skip)] + pub timestamp: u64, + pub message: Option, +} + +/// Parse timestamp from log line, use current time as default if unable to ascertain partially +pub fn parse_timestamp(date: &mut OffsetDateTime, s: &str, template: &Regex) -> Option<()> { + let matches = template.captures(s)?; + let to_int = |m: Match<'_>| m.as_str().parse().ok(); + + let mut year = matches.name("year").map(to_int).flatten().unwrap_or(2000); + if year < 2000 { + year += 2000 + } + *date = date.replace_year(year).ok()?; + + let month = matches.name("month").map(to_int).flatten().unwrap_or(0); + let month = Month::try_from(month as u8).ok()?; + *date = date.replace_month(month).ok()?; + + let day = matches.name("day").map(to_int).flatten().unwrap_or(0); + *date = date.replace_day(day as u8).ok()?; + + let hour = matches.name("hour").map(to_int).flatten().unwrap_or(0); + *date = date.replace_hour(hour as u8).ok()?; + + let minute = matches.name("minute").map(to_int).flatten().unwrap_or(0); + *date = date.replace_minute(minute as u8).ok()?; + + let second = matches.name("second").map(to_int).flatten().unwrap_or(0); + *date = date.replace_second(second as u8).ok()?; + + let millisecond = + matches.name("subsecond").map(|m| m.as_str()[0..3].parse().ok()).flatten().unwrap_or(0); + *date = date.replace_millisecond(millisecond).ok()?; + + Some(()) +} + +impl LogEntry { + // NOTE: expects log lines to contain information as defined in log_template e.g. "{log_timestamp} {level} {tag}: {message}" else treats them as message lines + fn parse( + current_line: &mut Option, + line: &str, + log_template: &Regex, + timestamp_template: &Regex, + ) -> Option { + let to_string = |x: Match| x.as_str().to_string(); + let line = line.trim().to_string(); + if let Some(captures) = log_template.captures(&line) { + // Use current time if not able to parse properly + let mut date = time::OffsetDateTime::now_utc(); + if let Some(t) = captures.name("timestamp") { + parse_timestamp(&mut date, t.as_str(), timestamp_template); + } + + let timestamp = (date.unix_timestamp_nanos() / 1_000_000) as u64; + let level = captures.name("level").map(to_string); + let tag = captures.name("tag").map(to_string); + let message = captures.name("message").map(to_string); + + return current_line.replace(LogEntry { line, tag, level, timestamp, message }); + } else if let Some(log_entry) = current_line { + log_entry.line += &format!("\n{line}"); + match &mut log_entry.message { + Some(msg) => *msg += &format!("\n{line}"), + _ => log_entry.message = Some(line), + }; + } + + None + } + + fn payload(self, stream: String, sequence: u32) -> Payload { + Payload { + stream, + device_id: None, + sequence, + timestamp: self.timestamp, + payload: json!(self), + } + } +} + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Io error {0}")] + Io(#[from] std::io::Error), + #[error("Serde error {0}")] + Json(#[from] serde_json::error::Error), +} + +pub struct LogParser { + lines: Lines, + log_entry: Option, + log_template: Regex, + timestamp_template: Regex, +} + +impl LogParser { + fn new(lines: Lines, log_template: Regex, timestamp_template: Regex) -> Self { + Self { lines, log_entry: None, log_template, timestamp_template } + } + + async fn next(&mut self) -> Option { + while let Some(line) = self.lines.next_line().await.ok()? { + if let Some(entry) = LogEntry::parse( + &mut self.log_entry, + &line, + &self.log_template, + &self.timestamp_template, + ) { + return Some(entry); + } + } + + self.log_entry.take() + } +} + +#[derive(Debug, Clone)] +pub struct LogFileReader { + config: LogReaderConfig, + tx: BridgeTx, + log_template: Regex, + timestamp_template: Regex, +} + +impl LogFileReader { + pub fn new(config: LogReaderConfig, tx: BridgeTx) -> Self { + let log_template = Regex::new(&config.log_template).unwrap(); + let timestamp_template = Regex::new(&config.timestamp_template).unwrap(); + Self { config, tx, log_template, timestamp_template } + } + + #[tokio::main(flavor = "current_thread")] + pub async fn start(self) -> Result<(), Error> { + let mut set = JoinSet::new(); + + for path in self.config.paths { + let file = File::open(path).await?; + let lines = BufReader::new(file).lines(); + let mut parser = + LogParser::new(lines, self.log_template.clone(), self.timestamp_template.clone()); + let mut sequence = 0; + let stream_name = self.config.stream_name.to_owned(); + let tx = self.tx.clone(); + + set.spawn(async move { + loop { + match parser.next().await { + Some(entry) => { + sequence += 1; + let payload = entry.payload(stream_name.clone(), sequence); + tx.send_payload(payload).await + } + None => return, + } + } + }); + } + + while let Some(Err(e)) = set.join_next().await { + error!("{e}") + } + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[tokio::test] + async fn parse_single_log_line() { + let raw = r#"2023-07-03T17:59:22.979012Z DEBUG uplink::base::mqtt: Outgoing = Publish(9)"#; + let lines = BufReader::new(raw.as_bytes()).lines(); + + let log_template = + Regex::new(r#"^(?P.*)Z\s(?P\S+)\s(?P\S+):\s(?P.*)"#) + .unwrap(); + let timestamp_template = Regex::new(r#"^(?P\S+)-(?P\S+)-(?P\S+)T(?P\S+):(?P\S+):(?P\S+)\.(?P\S\S\S)"#).unwrap(); + let mut parser = LogParser::new(lines, log_template, timestamp_template); + + let entry = parser.next().await.unwrap(); + assert_eq!( + entry, + LogEntry { + level: Some("DEBUG".to_string()), + line: + "2023-07-03T17:59:22.979012Z DEBUG uplink::base::mqtt: Outgoing = Publish(9)".to_string() + , + timestamp: 1688407162000, + message: Some("Outgoing = Publish(9)".to_string()), + tag: Some("uplink::base::mqtt".to_string()) + } + ); + + assert!(parser.next().await.is_none()); + } + + #[tokio::test] + async fn parse_timestamp() { + let raw = r#"2023-07-03T17:59:22.979012"#; + let lines = BufReader::new(raw.as_bytes()).lines(); + + let log_template = Regex::new(r#"^(?P.*)"#).unwrap(); + let timestamp_template = Regex::new(r#"^(?P\S+)-(?P\S+)-(?P\S+)T(?P\S+):(?P\S+):(?P\S+)\.(?P\S\S\S)"#).unwrap(); + let mut parser = LogParser::new(lines, log_template.clone(), timestamp_template); + + let entry = parser.next().await.unwrap(); + assert_eq!(entry.timestamp, 1688407162979); + + let raw = r#"23-07-11 18:03:32"#; + let lines = BufReader::new(raw.as_bytes()).lines(); + + let timestamp_template= Regex::new(r#"^(?P\S+)-(?P\S+)-(?P\S+)\s(?P\S+):(?P\S+):(?P\S+)"#).unwrap(); + let mut parser = LogParser::new(lines, log_template.clone(), timestamp_template); + + let entry = parser.next().await.unwrap(); + + assert_eq!(entry.timestamp, 1689098612000); + } + + #[tokio::test] + async fn parse_multiple_log_lines() { + let raw = r#"2023-07-03T17:59:22.979012Z DEBUG uplink::base::mqtt: Outgoing = Publish(9) +2023-07-03T17:59:23.012000Z DEBUG uplink::base::mqtt: Incoming = PubAck(9)"#; + let lines = BufReader::new(raw.as_bytes()).lines(); + + let log_template = + Regex::new(r#"^(?P.*)Z\s(?P\S+)\s(?P\S+):\s(?P.*)"#) + .unwrap(); + let timestamp_template= Regex::new(r#"^(?P\S+)-(?P\S+)-(?P\S+)T(?P\S+):(?P\S+):(?P\S+)\.(?P\S\S\S)"#).unwrap(); + let mut parser = LogParser::new(lines, log_template, timestamp_template); + + let entry = parser.next().await.unwrap(); + assert_eq!( + entry, + LogEntry { + level: Some("DEBUG".to_string()), + line: + "2023-07-03T17:59:22.979012Z DEBUG uplink::base::mqtt: Outgoing = Publish(9)".to_string() + , + timestamp: 1688407162979, + message: Some("Outgoing = Publish(9)".to_string()), + tag: Some("uplink::base::mqtt".to_string()) + } + ); + + let entry = parser.next().await.unwrap(); + assert_eq!( + entry, + LogEntry { + level: Some("DEBUG".to_string()), + line: + "2023-07-03T17:59:23.012000Z DEBUG uplink::base::mqtt: Incoming = PubAck(9)".to_string() + , + timestamp: 1688407163012, + message: Some("Incoming = PubAck(9)".to_string()), + tag: Some("uplink::base::mqtt".to_string()) + } + ); + + assert!(parser.next().await.is_none()); + } + + #[tokio::test] + async fn parse_beamd_log_lines() { + let raw = r#"2023-07-11T13:56:44.101585Z INFO beamd::http::endpoint: Method = "POST", Uri = "/tenants/naveentest/devices/8/actions", Payload = "{\"name\":\"update_firmware\",\"id\":\"830\",\"payload\":\"{\\\"content-length\\\":35393,\\\"status\\\":false,\\\"url\\\":\\\"https://firmware.stage.bytebeam.io/api/v1/firmwares/one/artifact\\\",\\\"version\\\":\\\"one\\\"}\",\"kind\":\"process\"}" +2023-07-11T13:56:44.113343Z INFO beamd::http::endpoint: Method = "POST", Uri = "/tenants/rpi/devices/6/actions", Payload = "{\"name\":\"tunshell\",\"id\":\"226\",\"payload\":\"{}\",\"kind\":\"process\"}" +2023-07-11T13:56:44.221249Z ERROR beamd::clickhouse: Flush-error: [Status - 500] Ok("Code: 243. DB::Exception: Cannot reserve 11.58 MiB, not enough space. (NOT_ENOUGH_SPACE) (version 22.6.2.12 (official build))\n"), back_up_enabled: true + in beamd::clickhouse::clickhouse_flush with stream: "demo.uplink_process_stats""#; + let lines = BufReader::new(raw.as_bytes()).lines(); + + let log_template = + Regex::new(r#"^(?P.*)Z\s+(?P\S+)\s+(?P\S+):\s+(?P.*)"#) + .unwrap(); + let timestamp_template= Regex::new(r#"^(?P\S+)-(?P\S+)-(?P\S+)T(?P\S+):(?P\S+):(?P\S+)\.(?P\S\S\S)"#).unwrap(); + let mut parser = LogParser::new(lines, log_template.clone(), timestamp_template); + + let entry = parser.next().await.unwrap(); + assert_eq!( + entry, + LogEntry { + level: Some("INFO".to_string()), + line: + "2023-07-11T13:56:44.101585Z INFO beamd::http::endpoint: Method = \"POST\", Uri = \"/tenants/naveentest/devices/8/actions\", Payload = \"{\\\"name\\\":\\\"update_firmware\\\",\\\"id\\\":\\\"830\\\",\\\"payload\\\":\\\"{\\\\\\\"content-length\\\\\\\":35393,\\\\\\\"status\\\\\\\":false,\\\\\\\"url\\\\\\\":\\\\\\\"https://firmware.stage.bytebeam.io/api/v1/firmwares/one/artifact\\\\\\\",\\\\\\\"version\\\\\\\":\\\\\\\"one\\\\\\\"}\\\",\\\"kind\\\":\\\"process\\\"}\"".to_string() + , + timestamp: 1689083804101, + message: Some("Method = \"POST\", Uri = \"/tenants/naveentest/devices/8/actions\", Payload = \"{\\\"name\\\":\\\"update_firmware\\\",\\\"id\\\":\\\"830\\\",\\\"payload\\\":\\\"{\\\\\\\"content-length\\\\\\\":35393,\\\\\\\"status\\\\\\\":false,\\\\\\\"url\\\\\\\":\\\\\\\"https://firmware.stage.bytebeam.io/api/v1/firmwares/one/artifact\\\\\\\",\\\\\\\"version\\\\\\\":\\\\\\\"one\\\\\\\"}\\\",\\\"kind\\\":\\\"process\\\"}\"".to_string()), + tag: Some("beamd::http::endpoint".to_string()) + } + ); + + let entry = parser.next().await.unwrap(); + assert_eq!( + entry, + LogEntry { level: Some("INFO".to_string()), + line: + "2023-07-11T13:56:44.113343Z INFO beamd::http::endpoint: Method = \"POST\", Uri = \"/tenants/rpi/devices/6/actions\", Payload = \"{\\\"name\\\":\\\"tunshell\\\",\\\"id\\\":\\\"226\\\",\\\"payload\\\":\\\"{}\\\",\\\"kind\\\":\\\"process\\\"}\"".to_string() + , + timestamp: 1689083804113, + message: Some("Method = \"POST\", Uri = \"/tenants/rpi/devices/6/actions\", Payload = \"{\\\"name\\\":\\\"tunshell\\\",\\\"id\\\":\\\"226\\\",\\\"payload\\\":\\\"{}\\\",\\\"kind\\\":\\\"process\\\"}\"".to_string()), + tag: Some("beamd::http::endpoint".to_string()) + } +); + + let entry = parser.next().await.unwrap(); + assert_eq!( + entry, + LogEntry { line: "2023-07-11T13:56:44.221249Z ERROR beamd::clickhouse: Flush-error: [Status - 500] Ok(\"Code: 243. DB::Exception: Cannot reserve 11.58 MiB, not enough space. (NOT_ENOUGH_SPACE) (version 22.6.2.12 (official build))\\n\"), back_up_enabled: true\nin beamd::clickhouse::clickhouse_flush with stream: \"demo.uplink_process_stats\"".to_string(), tag: Some("beamd::clickhouse".to_string()), level: Some("ERROR".to_string()), timestamp: 1689083804221, message: Some("Flush-error: [Status - 500] Ok(\"Code: 243. DB::Exception: Cannot reserve 11.58 MiB, not enough space. (NOT_ENOUGH_SPACE) (version 22.6.2.12 (official build))\\n\"), back_up_enabled: true\nin beamd::clickhouse::clickhouse_flush with stream: \"demo.uplink_process_stats\"".to_string()) } +); + + assert!(parser.next().await.is_none()); + } + + #[tokio::test] + async fn parse_consoled_log_lines() { + let raw = r#"23-07-11 18:03:32 consoled-6cd8795566-76km9 INFO [ring.logger:0] - {:request-method :get, :uri "/api/v1/devices/count", :server-name "cloud.bytebeam.io", :ring.logger/type :finish, :status 200, :ring.logger/ms 11} +10.13.2.69 - - [11/Jul/2023:18:03:32 +0000] "GET /api/v1/devices/count?status=active HTTP/1.1" 200 1 "https://cloud.bytebeam.io/projects/kptl/device-management/devices" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"rt=0.016 uct=0.000 cn= o= +"Notifying broker for tenant reactlabs device 305 action 683022""#; + let lines = BufReader::new(raw.as_bytes()).lines(); + + let log_template = Regex::new( + r#"^(?P\S+-\S+-\S+\s\S+:\S+:\S+)\s+(?P\S+)\s+(?P\S+)\s+(?P.*)"# + ).unwrap(); + let timestamp_template = Regex::new(r#"^(?P\S+)-(?P\S+)-(?P\S+)\s(?P\S+):(?P\S+):(?P\S+)"#).unwrap(); + let mut parser = LogParser::new(lines, log_template.clone(), timestamp_template); + + let entry = parser.next().await.unwrap(); + assert_eq!( + entry, + LogEntry { level: Some("INFO".to_string()), + line: + "23-07-11 18:03:32 consoled-6cd8795566-76km9 INFO [ring.logger:0] - {:request-method :get, :uri \"/api/v1/devices/count\", :server-name \"cloud.bytebeam.io\", :ring.logger/type :finish, :status 200, :ring.logger/ms 11}\n10.13.2.69 - - [11/Jul/2023:18:03:32 +0000] \"GET /api/v1/devices/count?status=active HTTP/1.1\" 200 1 \"https://cloud.bytebeam.io/projects/kptl/device-management/devices\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36\"rt=0.016 uct=0.000 cn= o=\n\"Notifying broker for tenant reactlabs device 305 action 683022\"".to_string() + , + timestamp: 1689098612000, + message: Some("[ring.logger:0] - {:request-method :get, :uri \"/api/v1/devices/count\", :server-name \"cloud.bytebeam.io\", :ring.logger/type :finish, :status 200, :ring.logger/ms 11}\n10.13.2.69 - - [11/Jul/2023:18:03:32 +0000] \"GET /api/v1/devices/count?status=active HTTP/1.1\" 200 1 \"https://cloud.bytebeam.io/projects/kptl/device-management/devices\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36\"rt=0.016 uct=0.000 cn= o=\n\"Notifying broker for tenant reactlabs device 305 action 683022\"".to_string()), + tag: Some("consoled-6cd8795566-76km9".to_string()) + } +); + + assert!(parser.next().await.is_none()); + } +} diff --git a/uplink/src/collector/mod.rs b/uplink/src/collector/mod.rs index 4de3c9375..9e3939819 100644 --- a/uplink/src/collector/mod.rs +++ b/uplink/src/collector/mod.rs @@ -3,12 +3,12 @@ pub mod downloader; pub mod installer; #[cfg(target_os = "linux")] pub mod journalctl; +pub mod log_reader; #[cfg(target_os = "android")] pub mod logcat; pub mod process; pub mod prometheus; pub mod simulator; -pub mod stdout; pub mod systemstats; pub mod tcpjson; pub mod tunshell; diff --git a/uplink/src/collector/stdout.rs b/uplink/src/collector/stdout.rs deleted file mode 100644 index fb7c5ebc2..000000000 --- a/uplink/src/collector/stdout.rs +++ /dev/null @@ -1,400 +0,0 @@ -use regex::{Match, Regex}; -use time::{Month, OffsetDateTime}; -use tokio::io::{stdin, AsyncBufReadExt, BufReader, Lines}; - -use serde::Serialize; -use serde_json::json; - -use crate::base::bridge::{BridgeTx, Payload}; -use crate::base::StdoutConfig; - -#[derive(Debug, Serialize)] -struct LogEntry { - pub line: String, - pub tag: Option, - pub level: Option, - #[serde(skip)] - pub timestamp: u64, - pub message: Option, -} - -/// Parse timestamp from log line, use current time as default if unable to ascertain partially -pub fn parse_timestamp(date: &mut OffsetDateTime, s: &str, template: &Regex) -> Option<()> { - let matches = template.captures(s)?; - if let Some(year) = matches.name("year") { - let year = year.as_str().parse().ok()?; - *date = date.replace_year(year).ok()?; - } - if let Some(month) = matches.name("month") { - let month: u8 = month.as_str().parse().ok()?; - let month = Month::try_from(month).ok()?; - *date = date.replace_month(month).ok()?; - } - if let Some(day) = matches.name("day") { - let day = day.as_str().parse().ok()?; - *date = date.replace_day(day).ok()?; - } - if let Some(hour) = matches.name("hour") { - let hour = hour.as_str().parse().ok()?; - *date = date.replace_hour(hour).ok()?; - } - if let Some(minute) = matches.name("minute") { - let minute = minute.as_str().parse().ok()?; - *date = date.replace_minute(minute).ok()?; - } - if let Some(second) = matches.name("second") { - let second = second.as_str().parse().ok()?; - *date = date.replace_second(second).ok()?; - } - if let Some(microsecond) = matches.name("microsecond") { - let microsecond = microsecond.as_str().parse().ok()?; - *date = date.replace_microsecond(microsecond).ok()?; - } - - Some(()) -} - -impl LogEntry { - // NOTE: expects log lines to contain information as defined in log_template e.g. "{log_timestamp} {level} {tag}: {message}" else treats them as message lines - fn parse( - current_line: &mut Option, - line: &str, - log_template: &Regex, - timestamp_template: &Regex, - ) -> Option { - let to_string = |x: Match| x.as_str().to_string(); - let line = line.trim().to_string(); - if let Some(captures) = log_template.captures(&line) { - // Use current time if not able to parse properly - let mut date = time::OffsetDateTime::now_utc(); - if let Some(t) = captures.name("timestamp") { - parse_timestamp(&mut date, t.as_str(), timestamp_template); - } - - let timestamp = (date.unix_timestamp_nanos() / 1_000_000) as u64; - let level = captures.name("level").map(to_string); - let tag = captures.name("tag").map(to_string); - let message = captures.name("message").map(to_string); - - return current_line.replace(LogEntry { line, tag, level, timestamp, message }); - } else if let Some(log_entry) = current_line { - log_entry.line += &format!("\n{line}"); - match &mut log_entry.message { - Some(msg) => *msg += &format!("\n{line}"), - _ => log_entry.message = Some(line), - }; - } - - None - } - - fn payload(self, stream: String, sequence: u32) -> Payload { - Payload { - stream, - device_id: None, - sequence, - timestamp: self.timestamp, - payload: json!(self), - } - } -} - -#[derive(thiserror::Error, Debug)] -pub enum Error { - #[error("Io error {0}")] - Io(#[from] std::io::Error), - #[error("Serde error {0}")] - Json(#[from] serde_json::error::Error), -} - -pub struct Stdout { - config: StdoutConfig, - tx: BridgeTx, - sequence: u32, - log_template: Regex, - timestamp_template: Regex, - log_entry: Option, -} - -impl Stdout { - pub fn new(config: StdoutConfig, tx: BridgeTx) -> Self { - let log_template = Regex::new(&config.log_template).unwrap(); - let timestamp_template = Regex::new(&config.timestamp_template).unwrap(); - Self { config, tx, log_template, timestamp_template, sequence: 0, log_entry: None } - } - - #[tokio::main(flavor = "current_thread")] - pub async fn start(mut self) -> Result<(), Error> { - let stdin = stdin(); - let mut lines = BufReader::new(stdin).lines(); - - loop { - match self.parse_lines(&mut lines).await { - Some(payload) => self.tx.send_payload(payload).await, - None => return Ok(()), - } - } - } - - async fn parse_lines( - &mut self, - lines: &mut Lines, - ) -> Option { - while let Some(line) = lines.next_line().await.ok()? { - if let Some(entry) = LogEntry::parse( - &mut self.log_entry, - &line, - &self.log_template, - &self.timestamp_template, - ) { - self.sequence += 1; - return Some(entry.payload(self.config.stream_name.to_owned(), self.sequence)); - } - } - - self.log_entry - .take() - .map(|e| e.payload(self.config.stream_name.to_owned(), self.sequence + 1)) - } -} - -#[cfg(test)] -mod test { - use flume::bounded; - - use super::*; - - #[tokio::test] - async fn parse_single_log_line() { - let raw = r#"2023-07-03T17:59:22.979012Z DEBUG uplink::base::mqtt: Outgoing = Publish(9)"#; - let mut lines = BufReader::new(raw.as_bytes()).lines(); - - let config = StdoutConfig { - stream_name: "".to_string(), - log_template: - r#"^(?P.*)Z\s(?P\S+)\s(?P\S+):\s(?P.*)"# - .to_string(), - timestamp_template: r#"^(?P\S+)-(?P\S+)-(?P\S+)T(?P\S+):(?P\S+):(?P\S+).(?P\S\S\S)"#.to_string(), - }; - let tx = BridgeTx { - events_tx: { - let (tx, _) = bounded(1); - tx - }, - shutdown_handle: { - let (tx, _) = bounded(1); - tx - }, - }; - let mut handle = Stdout::new(config, tx); - - let Payload { payload, sequence, .. } = handle.parse_lines(&mut lines).await.unwrap(); - assert_eq!( - payload, - json!({"level": "DEBUG", "line": "2023-07-03T17:59:22.979012Z DEBUG uplink::base::mqtt: Outgoing = Publish(9)", "message": "Outgoing = Publish(9)", "tag": "uplink::base::mqtt"}) - ); - assert_eq!(sequence, 1); - - assert!(handle.parse_lines(&mut lines).await.is_none()); - } - - #[tokio::test] - async fn parse_timestamp() { - let raw = r#"2023-07-03T17:59:22.979012"#; - let mut lines = BufReader::new(raw.as_bytes()).lines(); - - let config = StdoutConfig { - stream_name: "".to_string(), - log_template: r#"^(?P.*)"#.to_string(), - timestamp_template: r#"^(?P\S+)-(?P\S+)-(?P\S+)T(?P\S+):(?P\S+):(?P\S+).(?P\S\S\S).*"#.to_string(), - }; - let tx = BridgeTx { - events_tx: { - let (tx, _) = bounded(1); - tx - }, - shutdown_handle: { - let (tx, _) = bounded(1); - tx - }, - }; - let mut handle = Stdout::new(config, tx); - - let Payload { timestamp, sequence, .. } = handle.parse_lines(&mut lines).await.unwrap(); - assert_eq!(sequence, 1); - assert_eq!(timestamp, 1689138886255); - - let raw = r#"23-07-11 18:03:32"#; - let mut lines = BufReader::new(raw.as_bytes()).lines(); - - let config = StdoutConfig { - stream_name: "".to_string(), - log_template: r#"^(?P.*)"#.to_string(), - timestamp_template: r#"^(?P\S+)-(?P\S+)-(?P\S+)\s(?P\S+):(?P\S+):(?P\S+)"#.to_string(), - }; - let tx = BridgeTx { - events_tx: { - let (tx, _) = bounded(1); - tx - }, - shutdown_handle: { - let (tx, _) = bounded(1); - tx - }, - }; - let mut handle = Stdout::new(config, tx); - - let Payload { timestamp, sequence, .. } = handle.parse_lines(&mut lines).await.unwrap(); - assert_eq!(sequence, 1); - assert_eq!(timestamp, 1689138886255); - } - - #[tokio::test] - async fn parse_multiple_log_lines() { - let raw = r#"2023-07-03T17:59:22.979012Z DEBUG uplink::base::mqtt: Outgoing = Publish(9) -2023-07-03T17:59:23.012000Z DEBUG uplink::base::mqtt: Incoming = PubAck(9)"#; - let mut lines = BufReader::new(raw.as_bytes()).lines(); - - let config = StdoutConfig { - stream_name: "".to_string(), - log_template: - r#"^(?P.*)Z\s(?P\S+)\s(?P\S+):\s(?P.*)"# - .to_string(), - timestamp_template: r#"^(?P\S+)-(?P\S+)-(?P\S+)T(?P\S+):(?P\S+):(?P\S+).(?P\S\S\S)"#.to_string(), - }; - let tx = BridgeTx { - events_tx: { - let (tx, _) = bounded(1); - tx - }, - shutdown_handle: { - let (tx, _) = bounded(1); - tx - }, - }; - let mut handle = Stdout::new(config, tx); - - let Payload { payload, sequence, .. } = handle.parse_lines(&mut lines).await.unwrap(); - assert_eq!( - payload, - json!({"level": "DEBUG", "line": "2023-07-03T17:59:22.979012Z DEBUG uplink::base::mqtt: Outgoing = Publish(9)", "message": "Outgoing = Publish(9)", "tag": "uplink::base::mqtt"}) - ); - assert_eq!(sequence, 1); - - let Payload { payload, sequence, .. } = handle.parse_lines(&mut lines).await.unwrap(); - assert_eq!( - payload, - json!({"level": "DEBUG", "line": "2023-07-03T17:59:23.012000Z DEBUG uplink::base::mqtt: Incoming = PubAck(9)", "message": "Incoming = PubAck(9)", "tag": "uplink::base::mqtt"}) - ); - assert_eq!(sequence, 2); - - assert!(handle.parse_lines(&mut lines).await.is_none()); - } - - #[tokio::test] - async fn parse_beamd_log_lines() { - let raw = r#"2023-07-11T13:56:44.101585Z INFO beamd::http::endpoint: Method = "POST", Uri = "/tenants/naveentest/devices/8/actions", Payload = "{\"name\":\"update_firmware\",\"id\":\"830\",\"payload\":\"{\\\"content-length\\\":35393,\\\"status\\\":false,\\\"url\\\":\\\"https://firmware.stage.bytebeam.io/api/v1/firmwares/one/artifact\\\",\\\"version\\\":\\\"one\\\"}\",\"kind\":\"process\"}" -2023-07-11T13:56:44.113343Z INFO beamd::http::endpoint: Method = "POST", Uri = "/tenants/rpi/devices/6/actions", Payload = "{\"name\":\"tunshell\",\"id\":\"226\",\"payload\":\"{}\",\"kind\":\"process\"}" -2023-07-11T13:56:44.221249Z ERROR beamd::clickhouse: Flush-error: [Status - 500] Ok("Code: 243. DB::Exception: Cannot reserve 11.58 MiB, not enough space. (NOT_ENOUGH_SPACE) (version 22.6.2.12 (official build))\n"), back_up_enabled: true - in beamd::clickhouse::clickhouse_flush with stream: "demo.uplink_process_stats""#; - let mut lines = BufReader::new(raw.as_bytes()).lines(); - - let config = StdoutConfig { - stream_name: "".to_string(), - log_template: - r#"^(?P.*)Z\s+(?P\S+)\s+(?P\S+):\s+(?P.*)"# - .to_string(), - timestamp_template: r#"^(?P\S+)-(?P\S+)-(?P\S+)T(?P\S+):(?P\S+):(?P\S+).(?P\S\S\S)"#.to_string(), - }; - let tx = BridgeTx { - events_tx: { - let (tx, _) = bounded(1); - tx - }, - shutdown_handle: { - let (tx, _) = bounded(1); - tx - }, - }; - let mut handle = Stdout::new(config, tx); - - let Payload { payload, sequence, .. } = handle.parse_lines(&mut lines).await.unwrap(); - assert_eq!( - payload, - json!({ - "level": "INFO", - "line": "2023-07-11T13:56:44.101585Z INFO beamd::http::endpoint: Method = \"POST\", Uri = \"/tenants/naveentest/devices/8/actions\", Payload = \"{\\\"name\\\":\\\"update_firmware\\\",\\\"id\\\":\\\"830\\\",\\\"payload\\\":\\\"{\\\\\\\"content-length\\\\\\\":35393,\\\\\\\"status\\\\\\\":false,\\\\\\\"url\\\\\\\":\\\\\\\"https://firmware.stage.bytebeam.io/api/v1/firmwares/one/artifact\\\\\\\",\\\\\\\"version\\\\\\\":\\\\\\\"one\\\\\\\"}\\\",\\\"kind\\\":\\\"process\\\"}\"", - "message": "Method = \"POST\", Uri = \"/tenants/naveentest/devices/8/actions\", Payload = \"{\\\"name\\\":\\\"update_firmware\\\",\\\"id\\\":\\\"830\\\",\\\"payload\\\":\\\"{\\\\\\\"content-length\\\\\\\":35393,\\\\\\\"status\\\\\\\":false,\\\\\\\"url\\\\\\\":\\\\\\\"https://firmware.stage.bytebeam.io/api/v1/firmwares/one/artifact\\\\\\\",\\\\\\\"version\\\\\\\":\\\\\\\"one\\\\\\\"}\\\",\\\"kind\\\":\\\"process\\\"}\"", - "tag": "beamd::http::endpoint" - }) - ); - assert_eq!(sequence, 1); - - let Payload { payload, sequence, .. } = handle.parse_lines(&mut lines).await.unwrap(); - assert_eq!( - payload, - json!({ - "level": "INFO", - "line": "2023-07-11T13:56:44.113343Z INFO beamd::http::endpoint: Method = \"POST\", Uri = \"/tenants/rpi/devices/6/actions\", Payload = \"{\\\"name\\\":\\\"tunshell\\\",\\\"id\\\":\\\"226\\\",\\\"payload\\\":\\\"{}\\\",\\\"kind\\\":\\\"process\\\"}\"", - "message": "Method = \"POST\", Uri = \"/tenants/rpi/devices/6/actions\", Payload = \"{\\\"name\\\":\\\"tunshell\\\",\\\"id\\\":\\\"226\\\",\\\"payload\\\":\\\"{}\\\",\\\"kind\\\":\\\"process\\\"}\"", - "tag": "beamd::http::endpoint" - }) - ); - assert_eq!(sequence, 2); - - let Payload { payload, sequence, .. } = handle.parse_lines(&mut lines).await.unwrap(); - assert_eq!( - payload, - json!({ - "level": "ERROR", - "line": "2023-07-11T13:56:44.221249Z ERROR beamd::clickhouse: Flush-error: [Status - 500] Ok(\"Code: 243. DB::Exception: Cannot reserve 11.58 MiB, not enough space. (NOT_ENOUGH_SPACE) (version 22.6.2.12 (official build))\\n\"), back_up_enabled: true\n in beamd::clickhouse::clickhouse_flush with stream: \"demo.uplink_process_stats\"", - "message": "Flush-error: [Status - 500] Ok(\"Code: 243. DB::Exception: Cannot reserve 11.58 MiB, not enough space. (NOT_ENOUGH_SPACE) (version 22.6.2.12 (official build))\\n\"), back_up_enabled: true\n in beamd::clickhouse::clickhouse_flush with stream: \"demo.uplink_process_stats\"", - "tag": "beamd::clickhouse" - }) - ); - assert_eq!(sequence, 3); - - assert!(handle.parse_lines(&mut lines).await.is_none()); - } - - #[tokio::test] - async fn parse_consoled_log_lines() { - let raw = r#"23-07-11 18:03:32 consoled-6cd8795566-76km9 INFO [ring.logger:0] - {:request-method :get, :uri "/api/v1/devices/count", :server-name "cloud.bytebeam.io", :ring.logger/type :finish, :status 200, :ring.logger/ms 11} -10.13.2.69 - - [11/Jul/2023:18:03:32 +0000] "GET /api/v1/devices/count?status=active HTTP/1.1" 200 1 "https://cloud.bytebeam.io/projects/kptl/device-management/devices" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"rt=0.016 uct=0.000 cn= o= -"Notifying broker for tenant reactlabs device 305 action 683022""#; - let mut lines = BufReader::new(raw.as_bytes()).lines(); - - let config = StdoutConfig { - stream_name: "".to_string(), - log_template: - r#"^(?P\S+-\S+-\S+\s\S+:\S+:\S+)\s+(?P\S+)\s+(?P\S+)\s+(?P.*)"# - .to_string(), - timestamp_template: r#"^(?P\S+)-(?P\S+)-(?P\S+)\s(?P\S+):(?P\S+):(?P\S+)"#.to_string(), - }; - let tx = BridgeTx { - events_tx: { - let (tx, _) = bounded(1); - tx - }, - shutdown_handle: { - let (tx, _) = bounded(1); - tx - }, - }; - let mut handle = Stdout::new(config, tx); - - let Payload { payload, sequence, .. } = handle.parse_lines(&mut lines).await.unwrap(); - assert_eq!( - payload, - json!({ - "level": "INFO", - "line": "23-07-11 18:03:32 consoled-6cd8795566-76km9 INFO [ring.logger:0] - {:request-method :get, :uri \"/api/v1/devices/count\", :server-name \"cloud.bytebeam.io\", :ring.logger/type :finish, :status 200, :ring.logger/ms 11}\n10.13.2.69 - - [11/Jul/2023:18:03:32 +0000] \"GET /api/v1/devices/count?status=active HTTP/1.1\" 200 1 \"https://cloud.bytebeam.io/projects/kptl/device-management/devices\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36\"rt=0.016 uct=0.000 cn= o=\n\"Notifying broker for tenant reactlabs device 305 action 683022\"", - "message": "[ring.logger:0] - {:request-method :get, :uri \"/api/v1/devices/count\", :server-name \"cloud.bytebeam.io\", :ring.logger/type :finish, :status 200, :ring.logger/ms 11}\n10.13.2.69 - - [11/Jul/2023:18:03:32 +0000] \"GET /api/v1/devices/count?status=active HTTP/1.1\" 200 1 \"https://cloud.bytebeam.io/projects/kptl/device-management/devices\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36\"rt=0.016 uct=0.000 cn= o=\n\"Notifying broker for tenant reactlabs device 305 action 683022\"", - "tag": "consoled-6cd8795566-76km9" - }) - ); - assert_eq!(sequence, 1); - - assert!(handle.parse_lines(&mut lines).await.is_none()); - } -} diff --git a/uplink/src/lib.rs b/uplink/src/lib.rs index fe13fa4cf..090cac462 100644 --- a/uplink/src/lib.rs +++ b/uplink/src/lib.rs @@ -52,8 +52,8 @@ use base::Compression; use collector::device_shadow::DeviceShadow; use collector::downloader::FileDownloader; use collector::installer::OTAInstaller; +use collector::log_reader::LogFileReader; use collector::process::ProcessHandler; -use collector::stdout::Stdout; use collector::systemstats::StatCollector; use collector::tunshell::TunshellSession; use flume::{bounded, Receiver, RecvError, Sender}; @@ -434,8 +434,8 @@ impl Uplink { let processes = config.processes.clone(); thread::spawn(move || process_handler.start(processes)); - if let Some(config) = &config.stdout { - let stdout_collector = Stdout::new(config.clone(), bridge_tx.clone()); + if let Some(config) = &config.log_reader { + let stdout_collector = LogFileReader::new(config.clone(), bridge_tx.clone()); thread::spawn(move || stdout_collector.start()); }