Hands-On Reactive Programming with Clojure

Rolling averages

Now that we can see the up-to-date stock price for a given company, it makes sense to display a rolling average of the past, say, five stock prices. In a real scenario, this would provide an objective view of a company's share trend on the stock market.

Let's extend our program to accommodate this new requirement.

First, we'll need to modify our namespace definition:

(ns stock-market-monitor.core 
  (:require [seesaw.core :refer :all]) 
  (:import (java.util.concurrent ScheduledThreadPoolExecutor 
                                 TimeUnit) 
           (clojure.lang PersistentQueue))) 

The only change is a new import clause for Clojure's PersistentQueue class. We will be using that later.

We'll also need a new label to display the current running average:

(def running-avg-label (label "Running average: -")) 
(config! main-frame :content 
         (border-panel 
          :north  price-label 
          :center running-avg-label 
          :border 5)) 

Next, we need a function to calculate rolling averages. A rolling—or moving—average is a calculation in statistics where you take the average of a subset of items in a dataset. This subset has a fixed size and it shifts forward as data comes in. This will become clear with an example.

Suppose you have a list with numbers from 1 to 10, inclusive. If we use 3 as the subset size, the rolling averages are as follows:

[1 2 3] 4 5 6 7 8 9 10 => 2.0
1 [2 3 4] 5 6 7 8 9 10 => 3.0
1 2 [3 4 5] 6 7 8 9 10 => 4.0

The bracketed part of each line marks the current window, that is, the subset used to calculate that average.
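To check these numbers at the REPL, one option is the core partition function with a step of 1, which produces each full window in turn. This is only a sketch for illustration: window-avgs is a hypothetical helper name and is not part of the program we are building.

```clojure
;; Hypothetical helper, for illustration only: returns the rolling
;; averages of coll over every full window of size n.
(defn window-avgs [n coll]
  (->> (partition n 1 coll)                  ; sliding windows: (1 2 3) (2 3 4) ...
       (map #(float (/ (reduce + %) n)))))   ; average of each window

(take 3 (window-avgs 3 (range 1 11)))
;; => (2.0 3.0 4.0)
```

Unlike the stateful function we write next, this version needs the whole dataset up front, so it does not fit a stream of prices that arrive over time.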

Now that we know what rolling averages are, we can move on and implement them in our program:

(defn roll-buffer [buffer num buffer-size] 
  (let [buffer (conj buffer num)] 
    (if (> (count buffer) buffer-size) 
      (pop buffer) 
      buffer))) 
 
(defn avg [numbers] 
  (float (/ (reduce + numbers) 
            (count numbers)))) 
 
(defn make-running-avg [buffer-size] 
  (let [buffer (atom clojure.lang.PersistentQueue/EMPTY)] 
    (fn [n] 
      (swap! buffer roll-buffer n buffer-size) 
      (avg @buffer)))) 
 
(def running-avg (make-running-avg 5)) 

The roll-buffer function is a utility function that takes a queue, a number, and a buffer size as arguments. It adds that number to the queue, popping the oldest element if the queue goes over the buffer limit, thus causing its contents to roll over.
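The rollover can be watched step by step at the REPL. The following sketch repeats roll-buffer so that it runs on its own; the steps var and the buffer size of 3 are chosen purely for illustration.

```clojure
(defn roll-buffer [buffer num buffer-size]
  (let [buffer (conj buffer num)]       ; conj adds at the rear of the queue
    (if (> (count buffer) buffer-size)
      (pop buffer)                      ; pop removes the oldest item at the front
      buffer)))

;; Feed the numbers 1 to 5 through a buffer of size 3.
;; reductions keeps every intermediate queue; vec makes each one printable.
(def steps
  (->> [1 2 3 4 5]
       (reductions #(roll-buffer %1 %2 3) clojure.lang.PersistentQueue/EMPTY)
       (map vec)))

steps
;; => ([] [1] [1 2] [1 2 3] [2 3 4] [3 4 5])
```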

Next, we have a function for calculating the average of a collection of numbers. We always cast the result to float, so that an uneven division produces a decimal number instead of a Clojure ratio such as 7/2.
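A short REPL session shows why the cast is there. The input vectors are arbitrary examples.

```clojure
(defn avg [numbers]
  (float (/ (reduce + numbers)
            (count numbers))))

(/ 7 2)       ; => 7/2, dividing integers yields a Ratio
(avg [3 4])   ; => 3.5
(avg [2 4])   ; => 3.0, still a float when the division is even
```

Note that avg assumes a non-empty collection. In our program it is only ever called after a number has been added to the queue, so a division by zero does not arise.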

Finally, the higher-order make-running-avg function returns a stateful, single-argument function that closes over an atom holding an empty persistent queue. This queue is used to keep track of the current subset of data.

We then create an instance of this function by calling it with a buffer size of 5 and save it to the running-avg var. Each time we call this new function with a number, it will add it to the queue using the roll-buffer function and then finally return the average of the items in the queue.
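To see the window fill up and then roll over, we can call such a function with a few numbers. The sketch below repeats the definitions so that it runs on its own; sample-avg and the prices are made up for illustration.

```clojure
(defn roll-buffer [buffer num buffer-size]
  (let [buffer (conj buffer num)]
    (if (> (count buffer) buffer-size)
      (pop buffer)
      buffer)))

(defn avg [numbers]
  (float (/ (reduce + numbers)
            (count numbers))))

(defn make-running-avg [buffer-size]
  (let [buffer (atom clojure.lang.PersistentQueue/EMPTY)]
    (fn [n]
      (swap! buffer roll-buffer n buffer-size)
      (avg @buffer))))

;; A fresh instance, so this example does not share state with running-avg.
(def sample-avg (make-running-avg 5))

;; mapv is eager, so the calls happen immediately and in order.
(mapv sample-avg [10 20 30 40 50 60])
;; => [10.0 15.0 20.0 25.0 30.0 40.0]
```

The sixth call pushes 10 out of the queue, so the last average covers 20 to 60 only. One possible variation, should the function ever be called from several threads at once: swap! returns the new value of the atom, so (avg (swap! buffer roll-buffer n buffer-size)) would average exactly the queue that this call produced, without a separate dereference.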

The code we have written to manage the thread pool will be reused as is, so all that is left to do is update our periodic function:

(defn worker [] 
  (let [price (share-price "XYZ")] 
    (->> (str "Price: " price) (text! price-label)) 
    (->> (str "Running average: " (running-avg price)) 
         (text! running-avg-label)))) 
 
(defn -main [& args] 
  (show! main-frame) 
  (.addShutdownHook (Runtime/getRuntime) 
                    (Thread. #(shutdown @pool))) 
  (init-scheduler 1) 
  (run-every @pool 500 
             #(invoke-now (worker)))) 

Since our function isn't a one-liner anymore, we abstract it away in its own function, called worker. As before, it updates the price label, but we have also extended it to use the running-avg function that we created earlier.

We're ready to run the program once more:

lein trampoline run -m stock-market-monitor.core  

You should see a window like the one that's shown in the following screenshot:

In addition to displaying the current share price for XYZ, the program now keeps track of the running average of the stream of prices and refreshes it on every update.