Hadoop MapReduce Tutorial
Hadoop MapReduce Tutorial - Maximize MapReduce
For this article, let’s pretend that we are running an online advertising company. We run advertising campaigns for clients (like Pepsi, Sony) and the ads are displayed on popular websites such as news sites (CNN, Fox) and social media sites (Facebook). To track how well an advertising campaign is doing, we keep track of the ads we serve and ads that users click.
Here is the sequence of events:
1. We serve the ad to the user.
2. If the ad appears in the user's browser (i.e., the user saw the ad), we track this event as VIEWED_EVENT.
3. If the user clicks on the ad, we track this event as CLICKED_EVENT.
These events are logged as plain-text log files on our web servers. These logs can get rather large: hundreds of millions of events (100,000,000 and up) per day, which means hundreds of gigabytes or even terabytes of data. There is a lot of interesting information and trends that can be gleaned by analyzing this massive amount of data, and Hadoop is a good choice for analyzing it. For this article, we will calculate campaign effectiveness by measuring the ratio of clicks to views.
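A minimal sketch of that metric in plain Java, computed from a campaign's total views and clicks (the totals the job in this article produces). The class name CampaignRatio and the choice to return 0.0 for a campaign with no views are our own assumptions, not part of the article's code.

```java
// Hypothetical helper: the clicks vs. views ratio for one campaign.
final class CampaignRatio {
    private CampaignRatio() {}

    // Returns clicks / views; assumes 0.0 when the campaign has no views yet,
    // which avoids a division by zero.
    static double clickThroughRate(long views, long clicks) {
        if (views == 0) {
            return 0.0;
        }
        return (double) clicks / views;
    }
}
```

For example, a campaign with 200 views and 50 clicks has a ratio of 0.25.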
What is needed to run this example?
Java: The code samples are in Java and you will need Java for running Hadoop. Having an IDE such as Eclipse is optional.
Hadoop: You’ll need Hadoop installed and running. This can be an actual Hadoop cluster comprising multiple machines, or Hadoop running on your laptop! Just plain Hadoop is enough. Hadoop can be downloaded from the Apache website or Cloudera’s website. Setting up Hadoop is beyond the scope of this article; please refer to the documentation on the respective sites.
Ruby (optional): There are sample log files provided. If you need more log files, you can use the included Ruby script ad-log-generator.rb. Ruby is a very compact language and very handy for scripts etc.
Source Code for this article: All code and materials mentioned in this article are available on GitHub. You can download the code from the repository page, or you can use git to get it: https://github.com/sujee/java-tech-journal-mapreduce.
Log Files: The log files are in the following format: timestamp, user_id, action_id (view/click), domain, campaign_id. For example: 1262332801728, 899523, 1, npr.org, 19
The files are in CSV (Comma Separated Values) format, one record per line. Each record will have the following fields:
- timestamp : unix time stamp in milliseconds
- user_id : each user has a unique id
- action_id : 1=view, 2=click
- domain : the domain on which the ad was served
- campaign_id: identifies the campaign the ad was part of
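To make the record layout concrete, here is a minimal parsing sketch in plain Java. The class AdLogRecord and its method parse are hypothetical names, not taken from the article's code; the sketch assumes exactly five comma-separated fields in the order listed above, with optional spaces after the commas.

```java
// Hypothetical value class for one parsed log line.
// Field order: timestamp, user_id, action_id, domain, campaign_id
final class AdLogRecord {
    static final int VIEW = 1;   // action_id for a view
    static final int CLICK = 2;  // action_id for a click

    final long timestamp;        // unix time in milliseconds
    final long userId;
    final int actionId;
    final String domain;
    final int campaignId;

    AdLogRecord(long timestamp, long userId, int actionId, String domain, int campaignId) {
        this.timestamp = timestamp;
        this.userId = userId;
        this.actionId = actionId;
        this.domain = domain;
        this.campaignId = campaignId;
    }

    // Parses a line such as "1262332801728, 899523, 1, npr.org, 19".
    // Throws IllegalArgumentException on a wrong field count, and
    // NumberFormatException (a subclass) on a non-numeric field.
    static AdLogRecord parse(String line) {
        String[] f = line.split(",");
        if (f.length != 5) {
            throw new IllegalArgumentException("expected 5 fields, got " + f.length + ": " + line);
        }
        return new AdLogRecord(
                Long.parseLong(f[0].trim()),
                Long.parseLong(f[1].trim()),
                Integer.parseInt(f[2].trim()),
                f[3].trim(),
                Integer.parseInt(f[4].trim()));
    }
}
```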
The file sample.log contains some sample entries, and the logs directory contains a few log files that can be used for testing.
Measuring Campaign Effectiveness: Views vs. Clicks
We want to see how our ad campaigns are doing. For each campaign we want to measure the VIEWS and CLICKS. And we are going to use MapReduce to calculate this! Let’s sketch out the MapReduce algorithm. Sometimes it is easier to work backwards from the results we want:
campaign1, total_views, total_clicks
campaign2, total_views, total_clicks
Our reducer has to produce the final output: the campaign stats. So the reducers have to receive campaign_id as the key, with all the action_ids for that campaign as the values. That means our mapper goes through the log lines and extracts campaign_id and action_id from each one. Figure 1 illustrates the workflow.
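Before looking at the Hadoop code, the same data flow can be simulated in a single JVM. This is a hypothetical sketch, not the article's Listing 1 and not using Hadoop: the "map" loop emits (campaign_id, action_id) pairs, a TreeMap plays the role of the shuffle (grouping values by key and sorting the keys), and the "reduce" loop counts views and clicks per campaign.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Single-process simulation of map -> shuffle -> reduce (no Hadoop involved).
final class CampaignFlowSimulation {
    private CampaignFlowSimulation() {}

    static Map<Integer, String> run(List<String> lines) {
        // Shuffle stand-in: group action_ids by campaign_id; TreeMap keeps keys sorted,
        // much like the framework sorts keys before they reach a reducer.
        Map<Integer, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            // Map phase: extract action_id (3rd field) and campaign_id (5th field).
            String[] f = line.split(",");
            int actionId = Integer.parseInt(f[2].trim());
            int campaignId = Integer.parseInt(f[4].trim());
            grouped.computeIfAbsent(campaignId, k -> new ArrayList<>()).add(actionId);
        }
        Map<Integer, String> output = new TreeMap<>();
        for (Map.Entry<Integer, List<Integer>> e : grouped.entrySet()) {
            // Reduce phase: count views (1) and clicks (2) for one campaign.
            int views = 0;
            int clicks = 0;
            for (int action : e.getValue()) {
                if (action == 1) {
                    views++;
                } else if (action == 2) {
                    clicks++;
                }
            }
            output.put(e.getKey(), "views=" + views + ", clicks=" + clicks);
        }
        return output;
    }
}
```

In the real job, the grouping step is done by the framework across machines; the mapper and reducer only see their own side of it.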
Let’s see some Code!
A MapReduce program (usually) has three components:
• a mapper
• a reducer
• and a ‘driver’ program to wire everything together
If the code is fairly small, we can put the whole thing in ONE file. The code can be seen in Listing 1 on GitHub.
Code Walkthrough: Mapper
The first thing to note is how our mapper class is defined:
static class MyMapper extends Mapper<Object, Text, IntWritable, IntWritable>
We are specifying:
- input key, value: Object, Text. With the default text input format, the input key is the byte offset of the line within the file (not the line number); since we don’t care about it, we simply declare it as Object. The input value is the line itself; the MapReduce equivalent of a Java String is Text.
- output key, value: IntWritable, IntWritable. The mapper outputs campaign_id and action_id as integers.
Side Note: MapReduce Wrapper Classes
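|Java Type|MapReduce Type|
|int|IntWritable|
|long|LongWritable|
|float|FloatWritable|
|double|DoubleWritable|
|boolean|BooleanWritable|
|String|Text|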
Table 1: MapReduce wrapper classes around Java primitives
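To see what serializable means for these wrappers, here is a minimal sketch of a Writable-style class in plain Java. SimpleIntWritable is a hypothetical name; it does not implement Hadoop's Writable interface (so it compiles without Hadoop), but it follows the same write/readFields idea, and like Hadoop's IntWritable it stores its value as a 4-byte int.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical stand-in that mimics the Writable contract without Hadoop.
final class SimpleIntWritable {
    private int value;

    SimpleIntWritable() {}
    SimpleIntWritable(int value) { this.value = value; }

    int get() { return value; }

    // Serialize the value as a fixed 4-byte big-endian int.
    void write(DataOutput out) throws IOException {
        out.writeInt(value);
    }

    // Deserialize by overwriting this object's state, so one instance can be reused.
    void readFields(DataInput in) throws IOException {
        value = in.readInt();
    }

    // Helpers that round-trip a value through a byte array.
    static byte[] toBytes(SimpleIntWritable w) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            w.write(out);
        }
        return bytes.toByteArray();
    }

    static SimpleIntWritable fromBytes(byte[] data) throws IOException {
        SimpleIntWritable w = new SimpleIntWritable();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            w.readFields(in);
        }
        return w;
    }
}
```

The mutable design (readFields overwrites the current value) is what lets the framework reuse wrapper objects instead of allocating one per record.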
MapReduce has wrapper classes around most Java primitives (Table 1) so that keys and values can be serialized and deserialized. Now the map function:
public void map(Object key, Text value, Context context) throws IOException, InterruptedException
Note that the input parameters match the Mapper class definition. The map function is quite straightforward: we split the line into tokens, extract the campaign_id and action_id fields, wrap both as IntWritables, and write them out.
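The map logic itself can be sketched in plain Java so it runs without a cluster. This is a hypothetical stand-in, not the article's mapper: plain ints replace IntWritable, and a BiConsumer replaces Hadoop's context.write(key, value).

```java
import java.util.function.BiConsumer;

// Plain-Java sketch of the map() logic; the BiConsumer is a hypothetical
// stand-in for context.write(key, value).
final class CampaignMapLogic {
    private CampaignMapLogic() {}

    // Field positions in: timestamp, user_id, action_id, domain, campaign_id
    static final int ACTION_FIELD = 2;
    static final int CAMPAIGN_FIELD = 4;

    static void map(String line, BiConsumer<Integer, Integer> emit) {
        // Split the line into tokens.
        String[] tokens = line.split(",");
        // Extract the campaign and action fields.
        int campaignId = Integer.parseInt(tokens[CAMPAIGN_FIELD].trim());
        int actionId = Integer.parseInt(tokens[ACTION_FIELD].trim());
        // Emit (campaign_id, action_id); the Hadoop version wraps both in IntWritable first.
        emit.accept(campaignId, actionId);
    }
}
```

Keeping the parsing logic in a plain method like this also makes it easy to unit-test outside Hadoop.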
Code Walkthrough: Reducer
Let’s start with the Reducer class definition:
public static class MyReducer extends Reducer<IntWritable, IntWritable, IntWritable, Text>
- input key/value: IntWritable/IntWritable. These types match the output of the Mapper.
- output key/value: IntWritable/Text
Let’s look at the reduce function:
public void reduce(IntWritable key, Iterable<IntWritable> results, Context context)
The reduce function receives a key (IntWritable) and an Iterable of IntWritables: for every campaign, we get all the actions for that campaign as an iterable list. We iterate through the action_ids, counting ‘views’ and ‘clicks’. Once we are done counting, we write out the results. This is possible because all the actions for a campaign are grouped together and sent to a single reducer (remember the MapReduce ‘magic’: the framework’s shuffle and sort phase groups map outputs by key).
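Here is a minimal plain-Java sketch of that counting loop. CampaignReduceLogic is a hypothetical name; Iterable&lt;Integer&gt; stands in for Iterable&lt;IntWritable&gt;, and the returned String is what the real reducer would wrap in a Text value.

```java
// Plain-Java sketch of the reduce() counting logic (no Hadoop types).
final class CampaignReduceLogic {
    private CampaignReduceLogic() {}

    static final int VIEW = 1;
    static final int CLICK = 2;

    static String reduce(Iterable<Integer> actionIds) {
        long views = 0;
        long clicks = 0;
        for (int action : actionIds) {
            if (action == VIEW) {
                views++;
            } else if (action == CLICK) {
                clicks++;
            }
            // Any other action_id is ignored in this sketch.
        }
        return "views=" + views + ", clicks=" + clicks;
    }
}
```

One detail worth knowing when writing the Hadoop version: the framework may reuse the same IntWritable instance for every element of the Iterable, so read the int with get() inside the loop rather than storing the objects.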
Final Code: Driver Code
This is where the mapper and reducer are wired together and the job is launched. Most of this code is boilerplate and quite straightforward, so I will skip the explanation for this part. Before going further, make sure you have the following:
• Hadoop installed and running
• HADOOP_HOME environment variable set
Copy the log files into HDFS:
hadoop dfs -mkdir adlogs/in
hadoop dfs -put *.log adlogs/in
This will create a directory in HDFS under /user/user_name/adlogs/in; in our case (user sujee) it is /user/sujee/adlogs/in. Run compile.sh in the top directory to compile the Java source and produce a jar (a.jar): sh ./compile.sh. We run this MapReduce job with:
$HADOOP_HOME/bin/hadoop jar a.jar mapreduce.CampaignMR1 adlogs/in adlogs/out
- using the ‘hadoop’ command that is in the ‘$HADOOP_HOME/bin’ directory
- pointing to a.jar
- supplying our MR class: mapreduce.CampaignMR1
- supplying two paths: adlogs/in is the location of the log files, and adlogs/out is where the output will be written (this directory must not already exist, or the job will fail)
This will produce an output like this:
12/01/18 08:22:25 INFO input.FileInputFormat: Total input paths to process : 5
12/01/18 08:22:25 INFO mapred.JobClient: Running job: job_201201180820_0004
12/01/18 08:22:26 INFO mapred.JobClient: map 0% reduce 0%
12/01/18 08:22:32 INFO mapred.JobClient: map 40% reduce 0%
12/01/18 08:22:33 INFO mapred.JobClient: map 60% reduce 0%
12/01/18 08:22:36 INFO mapred.JobClient: map 100% reduce 0%
12/01/18 08:22:41 INFO mapred.JobClient: map 100% reduce 26%
12/01/18 08:22:43 INFO mapred.JobClient: map 100% reduce 100%
12/01/18 08:22:43 INFO mapred.JobClient: Job complete: job_201201180820_0004
Let’s look at the generated output. It will be in /user/user_name/adlogs/out. You can browse to it using the NameNode web UI; there should be a file named part-r-00000. Go ahead and click on it. Or you can inspect it from the command line:
• hadoop dfs -ls adlogs/out
• hadoop dfs -cat adlogs/out/part-r-00000
It will look like the following:
1	views=134, clicks=122
2	views=139, clicks=135
3	views=115, clicks=152
4	views=129, clicks=123
5	views=99, clicks=120
6	views=110, clicks=124
7	views=126, clicks=113
8	views=110, clicks=119
9	views=127, clicks=131
10	views=113, clicks=143
11	views=120, clicks=116
12	views=99, clicks=143
13	views=113, clicks=133
14	views=117, clicks=135
15	views=127, clicks=121
16	views=117, clicks=121
17	views=127, clicks=98
18	views=127, clicks=157
19	views=127, clicks=117
20	views=132, clicks=169
There we go, we have our campaign summary! The result file is in HDFS. To copy it to your local computer:
hadoop dfs -copyToLocal adlogs/out/part-r-00000 campaign-report
This report can then be emailed, imported into Excel, and so on.
Note on Output Format
The above format is:
Campaign_id <TAB> views=x, clicks=y <NEWLINE>
To make it easy to import into Excel, we can use TAB as the field separator. In the reduce function, change the following line:
String stats = "views=" + views + ", clicks=" + clicks;
to:
String stats = views + "\t" + clicks;
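A small sketch contrasting the two value formats, with hypothetical helper names. It relies on one framework fact: the default TextOutputFormat writes a TAB between the key and the value, so with the TAB-separated value each output line becomes three TAB-separated columns.

```java
// Hypothetical helpers showing the two reducer value formats.
final class StatsFormat {
    private StatsFormat() {}

    // Original format: "views=x, clicks=y"
    static String readable(long views, long clicks) {
        return "views=" + views + ", clicks=" + clicks;
    }

    // Spreadsheet-friendly format: "x<TAB>y"
    static String tabSeparated(long views, long clicks) {
        return views + "\t" + clicks;
    }

    // Approximates one line of part-r-00000: key, TAB, value.
    static String outputLine(int campaignId, String value) {
        return campaignId + "\t" + value;
    }
}
```

For campaign 1 from the sample output, the line changes from "1&lt;TAB&gt;views=134, clicks=122" to "1&lt;TAB&gt;134&lt;TAB&gt;122".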
Improving MapReduce Code
Currently our map and reduce functions assume the data is correct and everything is in order. In real life, however, data is rarely that clean, so we need to harden our code to deal with invalid data. This is especially important in MapReduce because we deal with large amounts of data, and debugging can be very difficult when things go wrong. The hardened code can be seen in Listing 2 on GitHub.
MapReduce Good Practices
Using Counters: At the end of a MapReduce job run, you will see output like this:
FileSystemCounters
12/01/18 08:22:43 INFO mapred.JobClient: FILE_BYTES_READ=1733
12/01/18 08:22:43 INFO mapred.JobClient: HDFS_BYTES_READ=186904
12/01/18 08:22:43 INFO mapred.JobClient: FILE_BYTES_WRITTEN=290468
12/01/18 08:22:43 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=508
These are counters produced by the MapReduce framework; these particular ones display some I/O statistics. We can define our own counters as well. Our hardened code uses a custom counter called INCORRECT_RECORDS: every time we encounter invalid data, we increment this counter. At the end of the MapReduce run, the counter value is printed out, and we can also inspect it on the JobTracker web page. It is good practice to use custom counters to track things, for example the number of invalid records or the time spent in a particular operation.
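Hadoop's custom counters are usually declared as a Java enum and incremented from task code with context.getCounter(SomeEnum.NAME).increment(1); the framework then sums each counter across all tasks. The sketch below mimics that pattern in plain Java with an EnumMap so it runs without Hadoop. The enum name AdCounters and the class CounterSketch are hypothetical; only the INCORRECT_RECORDS name comes from the article.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical counter enum; in a real mapper the increment would be:
//   context.getCounter(AdCounters.INCORRECT_RECORDS).increment(1);
enum AdCounters { INCORRECT_RECORDS }

// Plain-Java stand-in for the framework's per-task counter storage.
final class CounterSketch {
    private final Map<AdCounters, Long> counters = new EnumMap<>(AdCounters.class);

    void increment(AdCounters counter, long amount) {
        counters.merge(counter, amount, Long::sum);
    }

    long get(AdCounters counter) {
        return counters.getOrDefault(counter, 0L);
    }
}
```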
Error Checking: Programs must deal with errors. Checking for errors is even more crucial in MapReduce programs because they process large amounts of data and run on multiple machines, so debugging a crash caused by bad data is not easy. And when processing unstructured data (logs, etc.), there is always a chance of encountering malformed data. So a MapReduce program has to check for errors every step of the way:
- The first check is whether the line has the correct number of fields (5 in our case). If not, we increment the INCORRECT_RECORDS counter and return from the map function; no further processing is needed.
- Always print out incorrect records. These messages go into the mapper logs, which can be accessed via the JobTracker UI (Click on Job | map | Tasks | output).
- Then we handle exceptions (such as NumberFormatException) when converting action_id and campaign_id into integers.
- Mappers usually have to do a lot of error checking because they go through the raw input, so the mapper is a good place to clean up data, drop invalid records, etc. The reducer’s input comes from the mappers, so by the time it reaches the reducer, the data has already been cleaned up.
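The checks above can be combined into a hardened version of the map logic. This is a plain-Java sketch under our own assumptions, not the article's Listing 2: HardenedMapper is a hypothetical name, a long field stands in for the INCORRECT_RECORDS counter, a BiConsumer stands in for context.write, and the third check (rejecting action_ids other than 1 and 2) is our own addition.

```java
import java.util.function.BiConsumer;

// Plain-Java sketch of a hardened map(): validate, count and log bad records,
// and emit only well-formed (campaign_id, action_id) pairs.
final class HardenedMapper {
    static final int EXPECTED_FIELDS = 5; // timestamp, user_id, action_id, domain, campaign_id

    private long incorrectRecords; // stand-in for the INCORRECT_RECORDS counter

    long incorrectRecords() { return incorrectRecords; }

    void map(String line, BiConsumer<Integer, Integer> emit) {
        String[] tokens = line.split(",");
        // Check 1: correct number of fields, otherwise count, log and skip.
        if (tokens.length != EXPECTED_FIELDS) {
            reject(line, "wrong number of fields: " + tokens.length);
            return;
        }
        int actionId;
        int campaignId;
        try {
            // Check 2: the numeric fields must parse as integers.
            actionId = Integer.parseInt(tokens[2].trim());
            campaignId = Integer.parseInt(tokens[4].trim());
        } catch (NumberFormatException e) {
            reject(line, "non-numeric action_id/campaign_id");
            return;
        }
        // Check 3 (our addition): only 1=view and 2=click are known actions.
        if (actionId != 1 && actionId != 2) {
            reject(line, "unknown action_id " + actionId);
            return;
        }
        emit.accept(campaignId, actionId);
    }

    private void reject(String line, String reason) {
        incorrectRecords++;
        // In a Hadoop task, stderr output ends up in the task's logs.
        System.err.println("Incorrect record (" + reason + "): " + line);
    }
}
```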
Development and Deployment: You can develop MapReduce jobs on your laptop or on a small cluster. Get the code/algorithm working with a small amount of test data, then deploy the job to a production cluster. Keep in mind that code that works with small amounts of data (megabytes) might not work with larger data sets (terabytes).
Testing with different sizes of data: When we start writing a MapReduce job, we start with a small set of sample data. Once we are reasonably sure that the algorithm is solid, we test with larger data sets. This is why ‘data generator’ scripts (like our Ruby script) can be handy: with them we can generate any amount of data (ten lines or ten million lines).
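The article's generator is the Ruby script ad-log-generator.rb. For readers who prefer to stay in Java, here is a hypothetical equivalent (not a port of that script) that writes lines in the same five-field format. Assumptions: a fixed seed for repeatable output, a start timestamp borrowed from the sample line, an illustrative domain list, and an even split between views and clicks (real click rates are far lower, but the sample output above is roughly balanced).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical Java log generator producing:
// timestamp, user_id, action_id, domain, campaign_id
final class AdLogGenerator {
    private static final String[] DOMAINS = {"cnn.com", "foxnews.com", "npr.org", "facebook.com"};

    private AdLogGenerator() {}

    // The same seed always yields the same lines, which keeps test runs repeatable.
    static List<String> generate(int count, int numCampaigns, long seed) {
        Random random = new Random(seed);
        long timestamp = 1262332801728L;               // start taken from the sample log line
        List<String> lines = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            timestamp += random.nextInt(1000);          // advance 0 to 999 ms
            long userId = 100000 + random.nextInt(900000);
            int actionId = 1 + random.nextInt(2);       // 1=view, 2=click
            String domain = DOMAINS[random.nextInt(DOMAINS.length)];
            int campaignId = 1 + random.nextInt(numCampaigns);
            lines.add(timestamp + ", " + userId + ", " + actionId + ", " + domain + ", " + campaignId);
        }
        return lines;
    }
}
```

Writing the returned lines to a file (for example with java.nio.file.Files.write) gives a log that can be copied into HDFS like the sample logs.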
Development Tools: There are not many tools and IDEs available for Hadoop MapReduce programming. Plain Eclipse can be used for developing MapReduce programs in Java. This void is being filled by tools like the Karmasphere IDE.
Now that you have written a simple MapReduce program, it is time to explore more. We calculated total views and total clicks for each campaign. How can we calculate views/clicks for each campaign per day? (Hint: use a composite key: campaign_id + date.)
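As a starting point for the exercise, here is one possible way to build such a composite key, sketched in plain Java. DailyCampaignKey is a hypothetical name, and bucketing days in UTC is our assumption (pick the time zone your reports need). In a Hadoop job the mapper could emit this string as a Text key instead of the IntWritable campaign_id, and the reducer's counting logic would stay the same.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

// Hypothetical helper: builds a "campaign_id:date" key so that each reducer
// call receives the actions of one campaign on one day.
final class DailyCampaignKey {
    private DailyCampaignKey() {}

    // Assumption: days are bucketed in UTC.
    static String of(int campaignId, long timestampMillis) {
        LocalDate day = Instant.ofEpochMilli(timestampMillis).atZone(ZoneOffset.UTC).toLocalDate();
        return campaignId + ":" + day;   // LocalDate prints as yyyy-MM-dd
    }
}
```

For the sample line (timestamp 1262332801728, campaign 19) this yields the key "19:2010-01-01".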
This article originally appeared in Java Tech Journal: Hadoop in February 2012. To read more articles on Hadoop/MapReduce, download the issue.