
4.5 打通Spark系统运行内幕机制循环流程
Spark通过DAGScheduler面向整个Job划分出了不同的Stage,划分Stage之后,Stage从后往前划分,执行的时候从前往后执行,每个Stage内部有一系列的任务,Stage里面的任务是并行计算,并行任务的逻辑是完全相同的,但处理的数据不同。DAGScheduler以TaskSet的方式,把一个DAG构建的Stage中的所有任务提交给底层的调度器TaskScheduler。TaskScheduler是一个接口,与具体的任务解耦合,可以运行在不同的调度模式下,如可运行在Standalone模式,也可运行在Yarn上。
Spark基础调度(见图4-6)包括RDD Objects、DAGScheduler、TaskScheduler、Worker等内容。
DAGScheduler在提交TaskSet给底层调度器的时候是面向接口TaskScheduler的,这符合面向对象中依赖抽象而不依赖具体的原则,带来底层资源调度器的可插拔性,导致Spark可以运行在众多的资源调度器模式上,如Standalone、Yarn、Mesos、Local、EC2、其他自定义的资源调度器;在Standalone的模式下我们聚焦于TaskSchedulerImpl。

图4-6 Spark基础调度图
TaskScheduler是一个接口Trait,底层任务调度接口,由[org.apache.spark.scheduler.TaskSchedulerImpl]实现。这个接口允许插入不同的任务调度程序。每个任务调度器在单独的SparkContext中调度任务。任务调度程序从每个Stage的DAGScheduler获得提交的任务集,负责发送任务到集群运行,如果任务运行失败,将重试,返回DAGScheduler事件。
Spark 2.2.1版本的TaskScheduler.scala的源码如下:


Spark 2.4.3版本的TaskScheduler.scala的源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第29行之后新增加了killAllTaskAttempts方法。
上段代码中第57行之后新增workerRemoved的方法。

DAGScheduler把TaskSet交给底层的接口TaskScheduler,具体实现时有不同的方法。TaskScheduler主要由TaskSchedulerImpl实现。
TaskSchedulerImpl也有自己的子类YarnScheduler。

YarnScheduler的子类YarnClusterScheduler实现如下:

默认情况下,我们研究Standalone的模式,所以主要研究TaskSchedulerImpl。DAGScheduler把TaskSet交给TaskScheduler,TaskScheduler中通过TastSetManager管理具体的任务。TaskScheduler的核心任务是提交TaskSet到集群运算,并汇报结果。
为TaskSet创建和维护一个TaskSetManager,并追踪任务的本地性以及错误信息。
遇到延后的Straggle任务,会放到其他节点重试。
向DAGScheduler汇报执行情况,包括在Shuffle输出lost的时候报告fetch failed错误等信息。
TaskSet是一个普通的类,第一个成员是tasks,tasks是一个数组。TaskSet的源码如下:

TaskScheduler内部有SchedulerBackend,SchedulerBackend管理Executor资源。从Standalone的模式来讲,具体实现是StandaloneSchedulerBackend(Spark 2.0版本将之前的AppClient名字更新为StandaloneAppClient)。
SchedulerBackend本身是一个接口,是一个trait。Spark 2.2.1版本的SchedulerBackend的源码如下:

Spark 2.4.3版本的SchedulerBackend.scala的源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第31行之后新增maxNumConcurrentTasks方法。获取当前可并发启动的最大任务数。注意,请不要缓存此方法返回的值,因为添加/删除executors,数字可能会更改。
1. def maxNumConcurrentTasks(): Int
StandaloneSchedulerBackend:专门负责收集Worker的资源信息。接收Worker向Driver注册的信息,ExecutorBackend启动的时候进行注册,为当前应用程序准备计算资源,以进程为单位。
StandaloneSchedulerBackend的源码如下:

StandaloneSchedulerBackend里有一个Client: StandaloneAppClient。

StandaloneAppClient允许应用程序与Spark standalone集群管理器通信。获取Master的URL、应用程序描述和集群事件监听器,当各种事件发生时可以回调监听器。masterUrls的格式为spark://host:port,StandaloneAppClient需要向Master注册。
StandaloneAppClient在StandaloneSchedulerBackend.scala的start方法启动时进行赋值,用new()函数创建一个StandaloneAppClient。
StandaloneSchedulerBackend.scala的源码如下:

StandaloneAppClient.scala中,里面有一个类是ClientEndpoint,核心工作是在启动时向Master注册。StandaloneAppClient的start方法启动时,就调用new函数创建一个ClientEndpoint。
StandaloneAppClient的源码如下:

StandaloneSchedulerBackend在启动时构建StandaloneAppClient实例,并在StandaloneAppClient实例start时启动了ClientEndpoint消息循环体。ClientEndpoint在启动时会向Master注册当前程序。
StandaloneAppClient中ClientEndpoint类的onStart()方法如下:

这是StandaloneSchedulerBackend的第一个注册的核心功能。StandaloneSchedulerBackend继承自CoarseGrainedSchedulerBackend。而CoarseGrainedSchedulerBackend在启动时就创建DriverEndpoint,从实例的角度讲,DriverEndpoint也属于StandaloneSchedulerBackend实例。

StandaloneSchedulerBackend的父类CoarseGrainedSchedulerBackend在start的时候会实例化类型为DriverEndpoint(这就是我们程序运行时的经典对象Driver)的消息循环体。StandaloneSchedulerBackend在运行时向Master注册申请资源,当Worker的ExecutorBackend启动时会发送RegisteredExecutor信息向DriverEndpoint注册,此时StandaloneSchedulerBackend就掌握了当前应用程序拥有的计算资源,TaskScheduler就是通过StandaloneSchedulerBackend拥有的计算资源来具体运行Task的;StandaloneSchedulerBackend不是应用程序的总管,应用程序的总管是DAGScheduler、TaskScheduler,StandaloneSchedulerBackend向应用程序的Task分配具体的计算资源,并把Task发送到集群中。
SparkContext、DAGScheduler、TaskSchedulerImpl、StandaloneSchedulerBackend在应用程序启动时只实例化一次,应用程序存在期间始终存在这些对象。
这里基于Spark 2.2版本讲解如下。
Spark调度器三大核心资源为SparkContext、DAGScheduler和TaskSchedulerImpl。TaskSchedulerImpl作为具体的底层调度器,运行时需要计算资源,因此需要StandaloneSchedulerBackend。StandaloneSchedulerBackend设计巧妙的地方是启动时启动StandaloneAppClient,而StandaloneAppClient在start时有一个ClientEndpoint的消息循环体,ClientEndpoint的消息循环体启动的时候向Master注册应用程序。
StandaloneSchedulerBackend的父类CoarseGrainedSchedulerBackend在start启动的时候会实例化DriverEndpoint,所有的ExecutorBackend启动的时候都要向DriverEndpoint注册,注册最后落到了StandaloneSchedulerBackend的内存数据结构中,表面上看是在CoarseGrainedSchedulerBackend,但是实例化的时候是StandaloneSchedulerBackend,注册给父类的成员其实就是子类的成员。
作为前提问题:TaskScheduler、StandaloneSchedulerBackend是如何启动的?TaskScheduler-Impl是什么时候实例化的?
TaskSchedulerImpl是在SparkContext中实例化的。在SparkContext类实例化的时候,只要不是方法体里面的内容,都会被执行,(sched, ts)是SparkContext的成员,将调用createTaskScheduler方法。调用createTaskScheduler方法返回一个Tuple,包括两个元素:sched是我们的schedulerBackend;ts是taskScheduler。

createTaskScheduler里有很多运行模式,这里关注Standalone模式,首先调用new()函数创建一个TaskSchedulerImpl,TaskSchedulerImpl和SparkContext是一一对应的,整个程序运行的时候只有一个TaskSchedulerImpl,也只有一个SparkContext;接着实例化StandaloneSchedulerBackend,整个程序运行的时候只有一个StandaloneSchedulerBackend。createTaskScheduler方法如下:

在SparkContext实例化的时候通过createTaskScheduler来创建TaskSchedulerImpl和StandaloneSchedulerBackend。然后在createTaskScheduler中调用scheduler.initialize(backend)。
initialize的方法参数把StandaloneSchedulerBackend传进来,schedulingMode模式匹配有两种方式:FIFO、FAIR。
initialize的方法中调用schedulableBuilder.buildPools()。buildPools方法根据FIFOSchedul-ableBuilder、FairSchedulableBuilder不同的模式重载方法实现。

initialize的方法把StandaloneSchedulerBackend传进来了,但还没有启动Standalone-SchedulerBackend。在TaskSchedulerImpl的initialize方法中把StandaloneSchedulerBackend传进来,从而赋值为TaskSchedulerImpl的backend;在TaskSchedulerImpl调用start方法时会调用backend.start方法,在start方法中会最终注册应用程序。
下面来看SparkContext.scala的taskScheduler的启动。

其中调用了_taskScheduler的start方法。

TaskScheduler的start()方法没有具体实现。TaskScheduler子类的TaskSchedulerImpl的start()方法的源码如下:

TaskSchedulerImpl的start通过backend.start启动了StandaloneSchedulerBackend的start方法。
StandaloneSchedulerBackend的start方法中,将command封装注册给Master,Master转过来要Worker启动具体的Executor。command已经封装好指令,Executor具体要启动进程入口类CoarseGrainedExecutorBackend。然后调用new()函数创建一个StandaloneAppClient,通过client.start启动client。
StandaloneAppClient的start方法中调用new()函数创建一个ClientEndpoint。

ClientEndpoint的源码如下:

ClientEndpoint是一个ThreadSafeRpcEndpoint。ClientEndpoint的onStart方法中调用registerWithMaster(1)进行注册,向Master注册程序。registerWithMaster方法如下:

程序注册后,Master通过schedule分配资源,通知Worker启动Executor,Executor启动的进程是CoarseGrainedExecutorBackend,Executor启动后又转过来向Driver注册,Driver其实是StandaloneSchedulerBackend的父类CoarseGrainedSchedulerBackend的一个消息循环体DriverEndpoint。
总结:在SparkContext实例化时调用createTaskScheduler来创建TaskSchedulerImpl和StandaloneSchedulerBackend,同时在SparkContext实例化的时候会调用TaskSchedulerImpl的start,在start方法中会调用StandaloneSchedulerBackend的start,在该start方法中会创建StandaloneAppClient对象,并调用StandaloneAppClient对象的start方法,在该start方法中会创建ClientEndpoint,创建ClientEndpoint时会传入Command来指定具体为当前应用程序启动的Executor的入口类的名称为CoarseGrainedExecutorBackend,然后ClientEndpoint启动并通过tryRegisterMaster来注册当前的应用程序到Master中,Master接收到注册信息后如果可以运行程序,为该程序生产Job ID并通过schedule来分配计算资源,具体计算资源的分配是通过应用程序的运行方式、Memory、cores等配置信息决定的。最后,Master会发送指令给Worker,Worker为当前应用程序分配计算资源时会首先分配ExecutorRunner。ExecutorRunner内部会通过Thread的方式构建ProcessBuilder来启动另外一个JVM进程,这个JVM进程启动时加载的main方法所在的类的名称就是在创建ClientEndpoint时传入的Command来指定具体名称为CoarseGrainedExecutorBackend的类,此时JVM在通过ProcessBuilder启动时获得了CoarseGrainedExecutorBackend后加载并调用其中的main方法,在main方法中会实例化CoarseGrainedExecutorBackend本身这个消息循环体,而CoarseGrainedExecutorBackend在实例化时会通过回调onStart向DriverEndpoint发送RegisterExecutor来注册当前的CoarseGrainedExecutorBackend,此时DriverEndpoint收到该注册信息并保存在Standalone-SchedulerBackend实例的内存数据结构中,这样Driver就获得了计算资源。
CoarseGrainedExecutorBackend.scala的main方法如下:

CoarseGrainedExecutorBackend的main然后开始调用run方法。

在CoarseGrainedExecutorBackend的main方法中,通过env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(env.rpcEnv, driverUrl, executorId, hostname,cores, userClassPath, env))构建了CoarseGrainedExecutorBackend实例本身。