轻松学大数据挖掘:算法、场景与数据产品
上QQ阅读APP看本书,新人免费读10天
设备和账号都新为新人

3.2.3 简单的案例

最经典的案例当属WordCount,可以看作与“Hello World”一样经典。如文档wordcount.txt的内容,按tab键分割,如下。

接下来需要统计每项技能出现的总次数,用MapReduce来实现。以下为Map的实现过程。

      public  static  class  dealMap  extends  Mapper<Object, Text, Text,
  IntWritable>{
            //输入数据格式:张三   python   scala   java   mapreduce
            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                //步骤1:每行读取,按tab键作为分割,切分数据成数组
                String[] data=value.toString().split("\u0009");
                if(data.length==5){
                    //姓名
                    String keyName = data[0];
                    //步骤2:解析数据
                    for(int i=1; i<data.length; i++){
                        String keyStill = data[i];
                        //步骤3:设置key和value传输到reduce端
                        context.write(new Text(keyStill), new IntWritable
  (1));
                    }
                }
              }
      }

并不是所有阶段都需要setup去初始化全局变量,没有需要就可以省略。而Reduce的内容如下。

      public static class dealReduce extends Reducer<Text, IntWritable,
  Text, IntWritable> {
            public void reduce(Text key, Iterable<IntWritable> values,
  Context context)
                    throws IOException, InterruptedException{
                String keyName = key.toString();
                int sum=0;
                for (IntWritable val : values) {
                    sum+=val.get();
                }
                context.write(new Text(keyName), new IntWritable(sum));
            }
      }

最后结合Map和Reduce的内容,还需要实现Run的驱动板块,代码如下。

      public  static  Boolean  run(String  input, String  ouput)  throws
  IOException {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "WordCountJob");
            job.setJarByClass(CaseOneWordCount.class);
            job.setMapperClass(dealMap.class);
            job.setReducerClass(dealReduce.class);
            job.setNumReduceTasks(1);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            //设置输入、输出文件路径,可以保证多文件
              Path output = new Path(ouput);
              FileInputFormat.setInputPaths(job, input);
              FileOutputFormat.setOutputPath(job, output);
              //设置每一次执行前先删除输出目录,防止报错
              output.getFileSystem(conf).delete(output, true);
              Boolean result=job.waitForCompletion(true);
              return result;
        }

最后补上main的执行类就算完成了,代码如下。

      public static void main(String[] args) throws Exception {
          //输入/输出源:1.本地目录、2.集群HDFS目录、3.集群Hbase……等
          System.out.println(run("wordcount.txt", "result1"));
      }

具体的数据和源码可以自行下载(地址:http://pan.baidu.com/s/1o8PYFaE),更多的案例会在后续分享,也包括涉及大数据挖掘的实践案例。