Reactive Programming in Kotlin

How Observable works

As stated earlier, an Observable has three key events/methods. Let's discuss them one by one:

  • onNext: Observable passes all items one by one to this method.
  • onComplete: When all items have gone through the onNext method, Observable calls the onComplete method.
  • onError: When Observable encounters an error, it calls the onError method, if defined, to deal with it. Note that onError and onComplete are both terminal events: if onError is called, onComplete is never called, and vice versa.
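
To make the contract between these three methods concrete, here is a minimal sketch in plain Kotlin. It is a toy model and not RxJava's implementation: ToyObserver, emitAll, and record are hypothetical names introduced only for this illustration. It pushes each item to onNext and then signals exactly one terminal event.

```kotlin
// Toy model of the Observable contract; NOT RxJava's implementation.
// ToyObserver, emitAll and record are hypothetical names for illustration.
interface ToyObserver<T> {
    fun onNext(item: T)
    fun onComplete()
    fun onError(e: Throwable)
}

// Pushes every item to onNext, then signals exactly one terminal event:
// onComplete if all items were delivered, onError if something threw.
fun <T> emitAll(items: Iterable<T>, observer: ToyObserver<T>) {
    try {
        for (item in items) observer.onNext(item)
    } catch (e: Exception) {
        observer.onError(e) // terminal: onComplete is never called after this
        return
    }
    observer.onComplete() // terminal: reached only when no error occurred
}

// Runs emitAll and records, in order, which callbacks were invoked.
fun <T> record(items: Iterable<T>): List<String> {
    val events = mutableListOf<String>()
    emitAll(items, object : ToyObserver<T> {
        override fun onNext(item: T) { events += "onNext($item)" }
        override fun onComplete() { events += "onComplete" }
        override fun onError(e: Throwable) { events += "onError(${e.message})" }
    })
    return events
}

fun main() {
    val mixed: List<Any> = listOf("One", 2, 4.5)
    println(record(mixed))
    // prints [onNext(One), onNext(2), onNext(4.5), onComplete]

    // A source that fails after emitting two items.
    val failing = sequence<Int> {
        yield(1)
        yield(2)
        throw IllegalStateException("boom")
    }.asIterable()
    println(record(failing))
    // prints [onNext(1), onNext(2), onError(boom)]
}
```

Notice that each run ends with a single terminal event: the successful source ends with onComplete, while the failing source ends with onError and never reaches onComplete.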

One thing to note here: the item in Observable that we are talking about can be anything. It is defined as Observable<T>, where T can be any class; even an array or a list can be emitted as a single item of an Observable.

Let's look at the following image:

Let's look at this code example to understand it better:

    // Assumes RxJava 2 and RxKotlin 2 (io.reactivex packages) on the classpath.
    import io.reactivex.Observable
    import io.reactivex.Observer
    import io.reactivex.disposables.Disposable
    import io.reactivex.rxkotlin.toObservable

    fun main(args: Array<String>) {

      val observer: Observer<Any> = object : Observer<Any> { //1
        override fun onComplete() { //2
            println("All Completed")
        }

        override fun onNext(item: Any) { //3
            println("Next $item")
        }

        override fun onError(e: Throwable) { //4
            println("Error Occurred $e")
        }

        override fun onSubscribe(d: Disposable) { //5
            println("Subscribed to $d")
        }
      }

      val observable: Observable<Any> =
          listOf("One", 2, "Three", "Four", 4.5, "Five", 6.0f).toObservable() //6

      observable.subscribe(observer) //7

      val observableOnList: Observable<List<Any>> =
          Observable.just(
              listOf("One", 2, "Three", "Four", 4.5, "Five", 6.0f),
              listOf("List with Single Item"),
              listOf(1, 2, 3, 4, 5, 6)) //8

      observableOnList.subscribe(observer) //9
    }

In the preceding example, we declared the observer instance of the Any datatype at comment 1.

Here, we are taking advantage of the Any datatype. In Kotlin, every class is a child class of Any. Also, in Kotlin, everything is a class or an object; there are no separate primitive datatypes.
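
The following short sketch illustrates this point without any Rx dependency. The typeName helper is a hypothetical function written only for this illustration; it shows that strings, integers, doubles, and floats can all sit behind a single Any reference, and that members can be called directly on number literals.

```kotlin
// Hypothetical helper: reports the runtime type of a value held as Any.
fun typeName(item: Any): String = when (item) {
    is String -> "String"
    is Int -> "Int"
    is Double -> "Double"
    is Float -> "Float"
    is List<*> -> "List"
    else -> "Other"
}

fun main() {
    // The same mixed list used in the Observable example.
    val items: List<Any> = listOf("One", 2, "Three", "Four", 4.5, "Five", 6.0f)
    println(items.map { typeName(it) })
    // prints [String, Int, String, String, Double, String, Float]

    // Numbers are objects too: members can be called directly on a literal.
    println(2.plus(3))   // prints 5
    println(4.5.toInt()) // prints 4
}
```

On the JVM, the compiler may still represent non-nullable numbers as primitives internally, but at the language level they behave like any other object, which is why they fit into an Observer<Any>.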

The Observer interface declares four methods. The onComplete() method at comment 2 is called when Observable has finished emitting all its items without any error. At comment 3, we define the onNext(item: Any) function, which Observable calls for each item it emits; in that method, we print the item to the console. At comment 4, we define the onError(e: Throwable) method, which is called if Observable encounters an error. At comment 5, the onSubscribe(d: Disposable) method is called whenever Observer subscribes to Observable. At comment 6, we create an Observable from a list (val observable), and we subscribe observer to it at comment 7. At comment 8, we create another Observable (val observableOnList), which this time holds lists as items, and we subscribe the same observer to it at comment 9.

The output of the program is as follows:

As you can see in the output, for the first subscription (comment 7), subscribing to Observable first triggers the onSubscribe method. Observable then starts emitting items, and Observer receives each one in the onNext method and prints it. When all items have been emitted, Observable calls the onComplete method to signal that every item was emitted successfully. The second subscription (comment 9) behaves the same way, except that each item is a list.

Now that we have covered the basics of Observables, let's learn the various ways to create an Observable, that is, the factory methods for Observable.