Skip to content

Commit 938faa6

Browse files
chore(kafka-dedup): refactor deduplication logic (#38924)
1 parent 8fe3115 commit 938faa6

File tree

7 files changed

+972
-910
lines changed

7 files changed

+972
-910
lines changed

rust/common/types/src/event.rs

Lines changed: 25 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,13 @@ use serde_json::Value;
77
use time::{format_description::well_known::Rfc3339, OffsetDateTime};
88
use uuid::Uuid;
99

10+
/// Identifies the client library/SDK that produced an event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LibraryInfo {
    /// Library name, taken from the event's `$lib` property.
    pub name: String,
    /// Library version, taken from `$lib_version`; `None` when it was not supplied.
    pub version: Option<String>,
}
16+
1017
#[derive(Default, Debug, Deserialize, Serialize)]
1118
pub struct RawEvent {
1219
#[serde(
@@ -238,6 +245,24 @@ impl RawEvent {
238245
*value = f(value.take());
239246
}
240247
}
248+
249+
/// Extract library information from the event properties
250+
/// Returns None if $lib property is not present
251+
pub fn extract_library_info(&self) -> Option<LibraryInfo> {
252+
let name = self
253+
.properties
254+
.get("$lib")
255+
.and_then(|v| v.as_str())
256+
.map(String::from)?;
257+
258+
let version = self
259+
.properties
260+
.get("$lib_version")
261+
.and_then(|v| v.as_str())
262+
.map(String::from);
263+
264+
Some(LibraryInfo { name, version })
265+
}
241266
}
242267

243268
impl CapturedEvent {

rust/kafka-deduplicator/src/checkpoint/worker.rs

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -335,7 +335,9 @@ mod tests {
335335

336336
use super::*;
337337
use crate::checkpoint::CheckpointConfig;
338-
use crate::store::{DeduplicationStore, DeduplicationStoreConfig};
338+
use crate::store::{
339+
DeduplicationStore, DeduplicationStoreConfig, TimestampKey, TimestampMetadata,
340+
};
339341

340342
use common_types::RawEvent;
341343
use tempfile::TempDir;
@@ -394,7 +396,9 @@ mod tests {
394396

395397
// Add an event to the store
396398
let event = create_test_event();
397-
store.handle_event_with_raw(&event).unwrap();
399+
let key = TimestampKey::from(&event);
400+
let metadata = TimestampMetadata::new(&event);
401+
store.put_timestamp_record(&key, &metadata).unwrap();
398402

399403
let tmp_checkpoint_dir = TempDir::new().unwrap();
400404
let config = CheckpointConfig {
@@ -452,7 +456,9 @@ mod tests {
452456

453457
// Add an event to the store
454458
let event = create_test_event();
455-
store.handle_event_with_raw(&event).unwrap();
459+
let key = TimestampKey::from(&event);
460+
let metadata = TimestampMetadata::new(&event);
461+
store.put_timestamp_record(&key, &metadata).unwrap();
456462

457463
let partition = Partition::new("some_test_topic".to_string(), 0);
458464

rust/kafka-deduplicator/src/checkpoint_manager.rs

Lines changed: 32 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -643,7 +643,9 @@ impl Drop for CheckpointManager {
643643
mod tests {
644644
use super::*;
645645
use crate::checkpoint::worker::CheckpointTarget;
646-
use crate::store::{DeduplicationStore, DeduplicationStoreConfig};
646+
use crate::store::{
647+
DeduplicationStore, DeduplicationStoreConfig, TimestampKey, TimestampMetadata,
648+
};
647649
use common_types::RawEvent;
648650
use std::{collections::HashMap, path::PathBuf, time::Duration};
649651
use tempfile::TempDir;
@@ -775,8 +777,11 @@ mod tests {
775777

776778
// Add events to the stores
777779
let event = create_test_event();
778-
store1.handle_event_with_raw(&event).unwrap();
779-
store2.handle_event_with_raw(&event).unwrap();
780+
// Add test data directly to stores
781+
let key = TimestampKey::from(&event);
782+
let metadata = TimestampMetadata::new(&event);
783+
store1.put_timestamp_record(&key, &metadata).unwrap();
784+
store2.put_timestamp_record(&key, &metadata).unwrap();
780785

781786
// add dedup stores to manager
782787
let stores = store_manager.stores();
@@ -840,9 +845,13 @@ mod tests {
840845

841846
// Add an event
842847
let event1 = create_test_event();
843-
store.handle_event_with_raw(&event1).unwrap();
848+
let key1 = TimestampKey::from(&event1);
849+
let metadata1 = TimestampMetadata::new(&event1);
850+
store.put_timestamp_record(&key1, &metadata1).unwrap();
844851
let event2 = create_test_event();
845-
store.handle_event_with_raw(&event2).unwrap();
852+
let key2 = TimestampKey::from(&event2);
853+
let metadata2 = TimestampMetadata::new(&event2);
854+
store.put_timestamp_record(&key2, &metadata2).unwrap();
846855

847856
// Create manager with short interval for testing
848857
let tmp_checkpoint_dir = TempDir::new().unwrap();
@@ -971,8 +980,11 @@ mod tests {
971980

972981
// Add events to the stores
973982
let event = create_test_event();
974-
store1.handle_event_with_raw(&event).unwrap();
975-
store2.handle_event_with_raw(&event).unwrap();
983+
// Add test data directly to stores
984+
let key = TimestampKey::from(&event);
985+
let metadata = TimestampMetadata::new(&event);
986+
store1.put_timestamp_record(&key, &metadata).unwrap();
987+
store2.put_timestamp_record(&key, &metadata).unwrap();
976988

977989
// add dedup stores to manager
978990
let stores = store_manager.stores();
@@ -1036,8 +1048,11 @@ mod tests {
10361048

10371049
// Add events to the stores
10381050
let event = create_test_event();
1039-
store1.handle_event_with_raw(&event).unwrap();
1040-
store2.handle_event_with_raw(&event).unwrap();
1051+
// Add test data directly to stores
1052+
let key = TimestampKey::from(&event);
1053+
let metadata = TimestampMetadata::new(&event);
1054+
store1.put_timestamp_record(&key, &metadata).unwrap();
1055+
store2.put_timestamp_record(&key, &metadata).unwrap();
10411056

10421057
// add dedup stores to manager
10431058
let stores = store_manager.stores();
@@ -1102,7 +1117,9 @@ mod tests {
11021117
let event = create_test_event();
11031118
let part = Partition::new("max_inflight_checkpoints".to_string(), i);
11041119
let store = create_test_store(part.topic(), part.partition_number());
1105-
store.handle_event_with_raw(&event).unwrap();
1120+
let key = TimestampKey::from(&event);
1121+
let metadata = TimestampMetadata::new(&event);
1122+
store.put_timestamp_record(&key, &metadata).unwrap();
11061123
stores.insert(part, store);
11071124
}
11081125

@@ -1172,9 +1189,11 @@ mod tests {
11721189

11731190
// Add events to the stores
11741191
let event = create_test_event();
1175-
store1.handle_event_with_raw(&event).unwrap();
1176-
store2.handle_event_with_raw(&event).unwrap();
1177-
store3.handle_event_with_raw(&event).unwrap();
1192+
let key = crate::store::keys::TimestampKey::from(&event);
1193+
let metadata = crate::store::metadata::TimestampMetadata::new(&event);
1194+
store1.put_timestamp_record(&key, &metadata).unwrap();
1195+
store2.put_timestamp_record(&key, &metadata).unwrap();
1196+
store3.put_timestamp_record(&key, &metadata).unwrap();
11781197

11791198
// add dedup stores to manager
11801199
let stores = store_manager.stores();

0 commit comments

Comments
 (0)