Java编程方法论:响应式Spring Reactor 3设计与实现
上QQ阅读APP看本书,新人免费读10天
设备和账号都新为新人

2.5 通过BaseSubscriber自定义订阅者

在前面的例子中,一直都通过在subscribe方法中添加参数来定义Subscriber,那么在这里会通过继承BaseSubscriber来实现一个自定义的Subscriber,请观察下面的简单实现:

为什么要重写这两个方法呢?下面先试着观察一下BaseSubscriber中我们经常会关心的几个点:

可以知道,在参与订阅的时候,会先调用onSubscribe方法,通过这个回调方法就可以很容易地定义是使用推还是拉的方式。如果使用的是拉的方式,那么就在hookOnSubscribe回调方法内进行request方法调用。然后,执行重要的方法onNext,其中包含最重要的消费逻辑,所以必须重写hookOnNext回调方法。其实hookOnXXX方法都是空实现,需要根据自己的实际情况加以重写。下面展示一个很简单的实现,就不多做解释了。

于是,可以进行如下操作:

输出如下:

同样,也可以在源的subscribe方法中使用匿名类实现BaseSubscriber:

代码执行完毕,会得到与上面一样的结果,这里在hookOnSubscribe中使用了requestUnbounded方法,这也是BaseSubscriber提供的直接以Long类型的最大值进行请求的方式,其实际上还是调用了request(Long.MAX_VALUE))。这样,无形中就又回到了主动推的PUSH模式了。