Getting clj-kafka consumer example to work. #clojure #kafka #streaming #data

As a data engineer and a software developer I’m spending a lot of time working out things over a number of different technologies. It might be Spark, Sparkling and S3 one day and Cassandra, Clojure and ElasticSearch the next.

Historically I’ve been a huge advocate of RabbitMQ, I still am, but more and more I’m using Kafka. The original reason for not wanting to use Kafka wasn’t Kafka, it was Zookeeper, something else to monitor. Ultimately you may find yourself knowing 90% of one thing well and not so much about the underlying tools and services. Change that.

Make Data Streams Your Friend

Get used to them. They don’t have to be real time; in fact, when a startup says they’re “doing real time as it’s gotta be realtime”, 99 times out of 100 that’s not actually the case. So give them the standard Andrew Bolster gif of the century….

(Image: giphy)

I’d say that 99% of my work is working with high volume streams (and some low volume ones too).

Anyways, I digress.

Okay So I Was Wrong About Kafka

It’s glorious, really flipping glorious. While I can’t go into detail about how I’m using it, blimey it’s glorious. I did wake up this morning still feeling I’d not got enough background, so I cracked open the notebook and went hunting for information…. seven pages of notes later I’m in a much happier place.

(Photo: 20160730_202853)

In Clojure we use clj-kafka.

The clj-kafka wrapper is for Kafka 0.8.x (don’t attempt to use 0.9.x, it will not work).

There is, however, one wee little error in the introduction documentation. The consumer call doesn’t work as written; it merely returns an empty sequence. To get the consumer working you need a doall in your with-resource call.
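My best guess at the cause is laziness: if the messages come back as a lazy sequence, nothing is read until the REPL prints the result, and by then with-resource has already shut the consumer down. Here is a minimal sketch of that effect with no Kafka involved. The names open?, fake-messages and with-fake-resource are made up for illustration and are not part of clj-kafka.

```clojure
;; Hypothetical stand-in for a consumer connection: true while "open".
(def open? (atom true))

(defn fake-messages
  "Lazy sequence that only yields items while the fake resource is open."
  []
  (take-while (fn [_] @open?) (range)))

(defn with-fake-resource
  "Calls f, then marks the resource closed, roughly what with-resource
  does when it calls shutdown after the body."
  [f]
  (reset! open? true)
  (try
    (f)
    (finally (reset! open? false))))

;; Lazy: the sequence is only realised when printed, after the close.
(with-fake-resource #(take 2 (fake-messages)))
;; => ()

;; doall realises the sequence while the resource is still open.
(with-fake-resource #(doall (take 2 (fake-messages))))
;; => (0 1)
```

With the real consumer, the same fix looks like this: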

kafka-testing.core> (def config {"zookeeper.connect"  "localhost:2181"
                                 "group.id"           "clj-test"
                                 "auto.offset.reset"  "smallest"
                                 "auto.commit.enable" "false"})
;; => #'kafka-testing.core/config
kafka-testing.core> (use 'clj-kafka.consumer.zk)
kafka-testing.core> (use 'clj-kafka.core)
;; doall forces the two messages to be read before shutdown is called
kafka-testing.core> (with-resource [c (consumer config)]
                      shutdown
                      (doall (take 2 (messages c "test"))))
;; => ({:topic "test",
;;      :offset 0,
;;      :partition 0,
;;      :key nil,
;;      :value
;;      [116, 104, 105, 115, 32, 105, 115, 32, 109, 121, 32, 109, 101, 115, 115, 97, 103, 101, 32, 49]}
;;     {:topic "test",
;;      :offset 1,
;;      :partition 0,
;;      :key nil,
;;      :value
;;      [116, 104, 105, 115, 32, 105, 115, 32, 109, 121, 32, 109, 101, 115, 115, 97, 103, 101, 32, 50]})
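
The :value of each message is raw bytes, so you’ll want to decode it before doing anything useful. A small sketch, assuming the payload is UTF-8 text (that’s my assumption here, your producers may be writing something else entirely); value->string is a helper name I’ve made up, it isn’t part of clj-kafka.

```clojure
(defn value->string
  "Decode a message value (a byte array, or a seq of byte values) as UTF-8.
  Assumes the producer wrote UTF-8 text."
  [value]
  (String. ^bytes (byte-array value) "UTF-8"))

;; The first message value from the output above:
(value->string [116 104 105 115 32 105 115 32 109 121 32
                109 101 115 115 97 103 101 32 49])
;; => "this is my message 1"
```

You can map this over the realised messages, for example (map (comp value->string :value) msgs), where msgs is whatever the with-resource call returned.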

There, a small note to help if anyone gets stuck with it. Next time, Kafka with the Onyx platform.
