How to use a custom partitioner in Pentaho MapReduce.
In order to follow along with this how-to guide you will need the following:
The sample data file needed for this guide is:

File Name: weblogs_parse.txt
Content: Parsed, raw weblog data
Note: If you have already completed the Using Pentaho MapReduce to Parse Weblog Data guide the data should already be in the correct spot.
Add the file to your cluster by running the following:
hadoop fs -mkdir /user/pdi/weblogs
hadoop fs -mkdir /user/pdi/weblogs/parse
hadoop fs -put weblogs_parse.txt /user/pdi/weblogs/parse/part-00000
This guide expands upon the Using Pentaho MapReduce to Generate an Aggregate Dataset guide. If you have completed that guide you should already have the necessary files; otherwise, download aggregate_mapper.ktr, aggregate_reducer.ktr, and aggregate_mr.kjb.
Start Hadoop if it is not already running.
In this task you will create a Java partitioner that takes a key in the format client_ip<tab>year<tab>month and partitions the records by year.
You can download CustomPartitioner.jar containing the partitioner if you don't want to do every step.
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

public class YearPartitioner implements Partitioner<Text, LongWritable> {

  public void configure(JobConf job) {}

  public int getPartition(Text key, LongWritable value, int numReduceTasks) {
    String sKey = key.toString();
    String[] splits = sKey.split("\t");      //Split the key on tab
    int year = Integer.parseInt(splits[1]);  //The year is the second field
    return year % numReduceTasks;            //Return year mod number of reduce tasks as the partition number to send the record to
  }
}
javac -classpath ${HADOOP_HOME}/hadoop-core.jar YearPartitioner.java
jar cvf CustomPartitioner.jar YearPartitioner.class
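Before deploying, you can sanity-check the partition arithmetic without a cluster. The class below (YearPartitionerDemo is a hypothetical name, not part of the guide's code) mirrors getPartition using plain strings instead of the Hadoop types:

```java
public class YearPartitionerDemo {

    // Mirrors YearPartitioner.getPartition without the Hadoop dependencies.
    static int getPartition(String key, int numReduceTasks) {
        String[] splits = key.split("\t");       // key format: client_ip \t year \t month
        int year = Integer.parseInt(splits[1]);  // the year is the second field
        return year % numReduceTasks;            // partition number in [0, numReduceTasks)
    }

    public static void main(String[] args) {
        // With 3 reducers, three consecutive years land in three distinct partitions.
        System.out.println(getPartition("10.0.0.1\t2010\t01", 3)); // 2010 % 3 = 0
        System.out.println(getPartition("10.0.0.1\t2011\t02", 3)); // 2011 % 3 = 1
        System.out.println(getPartition("10.0.0.1\t2012\t03", 3)); // 2012 % 3 = 2
    }
}
```

Note that two years differing by a multiple of the reducer count map to the same partition, so the one-year-per-output-file behavior holds only while the span of years does not exceed the number of reducers.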
In this task you will deploy the custom partitioner to the cluster so it can be shipped to the nodes via the Distributed Cache.
hadoop fs -mkdir /distcache
hadoop fs -put CustomPartitioner.jar /distcache
In this task you will configure the aggregate_mr.kjb job to use the custom partitioner.
You can download the already completed aggregate_mr_partition.kjb if you do not want to do every step.
Name: mapred.cache.files
Value: /distcache/CustomPartitioner.jar
Explanation: Adds the custom partitioner to the distributed cache for the job.

Name: mapred.job.classpath.files
Value: /distcache/CustomPartitioner.jar
Explanation: Adds the custom partitioner from the distributed cache to the Java classpath for the job.

Name: mapred.partitioner.class
Value: YearPartitioner
Explanation: Tells the job to use the YearPartitioner class.
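If it helps to see what these entries amount to, the three properties map roughly to the following old-API JobConf configuration. This is a sketch of what the job configuration ends up containing, not code you need to write (PartitionerConfigSketch is a hypothetical illustration):

```java
import org.apache.hadoop.mapred.JobConf;

public class PartitionerConfigSketch {

    public static JobConf build() {
        JobConf conf = new JobConf();
        // Ship the jar to each node via the distributed cache.
        conf.set("mapred.cache.files", "/distcache/CustomPartitioner.jar");
        // Put the cached jar on the task's Java classpath.
        conf.set("mapred.job.classpath.files", "/distcache/CustomPartitioner.jar");
        // YearPartitioner was compiled in the default package, so no package prefix is needed.
        conf.set("mapred.partitioner.class", "YearPartitioner");
        return conf;
    }
}
```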
hadoop fs -cat /user/pdi/aggregate_mr/part-00000
hadoop fs -cat /user/pdi/aggregate_mr/part-00001 | head -10
hadoop fs -cat /user/pdi/aggregate_mr/part-00002 | head -10