Joins in Hadoop using CompositeInputFormat

Posted on June 7, 2009


One of the first questions that a ‘traditional’ ETL engineer asks when learning Hadoop is, “How do I do a join?”

For instance, how can we do in Hadoop something like querying for the names of all employees who are in a California city:

SELECT e.name, c.name from employees e INNER JOIN cities c
    on e.city_id = c.id AND c.state ='CA'

If one dataset is big and the other is small enough that you’re sure it will fit into memory, you can take advantage of the DistributedCache feature, which can copy around a BerkeleyDB file (or any other file-based hash) that you can then use for the join in either the mapper or the reducer, wherever it makes sense.
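The in-memory side of that approach boils down to a hash join. Here is a minimal sketch in plain Java; the `HashJoin` class and the record layout are illustrative, not part of any Hadoop API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class HashJoin {
    // Join each {cityId, employeeName} record against the small
    // cities map that would live in memory on every mapper.
    public static List<String> join(List<String[]> employees,
                                    Map<String, String> cities) {
        List<String> out = new ArrayList<>();
        for (String[] e : employees) {       // e = {cityId, name}
            String city = cities.get(e[0]);  // O(1) hash lookup
            if (city != null) {              // inner join: skip misses
                out.add(e[1] + "\t" + city);
            }
        }
        return out;
    }
}
```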

The method I discuss here is to use CompositeInputFormat, which you can apply if:

  • all of the files you want to join are sorted
  • all of them have the same joining key
  • the files are too big to do the join using DistributedCache

The way it works is similar in principle to mergesort. If you have n files sorted by their join key, you can combine them easily, reading the records one by one from each file so that you are always reading the records with the same key.
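The merge step itself can be sketched in plain Java (the `MergeJoin` class is illustrative; CompositeInputFormat does the equivalent internally over record readers):

```java
import java.util.ArrayList;
import java.util.List;

public class MergeJoin {
    // Merge-join two lists of {key, value} pairs, both sorted by key.
    // Emits "key\tleftVal\trightVal" for every key present in both.
    public static List<String> innerJoin(List<String[]> left,
                                         List<String[]> right) {
        List<String> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < left.size() && j < right.size()) {
            int cmp = left.get(i)[0].compareTo(right.get(j)[0]);
            if (cmp < 0) {
                i++;               // left key is behind, advance left
            } else if (cmp > 0) {
                j++;               // right key is behind, advance right
            } else {               // keys match: emit and advance both
                out.add(left.get(i)[0] + "\t"
                        + left.get(i)[1] + "\t" + right.get(j)[1]);
                i++;
                j++;
            }
        }
        return out;
    }
}
```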

This is how CompositeInputFormat works. It will read your files and deliver a TupleWritable object to the mapper.

To configure the job, besides setting the CompositeInputFormat class as the input format, you have to specify the join expression. You can specify an outer or inner join; I haven’t seen a way to do a left or right outer join.

jobConf.setInputFormat(CompositeInputFormat.class);
jobConf.set("mapred.join.expr", CompositeInputFormat.compose(
                "outer", KeyValueTextInputFormat.class,
                FileInputFormat.getInputPaths(jobConf)));

You also have to specify the actual format of the files you’re reading. That’s why you see KeyValueTextInputFormat.class in the code above.
CompositeInputFormat delegates picking the key to that class. In this example it will take the first column of a tab-separated file, which is the default behavior for KeyValueTextInputFormat.
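As a plain-Java sketch of that default behavior (the `KeyValueSplit` class below is illustrative; in the old API the separator is configurable via `key.value.separator.in.input.line`):

```java
public class KeyValueSplit {
    // Mimic KeyValueTextInputFormat's default behavior: the key is
    // everything before the first tab, the value everything after.
    // If there is no tab, the whole line becomes the key.
    public static String[] split(String line) {
        int tab = line.indexOf('\t');
        if (tab < 0) {
            return new String[]{line, ""};
        }
        return new String[]{line.substring(0, tab), line.substring(tab + 1)};
    }
}
```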

The cool thing about CompositeInputFormat is that you can pick your input format for the files you’re joining.

What next? If you look at the example in the Hadoop distribution, src/examples/org/apache/hadoop/examples/Join.java, you see that IdentityMapper and IdentityReducer are used to just output the joined record.
What if you need to manipulate it, though?
For example, a common task in ETL is to dump data periodically from a source and build a file containing only the changes, to keep another, possibly remote, system in sync while minimizing bandwidth usage.
Assuming we have an ‘old’ and a ‘new’ file, we need to transmit:
- the records that are in ‘new’ and not in ‘old’
- the records that are in both ‘new’ and ‘old’, and whose content has changed
- the records that are in ‘old’ but not in ‘new’, marked for deletion.

Let’s assume our final format is:

key1 val1
key2 val2 
key3 -

where the first 2 records are added/updated, the third removed.
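The three cases can be sketched independently of Hadoop as a diff over two sorted maps (the `Delta` class is illustrative; the real job computes the same thing record by record):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;

public class Delta {
    // Compute the change file described above from 'old' and 'new'
    // snapshots keyed by record id.
    public static List<String> diff(SortedMap<String, String> oldRecs,
                                    SortedMap<String, String> newRecs) {
        List<String> out = new ArrayList<>();
        SortedSet<String> keys = new TreeSet<>(oldRecs.keySet());
        keys.addAll(newRecs.keySet());
        for (String k : keys) {
            String o = oldRecs.get(k);
            String n = newRecs.get(k);
            if (o == null) {
                out.add(k + "\t" + n);      // added record
            } else if (n == null) {
                out.add(k + "\t-");         // removed record
            } else if (!o.equals(n)) {
                out.add(k + "\t" + n);      // modified record
            }
            // unchanged records are not transmitted
        }
        return out;
    }
}
```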

The mapper will take care of joining ‘old’ and ‘new’, so we can just use the IdentityMapper.
The reducer will receive a (Text, Iterator&lt;TupleWritable&gt;). There should be just one value per key; if not, it probably means the input files are not actually sorted.

        @Override
        public void reduce(Text key, Iterator<TupleWritable> values,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            // should be just one value
            int count = 0;
            while(values.hasNext()) {
                TupleWritable val = values.next();

                if (count > 0)
                    throw new IOException(
                            "Can't have two tuples for the same key; something must be wrong in the join");

                boolean hasOld = val.has(0);
                boolean hasNew = val.has(1);

                if(hasNew && !hasOld ) {
                    // new record
                    output.collect(key,(Text)val.get(1));
                } else if( hasNew && hasOld ) {
                    if (!val.get(0).equals(val.get(1))) {
                        // modified record
                        output.collect(key, (Text) val.get(1));
                    }
                } else if (hasOld && !hasNew) {
                    // removed record, marked with a leading '-'
                    Text u = new Text("-" + ((Text) val.get(0)).toString());
                    output.collect(key, u);
                }
                count++;
            }
        }

TupleWritable keeps the records from the joined files in the same order as the join expression. Since we’re doing an outer join, we use tuple.has() to check whether the nth component of the tuple is actually present, and tuple.get() to retrieve it.
Don’t forget to set the appropriate classes for output. Note that the identity mapper emits TupleWritable values, while the reducer emits Text:

        jobConf.setOutputFormat(TextOutputFormat.class);
        jobConf.setMapOutputValueClass(TupleWritable.class);
        jobConf.setOutputValueClass(Text.class);
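Pulling the fragments together, a driver for this job might look roughly like the following configuration sketch (old mapred API; `DeltaJob`, `DeltaReducer`, and the path variables are hypothetical names, not from the Hadoop examples):

```java
JobConf jobConf = new JobConf(DeltaJob.class);   // hypothetical driver class
jobConf.setJobName("old-vs-new delta");

// Map side: CompositeInputFormat performs the outer join and the
// IdentityMapper passes (Text, TupleWritable) pairs through.
jobConf.setInputFormat(CompositeInputFormat.class);
jobConf.set("mapred.join.expr", CompositeInputFormat.compose(
        "outer", KeyValueTextInputFormat.class,
        new Path(oldDir), new Path(newDir)));    // order matters: 0 = old, 1 = new
jobConf.setMapperClass(IdentityMapper.class);
jobConf.setMapOutputKeyClass(Text.class);
jobConf.setMapOutputValueClass(TupleWritable.class);

// Reduce side: the reducer above turns each tuple into a Text line.
jobConf.setReducerClass(DeltaReducer.class);     // hypothetical reducer class
jobConf.setOutputFormat(TextOutputFormat.class);
jobConf.setOutputKeyClass(Text.class);
jobConf.setOutputValueClass(Text.class);

JobClient.runJob(jobConf);
```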

This is not the most efficient way to do it; it’s just an example. It would actually be more efficient to do the checks in the mapper, with no reducer at all, saving the shuffle/sort phase entirely.

Posted in: programming