Back Pressure
Backpressure is defined on Wikipedia, in the context of routing, as "an algorithm for dynamically routing traffic over a multi-hop network by using congestion gradients."
Note that back pressure is an option in the older Spark Streaming (DStreams) API via the property spark.streaming.backpressure.enabled, but that setting does not apply to Structured Streaming. In any case, it appears that back pressure is not necessary when reading from Kafka in Spark Structured Streaming (StackOverflow):
"Structured Streaming cannot do real backpressure, because, such as, Spark cannot tell other applications to slow down the speed of pushing data into Kafka.
"If you mean dynamically changing the size of each internal batch in Structured Streaming, then NO. ...Generally, Structured Streaming will try to process data as fast as possible by default. There are options in each source to allow to control the processing rate, such as maxFilesPerTrigger in File source, and maxOffsetsPerTrigger in Kafka source."
In general, Kafka consumers don't need back pressure. Note what the creators of the .NET Kafka client write: "The Kafka consumer will only pull from Kafka as fast as you are handling the messages. If you are forwarding the messages consumed from Kafka onto another queue, simply pause before adding more to that queue if it is full... If you question was just to not poll too fast on consumer side (to avoid taking too much memory), then yes, pause will be ok when available. You can also simply not call Poll when your buffer is full"
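Outside Spark, the same idea might look roughly like this in Scala with the standard Java Kafka consumer: pause the assigned partitions while a (hypothetical) downstream buffer is full and resume once it drains. The broker, topic and queue are invented for illustration:

```scala
import java.time.Duration
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "broker:9092")  // placeholder
props.put("group.id", "buffered-consumer")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(List("events").asJava)      // placeholder topic

// A stand-in for whatever downstream buffer the messages are forwarded to.
val buffer = new java.util.concurrent.LinkedBlockingQueue[String](10000)

while (true) {
  // Stop fetching while the downstream queue is full; carry on once it drains.
  if (buffer.remainingCapacity() == 0) consumer.pause(consumer.assignment())
  else consumer.resume(consumer.assignment())

  val records = consumer.poll(Duration.ofMillis(500))
  records.asScala.foreach(r => buffer.put(r.value()))
}
```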
Partitions and Parallelism
What does the notion of a DataFrame's partitions mean in the world of streams? "When you retrieve the data at first, the number of partitions will be equal to the number of Kafka partitions... If your Kafka topic has only 1 partition, that means that prior to groupByKey, your internal stream will contain a single partition, which won't be parallelized but executed on a single executor. As long as your Kafka partition count is greater than 1, your processing will be parallel. After the shuffle boundary, Spark will re-partition the data to contain the amount of partitions specified by spark.sql.shuffle.partitions." (StackOverflow)
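A hedged sketch of what that means in practice (the broker, topic and partition count are invented): the read side inherits the Kafka partitioning, while anything downstream of the shuffle uses spark.sql.shuffle.partitions:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

// The stream's initial partitioning mirrors the Kafka topic's partition count...
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")  // placeholder
  .option("subscribe", "events")                      // placeholder topic
  .load()

// ...whereas anything after a shuffle boundary has spark.sql.shuffle.partitions partitions.
spark.conf.set("spark.sql.shuffle.partitions", "32")

val countsPerKey = events
  .selectExpr("CAST(key AS STRING) AS key")
  .groupBy("key")   // shuffle boundary: downstream there are 32 partitions
  .count()
```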
Many, Small Files
Note that the nature of streaming means many files may be created (at least one per mapper per trigger duration, the "interval between checkpoints"). Too many files can swamp the Hadoop NameNode, so you may want to curb this. Evo Eftimov (in his blog) discusses ways of doing so: among his ideas are increasing the trigger time and coalescing the files in a batch job. We do the latter but have not yet got to the tuning stage. The option maxFilesPerTrigger appeared to make no difference to us (unsurprisingly, since it limits how many files are read per trigger from a file source rather than how many are written).
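For illustration, lengthening the trigger interval might look something like this (re-using the events stream from the earlier sketch; the paths and interval are placeholders):

```scala
import org.apache.spark.sql.streaming.Trigger

// Fire a batch every 10 minutes rather than as fast as possible, so far fewer
// files are written per hour. Paths are placeholders.
events.writeStream
  .format("parquet")
  .option("path", "/data/events")
  .option("checkpointLocation", "/checkpoints/events")
  .trigger(Trigger.ProcessingTime("10 minutes"))
  .outputMode("append")
  .start()
```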
"Note that when writing DataFrame to Parquet even in “Append Mode”, Spark Streaming does NOT append to already existing parquet files – it simply adds new small parquet files to the same output directory... The columns in parquet are stored sequentially one after another and the next such table data segment has to be ready before beginning to write to a parquet file." [ibid]
"In terms of purging old parquet files you may want to partition the data and then periodically delete old partitions as needed. Otherwise you can't just delete rows if all the data is being written to the same output path." (StackOverflow). Consequently, we partition the incoming streams on time and then delete old directories after they have been collated by a Spark batch job.
Unions and watermarks
Streams can be unioned and then persisted to the same sink. However, at first I was getting the error "There are [1] sources in the checkpoint offsets and now there are [2] sources requested by the query. Cannot continue." This appears simply to be Spark remarking that what the checkpoint recorded as a single source is now two. Adding withWatermark appears to have fixed it.
"A streaming query can have multiple input streams that are unioned or joined together... the global watermark will safely move at the pace of the slowest stream and the query output will be delayed accordingly" (the Spark docs).
Caveat
To summarise our architecture:
- A streaming job reads from a union of multiple streams and writes the data partitioned on a timestamp.
- A batch job then coalesces each partition one by one, sorting the data on field X as it goes (see the sketch below).
It is currently working fine in QA but has yet to meet production levels of data.
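For what it's worth, the collating batch job is roughly of this shape (the paths, the column name "x" and the target file count are stand-ins):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Collate").getOrCreate()

// Collate one time partition: squash its many small files into a few larger ones,
// sorting on the field of interest ("x" stands in for the real column name).
val in  = "/data/events/date=2019-01-01"    // placeholder partition directory
val out = "/data/collated/date=2019-01-01"  // placeholder output directory

spark.read.parquet(in)
  .coalesce(4)                 // a handful of larger files instead of many small ones
  .sortWithinPartitions("x")
  .write
  .parquet(out)
```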
Question: did you add withWatermark to the unioned stream or the two streams individually? I still see the exception "There are [1] sources in the checkpoint offsets and now there are [2] sources requested by the query. Cannot continue." when unioning a new source, whether it is watermarked alone, either stream is watermarked, or there is a global watermark applied to the unioned result.