Creating Nested data (Parquet) in Spark SQL/Hive from non-nested data

Posted on April 4, 2015 by



Sometimes you need to create denormalized data from normalized data. For instance, suppose you have data that looks like this:
CREATE TABLE flat (
  propertyId string,
  propertyName string,
  roomname1 string,
  roomsize1 string,
  roomname2 string,
  roomsize2 string,
  ..
)

but we want something like

 

CREATE TABLE nested (
   propertyId string,
   propertyName string,
   rooms array<struct<roomname:string,roomsize:int>>
)

 

This can be done with a pretty horrific query, but we want to do it in Spark SQL by manipulating the rows programmatically.
Let’s go step by step: we load data from a CSV file with a flat structure and insert it into a nested Hive table.
These commands can be run from spark-shell. Later, when we write the buildRecord() function, we’ll have to wrap everything in an object that extends the Serializable trait, because any code that is executed on the workers must be serializable.

 

import com.databricks.spark.csv._
import org.apache.spark.sql.Row
// build hive context
val hc = new org.apache.spark.sql.hive.HiveContext(sc)
// load data (flat)
val data = hc.csvFile("hdfs:///data/data.csv")
// make hive aware of our RDD as a table
hc.registerRDDAsTable(data,"data")

Notice that we used

hc.registerRDDAsTable(data, "data")

instead of

data.registerTempTable("data")

I found that in Spark 1.2.1, registerTempTable won’t make the table available to Hive. If you want to transfer data between actual Hive tables and temporary tables, you have to use registerRDDAsTable, or you’ll get a ‘table not found’ error from Hive.

 

SchemaRDDs return data as objects of class Row. Row is also how SchemaRDDs expect to receive data, and Hive tables are basically one form of SchemaRDD.
If the RDD built from the CSV file had the same schema as the target table, we could just do something like

hc.sql("insert into table table1 select * from table2")

 

but in this case, before inserting, we have to transform the data so it has the same structure as the table we want to put it in.

Note also that each target record consists of two scalars followed by an array of structs, one per room (two in this example).

We want to store it in a hive nested table, so we create it:

 

  hc.sql("""CREATE TABLE IF NOT EXISTS nested (
   propertyId string,
   propertyName string,
   rooms array<struct<roomname:string,roomsize:int>>
) STORED AS PARQUET
""")

 

We can then build the record as:

// this builds a nested record
def buildRecord(r: Row): Row = {
  // debug output; on a cluster it ends up in the executor logs
  println(r)
  var res = Seq[Any]()
  // takes the first two elements
  res = res ++ r.slice(0, 2)
  // now res = [ 'some id','some name']

  // this will contain all the array elements
  var ary = Seq[Any]()
  // we assume there are 2 groups of columns
  for (i <- 0 to 1) {
    // 0-based indexes, takes (2,3) (4,5) ..
    // and converts to the appropriate type
    ary = ary :+ Row(r.getString(2 + 2 * i),
                     r.getString(2 + 1 + 2 * i).toInt)
  }
  // adds the array as an element and returns the row
  res = res :+ ary
  Row.fromSeq(res)
}

// apply the function to every row of the flat data
val nestedRDD = data.map(buildRecord(_))
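The index arithmetic in the loop is easy to get wrong, so here is a minimal plain-Scala sketch of it that needs no Spark. The helper roomIndexes and its parameters are hypothetical names introduced only for illustration. It derives the (name, size) column positions from the row length instead of hard-coding two groups.

```scala
// Hypothetical helper, not part of Spark: lists the (roomname, roomsize)
// column positions for a flat row with `scalars` leading columns
// followed by groups of `groupSize` columns.
def roomIndexes(rowLength: Int, scalars: Int = 2, groupSize: Int = 2): Seq[(Int, Int)] =
  (0 until (rowLength - scalars) / groupSize).map { i =>
    val nameIdx = scalars + groupSize * i // 2, 4, ..
    val sizeIdx = nameIdx + 1             // 3, 5, ..
    (nameIdx, sizeIdx)
  }

// A row with 6 columns has two room groups: (2,3) and (4,5)
val indexes = roomIndexes(6)
```

Inside buildRecord, the loop could then iterate over roomIndexes(r.length) rather than over 0 to 1, assuming every column after the first two belongs to a room group.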

Notice a few things here:

  • We had to convert the data appropriately. CSV files have a header with the field names but not the types, so we must know in advance how to convert each field. This could be done with a case class in Scala.
  • Scalars are converted directly; for an array we just build a sequence (in this case a List), and for a struct we use Row.
  • Rows can be built in two ways: directly, as Row(element1, element2, ..), or from a sequence, using Row.fromSeq as above.
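As a rough illustration of the case class idea mentioned in the first point, here is a minimal sketch in plain Scala, with no Spark involved. The names Room, Property and parseProperty are assumptions made up for this example; only the field names mirror the columns of the nested table. The sketch assumes all CSV fields arrive as strings and that rooms come in (name, size) pairs.

```scala
import scala.util.Try

// Hypothetical types for illustration; field names mirror the nested table.
case class Room(roomname: String, roomsize: Int)
case class Property(propertyId: String, propertyName: String, rooms: Seq[Room])

// Converts one flat record (all strings) into a typed Property.
// Returns None when there are no complete (name, size) pairs
// or when a room size is not a valid integer.
def parseProperty(fields: Seq[String]): Option[Property] = fields match {
  case id +: name +: rest if rest.nonEmpty && rest.size % 2 == 0 =>
    Try {
      val rooms = rest.grouped(2).map {
        case Seq(roomName, roomSize) => Room(roomName, roomSize.trim.toInt)
      }.toList
      Property(id, name, rooms)
    }.toOption
  case _ => None
}

val parsed = parseProperty(
  Seq("bhaa123", "My house", "kitchen", "134", "bedroom", "345"))
```

Keeping the conversion rules in one typed place like this makes bad input show up as None rather than as a NumberFormatException thrown on a worker.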

Assuming the table called ‘nested’ was created with the CREATE TABLE statement shown earlier, we can read its schema from Hive and apply it to the newly built RDD.

// copy schema from hive table and apply to RDD
val nested = hc.sql("select * from nested limit 0")
val nestedRDDwithSchema = hc.applySchema(nestedRDD, nested.schema)

 

Now we can insert the data, after registering the new RDD as a table:

 

hc.registerRDDAsTable(nestedRDDwithSchema, "nestedrdd")
hc.sql("insert into table nested select * from nestedrdd")

 

et voilà !
Now the data is available as nested records in Hive, Parquet and Spark SQL:

hive> select * from nested;
OK
bhaa123	My house	[{"roomname":"kitchen","roomsize":134},{"roomname":"bedroom","roomsize":345}]
pasa372	Other house	[{"roomname":"living room","roomsize":433},{"roomname":"bedroom","roomsize":332}]

Let’s see the complete code:

import com.databricks.spark.csv._
import org.apache.spark.sql.Row
import org.apache.spark.{SparkConf,SparkContext}

object nesting extends Serializable {
 def main(args: Array[String]): Unit = {
  val sc = new SparkContext(new SparkConf())

  val hc = new org.apache.spark.sql.hive.HiveContext(sc)

  // Change this to your file location
  val data = hc.csvFile("file:///data/data.csv")
  hc.registerRDDAsTable(data,"data")

  hc.sql("""CREATE TABLE IF NOT EXISTS nested (
   propertyId string,
   propertyName string,
   rooms array<struct<roomname:string,roomsize:int>>
) STORED AS PARQUET
""")

  val nestedRDD = data.map(buildRecord(_))

  // after the map, Spark does not know the schema
  // of the result RDD. We can just copy it from the
  // hive table using applySchema
  val nested = hc.sql("select * from nested limit 0")
  val schemaNestedRDD = hc.applySchema(nestedRDD, nested.schema)

  hc.registerRDDAsTable(schemaNestedRDD,"schemanestedrdd")

  hc.sql("insert overwrite table nested select * from schemanestedrdd")

 }

  def buildRecord(r: Row): Row = {
    println(r)
    var res = Seq[Any]()
    // takes the first two elements
    res = res ++ r.slice(0, 2)
    // now res = [ 'some id','some name']
    var ary = Seq[Any]() // this will contain all the array elements
    // we assume there are 2 groups of columns
    for (i <- 0 to 1) {
      // 0-based indexes, takes (2,3) (4,5) ..
      // and converts to appropriate type
      ary = ary :+ Row(r.getString(2 + 2 * i),
                       r.getString(2 + 1 + 2 * i).toInt)
    }
    // adds array as an element and returns it
    res = res :+ ary
    Row.fromSeq(res)
  }
}

 

The code is available on github here: https://github.com/rcongiu/spark-nested-example

Posted in: programming