Thursday, December 21, 2017

Parquet vs. Avro vs. Orc


Different big data access patterns require different data formats. Here are some notes I made while playing with the common ones.

File formats

"Avro is a Row based format. If you want to retrieve the data as a whole you can use Avro. Parquet is a Column based format. If your data consists of lot of columns but you are interested in a subset of columns then you can use Parquet" (StackOverflow).

Parquet

Parquet is based on Dremel which "represents nesting using groups of fields and repetition using repeated fields. There is no need for any other complex types like Maps, List or Sets as they all can be mapped to a combination of repeated fields and groups" (Twitter blogs).
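
For example, a nested Spark schema ends up on disk as groups and repeated fields. A minimal sketch to try in spark-shell (the Contact/Phone types and the /tmp path are just made up for illustration):

case class Phone(kind: String, number: String)
case class Contact(name: String, phones: Seq[Phone])

// The phones array is stored as a group containing a repeated field (annotated as a list),
// not as a dedicated List/Map type.
val nested = Seq(
  Contact("Ada", Seq(Phone("home", "0123"), Phone("work", "4567")))
).toDF()

nested.write.mode("overwrite").parquet("/tmp/nested.parquet")
// Running parquet-tools meta/schema against /tmp/nested.parquet shows the group/repeated structure.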

You can get the Parquet Tools jar from here. It lets you examine Parquet files with commands like this (here run against some address data):

$ hadoop jar ~/Tools/parquet-tools-1.9.0.jar meta  /HDFS/FILE.parquet
17/12/14 09:02:59 INFO hadoop.ParquetFileReader: Initiating action with parallelism: 5
17/12/14 09:02:59 INFO hadoop.ParquetFileReader: reading another 1 footers
17/12/14 09:02:59 INFO hadoop.ParquetFileReader: Initiating action with parallelism: 5
file:        /HDFS/FILE.parquet
creator:     parquet-mr version 1.5.0-cdh5.12.0 (build ${buildNumber})
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"LON","type":"double","nullable":true,"metadata":{}},{"name":"LAT","type":"double","nullable":true,"metadata":{}},{"name":"NUMBER","type":"string","nullable":true,"metadata":{}},
.
.


file schema: spark_schema
--------------------------------------------------------------------------------
LON:         OPTIONAL DOUBLE R:0 D:1
LAT:         OPTIONAL DOUBLE R:0 D:1
NUMBER:      OPTIONAL BINARY O:UTF8 R:0 D:1
.
.

row group 1: RC:16 TS:1067 OFFSET:4
--------------------------------------------------------------------------------
LON:          DOUBLE SNAPPY DO:0 FPO:4 SZ:181/177/0.98 VC:16 ENC:BIT_PACKED,PLAIN,RLE
LAT:          DOUBLE SNAPPY DO:0 FPO:185 SZ:181/177/0.98 VC:16 ENC:BIT_PACKED,PLAIN,RLE
NUMBER:       BINARY SNAPPY DO:0 FPO:366 SZ:95/95/1.00 VC:16 ENC:PLAIN_DICTIONARY,BIT_PACKED,RLE
.
.

Looking at the stats demonstrates that your data really is partitioned as you expect and has sensible splits (e.g., a single A-Z split is wrong but A-F, F-M, etc. is better). See Ryan Blue's lectures here and his slide deck here for more information.
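
A quick way to eyeball this from Spark itself is to look at the range of your sort key covered by each output file; narrow, largely non-overlapping ranges suggest sensible splits. A rough sketch (the NUMBER column here is just a stand-in for whatever column you sorted or partitioned by):

import org.apache.spark.sql.functions.{input_file_name, min, max}

// Min/max of the sort key per underlying Parquet file.
spark.read.parquet("/HDFS/FILE.parquet")
  .groupBy(input_file_name().as("file"))
  .agg(min("NUMBER").as("lo"), max("NUMBER").as("hi"))
  .show(false)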

This is the metadata of a Parquet file that was generated from a Spark DataFrame that looked like:

scala> df.printSchema
root
 |-- LON: double (nullable = true)
 |-- LAT: double (nullable = true)
 |-- NUMBER: string (nullable = true)

[Note that NUMBER is actually a string because some house numbers can look like "221b", as in Sherlock Holmes's address].

Because Parquet only accesses the parts of files it needs, it's been reported to be even faster than having the data in memory.
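
For instance, selecting just the coordinates should mean only the LON and LAT column chunks are read from disk (column pruning):

// Only the LON and LAT column chunks are read; the NUMBER column is skipped entirely.
val coords = spark.read.parquet("/HDFS/FILE.parquet").select("LON", "LAT")
coords.show(5)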

The R and D stand for Repetition Level and Definition Level.

"Definition levels specify how many optional fields in the path for the column are defined" (Parquet documentation) Put another way "to support nested records we need to store the level for which the field is null... from 0 at the root of the schema up to the maximum level for this column". In our case, it's 1 as we expect one or zero (they're nullable) values for our columns.

Repetition is 0 since none of our fields live in a collection ("in a flat schema there is no repetition and the repetition level is always 0").

Of course, the downside of a columnar format is when you want to retrieve/use many columns or even whole rows. This is where Avro might be a better choice.

Avro

"The project was created to address the major downside of Hadoop Writables: lack of language portability. Avro assumes the schema is always present - at read and write time - which makes for very compact encoding" [1] and it "suports schema evolution" (SlideShare).

Orc

"The ORC format showed up in Hive 0.11." (MapR).  It "breaks rows into row groups and applies columnar compression and indexing within these row groups... If a column is sorted, relevant records will get confined to one area on disk and the other pieces will be skipped very quickly." (HortonWorks)

It "offers excellent compression" [ibid]. Indeed, when I was storing the same data structure (for open source address data for Austria) in Parquet and Orc files, Orc was roughly twice as efficient.

Embarrassingly good compression

Although Parquet and Orc generally produce files of roughly comparable size, Orc has a neat trick up its sleeve that is fantastic under certain circumstances. If you have a field whose value is repeated across many rows, Orc appears to compress it very efficiently. This is very nice if you're using massively denormalized data that once lived in a highly normalized RDBMS.

To demonstrate, in Spark, let's do:

scala> val fname = "/user/Data/repeated_large_val"

scala> val _1000Chars = "012345678901234... " // a 1000-character-long String

scala> sc.parallelize(1 to 100000000, 100).cache().map(x => (x, _1000Chars)).toDF.write.mode(org.apache.spark.sql.SaveMode.Overwrite).parquet(fname)

scala> sc.parallelize(1 to 100000000, 100).cache().map(x => (x, _1000Chars)).toDF.write.mode(org.apache.spark.sql.SaveMode.Overwrite).orc(fname + "_orc")

Now, on the UNIX CLI:

$ hadoop fs -du -h /user/Data/
567.9 M  1.7 G   /user/Data/repeated_large_val
2.6 M    7.8 M   /user/Data/repeated_large_val_orc

That's a saving of 2 orders of magnitude!

And just to show there is nothing up my sleeve:

scala> val orcFromFile = spark.read.orc(fname + "_orc")
orcFromFile: org.apache.spark.sql.DataFrame = [_1: int, _2: string]

scala> orcFromFile.count()
res15: Long = 100000000

Partitioning

Note the effect of partitioning a Spark Dataset on the underlying file structure: it can break the output into sub-directories where each value of the partition key becomes a directory name.

For example, take this code:

      spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv("IN_FILE")
        .sort("STREET")                      // sort by the column we will later partition on
        .write.mode(SaveMode.Overwrite)
        .parquet("OUT_UNPARTITIONED.parquet")

It creates a file structure like this:

$ hadoop fs -ls OUT_UNPARTITIONED.parquet | head
Found 201 items
-rw-rw-r--   3 user user          0 2017-11-23 09:10  OUT_UNPARTITIONED.parquet/_SUCCESS
-rw-rw-r--   3 user user     489818 2017-11-23 09:09  OUT_UNPARTITIONED.parquet/part-00000-51284a5b-ebf6-47c4-8188-315e09f240e1-c000.snappy.parquet
-rw-rw-r--   3 user user     565884 2017-11-23 09:09  OUT_UNPARTITIONED.parquet/part-00001-51284a5b-ebf6-47c4-8188-315e09f240e1-c000.snappy.parquet
.
.

Whereas partitioning, like so:

      spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv("IN_FILE")
        .sort("STREET")
        .write.partitionBy("STREET").mode(SaveMode.Overwrite)
        .parquet("OUT_PARTITIONED.parquet")

produces a directory structure like:

$ hadoop fs -ls OUT_PARTITIONED.parquet | head
Found 69415 items
drwxrwxr-x   - user user          0 2017-11-23 09:16 OUT_PARTITIONED.parquet/STREET=%22%22%22Glück-auf%22%22-Straße%22
drwxrwxr-x   - user user          0 2017-11-23 09:16 OUT_PARTITIONED.parquet/STREET=%22Kellergasse %22%22Moorberg%22%22%22
drwxrwxr-x   - user user          0 2017-11-23 09:16 OUT_PARTITIONED.parquet/STREET=0Montangebiet
drwxrwxr-x   - user user          0 2017-11-23 09:16 OUT_PARTITIONED.parquet/STREET=0Tallage

where in this particular instance, I had street data. Note that since I partitioned on STREET, each street now has its own directory.

This can be handy when you want to join within a partition, but be careful that you don't create too many entries in the Hadoop NameNode.
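
For example, a filter on the partition column should mean that Spark only scans the matching directory (partition pruning). A small sketch using one of the streets from the listing above:

import org.apache.spark.sql.functions.col

// Only the STREET=0Tallage directory should need to be scanned thanks to partition pruning.
val oneStreet = spark.read.parquet("OUT_PARTITIONED.parquet").filter(col("STREET") === "0Tallage")
oneStreet.count()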

You get a similar partitioned directory structure if you use Orc, that is:

.
.
   .write.partitionBy("STREET").mode(SaveMode.Overwrite).orc("OUT_PARTITIONED_ORC")


Data interchange

"Hive does not mandate any particular file format. Files are stored verbatim." [1] Indeed, if we load a (Parquet) file with Spark and save it into the HDFS directory used by Hive:

scala> val df = spark.read.parquet(file)
scala> df.write.saveAsTable("HIVE_DB_NAME.austria_table")

then open Hive, we can read it:

$ hive
hive> use HIVE_DB_NAME;
OK
Time taken: 1.435 seconds
hive> show tables;
OK
austria_table
hive> select count(*) from austria_table;
.
.
Stage-Stage-1: Map: 2  Reduce: 1   Cumulative CPU: 23.32 sec   HDFS Read: 44969 HDFS Write: 8 SUCCESS
Total MapReduce CPU Time Spent: 23 seconds 320 msec
OK
2804994
Time taken: 44.574 seconds, Fetched: 1 row(s)
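
If you want to check how Hive has recorded the table's storage (SerDe, input/output format, location), DESCRIBE FORMATTED works from both the Hive CLI and Spark SQL, for example:

// Shows the SerDe, InputFormat/OutputFormat and location of the table as Hive sees them.
spark.sql("DESCRIBE FORMATTED HIVE_DB_NAME.austria_table").show(100, false)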

Now, if we use Spark to do a similar thing with Orc:

scala> val dfOrc = spark.read.orc("/Data/OpenAddress/austria_partionedByStreet_2_orc")
scala> val df = spark.read.option("header", "true").option("inferSchema", "true").csv("/Data/OpenAddress/austria.csv")
scala> df.write.mode(SaveMode.Overwrite).saveAsTable("HIVE_DB_NAME.austria_table_orc")

We can read it just as easily in Hive:

hive> select count(*) from austria_table_orc;
.
.
2804994
Time taken: 36.396 seconds, Fetched: 1 row(s)
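
Note that saveAsTable writes using Spark's default data source (typically Parquet, controlled by spark.sql.sources.default) unless you say otherwise. If you want to be sure the Hive table really is stored as ORC, you can name the format explicitly; a sketch (the table name here is just an example):

// Explicitly store the Hive table as ORC rather than relying on the default data source.
dfOrc.write
  .mode(org.apache.spark.sql.SaveMode.Overwrite)
  .format("orc")
  .saveAsTable("HIVE_DB_NAME.austria_table_orc_explicit")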

[1] Hadoop: The Definitive Guide

