12.3 通过RDD分析各种类型的最喜爱电影TopN及性能优化技巧
通过RDD分析大数据电影点评系统各种类型的最喜爱电影TopN。本节分析最受男性喜爱的电影Top10和最受女性喜爱的电影Top10。
评级文件ratings.dat的格式描述如下。
1. UserID::MovieID::Rating::Timestamp 2. 用户ID、电影ID、评分数据、时间戳
用户文件users.dat的格式描述如下。
1. UserID::Gender::Age::Occupation::Zip-code 2. 用户ID、性别、年龄、职业、邮编代码
单单从评分数据ratings中无法计算出最受男性或者女性喜爱的电影Top10,因为该RDD中没有性别Gender信息,如果需要使用性别Gender信息进行性别Gender的分类,此时一定需要聚合。当然,我们力求聚合的使用是mapjoin(分布式计算的杀手是数据倾斜,Mapper端的Join是一定不会数据倾斜的),这里可否使用mapjoin呢?不可以,因为用户的数据非常多!所以,这里要使用正常的Join,此处的场景不会数据倾斜,因为用户一般都均匀地分布。
最受男性喜爱的电影Top10和最受女性喜爱的电影Top10分析需注意以下事项。
(1)因为要再次使用电影数据的RDD,所以复用了前面Cache的ratings数据。
(2)在根据性别过滤出数据后,关于TopN部分的代码直接复用前面的代码就行了。
(3)要进行Join,需要key-value。
(4)在进行Join的时候,通过take等方法注意Join后的数据格式(3319,((3319, 50, 4.5),F))。
(5)使用数据冗余来实现代码复用或者更高效地运行,这是企业级项目的一个非常重要的技巧!
大数据电影点评系统中,统计最受男性喜爱的电影Top10和最受女性喜爱的电影Top10,我们先分别过滤出男性、女性相关的数据,具体实现思路如下:
(1)对ratings中的(用户ID,电影ID,评分)元组进行map转换,格式化成Key-Value,即(用户ID,(用户ID,电影ID,评分))。
(2)对usersRDD中的每行数据按"::"分隔符分割,然后进行map转换,格式化成Key-Value,即(用户ID,性别)。
(3)将(用户ID,(用户ID,电影ID,评分))与(用户ID,性别)进行Join生成新的genderRatings RDD。格式为:(用户ID,((用户ID,电影ID,评分),性别)),性别,并且进行Cache缓存。
(4)对genderRatings RDD进行过滤转换,从元组(x._1用户ID,(x._2._1(用户ID,电影ID,评分),x._2._2性别))过滤出x._2._2性别等于男性的数据。然后进行map转换为x._2._1,即转换成(用户ID,电影ID,评分)格式,生成maleFilteredRatings。
(5)对genderRatings RDD进行过滤转换,从元组(x._1用户ID,(x._2._1(用户ID,电影ID,评分),x._2._2性别))过滤出x._2._2性别等于女性的数据。然后进行map转换为x._2._1,即转换成(用户ID,电影ID,评分)格式,生成femaleFilteredRatings。
从大数据电影点评系统中过滤男性、女性相关的数据的代码如下。
1. val male = "M" 2. val female = "F" 3. val genderRatings = ratings.map(x => (x._1, (x._1, x._2, x._3))).join( 4. usersRDD.map(_.split("::")).map(x => (x(0), x(1)))).cache() 5. genderRatings.take(2).foreach(println) 6. val maleFilteredRatings: RDD[(String, String, String)] = gender Ratings.filter(x => x._2._2.equals("M")).map(x => x._2._1) 7. val femaleFilteredRatings = genderRatings.filter(x => x._2._2.equals ("F")).map(x => x._2._1)
在IDEA中运行代码,打印出genderRatings的数据,取10个数据,格式为:(用户ID,((用户ID,电影ID,评分),性别)),结果如下。
1. (3319,((3319,32,5),F)) 2. (3319,((3319,50,4.5),F)) 3. (3319,((3319,163,4.5),F)) 4. (3319,((3319,180,5),F)) 5. (3319,((3319,296,5),F)) 6. (3319,((3319,318,5),F)) 7. (3319,((3319,405,4),F)) 8. (3319,((3319,914,4.5),F)) 9. (3319,((3319,1088,4),F)) 10. (3319,((3319,1136,5),F))
电影点评系统用户行为分析,统计所有电影中最受男性喜爱的电影Top10,具体实现思路如下:
(1)将性别为男的用户过滤以后的数据(用户ID,电影ID,评分)进行map转换,格式化成为Key-Value的方式,即(电影ID,(评分,1))。
(2)使用reduceByKey算子对Value值进行汇聚转换,对两个具有相同Key值,而Value不同的元组,如(电影ID,(x.评分,x.1)),(电影ID,(y.评分,y.1)),计算得出(x的评分+y的评分,x的计数+y的计数),转换以后Key是电影ID,Value是(总的评分,总的点评人数),格式化成为Key-Value,即(电影ID,(总评分,总点评人数))。
(3)reduceByKey算子执行完毕,接下来进行map转换操作,交换Key-Value值,并且计算出电影平均分=总评分/总点评人数,即将(电影ID,(总评分,总点评人数))转换成((总评分/总点评人数),电影ID),然后使用sortByKey(false)算子按电影平均分降序排列。
(4)再次进行Key和Value的交换,打印输出。使用map转换函数将((总评分/总点评人数),电影ID)进行交换,转换为(电影ID,(总评分/总点评人数)),再通过take(10)算子获取所有电影中最受男性喜爱的电影Top10,进行打印输出。
在所有电影中分析最受男性喜爱的电影Top10的代码如下。
1. println("所有电影中最受男性喜爱的电影Top10:") 2. maleFilteredRatings.map(x=>(x._2,(x._3.toDouble, 1)))//格式化成为Key-Value 3. .reduceByKey((x, y) => (x._1 + y._1,x._2 + y._2)) //对Value进行reduce操作,分别得出每部电影的总的评分和总的点评人数 4. .map(x => (x._2._1.toDouble / x._2._2, x._1)) //求出电影平均分 5. .sortByKey(false) //降序排列 6. .map(x => (x._2, x._1)) 7. .take(10) //取Top10 8. .foreach(println) //打印到控制台
在IDEA中运行代码,结果如下。
1. 所有电影中最受男性喜爱的电影Top10: 2. (855,5.0) 3. (6075,5.0) 4. (1166,5.0) 5. (3641,5.0) 6. (1045,5.0) 7. (4136,5.0) 8. (2538,5.0) 9. (7227,5.0) 10. (8484,5.0) 11. (5599,5.0)
同样地,在电影点评系统用户行为分析中,我们可以统计所有电影中最受女性喜爱的电影Top10,具体实现思路和最受男性喜爱的电影Top10类似,这里不再赘述。
从所有电影中分析最受女性喜爱的电影Top10的代码如下。
1. println("所有电影中最受女性喜爱的电影Top10:") 2. femaleFilteredRatings.map(x=>(x._2,(x._3.toDouble,1)))//格式化成为Key-Value 3. .reduceByKey((x, y) => (x._1 + y._1,x._2 + y._2)) //对Value进行reduce操作,分别得出每部电影的总的评分和总的点评人数 4. .map(x => (x._2._1.toDouble / x._2._2, x._1)) //求出电影平均分 5. .sortByKey(false) //降序排列 6. .map(x => (x._2, x._1)) 7. .take(10) //取Top10 8. .foreach(println) //打印到控制台
在IDEA中运行代码,结果如下。
1. 所有电影中最受女性喜爱的电影Top10: 2. [Stage 43:=================================> 7 + 1) / 8](789,5.0) 3. (855,5.0) 4. (32153,5.0) 5. (4763,5.0) 6. (26246,5.0) 7. (2332,5.0) 8. (503,5.0) 9. (4925,5.0) 10. (8767,5.0) 11. (44657,5.0)