5.5 Executor执行结果的处理方式
本节讲解Executor工作原理、ExecutorBackend注册源码解密、Executor实例化内幕、Executor具体工作内幕。
Master让Worker启动,启动了一个Executor所在的进程。在Standalone模式中,Executor所在的进程是CoarseGrainedExecutorBackend。
Master侧:Master发指令给Worker,启动Executor。
Worker侧:Worker接收到Master发过来的指令,通过ExecutorRunner启动另外一个进程来运行Executor。这里是指启动另外一个进程来启动Executor,而不是直接启动Executor。Master向Worker发送指令,Worker为什么启动另外一个进程?在另外一个进程中注册给Driver,然后启动Executor。因为Worker是管理机器上的资源的,所以机器上的资源变动时要汇报给Master。Worker不是用来计算的,不能在Worker中进行计算;Spark集群中有很多应用程序,需要很多Executor,如果不是给每个Executor启动一个对应的进程,而是所有的应用程序进程都在同一个Executor里面,那么一个程序崩溃将导致其他程序也崩溃。
启动CoarseGrainedExecutorBackend。CoarseGrainedExecutorBackend是Executor所在的进程。CoarseGrainedExecutorBackend启动时,须向Driver注册。通过发送RegisterExecutor向Driver注册,注册的内容是RegisterExecutor。
CoarseGrainedExecutorBackend.scala的onStart方法的源码如下。
1. override def onStart() { 2. logInfo("Connecting to driver: " + driverUrl) 3. rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref => 4. //这是一个非常快的Action,所以可以用ThreadUtils.sameThread 5. driver = Some(ref) 6. ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls)) 7. }(ThreadUtils.sameThread).onComplete { 8. //这是一个非常快的Action,所以可以用ThreadUtils.sameThread 9. case Success(msg) => 10. //经常收到true,可忽略 11. case Failure(e) => 12. exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false) 13. }(ThreadUtils.sameThread) 14. }
其中,RegisterExecutor是一个case class,源码如下。
1. case class RegisterExecutor( 2. executorId: String, 3. executorRef: RpcEndpointRef, 4. hostname: String, 5. cores: Int, 6. logUrls: Map[String, String]) 7. extends CoarseGrainedClusterMessage
CoarseGrainedExecutorBackend启动时,向Driver发送RegisterExecutor消息进行注册;Driver收到RegisterExecutor消息,在Executor注册成功后会返回消息RegisteredExecutor给CoarseGrainedExecutorBackend。这里注册的Executor和真正工作的Executor没有任何关系,其实注册的是RegisterExecutorBackend。可以将RegisteredExecutor理解为RegisterExecutorBackend。
需要特别注意的是,在CoarseGrainedExecutorBackend启动时向Driver注册Executor,其实质是注册ExecutorBackend实例,和Executor实例之间没有直接关系。
CoarseGrainedExecutorBackend是Executor运行所在的进程名称,CoarseGrained-ExecutorBackend本身不会完成任务的计算。
Executor才是正在处理任务的对象。Executor内部是通过线程池的方式来完成Task的计算的。Executor对象运行于CoarseGrainedExecutorBackend进程。
CoarseGrainedExecutorBackend和Executor是一一对应的。
CoarseGrainedExecutorBackend是一个消息通信体(其具体实现了ThreadSafeRPCEndpoint),可以发送信息给Driver,并可以接受Driver中发过来的指令,如启动Task等。
CoarseGrainedExecutorBackend继承自ThreadSafeRpcEndpoint,CoarseGrainedExecutor-Backend是一个消息通信体,可以收消息,也可以发消息。源码如下。
1. private[spark] class CoarseGrainedExecutorBackend( 2. override val rpcEnv: RpcEnv, 3. driverUrl: String, 4. executorId: String, 5. hostname: String, 6. cores: Int, 7. userClassPath: Seq[URL], 8. env: SparkEnv) 9. extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {
CoarseGrainedExecutorBackend发消息给Driver。Driver在StandaloneSchedulerBackend里面(Spark 2.0中已将SparkDeploySchedulerBackend更名为StandaloneSchedulerBackend)。StandaloneSchedulerBackend继承自CoarseGrainedSchedulerBackend,start启动时启动StandaloneAppClient。StandaloneAppClient(Spark 2.0中已将AppClient更名为StandaloneApp-Client)代表应用程序本身。
StandaloneAppClient的源码如下。
1. private[spark] class StandaloneAppClient( 2. rpcEnv: RpcEnv, 3. masterUrls: Array[String], 4. appDescription: ApplicationDescription, 5. listener: StandaloneAppClientListener, 6. conf: SparkConf) 7. extends Logging { 8. ...... 9. private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint 10. with Logging { 11. ......
在Driver进程中有两个至关重要的Endpoint。
ClientEndpoint:主要负责向Master注册当前的程序,是AppClient的内部成员。
DriverEndpoint:这是整个程序运行时的驱动器,是CoarseGrainedExecutorBackend的内部成员。
CoarseGrainedSchedulerBackend的DriverEndpoint的源码如下。
1. class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) 2. extends ThreadSafeRpcEndpoint with Logging {
DriverEndpoint会接收到RegisterExecutor消息,并完成在Driver上的注册。
RegisterExecutor中有一个数据结构executorDataMap,是Key-Value的方式。
1. private val executorDataMap = new HashMap[String, ExecutorData]
ExecutorData中的executorEndpoint是RpcEndpointRef。ExecutorData的源码如下。
1. private[cluster] class ExecutorData( 2. val executorEndpoint: RpcEndpointRef, 3. val executorAddress: RpcAddress, 4. override val executorHost: String, 5. var freeCores: Int, 6. override val totalCores: Int, 7. override val logUrlMap: Map[String, String] 8. ) extends ExecutorInfo(executorHost, totalCores, logUrlMap)
CoarseGrainedExecutorBackend.scala的RegisteredExecutor的源码如下。
1. override def receive: PartialFunction[Any, Unit] = { 2. case RegisteredExecutor => 3. logInfo("Successfully registered with driver") 4. try { 5. executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false) 6. } catch { 7. case NonFatal(e) => 8. exitExecutor(1, "Unable to create executor due to " + e.getMessage, e) 9.
CoarseGrainedExecutorBackend收到RegisteredExecutor消息以后,用new()函数创建一个Executor,而Executor就是一个普通的类。
Spark 2.1.1版本的Executor.scala的源码如下。
1. private[spark] class Executor( 2. executorId: String, 3. executorHostname: String, 4. env: SparkEnv, 5. userClassPath: Seq[URL] = Nil, 6. isLocal: Boolean = false) 7. extends Logging {
Spark 2.2.0版本的Executor.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第6行之后Executor新增了UncaughtExceptionHandler成员变量,用于未捕获的异常。
1. ...... 2. uncaughtExceptionHandler: UncaughtExceptionHandler = SparkUncaughtExceptionHandler) 3. ......
回到ExecutorData.scala,其中的RpcEndpointRef是代理句柄,代理CoarseGrainedExecutorBackend。在Driver中,通过ExecutorData封装并注册ExecutorBackend的信息到Driver的内存数据结构executorMapData中。
1. private[cluster] class ExecutorData( 2. val executorEndpoint: RpcEndpointRef, 3. val executorAddress: RpcAddress, 4. override val executorHost: String, 5. var freeCores: Int, 6. override val totalCores: Int, 7. override val logUrlMap: Map[String, String] 8. ) extends ExecutorInfo(executorHost, totalCores, logUrlMap)
Executor注册消息提交给DriverEndpoint,通过DriverEndpoint写数据给CoarseGrainedSchedulerBackend里面的数据结构executorMapData。executorMapData是 CoarseGrainedSchedulerBackend的成员,因此最终注册给CoarseGrainedSchedulerBackend。CoarseGrainedSchedulerBackend获得Executor(其实是ExecutorBackend)的注册信息。
实际在执行的时候,DriverEndpoint会把信息写入CoarseGrainedSchedulerBackend的内存数据结构executorMapData中,所以最终是注册给了CoarseGrainedSchedulerBackend。也就是说,CoarseGrainedSchedulerBackend掌握了为当前程序分配的所有的ExecutorBackend进程,而在每个ExecutorBackend进行实例中,会通过Executor对象负责具体任务的运行。在运行的时候使用synchronized关键字来保证executorMapData安全地并发写操作。
CoarseGrainedSchedulerBackend.scala的receiveAndReply方法中RegisterExecutor注册的过程,源码如下。
Spark 2.1.1版本的CoarseGrainedSchedulerBackend.scala的receiveAndReply方法的源码如下。
1. override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { 2. 3. case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) => 4. //检查executorDataMap中是否包含该executorId,如果包含,就返回 //RegisterExecutorFailed消息 5. if (executorDataMap.contains(executorId)) { 6. executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId)) 7. context.reply(true) 8. } else { 9. //若executorRef.address地址不为null,则取出executorRef的地址作为 //executorAddress,否则使用sender的Address作为executorAddress 10. val executorAddress = if (executorRef.address != null) { 11. executorRef.address 12. } else { 13. context.senderAddress 14. } 15. logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId") 16. //在addressToExecutorId这个哈希表中加入executorAddress和executorId的对 //应关系 17. addressToExecutorId(executorAddress) = executorId 18. //totalCore增加cores个 19. totalCoreCount.addAndGet(cores) 20. totalRegisteredExecutors.addAndGet(1) 21. //创建ExecutorData对象 22. val data = new ExecutorData(executorRef, executorRef.address, hostname, 23. cores, cores, logUrls) 24. //同步代码块 25. CoarseGrainedSchedulerBackend.this.synchronized { 26. //在executorDataMap中加入executorId和ExecutorData的对应关系 27. executorDataMap.put(executorId, data) 28. if (currentExecutorIdCounter < executorId.toInt) { 29. currentExecutorIdCounter = executorId.toInt 30. } 31. //如果挂起的Executors的数量大于0 32. if (numPendingExecutors > 0) { 33. numPendingExecutors -= 1 34. logDebug(s"Decremented number of pending executors ($numPendingExecutors left)") 35. } 36. } 37. executorRef.send(RegisteredExecutor) 38. //注:有些测试期望将executor放在map中进行reply 39. //向CoarseGrainedExecutorBackend回复true 40. context.reply(true) 41. listenerBus.post( 42. SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data)) 43. //调用makeOffers,给Executor发送执行任务 44. makeOffers() 45. }
Spark 2.2.0版本CoarseGrainedSchedulerBackend.scala的receiveAndReply方法的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第8行之前,新增If语句对黑名单节点进行判断。
1. ...... 2. } else if (scheduler.nodeBlacklist != null && 3. scheduler.nodeBlacklist.contains(hostname)) { 4. //如果集群管理器分配给我们一个Executor,而这个Executor在黑名单节点列 //表中(因为通知它是黑名单节点之前,集群已经开始分配这些资源了),如果集群 //忽略了我们的黑名单,那么我们立即拒绝Executor 5. logInfo(s"Rejecting $executorId as it has been blacklisted.") 6. executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId")) 7. context.reply(true) 8. .......
CoarseGrainedSchedulerBackend.scala中的RegisterExecutor:
先判断executorDataMap是否已经包含executorId,如果已经包含,就会发送注册失败的消息RegisterExecutorFailed,因为已经有重复的executor ID的Executor在运行。
然后进行Executor的注册,获取到executorAddress,在executorRef.address为空的情况下就获取到senderAddress。
定义了3个数据结构:addressToExecutorId、totalCoreCount、totalRegisteredExecutors,其中,addressToExecutorId是DriverEndpoint的数据结构,而totalCoreCount、totalRegisteredExecutors是CoarseGrainedSchedulerBackend的数据结构。addressToExecutorId、totalCoreCount、totalRegisteredExecutors包含Executors注册的信息分别为:RPC地址主机名和端口与ExecutorId的对应关系、集群中的总核数Cores、当前注册的Executors总数等。
1. protected val addressToExecutorId = new HashMap[RpcAddress, String] 2. protected val totalCoreCount = new AtomicInteger(0) 3. protected val totalRegisteredExecutors = new AtomicInteger(0)
然后调用new()函数创建一个ExecutorData,提取出executorRef、executorRef.address、hostname、cores、cores、logUrls等信息。
同步代码块CoarseGrainedSchedulerBackend.this.synchronized :集群中很多Executor向Driver注册,为防止写冲突,因此设计一个同步代码块。在运行时使用synchronized关键字,来保证executorMapData安全地并发写操作。
executorRef.send(RegisteredExecutor)发消息RegisteredExecutor给我们的sender,sender是CoarseGrainedExecutorBackend。而CoarseGrainedExecutorBackend收到消息RegisteredExecutor以后,就调用new()函数创建了Executor。
CoarseGrainedExecutorBackend收到DriverEndpoint发送过来的RegisteredExecutor消息后会启动Executor实例对象,而Executor实例对象事实上是负责真正Task计算的。
1. override def receive: PartialFunction[Any, Unit] = { 2. case RegisteredExecutor => 3. logInfo("Successfully registered with driver") 4. try { 5. executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false) 6. } catch { 7. case NonFatal(e) => 8. exitExecutor(1, "Unable to create executor due to " + e.getMessage, e) 9. }
下面来看一下Executor.scala,其中的threadPool是一个线程池。
Executor是真正负责Task计算的;其在实例化的时候会实例化一个线程池threadPool来准备Task的计算。threadPool是一个newDaemonCachedThreadPool。newDaemonCached-ThreadPool创建线程池,线程工厂按照需要的格式调用new()函数创建线程。语法实现如下。
1. def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = { 2. val threadFactory = namedThreadFactory(prefix) 3. Executors.newCachedThreadPool(threadFactory).asInstanceOf [ThreadPoolExecutor] 4. }
namedThreadFactory的源码如下。
1. def namedThreadFactory(prefix: String): ThreadFactory = { 2. new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build() 3. }
newCachedThreadPool创建一个线程池,根据需要创建新线程,线程池中的线程可以复用,使用提供的ThreadFactory创建新线程。newCachedThreadPool的源码如下。
1. public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { 2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); 3. }
创建的threadPool中以多线程并发执行和线程复用的方式来高效地执行Spark发过来的Task。线程池创建好后,接下来是等待Driver发送任务给CoarseGrainedExecutorBackend,不是直接发送给Executor,因为Executor不是一个消息循环体。
Executor具体是如何工作的?
当Driver发送过来Task的时候,其实是发送给了CoarseGrainedExecutorBackend这个RpcEndpoint,而不是直接发送给了Executor(Executor由于不是消息循环体,所以永远也无法直接接收远程发过来的信息)。
Driver向CoarseGrainedExecutorBackend发送LaunchTask,转过来交给线程池中的线程去执行。先判断Executor是否为空,Executor为空,则提示错误,进程就直接退出。如果Executor不为空,则反序列化任务调用Executor的launchTask,其中,attemptNumber是任务可以重试的次数。
ExecutorBackend收到Driver发送的消息,调用launchTask方法,提交给Executor执行。
Executor.scala的launchTask接收到Task执行的命令后,首先将Task封装在TaskRunner里面,然后放到runningTasks。runningTasks是一个简单的数据结构。
1. private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
launchTask最后交给threadPool.execute(tr),交给线程池中的线程执行任务。 TaskRunner继承自Runnable,是Java的一个对象。
TaskRunner其实是Java中Runnable接口的具体实现,在真正工作时会交给线程池中的线程去运行,此时会调用run方法来执行Task。
Executor.scala中的Run方法最终调用task.run方法。
Spark 2.1.1版本的Executor.scala的oun方法的源码如下。
1. override def run(): Unit = { 2. ...... 3. var threwException = true 4. val value = try { 5. val res = task.run( 6. taskAttemptId = taskId, 7. attemptNumber = attemptNumber, 8. metricsSystem = env.metricsSystem) 9. threwException = false 10. res 11. } finally { 12. val releasedLocks = env.blockManager.releaseAllLocksForTask (taskId) 13. 14. ......
Spark 2.2.0版本的Executor.scala的run方法的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第7行task.run方法的第二个参数由attemptNumber调整为taskDescription. attemptNumber。
1. ...... 2. attemptNumber = taskDescription.attemptNumber, 3. ......
跟进Task.scala中的run方法,在里面调用runTask。
1. final def run( 2. taskAttemptId: Long, 3. attemptNumber: Int, 4. metricsSystem: MetricsSystem): T = { 5. ...... 6. try { 7. runTask(context) 8. } catch { 9. ......
TaskRunner在调用run方法时会调用Task的run方法,而Task的run方法会调用runTask,实际上,Task有ShuffleMapTask和ResultTask。