Skip to content

Commit d5e4551

Browse files
authored
feat: Metering - Collect evaluation counts and push them via server_client in batches (#83)
Core changes: - Now store counts of evaluations sorted in a hashmap by `MeteringKey` - Introduce `Batcher` responsible for collecting evaluations and pushing them to the server in batches. --------- Signed-off-by: Rainer Schoenberger <[email protected]> Signed-off-by: Rainer <[email protected]>
1 parent 5a4d900 commit d5e4551

File tree

3 files changed

+239
-21
lines changed

3 files changed

+239
-21
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ murmur3 = "0.5.2"
2929
tungstenite = { version = "0.27.0", features = ["native-tls"] }
3030
url = "2.5.4"
3131
thiserror = "2.0.7"
32+
chrono = { version = "0.4", features = ["serde"] }
3233

3334
[dev-dependencies]
3435
appconfiguration = {path = ".", features = ["test_utils"]}

src/client/metering.rs

Lines changed: 216 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,20 +29,29 @@ use std::sync::mpsc;
2929
/// * MeteringThreadHandle<T> - Object representing the thread. Metrics will be sent as long as this object is alive.
3030
/// * MeteringRecorder - Use this to record all evaluations, which will eventually be sent to the server.
3131
pub(crate) fn start_metering<T: ServerClient>(
32-
_config_id: ConfigurationId,
33-
_transmit_interval: std::time::Duration,
32+
config_id: ConfigurationId,
33+
transmit_interval: std::time::Duration,
3434
server_client: T,
3535
) -> (MeteringThreadHandle, MeteringRecorder) {
3636
let (sender, receiver) = mpsc::channel();
3737

3838
let thread = ThreadHandle::new(move |_terminator: mpsc::Receiver<()>| {
39-
// TODO: termination handling
39+
let mut batcher = MeteringBatcher::new(server_client, config_id);
40+
let mut last_flush = std::time::Instant::now();
4041
loop {
41-
// TODO: error handling
42-
let _ = receiver.recv().unwrap();
43-
// TODO: actually process the event
44-
let json_data = crate::models::MeteringDataJson {};
45-
server_client.push_metering_data(&json_data);
42+
let recv_result = receiver.recv_timeout(std::time::Duration::from_millis(100));
43+
match recv_result {
44+
// Actually received an event, sort it in using the batcher:
45+
Ok(event) => batcher.handle_event(event),
46+
// Hit the timeout, do nothing here, but give the batcher a chance to flush:
47+
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
48+
// All senders have been dropped, exit the thread:
49+
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
50+
}
51+
if last_flush.elapsed() >= transmit_interval {
52+
batcher.flush();
53+
last_flush = std::time::Instant::now();
54+
}
4655
}
4756
});
4857

@@ -103,16 +112,113 @@ pub(crate) enum EvaluationEvent {
103112
Property(EvaluationEventData),
104113
}
105114

115+
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
116+
struct MeteringKey {
117+
feature_id: Option<String>,
118+
property_id: Option<String>,
119+
entity_id: String,
120+
segment_id: Option<String>,
121+
}
122+
123+
struct EvaluationData {
124+
number_of_evaluations: u32,
125+
time_of_last_evaluation: chrono::DateTime<chrono::Utc>,
126+
}
127+
128+
/// The responsibility of the MeteringBatcher is to aggregate evaluation events and batch them for transmission to the server.
129+
struct MeteringBatcher<T: ServerClient> {
130+
evaluations: std::collections::HashMap<MeteringKey, EvaluationData>,
131+
server_client: T,
132+
config_id: ConfigurationId,
133+
}
134+
135+
impl<T: ServerClient> MeteringBatcher<T> {
136+
fn new(server_client: T, config_id: ConfigurationId) -> Self {
137+
Self {
138+
evaluations: std::collections::HashMap::new(),
139+
server_client,
140+
config_id,
141+
}
142+
}
143+
144+
fn handle_event(&mut self, event: EvaluationEvent) {
145+
let (feature_id, property_id, entity_id, segment_id) = match event {
146+
EvaluationEvent::Feature(data) => (
147+
match data.subject_id {
148+
SubjectId::Feature(ref id) => Some(id.clone()),
149+
_ => None,
150+
},
151+
None,
152+
data.entity_id,
153+
data.segment_id,
154+
),
155+
EvaluationEvent::Property(data) => (
156+
None,
157+
match data.subject_id {
158+
SubjectId::Property(ref id) => Some(id.clone()),
159+
_ => None,
160+
},
161+
data.entity_id,
162+
data.segment_id,
163+
),
164+
};
165+
let key = MeteringKey {
166+
feature_id: feature_id.clone(),
167+
property_id: property_id.clone(),
168+
entity_id: entity_id.clone(),
169+
segment_id: segment_id.clone(),
170+
};
171+
let now = chrono::Utc::now();
172+
self.evaluations
173+
.entry(key)
174+
.and_modify(|v| {
175+
v.number_of_evaluations += 1;
176+
v.time_of_last_evaluation = now;
177+
})
178+
.or_insert(EvaluationData {
179+
number_of_evaluations: 1,
180+
time_of_last_evaluation: now,
181+
});
182+
}
183+
184+
fn flush(&mut self) {
185+
if self.evaluations.is_empty() {
186+
return;
187+
}
188+
let usages: Vec<crate::models::MeteringDataUsageJson> = self
189+
.evaluations
190+
.iter()
191+
.map(|(key, value)| crate::models::MeteringDataUsageJson {
192+
feature_id: key.feature_id.clone(),
193+
property_id: key.property_id.clone(),
194+
entity_id: key.entity_id.clone(),
195+
segment_id: key.segment_id.clone(),
196+
evaluation_time: value.time_of_last_evaluation,
197+
count: value.number_of_evaluations,
198+
})
199+
.collect();
200+
201+
let json_data = crate::models::MeteringDataJson {
202+
collection_id: self.config_id.collection_id.to_string(),
203+
environment_id: self.config_id.environment_id.to_string(),
204+
usages,
205+
};
206+
let _ = self.server_client.push_metering_data(&json_data);
207+
self.evaluations.clear();
208+
}
209+
}
210+
106211
#[cfg(test)]
107212
mod tests {
108213
use super::*;
109214
use crate::models::ConfigurationJson;
110215
use crate::models::MeteringDataJson;
111216
use crate::network::http_client::WebsocketReader;
112217
use crate::network::NetworkResult;
218+
use chrono;
113219

114220
struct ServerClientMock {
115-
metering_data_sender: mpsc::Sender<()>,
221+
metering_data_sender: mpsc::Sender<MeteringDataJson>,
116222
}
117223
struct WebsocketMockReader {}
118224
impl WebsocketReader for WebsocketMockReader {
@@ -122,8 +228,8 @@ mod tests {
122228
}
123229

124230
impl ServerClientMock {
125-
fn new() -> (ServerClientMock, mpsc::Receiver<()>) {
126-
let (sender, receiver) = mpsc::channel();
231+
fn new() -> (ServerClientMock, mpsc::Receiver<MeteringDataJson>) {
232+
let (sender, receiver) = mpsc::channel::<MeteringDataJson>();
127233
(
128234
ServerClientMock {
129235
metering_data_sender: sender,
@@ -150,25 +256,117 @@ mod tests {
150256
unreachable!() as NetworkResult<WebsocketMockReader>
151257
}
152258

153-
fn push_metering_data(&self, _data: &MeteringDataJson) -> NetworkResult<()> {
154-
self.metering_data_sender.send(()).unwrap();
259+
fn push_metering_data(&self, data: &MeteringDataJson) -> NetworkResult<()> {
260+
self.metering_data_sender.send(data.clone()).unwrap();
155261
Ok(())
156262
}
157263
}
158264

265+
/// Tests the propagation of evaluation events through the batcher to the server client and the timings of the flush.
159266
#[test]
160-
fn test_metrics_sent_feature() {
267+
fn test_record_evaluation_leads_to_metering_data_sent() {
161268
let (server_client, metering_data_sent_receiver) = ServerClientMock::new();
162269
let (_, metering_handle) = start_metering(
163-
ConfigurationId::new("".to_string(), "".to_string(), "".to_string()),
164-
std::time::Duration::ZERO,
270+
ConfigurationId::new(
271+
"test_guid".to_string(),
272+
"test_env_id".to_string(),
273+
"test_collection_id".to_string(),
274+
),
275+
std::time::Duration::from_millis(200), // Use 200ms for test flushing
165276
server_client,
166277
);
167278

279+
// Send a single evaluation event
168280
metering_handle
169-
.record_evaluation(SubjectId::Feature("".to_string()), "".to_string(), None)
281+
.record_evaluation(
282+
SubjectId::Feature("feature1".to_string()),
283+
"entity1".to_string(),
284+
None,
285+
)
286+
.unwrap();
287+
288+
let time_record_evaluation = chrono::Utc::now();
289+
let metering_data = metering_data_sent_receiver.recv().unwrap();
290+
assert!(chrono::Utc::now() - time_record_evaluation >= chrono::Duration::milliseconds(200));
291+
292+
assert_eq!(
293+
metering_data.collection_id,
294+
"test_collection_id".to_string()
295+
);
296+
assert_eq!(metering_data.environment_id, "test_env_id".to_string());
297+
let usage = &metering_data.usages[0];
298+
assert_eq!(usage.feature_id, Some("feature1".to_string()));
299+
assert_eq!(usage.property_id, None);
300+
assert_eq!(usage.entity_id, "entity1".to_string());
301+
assert_eq!(usage.segment_id, None);
302+
assert_eq!(usage.count, 1);
303+
// Evaluation time should be close to when we called record_evaluation.
304+
assert!(
305+
usage.evaluation_time >= time_record_evaluation
306+
&& usage.evaluation_time
307+
< time_record_evaluation + chrono::Duration::milliseconds(50)
308+
);
309+
}
310+
311+
/// Tests the correct sorting and batching of evaluation events.
312+
#[test]
313+
fn test_metrics_multiple_same_evaluation_events_are_batched_to_one_entry() {
314+
let (server_client, metering_data_sent_receiver) = ServerClientMock::new();
315+
let mut batcher = MeteringBatcher::new(
316+
server_client,
317+
ConfigurationId::new(
318+
"test_guid".to_string(),
319+
"test_env_id".to_string(),
320+
"test_collection_id".to_string(),
321+
),
322+
);
323+
324+
// Simulate two events for the same feature/entity
325+
batcher.handle_event(EvaluationEvent::Feature(EvaluationEventData {
326+
subject_id: SubjectId::Feature("feature1".to_string()),
327+
entity_id: "entity1".to_string(),
328+
segment_id: None,
329+
}));
330+
let time_second_record = chrono::Utc::now();
331+
batcher.handle_event(EvaluationEvent::Feature(EvaluationEventData {
332+
subject_id: SubjectId::Feature("feature1".to_string()),
333+
entity_id: "entity1".to_string(),
334+
segment_id: None,
335+
}));
336+
let time_third_record = chrono::Utc::now();
337+
batcher.handle_event(EvaluationEvent::Property(EvaluationEventData {
338+
subject_id: SubjectId::Property("property1".to_string()),
339+
entity_id: "entity1".to_string(),
340+
segment_id: Some("some_segment".to_string()),
341+
}));
342+
343+
// Force flush
344+
batcher.flush();
345+
346+
let metering_data = metering_data_sent_receiver.recv().unwrap();
347+
348+
// The two feature evaluations should be batched into one entry:
349+
let feature_usage = metering_data
350+
.usages
351+
.iter()
352+
.find(|u| u.feature_id == Some("feature1".to_string()))
170353
.unwrap();
354+
assert_eq!(feature_usage.property_id, None);
355+
assert_eq!(feature_usage.entity_id, "entity1".to_string());
356+
assert_eq!(feature_usage.segment_id, None);
357+
assert!(feature_usage.evaluation_time >= time_second_record);
358+
assert_eq!(feature_usage.count, 2);
171359

172-
let _ = metering_data_sent_receiver.recv().unwrap();
360+
// The property evaluation should be a separate entry:
361+
let property_usage = metering_data
362+
.usages
363+
.iter()
364+
.find(|u| u.property_id == Some("property1".to_string()))
365+
.unwrap();
366+
assert_eq!(property_usage.feature_id, None);
367+
assert_eq!(property_usage.entity_id, "entity1".to_string());
368+
assert_eq!(property_usage.segment_id, Some("some_segment".to_string()));
369+
assert!(property_usage.evaluation_time >= time_third_record);
370+
assert_eq!(property_usage.count, 1);
173371
}
174372
}

src/models.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,33 @@
1414

1515
use std::fmt::Display;
1616

17-
use serde::Deserialize;
17+
use chrono::{DateTime, Utc};
18+
use serde::{Deserialize, Serialize};
1819

1920
use crate::{errors::DeserializationError, Error, Result, Value};
2021

22+
#[derive(Debug, Clone, Serialize)]
23+
pub struct MeteringDataUsageJson {
24+
pub feature_id: Option<String>,
25+
pub property_id: Option<String>,
26+
pub entity_id: String,
27+
// Serialized as "nil" when None
28+
#[serde(default, skip_serializing_if = "Option::is_none")]
29+
pub segment_id: Option<String>,
30+
// When this evaluation was last done
31+
pub evaluation_time: DateTime<Utc>,
32+
// how often this was evaluated
33+
pub count: u32,
34+
}
35+
2136
/// Represents Metering data in a structure for data exchange used for
2237
/// sending to the server.
23-
#[derive(Debug, Deserialize)]
24-
pub struct MeteringDataJson {}
38+
#[derive(Debug, Clone, Serialize)]
39+
pub struct MeteringDataJson {
40+
pub collection_id: String,
41+
pub environment_id: String,
42+
pub usages: Vec<MeteringDataUsageJson>,
43+
}
2544

2645
/// Represents AppConfig data in a structure intended for data exchange
2746
/// (typically JSON encoded) used by

0 commit comments

Comments
 (0)