Writing a Hive SerDe for LWES event files

Posted on October 27, 2009

I am currently working to set up an OLAP data warehouse using Hive on top of Hadoop. We have a considerable amount of data coming from the ad servers, on which we need to perform various kinds of analysis.

Writing a map-reduce job is not difficult in principle – it’s just time consuming and requires the skills of a trained Java engineer, which wouldn’t be needed if we were using SQL. That’s where Hive comes in: it lets us query a Hadoop data store using a flavor of SQL.


Hive stores data using common Hadoop formats: text and sequence files.
Unfortunately, our data is in neither of those formats, and it would be impractical to reformat it. Fortunately, Hive is very extensible: you can plug in your own file format and even your own serialization/deserialization (SerDe). What’s the difference between the two?

  • File format: determines how the data is stored, from the layout of the file set down to the parsing of single key/value pairs: compression, binary format, splits, etc.
  • SerDe: defines how a key/value pair is deserialized into (and serialized from) the set of columns belonging to a Hive table

At the time I’m writing this post, the Hive wiki doesn’t mention that you can actually specify your own file format. If you dig into the code and look at the query language grammar (ql/src/java/org/apache/hadoop/hive/ql/Hive.g), you’ll see that you can pick your own input and output format:

     KW_STORED KW_AS KW_SEQUENCEFILE  -> TOK_TBLSEQUENCEFILE
      | KW_STORED KW_AS KW_TEXTFILE  -> TOK_TBLTEXTFILE
      | KW_STORED KW_AS KW_RCFILE  -> TOK_TBLRCFILE
      | KW_STORED KW_AS KW_INPUTFORMAT inFmt=StringLiteral KW_OUTPUTFORMAT outFmt=StringLiteral

So in your CREATE TABLE statement you can specify STORED AS INPUTFORMAT 'com.yoursite.YourInputFormatClass' OUTPUTFORMAT 'com.yoursite.YourOutputFormatClass'.

Writing your own input/output format is not trivial, since you’ll have to take care of creating the splits from the input. Having splits of the right size is very important for Hadoop performance, so you should write your own splitting logic only if you know what you’re doing.

Should you decide to write your own input format, how do you plug it in? Hive adds to its classpath all the jars in the directory specified by either the HIVE_AUX_JARS_PATH environment variable or the --auxpath command line option of the hive shell. This is also how you add a SerDe or a UDF.

In this post I’ll just cover the writing of a SerDe. I will cover the InputFormat in another post.

The SerDe tells Hive how to map the column names in the table to values within the Writable object. Note that the Writable object may be an arbitrarily complex object hierarchy.

To handle introspection efficiently, Hive implements its own mechanism, optimized for high-performance repetitive tasks. It uses ObjectInspectors that are stateless and immutable, so we need just one per type.
Types inherit from org.apache.hadoop.hive.serde2.typeinfo.TypeInfo and can belong to one of these categories:

public interface ObjectInspector {
  public static enum Category {
    PRIMITIVE, LIST, MAP, STRUCT
  };
  // ...
}

Each of these four categories has its own interface that extends ObjectInspector:

  • PrimitiveObjectInspector
  • ListObjectInspector
  • MapObjectInspector
  • StructObjectInspector


Since our file format is basically a map, I initially tried to implement a SerDe with a MapObjectInspector that reads into our format. Unfortunately, it didn’t work: during execution Hive expects to find a struct, not a map. So your table record HAS TO be a struct, with the other categories used only for columns/attributes.
From a theoretical point of view, this makes sense. When we issue a SELECT statement, we expect a result that is a set of values of different types, which is the definition of a struct. We cannot use a primitive, a list, or a map for that, since lists and maps are homogeneous collections (List<T>, Map<K,V>).
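
To make this concrete, here is a minimal sketch of my own (not Hive’s internal code) showing how a row object is consumed through its StructObjectInspector: each column is a StructField, and it’s the inspector, not the row itself, that knows how to extract each value.

    import java.util.List;
    import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

    // Walks a deserialized row through its StructObjectInspector, roughly the
    // way Hive's operators pull column values out of whatever object the
    // SerDe's deserialize() returned.
    public class StructWalker {
        public static void dump(StructObjectInspector rowInspector, Object row) {
            List<? extends StructField> fields = rowInspector.getAllStructFieldRefs();
            for (StructField field : fields) {
                Object value = rowInspector.getStructFieldData(row, field);
                System.out.println(field.getFieldName() + " = " + value);
            }
        }
    }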

So, how does the SerDe work?
The SerDe interface is simply the union of two interfaces: Serializer and Deserializer. Since Hadoop expects a Writable, we serialize/deserialize to/from Writable.

public interface Serializer {
  public void initialize(Configuration conf, Properties tbl) throws SerDeException;
  public Class<? extends Writable> getSerializedClass();
  public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException;
}

public interface Deserializer {
  public void initialize(Configuration conf, Properties tbl) throws SerDeException;
  public Object deserialize(Writable blob) throws SerDeException;
  public ObjectInspector getObjectInspector() throws SerDeException;
}
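
In this post the data only ever flows from LWES journals into Hive, so the interesting half is the Deserializer. Just to show the shape of the other half, a SerDe for read-only data could implement serialize() as a stub that refuses to write; this is a sketch of mine, not the code from the repository:

    // Hypothetical serialize() for a read-only table: we never write LWES
    // journals from Hive, so we simply refuse to serialize rows back out.
    @Override
    public Writable serialize(Object obj, ObjectInspector objInspector)
            throws SerDeException {
        throw new SerDeException("Writing LWES events from Hive is not supported");
    }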

To serialize and deserialize we need to know how to map the columns to whatever the internal structure of our object is, and possibly perform type conversions. Since the serialize/deserialize methods are called for every record, we should optimize this conversion as much as possible, pre-building the mapping into efficient data structures when the SerDe is started. This can be done in the initialize() method of the SerDe.

In my case, I use these four attributes:

  • List<String> columnNames;
  • List<TypeInfo> columnTypes;
  • TypeInfo rowTypeInfo;
  • ObjectInspector rowObjectInspector;

As seen below, we retrieve the list of columns and types from the table properties, and we build the TypeInfo objects (Hive’s way to categorize the columns) through utility methods. In particular, we create the row type info as a struct type info with the given list of columns and types: rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes).

     // Get column names and sort order
        String columnNameProperty = tbl.getProperty("columns");
        String columnTypeProperty = tbl.getProperty("columns.types");
        if (columnNameProperty.length() == 0) {
            columnNames = new ArrayList<String>();
        } else {
            columnNames = Arrays.asList(columnNameProperty.split(","));
        }
        if (columnTypeProperty.length() == 0) {
            columnTypes = new ArrayList<TypeInfo>();
        } else {
            columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
        }
        assert (columnNames.size() == columnTypes.size());

        if (tbl.containsKey("lwes.event_name")) {
            allEventName = tbl.getProperty("lwes.event_name");
        }

        // Create row related objects
        rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
        rowObjectInspector = (StructObjectInspector) TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);
        row = new ArrayList<Object>(columnNames.size());

        for (int i = 0; i < columnNames.size(); i++) {
            row.add(null);
        }

        // Get the sort order
        String columnSortOrder = tbl.getProperty(Constants.SERIALIZATION_SORT_ORDER);
        columnSortOrderIsDesc = new boolean[columnNames.size()];
        for (int i = 0; i < columnSortOrderIsDesc.length; i++) {
            columnSortOrderIsDesc[i] = (columnSortOrder != null && columnSortOrder.charAt(i) == '-');
        }

So, above we retrieved the Hive columns and defined what our rows are. Now we need to define how we want to map those columns to our data. In my case, I had to map LWES events (http://www.lwes.org). LWES is an event system that produces event files that are basically sets of key/value pairs. It’s a binary format optimized for size, and it’s free and open source. Every event has a type and a different set of possible keys.

My first idea was to just map Hive column names to LWES keys, but the former are case insensitive while the latter are not. So we want a natural mapping for the columns that are lowercase, and we can then use SerDe properties to manually map the ones that are not. Also, a table may contain multiple event types or just a single one. In the latter case we can specify once, through the lwes.event_name property, which event type we care about; that’s what the statement allEventName = tbl.getProperty("lwes.event_name") above does. In the former case, each mapping has to name the event type as well, in the form EventName::Field. But let’s see how to specify the mapping:

    CREATE TABLE blah (
        ...
        u_header_ua string,
        u_ox_url string,
        ...
    )
    PARTITIONED BY (dt STRING)
    ROW FORMAT SERDE 'org.openx.data.hive.journalserde.EventSerDe'
    WITH SERDEPROPERTIES (
        'lwes.event_name'='My::Event',
        'sender_ip'='SenderIP',
        'sender_port'='SenderPort',
        'receipt_time'='ReceiptTime',
        'site_id'='SiteID')

So, u_header_ua will be automatically mapped to the event key with the same name, while "sender_ip" will be mapped to the key "SenderIP". We want to store these mappings in a hash that will be used by the Serializer and Deserializer. Since we have to put the data into a row record defined as an ArrayList<Object>, we need to map each event key to its position in that list. We’ll store the mappings in Map<String, List<FieldAndPosition>> fieldsForEventName = new HashMap<String, List<FieldAndPosition>>(). FieldAndPosition is a simple data structure that stores the LWES key along with its position in the ArrayList we need to return.
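
The FieldAndPosition class itself lives in the repository linked at the end of this post; assuming it is nothing more than the pair just described, it looks roughly like this:

    // Sketch of the FieldAndPosition helper: it pairs an LWES attribute name
    // with the index of the corresponding column in the row ArrayList.
    public class FieldAndPosition {
        private final String field;    // LWES event attribute (key) name
        private final int position;    // index of the column in the row

        public FieldAndPosition(String field, int position) {
            this.field = field;
            this.position = position;
        }

        public String getField() { return field; }

        public int getPosition() { return position; }
    }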

Let’s see how it works:

        // take each Hive column and find the event/field it maps to
        int colNumber = 0;
        for (String columnName : columnNames) {
            String fieldName;
            String eventName;

            if (!tbl.containsKey(columnName) && allEventName == null) {
                // column not mapped through SerDe properties and no event name specified
                LOG.debug("Column " + columnName +
                        " is not mapped to an eventName::field through SerDe Properties");
                colNumber++; // keep positions aligned with the column order
                continue;
            } else if (!tbl.containsKey(columnName)) {
                // no explicit mapping, but a single event type was specified in
                // lwes.event_name: map the column to the field with the same name
                eventName = allEventName;
                fieldName = columnName;
            } else {
                // we found an explicit mapping in the SerDe properties
                String fullEventField = tbl.getProperty(columnName);
                String[] parts = fullEventField.split("::");

                if (parts.length < 1 || (parts.length == 1 && allEventName == null)) {
                    LOG.warn("Malformed EventName::Field " + fullEventField);
                    colNumber++;
                    continue;
                } else if (parts.length == 1) {
                    // only the field name is given: we're just renaming the column,
                    // the event name comes from lwes.event_name
                    fieldName = parts[0];
                    eventName = allEventName;
                } else {
                    // full EventName::Field mapping
                    fieldName = parts[parts.length - 1];
                    eventName = fullEventField.substring(0, fullEventField.length() - 2 - fieldName.length());
                }
                LOG.debug("Mapping " + columnName + " to EventName " + eventName +
                        ", field " + fieldName);
            }

            if (!fieldsForEventName.containsKey(eventName)) {
                fieldsForEventName.put(eventName, new LinkedList<FieldAndPosition>());
            }

            fieldsForEventName.get(eventName).add(new FieldAndPosition(fieldName, colNumber));

            colNumber++;
        }

What the deserialize method does is return an

    ArrayList<Object> row;

Let’s see how we do it, using the data structures we’ve built so far:

    @Override
    public Object deserialize(Writable w) throws SerDeException {
        LOG.debug("JournalSerDe::deserialize");

        if (w instanceof EventListWritable) {
            EventListWritable ew = (EventListWritable) w;
            for (Event ev : ew.getEvents()) {
                deserializeEvent(ev);
            }
        } else if (w instanceof EventWritable) {
            EventWritable ew = (EventWritable) w;
            Event ev = ew.getEvent();
            deserializeEvent(ev);
        } else {
            throw new SerDeException("I don't know how to deserialize " + w.getClass().getName());
        }
        return row;
    }

Here are the gory details of deserializing an event, including type conversion:

    public void deserializeEvent(Event ev) throws SerDeException {
        if (this.fieldsForEventName.containsKey(ev.getEventName())) {

            for (FieldAndPosition fp : fieldsForEventName.get(ev.getEventName())) {
                
                TypeInfo type = columnTypes.get(fp.getPosition());
                
                LOG.debug("Deserializing " + columnNames.get(fp.getPosition()));

                try {
                    if (type.getTypeName().equals(Constants.STRING_TYPE_NAME)) {              
                        if ( ev.get(fp.getField()) != null )
                            row.set(fp.getPosition(), ev.get(fp.getField()).toString());
                        else
                            row.set(fp.getPosition(), null);
                    } else if (type.getTypeName().equals(Constants.INT_TYPE_NAME)) {
                       row.set(fp.getPosition(), ev.getInt32(fp.getField()));
                    } else if (type.getTypeName().equals(Constants.SMALLINT_TYPE_NAME) ||
                               type.getTypeName().equals(Constants.TINYINT_TYPE_NAME)) {
                       row.set(fp.getPosition(), new Short(ev.getInt16(fp.getField())));
                    } else if (type.getTypeName().equals(Constants.BIGINT_TYPE_NAME)) {
                        row.set(fp.getPosition(), ev.getInt64(fp.getField()));
                    } else if (type.getTypeName().equals(Constants.BOOLEAN_TYPE_NAME)) {
                        row.set(fp.getPosition(), ev.getBoolean(fp.getField()));
                    } else if (type.getTypeName().equals(Constants.DATETIME_TYPE_NAME)) {
                        row.set(fp.getPosition(), ev.get(fp.getField()));
                    } else if (type.getTypeName().equals(Constants.DATE_TYPE_NAME)) {
                        row.set(fp.getPosition(), ev.get(fp.getField()));
                    } else if (type.getTypeName().equals(Constants.FLOAT_TYPE_NAME)) {
                        throw new SerDeException("Float not supported");
                    } else if (type.getTypeName().equals(Constants.DOUBLE_TYPE_NAME)) {
                        throw new SerDeException("Double not supported");
                    } else if (type.getTypeName().equals(Constants.TIMESTAMP_TYPE_NAME)) {
                        row.set(fp.getPosition(), ev.get(fp.getField()));
                    } 
                } catch (NoSuchAttributeException ex) {
                    LOG.error("No such attribute " + fp.getField() +
                            " in event " + ev.getEventName() +
                            " for column " + columnNames.get(fp.getPosition()));
                } catch (AttributeNotSetException ex) {
                    row.set(fp.getPosition(), null);
                    LOG.debug("Not set:  attribute " + fp.getField() +
                            " in event " + ev.getEventName() +
                            " for column " + columnNames.get(fp.getPosition()));
                } catch (Exception ex) {
                    LOG.error("Exception " + ex + " processing " + fp.getField() +
                            " in event " + ev.getEventName() +
                            " for column " + columnNames.get(fp.getPosition()));
                }
            }
        }
    }

We also need some utility methods:

    @Override
    public ObjectInspector getObjectInspector() throws SerDeException {
        LOG.debug("JournalSerDe::getObjectInspector()");
        return rowObjectInspector;
    }

    @Override
    public Class<? extends Writable> getSerializedClass() {
        LOG.debug("JournalSerDe::getSerializedClass()");
        return EventWritable.class;
    }

If you’re interested in the complete code, it is available in the lwes.org Subversion repository here: https://lwes.svn.sourceforge.net/svnroot/lwes/contrib/lwes-hive-serde/trunk/.
