Writing Hive Custom Aggregate Functions (UDAF)

转载自:《Writing Hive Custom Aggregate Functions (UDAF): Part II》

Now that we got eclipse configured (see Part I) for UDAF development, its time to write our first UDAF. Searching for custom UDAF, most people might have already came across the following page, GenericUDAFCaseStudy.This page has a good explanation of how to write a good generic UDAF but as a newbie it can be daunting, especially if you haven’t dealt with hive API before. However, as show below, there is another much more simpler way to write a UDAF by directly extending the UDAF Class.

UDAF Class Overview
A UDAF class has two main expects:

  1. Evaluator: It is the main logic that does the actual aggregation. For example if we are writing a UDAF to calculate average, evaluator will be responsible for writing the logic on how to handle incoming streams of data and calculate average. Evaluators implement UDAFEvaluator interface.
  2. Resolver: It provides the link between the hive and evaluator. It is responsible to return the right kind of evaluator depending on the input parameters. For instance let say we have two different evaluators to calculate sum: one for integer and another for double. Depending on the input type, resolver will decide whether to return the evaluator associated with the integer type or double type. However if you are extending UDAF class then you don’t need to worry about writing Resolver. In particular, hive (in particular GenericUDAFBridge class) will use reflection to determine which evaluator to use.

UDAFEvaluator Class Overview
Before we dive down into the details of writing a UDAFEvaluator, let’s first conceptually see how a Evaluator works. Conceptually, writing a evaluator is same as writing a python reducer. For instance, lets assume we have a housing data as below and we are interested in calculating average home price per zipcode. On the mapper side we extract zipcode and price per house. On the reducer side we stream record sorted by zipcode and as shown in the code below we need two variables in order to calculate average price per zipcode. First is the “total_value” to keep track of sum of prices and “cnt” to keep track of number of houses observed per zipcode. We reset these variables whenever we observe a new zipcode.

import os
import sys
 
#initialize a variable to keep track of last observed zipcode
last_zipcode = None
total_value = 0
cnt = 0
 
#Stream through input records
#Records will be sorted by zipcode
for line in sys.stdin:
    '''Iterate through all the input records'''
 
    #extract zipcode and price from input record
    zipcode, price = line.strip().split("\t")
    price = int(price)
 
    #if its a new zipcode
    #then price average price for the last observed zipcode
    if zipcode != last_zipcode:
        if last_zipcode:
            print "{0}\t{1}".format(last_zipcode, total_value/float(cnt))
 
        #update last observed zipcode
        last_zipcode = zipcode
        #reset variables related to calculating average
        total_value = 0
        cnt = 0
 
    #increment total_value and number of items observed
    total_value += price
    cnt += 1
 
#handle last observed zipcode
if last_zipcode:
    print "{0}\t{1}".format(last_zipcode, total_value/float(cnt))

 

There are three aspects of the above python code that directly translates to the three functions of an Evaluator class:

  1. public void init(): Lines 6,7,27 and 28 in the above code are responsible for resetting variables required to calculate average home price. Similarly when extending UDAFEvaluator class, we need to implement an init method that is responsible for resetting variables so that we can handle a new group. Whenever hive observes a new group key, it will call the init method so to make sure that all the variables are reinitialized.
  2. public boolean iterate([list of arguments]): lines 31 and 32 are responsible for handling an input record and increment relevant variables. Similarly, hive calls iterate function of the UDAFEvaluator class and pass all the variables from the SQL query as arguments to the iterate method. The iterate method is then responsible for increment the variables.
  3. public [RETURN TYPE] terminate(): In the above code, lines 21,22, and 23 are responsible for checking if the key has changed and if so print the zipcode and average home price. However in hive, the terminate function is little different. It is only responsible for the returning the aggregate value i.e. average in the above example. After the last record of a group has been sent to the UDAFEvaluator, hive will call the terminate function and grab its output.

Now there are two other functions that we need to implement in order to get our UDAFEvaluator working, namely

  1. public [INTERMEDIATE RETURN TYPE] terminatePartial() and
  2. public void merge([INTERMEDIATE RETURN TYPE]).

These two functions are more for optimization that enables map side aggregation (or usage of combiner). Let’s see how it will work in the above example. The main bottleneck in hadoop is transferring data from mapper to reducer. terminatePartial and merge address this by doing partial aggregation at the mapper side. For instance, as shown in the figure below, when computing average home price rather than sending all the records related to a particular zipcode to a reducer we can use combiner to calculate total value and cnt and send these two values to the reducer. This step is referred as terminatePartial. It is responsible for returning the total_value and cnt. Then on the reducer side we need a method to handle the partial computation which is done by merge method.

combiner

Okie..now let’s see how to actually code a UDAF.

AverageUDAF Example
Below is a sample implementation of Average function.

package com.meetup.udaf;
 
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.hive.ql.metadata.HiveException;
 
@Description(
        name = "Average",
        value = "_FUNC_(double) - Computes mean",
        extended = "select attr1, CustomAverage(value) from tbl group by attr1;"
)
public class AverageUDAF extends UDAF{
    static final Log LOG = LogFactory.getLog(AverageUDAF.class.getName());
 
    public static class AverageUDAFEvaluator implements UDAFEvaluator{
 
        /**
         * Use item class to serialize intermediate computation
         */
        public static class Item{
            double total_value = 0;
            int cnt = 0;
        }
         
        private Item item = null;
         
        /**
         * function: Constructor
         */
        public AverageUDAFEvaluator(){
            super();
            init();
        }
         
        /**
         * function: init()
         * Its called before records pertaining to a new group are streamed
         */
        public void init() {
            LOG.debug("======== init ========");
            item = new Item();         
        }
         
        /**
         * function: iterate
         * This function is called for every individual record of a group
         * @param value
         * @return
         * @throws HiveException
         */
        public boolean iterate(double value) throws HiveException{
            LOG.debug("======== iterate ========");
            if(item == null)
                throw new HiveException("Item is not initialized");
            item.total_value = item.total_value + value;
            item.cnt = item.cnt + 1;
            return true;
        }
         
        /**
         * function: terminate
         * this function is called after the last record of the group has been streamed
         * @return
         */
        public double terminate(){
            LOG.debug("======== terminate ========");           
            return item.total_value/item.cnt;
        }
         
        /**
         * function: terminatePartial
         * this function is called on the mapper side and
         * returns partially aggregated results.
         * @return
         */
        public Item terminatePartial(){
            LOG.debug("======== terminatePartial ========");           
            return item;
        }
         
         
        /**
         * function: merge
         * This function is called two merge two partially aggregated results
         * @param another
         * @return
         */
        public boolean merge(Item another){
            LOG.debug("======== merge ========");          
            if(another == null) return true;
            item.total_value += another.total_value;
            item.cnt += another.cnt;
            return true;
        }
    }
}

 

In order to build and test the above the code in hive, create a new Maven project (as explained in my previous post) and create a package “com.meetup.udaf”. Lastly copy and paste the above code in a file named “AverageUDAF.java” within the com.meetup.udaf package. Now right click on the project title > Run As > Maven install. This will build a jar in the target folder where the project source is available.

Note that you are not limited to a single implementation of UDAFEvaluator class. But a single class that extends UDAF class can have multiple implementation of UDAFEvaluator such as one for integer, one for string, etc. Hive will use Reflection to determine appropriate evaluator class. For instance, UDAFExampleMax class has six different implementation of UDAFEvaluator for short, int, long, float, double and string.

I hope this makes writing your first UDAF little easy 🙂

Few Things To Keep Note Of:
1. Make sure that the inner class that implements UDAFEvaluator is defined as public. Otherwise Hive won’t be able to use reflection and determine the UDAFEvaluator implementation.
2. If dealing with arrays, make sure that you use ArrayList instead of simple arrays. This is especially for the internal Item class that stores intermediate results. Hive won’t be able to properly serialize or de-serialize simple arrays and will give you all kinds of weird errors.

特别提醒:

如果在Evaluator中采用了map/list存储一些group的信息,则在merge方法中,对other的信息,必须要deep copy出来,然后再塞到this的list/map中,否则结果可能会不对。

这是因为,这个other对象会被hive自动复用。

Leave a Reply

Your email address will not be published. Required fields are marked *