Structured data in Hive: a generic UDF to sort arrays of structs

Posted on September 17, 2013



Introduction

Hive has a rich and complex data model that supports maps, arrays and structs, which can be mixed and matched, leading to arbitrarily nested structures, as in JSON. I wrote about a JSON SerDe in another post, and if you use it, you know it can lead to pretty complicated nested tables.

Unfortunately, HiveQL does not have many built-in functions to deal with nested data.

One frequent need we have is to sort an array of structs by an arbitrary field. That is, if your table definition is something like

CREATE TABLE mytable (
    ....
    friends array<struct<name string, age int>>,
    ...
)

you may want the friends array sorted by either age or name, depending on what you want to do with it. Unfortunately, the built-in sort_array function sorts array elements according to their natural order, and it is not obvious what that order is for a struct; I guess it orders by the first struct field, then by the second, and so on.

I thought this would be a good example of a simple generic UDF: the actual sorting operation is trivial in Java, so we can focus on the plumbing we have to put around it. You can find other examples of generic UDFs, for instance this one, but I wanted to write a simple one that exposes only the essential plumbing around a sort operation.

Our goal is to write a generic UDF that could be used on the above table to do something like:

SELECT sort_array_by(friends,'age') from mytable

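to sort your array of friends by the age field. (The name you use in SQL is whichever one you give the class when you register it with CREATE TEMPORARY FUNCTION.)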
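
Stripped of all the Hive plumbing, the sort itself really is trivial. The following plain-Java sketch is only an analogue of what the UDF does, not the UDF: it models each struct as a java.util.Map from field name to value (an assumption made purely for illustration), and SortByFieldSketch, friend and sortByField are hypothetical names. Like the UDF, it returns a sorted shallow copy instead of sorting in place.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java analogue of sort_array_by: no Hive types involved. Each "struct"
// is modeled as a Map from field name to value, an assumption made only for
// this sketch; the real UDF reads fields through ObjectInspectors instead.
class SortByFieldSketch {

    // Builds one struct<name string, age int> value.
    static Map<String, Object> friend(String name, int age) {
        Map<String, Object> s = new LinkedHashMap<>();
        s.put("name", name);
        s.put("age", age);
        return s;
    }

    // Returns a sorted shallow copy; the input list is left untouched.
    // Null field values sort first.
    @SuppressWarnings("unchecked")
    static List<Map<String, Object>> sortByField(List<Map<String, Object>> structs, String field) {
        List<Map<String, Object>> copy = new ArrayList<>(structs);
        copy.sort((a, b) -> {
            Comparable<Object> x = (Comparable<Object>) a.get(field);
            Object y = b.get(field);
            if (x == null || y == null) {
                return x == null ? (y == null ? 0 : -1) : 1;
            }
            return x.compareTo(y);
        });
        return copy;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> friends = Arrays.asList(
                friend("carol", 29), friend("alice", 41), friend("bob", 35));
        System.out.println(sortByField(friends, "age"));
        // [{name=carol, age=29}, {name=bob, age=35}, {name=alice, age=41}]
        System.out.println(sortByField(friends, "name"));
        // [{name=alice, age=41}, {name=bob, age=35}, {name=carol, age=29}]
    }
}
```

The same list can be sorted by a different field on every call, which is exactly what the second argument of sort_array_by is meant to allow.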

UDFs vs Generic UDFs

Let’s take a step back and look at that ‘generic’ in Generic UDF. You may not be familiar with UDFs at all, so let’s see what a UDF (aka ‘plain’ UDF) is and how Generic UDFs differ from it. A good read about the differences can be found here.

A UDF, or ‘User Defined Function’, is a function you write in Java to do something you can’t do in SQL. For instance, here at OpenX, where I work, we have written UDFs to perform currency conversions, so we can write things like

SELECT SUM(currency_convert( p_coin, 'USD', p_revenue)) FROM ..

where currency_convert is a function with the signature (STRING cur1, STRING cur2, BIGINT amount) that converts amount from cur1 to cur2 (in our example, from whatever the publisher currency is to USD).

This function is a ‘plain’ UDF, so it is coded as a Java class that implements an evaluate method with that signature:

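import org.apache.hadoop.hive.ql.exec.UDF;

public class CurrencyConverter extends UDF {
    // CurrencyExchangeTable is our internal rate lookup class (not shown);
    // it is created lazily, on first use
    CurrencyExchangeTable convTable = null;

    CurrencyExchangeTable getExchangeTable() throws Exception {
        if (convTable == null) {
            convTable = new CurrencyExchangeTable();
        }
        return convTable;
    }

    // Hive finds this method by reflection: (STRING, STRING, BIGINT) -> BIGINT
    public long evaluate(final String from, final String to, long amount)
            throws Exception {
        return (long) (amount * getExchangeTable().getConversionRate(from.toUpperCase(), to.toUpperCase()));
    }
}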

 

This is pretty simple to write, and Hive infers which evaluate method to call, and its return type, from your class using Java reflection.
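
To illustrate what inferring the method by reflection means, here is a much simplified plain-Java sketch: it picks the evaluate overload whose parameter types exactly match the runtime classes of the arguments, and reads the declared return type. ToyUdf, callEvaluate and returnTypeOf are hypothetical names; as far as I know Hive's real method resolution is more elaborate (for example, it also considers implicit type conversions), so treat this only as a model of the idea.

```java
import java.lang.reflect.Method;

// Simplified model of reflection-based dispatch for a "plain" UDF.
// Not Hive's resolver: all names here are hypothetical.
class ReflectionDispatchSketch {

    // A toy plain UDF with two overloads of evaluate().
    public static class ToyUdf {
        public String evaluate(String s) {
            return s.toUpperCase();
        }

        public String evaluate(String s, Integer times) {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < times; i++) {
                sb.append(s);
            }
            return sb.toString();
        }
    }

    // Finds the evaluate() overload whose parameter types exactly match the
    // runtime classes of the arguments, then invokes it reflectively.
    static Object callEvaluate(Object udf, Object... args) throws Exception {
        Class<?>[] types = new Class<?>[args.length];
        for (int i = 0; i < args.length; i++) {
            types[i] = args[i].getClass();
        }
        Method m = udf.getClass().getMethod("evaluate", types);
        return m.invoke(udf, args);
    }

    // The declared return type is also available through reflection.
    static Class<?> returnTypeOf(Object udf, Class<?>... paramTypes) throws Exception {
        return udf.getClass().getMethod("evaluate", paramTypes).getReturnType();
    }

    public static void main(String[] args) throws Exception {
        ToyUdf udf = new ToyUdf();
        System.out.println(callEvaluate(udf, "usd"));                          // USD
        System.out.println(callEvaluate(udf, "ab", 3));                        // ababab
        System.out.println(returnTypeOf(udf, String.class).getSimpleName());   // String
    }
}
```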

Generic UDFs do not work that way: the developer has to handle the argument and return types, and their values, explicitly. This gives more flexibility and better performance, at the price of some added complexity. Also, AFAIK plain UDFs cannot handle complex (nested) types, so if that’s what you need, you have to use Generic UDFs.

The bottom line is that with plain UDFs your input parameters are fixed in number and type, as is the return value, while a Generic UDF can be coded to accept pretty much any set of inputs and outputs. Also, plain UDFs rely on Java reflection and can be considerably slower than Generic UDFs.

 

Hive Data Structures, ObjectInspector and DeferredObject

I mentioned that Hive has a rich set of data structures. Besides primitive types (int, float, bigint, string, etc.), you can have structures that contain either primitives or other structures, so you can build arbitrarily nested data structures. You can use:

  • arrays/lists: contain an ordered sequence of elements, all of the same type
  • maps: contain key/value pairs, with keys of one type and values of the same or another type
  • structs: contain an ordered list of named values, each of which can be of a different type (like C structs)

A record in a table is modeled as a struct, since each record is a set of values, each possibly of a different type.

The interesting part is how Hive implements these structures. You’d expect them to be mapped to the corresponding Java structures, but it’s a little different from that. Whenever Hive needs to read data from the serialized format, it passes a java.lang.Object reference as the data, together with an associated ObjectInspector. Each ObjectInspector has a category, which can be one of PRIMITIVE, LIST, MAP, STRUCT or UNION. Leaving UNION aside (it corresponds to Hive’s uniontype), each category maps to one of the data structures above. This way Hive does not need introspection to read the data, and the deserialized object does not need to implement any interface or extend any particular class: Hive applies the Adapter design pattern to present a uniform interface to objects of different types.
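
A toy model may make the idea clearer. It is not Hive’s API: Category, Inspector, ListInspector, StructInspector, arrayStruct, javaList and render are all hypothetical names. Data travels as a bare Object, a separate inspector knows how to read it, and render walks a nested value by switching on the inspector’s category without ever looking at the data’s Java class. Here a struct happens to be stored positionally in an Object[] and a list as a java.util.List, but any other representation would do, as long as there is an adapter for it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of the ObjectInspector idea (hypothetical names, not Hive's API):
// data is an opaque Object, and a separate inspector knows how to read it.
class InspectorSketch {

    enum Category { PRIMITIVE, LIST, STRUCT }

    interface Inspector { Category category(); }

    interface ListInspector extends Inspector {
        List<?> getList(Object data);
        Inspector elementInspector();
    }

    interface StructInspector extends Inspector {
        List<String> fieldNames();
        Object getFieldData(Object data, int index);
        Inspector fieldInspector(int index);
    }

    // Primitive values are plain Java objects in this sketch.
    static final Inspector PRIMITIVE_OI = () -> Category.PRIMITIVE;

    // Adapter: a struct stored positionally in an Object[].
    static StructInspector arrayStruct(List<String> names, List<Inspector> fieldOis) {
        return new StructInspector() {
            public Category category() { return Category.STRUCT; }
            public List<String> fieldNames() { return names; }
            public Object getFieldData(Object data, int i) { return ((Object[]) data)[i]; }
            public Inspector fieldInspector(int i) { return fieldOis.get(i); }
        };
    }

    // Adapter: a list stored as a java.util.List.
    static ListInspector javaList(Inspector elementOi) {
        return new ListInspector() {
            public Category category() { return Category.LIST; }
            public List<?> getList(Object data) { return (List<?>) data; }
            public Inspector elementInspector() { return elementOi; }
        };
    }

    // Generic code: dispatches on the inspector's category, never on the data's class.
    static String render(Object data, Inspector oi) {
        if (data == null) {
            return "null";
        }
        switch (oi.category()) {
            case PRIMITIVE:
                return String.valueOf(data);
            case LIST: {
                ListInspector loi = (ListInspector) oi;
                List<String> parts = new ArrayList<>();
                for (Object e : loi.getList(data)) {
                    parts.add(render(e, loi.elementInspector()));
                }
                return "[" + String.join(",", parts) + "]";
            }
            case STRUCT: {
                StructInspector soi = (StructInspector) oi;
                List<String> parts = new ArrayList<>();
                for (int i = 0; i < soi.fieldNames().size(); i++) {
                    parts.add(soi.fieldNames().get(i) + ":"
                            + render(soi.getFieldData(data, i), soi.fieldInspector(i)));
                }
                return "{" + String.join(",", parts) + "}";
            }
            default:
                throw new IllegalArgumentException("unsupported category " + oi.category());
        }
    }

    public static void main(String[] args) {
        // array<struct<name string, age int>>
        StructInspector friendOi = arrayStruct(Arrays.asList("name", "age"),
                Arrays.asList(PRIMITIVE_OI, PRIMITIVE_OI));
        Object friends = Arrays.asList(new Object[] {"alice", 29}, new Object[] {"bob", 35});
        System.out.println(render(friends, javaList(friendOi)));
        // [{name:alice,age:29},{name:bob,age:35}]
    }
}
```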

For example, if you’re deserializing JSON data, a deserialized JSON object can be thought of as a struct: it has a set of values, each possibly of a different type. To make Hive understand it, we can write a JsonStructObjectInspector as an adapter from a Hive struct to a JSON object.
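
In the same toy spirit (hypothetical names again, not the JSON SerDe’s actual classes), an adapter that exposes a parsed JSON object, modeled here as a java.util.Map, through a struct-reading interface could look like this. The field list comes from the table schema rather than from the document, so in this sketch a key missing from the JSON simply reads as null.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy JSON struct adapter (hypothetical names, not Hive's or the JSON SerDe's API):
// a parsed JSON object, modeled as a Map, is read through a struct interface.
class JsonStructAdapterSketch {

    interface StructReader {
        List<String> fieldNames();
        Object getFieldData(Object data, String fieldName);
    }

    // Adapter from "JSON object as Map" to StructReader. The field list is the
    // table schema, so a key absent from the document reads as null.
    static StructReader jsonObjectReader(List<String> schemaFields) {
        return new StructReader() {
            public List<String> fieldNames() { return schemaFields; }
            public Object getFieldData(Object data, String fieldName) {
                return ((Map<?, ?>) data).get(fieldName);
            }
        };
    }

    public static void main(String[] args) {
        Map<String, Object> json = new LinkedHashMap<>();
        json.put("name", "alice");
        json.put("age", 29);

        StructReader reader = jsonObjectReader(Arrays.asList("name", "age", "email"));
        for (String f : reader.fieldNames()) {
            System.out.println(f + " = " + reader.getFieldData(json, f));
        }
        // name = alice
        // age = 29
        // email = null
    }
}
```
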

Now, the other piece of the equation in generic UDFs is the DeferredObject. Since a UDF is executed for every record (as with the SerDe), we want it to be as efficient as possible. That means we don’t want to deserialize data unless it’s really needed, as doing so would create unnecessary Java objects and more work for the garbage collector.

Hive solves this with DeferredObject, which holds a value that may or may not have been deserialized yet. Generic UDFs get their parameters as an array of DeferredObjects, together with a matching array of ObjectInspectors. The ObjectInspectors are passed once, during initialization, while the DeferredObjects are passed for each record. So the skeleton of our generic UDF would be something like:

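import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

public class ArrayStructSortUDF extends GenericUDF {
    protected ObjectInspector[] argumentOIs;

    @Override
    public ObjectInspector initialize(ObjectInspector[] ois) throws UDFArgumentException {
        // called once, before any record: keep the argument inspectors for later
        argumentOIs = ois;
        // check the argument types here, then return the inspector for the result
        return ois[0];
    }

    @Override
    public Object evaluate(DeferredObject[] dos) throws HiveException {
        // called for every record; dos[i].get() materializes argument i
    ....
    }

    // getDisplayString(String[]) must also be implemented
}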

 

As you can see above, we store the ObjectInspectors we get from Hive for later use. Each ObjectInspector describes the type of the corresponding argument, which stays the same every time the function is called, and it’s up to us to decide which types we accept and which we reject. We can also decide how many parameters we accept. For instance, we could rewrite the CONCAT UDF, which takes an arbitrary number of parameters of any type and returns a string with all of them converted to strings and concatenated. You could not do that with a ‘plain’ UDF.
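
As a rough plain-Java sketch of that two-phase contract for a CONCAT-like function: VariadicConcatSketch is hypothetical, and it uses Class objects and plain Objects where Hive would use ObjectInspectors and DeferredObjects. Types are checked once, then the per-record call accepts however many arguments were declared.

```java
import java.util.ArrayList;
import java.util.List;

// Toy two-phase function in the style of a generic UDF (hypothetical names,
// not Hive's API): argument types are checked once, then evaluate() runs per
// record with however many arguments were declared.
class VariadicConcatSketch {

    private final List<Class<?>> argTypes = new ArrayList<>();

    // Called once: decide which argument types we accept and which we reject.
    void initialize(Class<?>... types) {
        if (types.length == 0) {
            throw new IllegalArgumentException("concat needs at least one argument");
        }
        for (Class<?> t : types) {
            boolean ok = CharSequence.class.isAssignableFrom(t)
                    || Number.class.isAssignableFrom(t)
                    || t == Boolean.class;
            if (!ok) {
                throw new IllegalArgumentException("unsupported argument type " + t.getSimpleName());
            }
            argTypes.add(t);
        }
    }

    // Called once per record: convert every argument to a string and join them.
    // Any null argument makes the result null (a choice made for this sketch).
    String evaluate(Object... values) {
        if (values.length != argTypes.size()) {
            throw new IllegalArgumentException(
                    "expected " + argTypes.size() + " arguments, got " + values.length);
        }
        StringBuilder sb = new StringBuilder();
        for (Object v : values) {
            if (v == null) {
                return null;
            }
            sb.append(v);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        VariadicConcatSketch concat = new VariadicConcatSketch();
        concat.initialize(String.class, Integer.class, Double.class);
        System.out.println(concat.evaluate("id-", 42, 1.5)); // id-421.5
    }
}
```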

A generic UDF can also do short-circuit evaluation, that is, ignore parameters it doesn’t need and save the cost of deserializing them (like COALESCE, which returns the first non-null parameter).
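
Here is a minimal sketch of that short-circuit behavior, assuming a toy Deferred class in place of Hive’s DeferredObject (Deferred, coalesce and isLoaded are hypothetical names). Each argument is only materialized when get() is first called, so a COALESCE-like function stops paying for arguments once it finds a non-null one.

```java
import java.util.function.Supplier;

class DeferredCoalesceSketch {

    // Toy stand-in for Hive's DeferredObject: the value is produced only when
    // get() is first called, then cached.
    static final class Deferred {
        private final Supplier<Object> source;
        private boolean loaded;
        private Object value;

        Deferred(Supplier<Object> source) {
            this.source = source;
        }

        Object get() {
            if (!loaded) {
                value = source.get(); // the "deserialization" step
                loaded = true;
            }
            return value;
        }

        boolean isLoaded() {
            return loaded;
        }
    }

    // COALESCE-style short circuit: return the first non-null argument;
    // arguments after it are never materialized.
    static Object coalesce(Deferred... args) {
        for (Deferred d : args) {
            Object v = d.get();
            if (v != null) {
                return v;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Deferred a = new Deferred(() -> null);
        Deferred b = new Deferred(() -> "second");
        Deferred c = new Deferred(() -> "third");
        System.out.println(coalesce(a, b, c)); // second
        System.out.println(c.isLoaded());      // false
    }
}
```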

Note that the initialize method returns an ObjectInspector: that’s the ObjectInspector that can read the result, so initialize is also where you decide the type of your result. In the example that follows, the result is the first argument sorted, so its ObjectInspector is derived from the one for the first argument, but you can see a more sophisticated initialize method in the example here: http://www.baynote.com/2012/11/a-word-from-the-engineers/.

Putting it all together

Here’s the code, reproduced with permission from my employer, OpenX:

package com.congiu.udf;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.LIST;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;

/**
 *
 * @author rcongiu
 */
@Description(name = "array_struct_sort",
    value = "_FUNC_(array(struct1,struct2,...), string myfield) - "
    + "returns the passed array of structs, ordered by the given field",
    extended = "Example:\n"
    + "  > SELECT _FUNC_(friends, 'age') FROM mytable;\n")
public class ArrayStructSortUDF extends GenericUDF {
    protected ObjectInspector[] argumentOIs;

    ListObjectInspector loi;
    StructObjectInspector elOi;
    StringObjectInspector fieldNameOi;

    // cache comparators for performance
    Map<String, Comparator<Object>> comparatorCache = new HashMap<String, Comparator<Object>>();

    @Override
    public ObjectInspector initialize(ObjectInspector[] ois) throws UDFArgumentException {
        // all common initialization
        argumentOIs = ois;

        // clear comparator cache from previous invocations
        comparatorCache.clear();

        return checkAndReadObjectInspectors(ois);
    }

    /**
     * Utility method to check that the arguments have the expected types,
     * storing the object inspectors that evaluate() needs later
     * @param ois the argument object inspectors
     * @return the object inspector for the result
     * @throws UDFArgumentTypeException
     */
    protected ListObjectInspector checkAndReadObjectInspectors(ObjectInspector[] ois)
            throws UDFArgumentTypeException, UDFArgumentException {
        // check number of arguments. We only accept two:
        // the list of structs to sort and the name of the struct field
        // to sort by
        if (ois.length != 2) {
            throw new UDFArgumentException("2 arguments needed, found " + ois.length);
        }

        // first argument must be a list/array
        if (!ois[0].getCategory().equals(LIST)) {
            throw new UDFArgumentTypeException(0, "Argument 1"
                    + " of function " + this.getClass().getCanonicalName() + " must be " + Constants.LIST_TYPE_NAME
                    + ", but " + ois[0].getTypeName()
                    + " was found.");
        }

        // a list/array is read by a LIST object inspector
        loi = (ListObjectInspector) ois[0];

        // a list has an element type associated to it;
        // elements must be structs for this UDF
        if (loi.getListElementObjectInspector().getCategory() != ObjectInspector.Category.STRUCT) {
            throw new UDFArgumentTypeException(0, "Argument 1"
                    + " of function " + this.getClass().getCanonicalName() + " must be an array of structs "
                    + " but is an array of " + loi.getListElementObjectInspector().getCategory().name());
        }

        // store the object inspector for the elements
        elOi = (StructObjectInspector) loi.getListElementObjectInspector();

        // second argument must be a string: the name of the field to sort by
        if (!(ois[1] instanceof StringObjectInspector)) {
            throw new UDFArgumentTypeException(1, "Argument 2"
                    + " of function " + this.getClass().getCanonicalName() + " must be " + Constants.STRING_TYPE_NAME
                    + ", but " + ois[1].getTypeName()
                    + " was found.");
        }
        fieldNameOi = (StringObjectInspector) ois[1];

        // evaluate() returns a java.util.List, so describe the result with a
        // standard list inspector over the same element inspector: the input's
        // own list inspector (a lazy one, for instance) may not be able to
        // read an ArrayList
        return ObjectInspectorFactory.getStandardListObjectInspector(elOi);
    }

    // to sort a list, we must supply our own comparator
    public class StructFieldComparator implements Comparator<Object> {
        StructField field;

        public StructFieldComparator(String fieldName) {
            field = elOi.getStructFieldRef(fieldName);
        }

        public int compare(Object o1, Object o2) {
            // null structs sort first
            if (o1 == null || o2 == null) {
                return o1 == null ? (o2 == null ? 0 : -1) : 1;
            }

            // both not null: extract the field from each struct
            Object f1 = elOi.getStructFieldData(o1, field);
            Object f2 = elOi.getStructFieldData(o2, field);
            // compare using Hive's utility functions
            return ObjectInspectorUtils.compare(f1, field.getFieldObjectInspector(),
                    f2, field.getFieldObjectInspector());
        }
    }

    // factory method for cached comparators
    Comparator<Object> getComparator(String field) {
        Comparator<Object> c = comparatorCache.get(field);
        if (c == null) {
            c = new StructFieldComparator(field);
            comparatorCache.put(field, c);
        }
        return c;
    }

    @Override
    public Object evaluate(DeferredObject[] dos) throws HiveException {
        if (dos == null || dos.length != 2) {
            throw new HiveException("received " + (dos == null ? "null"
                    : Integer.toString(dos.length) + " elements") + " instead of 2");
        }

        // a null array or a null field name gives a null result
        Object list = dos[0].get();
        Object field = dos[1].get();
        List<?> elements = (list == null ? null : loi.getList(list));
        if (elements == null || field == null) {
            return null;
        }

        // each object is supposed to be a struct.
        // we make a shallow copy of the list. We don't want to sort
        // the list in place since the object could be used elsewhere in the
        // hive query
        ArrayList<Object> al = new ArrayList<Object>(elements);

        // the field name may arrive as a Text or another writable rather than
        // a java.lang.String, so read it through its object inspector
        String fieldName = fieldNameOi.getPrimitiveJavaObject(field);

        // sort with our comparator, then return.
        // note that we could get a different field to sort by for every
        // invocation
        Collections.sort(al, getComparator(fieldName));

        return al;
    }

    @Override
    public String getDisplayString(String[] children) {
        return (children == null ? null : this.getClass().getCanonicalName() + "(" + children[0] + "," + children[1] + ")");
    }

}

Maven dependencies for this code would be:

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>0.20.2</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>0.8.0</version>
      <scope>provided</scope>
    </dependency>


Posted in: programming