7.6 Shuffle与Storage模块间的交互
在Spark中,存储模块被抽象成Storage。顾名思义,Storage是存储的意思,代表着Spark中的数据存储系统,负责管理和实现数据块(Block)的存放。其中存取数据的最小单元是Block,数据由不同的Block组成,所有操作都是以Block为单位进行的。从本质上讲,RDD中的Partition和Storage中的Block是等价的,只是所处的模块不同,看待的角度不一样而已。
Storage抽象模块的实现分为两个层次,如图7-13所示。
(1)通信层:通信层是典型的Master-Slave结构,Master和Slave之间传输控制和状态信息。通信层主要由BlockManager、BlockManagerMaster、BlockManagerMasterEndpoint、BlockManagerSlaveEndpoint等类实现。
(2)存储层:负责把数据存储到内存、磁盘或者堆外内存中,有时还需要为数据在远程节点上生成副本,这些都由存储层提供的接口实现。Spark 2.2.0具体的存储层的实现类有DiskStore和MemoryStore。
图7-13 Storage存储模块
Shuffle模块若要和Storage模块进行交互,需要通过调用统一的操作类BlockManager来完成。如果把整个存储模块看成一个黑盒,BlockManager就是黑盒上留出的一个供外部调用的接口。
7.6.1 Shuffle注册的交互
Spark中BlockManager在Driver端的创建,在SparkContext创建的时候会根据具体的配置创建SparkEnv对象,源码如下所示。
SparkContext.scala的源码如下。
1. _env = createSparkEnv(_conf, isLocal, listenerBus) 2. SparkEnv.set(_env) 3. ....... 4. private[spark] def createSparkEnv( 5. conf: SparkConf, 6. isLocal: Boolean, 7. listenerBus: LiveListenerBus): SparkEnv = { 8. //创建Driver端的运行环境 9. SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext .numDriverCores(master)) 10. }
createSparkEnv方法中,传入SparkConf配置对象、isLocal标志,以及LiveListenerBus,方法中使用SparkEnv对象的createDriverEnv方法创建SparkEnv并返回。在SparkEnv的createDriverEvn方法中,将会创建BlockManager、BlockManagerMaster等对象,完成Storage在Driver端的部署。
SparkEnv中创建BlockManager、BlockManagerMaster关键源码如下所示。
SparkEnv.scala的源码如下。
1. val blockTransferService = 2. new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress, blockManagerPort, numUsableCores) 3. 4. //创建BlockManagerMasterEndpoint 5. val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint( 6. BlockManagerMaster.DRIVER_ENDPOINT_NAME, 7. //创建BlockManagerMasterEndpoint 8. new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)), 9. conf, isDriver) 10. //创建BlockManager 11. //注:blockManager无效,直到initialize()被调用 12. val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster, 13. serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager, 14. blockTransferService, securityManager, numUsableCores)
使用new关键字实例化出BlockManagerMaster,传入BlockManager的构造函数,实例化出BlockManager对象。这里的BlockManagerMaster和BlockManager属于聚合关系。BlockManager主要对外提供统一的访问接口,BlockManagerMaster主要对内提供各节点之间的指令通信服务。
构建BlockManager时,传入shuffleManager参数,shuffleManager是在SparkEnv中创建的,将shuffleManager传入到BlockManager中,BlockManager就拥有shuffleManager的成员变量,从而可以调用shuffleManager的相关方法。
BlockManagerMaster在Driver端和Executors中的创建稍有差别。首先来看在Driver端创建的情形。创建BlockManagerMaster传入的isDriver参数,isDriver为true,表示在Driver端创建,否则视为在Slave节点上创建。
当SparkContext中执行_env.blockManager.initialize(_applicationId)代码时,会调用Driver端BlockManager的initialize方法。Initialize方法的源码如下所示。
SparkContext.scala的源码如下。
1. _env.blockManager.initialize(_applicationId)
Spark 2.1.1版本的BlockManager.scala的源码如下。
1. def initialize(appId: String): Unit = { 2. //调用blockTransferService的init方法,blockTransferService用于在不同节点 //fetch数据、传送数据 3. blockTransferService.init(this) 4. //shuffleClient用于读取其他Executor上的shuffle files 5. shuffleClient.init(appId) 6. 7. blockReplicationPolicy = { 8. val priorityClass = conf.get( 9. "spark.storage.replication.policy", classOf [RandomBlockReplicationPolicy].getName) 10. val clazz = Utils.classForName(priorityClass) 11. val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy] 12. logInfo(s"Using $priorityClass for block replication policy") 13. ret 14. } 15. 16. val id = 17. BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None) 18. 19. //向blockManagerMaster注册BlockManager。在registerBlockManager方法中传 //入了 slaveEndpoint,slaveEndpoint 为 BlockManager 中的 RPC 对象,用于和 //blockManagerMaster通信 20. val idFromMaster = master.registerBlockManager( 21. id, 22. maxMemory, 23. slaveEndpoint) 24. //得到blockManagerId 25. blockManagerId = if (idFromMaster != null) idFromMaster else id 26. 27. //得到shuffleServerId 28. shuffleServerId = if (externalShuffleServiceEnabled) { 29. logInfo(s"external shuffle service port = $externalShuffleServicePort") 30. BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort) 31. } else { 32. blockManagerId 33. } 34. //注册shuffleServer 35. //如果存在,将注册Executors配置与本地shuffle服务 36. if (externalShuffleServiceEnabled && !blockManagerId.isDriver) { 37. registerWithExternalShuffleServer() 38. } 39. 40. logInfo(s"Initialized BlockManager: $blockManagerId") 41. }
Spark 2.2.0版本的BlockManager.scala的源码与Spark 2.1.1版本相比具有如下特点。
上段代码中第22行删除maxMemory。
上段代码中第22行之后新增参数maxOnHeapMemory:最大的堆内存大小。
上段代码中第22行之后新增参数maxOffHeapMemory:最大的堆外存大小。
1. ..... 2. maxOnHeapMemory, 3. maxOffHeapMemory, 4. ...
如上面的源码所示,initialize方法使用appId初始化BlockManager,主要完成以下工作。
(1)初始化BlockTransferService。
(2)初始化ShuffleClient。
(3)创建BlockManagerId。
(4)将BlockManager注册到BlockManagerMaster上。
(5)若ShuffleService可用,则注册ShuffleService。
在BlockManager的initialize方法上右击Find Usages,可以看到initialize方法在两个地方得到调用:一个是SparkContext;另一个是Executor。启动Executor时,会调用BlockManager的initialize方法。Executor中调用initialize方法的源码如下所示。
Executor.scala的源码如下。
1. //CoarseGrainedExecutorBackend中实例化Executor,isLocal设置成false,即 //Executor中isLocal始终为false 2. 3. if (!isLocal) { 4. //向度量系统注册 5. //env.metricsSystem.registerSource(executorSource) 6. //调用BlockManager的initialize方法,initialize方法将向BlockManagerMaster //注册,完成Executor中的BlockManager向Driver中的BlockManager注册 7. env.blockManager.initialize(conf.getAppId) 8. }
上面代码中调用了env.blockManager.initialize方法。在initialize方法中,完成BlockManger向Master端BlockManagerMaster的注册。使用方法master.registerBlockManager (id,maxMemory,slaveEndpoint)完成注册,registerBlockManager方法中传入Id、maxMemory、salveEndPoint引用,分别表示Executor中的BlockManager、最大内存、BlockManager中的BlockMangarSlaveEndpoint。BlockManagerSlaveEndpoint是一个RPC端点,使用它完成同BlockManagerMaster的通信。BlockManager收到注册请求后将Executor中注册的BlockManagerInfo存入哈希表中,以便通过BlockManagerSlaveEndpoint向Executor发送控制命令。
ShuffleManager是一个用于shuffle系统的可插拔接口。在Driver端SparkEnv中创建ShuffleManager,在每个Executor上也会创建。基于spark.shuffle.manager进行设置。Driver使用ShuffleManager注册到shuffles系统,Executors(或Driver在本地运行的任务)可以请求读取和写入数据。这将被SparkEnv的SparkConf和isDriver布尔值作为参数。
ShuffleManager.scala的源码如下。
1. private[spark] trait ShuffleManager { 2. 3. /** 4. *注册一个shuffle管理器,获取一个句柄传递给任务 5. */ 6. def registerShuffle[K, V, C]( 7. shuffleId: Int, 8. numMaps: Int, 9. dependency: ShuffleDependency[K, V, C]): ShuffleHandle 10. 11. /**为给定分区获取一个写入器。Executors节点通过Map任务调用*/ 12. def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V] 13. 14. /** *获取读取器汇聚一定范围的分区(从 startPartition 到 endPartition-1)。在 *Executors节点,通过reduce 任务调用 15. */ 16. 17. def getReader[K, C]( 18. handle: ShuffleHandle, 19. startPartition: Int, 20. endPartition: Int, 21. context: TaskContext): ShuffleReader[K, C] 22. 23. /** 24. *从ShuffleManager移除一个shuffle的元数据 25. * @return如果元数据成功删除,则返回true,否则返回false 26. */ 27. def unregisterShuffle(shuffleId: Int): Boolean 28. 29. /** * 返回一个能够根据块坐标来检索shuffle 块数据的解析器 30. */ 31. def shuffleBlockResolver: ShuffleBlockResolver 32. 33. /** 关闭ShuffleManager */ 34. def stop(): Unit 35. }
Spark Shuffle Pluggable框架ShuffleBlockManager在Spark 1.6.0之后改成了ShuffleBlockResolver。ShuffleBlockResolver具体读取shuffle数据,是一个trait。在ShuffleBlockResolver中已无getBytes方法。getBlockData(blockId: ShuffleBlockId)方法返回的是ManagedBuffer,这是核心。
ShuffleBlockResolver的源码如下。
1. trait ShuffleBlockResolver { 2. type ShuffleId = Int 3. 4. /** *为指定的块检索数据。如果块数据不可用,则抛出一个未指明的异常 5. */ 6. def getBlockData(blockId: ShuffleBlockId): ManagedBuffer 7. 8. def stop(): Unit 9. }
Spark 2.0版本中通过IndexShuffleBlockResolver来具体实现ShuffleBlockResolver (SortBasedShuffle方式),已无FileShuffleBlockManager(Hashshuffle方式)。IndexShuffle-BlockResolver创建和维护逻辑块和物理文件位置之间的shuffle blocks映射关系。来自于相同map task任务的shuffle blocks数据存储在单个合并数据文件中;数据文件中的数据块的偏移量存储在单独的索引文件中。将shuffleBlockId + reduce ID set to 0 + ".后缀"作为数据shuffle data的shuffleBlockId名字。其中,文件名后缀为".data"的是数据文件;文件名后缀为".index"的是索引文件。
7.6.2 Shuffle写数据的交互
基于Sort的Shuffle实现的ShuffleHandle包含BypassMergeSortShuffleHandle与BaseShuffleHandle。两种ShuffleHandle写数据的方法可以参考SortShuffleManager类的getWriter方法,关键代码如下所示。
SortShuffleManager的getWriter的源码如下。
1. override def getWriter[K, V]( 2. ....... 3. case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] => 4. new BypassMergeSortShuffleWriter( 5. env.blockManager, 6. shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], 7. ........ 8. case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] => 9. new SortShuffleWriter(shuffleBlockResolver, other, mapId, context) 10. } 11. }
在对应构建的两种数据写入器类BypassMergeSortShuffleWriter与SortShuffleWriter中,都是通过变量shuffleBlockResolver对逻辑数据块与物理数据块的映射进行解析。BypassMergeSortShuffleWriter写数据的具体实现位于实现的write方法中,其中调用的createTempShuffleBlock方法描述了各个分区所生成的中间临时文件的格式与对应的BlockId。SortShuffleWriter写数据的具体实现位于实现的write方法中。
7.6.3 Shuffle读数据的交互
SparkEnv.get.shuffleManager.getReader是SortShuffleManager的getReader,是获取数据的阅读器,getReader方法中创建了一个BlockStoreShuffleReader实例。SortShuffleManager. scala的read()方法的源码如下。
1. override def getReader[K, C]( 2. handle: ShuffleHandle, 3. startPartition: Int, 4. endPartition: Int, 5. context: TaskContext): ShuffleReader[K, C] = { 6. new BlockStoreShuffleReader( 7. handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context) 8. }
BlockStoreShuffleReader实例的read方法,首先实例化new ShuffleBlockFetcherIterator。ShuffleBlockFetcherIterator是一个阅读器,里面有一个成员blockManager。blockManager是内存和磁盘上数据读写的统一管理器;ShuffleBlockFetcherIterator.scala的initialize方法中splitLocalRemoteBlocks()划分本地和远程的blocks,Utils.randomize(remoteRequests)把远程请求通过随机的方式添加到队列中,fetchUpToMaxBytes()发送远程请求获取我们的blocks,fetchLocalBlocks()获取本地的blocks。
7.6.4 BlockManager架构原理、运行流程图和源码解密
BlockManager是管理整个Spark运行时数据的读写,包含数据存储本身,在数据存储的基础上进行数据读写。由于Spark是分布式的,所以BlockManager也是分布式的,BlockManager本身相对而言是一个比较大的模块,Spark中有非常多的模块:调度模块、资源管理模块等。BlockManager是另外一个非常重要的模块。BlockManager本身的源码量非常大。本节从BlockManager原理流程对BlockManager做深刻地讲解。在Shuffle读写数据的时候,我们需要读写BlockManager。因此,BlockManager是至关重要的内容。
编写一个业务代码WordCount.scala,通过观察WordCount运行时BlockManager的日志来理解BlockManager的运行。
WordCount.scala的代码如下。
1. package com.dt.spark.sparksql 2. 3. import org.apache.log4j.{Level, Logger} 4. import org.apache.spark.SparkConf 5. import org.apache.spark.SparkContext 6. import org.apache.spark.internal.config 7. import org.apache.spark.rdd.RDD 8. 9. /** 10. * 使用Scala开发本地测试的Spark WordCount程序 11. * 12. * @author DT大数据梦工厂 13. * 新浪微博:http://weibo.com/ilovepains/ 14. */ 15. object WordCount { 16. def main(args: Array[String]) { 17. Logger.getLogger("org").setLevel(Level.ALL) 18. 19. /** 20. *第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息, 21. *例如,通过setMaster设置程序要链接的Spark集群的Master的URL,如果设置 22. *为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(如只有 23. *1GB的内存)的初学者 * 24. */ 25. val conf = new SparkConf() //创建SparkConf对象 26. conf.setAppName("Wow,My First Spark App!") //设置应用程序的名称,在程序 //运行的监控界面可以看到名称 27. conf.setMaster("local") //此时,程序在本地运行,不需要安装Spark集群 28. /** 29. * 第2步:创建SparkContext对象 30. * SparkContext是Spark程序所有功能的唯一入口,采用Scala、Java、Python、 * R等都必须有一个SparkContext 31. * SparkContext 核心作用:初始化 Spark 应用程序运行所需要的核心组件,包括 * DAGScheduler、TaskScheduler、SchedulerBackend 32. * 同时还会负责Spark程序往Master注册程序等 33. * SparkContext是整个Spark应用程序中最重要的一个对象 34. */ 35. val sc = new SparkContext(conf) //创建SparkContext对象,通过传入SparkConf //实例来定制Spark运行的具体参数和配置信息 36. /** 37. * 第 3 步:根据具体的数据来源(如 HDFS、HBase、Local FS、DB、S3 等)通过 * SparkContext创建RDD 38. * RDD的创建基本有三种方式:根据外部的数据来源(如HDFS)、根据Scala集合、由 * 其他的RDD操作 39. * 数据会被RDD划分为一系列的Partitions,分配到每个Partition的数据属于一 * 个Task的处理范畴 40. */ 41. //val lines: RDD[String] = sc.textFile("D://Big_Data_Software spark- 1.6.0-bin-hadoop2.6README.md", 1) //读取本地文件并设置为一个Partition 42. // val lines = sc.textFile("D://Big_Data_Software spark-1.6.0-bin- hadoop2.6//README.md", 1) //读取本地文件并设置为一个Partition 43. 44. val lines = sc.textFile("data/wordcount/helloSpark.txt", 1) //读取本地文件并设置为一个Partition 45. /** 46. * 第4步:对初始的RDD进行Transformation级别的处理,如通过map、filter等 * 高阶函数等的编程,进行具体的数据计算 47. * 第4.1步:将每一行的字符串拆分成单个单词 48. */ 49. 50. val words = lines.flatMap { line => line.split(" ") } //对每一行的字符串进行单词拆分并把所有行的拆分结果通过flat合并成为一个大的单词集合 51. 52. /** 53. * 第4步:对初始的RDD进行Transformation级别的处理,如通过map、filter等 * 高阶函数等的编程,进行具体的数据计算 54. * 第4.2步:在单词拆分的基础上,对每个单词实例计数为1,也就是word => (word, 1) 55. */ 56. val pairs = words.map { word => (word, 1) } 57. 58. /** 59. * 第4步:对初始的RDD进行Transformation级别的处理,如通过map、filter等 * 高阶函数等的编程,进行具体的数据计算 60. * 第4.3步:在每个单词实例计数为1基础上,统计每个单词在文件中出现的总次数 61. */ 62. val wordCountsOdered = pairs.reduceByKey(_ + _).map(pair => (pair._2, pair._1)).sortByKey(false).map(pair => (pair._2, pair._1)) //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce) 63. wordCountsOdered.collect.foreach(wordNumberPair => println (wordNumberPair._1 + " : " + wordNumberPair._2)) 64. while (true) { 65. 66. } 67. sc.stop() 68. 69. } 70. }
在IDEA中运行一个业务程序WordCount.scala,日志中显示:
SparkEnv:Registering MapOutputTracker,其中MapOutputTracker中数据的读写都和BlockManager关联。
SparkEnv:Registering BlockManagerMaste,其中Registering BlockManagerMaster由BlockManagerMaster进行注册。
DiskBlockManager:Created local directory C:\Users\dell\AppData\Local\Temp\blockmgr-...其中DiskBlockManager是管理磁盘存储的,里面有我们的数据。可以访问Temp目录下以blockmgr-开头的文件的内容。
WordCount运行结果如下。
1. Using Spark's default log4j profile: org/apache/spark/log4j-defaults. properties 2. 17/06/06 05:37:57 INFO SparkContext: Running Spark version 2.1.0 3. ...... 4. 17/06/06 05:38:01 INFO SparkEnv: Registering MapOutputTracker 5. 17/06/06 05:38:01 DEBUG MapOutputTrackerMasterEndpoint: init 6. 17/06/06 05:38:01 INFO SparkEnv: Registering BlockManagerMaster 7. 17/06/06 05:38:01 INFO BlockManagerMasterEndpoint: Using org.apache .spark.storage.DefaultTopologyMapper for getting topology information 8. 17/06/06 05:38:01 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 9. 17/06/06 05:38:01 INFO DiskBlockManager: Created local directory at C:\Users\dell\AppData\Local\Temp\blockmgr-a58a44dd-484b-4871-a92a-828 872c98804 10. 17/06/06 05:38:01 DEBUG DiskBlockManager: Adding shutdown hook 11. 17/06/06 05:38:01 DEBUG ShutdownHookManager: Adding shutdown hook 12. 17/06/06 05:38:01 INFO MemoryStore: MemoryStore started with capacity 637.2 MB 13. 17/06/06 05:38:02 INFO SparkEnv: Registering OutputCommitCoordinator 14. 17/06/06 05:38:02 DEBUG OutputCommitCoordinator$OutputCommitCoordinator- Endpoint: init 15. ........
从Application启动的角度观察BlockManager:
(1)Application启动时会在SparkEnv中注册BlockManagerMaster以及MapOutputTracker,其中,
a)BlockManagerMaster:对整个集群的Block数据进行管理。
b)MapOutputTrackerMaster:跟踪所有的Mapper的输出。
BlockManagerMaster中有一个引用driverEndpoint,isDriver判断是否运行在Driver上。
BlockManagerMaster的源码如下。
1. private[spark] 2. class BlockManagerMaster( 3. var driverEndpoint: RpcEndpointRef, 4. conf: SparkConf, 5. isDriver: Boolean) 6. extends Logging {
BlockManagerMaster注册给SparkEnv,SparkEnv在SparkContext中。
SparkContext.scala的源码如下。
1. ...... 2. private var _env: SparkEnv = _ 3. ...... 4. _env = createSparkEnv(_conf, isLocal, listenerBus) 5. SparkEnv.set(_env)
进入createSparkEnv方法:
1. private[spark] def createSparkEnv( 2. conf: SparkConf, 3. isLocal: Boolean, 4. listenerBus: LiveListenerBus): SparkEnv = { 5. SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext. numDriverCores(master)) 6. }
进入SparkEnv.scala的createDriverEnv方法:
1. private[spark] def createDriverEnv( 2. ...... 3. create( 4. conf, 5. SparkContext.DRIVER_IDENTIFIER, 6. bindAddress, 7. advertiseAddress, 8. port, 9. isLocal, 10. numCores, 11. ioEncryptionKey, 12. listenerBus = listenerBus, 13. mockOutputCommitCoordinator = mockOutputCommitCoordinator 14. ) 15. } 16. ......
SparkEnv.scala的createDriverEnv中调用了create方法,判断是否是Driver。create方法的源码如下。
1. private def create( 2. conf: SparkConf, 3. executorId: String, 4. bindAddress: String, 5. advertiseAddress: String, 6. port: Int, 7. isLocal: Boolean, 8. numUsableCores: Int, 9. ioEncryptionKey: Option[Array[Byte]], 10. listenerBus: LiveListenerBus = null, 11. mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = { 12. 13. val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER 14. ...... 15. if (isDriver) { 16. conf.set("spark.driver.port", rpcEnv.address.port.toString) 17. } else if (rpcEnv.address != null) { 18. conf.set("spark.executor.port", rpcEnv.address.port.toString) 19. logInfo(s"Setting spark.executor.port to: ${rpcEnv.address.port .toString}") 20. } 21. ...... 22. val mapOutputTracker = if (isDriver) { 23. new MapOutputTrackerMaster(conf, broadcastManager, isLocal) 24. } else { 25. new MapOutputTrackerWorker(conf) 26. } 27. ...... 28. SparkContext.scala 29. private[spark] val DRIVER_IDENTIFIER = "driver" 30. ......
在SparkEnv.scala的createDriverEnv中调用new()函数创建一个MapOutputTrackerMaster。MapOutputTrackerMaster的源码如下。
1. private[spark] class MapOutputTrackerMaster(conf: SparkConf, 2. broadcastManager: BroadcastManager, isLocal: Boolean) 3. extends MapOutputTracker(conf) { 4. ......
然后看一下blockManagerMaster。在SparkEnv.scala中调用new()函数创建一个blockManagerMaster。
1. val blockManagerMaster = new BlockManagerMaster (registerOrLookupEndpoint( 2. BlockManagerMaster.DRIVER_ENDPOINT_NAME, 3. new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)), 4. conf, isDriver)
BlockManagerMaster对整个集群的Block数据进行管理,Block是Spark数据管理的单位,与数据存储没有关系,数据可能存在磁盘上,也可能存储在内存中,还可能存储在offline,如Alluxio上。源码如下。
1. private[spark] 2. class BlockManagerMaster( 3. var driverEndpoint: RpcEndpointRef, 4. conf: SparkConf, 5. isDriver: Boolean) 6. extends Logging { 7. .....
构建BlockManagerMaster的时候调用new()函数创建一个BlockManagerMasterEndpoint,这是循环消息体。
1. private[spark] 2. class BlockManagerMasterEndpoint( 3. override val rpcEnv: RpcEnv, 4. val isLocal: Boolean, 5. conf: SparkConf, 6. listenerBus: LiveListenerBus) 7. extends ThreadSafeRpcEndpoint with Logging {
(2)BlockManagerMasterEndpoint本身是一个消息体,会负责通过远程消息通信的方式去管理所有节点的BlockManager。
查看WordCount在IDEA中的运行日志,日志中显示BlockManagerMasterEndpoint: Registering block manager,向block manager进行注册。
1. ...... 2. 17/06/06 05:38:02 INFO BlockManager: Using org.apache.spark.storage. RandomBlockReplicationPolicy for block replication policy 3. 17/06/06 05:38:02 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.93.1, 63572, None) 4. 17/06/06 05:38:02 DEBUG DefaultTopologyMapper: Got a request for 192.168. 93.1 5. 17/06/06 05:38:02 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.93.1:63572 with 637.2 MB RAM, BlockManagerId(driver, 192.168.93.1, 63572, None) 6. 17/06/06 05:38:02 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.93.1, 63572, None) 7. 17/06/06 05:38:02 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.93.1, 63572, None) 8. .......
(3)每启动一个ExecutorBackend,都会实例化BlockManager,并通过远程通信的方式注册给BlockManagerMaster;实质上是Executor中的BlockManager在启动的时候注册给了Driver上的BlockManagerMasterEndpoint。
(4)MemoryStore是BlockManager中专门负责内存数据存储和读写的类。
查看WordCount在IDEA中的运行日志,日志中显示MemoryStore: Block broadcast_0 stored as values in memory,数据存储在内存中。
1. ....... 2. 17/06/06 05:38:04 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 208.5 KB, free 637.0 MB) 3. 17/06/06 05:38:04 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.0 KB, free 637.0 MB) 4. 17/06/06 05:38:04 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.93.1:63572 (size: 20.0 KB, free: 637.2 MB) 5. .......
Spark读写数据是以block为单位的,MemoryStore将block数据存储在内存中。MemoryStore.scala的源码如下。
1. private[spark] class MemoryStore( 2. conf: SparkConf, 3. blockInfoManager: BlockInfoManager, 4. serializerManager: SerializerManager, 5. memoryManager: MemoryManager, 6. blockEvictionHandler: BlockEvictionHandler) 7. extends Logging { 8. ......
(5)DiskStore是BlockManager中专门负责基于磁盘的数据存储和读写的类。
Spark 2.1.1版本的DiskStore.scala的源码如下。
1. private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging { 2. .......
Spark 2.2.0版本的DiskStore.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第1行新增加了securityManager安全管理的成员变量。
1. ....... 2. securityManager: SecurityManager) extends Logging {
(6)DiskBlockManager:管理Logical Block与Disk上的Physical Block之间的映射关系并负责磁盘文件的创建、读写等。
查看WordCount在IDEA中的运行日志,日志中显示INFO DiskBlockManager: Created local directory。DiskBlockManager负责磁盘文件的管理。
1. ..... 2. 17/06/06 05:38:01 INFO BlockManagerMasterEndpoint: Using org.apache. spark.storage.DefaultTopologyMapper for getting topology information 3. 17/06/06 05:38:01 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 4. 17/06/06 05:38:01 INFO DiskBlockManager: Created local directory at C:\Users\dell\AppData\Local\Temp\blockmgr-a58a44dd-484b-4871-a92a-828 872c98804 5. 17/06/06 05:38:01 DEBUG DiskBlockManager: Adding shutdown hook 6. .......
DiskBlockManager负责管理逻辑级别和物理级别的映射关系,根据BlockID映射一个文件。在目录spark.local.dir或者SPARK_LOCAL_DIRS中,Block文件进行hash生成。通过createLocalDirs生成本地目录。DiskBlockManager的源码如下。
1. private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging { 2. ...... 3. private def createLocalDirs(conf: SparkConf): Array[File] = { 4. Utils.getConfiguredLocalDirs(conf).flatMap { rootDir => 5. try { 6. val localDir = Utils.createDirectory(rootDir, "blockmgr") 7. logInfo(s"Created local directory at $localDir") 8. Some(localDir) 9. } catch { 10. case e: IOException => 11. logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e) 12. None 13. } 14. } 15. }
从Job运行的角度来观察BlockManager:
查看WordCount.scala的运行日志:日志中显示INFO BlockManagerInfo: Added broadcast_0_piece0 in memory,将BlockManagerInfo的广播变量加入到内存中。
1. ...... 2. 17/06/06 05:38:04 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.0 KB, free 637.0 MB) 3. 17/06/06 05:38:04 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.93.1:63572 (size: 20.0 KB, free: 637.2 MB) 4. ......
Driver使用BlockManagerInfo管理ExecutorBackend中BlockManager的元数据,BlockManagerInfo的成员变量包括blockManagerId、系统当前时间timeMs、最大堆内内存maxOnHeapMem、最大堆外内存maxOffHeapMem、slaveEndpoint。
Spark 2.1.1版本的BlockManagerMasterEndpoint.scala的源码如下。
1. private[spark] class BlockManagerInfo( 2. val blockManagerId: BlockManagerId, 3. timeMs: Long, 4. val maxMem: Long, 5. val slaveEndpoint: RpcEndpointRef) 6. extends Logging {
Spark 2.2.0版本的BlockManagerMasterEndpoint.scala的源码与Spark 2.1.1版本相比具有如下特点。
上段代码中第4行删除maxMem。
上段代码中第4行之后新增maxOnHeapMem成员变量:最大的堆内内存大小。
上段代码中第4行之后新增maxOffHeapMem成员变量:最大的堆外内存大小。
1. ...... 2. val maxOnHeapMem: Long, 3. val maxOffHeapMem: Long, 4. ...... 5. extends Logging {
集群中每启动一个节点,就创建一个BlockManager,BlockManager是在每个节点(Driver及Executors)上运行的管理器,用于存放和检索本地和远程不同的存储块(内存、磁盘和堆外内存)。BlockManagerInfo中的BlockManagerId标明是哪个BlockManager,slaveEndpoint是消息循环体,用于消息通信。
(1)首先通过MemoryStore存储广播变量。
(2)在Driver中是通过BlockManagerInfo来管理集群中每个ExecutorBackend中的BlockManager中的元数据信息的。
(3)当改变了具体的ExecutorBackend上的Block信息后,就必须发消息给Driver中的BlockManagerMaster来更新相应的BlockManagerInfo。
(4)当执行第二个Stage的时候,第二个Stage会向Driver中的MapOutputTracker-MasterEndpoint发消息请求上一个Stage中相应的输出,此时MapOutputTrackerMaster会把上一个Stage的输出数据的元数据信息发送给当前请求的Stage。图7-14是BlockManager工作原理和运行机制简图:
图7-14 BlockManager工作原理和运行机制简图
BlockManagerMasterEndpoint.scala中BlockManagerInfo的getStatus方法如下。
1. def getStatus(blockId: BlockId): Option[BlockStatus] = Option(_blocks. get(blockId))
其中的BlockStatus是一个case class。
1. case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) { 2. def isCached: Boolean = memSize + diskSize > 0 3. }
BlockTransferService.scala进行网络连接操作,获取远程数据。
1. private[spark] 2. abstract class BlockTransferService extends ShuffleClient with Closeable with Logging {
7.6.5 BlockManager解密进阶:BlockManager初始化和注册解密、BlockManagerMaster工作解密、BlockTransferService解密、本地数据读写解密、远程数据读写解密
BlockManager既可以运行在Driver上,也可以运行在Executor上。在Driver上的BlockManager管理集群中Executor的所有的BlockManager,BlockManager分成Master、Slave结构,一切的调度、一切的工作由Master触发,Executor在启动的时候一定会启动BlockManager。BlockManager主要提供了读和写数据的接口,可以从本地读写数据,也可以从远程读写数据。读写数据可以基于磁盘,也可以基于内存以及OffHeap。OffHeap就是堆外空间(如Alluxion是分布式内存管理系统,与基于内存计算的Spark系统形成天衣无缝的组合,在大数据领域中,Spark+Alluxion+Kafka是非常有用的组合)。
从整个程序运行的角度看,Driver也是Executor的一种,BlockManager可以运行在Driver上,也可以运行在Executor上。BlockManager.scala的源码如下。
1. private[spark] class BlockManager( 2. executorId: String, 3. rpcEnv: RpcEnv, 4. val master: BlockManagerMaster, 5. val serializerManager: SerializerManager, 6. val conf: SparkConf, 7. memoryManager: MemoryManager, 8. mapOutputTracker: MapOutputTracker, 9. shuffleManager: ShuffleManager, 10. val blockTransferService: BlockTransferService, 11. securityManager: SecurityManager, 12. numUsableCores: Int) 13. extends BlockDataManager with BlockEvictionHandler with Logging { 14. ...... 15. val diskBlockManager = { 16. //如果外部服务不为shuffle 文件提供服务执行清理文件 17. val deleteFilesOnStop = 18. !externalShuffleServiceEnabled || executorId == SparkContext. DRIVER_IDENTIFIER 19. new DiskBlockManager(conf, deleteFilesOnStop) 20. } 21. ...... 22. private val futureExecutionContext = ExecutionContext.fromExecutorService( 23. ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128)) 24. ...... 25. private[spark] val memoryStore = 26. new MemoryStore(conf, blockInfoManager, serializerManager, memoryManager, this) 27. private[spark] val diskStore = new DiskStore(conf, diskBlockManager) 28. memoryManager.setMemoryStore(memoryStore) 29. ...... 30. def initialize(appId: String): Unit = { 31. .......
BlockManager中的成员变量中:BlockManagerMaster对整个集群的BlockManagerMaster进行管理;serializerManager是默认的序列化器;MemoryManager是内存管理;MapOutputTracker是Shuffle输出的时候,要记录ShuffleMapTask输出的位置,以供下一个Stage使用,因此需要进行记录。BlockTransferService是进行网络操作的,如果要连同另外一个BlockManager进行数据读写操作,就需要BlockTransferService。Block是Spark运行时数据的最小抽象单位,可能放入内存中,也可能放入磁盘中,还可能放在Alluxio上。
SecurityManager是安全管理;numUsableCores是可用的Cores。
BlockManager中DiskBlockManager管理磁盘的读写,创建并维护磁盘上逻辑块和物理块之间的逻辑映射位置。一个block被映射到根据BlockId生成的一个文件,块文件哈希列在目录spark.local.dir中(如果设置了SPARK LOCAL DIRS),或在目录(SPARK LOCAL DIRS)中。
然后在BlockManager中创建一个缓存池:block-manager-future以及memoryStore 、diskStore。
Shuffle读写数据的时候是通过BlockManager进行管理的。
Spark 2.1.1版本的BlockManager.scala的源码如下。
1. var blockManagerId: BlockManagerId = _ 2. 3. //服务此Executor的shuffle文件的服务器的地址,这或者是外部的服务,或者只是我们 //自己的Executor的BlockManager 4. private[spark] var shuffleServerId: BlockManagerId = _ 5. 6. //客户端读取其他Executors的shuffle文件。这或者是一个外部服务,或者只是 //标准BlockTransferService 直接连接到其他Executors 7. 8. private[spark] val shuffleClient = if (externalShuffleServiceEnabled) { 9. val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores) 10. new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(), 11. securityManager.isSaslEncryptionEnabled()) 12. } else { 13. blockTransferService 14. }
Spark 2.2.0版本的BlockManager.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第11行ExternalShuffleClient类中去掉securityManager.isSaslEncryptionEnabled()成员变量。
1. ...... 2. new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled()) 3. ......
BlockManager.scala中,BlockManager实例对象通过调用initialize方法才能正式工作,传入参数是appId,基于应用程序的ID初始化BlockManager。initialize不是在构造器的时候被使用,因为BlockManager实例化的时候还不知道应用程序的ID,应用程序ID是应用程序启动时,ExecutorBackend向Master注册时候获得的。
BlockManager.scala的initialize方法中的BlockTransferService进行网络通信。ShuffleClient是BlockManagerWorker每次启动时向BlockManagerMaster注册。BlockManager.scala的initialize方法中调用了registerBlockManager,向Master进行注册,告诉BlockManagerMaster把自己注册进去。
Spark 2.1.1版本BlockManagerMaster.scala的registerBlockManager的源码如下。
1. def registerBlockManager( 2. blockManagerId: BlockManagerId, 3. maxMemSize: Long, 4. slaveEndpoint: RpcEndpointRef): BlockManagerId = { 5. logInfo(s"Registering BlockManager $blockManagerId") 6. val updatedId = driverEndpoint.askWithRetry[BlockManagerId]( 7. RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint)) 8. logInfo(s"Registered BlockManager $updatedId") 9. updatedId 10. }
Spark 2.2.0版本的BlockManagerMaster.scala的registerBlockManager的源码与Spark 2.1.1版本相比具有如下特点。
上段代码中第3行maxMemSize删除。
上段代码中第3行之后新增参数maxOnHeapMemSize:最大的堆内内存大小。
上段代码中第3行之后新增参数maxOffHeapMemSize:最大的堆外内存大小。
上段代码中第6行driverEndpoint.askWithRetry方法调整为driverEndpoint.askSync方法。
上段代码中第7行RegisterBlockManager新增maxOnHeapMemSize、maxOffHeapMemSize两个参数。
1. ...... 2. maxOnHeapMemSize: Long, 3. maxOffHeapMemSize: Long, 4. ...... 5. val updatedId = driverEndpoint.askSync[BlockManagerId]( 6. RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)) 7. ......
registerBlockManager方法的RegisterBlockManager是一个case class。
Spark 2.1.1版本的BlockManagerMessages.scala的源码如下。
1. case class RegisterBlockManager( 2. blockManagerId: BlockManagerId, 3. maxMemSize: Long, 4. sender: RpcEndpointRef) 5. extends ToBlockManagerMaster
Spark 2.2.0版本的BlockManagerMessages.scala的源码与Spark 2.1.1版本相比具有如下特点。
上段代码中第3行maxMemSize删除。
上段代码中第3行之后新增成员变量maxOnHeapMemSize:最大堆内内存大小。
上段代码中第3行之后新增成员变量maxOffHeapMemSize:最大堆外内存大小。
1. ...... 2. maxOnHeapMemSize: Long, 3. maxOffHeapMemSize: Long, 4. ......
在Executor实例化的时候,要初始化blockManager。blockManager在initialize中将应用程序ID传进去。
Executor.scala的源码如下。
1. if (!isLocal) { 2. env.metricsSystem.registerSource(executorSource) 3. env.blockManager.initialize(conf.getAppId) 4. }
Executor.scala中,Executor每隔10s向Master发送心跳消息,如收不到心跳消息,blockManager须重新注册。
Spark 2.1.1版本的Executor.scala的源码如下。
1. ....... 2. val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId) 3. try { 4. val response = heartbeatReceiverRef.askWithRetry [HeartbeatResponse]( 5. message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s")) 6. if (response.reregisterBlockManager) { 7. logInfo("Told to re-register on heartbeat") 8. env.blockManager.reregister() 9. } 10. heartbeatFailures = 0 11. } catch { 12. case NonFatal(e) => 13. logWarning("Issue communicating with driver in heartbeater", e) 14. heartbeatFailures += 1 15. if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) { 16. logError(s"Exit as unable to send heartbeats to driver " + 17. s"more than $HEARTBEAT_MAX_FAILURES times") 18. System.exit(ExecutorExitCode.HEARTBEAT_FAILURE) 19. } 20. } 21. } 22. .......
Spark 2.2.0版本的Executor.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第4行heartbeatReceiverRef.askWithRetry方法调整为heartbeatReceiverRef.askSync方法。
1. ....... 2. val response = heartbeatReceiverRef.askSync[HeartbeatResponse]( 3. message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
回到BlockManagerMaster.scala的registerBlockManager:
registerBlockManager中RegisterBlockManager传入的slaveEndpoint是:具体的Executor启动时会启动一个BlockManagerSlaveEndpoint,会接收BlockManagerMaster发过来的指令。在initialize方法中通过master.registerBlockManager传入slaveEndpoint,而slaveEndpoint是在rpcEnv.setupEndpoint方法中调用new()函数创建的BlockManagerSlaveEndpoint。
总结一下:
(1)当Executor实例化的时候,会通过BlockManager.initialize来实例化Executor上的BlockManager,并且创建BlockManagerSlaveEndpoint这个消息循环体来接受Driver中BlockManagerMaster发过来的指令,如删除Block等。
1. env.blockManager.initialize(conf.getAppId)
BlockManagerSlaveEndpoint.scala的源码如下。
1. class BlockManagerSlaveEndpoint( 2. override val rpcEnv: RpcEnv, 3. blockManager: BlockManager, 4. mapOutputTracker: MapOutputTracker) 5. extends ThreadSafeRpcEndpoint with Logging {
(2)当BlockManagerSlaveEndpoint实例化后,Executor上的BlockManager需要向Driver上的BlockManagerMasterEndpoint注册。
BlockManagerMaster的registerBlockManager方法,其中的driverEndpoint是构建BlockManagerMaster时传进去的。
(3)BlockManagerMasterEndpoint接收到Executor上的注册信息并进行处理。
Spark 2.1.1版本的BlockManagerMasterEndpoint.scala的源码如下。
1. class BlockManagerMasterEndpoint( 2. override val rpcEnv: RpcEnv, 3. ...... 4. override def receiveAndReply(context: RpcCallContext): PartialFunction [Any, Unit] = { 5. case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) => 6. context.reply(register(blockManagerId, maxMemSize, slaveEndpoint)) 7. ......
Spark 2.2.0版本的BlockManagerMasterEndpoint.scala的源码与Spark 2.1.1版本相比具有如下特点。
上段代码中第5行RegisterBlockManager新增成员变量:maxOnHeapMemSize、maxOffHeapMemSize。
上段代码中第6行register新增成员变量:maxOnHeapMemSize、maxOffHeapMemSize。
1. ...... 2. override def receiveAndReply(context: RpcCallContext): PartialFunction [Any, Unit] = { 3. case RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) => 4. context.reply(register(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)) 5. ......
BlockManagerMasterEndpoint的register注册方法,为每个Executor的BlockManager生成对应的BlockManagerInfo。BlockManagerInfo是一个HashMap[BlockManagerId, BlockManagerInfo]。
register注册方法源码如下。
Spark 2.1.1版本的BlockManagerMasterEndpoint.scala的源码如下。
1. private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo] 2. ...... 3. private def register( 4. idWithoutTopologyInfo: BlockManagerId, 5. maxMemSize: Long, 6. slaveEndpoint: RpcEndpointRef): BlockManagerId = { 7. //dummy id不应包含拓扑信息 8. //我们在这里得到信息和回应一个块标识符 9. val id = BlockManagerId( 10. idWithoutTopologyInfo.executorId, 11. idWithoutTopologyInfo.host, 12. idWithoutTopologyInfo.port, 13. topologyMapper.getTopologyForHost(idWithoutTopologyInfo.host)) 14. 15. val time = System.currentTimeMillis() 16. if (!blockManagerInfo.contains(id)) { 17. blockManagerIdByExecutor.get(id.executorId) match { 18. case Some(oldId) => 19. //同一个Executor 的块管理器已经存在,所以删除它(假定已挂掉) 20. logError("Got two different block manager registrations on same executor - " 21. + s" will replace old one $oldId with new one $id") 22. removeExecutor(id.executorId) 23. case None => 24. } 25. logInfo("Registering block manager %s with %s RAM, %s".format( 26. id.hostPort, Utils.bytesToString(maxMemSize), id)) 27. 28. blockManagerIdByExecutor(id.executorId) = id 29. 30. blockManagerInfo(id) = new BlockManagerInfo( 31. id, System.currentTimeMillis(), maxMemSize, slaveEndpoint) 32. } 33. listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize)) 34. id 35. }......
Spark 2.2.0版本的BlockManagerMasterEndpoint.scala的源码与Spark 2.1.1版本相比具有如下特点。
上段代码中第5行删掉maxMemSize。
上段代码中第5行之后Register新增参数maxOnHeapMemSize:最大堆内内存大小;maxOffHeapMemSize:最大堆外内存大小。
上段代码中第26行日志打印时新增最大堆内内存大小、最大堆外内存大小的信息。
上段代码中第31行构建BlockManagerInfo实例时传入maxOnHeapMemSize、maxOffHeapMemSize。
上段代码中第33行listenerBus监控系统增加对最大堆内内存大小、最大堆外内存大小信息的监控。
1. ....... 2. maxOnHeapMemSize: Long, 3. maxOffHeapMemSize: Long, 4. ...... 5. id.hostPort, Utils.bytesToString(maxOnHeapMemSize + maxOffHeapMemSize), id)) 6. ....... 7. 8. blockManagerInfo(id) = new BlockManagerInfo( 9. id, System.currentTimeMillis(), maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) 10. ....... 11. listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize, 12. Some(maxOnHeapMemSize), Some(maxOffHeapMemSize))) 13. .......
BlockManagerMasterEndpoint中,BlockManagerId是一个class,标明了BlockManager在哪个Executor中,以及host主机名、port端口等信息。
BlockManagerId.scala的源码如下。
1. class BlockManagerId private ( 2. private var executorId_ : String, 3. private var host_ : String, 4. private var port_ : Int, 5. private var topologyInfo_ : Option[String]) 6. extends Externalizable {
BlockManagerMasterEndpoint中,BlockManagerInfo包含内存、slaveEndpoint等信息。
回到BlockManagerMasterEndpoint的register注册方法:如果blockManagerInfo没有包含BlockManagerId,根据BlockManagerId.executorId查询BlockManagerId,如果匹配到旧的BlockManagerId,就进行清理。
BlockManagerMasterEndpoint的removeExecutor方法如下。
1. private def removeExecutor(execId: String) { 2. logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.") 3. blockManagerIdByExecutor.get(execId).foreach(removeBlockManager) 4. }
进入removeBlockManager方法,从blockManagerIdByExecutor数据结构中清理掉block manager信息,从blockManagerInfo数据结构中清理掉所有的blocks信息。removeBlockManager源码如下。
Spark 2.1.1版本的BlockManagerMasterEndpoint.scala的removeBlockManager的源码如下。
1. private def removeBlockManager(blockManagerId: BlockManagerId) { 2. val info = blockManagerInfo(blockManagerId) 3. 4. //从blockManagerIdByExecutor删除块管理 5. blockManagerIdByExecutor -= blockManagerId.executorId 6. 7. //将它从blockManagerInfo 删除所有的块 8. blockManagerInfo.remove(blockManagerId) 9. val iterator = info.blocks.keySet.iterator 10. while (iterator.hasNext) { 11. val blockId = iterator.next 12. val locations = blockLocations.get(blockId) 13. locations -= blockManagerId 14. if (locations.size == 0) { 15. blockLocations.remove(blockId) 16. } 17. } 18. listenerBus.post(SparkListenerBlockManagerRemoved(System. currentTimeMillis(), blockManagerId)) 19. logInfo(s"Removing block manager $blockManagerId") 20. }
Spark 2.2.0版本的BlockManagerMasterEndpoint.scala的removeBlockManager的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第16~20行整体替换为以下代码:新增数据复制处理。
1. ....... 2. //如果没有块管理器,就注销这个块。否则,如果主动复制启用,块block是一个RDD //或测试块 block(后者用于单元测试),我们发送一条消息随机选择Executor的位 //置来复制给定块block。注意,我们忽略了其他块block类型(如广播broadcast/ //shuffle blocks),因为复制在这种情况下没有多大意义 3. ...... 4. logWarning(s"No more replicas available for $blockId !") 5. } else if (proactivelyReplicate && (blockId.isRDD || blockId. isInstanceOf[TestBlockId])) { 6. 7. //假设Executor未能找出故障前存在的副本数量 8. val maxReplicas = locations.size + 1 9. val i = (new Random(blockId.hashCode)).nextInt(locations.size) 10. val blockLocations = locations.toSeq 11. val candidateBMId = blockLocations(i) 12. blockManagerInfo.get(candidateBMId).foreach { bm => 13. val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId) 14. val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas) 15. bm.slaveEndpoint.ask[Boolean](replicateMsg) 16. } 17. } 18. } 19. .........
removeBlockManager中的一行代码blockLocations.remove的remove方法如下。
HashMap.java的源码如下。
1. public V remove(Object key) { 2. Node<K,V> e; 3. return (e = removeNode(hash(key), key, null, false, true)) == null ? 4. null : e.value; 5. }
回到BlockManagerMasterEndpoint的register注册方法:然后在blockManagerIdByExecutor中加入BlockManagerId,将BlockManagerId加入BlockManagerInfo信息,在listenerBus中进行监听,函数返回BlockManagerId,完成注册。
回到BlockManager.scala,在initialize方法通过master.registerBlockManager注册成功以后,将返回值赋值给idFromMaster。Initialize初始化之后,看一下BlockManager.scala中其他的方法。
reportAllBlocks方法:具体的Executor须向Driver不断地汇报自己的状态。
BlockManager.scala的reportAllBlocks方法的源码如下。
1. private def reportAllBlocks(): Unit = { 2. logInfo(s"Reporting ${blockInfoManager.size} blocks to the master.") 3. for ((blockId, info) <- blockInfoManager.entries) { 4. val status = getCurrentBlockStatus(blockId, info) 5. if (info.tellMaster && !tryToReportBlockStatus(blockId, status)) { 6. logError(s"Failed to report $blockId to master; giving up.") 7. return 8. } 9. } 10. }
reportAllBlocks方法中调用了getCurrentBlockStatus,包括内存、磁盘等信息。
getCurrentBlockStatus的源码如下。
1. private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = { 2. info.synchronized { 3. info.level match { 4. case null => 5. BlockStatus.empty 6. case level => 7. val inMem = level.useMemory && memoryStore.contains(blockId) 8. val onDisk = level.useDisk && diskStore.contains(blockId) 9. val deserialized = if (inMem) level.deserialized else false 10. val replication = if (inMem || onDisk) level.replication else 1 11. val storageLevel = StorageLevel( 12. useDisk = onDisk, 13. useMemory = inMem, 14. useOffHeap = level.useOffHeap, 15. deserialized = deserialized, 16. replication = replication) 17. val memSize = if (inMem) memoryStore.getSize(blockId) else 0L 18. val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L 19. BlockStatus(storageLevel, memSize, diskSize) 20. } 21. } 22. }
getCurrentBlockStatus方法中的BlockStatus,包含存储级别StorageLevel、内存大小、磁盘大小等信息。
BlockManagerMasterEndpoint.scala的BlockStatus的源码如下。
1. case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) { 2. def isCached: Boolean = memSize + diskSize > 0 3. } 4. ...... 5. object BlockStatus { 6. def empty: BlockStatus = BlockStatus(StorageLevel.NONE, memSize = 0L, diskSize = 0L) 7. }
回到BlockManager.scala,其中的getLocationBlockIds方法比较重要,根据BlockId获取这个BlockId所在的BlockManager。
BlockManager.scala的getLocationBlockIds的源码如下。
1. private def getLocationBlockIds(blockIds: Array[BlockId]): Array[Seq [BlockManagerId]] = { 2. val startTimeMs = System.currentTimeMillis 3. val locations = master.getLocations(blockIds).toArray 4. logDebug("Got multiple block location in %s".format (Utils.getUsedTimeMs(startTimeMs))) 5. locations 6. }
getLocationBlockIds方法中根据BlockId通过master.getLocations向Master获取位置信息,因为master管理所有的位置信息。getLocations方法里的driverEndpoint是BlockManagerMasterEndpoint,Executor向BlockManagerMasterEndpoint发送GetLocationsMultipleBlockIds消息。
Spark 2.1.1版本的BlockManagerMaster.scala的getLocations方法的源码如下。
1. def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq [BlockManagerId]] = { 2. driverEndpoint.askWithRetry[IndexedSeq[Seq[BlockManagerId]]]( 3. GetLocationsMultipleBlockIds(blockIds)) 4. }
Spark 2.2.0版本的BlockManagerMaster.scala的getLocations方法的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第2行driverEndpoint.askWithRetry方法调整为driverEndpoint. askSync方法。
1. ...... 2. driverEndpoint.askSync[IndexedSeq[Seq[BlockManagerId]]]( 3. .......
getLocations中的GetLocationsMultipleBlockIds是一个case class。
1. case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster
在BlockManagerMasterEndpoint侧接收GetLocationsMultipleBlockIds消息。
BlockManagerMasterEndpoint.scala的receiveAndReply方法如下。
1. override def receiveAndReply(context: RpcCallContext): PartialFunction [Any, Unit] = { 2. ...... 3. case GetLocationsMultipleBlockIds(blockIds) => 4. context.reply(getLocationsMultipleBlockIds(blockIds))
进入getLocationsMultipleBlockIds方法,进行map操作,开始查询位置信息。
1. private def getLocationsMultipleBlockIds( 2. blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = { 3. blockIds.map(blockId => getLocations(blockId)) 4. }
进入getLocations方法,首先判断内存缓存结构blockLocations中是否包含blockId,如果已包含,就获取位置信息,否则返回空的信息。
1. private def getLocations(blockId: BlockId): Seq[BlockManagerId] = { 2. if (blockLocations.containsKey(blockId)) blockLocations.get (blockId).toSeq else Seq.empty 3. }
其中,blockLocations是一个重要的数据结构,是一个JHashMap。Key是BlockId。Value是一个HashSet[BlockManagerId],使用HashSet。因为每个BlockId在磁盘上有副本,不同机器的位置不一样,而且不同副本对应的BlockManagerId不一样,位于不同的机器上,所以使用HashSet数据结构。
BlockManagerMasterEndpoint.scala的blockLocations的源码如下。
1. private val blockLocations = new JHashMap[BlockId, mutable.HashSet [BlockManagerId]]
回到BlockManager.scala,getLocalValues是一个重要的方法,从blockInfoManager中获取本地数据。
首先根据blockId从blockInfoManager中获取BlockInfo信息。
从BlockInfo信息获取level级别,根据level.useMemory && memoryStore.contains (blockId)判断是否在内存中,如果在内存中,就从memoryStore中获取数据。
根据level.useDisk && diskStore.contains(blockId)判断是否在磁盘中,如果在磁盘中,就从diskStore中获取数据。
Spark 2.1.1版本的BlockManager.scala的getLocalValues方法的源码如下。
1. def getLocalValues(blockId: BlockId): Option[BlockResult] = { 2. logDebug(s"Getting local block $blockId") 3. blockInfoManager.lockForReading(blockId) match { 4. case None => 5. logDebug(s"Block $blockId was not found") 6. None 7. case Some(info) => 8. val level = info.level 9. logDebug(s"Level for block $blockId is $level") 10. if (level.useMemory && memoryStore.contains(blockId)) { 11. val iter: Iterator[Any] = if (level.deserialized) { 12. memoryStore.getValues(blockId).get 13. } else { 14. serializerManager.dataDeserializeStream( 15. blockId, memoryStore.getBytes(blockId).get.toInputStream()) (info.classTag) 16. } 17. val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId)) 18. Some(new BlockResult(ci, DataReadMethod.Memory, info.size)) 19. } else if (level.useDisk && diskStore.contains(blockId)) { 20. val iterToReturn: Iterator[Any] = { 21. val diskBytes = diskStore.getBytes(blockId) 22. if (level.deserialized) { 23. val diskValues = serializerManager.dataDeserializeStream( 24. blockId, 25. diskBytes.toInputStream(dispose = true))(info.classTag) 26. maybeCacheDiskValuesInMemory(info, blockId, level, diskValues) 27. } else { 28. val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes) 29. .map {_.toInputStream(dispose = false)} 30. .getOrElse { diskBytes.toInputStream(dispose = true) } 31. serializerManager.dataDeserializeStream(blockId, stream) (info.classTag) 32. } 33. } 34. val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId)) 35. Some(new BlockResult(ci, DataReadMethod.Disk, info.size)) 36. } else { 37. handleLocalReadFailure(blockId) 38. } 39. } 40. }
Spark 2.2.0版本的BlockManager.scala的getLocalValues方法的源码与Spark 2.1.1版本相比具有如下特点。
上段代码中第9行之后新增taskAttemptId的创建。
上段代码中第17行releaseLock新增一个参数taskAttemptId。
上段代码中第21、25、28、30行diskBytes更新为diskData。
上段代码中第21行之后新增val iterToReturn: Iterator[Any]。
上段代码中第25行diskData.toInputStream方法删掉dispose = true参数。
上段代码中第34行CompletionIterator的第二个参数调整为releaseLockAndDispose (blockId, diskData, taskAttemptId)。
1. ....... 2. val taskAttemptId = Option(TaskContext.get()).map(_.taskAttemptId()) 3. ...... 4. //在迭代器iterator完成触发时,我们需要从一个没有TaskContext上下文的 //线程捕获taskId,参阅spark-18406讨论 5. val ci = CompletionIterator[Any, Iterator[Any]](iter, { 6. releaseLock(blockId, taskAttemptId) 7. ........ 8. val diskData = diskStore.getBytes(blockId) 9. val iterToReturn: Iterator[Any] = { 10. ........ 11. diskData.toInputStream())(info.classTag) 12. ........ 13. val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData) 14. ........ 15. .getOrElse { diskData.toInputStream() } 16. ........ 17. releaseLockAndDispose(blockId, diskData, taskAttemptId) 18. ........
回到BlockManager.scala,getRemoteValues方法从远程的BlockManager中获取block数据,在JVM中不需要去获取锁。
BlockManager.scala的getRemoteValues方法的源码如下。
1. private def getRemoteValues[T: ClassTag](blockId: BlockId): Option [BlockResult] = { 2. val ct = implicitly[ClassTag[T]] 3. getRemoteBytes(blockId).map { data => 4. val values = 5. serializerManager.dataDeserializeStream(blockId, data.toInputStream (dispose = true))(ct) 6. new BlockResult(values, DataReadMethod.Network, data.size) 7. } 8. }
getRemoteValues方法中调用getRemoteBytes,获取远程的数据,如果获取的失败次数超过最大的获取次数(locations.size),就提示失败,返回空值;如果获取到远程数据,就返回。
getRemoteBytes方法调用blockTransferService.fetchBlockSync方法实现远程获取数据。
BlockTransferService.scala的fetchBlockSync方法的源码如下。
Spark 2.1.1版本的BlockTransferService.scala的fetchBlockSync方法的源码如下。
1. def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = { 2. //线程等待的监视器 3. val result = Promise[ManagedBuffer]() 4. fetchBlocks(host, port, execId, Array(blockId), 5. new BlockFetchingListener { 6. override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = { 7. result.failure(exception) 8. } 9. override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = { 10. val ret = ByteBuffer.allocate(data.size.toInt) 11. ret.put(data.nioByteBuffer()) 12. ret.flip() 13. result.success(new NioManagedBuffer(ret)) 14. } 15. }) 16. ThreadUtils.awaitResult(result.future, Duration.Inf) 17. }
Spark 2.2.0版本的BlockTransferService.scala的fetchBlockSync方法的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第15行fetchBlocks方法新增了shuffleFiles = null参数。fetchBlocks方法用于异步从远程节点获取序列块,仅在调用[init]之后可用。注意,这个API需要一个序列,可以实现批处理请求,而不是返回一个future,底层实现可以调用onBlockFetchSuccess尽快获取块的数据,而不是等待所有块被取出来。
1. ...... 2. }, shuffleFiles = null) 3. .......
fetchBlockSync中调用fetchBlocks方法,NettyBlockTransferService继承自BlockTransferService,是BlockTransferService实现子类。
Spark 2.1.1版本的NettyBlockTransferService的fetchBlocks的源码如下。
1. override def fetchBlocks( 2. host: String, 3. port: Int, 4. execId: String, 5. blockIds: Array[String], 6. listener: BlockFetchingListener): Unit = { 7. logTrace(s"Fetch blocks from $host:$port (executor id $execId)") 8. try { 9. val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter { 10. override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) { 11. val client = clientFactory.createClient(host, port) 12. new OneForOneBlockFetcher(client, appId, execId, blockIds.toArray, listener).start() 13. } 14. } 15. 16. val maxRetries = transportConf.maxIORetries() 17. if (maxRetries > 0) { 18. //注意,Fetcher将正确处理maxRetries等于0的情况;避免它在代码中产生Bug, //一旦确定了稳定性,就应该删除if语句 19. new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start() 20. } else { 21. blockFetchStarter.createAndStart(blockIds, listener) 22. } 23. } catch { 24. case e: Exception => 25. logError("Exception while beginning fetchBlocks", e) 26. blockIds.foreach(listener.onBlockFetchFailure(_, e)) 27. } 28. }
Spark 2.2.0版本的NettyBlockTransferService的fetchBlocks的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第6行fetchBlocks方法新增了shuffleFiles参数。
1. ....... 2. shuffleFiles: Array[File]): Unit = { 3. .......
回到BlockManager.scala,无论是doPutBytes(),还是doPutIterator()方法中,都会使用doPut方法。
BlockManager.scala的doPut方法的源码如下。
1. private def doPut[T]( 2. blockId: BlockId, 3. level: StorageLevel, 4. classTag: ClassTag[_], 5. tellMaster: Boolean, 6. keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T] = { 7. require(blockId != null, "BlockId is null") 8. require(level != null && level.isValid, "StorageLevel is null or invalid") 9. 10. val putBlockInfo = { 11. val newInfo = new BlockInfo(level, classTag, tellMaster) 12. if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) { 13. newInfo 14. } else { 15. logWarning(s"Block $blockId already exists on this machine; not re-adding it") 16. if (!keepReadLock) { 17. //在现有的块上lockNewBlockForWriting 返回一个读锁,所以我们必须释放它 18. releaseLock(blockId) 19. } 20. return None 21. } 22. } 23. 24. val startTimeMs = System.currentTimeMillis 25. var exceptionWasThrown: Boolean = true 26. val result: Option[T] = try { 27. val res = putBody(putBlockInfo) 28. exceptionWasThrown = false 29. ...... 30. result 31. }
doPut方法中,lockNewBlockForWriting写入一个新的块前先尝试获得适当的锁,如果我们是第一个写块,获得写入锁后继续后续操作。否则,如果另一个线程已经写入块,须等待写入完成,才能获取读取锁,调用new()函数创建一个BlockInfo赋值给putBlockInfo,然后通过putBody(putBlockInfo)将数据存入。putBody是一个匿名函数,输入BlockInfo,输出的是一个泛型Option[T]。putBody函数体内容是doPutIterator方法(doPutBytes方法也类似调用doPut)调用doPut时传入的。
BlockManager.scala的doPutIterator调用doput方法,在其putBody匿名函数体中进行判断:
如果是level.useMemory,则在memoryStore中放入数据。
如果是level.useDisk,则在diskStore中放入数据。
如果level.replication大于1,则在其他节点中存入副本数据。
其中,BlockManager.scala的replicate方法的副本复制源码如下。
Spark 2.1.1版本的BlockManager.scala的replicate方法的源码如下。
1. private def replicate( 2. blockId: BlockId, 3. data: ChunkedByteBuffer, 4. level: StorageLevel, 5. classTag: ClassTag[_]): Unit = { 6. ...... 7. while(numFailures <= maxReplicationFailures && 8. !peersForReplication.isEmpty && 9. peersReplicatedTo.size != numPeersToReplicateTo) { 10. val peer = peersForReplication.head 11. try { 12. val onePeerStartTime = System.nanoTime 13. logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer") 14. blockTransferService.uploadBlockSync( 15. peer.host, 16. peer.port, 17. peer.executorId, 18. blockId, 19. new NettyManagedBuffer(data.toNetty), 20. tLevel, 21. classTag) 22. ......
Spark 2.2.0版本的BlockManager.scala的replicate方法的源码与Spark 2.1.1版本相比具有如下特点。
上段代码中第5行replicate方法中新增了existingReplicas参数。
上段代码中第19行uploadBlockSync方法的第5个参数由NettyManagedBuffer实例调整为BlockManagerManagedBuffer实例。
1. ....... 2. existingReplicas: Set[BlockManagerId] = Set.empty): Unit = { 3. ...... 4. new BlockManagerManagedBuffer(blockInfoManager, blockId, data, false), 5. ......
replicate方法中调用了blockTransferService.uploadBlockSync方法。
BlockTransferService.scala的uploadBlockSync的源码如下。
1. def uploadBlockSync( 2. hostname: String, 3. port: Int, 4. execId: String, 5. blockId: BlockId, 6. blockData: ManagedBuffer, 7. level: StorageLevel, 8. classTag: ClassTag[_]): Unit = { 9. val future = uploadBlock(hostname, port, execId, blockId, blockData, level, classTag) 10. ThreadUtils.awaitResult(future, Duration.Inf) 11. } 12. }
uploadBlockSync中又调用uploadBlock方法,BlockTransferService.scala的uploadBlock方法无具体实现,NettyBlockTransferService是BlockTransferService的子类,具体实现uploadBlock方法。
NettyBlockTransferService的uploadBlock的源码如下。
1. override def uploadBlock( 2. hostname: String, 3. port: Int, 4. execId: String, 5. blockId: BlockId, 6. blockData: ManagedBuffer, 7. level: StorageLevel, 8. classTag: ClassTag[_]): Future[Unit] = { 9. val result = Promise[Unit]() 10. val client = clientFactory.createClient(hostname, port) 11. 12. //使用JavaSerializer序列号器将StorageLevel和ClassTag序列化。其他一切都 //用我们的二进制协议编码 13. val metadata = JavaUtils.bufferToArray(serializer.newInstance(). serialize((level, classTag))) 14. 15. //为了序列化,转换或复制NIO缓冲到数组 16. val array = JavaUtils.bufferToArray(blockData.nioByteBuffer()) 17. 18. client.sendRpc(new UploadBlock(appId, execId, blockId.toString, metadata, array).toByteBuffer, 19. new RpcResponseCallback { 20. override def onSuccess(response: ByteBuffer): Unit = { 21. logTrace(s"Successfully uploaded block $blockId") 22. result.success((): Unit) 23. } 24. override def onFailure(e: Throwable): Unit = { 25. logError(s"Error while uploading block $blockId", e) 26. result.failure(e) 27. } 28. }) 29. 30. result.future 31. }
回到BlockManager.scala,看一下dropFromMemory方法。如果存储级别定位为MEMORY_AND_DISK,那么数据可能放在内存和磁盘中,内存够的情况下不会放到磁盘上;如果内存不够,就放到磁盘上,这时就会调用dropFromMemory。如果存储级别不是定义为MEMORY_AND_DISK,而只是存储在内存中,内存不够时,缓存的数据此时就会丢弃。如果仍需要数据,那就要重新计算。
Spark 2.1.1版本的BlockManager.scala的dropFromMemory的源码如下。
1. private[storage] override def dropFromMemory[T: ClassTag]( 2. blockId: BlockId, 3. data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = { 4. logInfo(s"Dropping block $blockId from memory") 5. val info = blockInfoManager.assertBlockIsLockedForWriting(blockId) 6. var blockIsUpdated = false 7. val level = info.level 8. 9. //如果存储级别要求,则保存到磁盘 10. if (level.useDisk && !diskStore.contains(blockId)) { 11. logInfo(s"Writing block $blockId to disk") 12. data() match { 13. case Left(elements) => 14. diskStore.put(blockId) { fileOutputStream => 15. serializerManager.dataSerializeStream( 16. blockId, 17. fileOutputStream, 18. elements.toIterator)(info.classTag.asInstanceOf[ClassTag[T]]) 19. } 20. case Right(bytes) => 21. diskStore.putBytes(blockId, bytes) 22. } 23. blockIsUpdated = true 24. } 25. 26. //实际由内存存储 27. val droppedMemorySize = 28. if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L 29. val blockIsRemoved = memoryStore.remove(blockId) 30. if (blockIsRemoved) { 31. blockIsUpdated = true 32. } else { 33. logWarning(s"Block $blockId could not be dropped from memory as it does not exist") 34. } 35. 36. val status = getCurrentBlockStatus(blockId, info) 37. if (info.tellMaster) { 38. reportBlockStatus(blockId, status, droppedMemorySize) 39. } 40. if (blockIsUpdated) { 41. addUpdatedBlockStatusToTaskMetrics(blockId, status) 42. } 43. status.storageLevel 44. }
Spark 2.2.0版本的BlockManager.scala的dropFromMemory的源码与Spark 2.1.1版本相比具有如下特点。
上段代码中第14行fileOutputStream名称调整为channel。
上段代码中第14行之后新增代码:val out = Channels.newOutputStream(channel)。
上段代码中第17行fileOutputStream调整为out。
1. ........ 2. diskStore.put(blockId) { channel => 3. val out = Channels.newOutputStream(channel) 4. ........ 5. out, 6. ........
总结:dropFromMemory是指在内存不够的时候,尝试释放一部分内存给要使用内存的应用,释放的这部分内存数据需考虑是丢弃,还是放到磁盘上。如果丢弃,如5000个步骤作为一个Stage,前面4000个步骤进行了Cache,Cache时可能有100万个partition分区单位,其中丢弃了100个,丢弃的100个数据就要重新计算;但是,如果设置了同时放到内存和磁盘,此时会放入磁盘中,下次如果需要,就可以从磁盘中读取数据,而不是重新计算。