Skip to content

Commit

Permalink
KAFKA-15372: Reconfigure dedicated MM2 connectors after leadership ch…
Browse files Browse the repository at this point in the history
…ange (#14293)

Signed-off-by: Greg Harris <[email protected]>
Reviewers: Chris Egerton <[email protected]>
  • Loading branch information
gharris1727 authored Dec 12, 2023
1 parent 68389c2 commit 3f7eada
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.mirror;

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;

import static org.apache.kafka.connect.mirror.MirrorMaker.CONNECTOR_CLASSES;

/**
 * A {@link DistributedHerder} used by dedicated MirrorMaker nodes for a single source->target
 * replication flow.
 * <p>
 * After every successful rebalance the herder checks whether this node has just become the group
 * leader. On that transition it compares the stored configuration of each MirrorMaker connector
 * with the configuration derived from the {@link MirrorMakerConfig} and rewrites any that are
 * missing or different.
 */
public class MirrorHerder extends DistributedHerder {

    private static final Logger log = LoggerFactory.getLogger(MirrorHerder.class);

    // MirrorMaker-level configuration from which the desired connector configurations are derived.
    private final MirrorMakerConfig config;
    // The replication flow this herder is responsible for.
    private final SourceAndTarget sourceAndTarget;
    // Whether this node was the leader as of the previous successful rebalance.
    // NOTE(review): not synchronized; presumably only touched from the herder work thread
    // (rebalanceSuccess() is the only reader/writer) -- confirm against DistributedHerder.
    private boolean wasLeader;

    /**
     * Creates a herder for one replication flow.
     *
     * @param mirrorConfig    MirrorMaker configuration used to build the desired connector configs
     * @param sourceAndTarget the replication flow managed by this herder
     *
     * All remaining arguments are passed unchanged to the {@link DistributedHerder} constructor.
     */
    public MirrorHerder(MirrorMakerConfig mirrorConfig, SourceAndTarget sourceAndTarget, DistributedConfig config, Time time, Worker worker, String kafkaClusterId, StatusBackingStore statusBackingStore, ConfigBackingStore configBackingStore, String restUrl, RestClient restClient, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, List<String> restNamespace, AutoCloseable... uponShutdown) {
        super(config, time, worker, kafkaClusterId, statusBackingStore, configBackingStore, restUrl, restClient, connectorClientConfigOverridePolicy, restNamespace, uponShutdown);
        this.config = mirrorConfig;
        this.sourceAndTarget = sourceAndTarget;
    }

    /**
     * Configures the MirrorMaker connectors when this node transitions from follower to leader.
     * Nothing is done while the node stays leader across rebalances or while it is a follower.
     */
    @Override
    protected void rebalanceSuccess() {
        boolean leaderNow = isLeader();
        if (leaderNow && !wasLeader) {
            log.info("This node {} is now a leader for {}. Configuring connectors...", this, sourceAndTarget);
            configureConnectors();
        }
        // Only reached when configuration did not throw, so a failed attempt is retried
        // on the next successful rebalance in which this node is still the leader.
        wasLeader = leaderNow;
    }

    /** Checks (and if necessary updates) the configuration of every MirrorMaker connector class. */
    private void configureConnectors() {
        CONNECTOR_CLASSES.forEach(this::maybeConfigureConnector);
    }

    /**
     * Writes the desired configuration for the given connector class unless the stored
     * configuration already equals it. The connector is named after the class's simple name.
     *
     * @param connectorClass the MirrorMaker connector class to configure
     */
    private void maybeConfigureConnector(Class<?> connectorClass) {
        Map<String, String> desiredConfig = config.connectorBaseConfig(sourceAndTarget, connectorClass);
        Map<String, String> actualConfig = configState.connectorConfig(connectorClass.getSimpleName());
        if (actualConfig == null || !actualConfig.equals(desiredConfig)) {
            configureConnector(connectorClass.getSimpleName(), desiredConfig);
        } else {
            log.info("This node is a leader for {} and configuration for {} is already up to date.", sourceAndTarget, connectorClass.getSimpleName());
        }
    }

    /**
     * Submits the connector configuration to this herder and logs the outcome asynchronously.
     *
     * @param connectorName  name of the connector to create or update
     * @param connectorProps the configuration to store for that connector
     */
    private void configureConnector(String connectorName, Map<String, String> connectorProps) {
        // NOTE(review): the 'true' argument presumably permits replacing an existing
        // configuration -- confirm against DistributedHerder#putConnectorConfig.
        putConnectorConfig(connectorName, connectorProps, true, (e, x) -> {
            if (e == null) {
                log.info("{} connector configured for {}.", connectorName, sourceAndTarget);
            } else if (e instanceof NotLeaderException) {
                // No way to determine if the herder is a leader or not beforehand.
                // Arguments follow the placeholder order: replication flow first, connector second.
                log.info("This node lost leadership for {} while trying to update the connector configuration for {}. Using existing connector configuration.", sourceAndTarget, connectorName);
            } else {
                log.error("Failed to configure {} connector for {}", connectorName, sourceAndTarget, e);
            }
        });
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
Expand Down Expand Up @@ -198,8 +198,7 @@ public void start() {
log.info("Initializing internal REST resources");
internalServer.initializeInternalResources(herders);
}
log.info("Configuring connectors...");
herderPairs.forEach(this::configureConnectors);
log.info("Configuring connectors will happen once the worker joins the group as a leader");
log.info("Kafka MirrorMaker started");
}

Expand Down Expand Up @@ -229,32 +228,12 @@ public void awaitStop() {
}
}

/**
 * Submits the base configuration of one MirrorMaker connector to the herder that serves the
 * given replication flow, and logs the asynchronous result.
 *
 * @param sourceAndTarget the replication flow whose herder receives the configuration
 * @param connectorClass  the connector class; its simple name is used as the connector name
 * @throws IllegalArgumentException if no herder is registered for {@code sourceAndTarget}
 */
private void configureConnector(SourceAndTarget sourceAndTarget, Class<?> connectorClass) {
    checkHerder(sourceAndTarget);
    final Map<String, String> baseConfig = config.connectorBaseConfig(sourceAndTarget, connectorClass);
    final String connectorName = connectorClass.getSimpleName();
    herders.get(sourceAndTarget).putConnectorConfig(connectorName, baseConfig, true, (error, ignored) -> {
        if (error instanceof NotLeaderException) {
            // No way to determine if the herder is a leader or not beforehand.
            log.info("This node is a follower for {}. Using existing connector configuration.", sourceAndTarget);
        } else if (error != null) {
            log.error("Failed to configure {} connector for {}", connectorName, sourceAndTarget, error);
        } else {
            log.info("{} connector configured for {}.", connectorName, sourceAndTarget);
        }
    });
}

/**
 * Verifies that a herder has been registered for the given replication flow.
 *
 * @param sourceAndTarget the replication flow to look up
 * @throws IllegalArgumentException if {@code herders} has no entry for {@code sourceAndTarget}
 */
private void checkHerder(SourceAndTarget sourceAndTarget) {
    if (herders.containsKey(sourceAndTarget)) {
        return;
    }
    throw new IllegalArgumentException("No herder for " + sourceAndTarget.toString());
}

/**
 * Configures every connector class listed in {@code CONNECTOR_CLASSES} for the given
 * replication flow, in list order.
 *
 * @param sourceAndTarget the replication flow to configure connectors for
 */
private void configureConnectors(SourceAndTarget sourceAndTarget) {
    for (Class<?> connectorClass : CONNECTOR_CLASSES) {
        configureConnector(sourceAndTarget, connectorClass);
    }
}

private void addHerder(SourceAndTarget sourceAndTarget) {
log.info("creating herder for " + sourceAndTarget.toString());
Map<String, String> workerProps = config.workerConfig(sourceAndTarget);
Expand Down Expand Up @@ -297,7 +276,7 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
// Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the
// herder is stopped. MirrorMaker has multiple herders, and having the herder own the close responsibility is much easier than
// tracking the various shared admin objects in this class.
Herder herder = new DistributedHerder(distributedConfig, time, worker,
Herder herder = new MirrorHerder(config, sourceAndTarget, distributedConfig, time, worker,
kafkaClusterId, statusBackingStore, configBackingStore,
advertisedUrl, restClient, clientConfigOverridePolicy,
restNamespace, sharedAdmin);
Expand Down Expand Up @@ -337,6 +316,11 @@ public ConnectorStateInfo connectorStatus(SourceAndTarget sourceAndTarget, Strin
return herders.get(sourceAndTarget).connectorStatus(connector);
}

/**
 * Requests the task configurations of a connector from the herder that serves the given
 * replication flow. The result is delivered through the supplied callback.
 *
 * @param sourceAndTarget the replication flow whose herder is queried
 * @param connector       name of the connector
 * @param cb              callback handed to the herder to receive the task information
 * @throws IllegalArgumentException if no herder is registered for {@code sourceAndTarget}
 */
public void taskConfigs(SourceAndTarget sourceAndTarget, String connector, Callback<List<TaskInfo>> cb) {
    checkHerder(sourceAndTarget);
    Herder herder = herders.get(sourceAndTarget);
    herder.taskConfigs(connector, cb);
}

public static void main(String[] args) {
ArgumentParser parser = ArgumentParsers.newArgumentParser("connect-mirror-maker");
parser.description("MirrorMaker 2.0 driver");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
import org.apache.kafka.connect.mirror.MirrorMaker;
import org.apache.kafka.connect.mirror.MirrorSourceConfig;
import org.apache.kafka.connect.mirror.MirrorSourceConnector;
import org.apache.kafka.connect.mirror.SourceAndTarget;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.util.FutureCallback;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
import org.apache.kafka.test.NoRetryException;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -40,11 +44,14 @@
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
import static org.apache.kafka.connect.mirror.MirrorMaker.CONNECTOR_CLASSES;
Expand Down Expand Up @@ -104,6 +111,14 @@ private MirrorMaker startMirrorMaker(String name, Map<String, String> mmProps) {
return result;
}

/**
 * Removes the named MirrorMaker instance from {@code mirrorMakers} and stops it.
 *
 * @param name the name under which the instance was registered
 * @throws IllegalStateException if no instance is registered under {@code name}
 */
private void stopMirrorMaker(String name) {
    MirrorMaker removed = mirrorMakers.remove(name);
    if (removed != null) {
        removed.stop();
        return;
    }
    throw new IllegalStateException("No MirrorMaker named " + name + " has been started");
}

/**
* Tests a single-node cluster without the REST server enabled.
*/
Expand Down Expand Up @@ -198,7 +213,7 @@ public void testMultiNodeCluster() throws Exception {
put("listeners", "http://localhost:0");
// Refresh topics very frequently to quickly pick up on topics that are created
// after the MM2 nodes are brought up during testing
put("refresh.topics.interval.seconds", "1");
put(MirrorSourceConfig.REFRESH_TOPICS_INTERVAL_SECONDS, "1");
put("clusters", String.join(", ", a, b));
put(a + ".bootstrap.servers", clusterA.bootstrapServers());
put(b + ".bootstrap.servers", clusterB.bootstrapServers());
Expand Down Expand Up @@ -226,6 +241,9 @@ public void testMultiNodeCluster() throws Exception {
put("offset.storage.replication.factor", "1");
put("status.storage.replication.factor", "1");
put("config.storage.replication.factor", "1");
// For the multi-node case, we wait for reassignment so shorten the delay period.
put(a + "." + DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "1000");
put(b + "." + DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "1000");
}};

final SourceAndTarget sourceAndTarget = new SourceAndTarget(a, b);
Expand Down Expand Up @@ -260,6 +278,22 @@ public void testMultiNodeCluster() throws Exception {
// and wait for MirrorMaker to copy it to cluster B
awaitTopicContent(clusterB, b, a + "." + topic, messagesPerTopic);
}

// Perform a rolling restart of the cluster with a new configuration
Map<String, String> newMmProps = new HashMap<>(mmProps);
String newConfigValue = "2";
newMmProps.put(MirrorSourceConfig.REFRESH_TOPICS_INTERVAL_SECONDS, newConfigValue);
for (int i = 0; i < numNodes; i++) {
stopMirrorMaker("node " + i);
MirrorMaker any = mirrorMakers.values().stream().findAny().get();
// Wait for the cluster to finish the reassignment and rebalance before bringing up the next node.
awaitConnectorTasksStart(any, MirrorHeartbeatConnector.class, sourceAndTarget);
awaitConnectorTasksStart(any, MirrorSourceConnector.class, sourceAndTarget);
startMirrorMaker("node " + i, newMmProps);
}
// Assert that the new configuration is propagated
awaitTaskConfigurations(mirrorMakers.get("node 0"), MirrorSourceConnector.class, sourceAndTarget,
config -> newConfigValue.equals(config.get(MirrorSourceConfig.REFRESH_TOPICS_INTERVAL_SECONDS)));
}
}

Expand Down Expand Up @@ -308,6 +342,23 @@ private <T extends SourceConnector> void awaitConnectorTasksStart(final MirrorMa
}, MM_START_UP_TIMEOUT_MS, "Tasks for connector " + clazz.getSimpleName() + " for MirrorMaker instances did not transition to running in time");
}

/**
 * Waits until every task configuration reported for the given connector satisfies the predicate.
 * Any exception raised while fetching or checking the configurations is logged and aborts the
 * wait via {@link NoRetryException} instead of being retried.
 *
 * @param mm              the MirrorMaker node to query
 * @param clazz           the connector class; its simple name is used as the connector name
 * @param sourceAndTarget the replication flow the connector belongs to
 * @param predicate       condition each task configuration must satisfy
 * @throws InterruptedException declared for the underlying {@code waitForCondition} call
 */
private <T extends SourceConnector> void awaitTaskConfigurations(MirrorMaker mm, Class<T> clazz, SourceAndTarget sourceAndTarget, Predicate<Map<String, String>> predicate) throws InterruptedException {
    final String connectorName = clazz.getSimpleName();
    waitForCondition(() -> {
        try {
            FutureCallback<List<TaskInfo>> callback = new FutureCallback<>();
            mm.taskConfigs(sourceAndTarget, connectorName, callback);
            List<TaskInfo> taskInfos = callback.get(MM_START_UP_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            // Stop at the first task whose configuration does not match; an empty list counts as a match.
            for (TaskInfo taskInfo : taskInfos) {
                if (!predicate.test(taskInfo.config())) {
                    return false;
                }
            }
            return true;
        } catch (Exception ex) {
            log.error("Something unexpected occurred. Unable to get configuration of connector {} for mirror maker with source->target={}", connectorName, sourceAndTarget, ex);
            throw new NoRetryException(ex);
        }
    }, MM_START_UP_TIMEOUT_MS, "Connector configuration for " + connectorName + " for MirrorMaker instances is incorrect");
}

private void awaitTopicContent(EmbeddedKafkaCluster cluster, String clusterName, String topic, int numMessages) throws Exception {
try (Consumer<?, ?> consumer = cluster.createConsumer(Collections.singletonMap(AUTO_OFFSET_RESET_CONFIG, "earliest"))) {
consumer.subscribe(Collections.singleton(topic));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1678,7 +1678,7 @@ public void setClusterLoggerLevel(String namespace, String level) {
}

// Should only be called from work thread, so synchronization should not be needed
private boolean isLeader() {
protected boolean isLeader() {
return assignment != null && member.memberId().equals(assignment.leader());
}

Expand Down Expand Up @@ -1804,9 +1804,17 @@ private boolean handleRebalanceCompleted() {
member.requestRejoin();
return false;
}
rebalanceSuccess();
return true;
}

/**
 * Hook for performing operations after a successful rebalance.
 * <p>
 * This implementation does nothing. Being {@code protected} and non-final, it can be overridden
 * by subclasses that need to react once a rebalance has completed.
 */
protected void rebalanceSuccess() {
    // This space intentionally left blank.
}

/**
* Try to read to the end of the config log within the given timeout. If unsuccessful, leave the group
* and wait for a brief backoff period before returning
Expand Down

0 comments on commit 3f7eada

Please sign in to comment.