
Chapter 4. Developing MapReduce Programs

Now that we have explored the technology of MapReduce, we will spend this chapter looking at how to put it to use. In particular, we will take a more substantial dataset and look at ways to approach its analysis by using the tools provided by MapReduce.

In this chapter we will cover the following topics:

  • Hadoop Streaming and its uses
  • The UFO sighting dataset
  • Using Streaming as a development/debugging tool
  • Using multiple mappers in a single job
  • Efficiently sharing utility files and data across the cluster
  • Reporting job and task status and log information useful for debugging

Throughout this chapter, the goal is to introduce both concrete tools and ideas about how to approach the analysis of a new data set. We shall start by looking at how to use scripting programming languages to aid MapReduce prototyping and initial analysis. Though it may seem strange to learn the Java API in the previous chapter and immediately move to different languages, our goal here is to provide you with an awareness of different ways to approach the problems you face. Just as many jobs make little sense being implemented in anything but the Java API, there are other situations where using another approach is best suited. Consider these techniques as new additions to your tool belt and with that experience you will know more easily which is the best fit for a given scenario.

Using languages other than Java with Hadoop

We have mentioned previously that MapReduce programs don't have to be written in Java. Most programs are written in Java, but there are several reasons why you may want or need to write your map and reduce tasks in another language. Perhaps you have existing code to leverage or need to use third-party binaries—the reasons are varied and valid.

Hadoop provides a number of mechanisms to aid non-Java development. Primary amongst these are Hadoop Pipes, which provides a native C++ interface to Hadoop, and Hadoop Streaming, which allows any program that uses standard input and output to be used for map and reduce tasks. We will use Hadoop Streaming heavily in this chapter.

How Hadoop Streaming works

With the MapReduce Java API, both map and reduce tasks provide implementations for methods that contain the task functionality. These methods receive the input to the task as method arguments and then output results via the Context object. This is a clear and type-safe interface but is by definition Java specific.

Hadoop Streaming takes a different approach. With Streaming, you write a map task that reads its input from standard input, one line at a time, and gives the output of its results to standard output. The reduce task then does the same, again using only standard input and output for its data flow.

Any program that reads and writes from standard input and output can be used in Streaming, such as compiled binaries, Unix shell scripts, or programs written in a dynamic language such as Ruby or Python.

Why use Hadoop Streaming

The biggest advantage to Streaming is that it can allow you to try ideas and iterate on them more quickly than using Java. Instead of a compile/jar/submit cycle, you just write the scripts and pass them as arguments to the Streaming jar file. Especially when doing initial analysis on a new dataset or trying out new ideas, this can significantly speed up development.

The classic debate regarding dynamic versus static languages balances the benefits of swift development against runtime performance and type checking. These dynamic downsides also apply when using Streaming. Consequently, we favor use of Streaming for up-front analysis and Java for the implementation of jobs that will be executed on the production cluster.

We will use Ruby for Streaming examples in this chapter, but that is a personal preference. If you prefer shell scripting or another language, such as Python, then take the opportunity to convert the scripts used here into the language of your choice.

Time for action – implementing WordCount using Streaming

Let's flog the dead horse of WordCount one more time and implement it using Streaming by performing the following steps:

  1. Save the following file to wcmapper.rb:
    #!/usr/bin/env ruby
    
    while line = gets
        # Split the line into words on whitespace and emit each with a count of 1
        words = line.split
        words.each{ |word| puts word.strip+"\t1"}
    end
  2. Make the file executable by executing the following command:
    $ chmod +x wcmapper.rb
    
  3. Save the following file to wcreducer.rb:
    #!/usr/bin/env ruby
    
    current = nil
    count = 0
    
    while line = gets
        word, counter = line.split("\t")
    
        if word == current
            count = count+1
        else
            puts current+"\t"+count.to_s if current
            current = word
            count = 1
        end
    end
    puts current+"\t"+count.to_s
  4. Make the file executable by executing the following command:
    $ chmod +x wcreducer.rb
    
  5. Execute the scripts as a Streaming job using the datafile from the previous chapter:
    $ hadoop jar hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar \
    -file wcmapper.rb -mapper wcmapper.rb -file wcreducer.rb \
    -reducer wcreducer.rb -input test.txt -output output
    packageJobJar: [wcmapper.rb, wcreducer.rb, /tmp/hadoop-hadoop/hadoop-unjar1531650352198893161/] [] /tmp/streamjob937274081293220534.jar tmpDir=null
    12/02/05 12:43:53 INFO mapred.FileInputFormat: Total input paths to process : 1
    12/02/05 12:43:53 INFO streaming.StreamJob: getLocalDirs(): [/var/hadoop/mapred/local]
    12/02/05 12:43:53 INFO streaming.StreamJob: Running job: job_201202051234_0005
    
    12/02/05 12:44:01 INFO streaming.StreamJob: map 100% reduce 0%
    12/02/05 12:44:13 INFO streaming.StreamJob: map 100% reduce 100%
    12/02/05 12:44:16 INFO streaming.StreamJob: Job complete: job_201202051234_0005
    12/02/05 12:44:16 INFO streaming.StreamJob: Output: output
    
  6. Check the result file:
    $ hadoop fs -cat output/part-00000
    

What just happened?

Don't worry about the specifics of Ruby; if you don't know the language, it isn't important here.

Firstly, we created the script that will be our mapper. It uses the gets function to read a line from standard input, splits this into words, and uses the puts function to write each word and the value 1 to standard output. We then made the file executable.

Our reducer is a little more complex for reasons we will describe in the next section. However, it performs the job we would expect: it reads the word/count pairs from standard input, counts the number of occurrences of each word, and writes each word with its final count to standard output. Again, we made sure to make the file executable.

Note that in both cases we are implicitly using the Hadoop input and output formats discussed in the earlier chapters. It is the TextInputFormat class that processes the source file and provides each line one at a time to the map script. Conversely, the TextOutputFormat class ensures that the output of the reduce tasks is correctly written as textual data. We can of course modify these if required.
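As a quick, hypothetical illustration (the choice of input format and the kvoutput directory below are our own examples, not part of this chapter's job), a different input format can be requested on the Streaming command line with the -inputformat option, and an output format with -outputformat:

$ hadoop jar hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar \
-inputformat org.apache.hadoop.mapred.KeyValueTextInputFormat \
-file wcmapper.rb -mapper wcmapper.rb -file wcreducer.rb \
-reducer wcreducer.rb -input test.txt -output kvoutput

Here KeyValueTextInputFormat is one of the standard formats shipped in the org.apache.hadoop.mapred package; any InputFormat class available on the classpath can be named in the same way.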

Next, we submitted the Streaming job to Hadoop via the rather cumbersome command line shown in the previous section. The reason for each file to be specified twice is that any file not available on each node must be packaged up by Hadoop and shipped across the cluster, which requires it to be specified by the -file option. Then, we also need to tell Hadoop which script performs the mapper and reducer roles.

Finally, we looked at the output of the job, which should be identical to that of the previous Java-based WordCount implementations.

Differences in jobs when using Streaming

The Streaming WordCount mapper looks a lot simpler than the Java version, but the reducer appears to have more logic. Why? The reason is that the implied contract between Hadoop and our tasks changes when we use Streaming.

In Java we knew that our map() method would be invoked once for each input key/value pair and our reduce() method would be invoked for each key and its set of values.

With Streaming we don't have the concept of the map or reduce methods anymore; instead, we have written scripts that process streams of received data. This changes how we need to write our reducer. In Java the grouping of values to each key was performed by Hadoop; each invocation of the reduce method would receive a single key and all its values. In Streaming, each instance of the reduce task is given the individual values one at a time, not gathered into per-key collections.

Hadoop Streaming does sort the keys. For example, if a mapper emitted the following data:

First     1
Word      1
Word      1
A         1
First     1

The Streaming reducer would receive this data in the following order:

A         1
First     1
First     1
Word      1
Word      1

Hadoop still collects the values for each key and ensures that each key is passed only to a single reducer. In other words, a reducer gets all the values for a number of keys and they are grouped together; however, they are not packaged into individual executions of the reducer, that is, one per key, as with the Java API.

This should explain the mechanism used in the Ruby reducer; it first sets empty default values for the current word; then after reading each line it determines if this is another value for the current key, and if so, increments the count. If not, then there will be no more values for the previous key and its final output is sent to standard output and the counting begins again for the new word.

After reading so much in the earlier chapters about how great it is for Hadoop to do so much for us, this may seem a lot more complex, but after you write a few Streaming reducers it's actually not as bad as it may first appear. Also remember that Hadoop does still manage the assignment of splits to individual map tasks and the necessary coordination that sends the values for a given key to the same reducer. This behavior can be modified through configuration settings to change the number of mappers and reducers just as with the Java API.
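To make that concrete, here is a brief example in the same spirit as the earlier commands (the output4 directory name is our own); the number of reduce tasks for a Streaming job can be set with the generic -D option, which must appear before the Streaming-specific options:

$ hadoop jar hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar \
-D mapred.reduce.tasks=4 \
-file wcmapper.rb -mapper wcmapper.rb -file wcreducer.rb \
-reducer wcreducer.rb -input test.txt -output output4

With four reducers, the job produces four output files, part-00000 through part-00003, in the output directory.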

Analyzing a large dataset

Armed with our abilities to write MapReduce jobs in both Java and Streaming, we'll now explore a more significant dataset than any we've looked at before. In the following section, we will attempt to show how to approach such analysis and the sorts of questions Hadoop allows you to ask of a large dataset.

Getting the UFO sighting dataset

We will use a public domain dataset of over 60,000 UFO sightings. This is hosted by InfoChimps at http://www.infochimps.com/datasets/60000-documented-ufo-sightings-with-text-descriptions-and-metada.

You will need to register for a free InfoChimps account to download a copy of the data.

The data comprises a series of UFO sighting records with the following fields:

  1. Sighting date: This field gives the date when the UFO sighting occurred.
  2. Recorded date: This field gives the date when the sighting was reported, often different to the sighting date.
  3. Location: This field gives the location where the sighting occurred.
  4. Shape: This field gives a brief summary of the shape of the UFO, for example, diamond, lights, cylinder.
  5. Duration: This field gives the duration of how long the sighting lasted.
  6. Description: This field gives free text details of the sighting.

Once downloaded, you will find the data in a few formats. We will be using the .tsv (tab-separated value) version.

Getting a feel for the dataset

When faced with a new dataset it is often difficult to get a feel for the nature, breadth, and quality of the data involved. There are several questions, the answers to which will affect how you approach the follow-on analysis, in particular:

  • How big is the dataset?
  • How complete are the records?
  • How well do the records match the expected format?

The first is a simple question of scale: are we talking hundreds, thousands, millions, or more records? The second question asks how complete the records are. If you expect each record to have 10 fields (assuming this is structured or semi-structured data), how many have all the key fields populated with data? The last question expands on this point: how well do the records match your expectations of format and representation?

Time for action – summarizing the UFO data

Now that we have the data, let's get an initial summarization of its size and how many records may be incomplete:

  1. With the UFO tab-separated value (TSV) file on HDFS saved as ufo.tsv, save the following file to summarymapper.rb:
    #!/usr/bin/env ruby
    
    while line = gets
        puts "total\t1"
        # Keep trailing empty fields so short lines don't produce nil entries
        parts = line.chomp.split("\t", -1)
        puts "badline\t1" if parts.size != 6
        puts "sighted\t1" if parts[0] && !parts[0].empty?
        puts "recorded\t1" if parts[1] && !parts[1].empty?
        puts "location\t1" if parts[2] && !parts[2].empty?
        puts "shape\t1" if parts[3] && !parts[3].empty?
        puts "duration\t1" if parts[4] && !parts[4].empty?
        puts "description\t1" if parts[5] && !parts[5].empty?
    end
  2. Make the file executable by executing the following command:
    $ chmod +x summarymapper.rb
    
  3. Execute the job as follows by using Streaming:
    $ hadoop jar hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar \
    -file summarymapper.rb -mapper summarymapper.rb -file wcreducer.rb \
    -reducer wcreducer.rb -input ufo.tsv -output ufosummary
    
  4. Retrieve the summary data:
    $ hadoop fs -cat ufosummary/part-00000
    

What just happened?

Remember that our UFO sightings should have six fields as described previously. They are listed as follows:

  • The date of the sighting
  • The date the sighting was reported
  • The location of the sighting
  • The shape of the object
  • The duration of the sighting
  • A free text description of the event

The mapper examines the file and counts the total number of records in addition to identifying potentially incomplete records.

We produce the overall count by simply recording how many records are encountered while processing the file. We identify potentially incomplete records by flagging those that do not split into exactly six fields, and we also count, for each field, how many records have a non-empty value.

Therefore, the implementation of the mapper reads each line and does three things as it proceeds through the file:

  • It outputs a token to be counted towards the total number of records processed
  • It splits the record on tab boundaries and outputs a token for any line that does not produce exactly six field values
  • For each of the six expected fields, it outputs a token when the value present is something other than an empty string; that is, there is data in the field, though this doesn't actually say anything about the quality of that data

We wrote this mapper intentionally to produce the output of the form (token, count). Doing this allowed us to use our existing WordCount reducer from our earlier implementations as the reducer for this job. There are certainly more efficient implementations, but as this job is unlikely to be frequently executed, the convenience is worth it.

At the time of writing, the result of this job was as follows:

badline        324
description    61372
duration       58961
location       61377
recorded       61377
shape          58855
sighted        61377
total          61377

We see from these figures that we have 61,377 records. All of them provide values for the sighted date, reported date, and location fields. Around 58,000-59,000 records have values for shape and duration, and almost all have a description.

When split on tab characters, 324 lines were found not to have exactly six fields. However, since only five records had no value for description, this suggests that the bad records typically have too many tabs rather than too few. We could of course alter our mapper to gather detailed information on this point. The extra tabs are most likely ones used within the free-text description, so for now we will do our analysis expecting most records to have correctly placed values for all six fields, while making no assumptions about further tabs within each record.

Examining UFO shapes

Out of all the fields in these reports, it was shape that immediately interested us most, as it could offer some interesting ways of grouping the data depending on what sort of information we have in that field.

Time for action – summarizing the shape data

Just as we provided a summarization for the overall UFO data set earlier, let's now do a more focused summarization on the data provided for UFO shapes:

  1. Save the following to shapemapper.rb:
    #!/usr/bin/env ruby
    
    while line = gets  
        parts = line.split("\t")    
        if parts.size == 6        
            shape = parts[3].strip     
            puts shape+"\t1" if !shape.empty?   
        end     
    end     
  2. Make the file executable:
    $ chmod +x shapemapper.rb
    
  3. Execute the job once again using the WordCount reducer:
    $ hadoop jar hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar \
    -file shapemapper.rb -mapper shapemapper.rb -file wcreducer.rb \
    -reducer wcreducer.rb -input ufo.tsv -output shapes
    
  4. Retrieve the shape info:
    $ hadoop fs -cat shapes/part-00000 
    

What just happened?

Our mapper here is pretty simple. It breaks each record into its constituent fields, discards any record without exactly six fields, and outputs a counter for any non-empty shape value.

For our purposes here, we are happy to ignore any records that don't precisely match the format we expect. Perhaps one of them is the single UFO sighting that will prove it once and for all, but even so it wouldn't likely make much difference to our analysis. Do think about the potential value of individual records before deciding to discard some so easily. If you are working primarily on large aggregations where you care mostly about trends, individual records likely don't matter. But in cases where single values could materially affect the analysis or must be accounted for, a more conservative approach of trying to parse and recover records rather than discard them may be best. We'll talk more about this trade-off in Chapter 6, When Things Break.

After the usual routine of making the mapper executable and running the job, we produced data showing that 29 different UFO shapes were reported. Here's some sample output, tabulated in compact form for space reasons:

changed 1        changing 1533    chevron 758      cigar 1774
circle 5250      cone 265         crescent 2       cross 177
cylinder 981     delta 8          diamond 909      disk 4798
dome 1           egg 661          fireball 3437    flare 1
flash 988        formation 1775   hexagon 1        light 12140
other 4574       oval 2859        pyramid 1        rectangle 957
round 2          sphere 3614      teardrop 592     triangle 6036
unknown 4459

As we can see, there is a wide variance in sighting frequency. Some shapes, such as pyramid, occur only once, while light accounts for more than a fifth of all reported shapes. Considering that many UFO sightings happen at night, it could be argued that a description of light is not terribly useful or specific, and when combined with the values for other and unknown we see that around 21,000 of our 58,000 reported shapes may not actually be of much use. Since we are not about to run out and do additional research this doesn't matter very much, but what is important is to start thinking of your data in these terms. Even this type of summary analysis can start giving an insight into the nature of the data and indicate what quality of analysis may be possible. In the case of reported shapes, for example, our 61,000 sightings provide only 58,000 shape reports, around 21,000 of which are of dubious value, leaving roughly 37,000 that we may be able to work with. If your analysis is predicated on a minimum number of samples, always be sure to do this sort of summarization up-front to determine whether the dataset will actually meet your needs.

Time for action – correlating sighting duration to UFO shape

Let's do a little more detailed analysis of this shape data. We wondered if there was any correlation between the duration of a sighting and the reported shape. Perhaps cigar-shaped UFOs hang around longer than the rest, or formations always appear for the exact same amount of time.

  1. Save the following to shapetimemapper.rb:
    #!/usr/bin/env ruby
    
    pattern = Regexp.new /\d* ?((min)|(sec))/
    
    while line = gets
        parts = line.split("\t")
        if parts.size == 6
            shape = parts[3].strip
            duration = parts[4].strip.downcase
            if !shape.empty? && !duration.empty?
                match = pattern.match(duration)
                # Skip any duration we cannot parse into a number and a unit
                if match
                    digits = /\d+/.match(match[0])
                    unit = match[1]
                    if digits
                        time = Integer(digits[0])
                        time = time * 60 if unit == "min"
                        puts shape+"\t"+time.to_s
                    end
                end
            end
        end
    end
  2. Make the file executable by executing the following command:
    $ chmod +x shapetimemapper.rb
    
  3. Save the following to shapetimereducer.rb:
    #!/usr/bin/env ruby
    
    current = nil
    min = 0
    max = 0
    total = 0
    count = 0
    
    while line = gets
        word, time = line.split("\t")
        time = Integer(time)
    
        if word == current
            count = count+1
            total = total+time
            min = time if time < min
            max = time if time > max
        else
            puts current+"\t"+min.to_s+" "+max.to_s+" "+(total/count).to_s if current
            current = word
            count = 1
            total = time
            min = time
            max = time
        end
    end
    puts current+"\t"+min.to_s+" "+max.to_s+" "+(total/count).to_s
  4. Make the file executable by executing the following command:
    $ chmod +x shapetimereducer.rb
    
  5. Run the job:
    $ hadoop jar hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar \
    -file shapetimemapper.rb -mapper shapetimemapper.rb \
    -file shapetimereducer.rb -reducer shapetimereducer.rb \
    -input ufo.tsv -output shapetime
    
  6. Retrieve the results:
    $ hadoop fs -cat shapetime/part-00000
    

What just happened?

Our mapper here is a little more involved than previous examples due to the nature of the duration field. Taking a quick look at some sample records, we found values as follows:

15 seconds
2 minutes
2 min
2minutes
5-10 seconds

In other words, there was a mixture of range and absolute values, different formatting, and inconsistent terms for the time units. Again, for simplicity, we decided on a limited interpretation of the data: we take the absolute value if present, and the upper part of a range if not. We assume that the strings min or sec will be present for the time units and convert all timings into seconds. With some regular expression magic, we unpack the duration field into these parts and do the conversion. Note again that we simply discard any record that does not parse as we expect, which may not always be appropriate.

The reducer follows the same pattern as our earlier example: start with a default key and read values until a new key is encountered. In this case, we want to capture the minimum, maximum, and mean duration for each shape, so we use several variables to track the needed data.

Remember that Streaming reducers need to handle a series of values grouped into their associated keys and must identify when a new line has a changed key, and hence indicates the last value for the previous key that has been processed. In contrast, a Java reducer would be simpler as it only deals with the values for a single key in each execution.
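For comparison, the following is a minimal sketch (our own illustration, not code used in this chapter) of how the same min/max/mean logic might look as a reducer in the older Java mapred API, assuming a corresponding Java mapper that emits LongWritable durations; Hadoop hands it each shape key together with all of its duration values in a single call:

    import java.io.IOException;
    import java.util.Iterator;
    
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;
    
    // Hypothetical Java equivalent of shapetimereducer.rb
    public class ShapeTimeReducer extends MapReduceBase
        implements Reducer<Text, LongWritable, Text, Text>
    {
        public void reduce(Text shape, Iterator<LongWritable> values,
            OutputCollector<Text, Text> output, Reporter reporter) throws IOException
        {
            long min = Long.MAX_VALUE;
            long max = Long.MIN_VALUE;
            long total = 0;
            long count = 0;
    
            // All durations for this shape arrive in one call; no key-change tracking needed
            while (values.hasNext())
            {
                long time = values.next().get();
                min = Math.min(min, time);
                max = Math.max(max, time);
                total += time;
                count++;
            }
    
            output.collect(shape, new Text(min + " " + max + " " + (total / count)));
        }
    }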

After making both files executable, we run the job and get the following results, where we removed any shape with fewer than 10 sightings and again made the output more compact for space reasons. The numbers for each shape are the minimum value, the maximum value, and the mean respectively:

changing 0 5400 670     chevron 0 3600 333
cigar 0 5400 370        circle 0 7200 423
cone 0 4500 498         cross 2 3600 460
cylinder 0 5760 380     diamond 0 7800 519
disk 0 5400 449         egg 0 5400 383
fireball 0 5400 236     flash 0 7200 303
formation 0 5400 434    light 0 9000 462
other 0 5400 418        oval 0 5400 405
rectangle 0 4200 352    sphere 0 14400 396
teardrop 0 2700 335     triangle 0 18000 375
unknown 0 6000 470

It is surprising to see the relatively narrow variance in the mean sighting duration across the shape types; most have a mean value somewhere between 300 and 500 seconds. Interestingly, we also see that the shortest mean duration is for fireballs and the longest for changing objects, both of which make some degree of intuitive sense. A fireball by definition wouldn't be a long-lasting phenomenon, and a changing object would need a lengthy duration for its changes to be noticed.

Using Streaming scripts outside Hadoop

This last example with its more involved mapper and reducer is a good illustration of how Streaming can help MapReduce development in another way; you can execute the scripts outside of Hadoop.

It's generally good practice during MapReduce development to have a sample of the production data against which to test your code. But when this data is on HDFS and you are writing Java map and reduce tasks, it can be difficult to debug problems or refine complex logic. With map and reduce tasks that read their input from the command line, you can instead run them directly against some data to get quick feedback on the result. If you have a development environment that provides Hadoop integration, or you are using Hadoop in standalone mode, the problem is lessened; but do remember that Streaming gives you this ability to try the scripts outside of Hadoop, and it may be useful some day.

While developing these scripts, the author noticed that the last set of records in his UFO datafile was better structured than those at the start of the file. Therefore, to do a quick test on the mapper, all that was required was:

$ tail ufo.tsv | ./shapetimemapper.rb

This principle can be applied to the full workflow to exercise both the map and reduce script.

Time for action – performing the shape/time analysis from the command line

It may not be immediately obvious how to do this sort of local command-line analysis, so let's look at an example.

With the UFO datafile on the local filesystem, execute the following command:

$ cat ufo.tsv | ./shapetimemapper.rb | sort | ./shapetimereducer.rb

What just happened?

With a single Unix command line, we produced output identical to that of our previous full MapReduce job. If you look at what the command line does, this makes sense.

Firstly, the input file is sent—a line at a time—to the mapper. The output of this is passed through the Unix sort utility and this sorted output is passed a line at a time to the reducer. This is of course a very simplified representation of our general MapReduce job workflow.

The obvious question, then, is why we should bother with Hadoop if we can do equivalent analysis at the command line. The answer, of course, is our old friend: scale. This simple approach works fine for a file such as the UFO sightings, which, though non-trivial, is only 71 MB in size. To put this into context, we could hold thousands of copies of this dataset on a single modern disk drive.

So what if the dataset was 71 GB in size instead, or even 71 TB? In the latter case at least, we would have to spread the data across multiple hosts and then decide how to split the data, combine partial answers, and deal with the inevitable failures along the way. In other words, we would need something like Hadoop.

However, don't discount the use of command-line tools like this; such approaches should be well used during MapReduce development.

Java shape and location analysis

Let's return to the Java MapReduce API and consider some analysis of the shape and location data within the reports.

However, before we start writing code, let's think about how we've been approaching the per-field analysis of this dataset. The previous mappers have had a common pattern:

  • Discard records determined to be corrupt
  • Process valid records to extract the field of interest
  • Output a representation of the data we care about for the record

Now if we were to write Java mappers to analyze location and then perhaps the sighting and reported time columns, we would follow a similar pattern. So can we avoid any of the consequent code duplication?

The answer is yes, through the use of org.apache.hadoop.mapred.lib.ChainMapper. This class provides a means by which multiple mappers are executed in sequence, with the output of the final mapper passed to the reducer. ChainMapper is applicable not just to this type of data clean-up; it is not an uncommon pattern to perform multiple map-type tasks in a job before applying a reducer.

An example of this approach would be to write a validation mapper that could be used by all future field analysis jobs. This mapper would discard lines deemed corrupt, passing only valid lines to the actual business logic mapper that can now be focused on analyzing data instead of worrying about coarse-level validation.

An alternative approach here would be to do the validation within a custom InputFormat class that discards non-valid records; which approach makes the most sense will depend on your particular situation.
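For illustration only, the following is a rough sketch of that alternative using the older mapred API; the class name ValidatingTextInputFormat and the hardcoded six-field rule are our own assumptions rather than code from this chapter:

    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.*;
    
    // Hypothetical input format that filters invalid records before they reach any mapper
    public class ValidatingTextInputFormat extends TextInputFormat
    {
        @Override
        public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
            JobConf job, Reporter reporter) throws IOException
        {
            final RecordReader<LongWritable, Text> inner =
                super.getRecordReader(split, job, reporter);
    
            // Wrap the standard line reader and silently skip records that do not
            // split into exactly six tab-separated fields
            return new RecordReader<LongWritable, Text>()
            {
                public boolean next(LongWritable key, Text value) throws IOException
                {
                    while (inner.next(key, value))
                    {
                        if (value.toString().split("\t").length == 6)
                            return true;
                    }
                    return false;
                }
    
                public LongWritable createKey() { return inner.createKey(); }
                public Text createValue() { return inner.createValue(); }
                public long getPos() throws IOException { return inner.getPos(); }
                public void close() throws IOException { inner.close(); }
                public float getProgress() throws IOException { return inner.getProgress(); }
            };
        }
    }

A job would then select it with conf.setInputFormat(ValidatingTextInputFormat.class) instead of chaining a validation mapper.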

Each mapper in the chain is executed within a single JVM so there is no need to worry about the use of multiple mappers increasing our filesystem I/O load.

Time for action – using ChainMapper for field validation/analysis

Let's use this principle and employ the ChainMapper class to help us provide some record validation within our job:

  1. Create the following class as UFORecordValidationMapper.java:
    import java.io.IOException;
    
    import org.apache.hadoop.io.* ;
    import org.apache.hadoop.mapred.* ;
    import org.apache.hadoop.mapred.lib.* ;
    
    public class UFORecordValidationMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, LongWritable, Text>
    {
        public void map(LongWritable key, Text value,
            OutputCollector<LongWritable, Text> output,
            Reporter reporter) throws IOException
        {
            String line = value.toString();
            if (validate(line))
                output.collect(key, value);
        }
    
        private boolean validate(String str)
        {
            String[] parts = str.split("\t") ;
    
            if (parts.length != 6)
                return false ;
    
            return true ;
        }
    }
  2. Create the following as UFOLocation.java:
    import java.io.IOException;
    import java.util.Iterator ;
    import java.util.regex.* ;
    
    import org.apache.hadoop.conf.* ;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.* ;
    import org.apache.hadoop.mapred.* ;
    import org.apache.hadoop.mapred.lib.* ;
    
    public class UFOLocation
    {
        public static class MapClass extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, LongWritable>
        {
            private final static LongWritable one = new LongWritable(1);
            private static Pattern locationPattern = Pattern.compile(
                "[a-zA-Z]{2}[^a-zA-Z]*$") ;
    
            public void map(LongWritable key, Text value,
                OutputCollector<Text, LongWritable> output,
                Reporter reporter) throws IOException
            {
                String line = value.toString();
                String[] fields = line.split("\t") ;
                String location = fields[2].trim() ;
                if (location.length() >= 2)
                {
                    Matcher matcher = locationPattern.matcher(location) ;
                    if (matcher.find())
                    {
                        int start = matcher.start() ;
                        String state = location.substring(start, start+2);
    
                        output.collect(new Text(state.toUpperCase()), one);
                    }
                }
            }
        }
    
        public static void main(String[] args) throws Exception
        {
            Configuration config = new Configuration() ;
            JobConf conf = new JobConf(config, UFOLocation.class);
            conf.setJobName("UFOLocation");
    
            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(LongWritable.class);
    
            JobConf mapconf1 = new JobConf(false) ;
            ChainMapper.addMapper(conf, UFORecordValidationMapper.class,
                LongWritable.class, Text.class, LongWritable.class,
                Text.class, true, mapconf1) ;
    
            JobConf mapconf2 = new JobConf(false) ;
            ChainMapper.addMapper(conf, MapClass.class,
                LongWritable.class, Text.class,
                Text.class, LongWritable.class, true, mapconf2) ;
            conf.setMapperClass(ChainMapper.class);
            conf.setCombinerClass(LongSumReducer.class);
            conf.setReducerClass(LongSumReducer.class);
    
            FileInputFormat.setInputPaths(conf, args[0]) ;
            FileOutputFormat.setOutputPath(conf, new Path(args[1])) ;
    
            JobClient.runJob(conf);
        }
    }
  3. Compile both files:
    $ javac UFORecordValidationMapper.java UFOLocation.java
    
  4. Jar up the class files and submit the job to Hadoop:
    $ hadoop jar ufo.jar UFOLocation ufo.tsv output
    
  5. Copy the output file to the local filesystem and examine it:
    $ hadoop fs -get output/part-00000 locations.txt
    $ more locations.txt

What just happened?

There's quite a bit happening here, so let's look at it one piece at a time.

The first mapper is our simple validation mapper. The class follows the same interface as the standard MapReduce API, and the map method simply passes the record through untouched if a utility validation method accepts it. We split the check out into a separate method to highlight the functionality of the mapper, but it could easily have been within the main map method itself. For simplicity, we keep to our previous validation strategy of looking at the number of fields and discarding lines that don't break into exactly six tab-delimited fields.

Note that the ChainMapper class has unfortunately been one of the last components to be migrated to the context object API and as of Hadoop 1.0, it can only be used with the older API. It remains a valid concept and useful tool but until Hadoop 2.0, where it will finally be migrated into the org.apache.hadoop.mapreduce.lib.chain package, its current use requires the older approach.

The other file contains another mapper implementation and an updated driver in the main method. The mapper looks for a two-letter sequence at the end of the location field in a UFO sighting report. From some manual examination of data, it is obvious that most location fields are of the form city, state, where the standard two-character abbreviation is used for the state.

Some records, however, add trailing parenthesis, periods, or other punctuation. Some others are simply not in this format. For our purposes, we are happy to discard those records and focus on those that have the trailing two-character state abbreviation we are looking for.

The map method extracts this from the location field using another regular expression and gives the output as the capitalized form of the abbreviation along with a simple count.

The driver for the job has the most changes as the previous configuration involving a single map class is replaced with multiple calls on the ChainMapper class.

The general model is to create a new configuration object for each mapper, then add the mapper to the ChainMapper class along with a specification of its input and output, and a reference to the overall job configuration object.

Notice that the two mappers have different signatures. Both take a key of type LongWritable and a value of type Text, which are also the output types of UFORecordValidationMapper. The location mapper (the MapClass nested in UFOLocation), however, outputs the reverse: a key of type Text and a value of type LongWritable.

The important thing here is to match the output of the final mapper in the chain (our location mapper) with the input expected by the reduce class (LongSumReducer). When using the ChainMapper class, the mappers in the chain can have different inputs and outputs as long as the following are true:

  • For all but the final mapper each map output matches the input of the subsequent mapper in the chain
  • For the final mapper, its output matches the input of the reducer

We compile these classes and put them in the same jar file. This is the first time we have bundled the output from more than one Java source file together. As may be expected, there is no magic here; the usual rules on jar files, path, and class names apply. Because in this case we have both our classes in the same package, we don't have to worry about an additional import in the driver class file.

We then run the MapReduce job and examine the output, which is not quite as expected.

Have a go hero

Use the Java API and the previous ChainMapper example to reimplement the mappers previously written in Ruby that produce the shape frequency and duration reports.

Too many abbreviations

The following are the first few entries from our result file of the previous job:

AB      286
AD      6
AE      7
AI      6
AK      234
AL      548
AM      22
AN      161
…

The file had 186 different two-character entries. Plainly, our approach of extracting the final two-character sequence from the location field was not sufficiently robust.

We have a number of issues with the data which become apparent after a manual analysis of the source file:

  • There is inconsistency in the capitalization of the state abbreviations
  • A non-trivial number of sightings are from outside the U.S. and though they may follow a similar (city, area) pattern, the abbreviation is not one of the 50 we'd expect
  • Some fields simply don't follow the pattern at all, yet would still be captured by our regular expression

We need to filter these results, ideally by normalizing the U.S. records into correct state output and by gathering everything else into a broader category.

To perform this task we need to add to the mapper some notion of what the valid U.S. state abbreviations are. We could of course hardcode this into the mapper but that does not seem right. Although we are for now going to treat all non-U.S. sightings as a single category, we may wish to extend that over time and perhaps do a breakdown by country. If we hardcode the abbreviations, we would need to recompile our mapper each time.

Using the Distributed Cache

Hadoop gives us an alternative mechanism to achieve the goal of sharing reference data across all tasks in the job, the Distributed Cache. This can be used to efficiently make available common read-only files that are used by the map or reduce tasks to all nodes. The files can be text data as in this case but could also be additional jars, binary data, or archives; anything is possible.

The files to be distributed are placed on HDFS and added to the DistributedCache within the job driver. Hadoop copies the files onto the local filesystem of each node prior to job execution, meaning every task has local access to the files.

An alternative is to bundle needed files into the job jar submitted to Hadoop. This does tie the data to the job jar making it more difficult to share across jobs and requires the jar to be rebuilt if the data changes.

Time for action – using the Distributed Cache to improve location output

Let's now use the Distributed Cache to share a list of U.S. state names and abbreviations across the cluster:

  1. Create a datafile called states.txt on the local filesystem. It should have the state abbreviation and full name tab separated, one per line. Or retrieve the file from this book's homepage. The file should start like the following:
    AL      Alabama
    AK      Alaska
    AZ      Arizona
    AR      Arkansas
    CA      California
    
    …
  2. Place the file on HDFS:
    $ hadoop fs -put states.txt states.txt
    
  3. Copy the previous UFOLocation.java file to UFOLocation2.java and modify it by adding the following import statements:
    import java.io.* ;
    import java.net.* ;
    import java.util.* ;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.filecache.DistributedCache ;
  4. Add the following line to the driver main method after the job name is set:
    DistributedCache.addCacheFile(new URI ("/user/hadoop/states.txt"), conf) ;
  5. Replace the map class as follows:
        public static class MapClass extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, LongWritable>
        {
            private final static LongWritable one = new LongWritable(1);
            private static Pattern locationPattern = Pattern.compile(
                "[a-zA-Z]{2}[^a-zA-Z]*$") ;
            private Map<String, String> stateNames ;
    
            @Override
            public void configure(JobConf job)
            {
                try
                {
                    Path[] cacheFiles = DistributedCache.getLocalCacheFiles(job) ;
                    setupStateMap(cacheFiles[0].toString()) ;
                } catch (IOException e)
                {
                    System.err.println("Error reading state file.") ;
                    System.exit(1) ;
                }
            }
    
            private void setupStateMap(String filename) throws IOException
            {
                Map<String, String> states = new HashMap<String, String>() ;
                BufferedReader reader = new BufferedReader(new FileReader(filename)) ;
                String line = reader.readLine() ;
                while (line != null)
                {
                    String[] split = line.split("\t") ;
                    states.put(split[0], split[1]) ;
                    line = reader.readLine() ;
                }
    
                stateNames = states ;
            }
    
            public void map(LongWritable key, Text value,
                OutputCollector<Text, LongWritable> output,
                Reporter reporter) throws IOException
            {
                String line = value.toString();
                String[] fields = line.split("\t") ;
                String location = fields[2].trim() ;
                if (location.length() >= 2)
                {
                    Matcher matcher = locationPattern.matcher(location) ;
                    if (matcher.find())
                    {
                        int start = matcher.start() ;
                        String state = location.substring(start, start+2) ;
    
                        output.collect(new Text(lookupState(state.toUpperCase())), one);
                    }
                }
            }
    
            private String lookupState(String state)
            {
                String fullName = stateNames.get(state) ;
    
                return fullName == null ? "Other" : fullName ;
            }
        }
  6. Compile these classes and submit the job to Hadoop. Then retrieve the result file.

What just happened?

We first created the lookup file we will use in our job and placed it on HDFS. Files to be added to the Distributed Cache must initially be copied onto the HDFS filesystem.

After creating our new job file, we added the required class imports. Then we modified the driver class to add the file we want on each node to be added to the DistributedCache. The filename can be specified in multiple ways, but the easiest way is with an absolute path to the file location on HDFS.

There were a number of changes to our mapper class. We added an overridden configure method, which we use to populate a map that will be used to associate state abbreviations with their full name.

The configure method is called on task startup and the default implementation does nothing. In our overridden version, we retrieve the array of files that have been added to the Distributed Cache. As we know there is only one file in the cache we feel safe in using the first index in this array, and pass that to a utility method that parses the file and uses the contents to populate the state abbreviation lookup map. Notice that once the file reference is retrieved, we can access the file with standard Java I/O classes; it is after all just a file on the local filesystem.

We add another method to perform the lookup that takes the string extracted from the location field and returns either the full name of the state if there is a match or the string Other otherwise. This is called prior to the map result being written via the OutputCollector class.

The result of this job should be similar to the following data:

Alabama       548
Alaska        234
Arizona       2097
Arkansas      534
California    7679
…
Other         4531
…

This works fine, but we have been losing some information along the way. In our validation mapper, we simply drop any line that doesn't meet our six-field criterion. Though we don't care about individual lost records, we may care if the number of dropped records is very large. Currently, our only way of determining that is to sum the number of records for each recognized state and subtract the total from the number of records in the file. We could also try to have this data flow through the rest of the job to be gathered under a special reducer key, but that also seems wrong. Fortunately, there is a better way.

Counters, status, and other output

At the end of every MapReduce job, we see output related to counters, such as the following:

12/02/12 06:28:51 INFO mapred.JobClient: Counters: 22
12/02/12 06:28:51 INFO mapred.JobClient:   Job Counters 
12/02/12 06:28:51 INFO mapred.JobClient:     Launched reduce tasks=1
12/02/12 06:28:51 INFO mapred.JobClient:     Launched map tasks=18
12/02/12 06:28:51 INFO mapred.JobClient:     Data-local map tasks=18
12/02/12 06:28:51 INFO mapred.JobClient:   SkippingTaskCounters
12/02/12 06:28:51 INFO mapred.JobClient:     MapProcessedRecords=61393
…

It is possible to add user-defined counters that will likewise be aggregated from all tasks and reported in this final output as well as in the MapReduce web UI.

Time for action – creating counters, task states, and writing log output

We'll modify our UFORecordValidationMapper to report statistics about skipped records and also highlight some other facilities for recording information about a job:

  1. Create the following as the UFOCountingRecordValidationMapper.java file:
    import java.io.IOException;
    
    import org.apache.hadoop.io.* ;
    import org.apache.hadoop.mapred.* ;
    import org.apache.hadoop.mapred.lib.* ;
    
    public class UFOCountingRecordValidationMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, LongWritable, Text>
    {
        public enum LineCounters
        {
            BAD_LINES,
            TOO_MANY_TABS,
            TOO_FEW_TABS
        } ;
    
        public void map(LongWritable key, Text value,
            OutputCollector<LongWritable, Text> output,
            Reporter reporter) throws IOException
        {
            String line = value.toString();
    
            if (validate(line, reporter))
                output.collect(key, value);
        }
    
        private boolean validate(String str, Reporter reporter)
        {
            String[] parts = str.split("\t") ;
    
            if (parts.length != 6)
            {
                if (parts.length < 6)
                {
                    reporter.incrCounter(LineCounters.TOO_FEW_TABS, 1) ;
                }
                else
                {
                    reporter.incrCounter(LineCounters.TOO_MANY_TABS, 1) ;
                }
    
                reporter.incrCounter(LineCounters.BAD_LINES, 1) ;
    
                if ((reporter.getCounter(
                    LineCounters.BAD_LINES).getCounter() % 10) == 0)
                {
                    reporter.setStatus("Got 10 bad lines.") ;
                    System.err.println("Read another 10 bad lines.") ;
                }
    
                return false ;
            }
            return true ;
        }
    }
  2. Make a copy of the UFOLocation2.java file as the UFOLocation3.java file to use this new mapper instead of UFORecordValidationMapper:
    …
        JobConf mapconf1 = new JobConf(false) ;
        ChainMapper.addMapper(conf, UFOCountingRecordValidationMapper.class,
            LongWritable.class, Text.class, LongWritable.class, Text.class,
            true, mapconf1) ;
  3. Compile the files, jar them up, and submit the job to Hadoop:
    
    12/02/12 06:28:51 INFO mapred.JobClient: Counters: 22
    12/02/12 06:28:51 INFO mapred.JobClient: UFOCountingRecordValidationMapper$LineCounters
    12/02/12 06:28:51 INFO mapred.JobClient: TOO_MANY_TABS=324
    12/02/12 06:28:51 INFO mapred.JobClient: BAD_LINES=326
    12/02/12 06:28:51 INFO mapred.JobClient: TOO_FEW_TABS=2
    12/02/12 06:28:51 INFO mapred.JobClient: Job Counters 
    
  4. Use a web browser to go to the MapReduce web UI (remember by default it is on port 50030 on the JobTracker host). Select the job at the bottom of the Completed Jobs list and you should see a screen similar to the following screenshot:
  5. Click on the link to the map tasks and you should see an overview screen like the following screenshot:
  6. For one of the tasks with our custom status message, click on the link to its counters. This should give a screen similar to the one shown as follows:
  7. Go back to the task list and click on the task ID to get the task overview similar to the following screenshot:
  8. Under the Task Logs column are options for the amount of data to be displayed. Click on All and the following screenshot should be displayed:
  9. Now log into one of the task nodes and look through the files stored under hadoop/logs/userlogs. There is a directory for each task attempt and several files within each; the one to look for is stderr.

What just happened?

The first thing we need to do in order to add new counters is to create a standard Java enumeration that will hold them. In this case, we created what Hadoop considers a counter group called LineCounters, and within that there are three counters: one for the total number of bad lines, and finer-grained counters for the number of lines with either too few or too many fields. This is all you need to do to create a new set of counters: define the enumeration and, once you start setting the counter values, they will be automatically understood by the framework.

To add to a counter, we simply increment it via the Reporter object; in each case here, we add one each time we encounter a bad line, a line with fewer than six fields, or a line with more than six fields.

We also retrieve the BAD_LINES counter for the task and, if its value is a multiple of 10, do the following:

  • Set the task status to reflect this fact
  • Write a similar message to stderr with the standard Java System.err.println mechanism

We then go to the MapReduce UI and verify that we can see both the counter totals in the job overview as well as tasks with the custom state message in the task list.

We then explored the web UI, looking at the counters for an individual job, and saw that from the detail page for a task we can click through to its log files.

We then looked at one of the nodes to see that Hadoop also captures the logs from each task in a directory on the filesystem under the {HADOOP_HOME}/logs/userlogs directory. Under subdirectories for each task attempt, there are files for the standard streams as well as the general task logs. As you will see, a busy node can end up with a large number of task log directories and it is not always easy to identify the task directories of interest. The web interface proved itself to be a more efficient view on this data.

Tip

If you are using the Hadoop context object API, then counters are accessed through the Context.getCounter().increment() method.
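As a minimal sketch only (our own illustration using the newer org.apache.hadoop.mapreduce API, with a hypothetical class name, not code from this chapter), the counter and status logic of the previous mapper would look something like this:

    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    // Hypothetical new-API equivalent of UFOCountingRecordValidationMapper
    public class NewApiCountingValidationMapper
        extends Mapper<LongWritable, Text, LongWritable, Text>
    {
        public enum LineCounters { BAD_LINES, TOO_MANY_TABS, TOO_FEW_TABS }
    
        @Override
        protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException
        {
            String[] parts = value.toString().split("\t");
            if (parts.length != 6)
            {
                // Counters and status are reached through the context rather than a Reporter
                context.getCounter(parts.length < 6
                    ? LineCounters.TOO_FEW_TABS
                    : LineCounters.TOO_MANY_TABS).increment(1);
                context.getCounter(LineCounters.BAD_LINES).increment(1);
                context.setStatus("Saw a bad line.");
                return;
            }
            context.write(key, value);
        }
    }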

Too much information!

After not worrying much about how to get status and other information out of our jobs, it may suddenly seem like we've got too many confusing options. The reality is that, particularly when running on a fully distributed cluster, there is no way around the fact that the data may be spread across every node. With Java code, we can't as easily exercise it from the command line as we did with our Ruby Streaming tasks, so care needs to be taken to think about what information will be needed at runtime. This should include details concerning both the general job operation (additional statistics) as well as indicators of problems that may need further investigation.

Counters, task status messages, and good old-fashioned Java logging can work together. If there is a situation you care about, set it as a counter that will record each time it occurs and consider setting the status message of the task that encountered it. If there is some specific data, write that to stderr. Since counters are so easily visible, you can know pretty quickly post job completion if the situation of interest occurred. From this, you can go to the web UI and see all the tasks in which the situation was encountered at a glance. From here, you can click through to examine the more detailed logs for the task.

In fact, you don't need to wait until the job completes; counters and task status messages are updated in the web UI as the job proceeds, so you can start the investigation as soon as either counters or task status messages alert you to the situation. This is particularly useful in very long running jobs where the errors may cause you to abort the job.

Summary

This chapter covered development of a MapReduce job, highlighting some of the issues and approaches you are likely to face frequently. In particular, we learned how Hadoop Streaming provides a means to use scripting languages to write map and reduce tasks, and how using Streaming can be an effective tool for early stages of job prototyping and initial data analysis.

We also learned that writing tasks in a scripting language can provide the additional benefit of using command-line tools to directly test and debug the code. Within the Java API, we looked at the ChainMapper class that provides an efficient way of decomposing a complex map task into a series of smaller, more focused ones.

We then saw how the Distributed Cache provides a mechanism for efficient sharing of data across all nodes. It copies files from HDFS onto the local filesystem on each node, providing local access to the data. We also learned how to add job counters by defining a Java enumeration for the counter group and using framework methods to increment their values, and how to use a combination of counters, task status messages, and debug logs to develop an efficient job analysis workflow.

We expect most of these techniques and ideas to be the ones that you will encounter frequently as you develop MapReduce jobs. In the next chapter, we will explore a series of more advanced techniques that are less often encountered but are invaluable when they are.