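A very good article on how to get the values under a single key sorted in the reduce phase in Hadoop. It explains this more clearly than "Hadoop: The Definitive Guide"!
Reposted from:
http://www.bigdataspeak.com/2013/02/hadoop-how-to-do-secondary-sort-on_25.html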
The problem at hand is that the reducer needs to work on a sorted set of values.
Inputs were of the following form:
udid,datetime,<other-details>
Now, say we have following inputs:
udid1,1970-01-11 23:00:00,<...>
udid1,1970-01-01 23:00:00,<...>
udid1,1970-01-21 23:00:00,<...>
So, when the udid is used as the key in the mappers, you would expect the following input group in the reducer (ignoring the input details other than datetime for clarity):
<udid1, { 1970-01-11 23:00:00, 1970-01-01 23:00:00, 1970-01-21 23:00:00 } >
But we needed the records sorted by datetime before we could process them. Hence, instead of the above, the following is what was required (note that the dates are now sorted):
<udid1, { 1970-01-01 23:00:00, 1970-01-11 23:00:00, 1970-01-21 23:00:00 } >
We had a very tough time sorting all the values by datetime in the reducer, as this had to be done in memory and was a great performance killer. Sometimes, due to skewed data (i.e. a lot of records for a particular udid), we ran into out-of-memory errors.
But, not anymore. 🙂
What did we do?
We made Hadoop do this job for us. It isn't very simple or straightforward, but I will try to make as much sense as possible.
Hadoop doesn't sort on values. Period. So, we will have to trick it. To do this, we make our secondary sort column part of the mapper's output key, while also leaving it in the value. Hence, a sample output from the mapper would be something like:
< [udid1,1970-01-11 23:00:00] , [1970-01-11 23:00:00,<other-details>] >
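To make the shape of that mapper output concrete, here is a minimal plain-Java sketch with no Hadoop dependency. The class `MapperOutputSketch`, its nested `Key` type, and the method `toMapOutput` are hypothetical names used only for illustration; in the real job the key would be the CompositeKey class and the value a Text.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

public class MapperOutputSketch {

    /** Hypothetical stand-in for the composite map output key: (udid, datetime). */
    static final class Key {
        final String udid;
        final String datetime;

        Key(String udid, String datetime) {
            this.udid = udid;
            this.datetime = datetime;
        }
    }

    /**
     * Splits "udid,datetime,<other-details>" into a composite key and a value
     * that still carries the datetime, mirroring the mapper output shape.
     */
    static Map.Entry<Key, String> toMapOutput(String line) {
        // A limit of 3 keeps any commas inside <other-details> intact.
        String[] parts = line.split(",", 3);
        if (parts.length < 3) {
            throw new IllegalArgumentException("Expected udid,datetime,details but got: " + line);
        }
        Key key = new Key(parts[0], parts[1]);
        String value = parts[1] + "," + parts[2];
        return new SimpleImmutableEntry<>(key, value);
    }

    public static void main(String[] args) {
        Map.Entry<Key, String> out = toMapOutput("udid1,1970-01-11 23:00:00,some-details");
        System.out.println("< [" + out.getKey().udid + "," + out.getKey().datetime + "] , ["
                + out.getValue() + "] >");
    }
}
```

The datetime is deliberately duplicated: the copy in the key drives the sort, while the copy in the value is what the reducer actually reads.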
The following four things need to be created or customized:
- Map output key class (I have named it CompositeKey)
- Partitioner class (ActualKeyPartitioner)
- Grouping comparator class (ActualKeyGroupingComparator)
- Comparator class (CompositeKeyComparator)
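The CompositeKey class is more or less self-explanatory. It would be something like the following class:
Note added 2013-08-12: the key's compareTo method is probably not strictly needed for the sorting here, since the job sets its own sort comparator. It still has to be present, because the WritableComparable interface declares it.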
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;

/**
 * This key is a composite key. The "actual" key is the UDID.
 * The secondary sort will be performed against the datetime.
 */
public class CompositeKey implements WritableComparable<CompositeKey> {

    private String udid;
    private String datetime;

    /** No-argument constructor, so the key can be instantiated before readFields() fills it. */
    public CompositeKey() {
    }

    public CompositeKey(String udid, String datetime) {
        this.udid = udid;
        this.datetime = datetime;
    }

    @Override
    public String toString() {
        return (new StringBuilder()).append(udid).append(',').append(datetime).toString();
    }

    /** Reads the fields in the same order in which write() emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        udid = WritableUtils.readString(in);
        datetime = WritableUtils.readString(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        WritableUtils.writeString(out, udid);
        WritableUtils.writeString(out, datetime);
    }

    /** Natural order: udid first, then datetime. */
    @Override
    public int compareTo(CompositeKey o) {
        int result = udid.compareTo(o.udid);
        if (0 == result) {
            result = datetime.compareTo(o.datetime);
        }
        return result;
    }

    /**
     * Gets the udid.
     *
     * @return UDID.
     */
    public String getUDID() {
        return udid;
    }

    public void setUDID(String udid) {
        this.udid = udid;
    }

    /**
     * Gets the datetime.
     *
     * @return Datetime
     */
    public String getDatetime() {
        return datetime;
    }

    public void setDatetime(String datetime) {
        this.datetime = datetime;
    }
}
Secondly, we need to implement our own partitioner. We are now emitting both udid and datetime as the key, so the default partitioner (HashPartitioner) can no longer ensure that all the records for a given udid go to the same reducer (partition). Hence, we need the partitioner to consider only the actual key part (udid) when deciding on the partition for a record. This can be done as follows:
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class ActualKeyPartitioner extends Partitioner<CompositeKey, Text> {

    private final HashPartitioner<Text, Text> hashPartitioner = new HashPartitioner<Text, Text>();
    private final Text newKey = new Text();

    @Override
    public int getPartition(CompositeKey key, Text value, int numReduceTasks) {
        // Execute the default partitioner over the first part of the key (udid) only.
        // There is deliberately no fallback to a random partition on error: a random
        // partition could silently send records of one udid to different reducers.
        newKey.set(key.getUDID());
        return hashPartitioner.getPartition(newKey, value, numReduceTasks);
    }
}
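The effect of hashing only the udid can be tried out without a cluster. The sketch below is plain Java; `PartitionSketch` and its methods are hypothetical names. It uses the same arithmetic that HashPartitioner applies to a key's hash code, but on `String.hashCode()`, so the partition numbers will not match what a real job computes for Text keys.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PartitionSketch {

    /** Same arithmetic HashPartitioner applies to a key's hashCode(). */
    static int partition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    /** Partitions chosen for one udid's records when only the udid is hashed. */
    static Set<Integer> partitionsByUdid(String udid, List<String> datetimes, int numReduceTasks) {
        Set<Integer> seen = new HashSet<>();
        for (int i = 0; i < datetimes.size(); i++) {
            // The datetime is ignored on purpose: only the actual key decides.
            seen.add(partition(udid, numReduceTasks));
        }
        return seen;
    }

    /** Partitions chosen when udid and datetime are hashed together. */
    static Set<Integer> partitionsByWholeKey(String udid, List<String> datetimes, int numReduceTasks) {
        Set<Integer> seen = new HashSet<>();
        for (String datetime : datetimes) {
            seen.add(partition(udid + "," + datetime, numReduceTasks));
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> datetimes = Arrays.asList(
                "1970-01-11 23:00:00", "1970-01-01 23:00:00", "1970-01-21 23:00:00");
        // Always a single partition, whatever the datetimes are.
        System.out.println("udid only: " + partitionsByUdid("udid1", datetimes, 4));
        // May contain several partitions, which is what the custom partitioner avoids.
        System.out.println("whole key: " + partitionsByWholeKey("udid1", datetimes, 4));
    }
}
```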
Now, the partitioner only makes sure that all the records for the same udid go to one particular reducer. It doesn't guarantee that all of them will arrive in the same input group (i.e. in a single reduce() call, as the list of values). To make sure of this, we need to implement our own grouping comparator. We do something similar to what we did for the partitioner, i.e. we look only at the actual key (udid) when grouping the reducer inputs. This can be done as follows:
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class ActualKeyGroupingComparator extends WritableComparator {

    protected ActualKeyGroupingComparator() {
        super(CompositeKey.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        CompositeKey key1 = (CompositeKey) w1;
        CompositeKey key2 = (CompositeKey) w2;

        // (check on udid)
        return key1.getUDID().compareTo(key2.getUDID());
    }
}
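What the grouping comparator changes can be simulated in plain Java. The sketch below assumes the keys already arrive sorted and starts a new group whenever the comparator reports that two neighbouring keys differ; each group stands for one reduce() call. `GroupingSketch`, `group`, `BY_UDID` and `BY_WHOLE_KEY` are hypothetical names, and keys are modelled as two-element arrays `{udid, datetime}` rather than CompositeKey objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class GroupingSketch {

    /** Groups on udid only, like ActualKeyGroupingComparator. */
    static final Comparator<String[]> BY_UDID = (a, b) -> a[0].compareTo(b[0]);

    /** Groups on udid and datetime, i.e. on the full composite key. */
    static final Comparator<String[]> BY_WHOLE_KEY = (a, b) -> {
        int c = a[0].compareTo(b[0]);
        return c != 0 ? c : a[1].compareTo(b[1]);
    };

    /**
     * Splits already-sorted keys into runs of neighbours that the grouping
     * comparator treats as equal. Each run stands for one reduce() call.
     */
    static List<List<String[]>> group(List<String[]> sortedKeys, Comparator<String[]> grouping) {
        List<List<String[]>> groups = new ArrayList<>();
        String[] previous = null;
        for (String[] key : sortedKeys) {
            if (previous == null || grouping.compare(previous, key) != 0) {
                groups.add(new ArrayList<>());
            }
            groups.get(groups.size() - 1).add(key);
            previous = key;
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String[]> sorted = Arrays.asList(
                new String[] {"udid1", "1970-01-01 23:00:00"},
                new String[] {"udid1", "1970-01-11 23:00:00"},
                new String[] {"udid1", "1970-01-21 23:00:00"},
                new String[] {"udid2", "1970-01-05 23:00:00"});

        // Grouping on udid: one group per udid.
        System.out.println(group(sorted, BY_UDID).size());      // prints 2
        // Grouping on the whole key: every record ends up alone.
        System.out.println(group(sorted, BY_WHOLE_KEY).size()); // prints 4
    }
}
```

Without the udid-only grouping, each distinct (udid, datetime) pair would form its own group, and the reducer would never see the values for one udid together.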
The final thing left is the secondary sort over the datetime field. To achieve this, we create our own comparator, which first compares the udids and, only if they are equal, goes on to compare the datetime fields. It is implemented as follows and is pretty much self-explanatory:
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CompositeKeyComparator extends WritableComparator {

    protected CompositeKeyComparator() {
        super(CompositeKey.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        CompositeKey key1 = (CompositeKey) w1;
        CompositeKey key2 = (CompositeKey) w2;

        // (first check on udid)
        int compare = key1.getUDID().compareTo(key2.getUDID());

        if (compare == 0) {
            // only if we are in the same input group should we try and sort by value (datetime)
            return key1.getDatetime().compareTo(key2.getDatetime());
        }

        return compare;
    }
}
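The comparator above compares datetimes as plain strings. That gives chronological order only because the sample inputs use a fixed-width, zero-padded `yyyy-MM-dd HH:mm:ss` layout; this is an assumption about the data, not something Hadoop checks. The plain-Java sketch below (hypothetical names `SortOrderSketch`, `BY_STRING`, `BY_PARSED`, `sortedDatetimes`) sorts the sample records both ways so the two orders can be compared.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SortOrderSketch {

    static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** udid first, then datetime as a plain string, as in CompositeKeyComparator. */
    static final Comparator<String[]> BY_STRING = (a, b) -> {
        int c = a[0].compareTo(b[0]);
        return c != 0 ? c : a[1].compareTo(b[1]);
    };

    /** udid first, then datetime parsed into a real timestamp. */
    static final Comparator<String[]> BY_PARSED = (a, b) -> {
        int c = a[0].compareTo(b[0]);
        return c != 0 ? c
                : LocalDateTime.parse(a[1], FORMAT).compareTo(LocalDateTime.parse(b[1], FORMAT));
    };

    /** Returns the datetimes of the keys after sorting a copy with the given order. */
    static List<String> sortedDatetimes(List<String[]> keys, Comparator<String[]> order) {
        List<String[]> copy = new ArrayList<>(keys);
        copy.sort(order);
        List<String> result = new ArrayList<>();
        for (String[] key : copy) {
            result.add(key[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> keys = Arrays.asList(
                new String[] {"udid1", "1970-01-11 23:00:00"},
                new String[] {"udid1", "1970-01-01 23:00:00"},
                new String[] {"udid1", "1970-01-21 23:00:00"});

        // Both orders agree for this fixed-width format.
        System.out.println(sortedDatetimes(keys, BY_STRING));
        System.out.println(sortedDatetimes(keys, BY_PARSED));
    }
}
```

If the datetimes were not zero-padded (for example `1970-1-2`), string comparison would no longer match chronological order, and the comparator would need to parse the values or the mapper would need to normalize them.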
After creating all these classes, you just need to wire them up in your run() as follows:
job.setMapOutputKeyClass(CompositeKey.class);
job.setPartitionerClass(ActualKeyPartitioner.class);
job.setGroupingComparatorClass(ActualKeyGroupingComparator.class);
job.setSortComparatorClass(CompositeKeyComparator.class);
You are done. You will now get pre-sorted values (sorted by datetime) for each udid in your reduce method.