6.3 Revisiting the Driver from the Perspective of Application Submission
This section revisits the Driver from the perspective of application submission, thoroughly demystifying when the Driver is actually created, as well as the principles and the source code of the Driver-Master interaction.
6.3.1 When Is the Driver Actually Created?
When SparkContext is instantiated, createTaskScheduler is called to create the TaskSchedulerImpl and the StandaloneSchedulerBackend.
The source code in SparkContext.scala is as follows.

class SparkContext(config: SparkConf) extends Logging {
  ........
  val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
  _schedulerBackend = sched
  _taskScheduler = ts

  _dagScheduler = new DAGScheduler(this)
  _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
  ......
  private def createTaskScheduler(
    ......
    case SPARK_REGEX(sparkUrl) =>
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      (backend, scheduler)
    ......
createTaskScheduler calls scheduler.initialize(backend), passing the StandaloneSchedulerBackend in as the method's argument.
The source code of TaskSchedulerImpl's initialize is as follows.

def initialize(backend: SchedulerBackend) {
  this.backend = backend
  ......
The initialize method receives the StandaloneSchedulerBackend but does not yet start it; it simply assigns it to TaskSchedulerImpl's backend field.
When TaskSchedulerImpl's start method is later invoked, it calls backend.start, and it is inside this start method that the application gets registered.
The taskScheduler-related source code in SparkContext.scala is as follows.

val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
......
_taskScheduler.start()
_applicationId = _taskScheduler.applicationId()
_applicationAttemptId = taskScheduler.applicationAttemptId()
_conf.set("spark.app.id", _applicationId)
......
Here the start method of _taskScheduler is invoked.

private[spark] trait TaskScheduler {
  ......

  def start(): Unit
  .....
The TaskScheduler trait leaves start() unimplemented. The start() method of its subclass TaskSchedulerImpl is as follows.

override def start() {
  backend.start()
  ......
TaskSchedulerImpl's start() invokes backend.start(), which runs the start method of StandaloneSchedulerBackend.
In StandaloneSchedulerBackend's start method, a command is built and registered with the Master, and the Master in turn asks Workers to launch the actual Executors. The command specifies CoarseGrainedExecutorBackend as the entry class of the Executor process to be launched. The start method then news up a StandaloneAppClient and starts it via client.start().
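For reference, here is an abbreviated sketch of StandaloneSchedulerBackend's start method, elided with ...... and based on the Spark 2.x source (exact argument lists may differ slightly between versions). It shows the command wrapping CoarseGrainedExecutorBackend, the ApplicationDescription carrying it, and the client.start() call described above.

override def start() {
  super.start()
  ......
  // The command's entry class is CoarseGrainedExecutorBackend, the Executor process
  val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
    args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
  ......
  // The application description carries the command to the Master
  val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
    webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
  client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
  client.start()
  ......
}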
In StandaloneAppClient's start method, a ClientEndpoint is created via new().

def start() {
  // Just launch an rpcEndpoint; it will call back into the listener.
  endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
}
The source code of ClientEndpoint is as follows.

private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
  with Logging {
  ......
  override def onStart(): Unit = {
    try {
      registerWithMaster(1)
    } catch {
      case e: Exception =>
        logWarning("Failed to connect to master", e)
        markDisconnected()
        stop()
    }
  }
ClientEndpoint is a ThreadSafeRpcEndpoint. Its onStart() method calls registerWithMaster(1) to register the application with the Master. The registerWithMaster method is shown below.
The source code in StandaloneAppClient.scala is as follows.

private def registerWithMaster(nthRetry: Int) {
  registerMasterFutures.set(tryRegisterAllMasters())
  ......
registerWithMaster calls the tryRegisterAllMasters method, in which the ClientEndpoint sends a RegisterApplication message to the Master to register the application.
The source code in StandaloneAppClient.scala is as follows.

private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  ......
  masterRef.send(RegisterApplication(appDescription, self))
  ......
Once the application is registered, the Master allocates resources through schedule() and tells Workers to launch Executors; the Executor process launched is CoarseGrainedExecutorBackend. After starting, each Executor registers back with the Driver. The Driver here is in fact DriverEndpoint, a message loop inside CoarseGrainedSchedulerBackend, the parent class of StandaloneSchedulerBackend.
The source code of Master.scala's receive method is as follows.

override def receive: PartialFunction[Any, Unit] = {
  case RegisterApplication(description, driver) =>
    .......
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    persistenceEngine.addApplication(app)
    driver.send(RegisteredApplication(app.id, self))
    schedule()
}
The Master's receive method calls the schedule method, which schedules the currently available resources among the waiting applications. It is invoked every time a new application joins or resource availability changes.
The source code of Master.scala's schedule method is as follows.

private def schedule(): Unit = {
  .......
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  startExecutorsOnWorkers()
}
In the schedule method, Master.scala calls launchDriver, which sends a LaunchDriver message to the Worker. The source code of Master.scala's launchDriver is as follows.

private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver)
  driver.worker = Some(worker)
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  driver.state = DriverState.RUNNING
}
LaunchDriver itself is a case class carrying the driverId and the driverDesc.

case class LaunchDriver(driverId: String, driverDesc: DriverDescription) extends DeployMessage
DriverDescription contains the jarUrl, memory, cores, supervise, command, and so on.

private[deploy] case class DriverDescription(
    jarUrl: String,
    mem: Int,
    cores: Int,
    supervise: Boolean,
    command: Command) {

  override def toString: String = s"DriverDescription (${command.mainClass})"
}
In Master.scala, launchDriver starts the Driver; next, launchExecutor starts the Executors. The source code of Master.scala's launchExecutor is as follows.

private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
So the Master sends the Worker a LaunchDriver message to start the Driver, and then launchExecutor starts the Executors. launchExecutor follows its own scheduling policy; once resources have been scheduled, it likewise sends the Worker a LaunchExecutor message.
The Worker thus receives the LaunchDriver and LaunchExecutor messages sent by the Master.
Figure 6-2 shows the inner workings and flow mechanism of the Worker.
Figure 6-2 Inner workings and flow mechanism of the Worker
Master and Worker are deployed on different machines, and each runs as a process. The Master sends Workers two different kinds of instructions: one is LaunchDriver; the other is LaunchExecutor.
After the Worker receives the Master's LaunchDriver message, it news up a DriverRunner and then starts it with driver.start().
The source code in Worker.scala is as follows.

case LaunchDriver(driverId, driverDesc) =>
  ......
  val driver = new DriverRunner(
    ......
  driver.start()
After the Worker receives the Master's LaunchExecutor message, it news up an ExecutorRunner and then starts it with manager.start().
The source code in Worker.scala is as follows.

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  ......
  val manager = new ExecutorRunner(
    ......
  manager.start()
When the Worker's DriverRunner and ExecutorRunner call their start methods, each internally spawns a Thread to handle the launch of the Driver or Executor. Taking the case where the Worker receives a LaunchDriver message and news up a DriverRunner as an example, the source code of DriverRunner.scala's start is as follows.

/** Starts a thread to run and manage the driver. */
private[worker] def start() = {
  new Thread("DriverRunner for " + driverId) {
    override def run() {
      var shutdownHook: AnyRef = null
      try {
        shutdownHook = ShutdownHookManager.addShutdownHook { () =>
          logInfo(s"Worker shutting down, killing driver $driverId")
          kill()
        }

        // Prepare the driver's jars and run the driver
        val exitCode = prepareAndRunDriver()

        // Set the final state depending on whether it was forcibly killed and the exit code
        finalState = if (exitCode == 0) {
          Some(DriverState.FINISHED)
        } else if (killed) {
          Some(DriverState.KILLED)
        } else {
          Some(DriverState.FAILED)
        }
      } catch {
        case e: Exception =>
          kill()
          finalState = Some(DriverState.ERROR)
          finalException = Some(e)
      } finally {
        if (shutdownHook != null) {
          ShutdownHookManager.removeShutdownHook(shutdownHook)
        }
      }

      // Notify the worker node of the driver's final state and possible exception
      worker.send(DriverStateChanged(driverId, finalState.get, finalException))
    }
  }.start()
}
DriverRunner.scala's start method calls prepareAndRunDriver to prepare the Driver's jars and launch the Driver. The source code of prepareAndRunDriver is as follows.

private[worker] def prepareAndRunDriver(): Int = {
  val driverDir = createWorkingDirectory()
  val localJarFilename = downloadUserJar(driverDir)

  def substituteVariables(argument: String): String = argument match {
    case "{{WORKER_URL}}" => workerUrl
    case "{{USER_JAR}}" => localJarFilename
    case other => other
  }

  // TODO: if we add the ability to submit multiple jars, they should also be added here
  val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
    driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)

  runDriver(builder, driverDir, driverDesc.supervise)
}
The LaunchDriver startup process is as follows.
Worker process: the Worker's DriverRunner calls its start method, which internally uses a Thread to handle the Driver launch. The DriverRunner creates the Driver's working directory on the local file system (i.e., a directory in the Linux file system); each run gets its own directory. It then wraps up the Driver's launch Command and starts the Driver via a ProcessBuilder. All of this happens inside the Worker process.
Driver process: the Driver thus launched runs in its own process, the Driver process.
The LaunchExecutor startup process is as follows.
Worker process: the Worker's ExecutorRunner calls its start method, which internally uses a Thread to handle the Executor launch. The ExecutorRunner creates the Executor's working directory on the local file system (i.e., a directory in the Linux file system); each run gets its own directory. It then wraps up the Executor's launch Command and starts the Executor via a ProcessBuilder. All of this happens inside the Worker process.
Executor process: the launched Executor runs in its own process. The Executor lives inside an ExecutorBackend, which in Spark standalone mode is CoarseGrainedExecutorBackend; CoarseGrainedExecutorBackend extends the ExecutorBackend trait. Executor and ExecutorBackend are in a one-to-one relationship: one ExecutorBackend holds one Executor, and inside the Executor, the Tasks submitted by Spark are processed concurrently through a thread pool, as the sketch below illustrates.
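To make the thread-pool point concrete, here is a minimal, self-contained Scala sketch — not Spark source — of the pattern: one object owning a cached pool of daemon threads and running each submitted task concurrently. All names here (ExecutorPoolSketch, launchTask) are illustrative only.

import java.util.concurrent.{Executors, ThreadFactory}

object ExecutorPoolSketch {
  // A cached pool of daemon threads, mirroring the "task launch worker" pool pattern
  private val threadPool = Executors.newCachedThreadPool(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "task-launch-worker")
      t.setDaemon(true)
      t
    }
  })

  // Each task runs on its own pooled thread, so many tasks proceed concurrently
  def launchTask(taskId: Long)(body: => Unit): Unit =
    threadPool.execute(new Runnable {
      override def run(): Unit = {
        println(s"running task $taskId on ${Thread.currentThread().getName}")
        body
      }
    })

  def main(args: Array[String]): Unit = {
    (1L to 3L).foreach(id => launchTask(id) { Thread.sleep(100) })
    Thread.sleep(500) // give the daemon threads time to finish before the JVM exits
  }
}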
After starting, the Executor must register with the Driver — specifically, with the SchedulerBackend.
The source code of CoarseGrainedExecutorBackend is as follows, followed by an abbreviated look at its onStart registration.

private[spark] class CoarseGrainedExecutorBackend(
    override val rpcEnv: RpcEnv,
    driverUrl: String,
    executorId: String,
    hostname: String,
    cores: Int,
    userClassPath: Seq[URL],
    env: SparkEnv)
  extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {

  private[this] val stopping = new AtomicBoolean(false)
  var executor: Executor = null
  @volatile var driver: Option[RpcEndpointRef] = None
  ......
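The registration itself happens in CoarseGrainedExecutorBackend's onStart, shown below in abbreviated form (based on the Spark 2.x source; minor details vary by version). It asynchronously resolves the Driver's endpoint reference from driverUrl and sends it a RegisterExecutor message.

override def onStart() {
  logInfo("Connecting to driver: " + driverUrl)
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // This is a very fast action, so we can use "ThreadUtils.sameThread"
    driver = Some(ref)
    ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    case Success(msg) =>
      // Always receives `true`; the RegisteredExecutor reply is handled in receive
    case Failure(e) =>
      exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
  }(ThreadUtils.sameThread)
}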
Let us look at the Master's schedule method once more.

private def schedule(): Unit = {
  ......
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  startExecutorsOnWorkers()
}
In the Master's schedule method, if the Driver is to run inside the cluster, it is started through launchDriver. launchDriver sends a message to the Worker's endpoint — this is the RPC communication mechanism at work.

private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver)
  driver.worker = Some(worker)
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  driver.state = DriverState.RUNNING
}
The Executor-launching part of the Master's schedule method goes through startExecutorsOnWorkers, which likewise communicates via RPC.
startExecutorsOnWorkers in Master.scala calls the allocateWorkerResourceToExecutors method to perform the actual allocation.
During the actual allocation, allocateWorkerResourceToExecutors starts the Executors through the launchExecutor method.

private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id,
    exec.application.desc, exec.cores, exec.memory))
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
The Master thus sends the Worker two messages: one LaunchDriver and one LaunchExecutor, and the Worker receives both. Now let us look at the Worker itself.

private[deploy] class Worker(
    override val rpcEnv: RpcEnv,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterRpcAddresses: Array[RpcAddress],
    endpointName: String,
    workDirPath: String = null,
    val conf: SparkConf,
    val securityMgr: SecurityManager)
  extends ThreadSafeRpcEndpoint with Logging {
The Worker implements RPC communication by extending ThreadSafeRpcEndpoint. ThreadSafeRpcEndpoint is a trait; other RPC endpoints can send messages to it.

private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint

The Worker receives messages in its receive method. It works like a mailbox, continuously looping over the mailbox to receive mail — each message can be thought of as a letter.

override def receive: PartialFunction[Any, Unit] = synchronized {
  case SendHeartbeat =>
    ......
  case WorkDirCleanup =>
    ......
  case MasterChanged(masterRef, masterWebUiUrl) =>
    ......
  case ReconnectWorker(masterUrl) =>
    .......
  case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
    ......
  case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus)
    ......
  case KillExecutor(masterUrl, appId, execId) =>
    ......
  case LaunchDriver(driverId, driverDesc) =>
    ......
The LaunchDriver handling in Worker.scala's receive method, which starts the Driver, is as follows.

case LaunchDriver(driverId, driverDesc) =>
  logInfo(s"Asked to launch driver $driverId")
  val driver = new DriverRunner(
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    workerUri,
    securityMgr)
  drivers(driverId) = driver
  driver.start()

  coresUsed += driverDesc.cores
  memoryUsed += driverDesc.mem
The LaunchDriver handler first logs a message; the driverId is always passed in. When a Driver or Executor is launched, the process hosting it is guaranteed to satisfy the memory requirement, but not necessarily the cores requirement: the actual number of cores may be more or fewer than expected.
The logInfo call goes through a logging wrapper.

protected def logInfo(msg: => String) {
  if (log.isInfoEnabled) log.info(msg)
}
Back in the LaunchDriver handler, a DriverRunner is created via new(), carrying the driverId, the working directory (workDir), the Spark home path (sparkHome), the driverDesc, the workerUri, the securityMgr, and so on. The statement drivers(driverId) = driver stores the driver in the drivers data structure, a HashMap keyed by the Driver's ID with the DriverRunner as the value. Since a Worker may launch many Drivers, it must manage each DriverRunner by its specific ID. Internally, the DriverRunner uses a thread to launch a separate process — the Driver itself; the DriverRunner thus acts as a proxy for the Driver's process.

val drivers = new HashMap[String, DriverRunner]
Back in Worker.scala's LaunchDriver handler: before starting the driver, the Worker saves the DriverRunner into its in-memory data structure, then calls driver.start(). After the start, the consumed cores and memory are added to coresUsed and memoryUsed.
Next, let us step into DriverRunner.scala. The DriverRunner manages the execution of a Driver, including automatically restarting it upon failure. When the Driver runs in cluster mode, adding the supervise flag enables this automatic restart.

private[deploy] class DriverRunner(
    conf: SparkConf,
    val driverId: String,
    val workDir: File,
    val sparkHome: File,
    val driverDesc: DriverDescription,
    val worker: RpcEndpointRef,
    val workerUrl: String,
    val securityManager: SecurityManager)
  extends Logging {
The source code of DriverDescription is shown below. Among its members is supervise, a Boolean: when set to true and the Driver fails in cluster mode, the Worker takes responsibility for restarting it.

private[deploy] case class DriverDescription(
    jarUrl: String,
    mem: Int,
    cores: Int,
    supervise: Boolean,
    command: Command) {

  override def toString: String = s"DriverDescription (${command.mainClass})"
}
Back in Worker.scala's LaunchDriver handler: once the DriverRunner has been constructed, its start method is called, and a thread manages the Driver, including starting it and shutting it down. In Thread("DriverRunner for " + driverId), "DriverRunner for <driverId>" is the thread's name; Thread is Java code, and Scala interoperates seamlessly with Java.
DriverRunner's start method calls prepareAndRunDriver to prepare the driver's jars and launch the driver.
prepareAndRunDriver calls the createWorkingDirectory method, which uses Java's new File to create the Driver's working directory. If the directory does not exist and cannot be created, the method fails with an exception. Creating a directory on the local file system rarely fails unless the disk is full. The source code of createWorkingDirectory is as follows.

private def createWorkingDirectory(): File = {
  val driverDir = new File(workDir, driverId)
  if (!driverDir.exists() && !driverDir.mkdirs()) {
    throw new IOException("Failed to create directory " + driverDir)
  }
  driverDir
}
Back in DriverRunner.scala's prepareAndRunDriver method, the downloadUserJar method downloads the user's jar. The code we write ourselves is packaged as a jar; the jar lives in HDFS, and here it is fetched from HDFS down to the local machine.
The source code of downloadUserJar is as follows.

private def downloadUserJar(driverDir: File): String = {
  val jarFileName = new URI(driverDesc.jarUrl).getPath.split("/").last
  val localJarFile = new File(driverDir, jarFileName)
  if (!localJarFile.exists()) { // may already exist if running multiple workers on one node
    logInfo(s"Copying user jar ${driverDesc.jarUrl} to $localJarFile")
    Utils.fetchFile(
      driverDesc.jarUrl,
      driverDir,
      conf,
      securityManager,
      SparkHadoopUtil.get.newConfiguration(conf),
      System.currentTimeMillis(),
      useCache = false)
    if (!localJarFile.exists()) { // verify the copy succeeded
      throw new IOException(
        s"Can not find expected jar $jarFileName which should have been loaded in $driverDir")
    }
  }
  localJarFile.getAbsolutePath
}
downloadUserJar calls fetchFile, which downloads the file from HDFS with Hadoop's help. At submission time the jar is uploaded to HDFS once, and everyone can then download it from HDFS. The source code of Utils.fetchFile is as follows.

def fetchFile(
    url: String,
    targetDir: File,
    conf: SparkConf,
    securityMgr: SecurityManager,
    hadoopConf: Configuration,
    timestamp: Long,
    useCache: Boolean) {
  val fileName = decodeFileNameInURI(new URI(url))
  val targetFile = new File(targetDir, fileName)
  val fetchCacheEnabled = conf.getBoolean("spark.files.useFetchCache", defaultValue = true)
  if (useCache && fetchCacheEnabled) {
    val cachedFileName = s"${url.hashCode}${timestamp}_cache"
    val lockFileName = s"${url.hashCode}${timestamp}_lock"
    val localDir = new File(getLocalDir(conf))
    val lockFile = new File(localDir, lockFileName)
    val lockFileChannel = new RandomAccessFile(lockFile, "rw").getChannel()
    // Only one executor entry. The FileLock is only used to control synchronization of
    // executors downloading the file; it is always safe regardless of whether the lock
    // type is mandatory or advisory.
    val lock = lockFileChannel.lock()
    val cachedFile = new File(localDir, cachedFileName)
    try {
      if (!cachedFile.exists()) {
        doFetchFile(url, localDir, cachedFileName, conf, securityMgr, hadoopConf)
      }
    } finally {
      lock.release()
      lockFileChannel.close()
    }
    copyFile(
      url,
      cachedFile,
      targetFile,
      conf.getBoolean("spark.files.overwrite", false)
    )
  } else {
    doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
  }
Back in DriverRunner.scala's prepareAndRunDriver method: driverDesc.command states which class to run, a ProcessBuilder is constructed around that entry class, and then runDriver starts the Driver.

private[worker] def prepareAndRunDriver(): Int = {
  .......
  val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
    driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)

  runDriver(builder, driverDir, driverDesc.supervise)
}
DriverRunner.scala's runDriver method is shown below. runDriver redirects the stdout and stderr output to files, so the execution can be inspected through the log files. Finally it calls the runCommandWithRetry method.

private def runDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean): Int = {
  builder.directory(baseDir)
  def initialize(process: Process): Unit = {
    // Redirect stdout and stderr to files
    val stdout = new File(baseDir, "stdout")
    CommandUtils.redirectStream(process.getInputStream, stdout)

    val stderr = new File(baseDir, "stderr")
    val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
    val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
    Files.append(header, stderr, StandardCharsets.UTF_8)
    CommandUtils.redirectStream(process.getErrorStream, stderr)
  }
  runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
}
The argument passed into runCommandWithRetry is ProcessBuilderLike(builder), which news up a ProcessBuilderLike whose overridden start method executes processBuilder.start(). The source code of ProcessBuilderLike is as follows.

private[deploy] object ProcessBuilderLike {
  def apply(processBuilder: ProcessBuilder): ProcessBuilderLike = new ProcessBuilderLike {
    override def start(): Process = processBuilder.start()
    override def command: Seq[String] = processBuilder.command().asScala
  }
}
The source code of runCommandWithRetry is as follows.

private[worker] def runCommandWithRetry(
    command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Int = {
  var exitCode = -1
  // Time to wait between submission retries
  var waitSeconds = 1
  // A run of this many seconds resets the exponential back-off
  val successfulRunDuration = 5
  var keepTrying = !killed

  while (keepTrying) {
    logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))

    synchronized {
      if (killed) { return exitCode }
      process = Some(command.start())
      initialize(process.get)
    }

    val processStart = clock.getTimeMillis()
    exitCode = process.get.waitFor()

    // Check if we are attempting another run
    keepTrying = supervise && exitCode != 0 && !killed
    if (keepTrying) {
      if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
        waitSeconds = 1
      }
      logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
      sleeper.sleep(waitSeconds)
      waitSeconds = waitSeconds * 2 // exponential back-off
    }
  }

  exitCode
}

runCommandWithRetry may not succeed on the first attempt, hence the retry loop. The DriverRunner waits on the launched process via process.get.waitFor from the ProcessBuilder. If supervise is set to true, the exit code is non-zero, and the driver process has not been killed, keepTrying stays true and the loop retries the launch, as illustrated below.
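As a small illustration of the exponential back-off, the snippet below (illustrative only, not Spark source) prints the wait intervals the loop above would produce when a supervised driver keeps failing faster than successfulRunDuration (5 s).

object BackoffSketch {
  def main(args: Array[String]): Unit = {
    // waitSeconds starts at 1 and doubles after every failed relaunch
    val waits = Iterator.iterate(1)(_ * 2).take(6).toList
    println(waits.mkString(", ")) // 1, 2, 4, 8, 16, 32 -- seconds slept before each retry
  }
}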
Back in Worker.scala, the LaunchDriver handler is as follows.

case LaunchDriver(driverId, driverDesc) =>
  ......
  drivers(driverId) = driver
  driver.start()
The Driver is started through driver.start. Stepping into start, the source code is as follows.

private[worker] def start() = {
  new Thread("DriverRunner for " + driverId) {
    override def run() {
      ......
      } catch {
        case e: Exception =>
          kill()
          finalState = Some(DriverState.ERROR)
          finalException = Some(e)
      } finally {
        if (shutdownHook != null) {
          ShutdownHookManager.removeShutdownHook(shutdownHook)
        }
      }

      // Notify the worker node of the driver's final state and possible exception
      worker.send(DriverStateChanged(driverId, finalState.get, finalException))
    }
  }.start()
}
When the start thread reaches finalState, Spark may have run into trouble — for example, the Driver ended up KILLED or FAILED at runtime. When that happens, worker.send delivers a DriverStateChanged message to the Worker itself, announcing the state change. Below is the DriverStateChanged handling in Worker.scala.

case driverStateChanged @ DriverStateChanged(driverId, state, exception) =>
  handleDriverStateChanged(driverStateChanged)
This calls the handleDriverStateChanged method, whose source code is as follows.

private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
  val driverId = driverStateChanged.driverId
  val exception = driverStateChanged.exception
  val state = driverStateChanged.state
  state match {
    case DriverState.ERROR =>
      logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
    case DriverState.FAILED =>
      logWarning(s"Driver $driverId exited with failure")
    case DriverState.FINISHED =>
      logInfo(s"Driver $driverId exited successfully")
    case DriverState.KILLED =>
      logInfo(s"Driver $driverId was killed by user")
    case _ =>
      logDebug(s"Driver $driverId changed state to $state")
  }
  sendToMaster(driverStateChanged)
  val driver = drivers.remove(driverId).get
  finishedDrivers(driverId) = driver
  trimFinishedDriversIfNecessary()
  memoryUsed -= driver.driverDesc.mem
  coresUsed -= driver.driverDesc.cores
}
Worker.scala's handleDriverStateChanged logs according to the different states. The key line is sendToMaster(driverStateChanged), which sends the Master a message reporting that the Driver process has gone down; the message content is driverStateChanged. The source code of sendToMaster is as follows.

private def sendToMaster(message: Any): Unit = {
  master match {
    case Some(masterRef) => masterRef.send(message)
    case None =>
      logWarning(
        s"Dropping $message because the connection to master has not yet been established")
  }
}
Now let us look at the Master's side. When the Master receives a DriverStateChanged message, whatever the Driver's state — DriverState.ERROR, DriverState.FINISHED, DriverState.KILLED, or DriverState.FAILED — it removes the Driver from its in-memory data structures and cleans up its data in the persistence engine.

case DriverStateChanged(driverId, state, exception) =>
  state match {
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
      removeDriver(driverId, state, exception)
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }
Stepping into removeDriver: after cleaning up the related data, it calls the schedule method once again.

private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]) {
  drivers.find(d => d.id == driverId) match {
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      drivers -= driver
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      completedDrivers += driver
      persistenceEngine.removeDriver(driver)
      driver.state = finalState
      driver.exception = exception
      driver.worker.foreach(w => w.removeDriver(driver))
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
Next, let us look at launching the Executor. The LaunchExecutor handler in Worker.scala is shown below.
The Spark 2.1.1 version of Worker.scala is as follows.

1.  case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
2.    if (masterUrl != activeMasterUrl) {
3.      logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
4.    } else {
5.      try {
6.        logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
7.
8.        // Create the executor's working directory
9.        val executorDir = new File(workDir, appId + "/" + execId)
10.       if (!executorDir.mkdirs()) {
11.         throw new IOException("Failed to create directory " + executorDir)
12.       }
13.
14.       // Create local dirs for the executor. These are passed to the executor via the
          // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
          // application finishes.
15.       val appLocalDirs = appDirectories.getOrElse(appId,
16.         Utils.getOrCreateLocalRootDirs(conf).map { dir =>
17.           val appDir = Utils.createDirectory(dir, namePrefix = "executor")
18.           Utils.chmod700(appDir)
19.           appDir.getAbsolutePath()
20.         }.toSeq)
21.       appDirectories(appId) = appLocalDirs
22.       val manager = new ExecutorRunner(
23.         appId,
24.         execId,
25.         appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
26.         cores_,
27.         memory_,
28.         self,
29.         workerId,
30.         host,
31.         webUi.boundPort,
32.         publicAddress,
33.         sparkHome,
34.         executorDir,
35.         workerUri,
36.         conf,
37.         appLocalDirs, ExecutorState.RUNNING)
38.       executors(appId + "/" + execId) = manager
39.       manager.start()
40.       coresUsed += cores_
41.       memoryUsed += memory_
42.       sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
43.     } catch {
44.       case e: Exception =>
45.         logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
46.         if (executors.contains(appId + "/" + execId)) {
47.           executors(appId + "/" + execId).kill()
48.           executors -= appId + "/" + execId
49.         }
50.         sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
51.           Some(e.toString), None))
52.     }
53.   }
The Spark 2.2.0 version of Worker.scala differs from Spark 2.1.1 as follows: lines 16-20 of the code above are replaced wholesale, and the following new code adds exception handling for failures when creating the Executor's local directories.

.....
val localRootDirs = Utils.getOrCreateLocalRootDirs(conf)
val dirs = localRootDirs.flatMap { dir =>
  try {
    val appDir = Utils.createDirectory(dir, namePrefix = "executor")
    Utils.chmod700(appDir)
    Some(appDir.getAbsolutePath())
  } catch {
    case e: IOException =>
      logWarning(s"${e.getMessage}. Ignoring this directory.")
      None
  }
}.toSeq
if (dirs.isEmpty) {
  throw new IOException("No subfolder can be created in " +
    s"${localRootDirs.mkString(",")}.")
}
dirs
})
.....
Let us look directly at manager.start, which launches a Thread whose run method calls fetchAndRunExecutor.
The source code of fetchAndRunExecutor is as follows.

private def fetchAndRunExecutor() {
  try {
    // Launch the process
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
      memory, sparkHome.getAbsolutePath, substituteVariables)
    val command = builder.command()
    val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
    logInfo(s"Launch command: $formattedCommand")

    builder.directory(executorDir)
    builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
    // In case we are running this from within the Spark Shell, avoid creating a "scala"
    // parent process for the executor command
    builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

    // Add webUI log urls
    val baseUrl =
      if (conf.getBoolean("spark.ui.reverseProxy", false)) {
        s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
      } else {
        s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
      }
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

    process = builder.start()
    val header = "Spark Executor Command: %s\n%s\n\n".format(
      formattedCommand, "=" * 40)

    // Redirect its stdout and stderr to files
    val stdout = new File(executorDir, "stdout")
    stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

    val stderr = new File(executorDir, "stderr")
    Files.write(header, stderr, StandardCharsets.UTF_8)
    stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

    // Wait for it to exit; the executor may exit with code 0 (when the driver instructs
    // it to shut down) or with a non-zero exit code
    val exitCode = process.waitFor()
    state = ExecutorState.EXITED
    val message = "Command exited with code " + exitCode
    worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
  } catch {
    case interrupted: InterruptedException =>
      logInfo("Runner thread for executor " + fullId + " interrupted")
      state = ExecutorState.KILLED
      killProcess(None)
    case e: Exception =>
      logError("Error running executor", e)
      state = ExecutorState.FAILED
      killProcess(Some(e.toString))
  }
}
fetchAndRunExecutor resembles the Driver launch process: to launch the Executor, it first constructs the builder via CommandUtils.buildProcessBuilder, then calls builder.start(), and on exit sends an ExecutorStateChanged message to the Worker.
The ExecutorStateChanged handling in Worker.scala is as follows.

case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
  handleExecutorStateChanged(executorStateChanged)
Stepping into handleExecutorStateChanged: sendToMaster(executorStateChanged) forwards the executorStateChanged message to the Master.

private[worker] def handleExecutorStateChanged(executorStateChanged: ExecutorStateChanged):
    Unit = {
  sendToMaster(executorStateChanged)
  val state = executorStateChanged.state
  if (ExecutorState.isFinished(state)) {
    val appId = executorStateChanged.appId
    val fullId = appId + "/" + executorStateChanged.execId
    val message = executorStateChanged.message
    val exitStatus = executorStateChanged.exitStatus
    executors.get(fullId) match {
      case Some(executor) =>
        logInfo("Executor " + fullId + " finished with state " + state +
          message.map(" message " + _).getOrElse("") +
          exitStatus.map(" exitStatus " + _).getOrElse(""))
        executors -= fullId
        finishedExecutors(fullId) = executor
        trimFinishedExecutorsIfNecessary()
        coresUsed -= executor.cores
        memoryUsed -= executor.memory
      case None =>
        logInfo("Unknown Executor " + fullId + " finished with state " + state +
          message.map(" message " + _).getOrElse("") +
          exitStatus.map(" exitStatus " + _).getOrElse(""))
    }
    maybeCleanupApplication(appId)
  }
}
Now let us look at Master.scala. The Master receives the ExecutorStateChanged message. When the state changes, it also sends the Driver an ExecutorUpdated message via exec.application.driver.send; the flow is essentially the same as for launching the Driver. The ExecutorStateChanged handling is as follows.

case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
  execOption match {
    case Some(exec) =>
      val appInfo = idToApp(appId)
      val oldState = exec.state
      exec.state = state

      if (state == ExecutorState.RUNNING) {
        assert(oldState == ExecutorState.LAUNCHING,
          s"executor $execId state transfer from $oldState to RUNNING is illegal")
        appInfo.resetRetryCount()
      }

      exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))

      if (ExecutorState.isFinished(state)) {
        // Remove this executor from the worker and app
        logInfo(s"Removing executor ${exec.fullId} because it is $state")
        // If an application has already finished, preserve its state to display its
        // information properly on the UI
        if (!appInfo.isFinished) {
          appInfo.removeExecutor(exec)
        }
        exec.worker.removeExecutor(exec)

        val normalExit = exitStatus == Some(0)
        // Only retry a certain number of times so we don't go into an infinite loop.
        // Important note: this code path is not exercised by tests, so be very careful
        // when changing this if condition.
        if (!normalExit
            && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
            && MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path
          val execs = appInfo.executors.values
          if (!execs.exists(_.state == ExecutorState.RUNNING)) {
            logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
              s"${appInfo.retryCount} times; removing it")
            removeApplication(appInfo, ApplicationState.FAILED)
          }
        }
      }
      schedule()
    case None =>
      logWarning(s"Got status update for unknown executor $appId/$execId")
  }
6.3.2 Analysis of the Driver-Master Interaction
The Driver interacts with the Master, and the Master is a message loop. This section explains how the Driver's message loop comes into being; once it has been created, the Driver and the Master can communicate with each other.
When a Spark application is submitted, we invoke the spark-submit script. The script directly runs the org.apache.spark.deploy.SparkSubmit object. The content of the spark-submit script is as follows.

#!/usr/bin/env bash
SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
export PYTHONHASHSEED=0
exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"   # run SparkSubmit
Stepping into SparkSubmit, the main function is shown below.
The source code in SparkSubmit.scala is as follows.

def main(args: Array[String]): Unit = {
  // Build a SparkSubmitArguments object from the arguments passed to main
  val appArgs = new SparkSubmitArguments(args)
  // Print the argument information
  if (appArgs.verbose) {
    printStream.println(appArgs)
  }
  appArgs.action match {
    // Submit: call the submit method
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    // Kill: call the kill method
    case SparkSubmitAction.KILL => kill(appArgs)
    // Request status: call the requestStatus method
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}
In the code above, the command-line arguments of the spark-submit script arrive through main's args parameter and are passed into SparkSubmitArguments for parsing. Finally, the action type in appArgs is matched to execute the submit, kill, or requestStatus operation.
Stepping into SparkSubmitArguments, let us analyze the argument-parsing process. The key code in SparkSubmitArguments is shown below.
The source code in SparkSubmitArguments.scala is as follows.

// Call the parse method to parse each argument from the command line
try {
  parse(args.asJava)
} catch {
  // On IllegalArgumentException, print the error and exit
  case e: IllegalArgumentException =>
    SparkSubmit.printErrorAndExit(e.getMessage())
}
// Merge the default Spark configuration; passed-in settings override the defaults
mergeDefaultSparkProperties()
// Remove keys that do not start with "spark." from sparkProperties
ignoreNonSparkProperties()
// Load configuration from the system environment variables
loadEnvironmentArguments()
// Validate that the arguments are legal
validateArguments()
In the code above, parse(args.asJava) parses the command-line arguments; mergeDefaultSparkProperties merges in the default configuration; ignoreNonSparkProperties discards any configuration whose key does not start with "spark."; loadEnvironmentArguments loads the system environment variables; and finally validateArguments checks that the arguments are legal. How is this configuration submitted? In main, the line case SparkSubmitAction.SUBMIT => submit(appArgs) decides whether to submit the arguments and run the program: if SparkSubmitAction.SUBMIT matches, submit(appArgs) is called. The appArgs parameter is of type SparkSubmitArguments and carries all the submitted settings, both those passed on the command line and the defaults.
The submit(appArgs) method mainly does two things:
(1) Prepare the submission environment.
(2) Run the main method to complete the submission.
First, let us see how Spark prepares the environment. The submit(appArgs) method contains the following source code.
The source code in SparkSubmit.scala is as follows.

private def submit(args: SparkSubmitArguments): Unit = {
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
  .......
  runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
  ......
This code calls prepareSubmitEnvironment(args) to prepare the submission environment. The method returns a four-element tuple: the child process's arguments, its classpath entries, a map of system properties, and its main class. With the submission environment prepared, the next step is to launch the child process; in Standalone mode, the child process launched is the org.apache.spark.deploy.Client object. The concrete execution happens in the runMain function, whose key code is shown below.
The source code in SparkSubmit.scala is as follows.

private def runMain(
    childArgs: Seq[String],
    childClasspath: Seq[String],
    sysProps: Map[String, String],
    childMainClass: String,
    verbose: Boolean): Unit = {
  ......
  Thread.currentThread.setContextClassLoader(loader) // obtain the classLoader
  for (jar <- childClasspath) { // iterate over the classpath entries
    addJarToClasspath(jar, loader) // add each jar dependency to the classpath via the loader
  }
  for ((key, value) <- sysProps) { // set every entry of sysProps as a System global property
    System.setProperty(key, value)
  }
  var mainClass: Class[_] = null
  mainClass = Utils.classForName(childMainClass) // obtain the main class to launch
  ...... // obtain the main method of the class being launched
  val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
  ...... // invoke the main method via reflection, passing childArgs as its arguments
  mainMethod.invoke(null, childArgs.toArray)
}
In the code above, the classForName helper provided by Utils locates the main class; getMethod on mainClass retrieves its main method; and finally invoke on mainMethod executes it. Note that invoke is called with the childArgs parameter, which preserves the configuration information. Utils.classForName(childMainClass) returns the main class to execute — but which class is childMainClass here? In fact, this parameter differs across deploy modes. In standalone mode, childMainClass is the org.apache.spark.deploy.Client class, as the following source code shows.
The source code in SparkSubmit.scala is as follows.

// In prepareSubmitEnvironment, determine whether this is Standalone cluster mode
if (args.isStandaloneCluster) {
  // If REST is used, childMainClass is org.apache.spark.deploy.rest.RestSubmissionClient
  if (args.useRest) {
    childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
    childArgs += (args.primaryResource, args.mainClass)
  } else {
    // Without REST, childMainClass is org.apache.spark.deploy.Client
    childMainClass = "org.apache.spark.deploy.Client"
    if (args.supervise) { childArgs += "--supervise" }
    // Set the driver memory
    Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
    // Set the driver cores
    Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
    childArgs += "launch"
    childArgs += (args.master, args.primaryResource, args.mainClass)
  }
  if (args.childArgs != null) {
    childArgs ++= args.childArgs
  }
}
In the code above, the program first checks the deploy mode via args.isStandaloneCluster. In standalone mode without the REST service, childMainClass = "org.apache.spark.deploy.Client". As the code shows, childArgs also receives the driver's memory and cores settings. As described for runMain, the program then launches the org.apache.spark.deploy.Client class and runs its main method. What does the Client class do? First, let us see how it is invoked. Below are the Client object and its main method.
The source code in Client.scala is as follows.

object Client {
  def main(args: Array[String]) {
    // If sys props do not contain SPARK_SUBMIT, print a warning
    if (!sys.props.contains("SPARK_SUBMIT")) {
      println("WARNING: This client is deprecated and will be removed in a future version of Spark")
      println("Use ./bin/spark-submit with \"--master spark://host:port\"")
    }
    // scalastyle:on println
    // Create the SparkConf object
    val conf = new SparkConf()
    // Create the ClientArguments object representing the driver-side arguments
    val driverArgs = new ClientArguments(args)

    // Set the RPC ask timeout to 10s
    if (!conf.contains("spark.rpc.askTimeout")) {
      conf.set("spark.rpc.askTimeout", "10s")
    }
    Logger.getRootLogger.setLevel(driverArgs.logLevel)
    // Create the RPC environment via RpcEnv.create
    val rpcEnv =
      RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

    // Resolve the master URLs into Master endpoint refs, used to communicate with the Master
    val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
      map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
    rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
    // Wait for the rpcEnv to terminate
    rpcEnv.awaitTermination()
  }
}
In the code above, a SparkConf object is first instantiated; through this configuration object, settings such as the appName and the Master address can be specified in code. val driverArgs = new ClientArguments(args) builds a ClientArguments object from the passed-in args; this object likewise preserves the submitted configuration, such as the driver memory and driver cores.
Using the RpcEnv.create factory method, an rpcEnv member is created; with it, the communication endpoint to the Master is set up, through which communication with the Master takes place. Spark 2.0 uses the Netty framework by default to implement Remote Procedure Call (RPC); through RPC's asynchronous messaging, the nodes communicate with one another. rpcEnv.setupEndpoint news up the Driver-side ClientEndpoint. ClientEndpoint is a ThreadSafeRpcEndpoint message loop; at this point the Driver's ClientEndpoint has been created. In ClientEndpoint's onStart method, registration is submitted to the Master: a RequestSubmitDriver(driverDescription) request is sent through the masterEndpoint, completing the Driver's registration.
The source code of Client.scala's onStart is as follows.

private class ClientEndpoint(
    override val rpcEnv: RpcEnv,
    driverArgs: ClientArguments,
    masterEndpoints: Seq[RpcEndpointRef],
    conf: SparkConf)
  extends ThreadSafeRpcEndpoint with Logging {

  override def onStart(): Unit = {
    driverArgs.cmd match {
      ........
      val driverDescription = new DriverDescription(
        driverArgs.jarUrl,
        driverArgs.memory,
        driverArgs.cores,
        driverArgs.supervise,
        command)
      ayncSendToMasterAndForwardReply[SubmitDriverResponse](
        RequestSubmitDriver(driverDescription))
      ......
After the Master receives the RequestSubmitDriver message from the Driver's ClientEndpoint, it adds the Driver's information to the waitingDrivers and drivers data structures. It then performs resource allocation via schedule(), in which the Master sends the Worker a LaunchDriver instruction.
The source code in Master.scala is as follows.

case RequestSubmitDriver(description) =>
  .......
  val driver = createDriver(description)
  persistenceEngine.addDriver(driver)
  waitingDrivers += driver
  drivers.add(driver)
  schedule()
  ......
In Client.scala's onStart code, the submitted configuration parameters keep getting passed along across different objects and nodes. The Master loads the Driver onto a Worker node and starts it, and the Driver running on the Worker node carries the same configuration. When the Driver-side SparkContext starts and instantiates DAGScheduler and TaskScheduler, StandaloneSchedulerBackend does another job: it instantiates StandaloneAppClient. StandaloneAppClient contains a ClientEndpoint, which is also an RPC endpoint reference, used to communicate with the Master. In that ClientEndpoint's onStart method, a RegisterApplication(appDescription, self) request is sent to the Master; the Master receives the request, calls its schedule method, and sends the Worker a LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory) request, whereupon the Worker node starts an ExecutorRunner. The ExecutorRunner launches CoarseGrainedExecutorBackend, which registers with the Driver.
The main method of CoarseGrainedExecutorBackend contains the following code.

var argv = args.toList // convert args to a List
while (!argv.isEmpty) { // keep looping while argv is not empty
  argv match {
    case ("--driver-url") :: value :: tail =>
      driverUrl = value // obtain the driverUrl
      argv = tail
    case ("--executor-id") :: value :: tail =>
      executorId = value // obtain the executorId
      argv = tail
    case ("--hostname") :: value :: tail =>
      hostname = value // obtain the hostname
      argv = tail
    case ("--cores") :: value :: tail =>
      cores = value.toInt // obtain the configured number of executor cores
      argv = tail
    case ("--app-id") :: value :: tail =>
      appId = value // obtain the application id
      argv = tail
    case ("--worker-url") :: value :: tail =>
      workerUrl = Some(value) // obtain the worker url
      argv = tail
    case ("--user-class-path") :: value :: tail =>
      userClassPath += new URL(value) // obtain the user class path
      argv = tail
    case Nil =>
    case tail =>
      System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
      printUsageAndExit() // print usage and exit
  }
}
From program submission all the way to the launch of the CoarseGrainedExecutorBackend process, the configuration parameters are passed along continuously. CoarseGrainedExecutorBackend extracts the cores setting and passes it into the run method via run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath); CoarseGrainedExecutorBackend then starts as a process in the JVM, occupying exactly the specified amount of resources. Note that as long as a Worker node's physical cores and memory can satisfy the Executor's launch requirements, a single Worker node can run multiple Executors. An abbreviated look at run follows.
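For reference, run is shown below in abbreviated form (based on the Spark 2.x source, elided with ......): it creates the executor-side SparkEnv with the parsed cores and registers the CoarseGrainedExecutorBackend endpoint in that env's RpcEnv.

private def run(
    driverUrl: String,
    executorId: String,
    hostname: String,
    cores: Int,
    appId: String,
    workerUrl: Option[String],
    userClassPath: Seq[URL]) {
  ......
  // The parsed cores setting flows into the executor's SparkEnv
  val env = SparkEnv.createExecutorEnv(
    driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)

  env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
    env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
  workerUrl.foreach { url =>
    env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
  }
  env.rpcEnv.awaitTermination()
  ......
}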
6.3.3 Driver-Master Interaction Source Code in Detail
Analyzing from the spark-submit script: when an application is submitted, the class whose main method is launched — the class the user's submission ultimately executes — is org.apache.spark.deploy.SparkSubmit. SparkSubmit is the main entry point for launching a Spark application. When the cluster manager is STANDALONE and the deploy mode is CLUSTER, childMainClass is set to a different class for each of the two submission styles, and the args.mainClass passed in at submission (the application's main class) and its arguments are converted according to the cluster manager and deploy mode and wrapped into the arguments the new main class needs. In the REST style (Spark 1.3+), childMainClass is "org.apache.spark.deploy.rest.RestSubmissionClient"; in the legacy style, childMainClass is "org.apache.spark.deploy.Client".
Next we walk through the REST style. When submitting in the REST style (Spark 1.3+), the application's main class and related information are wrapped into the RestSubmissionClient class, which is responsible for sending the application-submission request to the RestSubmissionServer. On receiving the request, the RestSubmissionServer sends the Master a RequestSubmitDriver message, and the Master, following its resource-scheduling policy, starts the corresponding Driver in the cluster to execute the submitted application. The deployment and execution framework in Cluster deploy mode is shown in Figure 6-3.
Figure 6-3 Deployment and execution framework in Cluster deploy mode
To show the deployment relationships among the components, the description here takes the form of a framework diagram; correspondingly, the interactions among the classes and components can be understood from a sequence-diagram perspective. The Master and Worker components are labeled in the upper-left corner of their boxes; every other box denotes a concrete instance.
RestSubmissionClient is the submission client, the class that wraps the submitted application. The interaction flow among the components is analyzed as follows.
(1) Step 1, constructSubmitRequest: inside the RestSubmissionClient instance, the submission request is built from the submitted application's information.
(2) Then comes step 2, createSubmission, which sends a POST request to the RestSubmissionServer — step 3 in Figure 6-3 (note that it is actually invoked within step 2).
(3) After the RestSubmissionServer receives the POST request, the corresponding servlet handles it — here, StandaloneSubmitRequestServlet. This begins step 4: doPost is invoked to process the POST request.
(4) doPost continues with step 5, handleSubmit, which begins processing the submission request. During processing, the RequestSubmitDriver message is sent to the Master's RPC endpoint, corresponding to step 6 in the figure.
(5) On receiving that message, the Master executes step 7, createDriver, to create the Driver. The creation must go through the Master's scheduling mechanism, corresponding to step 8, schedule; once resources are allocated, a LaunchDriver message is sent to the RPC endpoint of a Worker (Workers register with the Master when they start).
(6) When the Worker's RPC endpoint receives the message, it starts processing: it instantiates a DriverRunner and runs the previously wrapped application.
Note: as the deployment framework and its terminology above make clear, because the submitted application contains a SparkContext instance in its main part, we also call it the Driver Program. Hence, in the framework, both the Master and the Worker refer to a Driver rather than an Application.
The main source code and its analysis follow.
(1) The run method of RestSubmissionClient is as follows.
The source code in RestSubmissionClient.scala is as follows.

def run(
    appResource: String,
    mainClass: String,
    appArgs: Array[String],
    conf: SparkConf,
    env: Map[String, String] = Map()): SubmitRestProtocolResponse = {
  val master = conf.getOption("spark.master").getOrElse {
    throw new IllegalArgumentException("'spark.master' must be set.")
  }
  val sparkProperties = conf.getAll.toMap
  // Create a REST submission client
  val client = new RestSubmissionClient(master)
  // Wrap the application's information: the primary resource, the main class, and so on
  val submitRequest = client.constructSubmitRequest(
    appResource, mainClass, appArgs, sparkProperties, env)
  // The REST submission client creates the submission, sending a POST request to the
  // RestSubmissionServer in the process
  client.createSubmission(submitRequest)
}
(2) After receiving the submitted POST request, StandaloneSubmitRequestServlet sends a RequestSubmitDriver request to the Master's RPC endpoint, as shown below.
The Spark 2.1.1 version of StandaloneRestServer.scala is as follows.

1.  protected override def handleSubmit(
2.      requestMessageJson: String,
3.      requestMessage: SubmitRestProtocolMessage,
4.      responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
5.    requestMessage match {
6.      case submitRequest: CreateSubmissionRequest =>
7.
8.        // Build the description of the driver program (i.e., the application containing
          // the SparkContext) as a DriverDescription instance, and send the
          // RequestSubmitDriver message to the Master's RPC endpoint masterEndpoint
9.
10.
11.       val driverDescription = buildDriverDescription(submitRequest)
12.       val response = masterEndpoint.askWithRetry[DeployMessages.SubmitDriverResponse](
13.         DeployMessages.RequestSubmitDriver(driverDescription))
14.
15.       val submitResponse = new CreateSubmissionResponse
16.       submitResponse.serverSparkVersion = sparkVersion
17.       submitResponse.message = response.message
18.       submitResponse.success = response.success
19.       submitResponse.submissionId = response.driverId.orNull
20.       ......
The Spark 2.2.0 version of StandaloneRestServer.scala differs from Spark 2.1.1 as follows: the masterEndpoint.askWithRetry call on line 12 of the code above becomes masterEndpoint.askSync.

......
val response = masterEndpoint.askSync[DeployMessages.SubmitDriverResponse](
  DeployMessages.RequestSubmitDriver(driverDescription))
......
(3) The buildDriverDescription method that constructs the DriverDescription is as follows.
The source code in StandaloneRestServer.scala is as follows.

private def buildDriverDescription(request: CreateSubmissionRequest): DriverDescription = {
  ......
  // Build the Command instance, wrapping the main class mainClass into DriverWrapper
  // (observable via jps)
  val command = new Command(
    "org.apache.spark.deploy.worker.DriverWrapper",
    Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ appArgs, // args to the DriverWrapper
    environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
  ......

  // Build the driver program's description, DriverDescription
  new DriverDescription(
    appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver, command)
}
(4) The Master receives RequestSubmitDriver, processes the message, and returns a SubmitDriverResponse message.
The source code in Master.scala is as follows.

case RequestSubmitDriver(description) =>
  if (state != RecoveryState.ALIVE) {
    val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
      "Can only accept driver submissions in ALIVE state."
    context.reply(SubmitDriverResponse(self, false, None, msg))
  } else {
    logInfo("Driver submitted " + description.command.mainClass)
    val driver = createDriver(description)
    persistenceEngine.addDriver(driver)
    waitingDrivers += driver
    drivers.add(driver)
    schedule()

    // TODO: it might be good to instead have the submission client poll the master to
    // determine the driver's current state; for now the message is fire-and-forget
    context.reply(SubmitDriverResponse(self, true, Some(driver.id),
      s"Driver successfully submitted as ${driver.id}"))
  }
(5) The Master's schedule(): the scheduling code is as follows.
The source code in Master.scala is as follows.

private def schedule(): Unit = {

  launchDriver(worker, driver)
  .....
  startExecutorsOnWorkers()
}
(6) The Driver launch on the Worker is as follows.
The source code in Worker.scala is as follows.

case LaunchDriver(driverId, driverDesc) =>
  logInfo(s"Asked to launch driver $driverId")
  val driver = new DriverRunner(
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    workerUri,
    securityMgr)
  drivers(driverId) = driver
  driver.start()

  coresUsed += driverDesc.cores
  memoryUsed += driverDesc.mem
The Driver Client manages the Driver, including submitting the Driver to the Master and requesting that the Driver be killed. The interaction messages between the Driver Client and the Master are as follows.
The source code in DeployMessages.scala is as follows.

// DriverClient <-> Master

// The Driver Client asks the Master to submit a driver
case class RequestSubmitDriver(driverDescription: DriverDescription) extends DeployMessage

// The Master replies to the Driver Client whether the registration succeeded
case class SubmitDriverResponse(
    master: RpcEndpointRef, success: Boolean, driverId: Option[String], message: String)
  extends DeployMessage

// The Driver Client asks the Master to kill a driver
case class RequestKillDriver(driverId: String) extends DeployMessage

// The Master replies whether the kill succeeded
case class KillDriverResponse(
    master: RpcEndpointRef, driverId: String, success: Boolean, message: String)
  extends DeployMessage

// The Driver Client asks the Master for a driver's status
case class RequestDriverStatus(driverId: String) extends DeployMessage

// The Master returns the status information to the Driver Client
case class DriverStatusResponse(found: Boolean, state: Option[DriverState],
  workerId: Option[String], workerHostPort: Option[String], exception: Option[Exception])
The Driver side sends the RequestSubmitDriver message to the Master in the handleSubmit method.
The Master receives the RequestSubmitDriver message sent by the Driver-side StandaloneSubmitRequestServlet. After the corresponding processing, the Master returns a SubmitDriverResponse message to it.
The source code of the Master is as follows.

case RequestSubmitDriver(description) =>
  if (state != RecoveryState.ALIVE) {
    val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
      "Can only accept driver submissions in ALIVE state."
    context.reply(SubmitDriverResponse(self, false, None, msg))
  } else {
    logInfo("Driver submitted " + description.command.mainClass)
    val driver = createDriver(description)
    persistenceEngine.addDriver(driver)
    waitingDrivers += driver
    drivers.add(driver)
    schedule()

    // TODO: it might be good to instead have the submission client poll the master to
    // determine the driver's current state; for now the message is fire-and-forget
    context.reply(SubmitDriverResponse(self, true, Some(driver.id),
      s"Driver successfully submitted as ${driver.id}"))
  }
Similarly, the Master receives the RequestKillDriver message sent from the Driver-side StandaloneKillRequestServlet's handler. After the corresponding processing, the Master returns a KillDriverResponse message to it.
The Master also receives the RequestDriverStatus message sent from the Driver-side StandaloneStatusRequestServlet's handler. After the corresponding processing, the Master returns a DriverStatusResponse message to it.
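For completeness, the Master's handling of RequestKillDriver is sketched below in abbreviated form (based on the Spark 2.x source; details vary by version). A driver still waiting to be scheduled is removed directly, while a running one is killed by sending its Worker a KillDriver message.

case RequestKillDriver(driverId) =>
  if (state != RecoveryState.ALIVE) {
    val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
      s"Can only kill drivers in ALIVE state."
    context.reply(KillDriverResponse(self, driverId, success = false, msg))
  } else {
    logInfo("Asked to kill driver " + driverId)
    drivers.find(_.id == driverId) match {
      case Some(d) =>
        if (waitingDrivers.contains(d)) {
          waitingDrivers -= d
          self.send(DriverStateChanged(driverId, DriverState.KILLED, None))
        } else {
          // Notify the worker to kill the driver; the final bookkeeping happens when the
          // worker reports the resulting DriverStateChanged back to the Master
          d.worker.foreach { w => w.endpoint.send(KillDriver(driverId)) }
        }
        ......
      case None =>
        ......
    }
  }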