Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions proxy_agent/src/provision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::{proxy_agent_status, redirector};
use proxy_agent_shared::logger::LoggerLevel;
use proxy_agent_shared::telemetry::event_logger;
use proxy_agent_shared::telemetry::event_reader::EventReader;
use proxy_agent_shared::telemetry::event_sender::EventSender;
use proxy_agent_shared::{misc_helpers, proxy_agent_aggregate_status};
use std::path::PathBuf;
use std::time::Duration;
Expand Down Expand Up @@ -370,18 +371,23 @@ pub async fn start_event_threads(event_threads_shared_state: EventThreadsSharedS
tokio::spawn({
let event_reader = EventReader::new(
config::get_events_dir(),
true,
event_threads_shared_state.cancellation_token.clone(),
event_threads_shared_state.common_state.clone(),
"ProxyAgent".to_string(),
"MicrosoftAzureGuestProxyAgent".to_string(),
);
async move {
event_reader
.start(Some(Duration::from_secs(300)), None, None)
.start(true, Some(Duration::from_secs(300)))
.await;
}
});
tokio::spawn({
let event_sender = EventSender::new(event_threads_shared_state.common_state.clone());
async move {
event_sender.start(None, None).await;
}
});

if let Err(e) = event_threads_shared_state
.provision_shared_state
.set_event_log_threads_initialized()
Expand Down
2 changes: 1 addition & 1 deletion proxy_agent_shared/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ serde_json = "1.0.91" # json Deserializer
serde-xml-rs = "0.8.1" # xml Deserializer with xml attribute
regex = "1.11" # match file name
thiserror = "1.0.64"
tokio = { version = "1", features = ["rt", "macros", "sync", "time"] }
tokio = { version = "1", features = ["rt", "macros", "net", "sync", "time"] }
tokio-util = "0.7.11"
log = { version = "0.4.26", features = ["std"] }
ctor = "0.3.6" # used for test setup and clean up
Expand Down
67 changes: 59 additions & 8 deletions proxy_agent_shared/src/common_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
//! This module contains the logic to get and update common states.

use crate::result::Result;
use crate::{error::Error, logger::logger_manager, telemetry::event_reader::VmMetaData};
use tokio::sync::{mpsc, oneshot};
use crate::{error::Error, logger::logger_manager, telemetry::telemetry_event::VmMetaData};
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, Notify};
use tokio_util::sync::CancellationToken;

pub const SECURE_KEY_GUID: &str = "key_guid";
pub const SECURE_KEY_VALUE: &str = "key_value";
Expand All @@ -27,10 +29,17 @@ enum CommonStateAction {
key: String,
response: oneshot::Sender<Option<String>>,
},
GetTelemetryEventNotify {
response: oneshot::Sender<Arc<Notify>>,
},
}

#[derive(Clone, Debug)]
pub struct CommonState(mpsc::Sender<CommonStateAction>);
pub struct CommonState {
/// The cancellation token is used to cancel the agent when the agent is stopped
cancellation_token: CancellationToken,
sender: mpsc::Sender<CommonStateAction>,
}

impl CommonState {
pub fn start_new() -> Self {
Expand All @@ -39,6 +48,8 @@ impl CommonState {
let mut vm_meta_data: Option<VmMetaData> = None;
let mut states: std::collections::HashMap<String, String> =
std::collections::HashMap::new();
let telemetry_event_notify = Arc::new(Notify::new());

loop {
match receiver.recv().await {
Some(CommonStateAction::SetVmMetaData {
Expand Down Expand Up @@ -79,19 +90,29 @@ impl CommonState {
));
}
}
Some(CommonStateAction::GetTelemetryEventNotify { response }) => {
if let Err(notify) = response.send(telemetry_event_notify.clone()) {
logger_manager::write_warn(format!(
"Failed to send response to CommonStateAction::GetTelemetryEventNotify '{notify:?}'"
));
}
}
None => {
break;
}
}
}
});

Self(sender)
Self {
cancellation_token: CancellationToken::new(),
sender,
}
}

pub async fn set_vm_meta_data(&self, vm_meta_data: Option<VmMetaData>) -> Result<()> {
let (response, receiver) = oneshot::channel();
self.0
self.sender
.send(CommonStateAction::SetVmMetaData {
vm_meta_data,
response,
Expand All @@ -110,7 +131,7 @@ impl CommonState {

pub async fn get_vm_meta_data(&self) -> Result<Option<VmMetaData>> {
let (response, receiver) = oneshot::channel();
self.0
self.sender
.send(CommonStateAction::GetVmMetaData { response })
.await
.map_err(|e| {
Expand All @@ -126,7 +147,7 @@ impl CommonState {

pub async fn set_state(&self, key: String, value: String) -> Result<()> {
let (response, receiver) = oneshot::channel();
self.0
self.sender
.send(CommonStateAction::SetState {
key,
value,
Expand All @@ -143,7 +164,7 @@ impl CommonState {

pub async fn get_state(&self, key: String) -> Result<Option<String>> {
let (response, receiver) = oneshot::channel();
self.0
self.sender
.send(CommonStateAction::GetState { key, response })
.await
.map_err(|e| {
Expand All @@ -153,4 +174,34 @@ impl CommonState {
.await
.map_err(|e| Error::RecvError("CommonStateAction::GetState".to_string(), e))
}

pub async fn get_telemetry_event_notify(&self) -> Result<Arc<Notify>> {
let (response, receiver) = oneshot::channel();
self.sender
.send(CommonStateAction::GetTelemetryEventNotify { response })
.await
.map_err(|e| {
Error::SendError(
"CommonStateAction::GetTelemetryEventNotify".to_string(),
e.to_string(),
)
})?;
receiver.await.map_err(|e| {
Error::RecvError("CommonStateAction::GetTelemetryEventNotify".to_string(), e)
})
}

pub async fn notify_telemetry_event(&self) -> Result<()> {
let notify = self.get_telemetry_event_notify().await?;
notify.notify_one();
Ok(())
}

pub fn get_cancellation_token(&self) -> CancellationToken {
self.cancellation_token.clone()
}

pub fn cancel_cancellation_token(&self) {
self.cancellation_token.cancel();
}
}
6 changes: 6 additions & 0 deletions proxy_agent_shared/src/current_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ static CURRENT_OS_INFO: Lazy<(String, String)> = Lazy::new(|| {
(arch, os)
});

static CURRENT_EXE_VERSION: Lazy<String> = Lazy::new(misc_helpers::get_current_exe_version);

pub fn get_ram_in_mb() -> u64 {
CURRENT_SYS_INFO.0
}
Expand All @@ -61,6 +63,10 @@ pub fn get_long_os_version() -> String {
CURRENT_OS_INFO.1.to_string()
}

pub fn get_current_exe_version() -> String {
CURRENT_EXE_VERSION.clone()
}

#[cfg(test)]
mod tests {
#[test]
Expand Down
91 changes: 89 additions & 2 deletions proxy_agent_shared/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,23 @@
// SPDX-License-Identifier: MIT
pub mod event_logger;
pub mod event_reader;
pub mod event_sender;
pub mod span;
pub mod telemetry_event;

use crate::misc_helpers;
use crate::{current_info, misc_helpers};
use serde_derive::{Deserialize, Serialize};

pub const GENERIC_EVENT_FILE_SEARCH_PATTERN: &str = r"^[0-9]+\.json$";
pub const EXTENSION_EVENT_FILE_SEARCH_PATTERN: &str = r"^extension_[0-9]+\.json$";
pub const MAX_EXTENSION_EVENT_FILE_COUNT: usize = 1000;
pub fn new_extension_event_file_name() -> String {
format!("extension_{}.json", misc_helpers::get_date_time_unix_nano())
}
pub fn new_generic_event_file_name() -> String {
format!("{}.json", misc_helpers::get_date_time_unix_nano())
}

#[derive(Serialize, Deserialize)]
#[allow(non_snake_case)]
pub struct Event {
Expand All @@ -26,7 +37,7 @@ impl Event {
Event {
EventLevel: level,
Message: message,
Version: misc_helpers::get_current_exe_version(),
Version: current_info::get_current_exe_version(),
TaskName: task_name,
EventPid: std::process::id().to_string(),
EventTid: misc_helpers::get_thread_identity(),
Expand All @@ -36,6 +47,53 @@ impl Event {
}
}

#[derive(Serialize, Deserialize, Clone)]
pub struct Extension {
pub name: String,
pub version: String,
pub is_internal: bool,
pub extension_type: String,
}

#[derive(Serialize, Deserialize, Clone)]
pub struct OperationStatus {
pub operation_success: bool,
pub operation: String,
pub task_name: String,
pub message: String,
pub duration: i64,
}

#[derive(Serialize, Deserialize, Clone)]
pub struct ExtensionStatusEvent {
pub extension: Extension,
pub operation_status: OperationStatus,

pub event_pid: String,
pub event_tid: String,
pub time_stamp: String,
}

impl ExtensionStatusEvent {
/// Create a new ExtensionStatusEvent
/// Rust does not recommend using too many arguments in a function,
/// so we use structs to group related arguments together.
/// # Arguments
/// * `extension` - The extension information
/// * `operation_status` - The operation status information
/// # Returns
/// A new instance of `ExtensionStatusEvent`
pub fn new(extension: Extension, operation_status: OperationStatus) -> Self {
ExtensionStatusEvent {
extension,
operation_status,
event_pid: std::process::id().to_string(),
event_tid: misc_helpers::get_thread_identity(),
time_stamp: misc_helpers::get_date_time_string_with_milliseconds(),
}
}
}

#[cfg(test)]
mod tests {
#[test]
Expand All @@ -51,4 +109,33 @@ mod tests {
assert_eq!(event.TaskName, "test task name".to_string());
assert_eq!(event.OperationId, "test operation id".to_string());
}

#[test]
fn test_extension_status_event_new() {
let extension = super::Extension {
name: "test extension".to_string(),
version: "1.0.0".to_string(),
is_internal: true,
extension_type: "test type".to_string(),
};
let operation_status = super::OperationStatus {
operation_success: true,
task_name: "test task".to_string(),
operation: "test operation".to_string(),
message: "test message".to_string(),
duration: 100,
};
let event = super::ExtensionStatusEvent::new(extension.clone(), operation_status.clone());
assert_eq!(event.extension.name, extension.name);
assert_eq!(event.extension.version, extension.version);
assert_eq!(event.extension.is_internal, extension.is_internal);
assert_eq!(event.extension.extension_type, extension.extension_type);
assert_eq!(
event.operation_status.operation_success,
operation_status.operation_success
);
assert_eq!(event.operation_status.operation, operation_status.operation);
assert_eq!(event.operation_status.message, operation_status.message);
assert_eq!(event.operation_status.duration, operation_status.duration);
}
}
Loading