From 2c309f3b491211065f192140dcb54609670fcd55 Mon Sep 17 00:00:00 2001 From: Guillaume Le Blanc Date: Fri, 7 Feb 2025 17:09:27 +0100 Subject: [PATCH 01/16] feat(watch tls files) Add http sink tls cert/key to config::watcher --- .../22386_extend_watcher_paths.enhancement.md | 3 ++ src/app.rs | 38 ++++++++++++------- src/config/sink.rs | 6 +++ src/sinks/http/config.rs | 14 +++++++ 4 files changed, 47 insertions(+), 14 deletions(-) create mode 100644 changelog.d/22386_extend_watcher_paths.enhancement.md diff --git a/changelog.d/22386_extend_watcher_paths.enhancement.md b/changelog.d/22386_extend_watcher_paths.enhancement.md new file mode 100644 index 0000000000000..e849a6ca2e219 --- /dev/null +++ b/changelog.d/22386_extend_watcher_paths.enhancement.md @@ -0,0 +1,3 @@ +extract tls crt and key file from http sinks to add them to the watcher list and therefore let vector restart on change on those files. This should allow extension to other sinks and file, and might be adaptable for enrichment tables as well. + +authors: gllb diff --git a/src/app.rs b/src/app.rs index 32586b4c8c6c2..5d198b6faf4ce 100644 --- a/src/app.rs +++ b/src/app.rs @@ -491,23 +491,14 @@ pub async fn load_configs( ) -> Result { let config_paths = config::process_paths(config_paths).ok_or(exitcode::CONFIG)?; - if let Some(watcher_conf) = watcher_conf { - // Start listening for config changes immediately. - config::watcher::spawn_thread( - watcher_conf, - signal_handler.clone_tx(), - config_paths.iter().map(Into::into), - None, - ) - .map_err(|error| { - error!(message = "Unable to start config watcher.", %error); - exitcode::CONFIG - })?; - } + let mut watched_paths = config_paths + .iter() + .map(<&PathBuf>::from) + .collect::>(); info!( message = "Loading configs.", - paths = ?config_paths.iter().map(<&PathBuf>::from).collect::>() + paths = ?watched_paths ); let mut config = config::load_from_paths_with_provider_and_secrets( @@ -518,6 +509,25 @@ pub async fn load_configs( .await .map_err(handle_config_errors)?; + if let Some(watcher_conf) = watcher_conf { + for (_, sink) in config.sinks() { + let mut files = sink.inner.files_to_watch(); + watched_paths.append(&mut files); + } + + info!( + message = "Starting watcher.", + paths = ?watched_paths + ); + + // Start listening for config changes. + config::watcher::spawn_thread(watcher_conf, signal_handler.clone_tx(), watched_paths, None) + .map_err(|error| { + error!(message = "Unable to start config watcher.", %error); + exitcode::CONFIG + })?; + } + config::init_log_schema(config.global.log_schema.clone(), true); config::init_telemetry(config.global.telemetry.clone(), true); diff --git a/src/config/sink.rs b/src/config/sink.rs index 7c07575589893..65b28c808a6e3 100644 --- a/src/config/sink.rs +++ b/src/config/sink.rs @@ -3,6 +3,7 @@ use std::cell::RefCell; use async_trait::async_trait; use dyn_clone::DynClone; use serde::Serialize; +use std::path::PathBuf; use vector_lib::buffers::{BufferConfig, BufferType}; use vector_lib::configurable::attributes::CustomAttribute; use vector_lib::configurable::schema::{SchemaGenerator, SchemaObject}; @@ -220,6 +221,11 @@ pub trait SinkConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync /// Gets the input configuration for this sink. fn input(&self) -> Input; + /// Gets the files to watch to trigger reload + fn files_to_watch(&self) -> Vec<&PathBuf> { + Vec::new() + } + /// Gets the list of resources, if any, used by this sink. /// /// Resources represent dependencies -- network ports, file descriptors, and so on -- that diff --git a/src/sinks/http/config.rs b/src/sinks/http/config.rs index e78c1a93adf75..9e628b258ac22 100644 --- a/src/sinks/http/config.rs +++ b/src/sinks/http/config.rs @@ -3,6 +3,7 @@ use http::{header::AUTHORIZATION, HeaderName, HeaderValue, Method, Request, StatusCode}; use hyper::Body; use indexmap::IndexMap; +use std::path::PathBuf; use vector_lib::codecs::{ encoding::{Framer, Serializer}, CharacterDelimitedEncoder, @@ -301,6 +302,19 @@ impl SinkConfig for HttpSinkConfig { Input::new(self.encoding.config().1.input_type()) } + fn files_to_watch(&self) -> Vec<&PathBuf> { + let mut files = Vec::new(); + if let Some(tls) = &self.tls { + if let Some(crt_file) = &tls.crt_file { + files.push(crt_file) + } + if let Some(key_file) = &tls.key_file { + files.push(key_file) + } + }; + files + } + fn acknowledgements(&self) -> &AcknowledgementsConfig { &self.acknowledgements } From 8cc8f7e2df7e0d3dba66cad7e1207b9756ca26d5 Mon Sep 17 00:00:00 2001 From: Guillaume Le Blanc Date: Fri, 7 Feb 2025 19:53:18 +0100 Subject: [PATCH 02/16] Update changelog.d/22386_extend_watcher_paths.enhancement.md Co-authored-by: Pavlos Rontidis --- changelog.d/22386_extend_watcher_paths.enhancement.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/22386_extend_watcher_paths.enhancement.md b/changelog.d/22386_extend_watcher_paths.enhancement.md index e849a6ca2e219..ce1b81cccb5c1 100644 --- a/changelog.d/22386_extend_watcher_paths.enhancement.md +++ b/changelog.d/22386_extend_watcher_paths.enhancement.md @@ -1,3 +1,3 @@ -extract tls crt and key file from http sinks to add them to the watcher list and therefore let vector restart on change on those files. This should allow extension to other sinks and file, and might be adaptable for enrichment tables as well. +The TLS `crt_file` and `key_file` from `http` sinks are now part of the watcher list and therefore they are reloaded on Vector restart. authors: gllb From 8d6310d70f326dca40cf691756e0345535652537 Mon Sep 17 00:00:00 2001 From: Guillaume Le Blanc Date: Thu, 27 Feb 2025 11:33:28 +0100 Subject: [PATCH 03/16] feat(watch tls file) Add ComponentConfig to let watcher find components --- src/app.rs | 13 ++++++++----- src/config/mod.rs | 24 ++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/src/app.rs b/src/app.rs index 5d198b6faf4ce..1cdff206cab70 100644 --- a/src/app.rs +++ b/src/app.rs @@ -18,7 +18,7 @@ use crate::extra_context::ExtraContext; use crate::{api, internal_events::ApiStarted}; use crate::{ cli::{handle_config_errors, LogFormat, Opts, RootOpts, WatchConfigMethod}, - config::{self, Config, ConfigPath}, + config::{self, Config, ConfigPath, ComponentConfig}, heartbeat, internal_events::{VectorConfigLoadError, VectorQuit, VectorStarted, VectorStopped}, signal::{SignalHandler, SignalPair, SignalRx, SignalTo}, @@ -491,7 +491,7 @@ pub async fn load_configs( ) -> Result { let config_paths = config::process_paths(config_paths).ok_or(exitcode::CONFIG)?; - let mut watched_paths = config_paths + let watched_paths = config_paths .iter() .map(<&PathBuf>::from) .collect::>(); @@ -509,10 +509,13 @@ pub async fn load_configs( .await .map_err(handle_config_errors)?; + let mut watched_component_paths = Vec::new(); + if let Some(watcher_conf) = watcher_conf { - for (_, sink) in config.sinks() { - let mut files = sink.inner.files_to_watch(); - watched_paths.append(&mut files); + for (name, sink) in config.sinks() { + let files = sink.inner.files_to_watch(); + let component_config = ComponentConfig::new(files.into_iter().cloned().collect(), name.clone()); + watched_component_paths.push(component_config); } info!( diff --git a/src/config/mod.rs b/src/config/mod.rs index bb3082e8cef0f..a7d35db6f5543 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -70,6 +70,30 @@ pub use vector_lib::{ id::Inputs, }; +#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)] +pub struct ComponentConfig { + pub config_paths: Vec, + pub component_key: ComponentKey, +} + +impl ComponentConfig { + pub fn new(config_paths: Vec, component_key: ComponentKey) -> Self { + Self { + config_paths, + component_key, + } + } + + pub fn contains(&self, config_paths: &Vec) -> Option { + for i in config_paths { + if self.config_paths.contains(&i) { + return Some(self.component_key.clone()) + } + } + None + } +} + #[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)] pub enum ConfigPath { File(PathBuf, FormatHint), From ad9d0e8d86f1e97f15dff8bb25865a8e736d19c8 Mon Sep 17 00:00:00 2001 From: Guillaume Le Blanc Date: Fri, 28 Feb 2025 12:01:45 +0100 Subject: [PATCH 04/16] feat(watch tls file) Let watcher send ReloadComponent signal --- src/app.rs | 2 +- src/config/watcher.rs | 32 +++++++++++++++++++++++++++++--- src/signal.rs | 2 ++ 3 files changed, 32 insertions(+), 4 deletions(-) diff --git a/src/app.rs b/src/app.rs index 1cdff206cab70..15a59c36a2c02 100644 --- a/src/app.rs +++ b/src/app.rs @@ -524,7 +524,7 @@ pub async fn load_configs( ); // Start listening for config changes. - config::watcher::spawn_thread(watcher_conf, signal_handler.clone_tx(), watched_paths, None) + config::watcher::spawn_thread(watcher_conf, signal_handler.clone_tx(), watched_paths, watched_component_paths, None) .map_err(|error| { error!(message = "Unable to start config watcher.", %error); exitcode::CONFIG diff --git a/src/config/watcher.rs b/src/config/watcher.rs index 269b127f892ab..edc54bf4400be 100644 --- a/src/config/watcher.rs +++ b/src/config/watcher.rs @@ -7,6 +7,8 @@ use std::{ thread, }; +use crate::config::ComponentConfig; + use notify::{recommended_watcher, EventKind, RecursiveMode}; use crate::Error; @@ -67,9 +69,19 @@ pub fn spawn_thread<'a>( watcher_conf: WatcherConfig, signal_tx: crate::signal::SignalTx, config_paths: impl IntoIterator + 'a, + component_configs: Vec, delay: impl Into>, ) -> Result<(), Error> { - let config_paths: Vec<_> = config_paths.into_iter().cloned().collect(); + let mut config_paths: Vec<_> = config_paths.into_iter().cloned().collect(); + let mut component_config_paths: Vec<_> = component_configs + .clone() + .into_iter() + .map(|p| p.config_paths.clone()) + .flatten() + .collect(); + + config_paths.append(&mut component_config_paths); + let delay = delay.into().unwrap_or(CONFIG_WATCH_DELAY); // Create watcher now so not to miss any changes happening between @@ -92,6 +104,19 @@ pub fn spawn_thread<'a>( debug!(message = "Consumed file change events for delay.", delay = ?delay); + let component_keys: Vec<_> = component_configs + .clone() + .into_iter() + .map(|p| p.contains(&event.paths)) + .flatten() + .collect(); + + for component_key in component_keys { + info!("Component {} configuration changed.", component_key); + _ = signal_tx.send(crate::signal::SignalTo::ReloadComponent(component_key)).map_err(|error| { + error!(message = "Unable to reload component configuration. Restart Vector to reload it.", cause = %error) + }); + } // We need to read paths to resolve any inode changes that may have happened. // And we need to do it before raising sighup to avoid missing any change. if let Err(error) = watcher.add_paths(&config_paths) { @@ -182,12 +207,13 @@ mod tests { let dir = temp_dir().to_path_buf(); let file_path = dir.join("vector.toml"); let watcher_conf = WatcherConfig::RecommendedWatcher; - + let component_file_path = Vec::new(dir.join("tls.cert"), dir.join("tls.key")); + let component_config = ComponentConfig::new(component_file_path, ComponentKey::from("http")); std::fs::create_dir(&dir).unwrap(); let mut file = File::create(&file_path).unwrap(); let (signal_tx, signal_rx) = broadcast::channel(128); - spawn_thread(watcher_conf, signal_tx, &[dir], delay).unwrap(); + spawn_thread(watcher_conf, signal_tx, &[dir], component_file_path, delay).unwrap(); if !test(&mut file, delay * 5, signal_rx).await { panic!("Test timed out"); diff --git a/src/signal.rs b/src/signal.rs index 91a1e3515f798..61edcaf1aaacf 100644 --- a/src/signal.rs +++ b/src/signal.rs @@ -14,6 +14,8 @@ pub type SignalRx = broadcast::Receiver; /// Control messages used by Vector to drive topology and shutdown events. #[allow(clippy::large_enum_variant)] // discovered during Rust upgrade to 1.57; just allowing for now since we did previously pub enum SignalTo { + /// Signal to reload a given component. + ReloadComponent(ComponentKey), /// Signal to reload config from a string. ReloadFromConfigBuilder(ConfigBuilder), /// Signal to reload config from the filesystem. From cf0a9dda0ae259e451534f1971b8e27270b00523 Mon Sep 17 00:00:00 2001 From: Guillaume Le Blanc Date: Fri, 28 Feb 2025 12:03:11 +0100 Subject: [PATCH 05/16] feat(watch tls file) handle ReloadComponent signal --- src/app.rs | 33 ++++++++++++++++++++++++++++----- src/topology/controller.rs | 7 +++++-- src/topology/running.rs | 12 ++++++++++-- 3 files changed, 43 insertions(+), 9 deletions(-) diff --git a/src/app.rs b/src/app.rs index 15a59c36a2c02..438ec9242a157 100644 --- a/src/app.rs +++ b/src/app.rs @@ -18,7 +18,7 @@ use crate::extra_context::ExtraContext; use crate::{api, internal_events::ApiStarted}; use crate::{ cli::{handle_config_errors, LogFormat, Opts, RootOpts, WatchConfigMethod}, - config::{self, Config, ConfigPath, ComponentConfig}, + config::{self, Config, ConfigPath, ComponentConfig, ComponentKey}, heartbeat, internal_events::{VectorConfigLoadError, VectorQuit, VectorStarted, VectorStopped}, signal::{SignalHandler, SignalPair, SignalRx, SignalTo}, @@ -336,9 +336,31 @@ async fn handle_signal( allow_empty_config: bool, ) -> Option { match signal { + Ok(SignalTo::ReloadComponent(component_key)) => { + let mut topology_controller = topology_controller.lock().await; + + // Reload paths + if let Some(paths) = config::process_paths(config_paths) { + topology_controller.config_paths = paths; + } + + // Reload config + let new_config = config::load_from_paths_with_provider_and_secrets( + &topology_controller.config_paths, + signal_handler, + allow_empty_config, + ) + .await; + + reload_config_from_result( + topology_controller, + new_config, + Some(&component_key)) + .await + } Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => { let topology_controller = topology_controller.lock().await; - reload_config_from_result(topology_controller, config_builder.build()).await + reload_config_from_result(topology_controller, config_builder.build(), None).await } Ok(SignalTo::ReloadFromDisk) => { let mut topology_controller = topology_controller.lock().await; @@ -354,9 +376,9 @@ async fn handle_signal( signal_handler, allow_empty_config, ) - .await; + .await; - reload_config_from_result(topology_controller, new_config).await + reload_config_from_result(topology_controller, new_config, None).await } Err(RecvError::Lagged(amt)) => { warn!("Overflow, dropped {} signals.", amt); @@ -370,9 +392,10 @@ async fn handle_signal( async fn reload_config_from_result( mut topology_controller: MutexGuard<'_, TopologyController>, config: Result>, + component_to_reload: Option<&ComponentKey> ) -> Option { match config { - Ok(new_config) => match topology_controller.reload(new_config).await { + Ok(new_config) => match topology_controller.reload(new_config, component_to_reload).await { ReloadOutcome::FatalError(error) => Some(SignalTo::Shutdown(Some(error))), _ => None, }, diff --git a/src/topology/controller.rs b/src/topology/controller.rs index 4e2066e5a814a..d1c5cb105a49c 100644 --- a/src/topology/controller.rs +++ b/src/topology/controller.rs @@ -59,7 +59,10 @@ pub enum ReloadOutcome { } impl TopologyController { - pub async fn reload(&mut self, mut new_config: config::Config) -> ReloadOutcome { + pub async fn reload( + &mut self, mut new_config: config::Config, + component_to_reload: Option<&config::ComponentKey> + ) -> ReloadOutcome { new_config .healthchecks .set_require_healthy(self.require_healthy); @@ -103,7 +106,7 @@ impl TopologyController { match self .topology - .reload_config_and_respawn(new_config, self.extra_context.clone()) + .reload_config_and_respawn(new_config, self.extra_context.clone(), component_to_reload) .await { Ok(true) => { diff --git a/src/topology/running.rs b/src/topology/running.rs index 4ff91c510fe8c..1837e57a76517 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -227,6 +227,7 @@ impl RunningTopology { &mut self, new_config: Config, extra_context: ExtraContext, + component_to_reload: Option<&ComponentKey>, ) -> Result { info!("Reloading running topology with new configuration."); @@ -244,7 +245,7 @@ impl RunningTopology { // // We also shutdown any component that is simply being removed entirely. let diff = ConfigDiff::new(&self.config, &new_config); - let buffers = self.shutdown_diff(&diff, &new_config).await; + let buffers = self.shutdown_diff(&diff, &new_config, component_to_reload).await; // Gives windows some time to make available any port // released by shutdown components. @@ -349,6 +350,7 @@ impl RunningTopology { &mut self, diff: &ConfigDiff, new_config: &Config, + component_to_reload: Option<&ComponentKey>, ) -> HashMap { // First, we shutdown any changed/removed sources. This ensures that we can allow downstream // components to terminate naturally by virtue of the flow of events stopping. @@ -532,7 +534,7 @@ impl RunningTopology { // they can naturally shutdown and allow us to recover their buffers if possible. let mut buffer_tx = HashMap::new(); - let sinks_to_change = diff + let mut sinks_to_change = diff .sinks .to_change .iter() @@ -543,6 +545,12 @@ impl RunningTopology { .is_some() })) .collect::>(); + + match component_to_reload { + Some(component) => sinks_to_change.push(component), + _ => (), + } + for key in &sinks_to_change { debug!(component = %key, "Changing sink."); if reuse_buffers.contains(key) { From 8b491c6dc10c5ca6faf05f5e87a9c2c3ac6e66d2 Mon Sep 17 00:00:00 2001 From: Guillaume Le Blanc Date: Fri, 28 Feb 2025 14:37:23 +0100 Subject: [PATCH 06/16] feat(watch tls file) Ensure signal is sent once in case of ReloadComponent --- src/config/watcher.rs | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/src/config/watcher.rs b/src/config/watcher.rs index edc54bf4400be..2edaa2e88b48a 100644 --- a/src/config/watcher.rs +++ b/src/config/watcher.rs @@ -111,12 +111,6 @@ pub fn spawn_thread<'a>( .flatten() .collect(); - for component_key in component_keys { - info!("Component {} configuration changed.", component_key); - _ = signal_tx.send(crate::signal::SignalTo::ReloadComponent(component_key)).map_err(|error| { - error!(message = "Unable to reload component configuration. Restart Vector to reload it.", cause = %error) - }); - } // We need to read paths to resolve any inode changes that may have happened. // And we need to do it before raising sighup to avoid missing any change. if let Err(error) = watcher.add_paths(&config_paths) { @@ -127,9 +121,19 @@ pub fn spawn_thread<'a>( debug!(message = "Reloaded paths."); info!("Configuration file changed."); - _ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk).map_err(|error| { - error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error) - }); + if component_keys.len() > 0 { + for component_key in component_keys { + info!("Component {} configuration changed.", component_key); + _ = signal_tx.send(crate::signal::SignalTo::ReloadComponent(component_key)).map_err(|error| { + error!(message = "Unable to reload component configuration. Restart Vector to reload it.", cause = %error) + }); + } + } else { + _ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk) + .map_err(|error| { + error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error) + }); + } } else { debug!(message = "Ignoring event.", event = ?event) } From ecee69cc94627bb15a0b902768de8b146e2030dc Mon Sep 17 00:00:00 2001 From: Guillaume Le Blanc Date: Mon, 3 Mar 2025 13:04:33 +0100 Subject: [PATCH 07/16] (feat reload components) Handle Vec instead of single one --- lib/vector-common/src/config.rs | 6 ++++++ src/app.rs | 8 ++++---- src/config/watcher.rs | 10 ++++------ src/signal.rs | 4 ++-- src/topology/controller.rs | 4 ++-- src/topology/running.rs | 10 +++++----- 6 files changed, 23 insertions(+), 19 deletions(-) diff --git a/lib/vector-common/src/config.rs b/lib/vector-common/src/config.rs index 621915e322ce7..605b5a2c55f80 100644 --- a/lib/vector-common/src/config.rs +++ b/lib/vector-common/src/config.rs @@ -39,6 +39,12 @@ impl ComponentKey { } } +impl AsRef for ComponentKey { + fn as_ref(&self) -> &ComponentKey { + &self + } +} + impl From for ComponentKey { fn from(id: String) -> Self { Self { id } diff --git a/src/app.rs b/src/app.rs index 438ec9242a157..812372b46c638 100644 --- a/src/app.rs +++ b/src/app.rs @@ -336,7 +336,7 @@ async fn handle_signal( allow_empty_config: bool, ) -> Option { match signal { - Ok(SignalTo::ReloadComponent(component_key)) => { + Ok(SignalTo::ReloadComponents(component_keys)) => { let mut topology_controller = topology_controller.lock().await; // Reload paths @@ -355,7 +355,7 @@ async fn handle_signal( reload_config_from_result( topology_controller, new_config, - Some(&component_key)) + Some(component_keys.iter().map(AsRef::as_ref).collect())) .await } Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => { @@ -392,10 +392,10 @@ async fn handle_signal( async fn reload_config_from_result( mut topology_controller: MutexGuard<'_, TopologyController>, config: Result>, - component_to_reload: Option<&ComponentKey> + components_to_reload: Option> ) -> Option { match config { - Ok(new_config) => match topology_controller.reload(new_config, component_to_reload).await { + Ok(new_config) => match topology_controller.reload(new_config, components_to_reload).await { ReloadOutcome::FatalError(error) => Some(SignalTo::Shutdown(Some(error))), _ => None, }, diff --git a/src/config/watcher.rs b/src/config/watcher.rs index 2edaa2e88b48a..4c5bcc0f95e8a 100644 --- a/src/config/watcher.rs +++ b/src/config/watcher.rs @@ -122,12 +122,10 @@ pub fn spawn_thread<'a>( info!("Configuration file changed."); if component_keys.len() > 0 { - for component_key in component_keys { - info!("Component {} configuration changed.", component_key); - _ = signal_tx.send(crate::signal::SignalTo::ReloadComponent(component_key)).map_err(|error| { - error!(message = "Unable to reload component configuration. Restart Vector to reload it.", cause = %error) - }); - } + info!("Component {:?} configuration changed.", component_keys); + _ = signal_tx.send(crate::signal::SignalTo::ReloadComponents(component_keys)).map_err(|error| { + error!(message = "Unable to reload component configuration. Restart Vector to reload it.", cause = %error) + }); } else { _ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk) .map_err(|error| { diff --git a/src/signal.rs b/src/signal.rs index 61edcaf1aaacf..8057cd947670d 100644 --- a/src/signal.rs +++ b/src/signal.rs @@ -14,8 +14,8 @@ pub type SignalRx = broadcast::Receiver; /// Control messages used by Vector to drive topology and shutdown events. #[allow(clippy::large_enum_variant)] // discovered during Rust upgrade to 1.57; just allowing for now since we did previously pub enum SignalTo { - /// Signal to reload a given component. - ReloadComponent(ComponentKey), + /// Signal to reload given components. + ReloadComponents(Vec), /// Signal to reload config from a string. ReloadFromConfigBuilder(ConfigBuilder), /// Signal to reload config from the filesystem. diff --git a/src/topology/controller.rs b/src/topology/controller.rs index d1c5cb105a49c..51b87fe1be355 100644 --- a/src/topology/controller.rs +++ b/src/topology/controller.rs @@ -61,7 +61,7 @@ pub enum ReloadOutcome { impl TopologyController { pub async fn reload( &mut self, mut new_config: config::Config, - component_to_reload: Option<&config::ComponentKey> + components_to_reload: Option> ) -> ReloadOutcome { new_config .healthchecks @@ -106,7 +106,7 @@ impl TopologyController { match self .topology - .reload_config_and_respawn(new_config, self.extra_context.clone(), component_to_reload) + .reload_config_and_respawn(new_config, self.extra_context.clone(), components_to_reload) .await { Ok(true) => { diff --git a/src/topology/running.rs b/src/topology/running.rs index 1837e57a76517..5efad6ab15b75 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -227,7 +227,7 @@ impl RunningTopology { &mut self, new_config: Config, extra_context: ExtraContext, - component_to_reload: Option<&ComponentKey>, + components_to_reload: Option>, ) -> Result { info!("Reloading running topology with new configuration."); @@ -245,7 +245,7 @@ impl RunningTopology { // // We also shutdown any component that is simply being removed entirely. let diff = ConfigDiff::new(&self.config, &new_config); - let buffers = self.shutdown_diff(&diff, &new_config, component_to_reload).await; + let buffers = self.shutdown_diff(&diff, &new_config, components_to_reload).await; // Gives windows some time to make available any port // released by shutdown components. @@ -350,7 +350,7 @@ impl RunningTopology { &mut self, diff: &ConfigDiff, new_config: &Config, - component_to_reload: Option<&ComponentKey>, + components_to_reload: Option>, ) -> HashMap { // First, we shutdown any changed/removed sources. This ensures that we can allow downstream // components to terminate naturally by virtue of the flow of events stopping. @@ -546,8 +546,8 @@ impl RunningTopology { })) .collect::>(); - match component_to_reload { - Some(component) => sinks_to_change.push(component), + match components_to_reload { + Some(mut components) => sinks_to_change.append(&mut components), _ => (), } From c3c896863ab8b9d7c0402d0b3df32ab0f477ae6c Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Tue, 4 Mar 2025 11:33:41 -0500 Subject: [PATCH 08/16] cargo fmt --- lib/vector-common/src/config.rs | 6 +++--- src/app.rs | 37 +++++++++++++++++++++------------ src/config/mod.rs | 2 +- src/config/watcher.rs | 3 ++- src/topology/controller.rs | 5 +++-- src/topology/running.rs | 4 +++- 6 files changed, 36 insertions(+), 21 deletions(-) diff --git a/lib/vector-common/src/config.rs b/lib/vector-common/src/config.rs index 605b5a2c55f80..4dbb06cd1ef03 100644 --- a/lib/vector-common/src/config.rs +++ b/lib/vector-common/src/config.rs @@ -40,9 +40,9 @@ impl ComponentKey { } impl AsRef for ComponentKey { - fn as_ref(&self) -> &ComponentKey { - &self - } + fn as_ref(&self) -> &ComponentKey { + &self + } } impl From for ComponentKey { diff --git a/src/app.rs b/src/app.rs index 812372b46c638..283ba707c8ae9 100644 --- a/src/app.rs +++ b/src/app.rs @@ -18,7 +18,7 @@ use crate::extra_context::ExtraContext; use crate::{api, internal_events::ApiStarted}; use crate::{ cli::{handle_config_errors, LogFormat, Opts, RootOpts, WatchConfigMethod}, - config::{self, Config, ConfigPath, ComponentConfig, ComponentKey}, + config::{self, ComponentConfig, ComponentKey, Config, ConfigPath}, heartbeat, internal_events::{VectorConfigLoadError, VectorQuit, VectorStarted, VectorStopped}, signal::{SignalHandler, SignalPair, SignalRx, SignalTo}, @@ -350,13 +350,14 @@ async fn handle_signal( signal_handler, allow_empty_config, ) - .await; + .await; reload_config_from_result( topology_controller, new_config, - Some(component_keys.iter().map(AsRef::as_ref).collect())) - .await + Some(component_keys.iter().map(AsRef::as_ref).collect()), + ) + .await } Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => { let topology_controller = topology_controller.lock().await; @@ -376,7 +377,7 @@ async fn handle_signal( signal_handler, allow_empty_config, ) - .await; + .await; reload_config_from_result(topology_controller, new_config, None).await } @@ -392,10 +393,13 @@ async fn handle_signal( async fn reload_config_from_result( mut topology_controller: MutexGuard<'_, TopologyController>, config: Result>, - components_to_reload: Option> + components_to_reload: Option>, ) -> Option { match config { - Ok(new_config) => match topology_controller.reload(new_config, components_to_reload).await { + Ok(new_config) => match topology_controller + .reload(new_config, components_to_reload) + .await + { ReloadOutcome::FatalError(error) => Some(SignalTo::Shutdown(Some(error))), _ => None, }, @@ -537,7 +541,8 @@ pub async fn load_configs( if let Some(watcher_conf) = watcher_conf { for (name, sink) in config.sinks() { let files = sink.inner.files_to_watch(); - let component_config = ComponentConfig::new(files.into_iter().cloned().collect(), name.clone()); + let component_config = + ComponentConfig::new(files.into_iter().cloned().collect(), name.clone()); watched_component_paths.push(component_config); } @@ -547,11 +552,17 @@ pub async fn load_configs( ); // Start listening for config changes. - config::watcher::spawn_thread(watcher_conf, signal_handler.clone_tx(), watched_paths, watched_component_paths, None) - .map_err(|error| { - error!(message = "Unable to start config watcher.", %error); - exitcode::CONFIG - })?; + config::watcher::spawn_thread( + watcher_conf, + signal_handler.clone_tx(), + watched_paths, + watched_component_paths, + None, + ) + .map_err(|error| { + error!(message = "Unable to start config watcher.", %error); + exitcode::CONFIG + })?; } config::init_log_schema(config.global.log_schema.clone(), true); diff --git a/src/config/mod.rs b/src/config/mod.rs index e0fe361b62045..f4e307e3cc1b7 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -93,7 +93,7 @@ impl ComponentConfig { pub fn contains(&self, config_paths: &Vec) -> Option { for i in config_paths { if self.config_paths.contains(&i) { - return Some(self.component_key.clone()) + return Some(self.component_key.clone()); } } None diff --git a/src/config/watcher.rs b/src/config/watcher.rs index 4c5bcc0f95e8a..d7c5046bde26f 100644 --- a/src/config/watcher.rs +++ b/src/config/watcher.rs @@ -210,7 +210,8 @@ mod tests { let file_path = dir.join("vector.toml"); let watcher_conf = WatcherConfig::RecommendedWatcher; let component_file_path = Vec::new(dir.join("tls.cert"), dir.join("tls.key")); - let component_config = ComponentConfig::new(component_file_path, ComponentKey::from("http")); + let component_config = + ComponentConfig::new(component_file_path, ComponentKey::from("http")); std::fs::create_dir(&dir).unwrap(); let mut file = File::create(&file_path).unwrap(); diff --git a/src/topology/controller.rs b/src/topology/controller.rs index 51b87fe1be355..f485ee6d56524 100644 --- a/src/topology/controller.rs +++ b/src/topology/controller.rs @@ -60,8 +60,9 @@ pub enum ReloadOutcome { impl TopologyController { pub async fn reload( - &mut self, mut new_config: config::Config, - components_to_reload: Option> + &mut self, + mut new_config: config::Config, + components_to_reload: Option>, ) -> ReloadOutcome { new_config .healthchecks diff --git a/src/topology/running.rs b/src/topology/running.rs index 6f71147383c75..fed741786cae3 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -245,7 +245,9 @@ impl RunningTopology { // // We also shutdown any component that is simply being removed entirely. let diff = ConfigDiff::new(&self.config, &new_config); - let buffers = self.shutdown_diff(&diff, &new_config, components_to_reload).await; + let buffers = self + .shutdown_diff(&diff, &new_config, components_to_reload) + .await; // Gives windows some time to make available any port // released by shutdown components. From 95df0da77df1b1d8de12120744f2be94ced00cee Mon Sep 17 00:00:00 2001 From: Guillaume Le Blanc Date: Wed, 5 Mar 2025 16:48:17 +0100 Subject: [PATCH 09/16] Update changelog.d/22386_extend_watcher_paths.enhancement.md Co-authored-by: Pavlos Rontidis --- changelog.d/22386_extend_watcher_paths.enhancement.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/22386_extend_watcher_paths.enhancement.md b/changelog.d/22386_extend_watcher_paths.enhancement.md index ce1b81cccb5c1..0f3aabcbde3c6 100644 --- a/changelog.d/22386_extend_watcher_paths.enhancement.md +++ b/changelog.d/22386_extend_watcher_paths.enhancement.md @@ -1,3 +1,3 @@ -The TLS `crt_file` and `key_file` from `http` sinks are now part of the watcher list and therefore they are reloaded on Vector restart. +The TLS `crt_file` and `key_file` from `http` sinks are now watched when `--watch_config` is enabled and therefore changes to those files will trigger a config reload without the need to restart Vector. authors: gllb From 5811fd8d3e653594a085462914b483e40316168d Mon Sep 17 00:00:00 2001 From: Guillaume Le Blanc Date: Wed, 5 Mar 2025 16:48:49 +0100 Subject: [PATCH 10/16] Update src/config/mod.rs Co-authored-by: Pavlos Rontidis --- src/config/mod.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/config/mod.rs b/src/config/mod.rs index f4e307e3cc1b7..67d01e4452ff1 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -90,15 +90,12 @@ impl ComponentConfig { } } - pub fn contains(&self, config_paths: &Vec) -> Option { - for i in config_paths { - if self.config_paths.contains(&i) { - return Some(self.component_key.clone()); - } + pub fn contains(&self, config_paths: &[PathBuf]) -> Option { + if config_paths.iter().any(|p| self.config_paths.contains(p)) { + return Some(self.component_key.clone()); } None } -} #[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)] pub enum ConfigPath { From d67394c459ea9f943fc69d8f051612988e1a6c10 Mon Sep 17 00:00:00 2001 From: Guillaume Le Blanc Date: Thu, 6 Mar 2025 09:57:01 +0100 Subject: [PATCH 11/16] Fix missing enclosed delimiter --- src/config/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/config/mod.rs b/src/config/mod.rs index 67d01e4452ff1..2fe2d219b55b8 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -96,6 +96,7 @@ impl ComponentConfig { } None } +} #[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)] pub enum ConfigPath { From 214cce4c043c331df15394c3b9f288655edcc449 Mon Sep 17 00:00:00 2001 From: Guillaume Le Blanc Date: Thu, 6 Mar 2025 10:58:55 +0100 Subject: [PATCH 12/16] solve clippy issues --- lib/vector-common/src/config.rs | 2 +- src/config/mod.rs | 2 +- src/config/watcher.rs | 19 +++++++++---------- src/topology/running.rs | 5 ++--- src/topology/test/doesnt_reload.rs | 2 +- src/topology/test/mod.rs | 24 ++++++++++++------------ src/topology/test/reload.rs | 12 ++++++------ src/topology/test/transient_state.rs | 8 ++++---- 8 files changed, 36 insertions(+), 38 deletions(-) diff --git a/lib/vector-common/src/config.rs b/lib/vector-common/src/config.rs index 4dbb06cd1ef03..6c9d7eb9626b8 100644 --- a/lib/vector-common/src/config.rs +++ b/lib/vector-common/src/config.rs @@ -41,7 +41,7 @@ impl ComponentKey { impl AsRef for ComponentKey { fn as_ref(&self) -> &ComponentKey { - &self + self } } diff --git a/src/config/mod.rs b/src/config/mod.rs index 2fe2d219b55b8..6478203229475 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -83,7 +83,7 @@ pub struct ComponentConfig { } impl ComponentConfig { - pub fn new(config_paths: Vec, component_key: ComponentKey) -> Self { + pub const fn new(config_paths: Vec, component_key: ComponentKey) -> Self { Self { config_paths, component_key, diff --git a/src/config/watcher.rs b/src/config/watcher.rs index d7c5046bde26f..6f63bffec166d 100644 --- a/src/config/watcher.rs +++ b/src/config/watcher.rs @@ -76,8 +76,7 @@ pub fn spawn_thread<'a>( let mut component_config_paths: Vec<_> = component_configs .clone() .into_iter() - .map(|p| p.config_paths.clone()) - .flatten() + .flat_map(|p| p.config_paths.clone()) .collect(); config_paths.append(&mut component_config_paths); @@ -107,8 +106,7 @@ pub fn spawn_thread<'a>( let component_keys: Vec<_> = component_configs .clone() .into_iter() - .map(|p| p.contains(&event.paths)) - .flatten() + .flat_map(|p| p.contains(&event.paths)) .collect(); // We need to read paths to resolve any inode changes that may have happened. @@ -121,7 +119,7 @@ pub fn spawn_thread<'a>( debug!(message = "Reloaded paths."); info!("Configuration file changed."); - if component_keys.len() > 0 { + if !component_keys.is_empty() { info!("Component {:?} configuration changed.", component_keys); _ = signal_tx.send(crate::signal::SignalTo::ReloadComponents(component_keys)).map_err(|error| { error!(message = "Unable to reload component configuration. Restart Vector to reload it.", cause = %error) @@ -187,6 +185,7 @@ mod tests { use crate::{ signal::SignalRx, test_util::{temp_dir, temp_file, trace_init}, + config::ComponentKey, }; use std::{fs::File, io::Write, time::Duration}; use tokio::sync::broadcast; @@ -209,14 +208,14 @@ mod tests { let dir = temp_dir().to_path_buf(); let file_path = dir.join("vector.toml"); let watcher_conf = WatcherConfig::RecommendedWatcher; - let component_file_path = Vec::new(dir.join("tls.cert"), dir.join("tls.key")); + let component_file_path = vec![dir.join("tls.cert"), dir.join("tls.key")]; let component_config = ComponentConfig::new(component_file_path, ComponentKey::from("http")); std::fs::create_dir(&dir).unwrap(); let mut file = File::create(&file_path).unwrap(); let (signal_tx, signal_rx) = broadcast::channel(128); - spawn_thread(watcher_conf, signal_tx, &[dir], component_file_path, delay).unwrap(); + spawn_thread(watcher_conf, signal_tx, &[dir], vec![component_config], delay).unwrap(); if !test(&mut file, delay * 5, signal_rx).await { panic!("Test timed out"); @@ -233,7 +232,7 @@ mod tests { let watcher_conf = WatcherConfig::RecommendedWatcher; let (signal_tx, signal_rx) = broadcast::channel(128); - spawn_thread(watcher_conf, signal_tx, &[file_path], delay).unwrap(); + spawn_thread(watcher_conf, signal_tx, &[file_path], vec![], delay).unwrap(); if !test(&mut file, delay * 5, signal_rx).await { panic!("Test timed out"); @@ -254,7 +253,7 @@ mod tests { let watcher_conf = WatcherConfig::RecommendedWatcher; let (signal_tx, signal_rx) = broadcast::channel(128); - spawn_thread(watcher_conf, signal_tx, &[sym_file], delay).unwrap(); + spawn_thread(watcher_conf, signal_tx, &[sym_file], vec![], delay).unwrap(); if !test(&mut file, delay * 5, signal_rx).await { panic!("Test timed out"); @@ -275,7 +274,7 @@ mod tests { let mut file = File::create(&file_path).unwrap(); let (signal_tx, signal_rx) = broadcast::channel(128); - spawn_thread(watcher_conf, signal_tx, &[sub_dir], delay).unwrap(); + spawn_thread(watcher_conf, signal_tx, &[sub_dir], vec![], delay).unwrap(); if !test(&mut file, delay * 5, signal_rx).await { panic!("Test timed out"); diff --git a/src/topology/running.rs b/src/topology/running.rs index fed741786cae3..fca1772af2bc0 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -548,9 +548,8 @@ impl RunningTopology { })) .collect::>(); - match components_to_reload { - Some(mut components) => sinks_to_change.append(&mut components), - _ => (), + if let Some(mut components) = components_to_reload { + sinks_to_change.append(&mut components) } for key in &sinks_to_change { diff --git a/src/topology/test/doesnt_reload.rs b/src/topology/test/doesnt_reload.rs index 788ee4c676075..d75f30e537a77 100644 --- a/src/topology/test/doesnt_reload.rs +++ b/src/topology/test/doesnt_reload.rs @@ -23,7 +23,7 @@ async fn topology_doesnt_reload_new_data_dir() { new_config.global.data_dir = Some(Path::new("/qwerty").to_path_buf()); topology - .reload_config_and_respawn(new_config.build().unwrap(), Default::default()) + .reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None) .await .unwrap(); diff --git a/src/topology/test/mod.rs b/src/topology/test/mod.rs index 18c8af61c8eb7..63e17c7e8b152 100644 --- a/src/topology/test/mod.rs +++ b/src/topology/test/mod.rs @@ -301,7 +301,7 @@ async fn topology_remove_one_source() { config.add_sink("out1", &["in1"], sink1); assert!(topology - .reload_config_and_respawn(config.build().unwrap(), Default::default()) + .reload_config_and_respawn(config.build().unwrap(), Default::default(), None) .await .unwrap()); @@ -350,7 +350,7 @@ async fn topology_remove_one_sink() { config.add_sink("out1", &["in1"], basic_sink(10).1); assert!(topology - .reload_config_and_respawn(config.build().unwrap(), Default::default()) + .reload_config_and_respawn(config.build().unwrap(), Default::default(), None) .await .unwrap()); @@ -403,7 +403,7 @@ async fn topology_remove_one_transform() { config.add_sink("out1", &["t2"], sink2); assert!(topology - .reload_config_and_respawn(config.build().unwrap(), Default::default()) + .reload_config_and_respawn(config.build().unwrap(), Default::default(), None) .await .unwrap()); @@ -452,7 +452,7 @@ async fn topology_swap_source() { config.add_sink("out1", &["in2"], sink2); assert!(topology - .reload_config_and_respawn(config.build().unwrap(), Default::default()) + .reload_config_and_respawn(config.build().unwrap(), Default::default(), None) .await .unwrap()); @@ -517,7 +517,7 @@ async fn topology_swap_transform() { config.add_sink("out1", &["t1"], sink2); assert!(topology - .reload_config_and_respawn(config.build().unwrap(), Default::default()) + .reload_config_and_respawn(config.build().unwrap(), Default::default(), None) .await .unwrap()); @@ -569,7 +569,7 @@ async fn topology_swap_sink() { config.add_sink("out1", &["in1"], sink2); assert!(topology - .reload_config_and_respawn(config.build().unwrap(), Default::default()) + .reload_config_and_respawn(config.build().unwrap(), Default::default(), None) .await .unwrap()); @@ -657,7 +657,7 @@ async fn topology_swap_transform_is_atomic() { config.add_sink("out1", &["t1"], basic_sink(10).1); assert!(topology - .reload_config_and_respawn(config.build().unwrap(), Default::default()) + .reload_config_and_respawn(config.build().unwrap(), Default::default(), None) .await .unwrap()); @@ -693,7 +693,7 @@ async fn topology_rebuild_connected() { config.add_sink("out1", &["in1"], sink1); assert!(topology - .reload_config_and_respawn(config.build().unwrap(), Default::default()) + .reload_config_and_respawn(config.build().unwrap(), Default::default(), None) .await .unwrap()); @@ -752,7 +752,7 @@ async fn topology_rebuild_connected_transform() { config.add_sink("out1", &["t2"], sink2); assert!(topology - .reload_config_and_respawn(config.build().unwrap(), Default::default()) + .reload_config_and_respawn(config.build().unwrap(), Default::default(), None) .await .unwrap()); @@ -807,7 +807,7 @@ async fn topology_optional_healthcheck_does_not_fail_reload() { let (mut topology, _) = start_topology(config, false).await; let config = basic_config_with_sink_failing_healthcheck(); assert!(topology - .reload_config_and_respawn(config, Default::default()) + .reload_config_and_respawn(config, Default::default(), None) .await .unwrap()); } @@ -820,7 +820,7 @@ async fn topology_healthcheck_not_run_on_unchanged_reload() { let mut config = basic_config_with_sink_failing_healthcheck(); config.healthchecks.require_healthy = true; assert!(topology - .reload_config_and_respawn(config, Default::default()) + .reload_config_and_respawn(config, Default::default(), None) .await .unwrap()); } @@ -846,7 +846,7 @@ async fn topology_healthcheck_run_for_changes_on_reload() { let mut config = config.build().unwrap(); config.healthchecks.require_healthy = true; assert!(!topology - .reload_config_and_respawn(config, Default::default()) + .reload_config_and_respawn(config, Default::default(), None) .await .unwrap()); } diff --git a/src/topology/test/reload.rs b/src/topology/test/reload.rs index a96546de9efb6..cd3d312eeb287 100644 --- a/src/topology/test/reload.rs +++ b/src/topology/test/reload.rs @@ -65,7 +65,7 @@ async fn topology_reuse_old_port() { let (mut topology, _) = start_topology(old_config.build().unwrap(), false).await; assert!(topology - .reload_config_and_respawn(new_config.build().unwrap(), Default::default()) + .reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None) .await .unwrap()); } @@ -90,7 +90,7 @@ async fn topology_rebuild_old() { let (mut topology, _) = start_topology(old_config.build().unwrap(), false).await; assert!(!topology - .reload_config_and_respawn(new_config.build().unwrap(), Default::default()) + .reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None) .await .unwrap()); } @@ -107,7 +107,7 @@ async fn topology_old() { let (mut topology, _) = start_topology(old_config.clone().build().unwrap(), false).await; assert!(topology - .reload_config_and_respawn(old_config.build().unwrap(), Default::default()) + .reload_config_and_respawn(old_config.build().unwrap(), Default::default(), None) .await .unwrap()); } @@ -258,7 +258,7 @@ async fn topology_readd_input() { new_config.add_source("in2", internal_metrics_source()); new_config.add_sink("out", &["in1"], prom_exporter_sink(address_0, 1)); assert!(topology - .reload_config_and_respawn(new_config.build().unwrap(), Default::default()) + .reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None) .await .unwrap()); @@ -268,7 +268,7 @@ async fn topology_readd_input() { new_config.add_source("in2", internal_metrics_source()); new_config.add_sink("out", &["in1", "in2"], prom_exporter_sink(address_0, 1)); assert!(topology - .reload_config_and_respawn(new_config.build().unwrap(), Default::default()) + .reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None) .await .unwrap()); @@ -299,7 +299,7 @@ async fn reload_sink_test( // Now reload the topology with the "new" configuration, and make sure that a component is now listening on `new_address`. assert!(topology - .reload_config_and_respawn(new_config, Default::default()) + .reload_config_and_respawn(new_config, Default::default(), None) .await .unwrap()); diff --git a/src/topology/test/transient_state.rs b/src/topology/test/transient_state.rs index 514f9dc86457b..eb25acb126a5f 100644 --- a/src/topology/test/transient_state.rs +++ b/src/topology/test/transient_state.rs @@ -30,7 +30,7 @@ async fn closed_source() { topology.sources_finished().await; assert!(topology - .reload_config_and_respawn(new_config.build().unwrap(), Default::default()) + .reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None) .await .unwrap()); } @@ -52,7 +52,7 @@ async fn remove_sink() { let (mut topology, _) = start_topology(old_config.build().unwrap(), false).await; assert!(topology - .reload_config_and_respawn(new_config.build().unwrap(), Default::default()) + .reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None) .await .unwrap()); } @@ -75,7 +75,7 @@ async fn remove_transform() { let (mut topology, _) = start_topology(old_config.build().unwrap(), false).await; assert!(topology - .reload_config_and_respawn(new_config.build().unwrap(), Default::default()) + .reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None) .await .unwrap()); } @@ -99,7 +99,7 @@ async fn replace_transform() { let (mut topology, _) = start_topology(old_config.build().unwrap(), false).await; assert!(topology - .reload_config_and_respawn(new_config.build().unwrap(), Default::default()) + .reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None) .await .unwrap()); } From 8ef872a5699ec77d0382bc487942b2fcfcefbc73 Mon Sep 17 00:00:00 2001 From: Guillaume Le Blanc Date: Thu, 6 Mar 2025 18:11:11 +0100 Subject: [PATCH 13/16] cargo fmt --- src/config/watcher.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/config/watcher.rs b/src/config/watcher.rs index 6f63bffec166d..f9e43abe299ef 100644 --- a/src/config/watcher.rs +++ b/src/config/watcher.rs @@ -183,9 +183,9 @@ fn create_watcher( mod tests { use super::*; use crate::{ + config::ComponentKey, signal::SignalRx, test_util::{temp_dir, temp_file, trace_init}, - config::ComponentKey, }; use std::{fs::File, io::Write, time::Duration}; use tokio::sync::broadcast; @@ -215,7 +215,14 @@ mod tests { let mut file = File::create(&file_path).unwrap(); let (signal_tx, signal_rx) = broadcast::channel(128); - spawn_thread(watcher_conf, signal_tx, &[dir], vec![component_config], delay).unwrap(); + spawn_thread( + watcher_conf, + signal_tx, + &[dir], + vec![component_config], + delay, + ) + .unwrap(); if !test(&mut file, delay * 5, signal_rx).await { panic!("Test timed out"); From 6452ef67277ac84f1ddeb89f6d2d00f359a5510c Mon Sep 17 00:00:00 2001 From: Guillaume Le Blanc Date: Thu, 13 Mar 2025 10:30:37 +0100 Subject: [PATCH 14/16] Fix testing --- src/config/watcher.rs | 33 +++++++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/src/config/watcher.rs b/src/config/watcher.rs index f9e43abe299ef..34903462d68d3 100644 --- a/src/config/watcher.rs +++ b/src/config/watcher.rs @@ -200,6 +200,19 @@ mod tests { ) } + async fn test_component_reload(file: &mut File, + expected_component: &ComponentKey, + timeout: Duration, + mut receiver: SignalRx) -> bool { + file.write_all(&[0]).unwrap(); + file.sync_all().unwrap(); + + matches!( + tokio::time::timeout(timeout, receiver.recv()).await, + Ok(Ok(crate::signal::SignalTo::ReloadComponents(components))) if components.contains(expected_component) + ) + } + #[tokio::test] async fn file_directory_update() { trace_init(); @@ -209,10 +222,15 @@ mod tests { let file_path = dir.join("vector.toml"); let watcher_conf = WatcherConfig::RecommendedWatcher; let component_file_path = vec![dir.join("tls.cert"), dir.join("tls.key")]; + let http_component = ComponentKey::from("http"); + let component_config = - ComponentConfig::new(component_file_path, ComponentKey::from("http")); + ComponentConfig::new(component_file_path.clone(), http_component.clone()); std::fs::create_dir(&dir).unwrap(); let mut file = File::create(&file_path).unwrap(); + let mut component_files: Vec = component_file_path.iter().map(|file| { + File::create(&file).unwrap() + }).collect(); let (signal_tx, signal_rx) = broadcast::channel(128); spawn_thread( @@ -224,7 +242,18 @@ mod tests { ) .unwrap(); - if !test(&mut file, delay * 5, signal_rx).await { + let signal_rx2 = signal_rx.resubscribe(); + let signal_rx3 = signal_rx.resubscribe(); + + if !test_component_reload(&mut component_files[0], &http_component, delay * 5, signal_rx).await { + panic!("Test timed out"); + } + + if !test_component_reload(&mut component_files[1], &http_component, delay * 5, signal_rx2).await { + panic!("Test timed out"); + } + + if !test(&mut file, delay * 5, signal_rx3).await { panic!("Test timed out"); } } From c9330e7fda1547ef9ad329ece3b3643bbb642e7f Mon Sep 17 00:00:00 2001 From: Guillaume Le Blanc Date: Thu, 13 Mar 2025 16:10:31 +0100 Subject: [PATCH 15/16] run cargo fmt --- src/config/watcher.rs | 35 ++++++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/src/config/watcher.rs b/src/config/watcher.rs index 34903462d68d3..398307ada37be 100644 --- a/src/config/watcher.rs +++ b/src/config/watcher.rs @@ -200,10 +200,12 @@ mod tests { ) } - async fn test_component_reload(file: &mut File, - expected_component: &ComponentKey, - timeout: Duration, - mut receiver: SignalRx) -> bool { + async fn test_component_reload( + file: &mut File, + expected_component: &ComponentKey, + timeout: Duration, + mut receiver: SignalRx, + ) -> bool { file.write_all(&[0]).unwrap(); file.sync_all().unwrap(); @@ -228,9 +230,10 @@ mod tests { ComponentConfig::new(component_file_path.clone(), http_component.clone()); std::fs::create_dir(&dir).unwrap(); let mut file = File::create(&file_path).unwrap(); - let mut component_files: Vec = component_file_path.iter().map(|file| { - File::create(&file).unwrap() - }).collect(); + let mut component_files: Vec = component_file_path + .iter() + .map(|file| File::create(file).unwrap()) + .collect(); let (signal_tx, signal_rx) = broadcast::channel(128); spawn_thread( @@ -245,11 +248,25 @@ mod tests { let signal_rx2 = signal_rx.resubscribe(); let signal_rx3 = signal_rx.resubscribe(); - if !test_component_reload(&mut component_files[0], &http_component, delay * 5, signal_rx).await { + if !test_component_reload( + &mut component_files[0], + &http_component, + delay * 5, + signal_rx, + ) + .await + { panic!("Test timed out"); } - if !test_component_reload(&mut component_files[1], &http_component, delay * 5, signal_rx2).await { + if !test_component_reload( + &mut component_files[1], + &http_component, + delay * 5, + signal_rx2, + ) + .await + { panic!("Test timed out"); } From cc649871e8087bdc6fd87bafc85a25574ef5d272 Mon Sep 17 00:00:00 2001 From: Guillaume Le Blanc Date: Fri, 14 Mar 2025 08:59:27 +0100 Subject: [PATCH 16/16] add dedicated watcher test --- src/config/watcher.rs | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/src/config/watcher.rs b/src/config/watcher.rs index 398307ada37be..2471a95efbf87 100644 --- a/src/config/watcher.rs +++ b/src/config/watcher.rs @@ -216,24 +216,23 @@ mod tests { } #[tokio::test] - async fn file_directory_update() { + async fn component_update() { trace_init(); let delay = Duration::from_secs(3); let dir = temp_dir().to_path_buf(); - let file_path = dir.join("vector.toml"); let watcher_conf = WatcherConfig::RecommendedWatcher; let component_file_path = vec![dir.join("tls.cert"), dir.join("tls.key")]; let http_component = ComponentKey::from("http"); - let component_config = - ComponentConfig::new(component_file_path.clone(), http_component.clone()); std::fs::create_dir(&dir).unwrap(); - let mut file = File::create(&file_path).unwrap(); + let mut component_files: Vec = component_file_path .iter() .map(|file| File::create(file).unwrap()) .collect(); + let component_config = + ComponentConfig::new(component_file_path.clone(), http_component.clone()); let (signal_tx, signal_rx) = broadcast::channel(128); spawn_thread( @@ -245,8 +244,8 @@ mod tests { ) .unwrap(); + let signal_rx = signal_rx.resubscribe(); let signal_rx2 = signal_rx.resubscribe(); - let signal_rx3 = signal_rx.resubscribe(); if !test_component_reload( &mut component_files[0], @@ -269,8 +268,23 @@ mod tests { { panic!("Test timed out"); } + } + #[tokio::test] + async fn file_directory_update() { + trace_init(); - if !test(&mut file, delay * 5, signal_rx3).await { + let delay = Duration::from_secs(3); + let dir = temp_dir().to_path_buf(); + let file_path = dir.join("vector.toml"); + let watcher_conf = WatcherConfig::RecommendedWatcher; + + std::fs::create_dir(&dir).unwrap(); + let mut file = File::create(&file_path).unwrap(); + + let (signal_tx, signal_rx) = broadcast::channel(128); + spawn_thread(watcher_conf, signal_tx, &[dir], vec![], delay).unwrap(); + + if !test(&mut file, delay * 5, signal_rx).await { panic!("Test timed out"); } }