Producing and Consuming Avro Messages over Kafka in Scala
Apache Kafka is an amazing tool for logging and streaming data at scale. In this tutorial we will build a system in Scala that uses Kafka to produce and consume messages. We will also use Avro, a system for defining schemas and serializing data. Avro schemas help when modelling our Kafka messages and ensure that the data our producers generate remains consistent. Avro also has the benefit of letting us easily persist messages to databases or other storage using connectors such as the JDBC Sink Connector.
Installing the Confluent Platform
To get started, first download and install the Confluent Platform. You can start it by running
confluent start
in a terminal. You can verify that everything is up and running using
confluent status
which should show
connect is [UP]
kafka-rest is [UP]
schema-registry is [UP]
kafka is [UP]
zookeeper is [UP]
We’ll also be using sbt to build our project. You can download sbt from https://www.scala-sbt.org.
Creating the Scala Project
First, let’s define a build file for sbt with a root project and two subprojects: one for producing messages and one for consuming messages.
There are two important things to note in our build file. The first is that we use the sbt-avrohugger plugin to automatically generate Scala classes from our Avro schema definitions. The second is that our exampleSubscriber project depends on our examplePublisher project so that it can use the automatically generated Scala class.
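Here is a minimal sketch of what the build file might look like; the project names, versions, and the Deps helper it references are assumptions to adapt to your own setup:

// build.sbt — a minimal sketch; versions and project layout are assumptions
lazy val commonSettings = Seq(
  organization := "com.example",
  scalaVersion := "2.12.8",
  version := "0.1.0-SNAPSHOT",
  // the Confluent serializers are published to Confluent's own Maven repository
  resolvers += "confluent" at "https://packages.confluent.io/maven/"
)

lazy val root = (project in file("."))
  .aggregate(examplePublisher, exampleSubscriber)

lazy val examplePublisher = (project in file("example_publisher"))
  .settings(
    commonSettings,
    name := "example-publisher",
    libraryDependencies ++= Deps.publisherDeps
  )

// the subscriber depends on the publisher so it can reuse the generated ExampleRecord class
lazy val exampleSubscriber = (project in file("example_subscriber"))
  .dependsOn(examplePublisher)
  .settings(
    commonSettings,
    name := "example-subscriber",
    libraryDependencies ++= Deps.subscriberDeps
  )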
The build file also imports dependencies from an object called Deps, so let’s create that in our root project:
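Something like the following sketch, kept in project/Deps.scala so the build can see it; the specific artifacts and versions are assumptions, so use current releases:

// project/Deps.scala — a sketch; artifact versions are assumptions
import sbt._

object Deps {
  val kafkaClients        = "org.apache.kafka" % "kafka-clients" % "2.2.0"
  val kafkaAvroSerializer = "io.confluent" % "kafka-avro-serializer" % "5.2.1"
  val avro                = "org.apache.avro" % "avro" % "1.8.2"
  val scallop             = "org.rogach" %% "scallop" % "3.3.1"

  val publisherDeps: Seq[ModuleID]  = Seq(kafkaClients, kafkaAvroSerializer, avro, scallop)
  val subscriberDeps: Seq[ModuleID] = Seq(kafkaClients, kafkaAvroSerializer, avro, scallop)
}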
We also need to create a plugins.sbt file that defines the plugins we’re using.
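For sbt-avrohugger a single line like this is enough; the version shown is a placeholder, so use whatever release is current:

// project/plugins.sbt — the version number is a placeholder
addSbtPlugin("com.julianpeeters" % "sbt-avrohugger" % "2.0.0-RC18")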
Defining and Generating the Avro Schema
Now let’s define the Avro schema for our project in the file example_publisher/src/main/avro/example.avdl:
@namespace("com.example.examplepub.event")
protocol ExampleEvent {
  record ExampleRecord {
    @logicalType("timestamp-millis")
    long timestamp;
    string msg;
  }
}
Our schema contains a timestamp and a message stored in a string. Next, we can generate a Scala class from this schema using the avroScalaGenerate sbt task:
sbt examplePublisher/avroScalaGenerate
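The generated source should look roughly like the case class below; the exact shape depends on how avrohugger is configured. For example, the timestamp field may be generated as a Long or a java.time.Instant, and with the specific-record settings the class will also extend Avro’s SpecificRecordBase so the Confluent serializers can handle it:

// a rough sketch of the generated class; the timestamp type depends on avrohugger's logical-type settings
package com.example.examplepub.event

case class ExampleRecord(timestamp: Long, msg: String)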
Writing the Producer
Now that the class is generated, we can use it in our publisher’s main class, located at example_publisher/src/main/scala/com/example/examplepub/Main.scala.
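A minimal sketch of what that class might look like; the topic name "example-topic", the scallop option names, and the String keys are assumptions, and it presumes the generated ExampleRecord is usable with the KafkaAvroSerializer (i.e. generated in Avro’s specific-record style):

// example_publisher/src/main/scala/com/example/examplepub/Main.scala — a sketch only
package com.example.examplepub

import java.util.Properties

import com.example.examplepub.event.ExampleRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.rogach.scallop._

// command-line options; names and defaults are assumptions
class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
  val kafkaUrl = opt[String](default = Some("localhost:9092"))
  val schemaRegistryUrl = opt[String](default = Some("http://localhost:8081"))
  verify()
}

object Main extends App {
  val conf = new Conf(args.toSeq)

  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.kafkaUrl())
  // String keys, Avro values via the Confluent serializer and Schema Registry
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("schema.registry.url", conf.schemaRegistryUrl())

  val producer = new KafkaProducer[String, ExampleRecord](props)

  // publish five messages, then flush and close the producer
  (1 to 5).foreach { i =>
    val record = ExampleRecord(System.currentTimeMillis(), s"example message $i")
    producer.send(new ProducerRecord[String, ExampleRecord]("example-topic", s"key-$i", record))
  }

  producer.close()
}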
The publisher reads the Kafka URL and Schema Registry URL from the command line using the scallop library. We set our key serializer to StringSerializer and use the KafkaAvroSerializer as the value serializer. We can run the publisher using
sbt examplePublisher/run
which will publish 5 messages to Kafka.
Writing the Consumer
Now let’s write our subscriber’s main class, located at example_subscriber/src/main/scala/example/examplesub/Main.scala.
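A minimal sketch of the subscriber; the topic name, group id, and connection URLs are assumptions hardcoded for brevity (they could equally be read from the command line with scallop, as in the publisher):

// example_subscriber/src/main/scala/example/examplesub/Main.scala — a sketch only
package example.examplesub

import java.time.Duration
import java.util.{Collections, Properties}

import com.example.examplepub.event.ExampleRecord
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.errors.WakeupException

import scala.collection.JavaConverters._

object Main extends App {
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group")
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer")
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "io.confluent.kafka.serializers.KafkaAvroDeserializer")
  props.put("schema.registry.url", "http://localhost:8081")
  // required so the deserializer returns an ExampleRecord rather than a GenericRecord
  props.put("specific.avro.reader", "true")

  val consumer = new KafkaConsumer[String, ExampleRecord](props)
  consumer.subscribe(Collections.singletonList("example-topic"))

  // on ctrl-c, interrupt the poll loop and wait for the consumer to close cleanly
  val mainThread = Thread.currentThread()
  sys.addShutdownHook {
    consumer.wakeup()
    mainThread.join()
  }

  try {
    while (true) {
      // poll for new messages every two seconds
      val records = consumer.poll(Duration.ofSeconds(2))
      records.asScala.foreach { record =>
        println(s"Received: ${record.value().msg} (timestamp ${record.value().timestamp})")
      }
    }
  } catch {
    case _: WakeupException => // thrown by poll() after consumer.wakeup()
  } finally {
    consumer.close()
  }
}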
The consumer polls for messages every two seconds. It is important to note that the specific.avro.reader property must be set to true for the received records to be automatically cast as ExampleRecord, and that group.id must be set. We also define a listener on a separate thread that closes the consumer when the user presses ctrl-c.
Running the Producer/Consumer
Now we’re ready to test our producer and consumer. Open two terminal tabs and, in the first, run
sbt examplePublisher/run
to publish some messages,
and run the subscriber in the other terminal:
sbt exampleSubscriber/run
You should see the 5 messages from the producer printed on the subscriber’s console.
I hope this tutorial has shown you how simple it can be to write Kafka producers and consumers in Scala. As mentioned before, you can also use connectors to persist messages to databases and other systems without having to write custom consumers. You can view the full project for this tutorial on GitHub.
Get in Contact
I am also a consultant who specializes in Apache Kafka, Machine Learning and AI. I would be glad to help you! You can find my contact info at https://lukasgrasse.com