实战Hadoop
上QQ阅读APP看本书,新人免费读10天
设备和账号都新为新人

3.3 MapReduce,你够了解吗

3.3.1 没有map、reduce的MapReduce

MapReduce框架在幕后默默地完成了很多事情,如果不重写map和reduce方法,它会不会就此罢工呢?这就来测试一下。下面设计一个“最懒”的MapReduce——LazyMapReduce,该类只对任务进行必要的初始化及输入/输出路径的设置,其余的参数(如输入/输出类型、map方法、reduce方法等)均保持默认状态。LazyMapReduce的实现代码如下:

public class LazyMapReduce {

public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage:wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "LazyMapReduce");
FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 :1); } }

将图3-3左边所示的文件作为输入数据,执行LazyMapReduce后得到的结果刚好为图3-3中分割后所示的数据。可以看出在默认情况下,MapReduce原封不动地将输入<key, value>写到输出。下面介绍MapReduce的部分参数及默认设置,只有了解了这些内容,才能在编写自己的MapReduce程序时,知道哪些类需要自己实现,哪些可以直接借用默认类,真正地做到游刃有余。

(1)InputFormat类。该类的作用是将输入的数据分割成一个个的split,并将split进一步拆分成<key,value>对作为map函数的输入。可以通过job.setInputFormatClass()方法进行设置,默认为使用TextInputFormat类处理输入(该类只处理文本文件)。TextInputFormat将文本文件的多行分割成splits,并通过LineRecorderReader将其中的每一行解析成<key, value>对,key值为对应行在文件中的偏移量,value为行的内容。

(2)Mapper类。实现map函数,根据输入的<key, value>对生成中间结果。可以通过job.setMapperClass()方法进行设置,默认使用Mapper类(Hadoop 0.20.*使用Mapper代替IdentityMapper作为默认值),Mapper将输入的<key, value>对原封不动地作为中间结果输出。

(3)Combiner类。实现combine函数,合并中间结果中具有相同key值的键值对。可以通过job.setCombinerClass()方法进行设置,默认为null,即不合并中间结果。

(4)Partitioner类。实现getPartition函数,用于在Shuffle过程按照key值将中间数据分成R份,每份由一个Reducer负责。可以通过job.setPartitionerClass()方法进行设置,默认使用HashPartitioner类,HashPartitioner使用哈希函数完成Shuffle过程。

(5)Reducer类。实现reduce函数,将中间结果合并,得到最终结果。可以通过job.setReducerClass()方法进行设置,默认使用Reducer类(Hadoop 0.20.*使用Reducer代替IdentityReducer作为默认值),Reducer将中间结果直接输出为最终结果。

(6)OutputFormat类。该类负责输出最终结果。可以通过job.setOutputFormatClass()方法进行设置,默认使用TextOutputFormat类,TextOutputFormat将最终结果写成纯文本文件,每行一个<key, value>对,key和value之间用制表符分隔开。

(7)除了上述几个类之外,job.setOutputKeyClass()方法和job.setOutput ValueClass()可以用来设置最终结果(即Reducer的输出结果)的key和value的类型,默认情况下分别为LongWritable和Text。job.setMapOutput KeyClass()方法和job.setMapOutputValueClass()则可以用来设置中间结果(即Mapper的输出结果)的key和value的类型,默认情况下与最终结果的类型保持一致。

综上所述,下面的这段代码与LazyMapReduce完成的工作完全一样。

public static void main(String[] args) throws Exception{
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage:wordcount <in> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "LazyMapReduce");

job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(Mapper.class); job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); job.setPartitionerClass(HashPartitioner.class); job.setReducerClass(Reducer.class); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(Text.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 :1); }

3.3.2 多少个Reducers最佳

从前面的章节中已经知道,MapReduce框架会将输入文件分成多个splits,并为每一个split创建一个Mapper,所以Mappers的数目直接由splits的数目决定。而Reducers的数目可以通过job.setNumReduceTasks()函数来进行设置,默认情况下只有一个Reducer。在真正的集群环境下,如果保持默认,那么所有的中间数据都会发送给这个唯一的Reducer,导致任务的执行变得非常缓慢。究竟设多少个Reducers才算合适呢?为了解决这个问题,首先来了解一下slots的概念。

slots有点类似于一个资源池,每个任务(包括Map任务和Reduce任务)执行时都必须获得一个slot才能继续,否则就只能等待。当一个任务完成后,该任务就归还slot,这个过程有点类似于释放资源到资源池中。显然,每一个获得资源(这里特指slots)的任务都可以立即执行,无需等待。另一方面,MapReduce的任务是由TaskTracker节点负责执行的,所以slots可进一步理解为TaskTrackers能够并发地执行多少个任务。slots分为mapper slots和reducer slots两种,分别对应最大可并行执行的mapper数和reducer数。用户可以通过修改mapred-site.xml配置文件中的mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum来设置slots的值,默认值为2。

集群中可用reducer slots的总数等于集群中的总节点数乘以每个节点有多少个slots,后者就是由mapred.tasktracker.reduce.tasks.maximum的值决定的。Reducers数目的最佳值和reducer slots的总数有关,通常情况下,让Reducers的数目略小于reducer slots的总数,这样做的目的是:首先所有的Reducers可以并行执行,减少排队时间;其次对于未执行Reducer的slots,可以在其他Reducer发生故障时,立即分配给新创建的Reducer,不会明显地加长任务的总执行时间Tom White著,周敏、曾大聃、周傲英译.hadoop 权威指南(中文版)[M].北京:清华大学出版社,2010

在设置Reducers数目时,还必须考虑Mappers的数目,如果出现Reducers >= Mappers,似乎就不符合MapReduce的思想了。如果真的这么设了,会出现什么情况呢?那就准备准备,一起来测试一下吧!首先准备两个文件,内容分别为“Hello World”和“Hello Hadoop”(文件内容不包含双引号),并上传到HDFS中。在默认情况下,MapReduce会将每个文件作为一个split,所以Mapper数一定是2。然后在WordCount的代码中添加一句job.setNumReduceTasks(4),设置Reducers数为4。运行结果如图3-7所示。

图3-7 Reducers>Mappers时的运行图

从图3-7看出,四个Reducers都被执行了。通过查看HDFS,找到生成的四个输出文件part-r-00000、part-r-00001、part-r-00002和part-r-00003,如图3-8显示了四个文件的内容。从图中可以看出,只有part-r-00001和partr-00002两个文件内有内容,这是因为Mapper的中间结果通过Shuffle过程传递给Reducer,其中<hello, 2>和<world, 1>被分给第二个Reducer运行,<hadoop, 1>则被分给第三个Reducer运行,其他两个Reducers没有分配到任何输入数据。这样一来,没有输入数据的Reducers还是要执行,包括创建、生成输出文件等。显然这不会给性能带来任何优化,反而会因为任务创建和文件操作等消耗不必要的资源。所以记住,不管怎么样,Reducers < Mappers是必不可少的前提。

图3-8 Reducers>Mappers时输出文件的内容

当然slots的值也不是任意设置的,它和机器的内存及处理器核数相关。每执行一个任务,都要消耗一定的内存,如果不加以限制,就算分配了足够的slots,恐怕也无法正常执行任务。其次,如果想要每个处理器核都执行一个单独的任务,那么设置slots时所要考虑的问题又会多出一个。当身处真实的集群环境时,会发现所要考虑的问题会变得越来越多,本节就不再赘述了。