5 changes: 3 additions & 2 deletions proxy_agent/src/proxy_agent_status.rs
@@ -154,7 +154,8 @@ impl ProxyAgentStatusTask {
message: status,
duration: status_report_time.elapsed().as_millis() as i64,
},
);
)
.await;
status_report_time = Instant::now();
}
// write the aggregate status to status.json file
@@ -290,7 +291,7 @@ impl ProxyAgentStatusTask {

async fn write_aggregate_status_to_file(&self, status: GuestProxyAgentAggregateStatus) {
let full_file_path = self.status_dir.join("status.json");
if let Err(e) = misc_helpers::json_write_to_file(&status, &full_file_path) {
if let Err(e) = misc_helpers::json_write_to_file_async(&status, &full_file_path).await {
self.update_agent_status_message(format!(
"Error writing aggregate status to status file: {e}"
))
2 changes: 1 addition & 1 deletion proxy_agent_shared/Cargo.toml
@@ -16,7 +16,7 @@ serde_json = "1.0.91" # json Deserializer
serde-xml-rs = "0.8.1" # xml Deserializer with xml attribute
regex = "1.11" # match file name
thiserror = "1.0.64"
tokio = { version = "1", features = ["rt", "macros", "net", "sync", "time"] }
tokio = { version = "1", features = ["fs", "rt", "macros", "net", "sync", "time"] }
tokio-util = "0.7.11"
libc = "0.2.147"
log = { version = "0.4.26", features = ["std"] }
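The new "fs" feature gates tokio's filesystem module; the async JSON writer added in misc_helpers.rs calls tokio::fs::write and tokio::fs::rename, which do not compile without it. A minimal sketch of the gated API, with placeholder file names:

// Requires the "fs" feature enabled above; tokio::fs mirrors std::fs but returns futures.
async fn write_then_swap() -> std::io::Result<()> {
    tokio::fs::write("status.tmp", b"{}").await?;           // non-blocking write
    tokio::fs::rename("status.tmp", "status.json").await?;  // replace the target file
    Ok(())
}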
79 changes: 60 additions & 19 deletions proxy_agent_shared/src/misc_helpers.rs
@@ -25,32 +25,47 @@ pub fn get_thread_identity() -> String {
format!("{:0>8}", thread_id::get())
}

pub fn get_date_time_string_with_milliseconds() -> String {
let date_format =
format_description::parse("[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond]")
.unwrap();
// Static format descriptors parsed once and reused for all calls
static ISO8601_MILLIS_FORMAT: std::sync::LazyLock<
Vec<time::format_description::FormatItem<'static>>,
> = std::sync::LazyLock::new(|| {
format_description::parse("[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond]")
.expect("Invalid ISO8601 millis date format")
});

static ISO8601_FORMAT: std::sync::LazyLock<Vec<time::format_description::FormatItem<'static>>> =
std::sync::LazyLock::new(|| {
format_description::parse("[year]-[month]-[day]T[hour]:[minute]:[second]Z")
.expect("Invalid ISO8601 date format")
});

// This format is also the preferred HTTP date format. https://httpwg.org/specs/rfc9110.html#http.date
static RFC1123_FORMAT: std::sync::LazyLock<Vec<time::format_description::FormatItem<'static>>> =
std::sync::LazyLock::new(|| {
format_description::parse(
"[weekday repr:short], [day] [month repr:short] [year] [hour]:[minute]:[second] GMT",
)
.expect("Invalid RFC1123 date format")
});

let time_str = OffsetDateTime::now_utc().format(&date_format).unwrap();
pub fn get_date_time_string_with_milliseconds() -> String {
let time_str = OffsetDateTime::now_utc()
.format(&*ISO8601_MILLIS_FORMAT)
.expect("Failed to format ISO8601 millis date");
// Truncate to 23 chars: "YYYY-MM-DDTHH:MM:SS.mmm"
time_str.chars().take(23).collect()
}

pub fn get_date_time_string() -> String {
let date_format =
format_description::parse("[year]-[month]-[day]T[hour]:[minute]:[second]Z").unwrap();

let time_str = OffsetDateTime::now_utc().format(&date_format).unwrap();
time_str.chars().collect()
OffsetDateTime::now_utc()
.format(&*ISO8601_FORMAT)
.expect("Failed to format ISO8601 date")
}

// This format is also the preferred HTTP date format. https://httpwg.org/specs/rfc9110.html#http.date
pub fn get_date_time_rfc1123_string() -> String {
let date_format = format_description::parse(
"[weekday repr:short], [day] [month repr:short] [year] [hour]:[minute]:[second] GMT",
)
.unwrap();

let time_str = OffsetDateTime::now_utc().format(&date_format).unwrap();
time_str.chars().collect()
OffsetDateTime::now_utc()
.format(&*RFC1123_FORMAT)
.expect("Failed to format RFC1123 date")
}
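Because the descriptors now live in std::sync::LazyLock statics, the format strings are parsed once on first use and reused on every later call. A brief usage sketch of the three formatters; the timestamps shown are illustrative, not real output:

// Illustrative outputs (actual values depend on the current UTC time):
let with_ms = get_date_time_string_with_milliseconds(); // "2024-01-15T10:30:45.123"
let iso     = get_date_time_string();                   // "2024-01-15T10:30:45Z"
let http    = get_date_time_rfc1123_string();           // "Mon, 15 Jan 2024 10:30:45 GMT"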

pub fn get_date_time_unix_nano() -> i128 {
@@ -126,19 +141,45 @@ pub fn try_create_folder(dir: &Path) -> Result<()> {
Ok(())
}

/// Writes a serializable object to a file in JSON format.
/// It first writes to a temporary file and then renames it to the target file to avoid leaving a corrupted file if the write operation fails.
/// Remark: it uses BufWriter to reduce system calls and improve performance.
/// Remark: Called from sync code, infrequent writes and small objects
pub fn json_write_to_file<T>(obj: &T, file_path: &Path) -> Result<()>
where
T: ?Sized + Serialize,
{
use std::io::BufWriter;

// write to a temp file and rename to avoid corrupted file
let temp_file_path = file_path.with_extension("tmp");
let file = File::create(&temp_file_path)?;
serde_json::to_writer_pretty(file, obj)?;
let writer = BufWriter::new(file); // Reduces system calls
serde_json::to_writer_pretty(writer, obj)?;
std::fs::rename(temp_file_path, file_path)?;

Ok(())
}
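A short usage sketch of the sync writer; the payload type, helper, and path are hypothetical. The BufWriter matters because serde_json::to_writer_pretty issues many small writes, and buffering batches them into fewer system calls, while the rename-after-write keeps readers from ever observing a half-written file:

use std::path::Path;

#[derive(serde::Serialize)]
struct Heartbeat { healthy: bool, message: String } // hypothetical payload

fn save_heartbeat() -> Result<()> {
    let hb = Heartbeat { healthy: true, message: "ok".into() };
    // Data lands in heartbeat.tmp first; heartbeat.json is only replaced after
    // the buffered write completes successfully.
    json_write_to_file(&hb, Path::new("heartbeat.json"))
}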

/// Async version of json_write_to_file using tokio::fs
/// Serializes to memory first (CPU work), then writes asynchronously (IO work)
/// This avoids blocking the async runtime during serialization
/// Remark: Called from async context, writing while handling concurrent requests, and potentially larger objects
pub async fn json_write_to_file_async<T>(obj: &T, file_path: &Path) -> Result<()>
where
T: ?Sized + Serialize,
{
// Serialize to memory first (CPU work - fast)
let json_bytes = serde_json::to_vec_pretty(obj)?;

// Write asynchronously (IO work)
let temp_file_path = file_path.with_extension("tmp");
tokio::fs::write(&temp_file_path, json_bytes).await?;
tokio::fs::rename(&temp_file_path, file_path).await?;

Ok(())
}
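A minimal sketch of calling the async variant from a tokio task; the persist_status function and path are hypothetical, and GuestProxyAgentAggregateStatus stands in for whatever serializable status the caller holds. The await points let the runtime keep servicing other tasks, such as in-flight proxy requests, while the file is written:

use std::path::PathBuf;

async fn persist_status(status: &GuestProxyAgentAggregateStatus, status_dir: PathBuf) {
    let path = status_dir.join("status.json");
    // While the write is awaited, the runtime is free to schedule other tasks.
    if let Err(e) = json_write_to_file_async(status, &path).await {
        eprintln!("failed to persist aggregate status: {e}");
    }
}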

pub fn json_read_from_file<T>(file_path: &Path) -> Result<T>
where
T: DeserializeOwned,
10 changes: 5 additions & 5 deletions proxy_agent_shared/src/telemetry/event_logger.rs
@@ -102,7 +102,7 @@ pub async fn start<F, Fut>(

let mut file_path = event_dir.to_path_buf();
file_path.push(crate::telemetry::new_generic_event_file_name());
match misc_helpers::json_write_to_file(&events, &file_path) {
match misc_helpers::json_write_to_file_async(&events, &file_path).await {
Ok(()) => {
logger_manager::write_log(
Level::Trace,
@@ -169,7 +169,7 @@ pub fn write_event_only(level: Level, message: String, method_name: &str, module
};
}

pub fn report_extension_status_event(
pub async fn report_extension_status_event(
extension: crate::telemetry::Extension,
operation_status: crate::telemetry::OperationStatus,
) {
@@ -210,7 +210,7 @@ pub fn report_extension_status_event(
let event = crate::telemetry::ExtensionStatusEvent::new(extension, operation_status);
let mut file_path = event_dir.to_path_buf();
file_path.push(crate::telemetry::new_extension_event_file_name());
if let Err(e) = misc_helpers::json_write_to_file(&event, &file_path) {
if let Err(e) = misc_helpers::json_write_to_file_async(&event, &file_path).await {
logger_manager::write_log(
Level::Warn,
format!(
@@ -261,7 +261,7 @@ mod tests {
};

// This should not panic even if EVENTS_DIR is not set
super::report_extension_status_event(extension, operation_status);
super::report_extension_status_event(extension, operation_status).await;

// Start the event logger loop and set the EVENTS_DIR
let cloned_events_dir = events_dir.to_path_buf();
@@ -326,7 +326,7 @@ mod tests {
};

// Call report_extension_status_event
super::report_extension_status_event(extension.clone(), operation_status.clone());
super::report_extension_status_event(extension.clone(), operation_status.clone()).await;

// Wait for the file to be written
tokio::time::sleep(Duration::from_millis(100)).await;
61 changes: 45 additions & 16 deletions proxy_agent_shared/src/telemetry/event_reader.rs
@@ -376,12 +376,15 @@ mod tests {
misc_helpers::try_create_folder(&events_dir).unwrap();
let mut file_path = events_dir.to_path_buf();
file_path.push(format!("{}.json", misc_helpers::get_date_time_unix_nano()));
misc_helpers::json_write_to_file(&events, &file_path).unwrap();
misc_helpers::json_write_to_file_async(&events, &file_path)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(1)).await;
let mut file_path = events_dir.to_path_buf();
file_path.push(format!("{}.json", misc_helpers::get_date_time_unix_nano()));
misc_helpers::json_write_to_file(&events, &file_path).unwrap();

misc_helpers::json_write_to_file_async(&events, &file_path)
.await
.unwrap();
// test EventReader with limits
let event_reader_limits = EventReaderLimits::new()
.with_max_event_file_size_bytes(1024 * 10)
@@ -413,11 +416,15 @@ mod tests {
tokio::time::sleep(Duration::from_millis(1)).await;
let mut file_path = events_dir.to_path_buf();
file_path.push(format!("{}.json", misc_helpers::get_date_time_unix_nano()));
misc_helpers::json_write_to_file(&events, &file_path).unwrap();
misc_helpers::json_write_to_file_async(&events, &file_path)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(1)).await;
let mut file_path = events_dir.to_path_buf();
file_path.push(format!("{}.json", misc_helpers::get_date_time_unix_nano()));
misc_helpers::json_write_to_file(&events, &file_path).unwrap();
misc_helpers::json_write_to_file_async(&events, &file_path)
.await
.unwrap();
let files = misc_helpers::get_files(&events_dir).unwrap();
assert_eq!(2, files.len(), "Must have 2 event files.");

@@ -435,10 +442,14 @@ mod tests {
"{}.notjson",
misc_helpers::get_date_time_unix_nano()
));
misc_helpers::json_write_to_file(&events, &file_path).unwrap();
misc_helpers::json_write_to_file_async(&events, &file_path)
.await
.unwrap();
let mut file_path = events_dir.to_path_buf();
file_path.push(format!("a{}.json", misc_helpers::get_date_time_unix_nano()));
misc_helpers::json_write_to_file(&events, &file_path).unwrap();
misc_helpers::json_write_to_file_async(&events, &file_path)
.await
.unwrap();
let events_processed = event_reader.process_once().await;
assert_eq!(0, events_processed, "events_processed must be 0.");
let files = misc_helpers::get_files(&events_dir).unwrap();
@@ -492,13 +503,17 @@ mod tests {
// Write extension event files with proper naming pattern
let mut file_path = events_dir.to_path_buf();
file_path.push(crate::telemetry::new_extension_event_file_name());
misc_helpers::json_write_to_file(&event, &file_path).unwrap();
misc_helpers::json_write_to_file_async(&event, &file_path)
.await
.unwrap();

tokio::time::sleep(Duration::from_millis(1)).await;

let mut file_path2 = events_dir.to_path_buf();
file_path2.push(crate::telemetry::new_extension_event_file_name());
misc_helpers::json_write_to_file(&event, &file_path2).unwrap();
misc_helpers::json_write_to_file_async(&event, &file_path2)
.await
.unwrap();

// Verify files were created
let files = misc_helpers::search_files(
@@ -529,7 +544,9 @@ mod tests {
// Test with non-matching file names (should not be processed)
let mut non_matching_file = events_dir.to_path_buf();
non_matching_file.push("not_extension_event.json");
misc_helpers::json_write_to_file(&event, &non_matching_file).unwrap();
misc_helpers::json_write_to_file_async(&event, &non_matching_file)
.await
.unwrap();

let events_processed = event_reader.process_extension_status_events().await;
assert_eq!(
@@ -547,7 +564,9 @@ mod tests {
// Write another event file
let mut file_path3 = events_dir.to_path_buf();
file_path3.push(crate::telemetry::new_extension_event_file_name());
misc_helpers::json_write_to_file(&event, &file_path3).unwrap();
misc_helpers::json_write_to_file_async(&event, &file_path3)
.await
.unwrap();

// Start the processor in a separate task
let event_reader_for_task = EventReader::new(
@@ -623,13 +642,17 @@ mod tests {
// Write 2 generic event files
let mut generic_file1 = events_dir.to_path_buf();
generic_file1.push(crate::telemetry::new_generic_event_file_name());
misc_helpers::json_write_to_file(&generic_events, &generic_file1).unwrap();
misc_helpers::json_write_to_file_async(&generic_events, &generic_file1)
.await
.unwrap();

tokio::time::sleep(Duration::from_millis(1)).await;

let mut generic_file2 = events_dir.to_path_buf();
generic_file2.push(crate::telemetry::new_generic_event_file_name());
misc_helpers::json_write_to_file(&generic_events, &generic_file2).unwrap();
misc_helpers::json_write_to_file_async(&generic_events, &generic_file2)
.await
.unwrap();

// Create extension status event files
let extension = crate::telemetry::Extension {
@@ -654,7 +677,9 @@ mod tests {
for _ in 0..3 {
let mut ext_file = events_dir.to_path_buf();
ext_file.push(crate::telemetry::new_extension_event_file_name());
misc_helpers::json_write_to_file(&extension_event, &ext_file).unwrap();
misc_helpers::json_write_to_file_async(&extension_event, &ext_file)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(1)).await;
}

@@ -728,11 +753,15 @@ mod tests {
// Write one of each type again
let mut generic_file = events_dir.to_path_buf();
generic_file.push(crate::telemetry::new_generic_event_file_name());
misc_helpers::json_write_to_file(&generic_events, &generic_file).unwrap();
misc_helpers::json_write_to_file_async(&generic_events, &generic_file)
.await
.unwrap();

let mut ext_file = events_dir.to_path_buf();
ext_file.push(crate::telemetry::new_extension_event_file_name());
misc_helpers::json_write_to_file(&extension_event, &ext_file).unwrap();
misc_helpers::json_write_to_file_async(&extension_event, &ext_file)
.await
.unwrap();

// Process extension events - should only process extension file
let extension_events_processed = event_reader.process_extension_status_events().await;