Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/24014_memory_table_source_reload.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fixed a crash on configuration reload when memory enrichment tables are configured to be used as a source.

authors: esensar Quad9DNS
119 changes: 114 additions & 5 deletions src/config/diff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,29 +119,50 @@ impl Difference {
) -> Self {
let old_names = old
.iter()
.flat_map(|(k, t)| vec![t.as_source(k).map(|(k, _)| k), t.as_sink(k).map(|(k, _)| k)])
.flat_map(|(original_key, t)| {
vec![
t.as_source(original_key).map(|(k, _)| (original_key, k)),
t.as_sink(original_key).map(|(k, _)| (original_key, k)),
]
})
.flatten()
.collect::<HashSet<_>>();
let new_names = new
.iter()
.flat_map(|(k, t)| vec![t.as_source(k).map(|(k, _)| k), t.as_sink(k).map(|(k, _)| k)])
.flat_map(|(original_key, t)| {
vec![
t.as_source(original_key).map(|(k, _)| (original_key, k)),
t.as_sink(original_key).map(|(k, _)| (original_key, k)),
]
})
.flatten()
.collect::<HashSet<_>>();

let to_change = old_names
.intersection(&new_names)
.filter(|&n| {
.filter(|(original_key, _)| {
// This is a hack around the issue of comparing two
// trait objects. Json is used here over toml since
// toml does not support serializing `None`
// to_value is used specifically (instead of string)
// to avoid problems comparing serialized HashMaps,
// which can iterate in varied orders.
let old_value = serde_json::to_value(&old[n]).unwrap();
let new_value = serde_json::to_value(&new[n]).unwrap();
let old_value = serde_json::to_value(&old[*original_key]).unwrap();
let new_value = serde_json::to_value(&new[*original_key]).unwrap();
old_value != new_value
})
.cloned()
.map(|(_, key)| key)
.collect::<HashSet<_>>();

// Remove the original key, since it is no longer needed for changed lookup
let old_names = old_names
.into_iter()
.map(|(_, key)| key)
.collect::<HashSet<_>>();
let new_names = new_names
.into_iter()
.map(|(_, key)| key)
.collect::<HashSet<_>>();

let to_remove = &old_names - &new_names;
Expand Down Expand Up @@ -201,3 +222,91 @@ impl Difference {
self.to_change.iter().chain(self.to_remove.iter())
}
}

#[cfg(test)]
mod tests {
use crate::config::ConfigBuilder;
use indoc::indoc;

use super::*;

#[test]
fn new_tables_uses_correct_keys() {
let old_config: Config = toml::from_str::<ConfigBuilder>(indoc! {r#"
[enrichment_tables.memory_table]
type = "memory"
ttl = 10
inputs = []

[enrichment_tables.memory_table.source_config]
source_key = "memory_table_source"
export_expired_items = true
export_interval = 50

[enrichment_tables.memory_table_unchanged]
type = "memory"
ttl = 10
inputs = []

[enrichment_tables.memory_table_old]
type = "memory"
ttl = 10
inputs = []

[sources.demo]
type = "test_basic"

[sinks.demo_sink]
type = "test_basic"
inputs = ["demo"]
"#})
.unwrap()
.build()
.unwrap();

let new_config: Config = toml::from_str::<ConfigBuilder>(indoc! {r#"
[enrichment_tables.memory_table]
type = "memory"
ttl = 20
inputs = []

[enrichment_tables.memory_table.source_config]
source_key = "memory_table_source"
export_expired_items = true
export_interval = 50

[enrichment_tables.memory_table_unchanged]
type = "memory"
ttl = 10
inputs = []

[enrichment_tables.memory_table_new]
type = "memory"
ttl = 1000
inputs = []

[sources.demo]
type = "test_basic"

[sinks.demo_sink]
type = "test_basic"
inputs = ["demo"]
"#})
.unwrap()
.build()
.unwrap();

let diff =
Difference::new_tables(&old_config.enrichment_tables, &new_config.enrichment_tables);

assert_eq!(diff.to_add, HashSet::from_iter(["memory_table_new".into()]));
assert_eq!(
diff.to_remove,
HashSet::from_iter(["memory_table_old".into()])
);
assert_eq!(
diff.to_change,
HashSet::from_iter(["memory_table".into(), "memory_table_source".into()])
);
}
}
2 changes: 1 addition & 1 deletion src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ impl<'a> Builder<'a> {
.chain(
table_sources
.iter()
.map(|(key, sink)| (key, sink))
.map(|(key, source)| (key, source))
.filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
)
{
Expand Down
Loading