5.1 Master启动原理和源码详解
本节讲解Master启动的原理和源码;Master HA双机切换;Master的注册机制和状态管理解密等内容。
5.1.1 Master启动的原理详解
Spark应用程序作为独立的集群进程运行,由主程序中的SparkContext对象(称为驱动程序)协调。Spark集群部署组件图5-1所示。
图5-1 Spark集群部署组件图
其中各个术语及相关术语的描述如下。
(1)Driver Program:运行Application的main函数并新建SparkContext实例的程序,称为驱动程序(Driver Program)。通常可以使用SparkContext代表驱动程序。
(2)Cluster Manager:集群管理器(Cluster Manager)是集群资源管理的外部服务。Spark上现在主要有Standalone、YARN、Mesos 3种集群资源管理器。Spark自带的Standalone模式能够满足绝大部分纯粹的Spark计算环境中对集群资源管理的需求,基本上只有在集群中运行多套计算框架的时候才建议考虑YARN和Mesos。
(3)Worker Node:集群中可以运行Application代码的工作节点(Worker Node),相当于Hadoop的Slave节点。
(4)Executor:在Worker Node上为Application启动的一个工作进程,在进程中负责任务(Task)的运行,并且负责将数据存放在内存或磁盘上,在Executor内部通过多线程的方式(即线程池)并发处理应用程序的具体任务。
每个Application都有各自独立的Executors,因此应用程序之间是相互隔离的。
(5)Task:任务(Task)是指被Driver送到Executor上的工作单元。通常,一个任务会处理一个Partition的数据,每个Partition一般是一个HDFS的Block块的大小。
(6)Application:是创建了SparkContext实例对象的Spark用户程序,包含了一个Driver program和集群中多个Worker上的Executor。
(7)Job:和Spark的action对应,每个action,如count、savaAsTextFile等都会对应一个Job实例,每个Job会拆分成多个Stages,一个Stage中包含一个任务集(TaskSet),任务集中的各个任务通过一定的调度机制发送到工作单位(Executor)上并行执行。
Spark Standalone集群的部署采用典型的Master/Slave架构。其中,Master节点负责整个集群的资源管理与调度,Worker节点(也可以称Slave节点)在Master节点的调度下启动Executor,负责执行具体工作(包括应用程序以及应用程序提交的任务)。
5.1.2 Master启动的源码详解
Spark中各个组件是通过脚本来启动部署的。下面以脚本为入口点开始分析Master的部署。每个组件对应提供了启动的脚本,同时也会提供停止的脚本。停止脚本比较简单,在此仅分析启动脚本。
1.Master部署的启动脚本解析
首先看一下Master的启动脚本./sbin/start-master.sh,内容如下。
1. # 在脚本的执行节点启动Master组件 2. 3. #如果没有设置环境变量SPARK_HOME,会根据脚本所在位置自动设置 4. if [ -z "${SPARK_HOME}" ]; then 5. export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)" 6. fi 7. 8. #注:提取的类名必须和SparkSubmit的类相匹配。任何变化都需在类中进行反映 9. 10. # Master 组件对应的类 11. CLASS="org.apache.spark.deploy.master.Master" 12. 13. #脚本的帮助信息 14. 15. if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then 16. echo "Usage: ./sbin/start-master.sh [options]" 17. pattern="Usage:" 18. pattern+="\|Using Spark's default log4j profile:" 19. pattern+="\|Registered signal handlers for" 20. 21. # 通过脚本spark-class执行指定的Master类,参数为--help 22. "${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2 23. exit 1 24. fi 25. 26. ORIGINAL_ARGS="$@" 27. 28. #控制启动Master时,是否同时启动 Tachyon的Master组件 29. START_TACHYON=false 30. 31. while (( "$#" )); do 32. case $1 in 33. --with-tachyon) 34. if [ ! -e "${SPARK_HOME}"/tachyon/bin/tachyon ]; then 35. echo "Error: --with-tachyon specified, but tachyon not found." 36. exit -1 37. fi 38. START_TACHYON=true 39. ;; 40. esac 41. shift 42. done 43. 44. . "${SPARK_HOME}/sbin/spark-config.sh" 45. 46. . "${SPARK_HOME}/bin/load-spark-env.sh" 47. 48. #下面的一些参数对应的默认配置属性 49. if [ "$SPARK_MASTER_PORT" = "" ]; then 50. SPARK_MASTER_PORT=7077 51. fi 52. 53. //用于MasterURL,所以当没有设置时,默认使用hostname,而不是IP地址 54. //该MasterURL在Worker注册或应用程序提交时使用 55. if [ "$SPARK_MASTER_IP" = "" ]; then 56. SPARK_MASTER_IP=`hostname` 57. fi 58. 59. if [ "$SPARK_MASTER_WEBUI_PORT" = "" ]; then 60. SPARK_MASTER_WEBUI_PORT=8080 61. fi 62. 63. #通过启动后台进程的脚本spark-daemon.sh来启动Master组件 64. "${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \ 65. --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_ MASTER_WEBUI_PORT \ 66. $ORIGINAL_ARGS 67. 68. #需要时同时启动Tachyon,此时Tachyon是编译在Spark内的 69. if [ "$START_TACHYON" == "true" ]; then 70. "${SPARK_HOME}"/tachyon/bin/tachyon bootstrap-conf $SPARK_MASTER_IP 71. "${SPARK_HOME}"/tachyon/bin/tachyon format -s 72. "${SPARK_HOME}"/tachyon/bin/tachyon-start.sh master 73. fi
通过脚本的简单分析,可以看出Master组件是以后台守护进程的方式启动的,对应的后台守护进程的启动脚本spark-daemon.sh,在后台守护进程的启动脚本spark-daemon.sh内部,通过脚本spark-class启动一个指定主类的JVM进程,相关代码如下所示。
1. case "$mode" in 2. #这里对应的是启动一个Spark类 3. (class) 4. nohup nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class $command "$@" >> "$log" 2>&1 < /dev/null & 5. newpid="$!" 6. ;; 7. #这里对应提交一个Spark 应用程序 8. (submit) 9. nohup nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-submit --class $command "$@" >> "$log" 2>&1 < /dev/null & 10. newpid="$!" 11. ;; 12. 13. (*) 14. echo "unknown mode: $mode" 15. exit 1 16. ;;
通过脚本的分析,可以知道最终执行的是Master类(对应的代码为前面的CLASS="org.apache.spark.deploy.master.Master"),对应的入口点是Master伴生对象中的main方法。下面以该方法作为入口点进一步解析Master部署框架。
部署Master组件时,最简单的方式是直接启动脚本,不带任何选项参数,命令如下所示。
1. ./sbin/start-master.sh
如需设置选项参数,可以查看帮助信息,根据自己的需要进行设置。
2.Master的源码解析
首先查看Master伴生对象中的main方法。
Master.scala的源码如下。
1. def main(argStrings: Array[String]) { 2. Utils.initDaemon(log) 3. val conf = new SparkConf 4. //构建参数解析的实例 5. val args = new MasterArguments(argStrings, conf) 6. //启动RPC通信环境以及Master的RPC通信终端 7. val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf) 8. rpcEnv.awaitTermination() 9. }
和其他类(如SparkSubmit)一样,Master类的入口点处也包含了对应的参数类MasterArguments。MasterArguments类包括Spark属性配置相关的一些解析。
MasterArguments.scala的源码如下。
1. private[master] class MasterArguments(args: Array[String], conf: SparkConf) extends Logging { 2. var host = Utils.localHostName() 3. var port = 7077 4. var webUiPort = 8080 5. var propertiesFile: String = null 6. 7. //读取启动脚本中设置的环境变量 8. if (System.getenv("SPARK_MASTER_IP") != null) { 9. logWarning("SPARK_MASTER_IP is deprecated, please use SPARK_MASTER_ HOST") 10. host = System.getenv("SPARK_MASTER_IP") 11. } 12. 13. if (System.getenv("SPARK_MASTER_HOST") != null) { 14. host = System.getenv("SPARK_MASTER_HOST") 15. } 16. if (System.getenv("SPARK_MASTER_PORT") != null) { 17. port = System.getenv("SPARK_MASTER_PORT").toInt 18. } 19. if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) { 20. webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt 21. } 22. //命令行选项参数的解析 23. parse(args.toList) 24. 25. //加载SparkConf文件,所有的访问必须经过此行 26. propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile) 27. 28. if (conf.contains("spark.master.ui.port")) { 29. webUiPort = conf.get("spark.master.ui.port").toInt 30. } 31. ......
MasterArguments中的printUsageAndExit方法对应的就是命令行中的帮助信息。
MasterArguments.scala的源码如下。
1. private def printUsageAndExit(exitCode: Int) { 2. //scalastyle:off println 3. System.err.println( 4. "Usage: Master [options]\n" + 5. "\n" + 6. "Options:\n" + 7. " -i HOST, --ip HOST Hostname to listen on (deprecated, please use --host or -h) \n" + 8. " -h HOST, --host HOST Hostname to listen on\n" + 9. " -p PORT, --port PORT Port to listen on (default: 7077)\n" + 10. " --webui-port PORT Port for web UI (default: 8080)\n" + 11. " --properties-file FILE Path to a custom Spark properties file.\n" + 12. " Default is conf/spark-defaults.conf.") 13. //scalastyle:on println 14. System.exit(exitCode) 15. }
解析完Master的参数后,调用startRpcEnvAndEndpoin方法启动RPC通信环境以及Master的RPC通信终端。
Spark 2.1.1版本的Master.scala的startRpcEnvAndEndpoint的源码如下。
1. 2. /** 3. * 启动Master并返回一个三元组: 4. * (1) The Master RpcEnv 5. * (2) The web UI bound port 6. * (3) The REST server bound port, if any 7. */ 8. 9. def startRpcEnvAndEndpoint( 10. host: String, 11. port: Int, 12. webUiPort: Int, 13. conf: SparkConf): (RpcEnv, Int, Option[Int]) = { 14. val securityMgr = new SecurityManager(conf) 15. //构建RPC通信环境 16. val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr) 17. //构建RPC通信终端,实例化Master 18. val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, 19. new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf)) 20. 21. //向Master的通信终端发送请求,获取绑定的端口号 22. //包含Master的Web UI监听端口号和REST的监听端口号 23. 24. val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse] (BoundPortsRequest) 25. (rpcEnv, portsResponse.webUIPort, portsResponse.restPort) 26. } 27. }
Spark 2.2.0版本的Master.scala的startRpcEnvAndEndpoint的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第24行MasterEndpoint的askWithRetry方法调整为askSync方法。
1. ...... 2. val portsResponse = masterEndpoint.askSync[BoundPortsResponse] (BoundPortsRequest) 3. ......
startRpcEnvAndEndpoint方法中定义的ENDPOINT_NAME如下。
Master.scala的源码如下。
1. private[deploy] object Master extends Logging { 2. val SYSTEM_NAME = "sparkMaster" 3. val ENDPOINT_NAME = "Master" 4. .......
startRpcEnvAndEndpoint方法中通过masterEndpoint.askWithRetry[BoundPortsResponse] (BoundPortsRequest)给Master自己发送一个消息BoundPortsRequest,是一个case object。发送消息BoundPortsRequest给自己,确保masterEndpoint正常启动起来。返回消息的类型是BoundPortsResponse,是一个case class。
MasterMessages.scala的源码如下。
1. private[master] object MasterMessages { 2. ...... 3. case object BoundPortsRequest 4. case class BoundPortsResponse(rpcEndpointPort: Int, webUIPort: Int, restPort: Option[Int]) 5. }
Master收到消息BoundPortsRequest,发送返回消息BoundPortsResponse。
Master.scala的源码如下。
1. override def receiveAndReply(context: RpcCallContext): PartialFunction [Any, Unit] = { 2. ...... 3. case BoundPortsRequest => 4. context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))
在BoundPortsResponse传入的参数restServerBoundPort是在Master的onStart方法中定义的。
Master.scala的源码如下。
1. ...... 2. private var restServerBoundPort: Option[Int] = None 3. 4. override def onStart(): Unit = { 5. ...... 6. restServerBoundPort = restServer.map(_.start()) 7. .......
而restServerBoundPort是通过restServer进行map操作启动赋值。下面看一下restServer。
Master.scala的源码如下。
1. private var restServer: Option[StandaloneRestServer] = None 2. ...... 3. if (restServerEnabled) { 4. val port = conf.getInt("spark.master.rest.port", 6066) 5. restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl)) 6. } 7. ......
其中调用new()函数创建一个StandaloneRestServer。StandaloneRestServer服务器响应请求提交的[RestSubmissionClient],将被嵌入到standalone Master中,仅用于集群模式。服务器根据不同的情况使用不同的HTTP代码进行响应。
200 OK-请求已成功处理。
400错误请求:请求格式错误,未成功验证,或意外类型。
468未知协议版本:请求指定了此服务器不支持的协议。
500内部服务器错误:服务器在处理请求时引发内部异常。
服务器在HTTP主体中总包含一个JSON表示的[SubmitRestProtocolResponse]。如果发生错误,服务器将包括一个[ErrorResponse]。如果构造了这个错误响应内部失败时,响应将由一个空体组成。响应体指示内部服务器错误。
StandaloneRestServer.scala的源码如下。
1. private[deploy] class StandaloneRestServer( 2. host: String, 3. requestedPort: Int, 4. masterConf: SparkConf, 5. masterEndpoint: RpcEndpointRef, 6. masterUrl: String) 7. extends RestSubmissionServer(host, requestedPort, masterConf) {
下面看一下RestSubmissionClient客户端。客户端提交申请[RestSubmissionServer]。在协议版本V1中,REST URL以表单形式出现http://[host:port]/v1/submissions/[action],[action]可以是create、kill或状态中的其中一种。每种请求类型都表示为发送到以下前缀的HTTP消息:
(1)submit - POST to /submissions/create
(2)kill - POST /submissions/kill/[submissionId]
(3)status - GET /submissions/status/[submissionId]
在(1)情况下,参数以JSON字段的形式发布到HTTP主体中。否则,URL指定按客户端的预期操作。由于该协议预计将在Spark版本中保持稳定,因此现有字段不能添加或删除,但可以添加新的可选字段。如在少见的事件中向前或向后兼容性被破坏,Spark须引入一个新的协议版本(如V2)。客户机和服务器必须使用协议的同一版本进行通信。如果不匹配,服务器将用它支持的最高协议版本进行响应。此客户机的实现可以用指定的版本使用该信息重试。
RestSubmissionClient.scala的源码如下。
1. private[spark] class RestSubmissionClient(master: String) extends Logging { 2. import RestSubmissionClient._ 3. private val supportedMasterPrefixes = Seq("spark://", "mesos://") 4. ......
Restful把一切都看成是资源。利用Restful API可以对Spark进行监控。程序运行的每一个步骤、Task的计算步骤都可以可视化,对Spark的运行进行详细监控。
回到startRpcEnvAndEndpoint方法中,新创建了一个Master实例。Master实例化时会对所有的成员进行初始化,如默认的Cores个数等。
Master.scala的源码如下。
1. private[deploy] class Master( 2. override val rpcEnv: RpcEnv, 3. address: RpcAddress, 4. webUiPort: Int, 5. val securityMgr: SecurityManager, 6. val conf: SparkConf) 7. extends ThreadSafeRpcEndpoint with Logging with LeaderElectable { 8. ....... 9. //缺省maxCores时默认应用程序没有指定(通过Int.MaxValue) 10. private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue) 11. val reverseProxy = conf.getBoolean("spark.ui.reverseProxy", false) 12. if (defaultCores < 1) { 13. throw new SparkException("spark.deploy.defaultCores must be positive") 14. } 15. ...... 16.
Master继承了ThreadSafeRpcEndpoint和LeaderElectable,其中继承LeaderElectable涉及Master的高可用性(High Availability,HA)机制。这里先关注ThreadSafeRpcEndpoint,继承该类后,Master作为一个RpcEndpoint,实例化后首先会调用onStart方法。
Master.scala的源码如下。
1. override def onStart(): Unit = { 2. logInfo("Starting Spark master at " + masterUrl) 3. logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}") 4. //构建一个Master的Web UI,查看向Master提交的应用程序等信息 5. webUi = new MasterWebUI(this, webUiPort) 6. webUi.bind() 7. masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort 8. if (reverseProxy) { 9. masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl) 10. logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and " + 11. s"Applications UIs are available at $masterWebUiUrl") 12. } 13. //在一个守护线程中,启动调度机制,周期性检查Worker是否超时,当Worker节点超时后, //会修改其状态或从Master中移除其相关的操作 14. 15. checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate (new Runnable { 16. override def run(): Unit = Utils.tryLogNonFatalError { 17. self.send(CheckForWorkerTimeOut) 18. } 19. }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS) 20. 21. //默认情况下会启动Rest服务,可以通过该服务向Master提交各种请求 22. if (restServerEnabled) { 23. val port = conf.getInt("spark.master.rest.port", 6066) 24. restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl)) 25. } 26. restServerBoundPort = restServer.map(_.start()) 27. //度量(Metroics)相关的操作,用于监控 28. masterMetricsSystem.registerSource(masterSource) 29. masterMetricsSystem.start() 30. applicationMetricsSystem.start() 31. //度量系统启动后,将主程序和应用程序度量handler处理程序附加到Web UI中 32. masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler) 33. applicationMetricsSystem.getServletHandlers.foreach(webUi. attachHandler) 34. //Master HA相关的操作 35. val serializer = new JavaSerializer(conf) 36. val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match { 37. case "ZOOKEEPER" => 38. logInfo("Persisting recovery state to ZooKeeper") 39. val zkFactory = 40. new ZooKeeperRecoveryModeFactory(conf, serializer) 41. (zkFactory.createPersistenceEngine(), zkFactory. createLeaderElectionAgent(this)) 42. case "FILESYSTEM" => 43. val fsFactory = 44. new FileSystemRecoveryModeFactory(conf, serializer) 45. (fsFactory.createPersistenceEngine(), fsFactory.createLeader- ElectionAgent(this)) 46. case "CUSTOM" => 47. val clazz = Utils.classForName(conf.get("spark.deploy. recoveryMode.factory")) 48. val factory = clazz.getConstructor(classOf[SparkConf], classOf [Serializer]) 49. .newInstance(conf, serializer) 50. .asInstanceOf[StandaloneRecoveryModeFactory] 51. (factory.createPersistenceEngine(), factory.createLeaderElectionAgent (this)) 52. case _ => 53. (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this)) 54. } 55. persistenceEngine = persistenceEngine_ 56. leaderElectionAgent = leaderElectionAgent_ 57. }
其中在Master的onStart方法中用new()函数创建MasterWebUI,启动一个webServer。
Master.scala的源码如下。
1. override def onStart(): Unit = { 2. ...... 3. webUi = new MasterWebUI(this, webUiPort) 4. webUi.bind() 5. masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi. boundPort
如MasterWebUI的spark.ui.killEnabled设置为True,可以通过WebUI页面把Spark的进程Kill掉。
MasterWebUI.scala的源码如下。
1. private[master] 2. class MasterWebUI( 3. val master: Master, 4. requestedPort: Int) 5. extends WebUI(master.securityMgr, master.securityMgr.getSSLOptions ("standalone"), 6. requestedPort, master.conf, name = "MasterUI") with Logging { 7. val masterEndpointRef = master.self 8. val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true) 9. ....... 10. initialize() 11. 12. /**初始化所有的服务器组件 */ 13. def initialize() { 14. val masterPage = new MasterPage(this) 15. attachPage(new ApplicationPage(this)) 16. attachPage(masterPage) 17. attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static")) 18. attachHandler(createRedirectHandler( 19. "/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST"))) 20. attachHandler(createRedirectHandler( 21. "/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST"))) 22. }
MasterWebUI中在初始化时用new()函数创建MasterPage,在MasterPage中通过代码去写Web页面。
MasterPage.scala的源码如下。
1. private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { 2. ...... 3. override def renderJson(request: HttpServletRequest): JValue = { 4. JsonProtocol.writeMasterState(getMasterState) 5. } 6. ..... 7. val content = 8. <div class="row-fluid"> 9. <div class="span12"> 10. <ul class="unstyled"> 11. <li><strong>URL:</strong> {state.uri}</li> 12. { 13. state.restUri.map { uri => 14. <li> 15. <strong>REST URL:</strong> {uri} 16. <span class="rest-uri"> (cluster mode)</span> 17. </li> 18. }.getOrElse { Seq.empty } 19. } 20. <li><strong>Alive Workers:</strong> {aliveWorkers.length}</li> 21. <li><strong>Cores in use:</strong> {aliveWorkers.map (_.cores).sum} Total, 22. {aliveWorkers.map(_.coresUsed).sum} Used</li> 23. <li><strong>Memory in use:</strong> 24. {Utils.megabytesToString(aliveWorkers.map(_.memory).sum)} Total, 25. {Utils.megabytesToString(aliveWorkers.map(_.memoryUsed) .sum)} Used</li> 26. <li><strong>Applications:</strong> 27. {state.activeApps.length} <a href="#running-app">Running</a>, 28. {state.completedApps.length} <a href="#completed-app"> Completed</a> </li> 29. <li><strong>Drivers:</strong> 30. {state.activeDrivers.length} Running, 31. {state.completedDrivers.length} Completed </li> 32. <li><strong>Status:</strong> {state.status}</li> 33. </ul> 34. </div> 35. </div> 36. ........
回到MasterWebUI.scala的initialize()方法,其中调用了attachPage方法,在WebUI中增加Web页面。
WebUI.scala的源码如下。
1. def attachPage(page: WebUIPage) { 2. val pagePath = "/" + page.prefix 3. val renderHandler = createServletHandler(pagePath, 4. (request: HttpServletRequest) => page.render(request), securityManager, conf, basePath) 5. val renderJsonHandler = createServletHandler(pagePath.stripSuffix ("/") + "/json", 6. (request: HttpServletRequest) => page.renderJson(request), securityManager, conf, basePath) 7. attachHandler(renderHandler) 8. attachHandler(renderJsonHandler) 9. val handlers = pageToHandlers.getOrElseUpdate(page, ArrayBuffer [ServletContextHandler]()) 10. handlers += renderHandler 11. }
在WebUI的bind方法中启用了JettyServer。 WebUI.scala的bind的源码如下。
1. def bind() { 2. assert(!serverInfo.isDefined, s"Attempted to bind $className more than once!") 3. try { 4. val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse ("0.0.0.0") 5. serverInfo = Some(startJettyServer(host, port, sslOptions, handlers, conf, name)) 6. logInfo(s"Bound $className to $host, and started at $webUrl") 7. } catch { 8. case e: Exception => 9. logError(s"Failed to bind $className", e) 10. System.exit(1) 11. } 12. }
JettyUtils.scala的startJettyServer尝试将Jetty服务器绑定到所提供的主机名、端口。
startJettyServer的源码如下。
1. def startJettyServer( 2. hostName: String, 3. port: Int, 4. sslOptions: SSLOptions, 5. handlers: Seq[ServletContextHandler], 6. conf: SparkConf, 7. serverName: String = ""): ServerInfo = {
5.1.3 Master HA双机切换
Spark生产环境下一般采用ZooKeeper作HA,且建议为3台Master,ZooKeeper会自动化管理Masters的切换。
采用ZooKeeper作HA时,ZooKeeper会保存整个Spark集群运行时的元数据,包括Workers、Drivers、Applications、Executors。
ZooKeeper遇到当前Active级别的Master出现故障时会从Standby Masters中选取出一台作为Active Master,但是要注意,被选举后到成为真正的Active Master之前需要从ZooKeeper中获取集群当前运行状态的元数据信息并进行恢复。
在Master切换的过程中,所有已经在运行的程序皆正常运行。因为Spark Application在运行前就已经通过Cluster Manager获得了计算资源,所以在运行时,Job本身的调度和处理和Master是没有任何关系的。
在Master的切换过程中唯一的影响是不能提交新的Job:一方面不能够提交新的应用程序给集群,因为只有Active Master才能接收新的程序提交请求;另一方面,已经运行的程序中也不能因为Action操作触发新的Job提交请求。
ZooKeeper下Master HA的基本流程如图5-2所示。
ZooKeeper下Master HA的基本流程如下。
(1)使用ZooKeeperPersistenceEngine读取集群的状态数据,包括Drivers、Applications、Workers、Executors等信息。
图5-2 ZooKeeper下Master HA的基本流程
(2)判断元数据信息是否有空的内容。
(3)把通过ZooKeeper持久化引擎获得了Drivers、Applications、Workers、Executors等信息,重新注册到Master的内存中缓存起来。
(4)验证获得的信息和当前正在运行的集群状态的一致性。
(5)将Application和Workers的状态标识为Unknown,然后向Application中的Driver以及Workers发送现在是Leader的Standby模式的Master的地址信息。
(6)当Driver和Workers收到新的Master的地址信息后会响应该信息。
(7)Master接收到来自Drivers和Workers响应的信息后会使用一个关键的方法:completeRecovery()来对没有响应的Applications (Drivers)、Workers (Executors)进行处理。处理完毕后,Master的State会变成RecoveryState.ALIVE,从而开始对外提供服务。
(8)此时Master使用自己的Schedule方法对正在等待的Application和Drivers进行资源调度。
Master HA的4大方式分别是ZOOKEEPER、FILESYSTEM、CUSTOM、NONE。
需要说明的是:
(a)ZOOKEEPER是自动管理Master。
(b)FILESYSTEM的方式在Master出现故障后需要手动重新启动机器,机器启动后会立即成为Active级别的Master来对外提供服务(接收应用程序提交的请求、接收新的Job运行的请求)。
(c)CUSTOM的方式允许用户自定义Master HA的实现,这对高级用户特别有用。
(d)NONE,这是默认情况,Spark集群中就采用这种方式,该方式不会持久化集群的数据,Master启动后立即管理集群。
Master.scala的HA的源码如下。
1. override def onStart(): Unit = { 2. ...... 3. val serializer = new JavaSerializer(conf) 4. val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match { 5. case "ZOOKEEPER" => 6. logInfo("Persisting recovery state to ZooKeeper") 7. val zkFactory = 8. new ZooKeeperRecoveryModeFactory(conf, serializer) 9. (zkFactory.createPersistenceEngine(), zkFactory.createLeader- ElectionAgent(this)) 10. case "FILESYSTEM" => 11. val fsFactory = 12. new FileSystemRecoveryModeFactory(conf, serializer) 13. (fsFactory.createPersistenceEngine(), fsFactory. createLeaderElectionAgent(this)) 14. case "CUSTOM" => 15. val clazz = Utils.classForName(conf.get("spark.deploy. recoveryMode.factory")) 16. val factory = clazz.getConstructor(classOf[SparkConf], classOf [Serializer]) 17. .newInstance(conf, serializer) 18. .asInstanceOf[StandaloneRecoveryModeFactory] 19. (factory.createPersistenceEngine(), factory.createLeaderElectionAgent (this)) 20. case _ => 21. (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this)) 22. } 23. persistenceEngine = persistenceEngine_ 24. leaderElectionAgent = leaderElectionAgent_ 25. } 26. ......
Spark默认的HA方式是NONE。
1. private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
如使用ZOOKEEPER的HA方式,ZooKeeperRecoveryModeFactory.scala的源码如下。
1. private[master] class ZooKeeperRecoveryModeFactory(conf: SparkConf, serializer: Serializer) 2. extends StandaloneRecoveryModeFactory(conf, serializer) { 3. 4. def createPersistenceEngine(): PersistenceEngine = { 5. new ZooKeeperPersistenceEngine(conf, serializer) 6. } 7. 8. def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = { 9. new ZooKeeperLeaderElectionAgent(master, conf) 10. } 11. }
通过调用zkFactory.createPersistenceEngine()用new()函数创建一个ZooKeeper-PersistenceEngine。
ZooKeeperPersistenceEngine.scala的源码如下。
1. private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer: Serializer) 2. extends PersistenceEngine 3. with Logging { 4. 5. private val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status" 6. private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf) 7. 8. SparkCuratorUtil.mkdir(zk, WORKING_DIR) 9. 10. 11. override def persist(name: String, obj: Object): Unit = { 12. serializeIntoFile(WORKING_DIR + "/" + name, obj) 13. } 14. 15. override def unpersist(name: String): Unit = { 16. zk.delete().forPath(WORKING_DIR + "/" + name) 17. } 18. 19. override def read[T: ClassTag](prefix: String): Seq[T] = { 20. zk.getChildren.forPath(WORKING_DIR).asScala 21. .filter(_.startsWith(prefix)).flatMap(deserializeFromFile[T]) 22. } 23. 24. override def close() { 25. zk.close() 26. } 27. 28. private def serializeIntoFile(path: String, value: AnyRef) { 29. val serialized = serializer.newInstance().serialize(value) 30. val bytes = new Array[Byte](serialized.remaining()) 31. serialized.get(bytes) 32. zk.create().withMode(CreateMode.PERSISTENT).forPath(path, bytes) 33. } 34. 35. private def deserializeFromFile[T](filename: String)(implicit m: ClassTag[T]): Option[T] = { 36. val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename) 37. try { 38. Some(serializer.newInstance().deserialize[T](ByteBuffer.wrap (fileData))) 39. } catch { 40. case e: Exception => 41. logWarning("Exception while reading persisted file, deleting", e) 42. zk.delete().forPath(WORKING_DIR + "/" + filename) 43. None 44. } 45. } 46. }
PersistenceEngine中有至关重要的方法persist来实现数据持久化,readPersistedData来恢复集群中的元数据。
PersistenceEngine.scala的源码如下。
1. def persist(name: String, obj: Object): Unit 2. ...... 3. final def readPersistedData( 4. rpcEnv: RpcEnv): (Seq[ApplicationInfo], Seq[DriverInfo], Seq [WorkerInfo]) = { 5. rpcEnv.deserialize { () => 6. (read[ApplicationInfo]("app_"), read[DriverInfo]("driver_"), read [WorkerInfo]("worker_")) 7. } 8. }
下面来看createdLeaderElectionAgent方法。在createdLeaderElectionAgent方法中调用new()函数创建ZooKeeperLeaderElectionAgent实例。
StandaloneRecoveryModeFactory.scala的源码如下。
1. def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = { 2. new ZooKeeperLeaderElectionAgent(master, conf) 3. } 4. }
ZooKeeperLeaderElectionAgent的源码如下。
1. private[master] class ZooKeeperLeaderElectionAgent(val masterInstance: LeaderElectable, 2. conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging { 3. 4. val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election" 5. 6. private var zk: CuratorFramework = _ 7. private var leaderLatch: LeaderLatch = _ 8. private var status = LeadershipStatus.NOT_LEADER 9. 10. start() 11. 12. private def start() { 13. logInfo("Starting ZooKeeper LeaderElection agent") 14. zk = SparkCuratorUtil.newClient(conf) 15. leaderLatch = new LeaderLatch(zk, WORKING_DIR) 16. leaderLatch.addListener(this) 17. leaderLatch.start() 18. } 19. 20. override def stop() { 21. leaderLatch.close() 22. zk.close() 23. } 24. 25. override def isLeader() { 26. synchronized { 27. //可以取得领导权 28. if (!leaderLatch.hasLeadership) { 29. return 30. } 31. 32. logInfo("We have gained leadership") 33. updateLeadershipStatus(true) 34. } 35. } 36. 37. override def notLeader() { 38. synchronized { 39. //可以取得领导权 40. if (leaderLatch.hasLeadership) { 41. return 42. } 43. 44. logInfo("We have lost leadership") 45. updateLeadershipStatus(false) 46. } 47. } 48. 49. private def updateLeadershipStatus(isLeader: Boolean) { 50. if (isLeader && status == LeadershipStatus.NOT_LEADER) { 51. status = LeadershipStatus.LEADER 52. masterInstance.electedLeader() 53. } else if (!isLeader && status == LeadershipStatus.LEADER) { 54. status = LeadershipStatus.NOT_LEADER 55. masterInstance.revokedLeadership() 56. } 57. } 58. 59. private object LeadershipStatus extends Enumeration { 60. type LeadershipStatus = Value 61. val LEADER, NOT_LEADER = Value 62. } 63. }
FILESYSTEM和NONE的方式采用MonarchyLeaderAgent的方式来完成Leader的选举,其实现是直接把传入的Master作为Leader。
LeaderElectionAgent.scala的源码如下。
1. private[spark] class MonarchyLeaderAgent(val masterInstance: LeaderElectable) 2. extends LeaderElectionAgent { 3. masterInstance.electedLeader() 4. }
FileSystemRecoveryModeFactory.scala的源码如下。
1. private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer: Serializer) 2. extends StandaloneRecoveryModeFactory(conf, serializer) with Logging { 3. 4. val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "") 5. 6. def createPersistenceEngine(): PersistenceEngine = { 7. logInfo("Persisting recovery state to directory: " + RECOVERY_DIR) 8. new FileSystemPersistenceEngine(RECOVERY_DIR, serializer) 9. } 10. 11. def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = { 12. new MonarchyLeaderAgent(master) 13. } 14. }
如果WorkerState状态为UNKNOWN(Worker不响应),就把它删除,如果以集群方式运行,driver失败后可以重新启动,最后把状态变回ALIVE,注意,这里要加入--supervise这个参数。
Master.scala的源码如下。
1. private def completeRecovery() { 2. //使用短同步周期确保“only-once”恢复一次语义 3. if (state != RecoveryState.RECOVERING) { return } 4. state = RecoveryState.COMPLETING_RECOVERY 5. 6. //杀掉不响应消息的workers 和apps 7. workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker) 8. apps.filter(_.state == ApplicationState.UNKNOWN).foreach (finishApplication) 9. 10. //重新调度drivers,其未被任何workers声明 11. drivers.filter(_.worker.isEmpty).foreach { d => 12. logWarning(s"Driver ${d.id} was not found after master recovery") 13. if (d.desc.supervise) { 14. logWarning(s"Re-launching ${d.id}") 15. relaunchDriver(d) 16. } else { 17. removeDriver(d.id, DriverState.ERROR, None) 18. logWarning(s"Did not re-launch ${d.id} because it was not supervised") 19. } 20. }
5.1.4 Master的注册机制和状态管理解密
1.Master对其他组件注册的处理
Master接收注册的对象主要是Driver、Application、Worker; Executor不会注册给Master,Executor是注册给Driver中的SchedulerBackend的。
Worker是在启动后主动向Master注册的,所以如果在生产环境下加入新的Worker到正在运行的Spark集群上,此时不需要重新启动Spark集群就能够使用新加入的Worker,以提升处理能力。假如在生产环境中的集群中有500台机器,可能又新加入100台机器,这时不需要重新启动整个集群,就可以将100台新机器加入到集群。
Worker的源码如下。
1. private[deploy] class Worker( 2. override val rpcEnv: RpcEnv, 3. webUiPort: Int, 4. cores: Int, 5. memory: Int, 6. masterRpcAddresses: Array[RpcAddress], 7. endpointName: String, 8. workDirPath: String = null, 9. val conf: SparkConf, 10. val securityMgr: SecurityManager) 11. extends ThreadSafeRpcEndpoint with Logging {
Worker是一个消息循环体,继承自ThreadSafeRpcEndpoint,可以收消息,也可以发消息。Worker的onStart方法如下。
1. override def onStart() { 2. ...... 3. workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}" 4. registerWithMaster() 5. ...... 6. }
Worker的onStart方法中调用了registerWithMaster()。
1. private def registerWithMaster() { 2. ....... 3. registrationRetryTimer match { 4. case None => 5. registered = false 6. registerMasterFutures = tryRegisterAllMasters() 7. ......
registerWithMaster方法中调用了tryRegisterAllMasters,向所有的Master进行注册。
Spark 2.1.1版本的Worker.scala的源码如下。
1. private def tryRegisterAllMasters(): Array[JFuture[_]] = { 2. masterRpcAddresses.map { masterAddress => 3. registerMasterThreadPool.submit(new Runnable { 4. override def run(): Unit = { 5. try { 6. logInfo("Connecting to master " + masterAddress + "...") 7. val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME) 8. registerWithMaster(masterEndpoint) 9. } catch { 10. case ie: InterruptedException => //Cancelled 11. case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e) 12. } 13. } 14. }) 15. } 16. }
Spark 2.2.0版本的Worker.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第8行registerWithMaster(masterEndpoint)方法调整为sendRegisterMessageToMaster(masterEndpoint)。
1. ...... 2. sendRegisterMessageToMaster(masterEndpoint) 3. ......
tryRegisterAllMasters方法中,由于实际运行时有很多Master,因此使用线程池的线程进行提交,然后获取masterEndpoint。masterEndpoint是一个RpcEndpointRef,通过registerWithMaster (masterEndpoint)进行注册。
Spark 2.1.1版本的Worker.scala的registerWithMaster的源码如下。
1. private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = { 2. masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker( 3. workerId, host, port, self, cores, memory, workerWebUiUrl)) 4. .onComplete { 5. //这是一个非常快的动作,所以可以用"ThreadUtils.sameThread" 6. case Success(msg) => 7. Utils.tryLogNonFatalError { 8. handleRegisterResponse(msg) 9. } 10. case Failure(e) => 11. logError(s"Cannot register with master: ${masterEndpoint .address}", e) 12. System.exit(1) 13. }(ThreadUtils.sameThread) 14. }
Spark 2.1.1版本的registerWithMaster(masterEndpoint)方法调整为Spark 2.2.0版本的sendRegisterMessageToMaster(masterEndpoint)。sendRegisterMessageToMaster方法仅将RegisterWorker消息发送给Master消息循环体。sendRegisterMessageToMaster方法内部不作其他处理。
1. private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef): Unit = { 2. masterEndpoint.send(RegisterWorker( 3. workerId, 4. host, 5. port, 6. self, 7. cores, 8. memory, 9. workerWebUiUrl, 10. masterEndpoint.address)) 11. }
registerWithMaster方法中的masterEndpoint.ask[RegisterWorkerResponse]传进去的是RegisterWorker。RegisterWorker是一个case class,包括id、host、port、worker、cores、memory等信息,这里Worker是自己的引用RpcEndpointRef,Master通过Ref通worker通信。
Spark 2.1.1版本的RegisterWorker.scala的源码如下。
1. case class RegisterWorker( 2. id: String, 3. host: String, 4. port: Int, 5. worker: RpcEndpointRef, 6. cores: Int, 7. memory: Int, 8. workerWebUiUrl: String) 9. extends DeployMessage { 10. Utils.checkHost(host, "Required hostname") 11. assert (port > 0) 12. }
Spark 2.2.0版本的RegisterWorker.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第8行之后新增加masterAddress的成员变量:Master的地址,用于Worker节点连接Master节点。
1. ...... 2. masterAddress: RpcAddress) 3. ......
Worker通过registerWithMaster向Master发送了RegisterWorker消息,Master收到RegisterWorker请求后,进行相应的处理。
Spark 2.1.1版本的Master.scala的receiveAndReply的源码如下。
1. override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { 2. case RegisterWorker( 3. id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) => 4. logInfo("Registering worker %s:%d with %d cores, %s RAM".format( 5. workerHost, workerPort, cores, Utils.megabytesToString(memory))) 6. if (state == RecoveryState.STANDBY) { 7. context.reply(MasterInStandby) 8. } else if (idToWorker.contains(id)) { 9. context.reply(RegisterWorkerFailed("Duplicate worker ID")) 10. } else { 11. val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, 12. workerRef, workerWebUiUrl) 13. if (registerWorker(worker)) { 14. persistenceEngine.addWorker(worker) 15. context.reply(RegisteredWorker(self, masterWebUiUrl)) 16. schedule() 17. } else { 18. val workerAddress = worker.endpoint.address 19. logWarning("Worker registration failed. Attempted to re-register worker at same " + 20. "address: " + workerAddress) 21. context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: " 22. + workerAddress)) 23. } 24. }
Spark 2.2.0版本的Master.scala的receiveAndReply的源码与Spark 2.1.1版本相比具有如下特点。
上段代码中第1行,RegisterWorker消息的模式匹配从receiveAndReply方法中调整到receive方法。因为Worker向Master提交RegisterWorker消息,无须同步等待Master的答复。
上段代码中第3行RegisterWorker的传入参数新增加了masterAddress。
上段代码中第9行context.reply方法调整为workerRef.send方法。
上段代码中第15行context.reply方法调整为workerRef.send方法,以及RegisteredWorker中新增了一个参数masterAddress。
1. override def receive: PartialFunction[Any, Unit] = { 2. ...... 3. case RegisterWorker( 4. id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) 5. ...... 6. workerRef.send(RegisterWorkerFailed("Duplicate worker ID")) 7. ...... 8. workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress)) 9. ......
RegisterWorker中,Master接收到Worker的注册请求后,首先判断当前的Master是否是Standby的模式,如果是,就不处理;Master的idToWorker包含了所有已经注册的Worker的信息,然后会判断当前Master的内存数据结构idToWorker中是否已经有该Worker的注册,如果有,此时不会重复注册;其中idToWorker是一个HashMap,Key是String代表Worker的字符描述,Value是WorkerInfo。
1. private val idToWorker = new HashMap[String, WorkerInfo]
WorkerInfo包括id、host、port 、cores、memory、endpoint等内容。
1. private[spark] class WorkerInfo( 2. val id: String, 3. val host: String, 4. val port: Int, 5. val cores: Int, 6. val memory: Int, 7. val endpoint: RpcEndpointRef, 8. val webUiAddress: String) 9. extends Serializable {
Master如果决定接收注册的Worker,首先会创建WorkerInfo对象来保存注册的Worker的信息,然后调用registerWorker执行具体的注册的过程,如果Worker的状态是DEAD的状态,则直接过滤掉。对于UNKNOWN的内容,调用removeWorker进行清理(包括清理该Worker下的Executors和Drivers)。其中,UNKNOWN的情况:Master进行切换时,先对Worker发UNKNOWN消息,只有当Master收到Worker正确的回复消息,才将状态标识为正常。
registerWorker的源码如下。
1. private def registerWorker(worker: WorkerInfo): Boolean = { 2. //在同一节点上可能有一个或多个指向挂掉的workers节点的引用(不同ID),须删除它们 3. workers.filter { w => 4. (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD) 5. }.foreach { w => 6. workers -= w 7. } 8. 9. val workerAddress = worker.endpoint.address 10. if (addressToWorker.contains(workerAddress)) { 11. val oldWorker = addressToWorker(workerAddress) 12. if (oldWorker.state == WorkerState.UNKNOWN) { 13. //未知状态的worker 意味着在恢复过程中worker 被重新启动。旧的worker节点 //挂掉,须删掉旧节点,接收新worker节点 14. removeWorker(oldWorker) 15. } else { 16. logInfo("Attempted to re-register worker at same address: " + workerAddress) 17. return false 18. } 19. } 20. 21. workers += worker 22. idToWorker(worker.id) = worker 23. addressToWorker(workerAddress) = worker 24. if (reverseProxy) { 25. webUi.addProxyTargets(worker.id, worker.webUiAddress) 26. } 27. true 28. }
在registerWorker方法中,Worker注册完成后,把注册的Worker加入到Master的内存数据结构中。
1. val workers = new HashSet[WorkerInfo] 2. private val idToWorker = new HashMap[String, WorkerInfo] 3. private val addressToWorker = new HashMap[RpcAddress, WorkerInfo] 4. ...... 5. 6. workers += worker 7. idToWorker(worker.id) = worker 8. addressToWorker(workerAddress) = worker
回到Master.scala的receiveAndReply方法,Worker注册完成后,调用persistenceEngine.addWorker (worker),PersistenceEngine是持久化引擎,在Zookeeper下就是Zookeeper的持久化引擎,把注册的数据进行持久化。
PersistenceEngine.scala的addWorker方法如下。
1. final def addWorker(worker: WorkerInfo): Unit = { 2. persist("worker_" + worker.id, worker) 3. }
ZooKeeperPersistenceEngine是PersistenceEngine的一个具体实现子类,其persist方法如下。
1. private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer: Serializer) 2. extends PersistenceEngine 3. ...... 4. override def persist(name: String, obj: Object): Unit = { 5. serializeIntoFile(WORKING_DIR + "/" + name, obj) 6. } 7. ...... 8. private def serializeIntoFile(path: String, value: AnyRef) { 9. val serialized = serializer.newInstance().serialize(value) 10. val bytes = new Array[Byte](serialized.remaining()) 11. serialized.get(bytes) 12. zk.create().withMode(CreateMode.PERSISTENT).forPath(path, bytes) 13. }
回到Master.scala的receiveAndReply方法,注册的Worker数据持久化后,进行schedule()。至此,Worker的注册完成。
同样,Driver的注册过程:Driver提交给Master进行注册,Master会将Driver的信息放入内存缓存中,加入等待调度的队列,通过持久化引擎(如ZooKeeper)把注册信息持久化,然后进行Schedule。
Application的注册过程:Application提交给Master进行注册,Driver启动后会执行SparkContext的初始化,进而导致StandaloneSchedulerBackend的产生,其内部有StandaloneAppClient。StandaloneAppClient内部有ClientEndpoint。ClientEndpoint来发送RegisterApplication信息给Master。Master会将Application的信息放入内存缓存中,把Application加入等待调度的Application队列,通过持久化引擎(如ZooKeeper)把注册信息持久化,然后进行Schedule。
2.Master对Driver和Executor状态变化的处理
1)对Driver状态变化的处理
如果Driver的各个状态是DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED,就将其清理掉。其他情况则报异常。
1. override def receive: PartialFunction[Any, Unit] = { 2. ...... 3. case DriverStateChanged(driverId, state, exception) => 4. state match { 5. case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED => 6. removeDriver(driverId, state, exception) 7. case _ => 8. throw new Exception(s"Received unexpected state update for driver $driverId: $state") 9. }
removeDriver清理掉Driver后,再次调用schedule方法,removeDriver的源码如下。
1. private def removeDriver( 2. driverId: String, 3. finalState: DriverState, 4. exception: Option[Exception]) { 5. drivers.find(d => d.id == driverId) match { 6. case Some(driver) => 7. logInfo(s"Removing driver: $driverId") 8. drivers -= driver 9. if (completedDrivers.size >= RETAINED_DRIVERS) { 10. val toRemove = math.max(RETAINED_DRIVERS / 10, 1) 11. completedDrivers.trimStart(toRemove) 12. } 13. completedDrivers += driver 14. persistenceEngine.removeDriver(driver) 15. driver.state = finalState 16. driver.exception = exception 17. driver.worker.foreach(w => w.removeDriver(driver)) 18. schedule() 19. case None => 20. logWarning(s"Asked to remove unknown driver: $driverId") 21. } 22. } 23. }
2)对Executor状态变化的处理
ExecutorStateChanged的源码如下。
1. override def receive: PartialFunction[Any, Unit] = { 2. ...... 3. case ExecutorStateChanged(appId, execId, state, message, exitStatus) => 4. val execOption = idToApp.get(appId).flatMap(app => app.executors. get(execId)) 5. execOption match { 6. case Some(exec) => 7. val appInfo = idToApp(appId) 8. val oldState = exec.state 9. exec.state = state 10. 11. if (state == ExecutorState.RUNNING) { 12. assert(oldState == ExecutorState.LAUNCHING, 13. s"executor $execId state transfer from $oldState to RUNNING is illegal") 14. appInfo.resetRetryCount() 15. } 16. 17. exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false)) 18. 19. if (ExecutorState.isFinished(state)) { 20. //从worker 和app中删掉executor 21. logInfo(s"Removing executor ${exec.fullId} because it is $state") 22. //如果应用程序已经完成,保存其状态及在UI上正确显示其信息 23. if (!appInfo.isFinished) { 24. appInfo.removeExecutor(exec) 25. } 26. exec.worker.removeExecutor(exec) 27. 28. val normalExit = exitStatus == Some(0) 29. //只重试一定次数,这样就不会进入无限循环。重要提示:这个代码路径不是通过 //测试执行的,所以改变if条件时必须小心 30. if (!normalExit 31. && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES 32. && MAX_EXECUTOR_RETRIES >= 0) { //< 0 disables this application-killing path 33. val execs = appInfo.executors.values 34. if (!execs.exists(_.state == ExecutorState.RUNNING)) { 35. logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " + 36. s"${appInfo.retryCount} times; removing it") 37. removeApplication(appInfo, ApplicationState.FAILED) 38. } 39. } 40. } 41. schedule() 42. case None => 43. logWarning(s"Got status update for unknown executor $appId/ $execId") 44. }
Executor挂掉时系统会尝试一定次数的重启(最多重启10次)。
1. private val MAX_EXECUTOR_RETRIES = conf.getInt("spark.deploy. maxExecutorRetries", 10)