diff --git a/src/main.rs b/src/main.rs index 5f003e3..7bd2d82 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,7 +20,7 @@ mod otlp; mod runtime; mod services; -use std::{fs, thread, time::Duration}; +use std::{fs, sync::LazyLock, thread, time::Duration}; use anyhow::{Error, Result, bail}; use args::Args; @@ -35,11 +35,14 @@ use opentelemetry_otlp::ExportConfig; use psh_proto::HeartbeatReq; use runtime::{Task, TaskRuntime}; use services::rpc::RpcClient; -use tokio::try_join; +use tokio::{runtime::Runtime, try_join}; #[global_allocator] static GLOBAL: MiMalloc = mimalloc::MiMalloc; +static TOKIO_RUNTIME: LazyLock = + LazyLock::new(|| Runtime::new().expect("Create tokio runtime failed")); + fn main() -> Result<()> { log_init(); @@ -93,9 +96,8 @@ fn main() -> Result<()> { }; thread::spawn(move || -> Result<()> { - let rt = tokio::runtime::Runtime::new()?; let tasks = async_tasks(cfg.remote, task_rt); - rt.block_on(tasks)?; + TOKIO_RUNTIME.block_on(tasks)?; Ok(()) }) .join() @@ -183,7 +185,7 @@ async fn async_tasks(remote_cfg: RemoteConfig, mut task_rt: TaskRuntime) -> Resu export_conf, )?; - otlp.otlp_tasks().await?; + otlp.otlp_tasks().await; Ok::<(), Error>(()) }; diff --git a/src/otlp/gauges/cpu.rs b/src/otlp/gauges/cpu.rs index d0f3e8d..f7155e6 100644 --- a/src/otlp/gauges/cpu.rs +++ b/src/otlp/gauges/cpu.rs @@ -16,7 +16,7 @@ use opentelemetry::{KeyValue, metrics::ObservableGauge}; use psh_system::cpu::CpuHandle; impl super::super::Otlp { - pub fn cpu_gauges(&self) -> anyhow::Result> { + pub fn cpu_gauges(&self) -> ObservableGauge { let cpu = CpuHandle::new(); let host = self.host.clone(); let interval = self.interval; @@ -106,26 +106,6 @@ impl super::super::Otlp { gauge.observe(m, a); }); - macro_rules! gauges { - ($($item:ident,)+) => { - [ - $(( - cpu_time.$item(), - [ - KeyValue::new("cpu", cpu as i64), - KeyValue::new("stat", stringify!($item)), - KeyValue::new("desc", desc), - ], - ),)* - ] - }; - } - let gauges = gauges![user_ms, nice_ms, system_ms, idle_ms,]; - gauges.into_iter().for_each(|(m, [kv1, kv2, kv3])| { - let a = &[KeyValue::new("host", host.clone()), kv1, kv2, kv3]; - gauge.observe(m, a); - }); - macro_rules! gauges { ($($item_o:ident,)+) => { [ @@ -145,36 +125,9 @@ impl super::super::Otlp { let a = &[KeyValue::new("host", host.clone()), kv1, kv2, kv3]; gauge.observe(m, a); }); - - macro_rules! gauges { - ($($item_o:ident,)+) => { - [ - $(( - cpu_time.$item_o().unwrap_or(0), - [ - KeyValue::new("cpu", cpu as i64), - KeyValue::new("stat", stringify!($item_o)), - KeyValue::new("desc", desc), - ], - ),)* - ] - }; - } - let gauges = gauges![ - iowait_ms, - irq_ms, - softirq_ms, - steal_ms, - guest_ms, - guest_nice_ms, - ]; - gauges.into_iter().for_each(|(m, [kv1, kv2, kv3])| { - let a = &[KeyValue::new("host", host.clone()), kv1, kv2, kv3]; - gauge.observe(m, a); - }); } }) .build(); - Ok(gauge) + gauge } } diff --git a/src/otlp/gauges/disk.rs b/src/otlp/gauges/disk.rs index 590f72a..34e5b2f 100644 --- a/src/otlp/gauges/disk.rs +++ b/src/otlp/gauges/disk.rs @@ -16,7 +16,7 @@ use opentelemetry::{KeyValue, metrics::ObservableGauge}; use psh_system::disk::DiskHandle; impl super::super::Otlp { - pub fn disk_gagues(&self) -> anyhow::Result> { + pub fn disk_gagues(&self) -> ObservableGauge { let host = self.host.clone(); let interval = self.interval; let disk = DiskHandle::new(); @@ -87,6 +87,6 @@ impl super::super::Otlp { } }) .build(); - Ok(gauge) + gauge } } diff --git a/src/otlp/gauges/gpu.rs b/src/otlp/gauges/gpu.rs index d01096f..5a9945b 100644 --- a/src/otlp/gauges/gpu.rs +++ b/src/otlp/gauges/gpu.rs @@ -16,7 +16,7 @@ use opentelemetry::{KeyValue, metrics::ObservableGauge}; use psh_system::gpu::NvidiaHandle; impl super::super::Otlp { - pub fn gpu_gauges(&self) -> anyhow::Result> { + pub fn gpu_gauges(&self) -> ObservableGauge { let host = self.host.clone(); let interval = self.interval; let nvgpu = NvidiaHandle::new(); @@ -69,6 +69,6 @@ impl super::super::Otlp { } }) .build(); - Ok(gauge) + gauge } } diff --git a/src/otlp/gauges/interrupt.rs b/src/otlp/gauges/interrupt.rs index 16d506c..1dfd1ae 100644 --- a/src/otlp/gauges/interrupt.rs +++ b/src/otlp/gauges/interrupt.rs @@ -18,7 +18,7 @@ use opentelemetry::{KeyValue, metrics::ObservableGauge}; use psh_system::interrupt::InterruptHandle; impl super::super::Otlp { - pub fn irq_gauges(&self) -> anyhow::Result> { + pub fn irq_gauges(&self) -> ObservableGauge { let host = self.host.clone(); let interval = self.interval; let interrupt = InterruptHandle::new(); @@ -46,6 +46,6 @@ impl super::super::Otlp { } }) .build(); - Ok(gauge) + gauge } } diff --git a/src/otlp/gauges/memory.rs b/src/otlp/gauges/memory.rs index 0c33e6e..c736c32 100644 --- a/src/otlp/gauges/memory.rs +++ b/src/otlp/gauges/memory.rs @@ -16,7 +16,7 @@ use opentelemetry::{KeyValue, metrics::ObservableGauge}; use psh_system::memory::MemoryHandle; impl super::super::Otlp { - pub fn mem_gauges(&self) -> anyhow::Result> { + pub fn mem_gauges(&self) -> ObservableGauge { let interval = self.interval; let host = self.host.clone(); let memory = MemoryHandle::new(); @@ -63,14 +63,14 @@ impl super::super::Otlp { }); macro_rules! gauges { - ($($stat:ident,)+) => { - [ - $( - (mem.$stat.unwrap_or(0), KeyValue::new("stat", stringify!($stat))), - )* - ] - }; - } + ($($stat:ident,)+) => { + [ + $( + (mem.$stat.unwrap_or(0), KeyValue::new("stat", stringify!($stat))), + )* + ] + }; + } let gauges = gauges![ cma_total, cma_free, @@ -118,6 +118,6 @@ impl super::super::Otlp { }) }) .build(); - Ok(gauge) + gauge } } diff --git a/src/otlp/gauges/network.rs b/src/otlp/gauges/network.rs index 2243201..7d092b6 100644 --- a/src/otlp/gauges/network.rs +++ b/src/otlp/gauges/network.rs @@ -16,7 +16,7 @@ use opentelemetry::{KeyValue, metrics::ObservableGauge}; use psh_system::network::NetworkHandle; impl super::super::Otlp { - pub fn net_gauges(&self) -> anyhow::Result> { + pub fn net_gauges(&self) -> ObservableGauge { let interval = self.interval; let host = self.host.clone(); let network = NetworkHandle::new(); @@ -77,6 +77,6 @@ impl super::super::Otlp { } }) .build(); - Ok(gauge) + gauge } } diff --git a/src/otlp/gauges/rps.rs b/src/otlp/gauges/rps.rs index 28ac45c..802429a 100644 --- a/src/otlp/gauges/rps.rs +++ b/src/otlp/gauges/rps.rs @@ -16,7 +16,7 @@ use opentelemetry::{Array, KeyValue, Value, metrics::ObservableGauge}; use psh_system::rps::RpsHandle; impl super::super::Otlp { - pub fn rps_gauges(&self) -> anyhow::Result> { + pub fn rps_gauges(&self) -> ObservableGauge { let host = self.host.clone(); let rps = RpsHandle::new(); @@ -50,6 +50,6 @@ impl super::super::Otlp { } }) .build(); - Ok(gauge) + gauge } } diff --git a/src/otlp/gauges/vmstat.rs b/src/otlp/gauges/vmstat.rs index 1d06dbf..5ffcac5 100644 --- a/src/otlp/gauges/vmstat.rs +++ b/src/otlp/gauges/vmstat.rs @@ -16,7 +16,7 @@ use opentelemetry::{KeyValue, metrics::ObservableGauge}; use psh_system::vmstat::VmstatHandle; impl super::super::Otlp { - pub fn vmstat_gauges(&self) -> anyhow::Result> { + pub fn vmstat_gauges(&self) -> ObservableGauge { let host = self.host.clone(); let interval = self.interval; let vmstat = VmstatHandle::new(); @@ -41,6 +41,6 @@ impl super::super::Otlp { } }) .build(); - Ok(gauge) + gauge } } diff --git a/src/otlp/mod.rs b/src/otlp/mod.rs index 41ccd61..a9261be 100644 --- a/src/otlp/mod.rs +++ b/src/otlp/mod.rs @@ -44,7 +44,7 @@ pub struct Otlp { impl Otlp { pub fn new(token: String, interval: Duration, export_config: ExportConfig) -> Result { - let provider = meter_provider(export_config, &token, interval)?; + let provider = Self::meter_provider(export_config, &token, interval)?; let host = nix::unistd::gethostname() .ok() .map(|v| v.to_string_lossy().to_string()) @@ -69,64 +69,48 @@ impl Otlp { speed } - pub async fn otlp_tasks(&self) -> anyhow::Result<()> { + pub async fn otlp_tasks(&self) { let interval = self.interval; - if let Err(e) = self.mem_gauges() { - tracing::error!("Otlp memory: {e}") - } - if let Err(e) = self.net_gauges() { - tracing::error!("Otlp network: {e}") - } - if let Err(e) = self.disk_gagues() { - tracing::error!("Otlp disk: {e}") - } - if let Err(e) = self.irq_gauges() { - tracing::error!("Otlp interrupt: {e}") - } - if let Err(e) = self.cpu_gauges() { - tracing::error!("Otlp cpu: {e}") - } - if let Err(e) = self.rps_gauges() { - tracing::error!("Otlp rps: {e}") - } - if let Err(e) = self.vmstat_gauges() { - tracing::error!("Otlp vmstat: {e}") - } - if let Err(e) = self.gpu_gauges() { - tracing::error!("Otlp gpu: {e}") - } + self.mem_gauges(); + self.net_gauges(); + self.disk_gagues(); + self.irq_gauges(); + self.cpu_gauges(); + self.rps_gauges(); + self.vmstat_gauges(); + self.gpu_gauges(); loop { tokio::time::sleep(interval).await; } } -} -fn meter_provider( - export_config: ExportConfig, - token: &str, - interval: Duration, -) -> Result { - let mut meta = MetadataMap::new(); - meta.insert("authorization", format!("Bearer {}", token).parse()?); - let otlp_exporter = MetricExporter::builder() - .with_tonic() - .with_tls_config(ClientTlsConfig::new().with_native_roots()) - .with_metadata(meta) - .with_timeout(Duration::from_secs(10)) - .with_export_config(export_config) - .build()?; - let reader = PeriodicReader::builder(otlp_exporter) - .with_interval(interval) - .build(); - let resource = Resource::builder() - .with_attribute(KeyValue::new("service.name", "PSH")) - .build(); - let a = SdkMeterProvider::builder() - .with_reader(reader) - .with_resource(resource) - .build(); + fn meter_provider( + export_config: ExportConfig, + token: &str, + interval: Duration, + ) -> Result { + let mut meta = MetadataMap::new(); + meta.insert("authorization", format!("Bearer {}", token).parse()?); + let otlp_exporter = MetricExporter::builder() + .with_tonic() + .with_tls_config(ClientTlsConfig::new().with_native_roots()) + .with_metadata(meta) + .with_timeout(Duration::from_secs(10)) + .with_export_config(export_config) + .build()?; + let reader = PeriodicReader::builder(otlp_exporter) + .with_interval(interval) + .build(); + let resource = Resource::builder() + .with_attribute(KeyValue::new("service.name", "PSH")) + .build(); + let a = SdkMeterProvider::builder() + .with_reader(reader) + .with_resource(resource) + .build(); - Ok(a) + Ok(a) + } } diff --git a/src/runtime/data_export.rs b/src/runtime/data_export.rs index 99c7c0f..03abfab 100644 --- a/src/runtime/data_export.rs +++ b/src/runtime/data_export.rs @@ -27,10 +27,9 @@ use profiling::data_export::{ }; use prost::Message; use psh_proto::{Data, DataType, ExportDataReq}; -use tokio::runtime::Runtime; use wasmtime::component::Linker; -use crate::services::rpc::RpcClient; +use crate::{TOKIO_RUNTIME, services::rpc::RpcClient}; wasmtime::component::bindgen!({ path: "psh-sdk-wit/wit/deps/data-export", @@ -82,7 +81,6 @@ impl DataExporter { let bytes_len = Arc::clone(&bytes_len); let task_id = task_id.clone(); move || { - let rt = Runtime::new().expect("Failed to init exporter runtime"); let mut data = Vec::new(); loop { match data_queue.pop() { @@ -99,7 +97,7 @@ impl DataExporter { }; let mut rpc_client = rpc_client.clone(); - rt.block_on(async move { + TOKIO_RUNTIME.block_on(async move { let _ = rpc_client.export_data(merged).await; }); data.clear();