diff --git a/redbpf/src/load/loader.rs b/redbpf/src/load/loader.rs index c49f90c3..4ec0eb9b 100644 --- a/redbpf/src/load/loader.rs +++ b/redbpf/src/load/loader.rs @@ -7,6 +7,7 @@ use futures::channel::mpsc; use futures::prelude::*; +use std::collections::HashMap; use std::convert::AsRef; use std::fs; use std::io; @@ -44,6 +45,7 @@ impl Loader { let online_cpus = cpus::get_online().unwrap(); let (sender, receiver) = mpsc::unbounded(); + let mut ringbufs = HashMap::new(); // bpf_map_type_BPF_MAP_TYPE_PERF_EVENT_ARRAY = 4 for m in module.maps.iter_mut().filter(|m| m.kind == 4) { @@ -65,17 +67,13 @@ impl Loader { let name = m.name.clone(); let map = RingBufMap::bind(m).unwrap(); let stream = RingBufMessageStream::new(name.clone(), map); - let mut s = sender.clone(); - let fut = stream.for_each(move |events| { - s.start_send((name.clone(), events)).unwrap(); - future::ready(()) - }); - tokio::spawn(fut); + ringbufs.insert(name.clone(), stream); } Ok(Loaded { module, events: receiver, + ringbufs, }) } @@ -108,6 +106,25 @@ pub struct Loaded { /// # }; /// ``` pub events: mpsc::UnboundedReceiver<(String, ::Item)>, + + /// Streams of events emitted by BPF ring buffer maps. + /// Each key in the map corresponds to a map name. + /// + /// # Example + /// + /// ``` + /// use std::path::Path; + /// use futures::stream::StreamExt; + /// use redbpf::load::Loader; + /// # async { + /// let mut loader = Loader::load_file(&Path::new("probe.elf")).unwrap(); + /// let mut ringbuf = loader.ringbufs.get("ring_buffer"); + /// while let Some(event) = ringbuf.next().await { + /// // ... + /// } + /// # }; + /// ``` + pub ringbufs: HashMap, } impl Loaded {