Hands-On Reactive Programming with Clojure

Creating observables

This chapter is all about Reactive Extensions, so let's go ahead and create a project called rx-playground, which we will use in our exploratory tour. We will use RxClojure (see https://github.com/ReactiveX/RxClojure), a library that provides Clojure bindings for RxJava (see https://github.com/ReactiveX/RxJava):

$ lein new rx-playground

Open the project file and add a dependency on RxJava's Clojure bindings:

(defproject rx-playground "0.1.0-SNAPSHOT" 
  :description "FIXME: write description" 
  :url "http://example.com/FIXME" 
  :license {:name "Eclipse Public License" 
            :url "http://www.eclipse.org/legal/epl-v10.html"} 
  :dependencies [[org.clojure/clojure "1.9.0"] 
                 [io.reactivex/rxclojure "1.0.0"]]) 

Now, fire up a REPL in the project's root directory so that we can start creating some observables:

    $ lein repl  

The first thing we need to do is import RxClojure, so let's get this out of the way by typing the following in the REPL:

(require '[rx.lang.clojure.core :as rx]) 
(import '(rx Observable)) 

The simplest way to create a new observable is by calling the return function:

(def obs (rx/return 10)) 

Now, we can subscribe to it:

(rx/subscribe obs 
              (fn [value] 
                (prn (str "Got value: " value)))) 

This will print the "Got value: 10" string to the REPL.

The subscribe function of an observable allows us to register handlers for three main things that happen throughout its life cycle: new values, errors, or a notification that the observable is done emitting values. This corresponds to the onNext, onError, and onCompleted methods of the Observer interface, respectively.

In the preceding example, we are simply subscribing to onNext, which is why we get notified about the observable's only value, 10.
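
To see all three kinds of notification at once, here is a minimal sketch that assumes the four-argument form of rx/subscribe (observable, on-next, on-error, on-completed). The events atom is a name made up for this illustration; it simply records what each handler receives:

```clojure
(require '[rx.lang.clojure.core :as rx])

;; `events` is a hypothetical name used only in this sketch.
;; Each handler appends a tagged entry so we can inspect the order afterwards.
(def events (atom []))

(rx/subscribe (rx/seq->o [1 2 3])
              (fn [value] (swap! events conj [:next value]))   ; onNext
              (fn [error] (swap! events conj [:error error]))  ; onError
              (fn []      (swap! events conj [:completed])))   ; onCompleted

@events
```

Because this observable emits synchronously and never fails, events should end up holding three :next entries followed by a single :completed entry, with no :error entry.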

A single-value observable isn't terribly interesting, though. Let's create and interact with one that emits multiple values:

(-> (rx/seq->o [1 2 3 4 5 6 7 8 9 10]) 
    (rx/subscribe prn)) 

This will print the numbers from 1 to 10, inclusive, to the REPL. seq->o is a way to create observables from Clojure sequences. It just so happens that the preceding snippet can be rewritten using RxClojure's own range function, whose end bound is exclusive, just like clojure.core/range:

(-> (rx/range 1 11) 
    (rx/subscribe prn)) 

Of course, this doesn't offer any advantage over working with raw values or sequences in Clojure yet.

But what if we need an observable that emits an undefined number of integers at a given interval? This becomes challenging to represent as a sequence in Clojure, but Rx makes it trivial:

(import '(java.util.concurrent TimeUnit))

(def repl-out *out*)

(defn prn-to-repl [& args]
  (binding [*out* repl-out]
    (apply prn args)))

(rx/subscribe (Observable/interval 100 TimeUnit/MILLISECONDS) prn-to-repl)

RxClojure doesn't provide bindings to all of RxJava's API yet. The interval method is one such example of this. We're required to use interoperability and call the method directly on the Observable class from RxJava.

Observable/interval takes a number and a time unit as arguments. In this case, we are telling it to emit an integer, starting from zero, every 100 milliseconds. When we evaluate a subscription like this in a REPL-connected editor, however, two issues come up:

  • With plain prn as the handler, we will not see any output (depending on your REPL; this is true for Emacs)
  • We will have a rogue thread emitting numbers indefinitely

Both issues arise from the fact that Observable/interval is the first factory method we have used that doesn't emit values synchronously. Instead, it returns an observable that defers the work to a separate thread.

The first issue is simple enough to fix. Functions such as prn will print to whatever the dynamic var *out* is bound to. When working in certain REPL environments, such as Emacs, this is bound to the REPL stream, which is why we can generally see everything we print.

However, since Rx is deferring the work to a separate thread, *out* isn't bound to the REPL stream anymore, so we don't see the output. To fix this, we need to capture the current value of *out* and bind it in our subscription. This will be incredibly useful as we experiment with Rx in the REPL. Let's revisit the prn-to-repl helper function which we defined earlier:

(def repl-out *out*) 
(defn prn-to-repl [& args] 
  (binding [*out* repl-out] 
    (apply prn args))) 

The first thing we do is create a var called repl-out that holds the current REPL stream. Next, we create a function called prn-to-repl, which works just like prn, except that it uses the binding macro to create a new binding for *out* that is valid only within that scope.
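
The effect can be reproduced without Rx at all. The following sketch uses a StringWriter as a stand-in for the REPL stream (fake-repl-out and prn-to-fake-repl are hypothetical names that mirror the helper above) and a plain Java thread in place of the Rx scheduler thread:

```clojure
(import '(java.io StringWriter))

;; Hypothetical stand-in for the REPL stream, so the result can be inspected.
(def fake-repl-out (StringWriter.))

;; Same shape as prn-to-repl, but targeting the stand-in stream.
(defn prn-to-fake-repl [& args]
  (binding [*out* fake-repl-out]
    (apply prn args)))

(binding [*out* fake-repl-out]
  ;; A plain Thread does not inherit the caller's dynamic bindings, so
  ;; inside the worker *out* falls back to its root value.
  (let [worker (Thread. (fn []
                          (prn :lost)                  ; written to the root *out*
                          (prn-to-fake-repl :seen)))]  ; rebinds *out* explicitly
    (.start worker)
    (.join worker)))

(str fake-repl-out)
```

Only :seen should reach the captured stream; :lost goes to wherever the root value of *out* points, which is the same reason plain prn appears silent in some REPLs.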

This still leaves us with the rogue thread problem. Now is the appropriate time to mention that the subscribe method from an observable returns a subscription object. By holding onto a reference to it, we can call its unsubscribe method to indicate that we are no longer interested in the values that are produced by that observable.

Putting it all together, our interval example can be rewritten like so:

(def subscription
  (rx/subscribe (Observable/interval 100 TimeUnit/MILLISECONDS)
                prn-to-repl))

(Thread/sleep 1000)

(rx/unsubscribe subscription)

We create a new interval observable and immediately subscribe to it, just as we did before. This time, however, we assign the resulting subscription to a var. Because the handler is our helper function, prn-to-repl, we will start seeing values being printed to the REPL straight away.

Next, we put the current thread (the REPL thread) to sleep for a second. This is enough time for the observable to produce the numbers from 0 to 9, give or take one depending on timing. That's roughly when the REPL thread wakes up and unsubscribes from the observable, causing it to stop emitting values.
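
As a way to confirm that unsubscribing took effect, the sketch below collects the emitted values into an atom instead of printing them, then asks the subscription for its state. The seen atom is a hypothetical name for this example, and the check relies on the isUnsubscribed method of RxJava's Subscription interface, called through interop:

```clojure
(require '[rx.lang.clojure.core :as rx])
(import '(rx Observable)
        '(java.util.concurrent TimeUnit))

;; `seen` is a hypothetical name; it collects values instead of printing them,
;; which also sidesteps the *out* binding issue.
(def seen (atom []))

(def subscription
  (rx/subscribe (Observable/interval 100 TimeUnit/MILLISECONDS)
                (fn [n] (swap! seen conj n))))

(Thread/sleep 1000)
(rx/unsubscribe subscription)

;; isUnsubscribed comes from RxJava's Subscription interface.
(.isUnsubscribed subscription)

;; Roughly ten values, starting at 0; the exact count depends on timing.
@seen
```

The exact number of collected values is not guaranteed, since the last tick races with the call to unsubscribe.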