I am currently working to set up an OLAP data warehouse using Hive on top of Hadoop. We have a considerable amount of data that comes from the ad servers on which we need to perform various kinds of analysis.
Writing a MapReduce job is not difficult in principle: it's just time-consuming and requires the skills of a trained Java engineer, which wouldn't be needed if we were using SQL. That's where Hive comes in: it lets us query a Hadoop data store using a flavor of SQL.
Hive stores data using common Hadoop formats: text files and sequence files.
Unfortunately, our data is in neither of those formats, and it would be impractical to reformat it. Fortunately, Hive is very extensible: you can plug in your own file format, and even your own serialization/deserialization (SerDe). What's the difference between the two? The file format (an InputFormat/OutputFormat pair) decides how the files are split and read into records; the SerDe decides how each record maps to the columns of a table row.
At the time of writing, the Hive wiki doesn't mention that you can specify your own file format. If you dig into the code and look at the query language grammar (ql/src/java/org/apache/hadoop/hive/ql/Hive.g), you'll see that you can pick your input and output format:
KW_STORED KW_AS KW_SEQUENCEFILE -> TOK_TBLSEQUENCEFILE
| KW_STORED KW_AS KW_TEXTFILE -> TOK_TBLTEXTFILE
| KW_STORED KW_AS KW_RCFILE -> TOK_TBLRCFILE
| KW_STORED KW_AS KW_INPUTFORMAT inFmt=StringLiteral KW_OUTPUTFORMAT outFmt=StringLiteral
So in the CREATE TABLE statement you can specify STORED AS INPUTFORMAT 'com.yoursite.YourInputFormatClass' OUTPUTFORMAT 'com.yoursite.YourOutputFormatClass'.
Writing your own input/output format is not trivial, since you have to take care of how the splits are created from the input. Getting the split size right is very important for Hadoop performance, so you should write your own splitting logic only if you know what you're doing.
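To give a sense of what "splits of the right size" means, here is a standalone sketch of the sizing rule used by Hadoop's own FileInputFormat: the split size is max(minSize, min(maxSize, blockSize)), and a tail of up to 10% of a split is folded into the last split instead of becoming a tiny one. It does not use the Hadoop API; the class and method names are hypothetical, and it only computes offsets and lengths for a single file.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper modeled on the split sizing in Hadoop's FileInputFormat.
final class SplitSketch {
    // Keep cutting full splits while more than 1.1 splits' worth of data remains.
    static final double SPLIT_SLOP = 1.1;

    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Returns {offset, length} pairs covering a file of fileLength bytes.
    static List<long[]> splits(long fileLength, long splitSize) {
        if (splitSize <= 0) {
            throw new IllegalArgumentException("splitSize must be positive");
        }
        List<long[]> result = new ArrayList<>();
        long remaining = fileLength;
        while (((double) remaining) / splitSize > SPLIT_SLOP) {
            result.add(new long[] {fileLength - remaining, splitSize});
            remaining -= splitSize;
        }
        if (remaining != 0) {
            // The last split absorbs a tail of up to 10% of splitSize.
            result.add(new long[] {fileLength - remaining, remaining});
        }
        return result;
    }
}
```

For a custom binary format, the arithmetic is the easy part: the harder part is making the RecordReader cope with a split boundary that falls in the middle of a record.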
Should you decide to write your own input format, how do you plug it in? Hive adds to its classpath all the files in the directory specified by either the HIVE_AUX_JARS_PATH environment variable or the --auxpath command-line option of the hive shell. This is also how you add a SerDe or a UDF.
In this post I'll only cover writing a SerDe; I'll cover the InputFormat in another post.
A SerDe tells Hive how to map the column names in the table to values within the Writable object. Note that the Writable object may be an arbitrarily complex object hierarchy.
To handle introspection efficiently, Hive implements its own mechanism, optimized for high-performance, repetitive tasks. It uses ObjectInspectors, which keep no state and are immutable, so only one is needed per type.
Column types are represented by subclasses of org.apache.hadoop.hive.serde2.typeinfo.TypeInfo, and each type belongs to one of these categories, defined in the ObjectInspector interface:
public interface ObjectInspector {
  public static enum Category {
    PRIMITIVE, LIST, MAP, STRUCT
  };
  // ... other methods omitted
}
Each of these four categories has its own interface extending ObjectInspector: PrimitiveObjectInspector, ListObjectInspector, MapObjectInspector and StructObjectInspector.
Since our records are basically maps, I first tried to implement a SerDe whose row ObjectInspector was a MapObjectInspector. Unfortunately, it didn't work: at execution time, Hive expects each row to be a struct, not a map. So your table record HAS TO be a struct, and the other categories can only be used for individual columns.
This makes sense in theory. A SELECT statement returns rows whose values have different types, which is exactly the definition of a struct. A primitive holds a single value, and lists and maps are homogeneous collections (every element, or every key and every value, has the same type), so none of them can describe a row.
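To make the struct-versus-map argument concrete, here is a toy model. It is deliberately not Hive's ObjectInspector API: the class name and its methods are hypothetical. It describes a row as a fixed list of named, individually typed fields and reads values out of a plain List<Object>. Because it holds no per-row state, one instance can serve every row, which is the same property that lets Hive keep a single ObjectInspector per type.

```java
import java.util.List;

// Toy model, NOT Hive's API. It illustrates two ideas:
// 1) a row is a struct: named fields, each with its own type;
// 2) the inspector is stateless and immutable, so one instance serves all rows.
final class ToyStructInspector {
    private final List<String> fieldNames;
    private final List<Class<?>> fieldTypes;

    ToyStructInspector(List<String> fieldNames, List<Class<?>> fieldTypes) {
        if (fieldNames.size() != fieldTypes.size()) {
            throw new IllegalArgumentException("names and types must have the same size");
        }
        this.fieldNames = List.copyOf(fieldNames);
        this.fieldTypes = List.copyOf(fieldTypes);
    }

    // Reads a field out of a row stored as a plain List<Object>.
    Object getField(List<Object> row, String name) {
        int i = fieldNames.indexOf(name);
        if (i < 0) {
            throw new IllegalArgumentException("no such field: " + name);
        }
        Object value = row.get(i);
        // Values of different types sit side by side in the same row;
        // cast() fails fast if a value does not match its declared type.
        return value == null ? null : fieldTypes.get(i).cast(value);
    }
}
```

A map could not play this role: a Map<String, Integer> would force every column to be an integer.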
So, how does the SerDe work ?
The SerDe interface is simply the union of two interfaces, Serializer and Deserializer. Since Hadoop works with Writable objects, we serialize to and deserialize from a Writable.
public interface Serializer {
  public void initialize(Configuration conf, Properties tbl) throws SerDeException;
  public Class<? extends Writable> getSerializedClass();
  public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException;
}
public interface Deserializer {
  public void initialize(Configuration conf, Properties tbl) throws SerDeException;
  public Object deserialize(Writable blob) throws SerDeException;
  public ObjectInspector getObjectInspector() throws SerDeException;
}
To serialize and deserialize, we need to know how the columns map to the internal structure of our object, and possibly perform type conversions. Since serialize() and deserialize() are called for every record, this conversion should be as cheap as possible, so we pre-build the mapping in efficient data structures when the SerDe starts. The place to do that is the SerDe's initialize() method.
In my case, I use these four attributes:
As seen below, we retrieve the lists of columns and types from the table properties and build the TypeInfo (Hive's way of categorizing the columns) using utility methods. In particular, we create the row type info as a struct type info with the given lists of columns and types (rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes)).
// Get column names and types
String columnNameProperty = tbl.getProperty("columns");
String columnTypeProperty = tbl.getProperty("columns.types");
if (columnNameProperty == null || columnNameProperty.length() == 0) {
  columnNames = new ArrayList<String>();
} else {
  columnNames = Arrays.asList(columnNameProperty.split(","));
}
if (columnTypeProperty == null || columnTypeProperty.length() == 0) {
  columnTypes = new ArrayList<TypeInfo>();
} else {
  columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
}
// assert is disabled by default in the JVM, so fail explicitly
if (columnNames.size() != columnTypes.size()) {
  throw new SerDeException("Got " + columnNames.size() + " column names but "
      + columnTypes.size() + " column types");
}
if (tbl.containsKey("lwes.event_name")) {
  allEventName = tbl.getProperty("lwes.event_name");
}
// Create row related objects
rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
rowObjectInspector = (StructObjectInspector) TypeInfoUtils
    .getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);
row = new ArrayList<Object>(columnNames.size());
for (int i = 0; i < columnNames.size(); i++) {
  row.add(null);
}
// Get the sort order: one '+' (ascending) or '-' (descending) per column
String columnSortOrder = tbl.getProperty(Constants.SERIALIZATION_SORT_ORDER);
columnSortOrderIsDesc = new boolean[columnNames.size()];
for (int i = 0; i < columnSortOrderIsDesc.length; i++) {
  columnSortOrderIsDesc[i] = (columnSortOrder != null
      && i < columnSortOrder.length()
      && columnSortOrder.charAt(i) == '-');
}
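The property formats can be tried out without Hive on the classpath. This sketch assumes what Hive passes to initialize(), as far as I can tell: "columns" is comma-separated, "columns.types" is colon-separated, and the sort order string (read above through Constants.SERIALIZATION_SORT_ORDER) holds one '+' or '-' per column. It only handles primitive type names: nested types such as map<string,int> contain commas and colons themselves, which is why the real code relies on TypeInfoUtils.getTypeInfosFromTypeString. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Standalone sketch (no Hive dependency) of the property parsing in initialize().
// Assumption: "columns" is comma-separated, "columns.types" colon-separated.
final class ColumnProps {
    static List<String> names(Properties tbl) {
        String p = tbl.getProperty("columns");
        if (p == null || p.isEmpty()) {
            return new ArrayList<>();
        }
        return Arrays.asList(p.split(","));
    }

    // Primitive type names only; nested types need a real type-string parser.
    static List<String> primitiveTypes(Properties tbl) {
        String p = tbl.getProperty("columns.types");
        if (p == null || p.isEmpty()) {
            return new ArrayList<>();
        }
        return Arrays.asList(p.split(":"));
    }

    // One '+' (ascending) or '-' (descending) per column; missing entries mean ascending.
    static boolean[] descending(String sortOrder, int columnCount) {
        boolean[] desc = new boolean[columnCount];
        for (int i = 0; i < columnCount; i++) {
            desc[i] = sortOrder != null && i < sortOrder.length() && sortOrder.charAt(i) == '-';
        }
        return desc;
    }
}
```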
So, above we retrieved the Hive columns and defined what our rows are. Now we need to define how to map those columns to our data. In my case, I had to map LWES events (http://www.lwes.org). LWES is an event system that produces event files made of key/value pairs. It's a binary format optimized for size, and it's free and open source. Every event has a type, and each type has its own set of possible keys.
My first idea was to simply map Hive column names to LWES keys, but the former are case-insensitive while the latter are not. So columns get a natural mapping to keys with the same (lowercase) name, and SerDe properties can be used to map the ones that are not lowercase manually. Also, a table may hold several event types or just one. With several event types, each column mapping names its event as EventName::Field; with just one, we name the event type once through the lwes.event_name property, which is what allEventName = tbl.getProperty("lwes.event_name") reads above. Let's see how to specify the mapping:
CREATE TABLE blah (
….....
u_header_ua string,
u_ox_url string,
…....
)
PARTITIONED BY(dt STRING)
ROW FORMAT SERDE 'org.openx.data.hive.journalserde.EventSerDe'
WITH SERDEPROPERTIES (
'lwes.event_name'='My::Event',
'sender_ip'='SenderIP',
'sender_port'='SenderPort',
'receipt_time'='ReceiptTime',
'site_id'='SiteID')
So u_header_ua is automatically mapped to the event key with the same name, while sender_ip is mapped to the key SenderIP. We want to store these mappings in a hash table for the Serializer and Deserializer to use. Since the data has to go into a row record defined as an ArrayList<Object>, we also need to map each event key to its position in that list. We store the mappings, keyed by event name, in Map<String, List<FieldAndPosition>> fieldsForEventName = new HashMap<String, List<FieldAndPosition>>(). FieldAndPosition is a simple data structure that stores the LWES key along with its position in the ArrayList we need to return.
Let's see how it works:
// Take each Hive column and find the LWES event and field it maps to.
// colNumber advances for every column, mapped or not, so positions match the row.
for (int colNumber = 0; colNumber < columnNames.size(); colNumber++) {
  String columnName = columnNames.get(colNumber);
  String eventName;
  String fieldName;
  if (tbl.containsKey(columnName)) {
    // Explicit mapping in the SerDe properties: "Field" or "EventName::Field".
    // The event name may itself contain "::" (e.g. My::Event), so split on the last one.
    String fullEventField = tbl.getProperty(columnName);
    int sep = fullEventField.lastIndexOf("::");
    if (sep >= 0) {
      eventName = fullEventField.substring(0, sep);
      fieldName = fullEventField.substring(sep + 2);
    } else if (allEventName != null) {
      // Only the field is renamed; the event comes from lwes.event_name.
      eventName = allEventName;
      fieldName = fullEventField;
    } else {
      LOG.warn("Malformed EventName::Field " + fullEventField + " for column " + columnName);
      continue;
    }
    LOG.debug("Mapping " + columnName + " to EventName " + eventName +
        ", field " + fieldName);
  } else if (allEventName != null) {
    // No explicit mapping in a single-type table: the column name is the field name.
    eventName = allEventName;
    fieldName = columnName;
  } else {
    // Column not defined in SerDe properties and no event specified.
    LOG.debug("Column " + columnName +
        " is not mapped to an eventName:field through SerDe Properties");
    continue;
  }
  if (!fieldsForEventName.containsKey(eventName)) {
    fieldsForEventName.put(eventName, new LinkedList<FieldAndPosition>());
  }
  fieldsForEventName.get(eventName).add(new FieldAndPosition(fieldName, colNumber));
}
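FieldAndPosition itself isn't listed in the post. A minimal version consistent with how the mapping code uses it (a constructor taking the field and the position, plus getField() and getPosition()) might look like the following. The MappingSyntax helper is a hypothetical addition that isolates the "::" rule: because an event name such as My::Event contains "::" itself, the field is whatever follows the last "::" and everything before it is the event name.

```java
// Minimal sketch of FieldAndPosition, inferred from its use in the SerDe.
final class FieldAndPosition {
    private final String field;   // LWES attribute name (case-sensitive)
    private final int position;   // index of the Hive column in the row

    FieldAndPosition(String field, int position) {
        this.field = field;
        this.position = position;
    }

    String getField() { return field; }

    int getPosition() { return position; }
}

// Hypothetical helper showing how a SerDe property value is split.
final class MappingSyntax {
    // "My::Event::SenderIP" -> {"My::Event", "SenderIP"};
    // "SenderIP" -> {null, "SenderIP"} (field only, event taken from lwes.event_name).
    static String[] splitEventAndField(String value) {
        int i = value.lastIndexOf("::");
        if (i < 0) {
            return new String[] {null, value};
        }
        return new String[] {value.substring(0, i), value.substring(i + 2)};
    }
}
```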
The deserialize method returns the row, a list of column values declared as:
ArrayList<Object> row;
Let's see how we do it, using the data structures we've built so far:
@Override
public Object deserialize(Writable w) throws SerDeException {
  LOG.debug("JournalSerDe::deserialize");
  // row is reused across records: clear the values of the previous record first
  for (int i = 0; i < row.size(); i++) {
    row.set(i, null);
  }
  if (w instanceof EventListWritable) {
    EventListWritable ew = (EventListWritable) w;
    for (Event ev : ew.getEvents()) {
      deserializeEvent(ev);
    }
  } else if (w instanceof EventWritable) {
    EventWritable ew = (EventWritable) w;
    deserializeEvent(ew.getEvent());
  } else {
    throw new SerDeException("I don't know how to deserialize " + w.getClass().getName());
  }
  return row;
}
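The row-reuse pattern in deserialize() can be exercised without Hive or LWES. In this sketch an event is modeled as a name plus a Map<String, Object> of attributes, and FieldAndPosition is reduced to a two-field class; ToyEvent, FieldPos and ToyDeserializer are hypothetical stand-ins, not LWES or Hive types. The reused row is cleared before each record, so values from one record cannot leak into the next.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an LWES event: a type name plus case-sensitive attributes.
final class ToyEvent {
    final String name;
    final Map<String, Object> attributes;

    ToyEvent(String name, Map<String, Object> attributes) {
        this.name = name;
        this.attributes = attributes;
    }
}

// Hypothetical stand-in for FieldAndPosition.
final class FieldPos {
    final String field;
    final int position;

    FieldPos(String field, int position) {
        this.field = field;
        this.position = position;
    }
}

final class ToyDeserializer {
    private final Map<String, List<FieldPos>> fieldsForEventName;
    private final ArrayList<Object> row;  // reused for every record, like the SerDe's row

    ToyDeserializer(Map<String, List<FieldPos>> fieldsForEventName, int columnCount) {
        this.fieldsForEventName = fieldsForEventName;
        this.row = new ArrayList<>(columnCount);
        for (int i = 0; i < columnCount; i++) {
            row.add(null);
        }
    }

    List<Object> deserialize(List<ToyEvent> events) {
        // Clear values left over from the previous record.
        for (int i = 0; i < row.size(); i++) {
            row.set(i, null);
        }
        for (ToyEvent ev : events) {
            List<FieldPos> fields = fieldsForEventName.get(ev.name);
            if (fields == null) {
                continue;  // this event type is not mapped to any column
            }
            for (FieldPos fp : fields) {
                row.set(fp.position, ev.attributes.get(fp.field));
            }
        }
        return row;
    }
}
```

Reusing one row object avoids allocating a list per record, which is why the values of the previous record have to be reset explicitly.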
Here are the gory details of deserializing an event, including type conversion:
public void deserializeEvent(Event ev) throws SerDeException {
  List<FieldAndPosition> fields = fieldsForEventName.get(ev.getEventName());
  if (fields == null) {
    return;  // this event type is not mapped to any column
  }
  for (FieldAndPosition fp : fields) {
    int pos = fp.getPosition();
    String field = fp.getField();
    String typeName = columnTypes.get(pos).getTypeName();
    LOG.debug("Deserializing " + columnNames.get(pos));
    try {
      if (typeName.equals(Constants.STRING_TYPE_NAME)) {
        Object value = ev.get(field);
        row.set(pos, value == null ? null : value.toString());
      } else if (typeName.equals(Constants.INT_TYPE_NAME)) {
        row.set(pos, ev.getInt32(field));
      } else if (typeName.equals(Constants.SMALLINT_TYPE_NAME)) {
        row.set(pos, Short.valueOf(ev.getInt16(field)));
      } else if (typeName.equals(Constants.TINYINT_TYPE_NAME)) {
        // Hive's standard Java inspector for tinyint expects a Byte, not a Short
        row.set(pos, Byte.valueOf((byte) (short) ev.getInt16(field)));
      } else if (typeName.equals(Constants.BIGINT_TYPE_NAME)) {
        row.set(pos, ev.getInt64(field));
      } else if (typeName.equals(Constants.BOOLEAN_TYPE_NAME)) {
        row.set(pos, ev.getBoolean(field));
      } else if (typeName.equals(Constants.DATETIME_TYPE_NAME)
          || typeName.equals(Constants.DATE_TYPE_NAME)
          || typeName.equals(Constants.TIMESTAMP_TYPE_NAME)) {
        row.set(pos, ev.get(field));
      } else if (typeName.equals(Constants.FLOAT_TYPE_NAME)) {
        throw new SerDeException("Float not supported");
      } else if (typeName.equals(Constants.DOUBLE_TYPE_NAME)) {
        throw new SerDeException("Double not supported");
      }
    } catch (SerDeException ex) {
      // unsupported column type: propagate it instead of letting catch (Exception) log it away
      throw ex;
    } catch (NoSuchAttributeException ex) {
      row.set(pos, null);
      LOG.error("No such attribute " + field +
          " in event " + ev.getEventName() +
          " for column " + columnNames.get(pos));
    } catch (AttributeNotSetException ex) {
      row.set(pos, null);
      LOG.debug("Not set: attribute " + field +
          " in event " + ev.getEventName() +
          " for column " + columnNames.get(pos));
    } catch (Exception ex) {
      row.set(pos, null);
      LOG.error("Exception " + ex + " processing " + field +
          " in event " + ev.getEventName() +
          " for column " + columnNames.get(pos));
    }
  }
}
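The long if/else chain exists because the row is read through Hive's standard Java object inspectors (obtained in initialize() via getStandardJavaObjectInspectorFromTypeInfo), and each of them expects a specific boxed Java type. The hypothetical helper below states that correspondence on its own, for an already-decoded value and the lowercase type names Hive uses (string, tinyint, smallint, ...). Note that tinyint must become a Byte, not a Short. The LWES SerDe above rejects float and double; they appear here only to complete the mapping.

```java
// Hypothetical helper: converts an already-decoded value into the Java type that
// Hive's standard Java object inspectors expect for each primitive type name.
final class ToyTypeConversion {
    static Object toHive(String typeName, Object value) {
        if (value == null) {
            return null;  // NULL stays NULL whatever the column type
        }
        switch (typeName) {
            case "string":
                return value.toString();
            case "tinyint":
                return ((Number) value).byteValue();   // boxed as Byte, not Short
            case "smallint":
                return ((Number) value).shortValue();  // Short
            case "int":
                return ((Number) value).intValue();    // Integer
            case "bigint":
                return ((Number) value).longValue();   // Long
            case "float":
                return ((Number) value).floatValue();  // Float
            case "double":
                return ((Number) value).doubleValue(); // Double
            case "boolean":
                return (Boolean) value;
            default:
                throw new IllegalArgumentException("unsupported type: " + typeName);
        }
    }
}
```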
We also need some utility methods:
@Override
public ObjectInspector getObjectInspector() throws SerDeException {
  LOG.debug("JournalSerDe::getObjectInspector()");
  return rowObjectInspector;
}
@Override
public Class<? extends Writable> getSerializedClass() {
  LOG.debug("JournalSerDe::getSerializedClass()");
  return EventWritable.class;
}
If you're interested in the complete code, it's available in the lwes.org Subversion repository: https://lwes.svn.sourceforge.net/svnroot/lwes/contrib/lwes-hive-serde/trunk/