By Keechma author Mihael Konjević

Introducing Keechma Toolbox (Part 1) - Pipelines

In this blog post I want to introduce the Keechma Toolbox Library - a set of tools I've been using while developing Keechma apps. While Keechma the framework is pretty agnostic when it comes to implementation, the Toolbox lib is heavily opinionated and contains code that I'm using every day.

There is a possibility that some of this will end up in separate packages, but right now it's all together because it's easier to develop and manage.

Today I'll talk about one of the sub-libraries - controller pipelines.

The Problem

When I was developing Keechma, I was very careful to leave the controllers open-ended. Controllers are the connection point between your domain code and the UI, so I didn't want to build too much structure or too many restrictions into them; I wanted you to be able to implement any feature you want.

When you implement the handler function, you get access to the app-db atom and two channels: in-chan, which is used to receive messages, and out-chan, which is used to communicate with other controllers. This flexibility is great, but the resulting code is not something I would call pretty:

(defn update! [app-db-atom updater]
  (reset! app-db-atom (updater @app-db-atom)))

(defn load-restaurant [app-db-atom slug]
  ;; Before making the request for the restaurant save the empty item with
  ;; the meta defined - {:is-loading? true}
  (update! app-db-atom
           #(edb/insert-named-item % :restaurants :current {} {:is-loading? true}))
  (go
    ;; Load the restaurant and save it in the entity-db
    (let [req (<! (http/get (str "/restaurants/" slug)))
          meta {:is-loading? false}
          [success data] (unpack-req req)]
      (update! app-db-atom
               #(edb/insert-named-item % :restaurants :current data meta)))))

Example command handler from the "Place My Order" app

There is a lot of stuff happening in this function:

  1. We mark the current restaurant as loading (by storing {:is-loading? true} as its metadata)
  2. We make an AJAX request to load the restaurant data
  3. We extract the data from the response (unpack-req function)
  4. We finally store the data in the app-db

There is also one thing missing from this function - error handling.

This function is pretty short, but it suffers from a serious problem: it complects three different types of functions into one opaque blob:

  1. Side-effect functions - functions that mutate the app-db
  2. Pure functions - like unpack-req
  3. Async functions that communicate with the API

I would also add that the code is not exactly clear in its intent. A lot of things happen at once, some of them implicitly, which makes it hard to understand.
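
To make the contrast concrete, here is a rough sketch - hypothetical, not taken from the original app - of what the channel-based version might look like once error handling is bolted on, assuming unpack-req returns a [success data] pair:

;; Hypothetical sketch only - manual error handling added to the channel-based version
(defn load-restaurant [app-db-atom slug]
  (update! app-db-atom
           #(edb/insert-named-item % :restaurants :current {} {:is-loading? true}))
  (go
    (let [req (<! (http/get (str "/restaurants/" slug)))
          [success data] (unpack-req req)]
      (if success
        ;; Happy path - store the loaded restaurant
        (update! app-db-atom
                 #(edb/insert-named-item % :restaurants :current data {:is-loading? false}))
        ;; Error path - store the error in the item's metadata
        (update! app-db-atom
                 #(edb/insert-named-item % :restaurants :current {} {:error data}))))))

Every new concern means another branch and another manual update! call.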

Now let's rewrite this code to use pipelines (and pretend that the http/get function returns a promise instead of a channel):

(pipeline! [value app-db]
  (pp/commit! (edb/insert-named-item app-db :restaurants :current {} {:is-loading? true}))
  (http/get (str "/restaurants/" value))
  (unpack-req value)
  (pp/commit! (edb/insert-named-item app-db :restaurants :current (last value) {:is-loading? false}))
  (rescue! [error]
    (pp/commit! (edb/insert-named-item app-db :restaurants :current {} {:error error}))))

While this code still combines all of the steps, it looks much nicer:

  1. You can easily follow the flow between steps
  2. You don't have to care about the async functions - they are handled for you automatically
  3. Side-effect functions are clearly marked (pp/commit!)
  4. Error handling is easy to implement
  5. Each of the functions in the pipeline is doing only one thing, which makes the intent clear

Now that you've seen the final result, let me explain how pipelines work.

Pipelines

I implemented pipelines out of frustration. I was writing the same boilerplate code over and over again. Most of the controller actions have the same form:

  1. Mark some item or collection as loading
  2. Make a request to the API to load the data
  3. Extract the data from the response
  4. Put the data into the app-db
  5. Handle any errors that might have happened

So, just to load an item from the server, we have to update the app-db two times, handle an async operation and handle any potential errors. Over and over again. This kind of code is hard to extract and generalize, so I had a bunch of similar functions littered around the code base.

Pipelines allow me to write this code in a clear, declarative fashion and they are very easy to use when you understand how they work.

The implementation

  1. Pipelines are built from a list of functions
  2. Each function can be either a side-effect or a processor function
  3. value is bound to the command arguments or to the return value of the previous processor function
  4. The app-db argument is always bound to the current state of the app-db atom
  5. Side-effects can't affect the value - their return value is ignored
  6. If a processor function returns a promise, the pipeline will wait until that promise is resolved or rejected before proceeding to the next function
  7. If a processor function returns nil, the value argument stays bound to the previously returned value (see the short sketch after this list)
  8. Any exception or promise rejection will cause the pipeline to jump to the rescue! block
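
To make these rules concrete, here is a minimal, hypothetical sketch - the steps are made up for illustration and are not part of the Toolbox API - showing how value flows through a pipeline:

;; Hypothetical sketch only - demonstrates the value-threading rules above
(pipeline! [value app-db]
  (println "received" value)                  ;; returns nil, so value stays the same
  (* 2 value)                                 ;; processor function, so the next value is the doubled number
  (pp/commit! (assoc app-db :result value))   ;; side-effect, so its return value is ignored
  (println "stored" value))                   ;; value is still the doubled number

If this pipeline's command was sent with the number 3, it would print "received 3", commit {:result 6} to the app-db and then print "stored 6".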

I'm sure you're interested in how all of this works. Let's take a look at the pipeline code again:

(pipeline! [value app-db]
  (pp/commit! (edb/insert-named-item app-db :restaurants :current {} {:is-loading? true}))
  (http/get (str "/restaurants/" value))
  (unpack-req value)
  (pp/commit! (edb/insert-named-item app-db :restaurants :current (last value) {:is-loading? false}))
  (rescue! [error]
    (pp/commit! (edb/insert-named-item app-db :restaurants :current {} {:error error}))))

pipeline! is a macro, and it transforms its body into something that looks like this:

{:begin  [(fn [value app-db]
            (pp/commit! (edb/insert-named-item app-db :restaurants :current {} {:is-loading? true})))
          (fn [value app-db]
            (http/get (str "/restaurants/" value)))
          (fn [value app-db]
            (unpack-req value))
          (fn [value app-db]
            (pp/commit! (edb/insert-named-item app-db :restaurants :current (last value) {:is-loading? false})))]
 :rescue [(fn [value app-db error]
            (pp/commit! (edb/insert-named-item app-db :restaurants :current {} {:error error})))]}

This code will be passed to the pipeline runner, which knows how to handle this structure (if you're interested in how it works, check out the code).

Pipelines use the great Promesa library to handle promises, and each pipeline returns a promise.
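
If you haven't used Promesa before, here is a quick standalone sketch (not Keechma-specific) of the promise API involved; any pipeline step that returns a promise built like this is awaited before the next step runs:

(require '[promesa.core :as p])

;; A promise that resolves with :done after 100 milliseconds
(def delayed (p/promise (fn [resolve _reject]
                          (js/setTimeout #(resolve :done) 100))))

(-> delayed
    (p/then (fn [result] (println "resolved with" result)))  ;; runs when the promise resolves
    (p/catch (fn [error] (println "rejected with" error))))  ;; runs when the promise rejects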

Branching

If you have a pipeline that needs to implement some kind of branching (maybe you want to load the item only if it's not loaded yet), you can nest pipelines:

;; on the pipeline start `value` will hold whatever was passed to the command as the argument
(pipeline! [value app-db]
  (when (nil? (edb/get-named-item app-db :restaurants :current))
    (pipeline! [value app-db]
      (pp/commit! (edb/insert-named-item app-db :restaurants :current {} {:is-loading? true}))
      (http/get (str "/restaurants/" value))
      (unpack-req value)
      (pp/commit! (edb/insert-named-item app-db :restaurants :current (last value) {:is-loading? false}))
      (rescue! [error]
        (pp/commit! (edb/insert-named-item app-db :restaurants :current {} {:error error}))))))

Pipelines allow you to implement features that require a series of steps to run in succession without forcing you to play event ping-pong.

Pipeline Controller

To actually run the pipelines, you must use the pipeline controller, which is also part of the Keechma Toolbox library. The full example looks like this:

(ns pipelines.example
  (:require [keechma.toolbox.pipeline.core :as pp :refer-macros [pipeline!]]
            [keechma.toolbox.pipeline.controller :as controller]))

(def controller
  (controller/constructor
   (fn [_] true) ;; this is the controller's `params` function
   {:load-restaurant ;; pipeline key is the command it responds to
    (pipeline! [value app-db]
      (when (nil? (edb/get-named-item app-db :restaurants :current))
        (pipeline! [value app-db]
          (pp/commit! (edb/insert-named-item app-db :restaurants :current {} {:is-loading? true}))
          (http/get (str "/restaurants/" value))
          (unpack-req value)
          (pp/commit! (edb/insert-named-item app-db :restaurants :current (last value) {:is-loading? false}))
          (rescue! [error]
            (pp/commit! (edb/insert-named-item app-db :restaurants :current {} {:error error}))))))}))
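
The pipeline is triggered like any other controller command. As a rough sketch - assuming ui aliases keechma.ui-component and the usual component wiring, which is omitted here - a UI component would send the :load-restaurant command with the restaurant's slug:

;; Somewhere in a UI component (component and controller wiring omitted)
(ui/send-command ctx :load-restaurant "some-restaurant-slug")
;; Inside the pipeline, value will be bound to "some-restaurant-slug"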

Examples

Async Notifications

While I was doing research for pipelines, I wanted to see what other approaches exist. During that research I encountered this thread on Stack Overflow, which explains how to implement a notification system with Redux.

The idea is to have the notification appear and then automatically disappear after five seconds.

This is one of my favorite pipeline examples, because it's super simple but it still demonstrates the elegance of pipelines:

(ns pipelines.example
  (:require [keechma.toolbox.pipeline.core :as pp :refer-macros [pipeline!]]
            [keechma.toolbox.pipeline.controller :as controller]
            [promesa.core :as p]))

(defn delay-pipeline [msec]
  (p/promise (fn [resolve _] (js/setTimeout resolve msec))))

(def controller
  (controller/constructor
   (fn [_] true) ;; this is the controller's `params` function
   {:show-notice ;; pipeline key is the command it responds to
    (pipeline! [value app-db]
      (pp/commit! (assoc app-db :notice value)) ;; store the notice in the app-db
      (delay-pipeline 5000)                     ;; wait 5 seconds
      (pp/commit! (dissoc app-db :notice)))}))

That's it - clear, simple and obvious.

Live search

As I've mentioned before, pipelines return a promise. This allows them to be cancelled at any time (Promesa uses the Bluebird library, which implements promise cancellation). In the next example, we'll take advantage of this property.

The task is to implement a live search: on each keypress, call the command that performs the search, wait 300 milliseconds, and kick off the search request if the command wasn't called again in the meantime. You've probably used a feature like this many times.

The problem with this feature is that you have to make sure you don't have race conditions. If a search request has started and the user then enters another letter, you want to cancel that request and kick off a new one. Otherwise, the first request could finish after the second one, in which case you would show the wrong results to the user.

Pipelines have the exclusive function to help with cases like this. You wrap the pipeline with it, and it ensures that only one pipeline is running at a time: if the command is called again while the pipeline is running, the current pipeline is cancelled, which also cancels the AJAX request. Keechma Toolbox comes with a thin wrapper around the cljs-ajax library which wraps the AJAX request functions in promises and implements request cancellation.

The final code looks like this:

(ns pipelines.example
  (:require [keechma.toolbox.pipeline.core :as pp :refer-macros [pipeline!]]
            [keechma.toolbox.pipeline.controller :as pp-controller]
            [keechma.toolbox.ajax :refer [GET]]
            [promesa.core :as p]))

(defn delay-pipeline [msec]
  (p/promise (fn [resolve _] (js/setTimeout resolve msec))))

(defn movie-search [value]
  (GET (str "api/url?search=" value)))

(def search-controller
  (pp-controller/constructor
   (fn [_] true)
   {:search (pp/exclusive
             (pipeline! [value app-db]
               (when-not (empty? value)
                 (pipeline! [value app-db]
                   (delay-pipeline 300)
                   (movie-search value)
                   (println "SEARCH RESULTS:" value)))))}))

This is the workflow:

  1. Make sure that the value is not empty
  2. Wait 300 milliseconds
  3. Make the request
  4. Print the results

There you have it, a live search implementation in ~20 lines of code.

Conclusion

Pipelines are one of my favorite parts of the Keechma Toolbox library. I've been using them for months, and I think that 90-95% of my controller code is in pipelines. They make my code clearer and easier to understand, and I hope you'll find them as useful as I do. Please let me know if you have any feedback.

We're working hard on the v1 release, and if you want to keep track of Keechma news and releases, subscribe to our newsletter: