-
Notifications
You must be signed in to change notification settings - Fork 1
/
reassign-replicas.sh
executable file
·105 lines (88 loc) · 2.81 KB
/
reassign-replicas.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#!/bin/bash
#
# Reassigns Kafka partition replicas: for every partition reported by
# `kafka-topics.sh --describe`, the in-sync replicas are kept and the remaining
# replica slots are filled with brokers picked at random from a supplied list.
# The resulting plan is written to a JSON file, executed with
# kafka-reassign-partitions.sh and then polled with --verify.
#
# Usage: reassign-replicas.sh <zookeeper-connect> <broker-id[,broker-id...]>
#   $1 - value passed to the Kafka tools' --zookeeper option
#   $2 - comma-separated list of broker ids to draw new replicas from

# Both arguments are mandatory; without them the Kafka tools would be invoked
# with an empty --zookeeper value.
if [[ -z "${1:-}" || -z "${2:-}" ]]; then
  echo "Usage: $0 <zookeeper-connect> <broker-id[,broker-id...]>" >&2
  exit 2
fi

KAFKA_HOME=/opt/kafka
ZK_CONNECT=$1
NEW_BROKER=$2

# Scratch files shared by the rest of the script.
TMP_DESCRIBE_PATH=/tmp/describe
TMP_VERIFY_PATH=/tmp/verify
REASSIGNMENT_JSON_FILE=/tmp/execute.json

# Value of the "version" field written into the reassignment JSON.
VERSION=1

# Flag driving the verification polling loop; "true" until every partition
# reports completion.
NOT_COMPLETED_SUCCESSFULLY="true"

echo "ZK_CONNECT = $ZK_CONNECT"
echo "NEW_BROKERS = $NEW_BROKER"

# Snapshot the current topic/partition layout. Quoted so that a connect string
# is always handed over as a single argument.
"$KAFKA_HOME/bin/kafka-topics.sh" --describe --zookeeper "$ZK_CONNECT" > "$TMP_DESCRIBE_PATH"
#######################################
# Tests whether an array holds an element exactly equal to a given string.
# Arguments:
#   $1 - name of the array variable to search (passed by name, not by value)
#   $2 - string to look for
# Returns:
#   0 if some element equals $2, 1 otherwise (including for an empty array)
#######################################
array_contains () {
  # Prefixed local names keep the indirect expansion below from being shadowed
  # when the caller's array happens to share a name with one of our locals.
  local __ac_ref="$1[@]"
  local __ac_needle=$2
  local __ac_item
  for __ac_item in "${!__ac_ref}"; do
    # The right-hand side is quoted so it is compared literally; unquoted it
    # would be treated as a glob pattern (e.g. "1*" would match "10").
    if [[ "$__ac_item" == "$__ac_needle" ]]; then
      return 0
    fi
  done
  return 1
}
# Build a reassignment plan from the describe output, execute it, then poll
# --verify until every partition reports that it completed successfully.
if [[ -s "$TMP_DESCRIBE_PATH" ]]; then
  # IFS is only ever changed for the duration of a single `read`, so later
  # expansions (notably the ZooKeeper connect string) are never split on commas.
  IFS=',' read -ra NEW_BROKER_ARRAY <<< "$NEW_BROKER"

  PARTITION_ENTRIES=()
  while read -r line || [[ -n "$line" ]]; do
    # Lines carrying "ReplicationFactor" are per-topic summaries; only the
    # remaining lines describe a partition's replica assignment.
    if [[ "$line" == *"ReplicationFactor"* ]]; then
      continue
    fi

    # Whitespace-separated fields: 2 = topic, 4 = partition, 8 = replicas,
    # 10 = in-sync replicas.
    IFS=$' \t' read -r _ topic _ partition _ _ _ replicas _ isr _ <<< "$line"

    # Blank or truncated lines would otherwise yield an invalid JSON entry.
    if [[ -z "$topic" || -z "$partition" || -z "$replicas" ]]; then
      continue
    fi

    IFS=',' read -ra REPLICA_ARRAY <<< "$replicas"
    IFS=',' read -ra ISR_ARRAY <<< "$isr"

    # Keep every in-sync replica, then top the list up to the current replica
    # count with randomly chosen brokers from the supplied list.
    BROKER_LIST=( "${ISR_ARRAY[@]}" )
    missing=$(( ${#REPLICA_ARRAY[@]} - ${#BROKER_LIST[@]} ))
    if (( missing > 0 )); then
      # Distinct supplied brokers that are not already part of the list.
      CANDIDATES=()
      for broker in "${NEW_BROKER_ARRAY[@]}"; do
        if [[ -z "$broker" ]]; then
          continue
        fi
        if array_contains BROKER_LIST "$broker" || array_contains CANDIDATES "$broker"; then
          continue
        fi
        CANDIDATES+=( "$broker" )
      done

      # Retrying random picks can never succeed when there are too few distinct
      # candidates, so stop instead of looping forever.
      if (( ${#CANDIDATES[@]} < missing )); then
        echo "Not enough new brokers to fill $missing replica(s) of $topic partition $partition" >&2
        exit 1
      fi

      while read -r broker; do
        BROKER_LIST+=( "$broker" )
      done < <(printf '%s\n' "${CANDIDATES[@]}" | shuf -n "$missing")
    fi

    # Join inside a subshell so the comma IFS does not leak.
    BROKER_LIST_STR=$(IFS=','; printf '%s' "${BROKER_LIST[*]}")
    PARTITION_ENTRIES+=( "{\"topic\":\"$topic\",\"partition\":$partition,\"replicas\":[$BROKER_LIST_STR]}" )
  done < "$TMP_DESCRIBE_PATH"

  # Nothing to reassign: the verify loop below would have no status line to
  # wait for.
  if (( ${#PARTITION_ENTRIES[@]} == 0 )); then
    echo "No topics found"
    exit 0
  fi

  TOPICS_JSON=$(IFS=','; printf '%s' "${PARTITION_ENTRIES[*]}")

  # Truncating write replaces any plan left over from a previous run.
  printf '{"version":%s,"partitions":[\n%s\n]}\n' "$VERSION" "$TOPICS_JSON" > "$REASSIGNMENT_JSON_FILE"

  if ! "$KAFKA_HOME/bin/kafka-reassign-partitions.sh" --zookeeper "$ZK_CONNECT" \
      --reassignment-json-file "$REASSIGNMENT_JSON_FILE" --execute; then
    echo "kafka-reassign-partitions.sh --execute failed" >&2
    exit 1
  fi

  echo "Verify the status of the partition reassignment"
  while [[ "$NOT_COMPLETED_SUCCESSFULLY" == "true" ]]; do
    "$KAFKA_HOME/bin/kafka-reassign-partitions.sh" --zookeeper "$ZK_CONNECT" \
      --reassignment-json-file "$REASSIGNMENT_JSON_FILE" --verify > "$TMP_VERIFY_PATH"

    while read -r line || [[ -n "$line" ]]; do
      # Skip the heading and blank lines; every other line is a status line.
      if [[ -z "$line" || "$line" == *"Status of partition reassignment"* ]]; then
        continue
      fi
      printf '%s\n' "$line"
      if [[ "$line" == *"completed successfully"* ]]; then
        NOT_COMPLETED_SUCCESSFULLY="false"
      elif [[ "$line" == *" failed" ]]; then
        # NOTE(review): assumes the tool reports a terminal failure as a status
        # line ending in " failed" -- confirm against the Kafka version in use.
        echo "Partition reassignment failed; aborting" >&2
        exit 1
      else
        # First partition that is not done yet: poll again.
        NOT_COMPLETED_SUCCESSFULLY="true"
        break
      fi
    done < "$TMP_VERIFY_PATH"

    if [[ "$NOT_COMPLETED_SUCCESSFULLY" == "true" ]]; then
      sleep 1
    fi
  done
else
  echo "No topics found"
fi