diff --git a/changelog.d/24014_memory_table_source_reload.fix.md b/changelog.d/24014_memory_table_source_reload.fix.md new file mode 100644 index 0000000000000..56b0403da230e --- /dev/null +++ b/changelog.d/24014_memory_table_source_reload.fix.md @@ -0,0 +1,3 @@ +Fixed a crash on configuration reload when memory enrichment tables are configured to be used as a source. + +authors: esensar Quad9DNS diff --git a/src/config/diff.rs b/src/config/diff.rs index fa5f8cf09b8ff..3ecef90ca0174 100644 --- a/src/config/diff.rs +++ b/src/config/diff.rs @@ -26,7 +26,7 @@ impl ConfigDiff { sources: Difference::new(&old.sources, &new.sources, &components_to_reload), transforms: Difference::new(&old.transforms, &new.transforms, &components_to_reload), sinks: Difference::new(&old.sinks, &new.sinks, &components_to_reload), - enrichment_tables: Difference::new_tables( + enrichment_tables: Difference::from_enrichment_tables( &old.enrichment_tables, &new.enrichment_tables, ), @@ -113,39 +113,42 @@ impl Difference { } } - fn new_tables( + fn from_enrichment_tables( old: &IndexMap>, new: &IndexMap>, ) -> Self { - let old_names = old - .iter() - .flat_map(|(k, t)| vec![t.as_source(k).map(|(k, _)| k), t.as_sink(k).map(|(k, _)| k)]) - .flatten() - .collect::>(); - let new_names = new - .iter() - .flat_map(|(k, t)| vec![t.as_source(k).map(|(k, _)| k), t.as_sink(k).map(|(k, _)| k)]) - .flatten() - .collect::>(); + let old_table_keys = extract_table_component_keys(old); + let new_table_keys = extract_table_component_keys(new); - let to_change = old_names - .intersection(&new_names) - .filter(|&n| { + let to_change = old_table_keys + .intersection(&new_table_keys) + .filter(|(table_key, _derived_component_key)| { // This is a hack around the issue of comparing two // trait objects. Json is used here over toml since // toml does not support serializing `None` // to_value is used specifically (instead of string) // to avoid problems comparing serialized HashMaps, // which can iterate in varied orders. - let old_value = serde_json::to_value(&old[n]).unwrap(); - let new_value = serde_json::to_value(&new[n]).unwrap(); + let old_value = serde_json::to_value(&old[*table_key]).unwrap(); + let new_value = serde_json::to_value(&new[*table_key]).unwrap(); old_value != new_value }) .cloned() + .map(|(_table_key, derived_component_key)| derived_component_key) .collect::>(); - let to_remove = &old_names - &new_names; - let to_add = &new_names - &old_names; + // Extract only the derived component keys for the final difference calculation + let old_component_keys = old_table_keys + .into_iter() + .map(|(_table_key, component_key)| component_key) + .collect::>(); + let new_component_keys = new_table_keys + .into_iter() + .map(|(_table_key, component_key)| component_key) + .collect::>(); + + let to_remove = &old_component_keys - &new_component_keys; + let to_add = &new_component_keys - &old_component_keys; Self { to_remove, @@ -201,3 +204,117 @@ impl Difference { self.to_change.iter().chain(self.to_remove.iter()) } } + +/// Helper function to extract component keys from enrichment tables. +fn extract_table_component_keys( + tables: &IndexMap>, +) -> HashSet<(&ComponentKey, ComponentKey)> { + tables + .iter() + .flat_map(|(table_key, table)| { + vec![ + table + .as_source(table_key) + .map(|(component_key, _)| (table_key, component_key)), + table + .as_sink(table_key) + .map(|(component_key, _)| (table_key, component_key)), + ] + }) + .flatten() + .collect() +} + +#[cfg(test)] +mod tests { + use crate::config::ConfigBuilder; + use indoc::indoc; + + use super::*; + + #[test] + fn diff_enrichment_tables_uses_correct_keys() { + let old_config: Config = serde_yaml::from_str::(indoc! {r#" + enrichment_tables: + memory_table: + type: "memory" + ttl: 10 + inputs: [] + source_config: + source_key: "memory_table_source" + export_expired_items: true + export_interval: 50 + + memory_table_unchanged: + type: "memory" + ttl: 10 + inputs: [] + + memory_table_old: + type: "memory" + ttl: 10 + inputs: [] + + sources: + test: + type: "test_basic" + + sinks: + test_sink: + type: "test_basic" + inputs: ["test"] + "#}) + .unwrap() + .build() + .unwrap(); + + let new_config: Config = serde_yaml::from_str::(indoc! {r#" + enrichment_tables: + memory_table: + type: "memory" + ttl: 20 + inputs: [] + source_config: + source_key: "memory_table_source" + export_expired_items: true + export_interval: 50 + + memory_table_unchanged: + type: "memory" + ttl: 10 + inputs: [] + + memory_table_new: + type: "memory" + ttl: 1000 + inputs: [] + + sources: + test: + type: "test_basic" + + sinks: + test_sink: + type: "test_basic" + inputs: ["test"] + "#}) + .unwrap() + .build() + .unwrap(); + + let diff = Difference::from_enrichment_tables( + &old_config.enrichment_tables, + &new_config.enrichment_tables, + ); + + assert_eq!(diff.to_add, HashSet::from_iter(["memory_table_new".into()])); + assert_eq!( + diff.to_remove, + HashSet::from_iter(["memory_table_old".into()]) + ); + assert_eq!( + diff.to_change, + HashSet::from_iter(["memory_table".into(), "memory_table_source".into()]) + ); + } +} diff --git a/src/topology/builder.rs b/src/topology/builder.rs index ff3eff5705484..de9cded5a194d 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -225,7 +225,7 @@ impl<'a> Builder<'a> { .chain( table_sources .iter() - .map(|(key, sink)| (key, sink)) + .map(|(key, source)| (key, source)) .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)), ) {