I was doing some work with a colleague earlier this week which involved connecting to an internal RabbitMQ broker and transforming some messages before forwarding them to our Kafka broker.
We're using langohr to connect to RabbitMQ. Its consumer and queue documentation shows how to use the subscribe function to connect to a broker and print messages that arrive:
The example above is pretty close to what we started working with earlier today. It's also quite similar to a lot of other code I've written in the past: connect to a broker or service and provide a block/function to be called when something interesting happens.
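To make that shape concrete without needing a broker, here's a minimal sketch of the handler style. fake-subscribe is a hypothetical stand-in for a real client, not langohr's API, and I'm assuming a three-argument handler that receives a channel, message metadata and the payload as bytes.

```clojure
;; Hypothetical stand-in for a broker client: it calls `handler` once per message.
;; The (channel, metadata, payload) argument order is an assumption for this sketch.
(defn fake-subscribe
  [messages handler]
  (doseq [{:keys [msg-meta payload]} messages]
    (handler :fake-channel msg-meta (.getBytes ^String payload "UTF-8"))))

;; The handler is where everything interesting has to happen.
(defn message-handler
  [ch msg-meta ^bytes payload]
  (println "Received:" (String. payload "UTF-8")))

(fake-subscribe [{:msg-meta {:delivery-tag 1} :payload "hello"}
                 {:msg-meta {:delivery-tag 2} :payload "world"}]
                message-handler)
```

The subscriber decides when our code runs; all we can do is react inside message-handler.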
Sequences, not handlers
Although there's nothing wrong with this, I think there's a nicer way: flip the responsibility so that instead of the subscriber pushing to our handler function we consume it through Clojure's sequence abstraction.
This is the approach I took when I wrote clj-kafka, a Clojure library to interact with LinkedIn's Kafka (as an aside, Kafka is really cool; I'm planning a blog post on how we've been building a new data platform for uSwitch.com, but it's well worth checking out).
Here's a little example of consuming messages through a sequence that's taken from the clj-kafka README:
We create our consumer and access messages through a sequence abstraction by calling messages with the topic we wish to consume from.
The advantage of exposing the items through a sequence is that it becomes instantly composable with the many functions that already exist within Clojure: map, filter, remove etc.
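As a small illustration of that composability, here's a sketch over made-up data. The maps below only stand in for messages read from a topic; the :topic and :value keys and the numeric? helper are assumptions for the example.

```clojure
;; Illustrative data only: these maps stand in for consumed messages.
(def messages
  [{:topic "test" :value "1"}
   {:topic "test" :value "2"}
   {:topic "test" :value "oops"}
   {:topic "test" :value "4"}])

;; Hypothetical helper: true when the string is made up only of digits.
(defn numeric? [s]
  (boolean (re-matches #"\d+" s)))

;; Ordinary sequence functions do all the work.
(def even-values
  (->> messages
       (map :value)
       (filter numeric?)
       (map #(Long/parseLong %))
       (remove odd?)))
;; even-values => (2 4)
```

Nothing here knows or cares where the messages came from.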
In my experience, when writing consumption code that uses handler functions/callbacks I've ended up with code that looks like this:
It makes consuming data more complicated and pulls more complexity into the handler function than necessary.
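A rough sketch of the kind of handler I mean, with decoding, filtering and forwarding all tangled together. The names are hypothetical, the forwarded atom stands in for whatever the messages get sent on to, and upper-casing is just a placeholder for real parsing.

```clojure
(require '[clojure.string :as str])

;; Stand-in for a downstream system such as another broker.
(def forwarded (atom []))

;; Hypothetical handler: decoding, filtering and forwarding all live in one function.
(defn busy-handler
  [ch msg-meta ^bytes payload]
  (let [body (String. payload "UTF-8")]
    (when-not (str/blank? body)
      (let [event (str/upper-case body)]   ; placeholder for real parsing
        (when (not= event "IGNORE")
          (swap! forwarded conj event))))))

(busy-handler :fake-channel {} (.getBytes "hello" "UTF-8"))
(busy-handler :fake-channel {} (.getBytes "" "UTF-8"))
(busy-handler :fake-channel {} (.getBytes "ignore" "UTF-8"))
;; @forwarded => ["HELLO"]
```

Every new rule means another level of nesting inside the handler.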
Push to Pull
This is all made possible thanks to a lovely function written by Christophe Grand:
The function returns a vector containing 2 important parts: the sequence, and a function to put things into that sequence.
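Here's a minimal sketch of a pipe function with that shape. It's written from the description above rather than copied verbatim from Christophe's original, so treat the details (the sentinel objects, the put! name) as my assumptions.

```clojure
(import 'java.util.concurrent.LinkedBlockingQueue)

(defn pipe
  "Returns [s put!]: a lazy, blocking seq and a function that feeds it.
  Calling put! with no arguments ends the seq."
  []
  (let [q    (LinkedBlockingQueue.)
        EOQ  (Object.)   ; sentinel marking the end of the queue
        NIL  (Object.)   ; sentinel standing in for nil, which the queue rejects
        pull (fn pull []
               (lazy-seq
                 (let [x (.take q)]          ; blocks until an item is available
                   (when-not (identical? EOQ x)
                     (cons (when-not (identical? NIL x) x)
                           (pull))))))]
    [(pull)
     (fn put!
       ([] (.put q EOQ))
       ([x] (.put q (if (nil? x) NIL x))))]))

(let [[s put!] (pipe)]
  (put! 1)
  (put! nil)
  (put! 3)
  (put!)      ; close the pipe so the seq terminates
  (vec s))
;; => [1 nil 3]
```

Reading from the sequence blocks until something has been put, which is exactly the behaviour we want from a consumer.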
Returning to our original RabbitMQ example, we can change the subscriber code to use pipe to return the sequence that accesses the queue of messages:
We can then map, filter and more.
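Putting the pieces together might look something like the sketch below. fake-subscribe is a hypothetical callback API that stands in for the broker client, its on-done argument exists only so the example terminates (a real subscription would keep running), and pipe is a compact version without nil support.

```clojure
(require '[clojure.string :as str])
(import 'java.util.concurrent.LinkedBlockingQueue)

;; Compact pipe for this sketch: no nil support, since the queue rejects nil.
(defn pipe []
  (let [q    (LinkedBlockingQueue.)
        EOQ  (Object.)
        pull (fn pull []
               (lazy-seq
                 (let [x (.take q)]
                   (when-not (identical? EOQ x)
                     (cons x (pull))))))]
    [(pull) (fn put!
              ([] (.put q EOQ))
              ([x] (.put q x)))]))

;; Hypothetical callback API: calls handler per message on another thread,
;; then calls on-done so the example can finish.
(defn fake-subscribe
  [payloads handler on-done]
  (let [^Runnable task (fn []
                         (doseq [p payloads]
                           (handler :fake-channel {} p))
                         (on-done))]
    (.start (Thread. task))))

;; The handler now does one thing: put each message into the pipe.
(defn subscriber-seq
  [payloads]
  (let [[s put!] (pipe)]
    (fake-subscribe payloads
                    (fn [ch msg-meta payload]
                      (put! {:payload payload :ch ch :msg-meta msg-meta}))
                    put!)   ; put! with no arguments closes the seq
    s))

(def shouted
  (->> (subscriber-seq ["a" "skip" "b"])
       (map :payload)
       (remove #{"skip"})
       (map str/upper-case)
       doall))
;; shouted => ("A" "B")
```

The filtering and transformation that used to live in the handler are now ordinary sequence operations.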
We pull responsibility out of the handler function and into the consumption of the sequence. This is really important, and it complements something else which I've recently noticed myself doing more often.
In the handler function above I convert the function parameters to a map containing :payload, :ch and :msg-meta. In our actual application we're only concerned with reading the message payload and converting it from a JSON string to a Clojure map.
Initially, we started writing something similar to this:
We have a function that exposes the messages through a sequence, but we pass a kind of transformation function as the last argument to subscriber-seq. This initially felt ok: subscriber-seq calls our handler and extracts the payload into our desired representation before putting it into the queue that backs the sequence.
But we're pushing more responsibility into subscriber-seq than needs to be there.
We're just extracting and transforming messages as they appear in the sequence, so we can and should be building upon Clojure's existing functions: map and the like. The code below feels much better:
It feels better for a similar reason as moving the handler to a sequence: we're making our function less complex and encouraging composition through the many functions that already exist. The map call is a great example of this for me: mapping a composite function to transform the incoming data rather than adding more work into subscriber-seq.
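A sketch of that idea, under a couple of loud assumptions: subscriber-seq is stubbed here with fixed data rather than being backed by a broker, and EDN stands in for JSON so the example needs no extra library. The bytes->str and parse-payload names are mine.

```clojure
(require '[clojure.edn :as edn])

;; Hypothetical stub: the real subscriber-seq would be backed by the broker.
;; It only exposes messages; it knows nothing about how they are parsed.
(defn subscriber-seq
  [_ch _queue-name]
  (map (fn [^String s]
         {:payload (.getBytes s "UTF-8") :ch :fake-channel :msg-meta {}})
       ["{:id 1}" "{:id 2}"]))

(defn bytes->str [^bytes b]
  (String. b "UTF-8"))

;; EDN stands in for JSON here; real code would swap in a JSON parser.
(def parse-payload
  (comp edn/read-string bytes->str :payload))

(def events
  (map parse-payload (subscriber-seq nil "events")))
;; events => ({:id 1} {:id 2})
```

Changing how messages are decoded means changing parse-payload, not the function that talks to the broker.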
Pipe
I've probably used Christophe's pipe function 3 or 4 times this year to take code that started with handler functions and evolved it to deal with sequences. I think it's a really neat way of making callback-based APIs more elegant.