3.3 RDD应用API详解
本书的目的是教会读者在实际运用中使用RDD去解决相关问题,因此笔者建议读者更多地将注重点转移到真实的程序编写上。
本节将带领大家学习RDD的各种API用法,读者尽量掌握这些RDD的用法。当然本节的内容可能有点多,读者至少要对这些API有个印象,在后文的数据分析时需要查询某个具体方法的用法时再回来查看。
3.3.1 使用aggregate方法对给定的数据集进行方法设定
RDD中比较常见的一种方式是aggregate方法,其功能是对给定的数据集进行方法设定,源码如下:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
从源码可以看到,aggregate定义了几个泛型参数。U是数据类型,可以传入任意类型的数据。seqOp是给定的计算方法,输出结果要求也是U类型,而第二个combOp是合并方法,将第一个计算方法得出的结果与源码中zeroValue进行合并。需要指出的是:zeroValue并不是一个固定的值,而是一个没有实际意义的“空值”,它没有任何内容,而是确认传入的结果。具体程序编写如程序3-1所示。
代码位置://SRC//C03// testRDDMethod.scala
程序3-1 aggregate方法
import org.apache.spark.{SparkContext, SparkConf} object testRDDMethod { def main(args: Array[String]) { val conf = new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("testRDDMethod") //设定名称 val sc = new SparkContext(conf) //创建环境变量实例 val arr = sc.parallelize(Array(1,2,3,4,5,6)) //输入数组数据集 val result = arr.aggregate(0)(math.max(_, _), _ + _)//使用aggregate方法 println(result) //打印结果 } }
从上面代码中可以看到,RDD在使用时是借助于Spark环境进行工作的,因此需要对Spark环境变量进行设置。
RDD工作在Spark上,因此,parallelize方法是将内存数据读入Spark系统中,作为一个整体数据集。下面的math.max方法用于比较数据集中数据的大小,第二个“_+_”方法是对传递的第一个比较方法结果进行处理。
由于这里只传入了第一个比较大小方法的结果6,此时与中立数空值相加,最终结果如下:6
6
提示
parallelize是SparkContext内方法,由于篇幅关系,笔者不单独介绍,而作为RDD的顺带学习内容,提取一些比较重要的做讲解。
程序3-1中,Aggregate方法处理的是第一个计算方法结果和空值的计算结果。这里空值与数组中最大数6相加,结果是6不言而喻。
parallelize是SparkContext中的方法,其源码如下:
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
从代码中可以看到,括号中第一个参数是数据,而同时其还有一个带有默认数值的参数,这个参数默认为1,表示的是将数据值分布在多少个数据节点中存放。程序3-2中,笔者将其设定为2,请读者观察结果。
代码位置://SRC//C03// testRDDMethod2.scala
程序3-2 参数改变后
import org.apache.spark.{SparkContext, SparkConf} object testRDDMethod2 { def main(args: Array[String]) { val conf = new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("testRDDMethod2") //设定名称 val sc = new SparkContext(conf) //创建环境变量实例 val arr = sc.parallelize(Array(1,2,3,4,5,6),2) //输入数组数据集 val result = arr.aggregate(0)(math.max(_, _), _ + _)//使用aggregate方法 println(result) //打印结果 } }
先说下输出结果,结果是9。
原因是在Parallelize中,将数据分成2个节点存储,即:
Array(1,2,3,4,5,6) -> Array(1,2,3) + Array(4,5,6)
提示
建议读者将Parallelize进行更多分区查看输出结果。
这样数据在进行下一步的aggregate方法时,分别有两个数据集传入math.max方法。而max方法分别查找出两个数据集的最大值,分别是3和6。这样在调用aggregate方法的第二个计算方法时,将查找到的数据值进行相加,获得了最大值9。而此时由于不再需要zeroValue值,则将其舍去不用。
除此之外,程序3-3还演示了aggregate方法用于字符串的操作。
代码位置://SRC//C03// testRDDMethod3.scala
程序3-3 aggregate方法用于字符串
import org.apache.spark.{SparkContext, SparkConf} object testRDDMethod3 { def main(args: Array[String]) { val conf = new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("testRDDMethod3") //设定名称 val sc = new SparkContext(conf) //创建环境变量实例 val arr = sc.parallelize(Array("abc","b","c","d","e","f")) //创建数据集 //调用计算方法 val result = arr.aggregate("")((value,word) => value + word, _ + _) println(result) //打印结果 } }
请读者自行验证输出结果。
3.3.2 提前计算的cache方法
cache方法的作用是将数据内容计算并保存在计算节点的内存中。这个方法的使用是针对Spark的Lazy数据处理模式。
在Lazy模式中,数据在编译和未使用时是不进行计算的,而仅仅保存其存储地址,只有在Action方法到来时才正式计算。这样做的好处在于可以极大地减少存储空间,从而提高利用率,而有时必须要求数据进行计算,此时就需要使用cache方法,其使用方法如程序3-4所示。
代码位置://SRC//C03// CacheTest.scala
程序3-4 cache方法
import org.apache.spark.{SparkContext, SparkConf} object CacheTest { def main(args: Array[String]) { val conf = new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("CacheTest ") //设定名称 val sc = new SparkContext(conf) //创建环境变量实例 val arr = sc.parallelize(Array("abc","b","c","d","e","f")) //设定数据集 println(arr) //打印结果 println("----------------") //分隔符 println(arr.cache()) //打印结果 } }
这里分隔符分割了相同的数据,分别是未使用cache方法进行处理的数据和使用cache方法进行处理的数据,其结果如下:
ParallelCollectionRDD[0] at parallelize at CacheTest.scala:12 ---------------- abc,b,c,d,e,f
从结果中可以看到,第一行打印结果是一个RDD存储格式,而第二行打印结果是真正的数据结果。
需要说明的是:除了使用cache方法外,RDD还有专门的采用迭代形式打印数据的专用方法,具体请见例子3-5。
代码位置://SRC//C03// CacheTest2.scala
程序3-5 采用迭代形式打印数据
import org.apache.spark.{SparkContext, SparkConf} object CacheTest2 { def main(args: Array[String]) { val conf = new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("CacheTest2 ") //设定名称 val sc = new SparkContext(conf) //创建环境变量实例 val arr = sc.parallelize(Array("abc","b","c","d","e","f")) //设定数据集 arr.foreach(println) //打印结果 } }
arr.foreach(println)是一个专门用来打印未进行Action操作的数据的专用方法,可以对数据进行提早计算。
具体内容请读者自行运行查看。
3.3.3 笛卡尔操作的cartesian方法
此方法是用于对不同的数组进行笛卡尔操作,要求是数据集的长度必须相同,结果作为一个新的数据集返回。其使用方法如程序3-6所示。
代码位置://SRC//C03// Cartesian.scala
程序3-6 cartesian方法
import org.apache.spark.{SparkContext, SparkConf} object Cartesian{ def main(args: Array[String]) { val conf = new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("Cartesian ") //设定名称 val sc = new SparkContext(conf) //创建环境变量实例 var arr = sc.parallelize(Array(1,2,3,4,5,6)) //创建第一个数组 var arr2 = sc.parallelize(Array(6,5,4,3,2,1)) //创建第二个数据 val result = arr.cartesian(arr2) //进行笛卡尔计算 result.foreach(print) //打印结果 } }
打印结果如下:
(1,6)(1,5)(1,4)(1,3)(1,2)(1,1)(2,6)(2,5)(2,4)(2,3)(2,2)(2,1)(3,6)(3,5)(3,4) (3,3)(3,2)(3,1)(4,6)(4,5)(4,4)(4,3)(4,2)(4,1)(5,6)(5,5)(5,4)(5,3)(5,2)(5,1)(6, 6)(6,5)(6,4)(6,3)(6,2)(6,1)
3.3.4 分片存储的coalesce方法
Coalesce方法是将已经存储的数据重新分片后再进行存储,其源码如下:
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
这里的第一个参数是将数据重新分成的片数,布尔参数指的是将数据分成更小的片时使用,举例中将其设置为true,程序代码如3-7所示。
代码位置://SRC//C03// Coalesce.scala
程序3-7 coalesce方法
import org.apache.spark.{SparkContext, SparkConf} object Coalesce{ def main(args: Array[String]) { val conf = new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("Coalesce ") //设定名称 val sc = new SparkContext(conf) //创建环境变量实例 val arr = sc.parallelize(Array(1,2,3,4,5,6)) //创建数据集 val arr2 = arr.coalesce(2,true) //将数据重新分区 val result = arr.aggregate(0)(math.max(_, _), _ + _) //计算数据值 println(result) //打印结果 //计算重新分区数据值 val result2 = arr2.aggregate(0)(math.max(_, _), _ + _) println(result2) } //打印结果 }
请读者运行代码自行验证结果。
除此之外,RDD中还有一个repartition方法与这个coalesce方法类似,均是将数据重新分区组合,其使用方法如程序3-8所示。
代码位置://SRC//C03// Repartition.scala
程序3-8 repartition方法
import org.apache.spark.{SparkContext, SparkConf} object Repartition { def main(args: Array[String]) { val conf = new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("Repartition") //设定名称 val sc = new SparkContext(conf) //创建环境变量实例 val arr = sc.parallelize(Array(1,2,3,4,5,6)) //创建数据集 arr = arr.repartition(3) //重新分区 println(arr.partitions.length)} //打印分区数 }
打印结果请读者自行验证。
3.3.5 以value计算的countByValue方法
countByValue方法是计算数据集中某个数据出现的个数,并将其以map的形式返回。具体程序如程序3-9所示。
代码位置://SRC//C03// countByValue.scala
程序3-9 countByValue方法
import org.apache.spark.{SparkContext, SparkConf} object countByValue{ def main(args: Array[String]) { val conf = new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("countByValue") //设定名称 val sc = new SparkContext(conf) //创建环境变量实例 val arr = sc.parallelize(Array(1,2,3,4,5,6)) //创建数据集 val result = arr.countByValue() //调用方法计算个数 result.foreach(print) //打印结果 } }
最终结果如下:
(5,1)(1,1)(6,1)(2,1)(3,1)(4,1)
3.3.6 以key计算的countByKey方法
countByKey方法与countByValue方法有本质的区别。countByKey是计算数组中元数据键值对key出现的个数,具体见程序3-10所示。
代码位置://SRC//C03// countByKey.scala
程序3-10 countByKey方法
import org.apache.spark.{SparkContext, SparkConf} object countByKey{ def main(args: Array[String]) { val conf = new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("countByKey") //设定名称 val sc = new SparkContext(conf) //创建环境变量实例 //创建数据集 var arr = sc.parallelize(Array((1, "cool"), (2, "good"), (1, "bad"), (1, "fine"))) val result = arr.countByKey() //进行计数 result.foreach(print) //打印结果 } }
打印结果如下:
(1,3)(2,1)
从打印结果可以看到,这里计算了数据键值对的key出现的个数,即1出现了3次,2出现了1次。
3.3.7 除去数据集中重复项的distinct方法
distinct方法的作用是去除数据集中重复的项,如程序3-11所示。
代码位置://SRC//C03// distinct.scala
程序3-11 distinct方法
import org.apache.spark.{SparkContext, SparkConf} object distinct{ def main(args: Array[String]) { val conf = new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("distinct") //设定名称 val sc = new SparkContext(conf) //创建环境变量实例 var arr = sc.parallelize(Array(("cool"), ("good"), ("bad"), ("fine"),("good"),("cool"))) //创建数据集 val result = arr.distinct() //进行去重操作 result.foreach(println) //打印最终结果 } }
打印结果如下:
cool fine bad good
3.3.8 过滤数据的filter方法
filter方法是一个比较常用的方法,它用来对数据集进行过滤,如程序3-12所示。
代码位置://SRC//C03// filter.scala
程序3-12 filter方法
import org.apache.spark.{SparkContext, SparkConf} object filter{ def main(args: Array[String]) { val conf = new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("filter ") //设定名称 val sc = new SparkContext(conf) //创建环境变量实例 var arr = sc.parallelize(Array(1,2,3,4,5)) //创建数据集 val result = arr.filter(_ >= 3) //进行筛选工作 result.foreach(println) //打印最终结果 } }
具体结果请读者自行验证。这里需要说明的是,在filter方法中,使用的方法是“_ >= 3”,这里调用了Scala编程中的编程规范,下划线_的作用是作为占位符标记所有的传过来的数据。在此方法中,数组的数据(1,2,3,4,5)依次传进来替代了占位符。
3.3.9 以行为单位操作数据的flatMap方法
flatMap方法是对RDD中的数据集进行整体操作的一个特殊方法,因为其在定义时就是针对数据集进行操作,因此最终返回的也是一个数据集。flatMap方法应用程序如3-13所示。
代码位置://SRC//C03// flatMap.scala
程序3-13 flatMap方法
import org.apache.spark.{SparkContext, SparkConf} object flatMap{ def main(args: Array[String]) { val conf = new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("flatMap") //设定名称 val sc = new SparkContext(conf) //创建环境变量实例 var arr = sc.parallelize(Array(1,2,3,4,5)) //创建数据集 val result = arr.flatMap(x => List(x + 1)).collect() //进行数据集计算 result.foreach(println) //打印结果 } }
请读者自行验证打印结果,除此之外,请读者参考下一节的map方法,对它们的操作结果做一个比较。
3.3.10 以单个数据为目标进行操作的map方法
map方法可以对RDD中的数据集中的数据进行逐个操作,它与flatmap不同之处在于,flatmap是将数据集中的数据作为一个整体去处理,之后再对其中的数据做计算。而map方法直接对数据集中的数据做单独的处理。map方法应用程序如3-14所示。
代码位置://SRC//C03// testMap.scala
程序3-14 map方法
import org.apache.spark.{SparkContext, SparkConf} object testMap { def main(args: Array[String]) { val conf = new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("testMap") //设定名称 val sc = new SparkContext(conf) //创建环境变量实例 var arr = sc.parallelize(Array(1,2,3,4,5)) //创建数据集 val result = arr.map(x => List(x + 1)).collect() //进行单个数据计算 result.foreach(println) //打印结果 } }
请读者自行打印结果比较。
提示
RDD中有很多相似的方法和粗略的计算方法,这里需要读者更加细心地去挖掘。
3.3.11 分组数据的groupBy方法
groupBy方法是将传入的数据进行分组,其分组的依据是作为参数传入的计算方法。groupBy方法的程序代码如3-15所示。
代码位置://SRC//C03// groupBy.scala
程序3-15 groupBy方法
import org.apache.spark.{SparkContext, SparkConf} object groupBy{ def main(args: Array[String]) { val conf = new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("groupBy") //设定名称 val sc = new SparkContext(conf) //创建环境变量实例 var arr = sc.parallelize(Array(1,2,3,4,5)) //创建数据集 arr.groupBy(myFilter(_), 1) //设置第一个分组 arr.groupBy(myFilter2(_), 2) //设置第二个分组 } def myFilter(num: Int): Unit = { //自定义方法 num >=3 //条件 } def myFilter2(num: Int): Unit = { //自定义方法 num <3 //条件 } }
在程序3-15中,笔者采用了两个自定义的方法,即myFilter和myFilter2作为分组条件。然后将分组条件的方法作为一个整体传入groupBy中。
groupBy的第一个参数是传入的方法名,而第二个参数是分组的标签值。具体打印结果请读者自行完成。
3.3.12 生成键值对的keyBy方法
keyBy方法是为数据集中的每个个体数据增加一个key,从而可以与原来的个体数据形成键值对。keyBy方法程序代码如3-16所示。
代码位置://SRC//C03// keyBy.scala
程序3-16 keyBy方法
import org.apache.spark.{SparkContext, SparkConf} object keyBy{ def main(args: Array[String]) { val conf = new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("keyBy") //设定名称 val sc = new SparkContext(conf) //创建环境变量实例 //创建数据集 var str = sc.parallelize(Array("one","two","three","four","five")) val str2 = str.keyBy(word => word.size) //设置配置方法 str2.foreach(println) //打印结果 } }
最终打印结果如下:
(3,one)(3,two)(5,three)(4,four)(4,five)
这里可以很明显地看出,每个数据(单词)与自己的字符长度形成一个数字键值对。
3.3.13 同时对两个数据进行处理的reduce方法
reduce方法是RDD中一个较为重要的数据处理方法,与map方法不同之处在于,它在处理数据时需要两个参数。reduce方法演示如程序3-17所示。
代码位置://SRC//C03// testReduce.scala
程序3-17 reduce方法
import org.apache.spark.{SparkContext, SparkConf} object testReduce { def main(args: Array[String]) { val conf = new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("testReduce") //设定名称 val sc = new SparkContext(conf) //创建环境变量实例 var str = sc.parallelize(Array("one","two","three","four","five"))//创建 数据集 val result = str.reduce(_ + _) //进行数据拟合 result.foreach(print) //打印数据结果 } }
打印结果如下:
onetwothreefourfive
从结果上可以看到,reduce方法主要是对传入的数据进行合并处理。它的两个下划线分别代表不同的内容,第一个下划线代表数据集的第一个数据,而第二个下划线在第一次合并处理时代表空集,即以下方式进行:
null + one -> one + two -> onetwo + three -> onetwothree + four -> onetwothreefour + five
除此之外,reduce方法还可以传入一个已定义的方法作为数据处理方法,程序3-18中演示了一种寻找最长字符串的一段代码。
代码位置://SRC//C03// testReduce2.scala
程序3-18 寻找最长字符串
import org.apache.spark.{SparkContext, SparkConf} object testRDDMethod { def main(args: Array[String]) { val conf = new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("testReduce2") //设定名称 val sc = new SparkContext(conf) //创建环境变量实例 //创建数据集 var str = sc.parallelize(Array("one","two","three","four","five")) val result = str.reduce(myFun) //进行数据拟合 result.foreach(print) //打印结果 } def myFun(str1:String,str2:String):String = { //创建方法 var str = str1 //设置确定方法 if(str2.size >= str.size){ //比较长度 str = str2 //替换 } return str //返回最长的那个字符串 } }
3.3.14 对数据进行重新排序的sortBy方法
sortBy方法也是一个常用的排序方法,其主要功能是对已有的RDD重新排序,并将重新排序后的数据生成一个新的RDD,其源码如下:
sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.size)
从源码上可以看到,sortBy方法主要有3个参数,第一个为传入方法,用以计算排序的数据。第二个是指定排序的值按升序还是降序显示。第三个是分片的数量。程序3-19中演示了分别根据不同的数据排序方法对数据集进行排序的代码。
代码位置://SRC//C03// sortBy.scala
程序3-19 sortBy方法
import org.apache.spark.{SparkContext, SparkConf} object sortBy { def main(args: Array[String]) { val conf = new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("sortBy") //设定名称 val sc = new SparkContext(conf) //创建环境变量实例 //创建数据集 var str = sc.parallelize(Array((5,"b"),(6,"a"),(1,"f"),(3,"d"),(4,"c"),(2,"e"))) str = str.sortBy(word => word._1,true) //按第一个数据排序 val str2 = str.sortBy(word => word._2,true) //按第二个数据排序 str.foreach(print) //打印输出结果 str2.foreach(print) //打印输出结果 } }
从程序3-19可以看到,在程序的排序部分,分别使用了按元组第一个字符排序和按第二个字符排序的方法。这里需要说明的是,“._1”的格式是元组中数据符号的表示方法,意思是使用当前元组中第一个数据,同样的表示,“._2”是使用元组中第二个数据。
最终显示结果如下:
(1,f)(2,e)(3,d)(4,c)(5,b)(6,a) //第一个输出结果 (6,a)(5,b)(4,c)(3,d)(2,e)(1,f) //第二个输出结果
从结果可以很清楚地看到,第一个输出结果以数字为顺序进行排序,第二个输出结果以字母为顺序进行排序。
提示
请读者尝试更改第二个sortBy中第二个布尔参数进行排序。
3.3.15 合并压缩的zip方法
zip方法是常用的合并压缩算法,它可以将若干个RDD压缩成一个新的RDD,进而形成一系列的键值对存储形式的新RDD。具体程序见3-20。
代码位置://SRC//C03// testZip.scala
程序3-20 zip方法
import org.apache.spark.{SparkContext, SparkConf} object testZip{ def main(args: Array[String]) { val conf = new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("testZip") //设定名称 val sc = new SparkContext(conf) //创建环境变量实例 val arr1 = Array(1,2,3,4,5,6) //创建数据集1 val arr2 = Array("a","b","c","d","e","f") //创建数据集2 val arr3 = Array("g","h","i","j","k","l") //创建数据集3 val arr4 = arr1.zip(arr2).zip(arr3) //进行压缩算法 arr4.foreach(print) //打印结果 } }
最终结果如下:
((1,a),g)((2,b),h)((3,c),i)((4,d),j)((5,e),k)((6,f),l)
从结果可以看到,这里数据被压缩成一个双重的键值对形式的数据。