Producing and Consuming Avro Messages over Kafka in Scala

Lukas Grasse
3 min read · Apr 3, 2018


Apache Kafka is an amazing tool for logging/streaming data at scale. In this tutorial we will create a system written in Scala that uses Kafka to produce and consume messages. We will also use Avro, a system for defining schemas and serializing data. Using Avro schemas helps when modelling our Kafka messages and ensures that the data our producers generate remains consistent. Avro also has the benefit of allowing us to easily persist messages to databases or other storage using connectors such as the JDBC Sink Connector.

Installing the Confluent Platform

To get started, first download and install the Confluent Platform. You can start it by running

confluent start

in a terminal. You can verify everything is up and running using

confluent status

which should show

connect is [UP]
kafka-rest is [UP]
schema-registry is [UP]
kafka is [UP]
zookeeper is [UP]

We’ll also be using sbt to build our project. You can download sbt from scala-sbt.org.

Creating the Scala Project

First, let’s define a build file for sbt with a root project and two subprojects: one for producing messages and one for consuming messages.
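Here’s a minimal sketch of what that build file can look like (the versions, the Confluent resolver, and the Deps field names are illustrative):

// build.sbt — a sketch; versions and exact settings are illustrative
lazy val commonSettings = Seq(
  organization := "com.example",
  scalaVersion := "2.12.4",
  // Confluent's serializers live in Confluent's own Maven repository
  resolvers += "Confluent" at "https://packages.confluent.io/maven/"
)

lazy val root = (project in file("."))
  .aggregate(examplePublisher, exampleSubscriber)

lazy val examplePublisher = (project in file("example_publisher"))
  .settings(commonSettings: _*)
  .settings(
    name := "example-publisher",
    // have sbt-avrohugger emit the generated classes into the regular source tree
    avroScalaSource in Compile := (sourceDirectory in Compile).value / "scala",
    libraryDependencies ++= Deps.publisherDeps
  )

lazy val exampleSubscriber = (project in file("example_subscriber"))
  .dependsOn(examplePublisher) // so it can use the generated ExampleRecord class
  .settings(commonSettings: _*)
  .settings(
    name := "example-subscriber",
    libraryDependencies ++= Deps.subscriberDeps
  )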

There are two important things to note in our build file. The first is that we are using the sbt-avrohugger plugin to automatically create Scala classes from our Avro schema definitions. The second is that our exampleSubscriber project depends on our examplePublisher project in order to use the automatically generated Scala class.

The build file also imports dependencies from an object called Deps, so let’s create that in our root project:
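A sketch of that object, assuming it lives at project/Deps.scala (the artifact versions are illustrative):

// project/Deps.scala — artifact versions are illustrative
import sbt._

object Deps {
  private val kafkaClients   = "org.apache.kafka" % "kafka-clients" % "1.0.0"
  private val avroSerializer = "io.confluent" % "kafka-avro-serializer" % "4.0.0"
  private val scallop        = "org.rogach" %% "scallop" % "3.1.2"

  val publisherDeps: Seq[ModuleID]  = Seq(kafkaClients, avroSerializer, scallop)
  val subscriberDeps: Seq[ModuleID] = Seq(kafkaClients, avroSerializer, scallop)
}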

We also need to create a plugins.sbt file that defines the plugins we’re using:
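A sketch of that file (the plugin version shown is illustrative):

// project/plugins.sbt — the plugin version is illustrative
addSbtPlugin("com.julianpeeters" % "sbt-avrohugger" % "2.0.0-RC16")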

Defining and Generating the Avro Schema

Now let’s define the Avro schema for our project in the file example_publisher/src/main/avro/example.avdl:

@namespace ("com.example.examplepub.event")
protocol ExampleEvent {
record ExampleRecord {
@logicalType("timestamp-millis")
long timestamp;
string msg;
}
}

Our schema contains a timestamp and a message stored in a string. Next we can generate a Scala class from this schema using the avroScalaGenerate sbt task:

sbt examplePublisher/avroScalaGenerate
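For the schema above, the plugin emits roughly a plain Scala case class like the sketch below (the exact output depends on the plugin version and the configured format; formats that target Avro’s SpecificRecord interface are what the Confluent serializers expect):

package com.example.examplepub.event

case class ExampleRecord(timestamp: Long, msg: String)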

Writing the Producer

Now that the class is generated, we can use it in our publisher’s main class, located at example_publisher/src/main/scala/com/example/examplepub/Main.scala:
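A minimal sketch of what that main class can look like (the topic name example-topic and the command-line option defaults are illustrative):

package com.example.examplepub

import java.util.Properties

import com.example.examplepub.event.ExampleRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.rogach.scallop.ScallopConf

// Command-line options parsed with Scallop; the defaults match a
// local Confluent Platform install.
class Conf(args: Seq[String]) extends ScallopConf(args) {
  val kafkaUrl = opt[String](default = Some("localhost:9092"))
  val schemaRegistryUrl = opt[String](default = Some("http://localhost:8081"))
  verify()
}

object Main extends App {
  val conf = new Conf(args)

  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.kafkaUrl())
  // Keys are plain strings; values are Avro-serialized via the Schema Registry.
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("schema.registry.url", conf.schemaRegistryUrl())

  val producer = new KafkaProducer[String, ExampleRecord](props)

  for (i <- 1 to 5) {
    val record = ExampleRecord(System.currentTimeMillis(), s"Example message $i")
    producer.send(new ProducerRecord[String, ExampleRecord]("example-topic", record))
  }

  producer.flush()
  producer.close()
}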

The publisher reads the Kafka URL and Schema Registry URL from the command line using the Scallop library. We set our key serializer to StringSerializer and use KafkaAvroSerializer for the value serializer. We can run the publisher using

sbt examplePublisher/run

which will publish 5 messages to Kafka.

Writing the Consumer

Now let’s write our subscriber’s main class, located at example_subscriber/src/main/scala/com/example/examplesub/Main.scala:
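A minimal sketch of the consumer (again, the topic name and group id are illustrative, and the topic must match the publisher’s):

package com.example.examplesub

import java.util.{Collections, Properties}

import com.example.examplepub.event.ExampleRecord
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.errors.WakeupException
import org.rogach.scallop.ScallopConf

import scala.collection.JavaConverters._

class Conf(args: Seq[String]) extends ScallopConf(args) {
  val kafkaUrl = opt[String](default = Some("localhost:9092"))
  val schemaRegistryUrl = opt[String](default = Some("http://localhost:8081"))
  verify()
}

object Main extends App {
  val conf = new Conf(args)

  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.kafkaUrl())
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-subscriber")
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer")
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "io.confluent.kafka.serializers.KafkaAvroDeserializer")
  props.put("schema.registry.url", conf.schemaRegistryUrl())
  // Without this, records come back as GenericRecord instead of ExampleRecord.
  props.put("specific.avro.reader", "true")

  val consumer = new KafkaConsumer[String, ExampleRecord](props)
  consumer.subscribe(Collections.singletonList("example-topic"))

  // On Ctrl-C, wake the consumer out of poll() and wait for the main
  // loop to shut down cleanly.
  val mainThread = Thread.currentThread()
  sys.addShutdownHook {
    consumer.wakeup()
    mainThread.join()
  }

  try {
    while (true) {
      val records = consumer.poll(2000) // wait up to two seconds for new messages
      for (record <- records.asScala) {
        println(s"Received: ${record.value()}")
      }
    }
  } catch {
    case _: WakeupException => // expected on shutdown
  } finally {
    consumer.close()
  }
}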

The consumer polls for messages every two seconds. It is also important to note that the specific.avro.reader property must be set to true for the received record to be automatically cast to an ExampleRecord, and that we need to set the group.id. We also define a shutdown hook on a separate thread that will close the consumer when the user presses Ctrl-C.

Running the Producer/Consumer

Now we’re ready to test our producer and consumer. Open two terminal tabs. In the first, run

sbt examplePublisher/run

to publish some messages,

and run the subscriber in the other terminal:

sbt exampleSubscriber/run

You should see the 5 messages from the producer printed on the subscriber’s console.

I hope this tutorial has shown you how simple it can be to write Kafka producers and consumers in Scala. Also, as mentioned before, you can use connectors to persist messages to databases and other systems without having to write custom consumers. You can view the full project for this tutorial on GitHub.

Get in Contact

I am also a consultant who specializes in Apache Kafka, Machine Learning and AI. I would be glad to help you! You can find my contact info at https://lukasgrasse.com


Written by Lukas Grasse

CTO and Co-Founder of Reverb Robotics Inc | Machine Learning and AI Consultant | Ph.D. Student in Neuroscience @ U of L.