Commit 13c79fe

Buffering & Aligning Support (copper-project#114)
* Aligner task

  This task gets several inputs and aligns a matching window of time for all of them.
* WIP
* Fixed the horizon test.
* Split purge & update in 2 separate operations.
* moved the library support in a sub module.
* cleanup the API, name, now not necessary
* Adds a copper friendly arrayvec for CuMsg

  This will be a common pattern to have a partially filled spot on a message that has been preallocated.
* Adapt to the new CuArray, adds a missing spot in the aligner too.
* cargo fmt
* Added some documentation
* Some clarification on performance
* not sure why this went away
* Added ignore the the doctest it is a nightmare
1 parent 3a40ad7 commit 13c79fe

File tree

10 files changed

+577
-3
lines changed

Diff for: Cargo.toml

+1
@@ -24,6 +24,7 @@ members = [
     "components/sources/cu_vlp16",
     "components/sources/cu_wt901",
     "components/sources/cu_rp_encoder",
+    "components/tasks/cu_aligner",
     "components/tasks/cu_pid",
     "examples/cu_config_gen",
     "examples/cu_standalone_structlog",

Diff for: components/tasks/cu_aligner/Cargo.toml

+17
@@ -0,0 +1,17 @@
[package]
name = "cu-aligner"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
keywords.workspace = true
categories.workspace = true
homepage.workspace = true
repository.workspace = true

[dependencies]
cu29 = { workspace = true }
cu29-clock = { workspace = true }
bincode = { workspace = true }
circular-buffer = "0.1.9"
paste = "1.0.15"

Diff for: components/tasks/cu_aligner/README.md

+78
@@ -0,0 +1,78 @@
## Aligner to buffer and align data flow from different time horizons

The goal of this Copper component is to align data from different sources.

### Theory of operation:

```plaintext
       STREAM 1                STREAM 2
           │                       │
  Msg1_S1 ─╯                       │  (discarded)
           │                       │
═══════════╪═══════════════════════╪═══════════ TIME HORIZON ( present - stale_data_horizon_ms )
           │                       │
           │                       │
           │              Msg1_S2 ─┤
  Msg2_S1 ─┤                       │
           │                       │
  Msg3_S1 ─┤              Msg2_S2 ─┤
           │                       │
           │                       │
           │                       │
═══════════╪═══════════════════════╪═══════════ BEGINNING OF ALIGNMENT WINDOW ( t - alignment_window_ms )
           │                       │
           │              Msg3_S2 ─┤
  Msg4_S1 ─┤                       │  <- This part will be in the output at each process call
           │                       │
  Msg5_S1 ─┤              Msg4_S2 ─┤
           │                       │
═══════════╪═══════════════════════╪═══════════ MOST RECENT ALIGNED MESSAGE ( t )
           │                       │
           │              Msg5_S2 ─┤
           │                       │
           │              Msg6_S2 ─┤  (not yet aligned)
```

The timings are taken from the `CuMsg::metadata.tov` field (time of validity).
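
The diagram above can be read as a small computation over timestamps. The following is a minimal sketch using only the standard library, not the crate's actual code: `Nanos`, `alignment_window` and `in_window` are hypothetical names, timestamps are plain `u64` values standing in for `tov`, and the sketch assumes every stream must hold at least one message before a window is produced.

```rust
/// Timestamps in nanoseconds; a stand-in for the `tov` of each buffered message.
type Nanos = u64;

/// Returns the inclusive alignment window `(start, end)`.
/// `end` is the min over all streams of each stream's newest timestamp,
/// i.e. the most recent time at which every stream has data.
/// Returns `None` if there are no streams or if any stream is empty (assumption).
fn alignment_window(streams: &[&[Nanos]], window: Nanos) -> Option<(Nanos, Nanos)> {
    let mut end: Option<Nanos> = None;
    for stream in streams {
        // `?` bails out as soon as one stream has no data at all.
        let newest = *stream.iter().max()?;
        end = Some(end.map_or(newest, |e| e.min(newest)));
    }
    let end = end?;
    Some((end.saturating_sub(window), end))
}

/// Keeps the timestamps of one stream that fall inside the inclusive window.
fn in_window(stream: &[Nanos], (start, end): (Nanos, Nanos)) -> Vec<Nanos> {
    stream
        .iter()
        .copied()
        .filter(|t| (start..=end).contains(t))
        .collect()
}
```

With stream 1 holding timestamps `[1, 4]`, stream 2 holding `[1, 3, 4]` and a window of 2, the window is `(2, 4)`: stream 1 contributes `[4]` and stream 2 contributes `[3, 4]`.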

### Usage

The task is generated entirely out of the `cu_aligner::define_task` macro:

```rust,ignore

use cu_aligner::define_task;
use cu29::input_msg;
use cu29::output_msg;
use cu29::cutask::Freezable;
use cu29::cutask::CuTaskLifecycle;
use cu29::config::ComponentConfig;
use cu29::CuResult;
use cu29::cutask::CuTask;
use cu29::cutask::CuMsg;

// Defines a task that aligns two streams of messages, one with f32 payloads, the other with u64 payloads
// (you can use any Rust struct that implements the traits required by CuMsgPayload).
define_task!(MyAlignerTask,
    0 => { 15, 7, f32 }, // 15 is the maximum capacity, in number of messages, the internal
                         // buffer structure can hold before messages are discarded,
                         // 7 is the maximum size, in number of messages, of the output (aligned messages of this type)
    1 => { 20, 5, u64 }  // or any CuMsgPayload
    // you can continue with 2 => etc...
);

```

Your defined task will need to be connected with the matching tasks upstream in the Copper config file.
The type of the input will be a tuple of CuMsg (which is what the aligner expects). From the example:
`(CuMsg<f32>, CuMsg<u64>)`.
The type of the output will be a CuMsg of a tuple of CuArrays holding the aligned messages for each stream. From the
example: `CuMsg<(CuArray<f32, 7>, CuArray<u64, 5>)>`.
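
To picture what a preallocated, partially filled output slot looks like, here is a minimal standard-library sketch. It is not the real `CuArray`: `BoundedArray`, `fill_from_iter` and `as_slice` are hypothetical names, and the behavior when more messages fall in the window than the output can hold (extra items are counted and dropped) is an assumption made for illustration only.

```rust
/// Hypothetical stand-in for a preallocated, partially filled array (not the real `CuArray`).
#[derive(Debug)]
struct BoundedArray<T: Copy + Default, const N: usize> {
    slots: [T; N], // storage allocated once, up front
    len: usize,    // number of slots currently in use
}

impl<T: Copy + Default, const N: usize> BoundedArray<T, N> {
    fn new() -> Self {
        Self {
            slots: [T::default(); N],
            len: 0,
        }
    }

    /// Overwrites the content with at most `N` items and returns how many
    /// items did not fit (assumed policy: keep the first `N`, drop the rest).
    fn fill_from_iter(&mut self, items: impl IntoIterator<Item = T>) -> usize {
        self.len = 0;
        let mut dropped = 0;
        for item in items {
            if self.len < N {
                self.slots[self.len] = item;
                self.len += 1;
            } else {
                dropped += 1;
            }
        }
        dropped
    }

    /// Only the filled part of the storage is exposed.
    fn as_slice(&self) -> &[T] {
        &self.slots[..self.len]
    }
}
```

The point of the design is that refilling the output on every process call reuses the same storage, so no allocation happens on the hot path.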

### Performance considerations

Copper by itself never buffers anything, to avoid copies, but this aligner has to copy data until it can align it.
This means you will have 1 copy from the input to the internal buffer and 1 copy from the internal buffer to the output.

If your use case is just to get the latest message from 2 sources, connect your task directly to the upstream tasks
and do not use this aligner. It is only useful if the messages from the 2 upstream tasks are too far apart
in terms of tov.

Diff for: components/tasks/cu_aligner/src/buffers.rs

+255
@@ -0,0 +1,255 @@
use circular_buffer::CircularBuffer;
use cu29::clock::{CuTime, Tov};
use cu29::cutask::{CuMsg, CuMsgPayload};
use cu29::{CuError, CuResult};

/// An augmented circular buffer that allows for time-based operations.
pub struct TimeboundCircularBuffer<const S: usize, P>
where
    P: CuMsgPayload,
{
    pub inner: CircularBuffer<S, CuMsg<P>>,
}

#[allow(dead_code)]
fn extract_tov_time_left(tov: &Tov) -> Option<CuTime> {
    match tov {
        Tov::Time(time) => Some(*time),
        Tov::Range(range) => Some(range.start), // Use the start of the range for alignment
        Tov::None => None,
    }
}

fn extract_tov_time_right(tov: &Tov) -> Option<CuTime> {
    match tov {
        Tov::Time(time) => Some(*time),
        Tov::Range(range) => Some(range.end), // Use the end of the range for alignment
        Tov::None => None,
    }
}

impl<const S: usize, P> TimeboundCircularBuffer<S, P>
where
    P: CuMsgPayload,
{
    pub fn new() -> Self {
        TimeboundCircularBuffer {
            // It is assumed to be sorted by time with non overlapping ranges if they are Tov::Range
            inner: CircularBuffer::<S, CuMsg<P>>::new(),
        }
    }

    /// Returns an iterator over the messages that fall within the given (inclusive) time range.
    /// In case of a Tov::Range, the message is included only if both its start and end time fall within the range.
    /// Messages without a tov are never included.
    pub fn iter_window(
        &self,
        start_time: CuTime,
        end_time: CuTime,
    ) -> impl Iterator<Item = &CuMsg<P>> {
        self.inner.iter().filter(move |msg| match msg.metadata.tov {
            Tov::Time(time) => time >= start_time && time <= end_time,
            Tov::Range(range) => range.start >= start_time && range.end <= end_time,
            _ => false,
        })
    }

    /// Remove all the messages that are older than the given time horizon.
    pub fn purge(&mut self, time_horizon: CuTime) {
        // Find the index of the first element that should be retained
        let drain_end = self
            .inner
            .iter()
            .position(|msg| match msg.metadata.tov {
                Tov::Time(time) => time >= time_horizon,
                Tov::Range(range) => range.end >= time_horizon,
                _ => false,
            })
            .unwrap_or(self.inner.len()); // If none match, drain the entire buffer

        // Drain all elements before the `drain_end` index
        self.inner.drain(..drain_end);
    }

    /// Get the most recent time of the messages in the buffer.
    /// Returns `Ok(None)` if the buffer is empty and an error if any message has no tov.
    pub fn most_recent_time(&self) -> CuResult<Option<CuTime>> {
        self.inner
            .iter()
            .map(|msg| extract_tov_time_right(&msg.metadata.tov))
            .try_fold(None, |acc, time| {
                let time = time.ok_or_else(|| {
                    CuError::from("Trying to align temporal data with no time information")
                })?;
                Ok(Some(
                    acc.map_or(time, |current_max: CuTime| current_max.max(time)),
                ))
            })
    }

    /// Push a message into the buffer.
    pub fn push(&mut self, msg: CuMsg<P>) {
        self.inner.push_back(msg);
    }
}

#[macro_export]
macro_rules! alignment_buffers {
    ($struct_name:ident, $($name:ident: TimeboundCircularBuffer<$size:expr, CuMsg<$payload:ty>>),*) => {
        struct $struct_name {
            target_alignment_window: cu29::clock::CuDuration, // size of the most recent data window to align
            stale_data_horizon: cu29::clock::CuDuration, // time horizon for purging stale data
            $(pub $name: crate::buffers::TimeboundCircularBuffer<$size, $payload>),*
        }

        impl $struct_name {
            pub fn new(target_alignment_window: cu29::clock::CuDuration, stale_data_horizon: cu29::clock::CuDuration) -> Self {
                Self {
                    target_alignment_window,
                    stale_data_horizon,
                    $($name: crate::buffers::TimeboundCircularBuffer::<$size, $payload>::new()),*
                }
            }

            /// Call this to make sure old, no longer relevant data is discarded.
            #[allow(dead_code)]
            pub fn purge(&mut self, now: cu29::clock::CuTime) {
                let horizon_time = now - self.stale_data_horizon;
                // purge all the stale data from the TimeboundCircularBuffers first
                $(self.$name.purge(horizon_time);)*
            }

            /// Get the most recent set of aligned data from all the buffers matching the constraints set at construction.
            #[allow(dead_code)]
            pub fn get_latest_aligned_data(
                &mut self,
            ) -> Option<($(impl Iterator<Item = &cu29::cutask::CuMsg<$payload>>),*)> {
                // Now find the min of the max of the last time for all buffers
                // meaning the most recent time at which all buffers have data.
                // Buffers that are empty, or that hold a message without a tov, yield no time
                // and are skipped by the filter_map below.
                let most_recent_time = [
                    $(self.$name.most_recent_time().unwrap_or(None)),*
                ]
                .iter()
                .filter_map(|&time| time)
                .min();

                // If none of the buffers yields a time, return early
                if most_recent_time.is_none() {
                    return None;
                }

                let most_recent_time = most_recent_time.unwrap();

                let time_to_get_complete_window = most_recent_time - self.target_alignment_window;
                Some(($(self.$name.iter_window(time_to_get_complete_window, most_recent_time)),*))
            }
        }
    };
}

pub use alignment_buffers;

#[cfg(test)]
mod tests {
    use cu29::clock::Tov;
    use cu29::cutask::CuMsg;
    use std::time::Duration;

    #[test]
    fn simple_init_test() {
        alignment_buffers!(AlignmentBuffers, buffer1: TimeboundCircularBuffer<10, CuMsg<u32>>, buffer2: TimeboundCircularBuffer<12, CuMsg<u64>>);

        let buffers =
            AlignmentBuffers::new(Duration::from_secs(1).into(), Duration::from_secs(2).into());
        assert_eq!(buffers.buffer1.inner.capacity(), 10);
        assert_eq!(buffers.buffer2.inner.capacity(), 12);
    }

    #[test]
    fn purge_test() {
        alignment_buffers!(AlignmentBuffers, buffer1: TimeboundCircularBuffer<10, CuMsg<u32>>, buffer2: TimeboundCircularBuffer<12, CuMsg<u32>>);

        let mut buffers =
            AlignmentBuffers::new(Duration::from_secs(1).into(), Duration::from_secs(2).into());

        let mut msg1 = CuMsg::new(Some(1));
        msg1.metadata.tov = Tov::Time(Duration::from_secs(1).into());
        buffers.buffer1.inner.push_back(msg1.clone());
        buffers.buffer2.inner.push_back(msg1);
        // within the horizon: now = 2 s, horizon = 2 - 2 = 0 s, the message at 1 s stays
        let _ = buffers.purge(Duration::from_secs(2).into());
        assert_eq!(buffers.buffer1.inner.len(), 1);
        assert_eq!(buffers.buffer2.inner.len(), 1);
        // outside the horizon: now = 5 s, horizon = 5 - 2 = 3 s, the message at 1 s is purged
        let _ = buffers.purge(Duration::from_secs(5).into());
        assert_eq!(buffers.buffer1.inner.len(), 0);
        assert_eq!(buffers.buffer2.inner.len(), 0);
    }

    #[test]
    fn empty_buffers_test() {
        alignment_buffers!(
            AlignmentBuffers,
            buffer1: TimeboundCircularBuffer<10, CuMsg<u32>>,
            buffer2: TimeboundCircularBuffer<12, CuMsg<u32>>
        );

        let mut buffers = AlignmentBuffers::new(
            Duration::from_secs(2).into(), // 2-second alignment window
            Duration::from_secs(5).into(), // 5-second stale data horizon
        );

        // No message has been pushed, so there is nothing to align
        assert!(buffers.get_latest_aligned_data().is_none());
    }

    #[test]
    fn horizon_and_window_alignment_test() {
        alignment_buffers!(
            AlignmentBuffers,
            buffer1: TimeboundCircularBuffer<10, CuMsg<u32>>,
            buffer2: TimeboundCircularBuffer<12, CuMsg<u32>>
        );

        let mut buffers = AlignmentBuffers::new(
            Duration::from_secs(2).into(), // 2-second alignment window
            Duration::from_secs(5).into(), // 5-second stale data horizon
        );

        // Insert messages with timestamps
        let mut msg1 = CuMsg::new(Some(1));
        msg1.metadata.tov = Tov::Time(Duration::from_secs(1).into());
        buffers.buffer1.inner.push_back(msg1.clone());
        buffers.buffer2.inner.push_back(msg1);

        let mut msg2 = CuMsg::new(Some(3));
        msg2.metadata.tov = Tov::Time(Duration::from_secs(3).into());
        buffers.buffer2.inner.push_back(msg2);

        let mut msg3 = CuMsg::new(Some(4));
        msg3.metadata.tov = Tov::Time(Duration::from_secs(4).into());
        buffers.buffer1.inner.push_back(msg3.clone());
        buffers.buffer2.inner.push_back(msg3);

        // Advance time to 7 seconds; the horizon is 7 - 5 = 2 s, so every message with a tov >= 2 s should stay
        let now = Duration::from_secs(7).into();
        // Emulate a normal workflow here.
        buffers.purge(now);
        if let Some((iter1, iter2)) = buffers.get_latest_aligned_data() {
            let collected1: Vec<_> = iter1.collect();
            let collected2: Vec<_> = iter2.collect();

            // The most recent common time is 4 s, so only messages within the
            // alignment window [2, 4] are returned
            assert_eq!(collected1.len(), 1);
            assert_eq!(collected2.len(), 2);

            assert_eq!(collected1[0].payload(), Some(&4));
            assert_eq!(collected2[0].payload(), Some(&3));
            assert_eq!(collected2[1].payload(), Some(&4));
        } else {
            panic!("Expected aligned data, but got None");
        }

        // Ensure messages older than the horizon (tov < 2 seconds) are purged
        assert_eq!(buffers.buffer1.inner.len(), 1);
        assert_eq!(buffers.buffer2.inner.len(), 2);
    }
}
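
The purge, window and most-recent-time rules in `buffers.rs` can be tried out without the Copper crates. The sketch below is a standard-library approximation under stated assumptions, not the code from this commit: the local `Tov` enum, the `u64` timestamps, the `String` error and the use of `VecDeque` in place of `CircularBuffer` are all stand-ins chosen for illustration.

```rust
use std::collections::VecDeque;

/// Local stand-in for the time-of-validity variants (not the cu29 type).
#[derive(Clone, Copy, Debug, PartialEq)]
enum Tov {
    None,
    Time(u64),
    Range { start: u64, end: u64 },
}

/// Drops every leading entry up to the first one still valid at `horizon`.
/// Assumes the buffer is sorted by time, as the original code does.
fn purge(buf: &mut VecDeque<Tov>, horizon: u64) {
    let keep_from = buf
        .iter()
        .position(|tov| match *tov {
            Tov::Time(t) => t >= horizon,
            Tov::Range { end, .. } => end >= horizon,
            Tov::None => false,
        })
        .unwrap_or(buf.len()); // nothing to keep: drain everything
    buf.drain(..keep_from);
}

/// Entries fully contained in the inclusive window `[start, end]`.
fn window(buf: &VecDeque<Tov>, start: u64, end: u64) -> Vec<Tov> {
    buf.iter()
        .copied()
        .filter(|tov| match *tov {
            Tov::Time(t) => t >= start && t <= end,
            Tov::Range { start: s, end: e } => s >= start && e <= end,
            Tov::None => false,
        })
        .collect()
}

/// Newest right-edge time: `Ok(None)` when empty, `Err` if any entry has no time.
fn most_recent(buf: &VecDeque<Tov>) -> Result<Option<u64>, String> {
    buf.iter().try_fold(None, |acc: Option<u64>, tov| {
        let t = match *tov {
            Tov::Time(t) => t,
            Tov::Range { end, .. } => end,
            Tov::None => return Err("no time information".to_string()),
        };
        Ok(Some(acc.map_or(t, |m| m.max(t))))
    })
}
```

One consequence worth noticing: a range that straddles the horizon survives `purge` (its end is recent enough) but is still excluded by `window` when its start lies before the window start.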
