A JSON read/write SerDe for Hive

Posted on July 11, 2011

Today I finished coding another SerDe for Hive which, with my employer’s permission, I published on GitHub here: https://github.com/rcongiu/Hive-JSON-Serde.git.

Since the code is still fresh in my mind, I thought I’d write another article on how to write a SerDe, since the official documentation on how to do it is scarce and you’d have to read the Hive code directly, as I had to do.

How a SerDe is used in Hive

First of all, let’s have a look at how a SerDe interacts with Hive.
A SerDe is composed of two interfaces, a Serializer and a Deserializer.
Let’s see the methods required for both interfaces:

public interface Serializer {
  void initialize(Configuration conf, Properties tbl) throws SerDeException;
  Class<? extends Writable> getSerializedClass();
  Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException;
}

public interface Deserializer {
  void initialize(Configuration conf, Properties tbl) throws SerDeException;
  Object deserialize(Writable blob) throws SerDeException;
  ObjectInspector getObjectInspector() throws SerDeException;
}

We can see that serialization, for Hive, means turning an Object into a Writable, while deserialization does the inverse. One way to do that would be to have objects implement a certain interface, but the Hive designers chose another path: the ObjectInspector, an auxiliary object that can look at a Java object and make it digestible for Hive. An ObjectInspector does not carry any per-object state, so it can be cached for a given class of objects. This is done for performance reasons: you create one ObjectInspector and reuse it for all the records in your query.
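The caching idea can be sketched with plain JDK collections. Everything below is a hypothetical stand-in (InspectorCache and Inspector are not Hive classes); it only illustrates the one-shared-instance-per-type scheme:

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the caching idea (not Hive's actual factory):
// one inspector instance per type signature, shared by all queries.
public class InspectorCache {

    // Stand-in for a real ObjectInspector implementation.
    static class Inspector {
        final String typeName;
        Inspector(String typeName) { this.typeName = typeName; }
    }

    private static final ConcurrentHashMap<String, Inspector> CACHE =
            new ConcurrentHashMap<String, Inspector>();

    // Always returns the same instance for the same type signature, so
    // the default hashCode()/equals() from java.lang.Object just work.
    public static Inspector get(String typeName) {
        Inspector candidate = new Inspector(typeName);
        Inspector existing = CACHE.putIfAbsent(typeName, candidate);
        return existing != null ? existing : candidate;
    }
}
```

Because lookups by the same signature return the same object, inspector identity doubles as type identity, which is exactly what makes the caching cheap.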

The ObjectInspector

As I just said, the ObjectInspector lets Hive look into a Java object, working as an adapter that presents the Java object as one of the five following abstractions, defined in the ObjectInspector interface:

  • PRIMITIVE
  • LIST
  • MAP
  • STRUCT
  • UNION

Here’s the code for the ObjectInspector interface:

package org.apache.hadoop.hive.serde2.objectinspector;
/**
 * ObjectInspector helps us to look into the internal structure of a complex
 * object.
 *
 * A (probably configured) ObjectInspector instance stands for a specific type
 * and a specific way to store the data of that type in the memory.
 *
 * For native java Object, we can directly access the internal structure through
 * member fields and methods. ObjectInspector is a way to delegate that
 * functionality away from the Object, so that we have more control on the
 * behavior of those actions.
 *
 * An efficient implementation of ObjectInspector should rely on factory, so
 * that we can make sure the same ObjectInspector only has one instance. That
 * also makes sure hashCode() and equals() methods of java.lang.Object directly
 * works for ObjectInspector as well.
 */
public interface ObjectInspector extends Cloneable {

  /**
   * Category.
   *
   */
  public static enum Category {
    PRIMITIVE, LIST, MAP, STRUCT, UNION
  };

  /**
   * Returns the name of the data type that is inspected by this
   * ObjectInspector. This is used to display the type information to the user.
   *
   * For primitive types, the type name is standardized. For other types, the
   * type name can be something like "list<int>", "map<int,string>", java class
   * names, or user-defined type names similar to typedef.
   */
  String getTypeName();

  /**
   * An ObjectInspector must inherit from one of the following interfaces if
   * getCategory() returns: PRIMITIVE: PrimitiveObjectInspector LIST:
   * ListObjectInspector MAP: MapObjectInspector STRUCT: StructObjectInspector.
   */
  Category getCategory();
}

We can see that the methods shared by all ObjectInspectors are basically just introspection: which Category the ObjectInspector belongs to (Primitive, List, Map, etc.) and its type name (for instance, map<int,string>).

An alternative to an adapter would be to have the object we want to manipulate implement some interfaces, but that would not be as flexible, since we may not always have the luxury of extending a preexisting class.

It works well in our case. The basic idea for the SerDe is to use the JSON library from json.org (http://json.org/java/). The library can read a line of text and parse it into a JSONObject. A Hive data row is a struct, since it’s a collection of columns of different types. This means we have to write a StructObjectInspector capable of reading a JSONObject. A JSONObject can also contain JSONArrays, which we’ll map to Hive lists. For primitives, we can use the standard Hive object inspectors. Hive maps will also work with JSONObjects: JSON does not distinguish between structs and maps, since in a JSON object the values can be of different types, while in a Hive map they all have to be of the declared type (for instance, map<string,string>).

Let’s have a look at some ObjectInspectors:

public interface PrimitiveObjectInspector extends ObjectInspector {

  /**
   * The primitive types supported by Hive.
   */
  public static enum PrimitiveCategory {
    VOID, BOOLEAN, BYTE, SHORT, INT, LONG,
    FLOAT, DOUBLE, STRING, UNKNOWN
  };

  /**
   * Get the primitive category of
   * the PrimitiveObjectInspector.
   */
  PrimitiveCategory getPrimitiveCategory();

  /**
   * Get the Primitive Writable class
   * which is the return type of
   * getPrimitiveWritableObject() and 
   * copyToPrimitiveWritableObject().
   */
  Class<?> getPrimitiveWritableClass();

  /**
   * Return the data in an instance of
   * primitive writable Object. If the Object
   * is already a primitive writable 
   * Object, just return o.
   */
  Object getPrimitiveWritableObject(Object o);

  /**
   * Get the Java Primitive class
   * which is the return type of
   * getJavaPrimitiveObject().
   */
  Class<?> getJavaPrimitiveClass();

  /**
   * Get the Java Primitive object.
   */
  Object getPrimitiveJavaObject(Object o);

  /**
   * Get a copy of the Object in the same 
   * class, so the return value can be
   * stored independently of the parameter.
   * 
   * If the Object is a Primitive Java Object,
   * we just return the parameter
   * since Primitive Java Object is immutable.
   */
  Object copyObject(Object o);

  /**
   * Whether the ObjectInspector prefers
   * to return a Primitive Writable Object
   * instead of a Primitive Java Object. 
   * This can be useful for determining the
   * most efficient way to getting data out
   * of the Object.
   */
  boolean preferWritable();
}

We can see all the supported primitive types, as well as the methods that need to be implemented to retrieve the Java object containing the data.

The ListObjectInspector has methods to get the nth element, to get the list length, and to retrieve the whole collection:

/**
 * ListObjectInspector.
 *
 */
public interface ListObjectInspector extends ObjectInspector {

  // ** Methods that does not need a data object **
  ObjectInspector getListElementObjectInspector();

  // ** Methods that need a data object **
  /**
   * returns null for null list, out-of-the-range index.
   */
  Object getListElement(Object data, int index);

  /**
   * returns -1 for data = null.
   */
  int getListLength(Object data);

  /**
   * returns null for data = null.
   * 
   * Note: This method should not return a List object that is reused by the
   * same ListObjectInspector, because it's possible that the same
   * ListObjectInspector will be used in multiple places in the code.
   * 
   * However it's OK if the List object is part of the Object data.
   */
  List<?> getList(Object data);
}

Now, let’s look at the StructObjectInspector, which is used to model a Hive row.

/**
 * StructObjectInspector.
 *
 */
public abstract class StructObjectInspector implements ObjectInspector {

  // ** Methods that does not need a data object **
  /**
   * Returns all the fields.
   */
  public abstract List<? extends StructField> getAllStructFieldRefs();

  /**
   * Look up a field.
   */
  public abstract StructField getStructFieldRef(String fieldName);

  // ** Methods that need a data object **
  /**
   * returns null for data = null.
   */
  public abstract Object getStructFieldData(Object data, StructField fieldRef);

  /**
   * returns null for data = null.
   */
  public abstract List<Object> getStructFieldsDataAsList(Object data);

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    List<? extends StructField> fields = getAllStructFieldRefs();
    sb.append(getClass().getName());
    sb.append("<");
    for (int i = 0; i < fields.size(); i++) {
      if (i > 0) {
        sb.append(",");
      }
      sb.append(fields.get(i).getFieldObjectInspector().toString());
    }
    sb.append(">");
    return sb.toString();
  }
}

/**
 * StructField is an empty interface.
 * 
 * Classes implementing this interface are considered to represent a field of a
 * struct for this serde package.
 */
public interface StructField {

  /**
   * Get the name of the field. The name should be always in lower-case.
   */
  String getFieldName();

  /**
   * Get the ObjectInspector for the field.
   */
  ObjectInspector getFieldObjectInspector();

}

Now let’s write some code. For a struct, Hive expects an object that can be cast to Array<Object>.
Notice that we are extending StandardStructObjectInspector, coding only the JSON-library-specific parts of the interface while reusing all the boilerplate code that manages initialization, reads the field list, maps fields to positions in an array, and so on (code here: http://svn.apache.org/repos/asf/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardStructObjectInspector.java).

/**
 * This Object Inspector is used to look into a JSonObject object.
 * We couldn't use StandardStructObjectInspector since that expects 
 * something that can be cast to an Array<Object>.
 * @author rcongiu
 */
public class JsonStructObjectInspector extends StandardStructObjectInspector {

    public JsonStructObjectInspector(List<String> structFieldNames, 
            List<ObjectInspector> structFieldObjectInspectors) {
       super(structFieldNames, structFieldObjectInspectors);
    }

    @Override
    public Object getStructFieldData(Object data, StructField fieldRef) {
    if (data == null) {
      return null;
    }
    JSONObject obj = (JSONObject) data;
    MyField f = (MyField) fieldRef; 

    int fieldID = f.getFieldID();
    assert (fieldID >= 0 && fieldID < fields.size());

    try {
        return obj.get(f.getFieldName());
    } catch (JSONException ex) {
        // if key does not exist
        return null; 
    }
  }

    static List<Object> values = new ArrayList<Object>();
    @Override
    public List<Object> getStructFieldsDataAsList(Object o) {
        JSONObject jObj = (JSONObject) o;
        values.clear();

        for (int i = 0; i < fields.size(); i++) {
            try {
                values.add(jObj.get(fields.get(i).getFieldName()));
            } catch (JSONException ex) {
                // we're iterating through the keys so
                // this should never happen
                throw new RuntimeException("Key not found");
            }
        }

        return values;
    }
}

Hive supports lists, which we can map to JSON arrays. The ObjectInspector that lets Hive access them looks like this:

public class JsonListObjectInspector extends StandardListObjectInspector {
  JsonListObjectInspector(ObjectInspector listElementObjectInspector) {
    super(listElementObjectInspector);
  }

  @Override
  public List<?> getList(Object data) {
    if (data == null) {
      return null;
    }
    JSONArray array = (JSONArray) data;
    return array.getAsArrayList();
  }

  @Override
  public Object getListElement(Object data, int index) {
    if (data == null) {
      return null;
    }
    JSONArray array = (JSONArray) data;
    try {
        return array.get(index);
    } catch(JSONException ex) {
        return null;
    }
  }

  @Override
  public int getListLength(Object data) {
    if (data == null) {
      return -1;
    }
    JSONArray array = (JSONArray) data;
    return array.length();
  }
}

You can clearly see how this is an Adapter Pattern.

To implement maps, we need to do some extra work, since JSONObject does not implement all the methods needed by a Map. The ObjectInspector has a getMap() method, which returns a Map. JSONObject has no such method, so we’d have to build the map on the spot; instead I preferred to write a Map adapter around JSONObject. Code for it here.
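As a stdlib-only illustration of that adapter idea, here is a hedged sketch: FakeJsonObject is a made-up stand-in for org.json’s JSONObject (which exposes keys(), get() and length() but is not a java.util.Map), and extending AbstractMap means entrySet() is the only method we must implement:

```java
import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

public class JsonMapAdapterSketch {

    // Hypothetical stand-in for org.json.JSONObject: key/value access
    // without implementing java.util.Map.
    static class FakeJsonObject {
        private final Map<String, Object> data = new HashMap<String, Object>();
        void put(String k, Object v) { data.put(k, v); }
        Object get(String k) { return data.get(k); }
        Iterator<String> keys() { return data.keySet().iterator(); }
        int length() { return data.size(); }
    }

    // Read-only Map view over a FakeJsonObject; AbstractMap derives
    // get(), size(), containsKey() etc. from entrySet().
    static class MapAdapter extends AbstractMap<String, Object> {
        private final FakeJsonObject obj;
        MapAdapter(FakeJsonObject obj) { this.obj = obj; }

        @Override
        public Set<Map.Entry<String, Object>> entrySet() {
            return new AbstractSet<Map.Entry<String, Object>>() {
                @Override public int size() { return obj.length(); }
                @Override public Iterator<Map.Entry<String, Object>> iterator() {
                    final Iterator<String> keys = obj.keys();
                    return new Iterator<Map.Entry<String, Object>>() {
                        public boolean hasNext() { return keys.hasNext(); }
                        public Map.Entry<String, Object> next() {
                            String k = keys.next();
                            return new AbstractMap.SimpleImmutableEntry<String, Object>(
                                    k, obj.get(k));
                        }
                        public void remove() { throw new UnsupportedOperationException(); }
                    };
                }
            };
        }
    }
}
```

The adapter builds nothing up front: every Map operation is translated on the fly into keys()/get() calls on the wrapped object, which is the design choice that avoids copying each row into a fresh HashMap.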

public class JsonMapObjectInspector extends StandardMapObjectInspector {

    public JsonMapObjectInspector(ObjectInspector mapKeyObjectInspector, 
            ObjectInspector mapValueObjectInspector) {
        super(mapKeyObjectInspector, mapValueObjectInspector);
    }

  @Override
  public Map<?, ?> getMap(Object data) {
    if (data == null) {
      return null;
    }

    JSONObject jObj = (JSONObject) data;

    return new JSONObjectMapAdapter(jObj);
  }

  @Override
  public int getMapSize(Object data) {
    if (data == null) {
      return -1;
    }
    JSONObject jObj = (JSONObject) data;
    return jObj.length();
  }

  @Override
  public Object getMapValueElement(Object data, Object key) {
    if (data == null) {
      return null;
    }

    JSONObject jObj = (JSONObject) data;
    try {
      return jObj.get(key.toString());
    } catch (JSONException ex) {
      // key does not exist -> same as null
      return null;
    }
  }
}

The SerDe

Now that we have all the ObjectInspectors, we can write our SerDe.
As we saw at the beginning, there are three entry points: initialize, serialize, deserialize.

Initialization

During initialization, Hive gives the SerDe information about the table it’s trying to access: for example, the number of columns and their types. The SerDe gets a Properties object and, as you can see below, the properties

Constants.LIST_COLUMNS

and

Constants.LIST_COLUMN_TYPES

contain a comma-separated list of column names and column types, respectively.
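Note that only the column names can be split naively on commas; the type string cannot, since nested types like map<string,string> contain commas themselves, which is why the initialization code delegates to TypeInfoUtils. A hypothetical depth-aware splitter illustrates the issue (this is a sketch, not Hive’s parser):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: split a Hive type list on top-level commas only, so that
// "int,map<string,string>,array<int>" yields three entries, not five.
// In Hive, TypeInfoUtils.getTypeInfosFromTypeString does the real parsing.
public class TypeListSplitter {
    public static List<String> split(String typeList) {
        List<String> result = new ArrayList<String>();
        int depth = 0; // nesting level inside <...> brackets
        StringBuilder cur = new StringBuilder();
        for (char c : typeList.toCharArray()) {
            if (c == '<') depth++;
            if (c == '>') depth--;
            if (c == ',' && depth == 0) {
                result.add(cur.toString());
                cur.setLength(0);
            } else {
                cur.append(c);
            }
        }
        if (cur.length() > 0) result.add(cur.toString());
        return result;
    }
}
```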

    List<String> columnNames;
    List<TypeInfo> columnTypes;
    StructTypeInfo rowTypeInfo;
    StructObjectInspector rowObjectInspector;

    @Override
    public void initialize(Configuration conf, Properties tbl) throws SerDeException {
        LOG.debug("Initializing SerDe");
        // Get column names and types
        String columnNameProperty = tbl.getProperty(Constants.LIST_COLUMNS);
        String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES);

        LOG.debug("columns " + columnNameProperty + " types " + columnTypeProperty);

        // all table column names
        if (columnNameProperty.length() == 0) {
            columnNames = new ArrayList<String>();
        } else {
            columnNames = Arrays.asList(columnNameProperty.split(","));
        }

        // all column types
        if (columnTypeProperty.length() == 0) {
            columnTypes = new ArrayList<TypeInfo>();
        } else {
            columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
        }
        assert (columnNames.size() == columnTypes.size());

Next, we build our ObjectInspector. To do that, we have to build a TypeInfo object through the appropriate factory. The TypeInfo object contains the list of fields, and their types. Since a Struct can contain other types, we can see in the StructTypeInfo code that it can traverse the struct and build TypeInfos for its fields as needed. The TypeInfo is also used to build the signature of the ObjectInspector and cache it.

        // Create row related objects
        rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
        rowObjectInspector = (StructObjectInspector) JsonObjectInspectorFactory.getJsonObjectInspectorFromTypeInfo(rowTypeInfo);

    }

At this point we’ve built the ObjectInspector, and we’re ready to serialize and deserialize.

Deserialization

Deserialization is trivial: we get a Writable containing the JSON text and have JSONObject parse it. We then return the parsed object; the ObjectInspector will tell Hive how to handle it:

 @Override
    public Object deserialize(Writable w) throws SerDeException {
        Text rowText = (Text) w;

        // Try parsing row into JSON object
        JSONObject jObj;
        try {
            jObj = new JSONObject(rowText.toString()) {

                /**
                 * In Hive column names are case insensitive, so lower-case
                 * all field names.
                 *
                 * @see org.json.JSONObject#put(java.lang.String,
                 *      java.lang.Object)
                 */
                @Override
                public JSONObject put(String key, Object value)
                        throws JSONException {
                    return super.put(key.toLowerCase(), value);
                }
            };
        } catch (JSONException e) {
            // If row is not a JSON object, make the whole row NULL
            LOG.error("Row is not a valid JSON Object - JSONException: "
                    + e.getMessage());
            throw new SerDeException(e);
        }
        return jObj;
    }
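The key-lowercasing trick can be seen in isolation with a plain HashMap subclass; this is just an illustration of the anonymous-override idea, not code from the SerDe:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the trick used in deserialize(): override put() in an
// anonymous subclass so every key is normalized to lower case on insert.
public class LowerCaseKeys {
    public static Map<String, Object> newLowerCaseMap() {
        return new HashMap<String, Object>() {
            @Override
            public Object put(String key, Object value) {
                return super.put(key.toLowerCase(), value);
            }
        };
    }
}
```

Normalizing at insertion time means every later lookup by a lower-case column name succeeds, no matter how the producer capitalized the JSON keys.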

So when Hive wants to access a field myfield, it will use JsonStructObjectInspector.getStructFieldData(), passing it the object and myfield in the form of a StructField reference, which pairs a field name with its corresponding ObjectInspector. This wiring is done in JsonObjectInspectorFactory.getJsonObjectInspectorFromTypeInfo.

Serialization

You may be expecting serialization to also be simple: we get a JSONObject and call its

toString

method… not quite. Your serializer could be handed any kind of object, and the only assumption you can make is that its associated ObjectInspector will be some subclass of

StructObjectInspector

since that’s how a Hive table is modeled. To serialize the record we’ll have to analyze the ObjectInspector, using it to introspect the given data object, building our JSONObject as we go.

We can see how the serializer just calls

serializeStruct

and turns the resulting JSONObject into a

Text

@Override
    public Writable serialize(Object obj, ObjectInspector objInspector) 
                     throws SerDeException {
        // make sure it is a struct record
        if (objInspector.getCategory() != Category.STRUCT) {
            throw new SerDeException(getClass().toString()
                    + " can only serialize struct types, but we got: "
                    + objInspector.getTypeName());
        }

        JSONObject serializer =
            serializeStruct( obj, (StructObjectInspector) objInspector, columnNames);

        Text t = new Text(serializer.toString());

        return t;
    }

You can see how that works here:

    private JSONObject serializeStruct( Object obj,
            StructObjectInspector soi, List<String> columnNames) {
        // do nothing for null struct
        if (null == obj) {
            return null;
        }

        JSONObject result = new JSONObject();

        List<? extends StructField> fields = soi.getAllStructFieldRefs();

        for (int i = 0; i < fields.size(); i++) {
            StructField sf = fields.get(i);
            Object data = soi.getStructFieldData(obj, sf);

            if (null != data) {
                try {
                    // we want to serialize columns with their proper HIVE name,
                    // not the _col2 kind of name usually generated upstream
                    result.put((columnNames==null?sf.getFieldName():columnNames.get(i)),
                            serializeField(
                                data,
                                sf.getFieldObjectInspector()));
                } catch (JSONException ex) {
                   LOG.warn("Problem serializing", ex);
                   throw new RuntimeException(ex);
                }
            }
        }
        return result;
    }

It loops through all the struct fields and serializes them one by one. The heart of serialization is the

serializeField()

method, which uses the ObjectInspector to read the data object and maps it to calls on the JSONObject we’re building:

    Object serializeField(Object obj,
            ObjectInspector oi ){
        if(obj == null) return null;

        Object result = null;
        switch(oi.getCategory()) {
            case PRIMITIVE:
                PrimitiveObjectInspector poi = (PrimitiveObjectInspector)oi;
                switch(poi.getPrimitiveCategory()) {
                    case VOID:
                        result = null;
                        break;
                    case BOOLEAN:
                        result = (((BooleanObjectInspector)poi).get(obj)?
                                            Boolean.TRUE:
                                            Boolean.FALSE);
                        break;
                    case BYTE:
                        result = (((ByteObjectInspector)poi).get(obj));
                        break;
                    case DOUBLE:
                        result = (((DoubleObjectInspector)poi).get(obj));
                        break;
                    case FLOAT:
                        result = (((FloatObjectInspector)poi).get(obj));
                        break;
                    case INT:
                        result = (((IntObjectInspector)poi).get(obj));
                        break;
                    case LONG:
                        result = (((LongObjectInspector)poi).get(obj));
                        break;
                    case SHORT:
                        result = (((ShortObjectInspector)poi).get(obj));
                        break;
                    case STRING:
                        result = (((StringObjectInspector)poi).getPrimitiveJavaObject(obj));
                        break;
                    case UNKNOWN:
                        throw new RuntimeException("Unknown primitive");
                }
                break;
            case MAP:
                result = serializeMap(obj, (MapObjectInspector) oi);
                break;
            case LIST:
                result = serializeArray(obj, (ListObjectInspector)oi);
                break;
            case STRUCT:
                result = serializeStruct(obj, (StructObjectInspector)oi, null);
                break;
        }
        return result;
    }

Notice how we reuse standard ObjectInspectors for primitives, while we have our own code for lists, maps and structs. Since we descend the object structure as we go, we can handle arbitrarily deep nesting in the JSON data, both when reading it and when writing it.
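The descend-as-you-go recursion can be sketched with plain JDK types standing in for ObjectInspectors: a Map plays the struct/map role, a List the array role, and anything else is treated as a primitive. This illustrates the recursion only, not the SerDe’s actual code (and it skips string escaping entirely):

```java
import java.util.List;
import java.util.Map;

// Stdlib sketch of recursive serialization: dispatch on the runtime type
// the way serializeField() dispatches on the ObjectInspector category.
public class JsonSketch {
    public static String toJson(Object o) {
        if (o == null) return "null";
        if (o instanceof Map) {                       // struct/map case
            StringBuilder sb = new StringBuilder("{");
            boolean first = true;
            for (Map.Entry<?, ?> e : ((Map<?, ?>) o).entrySet()) {
                if (!first) sb.append(",");
                first = false;
                sb.append("\"").append(e.getKey()).append("\":")
                  .append(toJson(e.getValue()));
            }
            return sb.append("}").toString();
        }
        if (o instanceof List) {                      // list case
            StringBuilder sb = new StringBuilder("[");
            List<?> list = (List<?>) o;
            for (int i = 0; i < list.size(); i++) {
                if (i > 0) sb.append(",");
                sb.append(toJson(list.get(i)));
            }
            return sb.append("]").toString();
        }
        if (o instanceof String) return "\"" + o + "\""; // no escaping: sketch only
        return o.toString();                          // numbers and booleans
    }
}
```

Because each branch recurses into toJson(), nesting depth is limited only by the data, which mirrors how serializeField() handles arbitrarily nested rows.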

Usage

Using the SerDe is simple: just add the jar to your session (with ADD JAR, or put it in your Hive classpath) and create your table like this:

CREATE TABLE json_test (
           name string,
           favorite_foods array<string>,
           subject_grade map<string,string>
) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS SEQUENCEFILE;

This will be able to read JSON data like

{ "name":"Roberto", "favorite_foods":["Sushi","Pizza"], "subject_grade":{"math":"good","history":"so so"}}

using a query like

SELECT name, favorite_foods[0], subject_grade['math'] FROM json_test;

Source Code

available here: https://github.com/rcongiu/Hive-JSON-Serde/
Pre-built binary available in the Downloads section.

Posted in: programming