Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ chrono = { workspace = true }
clap = { workspace = true, features = ["derive", "wrap_help"] }
tonic = { workspace = true }
prost = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "fs"] }
nix = { workspace = true, features = ["user", "hostname"] }
wasmtime = { workspace = true }
wasmtime-wasi = { workspace = true }
Expand Down Expand Up @@ -50,6 +50,8 @@ influxdb-line-protocol = { workspace = true }
psh-proto = { workspace = true }
mimalloc = { workspace = true }
nvml-wrapper = { workspace = true }
perf-event-rs = { workspace = true }
num_cpus = { workspace = true }

[lints]
workspace = true
Expand All @@ -58,7 +60,7 @@ workspace = true
host-op-perf = { path = "crates/op/host-op-perf" }
host-op-system = { path = "crates/op/host-op-system" }
psh-system = { path = "crates/psh-system" }
perf-event-rs = { git = "https://github.com/OptimatistOpenSource/perf-event-rs.git", rev = "423ca26f53b27193d2321028dae5fd362a9673e9" }
perf-event-rs = { git = "https://github.com/OptimatistOpenSource/perf-event-rs.git", rev = "d6881f34b8a9cde1d70dab5fb1415271e6b0bb25" }
tokio = "^1"
libc = "^0.2"
chrono = "^0.4"
Expand Down Expand Up @@ -87,7 +89,7 @@ local-ip-address = "^0.6"
TinyUFO = "0.4"
crossbeam = "0.8"
influxdb-line-protocol = "2"
psh-proto = { git = "https://github.com/OptimatistOpenSource/psh-proto.git", rev = "ca2919053029cb584b478611f8bf8496bf3cf7f7" }
psh-proto = { git = "https://github.com/OptimatistOpenSource/psh-proto.git", rev = "c82356c52925ac0abf9cd93c7245c3518d78e096" }
mimalloc = "0.1"
nvml-wrapper = "0.10.0"

Expand Down
83 changes: 72 additions & 11 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,16 @@ use log::log_init;
use mimalloc::MiMalloc;
use nix::unistd::geteuid;
use opentelemetry_otlp::ExportConfig;
use psh_proto::HeartbeatReq;
use runtime::{Task, TaskRuntime};
use psh_proto::{
HeartbeatReq, PerfDataProto,
export_data_req::{Data, data::DataType},
};
use runtime::{TaskRuntime, WasmTask};
use services::rpc::RpcClient;
use tokio::{runtime::Runtime, try_join};

use self::services::{rpc::WhichTask, sampling::Profiler};

#[global_allocator]
static GLOBAL: MiMalloc = mimalloc::MiMalloc;

Expand Down Expand Up @@ -86,7 +91,7 @@ fn main() -> Result<()> {
let task_rt = TaskRuntime::new()?;

if let Some(args) = wasm_with_args {
let task = Task {
let task = WasmTask {
id: None,
wasm_component: fs::read(&args[0])?,
wasm_component_args: args,
Expand Down Expand Up @@ -143,14 +148,70 @@ async fn async_tasks(remote_cfg: RemoteConfig, mut task_rt: TaskRuntime) -> Resu
loop {
let idle = task_rt.is_idle();
if idle {
if let Some(mut task) = client.get_task(instance_id.clone()).await? {
let task_id = task
.id
.as_ref()
.map(|it| it.to_string())
.expect("No task id provided");
task.wasm_component_args.insert(0, task_id);
task_rt.schedule(task)?
if let Some(task) = client.get_task(instance_id.clone()).await? {
match task {
WhichTask::Wasm(mut task) => {
let task_id = task
.id
.as_ref()
.map(|it| it.to_string())
.expect("No task id provided");
task.wasm_component_args.insert(0, task_id);
task_rt.schedule(task)?;
}
WhichTask::Profiling(profiling_task) => {
let mut profiler = Profiler::new(
profiling_task.process,
profiling_task.mmap_pages as _,
profiling_task.overflow_by,
profiling_task.stack_depth,
)?;
let task_time_slice = {
let delta = profiling_task.end_time.timestamp_millis()
- Utc::now().timestamp_millis();
delta.max(0) as u64
};

let perf_data = tokio::task::spawn_blocking(
move || -> anyhow::Result<PerfDataProto> {
profiler.enable()?;
std::thread::sleep(Duration::from_millis(task_time_slice));
profiler.disable()?;
Ok(profiler.perf_data_proto())
},
)
.await??;
if let Some(task_id) = profiling_task.id {
let mut data = vec![];
for ele in &perf_data.events {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ele -> event?

let Some(event_type) = &ele.event_type else {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

读取ELF文件为什么需要event_type? 这个不是强关联吧?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

文件名是在MmapEvent里面的,所以需要从里面获取文件名。

continue;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这些地方需要打印错误日志,帮助后续的排查。默认静默掉这些错误,排查起来比较麻烦,可能是网络问题,也可能是参数错误。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

event_type 在proto里面是用 oneof定义, oneof event_type {,使用oneof就会生成一个option的字段,这里没有就跳过是合适的。

};

if let psh_proto::perf_data_proto::perf_event::EventType::MmapEvent(event)= event_type{
let Some(filename) = &event.filename else {
continue;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同上

};

let data_type = DataType::ElfFile(psh_proto::ElfFile {
filename: filename.to_owned(),
build_id: event.build_id.clone(),
arch: std::env::consts::ARCH.to_string(),
bytes: tokio::fs::read(filename).await?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

应该是先做一些校验,然后在判断是不是要读取文件内容,而不是直接读取。比如文件不存在怎么处理, build_id不一致怎么处理,用户是否允许读取某个文件/目录等等。

});
data.push(Data { data_type: Some(data_type) });
}
}
let dat = Data {
data_type: Some(DataType::PerfData(perf_data)),
};
data.push(dat);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

采样和上传ELF文件两个可以分开,放在一起反而不方便。不是所有的ELF文件都需要上传。

client
.export_data(psh_proto::ExportDataReq { task_id, data })
.await?;
}
}
};
}
}

Expand Down
14 changes: 7 additions & 7 deletions src/runtime/data_export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ use profiling::data_export::{
common::FieldValue as WitFieldValue, measurement::Point, metric::Sample,
};
use prost::Message;
use psh_proto::{Data, DataType, ExportDataReq};
use psh_proto::{
ExportDataReq, LineProtocolData,
export_data_req::{Data, data::DataType},
};
use wasmtime::component::Linker;

use crate::{TOKIO_RUNTIME, services::rpc::RpcClient};
Expand Down Expand Up @@ -172,8 +175,7 @@ impl profiling::data_export::file::Host for DataExportCtx {
};

let data = Data {
ty: DataType::File as _,
bytes,
data_type: Some(DataType::File(psh_proto::FileData { bytes })),
};
ctx.exporter.schedule(data);

Expand Down Expand Up @@ -201,8 +203,7 @@ impl profiling::data_export::metric::Host for DataExportCtx {
};

let data = Data {
ty: DataType::LineProtocol as _,
bytes,
data_type: Some(DataType::LineProtocol(LineProtocolData { bytes })),
};
ctx.exporter.schedule(data);

Expand Down Expand Up @@ -238,8 +239,7 @@ impl profiling::data_export::measurement::Host for DataExportCtx {
};

let data = Data {
ty: DataType::LineProtocol as _,
bytes,
data_type: Some(DataType::LineProtocol(LineProtocolData { bytes })),
};
ctx.exporter.schedule(data);

Expand Down
9 changes: 5 additions & 4 deletions src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,17 @@ pub use state::PshState;

use crate::services::rpc::RpcClient;

pub struct Task {
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct WasmTask {
pub id: Option<String>,
pub wasm_component: Vec<u8>,
pub wasm_component_args: Vec<String>,
pub end_time: DateTime<Utc>,
}

pub struct TaskRuntime {
tx: Sender<Task>,
rx: Option<Receiver<Task>>,
tx: Sender<WasmTask>,
rx: Option<Receiver<WasmTask>>,
len: Arc<AtomicUsize>,
finished_task_id: Arc<Mutex<Vec<String>>>,
}
Expand All @@ -65,7 +66,7 @@ impl TaskRuntime {
})
}

pub fn schedule(&self, task: Task) -> Result<()> {
pub fn schedule(&self, task: WasmTask) -> Result<()> {
self.len.fetch_add(1, Ordering::Release);
self.tx.send(task)?;
Ok(())
Expand Down
1 change: 1 addition & 0 deletions src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@

pub mod host_info;
pub mod rpc;
pub mod sampling;
64 changes: 51 additions & 13 deletions src/services/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
// see <https://www.gnu.org/licenses/>.

use anyhow::{Result, bail};
use chrono::DateTime;
use chrono::{TimeZone, Utc, offset::LocalResult};
use perf_event_rs::sampling::OverflowBy;
use psh_proto::task::TaskType;
use psh_proto::{
ExportDataReq, GetTaskReq, HeartbeatReq, TaskDoneReq, Unit,
psh_service_client::PshServiceClient,
Expand All @@ -26,7 +29,7 @@ use tonic::{
transport::{Channel, ClientTlsConfig, Endpoint},
};

use crate::{config::RpcConfig, runtime::Task, services::host_info::new_info_req};
use crate::{config::RpcConfig, runtime::WasmTask, services::host_info::new_info_req};

#[derive(Clone)]
pub struct RpcClient {
Expand Down Expand Up @@ -80,6 +83,20 @@ where
}
}

pub enum WhichTask {
Wasm(WasmTask),
Profiling(ProfilingTask),
}

pub struct ProfilingTask {
pub id: Option<String>,
pub process: perf_event_rs::config::Process,
pub mmap_pages: u64,
pub overflow_by: OverflowBy,
pub stack_depth: Option<u16>,
pub end_time: DateTime<Utc>,
}

impl RpcClient {
pub async fn new(config: &RpcConfig, token: String) -> Result<Self> {
let ep = Endpoint::from_shared(config.addr.clone())?
Expand Down Expand Up @@ -132,7 +149,7 @@ impl RpcClient {
Ok(())
}

pub async fn get_task(&mut self, instance_id: String) -> Result<Option<Task>> {
pub async fn get_task(&mut self, instance_id: String) -> Result<Option<WhichTask>> {
let get_task_req = GetTaskReq { instance_id };
let token = &self.token;

Expand All @@ -142,20 +159,41 @@ impl RpcClient {
self.client.get_task(req).await
})
.await?;
let task = match response.into_inner().task {
Some(task) => task,
None => return Ok(None),
let Some(task): Option<psh_proto::Task> = response.into_inner().task else {
return Ok(None);
};

let end_time = match Utc.timestamp_millis_opt(task.end_time as _) {
LocalResult::Single(t) => t,
_ => bail!("Invalid task end time"),
let LocalResult::Single(end_time) = Utc.timestamp_millis_opt(task.end_time as _) else {
bail!("Invalid task end time")
};
let task = Task {
id: Some(task.id),
wasm_component: task.wasm,
wasm_component_args: task.wasm_args,
end_time,
let Some(task_type) = task.task_type else {
return Ok(None);
};

let task = match task_type {
TaskType::Profiling(profiling_task) => {
let Some(process) = profiling_task.process else {
return Ok(None);
};
let Some(overflow_by) = profiling_task.overflow_by else {
return Ok(None);
};
let process: perf_event_rs::config::Process = process.into();
WhichTask::Profiling(ProfilingTask {
id: task.id.into(),
process,
mmap_pages: profiling_task.mmap_pages,
overflow_by: overflow_by.into(),
stack_depth: profiling_task.stack_depth.map(|v| v as _),
end_time,
})
}
TaskType::Wasm(wasm_task) => WhichTask::Wasm(WasmTask {
id: Some(task.id),
wasm_component: wasm_task.wasm,
wasm_component_args: wasm_task.wasm_args,
end_time,
}),
};

Ok(Some(task))
Expand Down
Loading
Loading