Fixed buffer
This is the simplest form of buffering. It is fixed to a chosen number, n, allowing producers to put items in the channel without having to wait for consumers:
(def result (chan (async/buffer 5))) (go-loop [] (<! (async/timeout 1000)) (when-let [x (<! result)] (prn "Got value: " x) (recur))) (go (doseq [n (range 5)] (>! result n)) (prn "Done putting values!") (async/close! result)) ;; "Done putting values!" ;; "Got value: " 0 ;; "Got value: " 1 ;; "Got value: " 2 ;; "Got value: " 3 ;; "Got value: " 4
In the preceding example, we created a buffer of size 5 and started a go loop to consume values from it. The go loop uses a timeout channel to delay each loop cycle.
Then, we started another go block that puts numbers from 0 to 4 into the result channel and prints to the console once it's done.
By then, the first timeout will have expired and we will see the values printed to the REPL.
Now, let's watch what happens if the buffer isn't large enough:
(def result (chan (async/buffer 2))) (go-loop [] (<! (async/timeout 1000)) (when-let [x (<! result)] (prn "Got value: " x) (recur))) (go (doseq [n (range 5)] (>! result n)) (prn "Done putting values!") (async/close! result)) ;; "Got value: " 0 ;; "Got value: " 1 ;; "Got value: " 2 ;; "Done putting values!" ;; "Got value: " 3 ;; "Got value: " 4
This time, our buffer size is 2, but everything else is the same. As you can see, the go loop finishes much later, as it attempted to put another value in the result channel and was blocked/parked, since its buffer was full.
As with most things, this might be OK, but if we are not willing to block a fast producer just because we can't consume its items fast enough, we must look for another option.