Skip to content

Commit 5b12b19

Browse files
feat: Inject vector aggregator address as env into vector config (#844)
* inject vector aggregator address as env into config & add watch for referenced cms * add pr number to changelog * rename store * chore: Use borrows * chore: Fix changelog headings --------- Co-authored-by: Nick Larsen <[email protected]>
1 parent 1a5b2be commit 5b12b19

File tree

8 files changed

+135
-154
lines changed

8 files changed

+135
-154
lines changed

Diff for: CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,16 @@ All notable changes to this project will be documented in this file.
1010
- BREAKING: The file log directory was set by `KAFKA_OPERATOR_LOG_DIRECTORY`, and is now set by `ROLLING_LOGS`
1111
(or via `--rolling-logs <DIRECTORY>`).
1212
- Replace stackable-operator `print_startup_string` with `tracing::info!` with fields.
13+
- BREAKING: Inject the vector aggregator address into the vector config using the env var `VECTOR_AGGREGATOR_ADDRESS` instead
14+
of having the operator write it to the vector config ([#844]).
1315

1416
### Fixed
1517

1618
- Use `json` file extension for log files ([#846]).
19+
- Fix a bug where changes to ConfigMaps that are referenced in the KafkaCluster spec didn't trigger a reconciliation ([#844]).
1720

1821
[#840]: https://github.com/stackabletech/kafka-operator/pull/840
22+
[#844]: https://github.com/stackabletech/kafka-operator/pull/844
1923
[#846]: https://github.com/stackabletech/kafka-operator/pull/846
2024

2125
## [25.3.0] - 2025-03-21

Diff for: Cargo.lock

+4-4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: Cargo.nix

+7-7
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ repository = "https://github.com/stackabletech/kafka-operator"
1111

1212
[workspace.dependencies]
1313
product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" }
14-
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.89.1" }
14+
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.90.0" }
1515
stackable-telemetry = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-telemetry-0.4.0" }
1616
stackable-versioned = { git = "https://github.com/stackabletech/operator-rs.git", features = ["k8s"], tag = "stackable-versioned-0.7.1" }
1717

Diff for: crate-hashes.json

+3-3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: rust/operator-binary/src/kafka_controller.rs

+32-39
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,7 @@ use crate::{
9292
graceful_shutdown::{add_graceful_shutdown_config, graceful_shutdown_config_properties},
9393
pdb::add_pdbs,
9494
},
95-
product_logging::{
96-
LOG4J_CONFIG_FILE, MAX_KAFKA_LOG_FILES_SIZE, extend_role_group_config_map,
97-
resolve_vector_aggregator_address,
98-
},
95+
product_logging::{LOG4J_CONFIG_FILE, MAX_KAFKA_LOG_FILES_SIZE, extend_role_group_config_map},
9996
utils::build_recommended_labels,
10097
};
10198

@@ -253,10 +250,8 @@ pub enum Error {
253250
#[snafu(display("failed to resolve and merge config for role and role group"))]
254251
FailedToResolveConfig { source: crate::crd::Error },
255252

256-
#[snafu(display("failed to resolve the Vector aggregator address"))]
257-
ResolveVectorAggregatorAddress {
258-
source: crate::product_logging::Error,
259-
},
253+
#[snafu(display("vector agent is enabled but vector aggregator ConfigMap is missing"))]
254+
VectorAggregatorConfigMapMissing,
260255

261256
#[snafu(display("failed to add the logging configuration to the ConfigMap [{cm_name}]"))]
262257
InvalidLoggingConfig {
@@ -396,7 +391,7 @@ impl ReconcilerError for Error {
396391
Error::FailedToInitializeSecurityContext { .. } => None,
397392
Error::CreateClusterResources { .. } => None,
398393
Error::FailedToResolveConfig { .. } => None,
399-
Error::ResolveVectorAggregatorAddress { .. } => None,
394+
Error::VectorAggregatorConfigMapMissing { .. } => None,
400395
Error::InvalidLoggingConfig { .. } => None,
401396
Error::ApplyServiceAccount { .. } => None,
402397
Error::ApplyRoleBinding { .. } => None,
@@ -508,10 +503,6 @@ pub async fn reconcile_kafka(
508503
None
509504
};
510505

511-
let vector_aggregator_address = resolve_vector_aggregator_address(kafka, client)
512-
.await
513-
.context(ResolveVectorAggregatorAddressSnafu)?;
514-
515506
let mut ss_cond_builder = StatefulSetConditionBuilder::default();
516507

517508
let (rbac_sa, rbac_rolebinding) = build_rbac_resources(
@@ -550,7 +541,6 @@ pub async fn reconcile_kafka(
550541
&rolegroup_ref,
551542
rolegroup_config,
552543
&merged_config,
553-
vector_aggregator_address.as_deref(),
554544
)?;
555545
let rg_statefulset = build_broker_rolegroup_statefulset(
556546
kafka,
@@ -688,7 +678,6 @@ fn build_broker_rolegroup_config_map(
688678
rolegroup: &RoleGroupRef<v1alpha1::KafkaCluster>,
689679
broker_config: &HashMap<PropertyNameKind, BTreeMap<String, String>>,
690680
merged_config: &KafkaConfig,
691-
vector_aggregator_address: Option<&str>,
692681
) -> Result<ConfigMap> {
693682
let mut server_cfg = broker_config
694683
.get(&PropertyNameKind::File(SERVER_PROPERTIES_FILE.to_string()))
@@ -751,15 +740,11 @@ fn build_broker_rolegroup_config_map(
751740
tracing::debug!(?server_cfg, "Applied server config");
752741
tracing::debug!(?jvm_sec_props, "Applied JVM config");
753742

754-
extend_role_group_config_map(
755-
rolegroup,
756-
vector_aggregator_address,
757-
&merged_config.logging,
758-
&mut cm_builder,
759-
)
760-
.context(InvalidLoggingConfigSnafu {
761-
cm_name: rolegroup.object_name(),
762-
})?;
743+
extend_role_group_config_map(rolegroup, &merged_config.logging, &mut cm_builder).context(
744+
InvalidLoggingConfigSnafu {
745+
cm_name: rolegroup.object_name(),
746+
},
747+
)?;
763748

764749
cm_builder
765750
.build()
@@ -1114,21 +1099,29 @@ fn build_broker_rolegroup_statefulset(
11141099

11151100
// Add vector container after kafka container to keep the defaulting into kafka container
11161101
if merged_config.logging.enable_vector_agent {
1117-
pod_builder.add_container(
1118-
product_logging::framework::vector_container(
1119-
resolved_product_image,
1120-
"config",
1121-
"log",
1122-
merged_config.logging.containers.get(&Container::Vector),
1123-
ResourceRequirementsBuilder::new()
1124-
.with_cpu_request("250m")
1125-
.with_cpu_limit("500m")
1126-
.with_memory_request("128Mi")
1127-
.with_memory_limit("128Mi")
1128-
.build(),
1129-
)
1130-
.context(ConfigureLoggingSnafu)?,
1131-
);
1102+
match &kafka.spec.cluster_config.vector_aggregator_config_map_name {
1103+
Some(vector_aggregator_config_map_name) => {
1104+
pod_builder.add_container(
1105+
product_logging::framework::vector_container(
1106+
resolved_product_image,
1107+
"config",
1108+
"log",
1109+
merged_config.logging.containers.get(&Container::Vector),
1110+
ResourceRequirementsBuilder::new()
1111+
.with_cpu_request("250m")
1112+
.with_cpu_limit("500m")
1113+
.with_memory_request("128Mi")
1114+
.with_memory_limit("128Mi")
1115+
.build(),
1116+
vector_aggregator_config_map_name,
1117+
)
1118+
.context(ConfigureLoggingSnafu)?,
1119+
);
1120+
}
1121+
None => {
1122+
VectorAggregatorConfigMapMissingSnafu.fail()?;
1123+
}
1124+
}
11321125
}
11331126

11341127
add_graceful_shutdown_config(merged_config, &mut pod_builder).context(GracefulShutdownSnafu)?;

Diff for: rust/operator-binary/src/main.rs

+82-48
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@ use stackable_operator::{
1414
rbac::v1::RoleBinding,
1515
},
1616
kube::{
17+
ResourceExt,
1718
core::DeserializeGuard,
1819
runtime::{
1920
Controller,
2021
events::{Recorder, Reporter},
22+
reflector::ObjectRef,
2123
watcher,
2224
},
2325
},
@@ -156,55 +158,87 @@ pub async fn create_controller(
156158
instance: None,
157159
}));
158160

159-
Controller::new(
161+
let kafka_controller = Controller::new(
160162
namespace.get_api::<DeserializeGuard<v1alpha1::KafkaCluster>>(&client),
161163
watcher::Config::default(),
162-
)
163-
.owns(
164-
namespace.get_api::<StatefulSet>(&client),
165-
watcher::Config::default(),
166-
)
167-
.owns(
168-
namespace.get_api::<Service>(&client),
169-
watcher::Config::default(),
170-
)
171-
.owns(
172-
namespace.get_api::<Listener>(&client),
173-
watcher::Config::default(),
174-
)
175-
.owns(
176-
namespace.get_api::<ConfigMap>(&client),
177-
watcher::Config::default(),
178-
)
179-
.owns(
180-
namespace.get_api::<ServiceAccount>(&client),
181-
watcher::Config::default(),
182-
)
183-
.owns(
184-
namespace.get_api::<RoleBinding>(&client),
185-
watcher::Config::default(),
186-
)
187-
.shutdown_on_signal()
188-
.run(
189-
kafka_controller::reconcile_kafka,
190-
kafka_controller::error_policy,
191-
Arc::new(kafka_controller::Ctx {
192-
client: client.clone(),
193-
product_config,
194-
}),
195-
)
196-
// We can let the reporting happen in the background
197-
.for_each_concurrent(
198-
16, // concurrency limit
199-
move |result| {
200-
// The event_recorder needs to be shared across all invocations, so that
201-
// events are correctly aggregated
202-
let event_recorder = event_recorder.clone();
203-
async move {
204-
report_controller_reconciled(&event_recorder, KAFKA_FULL_CONTROLLER_NAME, &result)
164+
);
165+
let config_map_store = kafka_controller.store();
166+
kafka_controller
167+
.owns(
168+
namespace.get_api::<StatefulSet>(&client),
169+
watcher::Config::default(),
170+
)
171+
.owns(
172+
namespace.get_api::<Service>(&client),
173+
watcher::Config::default(),
174+
)
175+
.owns(
176+
namespace.get_api::<Listener>(&client),
177+
watcher::Config::default(),
178+
)
179+
.owns(
180+
namespace.get_api::<ConfigMap>(&client),
181+
watcher::Config::default(),
182+
)
183+
.owns(
184+
namespace.get_api::<ServiceAccount>(&client),
185+
watcher::Config::default(),
186+
)
187+
.owns(
188+
namespace.get_api::<RoleBinding>(&client),
189+
watcher::Config::default(),
190+
)
191+
.shutdown_on_signal()
192+
.watches(
193+
namespace.get_api::<DeserializeGuard<ConfigMap>>(&client),
194+
watcher::Config::default(),
195+
move |config_map| {
196+
config_map_store
197+
.state()
198+
.into_iter()
199+
.filter(move |kafka| references_config_map(kafka, &config_map))
200+
.map(|kafka| ObjectRef::from_obj(&*kafka))
201+
},
202+
)
203+
.run(
204+
kafka_controller::reconcile_kafka,
205+
kafka_controller::error_policy,
206+
Arc::new(kafka_controller::Ctx {
207+
client: client.clone(),
208+
product_config,
209+
}),
210+
)
211+
// We can let the reporting happen in the background
212+
.for_each_concurrent(
213+
16, // concurrency limit
214+
move |result| {
215+
// The event_recorder needs to be shared across all invocations, so that
216+
// events are correctly aggregated
217+
let event_recorder = event_recorder.clone();
218+
async move {
219+
report_controller_reconciled(
220+
&event_recorder,
221+
KAFKA_FULL_CONTROLLER_NAME,
222+
&result,
223+
)
205224
.await;
206-
}
207-
},
208-
)
209-
.await;
225+
}
226+
},
227+
)
228+
.await;
229+
}
230+
231+
fn references_config_map(
232+
kafka: &DeserializeGuard<v1alpha1::KafkaCluster>,
233+
config_map: &DeserializeGuard<ConfigMap>,
234+
) -> bool {
235+
let Ok(kafka) = &kafka.0 else {
236+
return false;
237+
};
238+
239+
kafka.spec.cluster_config.zookeeper_config_map_name == config_map.name_any()
240+
|| match &kafka.spec.cluster_config.authorization.opa {
241+
Some(opa_config) => opa_config.config_map_name == config_map.name_any(),
242+
None => false,
243+
}
210244
}

0 commit comments

Comments
 (0)