From 6a710831ced1db9e6f9809f620cd2ea9b34d28f1 Mon Sep 17 00:00:00 2001 From: "Zhidong Peng (HE/HIM)" Date: Thu, 15 Jan 2026 14:01:33 -0800 Subject: [PATCH 1/3] Add EventReaderLimits into telemetry event reader mod --- proxy_agent_shared/Cargo.toml | 2 +- .../src/telemetry/event_reader.rs | 217 ++++++++++++++---- .../src/telemetry/telemetry_event.rs | 9 +- 3 files changed, 187 insertions(+), 41 deletions(-) diff --git a/proxy_agent_shared/Cargo.toml b/proxy_agent_shared/Cargo.toml index 182100b3..3fc52c9b 100644 --- a/proxy_agent_shared/Cargo.toml +++ b/proxy_agent_shared/Cargo.toml @@ -16,7 +16,7 @@ serde_json = "1.0.91" # json Deserializer serde-xml-rs = "0.8.1" # xml Deserializer with xml attribute regex = "1.11" # match file name thiserror = "1.0.64" -tokio = { version = "1", features = ["rt", "macros", "sync", "time"] } +tokio = { version = "1", features = ["rt", "macros", "net", "sync", "time"] } tokio-util = "0.7.11" log = { version = "0.4.26", features = ["std"] } ctor = "0.3.6" # used for test setup and clean up diff --git a/proxy_agent_shared/src/telemetry/event_reader.rs b/proxy_agent_shared/src/telemetry/event_reader.rs index ac3d526f..e99170aa 100644 --- a/proxy_agent_shared/src/telemetry/event_reader.rs +++ b/proxy_agent_shared/src/telemetry/event_reader.rs @@ -59,6 +59,35 @@ impl VmMetaData { } } +/// Configuration for limiting EventReader behavior +#[derive(Default, Clone)] +pub struct EventReaderLimits { + pub max_events_per_round: Option, + pub max_event_file_size_bytes: Option, + pub version: Option, +} + +impl EventReaderLimits { + pub fn new() -> Self { + EventReaderLimits::default() + } + + pub fn with_max_events_per_round(mut self, max: usize) -> Self { + self.max_events_per_round = Some(max); + self + } + + pub fn with_max_event_file_size_bytes(mut self, max: u64) -> Self { + self.max_event_file_size_bytes = Some(max); + self + } + + pub fn with_version(mut self, version: String) -> Self { + self.version = Some(version); + self + } +} + pub 
struct EventReader { dir_path: PathBuf, delay_start: bool, @@ -66,9 +95,16 @@ pub struct EventReader { common_state: CommonState, execution_mode: String, event_name: String, + limits: EventReaderLimits, } impl EventReader { + /// Create a new EventReader without limits on event file size and max events per round. + /// The event reader will read the event files from the specified directory. + /// If delay_start is true, the event reader will delay start for 60 seconds. + /// The common_state is used to store the vm metadata. + /// The execution_mode is used to indicate the mode of the agent. + /// The event_name is used to indicate the name of the event reader. pub fn new( dir_path: PathBuf, delay_start: bool, @@ -84,6 +120,28 @@ impl EventReader { common_state, execution_mode, event_name, + limits: EventReaderLimits::default(), + } + } + + /// Create a new EventReader with limits configuration. + pub fn new_with_limits( + dir_path: PathBuf, + delay_start: bool, + cancellation_token: CancellationToken, + common_state: CommonState, + execution_mode: String, + event_name: String, + limits: EventReaderLimits, + ) -> EventReader { + EventReader { + dir_path, + delay_start, + cancellation_token, + common_state, + execution_mode, + event_name, + limits, } } @@ -145,24 +203,28 @@ impl EventReader { } } - if let Ok(Some(vm_meta_data)) = self.common_state.get_vm_meta_data().await { - let _processed = self - .process_events(&wire_server_client, &vm_meta_data) - .await; - } - + self.process_once(&wire_server_client).await; tokio::time::sleep(interval).await; } } + /// Process the event files from the directory once. 
+ pub async fn process_once(&self, wire_server_client: &WireServerClient) -> usize { + if let Ok(Some(vm_meta_data)) = self.common_state.get_vm_meta_data().await { + self.process_events(wire_server_client, &vm_meta_data).await + } else { + 0 + } + } + async fn process_events( &self, wire_server_client: &WireServerClient, vm_meta_data: &VmMetaData, ) -> usize { let event_count: usize; - // get all .json event files in the directory - match misc_helpers::search_files(&self.dir_path, r"^(.*\.json)$") { + // get all [0-9]+.json event filenames with numbers in the directory + match misc_helpers::search_files(&self.dir_path, r"^[0-9]+\.json$") { Ok(files) => { let file_count = files.len(); event_count = self @@ -185,7 +247,7 @@ impl EventReader { event_count } - async fn update_vm_meta_data( + pub async fn update_vm_meta_data( &self, wire_server_client: &WireServerClient, imds_client: &ImdsClient, @@ -240,6 +302,42 @@ impl EventReader { ) -> usize { let mut num_events_logged = 0; for file in files { + if let Some(max_events) = self.limits.max_events_per_round { + if num_events_logged >= max_events { + logger_manager::write_warn(format!( + "EventReader:: Reached the max number of events to be read per round: {}. 
Stop processing file {} this round.", + max_events, + file.display() + )); + // do not delete this event json file, will try process it at next round + continue; + } + } + + match file.metadata() { + Err(e) => { + logger_manager::write_warn(format!( + "EventReader:: Failed to get metadata for file {}: {}", + file.display(), + e + )); + continue; + } + Ok(metadata) => { + if let Some(max_size) = self.limits.max_event_file_size_bytes { + if metadata.len() > max_size { + logger_manager::write_warn(format!( + "EventReader:: File {} exceeds the size limit of {} bytes, skip it.", + file.display(), + max_size + )); + // clean up the file to avoid blocking further processing + Self::clean_file(file); + continue; + } + } + } + } match misc_helpers::json_read_from_file::>(&file) { Ok(events) => { num_events_logged += events.len(); @@ -248,13 +346,13 @@ impl EventReader { } Err(e) => { logger_manager::write_warn(format!( - "Failed to read events from file {}: {}", + "EventReader:: Failed to read events from file {}: {}", file.display(), e )); } } - Self::clean_files(file); + Self::clean_file(file); } num_events_logged } @@ -277,6 +375,7 @@ impl EventReader { vm_meta_data.clone(), self.execution_mode.clone(), self.event_name.clone(), + self.limits.version.clone(), )); if telemetry_data.get_size() >= Self::MAX_MESSAGE_SIZE { @@ -337,7 +436,7 @@ impl EventReader { } } - fn clean_files(file: PathBuf) { + fn clean_file(file: PathBuf) { match remove_file(&file) { Ok(_) => { logger_manager::write_info(format!("Removed File: {}", file.display())); @@ -351,15 +450,6 @@ impl EventReader { } } } - - #[cfg(test)] - async fn get_vm_meta_data(&self) -> VmMetaData { - if let Ok(Some(vm_meta_data)) = self.common_state.get_vm_meta_data().await { - vm_meta_data - } else { - VmMetaData::empty() - } - } } #[cfg(test)] @@ -383,14 +473,6 @@ mod tests { let port = 7071u16; let cancellation_token = CancellationToken::new(); let common_state = CommonState::start_new(); - let event_reader = 
EventReader { - dir_path: events_dir.clone(), - delay_start: false, - cancellation_token: cancellation_token.clone(), - common_state: common_state.clone(), - execution_mode: "Test".to_string(), - event_name: "test_event_reader_thread".to_string(), - }; let wire_server_client = WireServerClient::new(ip, port); let imds_client = ImdsClient::new(ip, port); tokio::spawn(server_mock::start( @@ -401,6 +483,16 @@ mod tests { tokio::time::sleep(Duration::from_millis(100)).await; logger_manager::write_info("server_mock started.".to_string()); + let event_reader = EventReader::new( + events_dir.clone(), + false, + cancellation_token.clone(), + common_state.clone(), + "Test".to_string(), + "test_event_reader_thread".to_string(), + ); + + // refresh vm metadata match event_reader .update_vm_meta_data(&wire_server_client, &imds_client) .await @@ -413,7 +505,7 @@ mod tests { } } - // Write 10 events to events dir + // Write events to events dir let message = r#"{\"method\":\"GET\",\"url\":\"/machine/37569ad2-69a3-44fd-b653-813e62a177cf/68938c06%2D5233%2D4ff9%2Da173%2D0ac0a2754f8a.%5FWS2022?comp=config&type=hostingEnvironmentConfig&incarnation=2\",\"ip\":\"168.63.129.16\",\"port\":80,\"userId\":999,\"userName\":\"WS2022$\",\"processName\":\"C:\\\\WindowsAzure\\\\GuestAgent_2.7.41491.1071_2023-03-02_185502\\\\WindowsAzureGuestAgent.exe\",\"runAsElevated\":true,\"responseStatus\":\"200 OK\",\"elapsedTime\":8}"#; let mut events: Vec = Vec::new(); for _ in [0; 10] { @@ -429,28 +521,75 @@ mod tests { let mut file_path = events_dir.to_path_buf(); file_path.push(format!("{}.json", misc_helpers::get_date_time_unix_nano())); misc_helpers::json_write_to_file(&events, &file_path).unwrap(); + tokio::time::sleep(Duration::from_millis(1)).await; + let mut file_path = events_dir.to_path_buf(); + file_path.push(format!("{}.json", misc_helpers::get_date_time_unix_nano())); + misc_helpers::json_write_to_file(&events, &file_path).unwrap(); + // test EventReader with limits + let event_reader_limits 
= EventReaderLimits::new() + .with_max_event_file_size_bytes(1024 * 10) + .with_max_events_per_round(10) + .with_version("test_version".to_string()); + let event_reader_with_limits = EventReader::new_with_limits( + events_dir.clone(), + false, + cancellation_token.clone(), + common_state.clone(), + "Test".to_string(), + "test_event_reader_thread".to_string(), + event_reader_limits.clone(), + ); // Check the events processed - let vm_meta_data = event_reader.get_vm_meta_data().await; - let events_processed = event_reader - .process_events(&wire_server_client, &vm_meta_data) + let events_processed = event_reader_with_limits + .process_once(&wire_server_client) + .await; + logger_manager::write_info(format!("Send {} events from event files", events_processed)); + //Should be 10 events processed and read into events Vector + assert_eq!(events_processed, 10, "Events processed should be 10"); + let files = misc_helpers::get_files(&events_dir).unwrap(); + assert_eq!(1, files.len(), "Must still have 1 event file."); + // test EventReader with limits - second round + let events_processed = event_reader_with_limits + .process_once(&wire_server_client) .await; logger_manager::write_info(format!("Send {} events from event files", events_processed)); - //Should be 10 events written and read into events Vector + //Should be 10 events processed and read into events Vector assert_eq!(events_processed, 10, "Events processed should be 10"); let files = misc_helpers::get_files(&events_dir).unwrap(); - assert!(files.is_empty(), "Events files not cleaned up."); + assert!(files.is_empty(), "Must have no event files."); + + // Write 2 event files again for next test + tokio::time::sleep(Duration::from_millis(1)).await; + let mut file_path = events_dir.to_path_buf(); + file_path.push(format!("{}.json", misc_helpers::get_date_time_unix_nano())); + misc_helpers::json_write_to_file(&events, &file_path).unwrap(); + tokio::time::sleep(Duration::from_millis(1)).await; + let mut file_path = 
events_dir.to_path_buf(); + file_path.push(format!("{}.json", misc_helpers::get_date_time_unix_nano())); + misc_helpers::json_write_to_file(&events, &file_path).unwrap(); + let files = misc_helpers::get_files(&events_dir).unwrap(); + assert_eq!(2, files.len(), "Must have 2 event files."); + + // test EventReader without limits + let events_processed = event_reader.process_once(&wire_server_client).await; + logger_manager::write_info(format!("Send {} events from event files", events_processed)); + //Should be 20 events processed and read into events Vector + assert_eq!(events_processed, 20, "Events processed should be 20"); + let files = misc_helpers::get_files(&events_dir).unwrap(); + assert!(files.is_empty(), "Must have no event files."); - // Test not processing the non-json files + // Test not processing the non-json files, nor the file name containing non-numeric characters let mut file_path = events_dir.to_path_buf(); file_path.push(format!( "{}.notjson", misc_helpers::get_date_time_unix_nano() )); misc_helpers::json_write_to_file(&events, &file_path).unwrap(); - let events_processed = event_reader - .process_events(&wire_server_client, &vm_meta_data) - .await; + let mut file_path = events_dir.to_path_buf(); + file_path.push(format!("a{}.json", misc_helpers::get_date_time_unix_nano())); + misc_helpers::json_write_to_file(&events, &file_path).unwrap(); + let events_processed = event_reader.process_once(&wire_server_client).await; assert_eq!(0, events_processed, "events_processed must be 0."); let files = misc_helpers::get_files(&events_dir).unwrap(); assert!( diff --git a/proxy_agent_shared/src/telemetry/telemetry_event.rs b/proxy_agent_shared/src/telemetry/telemetry_event.rs index 79149d70..a2ecad5d 100644 --- a/proxy_agent_shared/src/telemetry/telemetry_event.rs +++ b/proxy_agent_shared/src/telemetry/telemetry_event.rs @@ -91,11 +91,17 @@ impl TelemetryEvent { vm_meta_data: VmMetaData, execution_mode: String, event_name: String, + ga_version: Option, ) -> 
Self { + // if ga_version is provided, append it to event version to event_name + // if ga_version is None, use event_log.Version as ga_version and keep event_name unchanged + let (ga_version, event_name) = match ga_version { + Some(version) => (version, format!("{}-{}", event_name, event_log.Version)), + None => (event_log.Version.to_string(), event_name), + }; TelemetryEvent { event_pid: event_log.EventPid.parse::().unwrap_or(0), event_tid: event_log.EventTid.parse::().unwrap_or(0), - ga_version: event_log.Version.to_string(), task_name: event_log.TaskName.to_string(), opcode_name: event_log.TimeStamp.to_string(), capability_used: event_log.EventLevel.to_string(), @@ -103,6 +109,7 @@ impl TelemetryEvent { context2: event_log.TimeStamp.to_string(), context3: event_log.OperationId.to_string(), + ga_version, execution_mode, event_name, os_version: current_info::get_long_os_version(), From 093274b94634b7afde2fb24a9f6c013b9f6e4857 Mon Sep 17 00:00:00 2001 From: "Zhidong Peng (HE/HIM)" Date: Thu, 15 Jan 2026 14:08:12 -0800 Subject: [PATCH 2/3] update comments --- proxy_agent_shared/src/telemetry/telemetry_event.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy_agent_shared/src/telemetry/telemetry_event.rs b/proxy_agent_shared/src/telemetry/telemetry_event.rs index a2ecad5d..dbb06952 100644 --- a/proxy_agent_shared/src/telemetry/telemetry_event.rs +++ b/proxy_agent_shared/src/telemetry/telemetry_event.rs @@ -93,7 +93,7 @@ impl TelemetryEvent { event_name: String, ga_version: Option, ) -> Self { - // if ga_version is provided, append it to event version to event_name + // if ga_version is provided, append event_log.version to event_name // if ga_version is None, use event_log.Version as ga_version and keep event_name unchanged let (ga_version, event_name) = match ga_version { Some(version) => (version, format!("{}-{}", event_name, event_log.Version)), From 5f3a3871fe7e428e2c9a6eda0694fedc6ea4769b Mon Sep 17 00:00:00 2001 From: "Zhidong Peng 
(HE/HIM)" Date: Thu, 22 Jan 2026 11:40:00 -0800 Subject: [PATCH 3/3] support ExtensionStatusEvents --- proxy_agent/src/provision.rs | 12 +- proxy_agent_shared/src/common_state.rs | 67 ++- proxy_agent_shared/src/current_info.rs | 6 + proxy_agent_shared/src/telemetry.rs | 91 +++- .../src/telemetry/event_logger.rs | 67 ++- .../src/telemetry/event_reader.rs | 407 +++++----------- .../src/telemetry/event_sender.rs | 218 +++++++++ .../src/telemetry/telemetry_event.rs | 449 +++++++++++++----- 8 files changed, 916 insertions(+), 401 deletions(-) create mode 100644 proxy_agent_shared/src/telemetry/event_sender.rs diff --git a/proxy_agent/src/provision.rs b/proxy_agent/src/provision.rs index fd11fbea..5b2fa519 100644 --- a/proxy_agent/src/provision.rs +++ b/proxy_agent/src/provision.rs @@ -19,6 +19,7 @@ use crate::{proxy_agent_status, redirector}; use proxy_agent_shared::logger::LoggerLevel; use proxy_agent_shared::telemetry::event_logger; use proxy_agent_shared::telemetry::event_reader::EventReader; +use proxy_agent_shared::telemetry::event_sender::EventSender; use proxy_agent_shared::{misc_helpers, proxy_agent_aggregate_status}; use std::path::PathBuf; use std::time::Duration; @@ -370,18 +371,23 @@ pub async fn start_event_threads(event_threads_shared_state: EventThreadsSharedS tokio::spawn({ let event_reader = EventReader::new( config::get_events_dir(), - true, - event_threads_shared_state.cancellation_token.clone(), event_threads_shared_state.common_state.clone(), "ProxyAgent".to_string(), "MicrosoftAzureGuestProxyAgent".to_string(), ); async move { event_reader - .start(Some(Duration::from_secs(300)), None, None) + .start(true, Some(Duration::from_secs(300))) .await; } }); + tokio::spawn({ + let event_sender = EventSender::new(event_threads_shared_state.common_state.clone()); + async move { + event_sender.start(None, None).await; + } + }); + if let Err(e) = event_threads_shared_state .provision_shared_state .set_event_log_threads_initialized() diff --git 
a/proxy_agent_shared/src/common_state.rs b/proxy_agent_shared/src/common_state.rs index e5ab29a5..039c2b75 100644 --- a/proxy_agent_shared/src/common_state.rs +++ b/proxy_agent_shared/src/common_state.rs @@ -4,8 +4,10 @@ //! This module contains the logic to get and update common states. use crate::result::Result; -use crate::{error::Error, logger::logger_manager, telemetry::event_reader::VmMetaData}; -use tokio::sync::{mpsc, oneshot}; +use crate::{error::Error, logger::logger_manager, telemetry::telemetry_event::VmMetaData}; +use std::sync::Arc; +use tokio::sync::{mpsc, oneshot, Notify}; +use tokio_util::sync::CancellationToken; pub const SECURE_KEY_GUID: &str = "key_guid"; pub const SECURE_KEY_VALUE: &str = "key_value"; @@ -27,10 +29,17 @@ enum CommonStateAction { key: String, response: oneshot::Sender>, }, + GetTelemetryEventNotify { + response: oneshot::Sender>, + }, } #[derive(Clone, Debug)] -pub struct CommonState(mpsc::Sender); +pub struct CommonState { + /// The cancellation token is used to cancel the agent when the agent is stopped + cancellation_token: CancellationToken, + sender: mpsc::Sender, +} impl CommonState { pub fn start_new() -> Self { @@ -39,6 +48,8 @@ impl CommonState { let mut vm_meta_data: Option = None; let mut states: std::collections::HashMap = std::collections::HashMap::new(); + let telemetry_event_notify = Arc::new(Notify::new()); + loop { match receiver.recv().await { Some(CommonStateAction::SetVmMetaData { @@ -79,6 +90,13 @@ impl CommonState { )); } } + Some(CommonStateAction::GetTelemetryEventNotify { response }) => { + if let Err(notify) = response.send(telemetry_event_notify.clone()) { + logger_manager::write_warn(format!( + "Failed to send response to CommonStateAction::GetTelemetryEventNotify '{notify:?}'" + )); + } + } None => { break; } @@ -86,12 +104,15 @@ impl CommonState { } }); - Self(sender) + Self { + cancellation_token: CancellationToken::new(), + sender, + } } pub async fn set_vm_meta_data(&self, vm_meta_data: Option) 
-> Result<()> { let (response, receiver) = oneshot::channel(); - self.0 + self.sender .send(CommonStateAction::SetVmMetaData { vm_meta_data, response, @@ -110,7 +131,7 @@ impl CommonState { pub async fn get_vm_meta_data(&self) -> Result> { let (response, receiver) = oneshot::channel(); - self.0 + self.sender .send(CommonStateAction::GetVmMetaData { response }) .await .map_err(|e| { @@ -126,7 +147,7 @@ impl CommonState { pub async fn set_state(&self, key: String, value: String) -> Result<()> { let (response, receiver) = oneshot::channel(); - self.0 + self.sender .send(CommonStateAction::SetState { key, value, @@ -143,7 +164,7 @@ impl CommonState { pub async fn get_state(&self, key: String) -> Result> { let (response, receiver) = oneshot::channel(); - self.0 + self.sender .send(CommonStateAction::GetState { key, response }) .await .map_err(|e| { @@ -153,4 +174,34 @@ impl CommonState { .await .map_err(|e| Error::RecvError("CommonStateAction::GetState".to_string(), e)) } + + pub async fn get_telemetry_event_notify(&self) -> Result> { + let (response, receiver) = oneshot::channel(); + self.sender + .send(CommonStateAction::GetTelemetryEventNotify { response }) + .await + .map_err(|e| { + Error::SendError( + "CommonStateAction::GetTelemetryEventNotify".to_string(), + e.to_string(), + ) + })?; + receiver.await.map_err(|e| { + Error::RecvError("CommonStateAction::GetTelemetryEventNotify".to_string(), e) + }) + } + + pub async fn notify_telemetry_event(&self) -> Result<()> { + let notify = self.get_telemetry_event_notify().await?; + notify.notify_one(); + Ok(()) + } + + pub fn get_cancellation_token(&self) -> CancellationToken { + self.cancellation_token.clone() + } + + pub fn cancel_cancellation_token(&self) { + self.cancellation_token.cancel(); + } } diff --git a/proxy_agent_shared/src/current_info.rs b/proxy_agent_shared/src/current_info.rs index e9e66544..c2070e8c 100644 --- a/proxy_agent_shared/src/current_info.rs +++ b/proxy_agent_shared/src/current_info.rs @@ -45,6 
+45,8 @@ static CURRENT_OS_INFO: Lazy<(String, String)> = Lazy::new(|| { (arch, os) }); +static CURRENT_EXE_VERSION: Lazy = Lazy::new(misc_helpers::get_current_exe_version); + pub fn get_ram_in_mb() -> u64 { CURRENT_SYS_INFO.0 } @@ -61,6 +63,10 @@ pub fn get_long_os_version() -> String { CURRENT_OS_INFO.1.to_string() } +pub fn get_current_exe_version() -> String { + CURRENT_EXE_VERSION.clone() +} + #[cfg(test)] mod tests { #[test] diff --git a/proxy_agent_shared/src/telemetry.rs b/proxy_agent_shared/src/telemetry.rs index 77be8ee0..b3599e3e 100644 --- a/proxy_agent_shared/src/telemetry.rs +++ b/proxy_agent_shared/src/telemetry.rs @@ -2,12 +2,23 @@ // SPDX-License-Identifier: MIT pub mod event_logger; pub mod event_reader; +pub mod event_sender; pub mod span; pub mod telemetry_event; -use crate::misc_helpers; +use crate::{current_info, misc_helpers}; use serde_derive::{Deserialize, Serialize}; +pub const GENERIC_EVENT_FILE_SEARCH_PATTERN: &str = r"^[0-9]+\.json$"; +pub const EXTENSION_EVENT_FILE_SEARCH_PATTERN: &str = r"^extension_[0-9]+\.json$"; +pub const MAX_EXTENSION_EVENT_FILE_COUNT: usize = 1000; +pub fn new_extension_event_file_name() -> String { + format!("extension_{}.json", misc_helpers::get_date_time_unix_nano()) +} +pub fn new_generic_event_file_name() -> String { + format!("{}.json", misc_helpers::get_date_time_unix_nano()) +} + #[derive(Serialize, Deserialize)] #[allow(non_snake_case)] pub struct Event { @@ -26,7 +37,7 @@ impl Event { Event { EventLevel: level, Message: message, - Version: misc_helpers::get_current_exe_version(), + Version: current_info::get_current_exe_version(), TaskName: task_name, EventPid: std::process::id().to_string(), EventTid: misc_helpers::get_thread_identity(), @@ -36,6 +47,53 @@ impl Event { } } +#[derive(Serialize, Deserialize, Clone)] +pub struct Extension { + pub name: String, + pub version: String, + pub is_internal: bool, + pub extension_type: String, +} + +#[derive(Serialize, Deserialize, Clone)] +pub struct 
OperationStatus { + pub operation_success: bool, + pub operation: String, + pub task_name: String, + pub message: String, + pub duration: i64, +} + +#[derive(Serialize, Deserialize, Clone)] +pub struct ExtensionStatusEvent { + pub extension: Extension, + pub operation_status: OperationStatus, + + pub event_pid: String, + pub event_tid: String, + pub time_stamp: String, +} + +impl ExtensionStatusEvent { + /// Create a new ExtensionStatusEvent + /// Rust does not recommend using too many arguments in a function, + /// so we use structs to group related arguments together. + /// # Arguments + /// * `extension` - The extension information + /// * `operation_status` - The operation status information + /// # Returns + /// A new instance of `ExtensionStatusEvent` + pub fn new(extension: Extension, operation_status: OperationStatus) -> Self { + ExtensionStatusEvent { + extension, + operation_status, + event_pid: std::process::id().to_string(), + event_tid: misc_helpers::get_thread_identity(), + time_stamp: misc_helpers::get_date_time_string_with_milliseconds(), + } + } +} + #[cfg(test)] mod tests { #[test] @@ -51,4 +109,33 @@ mod tests { assert_eq!(event.TaskName, "test task name".to_string()); assert_eq!(event.OperationId, "test operation id".to_string()); } + + #[test] + fn test_extension_status_event_new() { + let extension = super::Extension { + name: "test extension".to_string(), + version: "1.0.0".to_string(), + is_internal: true, + extension_type: "test type".to_string(), + }; + let operation_status = super::OperationStatus { + operation_success: true, + task_name: "test task".to_string(), + operation: "test operation".to_string(), + message: "test message".to_string(), + duration: 100, + }; + let event = super::ExtensionStatusEvent::new(extension.clone(), operation_status.clone()); + assert_eq!(event.extension.name, extension.name); + assert_eq!(event.extension.version, extension.version); + assert_eq!(event.extension.is_internal, extension.is_internal); + 
assert_eq!(event.extension.extension_type, extension.extension_type); + assert_eq!( + event.operation_status.operation_success, + operation_status.operation_success + ); + assert_eq!(event.operation_status.operation, operation_status.operation); + assert_eq!(event.operation_status.message, operation_status.message); + assert_eq!(event.operation_status.duration, operation_status.duration); + } } diff --git a/proxy_agent_shared/src/telemetry/event_logger.rs b/proxy_agent_shared/src/telemetry/event_logger.rs index 6d7ea3a9..33570652 100644 --- a/proxy_agent_shared/src/telemetry/event_logger.rs +++ b/proxy_agent_shared/src/telemetry/event_logger.rs @@ -17,6 +17,7 @@ pub const MAX_MESSAGE_LENGTH: usize = 1024 * 4; // 4KB static EVENT_QUEUE: Lazy> = Lazy::new(|| ConcurrentQueue::::bounded(1000)); static SHUT_DOWN: Lazy> = Lazy::new(|| Arc::new(AtomicBool::new(false))); +static EVENT_DIR: tokio::sync::OnceCell = tokio::sync::OnceCell::const_new(); pub async fn start( event_dir: PathBuf, @@ -37,6 +38,12 @@ pub async fn start( set_status_fn(message.to_string()); } + if EVENT_DIR.set(event_dir.clone()).is_err() { + let message = "Event directory is already set, cannot set it again."; + set_status_fn(message.to_string()); + logger_manager::write_log(Level::Warn, message.to_string()); + } + let shutdown = SHUT_DOWN.clone(); if interval == Duration::default() { interval = Duration::from_secs(60); @@ -72,7 +79,10 @@ pub async fn start( // Check the event file counts, // if it exceeds the max file number, drop the new events - match misc_helpers::get_files(&event_dir) { + match misc_helpers::search_files( + &event_dir, + crate::telemetry::GENERIC_EVENT_FILE_SEARCH_PATTERN, + ) { Ok(files) => { if files.len() >= max_event_file_count { logger_manager::write_log(Level::Warn, format!( @@ -90,8 +100,7 @@ pub async fn start( } let mut file_path = event_dir.to_path_buf(); - - file_path.push(format!("{}.json", misc_helpers::get_date_time_unix_nano())); + 
file_path.push(crate::telemetry::new_generic_event_file_name()); match misc_helpers::json_write_to_file(&events, &file_path) { Ok(()) => { logger_manager::write_log( @@ -155,6 +164,58 @@ pub fn write_event_only(level: Level, message: String, method_name: &str, module }; } +pub fn report_extension_status_event( + extension: crate::telemetry::Extension, + operation_status: crate::telemetry::OperationStatus, +) { + let event_dir = match EVENT_DIR.get() { + Some(dir) => dir.clone(), + None => { + logger_manager::write_log( + Level::Warn, + "Event directory is not set, cannot report extension status event.".to_string(), + ); + return; + } + }; + + // Check the event file counts, + // if it exceeds the max file number, drop the new events + match misc_helpers::search_files( + &event_dir, + crate::telemetry::EXTENSION_EVENT_FILE_SEARCH_PATTERN, + ) { + Ok(files) => { + if files.len() >= crate::telemetry::MAX_EXTENSION_EVENT_FILE_COUNT { + logger_manager::write_log(Level::Warn, format!( + "Event files exceed the max file count {}, drop and skip the write to disk.", + crate::telemetry::MAX_EXTENSION_EVENT_FILE_COUNT + )); + return; + } + } + Err(e) => { + logger_manager::write_log( + Level::Warn, + format!("Failed to get event files with error: {e}"), + ); + } + } + + let event = crate::telemetry::ExtensionStatusEvent::new(extension, operation_status); + let mut file_path = event_dir.to_path_buf(); + file_path.push(crate::telemetry::new_extension_event_file_name()); + if let Err(e) = misc_helpers::json_write_to_file(&event, &file_path) { + logger_manager::write_log( + Level::Warn, + format!( + "Failed to write extension status event to the file {} with error: {}", + file_path.display(), + e + ), + ); + } +} #[cfg(test)] mod tests { use crate::misc_helpers; diff --git a/proxy_agent_shared/src/telemetry/event_reader.rs b/proxy_agent_shared/src/telemetry/event_reader.rs index e99170aa..a1e897da 100644 --- a/proxy_agent_shared/src/telemetry/event_reader.rs +++ 
b/proxy_agent_shared/src/telemetry/event_reader.rs @@ -1,63 +1,21 @@ // Copyright (c) Microsoft Corporation // SPDX-License-Identifier: MIT -//! This module contains the logic to read the telemetry event files and send them to the wire server. +//! This module contains the logic to read the telemetry event files. //! The telemetry event files are written by the event_logger module. -use crate::common_state; use crate::common_state::CommonState; -use crate::host_clients::imds_client::ImdsClient; -use crate::host_clients::wire_server_client::WireServerClient; +use crate::current_info; use crate::logger::logger_manager; use crate::misc_helpers; -use crate::result::Result; -use crate::telemetry::telemetry_event::TelemetryData; +use crate::telemetry::event_sender; use crate::telemetry::telemetry_event::TelemetryEvent; +use crate::telemetry::telemetry_event::TelemetryExtensionEventsEvent; +use crate::telemetry::telemetry_event::TelemetryGenericLogsEvent; use crate::telemetry::Event; use std::fs::remove_file; use std::path::PathBuf; use std::time::Duration; -use tokio_util::sync::CancellationToken; - -#[cfg(test)] -const EMPTY_GUID: &str = "00000000-0000-0000-0000-000000000000"; - -const WIRE_SERVER_IP: &str = "168.63.129.16"; -const WIRE_SERVER_PORT: u16 = 80u16; -const IMDS_IP: &str = "169.254.169.254"; -const IMDS_PORT: u16 = 80u16; - -/// VmMetaData contains the metadata of the VM. -/// The metadata is used to identify the VM and the image origin. -/// It will be part of the telemetry data send to the wire server. -/// The metadata is updated by the wire server and the IMDS client. 
-#[derive(Clone, Debug)] -pub struct VmMetaData { - pub container_id: String, - pub tenant_name: String, - pub role_name: String, - pub role_instance_name: String, - pub subscription_id: String, - pub resource_group_name: String, - pub vm_id: String, - pub image_origin: u64, -} - -impl VmMetaData { - #[cfg(test)] - pub fn empty() -> Self { - VmMetaData { - container_id: EMPTY_GUID.to_string(), - tenant_name: EMPTY_GUID.to_string(), - role_name: EMPTY_GUID.to_string(), - role_instance_name: EMPTY_GUID.to_string(), - subscription_id: EMPTY_GUID.to_string(), - resource_group_name: EMPTY_GUID.to_string(), - vm_id: EMPTY_GUID.to_string(), - image_origin: 3, // unknown - } - } -} /// Configuration for limiting EventReader behavior #[derive(Default, Clone)] @@ -90,8 +48,6 @@ impl EventReaderLimits { pub struct EventReader { dir_path: PathBuf, - delay_start: bool, - cancellation_token: CancellationToken, common_state: CommonState, execution_mode: String, event_name: String, @@ -107,16 +63,12 @@ impl EventReader { /// The event_name is used to indicate the name of the event reader. pub fn new( dir_path: PathBuf, - delay_start: bool, - cancellation_token: CancellationToken, common_state: CommonState, execution_mode: String, event_name: String, ) -> EventReader { EventReader { dir_path, - delay_start, - cancellation_token, common_state, execution_mode, event_name, @@ -127,8 +79,6 @@ impl EventReader { /// Create a new EventReader with limits configuration. 
pub fn new_with_limits( dir_path: PathBuf, - delay_start: bool, - cancellation_token: CancellationToken, common_state: CommonState, execution_mode: String, event_name: String, @@ -136,8 +86,6 @@ impl EventReader { ) -> EventReader { EventReader { dir_path, - delay_start, - cancellation_token, common_state, execution_mode, event_name, @@ -145,91 +93,130 @@ impl EventReader { } } - pub async fn start( + pub async fn start_extension_status_event_processor( &self, + delay_start: bool, interval: Option, - server_ip: Option<&str>, - server_port: Option, ) { - logger_manager::write_info("telemetry event reader task started.".to_string()); + if delay_start { + // delay start the event_reader task to give additional CPU cycles to more important threads + tokio::time::sleep(Duration::from_secs(60)).await; + } - let wire_server_client = WireServerClient::new( - server_ip.unwrap_or(WIRE_SERVER_IP), - server_port.unwrap_or(WIRE_SERVER_PORT), + logger_manager::write_info( + "telemetry extension status event reader task started.".to_string(), ); - let imds_client = ImdsClient::new( - server_ip.unwrap_or(IMDS_IP), - server_port.unwrap_or(IMDS_PORT), - ); - - let interval = interval.unwrap_or(Duration::from_secs(300)); + let interval = interval.unwrap_or(Duration::from_secs(60)); + let cancellation_token = self.common_state.get_cancellation_token(); tokio::select! 
{ - _ = self.loop_reader(interval, wire_server_client, imds_client ) => {} - _ = self.cancellation_token.cancelled() => { - logger_manager::write_warn("cancellation token signal received, stop the telemetry event reader task.".to_string()); + _ = self.loop_extension_status_event_processor(interval ) => {} + _ = cancellation_token.cancelled() => { + logger_manager::write_warn("cancellation token signal received, stop the telemetry extension status event reader task.".to_string()); } } } - async fn loop_reader( - &self, - interval: Duration, - wire_server_client: WireServerClient, - imds_client: ImdsClient, - ) { - let mut first = true; - + async fn loop_extension_status_event_processor(&self, interval: Duration) { loop { - if first { - if self.delay_start { - // delay start the event_reader task to give additional CPU cycles to more important threads - tokio::time::sleep(Duration::from_secs(60)).await; + self.process_extension_status_events().await; + tokio::time::sleep(interval).await; + } + } + + async fn process_extension_status_events(&self) -> usize { + let mut event_count: usize = 0; + // get all extension status event filenames in the directory + match misc_helpers::search_files( + &self.dir_path, + crate::telemetry::EXTENSION_EVENT_FILE_SEARCH_PATTERN, + ) { + Ok(files) => { + let file_count = files.len(); + for file in files { + event_count += self.process_one_extension_status_event_file(file).await; } - first = false; + logger_manager::write_info( format!( + "Telemetry event reader sent {event_count} extension status events from {file_count} files" + )); + } + Err(e) => { + logger_manager::write_warn(format!( + "Extension Status Event Files not found in directory {}: {}", + self.dir_path.display(), + e + )); } + } + event_count + } - // refresh vm metadata - match self - .update_vm_meta_data(&wire_server_client, &imds_client) - .await - { - Ok(()) => { - logger_manager::write_info("success updated the vm metadata.".to_string()); - } - Err(e) => { + async 
fn process_one_extension_status_event_file(&self, file: PathBuf) -> usize { + let mut num_events_logged = 0; + + match misc_helpers::json_read_from_file::(&file) { + Ok(event) => { + num_events_logged += 1; + let telemetry_event = TelemetryExtensionEventsEvent::from_extension_status_event( + &event, + self.execution_mode.clone(), + current_info::get_current_exe_version(), + ); + let telemetry_event = TelemetryEvent::ExtensionEvent(telemetry_event); + event_sender::enqueue_event(telemetry_event); + if let Err(e) = self.common_state.notify_telemetry_event().await { logger_manager::write_warn(format!( - "Failed to read vm metadata with error {e}." + "Failed to notify telemetry event with error: {e}" )); } } + Err(e) => { + logger_manager::write_warn(format!( + "EventReader:: Failed to read extension status event from file {}: {}", + file.display(), + e + )); + } + } - self.process_once(&wire_server_client).await; - tokio::time::sleep(interval).await; + Self::clean_file(file); + num_events_logged + } + + pub async fn start(&self, delay_start: bool, interval: Option) { + if delay_start { + // delay start the event_reader task to give additional CPU cycles to more important threads + tokio::time::sleep(Duration::from_secs(60)).await; + } + + logger_manager::write_info("telemetry event reader task started.".to_string()); + let interval = interval.unwrap_or(Duration::from_secs(300)); + let cancellation_token = self.common_state.get_cancellation_token(); + tokio::select! { + _ = self.loop_reader(interval ) => {} + _ = cancellation_token.cancelled() => { + logger_manager::write_warn("cancellation token signal received, stop the telemetry event reader task.".to_string()); + } } } - /// Process the event files from the directory once. 
- pub async fn process_once(&self, wire_server_client: &WireServerClient) -> usize { - if let Ok(Some(vm_meta_data)) = self.common_state.get_vm_meta_data().await { - self.process_events(wire_server_client, &vm_meta_data).await - } else { - 0 + async fn loop_reader(&self, interval: Duration) { + loop { + self.process_once().await; + tokio::time::sleep(interval).await; } } - async fn process_events( - &self, - wire_server_client: &WireServerClient, - vm_meta_data: &VmMetaData, - ) -> usize { + /// Process the event files from the directory once. + pub async fn process_once(&self) -> usize { let event_count: usize; // get all [0-9]+.json event filenames with numbers in the directory - match misc_helpers::search_files(&self.dir_path, r"^[0-9]+\.json$") { + match misc_helpers::search_files( + &self.dir_path, + crate::telemetry::GENERIC_EVENT_FILE_SEARCH_PATTERN, + ) { Ok(files) => { let file_count = files.len(); - event_count = self - .process_events_and_clean(files, wire_server_client, vm_meta_data) - .await; + event_count = self.process_events_and_clean(files).await; let message = format!( "Telemetry event reader sent {event_count} events from {file_count} files" ); @@ -247,59 +234,7 @@ impl EventReader { event_count } - pub async fn update_vm_meta_data( - &self, - wire_server_client: &WireServerClient, - imds_client: &ImdsClient, - ) -> Result<()> { - let guid = self - .common_state - .get_state(common_state::SECURE_KEY_GUID.to_string()) - .await - .unwrap_or(None); - let key = self - .common_state - .get_state(common_state::SECURE_KEY_VALUE.to_string()) - .await - .unwrap_or(None); - let goal_state = wire_server_client - .get_goalstate(guid.clone(), key.clone()) - .await?; - let shared_config = wire_server_client - .get_shared_config( - goal_state.get_shared_config_uri(), - guid.clone(), - key.clone(), - ) - .await?; - - let instance_info = imds_client - .get_imds_instance_info(guid.clone(), key.clone()) - .await?; - let vm_meta_data = VmMetaData { - container_id: 
goal_state.get_container_id(), - role_name: shared_config.get_role_name(), - role_instance_name: shared_config.get_role_instance_name(), - tenant_name: shared_config.get_deployment_name(), - subscription_id: instance_info.get_subscription_id(), - resource_group_name: instance_info.get_resource_group_name(), - vm_id: instance_info.get_vm_id(), - image_origin: instance_info.get_image_origin(), - }; - - self.common_state - .set_vm_meta_data(Some(vm_meta_data)) - .await?; - - Ok(()) - } - - async fn process_events_and_clean( - &self, - files: Vec, - wire_server_client: &WireServerClient, - vm_meta_data: &VmMetaData, - ) -> usize { + async fn process_events_and_clean(&self, files: Vec) -> usize { let mut num_events_logged = 0; for file in files { if let Some(max_events) = self.limits.max_events_per_round { @@ -341,8 +276,7 @@ impl EventReader { match misc_helpers::json_read_from_file::>(&file) { Ok(events) => { num_events_logged += events.len(); - self.send_events(events, wire_server_client, vm_meta_data) - .await; + self.handle_events(events).await; } Err(e) => { logger_manager::write_warn(format!( @@ -357,81 +291,32 @@ impl EventReader { num_events_logged } - const MAX_MESSAGE_SIZE: usize = 1024 * 64; - async fn send_events( - &self, - mut events: Vec, - wire_server_client: &WireServerClient, - vm_meta_data: &VmMetaData, - ) { + async fn handle_events(&self, mut events: Vec) { + let mut queued_event = false; while !events.is_empty() { - let mut telemetry_data = TelemetryData::new(); - let mut add_more_events = true; - while !events.is_empty() && add_more_events { - match events.pop() { - Some(event) => { - telemetry_data.add_event(TelemetryEvent::from_event_log( - &event, - vm_meta_data.clone(), - self.execution_mode.clone(), - self.event_name.clone(), - self.limits.version.clone(), - )); - - if telemetry_data.get_size() >= Self::MAX_MESSAGE_SIZE { - telemetry_data.remove_last_event(); - if telemetry_data.event_count() == 0 { - match serde_json::to_string(&event) { - 
Ok(json) => { - logger_manager::write_warn(format!( - "Event data too large. Not sending to wire-server. Event: {json}.", - )); - } - Err(_) => { - logger_manager::write_warn( - "Event data too large. Not sending to wire-server. Event cannot be displayed.".to_string() - ); - } - } - } else { - events.push(event); - } - add_more_events = false; - } - } - None => { - break; - } + match events.pop() { + Some(event) => { + let telemetry_event = TelemetryGenericLogsEvent::from_event_log( + &event, + self.execution_mode.clone(), + self.event_name.clone(), + self.limits.version.clone(), + ); + let telemetry_event = TelemetryEvent::GenericLogsEvent(telemetry_event); + event_sender::enqueue_event(telemetry_event); + queued_event = true; + } + None => { + break; } } - - Self::send_data_to_wire_server(telemetry_data, wire_server_client).await; - } - } - - async fn send_data_to_wire_server( - telemetry_data: TelemetryData, - wire_server_client: &WireServerClient, - ) { - if telemetry_data.event_count() == 0 { - return; } - for _ in [0; 5] { - match wire_server_client - .send_telemetry_data(telemetry_data.to_xml()) - .await - { - Ok(()) => { - break; - } - Err(e) => { - logger_manager::write_warn(format!( - "Failed to send telemetry data to host with error: {e}" - )); - // wait 15 seconds and retry - tokio::time::sleep(Duration::from_secs(15)).await; - } + if queued_event { + if let Err(e) = self.common_state.notify_telemetry_event().await { + logger_manager::write_warn(format!( + "Failed to notify telemetry event with error: {e}" + )); } } } @@ -456,7 +341,6 @@ impl EventReader { mod tests { use super::*; use crate::misc_helpers; - use crate::server_mock; use std::{env, fs}; #[tokio::test] @@ -468,43 +352,14 @@ mod tests { let mut events_dir = temp_dir.to_path_buf(); events_dir.push("Events"); - // start wire_server listener - let ip = "127.0.0.1"; - let port = 7071u16; - let cancellation_token = CancellationToken::new(); let common_state = CommonState::start_new(); - let 
wire_server_client = WireServerClient::new(ip, port); - let imds_client = ImdsClient::new(ip, port); - tokio::spawn(server_mock::start( - ip.to_string(), - port, - cancellation_token.clone(), - )); - tokio::time::sleep(Duration::from_millis(100)).await; - logger_manager::write_info("server_mock started.".to_string()); - let event_reader = EventReader::new( events_dir.clone(), - false, - cancellation_token.clone(), common_state.clone(), "Test".to_string(), "test_event_reader_thread".to_string(), ); - // refresh vm metadata - match event_reader - .update_vm_meta_data(&wire_server_client, &imds_client) - .await - { - Ok(()) => { - logger_manager::write_info("success updated the vm metadata.".to_string()); - } - Err(e) => { - logger_manager::write_warn(format!("Failed to read vm metadata with error {}.", e)); - } - } - // Write events to events dir let message = r#"{\"method\":\"GET\",\"url\":\"/machine/37569ad2-69a3-44fd-b653-813e62a177cf/68938c06%2D5233%2D4ff9%2Da173%2D0ac0a2754f8a.%5FWS2022?comp=config&type=hostingEnvironmentConfig&incarnation=2\",\"ip\":\"168.63.129.16\",\"port\":80,\"userId\":999,\"userName\":\"WS2022$\",\"processName\":\"C:\\\\WindowsAzure\\\\GuestAgent_2.7.41491.1071_2023-03-02_185502\\\\WindowsAzureGuestAgent.exe\",\"runAsElevated\":true,\"responseStatus\":\"200 OK\",\"elapsedTime\":8}"#; let mut events: Vec = Vec::new(); @@ -533,26 +388,20 @@ mod tests { .with_version("test_version".to_string()); let event_reader_with_limits = EventReader::new_with_limits( events_dir.clone(), - false, - cancellation_token.clone(), common_state.clone(), "Test".to_string(), "test_event_reader_thread".to_string(), event_reader_limits.clone(), ); // Check the events processed - let events_processed = event_reader_with_limits - .process_once(&wire_server_client) - .await; + let events_processed = event_reader_with_limits.process_once().await; logger_manager::write_info(format!("Send {} events from event files", events_processed)); //Should be 10 events processed 
and read into events Vector assert_eq!(events_processed, 10, "Events processed should be 10"); let files = misc_helpers::get_files(&events_dir).unwrap(); assert_eq!(1, files.len(), "Must still have 1 event file."); // test EventReader with limits - second round - let events_processed = event_reader_with_limits - .process_once(&wire_server_client) - .await; + let events_processed = event_reader_with_limits.process_once().await; logger_manager::write_info(format!("Send {} events from event files", events_processed)); //Should be 10 events processed and read into events Vector assert_eq!(events_processed, 10, "Events processed should be 10"); @@ -572,7 +421,7 @@ mod tests { assert_eq!(2, files.len(), "Must have 2 event files."); // test EventReader without limits - let events_processed = event_reader.process_once(&wire_server_client).await; + let events_processed = event_reader.process_once().await; logger_manager::write_info(format!("Send {} events from event files", events_processed)); //Should be 20 events processed and read into events Vector assert_eq!(events_processed, 20, "Events processed should be 20"); @@ -589,7 +438,7 @@ mod tests { let mut file_path = events_dir.to_path_buf(); file_path.push(format!("a{}.json", misc_helpers::get_date_time_unix_nano())); misc_helpers::json_write_to_file(&events, &file_path).unwrap(); - let events_processed = event_reader.process_once(&wire_server_client).await; + let events_processed = event_reader.process_once().await; assert_eq!(0, events_processed, "events_processed must be 0."); let files = misc_helpers::get_files(&events_dir).unwrap(); assert!( @@ -597,7 +446,7 @@ mod tests { ".notjson files should not been cleaned up." 
); - cancellation_token.cancel(); + common_state.cancel_cancellation_token(); _ = fs::remove_dir_all(&temp_dir); } } diff --git a/proxy_agent_shared/src/telemetry/event_sender.rs b/proxy_agent_shared/src/telemetry/event_sender.rs new file mode 100644 index 00000000..1e1004d6 --- /dev/null +++ b/proxy_agent_shared/src/telemetry/event_sender.rs @@ -0,0 +1,218 @@ +// Copyright (c) Microsoft Corporation +// SPDX-License-Identifier: MIT + +//! This module contains the logic to send the telemetry event to the wire server. +use std::time::Duration; + +use crate::common_state::{self, CommonState}; +use crate::host_clients::imds_client::ImdsClient; +use crate::host_clients::wire_server_client::WireServerClient; +use crate::logger::logger_manager; +use crate::result::Result; +use crate::telemetry::telemetry_event::{ + TelemetryData, TelemetryEvent, TelemetryEventVMData, VmMetaData, +}; +use concurrent_queue::ConcurrentQueue; +use once_cell::sync::Lazy; + +static TELEMETRY_EVENT_QUEUE: Lazy> = + Lazy::new(|| ConcurrentQueue::::bounded(1000)); + +const MAX_MESSAGE_SIZE: usize = 1024 * 64; +const WIRE_SERVER_IP: &str = "168.63.129.16"; +const WIRE_SERVER_PORT: u16 = 80u16; +const IMDS_IP: &str = "169.254.169.254"; +const IMDS_PORT: u16 = 80u16; + +pub struct EventSender { + common_state: CommonState, +} + +impl EventSender { + pub fn new(common_state: CommonState) -> Self { + EventSender { common_state } + } + + pub async fn start(&self, server_ip: Option<&str>, server_port: Option) { + logger_manager::write_info("telemetry event sender task started.".to_string()); + let notify = match self.common_state.get_telemetry_event_notify().await { + Ok(notify) => notify, + Err(e) => { + logger_manager::write_err(format!("Failed to get notify: {e}")); + return; + } + }; + let cancellation_token = self.common_state.get_cancellation_token(); + + loop { + tokio::select! 
{ + _ = cancellation_token.cancelled() => { + logger_manager::write_info("telemetry event sender task cancelled.".to_string()); + // Close the event queue to stop accepting new events + TELEMETRY_EVENT_QUEUE.close(); + break; + } + _ = notify.notified() => { + self.process_event_queue(server_ip, server_port).await; + } + } + } + } + + async fn process_event_queue(&self, server_ip: Option<&str>, server_port: Option) { + if TELEMETRY_EVENT_QUEUE.is_empty() { + return; + } + + let wire_server_client = WireServerClient::new( + server_ip.unwrap_or(WIRE_SERVER_IP), + server_port.unwrap_or(WIRE_SERVER_PORT), + ); + let imds_client = ImdsClient::new( + server_ip.unwrap_or(IMDS_IP), + server_port.unwrap_or(IMDS_PORT), + ); + // refresh vm metadata + match self + .update_vm_meta_data(&wire_server_client, &imds_client) + .await + { + Ok(()) => { + logger_manager::write_info("success updated the vm metadata.".to_string()); + } + Err(e) => { + logger_manager::write_warn(format!("Failed to update vm metadata with error {e}.")); + } + } + + if let Ok(Some(vm_meta_data)) = self.common_state.get_vm_meta_data().await { + let vm_data = TelemetryEventVMData::new_from_vm_meta_data(&vm_meta_data); + self.send_events(&wire_server_client, &vm_data).await + } else { + logger_manager::write_warn( + "VmMetaData is not available. 
Skipping sending telemetry events.".to_string(),
+ );
+ }
+ }
+
+ pub async fn update_vm_meta_data(
+ &self,
+ wire_server_client: &WireServerClient,
+ imds_client: &ImdsClient,
+ ) -> Result<()> {
+ let guid = self
+ .common_state
+ .get_state(common_state::SECURE_KEY_GUID.to_string())
+ .await
+ .unwrap_or(None);
+ let key = self
+ .common_state
+ .get_state(common_state::SECURE_KEY_VALUE.to_string())
+ .await
+ .unwrap_or(None);
+ let goal_state = wire_server_client
+ .get_goalstate(guid.clone(), key.clone())
+ .await?;
+ let shared_config = wire_server_client
+ .get_shared_config(
+ goal_state.get_shared_config_uri(),
+ guid.clone(),
+ key.clone(),
+ )
+ .await?;
+
+ let instance_info = imds_client
+ .get_imds_instance_info(guid.clone(), key.clone())
+ .await?;
+ let vm_meta_data = VmMetaData {
+ container_id: goal_state.get_container_id(),
+ role_name: shared_config.get_role_name(),
+ role_instance_name: shared_config.get_role_instance_name(),
+ tenant_name: shared_config.get_deployment_name(),
+ subscription_id: instance_info.get_subscription_id(),
+ resource_group_name: instance_info.get_resource_group_name(),
+ vm_id: instance_info.get_vm_id(),
+ image_origin: instance_info.get_image_origin(),
+ };
+
+ self.common_state
+ .set_vm_meta_data(Some(vm_meta_data))
+ .await?;
+
+ Ok(())
+ }
+
+ async fn send_events(
+ &self,
+ wire_server_client: &WireServerClient,
+ vm_data: &TelemetryEventVMData,
+ ) {
+ while !TELEMETRY_EVENT_QUEUE.is_closed() && !TELEMETRY_EVENT_QUEUE.is_empty() {
+ let mut telemetry_data = TelemetryData::new_with_vm_data(vm_data.clone());
+ let mut add_more_events = true;
+ while !TELEMETRY_EVENT_QUEUE.is_empty() && add_more_events {
+ match TELEMETRY_EVENT_QUEUE.pop() {
+ Ok(event) => {
+ telemetry_data.add_event(event.clone());
+
+ if telemetry_data.get_size() >= MAX_MESSAGE_SIZE {
+ _ = telemetry_data.remove_last_event(event.clone());
+ if telemetry_data.event_count() == 0 {
+ logger_manager::write_warn(format!(
+ "Event data too large. 
Not sending to wire-server. Event: {}.", + event.to_xml_event(vm_data), + )); + } else if let Err(e) = TELEMETRY_EVENT_QUEUE.push(event) { + logger_manager::write_warn(format!( + "Failed to re-enqueue telemetry event with error: {e}" + )); + } + add_more_events = false; + } + } + Err(err) => { + logger_manager::write_warn(format!( + "Failed to pop telemetry event from queue with error: {err}" + )); + break; + } + } + } + + Self::send_data_to_wire_server(telemetry_data, wire_server_client).await; + } + } + + async fn send_data_to_wire_server( + telemetry_data: TelemetryData, + wire_server_client: &WireServerClient, + ) { + if telemetry_data.event_count() == 0 { + return; + } + + for _ in [0; 5] { + match wire_server_client + .send_telemetry_data(telemetry_data.to_xml()) + .await + { + Ok(()) => { + break; + } + Err(e) => { + logger_manager::write_warn(format!( + "Failed to send telemetry data to host with error: {e}" + )); + // wait 15 seconds and retry + tokio::time::sleep(Duration::from_secs(15)).await; + } + } + } + } +} + +pub(crate) fn enqueue_event(event: TelemetryEvent) { + if let Err(e) = TELEMETRY_EVENT_QUEUE.push(event) { + logger_manager::write_warn(format!("Failed to enqueue telemetry event with error: {e}")); + } +} diff --git a/proxy_agent_shared/src/telemetry/telemetry_event.rs b/proxy_agent_shared/src/telemetry/telemetry_event.rs index dbb06952..9e280e4b 100644 --- a/proxy_agent_shared/src/telemetry/telemetry_event.rs +++ b/proxy_agent_shared/src/telemetry/telemetry_event.rs @@ -3,26 +3,78 @@ //! This module contains the logic to generate the telemetry data to be send to wire server. -use super::event_reader::VmMetaData; -use crate::telemetry::Event; +use crate::telemetry::{Event, ExtensionStatusEvent}; use crate::{current_info, misc_helpers}; use once_cell::sync::Lazy; use serde_derive::{Deserialize, Serialize}; -/// TelemetryData struct to hold the telemetry events send to wire server. 
-pub struct TelemetryData { +const METRICS_PROVIDER_ID: &str = "FFF0196F-EE4C-4EAF-9AA5-776F622DEB4F"; +const STATUS_PROVIDER_ID: &str = "69B669B9-4AF8-4C50-BDC4-6006FA76E975"; + +pub struct TelemetryProvider { + pub id: String, events: Vec, } -impl Default for TelemetryData { - fn default() -> Self { - Self::new() +impl TelemetryProvider { + pub fn new(id: String) -> Self { + TelemetryProvider { + id, + events: Vec::new(), + } + } + + pub fn add_event(&mut self, event: TelemetryEvent) { + self.events.push(event); + } + + pub fn event_count(&self) -> usize { + self.events.len() + } + + pub fn remove_event(&mut self, event: TelemetryEvent) -> Option { + if let Some(pos) = self.events.iter().position(|x| *x == event) { + Some(self.events.remove(pos)) + } else { + None + } + } + + pub fn to_xml(&self, vm_data: &TelemetryEventVMData) -> String { + let mut xml: String = String::new(); + xml.push_str(&format!( + "", + misc_helpers::xml_escape(self.id.to_string()) + )); + + for e in &self.events { + match e { + TelemetryEvent::GenericLogsEvent(event) => { + xml.push_str(&event.to_xml_event(vm_data)); + } + TelemetryEvent::ExtensionEvent(event) => { + xml.push_str(&event.to_xml_event(vm_data)); + } + } + } + + xml.push_str(""); + xml } } +/// TelemetryData struct to hold the telemetry providers and their events send to wire server. +pub struct TelemetryData { + providers: Vec, + vm_data: TelemetryEventVMData, +} + impl TelemetryData { - pub fn new() -> Self { - TelemetryData { events: Vec::new() } + pub fn new_with_vm_data(vm_data: TelemetryEventVMData) -> Self { + TelemetryData { + providers: Vec::new(), + vm_data, + } } /// Convert the telemetry data to xml format. 
@@ -30,13 +82,13 @@ impl TelemetryData { pub fn to_xml(&self) -> String { let mut xml: String = String::new(); - xml.push_str(""); + xml.push_str(""); - for e in &self.events { - xml.push_str(&e.to_xml_event()); + for p in &self.providers { + xml.push_str(&p.to_xml(&self.vm_data)); } - xml.push_str(""); + xml.push_str(""); xml } @@ -46,104 +98,116 @@ impl TelemetryData { } pub fn add_event(&mut self, event: TelemetryEvent) { - self.events.push(event); + for provider in &mut self.providers { + match &event { + TelemetryEvent::GenericLogsEvent(_) => { + if provider.id == METRICS_PROVIDER_ID { + provider.add_event(event); + return; + } + } + TelemetryEvent::ExtensionEvent(_) => { + if provider.id == STATUS_PROVIDER_ID { + provider.add_event(event); + return; + } + } + } + } + let mut p = TelemetryProvider::new(match &event { + TelemetryEvent::GenericLogsEvent(_) => METRICS_PROVIDER_ID.to_string(), + TelemetryEvent::ExtensionEvent(_) => STATUS_PROVIDER_ID.to_string(), + }); + p.add_event(event); + self.providers.push(p); } - pub fn remove_last_event(&mut self) -> Option { - self.events.pop() + pub fn remove_last_event(&mut self, last_event: TelemetryEvent) -> Option { + for provider in &mut self.providers { + match &last_event { + TelemetryEvent::GenericLogsEvent(_) => { + if provider.id == METRICS_PROVIDER_ID { + return provider.remove_event(last_event); + } + } + TelemetryEvent::ExtensionEvent(_) => { + if provider.id == STATUS_PROVIDER_ID { + return provider.remove_event(last_event); + } + } + } + } + None } pub fn event_count(&self) -> usize { - self.events.len() + self.providers.iter().map(|p| p.event_count()).sum() } } -pub struct TelemetryEvent { - event_pid: u64, - event_tid: u64, - ga_version: String, - container_id: String, - task_name: String, - opcode_name: String, - keyword_name: String, - os_version: String, - execution_mode: String, - ram: u64, - processors: u64, - tenant_name: String, - role_name: String, - role_instance_name: String, - subscription_id: 
String, - resource_group_name: String, - vm_id: String, - image_origin: u64, - - event_name: String, - capability_used: String, - context1: String, - context2: String, - context3: String, +#[derive(PartialEq, Eq, Hash, Clone)] +pub enum TelemetryEvent { + GenericLogsEvent(TelemetryGenericLogsEvent), + ExtensionEvent(TelemetryExtensionEventsEvent), } impl TelemetryEvent { - pub fn from_event_log( - event_log: &Event, - vm_meta_data: VmMetaData, - execution_mode: String, - event_name: String, - ga_version: Option, - ) -> Self { - // if ga_version is provided, append event_log.version to event_name - // if ga_version is None, use event_log.Version as ga_version and keep event_name unchanged - let (ga_version, event_name) = match ga_version { - Some(version) => (version, format!("{}-{}", event_name, event_log.Version)), - None => (event_log.Version.to_string(), event_name), - }; - TelemetryEvent { - event_pid: event_log.EventPid.parse::().unwrap_or(0), - event_tid: event_log.EventTid.parse::().unwrap_or(0), - task_name: event_log.TaskName.to_string(), - opcode_name: event_log.TimeStamp.to_string(), - capability_used: event_log.EventLevel.to_string(), - context1: event_log.Message.to_string(), - context2: event_log.TimeStamp.to_string(), - context3: event_log.OperationId.to_string(), + pub fn get_provider_id(&self) -> String { + match self { + TelemetryEvent::GenericLogsEvent(_) => TelemetryGenericLogsEvent::get_provider_id(), + TelemetryEvent::ExtensionEvent(_) => TelemetryExtensionEventsEvent::get_provider_id(), + } + } - ga_version, - execution_mode, - event_name, - os_version: current_info::get_long_os_version(), + pub fn to_xml_event(&self, vm_data: &TelemetryEventVMData) -> String { + match self { + TelemetryEvent::GenericLogsEvent(event) => event.to_xml_event(vm_data), + TelemetryEvent::ExtensionEvent(event) => event.to_xml_event(vm_data), + } + } +} + +/// Base struct containing common fields shared between telemetry event types. 
+#[derive(PartialEq, Eq, Hash, Clone)] +pub struct TelemetryEventVMData { + pub container_id: String, + pub keyword_name: String, + pub os_version: String, + pub ram: u64, + pub processors: u64, + pub tenant_name: String, + pub role_name: String, + pub role_instance_name: String, + pub subscription_id: String, + pub resource_group_name: String, + pub vm_id: String, + pub image_origin: u64, +} + +impl TelemetryEventVMData { + pub fn new_from_vm_meta_data(vm_meta_data: &VmMetaData) -> Self { + TelemetryEventVMData { keyword_name: CURRENT_KEYWORD_NAME.to_string(), + os_version: current_info::get_long_os_version(), ram: current_info::get_ram_in_mb(), processors: current_info::get_cpu_count() as u64, - - container_id: vm_meta_data.container_id, - tenant_name: vm_meta_data.tenant_name, - role_name: vm_meta_data.role_name, - role_instance_name: vm_meta_data.role_instance_name, - subscription_id: vm_meta_data.subscription_id, - resource_group_name: vm_meta_data.resource_group_name, - vm_id: vm_meta_data.vm_id, + container_id: vm_meta_data.container_id.clone(), + tenant_name: vm_meta_data.tenant_name.clone(), + role_name: vm_meta_data.role_name.clone(), + role_instance_name: vm_meta_data.role_instance_name.clone(), + subscription_id: vm_meta_data.subscription_id.clone(), + resource_group_name: vm_meta_data.resource_group_name.clone(), + vm_id: vm_meta_data.vm_id.clone(), image_origin: vm_meta_data.image_origin, } } - - fn to_xml_event(&self) -> String { - let mut xml: String = String::new(); - xml.push_str("", - misc_helpers::xml_escape(self.opcode_name.to_string()) - )); + /// Convert the base fields to XML format. 
+ pub fn to_xml_params(&self) -> String { + let mut xml = String::new(); xml.push_str(&format!( "", misc_helpers::xml_escape(self.keyword_name.to_string()) )); - xml.push_str(&format!( - "", - misc_helpers::xml_escape(self.task_name.to_string()) - )); xml.push_str(&format!( "", misc_helpers::xml_escape(self.tenant_name.to_string()) @@ -172,31 +236,14 @@ impl TelemetryEvent { "", misc_helpers::xml_escape(self.vm_id.to_string()) )); - xml.push_str(&format!( - "", - self.event_pid - )); - xml.push_str(&format!( - "", - self.event_tid - )); xml.push_str(&format!( "", self.image_origin )); - - xml.push_str(&format!( - "", - misc_helpers::xml_escape(self.execution_mode.to_string()) - )); xml.push_str(&format!( "", misc_helpers::xml_escape(self.os_version.to_string()) )); - xml.push_str(&format!( - "", - misc_helpers::xml_escape(self.ga_version.to_string()) - )); xml.push_str(&format!( "", self.ram @@ -205,6 +252,64 @@ impl TelemetryEvent { "", self.processors )); + xml + } +} + +#[derive(PartialEq, Eq, Hash, Clone)] +pub struct TelemetryGenericLogsEvent { + event_pid: u64, + event_tid: u64, + ga_version: String, + task_name: String, + opcode_name: String, + execution_mode: String, + + event_name: String, + capability_used: String, + context1: String, + context2: String, + context3: String, +} + +impl TelemetryGenericLogsEvent { + pub fn from_event_log( + event_log: &Event, + execution_mode: String, + event_name: String, + ga_version: Option, + ) -> Self { + // if ga_version is provided, append event_log.version to event_name + // if ga_version is None, use event_log.Version as ga_version and keep event_name unchanged + let (ga_version, event_name) = match ga_version { + Some(version) => (version, format!("{}-{}", event_name, event_log.Version)), + None => (event_log.Version.to_string(), event_name), + }; + TelemetryGenericLogsEvent { + event_name, + ga_version, + execution_mode, + event_pid: event_log.EventPid.parse::().unwrap_or(0), + event_tid: 
event_log.EventTid.parse::().unwrap_or(0), + task_name: event_log.TaskName.to_string(), + opcode_name: event_log.TimeStamp.to_string(), + capability_used: event_log.EventLevel.to_string(), + context1: event_log.Message.to_string(), + context2: event_log.TimeStamp.to_string(), + context3: event_log.OperationId.to_string(), + } + } + + pub fn get_provider_id() -> String { + METRICS_PROVIDER_ID.to_string() + } + + fn to_xml_event(&self, vm_data: &TelemetryEventVMData) -> String { + let mut xml: String = String::new(); + // Event ID 7 is for Generic Logs Events + xml.push_str("", @@ -232,6 +337,103 @@ impl TelemetryEvent { } } +#[derive(PartialEq, Eq, Hash, Clone)] +pub struct TelemetryExtensionEventsEvent { + event_pid: u64, + event_tid: u64, + ga_version: String, + task_name: String, + opcode_name: String, + execution_mode: String, + + extension_type: String, + is_internal: bool, + name: String, + version: String, + operation: String, + operation_success: bool, + message: String, + duration: u64, +} + +impl TelemetryExtensionEventsEvent { + pub fn from_extension_status_event( + event: &ExtensionStatusEvent, + execution_mode: String, + ga_version: String, + ) -> Self { + TelemetryExtensionEventsEvent { + ga_version, + execution_mode, + event_pid: event.event_pid.parse::().unwrap_or(0), + event_tid: event.event_tid.parse::().unwrap_or(0), + opcode_name: event.time_stamp.to_string(), + extension_type: event.extension.extension_type.to_string(), + is_internal: event.extension.is_internal, + name: event.extension.name.to_string(), + version: event.extension.version.to_string(), + operation: event.operation_status.operation.to_string(), + task_name: event.operation_status.task_name.to_string(), + operation_success: event.operation_status.operation_success, + message: event.operation_status.message.to_string(), + duration: event.operation_status.duration as u64, + } + } + + pub fn get_provider_id() -> String { + STATUS_PROVIDER_ID.to_string() + } + + fn to_xml_event(&self, 
vm_data: &TelemetryEventVMData) -> String { + let mut xml: String = String::new(); + // Event ID 1 is for Extension Events + xml.push_str("", + misc_helpers::xml_escape(self.extension_type.to_string()) + )); + xml.push_str(&format!( + "", + if self.is_internal { "True" } else { "False" } + )); + xml.push_str(&format!( + "", + misc_helpers::xml_escape(self.name.to_string()) + )); + xml.push_str(&format!( + "", + misc_helpers::xml_escape(self.version.to_string()) + )); + xml.push_str(&format!( + "", + misc_helpers::xml_escape(self.operation.to_string()) + )); + xml.push_str(&format!( + "", + if self.operation_success { + "True" + } else { + "False" + } + )); + xml.push_str(&format!( + "", + misc_helpers::xml_escape(self.message.to_string()) + )); + xml.push_str(&format!( + "", + self.duration + )); + + xml.push_str("]]>"); + xml + } +} + static CURRENT_KEYWORD_NAME: Lazy = Lazy::new(|| KeywordName::new(current_info::get_cpu_arch()).to_json()); @@ -252,3 +454,38 @@ impl KeywordName { serde_json::to_string(self).unwrap_or_else(|_| "".to_owned()) } } + +/// VmMetaData contains the metadata of the VM. +/// The metadata is used to identify the VM and the image origin. +/// It will be part of the telemetry data send to the wire server. +/// The metadata is updated by the wire server and the IMDS client. 
+#[derive(Clone, Debug)] +pub struct VmMetaData { + pub container_id: String, + pub tenant_name: String, + pub role_name: String, + pub role_instance_name: String, + pub subscription_id: String, + pub resource_group_name: String, + pub vm_id: String, + pub image_origin: u64, +} + +#[cfg(test)] +const EMPTY_GUID: &str = "00000000-0000-0000-0000-000000000000"; + +impl VmMetaData { + #[cfg(test)] + pub fn empty() -> Self { + VmMetaData { + container_id: EMPTY_GUID.to_string(), + tenant_name: EMPTY_GUID.to_string(), + role_name: EMPTY_GUID.to_string(), + role_instance_name: EMPTY_GUID.to_string(), + subscription_id: EMPTY_GUID.to_string(), + resource_group_name: EMPTY_GUID.to_string(), + vm_id: EMPTY_GUID.to_string(), + image_origin: 3, // unknown + } + } +}