Tuesday, September 22, 2020

Spark Gotchas


Some miscellaneous notes on Spark that I've been making over the last few weeks.


Triggers and OutputModes when Aggregating

I was experimenting with aggregating a query that reads from Kafka and writes Parquet to HDFS.

The interplay between watermarking, Trigger and OutputMode looks a bit like this:
  • Queries that aggregate with OutputMode.Append but with no watermark will always throw "Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;", irrespective of the Trigger.
  • Queries that aggregate with OutputMode.Complete fail with "Data source parquet does not support Complete output mode;". Even text files will barf with "Data source text does not support Complete output mode;".
  • Queries that aggregate with OutputMode.Append but with a watermark behave differently depending on the Trigger, as listed below (a minimal sketch of such a query follows the list):

  • Continuous: either "Unknown type of trigger: ContinuousTrigger(5000)" [1] or "Continuous processing does not support EventTimeWatermark operations.;;"
  • ProcessingTime: no errors, but the last batch is not pulled from Kafka.
  • No trigger ("When no trigger is specified, Structured Streaming starts the processing of a new batch as soon as the previous one is finished." - Stream Processing with Apache Spark): no errors, but the last batch is not pulled from Kafka.
  • Once: no errors, but nothing is pulled from Kafka. Running the query twice with awaitTermination pulls some elements.
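
A minimal sketch of the kind of query I was experimenting with (the bootstrap servers, topic, paths, window and watermark durations are all made up; the point is just the combination of a watermark, an aggregation, OutputMode.Append and a Trigger writing Parquet):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import spark.implicits._

// read a stream from Kafka
val source = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()

// count events per one-minute event-time window; the watermark is what makes
// OutputMode.Append legal for a streaming aggregation
val counts = source
  .withWatermark("timestamp", "10 minutes")
  .groupBy(window($"timestamp", "1 minute"))
  .count()

// write Parquet to HDFS; swap the trigger to reproduce the rows in the list above
val query = counts.writeStream
  .format("parquet")
  .option("path", "/tmp/agg_out")
  .option("checkpointLocation", "/tmp/agg_checkpoint")
  .outputMode(OutputMode.Append())   // Complete fails for the parquet sink
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

query.awaitTermination()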

The reason Spark leaves a little something on Kafka is (apparently) for efficiency. There appears to be a pull request for making Spark consume the last bit from Kafka, but in its associated Jira somebody mentions my StackOverflow question that talks about Spark's behaviour being unchanged.

So perplexed was I that I asked the Spark mailing lists. The answer, from Spark committer Jungtaek Lim, was illuminating:

"Unfortunately there's no mechanism on SSS to move forward only watermark without actual input, so if you want to test some behavior on OutputMode.Append you would need to add a dummy record to move watermark forward."

This doesn't seem to be because we're writing Parquet. Text files will also behave this way.


Defragging Question

Spark will create a new file per executor thread per ProcessingTime trigger interval (as this test code demonstrates). This can lead to a lot of files that could ultimately choke your NameNode if you're persisting to HDFS, and it may necessitate a roll-your-own fix [SO].

If you're using Databricks, you can do this "bin-packing" by running the OPTIMIZE command. Unfortunately, I could not see the code to do this inside the Delta library that Databricks have open-sourced. (I discuss other ways of dealing with this problem without Delta in this blog post.)
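
A roll-your-own alternative is a periodic compaction job. A minimal sketch (the paths and the target file count are made up, and it assumes the compacted copy is written to a separate directory):

// read the many small files the stream produced, squash them into a few
// larger ones, and write them out again
val small = spark.read.parquet("/data/stream_output")
small.coalesce(8)          // target number of output files
  .write
  .mode("overwrite")
  .parquet("/data/stream_output_compacted")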

Our use case at work only needs the output of the stream processing once a day, so we opted for Trigger.Once. Because we're not aggregating, all of the data is pulled and processed. And because this is streaming in only a very loose sense of the word (it's more batch processing that gets its data in one short, sharp stream), we avoided having large numbers of small files.
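
Roughly the shape of that job (the Kafka details and paths are made up); since there is no aggregation, Trigger.Once drains everything Kafka holds at the time it runs:

import org.apache.spark.sql.streaming.Trigger

val daily = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("parquet")
  .option("path", "/data/daily_out")
  .option("checkpointLocation", "/data/daily_checkpoint")
  .trigger(Trigger.Once())
  .start()

daily.awaitTermination()   // returns once the single micro-batch has been processed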


Exceptions in processing DataFrames

Spark can optionally take a "best efforts" approach to exceptions rather than failing the whole write; which behaviour you get depends on the file output committer algorithm version:

val ALGO_VERSION = 2        // selects the FileOutputCommitter algorithm version (1 or 2)
import spark.implicits._    // for the implicit Encoder needed by map
spark.range(10000).repartition(7).map { i =>
  if (i == 9999) { Thread.sleep(5000); throw new RuntimeException("oops!") }
  else i
}.write.option("mapreduce.fileoutputcommitter.algorithm.version", ALGO_VERSION.toString)
 .mode("append").parquet(s"/tmp/test_algo_$ALGO_VERSION")

When ALGO_VERSION=1, nothing is written to HDFS. On the other hand, when ALGO_VERSION=2, the partition in which the exception occurred is not written but all the other ones are.

See https://databricks.com/blog/2017/05/31/transactional-writes-cloud-storage.html for more details.


DataFrame types

You can see the schema as JSON with:

spark.read.parquet("THE_FILE").schema.json

The type "timestamp" would map the data to a java.sql.Timestamp (the equivalent of a SQL TIMESTAMP type). This Java type is a subclass of java.util.Date.

The type "date" would map to a java.sql.Date (the equivalent of a SQL DATE type). This is also a subclass of java.util.Date.

See this StackOverflow page for descriptions of the SQL types.
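
A quick way to see the mapping for yourself (the column names are made up; THE_FILE is the same placeholder as above):

import java.sql.{Date, Timestamp}

val df = spark.read.parquet("THE_FILE")
df.printSchema()                          // e.g. "created: timestamp", "day: date"

val row = df.select("created", "day").head()
val ts: Timestamp = row.getTimestamp(0)   // a subclass of java.util.Date
val d:  Date      = row.getDate(1)        // also a subclass of java.util.Date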

Trying to read from a Parquet file that uses "date" and coercing it to a "timestamp" will result in:

Caused by: java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary
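
A sketch of the kind of read that can trigger it, assuming the file stores the column as a Parquet DATE while we force a TIMESTAMP schema on it (the file and column names are again placeholders):

import org.apache.spark.sql.types.{StructField, StructType, TimestampType}

// "day" is stored as a DATE in the Parquet file, but we coerce it to TIMESTAMP on read
val forced = StructType(Seq(StructField("day", TimestampType)))
spark.read.schema(forced).parquet("THE_FILE").show()   // throws UnsupportedOperationException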


Idempotency

DataStreamWriter.foreachBatch is experimental and "is supported only [in] the micro-batch execution modes (that is, when the trigger is not continuous)". It provides a batchId that "can be used [to] deduplicate and transactionally write the output (that is, the provided Dataset) to external systems. The output Dataset is guaranteed to [be] exactly [the] same for the same batchId (assuming all operations are deterministic in the query)" [Spark docs].
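
A sketch of one way to use that batchId for idempotent output; streamingDF, the path layout and the trigger are my own assumptions rather than anything prescribed by the docs. Writing each micro-batch to a directory keyed by batchId means a replayed batch overwrites the same directory instead of duplicating data:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

// defining the function as a val (rather than an inline lambda) sidesteps the
// foreachBatch overload ambiguity seen with Scala 2.12
val writeBatch: (DataFrame, Long) => Unit = (batch, batchId) =>
  batch.write.mode("overwrite").parquet(s"/data/out/batch_id=$batchId")

val idempotent = streamingDF.writeStream   // streamingDF: any streaming DataFrame
  .foreachBatch(writeBatch)
  .option("checkpointLocation", "/data/out_checkpoint")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()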


[1] "You can set the optional Continuous Trigger in queries that satisfy the following conditions: Read from supported sources like Kafka and write to supported sinks like Kafka, memory, console." SO quoting DataBricks.
