Skip to content

Commit 5e392ad

Browse files
gllbpront
andauthored
feat(cli): Handle reload based on referenced file change (#22539)
* feat(watch tls files) Add http sink tls cert/key to config::watcher * Update changelog.d/22386_extend_watcher_paths.enhancement.md Co-authored-by: Pavlos Rontidis <[email protected]> * feat(watch tls file) Add ComponentConfig to let watcher find components * feat(watch tls file) Let watcher send ReloadComponent signal * feat(watch tls file) handle ReloadComponent signal * feat(watch tls file) Ensure signal is sent once in case of ReloadComponent * (feat reload components) Handle Vec<ComponentKey> instead of single one * cargo fmt * Update changelog.d/22386_extend_watcher_paths.enhancement.md Co-authored-by: Pavlos Rontidis <[email protected]> * Update src/config/mod.rs Co-authored-by: Pavlos Rontidis <[email protected]> * Fix missing enclosed delimiter * solve clippy issues * cargo fmt * Fix testing * run cargo fmt * add dedicated watcher test --------- Co-authored-by: Pavlos Rontidis <[email protected]>
1 parent 4762af5 commit 5e392ad

14 files changed

+261
-53
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
The TLS `crt_file` and `key_file` from `http` sinks are now watched when `--watch_config` is enabled and therefore changes to those files will trigger a config reload without the need to restart Vector.
2+
3+
authors: gllb

lib/vector-common/src/config.rs

+6
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,12 @@ impl ComponentKey {
3939
}
4040
}
4141

42+
impl AsRef<ComponentKey> for ComponentKey {
43+
fn as_ref(&self) -> &ComponentKey {
44+
self
45+
}
46+
}
47+
4248
impl From<String> for ComponentKey {
4349
fn from(id: String) -> Self {
4450
Self { id }

src/app.rs

+65-18
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::extra_context::ExtraContext;
1818
use crate::{api, internal_events::ApiStarted};
1919
use crate::{
2020
cli::{handle_config_errors, LogFormat, Opts, RootOpts, WatchConfigMethod},
21-
config::{self, Config, ConfigPath},
21+
config::{self, ComponentConfig, ComponentKey, Config, ConfigPath},
2222
heartbeat,
2323
internal_events::{VectorConfigLoadError, VectorQuit, VectorStarted, VectorStopped},
2424
signal::{SignalHandler, SignalPair, SignalRx, SignalTo},
@@ -336,9 +336,32 @@ async fn handle_signal(
336336
allow_empty_config: bool,
337337
) -> Option<SignalTo> {
338338
match signal {
339+
Ok(SignalTo::ReloadComponents(component_keys)) => {
340+
let mut topology_controller = topology_controller.lock().await;
341+
342+
// Reload paths
343+
if let Some(paths) = config::process_paths(config_paths) {
344+
topology_controller.config_paths = paths;
345+
}
346+
347+
// Reload config
348+
let new_config = config::load_from_paths_with_provider_and_secrets(
349+
&topology_controller.config_paths,
350+
signal_handler,
351+
allow_empty_config,
352+
)
353+
.await;
354+
355+
reload_config_from_result(
356+
topology_controller,
357+
new_config,
358+
Some(component_keys.iter().map(AsRef::as_ref).collect()),
359+
)
360+
.await
361+
}
339362
Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => {
340363
let topology_controller = topology_controller.lock().await;
341-
reload_config_from_result(topology_controller, config_builder.build()).await
364+
reload_config_from_result(topology_controller, config_builder.build(), None).await
342365
}
343366
Ok(SignalTo::ReloadFromDisk) => {
344367
let mut topology_controller = topology_controller.lock().await;
@@ -356,7 +379,7 @@ async fn handle_signal(
356379
)
357380
.await;
358381

359-
reload_config_from_result(topology_controller, new_config).await
382+
reload_config_from_result(topology_controller, new_config, None).await
360383
}
361384
Err(RecvError::Lagged(amt)) => {
362385
warn!("Overflow, dropped {} signals.", amt);
@@ -370,9 +393,13 @@ async fn handle_signal(
370393
async fn reload_config_from_result(
371394
mut topology_controller: MutexGuard<'_, TopologyController>,
372395
config: Result<Config, Vec<String>>,
396+
components_to_reload: Option<Vec<&ComponentKey>>,
373397
) -> Option<SignalTo> {
374398
match config {
375-
Ok(new_config) => match topology_controller.reload(new_config).await {
399+
Ok(new_config) => match topology_controller
400+
.reload(new_config, components_to_reload)
401+
.await
402+
{
376403
ReloadOutcome::FatalError(error) => Some(SignalTo::Shutdown(Some(error))),
377404
_ => None,
378405
},
@@ -491,23 +518,14 @@ pub async fn load_configs(
491518
) -> Result<Config, ExitCode> {
492519
let config_paths = config::process_paths(config_paths).ok_or(exitcode::CONFIG)?;
493520

494-
if let Some(watcher_conf) = watcher_conf {
495-
// Start listening for config changes immediately.
496-
config::watcher::spawn_thread(
497-
watcher_conf,
498-
signal_handler.clone_tx(),
499-
config_paths.iter().map(Into::into),
500-
None,
501-
)
502-
.map_err(|error| {
503-
error!(message = "Unable to start config watcher.", %error);
504-
exitcode::CONFIG
505-
})?;
506-
}
521+
let watched_paths = config_paths
522+
.iter()
523+
.map(<&PathBuf>::from)
524+
.collect::<Vec<_>>();
507525

508526
info!(
509527
message = "Loading configs.",
510-
paths = ?config_paths.iter().map(<&PathBuf>::from).collect::<Vec<_>>()
528+
paths = ?watched_paths
511529
);
512530

513531
let mut config = config::load_from_paths_with_provider_and_secrets(
@@ -518,6 +536,35 @@ pub async fn load_configs(
518536
.await
519537
.map_err(handle_config_errors)?;
520538

539+
let mut watched_component_paths = Vec::new();
540+
541+
if let Some(watcher_conf) = watcher_conf {
542+
for (name, sink) in config.sinks() {
543+
let files = sink.inner.files_to_watch();
544+
let component_config =
545+
ComponentConfig::new(files.into_iter().cloned().collect(), name.clone());
546+
watched_component_paths.push(component_config);
547+
}
548+
549+
info!(
550+
message = "Starting watcher.",
551+
paths = ?watched_paths
552+
);
553+
554+
// Start listening for config changes.
555+
config::watcher::spawn_thread(
556+
watcher_conf,
557+
signal_handler.clone_tx(),
558+
watched_paths,
559+
watched_component_paths,
560+
None,
561+
)
562+
.map_err(|error| {
563+
error!(message = "Unable to start config watcher.", %error);
564+
exitcode::CONFIG
565+
})?;
566+
}
567+
521568
config::init_log_schema(config.global.log_schema.clone(), true);
522569
config::init_telemetry(config.global.telemetry.clone(), true);
523570

src/config/mod.rs

+22
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,28 @@ pub use vector_lib::{
7676
id::Inputs,
7777
};
7878

79+
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
80+
pub struct ComponentConfig {
81+
pub config_paths: Vec<PathBuf>,
82+
pub component_key: ComponentKey,
83+
}
84+
85+
impl ComponentConfig {
86+
pub const fn new(config_paths: Vec<PathBuf>, component_key: ComponentKey) -> Self {
87+
Self {
88+
config_paths,
89+
component_key,
90+
}
91+
}
92+
93+
pub fn contains(&self, config_paths: &[PathBuf]) -> Option<ComponentKey> {
94+
if config_paths.iter().any(|p| self.config_paths.contains(p)) {
95+
return Some(self.component_key.clone());
96+
}
97+
None
98+
}
99+
}
100+
79101
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
80102
pub enum ConfigPath {
81103
File(PathBuf, FormatHint),

src/config/sink.rs

+6
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::cell::RefCell;
33
use async_trait::async_trait;
44
use dyn_clone::DynClone;
55
use serde::Serialize;
6+
use std::path::PathBuf;
67
use vector_lib::buffers::{BufferConfig, BufferType};
78
use vector_lib::configurable::attributes::CustomAttribute;
89
use vector_lib::configurable::schema::{SchemaGenerator, SchemaObject};
@@ -220,6 +221,11 @@ pub trait SinkConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync
220221
/// Gets the input configuration for this sink.
221222
fn input(&self) -> Input;
222223

224+
/// Gets the files to watch to trigger reload
225+
fn files_to_watch(&self) -> Vec<&PathBuf> {
226+
Vec::new()
227+
}
228+
223229
/// Gets the list of resources, if any, used by this sink.
224230
///
225231
/// Resources represent dependencies -- network ports, file descriptors, and so on -- that

src/config/watcher.rs

+103-8
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use std::{
77
thread,
88
};
99

10+
use crate::config::ComponentConfig;
11+
1012
use notify::{recommended_watcher, EventKind, RecursiveMode};
1113

1214
use crate::Error;
@@ -67,9 +69,18 @@ pub fn spawn_thread<'a>(
6769
watcher_conf: WatcherConfig,
6870
signal_tx: crate::signal::SignalTx,
6971
config_paths: impl IntoIterator<Item = &'a PathBuf> + 'a,
72+
component_configs: Vec<ComponentConfig>,
7073
delay: impl Into<Option<Duration>>,
7174
) -> Result<(), Error> {
72-
let config_paths: Vec<_> = config_paths.into_iter().cloned().collect();
75+
let mut config_paths: Vec<_> = config_paths.into_iter().cloned().collect();
76+
let mut component_config_paths: Vec<_> = component_configs
77+
.clone()
78+
.into_iter()
79+
.flat_map(|p| p.config_paths.clone())
80+
.collect();
81+
82+
config_paths.append(&mut component_config_paths);
83+
7384
let delay = delay.into().unwrap_or(CONFIG_WATCH_DELAY);
7485

7586
// Create watcher now so not to miss any changes happening between
@@ -92,6 +103,12 @@ pub fn spawn_thread<'a>(
92103

93104
debug!(message = "Consumed file change events for delay.", delay = ?delay);
94105

106+
let component_keys: Vec<_> = component_configs
107+
.clone()
108+
.into_iter()
109+
.flat_map(|p| p.contains(&event.paths))
110+
.collect();
111+
95112
// We need to read paths to resolve any inode changes that may have happened.
96113
// And we need to do it before raising sighup to avoid missing any change.
97114
if let Err(error) = watcher.add_paths(&config_paths) {
@@ -102,9 +119,17 @@ pub fn spawn_thread<'a>(
102119
debug!(message = "Reloaded paths.");
103120

104121
info!("Configuration file changed.");
105-
_ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk).map_err(|error| {
106-
error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error)
107-
});
122+
if !component_keys.is_empty() {
123+
info!("Component {:?} configuration changed.", component_keys);
124+
_ = signal_tx.send(crate::signal::SignalTo::ReloadComponents(component_keys)).map_err(|error| {
125+
error!(message = "Unable to reload component configuration. Restart Vector to reload it.", cause = %error)
126+
});
127+
} else {
128+
_ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk)
129+
.map_err(|error| {
130+
error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error)
131+
});
132+
}
108133
} else {
109134
debug!(message = "Ignoring event.", event = ?event)
110135
}
@@ -158,6 +183,7 @@ fn create_watcher(
158183
mod tests {
159184
use super::*;
160185
use crate::{
186+
config::ComponentKey,
161187
signal::SignalRx,
162188
test_util::{temp_dir, temp_file, trace_init},
163189
};
@@ -174,6 +200,75 @@ mod tests {
174200
)
175201
}
176202

203+
async fn test_component_reload(
204+
file: &mut File,
205+
expected_component: &ComponentKey,
206+
timeout: Duration,
207+
mut receiver: SignalRx,
208+
) -> bool {
209+
file.write_all(&[0]).unwrap();
210+
file.sync_all().unwrap();
211+
212+
matches!(
213+
tokio::time::timeout(timeout, receiver.recv()).await,
214+
Ok(Ok(crate::signal::SignalTo::ReloadComponents(components))) if components.contains(expected_component)
215+
)
216+
}
217+
218+
#[tokio::test]
219+
async fn component_update() {
220+
trace_init();
221+
222+
let delay = Duration::from_secs(3);
223+
let dir = temp_dir().to_path_buf();
224+
let watcher_conf = WatcherConfig::RecommendedWatcher;
225+
let component_file_path = vec![dir.join("tls.cert"), dir.join("tls.key")];
226+
let http_component = ComponentKey::from("http");
227+
228+
std::fs::create_dir(&dir).unwrap();
229+
230+
let mut component_files: Vec<std::fs::File> = component_file_path
231+
.iter()
232+
.map(|file| File::create(file).unwrap())
233+
.collect();
234+
let component_config =
235+
ComponentConfig::new(component_file_path.clone(), http_component.clone());
236+
237+
let (signal_tx, signal_rx) = broadcast::channel(128);
238+
spawn_thread(
239+
watcher_conf,
240+
signal_tx,
241+
&[dir],
242+
vec![component_config],
243+
delay,
244+
)
245+
.unwrap();
246+
247+
let signal_rx = signal_rx.resubscribe();
248+
let signal_rx2 = signal_rx.resubscribe();
249+
250+
if !test_component_reload(
251+
&mut component_files[0],
252+
&http_component,
253+
delay * 5,
254+
signal_rx,
255+
)
256+
.await
257+
{
258+
panic!("Test timed out");
259+
}
260+
261+
if !test_component_reload(
262+
&mut component_files[1],
263+
&http_component,
264+
delay * 5,
265+
signal_rx2,
266+
)
267+
.await
268+
{
269+
panic!("Test timed out");
270+
}
271+
}
177272
#[tokio::test]
178273
async fn file_directory_update() {
179274
trace_init();
@@ -187,7 +282,7 @@ mod tests {
187282
let mut file = File::create(&file_path).unwrap();
188283

189284
let (signal_tx, signal_rx) = broadcast::channel(128);
190-
spawn_thread(watcher_conf, signal_tx, &[dir], delay).unwrap();
285+
spawn_thread(watcher_conf, signal_tx, &[dir], vec![], delay).unwrap();
191286

192287
if !test(&mut file, delay * 5, signal_rx).await {
193288
panic!("Test timed out");
@@ -204,7 +299,7 @@ mod tests {
204299
let watcher_conf = WatcherConfig::RecommendedWatcher;
205300

206301
let (signal_tx, signal_rx) = broadcast::channel(128);
207-
spawn_thread(watcher_conf, signal_tx, &[file_path], delay).unwrap();
302+
spawn_thread(watcher_conf, signal_tx, &[file_path], vec![], delay).unwrap();
208303

209304
if !test(&mut file, delay * 5, signal_rx).await {
210305
panic!("Test timed out");
@@ -225,7 +320,7 @@ mod tests {
225320
let watcher_conf = WatcherConfig::RecommendedWatcher;
226321

227322
let (signal_tx, signal_rx) = broadcast::channel(128);
228-
spawn_thread(watcher_conf, signal_tx, &[sym_file], delay).unwrap();
323+
spawn_thread(watcher_conf, signal_tx, &[sym_file], vec![], delay).unwrap();
229324

230325
if !test(&mut file, delay * 5, signal_rx).await {
231326
panic!("Test timed out");
@@ -246,7 +341,7 @@ mod tests {
246341
let mut file = File::create(&file_path).unwrap();
247342

248343
let (signal_tx, signal_rx) = broadcast::channel(128);
249-
spawn_thread(watcher_conf, signal_tx, &[sub_dir], delay).unwrap();
344+
spawn_thread(watcher_conf, signal_tx, &[sub_dir], vec![], delay).unwrap();
250345

251346
if !test(&mut file, delay * 5, signal_rx).await {
252347
panic!("Test timed out");

src/signal.rs

+2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ pub type SignalRx = broadcast::Receiver<SignalTo>;
1414
/// Control messages used by Vector to drive topology and shutdown events.
1515
#[allow(clippy::large_enum_variant)] // discovered during Rust upgrade to 1.57; just allowing for now since we did previously
1616
pub enum SignalTo {
17+
/// Signal to reload given components.
18+
ReloadComponents(Vec<ComponentKey>),
1719
/// Signal to reload config from a string.
1820
ReloadFromConfigBuilder(ConfigBuilder),
1921
/// Signal to reload config from the filesystem.

0 commit comments

Comments
 (0)