Using onyx-template to craft a Kafka Streaming application. Part 2. #clojure #onyx #kafka #streaming #data


The Story So Far… And Beyond

In Part 1 I covered the basic setup of the Onyx platform configuration, starting Zookeeper, and deploying the peers and a basic job. In this post I’m going to plug the Kafka components into the code base and set up a three-broker cluster on a local machine. Then we’ll kick everything off and send some messages for processing.

Code Amendments To Our Application.

Before we dive into the code, a little planning is needed. First of all: how many peers do we need to run? At present my workflow is pretty simple:

:in :inc :out

A core.async channel for the input, a function to increment the value passed in, and then an output channel (again via core.async). I’m going to modify this so a Kafka topic is read (through the :in channel). I’ll amend the math.clj code for ease of understanding so it just shows the deserialised message as a Clojure map. The output channel I’ll leave as is.
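For reference, the workflow vector itself keeps the same shape throughout; a sketch, using the template’s naming conventions, looks like this:

```clojure
;; The workflow shape is unchanged: segments flow :in -> :inc -> :out.
;; Only the tasks behind the keywords change: :in becomes a Kafka
;; consumer rather than a core.async channel.
(def workflow
  [[:in :inc]
   [:inc :out]])
```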

My Kafka cluster has three brokers and one partition. If a broker dies then another will be elected leader and the data stream doesn’t suffer. Onyx operates on one peer per partition; if you allocate too many peers against a topic it will throw errors, so I’m making that clear now.

One peer per partition.

Let’s make the code changes.

Changing the Project Dependencies

First of all let’s add the Kafka plugin to the project.clj file.

[org.onyxplatform/onyx-kafka-0.8 "0.9.9.1-SNAPSHOT"]
[cheshire "5.5.0"]

There are separate plugins for Kafka 0.8 and 0.9. As my cluster is 0.8.2.2, I’m going for the 0.8 plugin.

Changing the Basic Job Code

Second, let’s make changes to the basic job. Job one is to add the dependencies to my :require line.

[onyx.plugin.kafka :as kafka]
[onyx.tasks.kafka :as kafka-task]

Next, in the basic-job function I need to change the base job configuration. The input channel, :in, is now an onyx.tasks.kafka/consumer and will read the topic stream that we give it.

(-> base-job
    (add-task (kafka-task/consumer :in kafka-opts))
    (add-task (math/process-kafka :inc batch-settings))
    (add-task (core-async-task/output :out batch-settings)))

I need to beef up the settings for the Kafka channel so I’m adding kafka-opts to the let statement.

{:onyx/name :in
 :onyx/plugin :onyx.plugin.kafka/read-messages
 :onyx/type :input
 :onyx/medium :kafka
 :kafka/topic "my-message-stream"
 :kafka/group-id "onyx-consumer"
 :kafka/fetch-size 307200
 :kafka/chan-capacity 1000
 :kafka/zookeeper "127.0.0.1:2181"
 :kafka/offset-reset :smallest
 :kafka/force-reset? true
 :kafka/empty-read-back-off 500
 :kafka/commit-interval 500
 :kafka/deserializer-fn :testapp.shared/deserialize-message-json
 :kafka/wrap-with-metadata? false
 :onyx/min-peers 1
 :onyx/max-peers 1
 :onyx/batch-size 100
 :onyx/doc "Reads messages from a Kafka topic"}

I’m passing in the topic, consumer group and peer size information. I also need to add a deserialiser function to convert the message stream to a map for me.
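To make the wiring explicit, here’s a sketch of how kafka-opts sits alongside batch-settings in the job function’s let binding. The names (basic-job, add-task, the exact shape of base-job) follow the onyx-template conventions, so treat this as illustrative rather than verbatim:

```clojure
;; Sketch only: the let binding builds kafka-opts next to
;; batch-settings, then the threading macro adds the three tasks.
(defn basic-job
  [batch-size batch-timeout]
  (let [batch-settings {:onyx/batch-size batch-size
                        :onyx/batch-timeout batch-timeout}
        kafka-opts (merge batch-settings
                          {:kafka/topic "my-message-stream"
                           :kafka/group-id "onyx-consumer"
                           :kafka/zookeeper "127.0.0.1:2181"
                           :kafka/offset-reset :smallest
                           :kafka/deserializer-fn :testapp.shared/deserialize-message-json
                           :onyx/min-peers 1
                           :onyx/max-peers 1})
        base-job {:workflow [[:in :inc] [:inc :out]]
                  :catalog []
                  :lifecycles []
                  :task-scheduler :onyx.task-scheduler/balanced}]
    (-> base-job
        (add-task (kafka-task/consumer :in kafka-opts))
        (add-task (math/process-kafka :inc batch-settings))
        (add-task (core-async-task/output :out batch-settings)))))
```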

Adding a Deserialiser

(I’m English so I’m using “s” instead of “z”….) 🙂

I’ve borrowed this quick Clojure code from one of Mastodon C’s repos; it works and I’m happy with it, so I reuse it for safety.

(ns testapp.shared
  (:require [taoensso.timbre :as timbre]
            [cheshire.core :as json]))

(defn deserialize-message-json [bytes]
  (let [as-string (String. bytes "UTF-8")]
    (try
      (json/parse-string as-string true)
      (catch Exception e
        {:parse_error e :original as-string}))))

(defn serialize-message-json [segment]
  (.getBytes (json/generate-string segment)))

(def logger (agent nil))

(defn log-batch [event lifecycle]
  (let [task-name (:onyx/name (:onyx.core/task-map event))]
    (doseq [m (map :message (mapcat :leaves (:tree (:onyx.core/results event))))]
      (send logger (fn [_] (timbre/debug task-name " segment: " m)))))
  {})

(def log-calls
  {:lifecycle/after-batch log-batch})
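A quick REPL sanity check of the deserialiser (the input strings here are made up for illustration): valid JSON comes back as a map with keyword keys, and anything unparseable falls back to the error map:

```clojure
;; Valid JSON parses to a map with keyword keys...
(deserialize-message-json (.getBytes "{\"firstname\":\"Jase\"}" "UTF-8"))
;; => {:firstname "Jase"}

;; ...while garbage is caught and returned alongside the exception.
(:original (deserialize-message-json (.getBytes "not json" "UTF-8")))
;; => "not json"
```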

So every message that Onyx reads from the stream will pass through the deserialiser (as we configured in the :in task). All that’s left to do is change the original math.clj file to print the map to the console.

Amending The Process Function

Let’s keep this really simple. It just prints the deserialised map to the console.

(ns testapp.tasks.math
  (:require [schema.core :as s]))

(defn get-data [fn-data]
  (println fn-data))

(s/defn process-kafka
  ([task-name :- s/Keyword task-opts]
   {:task {:task-map (merge {:onyx/name task-name
                             :onyx/type :function
                             :onyx/fn ::get-data}
                            task-opts)}}))
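Calling the task builder at the REPL shows the task map Onyx will merge into the job; the task-opts value here is just an assumed example:

```clojure
;; ::get-data resolves against the testapp.tasks.math namespace.
(process-kafka :inc {:onyx/batch-size 100})
;; => {:task {:task-map {:onyx/name :inc
;;                       :onyx/type :function
;;                       :onyx/fn :testapp.tasks.math/get-data
;;                       :onyx/batch-size 100}}}
```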

I’m reusing the original code from the template; in any other scenario you’d obviously tidy up the naming as you went along.

So that’s the code taken care of: we’ve added the Kafka dependency, changed the job spec around a bit, added a deserialiser for the messages and amended the processing function.

Use leiningen to clean and create an uberjar.

$ lein clean ; lein uberjar
Compiling lib-onyx.media-driver
Compiling testapp.core
Compiling lib-onyx.media-driver
Compiling testapp.core
Created target/testapp-0.1.0-SNAPSHOT.jar
Created target/peer.jar

All done. Now to setup the Kafka cluster.

Setting Up a Three Node Kafka Cluster

Kafka runs nicely as a single-node cluster, but I want to use three brokers to give it some real-world exposure. The config directory has a file called server.properties. I’m going to copy that for the other two brokers I want.

$ cp server.properties server1.properties
$ cp server.properties server2.properties

There are three things to change in each of the new properties files.

Setting      server1.properties    server2.properties
broker.id    1                     2
port         9093                  9094
log.dirs     /tmp/kafka-logs1      /tmp/kafka-logs2

You can leave the original server.properties file alone; it will still use port 9092 by default.

Now For The Big Test

So this is the order we’re going to run in:

  • Start Zookeeper
  • Start Kafka Broker 0
  • Start Kafka Broker 1
  • Start Kafka Broker 2
  • Add a new topic
  • Start the Onyx Peers
  • Tail the onyx.log file
  • Submit the job.
  • Send some messages to the Kafka topic.

If you’ve used Zookeeper before, it’s worth clearing out its data directory, as we’re only testing things.

$ rm -rf /tmp/zookeeper
$ rm -rf /tmp/kafka-logs*

Starting Zookeeper

With a clean sheet we can start Zookeeper:

$KAFKA_HOME/bin/zookeeper-server-start.sh config/zookeeper.properties

Give it a second or two to start up and then open another terminal window.

Start Kafka Broker 0

The first Kafka broker will act as the leader.

$KAFKA_HOME/bin/kafka-server-start.sh config/server.properties

[2016-08-03 18:03:59,792] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2016-08-03 18:03:59,844] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)

Start Kafka Broker 1

As a JMX_PORT is already in use by broker 0, you will need to specify a new port number for broker 1.

JMX_PORT=9997 $KAFKA_HOME/bin/kafka-server-start.sh config/server1.properties

[2016-08-03 18:04:29,616] INFO Registered broker 1 at path /brokers/ids/1 with address 192.168.1.91:9093. (kafka.utils.ZkUtils$)
[2016-08-03 18:04:29,631] INFO [Kafka Server 1], started (kafka.server.KafkaServer)

Start Kafka Broker 2

As brokers 0 and 1 have their own JMX ports, once again you’ll need to specify a new one for broker 2.

JMX_PORT=9998 $KAFKA_HOME/bin/kafka-server-start.sh config/server2.properties

[2016-08-03 18:04:51,572] INFO Registered broker 2 at path /brokers/ids/2 with address 192.168.1.91:9094. (kafka.utils.ZkUtils$)
[2016-08-03 18:04:51,586] INFO [Kafka Server 2], started (kafka.server.KafkaServer)

Create a new topic

While Kafka can create new topics on the fly when messages are sent to them, Onyx doesn’t always behave well if the job is submitted before the topic exists. So it’s safer to create the topic ahead of time.

$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic my-message-stream --replication-factor 3 --partitions 1

Created topic "my-message-stream".

Notice the partition count is 1; that needs to match the number of peers on the :in Kafka reader task.

Start the Onyx Peers

We’ve already created the uberjar, so it’s just a case of firing up the peers.

$ java -cp target/peer.jar testapp.core start-peers 3 -c resources/config.edn -p :default
Starting peer-group
Starting env
Starting peers
Attempting to connect to Zookeeper @ 127.0.0.1:2181
Started peers. Blocking forever.

Now create a new terminal window so we can tail the logs.

Tail the onyx.log File

A lot of the output is dumped in the onyx.log file so it’s worth tailing it for errors and info. If the peer count is incorrect then it’ll show up here.

$ tail -f onyx.log
16-Aug-03 18:08:25 mini.local INFO [onyx.log.zookeeper] - Starting ZooKeeper client connection. If Onyx hangs here it may indicate a difficulty connecting to ZooKeeper.
16-Aug-03 18:08:25 jmini.local INFO [onyx.static.logging-configuration] - Starting Logging Configuration
16-Aug-03 18:08:25 mini.local INFO [onyx.log.zookeeper] - Starting ZooKeeper client connection. If Onyx hangs here it may indicate a difficulty connecting to ZooKeeper.
16-Aug-03 18:08:27 mini.local INFO [onyx.peer.communicator] - Starting Log Writer
16-Aug-03 18:08:27 mini.local INFO [onyx.peer.communicator] - Starting Replica Subscription
16-Aug-03 18:08:27 mini.local INFO [onyx.static.logging-configuration] - Starting Logging Configuration
16-Aug-03 18:08:27 mini.local INFO [onyx.messaging.acking-daemon] - Starting Acking Daemon
16-Aug-03 18:08:27 mini.local INFO [onyx.messaging.aeron] - Starting Aeron Messenger
16-Aug-03 18:08:27 mini.local INFO [onyx.peer.virtual-peer] - Starting Virtual Peer ec2d6a86-d221-4e98-b9c4-e01e845486cc
16-Aug-03 18:08:27 mini.local INFO [onyx.messaging.acking-daemon] - Starting Acking Daemon
16-Aug-03 18:08:27 mini.local INFO [onyx.messaging.aeron] - Starting Aeron Messenger
16-Aug-03 18:08:27 mini.local INFO [onyx.peer.virtual-peer] - Starting Virtual Peer 4d1b1c70-8d1c-4229-843c-42ef9a16e432
16-Aug-03 18:08:27 mini.local INFO [onyx.messaging.acking-daemon] - Starting Acking Daemon
16-Aug-03 18:08:27 mini.local INFO [onyx.messaging.aeron] - Starting Aeron Messenger
16-Aug-03 18:08:27 mini.local INFO [onyx.peer.virtual-peer] - Starting Virtual Peer 44be7413-dbce-46b5-a104-f161473f5bd6

Submit The Job

Submit the job, remember the job name has now changed from basic-job to kafka-job.

$ java -cp target/peer.jar testapp.core submit-job "kafka-job" -c resources/config.edn -p :default
16-Aug-03 18:11:40 jasebellmacmini.local INFO [onyx.log.zookeeper] - Starting ZooKeeper client connection. If Onyx hangs here it may indicate a difficulty connecting to ZooKeeper.
16-Aug-03 18:11:41 jasebellmacmini.local INFO [onyx.log.zookeeper] - Stopping ZooKeeper client connection
Successfully submitted job: #uuid "c4997d0d-a75f-4d26-9819-41782a50fcae"
Blocking on job completion...

Once deployed, look at the onyx.log file again. You should see the Kafka offset position being reported.

16-Aug-03 18:11:43 mini.local INFO [onyx.plugin.kafka] - Kafka consumer is starting at offset 0

We’re ready to send some messages.

Send Some Messages to the Kafka Topic

There are some useful scripts in the Kafka bin directory, including a console producer shell for writing messages to a topic.

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic my-message-stream

The broker list comprises all our running brokers. Once connected, start pumping out some messages:

[2016-08-03 18:15:01,904] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
{"firstname":"Jase", "lastname":"Bell", "github":"https://github.com/jasebell"}
{"firstname":"Jase", "lastname":"Bell", "github":"https://github.com/jasebell"}
{"firstname":"Jase", "lastname":"Bell", "github":"https://github.com/jasebell"}
{"firstname":"Jase", "lastname":"Bell", "github":"https://github.com/jasebell"}
{"firstname":"Jase", "lastname":"Bell", "github":"https://github.com/jasebell"}
{"firstname":"Jase", "lastname":"Bell", "github":"https://github.com/jasebell"}
{"firstname":"Jase", "lastname":"Bell", "github":"https://github.com/jasebell"}
{"firstname":"Jase", "lastname":"Bell", "github":"https://github.com/jasebell"}
{"firstname":"Jase", "lastname":"Bell", "github":"https://github.com/jasebell"}

Now look at the terminal window where the peers are running; you should see the deserialised output.

Attempting to connect to Zookeeper @ 127.0.0.1:2181
Started peers. Blocking forever.
{:firstname Jase, :lastname Bell, :github https://github.com/jasebell}
{:firstname Jase, :lastname Bell, :github https://github.com/jasebell}
{:firstname Jase, :lastname Bell, :github https://github.com/jasebell}
{:firstname Jase, :lastname Bell, :github https://github.com/jasebell}
{:firstname Jase, :lastname Bell, :github https://github.com/jasebell}
{:firstname Jase, :lastname Bell, :github https://github.com/jasebell}
{:firstname Jase, :lastname Bell, :github https://github.com/jasebell}
{:firstname Jase, :lastname Bell, :github https://github.com/jasebell}
{:firstname Jase, :lastname Bell, :github https://github.com/jasebell}

Our work here is done.

I’m Going To Park That There….

Okay, that was a long gig for anyone. We’ve covered integrating the Onyx Kafka plugin into a project, amending the code to consume Kafka messages from a topic, and deserialising them to a Clojure map.

Next time, when I get to it, I’ll do something funky with the map and give it a real-world use case.

A cup of tea is in order….

4 responses to “Using onyx-template to craft a Kafka Streaming application. Part 2. #clojure #onyx #kafka #streaming #data”

  1. Thanks for the great post. I found some inconsistencies that made it a bit difficult to follow.

    * “base-job” wasn’t actually renamed to “kafka-job” anywhere
    * where and how to add “kafka-opts” could be more explicit
    * if the project is generated under a different name instead of “testapp”, it breaks when starting peers, I’m still puzzled

  2. Heads up to anyone following this in 2019 — the kafka-opts you need to pass to the kafka-task/consumer have changed over time as some were deprecated.

    Other than that, I was able to get everything in this tutorial working on Onyx 0.14.5 after I used the below options.

    {:onyx/name :in
     :onyx/doc "Reads messages from a Kafka topic"
     :onyx/plugin :onyx.plugin.kafka/read-messages
     :onyx/type :input
     :onyx/medium :kafka
     :onyx/min-peers 1
     :onyx/max-peers 1
     :onyx/batch-size 100
     :onyx/batch-timeout 500
     :kafka/topic "my-message-stream"
     :kafka/group-id "onyx-consumer"
     :kafka/zookeeper "127.0.0.1:2181"
     :kafka/deserializer-fn :northstar-zonerunner.shared/deserialize-message-json
     :kafka/wrap-with-metadata? false
     :kafka/receive-buffer-bytes 65536
     :kafka/offset-reset :earliest}

    Other than that, a tremendous thanks to Jason Bell. I’ve been struggling to have my own “penny-drop” moments while learning Onyx, and this tutorial was an irreplaceable resource to get me up and running.

  3. Extra comment — make sure you edit the :kafka/deserializer-fn above to the name of your own template, as I forgot to change it to “myapp.”

    In case the options change again in the future, you may find it useful to inspect the Onyx information model and the Kafka-Onyx information model to see what the new ones are.

    https://github.com/onyx-platform/onyx/blob/0.14.x/src/onyx/information_model.cljc
    https://github.com/onyx-platform/onyx-kafka/blob/0.14.x/src/onyx/kafka/information_model.cljc
