Perfect your playlist

Let Kiji be your DJ

Aaron Kimball and Lee Shang

Build your own bespoke music recommendations engine with the help of this smashing Kiji tutorial.

It’s the year 3000, and new music is being
created at such a fast pace that your company Pandorify can no
longer analyze and categorize music fast enough to recommend it to
your users. Fortunately, technology has kept up! With KijiSchema
and KijiMR, you can leverage all the song play data you have
collected over the last thousand years to make recommendations for
your users.

We believe that past patterns are a good source for
future recommendations. To impose structure on the data, we will
use a layout in KijiSchema. To effectively use this gigantic flood
of data, we can use MapReduce paradigms provided by KijiMR to
analyze and provide recommendations.

In this tutorial, we demonstrate how to use KijiMR to
leverage your data effectively. You will:

● Efficiently import data into a KijiTable.

● Manipulate data in and between Kiji and HDFS.

● Use a gatherer to generate next-song pairs, and use a generic MapReduce job to aggregate them into counts.

● Use a producer to join together data sources and generate recommendations for our users.

How to Use this Tutorial

● Links to Javadoc - Class names link to the relevant Javadoc: EntityId.

● Code Walkthrough - Code snippets are in gray boxes with language-specific syntax highlighting.

System.out.println("Hello Kiji");

● Shell Commands - Shell commands to run the above code will be in light blue boxes, and the results in grey.

Hello Kiji

For this tutorial, we assume you are either using the
standalone Kiji BentoBox or have installed the individual
components described on the Kiji Get Started page: http://www.kiji.org/getstarted/.

If you don’t have a working environment yet, you can
install the standalone Kiji BentoBox in a few quick steps:

● Download archive: http://archive.kiji.org/tarballs/kiji-bento-chirashi-1.2.1-release.tar.gz

● Extract archive:

tar xzf kiji-bento-*.tar.gz

● Start HBase:

cd kiji-bento-chirashi; source bin/kiji-env.sh; bento start

If you already installed BentoBox, make sure you have
started it:

bento start

Compiling

If you have downloaded the standalone Kiji BentoBox,
the code for this tutorial is already compiled and located in the
${KIJI_HOME}/examples/music/ directory. Commands in this tutorial
will depend on this location:

export MUSIC_HOME=${KIJI_HOME}/examples/music

If you are not using the Kiji BentoBox, set MUSIC_HOME to
the path of your local kiji music repository.

Once you have done this, if you are using Kiji
BentoBox you can skip to “Set your environment variables” if you
want to get started playing with the example code. Otherwise,
follow these steps to compile it from source.

The source code for this tutorial can be found in
${MUSIC_HOME}. The source is included along with a Maven project.
To get started using Maven, consult Getting started With Maven or
the Apache Maven Homepage.

The following tools are required to compile this
project:

● Maven 3.x

● Java 6

To compile, run mvn package from ${MUSIC_HOME}. The
build artifacts (.jar files) will be placed in the
${MUSIC_HOME}/target/ directory. This tutorial assumes you are
using the pre-built jars included with the music recommendation
example under ${MUSIC_HOME}/lib/. If you wish to use jars of
example code that you have built, you should adjust the command
lines in this tutorial to use the jars in
${MUSIC_HOME}/target/.

Set your environment variables

After Bento starts, it will display ports you will
need to complete this tutorial. It will be useful to know the
address of the MapReduce JobTracker webapp (http://localhost:50030 by default)
while working through this tutorial.

It will be useful to define an environment variable
named KIJI that holds a Kiji URI to the Kiji instance we’ll use
during this tutorial.

export KIJI=kiji://.env/kiji_music

To work through this tutorial, various Kiji tools will
require that Avro data type definitions particular to the working
music recommendation example be on the classpath. You can add your
artifacts to the Kiji classpath by running: 

export LIBS_DIR=${MUSIC_HOME}/lib
export KIJI_CLASSPATH="${LIBS_DIR}/*"

Install Kiji and Create Tables

Install your Kiji instance:

kiji install --kiji=${KIJI}

Create the Kiji music tables:

kiji-schema-shell --kiji=${KIJI} --file=${MUSIC_HOME}/music_schema.ddl

This command uses kiji-schema-shell to create the tables
using the KijiSchema DDL, which makes specifying table layouts
easy. See the KijiSchema DDL Shell reference for more information
on the KijiSchema DDL.

Upload Data to HDFS

Upload the data set to HDFS:

hadoop fs -mkdir kiji-mr-tutorial

hadoop fs -copyFromLocal ${MUSIC_HOME}/example_data/*.json kiji-mr-tutorial/

Upload the import descriptor for the bulk importer in the next section to HDFS:

hadoop fs -copyFromLocal ${MUSIC_HOME}/import/song-plays-import-descriptor.json kiji-mr-tutorial/

Here at Pandorify, we have about a millennium of data collected about our users’ listening patterns. This would take a huge amount of time to load into a Kiji table if we used a single machine to issue writes to our table, one row at a time. Instead, we will show you how to use MapReduce to efficiently import such large amounts of data into Kiji.

Custom Bulk Importers

One of the ways to bulk import your data is to extend
KijiBulkImporter and override its produce() method to insert rows
in a distributed manner into the Kiji table. In the example below,
we use this method to populate the song metadata.

Input files contain JSON data representing song metadata, with one song per line. Below is a whitespace-augmented example of a single row in our input file song-metadata.json.

Listing 1

{
  "song_id" : "0",
  "song_name" : "song0",
  "artist_name" : "artist1",
  "album_name" : "album1",
  "genre" : "awesome",
  "tempo" : "140",
  "duration" : "180"
}

End

The SongMetadataBulkImporter class extends
KijiBulkImporter. It expects a text input format where the input
keys are the byte offsets of each line in the input file and the
input values are the lines of text described above.

In the produce() method of the class, extract the JSON
as follows:

// Parse JSON:
final JSONObject json = (JSONObject) parser.parse(line.toString());

// Extract JSON fields:
final String songId = json.get("song_id").toString();
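The snippet above is consistent with the json-simple parser (org.json.simple). Assuming that library, here is a small, self-contained sketch of the same parsing step that you can run on its own; the sample line mirrors Listing 1, and the numeric conversions are shown because tempo and duration arrive as strings but are stored as longs in the Avro record. The real importer keeps its parser as a reusable field rather than a local variable.

import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

public class ParseSongMetadataExample {
  public static void main(String[] args) throws ParseException {
    // One line of input, in the same shape as Listing 1 (values are illustrative).
    final String line = "{\"song_id\":\"0\",\"song_name\":\"song0\","
        + "\"artist_name\":\"artist1\",\"album_name\":\"album1\","
        + "\"genre\":\"awesome\",\"tempo\":\"140\",\"duration\":\"180\"}";

    final JSONParser parser = new JSONParser();
    final JSONObject json = (JSONObject) parser.parse(line);

    // String fields come straight out of the parsed object.
    final String songId = json.get("song_id").toString();
    final String songName = json.get("song_name").toString();

    // tempo and duration are strings in the input but longs in the Avro record,
    // so convert them before handing them to the record builder.
    final long tempo = Long.parseLong(json.get("tempo").toString());
    final long duration = Long.parseLong(json.get("duration").toString());

    System.out.println(songId + " " + songName + " " + tempo + " " + duration);
  }
}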

Use an Avro record called SongMetadata, described below:

Listing 2

record SongMetadata {
  string song_name;
  string artist_name;
  string album_name;
  string genre;
  long tempo;
  long duration;
}

End

Then build an Avro metadata record from the parsed JSON.

Listing 3

final SongMetadata song = SongMetadata.newBuilder()
    .setSongName(songName)
    .setAlbumName(albumName)
    .setArtistName(artistName)
    .setGenre(genre)
    .setTempo(tempo)
    .setDuration(duration)
    .build();

End

We create an EntityId object in order to use the song
ID as the row key.

final EntityId eid = context.getEntityId(songId);

Finally, write this Avro record to a cell in our Kiji
table with the song ID as the row key.

context.put(eid, "info", "metadata", song);

As an aside, take care while using explicit timestamps
when writing to Kiji. You can read about common pitfalls of
timestamps in HBase on the Kiji blog for more details.

Running the Example

Run the bulk import tool by specifying
SongMetadataBulkImporter as the importer, the Kiji table +songs as
the output, and song-metadata.json as the input with the following
command:

Listing 4

kiji bulk-import \
    --importer=org.kiji.examples.music.bulkimport.SongMetadataBulkImporter \
    --lib=${LIBS_DIR} \
    --output="format=kiji table=${KIJI}/songs nsplits=1" \
    --input="format=text file=kiji-mr-tutorial/song-metadata.json"

End

When the MapReduce bulk import job runs, KijiMR will
warn you that jars are already added. This is normal and not a
cause for alarm. Once the MapReduce job actually starts, you will
receive periodic progress updates for the map and reduce phases of
the job. When the job completes, MapReduce will print a number of
metrics describing the results of the job. You can also examine the
output of your job at the JobTracker Web UI:

http://localhost:50030.

Verify

Verify that the songs table records were added
properly by executing:

kiji scan ${KIJI}/songs --max-rows=3

Here’s what the first three
entries should look like:

Listing 5

entity-id=['song-32'] [1366936225070] info:metadata

{"song_name": "song name-32", "artist_name": "artist-2", "album_name": "album-0", "genre": "genre1.0", "tempo": 120, "duration": 180}



entity-id=['song-49'] [1366936225102] info:metadata

{"song_name": "song name-49", "artist_name": "artist-3", "album_name": "album-1", "genre": "genre4.0", "tempo": 150, "duration": 180}



entity-id=['song-36'] [1366936225077] info:metadata

{"song_name": "song name-36", "artist_name": "artist-2", "album_name": "album-0", "genre": "genre1.0", "tempo": 90, "duration": 0}

End

Bulk importing using table import
descriptors

In the example below, we use an import descriptor to
bulk import our history of song plays from the song-plays.json into
the user table. This method of bulk import requires a table import
descriptor, which is a JSON file containing:

  • The table that is the destination of the
    import.

  • The table column families.

  • The name of the destination column.

  • The name of the source field to import
    from.

  • The source for the entity ID.

  • An optional timestamp to use instead of system
    timestamp.

  • The format version of the import
    descriptor.

The import descriptor used for the users table is shown below:

Listing 6

{
  name : "users",
  families : [ {
    name : "info",
    columns : [ {
      name : "track_plays",
      source : "song_id"
    } ]
  } ],
  entityIdSource : "user_id",
  overrideTimestampSource : "play_time",
  version : "import-1.0"
}

End

We then use the pre-written JSONBulkImporter, which expects a JSON file. Each line in this file represents a separate JSON object to be imported into a row. The JSON object is described by an import descriptor such as the one above. Target columns whose sources are not present in the JSON object are skipped.

This descriptor parametrizes a special MapReduce job, where every row of the input file is parsed and inserted into the users Kiji table. The value of user_id will be used as the row key in the Kiji table, the timestamp will be retrieved from the play_time field, and the value of song_id will be extracted and inserted into the info:track_plays column.

Running the example

Copy the descriptor file into HDFS:

hadoop fs -copyFromLocal ${MUSIC_HOME}/import/song-plays-import-descriptor.json kiji-mr-tutorial/

Run the bulk import tool by specifying JSONBulkImporter as the importer, the Kiji table users as the output, and song-plays.json as the input with the following command:

Listing 7

kiji bulk-import \
    -Dkiji.import.text.input.descriptor.path=kiji-mr-tutorial/song-plays-import-descriptor.json \
    --importer=org.kiji.mapreduce.lib.bulkimport.JSONBulkImporter \
    --output="format=kiji table=${KIJI}/users nsplits=1" \
    --input="format=text file=kiji-mr-tutorial/song-plays.json" \
    --lib=${LIBS_DIR}

End

Verify

Verify that the users table records were added properly by executing:

kiji scan ${KIJI}/users --max-rows=3

Here’s what the first three entries should look like:

entity-id=['user-41'] [1325762580000] info:track_plays
song-41

entity-id=['user-3'] [1325751420000] info:track_plays
song-0

entity-id=['user-13'] [1325750400000] info:track_plays

The ‘Hello World!’ of MapReduce

To quote Scalding Developers Hadoop is a distributed
system for counting words. Unfortunately, we here at Pandorify are
fresh out of words, but have the play history of bajillions of
users listening to bazillions of different songs.

This MapReduce job uses the listening history of our
users that we have stored in the “users” Kiji table to calculate
the total number of times each song has been played. The result of
this computation is written to a text file in HDFS.

SongPlayCounter

The SongPlayCounter is an example of a Gatherer. A
gatherer is essentially a mapper that gets input from a KijiTable.
SongPlayCounter proceeds through discrete stages:

  • Setup reusable resources.

  • Read all values from column:
    “info:track_plays”.

  • Process the data from “info:track_plays” and emit a key-value pair for each track ID each time it occurs.

Initialize Resources

First, SongPlayCounter prepares any resources that may be needed by the gatherer. In Hadoop, reusable objects are commonly instantiated only once to protect against long garbage collection pauses. This is particularly important with Kiji, because long garbage collection pauses can cause MapReduce jobs to fail when various underlying HBase resources time out or cannot be found.

Since setup() is an overridden method, we call super.setup() to ensure that all resources are initialized properly. If you open resources in setup(), be sure to close or release them in the corresponding cleanup() method.

public void setup(GathererContext<Text, LongWritable> context) throws IOException {
  super.setup(context); // Any time you override setup, call super.setup(context).
  mText = new Text();
}

Read track play data from the
table

A gatherer takes input from a table, so it must declare what data it will need. It does this in the form of a KijiDataRequest, which is defined in getDataRequest(). For the song count job, we want to request all songs that have been played, for every user. In order to get all of the values written to the “info:track_plays” column, we must specify the maximum number of versions we want. The special constant that specifies that you want all versions of data in a column is HConstants.ALL_VERSIONS. Otherwise, we will only get the most recent value.

Listing 8

public KijiDataRequest getDataRequest() {
  // This method is how we specify which columns in each row the gatherer operates on.
  // In this case, we need all versions of the info:track_plays column.
  final KijiDataRequestBuilder builder = KijiDataRequest.builder();
  builder.newColumnsDef()
      .withMaxVersions(HConstants.ALL_VERSIONS) // Retrieve all versions.
      .add("info", "track_plays");
  return builder.build();
}



End

Process track play data into key-value pairs for occurrences

Called once for each row in the Kiji table, gather() retrieves all the values in the “info:track_plays” column and, for each value, sets the reusable Text object to contain the current value, writes a key-value pair using context.write(mText, ONE), and then clears the Text object before the next call to gather().

Listing 9

public void gather(KijiRowData row, GathererContext<Text, LongWritable> context)
    throws IOException {
  // The gather method operates on one row at a time. For each user, we iterate
  // through all their track plays and emit a pair of the track ID and the number 1.
  NavigableMap<Long, CharSequence> trackPlays = row.getValues("info", "track_plays");
  for (CharSequence trackId : trackPlays.values()) {
    mText.set(trackId.toString());
    context.write(mText, ONE);
    mText.clear();
  }
}

End

LongSumReducer

The key-value pairs emitted from the gatherer are shuffled and sorted by the MapReduce framework, so each call to the reducer is given a key and an iterator of all values associated with that key. The LongSumReducer calls reduce() for each key and sums all of the associated values to produce a total play count for each song ID. The LongSumReducer has three stages:

  • Setup reusable resources.

  • Sum the values associated with a key.

  • Output the key paired with the sum.

Because summing values is such a common MapReduce
operation, LongSumReducer is provided by the KijiMR library.
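If the shuffle-and-sum pattern is new to you, the following plain-Java sketch (no Hadoop involved, with made-up song IDs) mimics what the framework does with the gatherer's output: group the 1s emitted for each track ID under their key, then sum the values for each key.

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShuffleAndSumSketch {
  public static void main(String[] args) {
    // Pretend these are the (trackId, 1) pairs emitted by the gatherer.
    List<String> emittedTrackIds =
        Arrays.asList("song-1", "song-2", "song-1", "song-3", "song-1");

    // "Shuffle": group by key. A real job also sorts the keys, hence the TreeMap.
    Map<String, Long> playCounts = new TreeMap<>();
    for (String trackId : emittedTrackIds) {
      // "Reduce": sum the values associated with each key.
      playCounts.merge(trackId, 1L, Long::sum);
    }

    // Prints {song-1=3, song-2=1, song-3=1}.
    System.out.println(playCounts);
  }
}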

Initialize Resources

It is common practice to avoid instantiating new
objects in map or reduce methods as Hadoop developers have a
(perhaps now outdated) skepticism of garbage collection in the
JVM.

protected void setup(Context context) {
  mValue = new LongWritable();
}

Sum Values and Output
Total

Called once for each key, reduce() combines all of the values associated with a key by adding them together and writes the total for each key to the output collector.

Listing 10

public void reduce(K key, Iterable<LongWritable> values,
    Context context) throws IOException, InterruptedException {
  long sum = 0;
  for (LongWritable value : values) {
    sum += value.get();
  }
  mValue.set(sum);
  context.write(key, mValue);
}

End

TestSongPlayCounter

To verify that SongPlayCounter performs as expected,
SongPlayCounter’s test:

  • Creates and populates an in-memory Kiji instance.

  • Runs a MapReduce job with SongPlayCounter as the gatherer and LongSumReducer as the reducer.

  • Verifies that the output is as expected.

Create an in-memory Kiji instance

The InstanceBuilder class provides methods for populating a test Kiji instance. Once the test instance has been defined, its build method is called, creating the in-memory instance and table.

Listing 11

public final void setup() throws Exception {
  final KijiTableLayout layout =
      KijiTableLayout.createFromEffectiveJsonResource("/layout/users.json");
  final String tableName = layout.getName();
  new InstanceBuilder(getKiji())
      .withTable(tableName, layout)
      .withRow("user-1").withFamily("info").withQualifier("track_plays")
          .withValue(1L, "song-1")
          .withValue(2L, "song-2")
          .withValue(3L, "song-3")
      .withRow("user-2").withFamily("info").withQualifier("track_plays")
          .withValue(1L, "song-1")
          .withValue(2L, "song-3")
          .withValue(3L, "song-4")
          .withValue(4L, "song-1")
      .withRow("user-3").withFamily("info").withQualifier("track_plays")
          .withValue(1L, "song-5")
      .build();
}

End

Run and verify SongPlayCounter

KijiGatherJobBuilder is used to create a test
MapReduce job. This job builder can be used outside the context of
a test to configure and run jobs programmatically. The job is then
run using Hadoop’s local job runner. The resulting output sequence
file is then validated.

Listing 12

final File outputDir = new File(getLocalTempDir(), "output.sequence_file");
final KijiMapReduceJob mrjob = KijiGatherJobBuilder.create()
    .withConf(getConf())
    .withGatherer(SongPlayCounter.class)
    .withReducer(LongSumReducer.class)
    .withInputTable(mTableURI)
    // Note: the local map/reduce job runner does not allow more than one reducer:
    .withOutput(new SequenceFileMapReduceJobOutput(new Path("file://" + outputDir), 1))
    .build();

assertTrue(mrjob.run());

final Map<String, Long> counts = Maps.newTreeMap();
readSequenceFile(new File(outputDir, "part-r-00000"), counts);
LOG.info("Counts map: {}", counts);

assertEquals(5, counts.size());
assertEquals(3L, (long) counts.get("song-1"));
assertEquals(1L, (long) counts.get("song-2"));
assertEquals(2L, (long) counts.get("song-3"));
assertEquals(1L, (long) counts.get("song-4"));
assertEquals(1L, (long) counts.get("song-5"));

End

Running the Example

Listing 13

kiji gather \
    --gatherer=org.kiji.examples.music.gather.SongPlayCounter \
    --reducer=org.kiji.mapreduce.lib.reduce.LongSumReducer \
    --input="format=kiji table=${KIJI}/users" \
    --output="format=text file=output.txt_file nsplits=2" \
    --lib=${LIBS_DIR}

End

Verify

To confirm that the gather job worked, examine the output using hadoop filesystem command-line tools:

hadoop fs -text output.txt_file/part-r-00000 | head -3

song-1  100
song-10 272
song-12 101

Instead of recommending the most popular songs to everyone using Pandorify, we want to tailor our recommendations based on each user’s listening history. For every user, we will look up the most recent song they have listened to and then recommend the song most frequently played after it. In order to do that, we need to create an index so that, for each song, we can quickly look up what the most popular songs to listen to afterwards are.

So, we need to count the number of times two songs
have been played, one after another. The SequentialPlayCounter and
SequentialPlayCountReducer allow us to do that.
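Before looking at the MapReduce version, here is a small single-machine sketch of the same idea, using a made-up play history: slide a window of size two over one user's plays (ordered by time) and count each (song, next song) pair.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NextSongCountsSketch {
  public static void main(String[] args) {
    // One user's play history, oldest first (song IDs are illustrative).
    List<String> plays = Arrays.asList("song-1", "song-2", "song-1", "song-2", "song-3");

    // For each song, count how many times each other song was played right after it.
    Map<String, Map<String, Long>> nextSongCounts = new HashMap<>();
    for (int i = 1; i < plays.size(); i++) {
      String firstSong = plays.get(i - 1);
      String nextSong = plays.get(i);
      nextSongCounts
          .computeIfAbsent(firstSong, k -> new HashMap<>())
          .merge(nextSong, 1L, Long::sum);
    }

    // Roughly: {song-1={song-2=2}, song-2={song-1=1, song-3=1}} (map order may vary).
    System.out.println(nextSongCounts);
  }
}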

SequentialPlayCounter

SequentialPlayCounter operates in much the same way that SongPlayCounter does, but it requires a more complex key structure to store both the song played and the song that followed. The easiest way to work with complex keys in Kiji is to use Avro. We define a SongBiGram, which will be our key, as a pair of songs played sequentially by a single user.

Listing 14

/** Song play bigram. */
record SongBiGram {
  /** The ID of the first song played in a sequence. */
  string first_song_played;

  /** The ID of the song played immediately after it. */
  string second_song_played;
}

End

Whereas SongPlayCounter’s output key class was Text.class, SequentialPlayCounter uses AvroKey.class, which requires that we also implement AvroKeyWriter and override getAvroKeyWriterSchema() to fully define the Avro key format. SequentialPlayCounter executes the same basic stages as SongPlayCounter, but with a more complex gather operation.

Read track play data and compose complex keys

SequentialPlayCounter reads the same data as SongPlayCounter, but maintains a “sliding window” of the most recent two track IDs. For each song after the first, gather() emits a key-value pair where the key is a SongBiGram of the two most recently played songs, and the value is one (1) as a tally.


Listing 15


/** {@inheritDoc} */
@Override
public void gather(KijiRowData input, GathererContext<AvroKey<SongBiGram>, LongWritable> context)
    throws IOException {
  // Here we use a sliding window to build bigrams that represent pairs of songs that
  // have ever been played one following another.
  // The variables firstSong and secondSong slide along as we iterate through the track plays.
  CharSequence firstSong = null;
  CharSequence nextSong = null;
  NavigableMap<Long, CharSequence> trackPlays = input.getValues("info", "track_plays");
  for (CharSequence trackId : trackPlays.values()) { // Iterate through this user's track plays.
    // Slide the window one song over.
    firstSong = nextSong;
    nextSong = trackId;
    // If firstSong is null, we are at the beginning of the list and our sliding window
    // only contains one song, so don't output it. Otherwise...
    if (null != firstSong) {
      // Create the bigram of these two songs.
      mBiGram.setFirstSongPlayed(firstSong);
      mBiGram.setSecondSongPlayed(nextSong);
      // Emit the bigram of these two songs.
      context.write(new AvroKey<SongBiGram>(mBiGram), ONE);
    }
  }
}


End

SequentialPlayCountReducer

This reducer takes in pairs of songs that have been played sequentially and the number one. It then computes the number of times those songs have been played together, and emits the ID of the first song as the key, and a SongCount record representing the song played after the first as the value. A SongCount record has a field containing the ID of the subsequent song and a field for the number of times it has been played after the initial song.

This reducer takes AvroKey as input, and writes AvroKey and AvroValue as output, so it must implement AvroKeyReader, AvroKeyWriter, and AvroValueWriter. The keys we are emitting are just strings, so we could use a Text key. Instead, we made the choice to use an AvroKey so that we could use the Kiji-defined AvroKeyValue output format, which requires that you output AvroKey and AvroValue.

The schema for our Avro key is so simple that we don’t have to add a record to our avdl file in order to return the correct schema in getAvroKeyWriterSchema(). Instead, we can use the static methods Avro provides for creating schemas of primitive types.

public Schema getAvroKeyWriterSchema() throws IOException {
  // Since we are writing AvroKeys, we need to specify the writer schema.
  // Our key is just the ID of the first song, which is a string.
  return Schema.create(Schema.Type.STRING);
}

Sum Sequential Plays

SequentialPlayCountReducer starts with the same reduction operation that LongSumReducer used to count track plays in the SongPlayCounter example, but diverges when emitting key-value pairs. Instead of passing the keys through the reducer, SequentialPlayCountReducer creates new keys based on the track IDs in the SongBiGram keys. The new keys are simply the first track ID from each bigram, while the second track ID becomes part of the SongCount value.

Listing 16

protected void reduce(AvroKey<SongBiGram> key, Iterable<LongWritable> values, Context context)
    throws IOException, InterruptedException {
  // Initialize sum to zero.
  long sum = 0L;
  // Add up all the values. When this reducer is used after SequentialPlayCounter, every value
  // should be the number 1, so we are just counting the number of times the second song
  // was played after the first (the key).
  for (LongWritable value : values) {
    sum += value.get();
  }
  // Set values for this count.
  final SongBiGram songPair = key.datum();
  final SongCount nextSongCount = SongCount.newBuilder()
      .setCount(sum)
      .setSongId(songPair.getSecondSongPlayed())
      .build();
  // Write out result for this song.
  context.write(
      new AvroKey<CharSequence>(songPair.getFirstSongPlayed().toString()),
      new AvroValue<SongCount>(nextSongCount));
}

End

TestSequentialSongPlayCounter

To verify that SequentialPlayCounter and
SequentialPlayCountReducer function as expected, their test:

  • Creates and populates an in-memory Kiji instance.

  • Runs a MapReduce job with SequentialPlayCounter as the gatherer and SequentialPlayCountReducer as the reducer.

  • Verifies that the output is as expected.

Create an in-memory Kiji instance

The InstanceBuilder class provides methods for populating a test Kiji instance. Once the test instance has been defined, its build method is called, creating the in-memory instance and table.

Listing 17

public final void setup() throws Exception {
  final KijiTableLayout userLayout =
      KijiTableLayout.createFromEffectiveJsonResource("/layout/users.json");
  final String userTableName = userLayout.getName();
  mUserTableURI =
      KijiURI.newBuilder(getKiji().getURI()).withTableName(userTableName).build();
  new InstanceBuilder(getKiji())
      .withTable(userTableName, userLayout)
      .withRow("user-1").withFamily("info").withQualifier("track_plays")
          .withValue(2L, "song-2")
          .withValue(3L, "song-1")
      .withRow("user-2").withFamily("info").withQualifier("track_plays")
          .withValue(2L, "song-3")
          .withValue(3L, "song-2")
          .withValue(4L, "song-1")
      .withRow("user-3").withFamily("info").withQualifier("track_plays")
          .withValue(1L, "song-5")
      .build();
}

End

Run and verify SequentialPlayCounter and
SequentialPlayCountReducer

KijiGatherJobBuilder is used to create a test MapReduce job. This job builder can be used outside the context of a test to configure and run jobs programmatically. The job is then run using Hadoop’s local job runner. The resulting output sequence file is then validated.

Listing 18

// Configure and run job.
final File outputDir = new File(getLocalTempDir(), "output.sequence_file");
final Path path = new Path("file://" + outputDir);
final KijiMapReduceJob mrjob = KijiGatherJobBuilder.create()
    .withConf(getConf())
    .withGatherer(SequentialPlayCounter.class)
    .withReducer(SequentialPlayCountReducer.class)
    .withInputTable(mUserTableURI)
    // Note: the local map/reduce job runner does not allow more than one reducer:
    .withOutput(new AvroKeyValueMapReduceJobOutput(new Path("file://" + outputDir), 1))
    .build();

assertTrue(mrjob.run());

Reading back files is easy with normal file or table readers; currently, avrokv files can be read in a limited way using a KeyValueStoreReader.

AvroKVRecordKeyValueStore.Builder kvStoreBuilder = AvroKVRecordKeyValueStore.builder()
    .withInputPath(path)
    .withConfiguration(getConf());
final AvroKVRecordKeyValueStore outputKeyValueStore = kvStoreBuilder.build();
KeyValueStoreReader reader = outputKeyValueStore.open();

// Check that our results are correct.
assertTrue(reader.containsKey("song-1"));
SongCount song1Result = (SongCount) reader.get("song-1");
assertEquals(2L, song1Result.getCount().longValue());
// Avro strings are deserialized to CharSequences in Java; .toString() allows JUnit to
// correctly compare the expected and actual values.
assertEquals("song-2", song1Result.getSongId().toString());

assertTrue(reader.containsKey("song-2"));
SongCount song2Result = (SongCount) reader.get("song-2");
assertEquals(1L, song2Result.getCount().longValue());
// Avro strings are deserialized to CharSequences in Java; .toString() allows JUnit to
// compare the expected and actual values.
assertEquals("song-3", song2Result.getSongId().toString());

End

Running the Example

kiji gather \
    --gatherer=org.kiji.examples.music.gather.SequentialPlayCounter \
    --reducer=org.kiji.examples.music.reduce.SequentialPlayCountReducer \
    --input="format=kiji table=${KIJI}/users" \
    --output="format=avrokv file=output.sequentialPlayCount nsplits=2" \
    --lib=${LIBS_DIR}

Verify

Because this job outputs Avro key-value files, which are binary and hard to read directly, we can use the Hadoop JobTracker to verify the success of the job. Using your favorite browser, navigate to the JobTracker page (localhost:50030 by default). This is where you can monitor all your Hadoop jobs. Locate the Kiji gather: SequentialPlayCounter / SequentialPlayCountReducer job and navigate to the job page by clicking on the Job ID. On the job page, check that the number of map output records is roughly 7000.

This MapReduce job processes the result of the SequentialPlayCount MapReduce job and writes a list of the top songs played after each song (the key) to the corresponding row in the songs table.

IdentityMapper.java

This is a stunning homage to Java boilerplate. This
mapper is the identity function; it just emits the same keys and
values as it receives without changing them.
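For reference, an identity mapper in plain Hadoop terms is nothing more than the sketch below; the IdentityMapper used in this pipeline does the same thing for the AvroKey/AvroValue pairs produced by the previous job. The class name and generic bounds here are illustrative, not the example's exact declaration.

import java.io.IOException;
import org.apache.hadoop.mapreduce.Mapper;

/** Passes every key-value pair through unchanged. */
public class PassThroughMapper<K, V> extends Mapper<K, V, K, V> {
  @Override
  protected void map(K key, V value, Context context)
      throws IOException, InterruptedException {
    // Emit exactly what we received.
    context.write(key, value);
  }
}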

TopNextSongsReducer.java

The keys passed into this reducer are song ids and the
values are SongCount records. In order to find the songs most
frequently played after a given song, we need to identify the
SongCount records with the largest number of counts, for every
key.

To do this efficiently, we will maintain an ordered collection of SongCount records that has a maximum size. As we iterate through all the values, we will keep the top SongCount records seen so far in our ordered collection. This reducer:

● Creates an ordered collection that will maintain a list of the top SongCount records, for each key.

● Examines each value for a key, and maintains a running list of the top SongCount records seen so far.

● Writes a TopNextSongs record to the songs table.

Create an ordered Collection

In our setup method, we instantiate a TreeSet that will be reused. TreeSets use their comparator (as opposed to a class’s equals method) to determine if an element is already in the set. In order for our TreeSet to contain multiple SongCount records with the same count, we must make sure that our comparator differentiates SongCount records that have the same count but different song IDs.
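To see why the tie-break on the song ID matters, here is a small standalone sketch (plain Java, made-up values). With a comparator that looks only at the count, two distinct songs with equal counts collapse into a single TreeSet entry; adding the tie-break keeps both.

import java.util.Comparator;
import java.util.TreeSet;

public class TreeSetTieBreakDemo {
  public static void main(String[] args) {
    // Stand-ins for SongCount records: (songId, count) pairs with equal counts.
    String[] songA = {"song-1", "7"};
    String[] songB = {"song-2", "7"};

    // Comparator that only looks at the count.
    Comparator<String[]> byCountOnly = Comparator.comparingLong(s -> Long.parseLong(s[1]));
    // Comparator that breaks ties on the song ID, like the one in Listing 19.
    Comparator<String[]> byCountThenId = byCountOnly.thenComparing(s -> s[0]);

    TreeSet<String[]> countOnlySet = new TreeSet<>(byCountOnly);
    countOnlySet.add(songA);
    countOnlySet.add(songB); // Treated as a duplicate of songA and silently dropped.

    TreeSet<String[]> countThenIdSet = new TreeSet<>(byCountThenId);
    countThenIdSet.add(songA);
    countThenIdSet.add(songB); // Kept, because the song IDs differ.

    System.out.println(countOnlySet.size());   // Prints 1.
    System.out.println(countThenIdSet.size()); // Prints 2.
  }
}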

Listing 19

public void setup(Context context) throws IOException, InterruptedException {
  super.setup(context); // Any time you override setup, call super.setup(context);
  mTopSongs = new TopSongs();
  // This TreeSet will keep track of the "largest" SongCount objects seen so far. Two SongCount
  // objects, song1 and song2, can be compared and the object with the largest value in the
  // field count will be declared the largest object.
  mTopNextSongs = new TreeSet<SongCount>(new Comparator<SongCount>() {
    @Override
    public int compare(SongCount song1, SongCount song2) {
      if (song1.getCount().compareTo(song2.getCount()) == 0) {
        return song1.getSongId().toString().compareTo(song2.getSongId().toString());
      } else {
        return song1.getCount().compareTo(song2.getCount());
      }
    }
  });
}



End

Maintain a collection of the top SongCount
records

To find the top N songs, we iterate through the values
associated with a given key, adding that value to our set, and then
removing the smallest value if our set is larger than the number of
top SongCount records we want to find.

It is worth pointing out that when you call
value.datum(), the same SongCount record, with different fields,
will be returned. Many Hadoop projects reuse objects, so be aware!
To get around the problem that this creates with trying to use a
set, we create a new SongCount record for each value using
SongCount’s builder method.
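The reuse pitfall is easy to reproduce without Hadoop at all. In this standalone sketch (made-up values), one mutable object stands in for the reused record behind value.datum(): a collection that stores the reference ends up with several copies of the last value, while storing a copy of the value preserves the history.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class ObjectReuseDemo {
  public static void main(String[] args) {
    // One mutable "record" that gets overwritten for every incoming value.
    AtomicLong reused = new AtomicLong();

    List<AtomicLong> storedReferences = new ArrayList<>();
    List<Long> storedCopies = new ArrayList<>();

    for (long count : new long[] {5, 9, 2}) {
      reused.set(count);              // The framework overwrites the same object...
      storedReferences.add(reused);   // ...so storing the reference keeps no history,
      storedCopies.add(reused.get()); // while storing a copy of the value does.
    }

    System.out.println(storedReferences); // Prints [2, 2, 2].
    System.out.println(storedCopies);     // Prints [5, 9, 2].
  }
}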

Listing 20

protected void reduce(AvroKey<CharSequence> key, Iterable<AvroValue<SongCount>> values,
    KijiTableContext context) throws IOException {
  // We are reusing objects, so we should make sure they are cleared for each new key.
  mTopNextSongs.clear();

  // Iterate through the song counts and track the top ${mNumberOfTopSongs} counts.
  for (AvroValue<SongCount> value : values) {
    // Remove the AvroValue wrapper.
    SongCount currentSongCount = SongCount.newBuilder(value.datum()).build();

    mTopNextSongs.add(currentSongCount);
    // If we now have too many elements, remove the element with the smallest count.
    if (mTopNextSongs.size() > mNumberOfTopSongs) {
      mTopNextSongs.pollFirst();
    }
  }
  // Set the field of mTopSongs to be a list of SongCounts corresponding to the top songs
  // played next for this key/song.
  mTopSongs.setTopSongs(Lists.newArrayList(mTopNextSongs));

End

Write TopNextSongs to the songs
table

We can write the list of top next songs to the “info:top_next_songs” column using context.put(). The only thing to remember with this method is that the first argument is expected to be an EntityId. Luckily, context also contains methods for generating EntityIds.

...
  // Write this to the songs table.
  context.put(context.getEntityId(key.datum().toString()), "info", "top_next_songs", mTopSongs);
}

TestTopNextSongsPipeline.java

Two jobs are constructed during this test and run one after
another. The first job outputs to an intermediate Avro container file written to the
local file system which is used as input by the second job. Each of
the jobs is configured using a job builder:

Listing 21

// Configure and run the first job, which is the same gather job as in Listing 18.
final KijiMapReduceJob mrjob1 = KijiGatherJobBuilder.create()
    .withConf(getConf())
    .withGatherer(SequentialPlayCounter.class)
    .withReducer(SequentialPlayCountReducer.class)
    .withInputTable(mUserTableURI)
    // Note: the local map/reduce job runner does not allow more than one reducer:
    .withOutput(new AvroKeyValueMapReduceJobOutput(path, 1))
    .build();

// Configure the second job.
final MapReduceJobOutput tableOutput = new DirectKijiTableMapReduceJobOutput(mSongTableURI, 1);
final KijiMapReduceJob mrjob2 = KijiMapReduceJobBuilder.create()
    .withConf(getConf())
    .withInput(new AvroKeyValueMapReduceJobInput(path))
    .withMapper(IdentityMapper.class)
    .withReducer(TopNextSongsReducer.class)
    .withOutput(tableOutput)
    .build();

// Run both jobs and confirm that they are successful.
assertTrue(mrjob1.run());
assertTrue(mrjob2.run());

End

The results of these two jobs end up being written to a Kiji
table. To validate the output data a KijiTableReader is used to
read the records in question.

Listing 22

mSongTable = getKiji().openTable(songTableName);
mSongTableReader = mSongTable.openTableReader();

// ...

KijiDataRequest request = KijiDataRequest.builder()
    .addColumns(ColumnsDef.create()
        .withMaxVersions(Integer.MAX_VALUE)
        .add("info", "top_next_songs"))
    .build();

TopSongs valuesForSong1 = mSongTableReader.get(mSongTable.getEntityId("song-1"), request)
    .getMostRecentValue("info", "top_next_songs");
assertEquals("Wrong number of most popular songs played next for song-1", 3,
    valuesForSong1.getTopSongs().size());

TopSongs valuesForSong2 = mSongTableReader.get(mSongTable.getEntityId("song-2"), request)
    .getMostRecentValue("info", "top_next_songs");
LOG.info("the list of song counts {}", valuesForSong2.getTopSongs().toString());
assertEquals("Wrong number of most popular songs played next for song-2", 2,
    valuesForSong2.getTopSongs().size());

TopSongs valuesForSong8 = mSongTableReader.get(mSongTable.getEntityId("song-8"), request)
    .getMostRecentValue("info", "top_next_songs");
LOG.info("the list of song counts {}", valuesForSong8.getTopSongs().toString());
assertEquals("Wrong number of most popular songs played next for song-8", 1,
    valuesForSong8.getTopSongs().size());
assertEquals("The only song played after song-8 is song-1.", "song-1",
    valuesForSong8.getTopSongs().get(0).getSongId().toString());

End

Running the example

Listing 23

--reducer=org.kiji.examples.music.reduce.TopNextSongsReducer \
    --input="format=avrokv file=output.sequentialPlayCount" \
    --output="format=kiji table=${KIJI}/songs nsplits=1" \
    --lib=${LIBS_DIR}

End

Verify

Since we write TopNextSongs back to the Kiji table, we can use the Kiji command-line tools to inspect our Kiji tables.

Listing 24

kiji scan ${KIJI}/songs/info:top_next_songs --max-rows=3



entity-id=['song-32'] [1366938451355] info:top_next_songs

{"topSongs": [{"song_id": "song-39", "count": 9},

{"song_id": "song-30", "count": 18}, {"song_id": "song-31", "count": 19}]}



entity-id=['song-49'] [1366938451406] info:top_next_songs

{"topSongs": [{"song_id": "song-45", "count": 13},

{"song_id": "song-40", "count": 14}, {"song_id": "song-41", "count": 15}]}



entity-id=['song-10'] [1366938451272] info:top_next_songs

{"topSongs": [{"song_id": "song-18", "count": 21},

{"song_id": "song-11", "count": 44}, {"song_id": "song-10", "count": 49}]}

End

Now, to generate recommendations for each user, we define a map-only MapReduce job that will process each row in the user table and apply our recommendation strategy to it.

NextSongRecommender

The NextSongRecommender is an example of a
KijiProducer. A producer operates on a single row of input data and
generates new outputs that are written to the same row. It can also
refer to external sources of data via KeyValueStores in addition to
the input from the row. For every row this producer processes, it
will:

● Read the most recent value from the
“info:track_plays” column of the users table.

This is the song ID of the most recently played song
by the user.

● Look up a list of the songs most frequently played
next from the songs table.

● Use external data sources (in this case the list of
songs most frequently played next that we computed and wrote to the
“songs” table) to generate a recommendation for each user.

● Write that recommendation to the
“info:next_song_rec” column of the users table.

Get The Most Recent Song Played

Like in a gatherer, you specify the required columns
for your producer in the getDataRequest method. We only want the
most recent value from this column, so we can use the create()
convenience method.

public KijiDataRequest getDataRequest() {
  // Only request the most recent version from the "info:track_plays" column.
  return KijiDataRequest.create("info", "track_plays");
}

In our produce() method, we then access our requested
data through the KijiRowData:

String mostRecentSong = input.<CharSequence>getMostRecentValue("info", "track_plays")
    .toString(); // Avro strings get deserialized to CharSequences, so .toString() the result.

Join External Data Sources

KeyValueStores allow you to access external data
sources in a MapReduce job. This is a common pattern in MapReduce
jobs, as it allows us to integrate two sources of data. In this
case, we will use the “top_next_songs” column of our “songs” table
as a KeyValueStore. In order to access KeyValueStores in a KijiMR
Job, the class that needs the external data must implement
KeyValueStoreClient. This interface requires that you implement
getRequiredStores(). The value that you must return from
getRequiredStores is a map from the name of a KeyValueStore to the
default implementation.

For reasons pertaining to KijiMR-91, we leave our default implementation unconfigured.

public Map<String, KeyValueStore<?, ?>> getRequiredStores() {
  return RequiredStores.just("nextPlayed", UnconfiguredKeyValueStore.builder().build());
}

This default implementation must be overridden when this producer is run. In the unit test, it is overridden programmatically using a job builder. When we run it from the command line, we will override the default implementation using the KVStoreConfig.xml file.

Generate a Recommendation

To generate a recommendation from the list of songs that are most likely to be played next, we do the simplest thing possible: choose the first element of the list.

private CharSequence recommend(List<SongCount> topNextSongs) {
  return topNextSongs.get(0).getSongId(); // Do the simplest possible thing.
}

Write the Output to a Column

To write our recommendation to the table, we need to
declare what column we are writing to.

public String getOutputColumn() {
  return "info:next_song_rec";
}

Since the column is already declared, to write a value
to it, we simply call context.put() with the value we want to write
as the parameter.

context.put(recommend(popularNextSongs));

Test NextSongRecommender

To test NextSongRecommender, we need to specify which KijiTable we want to use to back our KeyValueStore. We do this by constructing the KeyValueStore we want to use, via the KeyValueStore’s builder method. We then override the KeyValueStore binding in this job configuration by using the withStore() method of the job builder.

KijiTableKeyValueStore.Builder kvStoreBuilder = KijiTableKeyValueStore.builder();
kvStoreBuilder.withColumn("info", "top_next_songs").withTable(mSongTableURI);

Running the example

When we run this example, we again need to specify which KijiTable we want to use to back our KeyValueStore. This time, we will override the KeyValueStore binding from the command line using an XML configuration file (located at ${KIJI_HOME}/examples/music/KVStoreConfig.xml). The contents of the file are displayed below. If you are not using a BentoBox, you may need to modify this XML file so that the URI points to the songs table you would like to use.

Listing 25

<?xml version="1.0" encoding="UTF-8"?>
<stores>
  <store name="nextPlayed" class="org.kiji.mapreduce.kvstore.lib.KijiTableKeyValueStore">
    <configuration>
      <property>
        <name>table.uri</name>
        <!-- This URI can be replaced with the URI of a different 'songs' table to use. -->
        <value>kiji://.env/kiji_music/songs</value>
      </property>
      <property>
        <name>column</name>
        <value>info:top_next_songs</value>
      </property>
    </configuration>
  </store>
</stores>

End

Now, run the command:

Listing 26

kiji produce \
    --producer=org.kiji.examples.music.produce.NextSongRecommender \
    --input="format=kiji table=${KIJI}/users" \
    --output="format=kiji table=${KIJI}/users nsplits=2" \
    --lib=${LIBS_DIR} \
    --kvstores=${MUSIC_HOME}/KVStoreConfig.xml

End

The input and output for the producer come from the
Kiji table “users”, and the KeyValueStores are specified by the
KVStoreConfig.xml file.

Verify

kiji scan ${KIJI}/users/info:next_song_rec --max-rows=3

These are our recommendations for the next song to play for each
user!

Listing 27

entity-id=['user-41'] [1361564713968] info:next_song_rec
song-41

entity-id=['user-3'] [1361564713980] info:next_song_rec
song-2

entity-id=['user-13'] [1361564713990] info:next_song_rec
song-27

End

Now that you have run through the tutorial, you
have:

  • Efficiently imported data into a KijiTable in Bulk Importing.

  • Manipulated data in and between Kiji and HDFS in PlayCount.

  • Used a gatherer to generate next-song pairs, and a generic MapReduce job to aggregate them into counts, in SequentialPlayCount.

  • Filtered the next-song counts by popularity and wrote a list of the most popular to a Kiji table in NextTopSong.

  • Used a producer and joined together data sources to generate recommendations for our users in the Music Recommendation Producer.

Moreover, now that you understand how the mechanics
work, you can begin to improve on the incredibly simple
recommendation algorithm we have implemented. It is worth noting
that we have put thought into our data generator, and you should be
able to leverage patterns in what the users listen to.

Have a look at the recommendations generated by the
music recommendation producer. There are a few problems to notice
here:

  • Users can get recommended songs that they just listened to. You can improve the logic of the recommend() method in the producer to avoid recommending recently played songs (see the sketch after this list).

  • We only recommend songs that have been played immediately after each other. Our idea of songs that are similar enough to recommend is too narrow. In order to broaden our definition, you can:

  • Instead of looking back one song play, increase the “play radius” by incorporating songs that have been played several songs ahead of the song being analyzed.

  • Implement item-item collaborative filtering, perhaps using the Mahout implementation.

  • Cluster users together and use that information to enhance recommendations.

  • Once you have implemented another recommendation strategy, you can combine multiple strategies in the recommendation producer by using multiple key-value stores and modifying the recommend method.
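As a starting point for the first improvement, here is a hedged sketch of a smarter recommend(): given the SongCount-style list from the key-value store and a set of the user's recently played song IDs, pick the most frequently played follow-up that the user has not just heard. The helper Count class and the sample values are illustrative (the counts echo Listing 24); this is not the tutorial's code.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

public class AvoidRecentlyPlayedSketch {
  /** Minimal stand-in for the SongCount record: a song ID and a play count. */
  static final class Count {
    final String songId;
    final long count;
    Count(String songId, long count) {
      this.songId = songId;
      this.count = count;
    }
  }

  /** Pick the most frequently played next song that the user has not just heard. */
  static String recommend(List<Count> topNextSongs, Set<String> recentlyPlayed) {
    Count best = null;
    for (Count candidate : topNextSongs) {
      if (recentlyPlayed.contains(candidate.songId)) {
        continue; // Skip songs the user just listened to.
      }
      if (best == null || candidate.count > best.count) {
        best = candidate;
      }
    }
    // Fall back to the first entry if every candidate was played recently.
    return best != null ? best.songId : topNextSongs.get(0).songId;
  }

  public static void main(String[] args) {
    List<Count> topNextSongs = Arrays.asList(
        new Count("song-39", 9), new Count("song-30", 18), new Count("song-31", 19));
    Set<String> recentlyPlayed = Collections.singleton("song-31");
    System.out.println(recommend(topNextSongs, recentlyPlayed)); // Prints song-30.
  }
}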

Now that you have gotten your feet wet, you should
join our user group mailing list. It’s a great place to ask
questions and talk about all the cool apps you are building with
Kiji.

Aaron Kimball, Founder & Chief
Architect

Aaron founded WibiData in 2010. He has worked with
Hadoop since 2007 and is a committer on the Apache Hadoop project.
In addition, Aaron founded the Apache Sqoop data import tool and
Apache MRUnit Hadoop testing library projects. Previously, he was
the first engineer hired by Cloudera, the leading provider of
Apache Hadoop-based software and services. Aaron holds a B.S. in
Computer Science from Cornell University and a M.S. in Computer
Science from the University of Washington.

Lee Shang, Member of the Technical
Staff

Lee joined WibiData in 2012. He holds a BS in
Computer Science from Carnegie Mellon University. Previously he
worked on developing systems for making strategic buying decisions
at Amazon.com. Before that, he worked on distributed simulation
frameworks for the Department of Defense.

 


 

