Spark MLlib机器学习实践(第2版)
上QQ阅读APP看书,第一时间看更新

3.3 RDD应用API详解

本书的目的是教会读者在实际运用中使用RDD去解决相关问题,因此笔者建议读者更多地将注重点转移到真实的程序编写上。

本节将带领大家学习RDD的各种API用法,读者尽量掌握这些RDD的用法。当然本节的内容可能有点多,读者至少要对这些API有个印象,在后文的数据分析时需要查询某个具体方法的用法时再回来查看。

3.3.1 使用aggregate方法对给定的数据集进行方法设定

RDD中比较常见的一种方式是aggregate方法,其功能是对给定的数据集进行方法设定,源码如下:

        def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) =>
    U): U

从源码可以看到,aggregate定义了几个泛型参数。U是数据类型,可以传入任意类型的数据。seqOp是给定的计算方法,输出结果要求也是U类型,而第二个combOp是合并方法,将第一个计算方法得出的结果与源码中zeroValue进行合并。需要指出的是:zeroValue并不是一个固定的值,而是一个没有实际意义的“空值”,它没有任何内容,而是确认传入的结果。具体程序编写如程序3-1所示。

代码位置://SRC//C03// testRDDMethod.scala

程序3-1 aggregate方法

        import org.apache.spark.{SparkContext, SparkConf}

        object testRDDMethod {
          def main(args: Array[String]) {
            val conf = new SparkConf()                //创建环境变量
            .setMaster("local")                        //设置本地化处理
            .setAppName("testRDDMethod")         //设定名称
            val sc = new SparkContext(conf)          //创建环境变量实例
            val arr = sc.parallelize(Array(1,2,3,4,5,6))         //输入数组数据集
            val result = arr.aggregate(0)(math.max(_, _), _ + _)//使用aggregate方法
            println(result)                             //打印结果
          }
        }

从上面代码中可以看到,RDD在使用时是借助于Spark环境进行工作的,因此需要对Spark环境变量进行设置。

RDD工作在Spark上,因此,parallelize方法是将内存数据读入Spark系统中,作为一个整体数据集。下面的math.max方法用于比较数据集中数据的大小,第二个“_+_”方法是对传递的第一个比较方法结果进行处理。

由于这里只传入了第一个比较大小方法的结果6,此时与中立数空值相加,最终结果如下:6

 6

提示

parallelize是SparkContext内方法,由于篇幅关系,笔者不单独介绍,而作为RDD的顺带学习内容,提取一些比较重要的做讲解。

程序3-1中,Aggregate方法处理的是第一个计算方法结果和空值的计算结果。这里空值与数组中最大数6相加,结果是6不言而喻。

parallelize是SparkContext中的方法,其源码如下:

        def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism):
    RDD[T]

从代码中可以看到,括号中第一个参数是数据,而同时其还有一个带有默认数值的参数,这个参数默认为1,表示的是将数据值分布在多少个数据节点中存放。程序3-2中,笔者将其设定为2,请读者观察结果。

代码位置://SRC//C03// testRDDMethod2.scala

程序3-2 参数改变后

        import org.apache.spark.{SparkContext, SparkConf}

        object testRDDMethod2 {
          def main(args: Array[String]) {
            val conf = new SparkConf()                //创建环境变量
            .setMaster("local")                        //设置本地化处理
            .setAppName("testRDDMethod2")             //设定名称
            val sc = new SparkContext(conf)          //创建环境变量实例
            val arr = sc.parallelize(Array(1,2,3,4,5,6),2) //输入数组数据集
            val result = arr.aggregate(0)(math.max(_, _), _ + _)//使用aggregate方法
            println(result)                             //打印结果
          }
        }

先说下输出结果,结果是9。

原因是在Parallelize中,将数据分成2个节点存储,即:

Array(1,2,3,4,5,6) -> Array(1,2,3) + Array(4,5,6)

提示

建议读者将Parallelize进行更多分区查看输出结果。

这样数据在进行下一步的aggregate方法时,分别有两个数据集传入math.max方法。而max方法分别查找出两个数据集的最大值,分别是3和6。这样在调用aggregate方法的第二个计算方法时,将查找到的数据值进行相加,获得了最大值9。而此时由于不再需要zeroValue值,则将其舍去不用。

除此之外,程序3-3还演示了aggregate方法用于字符串的操作。

代码位置://SRC//C03// testRDDMethod3.scala

程序3-3 aggregate方法用于字符串

        import org.apache.spark.{SparkContext, SparkConf}

        object testRDDMethod3 {
          def main(args: Array[String]) {
            val conf = new SparkConf()            //创建环境变量
            .setMaster("local")                    //设置本地化处理
            .setAppName("testRDDMethod3")       //设定名称
            val sc = new SparkContext(conf)      //创建环境变量实例
            val arr = sc.parallelize(Array("abc","b","c","d","e","f"))  //创建数据集
            //调用计算方法
            val result = arr.aggregate("")((value,word) => value + word, _ + _)

            println(result)                        //打印结果
          }
        }

请读者自行验证输出结果。

3.3.2 提前计算的cache方法

cache方法的作用是将数据内容计算并保存在计算节点的内存中。这个方法的使用是针对Spark的Lazy数据处理模式。

在Lazy模式中,数据在编译和未使用时是不进行计算的,而仅仅保存其存储地址,只有在Action方法到来时才正式计算。这样做的好处在于可以极大地减少存储空间,从而提高利用率,而有时必须要求数据进行计算,此时就需要使用cache方法,其使用方法如程序3-4所示。

代码位置://SRC//C03// CacheTest.scala

程序3-4 cache方法

        import org.apache.spark.{SparkContext, SparkConf}

        object CacheTest {
          def main(args: Array[String]) {
            val conf = new SparkConf()                     //创建环境变量
            .setMaster("local")                             //设置本地化处理
            .setAppName("CacheTest ")                      //设定名称
              val sc = new SparkContext(conf)           //创建环境变量实例
            val arr = sc.parallelize(Array("abc","b","c","d","e","f"))  //设定数据集
            println(arr)                                     //打印结果
            println("----------------")                   //分隔符
            println(arr.cache())                           //打印结果
          }
        }

这里分隔符分割了相同的数据,分别是未使用cache方法进行处理的数据和使用cache方法进行处理的数据,其结果如下:

        ParallelCollectionRDD[0] at parallelize at CacheTest.scala:12
        ----------------
        abc,b,c,d,e,f

从结果中可以看到,第一行打印结果是一个RDD存储格式,而第二行打印结果是真正的数据结果。

需要说明的是:除了使用cache方法外,RDD还有专门的采用迭代形式打印数据的专用方法,具体请见例子3-5。

代码位置://SRC//C03// CacheTest2.scala

程序3-5 采用迭代形式打印数据

        import org.apache.spark.{SparkContext, SparkConf}

        object CacheTest2 {
          def main(args: Array[String]) {
            val conf = new SparkConf()            //创建环境变量
            .setMaster("local")                    //设置本地化处理
            .setAppName("CacheTest2 ")            //设定名称
                val sc = new SparkContext(conf)      //创建环境变量实例
            val arr = sc.parallelize(Array("abc","b","c","d","e","f"))  //设定数据集
            arr.foreach(println)                   //打印结果
          }
        }

arr.foreach(println)是一个专门用来打印未进行Action操作的数据的专用方法,可以对数据进行提早计算。

具体内容请读者自行运行查看。

3.3.3 笛卡尔操作的cartesian方法

此方法是用于对不同的数组进行笛卡尔操作,要求是数据集的长度必须相同,结果作为一个新的数据集返回。其使用方法如程序3-6所示。

代码位置://SRC//C03// Cartesian.scala

程序3-6 cartesian方法

        import org.apache.spark.{SparkContext, SparkConf}

        object Cartesian{
          def main(args: Array[String]) {
            val conf = new SparkConf()                     //创建环境变量
            .setMaster("local")                             //设置本地化处理
            .setAppName("Cartesian ")                      //设定名称
            val sc = new SparkContext(conf)               //创建环境变量实例
            var arr = sc.parallelize(Array(1,2,3,4,5,6))             //创建第一个数组
            var arr2 = sc.parallelize(Array(6,5,4,3,2,1))            //创建第二个数据
            val result = arr.cartesian(arr2)                       //进行笛卡尔计算
            result.foreach(print)                               //打印结果
          }
        }

打印结果如下:

        (1,6)(1,5)(1,4)(1,3)(1,2)(1,1)(2,6)(2,5)(2,4)(2,3)(2,2)(2,1)(3,6)(3,5)(3,4)
    (3,3)(3,2)(3,1)(4,6)(4,5)(4,4)(4,3)(4,2)(4,1)(5,6)(5,5)(5,4)(5,3)(5,2)(5,1)(6,
    6)(6,5)(6,4)(6,3)(6,2)(6,1)

3.3.4 分片存储的coalesce方法

Coalesce方法是将已经存储的数据重新分片后再进行存储,其源码如下:

        def  coalesce(numPartitions:  Int,  shuffle:  Boolean  =  false)(implicit  ord:
    Ordering[T] = null): RDD[T]

这里的第一个参数是将数据重新分成的片数,布尔参数指的是将数据分成更小的片时使用,举例中将其设置为true,程序代码如3-7所示。

代码位置://SRC//C03// Coalesce.scala

程序3-7 coalesce方法

        import org.apache.spark.{SparkContext, SparkConf}

        object Coalesce{
          def main(args: Array[String]) {
            val conf = new SparkConf()                     //创建环境变量
            .setMaster("local")                        //设置本地化处理
            .setAppName("Coalesce ")                  //设定名称
            val sc = new SparkContext(conf)          //创建环境变量实例
            val arr = sc.parallelize(Array(1,2,3,4,5,6))         //创建数据集
            val arr2 = arr.coalesce(2,true)                   //将数据重新分区
            val result = arr.aggregate(0)(math.max(_, _), _ + _)   //计算数据值
            println(result)                             //打印结果
            //计算重新分区数据值
            val result2 = arr2.aggregate(0)(math.max(_, _), _ + _)
            println(result2)  }                                  //打印结果
        }

请读者运行代码自行验证结果。

除此之外,RDD中还有一个repartition方法与这个coalesce方法类似,均是将数据重新分区组合,其使用方法如程序3-8所示。

代码位置://SRC//C03// Repartition.scala

程序3-8 repartition方法

        import org.apache.spark.{SparkContext, SparkConf}

        object Repartition {
          def main(args: Array[String]) {
            val conf = new SparkConf()                //创建环境变量
            .setMaster("local")                        //设置本地化处理
            .setAppName("Repartition")                     //设定名称
            val sc = new SparkContext(conf)          //创建环境变量实例
            val arr = sc.parallelize(Array(1,2,3,4,5,6))         //创建数据集
            arr = arr.repartition(3)                      //重新分区
            println(arr.partitions.length)}                 //打印分区数
        }

打印结果请读者自行验证。

3.3.5 以value计算的countByValue方法

countByValue方法是计算数据集中某个数据出现的个数,并将其以map的形式返回。具体程序如程序3-9所示。

代码位置://SRC//C03// countByValue.scala

程序3-9 countByValue方法

        import org.apache.spark.{SparkContext, SparkConf}

        object countByValue{
          def main(args: Array[String]) {
            val conf = new SparkConf()                //创建环境变量
            .setMaster("local")                        //设置本地化处理
            .setAppName("countByValue")               //设定名称
            val sc = new SparkContext(conf)          //创建环境变量实例
            val arr = sc.parallelize(Array(1,2,3,4,5,6))     //创建数据集
            val result = arr.countByValue()          //调用方法计算个数
            result.foreach(print)                      //打印结果
          }
        }

最终结果如下:

(5,1)(1,1)(6,1)(2,1)(3,1)(4,1)

3.3.6 以key计算的countByKey方法

countByKey方法与countByValue方法有本质的区别。countByKey是计算数组中元数据键值对key出现的个数,具体见程序3-10所示。

代码位置://SRC//C03// countByKey.scala

程序3-10 countByKey方法

        import org.apache.spark.{SparkContext, SparkConf}

        object countByKey{
          def main(args: Array[String]) {
            val conf = new SparkConf()            //创建环境变量
            .setMaster("local")                    //设置本地化处理
            .setAppName("countByKey")             //设定名称
            val sc = new SparkContext(conf)      //创建环境变量实例
            //创建数据集
            var arr = sc.parallelize(Array((1, "cool"), (2, "good"), (1, "bad"), (1,
    "fine")))
            val result = arr.countByKey()        //进行计数
            result.foreach(print)                 //打印结果
          }
        }

打印结果如下:

(1,3)(2,1)

从打印结果可以看到,这里计算了数据键值对的key出现的个数,即1出现了3次,2出现了1次。

3.3.7 除去数据集中重复项的distinct方法

distinct方法的作用是去除数据集中重复的项,如程序3-11所示。

代码位置://SRC//C03// distinct.scala

程序3-11 distinct方法

        import org.apache.spark.{SparkContext, SparkConf}

        object distinct{
          def main(args: Array[String]) {
            val conf = new SparkConf()                //创建环境变量
            .setMaster("local")                        //设置本地化处理
            .setAppName("distinct")                    //设定名称
            val sc = new SparkContext(conf)          //创建环境变量实例
            var arr = sc.parallelize(Array(("cool"), ("good"), ("bad"),
    ("fine"),("good"),("cool")))                     //创建数据集
            val result = arr.distinct()               //进行去重操作
            result.foreach(println)                    //打印最终结果
          }
        }

打印结果如下:

        cool
        fine
        bad
        good

3.3.8 过滤数据的filter方法

filter方法是一个比较常用的方法,它用来对数据集进行过滤,如程序3-12所示。

代码位置://SRC//C03// filter.scala

程序3-12 filter方法

        import org.apache.spark.{SparkContext, SparkConf}

        object filter{
          def main(args: Array[String]) {
            val conf = new SparkConf()                 //创建环境变量
            .setMaster("local")                        //设置本地化处理
            .setAppName("filter ")                     //设定名称
            val sc = new SparkContext(conf)          //创建环境变量实例
            var arr = sc.parallelize(Array(1,2,3,4,5))      //创建数据集
            val result = arr.filter(_ >= 3)                //进行筛选工作
            result.foreach(println)                     //打印最终结果
          }
        }

具体结果请读者自行验证。这里需要说明的是,在filter方法中,使用的方法是“_ >= 3”,这里调用了Scala编程中的编程规范,下划线_的作用是作为占位符标记所有的传过来的数据。在此方法中,数组的数据(1,2,3,4,5)依次传进来替代了占位符。

3.3.9 以行为单位操作数据的flatMap方法

flatMap方法是对RDD中的数据集进行整体操作的一个特殊方法,因为其在定义时就是针对数据集进行操作,因此最终返回的也是一个数据集。flatMap方法应用程序如3-13所示。

代码位置://SRC//C03// flatMap.scala

程序3-13 flatMap方法

        import org.apache.spark.{SparkContext, SparkConf}
        object flatMap{
          def main(args: Array[String]) {
            val conf = new SparkConf()                //创建环境变量
            .setMaster("local")                             //设置本地化处理
            .setAppName("flatMap")                     //设定名称
            val sc = new SparkContext(conf)          //创建环境变量实例
            var arr = sc.parallelize(Array(1,2,3,4,5))       //创建数据集
            val result = arr.flatMap(x => List(x + 1)).collect()  //进行数据集计算
            result.foreach(println)                    //打印结果
          }
        }

请读者自行验证打印结果,除此之外,请读者参考下一节的map方法,对它们的操作结果做一个比较。

3.3.10 以单个数据为目标进行操作的map方法

map方法可以对RDD中的数据集中的数据进行逐个操作,它与flatmap不同之处在于,flatmap是将数据集中的数据作为一个整体去处理,之后再对其中的数据做计算。而map方法直接对数据集中的数据做单独的处理。map方法应用程序如3-14所示。

代码位置://SRC//C03// testMap.scala

程序3-14 map方法

        import org.apache.spark.{SparkContext, SparkConf}

        object testMap {
          def main(args: Array[String]) {
            val conf = new SparkConf()            //创建环境变量
            .setMaster("local")                        //设置本地化处理
            .setAppName("testMap")                //设定名称
            val sc = new SparkContext(conf)      //创建环境变量实例
            var arr = sc.parallelize(Array(1,2,3,4,5))  //创建数据集
            val result = arr.map(x => List(x + 1)).collect()  //进行单个数据计算
            result.foreach(println)               //打印结果
          }
        }

请读者自行打印结果比较。

提示

RDD中有很多相似的方法和粗略的计算方法,这里需要读者更加细心地去挖掘。

3.3.11 分组数据的groupBy方法

groupBy方法是将传入的数据进行分组,其分组的依据是作为参数传入的计算方法。groupBy方法的程序代码如3-15所示。

代码位置://SRC//C03// groupBy.scala

程序3-15 groupBy方法

        import org.apache.spark.{SparkContext, SparkConf}

        object groupBy{
          def main(args: Array[String]) {
            val conf = new SparkConf()                //创建环境变量
            .setMaster("local")                        //设置本地化处理
            .setAppName("groupBy")                     //设定名称
            val sc = new SparkContext(conf)          //创建环境变量实例
            var arr = sc.parallelize(Array(1,2,3,4,5))  //创建数据集
            arr.groupBy(myFilter(_), 1)                   //设置第一个分组
            arr.groupBy(myFilter2(_), 2)                  //设置第二个分组
          }

          def myFilter(num: Int): Unit = {                //自定义方法
            num >=3                                          //条件
          }
          def myFilter2(num: Int): Unit = {              //自定义方法
            num <3                                            //条件
          }

        }

在程序3-15中,笔者采用了两个自定义的方法,即myFilter和myFilter2作为分组条件。然后将分组条件的方法作为一个整体传入groupBy中。

groupBy的第一个参数是传入的方法名,而第二个参数是分组的标签值。具体打印结果请读者自行完成。

3.3.12 生成键值对的keyBy方法

keyBy方法是为数据集中的每个个体数据增加一个key,从而可以与原来的个体数据形成键值对。keyBy方法程序代码如3-16所示。

代码位置://SRC//C03// keyBy.scala

程序3-16 keyBy方法

        import org.apache.spark.{SparkContext, SparkConf}

        object keyBy{
          def main(args: Array[String]) {
            val conf = new SparkConf()        //创建环境变量
            .setMaster("local")                   //设置本地化处理
            .setAppName("keyBy")               //设定名称
            val sc = new SparkContext(conf)     //创建环境变量实例
            //创建数据集
            var str = sc.parallelize(Array("one","two","three","four","five"))
            val str2 = str.keyBy(word => word.size)   //设置配置方法
            str2.foreach(println)                   //打印结果
          }
        }

最终打印结果如下:

(3,one)(3,two)(5,three)(4,four)(4,five)

这里可以很明显地看出,每个数据(单词)与自己的字符长度形成一个数字键值对。

3.3.13 同时对两个数据进行处理的reduce方法

reduce方法是RDD中一个较为重要的数据处理方法,与map方法不同之处在于,它在处理数据时需要两个参数。reduce方法演示如程序3-17所示。

代码位置://SRC//C03// testReduce.scala

程序3-17 reduce方法

        import org.apache.spark.{SparkContext, SparkConf}

        object testReduce {
          def main(args: Array[String]) {
            val conf = new SparkConf()           //创建环境变量
            .setMaster("local")                 //设置本地化处理
            .setAppName("testReduce")          //设定名称
            val sc = new SparkContext(conf)   //创建环境变量实例
            var str = sc.parallelize(Array("one","two","three","four","five"))//创建
    数据集
            val result = str.reduce(_ + _)         //进行数据拟合
            result.foreach(print)                //打印数据结果
          }
        }

打印结果如下:

onetwothreefourfive

从结果上可以看到,reduce方法主要是对传入的数据进行合并处理。它的两个下划线分别代表不同的内容,第一个下划线代表数据集的第一个数据,而第二个下划线在第一次合并处理时代表空集,即以下方式进行:

        null + one ->  one + two -> onetwo + three -> onetwothree + four -> onetwothreefour
    + five

除此之外,reduce方法还可以传入一个已定义的方法作为数据处理方法,程序3-18中演示了一种寻找最长字符串的一段代码。

代码位置://SRC//C03// testReduce2.scala

程序3-18 寻找最长字符串

        import org.apache.spark.{SparkContext, SparkConf}

        object testRDDMethod {
          def main(args: Array[String]) {
            val conf = new SparkConf()                                     //创建环境变量
            .setMaster("local")                    //设置本地化处理
            .setAppName("testReduce2")            //设定名称
            val sc = new SparkContext(conf)      //创建环境变量实例
            //创建数据集
            var str = sc.parallelize(Array("one","two","three","four","five"))
            val result = str.reduce(myFun)       //进行数据拟合
            result.foreach(print)             //打印结果
          }

          def myFun(str1:String,str2:String):String = {  //创建方法
            var str = str1                     //设置确定方法
            if(str2.size >= str.size){                //比较长度
              str = str2                   //替换
            }
            return str                     //返回最长的那个字符串
          }
        }

3.3.14 对数据进行重新排序的sortBy方法

sortBy方法也是一个常用的排序方法,其主要功能是对已有的RDD重新排序,并将重新排序后的数据生成一个新的RDD,其源码如下:

        sortBy[K](f:  (T)  ⇒  K,  ascending:  Boolean  =  true,  numPartitions:  Int  =
    this.partitions.size)

从源码上可以看到,sortBy方法主要有3个参数,第一个为传入方法,用以计算排序的数据。第二个是指定排序的值按升序还是降序显示。第三个是分片的数量。程序3-19中演示了分别根据不同的数据排序方法对数据集进行排序的代码。

代码位置://SRC//C03// sortBy.scala

程序3-19 sortBy方法

        import org.apache.spark.{SparkContext, SparkConf}

        object sortBy {
          def main(args: Array[String]) {
            val conf = new SparkConf()      //创建环境变量
            .setMaster("local")               //设置本地化处理
            .setAppName("sortBy")             //设定名称
            val sc = new SparkContext(conf)      //创建环境变量实例
            //创建数据集
            var str =
    sc.parallelize(Array((5,"b"),(6,"a"),(1,"f"),(3,"d"),(4,"c"),(2,"e")))
            str = str.sortBy(word => word._1,true)           //按第一个数据排序
            val str2 = str.sortBy(word => word._2,true) //按第二个数据排序
            str.foreach(print)                         //打印输出结果
            str2.foreach(print)                        //打印输出结果
          }
        }

从程序3-19可以看到,在程序的排序部分,分别使用了按元组第一个字符排序和按第二个字符排序的方法。这里需要说明的是,“._1”的格式是元组中数据符号的表示方法,意思是使用当前元组中第一个数据,同样的表示,“._2”是使用元组中第二个数据。

最终显示结果如下:

        (1,f)(2,e)(3,d)(4,c)(5,b)(6,a)            //第一个输出结果
        (6,a)(5,b)(4,c)(3,d)(2,e)(1,f)            //第二个输出结果

从结果可以很清楚地看到,第一个输出结果以数字为顺序进行排序,第二个输出结果以字母为顺序进行排序。

提示

请读者尝试更改第二个sortBy中第二个布尔参数进行排序。

3.3.15 合并压缩的zip方法

zip方法是常用的合并压缩算法,它可以将若干个RDD压缩成一个新的RDD,进而形成一系列的键值对存储形式的新RDD。具体程序见3-20。

代码位置://SRC//C03// testZip.scala

程序3-20 zip方法

        import org.apache.spark.{SparkContext, SparkConf}

        object testZip{
          def main(args: Array[String]) {
            val conf = new SparkConf()            //创建环境变量
            .setMaster("local")                    //设置本地化处理
            .setAppName("testZip")            //设定名称
            val sc = new SparkContext(conf)      //创建环境变量实例
            val arr1 = Array(1,2,3,4,5,6)        //创建数据集1
            val arr2 = Array("a","b","c","d","e","f") //创建数据集2
            val arr3 = Array("g","h","i","j","k","l")  //创建数据集3
            val arr4 = arr1.zip(arr2).zip(arr3)      //进行压缩算法
            arr4.foreach(print)               //打印结果
          }
        }

最终结果如下:

((1,a),g)((2,b),h)((3,c),i)((4,d),j)((5,e),k)((6,f),l)

从结果可以看到,这里数据被压缩成一个双重的键值对形式的数据。