Hands-On Reactive Programming with Python
上QQ阅读APP看书,第一时间看更新

ReactiveX Subject

A Subject is a kind of proxy or bridge that implements both an observable and an observer. Several variants of subjects exist in ReactiveX and in RxPY. The RxPY implementation of the Subject object corresponds to the PublishSubject in ReactiveX terminology. Its behavior is a follows: it emits—to an observer—the items of its source observable that are emitted after the observer subscribes to the Subject. This behavior is shown in the following figure:

Figure 3.3: The PublishSubject, or Subject, in RxPY

If the source observable of the subject was a cold observable, then it becomes a hot observable. This means that the items sent by the source observable before a subscription are lost. The next chapter will detail hot and cold observables. In the case of an error, then Subject completes immediately with this error.

Here is an example of the usage of Subject:

from rx.subjects import Subject

proxy = Subject()
proxy.subscribe(
on_next=lambda i: print("s1 item: {}".format(i)),
on_error=lambda e: print("s1 error: {}".format(e)),
on_completed=lambda: print("s1 completed")
)
a = Observable.from_([1,2,3])
a.subscribe(proxy)
print('subscribed to a')

proxy.subscribe(
on_next=lambda i: print("s2 item: {}".format(i)),
on_error=lambda e: print("s2 error: {}".format(e)),
on_completed=lambda: print("s2 completed")
)

This code prints the following results:

s1 item: 1
s1 item: 2
s1 item: 3
s1 completed
subscribed to a
s2 completed

The first subscription to the proxy subject is done before the subscription to the a observable; that is, before the subject subscribes to the a observable. So it receives all items of the a observable and prints them until completion. Once the a observable is completed, and by consequence the proxy subject is completed, a second subscription is done on the proxy. Since the proxy observable has already completed, this second subscription immediately receives the completion notification.