Reformat procfs scraping to handle errors #1178

Open: wants to merge 4 commits into base: main (changes from 2 commits)
284 changes: 154 additions & 130 deletions lading/src/observer/linux/procfs.rs
@@ -118,158 +118,182 @@ impl Sampler {
}
}
}
// Update the process_info map to only hold processes seen by the current poll call.
self.process_info.retain(|pid, _| pids.contains(pid));
Comment on lines +120 to +121
Collaborator:
This could potentially be a separate PR, since it's technically out of scope for the change description.

Contributor Author:
Since it fits within the spirit of handling pids that exit, I added a line to the change description.
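To illustrate the pruning step discussed in this thread, here is a minimal sketch of `retain` on a map keyed by pid. The `ProcessInfo` struct, the `prune_exited` helper, and the use of `i32` pids with a `HashSet` are assumptions made for the example; the PR's actual types are not fully shown here.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for the per-process cache entry.
#[derive(Debug)]
struct ProcessInfo {
    comm: String,
}

/// Drop cached entries for every pid that the current poll did not see.
fn prune_exited(process_info: &mut HashMap<i32, ProcessInfo>, live_pids: &HashSet<i32>) {
    process_info.retain(|pid, _| live_pids.contains(pid));
}

fn main() {
    let mut cache = HashMap::new();
    cache.insert(1, ProcessInfo { comm: "init".to_string() });
    cache.insert(42, ProcessInfo { comm: "worker".to_string() });

    // Only pid 1 was observed during this (made-up) poll.
    let live: HashSet<i32> = [1].into_iter().collect();
    prune_exited(&mut cache, &live);

    println!("cached pids after pruning: {:?}", cache.keys().collect::<Vec<_>>());
}
```

Without a step like this, entries for exited pids would stay in the map indefinitely.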


let pid = process.pid();
if let Err(e) = self.handle_process(process, &mut aggr).await {
warn!("Encountered uncaught error when handling `/proc/{pid}/`: {e}");
}
}
// END pid loop
Collaborator:
This loop is now short enough you could cut these.


// `/proc/{pid}/status`
let status = match process.status() {
Ok(status) => status,
Err(e) => {
warn!("Couldn't read status: {:?}", e);
// The pid may have exited since we scanned it or we may not
// have sufficient permission.
continue;
}
};
if status.tgid != pid {
// This is a thread, not a process and we do not wish to scan it.
continue;
gauge!("total_rss_bytes").set(aggr.rss as f64);
gauge!("total_pss_bytes").set(aggr.pss as f64);

Ok(())
}

#[allow(
clippy::similar_names,
clippy::too_many_lines,
clippy::cast_sign_loss,
clippy::cast_possible_truncation,
clippy::cast_possible_wrap
)]
async fn handle_process(
&mut self,
process: Process,
aggr: &mut memory::smaps_rollup::Aggregator,
) -> Result<(), Error> {
Contributor:
Is there any codepath that returns an error? I'm not seeing any.

My thought here is that we need to explicitly handle errors when reading process information, i.e., avoid the `?` operator. If this function had no return type, we could not accidentally propagate an error with `?`.
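A minimal sketch of the suggestion above, assuming a simplified `handle_process` that only reads `/proc/{pid}/comm` and records the name in a caller-supplied `Vec` (both are hypothetical, not the PR's code). Because the function returns `()`, writing `?` inside it would be a compile error, so every fallible read has to be matched explicitly.

```rust
use std::fs;

/// Hypothetical, simplified handler with no return type: `?` cannot be used
/// here, so each error must be handled where it occurs.
fn handle_process(pid: u32, seen: &mut Vec<String>) {
    let comm = match fs::read_to_string(format!("/proc/{pid}/comm")) {
        Ok(comm) => comm,
        Err(e) => {
            // The pid may have exited, or we may lack permission.
            eprintln!("Couldn't read `/proc/{pid}/comm`: {e}");
            return;
        }
    };
    seen.push(comm.trim().to_string());
}

fn main() {
    let mut seen = Vec::new();
    handle_process(std::process::id(), &mut seen);
    // A pid this large cannot exist on Linux, so this call takes the error path.
    handle_process(u32::MAX, &mut seen);
    println!("recorded {} process name(s)", seen.len());
}
```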

Contributor Author:
Hm, yeah, I think I see what you are saying. I chose this implementation specifically so that `?` could be used without handling each error explicitly, mostly because `proc_exe()`, `proc_comm()`, and `proc_cmdline()` can all error if the pid has exited. I thought it was cleaner to blanket-catch those failures than to add messy handling for separate failures that all share the same root cause (the pid exiting).

Contributor Author:
Very open to changing this, though, to catch every error explicitly. I just thought this approach would make it easier to add other code to `handle_process` later without risking observer crashes.
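The blanket-catch approach described in the two comments above can be sketched roughly as follows. The file reads, the `String` summary, and the `poll` helper that counts successes are all assumptions for illustration; the real `handle_process` takes a `Process` and an aggregator.

```rust
use std::{fs, io};

/// Hypothetical handler that leans on `?`: any read failure bubbles up.
fn handle_process(pid: u32) -> Result<String, io::Error> {
    let comm = fs::read_to_string(format!("/proc/{pid}/comm"))?;
    let cmdline = fs::read(format!("/proc/{pid}/cmdline"))?;
    Ok(format!("{} ({} cmdline bytes)", comm.trim(), cmdline.len()))
}

/// The caller catches every error in one place, logs it, and moves on,
/// so one failing pid cannot abort the whole poll.
fn poll(pids: &[u32]) -> usize {
    let mut handled = 0;
    for &pid in pids {
        match handle_process(pid) {
            Ok(summary) => {
                println!("pid {pid}: {summary}");
                handled += 1;
            }
            Err(e) => eprintln!("Encountered error when handling `/proc/{pid}/`: {e}"),
        }
    }
    handled
}

fn main() {
    // The second pid cannot exist on Linux and exercises the error path.
    let handled = poll(&[std::process::id(), u32::MAX]);
    println!("handled {handled} process(es)");
}
```

The trade-off raised in review is visible here: the caller cannot tell a pid that exited apart from a genuinely unexpected failure.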

Contributor:
> proc_exe(), proc_comm(), and proc_cmdline() all can error if the pid had exited and I thought it was cleaner to just blanket catch any of those failures rather than add messiness catching different failures that are all resulting from the same root cause (the pid exiting)

What makes these failures different from, e.g., `process.status()` failing?

In my understanding, any and all errors that could be caused by a process exiting during execution of `handle_process` should be explicitly caught and handled by ignoring the error and returning early (`return` or `return Ok(())`).

If there are errors that can occur that are not caused by the process exiting early, then those would be a valid reason to bubble an error out of this function, but I don't think process-exit-errors should escape this function, otherwise what is the point of extracting into handle_process?
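One way to make the distinction above concrete is a small classifier that separates "the process is gone" from everything else. This is only a sketch: the helper names are hypothetical, it works on `std::io::Error` rather than the PR's own `Error` type, and treating `NotFound` and `ESRCH` as the exit signals is an assumption about how procfs reads fail.

```rust
use std::io;

/// Linux errno for "No such process". Hard-coded here to avoid a `libc`
/// dependency; this value is an assumption specific to Linux.
const ESRCH: i32 = 3;

/// Hypothetical classifier: does this error look like the pid has exited?
fn is_process_gone(err: &io::Error) -> bool {
    err.kind() == io::ErrorKind::NotFound || err.raw_os_error() == Some(ESRCH)
}

/// Swallow process-exit errors as `Ok(None)`; let anything else escape.
fn read_or_skip(result: io::Result<String>) -> io::Result<Option<String>> {
    match result {
        Ok(value) => Ok(Some(value)),
        Err(e) if is_process_gone(&e) => Ok(None),
        Err(e) => Err(e),
    }
}

fn main() {
    let gone = read_or_skip(Err(io::Error::from(io::ErrorKind::NotFound)));
    println!("exited pid -> {gone:?}");

    let other = read_or_skip(Err(io::Error::from(io::ErrorKind::InvalidData)));
    println!("other failure -> {other:?}");
}
```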

Contributor Author:
Ah yeah, I see what you are saying. It is the same type of failure as `process.status()`. I will update the code to catch what I expect to be specific "pid-exit" errors. The only exception I see is the uptime poll, since that is broader than a specific process.
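The split agreed on here could look roughly like the sketch below: a per-process read failure is logged and swallowed, while a system-wide read (uptime) still propagates with `?`. The closure parameters are stand-ins invented so the example is self-contained; they are not how the PR reads `/proc`.

```rust
use std::io;

/// Hypothetical handler: `read_status` is per-process, `read_uptime` is
/// system-wide. Only the system-wide failure escapes as an error.
fn handle_process(
    read_status: impl Fn() -> io::Result<String>,
    read_uptime: impl Fn() -> io::Result<u64>,
) -> io::Result<()> {
    let status = match read_status() {
        Ok(status) => status,
        Err(e) => {
            // Likely a pid-exit error: skip this process, not the whole poll.
            eprintln!("Couldn't read status: {e}");
            return Ok(());
        }
    };

    // Not tied to one pid, so a failure here is worth bubbling up.
    let uptime = read_uptime()?;
    println!("status has {} bytes at uptime {uptime}", status.len());
    Ok(())
}

fn main() {
    let exited = handle_process(
        || Err(io::Error::from(io::ErrorKind::NotFound)),
        || Ok(100),
    );
    println!("exited pid -> {exited:?}");

    let broken_uptime = handle_process(
        || Ok("Name:\tdemo".to_string()),
        || Err(io::Error::from(io::ErrorKind::PermissionDenied)),
    );
    println!("uptime failure -> {broken_uptime:?}");
}
```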

let pid = process.pid();

// `/proc/{pid}/status`
let status = match process.status() {
Ok(status) => status,
Err(e) => {
warn!("Couldn't read status: {:?}", e);
// The pid may have exited since we scanned it or we may not
// have sufficient permission.
return Ok(());
}
};
if status.tgid != pid {
// This is a thread, not a process and we do not wish to scan it.
return Ok(());
}

// If we haven't seen this process before, initialize its ProcessInfo.
match self.process_info.entry(pid) {
Entry::Occupied(_) => { /* Already initialized */ }
Entry::Vacant(entry) => {
let exe = proc_exe(pid).await?;
let comm = proc_comm(pid).await?;
let cmdline = proc_cmdline(pid).await?;
let pid_s = format!("{pid}");
let stat_sampler = stat::Sampler::new();
// If we haven't seen this process before, initialize its ProcessInfo.
match self.process_info.entry(pid) {
Entry::Occupied(_) => { /* Already initialized */ }
Entry::Vacant(entry) => {
let exe = proc_exe(pid).await?;
let comm = proc_comm(pid).await?;
let cmdline = proc_cmdline(pid).await?;
let pid_s = format!("{pid}");
let stat_sampler = stat::Sampler::new();

entry.insert(ProcessInfo {
cmdline,
exe,
comm,
pid_s,
stat_sampler,
});
}
entry.insert(ProcessInfo {
cmdline,
exe,
comm,
pid_s,
stat_sampler,
});
}
}

// SAFETY: We've just inserted this pid into the map.
let pinfo = self
.process_info
.get_mut(&pid)
.expect("catastrophic programming error");
// SAFETY: We've just inserted this pid into the map.
let pinfo = self
.process_info
.get_mut(&pid)
.expect("catastrophic programming error");

let labels: [(&'static str, String); 4] = [
("pid", pinfo.pid_s.clone()),
("exe", pinfo.exe.clone()),
("cmdline", pinfo.cmdline.clone()),
("comm", pinfo.comm.clone()),
];
let labels: [(&'static str, String); 4] = [
("pid", pinfo.pid_s.clone()),
("exe", pinfo.exe.clone()),
("cmdline", pinfo.cmdline.clone()),
("comm", pinfo.comm.clone()),
];

// `/proc/{pid}/status`
report_status_field!(status, labels, vmrss);
report_status_field!(status, labels, rssanon);
report_status_field!(status, labels, rssfile);
report_status_field!(status, labels, rssshmem);
report_status_field!(status, labels, vmdata);
report_status_field!(status, labels, vmstk);
report_status_field!(status, labels, vmexe);
report_status_field!(status, labels, vmlib);
// `/proc/{pid}/status`
report_status_field!(status, labels, vmrss);
report_status_field!(status, labels, rssanon);
report_status_field!(status, labels, rssfile);
report_status_field!(status, labels, rssshmem);
report_status_field!(status, labels, vmdata);
report_status_field!(status, labels, vmstk);
report_status_field!(status, labels, vmexe);
report_status_field!(status, labels, vmlib);

let uptime = uptime::poll().await?;
let uptime = uptime::poll().await?;

// `/proc/{pid}/stat`, most especially per-process CPU data.
if let Err(e) = pinfo.stat_sampler.poll(pid, uptime, &labels).await {
// We don't want to bail out entirely if we can't read stats
// which will happen if we don't have permissions or, more
// likely, the process has exited.
warn!("Couldn't process `/proc/{pid}/stat`: {e}");
continue;
}
// `/proc/{pid}/stat`, most especially per-process CPU data.
if let Err(e) = pinfo.stat_sampler.poll(pid, uptime, &labels).await {
// We don't want to bail out entirely if we can't read stats
// which will happen if we don't have permissions or, more
// likely, the process has exited.
warn!("Couldn't process `/proc/{pid}/stat`: {e}");
return Ok(());
}

// `/proc/{pid}/smaps`
match memory::smaps::Regions::from_pid(pid) {
Ok(memory_regions) => {
for (pathname, measures) in memory_regions.aggregate_by_pathname() {
let labels: [(&'static str, String); 5] = [
("pid", pinfo.pid_s.clone()),
("exe", pinfo.exe.clone()),
("cmdline", pinfo.cmdline.clone()),
("comm", pinfo.comm.clone()),
("pathname", pathname),
];
gauge!("smaps.rss.by_pathname", &labels).set(measures.rss as f64);
gauge!("smaps.pss.by_pathname", &labels).set(measures.pss as f64);
gauge!("smaps.swap.by_pathname", &labels).set(measures.swap as f64);
gauge!("smaps.size.by_pathname", &labels).set(measures.size as f64);
// `/proc/{pid}/smaps`
match memory::smaps::Regions::from_pid(pid) {
Ok(memory_regions) => {
for (pathname, measures) in memory_regions.aggregate_by_pathname() {
let labels: [(&'static str, String); 5] = [
("pid", pinfo.pid_s.clone()),
("exe", pinfo.exe.clone()),
("cmdline", pinfo.cmdline.clone()),
("comm", pinfo.comm.clone()),
("pathname", pathname),
];
gauge!("smaps.rss.by_pathname", &labels).set(measures.rss as f64);
gauge!("smaps.pss.by_pathname", &labels).set(measures.pss as f64);
gauge!("smaps.swap.by_pathname", &labels).set(measures.swap as f64);
gauge!("smaps.size.by_pathname", &labels).set(measures.size as f64);

if let Some(m) = measures.private_clean {
gauge!("smaps.private_clean.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.private_dirty {
gauge!("smaps.private_dirty.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.shared_clean {
gauge!("smaps.shared_clean.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.shared_dirty {
gauge!("smaps.shared_dirty.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.referenced {
gauge!("smaps.referenced.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.anonymous {
gauge!("smaps.anonymous.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.lazy_free {
gauge!("smaps.lazy_free.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.anon_huge_pages {
gauge!("smaps.anon_huge_pages.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.shmem_pmd_mapped {
gauge!("smaps.shmem_pmd_mapped.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.shared_hugetlb {
gauge!("smaps.shared_hugetlb.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.private_hugetlb {
gauge!("smaps.private_hugetlb.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.file_pmd_mapped {
gauge!("smaps.file_pmd_mapped.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.locked {
gauge!("smaps.locked.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.swap_pss {
gauge!("smaps.swap_pss.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.private_clean {
gauge!("smaps.private_clean.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.private_dirty {
gauge!("smaps.private_dirty.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.shared_clean {
gauge!("smaps.shared_clean.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.shared_dirty {
gauge!("smaps.shared_dirty.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.referenced {
gauge!("smaps.referenced.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.anonymous {
gauge!("smaps.anonymous.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.lazy_free {
gauge!("smaps.lazy_free.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.anon_huge_pages {
gauge!("smaps.anon_huge_pages.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.shmem_pmd_mapped {
gauge!("smaps.shmem_pmd_mapped.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.shared_hugetlb {
gauge!("smaps.shared_hugetlb.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.private_hugetlb {
gauge!("smaps.private_hugetlb.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.file_pmd_mapped {
gauge!("smaps.file_pmd_mapped.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.locked {
gauge!("smaps.locked.by_pathname", &labels).set(m as f64);
}
if let Some(m) = measures.swap_pss {
gauge!("smaps.swap_pss.by_pathname", &labels).set(m as f64);
}
}
Err(err) => {
// We don't want to bail out entirely if we can't read stats
// which will happen if we don't have permissions or, more
// likely, the process has exited.
warn!("Couldn't process `/proc/{pid}/smaps`: {err}");
}
}

// `/proc/{pid}/smaps_rollup`
if let Err(err) = memory::smaps_rollup::poll(pid, &labels, &mut aggr).await {
// We don't want to bail out entirely if we can't read smap rollup
Err(err) => {
// We don't want to bail out entirely if we can't read stats
// which will happen if we don't have permissions or, more
// likely, the process has exited.
warn!("Couldn't process `/proc/{pid}/smaps_rollup`: {err}");
warn!("Couldn't process `/proc/{pid}/smaps`: {err}");
return Ok(());
}
}
// END pid loop

gauge!("total_rss_bytes").set(aggr.rss as f64);
gauge!("total_pss_bytes").set(aggr.pss as f64);
// `/proc/{pid}/smaps_rollup`
if let Err(err) = memory::smaps_rollup::poll(pid, &labels, aggr).await {
// We don't want to bail out entirely if we can't read smap rollup
// which will happen if we don't have permissions or, more
// likely, the process has exited.
warn!("Couldn't process `/proc/{pid}/smaps_rollup`: {err}");
return Ok(());
}

Ok(())
}