3.2 RDD弹性特性七个方面解析
RDD作为弹性分布式数据集,它的弹性具体体现在以下七个方面。
1.自动进行内存和磁盘数据存储的切换
Spark会优先把数据放到内存中,如果内存实在放不下,会放到磁盘里面,不但能计算内存放下的数据,也能计算内存放不下的数据。如果实际数据大于内存,则要考虑数据放置策略和优化算法。当应用程序内存不足时,Spark应用程序将数据自动从内存存储切换到磁盘存储,以保障其高效运行。
2.基于Lineage(血统)的高效容错机制
Lineage是基于Spark RDD的依赖关系来完成的(依赖分为窄依赖和宽依赖两种形态),每个操作只关联其父操作,各个分片的数据之间互不影响,出现错误时只要恢复单个Split的特定部分即可。常规容错有两种方式:一个是数据检查点;另一个是记录数据的更新。数据检查点的基本工作方式,就是通过数据中心的网络链接不同的机器,然后每次操作的时候都要复制数据集,就相当于每次都有一个复制,复制是要通过网络传输的,网络带宽就是分布式的瓶颈,对存储资源也是很大的消耗。记录数据更新就是每次数据变化了就记录一下,这种方式不需要重新复制一份数据,但是比较复杂,消耗性能。Spark的RDD通过记录数据更新的方式为何很高效?因为① RDD是不可变的且Lazy;② RDD的写操作是粗粒度的。但是,RDD读操作既可以是粗粒度的,也可以是细粒度的。
3.Task如果失败,会自动进行特定次数的重试
默认重试次数为4次。TaskSchedulerImpl的源码如下所示。
Spark 2.1.1版本的TaskSchedulerImpl.scala的源码如下。
1. private[spark] class TaskSchedulerImpl( 2. val sc: SparkContext, 3. val maxTaskFailures: Int, 4. isLocal: Boolean = false) 5. extends TaskScheduler with Logging 6. { 7. def this(sc: SparkContext) = this(sc, sc.conf.get(config.MAX_TASK_FAILURES)) 8. 9. config\package.scala 10. ...... 11. private[spark] val MAX_TASK_FAILURES = 12. ConfigBuilder("spark.task.maxFailures") 13. .intConf 14. .createWithDefault(4)
Spark 2.2.0版本的TaskSchedulerImpl.scala的源码与Spark 2.1.1版本相比具有如下特点。
上段代码中第1行增加了类TaskSchedulerImpl的访问权限限制,限于在[scheduler]包内访问。
上段代码中第3行之后增加了黑名单列表跟踪变量,用于跟踪问题executors和nodes节点。
上段代码中第5行之后新增了导入TaskSchedulerImpl._的所有内容。
上段代码中第7行this构造函数中新增了maybeCreateBlacklistTracker参数。
新增了一个带sc、maxTaskFailures、isLocal参数的this构造函数。
1. private[spark] class TaskSchedulerImpl private[scheduler]( 2. ....... 3. private[scheduler] val blacklistTrackerOpt: Option[BlacklistTracker], 4. isLocal: Boolean = false) 5. extends TaskScheduler with Logging { 6. 7. import TaskSchedulerImpl._ 8. 9. def this(sc: SparkContext) = { 10. this( 11. ...... 12. TaskSchedulerImpl.maybeCreateBlacklistTracker(sc)) 13. } 14. 15. def this(sc: SparkContext, maxTaskFailures: Int, isLocal: Boolean) = { 16. this( 17. sc, 18. maxTaskFailures, 19. TaskSchedulerImpl.maybeCreateBlacklistTracker(sc), 20. isLocal = isLocal) 21. } 22. .....
TaskSchedulerImpl是底层的任务调度接口TaskScheduler的实现,这些Schedulers从每一个Stage中的DAGScheduler中获取TaskSet,运行它们,尝试是否有故障。DAGScheduler是高层调度,它计算每个Job的Stage的DAG,然后提交Stage,用TaskSets的形式启动底层TaskScheduler调度在集群中运行。
4.Stage如果失败,会自动进行特定次数的重试
这样,Stage对象可以跟踪多个StageInfo(存储SparkListeners监听到的Stage的信息,将Stage信息传递给Listeners或web UI)。默认重试次数为4次,且可以直接运行计算失败的阶段,只计算失败的数据分片,Stage的源码如下所示。
Spark 2.1.1版本的Stage.scala的源码如下。
1. private[scheduler] abstract class Stage( 2. val id: Int, 3. val rdd: RDD[_], 4. val numTasks: Int, 5. val parents: List[Stage], 6. val firstJobId: Int, 7. val callSite: CallSite) 8. extends Logging { 9. //partition的个数 10. val numPartitions = rdd.partitions.length 11. 12. /** 属于这个工作集的Stage */ 13. val jobIds = new HashSet[Int] 14. 15. val pendingPartitions = new HashSet[Int] 16. 17. /** 用于此Stage的下一个新attempt 的标识ID */ 18. private var nextAttemptId: Int = 0 19. 20. val name: String = callSite.shortForm 21. val details: String = callSite.longForm 22. 23. /** *最新的[StageInfo] object指针,需要被初始化, *任何attempts都是被创造出来的,因为DAGScheduler使用 StageInfo *告诉SparkListeners工作何时开始(即发生前的任何阶段已经创建) 24. */ 25. private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId) 26. 27. /** *设置stage attempt IDs 当失败时可以读取失败信息, *跟踪这些失败,为了避免无休止地重复失败 *跟踪每一次 attempt,以便避免记录重复故障 *如果从同一stage创建多任务失败(spark-5945) 28. */ 29. private val fetchFailedAttemptIds = new HashSet[Int] 30. 31. private[scheduler] def clearFailures() : Unit = { 32. fetchFailedAttemptIds.clear() 33. } 34. 35. /** * 检查是否应该中止由于连续多次读取失败的stage * 如果失败的次数超过允许的次数,此方法更新失败stage attempts 和返回的运行集 36. */ 37. private[scheduler] def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = { 38. fetchFailedAttemptIds.add(stageAttemptId) 39. fetchFailedAttemptIds.size >= Stage.MAX_CONSECUTIVE_FETCH_FAILURES 40. } 41. 42. /** 在stage 中创建一个新的 attempt */ 43. def makeNewStageAttempt( 44. numPartitionsToCompute: Int, 45. taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = { 46. val metrics = new TaskMetrics 47. metrics.register(rdd.sparkContext) 48. _latestInfo = StageInfo.fromStage( 49. this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences) 50. nextAttemptId += 1 51. } 52. 53. /** 返回当前stage中最新的 StageInfo */ 54. def latestInfo: StageInfo = _latestInfo 55. 56. override final def hashCode(): Int = id 57. 58. override final def equals(other: Any): Boolean = other match { 59. case stage: Stage => stage != null && stage.id == id 60. case _ => false 61. } 62. 63. /**返回需要重新计算的分区标识的序列*/ 64. def findMissingPartitions(): Seq[Int] 65. } 66. 67. private[scheduler] object Stage { 68. //允许一个stage中止的连续故障数 69. val MAX_CONSECUTIVE_FETCH_FAILURES = 4 70. }
Spark 2.2.0版本的Stage.scala的源码与Spark 2.1.1版本的Stage.scala的源码相比具有如下特点。
上段代码中第15行删除pendingPartitions变量。
上段代码中第37~40行删除failedOnFetchAndShouldAbort方法。
上段代码中第67~70行删除Stage的object Stage对象,去掉了val MAX_CONSECUTIVE_FETCH_FAILURES = 4的变量。
在Stage终止之前允许的Stage连续尝试的次数为4次,重试次数参数从Spark 2.1.1版本的Stage.scala的源码移到了Spark 2.2.0版本的DAGScheduler.scala的源码object DAGScheduler中进行定义。
1. /** *在终止之前允许的连续尝试的次数 2. */ 3. 4. private[scheduler] val maxConsecutiveStageAttempts = 5. sc.getConf.getInt("spark.stage.maxConsecutiveAttempts", 6. DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS) 7. ...... 8. 9. private[spark] object DAGScheduler { 10. //在毫秒级别,等待读取失败事件后就停止(在下一个检测到来之前);这是一个避免重新提 //交任务的简单方法,非读取数据的map中更多失败事件的到来 11. val RESUBMIT_TIMEOUT = 200 12. 13. //在终止之前允许连续尝试的次数 14. val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4 15. }
Stage是Spark Job运行时具有相同逻辑功能和并行计算任务的一个基本单元。Stage中所有的任务都依赖同样的Shuffle,每个DAG任务通过DAGScheduler在Stage的边界处发生Shuffle形成Stage,然后DAGScheduler运行这些阶段的拓扑顺序。每个Stage都可能是ShuffleMapStage,如果是ShuffleMapStage,则跟踪每个输出节点(nodes)上的输出文件分区,它的任务结果是输入其他的Stage(s),或者输入一个ResultStage,若输入一个ResultStage,这个ResultStage的任务直接在这个RDD上运行计算这个Spark Action的函数(如count()、 save()等),并生成shuffleDep等字段描述Stage和生成变量,如outputLocs和numAvailableOutputs,为跟踪map输出做准备。每个Stage会有firstjobid,确定第一个提交Stage的Job,使用FIFO调度时,会使得其前面的Job先行计算或快速恢复(失败时)。
ShuffleMapStage是DAG产生数据进行Shuffle的中间阶段,它发生在每次Shuffle操作之前,可能包含多个Pipelined操作,ResultStage阶段捕获函数在RDD的分区上运行Action算子计算结果,有些Stage不是运行在RDD的所有的分区上,例如,first()、lookup()等。SparkListener是Spark调度器的事件监听接口。注意,这个接口随着Spark版本的不同会发生变化。
5.checkpoint和persist(检查点和持久化),可主动或被动触发
checkpoint是对RDD进行的标记,会产生一系列的文件,且所有父依赖都会被删除,是整个依赖(Lineage)的终点。checkpoint也是Lazy级别的。persist后RDD工作时每个工作节点都会把计算的分片结果保存在内存或磁盘中,下一次如果对相同的RDD进行其他的Action计算,就可以重用。
因为用户只与Driver Program交互,因此只能用RDD中的cache()方法去cache用户能看到的RDD。所谓能看到,是指经过Transformation算子处理后生成的RDD,而某些在Transformation算子中Spark自己生成的RDD是不能被用户直接cache的。例如,reduceByKey()中会生成的ShuffleRDD、MapPartitionsRDD是不能被用户直接cache的。在Driver Program中设定RDD.cache()后,系统怎样进行cache?首先,在计算RDD的Partition之前就去判断Partition要不要被cache,如果要被cache,先将Partition计算出来,然后cache到内存。cache可使用memory,如果写到HDFS磁盘的话,就要检查checkpoint。调用RDD.cache()后,RDD就变成persistRDD了,其StorageLevel为MEMORY_ONLY,persistRDD会告知Driver说自己是需要被persist的。此时会调用RDD.iterator()。 RDD.scala的iterator()的源码如下。
1. /** * RDD的内部方法,将从合适的缓存中读取,否则计算它 * 这不应该被用户直接使用,但可用于实现自定义的子RDD 2. */ 3. 4. 5. final def iterator(split: Partition, context: TaskContext): Iterator[T] = { 6. if (storageLevel != StorageLevel.NONE) { 7. getOrCompute(split, context) 8. } else { 9. computeOrReadCheckpoint(split, context) 10. } 11. }
当RDD.iterator()被调用的时候,也就是要计算该RDD中某个Partition的时候,会先去cacheManager那里获取一个blockId,然后去BlockManager里匹配该Partition是否被checkpoint了,如果是,那就不用计算该Partition了,直接从checkpoint中读取该Partition的所有records放入ArrayBuffer里面。如果没有被checkpoint过,先将Partition计算出来,然后将其所有records放到cache中。总体来说,当RDD会被重复使用(不能太大)时,RDD需要cache。Spark自动监控每个节点缓存的使用情况,利用最近最少使用原则删除老旧的数据。如果想手动删除RDD,可以使用RDD.unpersist()方法。
此外,可以利用不同的存储级别存储每一个被持久化的RDD。例如,它允许持久化集合到磁盘上,将集合作为序列化的Java对象持久化到内存中、在节点间复制集合或者存储集合到Alluxio中。可以通过传递一个StorageLevel对象给persist()方法设置这些存储级别。cache()方法使用默认的存储级别-StorageLevel.MEMORY_ONLY。RDD根据useDisk、useMemory、 useOffHeap、deserialized、replication 5个参数的组合提供了常用的12种基本存储,完整的存储级别介绍如下。Spark 1.6.0版本的StorageLevel.scala的源码如下。
1. val NONE = new StorageLevel(false, false, false, false) 2. val DISK_ONLY = new StorageLevel(true, false, false, false) 3. val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) 4. val MEMORY_ONLY = new StorageLevel(false, true, false, true) 5. val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) 6. val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) 7. val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) 8. val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) 9. val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) 10. val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) 11. val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) 12. //堆外存储 13. val OFF_HEAP = new StorageLevel(false, false, true, false)
Spark 2.2.0版本的Stage.scala的源码与Spark 1.6.0版本相比具有如下特点。
上段代码中第13行堆外存储OFF_HEAP显式指定副本的参数值为1。
OFF_HEAP = new StorageLevel(true, true, true, false, 1)
1. ...... 2. val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
StorageLevel是控制存储RDD的标志,每个StorageLevel记录RDD是否使用memory,或使用ExternalBlockStore存储,如果RDD脱离了memory或ExternalBlockStore,是否扔掉RDD,是否保留数据在内存中的序列化格式,以及是否复制多个节点的RDD分区。另外,org.apache.spark.storage.StorageLevel是单实例(singleton)对象,包含了一些静态常量和常用的存储级别,且可用singleton对象工厂方法StorageLevel(...)创建定制化的存储级别。
Spark的多个存储级别意味着在内存利用率和CPU利用率间的不同权衡。推荐通过下面的过程选择一个合适的存储级别:①如果RDD适合默认的存储级别(MEMORY_ONLY),就选择默认的存储级别。因为这是CPU利用率最高的选项,会使RDD上的操作尽可能地快。②如果不适合用默认级别,就选择MEMORY_ONLY_SER。选择一个更快的序列化库提高对象的空间使用率,但是仍能够相当快地访问。③除非算子计算RDD花费较大或者需要过滤大量的数据,不要将RDD存储到磁盘上,否则重复计算一个分区,就会和从磁盘上读取数据一样慢。④如果希望更快地恢复错误,可以利用replicated存储机制,所有的存储级别都可以通过replicated计算丢失的数据来支持完整的容错。另外,replicated的数据能在RDD上继续运行任务,而不需要重复计算丢失的数据。在拥有大量内存的环境中或者多应用程序的环境中,Off_Heap(将对象从堆中脱离出来序列化,然后存储在一大块内存中,这就像它存储到磁盘上一样,但它仍然在RAM内存中。Off_Heap对象在这种状态下不能直接使用,须进行序列化及反序列化。序列化和反序列化可能会影响性能,Off_Heap堆外内存不需要进行GC)。Off_Heap具有如下优势:Off_Heap运行多个执行者共享的Alluxio中相同的内存池,显著地减少GC。如果单个的Executor崩溃,缓存的数据也不会丢失。
6.数据调度弹性,DAGScheduler、TASKScheduler和资源管理无关
Spark将执行模型抽象为通用的有向无环图计划(DAG),这可以将多Stage的任务串联或并行执行,从而不需要将Stage中间结果输出到HDFS中,当发生节点运行故障时,可有其他可用节点代替该故障节点运行。
7.数据分片的高度弹性(coalesce)
Spark进行数据分片时,默认将数据放在内存中,如果内存放不下,一部分会放在磁盘上进行保存。
RDD.scala的coalesce算子代码如下:
1. def coalesce(numPartitions: Int, shuffle: Boolean = false, 2. partitionCoalescer: Option[PartitionCoalescer] = Option.empty) 3. (implicit ord: Ordering[T] = null) 4. : RDD[T] = withScope { 5. require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.") 6. if (shuffle) { 7. /**从随机分区开始,将元素均匀分布在输出分区上*/ 8. val distributePartition = (index: Int, items: Iterator[T]) => { 9. var position = (new Random(index)).nextInt(numPartitions) 10. items.map { t => 11. //注:Key的哈希码是Key本身,HashPartitioner分区器将它与总分区数进行 //取模运算 12. 13. position = position + 1 14. (position, t) 15. } 16. } : Iterator[(Int, T)] 17. 18. //包括一个shuffle 步骤,使我们的上游任务仍然是分布式的 19. new CoalescedRDD( 20. new ShuffledRDD[Int, T, T](mapPartitionsWithIndex (distributePartition), 21. new HashPartitioner(numPartitions)), 22. numPartitions, 23. partitionCoalescer).values 24. } else { 25. new CoalescedRDD(this, numPartitions, partitionCoalescer) 26. } 27. }
例如,在计算的过程中,会产生很多的数据碎片,这时产生一个Partition可能会非常小,如果一个Partition非常小,每次都会消耗一个线程去处理,这时可能会降低它的处理效率,需要考虑把许多小的Partition合并成一个较大的Partition去处理,这样会提高效率。另外,有可能内存不是那么多,而每个Partition的数据Block比较大,这时需要考虑把Partition变成更小的数据分片,这样让Spark处理更多的批次,但是不会出现OOM。