8.3 Task全生命周期详解
本节讲解Task的生命过程,对Task在Driver和Executor中交互的全生命周期原理和源码进行详解。
8.3.1 Task的生命过程详解
Task的生命过程详解如下。
(1)当Driver中的CoarseGrainedSchedulerBackend给CoarseGrainedExecutorBackend发送LaunchTask之后,CoarseGrainedExecutorBackend收到LaunchTask消息后,首先会反序列化TaskDescription。
(2)Executor会通过launchTask执行Task,在launchTask方法中调用new()函数创建TaskRunner,TaskRunner继承自Runnable接口。
(3)TaskRunner在ThreadPool运行具体的Task,在TaskRunner的run方法中首先会通过调用statusUpdate给Driver发信息汇报自己的状态,说明自己是Running状态。其中execBackend是ExecutorBackend,ExecutorBackend是一个trait,其具体的实现子类是CoarseGrainedExecutorBackend,其中的statusUpdate方法中将向Driver提交StatusUpdate消息。
(4)TaskRunner内部会做一些准备工作:例如,反序列化Task的依赖,然后通过网络获取需要的文件、Jar等。
(5)然后是反序列Task本身。
(6)调用反序列化后的Task.run方法来执行任务,并获得执行结果。其中Task的run方法调用时会导致Task的抽象方法runTask的调用,在Task的runTask内部会调用RDD的iterator方法,该方法就是我们针对当前Task所对应的Partition进行计算的关键所在,在处理的内部会迭代Partition的元素,并交给我们自定义的function进行处理!
对于ShuffleMapTask,首先要对RDD以及其依赖关系进行反序列化,最终计算会调用RDD的compute方法。具体计算时有具体的RDD,例如,MapPartitionsRDD的compute。compute方法其中的f就是我们在当前的Stage中计算具体Partition的业务逻辑代码。
对于ResultTask:调用rdd.iterator方法,最终计算仍然会调用RDD的compute方法。
(7)把执行结果序列化,并根据大小判断不同的结果传回给Driver。
(8)CoarseGrainedExecutorBackend给DriverEndpoint发送StatusUpdate来传输执行结果。DriverEndpoint会把执行结果传递给TaskSchedulerImpl处理,然后交给TaskResultGetter内部通过线程去分别处理Task执行成功和失败时的不同情况,最后告诉DAGScheduler任务处理结束的状况。
说明:
①在执行具体Task的业务逻辑前,会进行四次反序列:
a)TaskDescription的反序列化。
b)反序列化Task的依赖。
c)Task的反序列化。
d)RDD反序列化。
②在Spark 1.6中,AkkFrameSize是128MB,所以可以广播非常大的任务;而任务的执行结果最大可以达到1GB。Spark 2.2版本中,CoarseGrainedSchedulerBackend的launchTask方法中序列化任务大小的限制是maxRpcMessageSize为128MB。
8.3.2 Task在Driver和Executor中交互的全生命周期原理和源码详解
在Standalone模式中,Driver中的CoarseGrainedSchedulerBackend给CoarseGrained-ExecutorBackend发送launchTasks消息,CoarseGrainedExecutorBackend收到launchTasks消息以后会调用executor.launchTask。
CoarseGrainedExecutorBackend的receive方法如下,模式匹配收到LaunchTask消息:
(1)LaunchTask判断Executor是否存在,如果Executor不存在,则直接退出,然后会反序列化TaskDescription。
Spark 2.1.1版本的CoarseGrainedExecutorBackend的receive方法的源码如下。
1. val taskDesc = ser.deserialize[TaskDescription](data.value)
Spark 2.2.0版本的CoarseGrainedExecutorBackend的receive方法的源码如下。
1. val taskDesc = TaskDescription.decode(data.value)
(2)Executor会通过launchTask来执行Task,launchTask方法中分别传入taskId、尝试次数、任务名称、序列化后的任务本身。
Spark 2.1.1版本的CoarseGrainedExecutorBackend的receive方法的源码如下。
1. executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber, taskDesc.name, taskDesc.serializedTask)
Spark 2.2.0版本的CoarseGrainedExecutorBackend的receive方法的源码如下。
1. executor.launchTask(this, taskDesc)
进入Executor.scala的launchTask方法,在launchTask方法中调用new()函数创建一个TaskRunner,传入的参数包括taskId、尝试次数、任务名称、序列化后的任务本身。然后放入runningTasks数据结构,在threadPool中执行TaskRunner。
TaskRunner本身是一个Runnable接口。
下面看一下TaskRunner的run方法。TaskMemoryManager是内存的管理,deserialize-StartTime是反序列化开始的时间,setContextClassLoader是ClassLoader加载具体的类。ser是序列化器。
然后调用execBackend.statusUpdate,statusUpdate是ExecutorBackend的方法,Executor-Backend通过statusUpdate给Driver发信息,汇报自己的状态。
1. private[spark] trait ExecutorBackend { 2. def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit 3. }
其中,execBackend是ExecutorBackend,ExecutorBackend是一个trait,其具体的实现子类是CoarseGrainedExecutorBackend。execBackend实例是在CoarseGrainedExecutorBackend的receive方法收到LaunchTask消息,调用executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber, taskDesc.name, taskDesc.serializedTask)时将CoarseGrainedExecutorBackend自己本身的this实例传进来的。这里调用CoarseGrained-ExecutorBackend的statusUpdate方法。statusUpdate方法将向Driver提交StatusUpdate消息。
CoarseGrainedExecutorBackend的statusUpdate的源码如下。
1. override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { 2. val msg = StatusUpdate(executorId, taskId, state, data) 3. driver match { 4. case Some(driverRef) => driverRef.send(msg) 5. case None => logWarning(s"Drop $msg because has not yet connected to driver") 6. } 7. }
(3)TaskRunner的run方法中,TaskRunner在ThreadPool中运行具体的Task,在TaskRunner的run方法中首先会通过调用statusUpdate给Driver发信息汇报自己的状态,说明自己是Running状态。
1. execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
其中,EMPTY_BYTE_BUFFER没有具体内容。
1. private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
接下来通过Task.deserializeWithDependencies(serializedTask)反序列化Task,得到一个Tuple,获取到taskFiles、taskJars、taskProps、taskBytes等信息。
(4)Executor会通过TaskRunner在ThreadPool中运行具体的Task,TaskRunner内部会做一些准备工作:反序列化Task的依赖。
Spark 2.1.1版本的Executor.scala的源码如下。
1. val (taskFiles, taskJars, taskProps, taskBytes) = 2. Task.deserializeWithDependencies(serializedTask)
Spark 2.2.0版本的Executor.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第1~2行Properties、addedFiles、addedJars、serializedTask等信息调整为从taskDescription中获取。
1. ....... 2. Executor.taskDeserializationProps.set(taskDescription.properties) 3. updateDependencies(taskDescription.addedFiles, taskDescription.addedJars) 4. task = ser.deserialize[Task[Any]]( 5. taskDescription.serializedTask, Thread.currentThread. getContextClassLoader) 6. task.localProperties = taskDescription.properties 7. task.setTaskMemoryManager(taskMemoryManager) 8. ........
然后通过网络来获取需要的文件、Jar等。
Spark 2.1.1版本的Executor.scala的源码如下。
1. updateDependencies(taskFiles, taskJars)
Spark 2.2.0版本的Executor.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中taskFiles、taskJars等信息调整为从taskDescription.addedFiles,taskDescription. addedJars中获取。
1. updateDependencies(taskDescription.addedFiles, taskDescription.addedJars)
再来看一下updateDependencies方法。从SparkContext收到一组新的文件JARs,下载Task运行需要的依赖Jars,在类加载机中加载新的JARs包。updateDependencies方法的源码如下。
Spark 2.1.1版本的Executor.scala的源码如下。
1. private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) { 2. Lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) 3. synchronized { 4. //获取将要计算的依赖关系 5. for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) { 6. logInfo("Fetching " + name + " with timestamp " + timestamp) 7. //使用useCache获取文件,本地模式关闭缓存 8. Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf, 9. env.securityManager, hadoopConf, timestamp, useCache = !isLocal) 10. currentFiles(name) = timestamp 11. } 12. for ((name, timestamp) <- newJars) { 13. val localName = name.split("/").last 14. val currentTimeStamp = currentJars.get(name) 15. .orElse(currentJars.get(localName)) 16. .getOrElse(-1L) 17. if (currentTimeStamp < timestamp) { 18. logInfo("Fetching " + name + " with timestamp " + timestamp) 19. //使用useCache获取文件,本地模式关闭缓存 20. Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf, 21. env.securityManager, hadoopConf, timestamp, useCache = !isLocal) 22. currentJars(name) = timestamp 23. //将它增加到类装入器中 24. val url = new File(SparkFiles.getRootDirectory(), localName). toURI.toURL 25. if (!urlClassLoader.getURLs().contains(url)) { 26. logInfo("Adding " + url + " to class loader") 27. urlClassLoader.addURL(url) 28. } 29. } 30. } 31. } 32. }
Spark 2.2.0版本的Executor.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第1行newFiles、newJars的数据类型由HashMap[String, Long]调整为Map[String, Long]。
1. private def updateDependencies(newFiles: Map[String, Long], newJars: Map[String, Long]) {.......
Executor的updateDependencies方法中,Executor运行具体任务时进行下载,下载文件使用synchronized关键字,因为Executor在线程中运行,同一个Stage内部不同的任务线程要共享这些内容,因此ExecutorBackend多条线程资源操作的时候,需要通过同步块加锁。
updateDependencies方法的Utils.fetchFile将文件或目录下载到目标目录,支持各种方式获取文件,包括HTTP,Hadoop兼容的文件系统、标准文件系统的文件,基于URL参数。获取目录只支持从Hadoop兼容的文件系统。如果usecache设置为true,第一次尝试取文件到本地缓存,执行同一应用程序进行共享。usecache主要用于executors,而不是本地模式。如果目标文件已经存在,并有不同于请求文件的内容,将抛出SparkException异常。
1. def fetchFile( 2. url: String, 3. targetDir: File, 4. conf: SparkConf, 5. securityMgr: SecurityManager, 6. hadoopConf: Configuration, 7. timestamp: Long, 8. useCache: Boolean) { 9. ...... 10. doFetchFile(url, localDir, cachedFileName, conf, securityMgr, hadoopConf) 11. .......
doFetchFile方法如下,包括spark、http | https | ftp、file各种协议方式的下载。
1. private def doFetchFile( 2. url: String, 3. targetDir: File, 4. filename: String, 5. conf: SparkConf, 6. securityMgr: SecurityManager, 7. hadoopConf: Configuration) { 8. val targetFile = new File(targetDir, filename) 9. val uri = new URI(url) 10. val fileOverwrite = conf.getBoolean("spark.files.overwrite", defaultValue = false) 11. Option(uri.getScheme).getOrElse("file") match { 12. case "spark" => 13. ...... 14. downloadFile(url, is, targetFile, fileOverwrite) 15. case "http" | "https" | "ftp" => 16. ...... 17. downloadFile(url, in, targetFile, fileOverwrite) 18. case "file" => 19. ...... 20. copyFile(url, sourceFile, targetFile, fileOverwrite) 21. case _ => 22. val fs = getHadoopFileSystem(uri, hadoopConf) 23. val path = new Path(uri) 24. fetchHcfsFile(path, targetDir, fs, conf, hadoopConf, fileOverwrite, 25. filename = Some(filename)) 26. } 27. }
(5)回到TaskRunner的run方法,所有依赖的Jar都下载完成后,然后是反序列Task本身。
Spark 2.1.1版本的Executor.scala的源码如下。
1. task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread. getContextClassLoader)
Spark 2.2.0版本的Executor.scala的源码与Spark 2.1.1版本相比具有如下特点:
1. task = ser.deserialize[Task[Any]]( 2. taskDescription.serializedTask, Thread.currentThread. getContextClassLoader)
在执行具体Task的业务逻辑前会进行四次反序列。
(a)TaskDescription的反序列化。
(b)反序列化Task的依赖。
(c)Task的反序列化。
(d)RDD反序列化。
(6)回到TaskRunner的run方法,调用反序列化后的Task.run方法来执行任务并获得执行结果。
其中,Task的run方法调用时会导致Task的抽象方法runTask的调用,在Task的runTask内部会调用RDD的iterator方法,该方法就是针对当前Task所对应的Partition进行计算的关键所在,在处理的内部会迭代Partition的元素并交给自定义的function进行处理。
进入task.run方法,在run方法里面再调用runTask方法。
1. final def run( 2. taskAttemptId: Long, 3. attemptNumber: Int, 4. metricsSystem: MetricsSystem): T = { 5. SparkEnv.get.blockManager.registerTask(taskAttemptId) 6. context = new TaskContextImpl( 7. ...... 8. TaskContext.setTaskContext(context) 9. ...... 10. try { 11. runTask(context) 12. ......
进入Task.scala的runTask方法,这里是一个抽象方法,没有具体的实现。
1. def runTask(context: TaskContext): T
Task包括两种Task:ResultTask和ShuffleMapTask。抽象runTask方法由子类的runTask实现。先看一下ShuffleMapTask的runTask方法,runTask实际运行的时候会调用RDD的iterator,然后针对partition进行计算。
1. override def runTask(context: TaskContext): MapStatus = { 2. ...... 3. val ser = SparkEnv.get.closureSerializer.newInstance() 4. val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])]( 5. ...... 6. val manager = SparkEnv.get.shuffleManager 7. writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) 8. writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator [_ <: Product2[Any, Any]]]) 9. writer.stop(success = true).get 10. ......
ShuffleMapTask在计算具体的Partition之后实际上会通过shuffleManager获得的shuffleWriter把当前Task计算内容根据具体的shuffleManager实现写入到具体的文件中。操作完成以后会把MapStatus发送给DAGscheduler,Driver的DAGScheduler的MapOutputTracker会收到注册的信息。
同样地,ResultTask的runTask方法也是调用RDD的iterator,然后针对partition进行计算。MapOutputTracker会把ShuffleMapTask执行结果交给ResultTask,ResultTask根据前面Stage的执行结果进行Shuffle,产生整个Job最后的结果。
1. override def runTask(context: TaskContext): U = { 2. ...... 3. val ser = SparkEnv.get.closureSerializer.newInstance() 4. val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)]( 5. ...... 6. func(context, rdd.iterator(partition, context)) 7. }
ResultTask、ShuffleMapTask的runTask方法真正执行的时候,调用RDD的iterator,对Partition进行计算。RDD.scala的iterator方法的源码如下。
1. override def runTask(context: TaskContext): U = { 2. ...... 3. val ser = SparkEnv.get.closureSerializer.newInstance() 4. val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)]( 5. ...... 6. func(context, rdd.iterator(partition, context)) 7. }
RDD.scala的iterator方法中,如果storageLevel不等于NONE,就直接获取或者计算得到RDD的分区;如果storageLevel是空,就从checkpoint中读取或者计算RDD分区。
进入computeOrReadCheckpoint:
1. private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = 2. { 3. if (isCheckpointedAndMaterialized) { 4. firstParent[T].iterator(split, context) 5. } else { 6. compute(split, context) 7. } 8. }
最终计算会调用RDD的compute方法。
1. def compute(split: Partition, context: TaskContext): Iterator[T]
RDD的compute方法中的Partition是一个trait。
1. trait Partition extends Serializable { 2. def index: Int 3. override def hashCode(): Int = index 4. override def equals(other: Any): Boolean = super.equals(other) 5. }
RDD的compute方法中的TaskContext里面有很多方法,包括任务是否完成、任务是否中断、任务是否在本地运行、任务运行完成时的监听器、任务运行失败的监听器、stageId、partitionId、重试的次数等。
1. abstract class TaskContext extends Serializable { 2. def isCompleted(): Boolean 3. def isInterrupted(): Boolean 4. def isRunningLocally(): Boolean 5. def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext 6. def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext 7. def addTaskFailureListener(listener: TaskFailureListener): TaskContext 8. def addTaskFailureListener(f: (TaskContext, Throwable) => Unit): TaskContext 9. def stageId() 10. def partitionId(): Int 11. def attemptNumber(): Int 12. ......
下面看一下TaskContext具体的实现TaskContextImpl。TaskContextImpl维持了很多上下文信息,如stageId、partitionId、taskAttemptId、重试次数、taskMemoryManager等。
1. private[spark] class TaskContextImpl( 2. val stageId: Int, 3. val partitionId: Int, 4. override val taskAttemptId: Long, 5. override val attemptNumber: Int, 6. override val taskMemoryManager: TaskMemoryManager, 7. localProperties: Properties, 8. @transient private val metricsSystem: MetricsSystem, 9. //默认值仅用于测试 10. override val taskMetrics: TaskMetrics = TaskMetrics.empty) 11. extends TaskContext 12. with Logging { 13. ......
RDD的compute方法具体计算的时候有具体的RDD,如MapPartitionsRDD的compute、传进去的Partition及TaskContext上下文。
MapPartitionsRDD.scala的compute的源码如下。
1. override def compute(split: Partition, context: TaskContext): Iterator[U] = 2. f(context, split.index, firstParent[T].iterator(split, context))
MapPartitionsRDD.scala的compute中的f就是我们在当前的Stage中计算具体Partition的业务逻辑代码。f是函数,是我们自己写的业务逻辑。Stage从后往前推,把所有的RDD合并变成一个,函数也会变成一个链条,展开成一个很大的函数。Compute返回的是一个Iterator。
Task包括两种Task:ResultTask和ShuffleMapTask。
先看一下ShuffleMapTask的runTask方法,从ShuffleMapTask的角度讲,rdd.iterator获得数据记录以后,对rdd.iterator计算后的Iterator记录进行write。
1. val manager = SparkEnv.get.shuffleManager 2. writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) 3. writer.write(rdd.iterator(partition, context).asInstanceOf [Iterator[_ <: Product2[Any, Any]]]) 4. writer.stop(success = true).get
ResultTask.scala的runTask方法较简单:在ResultTask中,rdd.iterator获得数据记录以后,直接调用func函数。func函数是Task任务反序列化后直接获得的fun函数。
1. val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)]( 2. ByteBuffer.wrap(taskBinary.value), Thread.currentThread. getContextClassLoader) 3. ...... 4. func(context, rdd.iterator(partition, context))
(7)回到TaskRunner的run方法,把执行结果序列化,并根据大小判断不同的结果传回给Driver。
task.run运行的结果赋值给value。
resultSer.serialize(value)把task.run的执行结果value序列化。
maxResultSize > 0 && resultSize > maxResultSize对任务执行结果的大小进行判断,并进行相应的处理。任务执行完以后,任务的执行结果最大可以达到1GB。
如果任务执行结果特别大,超过1GB,日志就会提示超出任务大小限制。返回元数据ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))。
如果任务执行结果小于1GB,大于maxDirectResultSize(128MB),就放入blockManager,返回元数据ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))。
如果任务执行结果小于128MB,就直接返回serializedDirectResult。
TaskRunner的run方法如下。
Spark 2.1.1版本的Executor.scala的源码如下。
1. override def run(): Unit = { 2. ...... 3. val value = try { 4. val res = task.run( 5. taskAttemptId = taskId, 6. attemptNumber = attemptNumber, 7. metricsSystem = env.metricsSystem) 8. threwException = false 9. Res 10. ...... 11. val valueBytes = resultSer.serialize(value) 12. ...... 13. val directResult = new DirectTaskResult(valueBytes, accumUpdates) 14. val serializedDirectResult = ser.serialize(directResult) 15. val resultSize = serializedDirectResult.limit 16. ...... 17. 18. val serializedResult: ByteBuffer = { 19. if (maxResultSize > 0 && resultSize > maxResultSize) { 20. ....... 21. ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId (taskId), resultSize)) 22. } else if (resultSize > maxDirectResultSize) { 23. val blockId = TaskResultBlockId(taskId) 24. env.blockManager.putBytes( 25. blockId, 26. new ChunkedByteBuffer(serializedDirectResult.duplicate()), 27. StorageLevel.MEMORY_AND_DISK_SER) 28. ...... 29. ser.serialize(new IndirectTaskResult[Any](blockId, resultSize)) 30. } else { 31. ...... 32. serializedDirectResult 33. } 34. }
Spark 2.2.0版本的Executor.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第6行attemptNumber调整为taskDescription.attemptNumber。
1. ...... 2. attemptNumber = taskDescription.attemptNumber, 3. .......
其中的maxResultSize大小是1GB,任务的执行结果最大可以达到1GB。
1. Executor.scala 2. //对结果的总大小限制的字节数(默认为1GB) 3. private val maxResultSize = Utils.getMaxResultSize(conf) 4. ....... 5. Utils.scala 6. //对结果的总大小限制的字节数(默认为1GB) 7. def getMaxResultSize(conf: SparkConf): Long = { 8. memoryStringToMb(conf.get("spark.driver.maxResultSize", "1g")).toLong << 20 9. }
其中的Executor.scala中的maxDirectResultSize大小,取spark.task.maxDirectResultSize和RpcUtils.maxMessageSizeBytes的最小值。其中spark.rpc.message.maxSize默认配置是128MB。spark.task.maxDirectResultSize在配置文件中进行配置。
1. private val maxDirectResultSize = Math.min( 2. conf.getSizeAsBytes("spark.task.maxDirectResultSize", 1L << 20), 3. RpcUtils.maxMessageSizeBytes(conf)) 4. ...... 5. def maxMessageSizeBytes(conf: SparkConf): Int = { 6. val maxSizeInMB = conf.getInt("spark.rpc.message.maxSize", 128) 7. if (maxSizeInMB > MAX_MESSAGE_SIZE_IN_MB) { 8. throw new IllegalArgumentException( 9. s"spark.rpc.message.maxSize should not be greater than $MAX_ MESSAGE_SIZE_IN_MB MB") 10. } 11. maxSizeInMB * 1024 * 1024 12. }
补充说明:Driver发消息给Executor,Spark 1.6版本中CoarseGrainedSchedulerBackend的launchTask方法中序列化任务大小的限制是akkaFrameSize-AkkaUtils.reservedSizeBytes。其中,akkaFrameSize是128MB,reservedSizeBytes是200B。
Spark 1.6.0版本的CoarseGrainedSchedulerBackend.scala的源码如下。
1. private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { 2. ...... 3. if (serializedTask.limit >= akkaFrameSize - AkkaUtils. reservedSizeBytes) { 4. ...... 5. private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) 6. ....... 7. def maxFrameSizeBytes(conf: SparkConf): Int = { 8. val frameSizeInMB = conf.getInt("spark.akka.frameSize", 128) 9. if (frameSizeInMB > AKKA_MAX_FRAME_SIZE_IN_MB) { 10. throw new IllegalArgumentException( 11. s"spark.akka.frameSize should not be greater than $AKKA_MAX_FRAME_ SIZE_IN_MB MB") 12. } 13. frameSizeInMB * 1024 * 1024 14. } 15. ....... 16. val reservedSizeBytes = 200 * 1024 17. ......
Spark 2.2.0版本的CoarseGrainedSchedulerBackend.scala的源码与Spark 1.6.0版本相比具有如下特点。
上段代码中第3行Driver发消息给Executor,发送任务的序列化大小的限制serializedTask.limit从akkaFrameSize - AkkaUtils.reservedSizeBytes调整为maxRpc-MessageSize。
上段代码中第5行AkkaUtils.maxFrameSizeBytes(conf)调整为RpcUtils.maxMessage-SizeBytes(conf)。
上段代码中第7~14行maxFrameSizeBytes函数整体替换为以下代码。Spark 2.2.0版本中,CoarseGrainedSchedulerBackend的launchTasks方法中序列化任务大小的限制是maxRpcMessageSize为128MB。
1. ...... 2. if (serializedTask.limit >= maxRpcMessageSize) { 3. ...... 4. 5. private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf) 6. 7. def maxMessageSizeBytes(conf: SparkConf): Int = { 8. val maxSizeInMB = conf.getInt("spark.rpc.message.maxSize", 128) 9. if (maxSizeInMB > MAX_MESSAGE_SIZE_IN_MB) { 10. throw new IllegalArgumentException( 11. s"spark.rpc.message.maxSize should not be greater than $MAX_MESSAGE_SIZE_IN_MB MB") 12. } 13. maxSizeInMB * 1024 * 1024 14. } 15. }
回到TaskRunner的run方法,execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)给Driver发送一个消息,消息中将taskId、TaskState.FINISHED、serializedResult放进去。
statusUpdate方法的源码如下。
1. override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { 2. val msg = StatusUpdate(executorId, taskId, state, data) 3. driver match { 4. case Some(driverRef) => driverRef.send(msg) 5. case None => logWarning(s"Drop $msg because has not yet connected to driver") 6. } 7. }
(8)CoarseGrainedExecutorBackend给DriverEndpoint发送StatusUpdate来传输执行结果,DriverEndpoint会把执行结果传递给TaskSchedulerImpl处理,然后交给TaskResultGetter内部通过线程去分别处理Task执行成功和失败的不同情况,最后告诉DAGScheduler任务处理结束的状况。
CoarseGrainedSchedulerBackend.scala中DriverEndpoint的receive方法如下。
1. override def receive: PartialFunction[Any, Unit] = { 2. case StatusUpdate(executorId, taskId, state, data) => 3. scheduler.statusUpdate(taskId, state, data.value) 4. if (TaskState.isFinished(state)) { 5. executorDataMap.get(executorId) match { 6. case Some(executorInfo) => 7. executorInfo.freeCores += scheduler.CPUS_PER_TASK 8. makeOffers(executorId) 9. case None => 10. //忽略更新,因为我们不知道Executor 11. logWarning(s"Ignored task status update ($taskId state $state)" + 12. s"from unknown executor with ID $executorId") 13. } 14. }
DriverEndpoint的receive方法中,StatusUpdate调用scheduler.statusUpdate,然后释放资源,再次进行资源调度makeOffers(executorId)。
TaskSchedulerImpl的statusUpdate中:
如果是TaskState.LOST,则记录下原因,将Executor清理掉。
如果是TaskState.isFinished,则从taskSet中运行的任务中remove掉任务,调用taskResultGetter.enqueueSuccessfulTask处理。
如果是TaskState.FAILED、TaskState.KILLED、TaskState.LOST,则调用taskResultGetter. enqueueFailedTask处理。
TaskSchedulerImpl的statusUpdate的源码如下。
1. def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { 2. var failedExecutor: Option[String] = None 3. var reason: Option[ExecutorLossReason] = None 4. synchronized { 5. try { 6. taskIdToTaskSetManager.get(tid) match { 7. case Some(taskSet) => 8. if (state == TaskState.LOST) { 9. //TaskState.LOST只被废弃的Mesos 细粒度的调度模式使用,每个Executor对应单 //个任务,因此将Executor标记为失败 10. val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException( 11. "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId. contains(tid)")) 12. if (executorIdToRunningTaskIds.contains(execId)) { 13. reason = Some( 14. SlaveLost(s"Task $tid was lost, so marking the executor as lost as well.")) 15. removeExecutor(execId, reason.get) 16. failedExecutor = Some(execId) 17. } 18. } 19. if (TaskState.isFinished(state)) { 20. cleanupTaskState(tid) 21. taskSet.removeRunningTask(tid) 22. if (state == TaskState.FINISHED) { 23. taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) 24. } else if (Set(TaskState.FAILED, TaskState.KILLED, T askState.LOST).contains(state)) { 25. taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) 26. } 27. } 28. case None => 29. logError( 30. ("Ignoring update with state %s for TID %s because its task set is gone (this is " + 31. "likely the result of receiving duplicate task finished status updates) or its " + 32. "executor has been marked as failed.") 33. .format(state, tid)) 34. } 35. } catch { 36. case e: Exception => logError("Exception in statusUpdate", e) 37. } 38. } 39. //更新DAGScheduler时没持有这个锁,所以可能导致死锁 40. if (failedExecutor.isDefined) { 41. assert(reason.isDefined) 42. dagScheduler.executorLost(failedExecutor.get, reason.get) 43. backend.reviveOffers() 44. } 45. }
其中,taskResultGetter是TaskResultGetter的实例化对象。
1. private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
TaskResultGetter.scala的源码如下。
1. private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl) 2. extends Logging { 3. 4. private val THREADS = sparkEnv.conf.getInt("spark.resultGetter. threads", 4) 5. 6. //用于测试 7. protected val getTaskResultExecutor: ExecutorService = 8. ThreadUtils.newDaemonFixedThreadPool(THREADS, "task-result-getter") 9. ....... 10. def enqueueSuccessfulTask( 11. taskSetManager: TaskSetManager, 12. tid: Long, 13. serializedData: ByteBuffer): Unit = { 14. getTaskResultExecutor.execute(new Runnable { 15. override def run(): Unit = Utils.logUncaughtExceptions { 16. try { 17. val (result, size) = serializer.get().deserialize[TaskResult[_]] (serializedData) match { 18. case directResult: DirectTaskResult[_] => 19. if (!taskSetManager.canFetchMoreResults(serializedData. limit())) { 20. return 21. } 22. //反序列化“值”时不持有任何锁,所以不会阻止其他线程。我们在这里调用它,这样在 //TaskSetManager.handleSuccessfulTask中,当它再次被调用时,不需要反序列化值 23. directResult.value(taskResultSerializer.get()) 24. (directResult, serializedData.limit()) 25. case IndirectTaskResult(blockId, size) => 26. if (!taskSetManager.canFetchMoreResults(size)) { 27. //如果大小超过maxResultSize,将被Executor丢弃 28. sparkEnv.blockManager.master.removeBlock(blockId) 29. return 30. } 31. logDebug("Fetching indirect task result for TID %s".format(tid)) 32. scheduler.handleTaskGettingResult(taskSetManager, tid) 33. val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes (blockId) 34. 35. if (!serializedTaskResult.isDefined) { 36. /*如果运行任务的机器失败,我们将无法获得任务结果 当任务结束,我们试图取结果时,块管理器必须刷新结果*/ 37. scheduler.handleFailedTask( 38. taskSetManager, tid, TaskState.FINISHED, TaskResultLost) 39. return 40. } 41. val deserializedResult = serializer.get().deserialize [DirectTaskResult[_]]( 42. serializedTaskResult.get.toByteBuffer) 43. //反序列化获取值 44. deserializedResult.value(taskResultSerializer.get()) 45. sparkEnv.blockManager.master.removeBlock(blockId) 46. (deserializedResult, size) 47. } 48. 49. //从Executors接收的累加器更新中设置任务结果大小,我们需要在Driver上执行此操 //作,因为如果我们在Executors 上执行此操作,那么将结果更新大小后须进行序列化 50. result.accumUpdates = result.accumUpdates.map { a => 51. if (a.name == Some(InternalAccumulator.RESULT_SIZE)) { 52. val acc = a.asInstanceOf[LongAccumulator] 53. assert(acc.sum == 0L, "task result size should not have been set on the executors") 54. acc.setValue(size.toLong) 55. acc 56. } else { 57. a 58. } 59. } 60. 61. scheduler.handleSuccessfulTask(taskSetManager, tid, result) 62. } catch { 63. case cnf: ClassNotFoundException => 64. val loader = Thread.currentThread.getContextClassLoader 65. taskSetManager.abort("ClassNotFound with classloader: " + loader) 66. //匹配NonFatal,所以我们不从上面的return捕获ControlThrowable 异常 67. case NonFatal(ex) => 68. logError("Exception while getting task result", ex) 69. taskSetManager.abort("Exception while getting task result: %s".format(ex)) 70. } 71. } 72. }) 73. }
TaskResultGetter.scala的enqueueSuccessfulTask方法中,处理成功任务的时候开辟了一条新线程,先将结果反序列化,然后根据接收的结果类型DirectTaskResult、IndirectTaskResult分别处理。
如果是DirectTaskResult,则直接获得结果并返回。
如果是IndirectTaskResult,就通过blockManager.getRemoteBytes远程获取。获取以后再进行反序列化。
最后是scheduler.handleSuccessfulTask。
TaskSchedulerImpl的handleSuccessfulTask的源码如下。
1. def handleSuccessfulTask( 2. taskSetManager: TaskSetManager, 3. tid: Long, 4. taskResult: DirectTaskResult[_]): Unit = synchronized { 5. taskSetManager.handleSuccessfulTask(tid, taskResult) 6. }
TaskSchedulerImpl中也有失败任务的相应处理。
Spark 2.1.1版本的TaskSchedulerImpl.scala的源码如下。
1. def handleFailedTask( 2. taskSetManager: TaskSetManager, 3. tid: Long, 4. taskState: TaskState, 5. reason: TaskFailedReason): Unit = synchronized { 6. taskSetManager.handleFailedTask(tid, taskState, reason) 7. if (!taskSetManager.isZombie && taskState != TaskState.KILLED) { 8. //任务集管理状态更新后,需要再次分配资源,失败的任务需要重新运行 9. backend.reviveOffers() 10. } 11. }
Spark 2.2.0版本的TaskSchedulerImpl.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第7行if语句判断条件更新。
1. ...... 2. if (!taskSetManager.isZombie && !taskSetManager.someAttemptSucceeded (tid)) { 3. .......
TaskSchedulerImpl的handleSuccessfulTask交给TaskSetManager调用handleSuccessfulTask,告诉DAGScheduler任务处理结束的状况,并且Kill掉其他尝试的相同任务(因为一个任务已经尝试成功,其他的相同任务没必要再次去尝试)。
Spark 2.1.1版本的TaskSetManager的handleSuccessfulTask的源码如下。
1. def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = { 2. val info = taskInfos(tid) 3. val index = info.index 4. info.markFinished(TaskState.FINISHED) 5. removeRunningTask(tid) 6. /**这种方法被 TaskSchedulerImpl.handleSuccessfulTask 调用,其持有 Task *SchedulerImpl锁直至退出。为了避免SPARK-7655的问题,当持有一个锁的时候,我 *们不应该反序列化值,以避免阻塞其他线程。所以,我们在TaskResultGetter.enqueue- *SuccessfulTask中调用result.value()。注意:result.value()只在第一次调用 *时反序列化值,所以在这里result.value()只是返回值,并不会阻止其他线程 7. */ 8. 9. sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info) 10. //杀掉同一任务的任何其他尝试(因为现在不需要这些任务,所以一次尝试成功) 11. for (attemptInfo <- taskAttempts(index) if attemptInfo.running) { 12. logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " + 13. s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " + 14. s"as the attempt ${info.attemptNumber} succeeded on ${info.host}") 15. sched.backend.killTask(attemptInfo.taskId, attemptInfo.executorId, true) 16. } 17. if (!successful(index)) { 18. tasksSuccessful += 1 19. logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}) in" + 20. s" ${info.duration} ms on ${info.host} (executor ${info. executorId})" + 21. s" ($tasksSuccessful/$numTasks)") 22. //如果所有的任务都成功了,就标记成功并停止 23. successful(index) = true 24. if (tasksSuccessful == numTasks) { 25. isZombie = true 26. } 27. } else { 28. logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id + 29. " because task " + index + " has already completed successfully") 30. } 31. maybeFinishTaskSet() 32. }
Spark 2.2.0版本的TaskSetManager的handleSuccessfulTask的源码与Spark 2.1.1版本相比具有如下特点。
上段代码中第4行info.markFinished新增第2个参数clock.getTimeMillis()获取时间。
上段代码中第4行之后新增if (speculationEnabled)的处理代码。
上段代码中第9行sched.dagScheduler.taskEnded代码置后,放到maybeFinishTaskSet()方法之前。
上段代码中第15行sched.backend.killTask的第3个参数调整为interruptThread = true,新增第4个参数reason。
1. ...... 2. info.markFinished(TaskState.FINISHED, clock.getTimeMillis()) 3. if (speculationEnabled) { 4. successfulTaskDurations.insert(info.duration) 5. } 6. ...... 7. interruptThread = true, 8. reason = "another attempt succeeded") 9. ...... 10. sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info) 11. ......
speculationEnabled默认设置为spark.speculation=false,用于推测执行慢的任务;如果设置为true,successfulTaskDurations使用MedianHeap记录成功任务的持续时间,这样就可以确定什么时候启动推测性任务,这种情况只在启用推测时使用,以避免不使用堆时增加堆中的开销。
TaskSetManager的handleSuccessfulTask中调用了maybeFinishTaskSet。maybeFinishTaskSet的源码如下。
Spark 2.1.1版本的TaskSetManager.scala的源码如下。
1. private def maybeFinishTaskSet() { 2. if (isZombie && runningTasks == 0) { 3. sched.taskSetFinished(this) 4. } 5. }
Spark 2.2.0版本的TaskSetManager.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第3行之后增加了tasksSuccessful == numTasks的逻辑处理。BlacklistTracker设计跟踪问题的Executors和nodes。blacklistTracker循环遍历更新黑名单列表。
1. ...... 2. if (tasksSuccessful == numTasks) { 3. blacklistTracker.foreach(_.updateBlacklistForSuccessfulTaskSet( 4. taskSet.stageId, 5. taskSet.stageAttemptId, 6. taskSetBlacklistHelperOpt.get.execToFailures)) 7. } 8. }
TaskSetManager:单TaskSet的任务调度在TaskSchedulerImpl中进行。TaskSetManager类跟踪每项任务,如果任务重试失败(超过有限的次数),对于TaskSet处理本地调度主要的接口是resourceOffer,询问TaskSet是否要在一个节点上运行任务,进行状态更新statusUpdate,告诉TaskSet的一个任务的状态发生了改变(如已完成)。线程:这个类被设计成只在具有锁的代码TaskScheduler上调用(如事件处理程序),不应该从其他线程调用。
总结:
Task执行及结果处理原理流程图如图8-3所示。任务从Driver上发送过来,CoarseGrainedSchedulerBackend发送任务,CoarseGrainedExecutorBackend收到任务后,交给Executor处理,Executor会通过launchTask执行Task。TaskRunner内部会做很多准备工作:反序列化Task的依赖,通过网络获取需要的文件、Jar、反序列Task本身等待;然后调用Task的runTask执行,runTask有ShuffleMapTask、ResultTask两种。通过iterator()方法根据业务逻辑循环遍历,如果是ShuffleMapTask,就把MapStatus汇报给MapOutTracker;如果是ResultTask,就从前面的MapOutTracker中获取信息。
图8-3 Task执行及结果处理原理流程图