With a bunch of applications acting as consumers to a Kafka stream it appears to be a Google dark art to find any decent information of what’s going where and doing what. The big question is, where is my application up to in the topic log?
After hours of try, test, rinse, repeat, tea, pulling hair, more tea, Stackoverflow (we all do, get over it) and yet more tea, this dear digital plumber was looking like this….
….but in the male form, less angry and not using tables, just more tea.
The /consumer
node in Zookeeper is a bit of a red herring, your application consumer group ids don’t show up there but ones from the Kafka shell console do. This makes running the ConsumerGroupCommand class a bit of a dead end.
Consumer Offsets Hidden in Plain Sight
It does exist though! It’s right there, looking at you…
$ bin/zookeeper-shell.sh localhost:2181 Connecting to localhost:2181 Welcome to ZooKeeper! JLine support is disabled WATCHER:: WatchedEvent state:SyncConnected type:None path:null ls /brokers/topics [__consumer_offsets, topic-input]
..just not plainly obvious.
Most consumers are basically while loops collecting a number of records and using the poll() method to update the consumer offset of any records not dealt with, basically saying “I’m up to here boss! I didn’t read these though”. It also acts as the initial link with the Kafka Group Coordinator to register a new consumer group. Those consumer groups do not show up where you expect them too.
Finding Out Where the Offset Is
At this point Zookeeper isn’t much help to me, using the Zookeeper shell doesn’t give me much to go on.
get /brokers/topics/__consumer_offsets/partitions/44/state {"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]} cZxid = 0x4e ctime = Sat Mar 04 07:41:28 GMT 2017 mZxid = 0x4e mtime = Sat Mar 04 07:41:28 GMT 2017 pZxid = 0x4e cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 72 numChildren = 0
So I need something else…. and help is at hand. It just takes a little jiggery pokery.
Using The Kafka Consumer Console to Read Offset Information
We can use Kafka’s console tools to read the __consumer_offsets
. First thing to do is create a config file in a temporary directory.
$ echo "exclude.internal.topics=false" > /tmp/consumer.config
Then we can start the console.
$ bin/kafka-console-consumer.sh --consumer.config /tmp/consumer.config --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --zookeeper localhost:2181 --topic __consumer_offsets --from-beginning
Any consumer applications you have running should show up in the offset log. In this example I have two applications running from the same topic (topic-input) on one partition. So I can see from here that my-stream-processing-application
is up to offset 315 in the topic while my-other-processing-application
is further ahead at 504. That could potentially tell us there’s an issue with the first application as it appears to be way behind in the topic.
[my-stream-processing-application,topic-input,0]::[OffsetMetadata[189,NO_METADATA],CommitTime 1488613422905,ExpirationTime 1488699822905] [my-stream-processing-application,topic-input,0]::[OffsetMetadata[252,NO_METADATA],CommitTime 1488613901498,ExpirationTime 1488700301498] [my-stream-processing-application,topic-input,0]::[OffsetMetadata[315,NO_METADATA],CommitTime 1488614472422,ExpirationTime 1488700872422] [my-other-processing-application,topic-input,0]::[OffsetMetadata[378,NO_METADATA],CommitTime 1488614576300,ExpirationTime 1488700976300] [my-other-processing-application,topic-input,0]::[OffsetMetadata[441,NO_METADATA],CommitTime 1488614606314,ExpirationTime 1488701006314] [my-other-processing-application,topic-input,0]::[OffsetMetadata[504,NO_METADATA],CommitTime 1488615237410,ExpirationTime 1488701637410]
The frustration at this point is that it’s hard to know the total number of records in the log, we know where the offset is up to but not the total and what the lag is.
The search continues…..