Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/23666_memory_enrichment_ttl_field.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added an optional `ttl_field` configuration option to the memory enrichment table, to override the global memory table TTL on a per event basis.

authors: esensar Quad9DNS
6 changes: 6 additions & 0 deletions src/enrichment_tables/memory/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tokio::sync::Mutex;
use vector_lib::config::{AcknowledgementsConfig, DataType, Input, LogNamespace};
use vector_lib::enrichment::Table;
use vector_lib::id::ComponentKey;
use vector_lib::lookup::lookup_v2::OptionalValuePath;
use vector_lib::schema::{self};
use vector_lib::{configurable::configurable_component, sink::VectorSink};
use vrl::path::OwnedTargetPath;
Expand Down Expand Up @@ -61,6 +62,10 @@ pub struct MemoryConfig {
#[configurable(derived)]
#[serde(skip_serializing_if = "vector_lib::serde::is_default")]
pub source_config: Option<MemorySourceConfig>,
/// Field to read from the incoming value to use as TTL override.
#[configurable(derived)]
#[serde(default)]
pub ttl_field: OptionalValuePath,

#[serde(skip)]
memory: Arc<Mutex<Option<Box<Memory>>>>,
Expand All @@ -86,6 +91,7 @@ impl Default for MemoryConfig {
log_namespace: None,
source_config: None,
internal_metrics: InternalMetricsConfig::default(),
ttl_field: OptionalValuePath::none(),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/enrichment_tables/memory/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl MemorySource {
})
.filter_map(|(k, v)| {
let mut event = Event::Log(LogEvent::from_map(
v.as_object_map(now, self.memory.config.ttl, k).ok()?,
v.as_object_map(now, k).ok()?,
EventMetadata::default(),
));
let log = event.as_mut_log();
Expand Down
96 changes: 82 additions & 14 deletions src/enrichment_tables/memory/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use super::source::MemorySource;
pub struct MemoryEntry {
value: String,
update_time: CopyValue<Instant>,
ttl: u64,
}

impl ByteSizeOf for MemoryEntry {
Expand All @@ -47,13 +48,10 @@ impl ByteSizeOf for MemoryEntry {
}

impl MemoryEntry {
pub(super) fn as_object_map(
&self,
now: Instant,
total_ttl: u64,
key: &str,
) -> Result<ObjectMap, String> {
let ttl = total_ttl.saturating_sub(now.duration_since(*self.update_time).as_secs());
pub(super) fn as_object_map(&self, now: Instant, key: &str) -> Result<ObjectMap, String> {
let ttl = self
.ttl
.saturating_sub(now.duration_since(*self.update_time).as_secs());
Ok(ObjectMap::from([
(
KeyString::from("key"),
Expand All @@ -71,8 +69,8 @@ impl MemoryEntry {
]))
}

fn expired(&self, now: Instant, ttl: u64) -> bool {
now.duration_since(*self.update_time).as_secs() > ttl
fn expired(&self, now: Instant) -> bool {
now.duration_since(*self.update_time).as_secs() > self.ttl
}
}

Expand Down Expand Up @@ -119,9 +117,9 @@ impl Memory {
let mut writer = self.write_handle.lock().expect("mutex poisoned");
let now = Instant::now();

for (k, v) in value.into_iter() {
for (k, value) in value.into_iter() {
let new_entry_key = String::from(k);
let Ok(v) = serde_json::to_string(&v) else {
let Ok(v) = serde_json::to_string(&value) else {
emit!(MemoryEnrichmentTableInsertFailed {
key: &new_entry_key,
include_key_metric_tag: self.config.internal_metrics.include_key_tag
Expand All @@ -131,6 +129,15 @@ impl Memory {
let new_entry = MemoryEntry {
value: v,
update_time: now.into(),
ttl: self
.config
.ttl_field
.path
.as_ref()
.and_then(|p| value.get(p))
.and_then(|v| v.as_integer())
.map(|v| v as u64)
.unwrap_or(self.config.ttl),
};
let new_entry_size = new_entry_key.size_of() + new_entry.size_of();
if let Some(max_byte_size) = self.config.max_byte_size
Expand Down Expand Up @@ -173,7 +180,7 @@ impl Memory {
if let Some(reader) = self.get_read_handle().read() {
for (k, v) in reader.iter() {
if let Some(entry) = v.get_one()
&& entry.expired(now, self.config.ttl)
&& entry.expired(now)
{
// Byte size is not reduced at this point, because the actual deletion
// will only happen at refresh time
Expand Down Expand Up @@ -274,8 +281,7 @@ impl Table for Memory {
key: &key,
include_key_metric_tag: self.config.internal_metrics.include_key_tag
});
row.as_object_map(Instant::now(), self.config.ttl, &key)
.map(|r| vec![r])
row.as_object_map(Instant::now(), &key).map(|r| vec![r])
}
None => {
emit!(MemoryEnrichmentTableReadFailed {
Expand Down Expand Up @@ -377,6 +383,7 @@ mod tests {
use std::slice::from_ref;
use std::{num::NonZeroU64, time::Duration};
use tokio::time;
use vector_lib::lookup::lookup_v2::OptionalValuePath;

use vector_lib::{
event::{EventContainer, MetricValue},
Expand Down Expand Up @@ -434,6 +441,7 @@ mod tests {
MemoryEntry {
value: "5".to_string(),
update_time: (Instant::now() - Duration::from_secs(secs_to_subtract)).into(),
ttl,
},
);
handle.write_handle.refresh();
Expand All @@ -454,6 +462,64 @@ mod tests {
);
}

#[test]
fn calculates_ttl_override() {
let global_ttl = 100;
let ttl_override = 10;
let memory = Memory::new(build_memory_config(|c| {
c.ttl = global_ttl;
c.ttl_field = OptionalValuePath::new("ttl");
}));
memory.handle_value(ObjectMap::from([
(
"ttl_override".into(),
Value::from(ObjectMap::from([
("val".into(), Value::from(5)),
("ttl".into(), Value::from(ttl_override)),
])),
),
(
"default_ttl".into(),
Value::from(ObjectMap::from([("val".into(), Value::from(5))])),
),
]));

let default_condition = Condition::Equals {
field: "key",
value: Value::from("default_ttl"),
};
let override_condition = Condition::Equals {
field: "key",
value: Value::from("ttl_override"),
};

assert_eq!(
Ok(ObjectMap::from([
("key".into(), Value::from("default_ttl")),
("ttl".into(), Value::from(global_ttl)),
(
"value".into(),
Value::from(ObjectMap::from([("val".into(), Value::from(5))]))
),
])),
memory.find_table_row(Case::Sensitive, &[default_condition], None, None, None)
);
assert_eq!(
Ok(ObjectMap::from([
("key".into(), Value::from("ttl_override")),
("ttl".into(), Value::from(ttl_override)),
(
"value".into(),
Value::from(ObjectMap::from([
("val".into(), Value::from(5)),
("ttl".into(), Value::from(ttl_override))
]))
),
])),
memory.find_table_row(Case::Sensitive, &[override_condition], None, None, None)
);
}

#[test]
fn removes_expired_records_on_scan_interval() {
let ttl = 100;
Expand All @@ -467,6 +533,7 @@ mod tests {
MemoryEntry {
value: "5".to_string(),
update_time: (Instant::now() - Duration::from_secs(ttl + 10)).into(),
ttl,
},
);
handle.write_handle.refresh();
Expand Down Expand Up @@ -534,6 +601,7 @@ mod tests {
MemoryEntry {
value: "5".to_string(),
update_time: (Instant::now() - Duration::from_secs(ttl / 2)).into(),
ttl,
},
);
handle.write_handle.refresh();
Expand Down
6 changes: 6 additions & 0 deletions website/cue/reference/generated/configuration.cue
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ generated: configuration: configuration: {
required: false
relevant_when: "type = \"memory\""
}
ttl_field: {
type: string: default: ""
description: "Field to read from the incoming value to use as TTL override."
required: false
relevant_when: "type = \"memory\""
}
locale: {
type: string: default: "en"
description: """
Expand Down
Loading