Kafka Setup
sudo mkdir -p /data/kafka
#Raise file descriptor limits - allow the Kafka process to open up to 100000 file descriptors
echo "* hard nofile 100000
* soft nofile 100000" | sudo tee --append /etc/security/limits.conf
#Reboot the system so the new file limits take effect
sudo reboot
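#Later, once the broker is running, the effective limit can be double-checked against the broker
#process (a quick sanity check; the grep pattern assumes the standard kafka.Kafka main class)
cat /proc/$(pgrep -f kafka.Kafka)/limits | grep 'open files'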
#Edit the broker configuration
vi config/server.properties
advertised.listeners=PLAINTEXT://kafka-1:9092
delete.topic.enable=true
log.dirs=/data/kafka
zookeeper.connect=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181/kafka
zookeeper.connection.timeout.ms=6000
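#For reference, a minimal per-broker server.properties combining the settings above might look like
#the following (broker.id and the bind address are assumptions; each broker gets its own id and
#advertised hostname)
broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka-1:9092
delete.topic.enable=true
log.dirs=/data/kafka
zookeeper.connect=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181/kafka
zookeeper.connection.timeout.ms=6000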
#Init script to run Kafka as a daemon (installed as /etc/init.d/kafka below)
#!/bin/bash
DAEMON_PATH=/home/phyadavi/kafka_2.12-2.0.0/bin
DAEMON_NAME=kafka
PATH=$PATH:$DAEMON_PATH

case "$1" in
  start)
        # Start the daemon only if it is not already running
        pid=$(ps ax | grep -i 'kafka.Kafka' | grep -v grep | awk '{print $1}')
        if [ -n "$pid" ]; then
          echo "Kafka is already RUNNING"
        else
          echo "Starting $DAEMON_NAME"
          $DAEMON_PATH/kafka-server-start.sh -daemon /home/phyadavi/kafka_2.12-2.0.0/config/server.properties
        fi
        ;;
  stop)
        # Stop the daemon
        echo "Shutting down $DAEMON_NAME"
        $DAEMON_PATH/kafka-server-stop.sh
        ;;
  restart)
        $0 stop
        sleep 2
        $0 start
        ;;
  status)
        pid=$(ps ax | grep -i 'kafka.Kafka' | grep -v grep | awk '{print $1}')
        if [ -n "$pid" ]; then
          echo "Kafka is RUNNING PID: $pid"
        else
          echo "Kafka is not RUNNING"
        fi
        ;;
  *)
        echo "usage: $0 (start|stop|restart|status)"
        ;;
esac
exit 0
sudo chmod +x /etc/init.d/kafka
sudo update-rc.d kafka defaults
journalctl -u kafka.service
sudo service kafka start
systemctl -l status
nc -vz localhost 9092
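#Note: update-rc.d lets systemd generate a kafka.service unit from the init script, which is why
#journalctl -u kafka.service works. A native unit is an alternative; a minimal sketch assuming the
#same install path and user as above
# /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka broker
After=network.target

[Service]
User=phyadavi
ExecStart=/home/phyadavi/kafka_2.12-2.0.0/bin/kafka-server-start.sh /home/phyadavi/kafka_2.12-2.0.0/config/server.properties
ExecStop=/home/phyadavi/kafka_2.12-2.0.0/bin/kafka-server-stop.sh
Restart=on-failure
LimitNOFILE=100000

[Install]
WantedBy=multi-user.target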
######################################################################
kafka-topics.sh --create --zookeeper zookeeper-1,zookeeper-2,zookeeper-3 --replication-factor 1 --partitions 1 --topic test
kafka-console-producer.sh --broker-list kafka-1:9092,kafka-2:9092,kafka-3:9092 --topic test
kafka-console-consumer.sh --bootstrap-server kafka-1:9092,kafka-2:9092,kafka-3:9092 --topic test --from-beginning
kafka-reassign-partitions.sh --broker-list "0,1,2,3,4,5,6,7,8,9,10,11,12" --zookeeper zookeeper-1,zookeeper-2,zookeeper-3 --generate --topics-to-move-json-file topics_to_move.json
kafka-reassign-partitions.sh --zookeeper zookeeper-1,zookeeper-2,zookeeper-3 --reassignment-json-file consumer_offset_repartition.json --execute
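#The topics_to_move.json passed to --generate just lists the topics whose partitions should be
#redistributed; a minimal example (topic name assumed)
{"topics": [{"topic": "test"}], "version": 1}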
So, we finally figured out why our __consumer_offsets topic was so skewed
in volume across partitions. It turns out that the partition is chosen based
on the consumer group_id (which makes sense), and we had a few extremely
over-committing consumers in prod. A few of them were committing several
thousand times per second, even though they were only consuming one or two
records per second. Over time, these consumers generated many hundreds of
billions of offset commits, and the additional stress on the brokers hosting
those specific partitions was, in turn, generating URPs (under-replicated
partitions) across the entire cluster.
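For context, the broker picks the __consumer_offsets partition roughly as
abs(hash(group.id)) mod offsets.topic.num.partitions (50 by default), so every
commit from a given group id lands on the same partition and therefore on the
same coordinator broker.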
The most egregious offender was Apache Camel running Kafka client
v0.10.0.1. Upgrading the Kafka clients to 0.10.2.0 seemed to fix the
problem. Separately, one of the consumers had a mechanism where it was
polling every 50ms across 10 different topics and committing after every
poll cycle, regardless of whether any messages had been consumed. Once
that was dialed down to one commit every 3 seconds per consumer, the
problem went away.
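#With the Java client's auto-commit, a 3-second commit interval like the one described above
#corresponds to consumer settings along these lines (standard client properties; the interval
#value is the one from this incident)
enable.auto.commit=true
auto.commit.interval.ms=3000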
What we have realized is that choosing the __consumer_offsets partition
based only on the groupId may not be ideal. A lot of teams use the exact
same group ID across dozens of machines, just consuming from different
topics. All of those commits end up on the exact same partition, hence
the same broker, and this can in turn cause performance problems.
Would it be reasonable to change this in the future: keep the current
behavior as the default, but allow a setting that chooses the partition
based on group_id + topic name? That would be extremely helpful.
The way we figured this out: while going through the client source code,
trying to understand how to decode the binary data in the
__consumer_offsets records, I found the OffsetsMessageFormatter, which
allowed me to consume the __consumer_offsets topic directly, deserializing
the consumer group/topic that each commit belongs to. This is what I was
running to get this information:
./kafka-console-consumer.sh --bootstrap-server kafka-server:9092 --topic __consumer_offsets --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
I thought this was really helpful, and think this tool should be
mentioned/documented, so other people can find it when diagnosing
performance issues on their Kafka clusters.
Anyway, thought I'd post a follow-up to the original question, since it
might help somebody else in the future.
sh kafka-preferred-replica-election.sh --zookeeper zookeeper-1,zookeeper-2,zookeeper-3
kafka-topics.sh --zookeeper zookeeper-1,zookeeper-2,zookeeper-3 --describe --under-replicated-partitions
kafka-topics.sh --zookeeper zookeeper-1,zookeeper-2,zookeeper-3 --describe --unavailable-partitions
bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server kafka-1:9092,kafka-2:9092,kafka-3:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config consumer.conf
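#The consumer.conf referenced above is not included here; the property commonly needed so the
#consumer will read the internal topic is shown below (a minimal sketch, assuming nothing else
#is required for this cluster)
exclude.internal.topics=false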
kafka-consumer-groups.sh --bootstrap-server kafka-1:9092,kafka-2:9092,kafka-3:9092 --list
kafka-consumer-groups.sh --bootstrap-server kafka-1:9092,kafka-2:9092,kafka-3:9092 --describe --group asyncRawWriterConsumerGroup
kafka-consumer-groups.sh --bootstrap-server kafka-1:9092,kafka-2:9092,kafka-3:9092 --describe --group asyncNormalizedWriterConsumerGroup
kafka-consumer-groups.sh --bootstrap-server kafka-1:9092,kafka-2:9092,kafka-3:9092 --describe --group __consumer_offsets
kafka-consumer-groups.sh --bootstrap-server kafka-1:9092,kafka-2:9092,kafka-3:9092 --describe --group dedupeconsumergroup
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=<kafka.broker.hostname> -Djava.net.preferIPv4Stack=true"
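#These JMX options take effect when exported in the broker's environment before startup;
#kafka-run-class.sh also honors JMX_PORT (the port number below is an assumption)
export JMX_PORT=9999
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=<kafka.broker.hostname> -Djava.net.preferIPv4Stack=true"
bin/kafka-server-start.sh -daemon config/server.properties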
[dedupeconsumergroup,daas.dedupe.avrosyslog.incoming,4]::[OffsetMetadata[8646,NO_METADATA],CommitTime 1538115746766,ExpirationTime 1538202146766]
[dedupeconsumergroup,daas.dedupe.avrosyslog.incoming,6]::[OffsetMetadata[8639,NO_METADATA],CommitTime 1538115746766,ExpirationTime 1538202146766]
{"partitions":
[{"topic": "__consumer_offset",
"partition": 1,
"replicas": [1,2,4] }],
"version":1
}
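#Assuming the JSON above is the consumer_offset_repartition.json used with --execute earlier,
#the progress of the reassignment can be checked with the same tool in --verify mode
kafka-reassign-partitions.sh --zookeeper zookeeper-1,zookeeper-2,zookeeper-3 --reassignment-json-file consumer_offset_repartition.json --verify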
kafka-run-class.sh kafka.tools.VerifyConsumerRebalance --group asyncRawWriterConsumerGroup --zookeeper.connect zookeeper-1,zookeeper-2,zookeeper-3
kafka-run-class.sh kafka.tools.VerifyConsumerRebalance --group asyncNormalizedWriterConsumerGroup --zookeeper.connect zookeeper-1,zookeeper-2,zookeeper-3
kafka-run-class.sh kafka.tools.VerifyConsumerRebalance --group dedupeconsumergroup --zookeeper.connect zookeeper-1,zookeeper-2,zookeeper-3
/**
 * For verifying the consistency among replicas.
 *
 * 1. start a fetcher on every broker.
 * 2. each fetcher does the following
 *    2.1 issues fetch request
 *    2.2 puts the fetched result in a shared buffer
 *    2.3 waits for all other fetchers to finish step 2.2
 *    2.4 one of the fetchers verifies the consistency of fetched results among replicas
 *
 * The consistency verification is up to the high watermark. The tool reports the
 * max lag between the verified offset and the high watermark among all partitions.
 *
 * If a broker goes down, the verification of the partitions on that broker is delayed
 * until the broker is up again.
 *
 * Caveats:
 * 1. The tool needs all brokers to be up at startup time.
 * 2. The tool doesn't handle out of range offsets.
 */
kafka-replica-verification.sh --broker-list kafka-1:9092,kafka-2:9092,kafka-3:9092
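#By default the tool verifies all topics and prints the max lag periodically; the scope and
#reporting interval can be narrowed with its --topic-white-list and --report-interval-ms options
#(the values below are assumptions; the topic pattern matches the dedupe topics above)
kafka-replica-verification.sh --broker-list kafka-1:9092,kafka-2:9092,kafka-3:9092 --topic-white-list 'daas.*' --report-interval-ms 60000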