From 5273d0d4f87eb8c97e8456234146a83267de244b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 28 Aug 2025 19:55:35 +0200 Subject: [PATCH 1/4] feat(enrichment tables): add per-event ttl for memory enrichment table Adds an optional `ttl_field` configuration to memory enrichment table, to allow overriding the global default TTL per event. --- src/enrichment_tables/memory/config.rs | 6 ++ src/enrichment_tables/memory/source.rs | 2 +- src/enrichment_tables/memory/table.rs | 96 ++++++++++++++++--- .../cue/reference/generated/configuration.cue | 6 ++ 4 files changed, 95 insertions(+), 15 deletions(-) diff --git a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs index ea3db29159a46..02467c5e6c597 100644 --- a/src/enrichment_tables/memory/config.rs +++ b/src/enrichment_tables/memory/config.rs @@ -10,6 +10,7 @@ use tokio::sync::Mutex; use vector_lib::config::{AcknowledgementsConfig, DataType, Input, LogNamespace}; use vector_lib::enrichment::Table; use vector_lib::id::ComponentKey; +use vector_lib::lookup::lookup_v2::OptionalValuePath; use vector_lib::schema::{self}; use vector_lib::{configurable::configurable_component, sink::VectorSink}; use vrl::path::OwnedTargetPath; @@ -61,6 +62,10 @@ pub struct MemoryConfig { #[configurable(derived)] #[serde(skip_serializing_if = "vector_lib::serde::is_default")] pub source_config: Option, + /// Field to read from the incoming value to use as TTL override. + #[configurable(derived)] + #[serde(default)] + pub ttl_field: OptionalValuePath, #[serde(skip)] memory: Arc>>>, @@ -86,6 +91,7 @@ impl Default for MemoryConfig { log_namespace: None, source_config: None, internal_metrics: InternalMetricsConfig::default(), + ttl_field: OptionalValuePath::none(), } } } diff --git a/src/enrichment_tables/memory/source.rs b/src/enrichment_tables/memory/source.rs index 2dfceeff55086..df7e7a68f1c14 100644 --- a/src/enrichment_tables/memory/source.rs +++ b/src/enrichment_tables/memory/source.rs @@ -97,7 +97,7 @@ impl MemorySource { }) .filter_map(|(k, v)| { let mut event = Event::Log(LogEvent::from_map( - v.as_object_map(now, self.memory.config.ttl, k).ok()?, + v.as_object_map(now, k).ok()?, EventMetadata::default(), )); let log = event.as_mut_log(); diff --git a/src/enrichment_tables/memory/table.rs b/src/enrichment_tables/memory/table.rs index 0d97c7ebbcc21..706ac814d479f 100644 --- a/src/enrichment_tables/memory/table.rs +++ b/src/enrichment_tables/memory/table.rs @@ -38,6 +38,7 @@ use super::source::MemorySource; pub struct MemoryEntry { value: String, update_time: CopyValue, + ttl: u64, } impl ByteSizeOf for MemoryEntry { @@ -47,13 +48,10 @@ impl ByteSizeOf for MemoryEntry { } impl MemoryEntry { - pub(super) fn as_object_map( - &self, - now: Instant, - total_ttl: u64, - key: &str, - ) -> Result { - let ttl = total_ttl.saturating_sub(now.duration_since(*self.update_time).as_secs()); + pub(super) fn as_object_map(&self, now: Instant, key: &str) -> Result { + let ttl = self + .ttl + .saturating_sub(now.duration_since(*self.update_time).as_secs()); Ok(ObjectMap::from([ ( KeyString::from("key"), @@ -71,8 +69,8 @@ impl MemoryEntry { ])) } - fn expired(&self, now: Instant, ttl: u64) -> bool { - now.duration_since(*self.update_time).as_secs() > ttl + fn expired(&self, now: Instant) -> bool { + now.duration_since(*self.update_time).as_secs() > self.ttl } } @@ -119,9 +117,9 @@ impl Memory { let mut writer = self.write_handle.lock().expect("mutex poisoned"); let now = Instant::now(); - for (k, v) in value.into_iter() { + for (k, value) in value.into_iter() { let new_entry_key = String::from(k); - let Ok(v) = serde_json::to_string(&v) else { + let Ok(v) = serde_json::to_string(&value) else { emit!(MemoryEnrichmentTableInsertFailed { key: &new_entry_key, include_key_metric_tag: self.config.internal_metrics.include_key_tag @@ -131,6 +129,15 @@ impl Memory { let new_entry = MemoryEntry { value: v, update_time: now.into(), + ttl: self + .config + .ttl_field + .path + .as_ref() + .and_then(|p| value.get(p)) + .and_then(|v| v.as_integer()) + .map(|v| v as u64) + .unwrap_or(self.config.ttl), }; let new_entry_size = new_entry_key.size_of() + new_entry.size_of(); if let Some(max_byte_size) = self.config.max_byte_size @@ -173,7 +180,7 @@ impl Memory { if let Some(reader) = self.get_read_handle().read() { for (k, v) in reader.iter() { if let Some(entry) = v.get_one() - && entry.expired(now, self.config.ttl) + && entry.expired(now) { // Byte size is not reduced at this point, because the actual deletion // will only happen at refresh time @@ -274,8 +281,7 @@ impl Table for Memory { key: &key, include_key_metric_tag: self.config.internal_metrics.include_key_tag }); - row.as_object_map(Instant::now(), self.config.ttl, &key) - .map(|r| vec![r]) + row.as_object_map(Instant::now(), &key).map(|r| vec![r]) } None => { emit!(MemoryEnrichmentTableReadFailed { @@ -377,6 +383,7 @@ mod tests { use std::slice::from_ref; use std::{num::NonZeroU64, time::Duration}; use tokio::time; + use vector_lib::lookup::lookup_v2::OptionalValuePath; use vector_lib::{ event::{EventContainer, MetricValue}, @@ -434,6 +441,7 @@ mod tests { MemoryEntry { value: "5".to_string(), update_time: (Instant::now() - Duration::from_secs(secs_to_subtract)).into(), + ttl, }, ); handle.write_handle.refresh(); @@ -454,6 +462,64 @@ mod tests { ); } + #[test] + fn calculates_ttl_override() { + let global_ttl = 100; + let ttl_override = 10; + let memory = Memory::new(build_memory_config(|c| { + c.ttl = global_ttl; + c.ttl_field = OptionalValuePath::new("ttl"); + })); + memory.handle_value(ObjectMap::from([ + ( + "ttl_override".into(), + Value::from(ObjectMap::from([ + ("val".into(), Value::from(5)), + ("ttl".into(), Value::from(ttl_override)), + ])), + ), + ( + "default_ttl".into(), + Value::from(ObjectMap::from([("val".into(), Value::from(5))])), + ), + ])); + + let default_condition = Condition::Equals { + field: "key", + value: Value::from("default_ttl"), + }; + let override_condition = Condition::Equals { + field: "key", + value: Value::from("ttl_override"), + }; + + assert_eq!( + Ok(ObjectMap::from([ + ("key".into(), Value::from("default_ttl")), + ("ttl".into(), Value::from(global_ttl)), + ( + "value".into(), + Value::from(ObjectMap::from([("val".into(), Value::from(5))])) + ), + ])), + memory.find_table_row(Case::Sensitive, &[default_condition], None, None, None) + ); + assert_eq!( + Ok(ObjectMap::from([ + ("key".into(), Value::from("ttl_override")), + ("ttl".into(), Value::from(ttl_override)), + ( + "value".into(), + Value::from(ObjectMap::from([ + ("val".into(), Value::from(5)), + ("ttl".into(), Value::from(ttl_override)) + ])) + ), + ])), + memory.find_table_row(Case::Sensitive, &[override_condition], None, None, None) + ); + } + #[test] fn removes_expired_records_on_scan_interval() { let ttl = 100; @@ -467,6 +533,7 @@ mod tests { MemoryEntry { value: "5".to_string(), update_time: (Instant::now() - Duration::from_secs(ttl + 10)).into(), + ttl, }, ); handle.write_handle.refresh(); @@ -534,6 +601,7 @@ mod tests { MemoryEntry { value: "5".to_string(), update_time: (Instant::now() - Duration::from_secs(ttl / 2)).into(), + ttl, }, ); handle.write_handle.refresh(); diff --git a/website/cue/reference/generated/configuration.cue b/website/cue/reference/generated/configuration.cue index 143ecac9e3171..4ae8cb81346d5 100644 --- a/website/cue/reference/generated/configuration.cue +++ b/website/cue/reference/generated/configuration.cue @@ -207,6 +207,12 @@ generated: configuration: configuration: { required: false relevant_when: "type = \"memory\"" } + ttl_field: { + type: string: default: "" + description: "Field to read from the incoming value to use as TTL override." + required: false + relevant_when: "type = \"memory\"" + } locale: { type: string: default: "en" description: """ From ebaac7c297d230f32b5da0e6508c5a78f62a528f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 28 Aug 2025 20:09:11 +0200 Subject: [PATCH 2/4] Add changelog entry --- changelog.d/23666_memory_enrichment_ttl_field.feature.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changelog.d/23666_memory_enrichment_ttl_field.feature.md diff --git a/changelog.d/23666_memory_enrichment_ttl_field.feature.md b/changelog.d/23666_memory_enrichment_ttl_field.feature.md new file mode 100644 index 0000000000000..046f87f90bb23 --- /dev/null +++ b/changelog.d/23666_memory_enrichment_ttl_field.feature.md @@ -0,0 +1,3 @@ +Added an optional `ttl_field` configuration option to the memory enrichment table, to override the global memory table TTL on a per event basis. + +authors: esensar Quad9DNS From 29346efe2a01cada4f337d129bc7d270740fa15c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 28 Aug 2025 20:35:42 +0200 Subject: [PATCH 3/4] Update website/cue/reference/generated/configuration.cue Co-authored-by: domalessi <111786334+domalessi@users.noreply.github.com> --- website/cue/reference/generated/configuration.cue | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/website/cue/reference/generated/configuration.cue b/website/cue/reference/generated/configuration.cue index 4ae8cb81346d5..2ce7ad4206892 100644 --- a/website/cue/reference/generated/configuration.cue +++ b/website/cue/reference/generated/configuration.cue @@ -209,7 +209,7 @@ generated: configuration: configuration: { } ttl_field: { type: string: default: "" - description: "Field to read from the incoming value to use as TTL override." + description: "Field in the incoming value used as the TTL override." required: false relevant_when: "type = \"memory\"" } From daa4e630b0b1c539c071dfc900fd81a483106b54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 28 Aug 2025 20:37:25 +0200 Subject: [PATCH 4/4] Update description in the code --- src/enrichment_tables/memory/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs index 02467c5e6c597..b79c8a96affc6 100644 --- a/src/enrichment_tables/memory/config.rs +++ b/src/enrichment_tables/memory/config.rs @@ -62,7 +62,7 @@ pub struct MemoryConfig { #[configurable(derived)] #[serde(skip_serializing_if = "vector_lib::serde::is_default")] pub source_config: Option, - /// Field to read from the incoming value to use as TTL override. + /// Field in the incoming value used as the TTL override. #[configurable(derived)] #[serde(default)] pub ttl_field: OptionalValuePath,