3.7 Spark RDD中Runtime流程解析
本节讲解Spark的Runtime架构图,并从一个作业的视角通过Driver、Master、Worker、Executor等角色透视Spark的Runtime生命周期。
3.7.1 Runtime架构图
(1)从Spark Runtime的角度讲,包括五大核心对象:Master、Worker、Executor、Driver、CoarseGrainedExecutorBackend。
(2)Spark在做分布式集群系统设计的时候:最大化功能独立、模块化封装具体独立的对象、强内聚松耦合。Spark运行架构图如图3-7所示。
图3-7 Spark运行架构图
(3)当Driver中的SparkContext初始化时会提交程序给Master,Master如果接受该程序在Spark中运行,就会为当前的程序分配AppID,同时会分配具体的计算资源。需要特别注意的是,Master是根据当前提交程序的配置信息来给集群中的Worker发指令分配具体的计算资源,但是,Master发出指令后并不关心具体的资源是否已经分配,换言之,Master是发指令后就记录了分配的资源,以后客户端再次提交其他的程序,就不能使用该资源了。其弊端是可能会导致其他要提交的程序无法分配到本来应该可以分配到的计算资源;最终的优势是Spark分布式系统功能在耦合的基础上最快地运行系统(否则如果Master要等到资源最终分配成功后才通知Driver,就会造成Driver阻塞,不能够最大化并行计算资源的使用率)。需要补充说明的是:Spark在默认情况下由于集群中一般都只有一个Application在运行,所有Master分配资源策略的弊端就没有那么明显了。
3.7.2 生命周期
本节对Spark Runtime(Driver、Master、Worker、Executor)内幕解密,从Spark Runtime全局的角度看Spark具体是怎么工作的,从一个作业的视角通过Driver、Master、Worker、Executor等角色来透视Spark的Runtime生命周期。
Job提交过程源码解密中一个非常重要的技巧是通过在spark-shell中运行一个Job来了解Job提交的过程,然后再用源码验证这个过程。我们可以在spark-shell中运行一个程序,从控制台观察日志。
1. ,sc.textFile("/library/dataforSortedShufffle").flatMap(_.split(" ")).map (word => (word, 1).reduceByKey(_+_)saveAsTextFile("/library/dataoutput2")
这里我们编写WordCountJobRuntime.scala代码,从IDEA中观察日志。读入的数据源文件内容如下。
1. Hello Spark Hello Scala 2. Hello Hadoop 3. Hello Flink 4. Spark is Awesome
WordCountJobRuntime.scala的代码如下。
1. package com.dt.spark.sparksql 2. 3. import org.apache.log4j.{Level, Logger} 4. import org.apache.spark.{SparkConf, SparkContext} 5. 6. /** 7. * 使用Scala开发本地测试的Spark WordCount程序 8. * @author DT大数据梦工厂 9. * 新浪微博:http://weibo.com/ilovepains/ 10. */ 11. object WordCountJobRuntime { 12. def main(args: Array[String]){ 13. Logger.getLogger("org").setLevel(Level.ALL) 14. /** 15. * 第1步:创建Spark的配置对象SparkConf,设置Spark程序运行时的配置信息, * 例如,通过setMaster来设置程序要链接的Spark集群的Master的URL,如果设置 * 为local,则代表Spark程序在本地运行,特别适合于机器配置非常差(如只有1GB的内 * 存)的初学者 * 16. */ 17. 18. 19. val conf = new SparkConf() //创建SparkConf对象 20. conf.setAppName("Wow,WordCountJobRuntime!") //设置应用程序的名称,在程序运行的监控界面可以看到名称 21. conf.setMaster("local") //此时,程序在本地运行,不需要安装Spark集群 22. 23. /** 24. * 第2步:创建SparkContext对象 25. * SparkContext是Spark程序所有功能的唯一入口,采用Scala、Java、Python、 * R等,都必须有一个SparkContext 26. * SparkContext 核心作用:初始化 Spark 应用程序运行所需要的核心组件,包括 * DAGScheduler、TaskScheduler、SchedulerBackend,同时还会负责Spark程序 * 往Master注册程序等 27. * SparkContext是整个Spark应用程序中至关重要的一个对象 28. */ 29. val sc = new SparkContext(conf) //创建SparkContext对象,通过传入SparkConf //实例来定制Spark运行的具体参数和配置信息 30. 31. /** 32. * 第 3 步:根据具体的数据来源(如 HDFS、HBase、Local FS、DB、S3 等)通过 * SparkContext创建RDD 33. * RDD的创建有3种方式:根据外部的数据来源(如HDFS)、根据Scala集合、由其他 * 的RDD操作 34. * 数据会被RDD划分为一系列的Partitions,分配到每个Partition的数据属于一 * 个Task的处理范畴 35. */ 36. val lines = sc.textFile("data/wordcount/helloSpark.txt") 37. /** 38. * 第4步:对初始的RDD进行Transformation级别的处理,如通过map、filter等 * 高阶函数等的编程,进行具体的数据计算 39. * 第4.1步:将每一行的字符串拆分成单个单词 40. */ 41. 42. val words = lines.flatMap { line => line.split(" ")} //对每一行的字符串进行单词拆分,并把所有行的拆 //分结果通过flat合并成为一个大的单词集合 43. 44. /** 45. * 第4步:对初始的RDD进行Transformation级别的处理,如通过map、filter等 * 高阶函数等的编程,进行具体的数据计算 46. * 第4.2步:在单词拆分的基础上对每个单词实例计数为1,也就是word => (word, 1) 47. */ 48. val pairs = words.map { word => (word, 1) } 49. 50. /** 51. * 第4步:对初始的RDD进行Transformation级别的处理,如通过map、filter等 * 高阶函数等的编程,进行具体的数据计算 52. * 第4.3步:在每个单词实例计数为1的基础上统计每个单词在文件中出现的总次数 53. */ 54. val wordCountsOdered = pairs.reduceByKey(_+_).saveAsTextFile("data/ wordcount/wordCountResult.log") 55. 56. while(true){ 57. 58. } 59. sc.stop() 60. 61. } 62. }
在IDEA中运行,WordCountJobRuntime的运行结果保存在data/wordcount/ wordCountResult.log目录的part-00000中。
1. (Awesome,1) 2. (Flink,1) 3. (Spark,2) 4. (is,1) 5. (Hello,4) 6. (Scala,1) 7. (Hadoop,1)
在IDEA的控制台中观察WordCountJobRuntime.scala运行日志,这里Spark版本是version 2.1.0。其中,MemoryStore是从Storge内存角度来看的,Storge是磁盘管理和内存管理。这里,Spark读取了Hadoop的HDFS,因此使用了Hadoop的内容,如FileInputFormat,日志中显示FileInputFormat: Total input paths to process : 1说明有一个文件要处理。
1. Using Spark's default log4j profile: org/apache/spark/log4j-defaults. properties 2. 17/05/24 05:48:20 INFO SparkContext: Running Spark version 2.1.0 3. ...... 4. 17/05/24 05:48:24 DEBUG DiskBlockManager: Adding shutdown hook 5. 17/05/24 05:48:24 DEBUG ShutdownHookManager: Adding shutdown hook 6. 17/05/24 05:48:24 INFO MemoryStore: MemoryStore started with capacity 637.2 MB 7. 17/05/24 05:48:24 INFO SparkEnv: Registering OutputCommitCoordinator 8. ...... 9. 17/05/24 05:48:27 DEBUG HadoopRDD: Creating new JobConf and caching it for later re-use 10. 17/05/24 05:48:27 DEBUG FileInputFormat: Time taken to get FileStatuses: 28 11. 17/05/24 05:48:27 INFO FileInputFormat: Total input paths to process : 1 12. 17/05/24 05:48:27 DEBUG FileInputFormat: Total # of splits generated by getSplits: 1, TimeTaken: 48 13. ......
在Spark中,所有的Action都会触发至少一个Job,在WordCountJobRuntime.scala代码中,是通过saveAsTextFile来触发Job的。在日志中查看SparkContext: Starting job: saveAsTextFile触发saveAsTextFile。紧接着交给DAGScheduler,日志中显示DAGScheduler: Registering RDD,因为这里有两个Stage,从具体计算的角度,前面Stage计算的时候保留输出。然后是DAGScheduler获得了job的ID(job 0)。
1. 17/05/24 05:48:28 INFO SparkContext: Starting job: saveAsTextFile at WordCountJobRuntime.scala:61 2. 17/05/24 05:48:28 DEBUG SortShuffleManager: Can't use serialized shuffle for shuffle 0 because an aggregator is defined 3. 17/05/24 05:48:28 INFO DAGScheduler: Registering RDD 3 (map at WordCountJobRuntime.scala:55) 4. 17/05/24 05:48:28 INFO DAGScheduler: Got job 0 (saveAsTextFile at WordCountJobRuntime.scala:61) with 1 output partitions 5. ......
SparkContext在实例化的时候会构造StandaloneSchedulerBackend(Spark 2.0版本将之前的SparkDeploySchedulerBackend名字更新为StandaloneSchedulerBackend)、DAGScheduler、TaskSchedulerImpl、MapOutputTrackerMaster等对象。
其中,StandaloneSchedulerBackend负责集群计算资源的管理和调度,这是从作业的角度来考虑的,注册给Master的时候,Master给我们分配资源,资源从Executor本身转过来向StandaloneSchedulerBackend注册,这是从作业调度的角度来考虑的,不是从整个集群来考虑,整个集群是Master来管理计算资源的。
DAGScheduler负责高层调度(如Job中Stage的划分、数据本地性等内容)。
TaskSchedulerImple负责具体Stage内部的底层调度(如具体每个Task的调度、Task的容错等)。
MapOutputTrackerMaster负责Shuffle中数据输出和读取的管理。Shuffle的时候将数据写到本地,下一个Stage要使用上一个Stage的数据,因此写数据的时候要告诉Driver中的MapOutputTrackerMaster具体写到哪里,下一个Stage读取数据的时候也要访问Driver的MapOutputTrackerMaster获取数据的具体位置。
MapOutputTrackerMaster的源码如下。
1. private[spark] class MapOutputTrackerMaster(conf: SparkConf, 2. broadcastManager: BroadcastManager, isLocal: Boolean) 3. extends MapOutputTracker(conf) {
DAGScheduler是面向Stage调度的高层调度实现。它为每一个Job计算DAG,跟踪RDDS及Stage输出结果进行物化,并找到一个最小的计划去运行Job,然后提交stages中的TaskSets到底层调度器TaskScheduler提交集群运行,TaskSet包含完全独立的任务,基于集群上已存在的数据运行(如从上一个Stage输出的文件),如果这个数据不可用,获取数据可能会失败。
Spark Stages根据RDD图中Shuffle的边界来创建,如果RDD的操作是窄依赖,如map()和filter(),在每个Stages中将一系列tasks组合成流水线执行。但是,如果是宽依赖,Shuffle依赖需要多个Stages(上一个Stage进行map输出写入文件,下一个Stage读取数据文件),每个Stage依赖于其他的Stage,其中进行多个算子操作。算子操作在各种类型的RDDS(如MappedRDD、FilteredRDD)的RDD.compute()中实际执行。
在DAG阶段,DAGScheduler根据当前缓存状态决定每个任务运行的位置,并将任务传递给底层的任务调度器TaskScheduler。此外,它处理Shuffle输出文件丢失的故障,在这种情况下,以前的Stage可能需要重新提交。Stage中不引起Shuffle文件丢失的故障由任务调度器TaskScheduler处理,在取消整个Stage前,将重试几次任务。
当浏览这个代码时,有几个关键概念:
Jobs作业(表现为[ActiveJob])作为顶级工作项提交给调度程序。当用户调用一个action,如count()算子,Job将通过submitJob进行提交。每个作业可能需要执行多个stages来构建中间数据。
Stages ([Stage])是一组任务的集合,在相同的RDD分区上,每个任务计算相同的功能,计算Jobs的中间结果。Stage根据Shuffle划分边界,我们必须等待前一阶段Stage完成输出。有两种类型的Stage:[ResultStage]是执行action的最后一个Stage,[ShuffleMapStage]是Shuffle Stages通过map写入输出文件中的。如果Jobs重用相同的RDDs,Stages可以跨越多个Jobs共享。
Tasks任务是单独的工作单位,每个任务发送到一个分布式节点。
缓存跟踪:DAGScheduler记录哪些RDDS被缓存,避免重复计算,以及记录Shuffle map Stages已经生成的输出文件,避免在map端重新计算。
数据本地化:DAGScheduler基于RDDS的数据本地性、缓存位置,或Shuffle数据在Stage中运行每一个任务的Task。
清理:当依赖于它们的运行作业完成时,所有数据结构将被清除,防止在长期运行的应用程序中内存泄漏。
为了从故障中恢复,同一个Stage可能需要运行多次,这被称为重试“attempts”。如在上一个Stage中的输出文件丢失,TaskScheduler中将报告任务失败,DAGScheduler通过检测CompletionEvent与FetchFailed或ExecutorLost事件重新提交丢失的Stage。DAGScheduler将等待看是否有其他节点或任务失败,然后在丢失计算任务的阶段Stage中重新提交TaskSets。在这个过程中,可能须创建之前被清理的Stage。旧Stage的任务仍然可以运行,但必须在正确的Stage中接收事件并进行操作。
做改变或者回顾时需要看的清单有:
Job运行结束时,所有的数据结构将被清理,及清理程序运行中的状态。
添加一个新的数据结构时,在新结构中更新'DAGSchedulerSuite.assertDataStructuresEmpty',包括新结构,将有助于捕获内存泄漏。
DAGScheduler.scala的源码如下。
1. private[spark] 2. class DAGScheduler( 3. private[scheduler] val sc: SparkContext, 4. private[scheduler] val taskScheduler: TaskScheduler, 5. listenerBus: LiveListenerBus, 6. mapOutputTracker: MapOutputTrackerMaster, 7. blockManagerMaster: BlockManagerMaster, 8. env: SparkEnv, 9. clock: Clock = new SystemClock()) 10. extends Logging {
回到运行日志,SparkContext在实例化的时候会构造StandaloneSchedulerBackend、DAGScheduler、TaskSchedulerImpl、MapOutputTrackerMaster四大核心对象,DAGScheduler获得Job ID,日志中显示DAGScheduler: Final stage: ResultStage 1,Final stage是ResultStage;Parents of final stage是ShuffleMapStage,DAGScheduler是面向Stage的。日志中显示两个Stage:Stage 1是Final stage,Stage 0是ShuffleMapStage。
接下来序号改变,运行时最左侧从0开始,日志中显示DAGScheduler: missing: List(ShuffleMapStage 0),父Stage是ShuffleMapStage,DAGScheduler调度时必须先计算父Stage,因此首先提交的是ShuffleMapStage 0,这里RDD是MapPartitionsRDD,只有Stage中的最后一个算子是真正有效的,Stage 0中的最后一个操作是map,因此生成了MapPartitionsRDD。Stage 0无父Stage,因此提交,提交时进行广播等内容,然后提交作业。
1. ...... 2. 17/05/24 05:48:28 INFO DAGScheduler: Final stage: ResultStage 1 (saveAsTextFile at WordCountJobRuntime.scala:61) 3. 17/05/24 05:48:28 INFO DAGScheduler: Parents of final stage: List (ShuffleMapStage 0) 4. 17/05/24 05:48:28 INFO DAGScheduler: Missing parents: List (ShuffleMapStage 0) 5. 17/05/24 05:48:28 DEBUG DAGScheduler: submitStage(ResultStage 1) 6. 17/05/24 05:48:28 DEBUG DAGScheduler: missing: List(ShuffleMapStage 0) 7. 17/05/24 05:48:28 DEBUG DAGScheduler: submitStage(ShuffleMapStage 0) 8. 17/05/24 05:48:28 DEBUG DAGScheduler: missing: List() 9. 17/05/24 05:48:28 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at map at WordCountJobRuntime.scala:55), which has no missing parents 10. 17/05/24 05:48:28 DEBUG DAGScheduler: submitMissingTasks(ShuffleMapStage 0) 11. 17/05/24 05:48:28 TRACE BlockInfoManager: Task -1024 trying to put broadcast_1 12. ......
我们从Web UI的角度看一下,如图3-8所示,Web UI中显示生成两个Stage:Stage 0、Stage 1。
图3-8 Stage划分
日志中显示DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0,DAGScheduler提交作业,显示提交一个须计算的任务,ShuffleMapStage在本地运行是一个并行度,交给TaskSchedulerImpl运行。这里是一个并行度,提交底层的调度器TaskScheduler,TaskScheduler收到任务后,就发布任务到集群中运行,由TaskSetManager进行管理:日志中显示TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 6012 bytes),显示具体运行的位置,及worker运行了哪些任务。这里在本地只运行了一个任务。
1. 17/05/24 05:48:28 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at map at WordCountJobRuntime.scala:55), which has no missing parents 2. 17/05/24 05:48:28 DEBUG DAGScheduler: submitMissingTasks(ShuffleMapStage 0) 3. ...... 4. 17/05/24 05:48:28 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at map at WordCountJobRuntime.scala:55) 5. 17/05/24 05:48:28 DEBUG DAGScheduler: New pending partitions: Set(0) 6. 17/05/24 05:48:28 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 7. 17/05/24 05:48:28 DEBUG TaskSetManager: Epoch for TaskSet 0.0: 0 8. 17/05/24 05:48:28 DEBUG TaskSetManager: Valid locality levels for TaskSet 0.0: NO_PREF, ANY 9. 17/05/24 05:48:28 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 0 10. 17/05/24 05:48:28 DEBUG TaskSetManager: Valid locality levels for TaskSet 0.0: NO_PREF, ANY 11. 17/05/24 05:48:28 DEBUG SecurityManager: user=null aclsEnabled=false viewAcls=dell viewAclsGroups= 12. 17/05/24 05:48:28 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 6012 bytes) 13. 17/05/24 05:48:28 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) 14. 17/05/24 05:48:28 DEBUG Executor: Task 0's epoch is 0
然后是完成作业,日志中显示TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 327 ms on localhost (executor driver),在本地机器上完成作业。当Stage的一个任务完成后,ShuffleMapStage就已完成。Task任务运行完后向DAGScheduler汇报,DAGScheduler查看曾经提交了几个Task,计算Task的数量如果等于Task的总数量,那Stage也就完成了。这个Stage完成以后,下一个Stage开始运行。
1. ...... 2. 17/05/24 05:48:29 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1744 bytes result sent to driver 3. 17/05/24 05:48:29 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 0 4. 17/05/24 05:48:29 DEBUG TaskSetManager: No tasks for locality level NO_PREF, so moving to locality level ANY 5. 17/05/24 05:48:29 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 327 ms on localhost (executor driver) (1/1) 6. 17/05/24 05:48:29 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 7. 17/05/24 05:48:29 DEBUG DAGScheduler: ShuffleMapTask finished on driver 8. 17/05/24 05:48:29 INFO DAGScheduler: ShuffleMapStage 0 (map at WordCountJobRuntime.scala:55) finished in 0.358 s
ShuffleMapStage完成后,将运行下一个Stage。日志中显示DAGScheduler: looking for newly runnable stages,这里一共有两个Stage,ShuffleMapStage运行完成,那只有一个ResultStage将运行。DAGScheduler又提交最后一个Stage的一个任务,默认并行度是继承的。同样,发布任务给Executor进行计算。
1. ...... 2. 17/05/24 05:48:29 INFO DAGScheduler: looking for newly runnable stages 3. 17/05/24 05:48:29 INFO DAGScheduler: running: Set() 4. 17/05/24 05:48:29 INFO DAGScheduler: waiting: Set(ResultStage 1) 5. 17/05/24 05:48:29 INFO DAGScheduler: failed: Set() 6. 17/05/24 05:48:29 DEBUG MapOutputTrackerMaster: Increasing epoch to 1 7. 17/05/24 05:48:29 TRACE DAGScheduler: Checking if any dependencies of ShuffleMapStage 0 are now runnable 8. 17/05/24 05:48:29 TRACE DAGScheduler: running: Set() 9. 17/05/24 05:48:29 TRACE DAGScheduler: waiting: Set(ResultStage 1) 10. 17/05/24 05:48:29 TRACE DAGScheduler: failed: Set() 11. 17/05/24 05:48:29 DEBUG DAGScheduler: submitStage(ResultStage 1) 12. 17/05/24 05:48:29 DEBUG DAGScheduler: missing: List() 13. 17/05/24 05:48:29 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at WordCountJobRuntime.scala:61), which has no missing parents 14. 17/05/24 05:48:29 DEBUG DAGScheduler: submitMissingTasks(ResultStage 1) 15. ...... 16. 17/05/24 05:48:29 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at WordCountJobRuntime.scala:61) 17. 17/05/24 05:48:29 DEBUG DAGScheduler: New pending partitions: Set(0) 18. 17/05/24 05:48:29 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks 19. ....
Task任务运行完后向DAGScheduler汇报,DAGScheduler计算曾经提交了几个Task,如果Task的数量等于Task的总数量,ResultStage也运行完成。然后进行相关的清理工作,两个Stage(ShuffleMapStage、ResultStage)完成,Job也就完成。
1. ...... 2. 17/05/24 05:48:29 DEBUG MapOutputTrackerMaster: Fetching outputs for shuffle 0, partitions 0-1 3. 17/05/24 05:48:29 DEBUG ShuffleBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329 4. 17/05/24 05:48:29 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks 5. 17/05/24 05:48:29 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 5 ms 6. 17/05/24 05:48:29 DEBUG ShuffleBlockFetcherIterator: Got local blocks in 12 ms 7. 17/05/24 05:48:29 DEBUG TaskMemoryManager: Task 1 release 0.0 B from org.apache.spark.util.collection.ExternalAppendOnlyMap@3da8eddf 8. ...... 9. 17/05/24 05:48:29 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 409 ms on localhost (executor driver) (1/1) 10. 17/05/24 05:48:29 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 11. 17/05/24 05:48:29 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at WordCountJobRuntime.scala:61) finished in 0.410 s 12. 17/05/24 05:48:29 DEBUG DAGScheduler: After removal of stage 1, remaining stages = 1 13. 17/05/24 05:48:29 DEBUG DAGScheduler: After removal of stage 0, remaining stages = 0 14. 17/05/24 05:48:29 INFO DAGScheduler: Job 0 finished: saveAsTextFile at WordCountJobRuntime.scala:61, took 1.345921 s 15. ......
下面看一下WebUI,ShuffleMapStage中的任务交给Executor,图3-9中显示了任务的相关信息,如Shuffle的输出等,第一个Stage肯定生成Shuffle的输出,可以看一下最右侧的Shuffle Write Size/Records。图3-9中的Input Size/Records是从Hdfs中读入的文件数据。
图3-9 ShuffleMapStage运行
接下来看一下第二个Stage。第二个Stage同样显示Executor的信息,图3-10最右侧显示Shuffle Read Size/Records。如果在分布式集群运行,须远程读取数据,例如,原来是4个Executor计算,在第二个Stage中是两个Executor计算,因此一部分数据是本地的,一部分是远程的,或从远程节点拉取数据。ResultStage最后要产生输出,输出到文件保存。
图3-10 ResultStage运行
Task的运行解密:
(1)Task是运行在Executor中的,而Executor又是位于CoarseGrainedExecutorBackend中的,且CoarseGrainedExecutorBackend和Executor是一一对应的;计算运行于Executor,而Executor位于CoarseGrainedExecutorBackend中,CoarseGrainedExecutorBackend是进程。发任务消息也是在CoarseGrainedExecutorBackend。
(2)当CoarseGrainedExecutorBackend接收到TaskSetManager发过来的LaunchTask消息后会反序列化TaskDescription,然后使用CoarseGrainedExecutorBackend中唯一的Executor来执行任务。
CoarseGrainedExecutorBackend收到Driver发送的LaunchTask任务消息,其中LaunchTask是case class,而不是case object,是因为每个消息是一个消息实例,每个消息状态不一样,而case object是唯一的,因此使用case class。
1. //Driver节点到 executors节点 2. case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
Executor.scala的源码如下。
Spark 2.1.1版本的Executor.scala的launchTask的源码如下。
1. ...... 2. //维护正在运行的任务列表 3. private val runningTasks = new ConcurrentHashMap[Long, TaskRunner] 4. ...... 5. def launchTask( 6. context: ExecutorBackend, 7. taskId: Long, 8. attemptNumber: Int, 9. taskName: String, 10. serializedTask: ByteBuffer): Unit = { 11. val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName, serializedTask) 12. runningTasks.put(taskId, tr) 13. threadPool.execute(tr) 14. } 15. ...... 16. 17. class TaskRunner( 18. execBackend: ExecutorBackend, 19. val taskId: Long, 20. val attemptNumber: Int, 21. taskName: String, 22. serializedTask: ByteBuffer) 23. extends Runnable { 24. .......
Spark 2.2.0版本的Executor.scala的launchTask的源码与Spark 2.1.1版本相比具有如下特点。
上段代码中第7~10行调整launchTask方法的第二个参数:传入封装的taskDescription任务描述信息。
上段代码中第11行构建TaskRunner实例传入的也是taskDescription参数。
上段代码中第19~22行TaskRunner的第二个成员变量更新为TaskDescription类型。
1. ...... 2. def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = { 3. val tr = new TaskRunner(context, taskDescription) 4. ..... 5. } 6. ...... 7. class TaskRunner( 8. ..... 9. private val taskDescription: TaskDescription) 10. ...
在Executor.scala中单击launchTask,运行的任务使用了ConcurrentHashMap数据结构,运行launchTask的时候构建了一个TaskRunner,TaskRunner是一个Runnable,而Runnable是Java中的接口,Scala可以直接调用Java的代码,run方法中包括任务的反序列化等内容。通过Runnable封装任务,然后放入到runningTasks中,在threadPool中执行任务。threadPool是一个newDaemonCachedThreadPool。任务交给Executor的线程池中的线程去执行,执行的时候下载资源、数据等内容。
Spark 2.1.1版本的Executor.scala的threadPool的源码如下。
1. //启动worker线程池 2. private val threadPool = ThreadUtils.newDaemonCachedThreadPool ("Executor task launch worker")
Spark 2.2.0版本的Executor.scala的threadPool的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第2行Executor的线程池由ThreadUtils.newDaemonCachedThreadPool方式调整为Executors.newCachedThreadPool(threadFactory)线程池的方式。
1. //启动worker线程池 2. private val threadPool = { 3. val threadFactory = new ThreadFactoryBuilder() 4. .setDaemon(true) 5. .setNameFormat("Executor task launch worker-%d") 6. .setThreadFactory(new ThreadFactory { 7. override def newThread(r: Runnable): Thread = 8. //使用UninterruptibleThread 运行任务,这样我们就可以允许运行代码不被 //Thread.interrupt()线程中断。例如,KAFKA-1894、HADOOP-10622, //如果某些方法被中断,程序将会一直挂起 9. new UninterruptibleThread(r, "unused") //thread name will be set by ThreadFactoryBuilder 10. }) 11. .build() 12. Executors.newCachedThreadPool(threadFactory).asInstanceOf [ThreadPoolExecutor] 13. }