diff --git a/.gitignore b/.gitignore index 314bde06..f1ed02af 100644 --- a/.gitignore +++ b/.gitignore @@ -56,3 +56,5 @@ code-review*.bak *.exe *.out *.app + +**/__pycache__/ diff --git a/CMakeLists.txt b/CMakeLists.txt index 2f138cf1..26f58d5d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -131,11 +131,11 @@ add_executable(ucsb_bench ./src/bench.cxx) if(${UCSB_BUILD_USTORE}) # Choose engine. Available engines: UCSET, ROCKSDB, LEVELDB, UDISK - set(USTORE_ENGINE_NAME UCSET) + set(USTORE_ENGINE_NAME UCSET) include("${CMAKE_MODULE_PATH}/ustore.cmake") list(APPEND UCSB_DB_LIBS "ustore") - target_compile_definitions(ucsb_bench PUBLIC UCSB_HAS_USTORE=1) + target_compile_definitions(ucsb_bench PUBLIC UCSB_HAS_USTORE=1) endif() if(${UCSB_BUILD_ROCKSDB}) diff --git a/Dockerfile b/Dockerfile index 140f2847..06249493 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,36 +4,34 @@ MAINTAINER unum@cloud.com WORKDIR ./ -RUN apt-get update +RUN apt update ENV DEBIAN_FRONTEND noninteractive # Install tools RUN apt install -y python3-pip RUN pip3 install cmake -RUN pip3 install conan RUN apt install -y gcc-10 RUN apt install -y g++-10 -RUN apt-get install -y libexplain-dev -RUN apt-get install -y libsnappy-dev -RUN apt-get install -yq pkg-config -RUN apt-get install -y git +RUN apt install -y libexplain-dev +RUN apt install -y libsnappy-dev +RUN apt install -y pkg-config +RUN apt install -y git # Build WiredTiger (latest) -RUN git clone git://github.com/wiredtiger/wiredtiger.git +RUN git clone --depth 1 https://github.com/wiredtiger/wiredtiger.git RUN mkdir ./wiredtiger/build WORKDIR "./wiredtiger/build" -RUN cmake ../. +RUN cmake .. 
RUN make install WORKDIR / # Build UCSB -RUN git clone https://github.com/unum-cloud/ucsb.git +RUN git clone --depth 1 https://github.com/unum-cloud/ucsb.git WORKDIR "./ucsb/" RUN update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-10 10 RUN update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-10 10 -RUN bash -i setup.sh RUN bash -i build_release.sh -RUN rm -rf ./bench +RUN rm -rf ./bench/results -ENTRYPOINT ["./build_release/bin/ucsb_bench"] +ENTRYPOINT ["./build_release/build/bin/ucsb_bench"] diff --git a/README.md b/README.md index 9dc279a4..46596e6d 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,12 @@ The outputs will be placed in the `bench/results/` folder. git clone https://github.com/unum-cloud/ucsb.git && cd ucsb && ./run.py ``` +### eBPF + +There is optional eBPF-based profiling, which can be enabled with the `--with-ebpf` and `--with-ebpf-memory` flags. +Before execution, make sure you have `bcc` [installed](https://github.com/iovisor/bcc/blob/master/INSTALL.md), version 0.21.0 or later. + + ## Supported Databases Key-Value Stores and NoSQL databases differ in supported operations. 
diff --git a/cmake/leveldb.cmake b/cmake/leveldb.cmake index ea21444f..9e210799 100644 --- a/cmake/leveldb.cmake +++ b/cmake/leveldb.cmake @@ -27,4 +27,6 @@ if(NOT leveldb_POPULATED) add_subdirectory(${leveldb_SOURCE_DIR} ${leveldb_BINARY_DIR} EXCLUDE_FROM_ALL) endif() +add_compile_options(-g) + include_directories(${leveldb_SOURCE_DIR}/include) diff --git a/ebpf/ebpf.py b/ebpf/ebpf.py new file mode 100644 index 00000000..913dd1d1 --- /dev/null +++ b/ebpf/ebpf.py @@ -0,0 +1,401 @@ +#!/usr/bin/env python3 + +import json +import logging +import os +import resource +import signal +from time import sleep, time +from typing import Optional, Tuple + +import pexpect +from bcc import BPF +from bcc.syscall import syscall_name +from pexpect import spawn + +logging.basicConfig(filename='/tmp/ebpf.log', encoding='utf-8', level=logging.DEBUG) + + +class SetEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, set): + return list(obj) + return json.JSONEncoder.default(self, obj) + + +def get_size_filter(min_size, max_size): + if min_size is not None and max_size is not None: + return "if (size < %d || size > %d) return 0;" % (min_size, max_size) + elif min_size is not None: + return "if (size < %d) return 0;" % min_size + elif max_size is not None: + return "if (size > %d) return 0;" % max_size + else: + return "" + + +class Allocation(object): + def __init__(self, stack, size, address): + self.stack = stack + self.count = 1 + self.size = size + self.address = address + + def update(self, size): + self.count += 1 + self.size += size + + +def get_outstanding(bpf, pid, min_age_ns, top): + alloc_info = {} + allocs = bpf["allocs"] + stack_traces = bpf["stack_traces"] + for address, info in sorted(allocs.items(), key=lambda a: a[1].size): + if BPF.monotonic_time() - min_age_ns < info.timestamp_ns or info.stack_id < 0: + continue + if info.stack_id in alloc_info: + alloc_info[info.stack_id].update(info.size) + else: + stack = list(stack_traces.walk(info.stack_id)) 
+ combined = [] + for addr in stack: + func_name = bpf.sym(addr, pid, show_module=True, show_offset=True) + formatted_address = ('0x' + format(addr, '016x') + ' ').encode('utf-8') + combined.append(formatted_address + func_name) + alloc_info[info.stack_id] = Allocation(combined, info.size, address.value) + + sorted_stacks = sorted(alloc_info.values(), key=lambda a: -a.size)[:top] + return list( + map(lambda alloc: {'stack': [s.decode('ascii') for s in alloc.stack], 'size': alloc.size, 'count': alloc.count}, + sorted_stacks)) + + +class CombinedAlloc(object): + def __init__(self, item): + self.stack_id = item[0] + self.free_size = item[1].free_size + self.alloc_size = item[1].alloc_size + self.number_of_frees = item[1].number_of_frees + self.number_of_allocs = item[1].number_of_allocs + + def key(self): + return self.alloc_size - self.free_size + + def __str__(self): + return f"CombinedAlloc(stack_id={self.stack_id},\n" \ + f"\t free_size={self.free_size},\n" \ + f"\t alloc_size={self.alloc_size},\n" \ + f"\t number_of_frees={self.number_of_frees},\n" \ + f"\t number_of_allocs={self.number_of_allocs})\n" + + def __repr__(self): + return self.__str__() + + def is_positive(self): + return self.alloc_size > self.free_size + + +def get_statistics(bpf, pid, min_age_ns, top): + stack_traces = bpf["stack_traces"] + combined_alloc = list( + sorted( + map(CombinedAlloc, bpf["combined_allocs"].items()), + key=lambda a: a.key(), + ) + ) + memory = sum((c.alloc_size - c.free_size for c in combined_alloc)) + entries = [] + for allocation in combined_alloc: + entries.append({ + 'alloc_size': allocation.alloc_size, + 'free_size': allocation.free_size, + 'number_of_allocs': allocation.number_of_allocs, + 'number_of_frees': allocation.number_of_frees, + 'trace': get_trace_info_as_list(bpf, pid, stack_traces, allocation.stack_id.value), + }) + return { + "memory": memory, + "combined_allocs": list(reversed(entries)), + "stack_traces": len(list(stack_traces.items())), + "outstanding": 
get_outstanding(bpf, pid, min_age_ns, top) + } + + +def get_trace_info(bpf, pid, stack_traces, stack_id): + trace = [] + for addr in walk_trace(stack_traces, stack_id): + sym = bpf.sym(addr, pid, show_module=False, show_offset=True) + trace.append(sym.decode()) + + trace = "\n\t\t".join(trace) + if not trace: + trace = "stack information lost" + return trace + + +def get_trace_info_as_list(bpf, pid, stack_traces, stack_id): + trace = [] + for addr in walk_trace(stack_traces, stack_id): + sym = bpf.sym(addr, pid, show_module=False, show_offset=True) + trace.append(sym.decode()) + + return trace + + +def walk_trace(stack_traces, stack_id): + try: + return stack_traces.walk(stack_id) + except KeyError: + return [] + + +def gernel_kernel_cache(bpf, top): + kernel_cache_allocs = list( + sorted(filter(lambda a: a[1].alloc_size > a[1].free_size, bpf['kernel_cache_counts'].items()), + key=lambda a: a[1].alloc_size - a[1].free_size) + )[:top] + if not kernel_cache_allocs: + return + caches = [] + to_remove = [] + for (k, v) in kernel_cache_allocs: + caches.append({ + 'name': str(k.name, "utf-8"), + 'alloc_count': v.alloc_count, + 'free_count': v.free_count, + 'alloc_size': v.alloc_size, + 'free_size': v.free_size, + }) + if v.alloc_count >= v.free_count: + to_remove.append(k) + if len(to_remove) > 0: + arr = (type(to_remove[0]) * len(to_remove))(*to_remove) + bpf['kernel_cache_counts'].items_delete_batch(arr) + return caches + + +def get_syscalls(bpf): + syscall_counts = bpf["syscall_counts"] + syscalls = {} + for k, v in syscall_counts.items(): + key = k.value + syscall_id = key >> 32 + thread_id = ((1 << 32) - 1) & key + + syscalls.setdefault(thread_id, []).append({ + 'name': system_call_name(syscall_id), + 'count': v.count, + 'total_ns': v.total_ns, + }) + syscall_counts.clear() + return syscalls + + +def system_call_name(k): + if k == 435: + return "clone3" + return syscall_name(k).decode('ascii') + + +def get_additional_syscall_info(bpf, pid): + syscall_counts_stacks = 
bpf['syscall_counts_stacks'] + syscalls_per_thread = {} + tid_to_syscall_timestamps = {} + for k, v in syscall_counts_stacks.items(): + thread_id = ((1 << 32) - 1) & v.pid_tgid + stack_dict = syscalls_per_thread.setdefault(thread_id, {}).setdefault(system_call_name(v.id), {}) + stack_trace = '\n\t'.join(get_trace_info_as_list(bpf, pid, bpf['stack_traces'], v.stack_id)) + stack_dict[stack_trace] = stack_dict.get(stack_trace, 0) + 1 + tid_to_syscall_timestamps.setdefault(thread_id, []).append(k.value) + + syscalls_per_thread = {thread_id: { + call_name: [ + { + 'stack_trace': trace.split('\n\t'), + 'count': count, + } for (trace, count) in per_syscall.items() + ] for (call_name, per_syscall) in calls.items()} + for (thread_id, calls) in syscalls_per_thread.items()} + return { + 'syscall_stacks': syscalls_per_thread, + 'tid_to_syscall_timestamps': tid_to_syscall_timestamps + } + + +def save_snapshot( + bpf: BPF, + pid: int, + min_age_ns: int, + top: int, + snapshot_dir: str, + with_memory: bool, + with_syscall_details: bool, + snapshot_prefix: Optional[str] = None +): + current_time_millis = int(round(time() * 1000)) + snapshot = {'time': current_time_millis} + + if with_memory: + snapshot['memory_stats'] = get_statistics(bpf, pid, min_age_ns, top) + snapshot['kernel_caches'] = gernel_kernel_cache(bpf, top) + + snapshot['syscalls'] = get_syscalls(bpf) + + if with_syscall_details: + snapshot['syscall_details'] = get_additional_syscall_info(bpf, pid) + + os.makedirs(snapshot_dir, exist_ok=True) + with open(get_result_file_name(snapshot_dir, snapshot_prefix or 'snapshot', 'json'), 'w') as outfile: + json.dump(snapshot, outfile, cls=SetEncoder) + + +def get_result_file_name(dir_name: str, prefix: str, suffix: str): + index = 0 + path = f'{dir_name}/{prefix}.{suffix}' + while os.path.isfile(path): + index += 1 + path = f'{dir_name}/{prefix}_{index}.{suffix}' + return path + + +def attach_probes( + pid: int = -1, + process: Optional[spawn] = None, + command: Optional[str] = 
None, + alloc_sample_every_n: int = 1, + max_alloc_size: Optional[int] = None, + min_alloc_size: Optional[int] = None, + with_memory: bool = False, + syscall_details: bool = False, + communicate_with_signals: bool = False +) -> Optional[Tuple[BPF, int, spawn]]: + if pid == -1 and command is None and process is None: + logging.info("Either pid or command or process must be specified") + return + + if command is not None: + logging.info(f"Executing '{command}' and tracing the resulting process.") + process = pexpect.spawn(command) + pid = process.pid + elif process is not None: + pid = process.pid + + if communicate_with_signals: + signal.signal(signal.SIGUSR2, signal_handler) + + # Constructing probes + bpf = BPF(src_file='./ebpf/ebpf_main.c', + usdt_contexts=[], + cflags=[ + "-Wno-macro-redefined", + f"-DPROCESS_ID={pid}", + f"-DSAMPLE_EVERY_N={alloc_sample_every_n}", + f"-DPAGE_SIZE={resource.getpagesize()}", + f"-DFILTER_BY_SIZE={get_size_filter(min_alloc_size, max_alloc_size)}", + "-DWITH_MEMORY" if with_memory else "", + "-DCOLLECT_SYSCALL_STACK_INFO" if syscall_details else "", + ]) + + # Attaching probes + + logging.info(f"Attaching to pid {pid}") + + wait_time = 0 + wait_interval = 0.01 + timeout = 5 + while not process.isalive(): + sleep(wait_interval) + wait_time += wait_interval + if process.terminated: + raise Exception(f'Process is already terminated, with status code {process.exitstatus}') + if wait_time > timeout: + raise Exception('Process is not alive') + + if with_memory: + for sym in ["malloc", "calloc", "realloc", "mmap", "posix_memalign", "valloc", "memalign", "pvalloc", + "aligned_alloc"]: + bpf.attach_uprobe(name="c", sym=sym, fn_name=sym + "_enter", pid=pid) + bpf.attach_uretprobe(name="c", sym=sym, fn_name=sym + "_exit", pid=pid) + + bpf.attach_uprobe(name="c", sym="free", fn_name="free_enter", pid=pid) + bpf.attach_uprobe(name="c", sym="munmap", fn_name="munmap_enter", pid=pid) + + # kernel cache probes + 
bpf.attach_kprobe(event='kmem_cache_alloc_lru', fn_name='trace_cache_alloc') + bpf.attach_kprobe(event='kmem_cache_alloc_bulk', fn_name='trace_cache_alloc') + bpf.attach_kprobe(event='kmem_cache_alloc_node', fn_name='trace_cache_alloc') + + bpf.attach_kprobe(event='kmem_cache_free', fn_name='trace_cache_free') + bpf.attach_kprobe(event='kmem_cache_free_bulk', fn_name='trace_cache_free') + + return bpf, pid, process + + +SIGUSR2_received = False + + +def signal_handler(_sig_no, _stack_frame): + global SIGUSR2_received + SIGUSR2_received = True + + +def is_terminated(pid: int, process: Optional[spawn]): + if process is None: + try: + # Sending signal 0 to a process with id {pid} will raise OSError if the process does not exist + os.kill(pid, 0) + except OSError: + return True + else: + return False + return SIGUSR2_received or not process.isalive() + + +def sleep_and_check( + interval: float, + pid: int, + process: Optional[spawn], + check_delay: float +): + wait_til = time() + interval + while time() < wait_til: + sleep(check_delay) + if is_terminated(pid, process): + break + + +def harvest_ebpf( + bpf: BPF, + pid: int = -1, + process: Optional[spawn] = None, + top: int = 10, + interval: int = 5, + min_age_ns: int = 500, + with_memory: bool = False, + with_syscall_details: bool = False, + save_snapshots: Optional[str] = None, + snapshot_prefix: Optional[str] = None, + communicate_with_signals: bool = False, +): + if pid == -1 and process is None: + raise ValueError("Either pid or process must be specified") + + if process is not None: + pid = process.pid + + while True: + logging.info(f"Sleeping for {interval} seconds...") + try: + sleep_and_check(interval, pid, process, check_delay=0.2) + except KeyboardInterrupt: + break + save_snapshot(bpf, pid, min_age_ns, top, save_snapshots, with_memory, with_syscall_details, snapshot_prefix) + if is_terminated(pid, process): + break + + if communicate_with_signals: + # Sending SIGUSR1 to the process will notify that the 
tracing is done + os.kill(pid, signal.SIGUSR1) + + logging.info("Detaching...") + bpf.cleanup() diff --git a/ebpf/ebpf_main.c b/ebpf/ebpf_main.c new file mode 100644 index 00000000..2c7ea57f --- /dev/null +++ b/ebpf/ebpf_main.c @@ -0,0 +1,449 @@ +#include +#include +#include +#include +#include +#include + +/** + * bpf_get_current_pid_tgid may be confusing so here is a short explanation: + * it returns + * tgid << 32 | pid + * here pid is the thread id, and tgid (thread group id) is the process id + * so we need to shift by 32 bits to get the process id + */ +#define FILTER_BY_PID u32 __pid = bpf_get_current_pid_tgid() >> 32;if (__pid != PROCESS_ID) {return 0;} + +BPF_STACK_TRACE(stack_traces, 10000); + +#ifdef WITH_MEMORY + +struct alloc_info_t { + u64 size; + u64 timestamp_ns; + int stack_id; +}; + +struct combined_alloc_info_t { + u64 alloc_size; + u64 free_size; + u64 number_of_allocs; + u64 number_of_frees; +}; + +// count of allocations per stack trace +BPF_HASH(combined_allocs, u64, struct combined_alloc_info_t, 10000); + +BPF_HASH(sizes, u64); +BPF_HASH(allocs, u64, struct alloc_info_t, 10000); +BPF_HASH(memptrs, u64, u64); + +static inline void update_statistics_add(u64 stack_id, u64 sz) { + struct combined_alloc_info_t *existing_cinfo; + struct combined_alloc_info_t cinfo = {0}; + + existing_cinfo = combined_allocs.lookup(&stack_id); + if (existing_cinfo != 0) + cinfo = *existing_cinfo; + + lock_xadd(&cinfo.alloc_size, sz); + lock_xadd(&cinfo.number_of_allocs, 1); + + combined_allocs.update(&stack_id, &cinfo); +} + +static inline void update_statistics_del(u64 stack_id, u64 sz) { + struct combined_alloc_info_t *existing_cinfo; + struct combined_alloc_info_t cinfo = {0}; + + existing_cinfo = combined_allocs.lookup(&stack_id); + if (existing_cinfo != 0) + cinfo = *existing_cinfo; + + lock_xadd(&cinfo.free_size, sz); + lock_xadd(&cinfo.number_of_frees, 1); + + + combined_allocs.update(&stack_id, &cinfo); +} + +static inline int gen_alloc_enter(struct pt_regs 
*ctx, size_t size) { + FILTER_BY_PID + + FILTER_BY_SIZE + if (SAMPLE_EVERY_N > 1) { + u64 ts = bpf_ktime_get_ns(); + if (ts % SAMPLE_EVERY_N != 0) + return 0; + } + + u64 pid = bpf_get_current_pid_tgid(); + u64 size64 = size; + sizes.update(&pid, &size64); + + return 0; +} + +static inline int gen_alloc_exit2(struct pt_regs *ctx, u64 address) { + FILTER_BY_PID + + u64 pid = bpf_get_current_pid_tgid(); + u64 * size64 = sizes.lookup(&pid); + struct alloc_info_t info = {0}; + + if (size64 == 0) + return 0; // missed alloc entry + + info.size = *size64; + sizes.delete(&pid); + + if (address != 0) { + info.timestamp_ns = bpf_ktime_get_ns(); + info.stack_id = stack_traces.get_stackid(ctx, BPF_F_USER_STACK); + allocs.update(&address, &info); + update_statistics_add(info.stack_id, info.size); + } + + return 0; +} + +static inline int gen_alloc_exit(struct pt_regs *ctx) { + return gen_alloc_exit2(ctx, PT_REGS_RC(ctx)); +} + +static inline int gen_free_enter(struct pt_regs *ctx, void *address) { + FILTER_BY_PID + + u64 addr = (u64)address; + struct alloc_info_t *info = allocs.lookup(&addr); + if (info == 0) + return 0; + + allocs.delete(&addr); + update_statistics_del(info->stack_id, info->size); + + return 0; +} + +/** Probes */ + +int malloc_enter(struct pt_regs *ctx, size_t size) { + return gen_alloc_enter(ctx, size); +} + +int malloc_exit(struct pt_regs *ctx) { return gen_alloc_exit(ctx); } + +int free_enter(struct pt_regs *ctx, void *address) { + return gen_free_enter(ctx, address); +} + +int calloc_enter(struct pt_regs *ctx, size_t nmemb, size_t size) { + return gen_alloc_enter(ctx, nmemb * size); +} + +int calloc_exit(struct pt_regs *ctx) { return gen_alloc_exit(ctx); } + +int realloc_enter(struct pt_regs *ctx, void *ptr, size_t size) { + gen_free_enter(ctx, ptr); + return gen_alloc_enter(ctx, size); +} + +int realloc_exit(struct pt_regs *ctx) { return gen_alloc_exit(ctx); } + +int mmap_enter(struct pt_regs *ctx) { + size_t size = (size_t) PT_REGS_PARM2(ctx); + return 
gen_alloc_enter(ctx, size); +} + +int mmap_exit(struct pt_regs *ctx) { return gen_alloc_exit(ctx); } + +int munmap_enter(struct pt_regs *ctx, void *address) { + return gen_free_enter(ctx, address); +} + +int posix_memalign_enter(struct pt_regs *ctx, void **memptr, size_t alignment, + size_t size) { + u64 memptr64 = (u64)(size_t)memptr; + u64 pid = bpf_get_current_pid_tgid(); + + memptrs.update(&pid, &memptr64); + return gen_alloc_enter(ctx, size); +} + +int posix_memalign_exit(struct pt_regs *ctx) { + u64 pid = bpf_get_current_pid_tgid(); + u64 *memptr64 = memptrs.lookup(&pid); + void *addr; + + if (memptr64 == 0) + return 0; + + memptrs.delete(&pid); + + if (bpf_probe_read_user(&addr, sizeof(void *), (void *) (size_t) * memptr64)) + return 0; + + u64 addr64 = (u64)(size_t) addr; + return gen_alloc_exit2(ctx, addr64); +} + +int aligned_alloc_enter(struct pt_regs *ctx, size_t alignment, size_t size) { + return gen_alloc_enter(ctx, size); +} + +int aligned_alloc_exit(struct pt_regs *ctx) { return gen_alloc_exit(ctx); } + +int valloc_enter(struct pt_regs *ctx, size_t size) { + return gen_alloc_enter(ctx, size); +} + +int valloc_exit(struct pt_regs *ctx) { return gen_alloc_exit(ctx); } + +int memalign_enter(struct pt_regs *ctx, size_t alignment, size_t size) { + return gen_alloc_enter(ctx, size); +} + +int memalign_exit(struct pt_regs *ctx) { return gen_alloc_exit(ctx); } + +int pvalloc_enter(struct pt_regs *ctx, size_t size) { + return gen_alloc_enter(ctx, size); +} + +int pvalloc_exit(struct pt_regs *ctx) { return gen_alloc_exit(ctx); } + +/** Tracepoints + Function names are in tracepoint__{{category}}__{{event}} + alternatively we can use attach_tracepoint function in python api +*/ + +int tracepoint__kmem__kmalloc(struct tracepoint__kmem__kmalloc *args) { + gen_alloc_enter((struct pt_regs *) args, args->bytes_alloc); + return gen_alloc_exit2((struct pt_regs *) args, (size_t) args->ptr); +} + +int tracepoint__kmem__kfree(struct tracepoint__kmem__kfree *args) { + 
return gen_free_enter((struct pt_regs *) args, (void *) args->ptr); +} + +int tracepoint__kmem__kmem_cache_alloc( + struct tracepoint__kmem__kmem_cache_alloc *args) { + gen_alloc_enter((struct pt_regs *) args, args->bytes_alloc); + return gen_alloc_exit2((struct pt_regs *) args, (size_t) args->ptr); +} + +int tracepoint__kmem__kmem_cache_free( + struct tracepoint__kmem__kmem_cache_free *args) { + return gen_free_enter((struct pt_regs *) args, (void *) args->ptr); +} + +int tracepoint__kmem__mm_page_alloc( + struct tracepoint__kmem__mm_page_alloc *args) { + gen_alloc_enter((struct pt_regs *) args, PAGE_SIZE << args->order); + return gen_alloc_exit2((struct pt_regs *) args, args->pfn); +} + +int tracepoint__kmem__mm_page_free( + struct tracepoint__kmem__mm_page_free *args) { + return gen_free_enter((struct pt_regs *) args, (void *) args->pfn); +} + +int tracepoint__kmem__kmalloc_node(struct tracepoint__kmem__kmalloc_node *args) { + gen_alloc_enter((struct pt_regs *) args, args->bytes_alloc); + return gen_alloc_exit2((struct pt_regs *) args, (size_t) args->ptr); +} + +int tracepoint__kmem__kmem_cache_alloc_node(struct tracepoint__kmem__kmem_cache_alloc_node *args) { + gen_alloc_enter((struct pt_regs *) args, args->bytes_alloc); + return gen_alloc_exit2((struct pt_regs *) args, (size_t) args->ptr); +} + +/** kernel cache */ + +// to resolve undefined error +// taken from bcc slabratetop tool +struct slab { + unsigned long __page_flags; +#if defined(CONFIG_SLAB) + struct kmem_cache *slab_cache; + union { + struct { + struct list_head slab_list; + void *freelist; /* array of free object indexes */ + void *s_mem; /* first object */ + }; + struct rcu_head rcu_head; + }; + unsigned int active; +#elif defined(CONFIG_SLUB) + struct kmem_cache *slab_cache; + union { + struct { + union { + struct list_head slab_list; +#ifdef CONFIG_SLUB_CPU_PARTIAL + struct { + struct slab *next; + int slabs; /* Nr of slabs left */ + }; +#endif + }; + /* Double-word boundary */ + void 
*freelist; /* first free object */ + union { + unsigned long counters; + struct { + unsigned inuse:16; + unsigned objects:15; + unsigned frozen:1; + }; + }; + }; + struct rcu_head rcu_head; + }; + unsigned int __unused; +#elif defined(CONFIG_SLOB) + struct list_head slab_list; + void *__unused_1; + void *freelist; /* first free block */ + long units; + unsigned int __unused_2; +#else +#error "Unexpected slab allocator configured" +#endif + atomic_t __page_refcount; +#ifdef CONFIG_MEMCG + unsigned long memcg_data; +#endif +}; + +// slab_address() will not be used, and NULL will be returned directly, which +// can avoid adaptation of different kernel versions +static inline void *slab_address(const struct slab *slab) { + return NULL; +} + +#ifdef CONFIG_SLUB + +#include + +#else + +#include + +#endif + +struct key_t { + char name[32]; +}; + +struct val_t { + u64 alloc_count; + u64 alloc_size; + u64 free_count; + u64 free_size; +}; + +BPF_HASH(kernel_cache_counts, +struct key_t, struct val_t); + +int trace_cache_alloc(struct pt_regs *ctx, struct kmem_cache *cachep) { + FILTER_BY_PID + + u64 size = cachep->size; + + FILTER_BY_SIZE + + struct key_t key = {}; + bpf_probe_read_kernel(&key.name, sizeof(key.name), cachep->name); + + struct val_t empty_val_t = {}; + + struct val_t *val = kernel_cache_counts.lookup_or_try_init(&key, &empty_val_t); + if (val) { + val->alloc_count++; + val->alloc_size += size; + } + + return 0; +} + +int trace_cache_free(struct pt_regs *ctx, struct kmem_cache *cachep) { + FILTER_BY_PID + + u64 size = cachep->size; + + FILTER_BY_SIZE + + struct key_t key = {}; + bpf_probe_read_kernel(&key.name, sizeof(key.name), cachep->name); + + struct val_t empty_val_t = {}; + struct val_t *val = kernel_cache_counts.lookup_or_try_init(&key, &empty_val_t); + if (val) { + val->free_count ++; + val->free_size += size; + } + + return 0; +} + +#endif // WITH_MEMORY + + +/** System Calls */ + +struct sys_call_data_t { + u64 count; + u64 total_ns; +}; 
+BPF_HASH(syscall_start, u64, u64); +BPF_HASH(syscall_counts, u64, struct sys_call_data_t); + +#ifdef COLLECT_SYSCALL_STACK_INFO + +struct sys_call_stack_t { + u32 id; + int stack_id; + u64 pid_tgid; +}; + +BPF_HASH(syscall_counts_stacks, u64, struct sys_call_stack_t, 10000); + +#endif // COLLECT_SYSCALL_STACK_INFO + +int tracepoint__raw_syscalls__sys_enter(struct tracepoint__raw_syscalls__sys_enter *args) { + FILTER_BY_PID + u64 pid_tgid = bpf_get_current_pid_tgid(); + u64 t = bpf_ktime_get_ns(); + syscall_start.update(&pid_tgid, &t); +#ifdef COLLECT_SYSCALL_STACK_INFO + struct sys_call_stack_t zero = {}; + struct sys_call_stack_t* data = syscall_counts_stacks.lookup_or_try_init(&t, &zero); + if (data) { + data->id = args->id; + data->stack_id = stack_traces.get_stackid(args, BPF_F_USER_STACK); + data->pid_tgid = pid_tgid; + } +#endif // COLLECT_SYSCALL_STACK_INFO + return 0; +} + +int tracepoint__raw_syscalls__sys_exit(struct tracepoint__raw_syscalls__sys_exit *args) { + FILTER_BY_PID + + u64 pid_tgid = bpf_get_current_pid_tgid(); + + struct sys_call_data_t *val, zero = {}; + u64 *start_ns = syscall_start.lookup(&pid_tgid); + if (!start_ns) + return 0; + u64 syscall_id = args->id; + u32 thread_id = pid_tgid; + u64 key = (syscall_id << 32) | thread_id; + val = syscall_counts.lookup_or_try_init(&key, &zero); + if (val) { + lock_xadd(&val->count, 1); + lock_xadd(&val->total_ns, bpf_ktime_get_ns() - *start_ns); + } + return 0; +} \ No newline at end of file diff --git a/run.py b/run.py index 35364d8a..dbe638e8 100755 --- a/run.py +++ b/run.py @@ -1,13 +1,16 @@ #!/usr/bin/env python3 +import argparse import os -import sys -import time +import pathlib import shutil import signal +import sys +import time +from threading import Thread +from typing import List + import pexpect -import pathlib -import argparse import termcolor """ @@ -53,6 +56,9 @@ drop_caches = False cleanup_previous = False run_in_docker_container = False +with_ebpf = False +with_ebpf_memory = False 
+with_syscall_details = False main_dir_path = "./db_main/" storage_disk_paths = [ @@ -76,21 +82,20 @@ def get_db_main_dir_path(db_name: str, size: str, main_dir_path: str) -> str: return os.path.join(main_dir_path, db_name, size, "") -def get_db_storage_dir_paths(db_name: str, size: str, storage_disk_paths: str) -> list: +def get_db_storage_dir_paths(db_name: str, size: str, storage_disk_paths: List[str]) -> list: db_storage_dir_paths = [] for storage_disk_path in storage_disk_paths: db_storage_dir_paths.append(os.path.join(storage_disk_path, db_name, size, "")) return db_storage_dir_paths -def get_results_file_path( - db_name: str, - size: str, - drop_caches: bool, - transactional: bool, - storage_disk_paths: str, - threads_count: int, -) -> str: +def get_results_file_path(db_name: str, + size: str, + drop_caches: bool, + transactional: bool, + storage_disk_paths: List[str], + threads_count: int, + ) -> str: root_dir_path = "" if drop_caches: if transactional: @@ -128,18 +133,20 @@ def drop_system_caches(): def run( - db_name: str, - size: str, - workload_names: list, - main_dir_path: str, - storage_disk_paths: str, - transactional: bool, - drop_caches: bool, - run_in_docker_container: bool, - threads_count: bool, - run_index: int, - runs_count: int, -) -> None: + db_name: str, + size: str, + workload_names: list, + main_dir_path: str, + storage_disk_paths: List[str], + transactional: bool, + drop_caches: bool, + run_in_docker_container: bool, + threads_count: int, + run_index: int, + runs_count: int, + with_ebpf: bool, + with_ebpf_memory: bool, + with_syscall_details: bool) -> None: db_config_file_path = get_db_config_file_path(db_name, size) workloads_file_path = get_workloads_file_path(size) db_main_dir_path = get_db_main_dir_path(db_name, size, main_dir_path) @@ -149,6 +156,7 @@ def run( ) transactional_flag = "-t" if transactional else "" + lazy_flag = "-l" if with_ebpf else "" filter = ",".join(workload_names) db_storage_dir_paths = 
",".join(db_storage_dir_paths) @@ -161,8 +169,32 @@ def run( raise Exception("First, please build the runner: `build_release.sh`") process = pexpect.spawn( - f'{runner} -db {db_name} {transactional_flag} -cfg "{db_config_file_path}" -wl "{workloads_file_path}" -md "{db_main_dir_path}" -sd "{db_storage_dir_paths}" -res "{results_file_path}" -th {threads_count} -fl {filter} -ri {run_index} -rc {runs_count}' + f'{runner} -db {db_name} {transactional_flag} {lazy_flag} -cfg "{db_config_file_path}" -wl "{workloads_file_path}" -md "{db_main_dir_path}" -sd "{db_storage_dir_paths}" -res "{results_file_path}" -th {threads_count} -fl {filter} -ri {run_index} -rc {runs_count}' ) + thread = None + if with_ebpf: + from ebpf.ebpf import attach_probes, harvest_ebpf + bpf, pid, process = attach_probes( + process=process, + syscall_details=with_syscall_details, + with_memory=with_ebpf_memory, + communicate_with_signals=True, + ) + # Send SIGUSR1 to the process to notify it that the probes are attached + os.kill(process.pid, signal.SIGUSR1) + thread = Thread(target=harvest_ebpf, args=(bpf,), kwargs={ + process: process, + "interval": 5, + "with_memory": with_ebpf_memory, + "with_syscall_details": with_syscall_details, + "snapshot_prefix": "-".join(workload_names), + "save_snapshots": f"./bench/ebpf/snapshots/{db_name}_{size}", + "communicate_with_signals": True, + }) + thread.start() + if with_ebpf: + thread.join() + process.interact() process.close() @@ -189,6 +221,9 @@ def parse_args(): global cleanup_previous global drop_caches global run_in_docker_container + global with_ebpf + global with_ebpf_memory + global with_syscall_details parser = argparse.ArgumentParser() @@ -268,6 +303,30 @@ def parse_args(): action=argparse.BooleanOptionalAction, default=run_in_docker_container, ) + parser.add_argument( + "-eb", + "--with-ebpf", + help="Runs ebpf benchmarks", + default=with_ebpf, + dest="with_ebpf", + action=argparse.BooleanOptionalAction + ) + parser.add_argument( + "-em", + 
"--with-ebpf-memory", + help="Enable memory related ebpf benchmarks", + default=with_ebpf_memory, + dest="with_ebpf_memory", + action=argparse.BooleanOptionalAction + ) + parser.add_argument( + "-es", + "--with-ebpf-syscall-details", + help="Collect eBPF syscall stack traces", + default=with_syscall_details, + dest="with_syscall_details", + action=argparse.BooleanOptionalAction + ) args = parser.parse_args() db_names = args.db_names @@ -280,6 +339,9 @@ def parse_args(): cleanup_previous = args.cleanup_previous drop_caches = args.drop_caches run_in_docker_container = args.run_docker + with_ebpf = args.with_ebpf + with_ebpf_memory = args.with_ebpf_memory + with_syscall_details = args.with_syscall_details def check_args(): @@ -293,6 +355,10 @@ def check_args(): sys.exit("Database size(s) not specified") if not workload_names: sys.exit("Workload name(s) not specified") + if run_in_docker_container and with_ebpf: + sys.exit("Running ebpf benchmarks in docker container is not supported") + if with_ebpf_memory and not with_ebpf: + sys.exit("Memory related ebpf benchmarks require ebpf benchmarks to be enabled, run with --with-ebpf flag") def main() -> None: @@ -364,6 +430,9 @@ def main() -> None: threads_count, i, len(workload_names), + with_ebpf, + with_ebpf_memory, + with_syscall_details ) else: run( @@ -378,6 +447,9 @@ def main() -> None: threads_count, 0, 1, + with_ebpf, + with_ebpf_memory, + with_syscall_details ) diff --git a/src/bench.cxx b/src/bench.cxx index 9f5f3900..24cf0af8 100644 --- a/src/bench.cxx +++ b/src/bench.cxx @@ -2,6 +2,7 @@ #include #include #include +#include #include #include @@ -37,6 +38,7 @@ void parse_and_validate_args(int argc, char* argv[], settings_t& settings) { argparse::ArgumentParser program(argv[0]); program.add_argument("-db", "--db-name").required().help("Database name"); program.add_argument("-t", "--transaction").default_value(false).implicit_value(true).help("Transactional"); + program.add_argument("-l", 
"--lazy").default_value(false).implicit_value(true).help("Wait for a SIGUSR1"); program.add_argument("-cfg", "--config-path").required().help("Database configuration file path"); program.add_argument("-wl", "--workload-path").required().help("Workloads file path"); program.add_argument("-res", "--results-path").required().help("Results file path"); @@ -53,6 +55,7 @@ void parse_and_validate_args(int argc, char* argv[], settings_t& settings) { settings.db_name = program.get("db-name"); settings.transactional = program.get("transaction"); + settings.lazy = program.get("lazy"); settings.db_config_file_path = program.get("config-path"); settings.workloads_file_path = program.get("workload-path"); settings.results_file_path = program.get("results-path"); @@ -502,6 +505,21 @@ void bench(bm::State& state, workload_t const& workload, db_t& db, bool transact } } +void wait_for_SIGUSR1() { + int sig; + sigset_t set; + + // Create a signal set containing SIGUSR1 + sigemptyset(&set); + sigaddset(&set, SIGUSR1); + + // Block all signals in the set so that they don't terminate the program + sigprocmask(SIG_BLOCK, &set, NULL); + + // Wait for SIGUSR1 signal + sigwait(&set, &sig); +} + int main(int argc, char** argv) { try { @@ -509,6 +527,10 @@ int main(int argc, char** argv) { settings_t settings; parse_and_validate_args(argc, argv, settings); + if (settings.lazy) { + wait_for_SIGUSR1(); + } + // Resolve results paths fs::path final_results_file_path = settings.results_file_path; if (final_results_file_path.string().back() == '/') @@ -597,6 +619,14 @@ int main(int argc, char** argv) { file_reporter_t::merge_results(in_progress_results_file_path, final_results_file_path); fs::remove(in_progress_results_file_path); + + if (settings.lazy) { + __pid_t parent_pid = getppid(); + // Notify parent that we finished + kill(parent_pid, SIGUSR2); + wait_for_SIGUSR1(); + } + } catch (exception_t const& ex) { fmt::print("UCSB exception: {}\n", ex.what()); diff --git a/src/core/settings.hpp 
b/src/core/settings.hpp index 96703cfd..37de7622 100644 --- a/src/core/settings.hpp +++ b/src/core/settings.hpp @@ -9,6 +9,7 @@ namespace ucsb { struct settings_t { std::string db_name; bool transactional = false; + bool lazy = false; fs::path db_config_file_path; fs::path db_main_dir_path; std::vector db_storage_dir_paths; diff --git a/src/leveldb/leveldb.hpp b/src/leveldb/leveldb.hpp index 4b72918d..e0aca0e3 100644 --- a/src/leveldb/leveldb.hpp +++ b/src/leveldb/leveldb.hpp @@ -195,7 +195,7 @@ operation_result_t leveldb_t::remove(key_t key) { operation_result_t leveldb_t::read(key_t key, value_span_t value) const { - // Unlike RocksDB, we can't read into some form fo a `PinnableSlice`, + // Unlike RocksDB, we can't read into some form of a `PinnableSlice`, // just `std::string`, causing heap allocations. std::string data; leveldb::Status status = db_->Get(read_options_, to_slice(key), &data);