feat(cli): Handle reload based on referenced file change #22539

Merged
merged 18 commits into from
Mar 17, 2025
Changes from 15 commits
3 changes: 3 additions & 0 deletions changelog.d/22386_extend_watcher_paths.enhancement.md
@@ -0,0 +1,3 @@
The TLS `crt_file` and `key_file` from `http` sinks are now watched when `--watch_config` is enabled and therefore changes to those files will trigger a config reload without the need to restart Vector.

authors: gllb
6 changes: 6 additions & 0 deletions lib/vector-common/src/config.rs
@@ -39,6 +39,12 @@ impl ComponentKey {
}
}

+ impl AsRef<ComponentKey> for ComponentKey {
+ fn as_ref(&self) -> &ComponentKey {
+ self
+ }
+ }
+
impl From<String> for ComponentKey {
fn from(id: String) -> Self {
Self { id }
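The reflexive `AsRef` impl above is what lets the call site in src/app.rs turn owned keys into borrowed ones with `.map(AsRef::as_ref)`. A minimal sketch of that pattern, assuming a reduced stand-in for `ComponentKey` (only the `id` field and the two impls visible in the diff) and a hypothetical helper `borrow_keys` that is not part of the PR:

```rust
/// Reduced stand-in for Vector's `ComponentKey` (assumption: only the
/// `id` field and the impls visible in the diff are modeled).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ComponentKey {
    id: String,
}

impl From<String> for ComponentKey {
    fn from(id: String) -> Self {
        Self { id }
    }
}

// The reflexive impl added in the diff: a key can be viewed as itself.
impl AsRef<ComponentKey> for ComponentKey {
    fn as_ref(&self) -> &ComponentKey {
        self
    }
}

/// Hypothetical helper: borrow every key without cloning it.
/// `AsRef::as_ref` resolves to the reflexive impl because the return
/// type asks for `&ComponentKey`.
pub fn borrow_keys(owned: &[ComponentKey]) -> Vec<&ComponentKey> {
    owned.iter().map(AsRef::as_ref).collect()
}
```

The borrowed vector points at the same values as the owned one, so nothing is copied when the keys are handed on to the reload path.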
83 changes: 65 additions & 18 deletions src/app.rs
@@ -18,7 +18,7 @@ use crate::extra_context::ExtraContext;
use crate::{api, internal_events::ApiStarted};
use crate::{
cli::{handle_config_errors, LogFormat, Opts, RootOpts, WatchConfigMethod},
- config::{self, Config, ConfigPath},
+ config::{self, ComponentConfig, ComponentKey, Config, ConfigPath},
heartbeat,
internal_events::{VectorConfigLoadError, VectorQuit, VectorStarted, VectorStopped},
signal::{SignalHandler, SignalPair, SignalRx, SignalTo},
@@ -336,9 +336,32 @@ async fn handle_signal(
allow_empty_config: bool,
) -> Option<SignalTo> {
match signal {
+ Ok(SignalTo::ReloadComponents(component_keys)) => {

@pront (Member) commented on Mar 12, 2025:

Some implementation details:

  1. We can add component_keys (or whatever struct we end up with) as a new RunningTopology field to avoid passing them around to all these functions.
  2. Then, we can use this new field in async fn shutdown_diff(...)
  3. We can probably add the notion of changed watched paths to src/config/diff.rs

Contributor Author:

The component_keys here are passed when the watcher detects a change. If we use it as a field of RunningTopology then it is supposed to be empty almost all the time, except when a change is detected. Which will still imply passing it in reload_config_and_respawn anyway, right? Or maybe I don't get it?

For the notion of watched paths change in configDiff I think I see what you mean.

@gllb (Contributor Author) commented on Mar 12, 2025:

@pront So I did implement this logic in configDiff, and it does what it should. Only this part, https://github.com/gllb/vector/blob/master/src/topology/running.rs#L490-L509, is causing the sink to be reused, and therefore it is not reloaded.
In the implementation where everything is done inside shutdown_diff, it's easy to work around: I can append to sink_to_change afterwards, so the buffer-reuse code doesn't even know about it. But since it's in configDiff now, I don't see how to work around it. Any idea?

Contributor Author:

I could remove this part, but I don't know whether it's safe to do so.

Member:

Glad we agree on the src/config/diff.rs enhancement.

What is more important is how to keep track of the component type. I don't think we currently have an enum or anything similar to model it, but we probably need to associate a ComponentKey with a new ComponentType. Talking about this, I realize it might be better to do this in a follow-up PR, since it might need some discussion.


> If we use it as a field of RunningTopology then it is supposed to be empty almost all the time, except when a change is detected.

Correct. It will be empty until the watcher detects a file change, then it will push it there. When we process/reload, it will clear the whole thing. I am a bit ambivalent on this, so feel free to TIOLI.

> Which will still imply passing it in reload_config_and_respawn anyway, right?

To be clear, I was proposing adding it here:

pub struct RunningTopology {

The reload_config_and_respawn will then have access to it via self.
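
For readers following the thread, the idea described in this comment (a field that stays empty until the watcher reports a change and is cleared when the reload is processed) could be sketched roughly as follows. This is only an illustration: `TopologySketch`, `pending_reload`, `mark_changed`, and `take_pending` are hypothetical names, keys are plain strings, and the diff shown on this page passes the keys as a function argument instead.

```rust
use std::collections::BTreeSet;

/// Hypothetical, heavily reduced stand-in for `RunningTopology`.
#[derive(Default)]
pub struct TopologySketch {
    /// Components whose watched files changed since the last reload.
    /// Empty most of the time.
    pending_reload: BTreeSet<String>,
}

impl TopologySketch {
    /// Called when the watcher reports a change for `key`.
    pub fn mark_changed(&mut self, key: &str) {
        self.pending_reload.insert(key.to_owned());
    }

    /// Called at reload time: returns the pending keys and leaves the
    /// field empty again.
    pub fn take_pending(&mut self) -> BTreeSet<String> {
        std::mem::take(&mut self.pending_reload)
    }
}
```

With this shape the reload method reads the keys through `self`, at the cost of carrying state that is only meaningful between a watcher event and the next reload.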

Contributor Author:

OK, I can create a follow-up PR for it, but I'm not sure I will have much time for it; let's see. I still have no idea how to solve the buffer-reuse issue I pointed out in shutdown_diff, but I will open a PR with what I have so anybody can contribute as well.

+ let mut topology_controller = topology_controller.lock().await;
+
+ // Reload paths
+ if let Some(paths) = config::process_paths(config_paths) {
+ topology_controller.config_paths = paths;
+ }
+
+ // Reload config
+ let new_config = config::load_from_paths_with_provider_and_secrets(
+ &topology_controller.config_paths,
+ signal_handler,
+ allow_empty_config,
+ )
+ .await;
+
+ reload_config_from_result(
+ topology_controller,
+ new_config,
+ Some(component_keys.iter().map(AsRef::as_ref).collect()),
+ )
+ .await
+ }
Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => {
let topology_controller = topology_controller.lock().await;
- reload_config_from_result(topology_controller, config_builder.build()).await
+ reload_config_from_result(topology_controller, config_builder.build(), None).await
}
Ok(SignalTo::ReloadFromDisk) => {
let mut topology_controller = topology_controller.lock().await;
@@ -356,7 +379,7 @@ async fn handle_signal(
)
.await;

- reload_config_from_result(topology_controller, new_config).await
+ reload_config_from_result(topology_controller, new_config, None).await
}
Err(RecvError::Lagged(amt)) => {
warn!("Overflow, dropped {} signals.", amt);
@@ -370,9 +393,13 @@ async fn handle_signal(
async fn reload_config_from_result(
mut topology_controller: MutexGuard<'_, TopologyController>,
config: Result<Config, Vec<String>>,
+ components_to_reload: Option<Vec<&ComponentKey>>,
  ) -> Option<SignalTo> {
  match config {
- Ok(new_config) => match topology_controller.reload(new_config).await {
+ Ok(new_config) => match topology_controller
+ .reload(new_config, components_to_reload)
+ .await
+ {
ReloadOutcome::FatalError(error) => Some(SignalTo::Shutdown(Some(error))),
_ => None,
},
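
One way to read the new `components_to_reload` argument: `None` means an ordinary reload driven only by the config diff, while `Some(keys)` additionally forces the listed components to be rebuilt even if their configuration is unchanged. The sketch below illustrates that reading under stated assumptions; `sinks_to_change` is a hypothetical helper with string keys, and how `reload_config_and_respawn` actually consumes the argument is not shown in this diff.

```rust
use std::collections::HashSet;

/// Hypothetical helper: combine the sinks the config diff marked as
/// changed with the sinks the watcher asked to reload explicitly.
pub fn sinks_to_change(
    changed_by_diff: &HashSet<String>,
    components_to_reload: Option<&[String]>,
) -> HashSet<String> {
    let mut out = changed_by_diff.clone();
    if let Some(forced) = components_to_reload {
        // A changed certificate leaves the sink's config identical, so
        // the diff alone would not list it; add it explicitly.
        out.extend(forced.iter().cloned());
    }
    out
}
```

This mirrors the workaround mentioned in the review thread, where the forced sinks are appended to the set of changed sinks after the diff has been computed.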
@@ -491,23 +518,14 @@ pub async fn load_configs(
) -> Result<Config, ExitCode> {
let config_paths = config::process_paths(config_paths).ok_or(exitcode::CONFIG)?;

- if let Some(watcher_conf) = watcher_conf {
- // Start listening for config changes immediately.
- config::watcher::spawn_thread(
- watcher_conf,
- signal_handler.clone_tx(),
- config_paths.iter().map(Into::into),
- None,
- )
- .map_err(|error| {
- error!(message = "Unable to start config watcher.", %error);
- exitcode::CONFIG
- })?;
- }
+ let watched_paths = config_paths
+ .iter()
+ .map(<&PathBuf>::from)
+ .collect::<Vec<_>>();

info!(
message = "Loading configs.",
- paths = ?config_paths.iter().map(<&PathBuf>::from).collect::<Vec<_>>()
+ paths = ?watched_paths
);

let mut config = config::load_from_paths_with_provider_and_secrets(
@@ -518,6 +536,35 @@ pub async fn load_configs(
.await
.map_err(handle_config_errors)?;

+ let mut watched_component_paths = Vec::new();
+
+ if let Some(watcher_conf) = watcher_conf {
+ for (name, sink) in config.sinks() {
+ let files = sink.inner.files_to_watch();
+ let component_config =
+ ComponentConfig::new(files.into_iter().cloned().collect(), name.clone());
+ watched_component_paths.push(component_config);
+ }
+
+ info!(
+ message = "Starting watcher.",
+ paths = ?watched_paths
+ );
+
+ // Start listening for config changes.
+ config::watcher::spawn_thread(
+ watcher_conf,
+ signal_handler.clone_tx(),
+ watched_paths,
+ watched_component_paths,
+ None,
+ )
+ .map_err(|error| {
+ error!(message = "Unable to start config watcher.", %error);
+ exitcode::CONFIG
+ })?;
+ }
+
config::init_log_schema(config.global.log_schema.clone(), true);
config::init_telemetry(config.global.telemetry.clone(), true);

22 changes: 22 additions & 0 deletions src/config/mod.rs
@@ -76,6 +76,28 @@ pub use vector_lib::{
id::Inputs,
};

+ #[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
+ pub struct ComponentConfig {
+ pub config_paths: Vec<PathBuf>,
+ pub component_key: ComponentKey,
+ }
+
+ impl ComponentConfig {
+ pub const fn new(config_paths: Vec<PathBuf>, component_key: ComponentKey) -> Self {
+ Self {
+ config_paths,
+ component_key,
+ }
+ }
+
+ pub fn contains(&self, config_paths: &[PathBuf]) -> Option<ComponentKey> {
+ if config_paths.iter().any(|p| self.config_paths.contains(p)) {
+ return Some(self.component_key.clone());
+ }
+ None
+ }
+ }
+
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum ConfigPath {
File(PathBuf, FormatHint),
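`ComponentConfig::contains` answers one question: does any path in a file-change event belong to this component? A self-contained sketch of that lookup, assuming a string key in place of `ComponentKey` and a hypothetical `affected_components` helper that is not part of the PR:

```rust
use std::path::PathBuf;

/// Reduced stand-in for the `ComponentConfig` added in the diff
/// (assumption: the key is a plain `String` here).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ComponentConfig {
    pub config_paths: Vec<PathBuf>,
    pub component_key: String,
}

impl ComponentConfig {
    /// Returns the component key if any changed path is one of the
    /// files this component asked to have watched.
    pub fn contains(&self, changed: &[PathBuf]) -> Option<String> {
        changed
            .iter()
            .any(|p| self.config_paths.contains(p))
            .then(|| self.component_key.clone())
    }
}

/// Hypothetical helper: collect the keys of every component touched by
/// one file-change event.
pub fn affected_components(configs: &[ComponentConfig], changed: &[PathBuf]) -> Vec<String> {
    configs.iter().filter_map(|c| c.contains(changed)).collect()
}
```

The comparison uses the paths as given and does not consult the filesystem, so a path reported through a symlink would only match if it was configured in the same form.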
6 changes: 6 additions & 0 deletions src/config/sink.rs
@@ -3,6 +3,7 @@ use std::cell::RefCell;
use async_trait::async_trait;
use dyn_clone::DynClone;
use serde::Serialize;
+ use std::path::PathBuf;
use vector_lib::buffers::{BufferConfig, BufferType};
use vector_lib::configurable::attributes::CustomAttribute;
use vector_lib::configurable::schema::{SchemaGenerator, SchemaObject};
@@ -220,6 +221,11 @@ pub trait SinkConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync
/// Gets the input configuration for this sink.
fn input(&self) -> Input;

+ /// Gets the files to watch to trigger reload
+ fn files_to_watch(&self) -> Vec<&PathBuf> {
+ Vec::new()
+ }
+
/// Gets the list of resources, if any, used by this sink.
///
/// Resources represent dependencies -- network ports, file descriptors, and so on -- that
53 changes: 44 additions & 9 deletions src/config/watcher.rs
@@ -7,6 +7,8 @@ use std::{
thread,
};

+ use crate::config::ComponentConfig;
+
use notify::{recommended_watcher, EventKind, RecursiveMode};

use crate::Error;
@@ -67,9 +69,18 @@ pub fn spawn_thread<'a>(
watcher_conf: WatcherConfig,
signal_tx: crate::signal::SignalTx,
config_paths: impl IntoIterator<Item = &'a PathBuf> + 'a,
+ component_configs: Vec<ComponentConfig>,
delay: impl Into<Option<Duration>>,
) -> Result<(), Error> {
- let config_paths: Vec<_> = config_paths.into_iter().cloned().collect();
+ let mut config_paths: Vec<_> = config_paths.into_iter().cloned().collect();
+ let mut component_config_paths: Vec<_> = component_configs
+ .clone()
+ .into_iter()
+ .flat_map(|p| p.config_paths.clone())
+ .collect();
+
+ config_paths.append(&mut component_config_paths);
+
let delay = delay.into().unwrap_or(CONFIG_WATCH_DELAY);

// Create watcher now so not to miss any changes happening between
@@ -92,6 +103,12 @@ pub fn spawn_thread<'a>(

debug!(message = "Consumed file change events for delay.", delay = ?delay);

+ let component_keys: Vec<_> = component_configs
+ .clone()
+ .into_iter()
+ .flat_map(|p| p.contains(&event.paths))
+ .collect();
+
// We need to read paths to resolve any inode changes that may have happened.
// And we need to do it before raising sighup to avoid missing any change.
if let Err(error) = watcher.add_paths(&config_paths) {
@@ -102,9 +119,17 @@ pub fn spawn_thread<'a>(
debug!(message = "Reloaded paths.");

info!("Configuration file changed.");
- _ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk).map_err(|error| {
- error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error)
- });
+ if !component_keys.is_empty() {
+ info!("Component {:?} configuration changed.", component_keys);
+ _ = signal_tx.send(crate::signal::SignalTo::ReloadComponents(component_keys)).map_err(|error| {
+ error!(message = "Unable to reload component configuration. Restart Vector to reload it.", cause = %error)
+ });
+ } else {
+ _ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk)
+ .map_err(|error| {
+ error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error)
+ });
+ }
} else {
debug!(message = "Ignoring event.", event = ?event)
}
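
The branch in this hunk reduces to a small decision: if the event touched files that belong to at least one component, send `ReloadComponents` with those keys, otherwise fall back to `ReloadFromDisk`. A minimal sketch of that decision, assuming a local `Signal` enum with string keys and a hypothetical `choose_signal` function in place of the real `SignalTo` and channel send:

```rust
/// Reduced stand-in for the two `SignalTo` variants involved here
/// (assumption: keys are plain strings).
#[derive(Debug, PartialEq, Eq)]
pub enum Signal {
    ReloadComponents(Vec<String>),
    ReloadFromDisk,
}

/// Hypothetical helper mirroring the branch in the watcher thread:
/// matched component keys win over a plain reload from disk.
pub fn choose_signal(component_keys: Vec<String>) -> Signal {
    if component_keys.is_empty() {
        Signal::ReloadFromDisk
    } else {
        Signal::ReloadComponents(component_keys)
    }
}
```

Note that an event touching both a config file and a component file takes the first branch. In the diff, the `ReloadComponents` handler in src/app.rs also reloads the configuration from the config paths, so config file changes are still picked up in that case.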
@@ -158,6 +183,7 @@ fn create_watcher(
mod tests {
use super::*;
use crate::{
+ config::ComponentKey,
signal::SignalRx,
test_util::{temp_dir, temp_file, trace_init},
};
@@ -182,12 +208,21 @@ mod tests {
let dir = temp_dir().to_path_buf();
let file_path = dir.join("vector.toml");
let watcher_conf = WatcherConfig::RecommendedWatcher;

+ let component_file_path = vec![dir.join("tls.cert"), dir.join("tls.key")];
+ let component_config =
+ ComponentConfig::new(component_file_path, ComponentKey::from("http"));
std::fs::create_dir(&dir).unwrap();
let mut file = File::create(&file_path).unwrap();

let (signal_tx, signal_rx) = broadcast::channel(128);
- spawn_thread(watcher_conf, signal_tx, &[dir], delay).unwrap();
+ spawn_thread(
+ watcher_conf,
+ signal_tx,
+ &[dir],
+ vec![component_config],
+ delay,
+ )
+ .unwrap();

if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
@@ -204,7 +239,7 @@ mod tests {
let watcher_conf = WatcherConfig::RecommendedWatcher;

let (signal_tx, signal_rx) = broadcast::channel(128);
- spawn_thread(watcher_conf, signal_tx, &[file_path], delay).unwrap();
+ spawn_thread(watcher_conf, signal_tx, &[file_path], vec![], delay).unwrap();

if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
@@ -225,7 +260,7 @@ mod tests {
let watcher_conf = WatcherConfig::RecommendedWatcher;

let (signal_tx, signal_rx) = broadcast::channel(128);
- spawn_thread(watcher_conf, signal_tx, &[sym_file], delay).unwrap();
+ spawn_thread(watcher_conf, signal_tx, &[sym_file], vec![], delay).unwrap();

if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
@@ -246,7 +281,7 @@ mod tests {
let mut file = File::create(&file_path).unwrap();

let (signal_tx, signal_rx) = broadcast::channel(128);
- spawn_thread(watcher_conf, signal_tx, &[sub_dir], delay).unwrap();
+ spawn_thread(watcher_conf, signal_tx, &[sub_dir], vec![], delay).unwrap();

if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
2 changes: 2 additions & 0 deletions src/signal.rs
@@ -14,6 +14,8 @@ pub type SignalRx = broadcast::Receiver<SignalTo>;
/// Control messages used by Vector to drive topology and shutdown events.
#[allow(clippy::large_enum_variant)] // discovered during Rust upgrade to 1.57; just allowing for now since we did previously
pub enum SignalTo {
+ /// Signal to reload given components.
+ ReloadComponents(Vec<ComponentKey>),
/// Signal to reload config from a string.
ReloadFromConfigBuilder(ConfigBuilder),
/// Signal to reload config from the filesystem.
14 changes: 14 additions & 0 deletions src/sinks/http/config.rs
@@ -3,6 +3,7 @@
use http::{header::AUTHORIZATION, HeaderName, HeaderValue, Method, Request, StatusCode};
use hyper::Body;
use indexmap::IndexMap;
+ use std::path::PathBuf;
use vector_lib::codecs::{
encoding::{Framer, Serializer},
CharacterDelimitedEncoder,
@@ -301,6 +302,19 @@ impl SinkConfig for HttpSinkConfig {
Input::new(self.encoding.config().1.input_type())
}

+ fn files_to_watch(&self) -> Vec<&PathBuf> {
+ let mut files = Vec::new();
+ if let Some(tls) = &self.tls {
+ if let Some(crt_file) = &tls.crt_file {
+ files.push(crt_file)
+ }
+ if let Some(key_file) = &tls.key_file {
+ files.push(key_file)
+ }
+ };
+ files
+ }
+
fn acknowledgements(&self) -> &AcknowledgementsConfig {
&self.acknowledgements
}
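The `files_to_watch` hook follows a common opt-in pattern: the trait supplies an empty default, and only sinks that reference external files override it. A self-contained sketch of that pattern, assuming hypothetical `SinkConfigSketch`, `TlsSketch`, `HttpSinkSketch`, and `ConsoleSinkSketch` types that only model the two optional TLS paths named in the diff:

```rust
use std::path::PathBuf;

/// Hypothetical stand-in for the TLS options; only the two paths the
/// diff reads are modeled.
pub struct TlsSketch {
    pub crt_file: Option<PathBuf>,
    pub key_file: Option<PathBuf>,
}

/// Hypothetical stand-in for the `SinkConfig` trait.
pub trait SinkConfigSketch {
    /// Files whose modification should trigger a reload. Sinks that
    /// reference no external files keep the empty default.
    fn files_to_watch(&self) -> Vec<&PathBuf> {
        Vec::new()
    }
}

/// A sink that does not override the hook.
pub struct ConsoleSinkSketch;
impl SinkConfigSketch for ConsoleSinkSketch {}

/// A sink with optional TLS settings, like the `http` sink in the diff.
pub struct HttpSinkSketch {
    pub tls: Option<TlsSketch>,
}

impl SinkConfigSketch for HttpSinkSketch {
    fn files_to_watch(&self) -> Vec<&PathBuf> {
        // Equivalent to the nested `if let` version in the diff:
        // iterate over the optional TLS block and its optional paths.
        self.tls
            .iter()
            .flat_map(|tls| tls.crt_file.iter().chain(tls.key_file.iter()))
            .collect()
    }
}
```

The iterator form is only a stylistic alternative; it returns the certificate path before the key path, the same order as the explicit pushes in the diff.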
8 changes: 6 additions & 2 deletions src/topology/controller.rs
@@ -59,7 +59,11 @@ pub enum ReloadOutcome {
}

impl TopologyController {
- pub async fn reload(&mut self, mut new_config: config::Config) -> ReloadOutcome {
+ pub async fn reload(
+ &mut self,
+ mut new_config: config::Config,
+ components_to_reload: Option<Vec<&config::ComponentKey>>,
+ ) -> ReloadOutcome {
new_config
.healthchecks
.set_require_healthy(self.require_healthy);
@@ -103,7 +107,7 @@ impl TopologyController {

match self
.topology
- .reload_config_and_respawn(new_config, self.extra_context.clone())
+ .reload_config_and_respawn(new_config, self.extra_context.clone(), components_to_reload)
.await
{
Ok(true) => {