7.3 Hash Based Shuffle
本节讲解Hash Based Shuffle,包括Hash Based Shuffle概述、Hash Based Shuffle内核、Hash Based Shuffle的数据读写的源码解析等内容。
7.3.1 概述
在Spark 1.1之前,Spark中只实现了一种Shuffle方式,即基于Hash的Shuffle。在Spark 1.1版本中引入了基于Sort的Shuffle实现方式,并且在Spark 1.2版本之后,默认的实现方式从基于Hash的Shuffle,修改为基于Sort的Shuffle实现方式,即使用的ShuffleManager从默认的hash修改为sort。说明在Spark 2.0版本中,Hash的Shuffle方式已经不再使用。
Spark之所以一开始就提供基于Hash的Shuffle实现机制,其主要目的之一就是为了避免不需要的排序(这也是Hadoop Map Reduce被人诟病的地方,将Sort作为固定步骤,导致许多不必要的开销)。但基于Hash的Shuffle实现机制在处理超大规模数据集的时候,由于过程中会产生大量的文件,导致过度的磁盘I/O开销和内存开销,会极大地影响性能。
但在一些特定的应用场景下,采用基于Hash的实现Shuffle机制的性能会超过基于Sort的Shuffle实现机制。关于基于Hash与基于Sort的Shuffle实现机制的性能测试方面,可以参考Spark创始人之一的ReynoldXin给的测试:“sort-basedshuffle has lower memory usage and seems to outperformhash-based in almost allof our testing”。
相关数据可以参考https://issues.apache.org/jira/browse/SPARK-3280。
因此,在Spark 1.2版本中修改为默认基于Sort的Shuffle实现机制时,同时也给出了特定应用场景下回退的机制。
7.3.2 Hash Based Shuffle内核
1.基于Hash的Shuffle实现机制的内核框架
基于Hash的Shuffle实现,ShuffleManager的具体实现子类为HashShuffleManager,对应的具体实现机制如图7-3所示。
图7-3 基于哈希算法的Shuffle实现机制的内核框架
其中,HashShuffleManager是ShuffleManager的基于哈希算法实现方式的具体实现子类。数据块的读写分别由BlockStoreShuffleReader与HashShuffleWriter实现;数据块的文件解析器则由具体子类FileShuffleBlockResolver实现;BaseShuffleHandle是ShuffleHandle接口的基本实现,保存Shuffle注册的信息。
HashShuffleManager继承自ShuffleManager,对应实现了各个抽象接口。基于Hash的Shuffle,内部使用的各组件的具体子类如下所示。
(1)BaseShuffleHandle:携带了Shuffle最基本的元数据信息,包括shuffleId、numMaps和dependency。
(2)BlockStoreShuffleReader:负责写入的Shuffle数据块的读操作。
(3)FileShuffleBlockResolver:负责管理,为Shuffle任务分配基于磁盘的块数据的Writer。每个ShuffleShuffle任务为每个Reduce分配一个文件。
(4)HashShuffleWriter:负责Shuffle数据块的写操作。
在此与解析整个Shuffle过程一样,以HashShuffleManager类作为入口进行解析。
首先看一下HashShuffleManager具体子类的注释,如下所示。
Spark 1.6.0版本的HashShuffleManager.scala的源码(Spark 2.2版本已无HashShuffleManager方式)如下。
1. /** *使用Hash的ShuffleManager具体实现子类,针对每个Mapper都会为各个Reduce分 *区构建一个输出文件(也可能是多个任务复用文件) 2. */ 3. private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager with Logging { 4. ......
2.基于Hash的Shuffle实现方式一
为了避免Hadoop中基于Sort方式的Shuffle所带来的不必要的排序开销,Spark在开始时采用了基于Hash的Shuffle方式。但这种方式存在不少缺陷,这些缺陷大部分是由于在基于Hash的Shuffle实现过程中创建了太多的文件所造成的。在这种方式下,每个Mapper端的Task运行时都会为每个Reduce端的Task生成一个文件,具体如图7-4所示。
图7-4 基于Hash的Shuffle实现方式——文件的输出细节图
Executor-Mapper表示执行Mapper端的Tasks的工作点,可以分布到集群中的多台机器节点上,并且可以以不同的形式出现,如以Spark Standalone部署模式中的Executor出现,也可以以Spark On Yarn部署模式中的容器形式出现,关键是它代表了实际执行Mapper端的Tasks的工作点的抽象概念。其中,M表示Mapper端的Task的个数,R表示Reduce端的Task的个数。
对应在右侧的本地文件系统是在该工作点上所生成的文件,其中R表示Reduce端的分区个数。生成的文件名格式为:shuffle_shuffleId_mapId_reduceId,其中的shuffle_shuffleId_1_1表示mapId为1,同时reduceId也为1。
在Mapper端,每个分区对应启动一个Task,而每个Task会为每个Reducer端的Task生成一个文件,因此最终生成的文件个数为M×R。
由于这种实现方式下,对应生成文件个数仅与Mapper端和Reducer端各自的分区数有关,因此图中将Mapper端的全部M个Task抽象到一个Executor-Mapper中,实际场景中通常是分布到集群中的各个工作点中。
生成的各个文件位于本地文件系统的指定目录中,该目录地址由配置属性spark.local.dir设置。说明:分区数与Task数,一个是静态的数据分块个数,一个是数据分块对应执行的动态任务个数,因此,在特定的、描述个数的场景下,两者是一样的。
3.基于Hash的Shuffle实现方式二
为了减少Hash所生成的文件个数,对基于Hash的Shuffle实现方式进行了优化,引入文件合并的机制,该机制设置的开关为配置属性spark.shuffle.consolidateFiles。在引入文件合并的机制后,当设置配置属性为true,即启动文件合并时,在Mapper端的输出文件会进行合并,在一定程度上可以大量减少文件的生成,降低不必要的开销。文件合并的实现方式可以参考图7-5。
图7-5 基于Hash的Shuffle的合并文件机制的输出细节图
Executor-Mapper表示集群中分配的某个工作点,其中,C表示在该工作点上所分配到的内核(Core)个数,T表示在该工作点上为每个Task分配的内核个数。C/T表示在该工作点上调度时最大的Task并行个数。
右侧的本地文件系统是在该工作点上所生成的文件,其中R表示Reduce端的分区个数。生成的文件名格式为:merged_shuffle_shuffleId_bucketId_fileId,其中的merged_shuffle_ shuffleId_1_1表示bucketId为1,同时fileId也为1。
在Mapper端,Task会复用文件组,由于最大并行个数为C/T,因此文件组最多分配C/T个,当某个Task运行结束后,会释放该文件组,之后调度的Task则复用前一个Task所释放的文件组,因此会复用同一个文件。最终在该工作点上生成的文件总数为C/T*R,如果设工作点个数为E,则总的文件数为E*C/T*R。
4.基于Hash的Shuffle机制的优缺点
1)优点
可以省略不必要的排序开销。
避免了排序所需的内存开销。
2)缺点
生成的文件过多,会对文件系统造成压力。
大量小文件的随机读写会带来一定的磁盘开销。
数据块写入时所需的缓存空间也会随之增加,会对内存造成压力。
7.3.3 Hash Based Shuffle数据读写的源码解析
1.基于Hash的Shuffle实现方式一的源码解析
下面针对Spark 1.6版本中的基于Hash的Shuffle实现在数据写方面进行源码解析(Spark2.0版本中已无Hash的Shuffle实现方式)。在基于Hash的Shuffle实现机制中,采用HashShuffleWriter作为数据写入器。在HashShuffleWriter中控制Shuffle写数据的关键代码如下所示。
Spark 1.6.0版本的HashShuffleWriter.scala的源码(Spark 2.2版本已无HashShuffle-Manager方式)如下。
1. private[spark] class HashShuffleWriter[K, V]( 2. shuffleBlockResolver: FileShuffleBlockResolver, 3. handle: BaseShuffleHandle[K, V, _], 4. mapId: Int, 5. context: TaskContext) 6. extends ShuffleWriter[K, V] with Logging { 7. 8. //控制每个Writer输出时的切片个数,对应分区个数 9. private val dep = handle.dependency 10. private val numOutputSplits = dep.partitioner.numPartitions 11. 12. ...... 13. //获取数据读写的块管理器 14. private val blockManager = SparkEnv.get.blockManager 15. private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null)) 16. 17. //从 FileShuffleBlockResolver 的 forMapTask方法中获取指定的 shuffleId 对应 //的mapId 18. //对应分区个数构建的数据块写的ShuffleWriterGroup实例 19. private val shuffle = shuffleBlockResolver.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser, writeMetrics) 20. 21. 22. /** Task输出时一组记录的写入 */ 23. 24. override def write(records: Iterator[Product2[K, V]]): Unit = { 25. //判断在写时是否需要先聚合,即定义了Map端Combine时,先对数据进行聚合再写入,否则 //直接返回需要写入的一批记录 26. 27. val iter = if (dep.aggregator.isDefined) { 28. if (dep.mapSideCombine) { 29. dep.aggregator.get.combineValuesByKey(records, context) 30. } else { 31. records 32. } 33. } else { 34. require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!") 35. records 36. } 37. 38. //根据分区器,获取每条记录对应的bucketId(即所在Reduce序号),根据bucketId //从FileShuffleBlockResolver构建的ShuffleWriterGroup中,获取DiskBlock- //ObjectWriter实例,对应磁盘数据块的数据写入器 39. for (elem<- iter) { 40. val bucketId = dep.partitioner.getPartition(elem._1) 41. shuffle.writers(bucketId).write(elem._1, elem._2) 42. } 43. } 44. ...... 45. }
当需要在Map端进行聚合时,使用的是聚合器(Aggregator)的combineValuesByKey方法,在该方法中使用ExternalAppendOnlyMap类对记录集进行处理,处理时如果内存不足,会引发Spill操作。早期的实现会直接缓存到内存,在数据量比较大时容易引发内存泄漏。
在HashShuffleManager中,ShuffleBlockResolver特质使用的具体子类为FileShuffleBlock-Resolver,即指定了具体如何从一个逻辑Shuffle块标识信息来获取一个块数据,对应为下面第7行调用的forMapTask方法,具体代码如下所示:
Spark 1.6.0版本的FileShuffleBlockResolver.scala的源码(Spark 2.2版本已无HashShuffleManager方式)如下。
1. /** 2. *针对给定的 Map Task,指定一个ShuffleWriterGroup实例,在数据块写入器成功 3. *关闭时,会注册为完成状态 4. */ 5. 6. 7. def forMapTask(shuffleId: Int, mapId: Int, numReduces: Int, serializer: Serializer, 8. writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = { 9. new ShuffleWriterGroup { 10. //在FileShuffleBlockResolver中维护着当前Map Task对应shuffleId标识的 //Shuffle中,指定numReduces个数的Reduce的各个状态 11. 12. shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numReduces)) 13. private val shuffleState = shuffleStates(shuffleId) 14. 15. ...... 16. //根据Reduce端的任务个数,构建元素类型为DiskBlockObjectWriter的数组, //DiskBlockObjectWriter负责具体数据的磁盘写入 17. //原则上,Shuffle的输出可以存放在各种提供存储机制的系统上,但为了容错性等方面的 //考虑,目前的Shuffle实行机制都会写入到磁盘中 18. 19. val writers: Array[DiskBlockObjectWriter] = { 20. //这里的逻辑Bucket的Id值即对应的Reduce的任务序号,或者说分区ID 21. Array.tabulate[DiskBlockObjectWriter](numReduces) { bucketId => 22. //针对每个Map端分区的Id与Bucket的Id构建数据块的逻辑标识 23. val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) 24. val blockFile = blockManager.diskBlockManager.getFile(blockId) 25. val tmp = Utils.tempFileWith(blockFile) 26.blockManager.getDiskWriter(blockId, tmp, serializerInstance, bufferSize, writeMetrics) 27. } 28. } 29. ...... 30. //任务完成时回调的释放写入器方法 31. override def releaseWriters(success: Boolean) { 32. shuffleState.completedMapTasks.add(mapId) 33. } 34. } 35. }
其中,ShuffleBlockId实例构建的源码如下所示。
1. case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId { 2. override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId 3. }
从name方法的重载上可以看出,后续构建的文件与代码中的mapId、reduceId的关系。当然,所有同一个Shuffle的输出数据块,都会带上shuffleId这个唯一标识的,因此全局角度上,逻辑数据块name不会重复(针对一些推测机制或失败重试机制之类的场景而已,逻辑name没有带上时间信息,因此缺少多次执行的输出区别,但在管理这些信息时会维护一个时间作为有效性判断)。
2.基于Hash的Shuffle实现方式二的源码解析
下面通过详细解析FileShuffleBlockResolver源码来加深对文件合并机制的理解。
由于在Spark 1.6中,文件合并机制已经删除,因此下面基于Spark 1.5版本的代码对文件合并机制的具体实现细节进行解析。以下代码位于FileShuffleBlockResolver类中。
合并机制的关键控制代码如下所示。
Spark 1.5.0版本的FileShuffleBlockResolver.scala的源码(Spark 2.2版本已无HashShuffleManager方式)如下。
1. /** 2. *获取一个针对特定Map Task的ShuffleWriterGroup 3. */ 4. 5. 6. def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer, 7. writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = { 8. new ShuffleWriterGroup { 9. ...... 10. val writers: Array[DiskBlockObjectWriter] = if (consolidateShuffleFiles) { 11. //获取未使用的文件组 12. fileGroup = getUnusedFileGroup() 13. Array.tabulate[DiskBlockObjectWriter](numBuckets) { bucketId => 14. val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) 15. //注意获取磁盘写入器时,传入的第二个参数与未使用文件合并机制时的差异 16. //fileGroup(bucketId):构造器方式调用,对应apply的方法调用 17.blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializerInstance, bufferSize, 18. writeMetrics) 19. } 20. } else { 21. Array.tabulate[DiskBlockObjectWriter](numBuckets) { bucketId => 22. val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) 23. //根据ShuffleBlockId信息获取文件名 24. val blockFile = blockManager.diskBlockManager.getFile(blockId) 25. val tmp = Utils.tempFileWith(blockFile) 26.blockManager.getDiskWriter(blockId, tmp, serializerInstance, bufferSize, writeMetrics) 27. } 28. } 29. ...... 30. writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime) 31. override def releaseWriters(success: Boolean) { 32. //带文件合并机制时,写入器在释放后的处理 33. //3个关键信息mapId、offsets、lengths 34. if (consolidateShuffleFiles) { 35. if (success) { 36. val offsets = writers.map(_.fileSegment().offset) 37. val lengths = writers.map(_.fileSegment().length) 38. fileGroup.recordMapOutput(mapId, offsets, lengths) 39. } 40. //回收文件组,便于后续复用 41. recycleFileGroup(fileGroup) 42. } else { 43. shuffleState.completedMapTasks.add(mapId) 44. } 45. }
其中,第10行中的consolidateShuffleFiles变量,是判断是否设置了文件合并机制,当设置consolidateShuffleFiles为true后,会继续调用getUnusedFileGroup方法,在该方法中会获取未使用的文件组,即重新分配或已经释放可以复用的文件组。
获取未使用的文件组(ShuffleFileGroup)的相关代码getUnusedFileGroup如下所示。
Spark 1.5.0版本的FileShuffleBlockResolver.scala的源码(Spark 2.2版本已无HashShuffleManager方式)如下。
1. private def getUnusedFileGroup(): ShuffleFileGroup = { 2. //获取已经构建但未使用的文件组,如果获取失败,则重新构建一个文件组 3. val fileGroup = shuffleState.unusedFileGroups.poll() 4. if (fileGroup != null) fileGroup else newFileGroup() 5. } 6. //重新构建一个文件组的源码 7. private def newFileGroup(): ShuffleFileGroup = { 8. //构建后会对文件编号进行递增,该文件编号最终用在生成的文件名中 9. val fileId = shuffleState.nextFileId.getAndIncrement() 10. val files = Array.tabulate[File](numBuckets) { bucketId => 11. //最终的文件名,可以通过文件名的组成及取值细节,加深对实现细节在文件个数上的差异的理解 12. 13. val filename = physicalFileName(shuffleId, bucketId, fileId) 14. blockManager.diskBlockManager.getFile(filename) 15. } 16. //构建并添加到shuffleState中,便于后续复用 17. val fileGroup = new ShuffleFileGroup(shuffleId, fileId, files) 18. shuffleState.allFileGroups.add(fileGroup) 19. fileGroup 20. }
其中,第13行代码对应生成的文件名,即物理文件名,相关代码如下所示。
Spark 1.5.0版本的FileShuffleBlockResolver.scala的源码(Spark 2.2版本已无HashShuffleManager方式)如下。
1. private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = { 2. "merged_shuffle_%d_%d_%d".format(shuffleId, bucketId, fileId) 3. }
可以看到,与未使用文件合并时的基于Hash的Shuffle实现方式不同的是,在生成的文件名中没有对应的mapId,取而代之的是与文件组相关的fileId,而fileId则是多个Mapper端的Task所共用的,在此仅从生成的物理文件名中也可以看出文件合并的某些实现细节。
另外,对应生成的文件组既然是复用的,当一个Mapper端的Task执行结束后,便会释放该文件组(ShuffleFileGroup),之后继续调度时便会复用该文件组。对应地,调度到某个Executor工作点上同时运行的Task最大个数,就对应了最多分配的文件组个数。
而在TaskSchedulerImpl调度Task时,各个Executor工作点上Task调度控制的源码说明了在各个Executor工作点上调度并行的Task数,具体代码如下所示。
1. private def resourceOfferSingleTaskSet( 2. taskSet: TaskSetManager, 3. maxLocality: TaskLocality, 4. shuffledOffers: Seq[WorkerOffer], 5. availableCpus: Array[Int], 6. tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = { 7. var launchedTask = false 8. for (i <- 0 until shuffledOffers.size) { 9. val execId = shuffledOffers(i).executorId 10. val host = shuffledOffers(i).host 11. //判断当前Executor工作点上可用的内核个数是否满足Task所需的内核个数 12. //CPUS_PER_TASK:表示设置的每个Task所需的内核个数 13. if (availableCpus(i) >= CPUS_PER_TASK) { 14. try { 15. for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { 16. ...... 17. launchedTask = true 18. } 19. } catch { 20. ...... 21. } 22. } 23. } 24. return launchedTask 25. }
其中,设置每个Task所需的内核个数的配置属性如下所示:
1. //每个任务请求的CPU个数 2. val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
对于这些会影响Executor中并行执行的任务数的配置信息,设置时需要多方面考虑,包括内核个数与任务个数的合适比例,在内存模型中,为任务分配内存的具体策略等。任务分配内存的具体策略可以参考Spark官方给出的具体设计文档,以及文档中各种设计方式的权衡等内容。