Error handling
Back in Chapter 2, A Look at Reactive Extensions, we learned how Reactive Extensions treats errors and exceptions. It provides a rich set of combinators to deal with exceptional cases and is straightforward to use.
Despite being a pleasure to work with, core.async doesn't ship with much support for exception handling. In fact, if we write our code with only the happy path in mind, we don't even know an error occurred!
Let's have a look at an example:
(defn get-data [] (throw (Exception. "Bad things happen!"))) (defn process [] (let [result (chan)] ;; do some processing... (go (>! result (get-data))) result))
In the preceding snippet, we introduced two functions:
- get-data simulates a function that fetches data from the network or an in-memory cache. In this case, it simply throws an exception.
- process is a function that depends on get-data to do something interesting and puts the result into a channel, which is returned at the end.
Let's watch what happens when we put this together:
(go (let [result (<! (->> (process) (map> #(* % %)) (map> #(prn %))))] (prn "result is: " result)))
Nothing happens. Zero, zip, zilch, nada.
This is precisely the problem with error handling in core.async: by default, our exceptions are swallowed by the go block as it runs on a separate thread. We are left in this state where we don't really know what happened.
Not all is lost, however. David Nolen outlined a pattern for dealing with such asynchronous exceptions on his blog[3]. It only requires a few extra lines of code.
We start by defining a helper function and macro—this would probably live in a utility namespace that we would require anywhere we use core.async:
(defn throw-err [e] (when (instance? Throwable e) (throw e)) e) (defmacro <? [ch] `(throw-err (async/<! ~ch)))
The throw-err function receives a value and, if it's a subclass of Throwable, it is thrown. Otherwise, it is simply returned.
The macro <? is essentially a drop-in replacement for <!. In fact, it uses <! to get the value out of the channel, but passes it to throw-err first.
With these utilities in place, we need to make a couple of changes. First, we are going to make some changes to our process function:
(defn process [] (let [result (chan)] ;; do some processing... (go (>! result (try (get-data) (catch Exception e e)))) result))
The only change is that we wrapped get-data in a try/catch block. Look closely at the catch block: it simply returns the exception.
This is important, as we need to ensure that the exception gets put into the channel.
Next, we update our consumer code:
(go (try (let [result (<? (->> (process) (map> #(* % %)) (map> #(prn %))))] (prn "result is: " result)) (catch Exception e (prn "Oops, an error happened! We better do something about it here!")))) ;; "Oops, an error happened! We better do something about it here!"
This time, we use <? in place of <!. This makes sense, as it will rethrow any exceptions found in the channel. As a result, we can now use a simple try/catch block to regain control over our exceptions.