From 6979302f36b33bf963190ad864271daf828546e0 Mon Sep 17 00:00:00 2001 From: Nikita Baksalyar Date: Thu, 14 Apr 2022 02:06:21 +0100 Subject: [PATCH] Update RingBufs API - allow referring individual maps This commit introduces a change to the way RingBuf maps are handled. Previously, they were coalesced with PerfEventArray maps so if a user wanted to handle different maps in different async tasks, they needed to redirect events to e.g. mpsc queues to handle them separately. This change simplifies the API and implementation by allowing to refer to BPF maps individually through a corresponding key in a `HashMap`. Signed-off-by: Nikita Baksalyar --- redbpf/src/load/loader.rs | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/redbpf/src/load/loader.rs b/redbpf/src/load/loader.rs index c49f90c3..4ec0eb9b 100644 --- a/redbpf/src/load/loader.rs +++ b/redbpf/src/load/loader.rs @@ -7,6 +7,7 @@ use futures::channel::mpsc; use futures::prelude::*; +use std::collections::HashMap; use std::convert::AsRef; use std::fs; use std::io; @@ -44,6 +45,7 @@ impl Loader { let online_cpus = cpus::get_online().unwrap(); let (sender, receiver) = mpsc::unbounded(); + let mut ringbufs = HashMap::new(); // bpf_map_type_BPF_MAP_TYPE_PERF_EVENT_ARRAY = 4 for m in module.maps.iter_mut().filter(|m| m.kind == 4) { @@ -65,17 +67,13 @@ impl Loader { let name = m.name.clone(); let map = RingBufMap::bind(m).unwrap(); let stream = RingBufMessageStream::new(name.clone(), map); - let mut s = sender.clone(); - let fut = stream.for_each(move |events| { - s.start_send((name.clone(), events)).unwrap(); - future::ready(()) - }); - tokio::spawn(fut); + ringbufs.insert(name.clone(), stream); } Ok(Loaded { module, events: receiver, + ringbufs, }) } @@ -108,6 +106,25 @@ pub struct Loaded { /// # }; /// ``` pub events: mpsc::UnboundedReceiver<(String, ::Item)>, + + /// Streams of events emitted by BPF ring buffer maps. + /// Each key in the map corresponds to a map name. + /// + /// # Example + /// + /// ``` + /// use std::path::Path; + /// use futures::stream::StreamExt; + /// use redbpf::load::Loader; + /// # async { + /// let mut loader = Loader::load_file(&Path::new("probe.elf")).unwrap(); + /// let mut ringbuf = loader.ringbufs.get("ring_buffer"); + /// while let Some(event) = ringbuf.next().await { + /// // ... + /// } + /// # }; + /// ``` + pub ringbufs: HashMap, } impl Loaded {