Perfect your playlist

Let Kiji be your DJ

It’s the year 3000, and new music is being created at such a fast pace that your company Pandorify can no longer analyze and categorize music fast enough to recommend it to your users. Fortunately, technology has kept up! With KijiSchema and KijiMR, you can leverage all the song play data you have collected over the last thousand years to make recommendations for your users.

We believe that past patterns are a good source for future recommendations. To impose structure on the data, we will use a layout in KijiSchema. To effectively use this gigantic flood of data, we can use MapReduce paradigms provided by KijiMR to analyze and provide recommendations.

In this tutorial, we demonstrate how to use KijiMR to leverage your data effectively. You will:

● Efficiently import data into a KijiTable.

● Manipulate data in and between Kiji and HDFS.

● Use a gatherer to generate the next-song pairs, and use a generic MapReduce job to aggregate them into counts.

● Use a producer to join together data sources and generate recommendations for our users.

How to Use this Tutorial

● Links to Javadoc - Class names link to the relevant Javadoc: EntityId.

● Code Walkthrough - Code snippets are in gray boxes with language-specific syntax highlighting.

System.out.println("Hello Kiji");

● Shell Commands - Shell commands to run the above code will be in light blue boxes, and the results in grey.

Hello Kiji

For this tutorial, we assume you are either using the standalone Kiji BentoBox or have installed the individual components described on the Kiji Get Started page: http://www.kiji.org/getstarted/.

If you don't have a working environment yet, you can install the standalone Kiji BentoBox in a few quick steps:

● Download archive: http://archive.kiji.org/tarballs/kiji-bento-chirashi-1.2.1-release.tar.gz

● Extract archive: 

tar xzf kiji-bento-*.tar.gz

● Start HBase: 

cd kiji-bento-chirashi; source bin/kiji-env.sh; bento start

If you already installed BentoBox, make sure you have started it:

bento start

Compiling

If you have downloaded the standalone Kiji BentoBox, the code for this tutorial is already compiled and located in the ${KIJI_HOME}/examples/music/ directory. Commands in this tutorial will depend on this location:

export MUSIC_HOME=${KIJI_HOME}/examples/music

If you are not using the Kiji BentoBox, set MUSIC_HOME to the path of your local kiji music repository.

Once you have done this, if you are using Kiji BentoBox you can skip to “Set your environment variables” if you want to get started playing with the example code. Otherwise, follow these steps to compile it from source.

The source code for this tutorial can be found in ${MUSIC_HOME}. The source is included along with a Maven project. To get started using Maven, consult Getting started With Maven or the Apache Maven Homepage.

The following tools are required to compile this project:

● Maven 3.x

● Java 6

To compile, run mvn package from ${MUSIC_HOME}. The build artifacts (.jar files) will be placed in the ${MUSIC_HOME}/target/ directory. This tutorial assumes you are using the pre-built jars included with the music recommendation example under ${MUSIC_HOME}/lib/. If you wish to use jars of example code that you have built, you should adjust the command lines in this tutorial to use the jars in ${MUSIC_HOME}/target/.

Set your environment variables

After Bento starts, it will display ports you will need to complete this tutorial. It will be useful to know the address of the MapReduce JobTracker webapp (http://localhost:50030 by default) while working through this tutorial.

It will be useful to define an environment variable named KIJI that holds a Kiji URI to the Kiji instance we’ll use during this tutorial.

export KIJI=kiji://.env/kiji_music

To work through this tutorial, various Kiji tools will require that Avro data type definitions particular to the working music recommendation example be on the classpath. You can add your artifacts to the Kiji classpath by running: 

export LIBS_DIR=${MUSIC_HOME}/lib
export KIJI_CLASSPATH="${LIBS_DIR}/*"

Install Kiji and Create Tables

Install your Kiji instance:

kiji install --kiji=${KIJI}

Create the Kiji music tables:

kiji-schema-shell --kiji=${KIJI} --file=${MUSIC_HOME}/music_schema.ddl

This command uses kiji­schema­shell to create the tables using the KijiSchema DDL, which makes specifying table layouts easy. See the KijiSchema DDL Shell reference for more information on the KijiSchema DDL.

Upload Data to HDFS

Upload the data set to HDFS:

hadoop fs -mkdir kiji-mr-tutorial

hadoop fs -copyFromLocal ${MUSIC_HOME}/example_data/*.json kiji-mr-tutorial/

Upload the import descriptor for the bulk importer in the next section to HDFS:

hadoop fs -copyFromLocal ${MUSIC_HOME}/import/song-plays-import-descriptor.json kiji-mr-tutorial/

Here at Pandorify, we have about a millennium of data collected about our users' listening patterns. This would take a huge amount of time to load into a Kiji table if we used a single machine to issue writes to our table, one row at a time. Instead, we will show you how to use MapReduce to efficiently import such large amounts of data into Kiji.

Custom Bulk Importers

One of the ways to bulk import your data is to extend KijiBulkImporter and override its produce() method to insert rows in a distributed manner into the Kiji table. In the example below, we use this method to populate the song metadata.

Input files contain JSON data representing song metadata, with one song per line. Below is a whitespace-augmented example of a single row in our input file song-metadata.json.

Listing 1

{



"song_id":"0",

"song_name":"song0",

"artist_name":"artist1",

"album_name":"album1",

"genre":"awesome",

"tempo":"140",

"duration":"180"

}

End

The SongMetadataBulkImporter class extends KijiBulkImporter. It expects a text input format where the input keys are the byte offsets of each line in the input file and the input values are the lines of text described above.
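As a rough sketch, the overall shape of the class looks something like this (the generic parameters and produce() signature shown here are assumptions based on the text input format just described; see the example source for the authoritative version):

// Sketch of the overall shape of SongMetadataBulkImporter. The generics and the
// produce() signature are assumptions; the snippets that follow show the real pieces.
public class SongMetadataBulkImporter extends KijiBulkImporter<LongWritable, Text> {
  @Override
  public void produce(LongWritable filePos, Text line, KijiTableContext context)
      throws IOException {
    // 1. Parse the JSON on this line into song metadata fields.
    // 2. Build a SongMetadata Avro record from those fields.
    // 3. Use the song ID as the entity ID and write the record to the info:metadata column.
    // Each of these steps is shown individually below.
  }
}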

In the produce() method of the class, extract the JSON as follows:

// Parse JSON:

final JSONObject json = (JSONObject) parser.parse(line.toString());

// Extract JSON fields:

final String songId = json.get("song_id").toString();

Use an Avro record called SongMetadata, described below:
Listing 2

record SongMetadata {

string song_name;

string artist_name;

string album_name;

string genre;

long tempo;

long duration;

}

End

Then build an Avro metadata record from the parsed JSON.

Listing 3

final SongMetadata song = SongMetadata.newBuilder()

.setSongName(songName)

.setAlbumName(albumName)

.setArtistName(artistName)

.setGenre(genre)

.setTempo(tempo)

.setDuration(duration)

.build();

End

We create an EntityId object in order to use the song ID as the row key.

final EntityId eid = context.getEntityId(songId);

Finally, write this Avro record to a cell in our Kiji table with the song ID as the row key.

context.put(eid, "info", "metadata", song);

As an aside, take care while using explicit timestamps when writing to Kiji. You can read about common pitfalls of timestamps in HBase on the Kiji blog for more details.

Running the Example

Run the bulk import tool by specifying SongMetadataBulkImporter as the importer, the Kiji table songs as the output, and song-metadata.json as the input with the following command:

Listing 4

kiji bulk-import \

--importer=org.kiji.examples.music.bulkimport.SongMetadataBulkImporter \

--lib=${LIBS_DIR} \

--output="format=kiji table=${KIJI}/songs nsplits=1" \

--input="format=text file=kiji-mr-tutorial/song-metadata.json"

End

When the MapReduce bulk import job runs, KijiMR will warn you that jars are already added. This is normal and not a cause for alarm. Once the MapReduce job actually starts, you will receive periodic progress updates for the map and reduce phases of the job. When the job completes, MapReduce will print a number of metrics describing the results of the job. You can also examine the output of your job at the JobTracker Web UI: http://localhost:50030.

Verify

Verify that the songs table records were added properly by executing:

kiji scan ${KIJI}/songs --max-rows=3

Here’s what the first three entries should look like:

Listing 5

entity-id=['song-32'] [1366936225070] info:metadata

{"song_name": "song name-32", "artist_name": "artist-2", "album_name": "album-0", "genre": "genre1.0", "tempo": 120, "duration": 180}



entity-id=['song-49'] [1366936225102] info:metadata

{"song_name": "song name-49", "artist_name": "artist-3", "album_name": "album-1", "genre": "genre4.0", "tempo": 150, "duration": 180}



entity-id=['song-36'] [1366936225077] info:metadata

{"song_name": "song name-36", "artist_name": "artist-2", "album_name": "album-0", "genre": "genre1.0", "tempo": 90, "duration": 0}

End

Bulk importing using table import descriptors

In the example below, we use an import descriptor to bulk import our history of song plays from the song-plays.json file into the users table. This method of bulk import requires a table import descriptor, which is a JSON file containing:

  • The table that is the destination of the import.

  • The table column families.

  • The name of the destination column.

  • The name of the source field to import from.

  • The source for the entity ID.

  • An optional timestamp to use instead of system timestamp.

  • The format version of the import descriptor.

The import descriptor used for the users table is shown below:

Listing 6

{
  name : "users",
  families : [ {
    name : "info",
    columns : [ {
      name : "track_plays",
      source : "song_id"
    } ]
  } ],
  entityIdSource : "user_id",
  overrideTimestampSource : "play_time",
  version : "import-1.0"
}

End

We then use the pre-written JSONBulkImporter, which expects a JSON file. Each line in this file represents a separate JSON object to be imported into a row. The JSON object is described by an import descriptor such as the one above. Target columns whose sources are not present in the JSON object are skipped.

This descriptor parametrizes a special MapReduce job, where every row of the input file is parsed and inserted into the users Kiji table. The value of user_id will be used as the row key in the Kiji table, the timestamp will be retrieved from the play_time field, and the value of song_id will be extracted and inserted into the info:track_plays column.

Running the example

Copy the descriptor file into HDFS, if you have not already done so:

hadoop fs -copyFromLocal \
${MUSIC_HOME}/import/song-plays-import-descriptor.json \
kiji-mr-tutorial/

Run the bulk import tool by specifying JSONBulkImporter as the importer, the Kiji table users as the output, and song-plays.json as the input with the following command:

Listing 7

kiji bulk-import \
-Dkiji.import.text.input.descriptor.path=kiji-mr-tutorial/song-plays-import-descriptor.json \
--importer=org.kiji.mapreduce.lib.bulkimport.JSONBulkImporter \
--output="format=kiji table=${KIJI}/users nsplits=1" \
--input="format=text file=kiji-mr-tutorial/song-plays.json" \
--lib=${LIBS_DIR}

End

Verify

Verify that the users table records were added properly by executing:

kiji scan ${KIJI}/users --max-rows=3

Here's what the first three entries should look like:

entity-id=['user-41'] [1325762580000] info:track_plays
song-41

entity-id=['user-3'] [1325751420000] info:track_plays
song-0

entity-id=['user-13'] [1325750400000] info:track_plays

The ‘Hello World!’ of MapReduce

To quote the Scalding developers, "Hadoop is a distributed system for counting words." Unfortunately, we here at Pandorify are fresh out of words, but we have the play history of bajillions of users listening to bazillions of different songs.

This MapReduce job uses the listening history of our users that we have stored in the “users” Kiji table to calculate the total number of times each song has been played. The result of this computation is written to a text file in HDFS.

SongPlayCounter

The SongPlayCounter is an example of a Gatherer. A gatherer is essentially a mapper that gets input from a KijiTable. SongPlayCounter proceeds through discrete stages:

  • Setup reusable resources.

  • Read all values from column: “info:track_plays”.

  • Process the data from "info:track_plays" and emit a key-value pair for each track ID each time it occurs.

Initialize Resources

First, SongPlayCounter prepares any resources that may be needed by the gatherer. In Hadoop, reusable objects are commonly instantiated only once to protect against long garbage collection pauses. This is particularly important with Kiji because long garbage collection pauses can cause MR jobs to fail when various underlying HBase resources time out or cannot be found.

Since setup() is an overridden method, we call super.setup() to ensure that all resources are initialized properly. If you open resources in setup(), be sure to close or release them in the corresponding cleanup() method.

public void setup(GathererContext<Text, LongWritable> context) throws IOException {
  super.setup(context); // Any time you override setup, call super.setup(context);
  mText = new Text();
}
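SongPlayCounter itself only allocates a Text object, so there is nothing to release; a gatherer that did open an external resource might pair the two methods along these lines (a sketch only: SomeExternalClient is a hypothetical stand-in, and the cleanup() signature is assumed to mirror setup()):

private SomeExternalClient mClient; // Hypothetical resource, not part of the example code.

public void setup(GathererContext<Text, LongWritable> context) throws IOException {
  super.setup(context);                 // Always call super.setup(context).
  mClient = SomeExternalClient.open();  // Acquire the resource once per task.
}

public void cleanup(GathererContext<Text, LongWritable> context) throws IOException {
  mClient.close();                      // Release whatever setup() acquired.
  super.cleanup(context);
}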

Read track play data from the table

A gatherer takes input from a table, so it must declare what data it will need. It does this in the form of a KijiDataRequest, which is defined in getDataRequest(). For the song count job, we want to request all songs that have been played, for every user. In order to get all of the values written to the "info:track_plays" column, we must specify the maximum number of versions we want. The special constant that specifies that you want all versions of data in a column is HConstants.ALL_VERSIONS. Otherwise, we will only get the most recent value.

Listing 8

public KijiDataRequest getDataRequest() {
  // This method is how we specify which columns in each row the gatherer operates on.
  // In this case, we need all versions of the info:track_plays column.
  final KijiDataRequestBuilder builder = KijiDataRequest.builder();
  builder.newColumnsDef()
      .withMaxVersions(HConstants.ALL_VERSIONS) // Retrieve all versions.
      .add("info", "track_plays");
  return builder.build();
}



End

Process track play data into key-value pairs for occurrences

Called once for each row in the Kiji table, gather() retrieves all the values in the "info:track_plays" column and, for each value, sets the reused Text object to contain the current value, writes a key-value pair using context.write(mText, ONE), and then clears the Text object before the next call to gather().

Listing 9

public void gather(KijiRowData row, GathererContext<Text, LongWritable> context)
    throws IOException {
  // The gather method operates on one row at a time. For each user, we iterate
  // through all their track plays and emit a pair of the track ID and the number 1.
  NavigableMap<Long, CharSequence> trackPlays = row.getValues("info", "track_plays");
  for (CharSequence trackId : trackPlays.values()) {
    mText.set(trackId.toString());
    context.write(mText, ONE);
    mText.clear();
  }
}

End

LongSumReducer

The key-value pairs emitted from the gatherer are shuffled and sorted by the MapReduce framework, so each call to the reducer is given a key and an iterator of all values associated with that key. The LongSumReducer calls reduce() for each key and sums all of the associated values to produce a total play count for each song ID. The LongSumReducer has three stages:

  • Setup reusable resources.

  • Sum the values associated with a key.

  • Output the key paired with the sum.

Because summing values is such a common MapReduce operation, LongSumReducer is provided by the KijiMR library.

Initialize Resources

It is common practice to avoid instantiating new objects in map or reduce methods as Hadoop developers have a (perhaps now outdated) skepticism of garbage collection in the JVM.

protected void setup(Context context) {
  mValue = new LongWritable();
}

Sum Values and Output Total

Called once for each key, reduce() combines all of the values associated with a key by adding them together and writing the total for each key to the output collector.

Listing 10

public void reduce(K key, Iterable<LongWritable> values, Context context)
    throws IOException, InterruptedException {
  long sum = 0;
  for (LongWritable value : values) {
    sum += value.get();
  }
  mValue.set(sum);
  context.write(key, mValue);
}

End

TestSongPlayCounter

To verify that SongPlayCounter performs as expected, SongPlayCounter’s test:

  • Creates and populates an in-memory Kiji instance.

  • Runs a MapReduce job with SongPlayCounter as the gatherer and LongSumReducer as the reducer.

  • Verifies that the output is as expected.

Create an in-memory Kiji instance

The InstanceBuilder class provides methods for populating a test Kiji instance. Once the test instance has been defined, its build method is called, creating the in-memory instance and table.

Listing 11

public final void setup() throws Exception {
  final KijiTableLayout layout =
      KijiTableLayout.createFromEffectiveJsonResource("/layout/users.json");
  final String tableName = layout.getName();
  new InstanceBuilder(getKiji())
      .withTable(tableName, layout)
      .withRow("user-1").withFamily("info").withQualifier("track_plays")
          .withValue(1L, "song-1").withValue(2L, "song-2").withValue(3L, "song-3")
      .withRow("user-2").withFamily("info").withQualifier("track_plays")
          .withValue(1L, "song-1").withValue(2L, "song-3").withValue(3L, "song-4").withValue(4L, "song-1")
      .withRow("user-3").withFamily("info").withQualifier("track_plays")
          .withValue(1L, "song-5")
      .build();
}

End

Run and verify SongPlayCounter

KijiGatherJobBuilder is used to create a test MapReduce job. This job builder can be used outside the context of a test to configure and run jobs programmatically. The job is then run using Hadoop’s local job runner. The resulting output sequence file is then validated.

Listing 12

final File outputDir = new File(getLocalTempDir(), "output.sequence_file");
final KijiMapReduceJob mrjob = KijiGatherJobBuilder.create()
    .withConf(getConf())
    .withGatherer(SongPlayCounter.class)
    .withReducer(LongSumReducer.class)
    .withInputTable(mTableURI)
    // Note: the local map/reduce job runner does not allow more than one reducer:
    .withOutput(new SequenceFileMapReduceJobOutput(new Path("file://" + outputDir), 1))
    .build();
assertTrue(mrjob.run());

final Map<String, Long> counts = Maps.newTreeMap();
readSequenceFile(new File(outputDir, "part-r-00000"), counts);
LOG.info("Counts map: {}", counts);

assertEquals(5, counts.size());
assertEquals(3L, (long) counts.get("song-1"));
assertEquals(1L, (long) counts.get("song-2"));
assertEquals(2L, (long) counts.get("song-3"));
assertEquals(1L, (long) counts.get("song-4"));
assertEquals(1L, (long) counts.get("song-5"));

End

Running the Example

Listing 13

kiji gather \
--gatherer=org.kiji.examples.music.gather.SongPlayCounter \
--reducer=org.kiji.mapreduce.lib.reduce.LongSumReducer \
--input="format=kiji table=${KIJI}/users" \
--output="format=text file=output.txt_file nsplits=2" \
--lib=${LIBS_DIR}

End

Verify

To confirm that the gather job worked, examine the output using hadoop filesystem command line tools:

hadoop fs -text output.txt_file/part-r-00000 | head -3

song-1  100
song-10 272
song-12 101

Instead of recommending the most popular songs to everyone using Pandorify, we want to tailor our recommendations based on each user's listening history. For every user, we will look up the most recent song they have listened to and then recommend the song most frequently played after it. In order to do that, we need to create an index so that, for each song, we can quickly look up the most popular songs to listen to next.

So, we need to count the number of times two songs have been played, one after another. The SequentialPlayCounter and SequentialPlayCountReducer allow us to do that.

SequentialPlayCounter

SequentialPlayCounter operates in much the same way that SongPlayCounter does, but it requires a more complex key structure to store both the song played and the song that followed. The easiest way to work with complex keys in Kiji is to use Avro. We define a SongBiGram, which will be our key, as a pair of songs played sequentially by a single user.

Listing 14

/** Song play bigram. */
record SongBiGram {
  /** The ID of the first song played in a sequence. */
  string first_song_played;

  /** The ID of the song played immediately after it. */
  string second_song_played;
}

End

Whereas SongPlayCounter’s output value class was Text.class, SequentialPlayCounter uses AvroKey.class which requires that we also implement AvroKeyWriter and override getAvroKeyWriterSchema() to fully define the Avro key format. SequentialPlayCounter executes the same basic stages as SongPlayCounter, but with a more complex gather operation.
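A minimal sketch of what that override could look like for the gatherer, returning the schema of the generated SongBiGram record (SCHEMA$ is the static schema field Avro generates for specific records; treat the exact placement and comment as assumptions and check the example source):

public Schema getAvroKeyWriterSchema() throws IOException {
  // Our keys are SongBiGram records, so we can reuse the generated record schema.
  return SongBiGram.SCHEMA$;
}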

Read track play data and compose complex keys

SequentialPlayCounter reads the same data as SongPlayCounter, but maintains a "sliding window" of the most recent two track IDs. For each song after the first, gather() emits a key-value pair where the key is a SongBiGram of the two most recently played songs, and the value is one (1) as a tally.


Listing 15


/** {@inheritDoc} */
@Override
public void gather(KijiRowData input, GathererContext<AvroKey<SongBiGram>, LongWritable> context)
    throws IOException {
  // Here we use a sliding window to build bigrams that represent pairs of songs that
  // have ever been played one following another.
  // The variables firstSong and nextSong slide along as we iterate through the track plays.
  CharSequence firstSong = null;
  CharSequence nextSong = null;
  NavigableMap<Long, CharSequence> trackPlays = input.getValues("info", "track_plays");
  for (CharSequence trackId : trackPlays.values()) { // Iterate through this user's track plays.
    // Slide the window one song over.
    firstSong = nextSong;
    nextSong = trackId;
    // If firstSong is null, we are at the beginning of the list and our sliding window
    // only contains one song, so don't output it. Otherwise...
    if (null != firstSong) {
      // Create the bigram of these two songs.
      mBiGram.setFirstSongPlayed(firstSong);
      mBiGram.setSecondSongPlayed(nextSong);
      // Emit the bigram of these two songs.
      context.write(new AvroKey<SongBiGram>(mBiGram), ONE);
    }
  }
}

End

SequentialPlayCountReducer

This reducer takes in pairs of songs that have been played sequentially and the number one. It then computes the number of times those songs have been played together, and emits the ID of the first song as the key and a SongCount record representing the song played after the first as the value. A SongCount record has a field containing the ID of the subsequent song and a field for the number of times it has been played after the initial song.

This reducer takes AvroKey as input, and writes AvroKey and AvroValue as output, so it must implement AvroKeyReader, AvroKeyWriter, and AvroValueWriter. The keys we are emitting are just strings, so we could use a Text key. Instead, we made the choice to use an AvroKey so that we could use the Kiji-defined AvroKeyValue output format, which requires that you output AvroKey and AvroValue.

The schema for our Avro key is so simple that we don't have to add a record to our .avdl file in order to return the correct schema in getAvroKeyWriterSchema(). Instead, we can use the static methods Avro provides for creating schemas of primitive types.

public Schema getAvroKeyWriterSchema() throws IOException {
  // Since we are writing AvroKeys of plain strings, we can use Avro's primitive string schema.
  return Schema.create(Schema.Type.STRING);
}

Sum Sequential Plays

SequentialPlayCountReducer starts with the same reduction operation that LongSumReducer used to count track plays in the PlayCount example, but diverges when emitting key-value pairs. Instead of passing the keys through the reducer, SequentialPlayCountReducer creates new keys based on the track IDs in the SongBiGram keys. The new keys are simply the first track ID from each bi-gram, while the second track ID becomes part of the SongCount value.

Listing 16

protected void reduce(AvroKey<SongBiGram> key, Iterable<LongWritable> values, Context context)
    throws IOException, InterruptedException {
  // Initialize sum to zero.
  long sum = 0L;
  // Add up all the values. When this reducer is used after SequentialPlayCounter, every value
  // should be the number 1, so we are just counting the number of times the second song
  // was played after the first (the key).
  for (LongWritable value : values) {
    sum += value.get();
  }
  // Set values for this count.
  final SongBiGram songPair = key.datum();
  final SongCount nextSongCount = SongCount.newBuilder()
      .setCount(sum)
      .setSongId(songPair.getSecondSongPlayed())
      .build();
  // Write out result for this song.
  context.write(
      new AvroKey<CharSequence>(songPair.getFirstSongPlayed().toString()),
      new AvroValue<SongCount>(nextSongCount));
}

End

TestSequentialSongPlayCounter

To verify that SequentialPlayCounter and SequentialPlayCountReducer function as expected, their test:

  • Creates and populates an in-memory Kiji instance.

  • Runs a MapReduce job with SequentialPlayCounter as the gatherer and SequentialPlayCountReducer as the reducer.

  • Verifies that the output is as expected.

Create an in-memory Kiji instance

The InstanceBuilder class provides methods for populating a test Kiji instance. Once the test instance has been defined, its build method is called, creating the in-memory instance and table.

Listing 17

public final void setup() throws Exception {
  final KijiTableLayout userLayout =
      KijiTableLayout.createFromEffectiveJsonResource("/layout/users.json");
  final String userTableName = userLayout.getName();
  mUserTableURI =
      KijiURI.newBuilder(getKiji().getURI()).withTableName(userTableName).build();

  new InstanceBuilder(getKiji())
      .withTable(userTableName, userLayout)
      .withRow("user-1").withFamily("info").withQualifier("track_plays")
          .withValue(2L, "song-2").withValue(3L, "song-1")
      .withRow("user-2").withFamily("info").withQualifier("track_plays")
          .withValue(2L, "song-3").withValue(3L, "song-2").withValue(4L, "song-1")
      .withRow("user-3").withFamily("info").withQualifier("track_plays")
          .withValue(1L, "song-5")
      .build();
}

End

Run and verify SequentialPlayCounter and SequentialPlayCountReducer

KijiGatherJobBuilder is used to create a test MapReduce job. This job builder can be used outside the context of a test to configure and run jobs programmatically. The job is then run using Hadoop's local job runner. The resulting output sequence file is then validated.

Listing 18

// Configure and run job.
final File outputDir = new File(getLocalTempDir(), "output.sequence_file");
final Path path = new Path("file://" + outputDir);
final KijiMapReduceJob mrjob = KijiGatherJobBuilder.create()
    .withConf(getConf())
    .withGatherer(SequentialPlayCounter.class)
    .withReducer(SequentialPlayCountReducer.class)
    .withInputTable(mUserTableURI)
    // Note: the local map/reduce job runner does not allow more than one reducer:
    .withOutput(new AvroKeyValueMapReduceJobOutput(new Path("file://" + outputDir), 1))
    .build();
assertTrue(mrjob.run());

Reading back the output is easy with normal file or table readers; currently, Avro key-value files can be read in a limited way using a KeyValueStoreReader.

AvroKVRecordKeyValueStore.Builder kvStoreBuilder = AvroKVRecordKeyValueStore.builder()
    .withInputPath(path).withConfiguration(getConf());
final AvroKVRecordKeyValueStore outputKeyValueStore = kvStoreBuilder.build();
KeyValueStoreReader reader = outputKeyValueStore.open();

// Check that our results are correct.
assertTrue(reader.containsKey("song-1"));
SongCount song1Result = (SongCount) reader.get("song-1");
assertEquals(2L, song1Result.getCount().longValue());
// Avro strings are deserialized to CharSequences in Java; .toString() allows JUnit to
// correctly compare the expected and actual values.
assertEquals("song-2", song1Result.getSongId().toString());

assertTrue(reader.containsKey("song-2"));
SongCount song2Result = (SongCount) reader.get("song-2");
assertEquals(1L, song2Result.getCount().longValue());
// Avro strings are deserialized to CharSequences in Java; .toString() allows JUnit to
// compare the expected and actual values.
assertEquals("song-3", song2Result.getSongId().toString());

End

Running the Example

kiji gather \
--gatherer=org.kiji.examples.music.gather.SequentialPlayCounter \
--reducer=org.kiji.examples.music.reduce.SequentialPlayCountReducer \
--input="format=kiji table=${KIJI}/users" \
--output="format=avrokv file=output.sequentialPlayCount nsplits=2" \
--lib=${LIBS_DIR}

Verify

Because this job outputs Avro key­value files, which are binary and hard to read directly, we can use the Hadoop job tracker to verify the success of the job. Using your favorite browser, navigate to the JobTracker page (localhost:50030 by default). This is where you can monitor all your Hadoop jobs. Locate the Kiji gather:SequentialPlayCounter / SequentialPlayCountReducer job and navigate to the job page by clicking on the Job ID. On the job page, check that Map output records number roughly 7000.

NextTopSong

This MapReduce job processes the result of the SequentialPlayCount MapReduce job and writes a list of the top songs played after each song (the key) to the corresponding row in the songs table.

IdentityMapper.java

This is a stunning homage to Java boilerplate. This mapper is the identity function; it just emits the same keys and values as it receives without changing them.
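For illustration, an identity mapper sketched with the plain Hadoop Mapper API looks like the following; the real IdentityMapper in the example code is a KijiMR mapper with Avro key and value types, so treat this as a conceptual sketch rather than its actual signature:

// A generic identity mapper: whatever key/value pair comes in goes out unchanged.
public class SimpleIdentityMapper<K, V> extends org.apache.hadoop.mapreduce.Mapper<K, V, K, V> {
  @Override
  protected void map(K key, V value, Context context)
      throws IOException, InterruptedException {
    context.write(key, value); // Pass the pair through untouched.
  }
}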

TopNextSongsReducer.java

The keys passed into this reducer are song ids and the values are SongCount records. In order to find the songs most frequently played after a given song, we need to identify the SongCount records with the largest number of counts, for every key.

To do this efficiently, we will maintain an ordered collection of SongCount records that has a maximum size. As we iterate through all the values, we will keep the top SongCount records seen so far in our ordered collection. This reducer:

● Creates an ordered collection that will maintain a list of the top SongCount records, for each key.

● Examines each value for a key, and maintains a running list of the top SongCount records seen so far.

● Writes a TopNextSongs record to the songs table.

Create an ordered Collection

In our setup method, we instantiate a TreeSet that will be reused. TreeSets use their comparator (as opposed to a class's equals method) to determine if an element is already in the set. In order for our TreeSet to contain multiple SongCount records with the same count, we must make sure that our comparator differentiates SongCount records with the same number of counts but different song IDs.

Listing 19

public void setup(Context context) throws IOException, InterruptedException {
  super.setup(context); // Any time you override setup, call super.setup(context);
  mTopSongs = new TopSongs();
  // This TreeSet will keep track of the "largest" SongCount objects seen so far. Two
  // SongCount objects, song1 and song2, can be compared, and the object with the largest
  // value in the field count will be declared the largest object.
  mTopNextSongs = new TreeSet<SongCount>(new Comparator<SongCount>() {
    @Override
    public int compare(SongCount song1, SongCount song2) {
      if (song1.getCount().compareTo(song2.getCount()) == 0) {
        return song1.getSongId().toString().compareTo(song2.getSongId().toString());
      } else {
        return song1.getCount().compareTo(song2.getCount());
      }
    }
  });
}



End

Maintain a collection of the top SongCount records

To find the top N songs, we iterate through the values associated with a given key, adding that value to our set, and then removing the smallest value if our set is larger than the number of top SongCount records we want to find.

It is worth pointing out that when you call value.datum(), the same SongCount object may be returned each time, with its fields changed. Many Hadoop projects reuse objects, so be aware! To get around the problem this creates when using a set, we create a new SongCount record for each value using SongCount's builder method.

Listing 20

protected void reduce(AvroKey<CharSequence> key, Iterable<AvroValue<SongCount>> values,

KijiTableContext context) throws IOException {

// We are reusing objects, so we should make sure they are cleared for each new key.

mTopNextSongs.clear();



// Iterate through the song counts and track the top ${mNumberOfTopSongs} counts.

for (AvroValue<SongCount> value : values) {

// Remove AvroValue wrapper.

SongCount currentSongCount = SongCount.newBuilder(value.datum()).build();



mTopNextSongs.add(currentSongCount);

// If we now have too many elements, remove the element with the smallest count.

if (mTopNextSongs.size() > mNumberOfTopSongs) {

mTopNextSongs.pollFirst();

}

}

// Set the field of mTopSongs to be a list of SongCounts corresponding to the top songs played

// next for this key/song.

mTopSongs.setTopSongs(Lists.newArrayList(mTopNextSongs));

End

Write TopNextSongs to the songs table

We can write the list of top next songs to the "info:top_next_songs" column using context.put(). The only thing to remember with this method is that the first argument is expected to be an EntityId. Luckily, context also contains methods for generating EntityIds.

...

// Write this to the song table.

context.put(context.getEntityId(key.datum().toString()), "info", "top_next_songs",

mTopSongs);

}

TestTopNextSongsPipeline.java

Two jobs are constructed during this test and run one after another. The first job outputs to an intermediate Avro container file written to the local file system which is used as input by the second job. Each of the jobs is configured using a job builder:

Listing 21

// Configure first job.
final File outputDir = new File(getLocalTempDir(), "output.sequence_file");
final Path path = new Path("file://" + outputDir);
final KijiMapReduceJob mrjob1 = KijiGatherJobBuilder.create()
    .withConf(getConf())
    .withGatherer(SequentialPlayCounter.class)
    .withReducer(SequentialPlayCountReducer.class)
    .withInputTable(mUserTableURI)
    // Note: the local map/reduce job runner does not allow more than one reducer:
    .withOutput(new AvroKeyValueMapReduceJobOutput(path, 1))
    .build();

// Configure second job.
final MapReduceJobOutput tableOutput = new DirectKijiTableMapReduceJobOutput(mSongTableURI, 1);

final KijiMapReduceJob mrjob2 = KijiMapReduceJobBuilder.create()

.withConf(getConf())

.withInput(new AvroKeyValueMapReduceJobInput(path))

.withMapper(IdentityMapper.class)

.withReducer(TopNextSongsReducer.class)

.withOutput(tableOutput).build();

// Run both jobs and confirm that they are successful.

assertTrue(mrjob1.run());

assertTrue(mrjob2.run());

End

The results of these two jobs end up being written to a Kiji table. To validate the output data a KijiTableReader is used to read the records in question.

Listing 22

mSongTable = getKiji().openTable(songTableName);
mSongTableReader = mSongTable.openTableReader();
// ...
KijiDataRequest request = KijiDataRequest.builder()
    .addColumns(ColumnsDef.create()
        .withMaxVersions(Integer.MAX_VALUE)
        .add("info", "top_next_songs"))
    .build();

TopSongs valuesForSong1 = mSongTableReader.get(mSongTable.getEntityId("song-1"), request)
    .getMostRecentValue("info", "top_next_songs");
assertEquals("Wrong number of most popular songs played next for song-1", 3,
    valuesForSong1.getTopSongs().size());

TopSongs valuesForSong2 = mSongTableReader.get(mSongTable.getEntityId("song-2"), request)
    .getMostRecentValue("info", "top_next_songs");
LOG.info("the list of song counts {}", valuesForSong2.getTopSongs().toString());
assertEquals("Wrong number of most popular songs played next for song-2", 2,
    valuesForSong2.getTopSongs().size());

TopSongs valuesForSong8 = mSongTableReader.get(mSongTable.getEntityId("song-8"), request)
    .getMostRecentValue("info", "top_next_songs");
LOG.info("the list of song counts {}", valuesForSong8.getTopSongs().toString());
assertEquals("Wrong number of most popular songs played next for song-8", 1,
    valuesForSong8.getTopSongs().size());
assertEquals("The only song played after song-8 is song-1.", "song-1",
    valuesForSong8.getTopSongs().get(0).getSongId().toString());

End

Running the example

Listing 23

kiji mapreduce \

--mapper=org.kiji.examples.music.map.IdentityMapper \

--reducer=org.kiji.examples.music.reduce.TopNextSongsReducer \

--input="format=avrokv file=output.sequentialPlayCount" \

--output="format=kiji table=${KIJI}/songs nsplits=1" \

--lib=${LIBS_DIR}

End

Verify

Since we write TopNextSongs back to the Kiji table, we can use the Kiji command-line tools to inspect our Kiji tables.

Listing 24

kiji scan ${KIJI}/songs/info:top_next_songs --max-rows=3



entity-id=['song-32'] [1366938451355] info:top_next_songs

{"topSongs": [{"song_id": "song-39", "count": 9},

{"song_id": "song-30", "count": 18}, {"song_id": "song-31", "count": 19}]}



entity-id=['song-49'] [1366938451406] info:top_next_songs

{"topSongs": [{"song_id": "song-45", "count": 13},

{"song_id": "song-40", "count": 14}, {"song_id": "song-41", "count": 15}]}



entity-id=['song-10'] [1366938451272] info:top_next_songs

{"topSongs": [{"song_id": "song-18", "count": 21},

{"song_id": "song-11", "count": 44}, {"song_id": "song-10", "count": 49}]}

End

Now, to generate recommendations for each user, we define a map-only MapReduce job that will process each row in the user table and apply our recommendation strategy to it.

NextSongRecommender

The NextSongRecommender is an example of a KijiProducer. A producer operates on a single row of input data and generates new outputs that are written to the same row. It can also refer to external sources of data via KeyValueStores in addition to the input from the row. For every row this producer processes, it will:

● Read the most recent value from the “info:track_plays” column of the users table.

This is the song ID of the most recently played song by the user.

● Look up a list of the songs most frequently played next from the songs table.

● Use external data sources (in this case the list of songs most frequently played next that we computed and wrote to the “songs” table) to generate a recommendation for each user.

● Write that recommendation to the “info:next_song_rec” column of the users table.

Get The Most Recent Song Played

Like in a gatherer, you specify the required columns for your producer in the getDataRequest method. We only want the most recent value from this column, so we can use the create() convenience method.

public KijiDataRequest getDataRequest() {

// Only request the most recent version from the "info:track_plays" column.

return KijiDataRequest.create("info", "track_plays");

}

In our produce() method, we then access our requested data through the KijiRowData:

String mostRecentSong = input.<CharSequence>getMostRecentValue("info", "track_plays")
    .toString(); // Avro strings get deserialized to CharSequences, so .toString() the result.

Join External Data Sources

KeyValueStores allow you to access external data sources in a MapReduce job. This is a common pattern in MapReduce jobs, as it allows us to integrate two sources of data. In this case, we will use the “top_next_songs” column of our “songs” table as a KeyValueStore. In order to access KeyValueStores in a KijiMR Job, the class that needs the external data must implement KeyValueStoreClient. This interface requires that you implement getRequiredStores(). The value that you must return from getRequiredStores is a map from the name of a KeyValueStore to the default implementation.

For reasons pertaining to KijiMR-91 we leave our default implementation unconfigured.

public Map<String, KeyValueStore<?, ?>> getRequiredStores() {

return RequiredStores.just("nextPlayed",

UnconfiguredKeyValueStore.builder().build());

}

This default implementation must be overridden when this producer is run. In the unit test, it is programmatically overridden using a job builder. When we run it from the command line, we will override the default implementation using the KVStoreConfig.xml file.

Generate a Recommendation

To generate a recommendation from the list of songs that are most likely to be played next, we do the simplest thing possible: choose the first element of the list.

private CharSequence recommend(List<SongCount> topNextSongs) {

return topNextSongs.get(0).getSongId(); // Do the simplest possible thing.

}

Write the Output to a Column

To write our recommendation to the table, we need to declare what column we are writing to.

public String getOutputColumn() {

return "info:next_song_rec";

}

Since the column is already declared, to write a value to it, we simply call context.put() with the value we want to write as the parameter.

context.put(recommend(popularNextSongs));

Test NextSongRecommender

To test NextSongRecommender, we need to specify which KijiTable we want to use to back our KeyValueStore. We do this by constructing the KeyValueStore we want to use, via the KeyValueStore's builder method. We then override the KeyValueStore binding in this job configuration by using the withStore() method of JobBuilders.

KijiTableKeyValueStore.Builder kvStoreBuilder = KijiTableKeyValueStore.builder();

kvStoreBuilder.withColumn("info", "top_next_songs").withTable(mSongTableURI);
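A sketch of how that store might then be bound to the name declared in getRequiredStores() when the job is configured (jobBuilder is a hypothetical produce-job builder here; the exact builder type and method chain are assumptions based on the withStore() method mentioned above):

// Bind the concrete KijiTable-backed store to the "nextPlayed" name, overriding
// the UnconfiguredKeyValueStore default declared in getRequiredStores().
jobBuilder.withStore("nextPlayed", kvStoreBuilder.build());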

Running the example

When we run this example, we again need to specify which KijiTable we want to use to back our KeyValueStore. This time, we will override the KeyValueStore binding from the command line using an XML configuration file (located at ${KIJI_HOME}/examples/music/KVStoreConfig.xml). The contents of the file are displayed below. If you are not using the BentoBox, you may need to modify this XML file so that the URI points to the songs table you would like to use.

Listing 25

<?xml version="1.0" encoding="UTF-8"?>

<stores>

 <store name="nextPlayed" class="org.kiji.mapreduce.kvstore.lib.KijiTableKeyValueStore">

    <configuration>

       <property>

         <name>table.uri</name>

         <!-- This URI can be replaced with the URI of a different 'songs' table to use. -->
         <value>kiji://.env/kiji_music/songs</value>
       </property>
       <property>
         <name>column</name>
         <value>info:top_next_songs</value>
       </property>
    </configuration>
  </store>
</stores>

End

Now, run the command:

Listing 26

kiji produce \

--producer=org.kiji.examples.music.produce.NextSongRecommender \

--input="format=kiji table=${KIJI}/users" \

--output="format=kiji table=${KIJI}/users nsplits=2" \

--lib=${LIBS_DIR} \

--kvstores=${MUSIC_HOME}/KVStoreConfig.xml



End

The input and output for the producer come from the Kiji table “users”, and the KeyValueStores are specified by the KVStoreConfig.xml file.

Verify

kiji scan ${KIJI}/users/info:next_song_rec --max-rows=3

These are our recommendations for the next song to play for each user!

Listing 27

entity-id=['user-41'] [1361564713968] info:next_song_rec
song-41


entity-id=['user-3'] [1361564713980] info:next_song_rec
song-2


entity-id=['user-13'] [1361564713990] info:next_song_rec

song-27

End

Now that you have run through the tutorial, you have:

  • Efficiently imported data into a KijiTable in Bulk Importing.

  • Manipulated data in and between Kiji and HDFS in PlayCount.

  • Used a gatherer to generate the next-song pairs and a generic MapReduce job to aggregate them into counts in SequentialPlayCount.

  • Filtered the next song counts by popularity and wrote a list of the most popular to a Kiji table in NextTopSong.

  • Used a producer to join together data sources and generate recommendations for our users in the Music Recommendation Producer.

Moreover, now that you understand how the mechanics work, you can begin to improve on the incredibly simple recommendation algorithm we have implemented. It is worth noting that we have put thought into our data generator, and you should be able to leverage patterns in what the users listen to.

Have a look at the recommendations generated by the music recommendation producer. There are a few problems to notice here:

  • Users can get recommended songs that they just listened to. You can improve the logic of the recommend() method in the producer to avoid recommending recently played songs.

  • We only recommend songs that have been played immediately after each other. Our idea of songs that are similar enough to recommend is too narrow. In order to broaden our definition, you can:

  • Instead of looking back one song play, increase the "play radius" by incorporating songs that have been played several songs ahead of the song being analyzed.

  • Implement item-item collaborative filtering, perhaps using the Mahout implementation.

  • Cluster users together and use that information to enhance recommendations.

  • Once you have implemented another recommendation strategy, you can combine multiple strategies in the recommendation producer by using multiple key-value stores and modifying the recommend method.

Now that you have gotten your feet wet, you should join our user group mailing list. It's a great place to ask questions and talk about all the cool apps you are building with Kiji.

Aaron Kimball, Founder & Chief Architect

Aaron founded WibiData in 2010. He has worked with Hadoop since 2007 and is a committer on the Apache Hadoop project. In addition, Aaron founded the Apache Sqoop data import tool and Apache MRUnit Hadoop testing library projects. Previously, he was the first engineer hired by Cloudera, the leading provider of Apache Hadoop-based software and services. Aaron holds a B.S. in Computer Science from Cornell University and a M.S. in Computer Science from the University of Washington.

Lee Shang, Member of the Technical Staff

Lee joined WibiData in 2012. He holds a BS in Computer Science from Carnegie Mellon University. Previously he worked on developing systems for making strategic buying decisions at Amazon.com. Before that, he worked on distributed simulation frameworks for the Department of Defense.





 


 

