
3.5 RDD内部的计算机制
RDD的多个Partition分别由不同的Task处理。Task分为两类:shuffleMapTask、resultTask。本节基于源码对RDD的计算过程进行深度解析。
3.5.1 Task解析
Task是计算运行在集群上的基本计算单位。一个Task负责处理RDD的一个Partition,一个RDD的多个Partition会分别由不同的Task去处理,通过之前对RDD的窄依赖关系的讲解,我们可以发现在RDD的窄依赖中,子RDD中Partition的个数基本都大于等于父RDD中Partition的个数,所以Spark计算中对于每一个Stage分配的Task的数目是基于该Stage中最后一个RDD的Partition的个数来决定的。最后一个RDD如果有100个Partition,则Spark对这个Stage分配100个Task。
Task运行于Executor上,而Executor位于CoarseGrainedExecutorBackend(JVM进程)中。
Spark Job中,根据Task所处Stage的位置,我们将Task分为两类:第一类为shuffleMapTask,指Task所处的Stage不是最后一个Stage,也就是Stage的计算结果还没有输出,而是通过Shuffle交给下一个Stage使用;第二类为resultTask,指Task所处Stage是DAG中最后一个Stage,也就是Stage计算结果需要进行输出等操作,计算到此已经结束;简单地说,Spark Job中除了最后一个Stage的Task为resultTask,其他所有Task都为shuffleMapTask。
3.5.2 计算过程深度解析
Spark中的Job本身内部是由具体的Task构成的,基于Spark程序内部的调度模式,即根据宽依赖的关系,划分不同的Stage,最后一个Stage依赖倒数第二个Stage等,我们从最后一个Stage获取结果;在Stage内部,我们知道有一系列的任务,这些任务被提交到集群上的计算节点进行计算,计算节点执行计算逻辑时,复用位于Executor中线程池中的线程,线程中运行的任务调用具体Task的run方法进行计算,此时,如果调用具体Task的run方法,就需要考虑不同Stage内部具体Task的类型,Spark规定最后一个Stage中的Task的类型为resultTask,因为我们需要获取最后的结果,所以前面所有Stage的Task是shuffleMapTask。
RDD在进行计算前,Driver给其他Executor发送消息,让Executor启动Task,在Executor启动Task成功后,通过消息机制汇报启动成功信息给Driver。Task计算示意图如图3-6所示。

图3-6 Task计算示意图
详细情况如下:Driver中的CoarseGrainedSchedulerBackend给CoarseGrainedExecutor-Backend发送LaunchTask消息。
(1)首先反序列化TaskDescription。
CoarseGrainedExecutorBackend.scala的receive的源码如下:

launchTask中调用了decode方法,解析读取dataIn、taskId、attemptNumber、executorId、name、index等信息,读取相应的JAR、文件、属性,返回TaskDescription值。
Spark 2.2.1版本的TaskDescription.scala的decode的源码如下:

Spark 2.4.3版本的TaskDescription.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第7行之后新增加partitionId的变量。

(2)Executor会通过launchTask执行Task。
(3)Executor的launchTask方法创建一个TaskRunner实例在threadPool来运行具体的Task。
Executor.scala的launchTask的源码如下:

在TaskRunner的run方法首先会通过statusUpdate给Driver发信息汇报自己的状态,说明自己处于running状态。同时,TaskRunner内部会做一些准备工作,如反序列化Task的依赖,通过网络获取需要的文件、Jar等;然后反序列化Task本身。
Spark 2.2.1版本的Executor.scala的run方法的源码如下:

Spark 2.4.3版本的Executor.scala的run源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第15行代码变量taskStart的名称调整为taskStartTime。
1. var taskStartTime: Long = 0
(4)调用反序列化后的Task.run方法来执行任务,并获得执行结果。
Spark 2.2.1版本的Executor.scala的run方法的源码如下:

Spark 2.4.3版本的Executor.scala的run源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第2行taskStart名称调整为taskStartTime。
上段代码中第7行try方法调整为Utils.tryWithSafeFinally方法。

task.run方法调用了runTask的方法,而runTask方法是一个抽象方法,runTask方法内部会调用RDD的iterator()方法,该方法就是针对当前Task对应的Partition进行计算的关键所在,在处理的方法内部会迭代Partition的元素,并交给我们自定义的function进行处理。
Task.scala的run方法的源码如下:

task有两个子类,分别是ShuffleMapTask和ResultTask,下面分别对两者进行讲解。
1.ShuffleMapTask
ShuffleMapTask.scala的源码如下:

首先,ShuffleMapTask会反序列化RDD及其依赖关系,然后通过调用RDD的iterator方法进行计算,而iterator方法中进行的最终运算的方法是compute()。
RDD.scala的iterator方法的源码如下:

其中,RDD.scala的computeOrReadCheckpoint的源码如下:

RDD的compute方法是一个抽象方法,每个RDD都需要重写的方法。
此时,选择查看MapPartitionsRDD已经实现的compute方法,可以发现compute方法的实现是通过f方法实现的,而f方法就是我们创建MapPartitionsRDD时输入的操作函数。
Spark 2.2.1版本的MapPartitionsRDD.scala的源码如下:

Spark 2.4.3版本MapPartitionsRDD.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第4行之后新增2个参数isFromBarrier、isOrderSensitive,isFromBarrier参数指示此RDD是否从RDDBarrier转换,至少含有一个RDDBarrier的Stage阶段将转变为屏障阶段(BarrierS tage)。isOrderSensitive参数指示函数是否区分顺序。
上段代码中第18行之后新增isBarrier_、getOutputDeterministicLevel方法。%

注意:通过迭代器的不断叠加,将每个RDD的小函数合并成一个大的函数流。
然后在计算具体的Partition之后,通过shuffleManager获得的shuffleWriter把当前Task计算的结果根据具体的shuffleManager实现写入到具体的文件中,操作完成后会把MapStatus发送给Driver端的DAGScheduler的MapOutputTracker。
2.ResultTask
Driver端的DAGScheduler的MapOutputTracker把shuffleMapTask执行的结果交给ResultTask,ResultTask根据前面Stage的执行结果进行shuffle后产生整个job最后的结果。
ResultTask.scala的runTask的源码如下:

而ResultTask的runTask方法中反序列化生成func函数,最后通过func函数计算出最终的结果。