Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ mod otlp;
mod runtime;
mod services;

use std::{fs, thread, time::Duration};
use std::{fs, sync::LazyLock, thread, time::Duration};

use anyhow::{Error, Result, bail};
use args::Args;
Expand All @@ -35,11 +35,14 @@ use opentelemetry_otlp::ExportConfig;
use psh_proto::HeartbeatReq;
use runtime::{Task, TaskRuntime};
use services::rpc::RpcClient;
use tokio::try_join;
use tokio::{runtime::Runtime, try_join};

#[global_allocator]
static GLOBAL: MiMalloc = mimalloc::MiMalloc;

static TOKIO_RUNTIME: LazyLock<Runtime> =
LazyLock::new(|| Runtime::new().expect("Create tokio runtime failed"));

fn main() -> Result<()> {
log_init();

Expand Down Expand Up @@ -93,9 +96,8 @@ fn main() -> Result<()> {
};

thread::spawn(move || -> Result<()> {
let rt = tokio::runtime::Runtime::new()?;
let tasks = async_tasks(cfg.remote, task_rt);
rt.block_on(tasks)?;
TOKIO_RUNTIME.block_on(tasks)?;
Ok(())
})
.join()
Expand Down Expand Up @@ -183,7 +185,7 @@ async fn async_tasks(remote_cfg: RemoteConfig, mut task_rt: TaskRuntime) -> Resu
export_conf,
)?;

otlp.otlp_tasks().await?;
otlp.otlp_tasks().await;
Ok::<(), Error>(())
};

Expand Down
51 changes: 2 additions & 49 deletions src/otlp/gauges/cpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use opentelemetry::{KeyValue, metrics::ObservableGauge};
use psh_system::cpu::CpuHandle;

impl super::super::Otlp {
pub fn cpu_gauges(&self) -> anyhow::Result<ObservableGauge<u64>> {
pub fn cpu_gauges(&self) -> ObservableGauge<u64> {
let cpu = CpuHandle::new();
let host = self.host.clone();
let interval = self.interval;
Expand Down Expand Up @@ -106,26 +106,6 @@ impl super::super::Otlp {
gauge.observe(m, a);
});

macro_rules! gauges {
($($item:ident,)+) => {
[
$((
cpu_time.$item(),
[
KeyValue::new("cpu", cpu as i64),
KeyValue::new("stat", stringify!($item)),
KeyValue::new("desc", desc),
],
),)*
]
};
}
let gauges = gauges![user_ms, nice_ms, system_ms, idle_ms,];
gauges.into_iter().for_each(|(m, [kv1, kv2, kv3])| {
let a = &[KeyValue::new("host", host.clone()), kv1, kv2, kv3];
gauge.observe(m, a);
});

macro_rules! gauges {
($($item_o:ident,)+) => {
[
Expand All @@ -145,36 +125,9 @@ impl super::super::Otlp {
let a = &[KeyValue::new("host", host.clone()), kv1, kv2, kv3];
gauge.observe(m, a);
});

macro_rules! gauges {
($($item_o:ident,)+) => {
[
$((
cpu_time.$item_o().unwrap_or(0),
[
KeyValue::new("cpu", cpu as i64),
KeyValue::new("stat", stringify!($item_o)),
KeyValue::new("desc", desc),
],
),)*
]
};
}
let gauges = gauges![
iowait_ms,
irq_ms,
softirq_ms,
steal_ms,
guest_ms,
guest_nice_ms,
];
gauges.into_iter().for_each(|(m, [kv1, kv2, kv3])| {
let a = &[KeyValue::new("host", host.clone()), kv1, kv2, kv3];
gauge.observe(m, a);
});
}
})
.build();
Ok(gauge)
gauge
}
}
4 changes: 2 additions & 2 deletions src/otlp/gauges/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use opentelemetry::{KeyValue, metrics::ObservableGauge};
use psh_system::disk::DiskHandle;

impl super::super::Otlp {
pub fn disk_gagues(&self) -> anyhow::Result<ObservableGauge<u64>> {
pub fn disk_gagues(&self) -> ObservableGauge<u64> {
let host = self.host.clone();
let interval = self.interval;
let disk = DiskHandle::new();
Expand Down Expand Up @@ -87,6 +87,6 @@ impl super::super::Otlp {
}
})
.build();
Ok(gauge)
gauge
}
}
4 changes: 2 additions & 2 deletions src/otlp/gauges/gpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use opentelemetry::{KeyValue, metrics::ObservableGauge};
use psh_system::gpu::NvidiaHandle;

impl super::super::Otlp {
pub fn gpu_gauges(&self) -> anyhow::Result<ObservableGauge<u64>> {
pub fn gpu_gauges(&self) -> ObservableGauge<u64> {
let host = self.host.clone();
let interval = self.interval;
let nvgpu = NvidiaHandle::new();
Expand Down Expand Up @@ -69,6 +69,6 @@ impl super::super::Otlp {
}
})
.build();
Ok(gauge)
gauge
}
}
4 changes: 2 additions & 2 deletions src/otlp/gauges/interrupt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use opentelemetry::{KeyValue, metrics::ObservableGauge};
use psh_system::interrupt::InterruptHandle;

impl super::super::Otlp {
pub fn irq_gauges(&self) -> anyhow::Result<ObservableGauge<u64>> {
pub fn irq_gauges(&self) -> ObservableGauge<u64> {
let host = self.host.clone();
let interval = self.interval;
let interrupt = InterruptHandle::new();
Expand Down Expand Up @@ -46,6 +46,6 @@ impl super::super::Otlp {
}
})
.build();
Ok(gauge)
gauge
}
}
20 changes: 10 additions & 10 deletions src/otlp/gauges/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use opentelemetry::{KeyValue, metrics::ObservableGauge};
use psh_system::memory::MemoryHandle;

impl super::super::Otlp {
pub fn mem_gauges(&self) -> anyhow::Result<ObservableGauge<u64>> {
pub fn mem_gauges(&self) -> ObservableGauge<u64> {
let interval = self.interval;
let host = self.host.clone();
let memory = MemoryHandle::new();
Expand Down Expand Up @@ -63,14 +63,14 @@ impl super::super::Otlp {
});

macro_rules! gauges {
($($stat:ident,)+) => {
[
$(
(mem.$stat.unwrap_or(0), KeyValue::new("stat", stringify!($stat))),
)*
]
};
}
($($stat:ident,)+) => {
[
$(
(mem.$stat.unwrap_or(0), KeyValue::new("stat", stringify!($stat))),
)*
]
};
}
let gauges = gauges![
cma_total,
cma_free,
Expand Down Expand Up @@ -118,6 +118,6 @@ impl super::super::Otlp {
})
})
.build();
Ok(gauge)
gauge
}
}
4 changes: 2 additions & 2 deletions src/otlp/gauges/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use opentelemetry::{KeyValue, metrics::ObservableGauge};
use psh_system::network::NetworkHandle;

impl super::super::Otlp {
pub fn net_gauges(&self) -> anyhow::Result<ObservableGauge<u64>> {
pub fn net_gauges(&self) -> ObservableGauge<u64> {
let interval = self.interval;
let host = self.host.clone();
let network = NetworkHandle::new();
Expand Down Expand Up @@ -77,6 +77,6 @@ impl super::super::Otlp {
}
})
.build();
Ok(gauge)
gauge
}
}
4 changes: 2 additions & 2 deletions src/otlp/gauges/rps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use opentelemetry::{Array, KeyValue, Value, metrics::ObservableGauge};
use psh_system::rps::RpsHandle;

impl super::super::Otlp {
pub fn rps_gauges(&self) -> anyhow::Result<ObservableGauge<u64>> {
pub fn rps_gauges(&self) -> ObservableGauge<u64> {
let host = self.host.clone();
let rps = RpsHandle::new();

Expand Down Expand Up @@ -50,6 +50,6 @@ impl super::super::Otlp {
}
})
.build();
Ok(gauge)
gauge
}
}
4 changes: 2 additions & 2 deletions src/otlp/gauges/vmstat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use opentelemetry::{KeyValue, metrics::ObservableGauge};
use psh_system::vmstat::VmstatHandle;

impl super::super::Otlp {
pub fn vmstat_gauges(&self) -> anyhow::Result<ObservableGauge<u64>> {
pub fn vmstat_gauges(&self) -> ObservableGauge<u64> {
let host = self.host.clone();
let interval = self.interval;
let vmstat = VmstatHandle::new();
Expand All @@ -41,6 +41,6 @@ impl super::super::Otlp {
}
})
.build();
Ok(gauge)
gauge
}
}
88 changes: 36 additions & 52 deletions src/otlp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct Otlp {

impl Otlp {
pub fn new(token: String, interval: Duration, export_config: ExportConfig) -> Result<Self> {
let provider = meter_provider(export_config, &token, interval)?;
let provider = Self::meter_provider(export_config, &token, interval)?;
let host = nix::unistd::gethostname()
.ok()
.map(|v| v.to_string_lossy().to_string())
Expand All @@ -69,64 +69,48 @@ impl Otlp {
speed
}

pub async fn otlp_tasks(&self) -> anyhow::Result<()> {
pub async fn otlp_tasks(&self) {
let interval = self.interval;

if let Err(e) = self.mem_gauges() {
tracing::error!("Otlp memory: {e}")
}
if let Err(e) = self.net_gauges() {
tracing::error!("Otlp network: {e}")
}
if let Err(e) = self.disk_gagues() {
tracing::error!("Otlp disk: {e}")
}
if let Err(e) = self.irq_gauges() {
tracing::error!("Otlp interrupt: {e}")
}
if let Err(e) = self.cpu_gauges() {
tracing::error!("Otlp cpu: {e}")
}
if let Err(e) = self.rps_gauges() {
tracing::error!("Otlp rps: {e}")
}
if let Err(e) = self.vmstat_gauges() {
tracing::error!("Otlp vmstat: {e}")
}
if let Err(e) = self.gpu_gauges() {
tracing::error!("Otlp gpu: {e}")
}
self.mem_gauges();
self.net_gauges();
self.disk_gagues();
self.irq_gauges();
self.cpu_gauges();
self.rps_gauges();
self.vmstat_gauges();
self.gpu_gauges();

loop {
tokio::time::sleep(interval).await;
}
}
}

fn meter_provider(
export_config: ExportConfig,
token: &str,
interval: Duration,
) -> Result<SdkMeterProvider> {
let mut meta = MetadataMap::new();
meta.insert("authorization", format!("Bearer {}", token).parse()?);
let otlp_exporter = MetricExporter::builder()
.with_tonic()
.with_tls_config(ClientTlsConfig::new().with_native_roots())
.with_metadata(meta)
.with_timeout(Duration::from_secs(10))
.with_export_config(export_config)
.build()?;
let reader = PeriodicReader::builder(otlp_exporter)
.with_interval(interval)
.build();
let resource = Resource::builder()
.with_attribute(KeyValue::new("service.name", "PSH"))
.build();
let a = SdkMeterProvider::builder()
.with_reader(reader)
.with_resource(resource)
.build();
fn meter_provider(
export_config: ExportConfig,
token: &str,
interval: Duration,
) -> Result<SdkMeterProvider> {
let mut meta = MetadataMap::new();
meta.insert("authorization", format!("Bearer {}", token).parse()?);
let otlp_exporter = MetricExporter::builder()
.with_tonic()
.with_tls_config(ClientTlsConfig::new().with_native_roots())
.with_metadata(meta)
.with_timeout(Duration::from_secs(10))
.with_export_config(export_config)
.build()?;
let reader = PeriodicReader::builder(otlp_exporter)
.with_interval(interval)
.build();
let resource = Resource::builder()
.with_attribute(KeyValue::new("service.name", "PSH"))
.build();
let a = SdkMeterProvider::builder()
.with_reader(reader)
.with_resource(resource)
.build();

Ok(a)
Ok(a)
}
}
6 changes: 2 additions & 4 deletions src/runtime/data_export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ use profiling::data_export::{
};
use prost::Message;
use psh_proto::{Data, DataType, ExportDataReq};
use tokio::runtime::Runtime;
use wasmtime::component::Linker;

use crate::services::rpc::RpcClient;
use crate::{TOKIO_RUNTIME, services::rpc::RpcClient};

wasmtime::component::bindgen!({
path: "psh-sdk-wit/wit/deps/data-export",
Expand Down Expand Up @@ -82,7 +81,6 @@ impl DataExporter {
let bytes_len = Arc::clone(&bytes_len);
let task_id = task_id.clone();
move || {
let rt = Runtime::new().expect("Failed to init exporter runtime");
let mut data = Vec::new();
loop {
match data_queue.pop() {
Expand All @@ -99,7 +97,7 @@ impl DataExporter {
};

let mut rpc_client = rpc_client.clone();
rt.block_on(async move {
TOKIO_RUNTIME.block_on(async move {
let _ = rpc_client.export_data(merged).await;
});
data.clear();
Expand Down
Loading