diff --git a/proxy_agent/src/proxy_agent_status.rs b/proxy_agent/src/proxy_agent_status.rs
index 51a944ba..aec85804 100644
--- a/proxy_agent/src/proxy_agent_status.rs
+++ b/proxy_agent/src/proxy_agent_status.rs
@@ -154,7 +154,8 @@ impl ProxyAgentStatusTask {
                     message: status,
                     duration: status_report_time.elapsed().as_millis() as i64,
                 },
-            );
+            )
+            .await;
             status_report_time = Instant::now();
         }
         // write the aggregate status to status.json file
@@ -290,7 +291,7 @@ impl ProxyAgentStatusTask {
 
     async fn write_aggregate_status_to_file(&self, status: GuestProxyAgentAggregateStatus) {
         let full_file_path = self.status_dir.join("status.json");
-        if let Err(e) = misc_helpers::json_write_to_file(&status, &full_file_path) {
+        if let Err(e) = misc_helpers::json_write_to_file_async(&status, &full_file_path).await {
            self.update_agent_status_message(format!(
                "Error writing aggregate status to status file: {e}"
            ))
diff --git a/proxy_agent_shared/Cargo.toml b/proxy_agent_shared/Cargo.toml
index 269dd726..04b53a8a 100644
--- a/proxy_agent_shared/Cargo.toml
+++ b/proxy_agent_shared/Cargo.toml
@@ -16,7 +16,7 @@ serde_json = "1.0.91" # json Deserializer
 serde-xml-rs = "0.8.1" # xml Deserializer with xml attribute
 regex = "1.11" # match file name
 thiserror = "1.0.64"
-tokio = { version = "1", features = ["rt", "macros", "net", "sync", "time"] }
+tokio = { version = "1", features = ["fs", "rt", "macros", "net", "sync", "time"] }
 tokio-util = "0.7.11"
 libc = "0.2.147"
 log = { version = "0.4.26", features = ["std"] }
diff --git a/proxy_agent_shared/src/misc_helpers.rs b/proxy_agent_shared/src/misc_helpers.rs
index 57781523..c066eb2f 100644
--- a/proxy_agent_shared/src/misc_helpers.rs
+++ b/proxy_agent_shared/src/misc_helpers.rs
@@ -25,32 +25,47 @@ pub fn get_thread_identity() -> String {
     format!("{:0>8}", thread_id::get())
 }
 
-pub fn get_date_time_string_with_milliseconds() -> String {
-    let date_format =
-        format_description::parse("[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond]")
-            .unwrap();
+// Static format descriptors parsed once and reused for all calls
+static ISO8601_MILLIS_FORMAT: std::sync::LazyLock<
+    Vec<format_description::FormatItem<'static>>,
+> = std::sync::LazyLock::new(|| {
+    format_description::parse("[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond]")
+        .expect("Invalid ISO8601 millis date format")
+});
+
+static ISO8601_FORMAT: std::sync::LazyLock<Vec<format_description::FormatItem<'static>>> =
+    std::sync::LazyLock::new(|| {
+        format_description::parse("[year]-[month]-[day]T[hour]:[minute]:[second]Z")
+            .expect("Invalid ISO8601 date format")
+    });
+
+// This format is also the preferred HTTP date format. https://httpwg.org/specs/rfc9110.html#http.date
+static RFC1123_FORMAT: std::sync::LazyLock<Vec<format_description::FormatItem<'static>>> =
+    std::sync::LazyLock::new(|| {
+        format_description::parse(
+            "[weekday repr:short], [day] [month repr:short] [year] [hour]:[minute]:[second] GMT",
+        )
+        .expect("Invalid RFC1123 date format")
+    });
+
-    let time_str = OffsetDateTime::now_utc().format(&date_format).unwrap();
+pub fn get_date_time_string_with_milliseconds() -> String {
+    let time_str = OffsetDateTime::now_utc()
+        .format(&*ISO8601_MILLIS_FORMAT)
+        .expect("Failed to format ISO8601 millis date");
+    // Truncate to 23 chars: "YYYY-MM-DDTHH:MM:SS.mmm"
     time_str.chars().take(23).collect()
 }
 
 pub fn get_date_time_string() -> String {
-    let date_format =
-        format_description::parse("[year]-[month]-[day]T[hour]:[minute]:[second]Z").unwrap();
-
-    let time_str = OffsetDateTime::now_utc().format(&date_format).unwrap();
-    time_str.chars().collect()
+    OffsetDateTime::now_utc()
+        .format(&*ISO8601_FORMAT)
+        .expect("Failed to format ISO8601 date")
 }
 
-// This format is also the preferred HTTP date format. https://httpwg.org/specs/rfc9110.html#http.date
 pub fn get_date_time_rfc1123_string() -> String {
-    let date_format = format_description::parse(
-        "[weekday repr:short], [day] [month repr:short] [year] [hour]:[minute]:[second] GMT",
-    )
-    .unwrap();
-
-    let time_str = OffsetDateTime::now_utc().format(&date_format).unwrap();
-    time_str.chars().collect()
+    OffsetDateTime::now_utc()
+        .format(&*RFC1123_FORMAT)
+        .expect("Failed to format RFC1123 date")
 }
 
 pub fn get_date_time_unix_nano() -> i128 {
@@ -126,19 +141,45 @@ pub fn try_create_folder(dir: &Path) -> Result<()> {
     Ok(())
 }
 
+/// Writes a serializable object to a file in JSON format.
+/// It first writes to a temporary file and then renames it to the target file to avoid leaving a corrupted file if the write operation fails.
+/// Remark: it uses BufWriter to reduce system calls and improve performance.
+/// Remark: Called from sync code, infrequent writes and small objects
 pub fn json_write_to_file<T>(obj: &T, file_path: &Path) -> Result<()>
 where
     T: ?Sized + Serialize,
 {
+    use std::io::BufWriter;
+    // write to a temp file and rename to avoid corrupted file
     let temp_file_path = file_path.with_extension("tmp");
     let file = File::create(&temp_file_path)?;
-    serde_json::to_writer_pretty(file, obj)?;
+    let writer = BufWriter::new(file); // Reduces system calls
+    serde_json::to_writer_pretty(writer, obj)?;
     std::fs::rename(temp_file_path, file_path)?;
     Ok(())
 }
 
+/// Async version of json_write_to_file using tokio::fs
+/// Serializes to memory first (CPU work), then writes asynchronously (IO work)
+/// This avoids blocking the async runtime during serialization
+/// Remark: Called from async context, writing while handing concurrent requests, and potentially larger objects
+pub async fn json_write_to_file_async<T>(obj: &T, file_path: &Path) -> Result<()>
+where
+    T: ?Sized + Serialize,
+{
+    // Serialize to memory first (CPU work - fast)
+    let json_bytes = serde_json::to_vec_pretty(obj)?;
+
+    // Write asynchronously (IO work)
+    let temp_file_path = file_path.with_extension("tmp");
+    tokio::fs::write(&temp_file_path, json_bytes).await?;
+    tokio::fs::rename(&temp_file_path, file_path).await?;
+
+    Ok(())
+}
+
 pub fn json_read_from_file<T>(file_path: &Path) -> Result<T>
 where
     T: DeserializeOwned,
diff --git a/proxy_agent_shared/src/telemetry/event_logger.rs b/proxy_agent_shared/src/telemetry/event_logger.rs
index 96bda630..46961af9 100644
--- a/proxy_agent_shared/src/telemetry/event_logger.rs
+++ b/proxy_agent_shared/src/telemetry/event_logger.rs
@@ -102,7 +102,7 @@ pub async fn start(
 
         let mut file_path = event_dir.to_path_buf();
         file_path.push(crate::telemetry::new_generic_event_file_name());
-        match misc_helpers::json_write_to_file(&events, &file_path) {
+        match misc_helpers::json_write_to_file_async(&events, &file_path).await {
            Ok(()) => {
                logger_manager::write_log(
                    Level::Trace,
@@ -169,7 +169,7 @@ pub fn write_event_only(level: Level, message: String, method_name: &str, module
     };
 }
 
-pub fn report_extension_status_event(
+pub async fn report_extension_status_event(
     extension: crate::telemetry::Extension,
     operation_status: crate::telemetry::OperationStatus,
 ) {
@@ -210,7 +210,7 @@ pub fn report_extension_status_event(
     let event = crate::telemetry::ExtensionStatusEvent::new(extension, operation_status);
     let mut file_path = event_dir.to_path_buf();
     file_path.push(crate::telemetry::new_extension_event_file_name());
-    if let Err(e) = misc_helpers::json_write_to_file(&event, &file_path) {
+    if let Err(e) = misc_helpers::json_write_to_file_async(&event, &file_path).await {
        logger_manager::write_log(
            Level::Warn,
            format!(
@@ -261,7 +261,7 @@ mod tests {
         };
 
         // This should not panic even if EVENTS_DIR is not set
-        super::report_extension_status_event(extension, operation_status);
+        super::report_extension_status_event(extension, operation_status).await;
 
         // Start the event logger loop and set the EVENTS_DIR
         let cloned_events_dir = events_dir.to_path_buf();
@@ -326,7 +326,7 @@ mod tests {
         };
 
         // Call report_extension_status_event
-        super::report_extension_status_event(extension.clone(), operation_status.clone());
+        super::report_extension_status_event(extension.clone(), operation_status.clone()).await;
 
         // Wait for the file to be written
         tokio::time::sleep(Duration::from_millis(100)).await;
diff --git a/proxy_agent_shared/src/telemetry/event_reader.rs b/proxy_agent_shared/src/telemetry/event_reader.rs
index 8002dfae..5259b107 100644
--- a/proxy_agent_shared/src/telemetry/event_reader.rs
+++ b/proxy_agent_shared/src/telemetry/event_reader.rs
@@ -376,12 +376,15 @@ mod tests {
         misc_helpers::try_create_folder(&events_dir).unwrap();
         let mut file_path = events_dir.to_path_buf();
         file_path.push(format!("{}.json", misc_helpers::get_date_time_unix_nano()));
-        misc_helpers::json_write_to_file(&events, &file_path).unwrap();
+        misc_helpers::json_write_to_file_async(&events, &file_path)
+            .await
+            .unwrap();
         tokio::time::sleep(Duration::from_millis(1)).await;
         let mut file_path = events_dir.to_path_buf();
         file_path.push(format!("{}.json", misc_helpers::get_date_time_unix_nano()));
-        misc_helpers::json_write_to_file(&events, &file_path).unwrap();
-
+        misc_helpers::json_write_to_file_async(&events, &file_path)
+            .await
+            .unwrap();
         // test EventReader with limits
         let event_reader_limits = EventReaderLimits::new()
             .with_max_event_file_size_bytes(1024 * 10)
@@ -413,11 +416,15 @@ mod tests {
         tokio::time::sleep(Duration::from_millis(1)).await;
         let mut file_path = events_dir.to_path_buf();
         file_path.push(format!("{}.json", misc_helpers::get_date_time_unix_nano()));
-        misc_helpers::json_write_to_file(&events, &file_path).unwrap();
+        misc_helpers::json_write_to_file_async(&events, &file_path)
+            .await
+            .unwrap();
         tokio::time::sleep(Duration::from_millis(1)).await;
         let mut file_path = events_dir.to_path_buf();
         file_path.push(format!("{}.json", misc_helpers::get_date_time_unix_nano()));
-        misc_helpers::json_write_to_file(&events, &file_path).unwrap();
+        misc_helpers::json_write_to_file_async(&events, &file_path)
+            .await
+            .unwrap();
 
         let files = misc_helpers::get_files(&events_dir).unwrap();
         assert_eq!(2, files.len(), "Must have 2 event files.");
@@ -435,10 +442,14 @@ mod tests {
             "{}.notjson",
             misc_helpers::get_date_time_unix_nano()
         ));
-        misc_helpers::json_write_to_file(&events, &file_path).unwrap();
+        misc_helpers::json_write_to_file_async(&events, &file_path)
+            .await
+            .unwrap();
         let mut file_path = events_dir.to_path_buf();
         file_path.push(format!("a{}.json", misc_helpers::get_date_time_unix_nano()));
-        misc_helpers::json_write_to_file(&events, &file_path).unwrap();
+        misc_helpers::json_write_to_file_async(&events, &file_path)
+            .await
+            .unwrap();
         let events_processed = event_reader.process_once().await;
         assert_eq!(0, events_processed, "events_processed must be 0.");
         let files = misc_helpers::get_files(&events_dir).unwrap();
@@ -492,13 +503,17 @@ mod tests {
         // Write extension event files with proper naming pattern
         let mut file_path = events_dir.to_path_buf();
         file_path.push(crate::telemetry::new_extension_event_file_name());
-        misc_helpers::json_write_to_file(&event, &file_path).unwrap();
+        misc_helpers::json_write_to_file_async(&event, &file_path)
+            .await
+            .unwrap();
 
         tokio::time::sleep(Duration::from_millis(1)).await;
 
         let mut file_path2 = events_dir.to_path_buf();
         file_path2.push(crate::telemetry::new_extension_event_file_name());
-        misc_helpers::json_write_to_file(&event, &file_path2).unwrap();
+        misc_helpers::json_write_to_file_async(&event, &file_path2)
+            .await
+            .unwrap();
 
         // Verify files were created
         let files = misc_helpers::search_files(
@@ -529,7 +544,9 @@ mod tests {
         // Test with non-matching file names (should not be processed)
         let mut non_matching_file = events_dir.to_path_buf();
         non_matching_file.push("not_extension_event.json");
-        misc_helpers::json_write_to_file(&event, &non_matching_file).unwrap();
+        misc_helpers::json_write_to_file_async(&event, &non_matching_file)
+            .await
+            .unwrap();
 
         let events_processed = event_reader.process_extension_status_events().await;
         assert_eq!(
@@ -547,7 +564,9 @@ mod tests {
         // Write another event file
         let mut file_path3 = events_dir.to_path_buf();
         file_path3.push(crate::telemetry::new_extension_event_file_name());
-        misc_helpers::json_write_to_file(&event, &file_path3).unwrap();
+        misc_helpers::json_write_to_file_async(&event, &file_path3)
+            .await
+            .unwrap();
 
         // Start the processor in a separate task
         let event_reader_for_task = EventReader::new(
@@ -623,13 +642,17 @@ mod tests {
         // Write 2 generic event files
         let mut generic_file1 = events_dir.to_path_buf();
         generic_file1.push(crate::telemetry::new_generic_event_file_name());
-        misc_helpers::json_write_to_file(&generic_events, &generic_file1).unwrap();
+        misc_helpers::json_write_to_file_async(&generic_events, &generic_file1)
+            .await
+            .unwrap();
 
         tokio::time::sleep(Duration::from_millis(1)).await;
 
         let mut generic_file2 = events_dir.to_path_buf();
         generic_file2.push(crate::telemetry::new_generic_event_file_name());
-        misc_helpers::json_write_to_file(&generic_events, &generic_file2).unwrap();
+        misc_helpers::json_write_to_file_async(&generic_events, &generic_file2)
+            .await
+            .unwrap();
 
         // Create extension status event files
         let extension = crate::telemetry::Extension {
@@ -654,7 +677,9 @@ mod tests {
         for _ in 0..3 {
             let mut ext_file = events_dir.to_path_buf();
             ext_file.push(crate::telemetry::new_extension_event_file_name());
-            misc_helpers::json_write_to_file(&extension_event, &ext_file).unwrap();
+            misc_helpers::json_write_to_file_async(&extension_event, &ext_file)
+                .await
+                .unwrap();
             tokio::time::sleep(Duration::from_millis(1)).await;
         }
 
@@ -728,11 +753,15 @@ mod tests {
         // Write one of each type again
         let mut generic_file = events_dir.to_path_buf();
         generic_file.push(crate::telemetry::new_generic_event_file_name());
-        misc_helpers::json_write_to_file(&generic_events, &generic_file).unwrap();
+        misc_helpers::json_write_to_file_async(&generic_events, &generic_file)
+            .await
+            .unwrap();
 
         let mut ext_file = events_dir.to_path_buf();
         ext_file.push(crate::telemetry::new_extension_event_file_name());
-        misc_helpers::json_write_to_file(&extension_event, &ext_file).unwrap();
+        misc_helpers::json_write_to_file_async(&extension_event, &ext_file)
+            .await
+            .unwrap();
 
         // Process extension events - should only process extension file
         let extension_events_processed = event_reader.process_extension_status_events().await;