Skip to content

Commit 5e392ad

Browse files
gllbpront
andauthoredMar 17, 2025
feat(cli): Handle reload based on referenced file change (vectordotdev#22539)
* feat(watch tls files) Add http sink tls cert/key to config::watcher * Update changelog.d/22386_extend_watcher_paths.enhancement.md Co-authored-by: Pavlos Rontidis <pavlos.rontidis@gmail.com> * feat(watch tls file) Add ComponentConfig to let watcher find components * feat(watch tls file) Let watcher send ReloadComponent signal * feat(watch tls file) handle ReloadComponent signal * feat(watch tls file) Ensure signal is sent once in case of ReloadComponent * (feat reload components) Handle Vec<ComponentKey> instead of single one * cargo fmt * Update changelog.d/22386_extend_watcher_paths.enhancement.md Co-authored-by: Pavlos Rontidis <pavlos.rontidis@gmail.com> * Update src/config/mod.rs Co-authored-by: Pavlos Rontidis <pavlos.rontidis@gmail.com> * Fix missing enclosed delimiter * solve clippy issues * cargo fmt * Fix testing * run cargo fmt * add dedicated watcher test --------- Co-authored-by: Pavlos Rontidis <pavlos.rontidis@gmail.com>
1 parent 4762af5 commit 5e392ad

14 files changed

+261
-53
lines changed
 
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
The TLS `crt_file` and `key_file` from `http` sinks are now watched when `--watch_config` is enabled and therefore changes to those files will trigger a config reload without the need to restart Vector.
2+
3+
authors: gllb

‎lib/vector-common/src/config.rs

+6
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,12 @@ impl ComponentKey {
3939
}
4040
}
4141

42+
impl AsRef<ComponentKey> for ComponentKey {
43+
fn as_ref(&self) -> &ComponentKey {
44+
self
45+
}
46+
}
47+
4248
impl From<String> for ComponentKey {
4349
fn from(id: String) -> Self {
4450
Self { id }

‎src/app.rs

+65-18
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::extra_context::ExtraContext;
1818
use crate::{api, internal_events::ApiStarted};
1919
use crate::{
2020
cli::{handle_config_errors, LogFormat, Opts, RootOpts, WatchConfigMethod},
21-
config::{self, Config, ConfigPath},
21+
config::{self, ComponentConfig, ComponentKey, Config, ConfigPath},
2222
heartbeat,
2323
internal_events::{VectorConfigLoadError, VectorQuit, VectorStarted, VectorStopped},
2424
signal::{SignalHandler, SignalPair, SignalRx, SignalTo},
@@ -336,9 +336,32 @@ async fn handle_signal(
336336
allow_empty_config: bool,
337337
) -> Option<SignalTo> {
338338
match signal {
339+
Ok(SignalTo::ReloadComponents(component_keys)) => {
340+
let mut topology_controller = topology_controller.lock().await;
341+
342+
// Reload paths
343+
if let Some(paths) = config::process_paths(config_paths) {
344+
topology_controller.config_paths = paths;
345+
}
346+
347+
// Reload config
348+
let new_config = config::load_from_paths_with_provider_and_secrets(
349+
&topology_controller.config_paths,
350+
signal_handler,
351+
allow_empty_config,
352+
)
353+
.await;
354+
355+
reload_config_from_result(
356+
topology_controller,
357+
new_config,
358+
Some(component_keys.iter().map(AsRef::as_ref).collect()),
359+
)
360+
.await
361+
}
339362
Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => {
340363
let topology_controller = topology_controller.lock().await;
341-
reload_config_from_result(topology_controller, config_builder.build()).await
364+
reload_config_from_result(topology_controller, config_builder.build(), None).await
342365
}
343366
Ok(SignalTo::ReloadFromDisk) => {
344367
let mut topology_controller = topology_controller.lock().await;
@@ -356,7 +379,7 @@ async fn handle_signal(
356379
)
357380
.await;
358381

359-
reload_config_from_result(topology_controller, new_config).await
382+
reload_config_from_result(topology_controller, new_config, None).await
360383
}
361384
Err(RecvError::Lagged(amt)) => {
362385
warn!("Overflow, dropped {} signals.", amt);
@@ -370,9 +393,13 @@ async fn handle_signal(
370393
async fn reload_config_from_result(
371394
mut topology_controller: MutexGuard<'_, TopologyController>,
372395
config: Result<Config, Vec<String>>,
396+
components_to_reload: Option<Vec<&ComponentKey>>,
373397
) -> Option<SignalTo> {
374398
match config {
375-
Ok(new_config) => match topology_controller.reload(new_config).await {
399+
Ok(new_config) => match topology_controller
400+
.reload(new_config, components_to_reload)
401+
.await
402+
{
376403
ReloadOutcome::FatalError(error) => Some(SignalTo::Shutdown(Some(error))),
377404
_ => None,
378405
},
@@ -491,23 +518,14 @@ pub async fn load_configs(
491518
) -> Result<Config, ExitCode> {
492519
let config_paths = config::process_paths(config_paths).ok_or(exitcode::CONFIG)?;
493520

494-
if let Some(watcher_conf) = watcher_conf {
495-
// Start listening for config changes immediately.
496-
config::watcher::spawn_thread(
497-
watcher_conf,
498-
signal_handler.clone_tx(),
499-
config_paths.iter().map(Into::into),
500-
None,
501-
)
502-
.map_err(|error| {
503-
error!(message = "Unable to start config watcher.", %error);
504-
exitcode::CONFIG
505-
})?;
506-
}
521+
let watched_paths = config_paths
522+
.iter()
523+
.map(<&PathBuf>::from)
524+
.collect::<Vec<_>>();
507525

508526
info!(
509527
message = "Loading configs.",
510-
paths = ?config_paths.iter().map(<&PathBuf>::from).collect::<Vec<_>>()
528+
paths = ?watched_paths
511529
);
512530

513531
let mut config = config::load_from_paths_with_provider_and_secrets(
@@ -518,6 +536,35 @@ pub async fn load_configs(
518536
.await
519537
.map_err(handle_config_errors)?;
520538

539+
let mut watched_component_paths = Vec::new();
540+
541+
if let Some(watcher_conf) = watcher_conf {
542+
for (name, sink) in config.sinks() {
543+
let files = sink.inner.files_to_watch();
544+
let component_config =
545+
ComponentConfig::new(files.into_iter().cloned().collect(), name.clone());
546+
watched_component_paths.push(component_config);
547+
}
548+
549+
info!(
550+
message = "Starting watcher.",
551+
paths = ?watched_paths
552+
);
553+
554+
// Start listening for config changes.
555+
config::watcher::spawn_thread(
556+
watcher_conf,
557+
signal_handler.clone_tx(),
558+
watched_paths,
559+
watched_component_paths,
560+
None,
561+
)
562+
.map_err(|error| {
563+
error!(message = "Unable to start config watcher.", %error);
564+
exitcode::CONFIG
565+
})?;
566+
}
567+
521568
config::init_log_schema(config.global.log_schema.clone(), true);
522569
config::init_telemetry(config.global.telemetry.clone(), true);
523570

‎src/config/mod.rs

+22
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,28 @@ pub use vector_lib::{
7676
id::Inputs,
7777
};
7878

79+
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
80+
pub struct ComponentConfig {
81+
pub config_paths: Vec<PathBuf>,
82+
pub component_key: ComponentKey,
83+
}
84+
85+
impl ComponentConfig {
86+
pub const fn new(config_paths: Vec<PathBuf>, component_key: ComponentKey) -> Self {
87+
Self {
88+
config_paths,
89+
component_key,
90+
}
91+
}
92+
93+
pub fn contains(&self, config_paths: &[PathBuf]) -> Option<ComponentKey> {
94+
if config_paths.iter().any(|p| self.config_paths.contains(p)) {
95+
return Some(self.component_key.clone());
96+
}
97+
None
98+
}
99+
}
100+
79101
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
80102
pub enum ConfigPath {
81103
File(PathBuf, FormatHint),

‎src/config/sink.rs

+6
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::cell::RefCell;
33
use async_trait::async_trait;
44
use dyn_clone::DynClone;
55
use serde::Serialize;
6+
use std::path::PathBuf;
67
use vector_lib::buffers::{BufferConfig, BufferType};
78
use vector_lib::configurable::attributes::CustomAttribute;
89
use vector_lib::configurable::schema::{SchemaGenerator, SchemaObject};
@@ -220,6 +221,11 @@ pub trait SinkConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync
220221
/// Gets the input configuration for this sink.
221222
fn input(&self) -> Input;
222223

224+
/// Gets the files to watch to trigger reload
225+
fn files_to_watch(&self) -> Vec<&PathBuf> {
226+
Vec::new()
227+
}
228+
223229
/// Gets the list of resources, if any, used by this sink.
224230
///
225231
/// Resources represent dependencies -- network ports, file descriptors, and so on -- that

‎src/config/watcher.rs

+103-8
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use std::{
77
thread,
88
};
99

10+
use crate::config::ComponentConfig;
11+
1012
use notify::{recommended_watcher, EventKind, RecursiveMode};
1113

1214
use crate::Error;
@@ -67,9 +69,18 @@ pub fn spawn_thread<'a>(
6769
watcher_conf: WatcherConfig,
6870
signal_tx: crate::signal::SignalTx,
6971
config_paths: impl IntoIterator<Item = &'a PathBuf> + 'a,
72+
component_configs: Vec<ComponentConfig>,
7073
delay: impl Into<Option<Duration>>,
7174
) -> Result<(), Error> {
72-
let config_paths: Vec<_> = config_paths.into_iter().cloned().collect();
75+
let mut config_paths: Vec<_> = config_paths.into_iter().cloned().collect();
76+
let mut component_config_paths: Vec<_> = component_configs
77+
.clone()
78+
.into_iter()
79+
.flat_map(|p| p.config_paths.clone())
80+
.collect();
81+
82+
config_paths.append(&mut component_config_paths);
83+
7384
let delay = delay.into().unwrap_or(CONFIG_WATCH_DELAY);
7485

7586
// Create watcher now so not to miss any changes happening between
@@ -92,6 +103,12 @@ pub fn spawn_thread<'a>(
92103

93104
debug!(message = "Consumed file change events for delay.", delay = ?delay);
94105

106+
let component_keys: Vec<_> = component_configs
107+
.clone()
108+
.into_iter()
109+
.flat_map(|p| p.contains(&event.paths))
110+
.collect();
111+
95112
// We need to read paths to resolve any inode changes that may have happened.
96113
// And we need to do it before raising sighup to avoid missing any change.
97114
if let Err(error) = watcher.add_paths(&config_paths) {
@@ -102,9 +119,17 @@ pub fn spawn_thread<'a>(
102119
debug!(message = "Reloaded paths.");
103120

104121
info!("Configuration file changed.");
105-
_ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk).map_err(|error| {
106-
error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error)
107-
});
122+
if !component_keys.is_empty() {
123+
info!("Component {:?} configuration changed.", component_keys);
124+
_ = signal_tx.send(crate::signal::SignalTo::ReloadComponents(component_keys)).map_err(|error| {
125+
error!(message = "Unable to reload component configuration. Restart Vector to reload it.", cause = %error)
126+
});
127+
} else {
128+
_ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk)
129+
.map_err(|error| {
130+
error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error)
131+
});
132+
}
108133
} else {
109134
debug!(message = "Ignoring event.", event = ?event)
110135
}
@@ -158,6 +183,7 @@ fn create_watcher(
158183
mod tests {
159184
use super::*;
160185
use crate::{
186+
config::ComponentKey,
161187
signal::SignalRx,
162188
test_util::{temp_dir, temp_file, trace_init},
163189
};
@@ -174,6 +200,75 @@ mod tests {
174200
)
175201
}
176202

203+
async fn test_component_reload(
204+
file: &mut File,
205+
expected_component: &ComponentKey,
206+
timeout: Duration,
207+
mut receiver: SignalRx,
208+
) -> bool {
209+
file.write_all(&[0]).unwrap();
210+
file.sync_all().unwrap();
211+
212+
matches!(
213+
tokio::time::timeout(timeout, receiver.recv()).await,
214+
Ok(Ok(crate::signal::SignalTo::ReloadComponents(components))) if components.contains(expected_component)
215+
)
216+
}
217+
218+
#[tokio::test]
219+
async fn component_update() {
220+
trace_init();
221+
222+
let delay = Duration::from_secs(3);
223+
let dir = temp_dir().to_path_buf();
224+
let watcher_conf = WatcherConfig::RecommendedWatcher;
225+
let component_file_path = vec![dir.join("tls.cert"), dir.join("tls.key")];
226+
let http_component = ComponentKey::from("http");
227+
228+
std::fs::create_dir(&dir).unwrap();
229+
230+
let mut component_files: Vec<std::fs::File> = component_file_path
231+
.iter()
232+
.map(|file| File::create(file).unwrap())
233+
.collect();
234+
let component_config =
235+
ComponentConfig::new(component_file_path.clone(), http_component.clone());
236+
237+
let (signal_tx, signal_rx) = broadcast::channel(128);
238+
spawn_thread(
239+
watcher_conf,
240+
signal_tx,
241+
&[dir],
242+
vec![component_config],
243+
delay,
244+
)
245+
.unwrap();
246+
247+
let signal_rx = signal_rx.resubscribe();
248+
let signal_rx2 = signal_rx.resubscribe();
249+
250+
if !test_component_reload(
251+
&mut component_files[0],
252+
&http_component,
253+
delay * 5,
254+
signal_rx,
255+
)
256+
.await
257+
{
258+
panic!("Test timed out");
259+
}
260+
261+
if !test_component_reload(
262+
&mut component_files[1],
263+
&http_component,
264+
delay * 5,
265+
signal_rx2,
266+
)
267+
.await
268+
{
269+
panic!("Test timed out");
270+
}
271+
}
177272
#[tokio::test]
178273
async fn file_directory_update() {
179274
trace_init();
@@ -187,7 +282,7 @@ mod tests {
187282
let mut file = File::create(&file_path).unwrap();
188283

189284
let (signal_tx, signal_rx) = broadcast::channel(128);
190-
spawn_thread(watcher_conf, signal_tx, &[dir], delay).unwrap();
285+
spawn_thread(watcher_conf, signal_tx, &[dir], vec![], delay).unwrap();
191286

192287
if !test(&mut file, delay * 5, signal_rx).await {
193288
panic!("Test timed out");
@@ -204,7 +299,7 @@ mod tests {
204299
let watcher_conf = WatcherConfig::RecommendedWatcher;
205300

206301
let (signal_tx, signal_rx) = broadcast::channel(128);
207-
spawn_thread(watcher_conf, signal_tx, &[file_path], delay).unwrap();
302+
spawn_thread(watcher_conf, signal_tx, &[file_path], vec![], delay).unwrap();
208303

209304
if !test(&mut file, delay * 5, signal_rx).await {
210305
panic!("Test timed out");
@@ -225,7 +320,7 @@ mod tests {
225320
let watcher_conf = WatcherConfig::RecommendedWatcher;
226321

227322
let (signal_tx, signal_rx) = broadcast::channel(128);
228-
spawn_thread(watcher_conf, signal_tx, &[sym_file], delay).unwrap();
323+
spawn_thread(watcher_conf, signal_tx, &[sym_file], vec![], delay).unwrap();
229324

230325
if !test(&mut file, delay * 5, signal_rx).await {
231326
panic!("Test timed out");
@@ -246,7 +341,7 @@ mod tests {
246341
let mut file = File::create(&file_path).unwrap();
247342

248343
let (signal_tx, signal_rx) = broadcast::channel(128);
249-
spawn_thread(watcher_conf, signal_tx, &[sub_dir], delay).unwrap();
344+
spawn_thread(watcher_conf, signal_tx, &[sub_dir], vec![], delay).unwrap();
250345

251346
if !test(&mut file, delay * 5, signal_rx).await {
252347
panic!("Test timed out");

‎src/signal.rs

+2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ pub type SignalRx = broadcast::Receiver<SignalTo>;
1414
/// Control messages used by Vector to drive topology and shutdown events.
1515
#[allow(clippy::large_enum_variant)] // discovered during Rust upgrade to 1.57; just allowing for now since we did previously
1616
pub enum SignalTo {
17+
/// Signal to reload given components.
18+
ReloadComponents(Vec<ComponentKey>),
1719
/// Signal to reload config from a string.
1820
ReloadFromConfigBuilder(ConfigBuilder),
1921
/// Signal to reload config from the filesystem.

‎src/sinks/http/config.rs

+14
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
use http::{header::AUTHORIZATION, HeaderName, HeaderValue, Method, Request, StatusCode};
44
use hyper::Body;
55
use indexmap::IndexMap;
6+
use std::path::PathBuf;
67
use vector_lib::codecs::{
78
encoding::{Framer, Serializer},
89
CharacterDelimitedEncoder,
@@ -301,6 +302,19 @@ impl SinkConfig for HttpSinkConfig {
301302
Input::new(self.encoding.config().1.input_type())
302303
}
303304

305+
fn files_to_watch(&self) -> Vec<&PathBuf> {
306+
let mut files = Vec::new();
307+
if let Some(tls) = &self.tls {
308+
if let Some(crt_file) = &tls.crt_file {
309+
files.push(crt_file)
310+
}
311+
if let Some(key_file) = &tls.key_file {
312+
files.push(key_file)
313+
}
314+
};
315+
files
316+
}
317+
304318
fn acknowledgements(&self) -> &AcknowledgementsConfig {
305319
&self.acknowledgements
306320
}

‎src/topology/controller.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,11 @@ pub enum ReloadOutcome {
5959
}
6060

6161
impl TopologyController {
62-
pub async fn reload(&mut self, mut new_config: config::Config) -> ReloadOutcome {
62+
pub async fn reload(
63+
&mut self,
64+
mut new_config: config::Config,
65+
components_to_reload: Option<Vec<&config::ComponentKey>>,
66+
) -> ReloadOutcome {
6367
new_config
6468
.healthchecks
6569
.set_require_healthy(self.require_healthy);
@@ -103,7 +107,7 @@ impl TopologyController {
103107

104108
match self
105109
.topology
106-
.reload_config_and_respawn(new_config, self.extra_context.clone())
110+
.reload_config_and_respawn(new_config, self.extra_context.clone(), components_to_reload)
107111
.await
108112
{
109113
Ok(true) => {

‎src/topology/running.rs

+11-2
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ impl RunningTopology {
227227
&mut self,
228228
new_config: Config,
229229
extra_context: ExtraContext,
230+
components_to_reload: Option<Vec<&ComponentKey>>,
230231
) -> Result<bool, ()> {
231232
info!("Reloading running topology with new configuration.");
232233

@@ -244,7 +245,9 @@ impl RunningTopology {
244245
//
245246
// We also shutdown any component that is simply being removed entirely.
246247
let diff = ConfigDiff::new(&self.config, &new_config);
247-
let buffers = self.shutdown_diff(&diff, &new_config).await;
248+
let buffers = self
249+
.shutdown_diff(&diff, &new_config, components_to_reload)
250+
.await;
248251

249252
// Gives windows some time to make available any port
250253
// released by shutdown components.
@@ -349,6 +352,7 @@ impl RunningTopology {
349352
&mut self,
350353
diff: &ConfigDiff,
351354
new_config: &Config,
355+
components_to_reload: Option<Vec<&ComponentKey>>,
352356
) -> HashMap<ComponentKey, BuiltBuffer> {
353357
// First, we shutdown any changed/removed sources. This ensures that we can allow downstream
354358
// components to terminate naturally by virtue of the flow of events stopping.
@@ -532,7 +536,7 @@ impl RunningTopology {
532536
// they can naturally shutdown and allow us to recover their buffers if possible.
533537
let mut buffer_tx = HashMap::new();
534538

535-
let sinks_to_change = diff
539+
let mut sinks_to_change = diff
536540
.sinks
537541
.to_change
538542
.iter()
@@ -543,6 +547,11 @@ impl RunningTopology {
543547
.is_some()
544548
}))
545549
.collect::<Vec<_>>();
550+
551+
if let Some(mut components) = components_to_reload {
552+
sinks_to_change.append(&mut components)
553+
}
554+
546555
for key in &sinks_to_change {
547556
debug!(component = %key, "Changing sink.");
548557
if reuse_buffers.contains(key) {

‎src/topology/test/doesnt_reload.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ async fn topology_doesnt_reload_new_data_dir() {
2323
new_config.global.data_dir = Some(Path::new("/qwerty").to_path_buf());
2424

2525
topology
26-
.reload_config_and_respawn(new_config.build().unwrap(), Default::default())
26+
.reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None)
2727
.await
2828
.unwrap();
2929

‎src/topology/test/mod.rs

+12-12
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ async fn topology_remove_one_source() {
301301
config.add_sink("out1", &["in1"], sink1);
302302

303303
assert!(topology
304-
.reload_config_and_respawn(config.build().unwrap(), Default::default())
304+
.reload_config_and_respawn(config.build().unwrap(), Default::default(), None)
305305
.await
306306
.unwrap());
307307

@@ -350,7 +350,7 @@ async fn topology_remove_one_sink() {
350350
config.add_sink("out1", &["in1"], basic_sink(10).1);
351351

352352
assert!(topology
353-
.reload_config_and_respawn(config.build().unwrap(), Default::default())
353+
.reload_config_and_respawn(config.build().unwrap(), Default::default(), None)
354354
.await
355355
.unwrap());
356356

@@ -403,7 +403,7 @@ async fn topology_remove_one_transform() {
403403
config.add_sink("out1", &["t2"], sink2);
404404

405405
assert!(topology
406-
.reload_config_and_respawn(config.build().unwrap(), Default::default())
406+
.reload_config_and_respawn(config.build().unwrap(), Default::default(), None)
407407
.await
408408
.unwrap());
409409

@@ -452,7 +452,7 @@ async fn topology_swap_source() {
452452
config.add_sink("out1", &["in2"], sink2);
453453

454454
assert!(topology
455-
.reload_config_and_respawn(config.build().unwrap(), Default::default())
455+
.reload_config_and_respawn(config.build().unwrap(), Default::default(), None)
456456
.await
457457
.unwrap());
458458

@@ -517,7 +517,7 @@ async fn topology_swap_transform() {
517517
config.add_sink("out1", &["t1"], sink2);
518518

519519
assert!(topology
520-
.reload_config_and_respawn(config.build().unwrap(), Default::default())
520+
.reload_config_and_respawn(config.build().unwrap(), Default::default(), None)
521521
.await
522522
.unwrap());
523523

@@ -569,7 +569,7 @@ async fn topology_swap_sink() {
569569
config.add_sink("out1", &["in1"], sink2);
570570

571571
assert!(topology
572-
.reload_config_and_respawn(config.build().unwrap(), Default::default())
572+
.reload_config_and_respawn(config.build().unwrap(), Default::default(), None)
573573
.await
574574
.unwrap());
575575

@@ -657,7 +657,7 @@ async fn topology_swap_transform_is_atomic() {
657657
config.add_sink("out1", &["t1"], basic_sink(10).1);
658658

659659
assert!(topology
660-
.reload_config_and_respawn(config.build().unwrap(), Default::default())
660+
.reload_config_and_respawn(config.build().unwrap(), Default::default(), None)
661661
.await
662662
.unwrap());
663663

@@ -693,7 +693,7 @@ async fn topology_rebuild_connected() {
693693
config.add_sink("out1", &["in1"], sink1);
694694

695695
assert!(topology
696-
.reload_config_and_respawn(config.build().unwrap(), Default::default())
696+
.reload_config_and_respawn(config.build().unwrap(), Default::default(), None)
697697
.await
698698
.unwrap());
699699

@@ -752,7 +752,7 @@ async fn topology_rebuild_connected_transform() {
752752
config.add_sink("out1", &["t2"], sink2);
753753

754754
assert!(topology
755-
.reload_config_and_respawn(config.build().unwrap(), Default::default())
755+
.reload_config_and_respawn(config.build().unwrap(), Default::default(), None)
756756
.await
757757
.unwrap());
758758

@@ -807,7 +807,7 @@ async fn topology_optional_healthcheck_does_not_fail_reload() {
807807
let (mut topology, _) = start_topology(config, false).await;
808808
let config = basic_config_with_sink_failing_healthcheck();
809809
assert!(topology
810-
.reload_config_and_respawn(config, Default::default())
810+
.reload_config_and_respawn(config, Default::default(), None)
811811
.await
812812
.unwrap());
813813
}
@@ -820,7 +820,7 @@ async fn topology_healthcheck_not_run_on_unchanged_reload() {
820820
let mut config = basic_config_with_sink_failing_healthcheck();
821821
config.healthchecks.require_healthy = true;
822822
assert!(topology
823-
.reload_config_and_respawn(config, Default::default())
823+
.reload_config_and_respawn(config, Default::default(), None)
824824
.await
825825
.unwrap());
826826
}
@@ -846,7 +846,7 @@ async fn topology_healthcheck_run_for_changes_on_reload() {
846846
let mut config = config.build().unwrap();
847847
config.healthchecks.require_healthy = true;
848848
assert!(!topology
849-
.reload_config_and_respawn(config, Default::default())
849+
.reload_config_and_respawn(config, Default::default(), None)
850850
.await
851851
.unwrap());
852852
}

‎src/topology/test/reload.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ async fn topology_reuse_old_port() {
6565

6666
let (mut topology, _) = start_topology(old_config.build().unwrap(), false).await;
6767
assert!(topology
68-
.reload_config_and_respawn(new_config.build().unwrap(), Default::default())
68+
.reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None)
6969
.await
7070
.unwrap());
7171
}
@@ -90,7 +90,7 @@ async fn topology_rebuild_old() {
9090

9191
let (mut topology, _) = start_topology(old_config.build().unwrap(), false).await;
9292
assert!(!topology
93-
.reload_config_and_respawn(new_config.build().unwrap(), Default::default())
93+
.reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None)
9494
.await
9595
.unwrap());
9696
}
@@ -107,7 +107,7 @@ async fn topology_old() {
107107

108108
let (mut topology, _) = start_topology(old_config.clone().build().unwrap(), false).await;
109109
assert!(topology
110-
.reload_config_and_respawn(old_config.build().unwrap(), Default::default())
110+
.reload_config_and_respawn(old_config.build().unwrap(), Default::default(), None)
111111
.await
112112
.unwrap());
113113
}
@@ -258,7 +258,7 @@ async fn topology_readd_input() {
258258
new_config.add_source("in2", internal_metrics_source());
259259
new_config.add_sink("out", &["in1"], prom_exporter_sink(address_0, 1));
260260
assert!(topology
261-
.reload_config_and_respawn(new_config.build().unwrap(), Default::default())
261+
.reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None)
262262
.await
263263
.unwrap());
264264

@@ -268,7 +268,7 @@ async fn topology_readd_input() {
268268
new_config.add_source("in2", internal_metrics_source());
269269
new_config.add_sink("out", &["in1", "in2"], prom_exporter_sink(address_0, 1));
270270
assert!(topology
271-
.reload_config_and_respawn(new_config.build().unwrap(), Default::default())
271+
.reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None)
272272
.await
273273
.unwrap());
274274

@@ -299,7 +299,7 @@ async fn reload_sink_test(
299299

300300
// Now reload the topology with the "new" configuration, and make sure that a component is now listening on `new_address`.
301301
assert!(topology
302-
.reload_config_and_respawn(new_config, Default::default())
302+
.reload_config_and_respawn(new_config, Default::default(), None)
303303
.await
304304
.unwrap());
305305

‎src/topology/test/transient_state.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ async fn closed_source() {
3030
topology.sources_finished().await;
3131

3232
assert!(topology
33-
.reload_config_and_respawn(new_config.build().unwrap(), Default::default())
33+
.reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None)
3434
.await
3535
.unwrap());
3636
}
@@ -52,7 +52,7 @@ async fn remove_sink() {
5252

5353
let (mut topology, _) = start_topology(old_config.build().unwrap(), false).await;
5454
assert!(topology
55-
.reload_config_and_respawn(new_config.build().unwrap(), Default::default())
55+
.reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None)
5656
.await
5757
.unwrap());
5858
}
@@ -75,7 +75,7 @@ async fn remove_transform() {
7575

7676
let (mut topology, _) = start_topology(old_config.build().unwrap(), false).await;
7777
assert!(topology
78-
.reload_config_and_respawn(new_config.build().unwrap(), Default::default())
78+
.reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None)
7979
.await
8080
.unwrap());
8181
}
@@ -99,7 +99,7 @@ async fn replace_transform() {
9999

100100
let (mut topology, _) = start_topology(old_config.build().unwrap(), false).await;
101101
assert!(topology
102-
.reload_config_and_respawn(new_config.build().unwrap(), Default::default())
102+
.reload_config_and_respawn(new_config.build().unwrap(), Default::default(), None)
103103
.await
104104
.unwrap());
105105
}

0 commit comments

Comments
 (0)
Please sign in to comment.