In Hive, once you have used GROUP BY, you can no longer sort within each group and take the top K. We can implement this with a UDF + distribute by + sort by.
Reference: EXTRACT TOP N RECORDS IN EACH GROUP IN HADOOP/HIVE
Assume you have a table with three columns: user, category and value. For each user, you want to select the top N categories by value. To achieve this in Hive, you can use the following query:
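For concreteness, here is a minimal sketch of the assumed input. The table name and column types are hypothetical; the query below refers to the table via the placeholder $compTable and additionally filters on a ctr column:

CREATE TABLE user_category_stats (   -- hypothetical name, stands in for $compTable
  user     STRING,
  category STRING,
  value    DOUBLE,
  ctr      DOUBLE                    -- only used in the WHERE filter of the query below
);
-- Goal: for each user, keep the N categories with the largest value.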
add jar Rank.jar;
create temporary function rank as 'com.example.hive.udf.Rank';

SELECT *
FROM (
    -- assign a 0-based rank to each row within its user group
    SELECT *, rank(user) AS row_number
    FROM (
        -- send all rows of a user to the same reducer and sort them by value
        SELECT user, category, value
        FROM $compTable
        WHERE user IS NOT NULL AND ctr > 0
        DISTRIBUTE BY user
        SORT BY user, value DESC
    ) A
) B
WHERE row_number < 5
ORDER BY user, row_number;
Note 1: distribute by is used here instead of group by, because doing this with group by would require writing a UDAF. And after sort by user, rows with the same user become contiguous; the UDF exploits this property to implement the equivalent of a group by :-)
Note 2: note that two levels of SELECT are needed so that we can filter on row_number < 5.
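To make Note 1 concrete, here is a hedged sketch of the inner step on its own (table name hypothetical). Unlike order by, which produces a single globally sorted output through one reducer, distribute by only decides which reducer each row is sent to, and sort by only orders the rows within each reducer:

SELECT user, category, value
FROM user_category_stats      -- hypothetical table, stands in for $compTable
DISTRIBUTE BY user            -- all rows with the same user go to the same reducer
SORT BY user, value DESC;     -- sorted within each reducer only, not globally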
In the above query, I am using a custom rank function. The overall approach is as follows:
- Divide the data by user (distribute by user)
- Sort each group by user and value (sort by user, value desc)
- Within each group, assign a rank to each record. This is achieved by the custom rank function: it keeps track of the last user key and simply increments a counter, and as soon as it sees a new user it resets the counter to zero. Since the data is distributed by user and sorted by user and descending value, all records for a single user are sent to the same node, arrive grouped together, and are already ordered by value (see the walk-through after this list).
- Pick the top 5 categories per user (row_number < 5). Since the rank starts at 0, we only need the categories ranked 0 to 4.
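As a hedged walk-through on made-up data, this is what one reducer would see after the inner query, and the rank each row would receive:

-- user  category  value   rank(user)
-- u1    sports    9.0     0
-- u1    news      7.5     1
-- u1    music     2.1     2
-- u2    travel    8.0     0    <- new user, the counter resets
-- u2    food      3.3     1
-- The outer WHERE row_number < 5 then keeps at most 5 rows per user.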
Below is the custom rank function:
package com.example.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

public final class Rank extends UDF {
    // how many rows of the current key have been seen so far (0-based rank)
    private int counter;
    // key of the previous row; rows with the same key arrive contiguously
    // thanks to DISTRIBUTE BY user SORT BY user, value DESC
    private String last_key;

    public int evaluate(final String key) {
        if (!key.equalsIgnoreCase(this.last_key)) {
            // new group: remember the key and restart the counter
            this.counter = 0;
            this.last_key = key;
        }
        return this.counter++;
    }
}