Self-Balancing Clusters simplify the management of Kafka clusters in the following ways:
- When the load on the cluster is unevenly distributed, Self-Balancing will automatically rebalance partitions to optimize performance
- When a new broker is added to the cluster, Self-Balancing Clusters will automatically fill it with partitions
- When an operator wants to remove a broker, they can call a Self-Balancing API that drains the partitions from the broker and shuts it down
- When a broker has been down for a certain amount of time, Self-Balancing Clusters will automatically reassign the partitions to other brokers
To learn more, refer to Self-Balancing Clusters in the Confluent documentation.
Self-Balancing runs on the Confluent Server brokers and does not introduce any new dependencies.
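For reference, Self-Balancing is controlled by the `confluent.balancer.enable` broker property. As a sketch (assuming, as the Confluent documentation describes, that it is exposed as a dynamic cluster-wide config), it could be toggled without restarting the brokers:

```bash
# Hedged sketch: enable Self-Balancing for the whole cluster at runtime.
# Assumes confluent.balancer.enable is accepted as a dynamic cluster-wide broker config.
kafka-configs --bootstrap-server localhost:9092 \
  --entity-type brokers --entity-default \
  --alter --add-config confluent.balancer.enable=true
```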
This demo showcases the main features of Self-Balancing Clusters, which debut with the release of Confluent Platform 6.0.0.
In order to run this demo, you will need:
- Docker
This demo will pull the following images:
- ZooKeeper
- Kafka
- Confluent Control Center (optional, if you also want to try out the GUI)
We will create uneven load in the cluster and watch Self-Balancing address this condition.
- Run `docker-compose`

  ```bash
  docker-compose -f kafka-0-1-2.yml up
  ```

  This will start ZooKeeper, 3 Confluent Server brokers, and Confluent Control Center. (We will be looking at the logs to see some interesting information, so it is recommended to run it in the foreground.)
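  As an optional sanity check (a small sketch, assuming the Kafka CLI tools used throughout this demo are installed locally), you can ask the cluster which brokers it currently sees:

  ```bash
  # Each broker in the cluster should show up with its id (0, 1 and 2 at this point).
  kafka-broker-api-versions --bootstrap-server localhost:9092 | grep "id:"
  ```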
- Create a topic

  ```bash
  kafka-topics \
    --bootstrap-server localhost:9092 \
    --create \
    --topic sbk \
    --replica-assignment 0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1
  ```

  We are forcing the topic to not place any replicas on broker 2, which creates an uneven load (see the optional check below).
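  As an optional check (a small sketch using standard shell tools, none of which are required by the demo), you can confirm that broker 2 currently holds no replicas of `sbk`:

  ```bash
  # Count replicas of topic sbk per broker id; broker 2 should not appear in the output yet.
  kafka-topics --bootstrap-server localhost:9092 --describe --topic sbk \
    | grep -o 'Replicas: [0-9,]*' \
    | grep -o '[0-9]\+' | sort | uniq -c
  ```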
- Produce data

  ```bash
  kafka-producer-perf-test \
    --producer-props bootstrap.servers=localhost:9092 \
    --topic sbk \
    --record-size 1000 \
    --throughput 1000 \
    --num-records 3600000
  ```

  This will produce data at about 1 MB/s (1,000 records per second at 1,000 bytes each) for about an hour (3,600,000 records in total).
- Start watching changes in the topic

  ```bash
  watch kafka-topics \
    --bootstrap-server localhost:9092 \
    --describe \
    --topic sbk
  ```

  This will show changes in replica assignments and in-sync replicas, as well as other relevant information such as replication throttling details. Optionally, Confluent Control Center can be used to watch the same changes.
- Wait for Self-Balancing to start the rebalance

  Self-Balancing samples data about disk usage, leader count, partition count, network throughput, and other variables. It then aggregates this data to make informed decisions about how to reassign partitions. It needs between 10 and 20 minutes from startup to collect enough samples to generate a rebalancing plan (if one is needed). Self-Balancing also invalidates previous samples when the number of partitions in the cluster changes materially, since they may no longer accurately reflect the current state of the cluster.

  While Self-Balancing is still sampling, the following message will appear in the logs periodically:

  ```
  INFO Skipping proposal precomputing because load monitor does not have enough snapshots.
  ```
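  If you prefer not to scan the full compose output, one option (a sketch that simply reuses this demo's compose file and the log lines quoted in this tutorial) is to follow the logs from a second terminal and filter for the proposal messages:

  ```bash
  # Follow the broker logs and show only the Self-Balancing proposal-related messages
  # quoted in this tutorial ("Skipping proposal precomputing ..." and
  # "Finished the precomputation proposal candidates ...").
  docker-compose -f kafka-0-1-2.yml logs --follow | grep -i "proposal"
  ```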
- Watch Self-Balancing rebalance the cluster

  Once Self-Balancing is ready to compute reassignment plans, the following message will appear:

  ```
  INFO Finished the precomputation proposal candidates in * ms (com.linkedin.kafka.cruisecontrol.analyzer.GoalOptimizer)
  ```

  Self-Balancing should then start the rebalancing process. Monitor the logs and the `watch kafka-topics` command to observe the changes as they happen.
We will now add 2 more brokers to the cluster and watch Self-Balancing fill them with partitions.
- Run `docker-compose`

  ```bash
  docker-compose -f kafka-0-1-2.yml -f kafka-3-4.yml up --no-recreate
  ```

  This will add 2 brokers to the setup that was running previously (do not stop any of the processes from the first part of the tutorial).

- Watch Self-Balancing rebalance the cluster

  Self-Balancing should be able to use the data it has already sampled, and the rebalance should kick off almost immediately.
  Note in the logs how Self-Balancing also detects that it is rebalancing onto a new broker, and not just that the cluster is out of balance. This matters because Self-Balancing offers a config called `confluent.balancer.heal.uneven.load.trigger`, which can be set to either `ANY_UNEVEN_LOAD` or `EMPTY_BROKER`. `EMPTY_BROKER` only rebalances when Self-Balancing finds an empty broker (this expansion scenario), while `ANY_UNEVEN_LOAD` rebalances whenever the load is uneven, regardless of the cause (the previous uneven-load scenario).
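  As a sketch of how the trigger could be switched (assuming, as the Confluent documentation describes for most Self-Balancing settings, that this property is accepted as a dynamic cluster-wide broker config; verify against the docs for your version):

  ```bash
  # Hedged example: only rebalance when an empty broker is detected.
  # Assumes confluent.balancer.heal.uneven.load.trigger can be updated dynamically.
  kafka-configs --bootstrap-server localhost:9092 \
    --entity-type brokers --entity-default \
    --alter --add-config confluent.balancer.heal.uneven.load.trigger=EMPTY_BROKER
  ```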
We will now remove a broker from the cluster and watch Self-Balancing shut it down and drain the partitions.
- Trigger a broker removal

  For this example, we will be using the Confluent Server REST API.

  First,

  ```bash
  curl localhost:8090/kafka/v3/clusters
  ```

  will return a collection with all the clusters the REST API can address. In this case, there is only one cluster. Note the value of `data[0].cluster_id` as the id of our cluster.
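  If you happen to have `jq` installed (it is not required by the demo; this is just a convenience sketch), you can capture that id in a shell variable for the next calls:

  ```bash
  # Hypothetical convenience step, assuming jq is available:
  # store the first (and only) cluster id returned by the REST API.
  CLUSTER_ID=$(curl -s localhost:8090/kafka/v3/clusters | jq -r '.data[0].cluster_id')
  echo "$CLUSTER_ID"
  ```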
  Then,

  ```bash
  curl -X DELETE localhost:8090/kafka/v3/clusters/:cluster_id/brokers/3
  ```

  (replace `:cluster_id` with the cluster id from the previous step) will trigger the removal of broker 3. This should return `202 Accepted`.

  Alternatively, you can perform this step with the new CLI tool, `kafka-remove-brokers`, that ships with Confluent Platform 6.0.0. The equivalent CLI command is:

  ```bash
  kafka-remove-brokers \
    --bootstrap-server localhost:9092 \
    --delete \
    --broker-id 3
  ```

  Finally, this operation can also be executed from Confluent Control Center.
- Watch Self-Balancing remove the broker

  Self-Balancing should be able to use the data it has already sampled, and the removal should kick off almost immediately. The removal, however, is a long-running process (because it involves data movement).

  ```bash
  curl localhost:8090/kafka/v3/clusters/:cluster_id/remove-broker-tasks/3
  ```

  will return a `partition_reassignment_status` and a `broker_shutdown_status` for the broker removal task.

  The alternative for CLI users in this case is:

  ```bash
  kafka-remove-brokers \
    --bootstrap-server localhost:9092 \
    --describe \
    --broker-id 3
  ```
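  To keep an eye on the task until both statuses complete, one option (a sketch, again assuming `watch` and `jq` as in the earlier optional steps) is to poll the same endpoint:

  ```bash
  # Hedged sketch: poll the removal task every 2 seconds and show only the two status fields.
  # Replace :cluster_id with your cluster id (or reuse the CLUSTER_ID variable from above).
  watch -n 2 "curl -s localhost:8090/kafka/v3/clusters/:cluster_id/remove-broker-tasks/3 \
    | jq '{partition_reassignment_status, broker_shutdown_status}'"
  ```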
We will now simulate a broker failure and watch Self-Balancing address this condition.
- Stop a container

  ```bash
  docker container stop $(docker container ls -q --filter name=kafka4)
  ```

  This will stop the container running broker 4.

- Watch Self-Balancing create new replicas

  Self-Balancing should be able to use the data it has already sampled, and the creation of new replicas should kick off almost immediately.
  Note that we have also set the config `confluent.balancer.heal.broker.failure.threshold.ms` to `5000`, meaning that Self-Balancing will consider the broker dead after only 5 seconds. This is not suitable for production environments, where the timeout should typically be set to somewhere between 30 minutes and 1 hour (or whatever is most reasonable for your environment), but it is helpful here so that the demo can run faster.
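  As a sketch of what restoring a production-style threshold might look like (assuming this property, like other Self-Balancing settings, can be updated as a dynamic cluster-wide broker config; check the documentation for your version before relying on it):

  ```bash
  # Hedged example: raise the failure threshold to 30 minutes (1,800,000 ms).
  # Assumes confluent.balancer.heal.broker.failure.threshold.ms accepts dynamic updates.
  kafka-configs --bootstrap-server localhost:9092 \
    --entity-type brokers --entity-default \
    --alter --add-config confluent.balancer.heal.broker.failure.threshold.ms=1800000
  ```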