Hadoop MapReduce Tutorial


By Sujee Maniyam

A step-by-step guide on using Hadoop MapReduce to analyze the effectiveness of an advertising campaign, by Sujee Maniyam

For this article, let’s pretend that we are running an online
advertising company. We run advertising campaigns for clients (like
Pepsi, Sony) and the ads are displayed on popular websites such as
news sites (CNN, Fox) and social media sites (Facebook). To
track how well an advertising campaign is doing, we keep track of
the ads we serve and the ads that users click.

Scenario

Here is the sequence of events:

  1. We serve the ad to the user

  2. If the ad appears in the user’s browser, i.e. the user saw the
    ad, we track this event as VIEWED_EVENT

  3. If the user clicks on the ad, we track this event as
    CLICKED_EVENT

These events are logged as plain-text log files on our web
servers. These logs can get rather large: hundreds of millions
(100,000,000+) of log lines per day, amounting to hundreds of
gigabytes or even terabytes of data. There is a lot of interesting
information and many trends that can be gleaned by analyzing this
massive amount of data, and Hadoop is a good choice for the job.
For this article, we will measure campaign effectiveness by
calculating the clicks vs. views ratio.

What is needed to run this example?

Java: The code samples are in Java, and you will need Java to run
Hadoop. An IDE such as Eclipse is optional.

Hadoop: You’ll need Hadoop installed and running. This can be an
actual Hadoop cluster comprising multiple machines, but you can
also have Hadoop running on your laptop! Just plain Hadoop is
enough. Hadoop can be downloaded from the Apache website or
Cloudera’s website. Setting up Hadoop is beyond the scope of this
article; please refer to the documentation on those sites.

Ruby (optional): Sample log files are provided. If you need more
log files, you can generate them with the included Ruby script
ad-log-generator.rb. Ruby is a very compact language and very
handy for scripts like this.

Source Code for this article: All code and materials mentioned in
this article are available on GitHub. You can download the code
from the URL in the references, or you can use git to get it:

https://github.com/sujee/java-tech-journal-mapreduce.

Log Files: The log files are in the following format:
timestamp, user_id, view/click, domain, campaign_id.
E.g.: 1262332801728, 899523, 1, npr.org, 19

The files are in CSV (Comma-Separated Values) format, one record
per line. Each record has the following fields:

  • timestamp: Unix timestamp in milliseconds
  • user_id: each user has a unique ID
  • action_id: 1=view, 2=click
  • domain: the domain on which the ad was served
  • campaign_id: identifies the campaign the ad was part of

The file sample.log contains some sample entries:

  • 1293868800864,319248,1,flickr.com,12 
  • 1293868801728,625828,1,npr.org,19 
  • 1293868802592,522177,2,wikipedia.org,16 
  • 1293868803456,535052,2,cnn.com,20

The logs directory contains a few log files that can be used for
testing.

Measuring Campaign Effectiveness: Views vs. Clicks

We want to see how our ad campaigns are doing. For each
campaign we want to measure the VIEWS and CLICKS. And we are going
to use MapReduce to calculate this! Let’s sketch out the MapReduce
algorithm. Sometimes it is easier to work backwards from the
results we want:

Input: log lines
Output:
campaign1, total_views, total_clicks
campaign2, total_views, total_clicks

Our reducer has to produce the final output: the per-campaign
stats. So the reducers have to receive ‘campaign’ as the key, and
we make the action_ids for each campaign the values. Our map
function therefore goes through the log lines and extracts
campaign_id and action_id. Figure 1 illustrates the workflow.
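
For example, the mapper would (roughly) handle one of the sample
lines from earlier like this, with the shuffle phase grouping all
action_ids for a campaign before the reducer sees them:

map:     1293868800864,319248,1,flickr.com,12  →  (12, 1)
shuffle: (12, [1, 1, 2, ...])
reduce:  12 → total views and clicks for campaign 12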

Let’s see some Code!

A MapReduce program (usually) has three components:

  • a mapper
  • a reducer
  • a ‘driver’ program to wire everything together

If the code is fairly small, we can have the whole thing in ONE
file. The code can be seen in Listing 1 on GitHub.

Code Walkthrough: Mapper

The first thing to note is how our mapper class is defined:

static class MyMapper extends Mapper<Object, Text, IntWritable, IntWritable>

We are specifying:

  • input key/value is Object, Text: the input key is the position
    of the line in the file; since we don’t care about it, we just
    treat it as Object. The input value is the actual line; the
    MapReduce equivalent of a Java String is called Text.
  • output key/value is IntWritable, IntWritable: the mapper outputs
    campaign_id and action_id as integers.

Side Note: MapReduce Wrapper Classes

  Java Type    MapReduce Type
  int          IntWritable
  long         LongWritable
  String       Text

Table 1: MapReduce wrapper classes around Java primitives

MapReduce has wrapper classes around most Java primitives so that
the values can be serialized and de-serialized (Table 1).
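
For example, wrapping and unwrapping an int:

IntWritable count = new IntWritable(42); // wrap a Java int
int plain = count.get();                 // unwrap it again
count.set(43);                           // writables are mutable and can be reused

Now, the map function: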

 

public void map(Object key, Text value, Context context)

Note how the input parameters match the Mapper class definition.
The map function itself is quite straightforward: we split the
line into tokens, extract the ‘campaign’ and ‘action’ fields, wrap
these two as IntWritables, and write them out.
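
Listing 1 on GitHub has the authoritative code; below is a minimal
sketch of what such a mapper can look like, assuming the whole job
lives in one file (so the mapper is a static inner class) and the
field layout described earlier:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

static class MyMapper extends Mapper<Object, Text, IntWritable, IntWritable> {
    private final IntWritable campaign = new IntWritable();
    private final IntWritable action = new IntWritable();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // fields: timestamp, user_id, action_id, domain, campaign_id
        String[] tokens = value.toString().split(",");
        campaign.set(Integer.parseInt(tokens[4].trim())); // campaign_id
        action.set(Integer.parseInt(tokens[2].trim()));   // action_id: 1=view, 2=click
        context.write(campaign, action);
    }
}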

Code: Reducer

Let’s start with Reducer class definition:

public static class MyReducer extends Reducer<IntWritable, IntWritable, IntWritable, Text>
  • input key/value: IntWritable/IntWritable: these class types
    match the output of the Mapper
  • output key/value: IntWritable/Text

Let’s look at the reduce function:

public void reduce(IntWritable key, Iterable<IntWritable> results, Context context)

Key/value: IntWritable / list of IntWritables. For every campaign,
we get all the actions for that campaign as an iterable list. We
iterate through the action_ids, tallying ‘views’ and ‘clicks’, and
once we are done calculating, we write out the result. This works
because all actions for a campaign are grouped and sent to one
reducer (remember the MapReduce ‘magic’: shuffle and sort).
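
Putting that together, here is a minimal sketch of such a reducer
(Listing 1 on GitHub remains the authoritative version; this simply
mirrors the behavior described above):

public static class MyReducer extends Reducer<IntWritable, IntWritable, IntWritable, Text> {
    @Override
    public void reduce(IntWritable key, Iterable<IntWritable> results, Context context)
            throws IOException, InterruptedException {
        int views = 0;
        int clicks = 0;
        // tally the action_ids for this campaign: 1 = view, 2 = click
        for (IntWritable action : results) {
            if (action.get() == 1) {
                views++;
            } else if (action.get() == 2) {
                clicks++;
            }
        }
        String stats = "views=" + views + ", clicks=" + clicks;
        context.write(key, new Text(stats));
    }
}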

Final Code: Driver Code

This is where the mappers and reducers are wired together and the
job is set off. Most of this code is boilerplate and quite
straightforward, so I will skip the detailed explanation; a
minimal sketch follows.
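
A minimal sketch of such a driver, using the class names from above
and the Hadoop 1.x-era API (Listing 1 on GitHub has the
authoritative version):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CampaignMR1 {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "campaign map reduce");
        job.setJarByClass(CampaignMR1.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // the mapper's output types (IntWritable/IntWritable) differ from
        // the reducer's output types (IntWritable/Text), so set both
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // adlogs/in
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // adlogs/out
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}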
Before going further, make sure you have the following:

  • Hadoop installed and running
  • HADOOP_HOME environment variable set

Copy the log files into HDFS:

        hadoop dfs -mkdir adlogs/in
        hadoop dfs -put *.log adlogs/in

This will create a directory in HDFS under
/user/user_name/adlogs/in; in our case the path is
/user/sujee/adlogs/in. Run compile.sh in the top directory to
compile the Java source and produce a jar (a.jar): sh ./compile.sh.
We run this MapReduce job by:

$HADOOP_HOME/bin/hadoop jar a.jar mapreduce.CampaignMR1 adlogs/in adlogs/out
  • using the ‘hadoop’ command that is in the ‘$HADOOP_HOME/bin’
    directory
  • pointing to a.jar
  • supplying our MR class: mapreduce.CampaignMR1
  • two paths: adlogs/in is the location of the log files
  • adlogs/out is where the output will be written

This will produce an output like this:

12/01/18 08:22:25 INFO input.FileInputFormat: Total input paths to process : 5
12/01/18 08:22:25 INFO mapred.JobClient: Running job: job_201201180820_0004
12/01/18 08:22:26 INFO mapred.JobClient: map 0% reduce 0%
12/01/18 08:22:32 INFO mapred.JobClient: map 40% reduce 0%
12/01/18 08:22:33 INFO mapred.JobClient: map 60% reduce 0%
12/01/18 08:22:36 INFO mapred.JobClient: map 100% reduce 0%
12/01/18 08:22:41 INFO mapred.JobClient: map 100% reduce 26%
12/01/18 08:22:43 INFO mapred.JobClient: map 100% reduce 100%
12/01/18 08:22:43 INFO mapred.JobClient: Job complete: job_201201180820_0004

Let’s see the output generated. It will be in
/user/user_name/adlogs/out, which you can browse using the
NameNode Web UI. There should be a file part-r-00000; go ahead and
click on it. Or you can inspect it like this:

• hadoop dfs -ls adlogs/out
• hadoop dfs -cat adlogs/out/part-r-00000

It will look like the following:

1   views=134, clicks=122
2   views=139, clicks=135
3   views=115, clicks=152
4   views=129, clicks=123
5   views=99, clicks=120
6   views=110, clicks=124
7   views=126, clicks=113
8   views=110, clicks=119
9   views=127, clicks=131
10  views=113, clicks=143
11  views=120, clicks=116
12  views=99, clicks=143
13  views=113, clicks=133
14  views=117, clicks=135
15  views=127, clicks=121
16  views=117, clicks=121
17  views=127, clicks=98
18  views=127, clicks=157
19  views=127, clicks=117
20  views=132, clicks=169

There we go, we have our campaign summary! The result file is in
HDFS. To copy it to your local computer:

hadoop dfs -copyToLocal adlogs/out/part-r-00000 campaign-report

The report can then be emailed, imported into Excel and so on.

Note on Output Format

The above format is:

Campaign_id <TAB> views=x, clicks=y <NEWLINE>

To make the report easy to import into Excel, we can use TAB as
the separator between all fields. In the reduce function, modify
the following line:

String stats = "views=" + views + ", clicks=" + clicks;

to:

String stats = views + "\t" + clicks;

Improving MapReduce Code

Currently our map and reduce functions assume the data is correct
and everything is in order. In real life, however, it’s a little
less ideal, and we need to harden our code to deal with invalid
data. This is especially true in MapReduce because we deal with
large amounts of data, and debugging can be very difficult when
things don’t work right. The hardened code can be seen in Listing 2
on GitHub.

MapReduce Good Practices

Using Counters: At the end of a MapReduce job run, you will see
output like this:

FileSystemCounters

12/01/18 08:22:43 INFO mapred.JobClient: FILE_BYTES_READ=1733
12/01/18 08:22:43 INFO mapred.JobClient: HDFS_BYTES_READ=186904
12/01/18 08:22:43 INFO mapred.JobClient: FILE_BYTES_WRITTEN=290468
12/01/18 08:22:43 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=508

These counters are produced by the MapReduce framework; the ones
above display some IO stats. We can have our own counters as well.
Listing 2 adds a custom counter called INCORRECT_RECORDS: every
time we encounter data that is not valid, we increment this
counter. At the end of the MapReduce run, this counter value is
printed out, and we can also inspect it on the JobTracker page. It
is a good practice to have custom counters to track things, for
example the number of invalid records, or the time spent in a
particular operation.
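
As a hedged one-liner (the group name "CampaignMR" here is an
assumption), incrementing such a counter from inside map() looks
like this:

context.getCounter("CampaignMR", "INCORRECT_RECORDS").increment(1);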

Error Checking: Programs must deal with errors, and checking for
them is even more crucial in MapReduce programs: an MR program
deals with large amounts of data and MR jobs run on multiple
machines, so debugging a crash caused by bad data is not easy. And
when processing unstructured data (logs etc.), there is always a
chance of encountering malformed data. So the MR program has to
check for errors at every step of the way (a hardened map() sketch
follows this list):

  • The first check is that the line has the correct number of
    fields (5 in our case). If not, we increment the
    INCORRECT_RECORDS counter and return from the map function;
    no further processing is needed.
  • Always print out incorrect records. These go into the mapper
    logs, which can be accessed via the JobTracker UI (click on
    Job | map | Tasks | output).
  • Then we handle exceptions when converting the action/campaign
    fields into integers.
  • Mappers usually have to do a lot of error-checking because
    they go through the raw input, so the mapper is a good place
    to clean up data, drop invalid data, etc.

A reducer’s input comes from the mappers, so by the time it runs,
the data has already been cleaned up by the mappers.
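
Here is a minimal sketch of such a hardened map function, under the
same assumed names as before; Listing 2 on GitHub is the
authoritative version:

@Override
public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
    String line = value.toString();
    String[] tokens = line.split(",");
    // check 1: correct number of fields
    if (tokens.length != 5) {
        context.getCounter("CampaignMR", "INCORRECT_RECORDS").increment(1);
        System.err.println("incorrect record : " + line); // goes to mapper logs
        return;
    }
    try {
        // check 2: the action/campaign fields must parse as integers
        campaign.set(Integer.parseInt(tokens[4].trim()));
        action.set(Integer.parseInt(tokens[2].trim()));
    } catch (NumberFormatException e) {
        context.getCounter("CampaignMR", "INCORRECT_RECORDS").increment(1);
        System.err.println("incorrect record : " + line);
        return;
    }
    context.write(campaign, action);
}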

Development and Deployment: You can develop MapReduce jobs on your
laptop or on a small cluster. Get the code/algorithm working with a
small amount of test data, then deploy the job to a production
cluster. Keep in mind that code that works with smaller amounts of
data (megabytes) might not work with larger data sets (terabytes).

Testing with different sizes of data: When we start to write a
MapReduce job, we begin with a small set of sample data. Once we
are reasonably sure that the algorithm is solid, we test with
larger-sized data. This is why having ‘data generator’ scripts can
be handy (like our Ruby script): using them, we can generate any
amount of data, from ten lines to ten million.

Development Tools: There are not many tools and IDEs available for
Hadoop MapReduce programming. Plain Eclipse can be used for
developing MapReduce programs in Java. This void is being filled by
tools like the Karmasphere IDE.

Next Steps

Now that you have written a simple MapReduce program, it is time
to explore more. We calculated total views and total clicks per
campaign; how can we calculate views/clicks for each day? (Hint:
use a composite key: campaign_id + date, as sketched below.)
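
As a hedged illustration (assumed names, not the article’s code),
the mapper could emit a Text key that combines campaign and day;
the mapper’s output key type then becomes Text instead of
IntWritable:

// inside map(), after splitting the line into tokens
SimpleDateFormat dayFormat = new SimpleDateFormat("yyyy-MM-dd");           // java.text.SimpleDateFormat
String day = dayFormat.format(new Date(Long.parseLong(tokens[0].trim()))); // java.util.Date
String compositeKey = tokens[4].trim() + ":" + day; // campaign_id + date
context.write(new Text(compositeKey), action);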

 

This article originally appeared in Java Tech Journal: Hadoop in
February 2012. To read more Hadoop/MapReduce articles, download
that issue.

Author

Sujee Maniyam
Sujee Maniyam is a seasoned software developer. He has developed enterprise apps, web apps and mobile apps, and now consults in the Hadoop/Big Data area. His clients include small startups and enterprise companies. He writes about Hadoop and its sister projects at http://sujee.net/tech/articles.