2.2 用Flux.create创建源
讲解完订阅操作的细节,下面来介绍一下Flux.create源的创建。而且,还可以将其与RxJava 2的Flowable.create进行对比。下面先来看看Flux.create的源码:
从上面的源码可以看到,需要传入的参数emitter和背压策略,以及与RxJava 2不一样的创建支持源的模式。这里要注意的是,Consumer方法接收的参数类型为?super FluxSink<T>,这是在告诉我们它需要一个FluxSink(这是一个单接口)消费类型。此处,我们不对<?extends T>和<?super T>的具体区别进行分析,感兴趣的读者可以在网上查阅相关资料。但这里要说明一点,那就是可以直接将<?super T>里的问号看成Object。假如操作的是一个容器的话(List<?super T>),可以很轻易地添加T类型的父类或子类型(默认类型向上强转)的对象,并将其全部看作Object。但List<?extends T>则不行,如果T类型的子类是T1、T2,而T1、T2根本就不是一个类型的(添加的时候并不会默认类型向上强转),这时编译器将无法确定具体是哪个类型,因为编译器确定的是问号所属的类型,而不是后面的上限或者下限,编译器只进行检查。这点需要记住!所以,在给List<?extends T>添加T1、T2类型的元素时,会出错。
问号所表达的意思是,我就是这个T类型的子类(extends,所以不可能会默认强转类型)或者父类(super,自己可以是自己的父类),这也是很多人一直迷惑而久久不能熟练使用的原因。
FluxSink拥有与RxJava 2中的FlowableEmitter相似的能力,只不过Reactor更加直接。在RxJava 2中可以看到Flowable create(FlowableOnSubscribe source,BackpressureStrategy mode),其中的FlowableOnSubscribe虽然有一个void subscribe(@NonNull FlowableEmitter<T>e)方法,但是直接将其看作JDK 8+中的一个java.util.function.Consumer#accept方法即可,accept方法也接收的是T类型,返回void,所以Reactor 3直接省掉了诸多麻烦的包装,直接调用java.util.function.Consumer轻装上阵,这时业务也更加清晰明了了。从业务层面来看,FlowableEmitter表达的就是一个产生数据源的动作。
2.2.1 FluxCreate细节探索
接下来,看看FluxCreate的一些源码细节:
从subscribe方法来看,其首先做的是根据背压策略包装一个用于做元素下发动作的sink类。我们知道,只有在产生订阅的时候(即调用subscribe方法的时候),才会进行元素的生产、下发。可以将这个subscribe方法看作执行一条业务链的触发者。对于这条业务链中每个环节的任务,我们往往只能确定要做什么事,会得到什么类型的结果,但暂时不会去考虑该环节任务的具体实现。我们可以将这些任务看作抽象任务,而这些抽象任务则是基于subscribe方法所传入的CoreSubscriber(订阅者)来进行装饰增强包装并承载的。但从上游源生产者的角度看,它所操作的就是一个很单纯的CoreSubscriber对象,只需要基于CoreSubscriber的几个核心方法来进行设计实现即可。
我们来看看用于承接上游源生产者与下游订阅者之间联系的BaseSink的相关源码:
结合之前介绍的内容,对于上面源码BaseSink中定义的volatile变量的一些操作,我们理解起来应该完全没有压力,包括request(long n)内部的细节,难点主要在原子操作上,前面已经讲解过了。关于request(long n),它存在于Reactor 3库中的很多地方,其实现的整体思路与RxJava 2中的思路没太多差别,仅request请求元素的存储方式发生了些许改变。
再次强调,在产生订阅的时候,需要先确定元素请求数量,即订阅者会先调用onSubscribe方法来确定数量,并在此onSubscribe方法中调用FluxCreate.BaseSink#request方法,在其最后调用onRequestedFromDownstream,即根据不同的策略来执行相应的元素下发动作(支持背压的话,会调用BufferAsyncSink内的drain方法);接着接入生产元素的逻辑Consumer<?super FluxSink<T>>source,进行从生产者到消费者的对接,也就是下面源码中的代码逻辑:
接下来讲解一下具体策略的实现,其整体思路和RxJava 2中的是一样的。拿BufferAsyncSink来讲,同样是队列、drain操作,对于Reactor 3来说,其buffer-size是256,产生源的队列依然是一个无界队列,只不过初始大小为256:
我们来看一个Demo:
其默认的背压策略为OverflowStrategy.BUFFER,然后测试会产生如下结果:
在这里,publishOn操作实现了小货车的功能,在RxJava 2中也有类似的功能,具体的区别后面详解。可以看到,整个执行效果与《Java编程方法论:响应式RxJava与代码设计实战》一书中介绍过的RxJava 2中的执行过程没什么区别。关于其他背压策略,就不赘述了,与RxJava 2中的如出一辙,并且onBackpressureBuffer操作的用法也是如此。
2.2.2 Flux的快速包装方法
下面介绍Flux的快速包装方法,主要分为支持背压的方法和不支持背压的方法,下面分别介绍。
支持背压的方法如下。
◎ just:可以指定序列中包含的全部元素。创建的Flux源序列会在发布元素之后自动结束。
◎ fromArray、fromIterable和fromStream:可以从一个数组、Iterable对象或Stream对象中创建Flux对象。
◎ empty:创建一个不包含任何元素,只发布结束消息的源序列。
◎ error(Throwable error):创建一个只包含错误消息的源序列。
◎ never:创建一个不包含任何消息通知的序列。
◎ range(int start,int count):创建包含从start起始的count个Integer对象的源序列。
不支持背压(想要支持背压的话,可以手动添加,调用onBackpressureXXX方法即可)的方法如下。
◎ interval(Duration period)和interval(Duration delay,Duration period):创建一个包含了从0开始递增的Long对象的源序列。其中包含的元素按照指定的间隔时间来发布。除了间隔时间外,还可以指定起始元素发布之前的延迟时间,指定的时间间隔和延迟时间的单位为ms。在未手动添加背压策略的情况下,具体消费行为在有延时的情况下很容易发生异常,其原理与RxJava 2中的一模一样。
下面看一个Demo:
另外,大家可能会对Flux.fromStream操作感兴趣,它内部主要利用了stream.iterator方法,并得到了Iterator对象,实现方式与RxJava 2中的io.reactivex.internal.operators.flowable.FlowableFromIterable基本相同。可以对比两者的源码实现,主要区别在于使用JDK提供的Stream时,需要在出现异常或结束操作的时候关闭流,即会调用stream::close。
2.2.3 Reactor 3中的generate方法
图2-2是Reactor 3中generate方法的运行原理。
图2-2
generate方法通过同步(会限制对下游操作API的调度选择)和逐一方式来产生Flux元素序列。元素序列的产生是通过调用SynchronousSink对象的next、complete和error(Throwable)方法来完成的。逐一产生是指在具体的元素产生逻辑中,next方法最多只能被调用一次。在有些情况下,元素序列的产生可能是有状态的,需要用到某些状态对象。此时可以使用generate方法的另一种形式:generate(Callable<S> stateSupplier,BiFunction<S,SynchronousSink<T>,S>generator),其中的stateSupplier用于提供初始的状态对象。在产生元素序列时,状态对象会作为generator的第一个参数传入,可以在对应的逻辑中对该状态对象进行修改,以供下一次产生时使用:
在上面的源码中,Reactor的generate方法和RxJava 2中的实现也很相似,细节稍有不同,比如其实现同步的方式。下面,我将大家在阅读源码过程中可能会产生疑惑的地方讲解一下。
首先观察以下源码:
产生订阅所执行的动作很简单,就是生产几个初始状态值,然后调用消费者CoreSubscriber的onSubscribe方法,传入一个Subscription对象,默认会调用该Subscription对象的request方法:
在这里,说明一下Operators.addCap(REQUESTED,this,n)==0操作:
前面介绍过LambdaSubscriber,在默认的情况下,其请求数为Long.MAX_VALUE,也就是toAdd的值为Long.MAX_VALUE。这里的REQUESTED的初始值为0,也就是r等于0,所以会跳过第一个if语句,接着执行addCap方法,得到的u值就是Long.MAX_VALUE。但要注意,r依然等于0,因为基本类型参数属于值传递,所以updater.compareAndSet(instance,r,u)内产生的数据计算不会影响r的值,addCap方法返回的就是0,这就是说,Operators.addCap(REQUESTED,this,n)==0这个操作为true,才会进入下面的判断。而RxJava 2中的BackpressureHelper.add(this,n)!=0L,其内部实现和上面所述是一致的,若请求数已经设定过,则r不等于0,此时BackpressureHelper.add(this,n)!=0L为false,直接返回即可,接着Reactor会将余下的操作逻辑抽取出来,根据请求数是否为Long.MAX_VALUE来选择执行fastPath或slowPath(n)方法:
这里也是generate方法实现同步的地方,其中只需要弄明白fastPath方法。查看s=g.apply(s,this)操作,之前分析过,FluxSink可以达到与RxJava 2中的FlowableEmitter相似的能力,在这里就是SynchronousSink。那么对照RxJava 2中下发元素的动作,即在BiFunction<S,?super Emitter<T>,S> generator的实现中只执行一次Emitter的onNext操作。此处Reactor会执行GenerateSubscription#next操作,那么会在下发元素前先设定hasValue为true,然后在fastPath方法结束的时候将hasValue设定为false。在这个过程中,假如下游有多个线程同时在执行异步接收操作,那么只要有一个线程中的fastPath方法结束,剩下的线程在进行if(!hasValue)判断时就会进入其执行体,从而产生异常。这也告诉我们,此方法不支持下游subscribeOn的多线程池异步请求操作,需要大家注意。
可以查看以下Demo,希望读者可以从这个Demo中找出上面想表达的东西,带着问题继续读接下来的内容:
执行结果如下:
为什么没有报错?为什么没有在产生多线程的同时并发请求,而只是相对于主线程做了异步处理?此处的“猫腻”接下来会一步步揭晓。
为了保证此generate API的通用性,RxJava 2和Reactor都下了很大功夫,这里也是RxJava 2中的Flowable没有涉及的地方,Reactor 3中的generate方法算是对RxJava 2进行了一点优化。RxJava 2和Reactor 3中元素生产者处理下游异步请求的代码设计原理差不多,所以放在Reactor里一并讲解。在这里,两者都是通过一个QueueSubscription类型的实现类做到如下效果的:从元素生产下发到消费的整个过程中,根据自身实际情况来协调各个操作间的同步和异步行为。