flatmap and friends
In the previous section, we learned how to transform and combine observables with operations such as map, reduce, and zip. However, the two observables that we just looked at—musicians and bands—were perfectly capable of producing values on their own; they did not need any extra input.
In this section, we will examine a different scenario: we'll learn how we can combine observables, where the output of one is the input of another. We encountered flatmap before, in Chapter 1, What is Reactive Programming? If you have been wondering what its role is, this section addresses exactly that.
Here's what we are going to do: given an observable representing all positive integers (it actually starts at 0), we'll calculate the factorial of every even number it emits. Since the sequence is infinite, we'll take only the first five results. The end result should be the factorials of 0, 2, 4, 6, and 8, respectively.
The first thing we need is a function to calculate the factorial of a number, n, as well as our observable:
(defn factorial [n]
  (reduce * (range 1 (inc n))))

(defn all-positive-integers []
  (Observable/interval 1 TimeUnit/MICROSECONDS))
Using some type of visual aid will be helpful in this section, so we'll start with a marble diagram representing the previous observable:
The middle arrow represents time and it flows from left to right. This diagram represents an infinite Observable sequence, as indicated by the use of ellipsis at the end of it.
Since we want to feed the output of one observable into another, we'll create one that, given a number, emits its factorial using the helper function that we defined earlier. We'll use rx/observable* for this purpose:
(defn fact-obs [n]
  (rx/observable*
   (fn [observer]
     (rx/on-next observer (factorial n))
     (rx/on-completed observer))))
This is very similar to the just-obs observable we created earlier in this chapter, except that it calculates the factorial of its argument and emits that result instead, ending the sequence immediately thereafter. The following diagram illustrates how this works:
We feed the number 5 to the observable, which in turn emits its factorial, 120. The vertical bar at the end of the timeline indicates that the sequence terminates at that point.
Running the code for this confirms that our function is correct:
(rx/subscribe (fact-obs 5) prn-to-repl)

;; 120
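The helper itself can also be checked at the REPL without involving Rx. The following is a small sketch that relies only on clojure.core and repeats the factorial definition from above so that it runs on its own. It also covers the n = 0 case, which matters because 0 is the first even number we will feed in:

```clojure
;; Same definition as earlier in this section, repeated so the snippet is self-contained.
(defn factorial [n]
  (reduce * (range 1 (inc n))))

;; (range 1 1) is empty, and reducing * over an empty collection yields (*), which is 1.
(factorial 0)
;; => 1

(factorial 5)
;; => 120

;; The factorials of the first five even numbers, starting at 0.
(map factorial [0 2 4 6 8])
;; => (1 2 24 720 40320)
```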
So far, so good. Now, we need to combine both observables so that we can achieve our goal. This is where Rx's flatmap comes in. First, we'll see it in action, and then we'll get into the explanation:
(rx/subscribe (->> (all-positive-integers)
                   (rx/filter even?)
                   (rx/flatmap fact-obs)
                   (rx/take 5))
              prn-to-repl)
If we run the preceding code, it will print the factorials for 0, 2, 4, 6, and 8, just as we wanted:
1
2
24
720
40320
Most of the preceding code snippet should look familiar. The first thing we do is keep only the even numbers from all-positive-integers. This leaves us with the following observable sequence:
Much like all-positive-integers, this, too, is an infinite observable.
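As a rough analogy in plain Clojure (not Rx code), the same filtering step can be tried on a lazy sequence. In this sketch, (range) stands in for all-positive-integers, since both start at 0 and never end, and evens is a hypothetical name introduced only for illustration:

```clojure
;; (range) is an infinite lazy sequence 0, 1, 2, ... standing in for all-positive-integers.
;; `evens` is a hypothetical name used only in this sketch.
(def evens (filter even? (range)))

;; Only the requested prefix is realized, so taking from the infinite sequence is safe.
(take 5 evens)
;; => (0 2 4 6 8)
```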
However, the next line of our code looks a little odd. We call flatmap and give it the fact-obs function, a function we know itself returns another observable. flatmap will call fact-obs with each value it receives from the filtered observable. fact-obs will, in turn, return a single-value observable for each number. However, our subscriber doesn't know how to deal with observables! It's simply interested in the factorials!
This is why, after calling fact-obs to obtain an observable, flatmap flattens all of them into a single observable we can subscribe to. This is quite a mouthful, so let's visualize what this means:
As you can see in the preceding diagram, through the execution of flatmap, we end up with a list of observables. However, we don't care about each observable, but rather about the values they emit. flatmap, then, is the perfect tool, as it combines (that is, flattens) all of them into a single observable sequence, like the one that's shown at the bottom of the diagram.
You can think of flatmap as mapcat for observable sequences.
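To make the analogy concrete, here is a sketch using ordinary lazy sequences instead of observables. The fact-seq function is a hypothetical stand-in for fact-obs that returns a one-element vector rather than a single-value observable. Nothing here touches Rx, so it only models the shape of the computation, not its asynchronous behavior:

```clojure
;; Same helper as earlier in this section, repeated so the snippet is self-contained.
(defn factorial [n]
  (reduce * (range 1 (inc n))))

;; Hypothetical sequence counterpart of fact-obs: wraps the result in a collection.
(defn fact-seq [n]
  [(factorial n)])

;; map alone leaves one nested collection per input,
;; much like ending up with an observable of observables.
(->> (range)
     (filter even?)
     (map fact-seq)
     (take 5))
;; => ([1] [2] [24] [720] [40320])

;; mapcat maps and then concatenates the results into one flat sequence.
(->> (range)
     (filter even?)
     (mapcat fact-seq)
     (take 5))
;; => (1 2 24 720 40320)
```

The second pipeline has the same shape as the Rx version, with mapcat in the position of rx/flatmap.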
The rest of the code is straightforward. We simply take the first five elements from this observable and subscribe to it, as we have been doing so far.