Skip to content

Commit

Permalink
Fix: some fixes and optimizations
Browse files Browse the repository at this point in the history
removing eBPF script's stand-alone mode
more details about syscalls
  • Loading branch information
hov1417 committed Aug 8, 2023
1 parent 7cd35e3 commit 18bb8bb
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 218 deletions.
225 changes: 29 additions & 196 deletions ebpf/ebpf.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
#!/usr/bin/env python3

import argparse
import json
import logging
import os
import signal
import resource
import sys
from time import sleep, strftime, time
import signal
from time import sleep, time
from typing import Optional, Tuple

import pexpect
Expand Down Expand Up @@ -48,31 +46,6 @@ def update(self, size):
self.size += size


def print_outstanding(bpf, pid, top, min_age_ns):
"""Print the stacks that hold the largest outstanding allocations.

Reads the ``allocs`` and ``stack_traces`` tables from ``bpf``, groups the
allocation records by ``stack_id`` and prints the ``top`` groups with the
biggest accumulated size.

:param bpf: object indexable by table name ("allocs", "stack_traces") that
    also provides ``sym()`` for symbol resolution.
:param pid: process id handed to ``bpf.sym`` when resolving addresses.
:param top: number of stacks to print.
:param min_age_ns: records whose timestamp is newer than
    ``BPF.monotonic_time() - min_age_ns`` are skipped.
"""
# NOTE(review): indentation is not visible in this view of the file, so the
# nesting of some statements below (notably the addr/size print) cannot be
# confirmed from here.
print(f"Top {top} stacks with outstanding allocations:")
# Maps stack_id -> Allocation aggregate for that stack.
alloc_info = {}
allocs = bpf["allocs"]
stack_traces = bpf["stack_traces"]
# Walk the records in ascending order of their size.
for address, info in sorted(allocs.items(), key=lambda a: a[1].size):
# Skip records that are too young or that have a negative stack id.
if BPF.monotonic_time() - min_age_ns < info.timestamp_ns or info.stack_id < 0:
continue
if info.stack_id in alloc_info:
# Stack already seen: fold this record's size into the aggregate.
alloc_info[info.stack_id].update(info.size)
else:
# First record for this stack: resolve every frame to
# "<0x-prefixed 16-digit hex address>\t<symbol>" as bytes.
stack = list(stack_traces.walk(info.stack_id))
combined = []
for addr in stack:
func_name = bpf.sym(addr, pid, show_module=True, show_offset=True)
formatted_address = ('0x' + format(addr, '016x') + '\t').encode('utf-8')
combined.append(formatted_address + func_name)
alloc_info[info.stack_id] = Allocation(combined, info.size, address.value)
print(f"\taddr = {address.value} size = {info.size}")
# Ascending sort + tail slice keeps the `top` largest aggregates, largest last.
to_show = sorted(alloc_info.values(), key=lambda a: a.size)[-top:]
for alloc in to_show:
# Frames are bytes; they are joined and decoded as ASCII for display.
stack = b"\n\t\t".join(alloc.stack).decode("ascii")
print(f"\t{alloc.size} bytes in {alloc.count} allocations from stack\n\t\t{stack}")


def get_outstanding(bpf, pid, min_age_ns, top):
alloc_info = {}
allocs = bpf["allocs"]
Expand Down Expand Up @@ -122,30 +95,6 @@ def is_positive(self):
return self.alloc_size > self.free_size


def print_memory_statistics(bpf, pid, top):
    """Print aggregate allocation statistics read from the BPF tables.

    Prints the number of entries in the ``stack_traces`` table, the overall
    outstanding memory (allocated minus freed, in KiB) across all
    ``combined_allocs`` records, and a per-stack report for the first ``top``
    records in ``CombinedAlloc.key()`` order (printed in reverse).

    :param bpf: object indexable by table name ("stack_traces",
        "combined_allocs").
    :param pid: process id forwarded to ``get_trace_info``.
    :param top: number of records to include in the per-stack report.
    """
    traces_table = bpf["stack_traces"]
    print("stack traces", len(list(traces_table.items())))

    # One CombinedAlloc per (key, value) item, ordered by the record's own key.
    records = [CombinedAlloc(item) for item in bpf["combined_allocs"].items()]
    records.sort(key=lambda rec: rec.key())

    outstanding_kb = sum(rec.alloc_size - rec.free_size for rec in records) / 1024
    print("overall, allocated", outstanding_kb, "kb in", len(records), "allocations")

    report = []
    for rec in records[:top]:
        trace = get_trace_info(bpf, pid, traces_table, rec.stack_id.value)
        live_bytes = rec.alloc_size - rec.free_size
        live_count = rec.number_of_allocs - rec.number_of_frees
        events = rec.number_of_allocs + rec.number_of_frees
        report.append(
            f"\t{live_bytes} bytes in {live_count}"
            f" allocations from stack ({events} allocs/frees)"
            f"\n\t\t{trace}"
        )

    print(f"Top {top} stacks with outstanding allocations:")
    # Entries are emitted in the reverse of the order they were collected in.
    print('\n'.join(report[::-1]))


def get_statistics(bpf, pid, min_age_ns, top):
stack_traces = bpf["stack_traces"]
combined_alloc = list(
Expand Down Expand Up @@ -200,18 +149,6 @@ def walk_trace(stack_traces, stack_id):
return []


def print_outstanding_kernel_cache(bpf, top):
    """Print the kernel caches with the most outstanding (unfreed) memory.

    Only caches whose allocated size exceeds their freed size are considered.
    They are ranked by outstanding size, largest first, and at most ``top`` of
    them are printed. Nothing is printed when no cache qualifies.

    :param bpf: object indexable by table name; ``bpf['kernel_cache_counts']``
        must provide ``items()`` yielding ``(key, counters)`` pairs where
        ``key.name`` is UTF-8 bytes and ``counters`` exposes ``alloc_size``,
        ``free_size``, ``alloc_count`` and ``free_count``.
    :param top: maximum number of caches to print.
    """
    outstanding = [
        (cache, counters)
        for cache, counters in bpf['kernel_cache_counts'].items()
        if counters.alloc_size > counters.free_size
    ]
    # Rank by outstanding bytes, descending: the previous ascending sort
    # followed by [:top] selected the *smallest* caches instead of the top ones.
    outstanding.sort(
        key=lambda entry: entry[1].alloc_size - entry[1].free_size,
        reverse=True,
    )
    selected = outstanding[:top]
    if not selected:
        return

    print("---------------- Kernel Caches ---------------")
    for cache, counters in selected:
        print(
            "Cache",
            str(cache.name, "utf-8"),
            counters.alloc_count - counters.free_count,
            counters.alloc_size - counters.free_size,
        )


def gernel_kernel_cache(bpf, top):
kernel_cache_allocs = list(
sorted(filter(lambda a: a[1].alloc_size > a[1].free_size, bpf['kernel_cache_counts'].items()),
Expand All @@ -237,21 +174,13 @@ def gernel_kernel_cache(bpf, top):
return caches


def print_syscalls(bpf, top):
    """Print the ``top`` syscalls by accumulated time, then reset the table.

    Rows come from ``bpf["syscall_counts"]`` and are ordered by ``total_ns``
    descending. The time column is ``total_ns / 1e3``. The table is cleared
    after printing.

    :param bpf: object indexable by table name.
    :param top: number of rows to print.
    """
    table = bpf["syscall_counts"]
    print("SYSCALL COUNT TIME")

    slowest_first = sorted(
        table.items(),
        key=lambda pair: pair[1].total_ns,
        reverse=True,
    )
    for key, stats in slowest_first[:top]:
        row = (system_call_name(key.value), stats.count, stats.total_ns / 1e3)
        print("%-22s %8d %16.3f" % row)

    # Start the next reporting interval from empty counters.
    table.clear()


def get_syscalls(bpf):
syscall_counts = bpf["syscall_counts"]
syscalls = {}
for k, v in syscall_counts.items():
key = k.value
syscall_id = key >> 32
thread_id = (1 << 32) & key
thread_id = ((1 << 32) - 1) & key

syscalls.setdefault(thread_id, []).append({
'name': system_call_name(syscall_id),
Expand All @@ -268,28 +197,28 @@ def system_call_name(k):
return syscall_name(k).decode('ascii')


def print_time():
    """Print the current local time as ``[HH:MM:SS]`` on its own line."""
    stamp = strftime("%H:%M:%S")
    print(f"[{stamp}]")


def get_syscall_stacks(bpf, pid):
def get_additional_syscall_info(bpf, pid):
syscall_counts_stacks = bpf['syscall_counts_stacks']
syscalls_per_thread = {}
stack_ids = []
pid_to_syscall_times = {}
tid_to_syscall_timestamps = {}
for k, v in syscall_counts_stacks.items():
stack_set = syscalls_per_thread.setdefault(v.pid_tgid, {}).setdefault(system_call_name(v.id),
{'stacks': set(), 'number': 0})
stack_set['stacks'].add(v.stack_id)
stack_set['number'] += 1
stack_ids.append(v.stack_id)
pid_to_syscall_times.setdefault(v.pid_tgid, []).append(k.value)

stacks = {stack_id: get_trace_info_as_list(bpf, pid, bpf['stack_traces'], stack_id) for stack_id in stack_ids}
thread_id = ((1 << 32) - 1) & v.pid_tgid
stack_dict = syscalls_per_thread.setdefault(thread_id, {}).setdefault(system_call_name(v.id), {})
stack_trace = '\n\t'.join(get_trace_info_as_list(bpf, pid, bpf['stack_traces'], v.stack_id))
stack_dict[stack_trace] = stack_dict.get(stack_trace, 0) + 1
tid_to_syscall_timestamps.setdefault(thread_id, []).append(k.value)

syscalls_per_thread = {thread_id: {
call_name: [
{
'stack_trace': trace.split('\n\t'),
'count': count,
} for (trace, count) in per_syscall.items()
] for (call_name, per_syscall) in calls.items()}
for (thread_id, calls) in syscalls_per_thread.items()}
return {
'syscalls': syscalls_per_thread,
'stacks': stacks,
'pid_to_syscall_times': pid_to_syscall_times
'syscall_stacks': syscalls_per_thread,
'tid_to_syscall_timestamps': tid_to_syscall_timestamps
}


Expand All @@ -300,19 +229,20 @@ def save_snapshot(
top: int,
snapshot_dir: str,
with_memory: bool,
with_syscall_stacks: bool,
with_syscall_details: bool,
snapshot_prefix: Optional[str] = None
):
current_time_millis = int(round(time() * 1000))
snapshot = {'time': current_time_millis}

if with_memory:
snapshot['memory_stats'] = get_statistics(bpf, pid, min_age_ns, top)
snapshot['kernel_caches'] = gernel_kernel_cache(bpf, top)

snapshot['syscalls'] = get_syscalls(bpf)

if with_syscall_stacks:
snapshot['syscall_stacks'] = get_syscall_stacks(bpf, pid)
if with_syscall_details:
snapshot['syscall_details'] = get_additional_syscall_info(bpf, pid)

os.makedirs(snapshot_dir, exist_ok=True)
with open(get_result_file_name(snapshot_dir, snapshot_prefix or 'snapshot', 'json'), 'w') as outfile:
Expand All @@ -328,74 +258,6 @@ def get_result_file_name(dir_name: str, prefix: str, suffix: str):
return path


class Arguments:
    """Validated view of the parsed command-line options.

    Attributes mirror the parser namespace, with two derived values:

    * ``min_age_ns`` - ``args.older`` (milliseconds) converted to nanoseconds.
    * ``save_snapshots`` - snapshot directory, or ``None`` when snapshots are
      disabled. A namespace value of ``-1`` means "option not given" and
      disables snapshots; ``None`` means "option given without a value" and
      selects ``'./snapshots'``.

    Exits the process with status 1 (after printing a message) when the size
    bounds are inverted or when neither a PID nor a command was supplied.
    """

    def __init__(self, args):
        self.pid = args.pid
        self.command = args.command
        self.interval = args.interval
        # Integer arithmetic keeps the value an int, matching the
        # ``min_age_ns: int`` annotations of the functions that consume it.
        self.min_age_ns = 1_000_000 * args.older
        self.alloc_sample_every_n = args.alloc_sample_rate
        self.top = args.top
        self.min_alloc_size = args.min_alloc_size
        self.max_alloc_size = args.max_alloc_size

        if args.snapshots is None:
            self.save_snapshots = './snapshots'
        elif args.snapshots == -1:
            self.save_snapshots = None
        else:
            self.save_snapshots = args.snapshots

        if self.min_alloc_size is not None and self.max_alloc_size is not None \
                and self.min_alloc_size > self.max_alloc_size:
            print("min_size (-z) can't be greater than max_size (-Z)")
            raise SystemExit(1)

        # The parser's default for --pid is -1, not None, so -1 must also be
        # treated as "no PID given"; otherwise this check can never trigger.
        if self.command is None and self.pid in (None, -1):
            print("Either -p or -c must be specified")
            raise SystemExit(1)


def parse():
    """Parse ``sys.argv`` and return the options wrapped in ``Arguments``."""
    cli = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter)
    cli.add_argument(
        "-p", "--pid",
        type=int,
        default=-1,
        help="the PID to trace; if not specified, trace kernel allocs",
    )
    cli.add_argument(
        "interval",
        nargs="?",
        default=5,
        type=int,
        help="interval in seconds to print outstanding allocations",
    )
    cli.add_argument(
        "-o", "--older",
        default=500,
        type=int,
        help="prune allocations younger than this age in milliseconds",
    )
    cli.add_argument(
        "-c", "--command",
        help="execute and trace the specified command",
    )
    cli.add_argument(
        "-s", "--alloc-sample-rate",
        default=1,
        type=int,
        help="sample every N-th allocation to decrease the overhead",
    )
    cli.add_argument(
        "-T", "--top",
        type=int,
        default=10,
        help="display only this many top stats",
    )
    cli.add_argument(
        "-z", "--min-alloc-size",
        type=int,
        help="capture only allocations larger than this size",
    )
    cli.add_argument(
        "-Z", "--max-alloc-size",
        type=int,
        help="capture only allocations smaller than this size",
    )
    # -1 is the "flag absent" sentinel that Arguments checks for; the flag is
    # declared with an optional value (nargs='?').
    cli.add_argument(
        "-S", "--snapshots",
        default=-1,
        type=str,
        nargs='?',
        help="save statistics snapshots to the specified directory",
    )
    namespace = cli.parse_args()
    return Arguments(namespace)


def attach_ebpf(pid: int = -1,
                process: Optional[spawn] = None,
                command: Optional[str] = None,
                top: int = 10,
                interval: int = 5,
                alloc_sample_every_n: int = 1,
                max_alloc_size: Optional[int] = None,
                min_age_ns: int = 500,
                min_alloc_size: Optional[int] = None,
                save_snapshots: Optional[str] = None,
                with_memory: bool = False):
    """Attach the eBPF probes to a target and harvest data until it stops.

    Convenience wrapper that calls ``attach_probes`` and hands its result to
    ``harvest_ebpf``.

    :param pid: PID of the process to trace (``-1`` when unspecified).
    :param process: already spawned process to trace, if any.
    :param command: command to execute and trace, if any.
    :param top: number of top entries to report.
    :param interval: harvesting interval, forwarded to ``harvest_ebpf``.
    :param alloc_sample_every_n: allocation sampling rate for the probes.
    :param max_alloc_size: upper allocation-size bound for the probes.
    :param min_age_ns: minimum allocation age, forwarded to ``harvest_ebpf``.
    :param min_alloc_size: lower allocation-size bound for the probes.
    :param save_snapshots: snapshot directory, forwarded to ``harvest_ebpf``.
    :param with_memory: whether memory statistics are collected.
    """
    attached = attach_probes(pid, process, command, alloc_sample_every_n, max_alloc_size, min_alloc_size,
                             with_memory)
    # attach_probes is declared to return Optional[...]; there is nothing to
    # harvest when it did not attach.
    if attached is None:
        return
    bpf, pid, process = attached

    # with_memory and save_snapshots are passed by keyword: harvest_ebpf has
    # further parameters between min_age_ns and save_snapshots, so passing
    # save_snapshots positionally would bind it to with_memory.
    harvest_ebpf(bpf, pid, process, top, interval, min_age_ns,
                 with_memory=with_memory, save_snapshots=save_snapshots)


def attach_probes(
pid: int = -1,
process: Optional[spawn] = None,
Expand All @@ -404,7 +266,7 @@ def attach_probes(
max_alloc_size: Optional[int] = None,
min_alloc_size: Optional[int] = None,
with_memory: bool = False,
syscall_stacks: bool = False,
syscall_details: bool = False,
communicate_with_signals: bool = False
) -> Optional[Tuple[BPF, int, spawn]]:
if pid == -1 and command is None and process is None:
Expand All @@ -431,7 +293,7 @@ def attach_probes(
f"-DPAGE_SIZE={resource.getpagesize()}",
f"-DFILTER_BY_SIZE={get_size_filter(min_alloc_size, max_alloc_size)}",
"-DWITH_MEMORY" if with_memory else "",
"-DCOLLECT_SYSCALL_STACK_INFO" if syscall_stacks else "",
"-DCOLLECT_SYSCALL_STACK_INFO" if syscall_details else "",
])

# Attaching probes
Expand All @@ -445,7 +307,6 @@ def attach_probes(
sleep(wait_interval)
wait_time += wait_interval
if process.terminated:
print(process.readline().decode('utf-8'))
raise Exception(f'Process is already terminated, with status code {process.exitstatus}')
if wait_time > timeout:
raise Exception('Process is not alive')
Expand Down Expand Up @@ -503,16 +364,6 @@ def sleep_and_check(
break


def print_ebpf_info(bpf, pid, min_age_ns, top):
    """Print one full report to stdout and flush it.

    Emits, in order: a timestamp, the memory statistics, the outstanding
    allocations, the outstanding kernel caches and the syscall table, followed
    by a blank separator line.

    :param bpf: BPF object the individual printers read their tables from.
    :param pid: traced process id, used for symbol resolution.
    :param min_age_ns: minimum allocation age for the outstanding report.
    :param top: number of entries each section prints.
    """
    print_time()
    print_memory_statistics(bpf, pid, top)
    print_outstanding(bpf, pid, top, min_age_ns)
    print_outstanding_kernel_cache(bpf, top)
    print_syscalls(bpf, top)
    # Blank line between reports; flush so the report appears immediately.
    print(flush=True)


def harvest_ebpf(
bpf: BPF,
pid: int = -1,
Expand All @@ -521,7 +372,7 @@ def harvest_ebpf(
interval: int = 5,
min_age_ns: int = 500,
with_memory: bool = False,
with_syscall_stacks: bool = False,
with_syscall_details: bool = False,
save_snapshots: Optional[str] = None,
snapshot_prefix: Optional[str] = None,
communicate_with_signals: bool = False,
Expand All @@ -538,10 +389,7 @@ def harvest_ebpf(
sleep_and_check(interval, pid, process, check_delay=0.2)
except KeyboardInterrupt:
break
if save_snapshots:
save_snapshot(bpf, pid, min_age_ns, top, save_snapshots, with_memory, with_syscall_stacks, snapshot_prefix)
else:
print_ebpf_info(bpf, pid, min_age_ns, top)
save_snapshot(bpf, pid, min_age_ns, top, save_snapshots, with_memory, with_syscall_details, snapshot_prefix)
if is_terminated(pid, process):
break

Expand All @@ -551,18 +399,3 @@ def harvest_ebpf(

logging.info("Detaching...")
bpf.cleanup()


if __name__ == "__main__":
    # Stand-alone mode: parse the command line and forward the options to
    # attach_ebpf under the same names.
    cli_options = parse()
    forwarded = (
        "pid",
        "command",
        "top",
        "interval",
        "alloc_sample_every_n",
        "max_alloc_size",
        "min_age_ns",
        "min_alloc_size",
        "save_snapshots",
    )
    attach_ebpf(**{name: getattr(cli_options, name) for name in forwarded})
Loading

0 comments on commit 18bb8bb

Please sign in to comment.