Spark大数据商业实战三部曲:内核解密|商业案例|性能调优
上QQ阅读APP看书,第一时间看更新

10.2 Spark中Accumulator原理和源码详解

本节讲解Spark中Accumulator原理及对Spark中Accumulator源码进行详解。

10.2.1 Spark中Accumulator原理详解

Spark的Broadcast和Accumulator很重要,在实际的企业级开发环境中一般会使用Broadcast和Accumulator。Broadcast、Accumulator和RDD是Spark中并列的三大基础数据结构。大家谈Spark的时候,首先谈RDD。RDD是一个并行的数据,关注在JVM中怎么处理数据。很多时候可能忽略了Broadcast和Accumulator,这两个变量都是全局级别的。例如,集群中有1000台机器,那Broadcast和Accumulator可以在1000台机器中共享。在分布式的基础上,如果有共享的数据结构,那是非常有用的。

分布式大数据系统中,进行编程的时候首先考虑数据结构。

 RDD:分布式私有数据结构。RDD本身是一个并行化的、本地化的数据结构,运行时在一个个线程中运行,RDD是私有的运行数据和私有的运行过程,但在一个Stage里面是一样的,一个线程一个时刻只处理一个数据分片,另一个线程一个时刻只处理另一个数据片。在设计业务逻辑的时候,我们通常考虑这个分片如何去处理。

 Broadcast:分布式全局只读数据结构。

 Accumulator:分布式全局只写的数据结构。我们不会在线程池中读取Accumulator,但在Driver上可以读取Accumulator。

在生产环境下,我们一定会自定义Accumulator。

(1)自定义时可以让Accumulator非常复杂,基本上可以是任意类型的Java和Scala对象。

(2)自定义Accumulator时,可以实现一些“技术福利”。例如,在Accumulator变化的时候可以把数据同步到MySQL中。我们在进行流处理的时候,数据不断地流进来,如要查询用户点击量的趋势图,计算点击量以后须实时反馈到生产环境的server上。一个非常简单的实现方式是:每次发现累加的时候,就更新一下数据库,这是一个非常强大的同步机制和同步效果。

10.2.2 Spark中Accumulator源码详解

Accumulator是一个简单的value值[Accumulable],相同类型的元素合并时结果可以累加,通过added到关联和交换操作,可以有效地支持并行,可以用来实现计数(如MapReduce)或求和。Spark原生支持数值类型的累加器,也可以自定义开发实现新类型的支持。

累加器由一个初始值V通过调用[SparkContext#accumulator SparkContext.accumulator]创建。在群集上运行的任务可以使用“+=”运算符写入,但是不能读取它的值。只有Driver程序使用[#value]方法可以读取累加器的值。例如:

1.   scala> val accum = sc.accumulator(0)
2.  accum: org.apache.spark.Accumulator[Int] = 0
3.  scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
4.  ...
5.  10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
6.
7.  scala> accum.value
8.  res2: Int = 10

Accumulator.scala的源码如下。

1.    @deprecated("use AccumulatorV2", "2.0.0")
2.  class Accumulator[T] private[spark] (
3.      // SI-8813: 必须显式地定义private val,否则Scala 2.11不编译
4.      @transient private val initialValue: T,
5.      param: AccumulatorParam[T],
6.      name: Option[String] = None,
7.      countFailedValues: Boolean = false)
8.    extends Accumulable[T, T](initialValue, param, name, countFailedValues)
9.  ......

Accumulator是一个类,继承自Accumulable。Accumulator已经被标识为过时的(deprecated),在Spark 2.0版本中可以使用AccumulatorV2。

1.   abstract class AccumulatorV2[IN, OUT] extends Serializable {
2.    private[spark] var metadata: AccumulatorMetadata = _
3.    private[this] var atDriverSide = true
4.  ......

可以通过继承创建自己的类型AccumulatorV2。AccumulatorV2抽象类有几种方法必须覆盖:reset用于将累加器重置为零,add用于将另一个值添加到累加器中,merge用于将另一个相同类型的累加器合并到该累加器中。例如,假设有一个MyVector代表数学向量的类,代码如下:

1.   class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
2.
3.    private val myVector: MyVector = MyVector.createZeroVector
4.
5.    def reset(): Unit = {
6.      myVector.reset()
7.    }
8.
9.    def add(v: MyVector): Unit = {
10.     myVector.add(v)
11.   }
12.   ...
13. }
14.
15. //创建一个这种类型的累加器
16. val myVectorAcc = new VectorAccumulatorV2
17. //然后,把它注册到Spark上下文中
18. sc.register(myVectorAcc, "MyVectorAcc1")

当自定义自己的AccumulatorV2类型时,生成的类型可能与添加的元素的类型不同。累加器更新仅在Action动作内执行,Spark保证每个任务对累加器的更新只能应用一次,即重新启动的任务将不会更新该值。在transformations转换中,如果重新执行任务或作业阶段,则每个任务的更新可能会被多次执行。Accumulators不会改变Spark的Lazy评估模型。如果它们在RDD的操作中更新,则只有在RDD作为操作的一部分进行计算时,才会更新其值。因此,累加器更新不能保证在Lazy变换中执行时执行map()。

以下代码中,accum仍然为0 ,因为没有action算子触发map操作。

1.   val accum = sc.longAccumulator
2.  data.map { x => accum.add(x); x }