diff --git a/Cargo.lock b/Cargo.lock
index 8babd2e..3e628bc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -220,7 +220,7 @@ dependencies = [
"cfg-if",
"libc",
"miniz_oxide",
- "object",
+ "object 0.36.7",
"rustc-demangle",
"windows-targets 0.52.6",
]
@@ -1952,6 +1952,17 @@ dependencies = [
"memchr",
]
+[[package]]
+name = "object"
+version = "0.37.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ff76201f031d8863c38aa7f905eca4f53abbfa15f609db4277d44cd8938f33fe"
+dependencies = [
+ "flate2",
+ "memchr",
+ "ruzstd",
+]
+
[[package]]
name = "once_cell"
version = "1.21.3"
@@ -2099,7 +2110,7 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "perf-event-rs"
version = "0.0.0"
-source = "git+https://github.com/OptimatistOpenSource/perf-event-rs.git?rev=423ca26f53b27193d2321028dae5fd362a9673e9#423ca26f53b27193d2321028dae5fd362a9673e9"
+source = "git+https://github.com/OptimatistOpenSource/perf-event-rs.git?rev=d6881f34b8a9cde1d70dab5fb1415271e6b0bb25#d6881f34b8a9cde1d70dab5fb1415271e6b0bb25"
dependencies = [
"bindgen",
"libc",
@@ -2290,10 +2301,13 @@ dependencies = [
"local-ip-address",
"mimalloc",
"nix",
+ "num_cpus",
"nvml-wrapper",
+ "object 0.37.3",
"opentelemetry",
"opentelemetry-otlp",
"opentelemetry_sdk",
+ "perf-event-rs",
"prost",
"psh-proto",
"psh-system",
@@ -2310,9 +2324,11 @@ dependencies = [
[[package]]
name = "psh-proto"
version = "0.1.0"
-source = "git+https://github.com/OptimatistOpenSource/psh-proto.git?rev=ca2919053029cb584b478611f8bf8496bf3cf7f7#ca2919053029cb584b478611f8bf8496bf3cf7f7"
+source = "git+https://github.com/OptimatistOpenSource/psh-proto.git?rev=5cf7cc9#5cf7cc987c936dc751d22ee76837b5fefa897677"
dependencies = [
+ "perf-event-rs",
"prost",
+ "serde",
"tonic",
"tonic-build",
]
@@ -2671,6 +2687,15 @@ version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eded382c5f5f786b989652c49544c4877d9f015cc22e145a5ea8ea66c2921cd2"
+[[package]]
+name = "ruzstd"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3640bec8aad418d7d03c72ea2de10d5c646a598f9883c7babc160d91e3c1b26c"
+dependencies = [
+ "twox-hash",
+]
+
[[package]]
name = "ryu"
version = "1.0.20"
@@ -3301,6 +3326,12 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
+[[package]]
+name = "twox-hash"
+version = "2.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9ea3136b675547379c4bd395ca6b938e5ad3c3d20fad76e7fe85f9e0d011419c"
+
[[package]]
name = "typenum"
version = "1.18.0"
@@ -3561,7 +3592,7 @@ dependencies = [
"log",
"mach2",
"memfd",
- "object",
+ "object 0.36.7",
"once_cell",
"paste",
"postcard",
@@ -3660,7 +3691,7 @@ dependencies = [
"gimli",
"itertools 0.12.1",
"log",
- "object",
+ "object 0.36.7",
"smallvec",
"target-lexicon",
"thiserror 1.0.69",
@@ -3682,7 +3713,7 @@ dependencies = [
"gimli",
"indexmap",
"log",
- "object",
+ "object 0.36.7",
"postcard",
"rustc-demangle",
"semver",
@@ -3717,7 +3748,7 @@ version = "28.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0cec0a8e5620ae71bfcaaec78e3076be5b6ebf869f4e6191925d73242224a915"
dependencies = [
- "object",
+ "object 0.36.7",
"rustix 0.38.44",
"wasmtime-versioned-export-macros",
]
@@ -3790,7 +3821,7 @@ dependencies = [
"anyhow",
"cranelift-codegen",
"gimli",
- "object",
+ "object 0.36.7",
"target-lexicon",
"wasmparser 0.221.3",
"wasmtime-cranelift",
diff --git a/Cargo.toml b/Cargo.toml
index b0c2684..1ea9eaa 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,7 +19,7 @@ chrono = { workspace = true }
clap = { workspace = true, features = ["derive", "wrap_help"] }
tonic = { workspace = true }
prost = { workspace = true }
-tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
+tokio = { workspace = true, features = ["macros", "rt-multi-thread", "fs"] }
nix = { workspace = true, features = ["user", "hostname"] }
wasmtime = { workspace = true }
wasmtime-wasi = { workspace = true }
@@ -50,6 +50,9 @@ influxdb-line-protocol = { workspace = true }
psh-proto = { workspace = true }
mimalloc = { workspace = true }
nvml-wrapper = { workspace = true }
+perf-event-rs = { workspace = true }
+num_cpus = { workspace = true }
+object = { workspace = true }
[lints]
workspace = true
@@ -58,7 +61,7 @@ workspace = true
host-op-perf = { path = "crates/op/host-op-perf" }
host-op-system = { path = "crates/op/host-op-system" }
psh-system = { path = "crates/psh-system" }
-perf-event-rs = { git = "https://github.com/OptimatistOpenSource/perf-event-rs.git", rev = "423ca26f53b27193d2321028dae5fd362a9673e9" }
+perf-event-rs = { git = "https://github.com/OptimatistOpenSource/perf-event-rs.git", rev = "d6881f34b8a9cde1d70dab5fb1415271e6b0bb25" }
tokio = "^1"
libc = "^0.2"
chrono = "^0.4"
@@ -87,9 +90,10 @@ local-ip-address = "^0.6"
TinyUFO = "0.4"
crossbeam = "0.8"
influxdb-line-protocol = "2"
-psh-proto = { git = "https://github.com/OptimatistOpenSource/psh-proto.git", rev = "ca2919053029cb584b478611f8bf8496bf3cf7f7" }
+psh-proto = { git = "https://github.com/OptimatistOpenSource/psh-proto.git", rev = "5cf7cc9" }
mimalloc = "0.1"
nvml-wrapper = "0.10.0"
+object = "0.37"
[workspace.lints.rust]
diff --git a/doc/config.toml b/doc/config.toml
index 2568600..89dc4a0 100644
--- a/doc/config.toml
+++ b/doc/config.toml
@@ -30,3 +30,6 @@ buf_watermark = 2048
enable = false
addr = "https://api.optimatist.com"
interval = 10
+
+[perms]
+allowed_paths = []
diff --git a/src/config.rs b/src/config.rs
index 0347412..459c98e 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -12,7 +12,10 @@
// You should have received a copy of the GNU Lesser General Public License along with Performance Savior Home (PSH). If not,
// see .
-use std::{fs, path::Path};
+use std::{
+ fs,
+ path::{Path, PathBuf},
+};
use anyhow::Result;
use serde::Deserialize;
@@ -24,6 +27,12 @@ const TEMPLATE: &str = include_str!("../doc/config.toml");
pub struct Config {
pub daemon: DaemonConfig,
pub remote: RemoteConfig,
+ pub perms: PermsConfig,
+}
+
+#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Deserialize)]
+pub struct PermsConfig {
+ pub allowed_paths: Vec,
}
#[derive(Clone, Deserialize)]
diff --git a/src/main.rs b/src/main.rs
index 7bd2d82..94fbfcf 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -20,7 +20,7 @@ mod otlp;
mod runtime;
mod services;
-use std::{fs, sync::LazyLock, thread, time::Duration};
+use std::{fs, path::PathBuf, str::FromStr, sync::LazyLock, thread, time::Duration};
use anyhow::{Error, Result, bail};
use args::Args;
@@ -32,11 +32,20 @@ use log::log_init;
use mimalloc::MiMalloc;
use nix::unistd::geteuid;
use opentelemetry_otlp::ExportConfig;
-use psh_proto::HeartbeatReq;
-use runtime::{Task, TaskRuntime};
+use psh_proto::{
+ HeartbeatReq, PerfDataProto,
+ export_data_req::{Data, data::DataType},
+ task_done_req::TaskStatus,
+};
+use runtime::{TaskRuntime, WasmTask};
use services::rpc::RpcClient;
use tokio::{runtime::Runtime, try_join};
+use self::{
+ config::PermsConfig,
+ services::{rpc::WhichTask, sampling::Profiler},
+};
+
#[global_allocator]
static GLOBAL: MiMalloc = mimalloc::MiMalloc;
@@ -86,7 +95,7 @@ fn main() -> Result<()> {
let task_rt = TaskRuntime::new()?;
if let Some(args) = wasm_with_args {
- let task = Task {
+ let task = WasmTask {
id: None,
wasm_component: fs::read(&args[0])?,
wasm_component_args: args,
@@ -96,7 +105,7 @@ fn main() -> Result<()> {
};
thread::spawn(move || -> Result<()> {
- let tasks = async_tasks(cfg.remote, task_rt);
+ let tasks = async_tasks(cfg.remote, task_rt, cfg.perms);
TOKIO_RUNTIME.block_on(tasks)?;
Ok(())
})
@@ -106,7 +115,11 @@ fn main() -> Result<()> {
Ok(())
}
-async fn async_tasks(remote_cfg: RemoteConfig, mut task_rt: TaskRuntime) -> Result<()> {
+async fn async_tasks(
+ remote_cfg: RemoteConfig,
+ mut task_rt: TaskRuntime,
+ perms_cfg: PermsConfig,
+) -> Result<()> {
let token_cloned = remote_cfg.token.clone();
let rpc_task = async move {
if !remote_cfg.rpc.enable {
@@ -143,14 +156,115 @@ async fn async_tasks(remote_cfg: RemoteConfig, mut task_rt: TaskRuntime) -> Resu
loop {
let idle = task_rt.is_idle();
if idle {
- if let Some(mut task) = client.get_task(instance_id.clone()).await? {
- let task_id = task
- .id
- .as_ref()
- .map(|it| it.to_string())
- .expect("No task id provided");
- task.wasm_component_args.insert(0, task_id);
- task_rt.schedule(task)?
+ if let Some(task) = client.get_task(instance_id.clone()).await? {
+ match task {
+ WhichTask::Wasm(mut task) => {
+ let task_id = task
+ .id
+ .as_ref()
+ .map(|it| it.to_string())
+ .expect("No task id provided");
+ task.wasm_component_args.insert(0, task_id);
+ task_rt.schedule(task)?;
+ }
+ WhichTask::Profiling(profiling_task) => {
+ let mut profiler = Profiler::new(
+ profiling_task.process,
+ profiling_task.mmap_pages as _,
+ profiling_task.overflow_by,
+ profiling_task.stack_depth,
+ )?;
+ let task_time_slice = {
+ let delta = profiling_task.end_time.timestamp_millis()
+ - Utc::now().timestamp_millis();
+ delta.max(0) as u64
+ };
+
+ let perf_data = tokio::task::spawn_blocking(
+ move || -> anyhow::Result {
+ profiler.enable()?;
+ std::thread::sleep(Duration::from_millis(task_time_slice));
+ profiler.disable()?;
+ Ok(profiler.perf_data_proto())
+ },
+ )
+ .await??;
+ if let Some(task_id) = profiling_task.id {
+ let mut data = vec![];
+ let dat = Data {
+ data_type: Some(DataType::PerfData(perf_data)),
+ };
+ data.push(dat);
+ client
+ .export_data(psh_proto::ExportDataReq { task_id, data })
+ .await?;
+ } else {
+ // TODO: analyze locally
+ }
+ }
+ WhichTask::UploadElf(upload_elf) => {
+ let mut data = vec![];
+ let file_path = PathBuf::from_str(&upload_elf.filename).unwrap();
+ let allowed = perms_cfg
+ .allowed_paths
+ .iter()
+ .any(|v| file_path.starts_with(v));
+ if !allowed {
+ if let Err(e) = client
+ .task_done(upload_elf.id, TaskStatus::AccessDeined)
+ .await
+ {
+ tracing::error!("{e}");
+ }
+ continue;
+ }
+
+ let bytes = match tokio::fs::read(&upload_elf.filename).await {
+ Ok(o) => o,
+ Err(e) => {
+ if e.kind() == std::io::ErrorKind::NotFound {
+ if let Err(e) = client
+ .task_done(upload_elf.id, TaskStatus::NotFound)
+ .await
+ {
+ tracing::error!("{}", e);
+ }
+ }
+ continue;
+ }
+ };
+
+ let build_id = services::sampling::get_build_id(&bytes)?;
+ match (upload_elf.build_id, build_id) {
+ (Some(tbid), Some(bid)) if tbid != bid.to_string() => {
+ if let Err(e) = client
+ .task_done(upload_elf.id, TaskStatus::BadBuildId)
+ .await
+ {
+ tracing::error!("{e}");
+ }
+ continue;
+ }
+ (_, _) => {}
+ };
+
+ let data_type = DataType::ElfFile(psh_proto::ElfFile {
+ filename: upload_elf.filename.to_owned(),
+ build_id: build_id.map(|v| v.to_string()),
+ arch: std::env::consts::ARCH.to_string(),
+ bytes,
+ });
+ data.push(Data {
+ data_type: Some(data_type),
+ });
+ client
+ .export_data(psh_proto::ExportDataReq {
+ task_id: upload_elf.id,
+ data,
+ })
+ .await?;
+ }
+ };
}
}
@@ -162,7 +276,9 @@ async fn async_tasks(remote_cfg: RemoteConfig, mut task_rt: TaskRuntime) -> Resu
.await?;
if let Some(id) = task_rt.finished_task_id() {
- let _ = client.task_done(id).await;
+ if let Err(e) = client.task_done(id, TaskStatus::Ok).await {
+ tracing::error!("{e}");
+ }
}
tokio::time::sleep(duration).await;
diff --git a/src/runtime/data_export.rs b/src/runtime/data_export.rs
index 03abfab..241e9c6 100644
--- a/src/runtime/data_export.rs
+++ b/src/runtime/data_export.rs
@@ -26,7 +26,10 @@ use profiling::data_export::{
common::FieldValue as WitFieldValue, measurement::Point, metric::Sample,
};
use prost::Message;
-use psh_proto::{Data, DataType, ExportDataReq};
+use psh_proto::{
+ ExportDataReq, LineProtocolData,
+ export_data_req::{Data, data::DataType},
+};
use wasmtime::component::Linker;
use crate::{TOKIO_RUNTIME, services::rpc::RpcClient};
@@ -172,8 +175,7 @@ impl profiling::data_export::file::Host for DataExportCtx {
};
let data = Data {
- ty: DataType::File as _,
- bytes,
+ data_type: Some(DataType::File(psh_proto::FileData { bytes })),
};
ctx.exporter.schedule(data);
@@ -201,8 +203,7 @@ impl profiling::data_export::metric::Host for DataExportCtx {
};
let data = Data {
- ty: DataType::LineProtocol as _,
- bytes,
+ data_type: Some(DataType::LineProtocol(LineProtocolData { bytes })),
};
ctx.exporter.schedule(data);
@@ -238,8 +239,7 @@ impl profiling::data_export::measurement::Host for DataExportCtx {
};
let data = Data {
- ty: DataType::LineProtocol as _,
- bytes,
+ data_type: Some(DataType::LineProtocol(LineProtocolData { bytes })),
};
ctx.exporter.schedule(data);
diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs
index 0302d4c..5da255b 100644
--- a/src/runtime/mod.rs
+++ b/src/runtime/mod.rs
@@ -39,7 +39,8 @@ pub use state::PshState;
use crate::services::rpc::RpcClient;
-pub struct Task {
+#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
+pub struct WasmTask {
pub id: Option,
pub wasm_component: Vec,
pub wasm_component_args: Vec,
@@ -47,8 +48,8 @@ pub struct Task {
}
pub struct TaskRuntime {
- tx: Sender,
- rx: Option>,
+ tx: Sender,
+ rx: Option>,
len: Arc,
finished_task_id: Arc>>,
}
@@ -65,7 +66,7 @@ impl TaskRuntime {
})
}
- pub fn schedule(&self, task: Task) -> Result<()> {
+ pub fn schedule(&self, task: WasmTask) -> Result<()> {
self.len.fetch_add(1, Ordering::Release);
self.tx.send(task)?;
Ok(())
diff --git a/src/services/mod.rs b/src/services/mod.rs
index 403a784..30178a3 100644
--- a/src/services/mod.rs
+++ b/src/services/mod.rs
@@ -14,3 +14,4 @@
pub mod host_info;
pub mod rpc;
+pub mod sampling;
diff --git a/src/services/rpc.rs b/src/services/rpc.rs
index bd648f6..28c9cea 100644
--- a/src/services/rpc.rs
+++ b/src/services/rpc.rs
@@ -13,7 +13,10 @@
// see .
use anyhow::{Result, bail};
+use chrono::DateTime;
use chrono::{TimeZone, Utc, offset::LocalResult};
+use perf_event_rs::sampling::OverflowBy;
+use psh_proto::task::TaskType;
use psh_proto::{
ExportDataReq, GetTaskReq, HeartbeatReq, TaskDoneReq, Unit,
psh_service_client::PshServiceClient,
@@ -26,7 +29,7 @@ use tonic::{
transport::{Channel, ClientTlsConfig, Endpoint},
};
-use crate::{config::RpcConfig, runtime::Task, services::host_info::new_info_req};
+use crate::{config::RpcConfig, runtime::WasmTask, services::host_info::new_info_req};
#[derive(Clone)]
pub struct RpcClient {
@@ -80,6 +83,28 @@ where
}
}
+pub enum WhichTask {
+ Wasm(WasmTask),
+ Profiling(ProfilingTask),
+ UploadElf(UploadElfTask),
+}
+
+#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
+pub struct UploadElfTask {
+ pub id: String,
+ pub filename: String,
+ pub build_id: Option,
+}
+
+pub struct ProfilingTask {
+ pub id: Option,
+ pub process: perf_event_rs::config::Process,
+ pub mmap_pages: u64,
+ pub overflow_by: OverflowBy,
+ pub stack_depth: Option,
+ pub end_time: DateTime,
+}
+
impl RpcClient {
pub async fn new(config: &RpcConfig, token: String) -> Result {
let ep = Endpoint::from_shared(config.addr.clone())?
@@ -132,7 +157,7 @@ impl RpcClient {
Ok(())
}
- pub async fn get_task(&mut self, instance_id: String) -> Result