4.1 Spark Driver Program剖析
SparkContext是通往Spark集群的唯一入口,是整个Application运行调度的核心。本节将深度剖析SparkContext。
4.1.1 Spark Driver Program
Spark Driver Program(以下简称Driver)是运行Application的main函数并且新建SparkContext实例的程序。其实,初始化SparkContext是为了准备Spark应用程序的运行环境,在Spark中,由SparkContext负责与集群进行通信、资源的申请、任务的分配和监控等。当Worker节点中的Executor运行完毕Task后,Driver同时负责将SparkContext关闭。通常也可以使用SparkContext来代表驱动程序(Driver)。
Driver(SparkContext)整体架构图如图4-1所示。
图4-1 Driver(SparkContext)整体架构图
4.1.2 SparkContext深度剖析
SparkContext是通往Spark集群的唯一入口,可以用来在Spark集群中创建RDDs、累加器(Accumulators)和广播变量(Broadcast Variables)。SparkContext也是整个Spark应用程序(Application)中至关重要的一个对象,可以说是整个Application运行调度的核心(不是指资源调度)。
SparkContext的核心作用是初始化Spark应用程序运行所需要的核心组件,包括高层调度器(DAGScheduler)、底层调度器(TaskScheduler)和调度器的通信终端(SchedulerBackend),同时还会负责Spark程序向Master注册程序等。
一般而言,通常为了测试或者学习Spark开发一个Application,在Application的main方法中,最开始几行编写的代码一般是这样的:首先,创建SparkConf实例,设置SparkConf实例的属性,以便覆盖Spark默认配置文件spark-env.sh,spark-default.sh和log4j.properties中的参数;然后,SparkConf实例作为SparkContext类的唯一构造参数来实例化SparkContext实例对象。SparkContext在实例化的过程中会初始化DAGScheduler、TaskScheduler和SchedulerBackend,而当RDD的action触发了作业(Job)后,SparkContext会调用DAGScheduler将整个Job划分成几个小的阶段(Stage),TaskScheduler会调度每个Stage的任务(Task)进行处理。还有,SchedulerBackend管理整个集群中为这个当前的Application分配的计算资源,即Executor。
如果用一个车来比喻Spark Application,那么SparkContext就是车的引擎,而SparkConf是关于引擎的配置参数。说明:只可以有一个SparkContext实例运行在一个JVM内存中,所以在创建新的SparkContext实例前,必须调用stop方法停止当前JVM唯一运行的SparkContext实例。
Spark程序在运行时分为Driver和Executor两部分:Spark程序编写是基于SparkContext的,具体包含两方面。
Spark编程的核心基础RDD是由SparkContext最初创建的(第一个RDD一定是由SparkContext创建的)。
Spark程序的调度优化也是基于SparkContext,首先进行调度优化。
Spark程序的注册是通过SparkContext实例化时生产的对象来完成的(其实是SchedulerBackend来注册程序)。
Spark程序在运行时要通过Cluster Manager获取具体的计算资源,计算资源获取也是通过SparkContext产生的对象来申请的(其实是SchedulerBackend来获取计算资源的)。
SparkContext崩溃或者结束的时候,整个Spark程序也结束。
4.1.3 SparkContext源码解析
SparkContext是Spark应用程序的核心。我们运行WordCount程序,通过日志来深入了解SparkContext。
WordCount.scala的代码如下。
1. package com.dt.spark.sparksql 2. 3. import org.apache.log4j.{Level, Logger} 4. import org.apache.spark.rdd.RDD 5. import org.apache.spark.{SparkConf, SparkContext} 6. 7. /** 8. * 使用Scala开发本地测试的Spark WordCount程序 9. * @author DT大数据梦工厂 10. * 新浪微博:http://weibo.com/ilovepains/ 11. */ 12. object WordCount { 13. def main(args: Array[String]){ 14. Logger.getLogger("org").setLevel(Level.ALL) 15. /** 16. * 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息, * 例如,通过setMaster设置程序要链接的Spark集群的Master的URL,如果设置 * 为local,则代表Spark程序在本地运行,特别适合于机器配置非常差(如只有1GB * 的内存)的初学者 17. */ 18. 19. val conf = new SparkConf() //创建SparkConf对象 20. conf.setAppName("Wow,WordCountJobRuntime!") //设置应用程序的名称,在程序运行的监控界面中可以看到名称 21. conf.setMaster("local") //此时,程序在本地运行,不需要安装Spark集群 22. 23. /** 24. * 第2步:创建SparkContext对象 25. * SparkContext是Spark程序所有功能的唯一入口,采用Scala、Java、Python、 * R等都必须有一个SparkContext 26. * SparkContext 核心作用:初始化 Spark 应用程序运行所需要的核心组件,包括 * DAGScheduler、TaskScheduler、SchedulerBackend 27. * 同时还会负责Spark程序往Master注册程序等 28. * SparkContext是整个Spark应用程序中至关重要的一个对象 29. */ 30. val sc = new SparkContext(conf) //创建SparkContext对象,通过传入SparkConf //实例来定制Spark运行的具体参数和配置信息 31. /** 32. * 第 3 步:根据具体的数据来源(如 HDFS、HBase、Local FS、DB、S3 等)通过 * SparkContext来创建RDD 33. * RDD的创建有3种方式:根据外部的数据来源(如HDFS),根据Scala集合,由其他 * 的RDD操作 34. * 数据会被RDD划分成一系列的Partitions,分配到每个Partition的数据属于一 * 个Task的处理范畴 35. */ 36. val lines = sc.textFile("data/wordcount/helloSpark.txt") //读取本地文件并设置为一个Partition 37. 38. /** 39. * 第4步:对初始的RDD进行Transformation级别的处理,如通过map、filter等 * 高阶函数等的编程,进行具体的数据计算 40. * 第4.1步:将每一行的字符串拆分成单个单词 41. */ 42. 43. val words = lines.flatMap { line => line.split(" ")} //对每一行的字符串进行单词拆分,并把所有行的拆分结果通过 //flat合并成为一个大的单词集合 44. 45. /** 46. * 第4步:对初始的RDD进行Transformation级别的处理,如通过map、filter等 * 高阶函数等的编程,进行具体的数据计算 47. * 第4.2步:在单词拆分的基础上对每个单词实例计数为1,也就是word => (word, 1) 48. */ 49. val pairs: RDD[(String, Int)] = words.map { word => (word, 1) } 50. pairs.cache() 51. /** 52. * 第4步:对初始的RDD进行Transformation级别的处理,如通过map、filter等 * 高阶函数等的编程,进行具体的数据计算 53. * 第4.3步:在每个单词实例计数为1的基础上统计每个单词在文件中出现的总次数 54. */ 55. val wordCountsOdered = pairs.reduceByKey(_+_).saveAsTextFile("data/ wordcount/wordCountResult.log") 56. 57. while(true){ 58. 59. } 60. sc.stop() 61. 62. } 63. }
在IDEA中运行WordCount.scala代码,日志显示如下。
1. Using Spark's default log4j profile: org/apache/spark/log4j-defaults. properties 2. 17/06/16 06:00:49 INFO SparkContext: Running Spark version 2.1.0 3. ........ 4. 17/06/16 06:00:54 TRACE BlockInfoManager: Task -1024 releasing lock for broadcast_0_piece0 5. 17/06/16 06:00:54 DEBUG BlockManager: Putting block broadcast_0_piece0 without replication took 377 ms 6. 17/06/16 06:00:54 INFO SparkContext: Created broadcast 0 from textFile at WordCountJobRuntime.scala:39 7. ........ 8. 17/06/16 06:00:54 INFO SparkContext: Starting job: saveAsTextFile at WordCountJobRuntime.scala:58 9. .......
程序一开始,日志里显示的是:INFO SparkContext: Running Spark version 2.1.0,日志中间部分是一些随着SparkContext创建而创建的对象,另一条比较重要的日志信息,作业启动了并正在运行:INFO SparkContext: Starting job: saveAsTextFile at WordCountJobRuntime.scala:58。
在程序运行的过程中会创建TaskScheduler、DAGScheduler和SchedulerBackend,它们有各自的功能。DAGScheduler是面向Job的Stage的高层调度器;TaskScheduler是底层调度器。SchedulerBackend是一个接口,根据具体的ClusterManager的不同会有不同的实现。程序打印结果后便开始结束。日志显示:INFO SparkContext: Successfully stopped SparkContext。
1. ........ 2. 17/06/16 06:00:56 INFO BlockManagerMaster: BlockManagerMaster stopped 3. 17/06/16 06:00:56 INFO OutputCommitCoordinator$OutputCommitCoordinator Endpoint: OutputCommitCoordinator stopped! 4. 17/06/16 06:00:56 INFO SparkContext: Successfully stopped SparkContext 5. 17/06/16 06:00:56 INFO ShutdownHookManager: Shutdown hook called 6. .......
通过这个例子可以感受到Spark程序的运行到处都可以看到SparkContext的存在,我们将SparkContext作为Spark源码阅读的入口,来理解Spark的所有内部机制。
图4-2是从一个整体去看SparkContext创建的实例对象。首先,SparkContext构建的顶级三大核心为DAGScheduler、TaskScheduler、SchedulerBackend,其中,DAGScheduler是面向Job的Stage的高层调度器;TaskScheduler是一个接口,是底层调度器,根据具体的ClusterManager的不同会有不同的实现,Standalone模式下具体的实现是TaskSchedulerImpl。SchedulerBackend是一个接口,根据具体的ClusterManager的不同会有不同的实现。Standalone模式下具体的实现是StandaloneSchedulerBackend。
图4-2 SparkContext整体运行图
从整个程序运行的角度讲,SparkContext包含四大核心对象:DAGScheduler、TaskScheduler、SchedulerBackend、MapOutputTrackerMaster。StandaloneSchedulerBackend有三大核心功能:负责与Master连接,注册当前程序RegisterWithMaster;接收集群中为当前应用程序分配的计算资源Executor的注册并管理Executors;负责发送Task到具体的Executor执行。
第一步:程序一开始运行时会实例化SparkContext里的对象,所有不在方法里的成员都会被实例化!一开始实例化时第一个关键的代码是createTaskScheduler,它位于SparkContext的PrimaryConstructor中,当它实例化时会直接被调用,这个方法返回的是taskScheduler和dagScheduler的实例,然后基于这个内容又构建了DAGScheduler,最后调用taskScheduler的start()方法。要先创建taskScheduler,然后再创建dagScheduler,因为taskScheduler是受dagScheduler管理的。
SparkContext.scala的源码如下。
1. //创建和启动调度器 2. val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode) 3. _schedulerBackend = sched 4. _taskScheduler = ts 5. _dagScheduler = new DAGScheduler(this) 6. _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet) 7. //在DAGScheduler构造器中设置taskScheduler的引用以后,启动TaskScheduler 8. _taskScheduler.start() 9. ......
第二步:调用createTaskScheduler,这个方法创建了TaskSchedulerImpl和StandaloneSchedulerBackend,createTaskScheduler方法的第一个入参是SparkContext,传入的this对象是在应用程序中创建的sc,第二个入参是master的地址。
以下是WordCount.scala创建SparkConf和SparkContext的上下文信息。
1. val conf = new SparkConf() //创建SparkConf对象 2. conf.setAppName("Wow,WordCount") //设置应用程序的名称,在程序运行的监控界面中可以看到名称 3. conf.setMaster("local") 4. val sc = new SparkContext(conf)
当SparkContext调用createTaskScheduler方法时,根据集群的条件创建不同的调度器,例如,createTaskScheduler第二个入参master如传入local参数,SparkContext将创建TaskSchedulerImpl实例及LocalSchedulerBackend实例,在测试代码的时候,可以尝试传入local[*]或者是local[2]的参数,然后跟踪代码,看看创建了什么样的实例对象。
SparkContext中的SparkMasterRegex对象定义不同的正则表达式,从master字符串中根据正则表达式适配master信息。
SparkContext.scala的源码如下。
1. private object SparkMasterRegex { 2. //正则表达式 local[N]和 local[*] 用于master 格式 3. val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r 4. //正则表达式 local[N, maxRetries]用于失败任务的测试 5. val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r 6. //正则表达式用于模拟Spark 本地集群 [N, cores, memory] 7. val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+) \s*,\s*([0-9]+)\s*]""".r 8. //用于连接到Spark部署集群的正则表达式 9. val SPARK_REGEX = """spark://(.*)""".r 10. }
这是设计模式中的策略模式,它会根据实际需要创建出不同的SchedulerBackend的子类。
SparkContext.scala的createTaskScheduler方法的源码如下。
1. /** *基于给定的主URL创建任务调度器,返回一个二元调度程序的后台和任务调度 2. */ 3. 4. private def createTaskScheduler( 5. sc: SparkContext, 6. master: String, 7. deployMode: String): (SchedulerBackend, TaskScheduler) = { 8. import SparkMasterRegex._ 9. 10. //当在本地运行时,不要试图在失败时重新执行任务 11. val MAX_LOCAL_TASK_FAILURES = 1 12. 13. master match { 14. case "local" => 15. val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true) 16. val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1) 17. scheduler.initialize(backend) 18. (backend, scheduler) 19. 20. case LOCAL_N_REGEX(threads) => 21. def localCpuCount: Int = Runtime.getRuntime.availableProcessors() 22. //local[*]估计机器上的核数; local[N] 精确地使用N个线程 23. val threadCount = if (threads == "*") localCpuCount else threads.toInt 24. if (threadCount <= 0) { 25. throw new SparkException(s"Asked to run locally with $threadCount threads") 26. } 27. val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true) 28. val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount) 29. scheduler.initialize(backend) 30. (backend, scheduler) 31. 32. case LOCAL_N_FAILURES_REGEX(threads, maxFailures) => 33. def localCpuCount: Int = Runtime.getRuntime.availableProcessors() 34. //local[*, M] 计算机核发生M个故障 35. //local[N, M] 意味着N个线程M个故障 36. val threadCount = if (threads == "*") localCpuCount else threads. toInt 37. val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true) 38. val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount) 39. scheduler.initialize(backend) 40. (backend, scheduler) 41. 42. case SPARK_REGEX(sparkUrl) => 43. val scheduler = new TaskSchedulerImpl(sc) 44. val masterUrls = sparkUrl.split(",").map("spark://" + _) 45. val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls) 46. scheduler.initialize(backend) 47. (backend, scheduler) 48. 49. case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) => 50. //确认请求的内存<= memoryPerSlave,否则Spark将会挂起 51. val memoryPerSlaveInt = memoryPerSlave.toInt 52. if (sc.executorMemory > memoryPerSlaveInt) { 53. throw new SparkException( 54. "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format( 55. memoryPerSlaveInt, sc.executorMemory)) 56. } 57. 58. val scheduler = new TaskSchedulerImpl(sc) 59. val localCluster = new LocalSparkCluster( 60. numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf) 61. val masterUrls = localCluster.start() 62. val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls) 63. scheduler.initialize(backend) 64. backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => { 65. localCluster.stop() 66. } 67. (backend, scheduler) 68. 69. case masterUrl => 70. val cm = getClusterManager(masterUrl) match { 71. case Some(clusterMgr) => clusterMgr 72. case None => throw new SparkException("Could not parse Master URL: '" + master + "'") 73. } 74. try { 75. val scheduler = cm.createTaskScheduler(sc, masterUrl) 76. val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler) 77. cm.initialize(scheduler, backend) 78. (backend, scheduler) 79. } catch { 80. case se: SparkException => throw se 81. case NonFatal(e) => 82. throw new SparkException("External scheduler cannot be instantiated", e) 83. } 84. } 85. }
在实际生产环境下,我们都是用集群模式,即以spark://开头,此时在程序运行时,框架会创建一个TaskSchedulerImpl和StandaloneSchedulerBackend的实例,在这个过程中也会初始化taskscheduler,把StandaloneSchedulerBackend的实例对象作为参数传入。StandaloneSchedulerBackend被TaskSchedulerImpl管理,最后返回TaskScheduler和StandaloneSchdeulerBackend。
SparkContext.scala的源码如下。
1. case SPARK_REGEX(sparkUrl) => 2. val scheduler = new TaskSchedulerImpl(sc) 3. val masterUrls = sparkUrl.split(",").map("spark://" + _) 4. val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls) 5. scheduler.initialize(backend) 6. (backend, scheduler)
createTaskScheduler方法执行完毕后,调用了taskscheduler.start()方法来正式启动taskscheduler,这里虽然调用了taskscheduler.start方法,但实际上是调用了taskSchedulerImpl的start方法,因为taskSchedulerImpl是taskScheduler的子类。
Task默认失败重试次数是4次,如果任务不容许失败,就可以调大这个参数。调大spark.task.maxFailures参数有助于确保重要的任务失败后可以重试多次。
初始化TaskSchedulerImpl:调用createTaskScheduler方法时会初始化TaskSchedulerImpl,然后把StandaloneSchedulerBackend当作参数传进去,初始化TaskSchedulerImpl时首先是创建一个Pool来初定义资源分布的模式Scheduling Mode,默认是先进先出(FIFO)的模式。
Spark 2.1.1版本的TaskSchedulerImpl.scala的initialize的源码如下。
1. def initialize(backend: SchedulerBackend) { 2. this.backend = backend 3. //临时设置rootPool名字为空 4. rootPool = new Pool("", schedulingMode, 0, 0) 5. schedulableBuilder = { 6. schedulingMode match { 7. case SchedulingMode.FIFO => 8. new FIFOSchedulableBuilder(rootPool) 9. case SchedulingMode.FAIR => 10. new FairSchedulableBuilder(rootPool, conf) 11. case _ => 12. throw new IllegalArgumentException(s"Unsupported spark. scheduler.mode: $schedulingMode") 13. } 14. } 15. schedulableBuilder.buildPools() 16. }
Spark 2.2.0版本的TaskSchedulerImpl.scala的initialize的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第4行rootPool变量的创建从initialize方法内移动至initialize方法外;rootPool作为TaskSchedulerImpl类的成员变量,在构建TaskSchedulerImpl时初始化。
1. val rootPool: Pool = new Pool("", schedulingMode, 0, 0) 2. ......
可以设置spark.scheduler.mode参数来定义资源调度池,例如FAIR、FIFO,默认资源调度池是先进先出(FIFO)模式。
Spark 2.1.1版本的TaskSchedulerImpl.scala的源码如下。
1. private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO") 2. val schedulingMode: SchedulingMode = try { 3. SchedulingMode.withName(schedulingModeConf.toUpperCase) 4. } catch { 5. case e: java.util.NoSuchElementException => 6. throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf") 7. }
Spark 2.2.0版本的TaskSchedulerImpl.scala的源码与Spark 2.1.1版本相比具有如下特点。
上段代码中第1行Conf配置文件获取属性的代码进行了微调,调整为从object TaskSchedulerImpl中获取。
上段代码中第3行toUpperCase更新为toUpperCase(Locale.ROOT)。
上段代码中第6行异常提示字符串更新为$SCHEDULER_MODE_PROPERTY。
1. private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString) 2. ..... 3. SchedulingMode.withName(schedulingModeConf.toUpperCase(Locale.ROOT)) 4. ..... 5. throw new SparkException(s"Unrecognized $SCHEDULER_MODE_PROPERTY: $schedulingModeConf") 6. } 7. ....... 8. private[spark] object TaskSchedulerImpl { 9. 10. val SCHEDULER_MODE_PROPERTY = "spark.scheduler.mode" 11. ..... 12. SchedulingMode.scala 13. 14. object SchedulingMode extends Enumeration { 15. 16. type SchedulingMode = Value 17. val FAIR, FIFO, NONE = Value 18. }
回到taskScheduler start方法,taskScheduler.start方法调用时会再调用schedulerbackend的start方法。
TaskSchedulerImpl.scala的start方法的源码如下。
1. override def start() { 2. backend.start() 3. 4. if (!isLocal && conf.getBoolean("spark.speculation", false)) { 5. logInfo("Starting speculative execution thread") 6. speculationScheduler.scheduleAtFixedRate(new Runnable { 7. override def run(): Unit = Utils.tryOrStopSparkContext(sc) { 8. checkSpeculatableTasks() 9. } 10. }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit. MILLISECONDS) 11. } 12. }
SchedulerBackend包含多个子类,分别是LocalSchedulerBackend、CoarseGrainedScheduler-Backend和StandaloneSchedulerBackend、MesosCoarseGrainedSchedulerBackend、YarnScheduler-Backend。
StandaloneSchedulerBackend的start方法调用了CoarseGraninedSchedulerBackend的start方法,通过StandaloneSchedulerBackend注册程序把command提交给Master:Command ("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)来创建一个StandaloneAppClient的实例。
Spark 2.1.1版本的StandaloneSchedulerBackend.scala的start方法的源码如下。
1. override def start() { 2. super.start() 3. launcherBackend.connect() 4. 5. //executors 节点与我们通信的端点 6. val driverUrl = RpcEndpointAddress( 7. sc.conf.get("spark.driver.host"), 8. sc.conf.get("spark.driver.port").toInt, 9. CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString 10. val args = Seq( 11. "--driver-url", driverUrl, 12. "--executor-id", "{{EXECUTOR_ID}}", 13. "--hostname", "{{HOSTNAME}}", 14. "--cores", "{{CORES}}", 15. "--app-id", "{{APP_ID}}", 16. "--worker-url", "{{WORKER_URL}}") 17. val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions") 18. .map(Utils.splitCommandString).getOrElse(Seq.empty) 19. val classPathEntries = sc.conf.getOption("spark.executor. extraClassPath") 20. .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil) 21. val libraryPathEntries = sc.conf.getOption("spark.executor. extraLibraryPath") 22. .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil) 23. 24. //测试时,将父类路径公开给子对象,由compute-classpath.{cmd,sh}计算路径。当 //“*-provided”配置启用,子进程可使用所有需要的jar包 25. val testingClassPath = 26. if (sys.props.contains("spark.testing")) { 27. sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq 28. } else { 29. Nil 30. } 31. 32. //使用注册调度必要的一些配置启动executors 33. val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf. isExecutorStartupConf) 34. val javaOpts = sparkJavaOpts ++ extraJavaOpts 35. val command = Command("org.apache.spark.executor.CoarseGrained- ExecutorBackend", 36. args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts) 37. val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") 38. val coresPerExecutor = conf.getOption("spark.executor.cores").map (_.toInt) 39. //如果使用动态分配,现在将我们的初始执行器限制设置为0, //ExecutorAllocationManager将实际的初始限制发送给Master节点 40. val initialExecutorLimit = 41. if (Utils.isDynamicAllocationEnabled(conf)) { 42. Some(0) 43. } else { 44. None 45. } 46. val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit) 47. client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf) 48. client.start() 49. launcherBackend.setState(SparkAppHandle.State.SUBMITTED) 50. waitForRegistration() 51. launcherBackend.setState(SparkAppHandle.State.RUNNING) 52. }
Spark 2.2.0版本的StandaloneSchedulerBackend.scala的start方法的源码与Spark 2.1.1版本相比具有如下特点。
上段代码中第3行增加了对SparkContext部署方式的判断。
上段代码中第37行appUIAddress变量名称调整为webUrl。
上段代码中第46行构建应用程序的描述信息ApplicationDescription第5个参数appUIAddress更新为webUrl参数。
1. .... 2. //SPARK-21159:只有在client模式下scheduler backend去连接launcher。在 //cluster 集群下,提交应用程序应提交给Master 3. if (sc.deployMode == "client") { 4. launcherBackend.connect() 5. } 6. ...... 7. val webUrl = sc.ui.map(_.webUrl).getOrElse("") 8. ...... 9. val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit) 10. ......
Master发指令给Worker去启动Executor所有的进程时加载的Main方法所在的入口类就是command中的CoarseGrainedExecutorBackend,在CoarseGrainedExecutorBackend中启动Executor(Executor是先注册,再实例化),Executor通过线程池并发执行Task,然后再调用它的run方法。
CoarseGrainedExecutorBackend.scala的源码如下。
1. def main(args: Array[String]) { 2. var driverUrl: String = null 3. var executorId: String = null 4. var hostname: String = null 5. var cores: Int = 0 6. var appId: String = null 7. var workerUrl: Option[String] = None 8. val userClassPath = new mutable.ListBuffer[URL]() 9. 10. var argv = args.toList 11. while (!argv.isEmpty) { 12. argv match { 13. case ("--driver-url") :: value :: tail => 14. driverUrl = value 15. argv = tail 16. case ("--executor-id") :: value :: tail => 17. executorId = value 18. argv = tail 19. case ("--hostname") :: value :: tail => 20. hostname = value 21. argv = tail 22. case ("--cores") :: value :: tail => 23. cores = value.toInt 24. argv = tail 25. case ("--app-id") :: value :: tail => 26. appId = value 27. argv = tail 28. case ("--worker-url") :: value :: tail => 29. //Worker url 用于spark standalone 模式,以加强与Worker的分享 30. workerUrl = Some(value) 31. argv = tail 32. case ("--user-class-path") :: value :: tail => 33. userClassPath += new URL(value) 34. argv = tail 35. case Nil => 36. case tail => 37. //scalastyle:off println 38. System.err.println(s"Unrecognized options: ${tail.mkString(" ")}") 39. //scalastyle:on println 40. printUsageAndExit() 41. } 42. } 43. 44. if (driverUrl == null || executorId == null || hostname == null || cores <= 0 || 45. appId == null) { 46. printUsageAndExit() 47. } 48. 49. run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath) 50. System.exit(0) 51. }
CoarseGrainedExecutorBackend的main入口方法中调用了run方法。
Spark 2.1.1版本的CoarseGrainedExecutorBackend的run入口方法的源码如下。
1. private def run( 2. driverUrl: String, 3. executorId: String, 4. hostname: String, 5. cores: Int, 6. appId: String, 7. workerUrl: Option[String], 8. userClassPath: Seq[URL]) { 9. 10. Utils.initDaemon(log) 11. 12. SparkHadoopUtil.get.runAsSparkUser { () => 13. //Debug 代码 14. Utils.checkHost(hostname) 15. 16. //Bootstrap 去抓取 driver节点 Spark属性 17. val executorConf = new SparkConf 18. val port = executorConf.getInt("spark.executor.port", 0) 19. val fetcher = RpcEnv.create( 20. "driverPropsFetcher", 21. hostname, 22. port, 23. executorConf, 24. new SecurityManager(executorConf), 25. clientMode = true) 26. val driver = fetcher.setupEndpointRefByURI(driverUrl) 27. val cfg = driver.askWithRetry[SparkAppConfig](RetrieveSparkAppConfig) 28. val props = cfg.sparkProperties ++ Seq[(String, String)](("spark. app.id", appId)) 29. fetcher.shutdown() 30. 31. //从driver 节点获取属性信息,创建SparkEnv 32. val driverConf = new SparkConf() 33. for ((key, value) <- props) { 34. //这是SSL在独立模式下需要的 35. if (SparkConf.isExecutorStartupConf(key)) { 36. driverConf.setIfMissing(key, value) 37. } else { 38. driverConf.set(key, value) 39. } 40. } 41. if (driverConf.contains("spark.yarn.credentials.file")) { 42. logInfo("Will periodically update credentials from: " + 43. driverConf.get("spark.yarn.credentials.file")) 44. SparkHadoopUtil.get.startCredentialUpdater(driverConf) 45. } 46. 47. val env = SparkEnv.createExecutorEnv( 48. driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false) 49. 50. env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend( 51. env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env)) 52. workerUrl.foreach { url => 53. env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher (env.rpcEnv, url)) 54. } 55. env.rpcEnv.awaitTermination() 56. SparkHadoopUtil.get.stopCredentialUpdater() 57. } 58. }
Spark 2.2.0版本的CoarseGrainedExecutorBackend的run入口方法的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第27行Spark 2.2.0版本将Rpc消息终端引用RpcEndpointRef的askWithRetry方法调整为askSync方法。CoarseGrainedExecutorBackend通过消息循环体向driver发送RetrieveSparkAppConfig消息,RetrieveSparkAppConfig是一个case object。Driver端的CoarseGrainedSchedulerBackend消息循环体收到消息以后,将Spark的属性信息sparkProperties及加密key等内容封装成SparkAppConfig消息,将SparkAppConfig消息再回复给CoarseGrainedExecutorBackend。
1. ........ 2. val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig) 3. ........
回到StandaloneSchedulerBackend.scala的start方法:其中创建了一个很重要的对象,即StandaloneAppClient对象,然后调用它的client.start()方法。
在start方法中创建一个ClientEndpoint对象。
StandaloneAppClient.scala的star方法的源码如下。
1. def start() { 2. //启动 rpcEndpoint; it will call back into the listener. 3. endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint (rpcEnv))) 4. }
ClientEndpoint是一个RpcEndPoint,首先调用自己的onStart方法,接下来向Master注册。
StandaloneAppClient.scala的ClientEndpoint类的源码如下。
1. private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint 2. ....... 3. override def onStart(): Unit = { 4. try { 5. registerWithMaster(1) 6. } catch { 7. case e: Exception => 8. logWarning("Failed to connect to master", e) 9. markDisconnected() 10. stop() 11. } 12. } 13. .......
调用registerWithMaster方法,从registerWithMaster调用tryRegisterAllMasters,开一条新的线程来注册,然后发送一条信息(RegisterApplication的case class)给Master。
StandaloneAppClient.scala的registerWithMaster的源码如下。
1. private def registerWithMaster(nthRetry: Int) { 2. registerMasterFutures.set(tryRegisterAllMasters()) 3. registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable { 4. override def run(): Unit = { 5. if (registered.get) { 6. registerMasterFutures.get.foreach(_.cancel(true)) 7. registerMasterThreadPool.shutdownNow() 8. } else if (nthRetry >= REGISTRATION_RETRIES) { 9. markDead("All masters are unresponsive! Giving up.") 10. } else { 11. registerMasterFutures.get.foreach(_.cancel(true)) 12. registerWithMaster(nthRetry + 1) 13. } 14. } 15. }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)) 16. } 17. ......
StandaloneAppClient.scala的tryRegisterAllMasters的源码如下。
1. private def tryRegisterAllMasters(): Array[JFuture[_]] = { 2. for (masterAddress <- masterRpcAddresses) yield { 3. registerMasterThreadPool.submit(new Runnable { 4. override def run(): Unit = try { 5. if (registered.get) { 6. return 7. } 8. logInfo("Connecting to master " + masterAddress.toSparkURL + "...") 9. val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME) 10. masterRef.send(RegisterApplication(appDescription, self)) 11. } catch { 12. case ie: InterruptedException => //Cancelled 13. case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e) 14. } 15. }) 16. } 17. } 18. ......
Master收到RegisterApplication信息后便开始注册,注册后再次调用schedule()方法。
Master.scala的receive方法的源码如下。
1. override def receive: PartialFunction[Any, Unit] = { 2. ....... 3. 4. case RegisterApplication(description, driver) => 5. //待办事宜:防止某些driver重复注册 6. if (state == RecoveryState.STANDBY) { 7. //忽略,不要发送响应 8. } else { 9. logInfo("Registering app " + description.name) 10. val app = createApplication(description, driver) 11. registerApplication(app) 12. logInfo("Registered app " + description.name + " with ID " + app.id) 13. persistenceEngine.addApplication(app) 14. driver.send(RegisteredApplication(app.id, self)) 15. schedule() 16. } 17. .......
总结:从SparkContext创建taskSchedulerImpl初始化不同的实例对象来完成最终向Master注册的任务,中间包括调用scheduler的start方法和创建StandaloneAppClient来间接创建ClientEndPoint完成注册工作。
我们把SparkContext称为天堂之门,SparkContext开启天堂之门:Spark程序是通过SparkContext发布到Spark集群的;SparkContext导演天堂世界:Spark程序的运行都是在SparkContext为核心的调度器的指挥下进行的;SparkContext关闭天堂之门:SparkContext崩溃或者结束的时候整个Spark程序也结束。