Friday, June 12, 2020

Spark, Parquet and Schemas


Recently, I've been playing with Spark and Parquet. Here are a few tidbits I thought were interesting.

Nulls

I was surprised to see that attaching a schema to a DataFrame does not enforce its constraints ("Schema object passed to createDataFrame has to match the data, not the other way around" - SO). Instead, Spark makes a best-effort attempt to have the data read from HDFS comply with the schema, but it does not throw exceptions if it can't. For instance, columns that appear in the schema but not in the file are filled with nulls, even if the schema says they cannot be null.
"When you define a schema where all columns are declared to not have null values , Spark will not enforce that and will happily let null values into that column. The nullable signal is simply to help Spark SQL optimize for handling that column. If you have null values in columns that should not have null values, you can get an incorrect result or see strange exceptions that can be hard to debug."
Spark: The Definitive Guide: Big Data Processing Made Simple (p102)
The solution was to deserialize everything into the appropriate case class for the Dataset. Since I had to do bespoke validations on the objects, this wasn't too great an issue.
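As a rough sketch of that approach (the Record class, column names and path are all invented for illustration), the idea is to read with the schema, map straight into a case class and do the null checks in ordinary Scala:

import org.apache.spark.sql.types._

case class Record(id: Long, amount: scala.math.BigDecimal)

val recordSchema = StructType(Seq(
  StructField("id",     LongType,            nullable = false),  // "not null" is not enforced on read
  StructField("amount", DecimalType(38, 18), nullable = false)
))

import spark.implicits._
val records = spark.read.schema(recordSchema).parquet("/path/to/records").as[Record]

// Anything the schema claimed but Spark didn't enforce has to be checked by hand
// once we have real objects.
val validated = records.filter(_.amount != null)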

Note that "when writing Parquet files, all columns are automatically converted to be nullable for compatibility reasons" (from the Spark docs). The compatibility being the Parquet spec (see below) which we have to map to when writing.


Floating points

Reading a Parquet file that had decimal(38, 10) for a floating-point type, taking its schema with DataFrame.schema and then using that selfsame schema to load the Parquet again with:

spark.read.schema(theSchema).parquet("...").as[MyDomainObject].show()

unfortunately gives me:

org.apache.spark.sql.AnalysisException: Cannot up cast `REDACTED` from decimal(38,10) to decimal(38,18) as it may truncate
The type path of the target object is:
- field (class: "scala.math.BigDecimal", name: "REDACTED")
- root class: "MyDomainObject"
You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object;

where MyDomainObject has a field that is simply a scala.math.BigDecimal.

The problem seems to be due to the underlying store whence the data was plucked (this SO question shows a similar problem with MySQL).

Modifying the schema by hand to use only decimal(38, 18) solved the problem. This inconvenience has its own JIRA that's unfortunately marked as "Won't Fix".
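A sketch of that hand-modification (the path is a placeholder and MyDomainObject's encoder is assumed to be in scope): take the schema the file reports and widen every decimal column to Spark's default before reading again:

import org.apache.spark.sql.types._

val original = spark.read.parquet("/path/to/data").schema

// Widen every decimal column to Spark's default precision and scale.
val widened = StructType(original.map {
  case f @ StructField(_, _: DecimalType, _, _) => f.copy(dataType = DecimalType(38, 18))
  case f                                        => f
})

import spark.implicits._
val ds = spark.read.schema(widened).parquet("/path/to/data").as[MyDomainObject]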

Since "the types supported by the file format are intended to be as minimal as possible" (from the Parquet docs), the floating point types are your standard IEEE formats. It appears that if you want BigDecimals, the data will be saved in a Java representation by using Parquet's native BYTE_ARRAY types since Parquet is language agnostic. Spark assumes as it's defined as a decimal(38, 18) in SYSTEM_DEFAULT here.

Since this is the default, if you save your data as anything less, you must read the file back with an explicit schema if you're hoping to call Dataset.as[MyDomainObject]. You might also need to cast the original values, as this StackOverflow answer suggests, or else the decimal point might not be where you were expecting it to be.
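A minimal sketch of that cast (the amount column and paths are hypothetical), done before writing so the file already carries Spark's default precision and scale:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DecimalType

// Cast up front so the scale is where the Dataset deserializer expects it.
val fixed = df.withColumn("amount", col("amount").cast(DecimalType(38, 18)))
fixed.write.mode("overwrite").parquet("/path/to/fixed")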


Timestamps

As already mentioned, the Parquet types are restricted to IEEE-compliant types and a few other primitives. Therefore, all the richness of the Java ecosystem is lost and there needs to be a mapping between these primitives and Java classes. It appears that Dates and Timestamps are converted from the INT64 type, as you'll see casting errors if your types misalign.
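As a hedged sketch (the Event class, column names and path are invented), the pairing that keeps Spark happy is TimestampType in the schema against java.sql.Timestamp in the case class; swapping in, say, a Long on the Scala side is when the casting errors show up:

import java.sql.Timestamp
import org.apache.spark.sql.types._

case class Event(id: Long, occurredAt: Timestamp)

val eventSchema = StructType(Seq(
  StructField("id",         LongType,      nullable = false),
  StructField("occurredAt", TimestampType, nullable = true)
))

import spark.implicits._
val events = spark.read.schema(eventSchema).parquet("/path/to/events").as[Event]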


Sharing schemas

You can load and save the schema as JSON if you like (as this SO answer shows). You can also add metadata to the schema with a simple:

import org.apache.spark.sql.types._

// Stamp the same metadata onto every field of the schema loaded from JSON.
val metaBuilder = new MetadataBuilder()
val sillyFields = schemaFromJsonFile.map(_.copy(metadata = metaBuilder.putString("You", "suck!").build))
val sillySchema = StructType(sillyFields.toArray)
spark.read.schema(sillySchema).parquet("...").show()

and all is good with the World.

The advantage of this is that you can pass annotated schemas between teams in a (fairly) human-readable format like JSON.
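A small sketch of that round trip (the paths are placeholders): serialise the schema to JSON on one side and rebuild it on the other:

import org.apache.spark.sql.types.{DataType, StructType}

// Hand this string (or a file containing it) to the other team...
val asJson: String = df.schema.json

// ...and they can rebuild the schema and read with it.
val rehydrated = DataType.fromJson(asJson).asInstanceOf[StructType]
spark.read.schema(rehydrated).parquet("/path/to/data").show()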


Conclusion

Spark schemas and Parquet do not align one-to-one, since Spark schemas can apply to any file format (or even just data in memory). Some elements of the schema are lost when written to Parquet (e.g., everything in Parquet is nullable irrespective of the Spark schema) and some Scala/Java types are not represented in Parquet.
