Wednesday, February 12, 2020

Spark Structured Streaming Behaviour - part 1


The Environment

Once again, I use Docker containers (as outlined in a previous post here) to give me Kafka, Spark and HDFS.

Things might not work out-of-the-box, so here are some tips:

Check the number of brokers (SO) with:

echo dump | nc localhost 2181 | grep brokers

You can check the number of messages in Kafka (SO) with:

$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:32773 --topic test_datum --time -1
test_datum:0:10000

The Code

The toy code can be found here and looks something like:

import uk.co.odinconsultants.sssplayground.kafka.Consuming._
import uk.co.odinconsultants.sssplayground.joins.RunningAverageMain._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

// A stream of parsed messages from Kafka, with a 1-minute watermark on the event-time column, ts:
val stream  = streamStringsFromKafka(spark, kafkaUrl = "192.168.6.1:32773", topicName = "test_datum", parsingDatum).withWatermark("ts", "1 minute")

// The count and mean of amount, per id, over 1-minute tumbling windows:
val ds      = stream.groupBy(window('ts, "1 minute", "1 minute"), 'id).agg(count('id), mean('amount)).toDF("timestamp", "id", "count", "mean")

Note that we must groupBy on the window. The manual states:

withWatermark must be called on the same column as the timestamp column used in the aggregate. For example,
df.withWatermark("time", "1 min").groupBy("time2").count()
is invalid in Append output mode, as watermark is defined on a different column from the aggregation column.

Simply stated, for Append you need a watermark [StackOverflow].
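
For contrast, here is a minimal sketch of the valid shape, using the same hypothetical df and time column as the quoted example (illustrative only, not from the post):

import org.apache.spark.sql.functions.{col, window}

// The watermark column ("time") is the same column the window is built over,
// so this aggregation is legal in Append output mode.
val validCounts = df
  .withWatermark("time", "1 min")
  .groupBy(window(col("time"), "1 min"))
  .count()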

We set the trigger processing time to be 10s (see below) which is fine for our quantity of data. If it's too short for your data, you'll see WARNings like:

2020-02-12 14:40:26 WARN  ProcessingTimeExecutor:66 - Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 14605 milliseconds
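
If you see this warning, the simple fix is a more generous trigger interval. A sketch (30 seconds is an arbitrary, illustrative value):

import org.apache.spark.sql.streaming.Trigger

// Two equivalent ways of asking for a longer micro-batch interval.
val slower = Trigger.ProcessingTime("30 seconds")   // or Trigger.ProcessingTime(30000)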

We use Trigger.ProcessingTime as we obviously don't want Once in this use case, and Continuous gives:

Unknown type of trigger: ContinuousTrigger(10000)

if we're writing to Parquet (SO). This makes sense as Parquet is columnar, so we need a relatively large set of data before we can write it.
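
For concreteness, this is roughly the combination that produces that error; ds is the aggregated stream defined above and the HDFS paths are made up for illustration:

import org.apache.spark.sql.streaming.Trigger

// A file-based sink such as Parquet only supports micro-batch execution,
// so pairing it with a continuous trigger is rejected when the query starts.
val failing = ds.writeStream
  .format("parquet")
  .option("path",               "hdfs://namenode:8020/tmp/test_datum_parquet")      // made-up path
  .option("checkpointLocation", "hdfs://namenode:8020/tmp/test_datum_checkpoint")   // made-up path
  .trigger(Trigger.Continuous(10000))
  .start()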


Methodology

Between each test, we delete the Kafka topic with:

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test_datum

where test_datum is the name of our topic.


OutputMode

There are three OutputModes: Complete, Append and Update. We'll start with conceptually the easiest.


Complete

The code to start the query is this:

val query = ds.writeStream.format("console").outputMode(OutputMode.Complete()).option("truncate", "false").trigger(Trigger.ProcessingTime(10000)).start()

At 2020-02-11 15:57:57 we pumped 1000 messages into Kafka over 3 keys with this script and saw:

Batch: 0
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp                                 |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|1  |334  |500.5|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|0  |333  |501.0|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|2  |333  |500.0|
+------------------------------------------+---+-----+-----+

At 15:59:19, we pump in some more:

Batch: 1
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp                                 |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|2  |333  |500.0|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|1  |334  |500.5|
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|0  |333  |501.0|
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|1  |334  |500.5|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|0  |333  |501.0|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|2  |333  |500.0|
+------------------------------------------+---+-----+-----+

and then quickly and within the same minute (at precisely 15:59:34) we pump in 1000 more:

Batch: 2
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp                                 |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|2  |666  |500.0|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|1  |334  |500.5|
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|0  |666  |501.0|
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|1  |668  |500.5|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|0  |333  |501.0|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|2  |333  |500.0|
+------------------------------------------+---+-----+-----+

Look! Our values for this time window increased.

Further sending messages (16:01:55) outside that time window just adds new rows:

Batch: 3
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp                                 |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|2  |666  |500.0|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|1  |334  |500.5|
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|0  |666  |501.0|
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|1  |668  |500.5|
|[2020-02-11 16:01:00, 2020-02-11 16:02:00]|1  |334  |500.5|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|0  |333  |501.0|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|2  |333  |500.0|
|[2020-02-11 16:01:00, 2020-02-11 16:02:00]|0  |333  |501.0|
|[2020-02-11 16:01:00, 2020-02-11 16:02:00]|2  |333  |500.0|
+------------------------------------------+---+-----+-----+

But with each batch, we show all the data ever collected, albeit aggregated.


Update

Same code as before, except that the OutputMode is now Update(). At 2020-02-12 06:37:07 we pumped 1000 messages into Kafka and started our query.
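
That is, presumably something like the following; just the Complete query from above with Update() swapped in (a sketch, not copied from the post):

// Same sink, trigger and options as the Complete example, but in Update mode.
val query = ds.writeStream.format("console").outputMode(OutputMode.Update()).option("truncate", "false").trigger(Trigger.ProcessingTime(10000)).start()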

Almost immediately we see:

Batch: 0
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp                                 |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-12 06:37:00, 2020-02-12 06:38:00]|1  |334  |500.5|
|[2020-02-12 06:37:00, 2020-02-12 06:38:00]|0  |333  |501.0|
|[2020-02-12 06:37:00, 2020-02-12 06:38:00]|2  |333  |500.0|
+------------------------------------------+---+-----+-----+

We do some more testing; some batches have similar values and some are empty. But at 06:39:33 and 06:39:50, we pump another 1000 messages each into Kafka and see first:

Batch: 4
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp                                 |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-12 06:39:00, 2020-02-12 06:40:00]|1  |334  |500.5|
|[2020-02-12 06:39:00, 2020-02-12 06:40:00]|0  |333  |501.0|
|[2020-02-12 06:39:00, 2020-02-12 06:40:00]|2  |333  |500.0|
+------------------------------------------+---+-----+-----+

and then:

Batch: 6
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp                                 |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-12 06:39:00, 2020-02-12 06:40:00]|1  |668  |500.5|
|[2020-02-12 06:39:00, 2020-02-12 06:40:00]|0  |666  |501.0|
|[2020-02-12 06:39:00, 2020-02-12 06:40:00]|2  |666  |500.0|
+------------------------------------------+---+-----+-----+

(Batch: 5 was empty)

So, Update represents a rolling aggregation within the most recent time window.


Append

OutputMode.Append is a bit weird.
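
The query is presumably the same console query as before, just with Append(); something like this sketch:

// Append mode on the same aggregated stream, console sink, 10-second micro-batches.
val query = ds.writeStream.format("console").outputMode(OutputMode.Append()).option("truncate", "false").trigger(Trigger.ProcessingTime(10000)).start()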

We have sent messages to Kafka but Spark only shows three empty batches after starting up. We send another 1000 messages at 2020-02-11 16:05:45:

Batch: 3
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp                                 |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-11 16:03:00, 2020-02-11 16:04:00]|2  |333  |500.0|
|[2020-02-11 16:03:00, 2020-02-11 16:04:00]|1  |334  |500.5|
|[2020-02-11 16:03:00, 2020-02-11 16:04:00]|0  |333  |501.0|
+------------------------------------------+---+-----+-----+

But these don't appear to be the most recent messages, as their time window is not what we expect. So, we send 1000 more at 16:06:09, but the next 3 batches are empty again. We then send 1000 more at 16:07:01 and note that it's our 16:06:09 messages that come through:

Batch: 7
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp                                 |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-11 16:05:00, 2020-02-11 16:06:00]|2  |333  |500.0|
|[2020-02-11 16:05:00, 2020-02-11 16:06:00]|1  |334  |500.5|
|[2020-02-11 16:05:00, 2020-02-11 16:06:00]|0  |333  |501.0|
+------------------------------------------+---+-----+-----+

There appear to be two things at work here:

First, Spark 2.4.0 [addendum: and v2.4.5 as well] doesn't appear to consume messages until there is a next batch waiting. This appears to be expected but undesirable, as SPARK-24156 notes:
consider a streaming aggregation query with watermark-based state cleanup. The watermark is updated after every batch with new data completes. The updated value is used in the next batch to clean up state, and output finalized aggregates in append mode. However, if there is no data, then the next batch does not occur, and cleanup/output gets delayed unnecessarily. This is true for all stateful streaming operators - aggregation, deduplication, joins, mapGroupsWithState.
Secondly, Spark does not show the messages until our withWatermark delay plus the window function's windowDuration has passed. I didn't see the 16:06:09 messages until after 16:08:28 (16:06-16:07 was the window; add one minute of watermark plus some time for the trigger, and it would be some time in 16:08 at the earliest before I see the messages).

I tried to solve this with:

val query = ds.writeStream.format("console").outputMode(OutputMode.Append()).option("truncate", "false").trigger(Trigger.Continuous(10000)).start()

but it doesn't work as now the code throws:

org.apache.spark.sql.AnalysisException: Continuous processing does not support EventTimeWatermark operations.;;

and removing the watermark results in:

org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;


Writing as Parquet

I run the code that writes Parquet to HDFS with:

--executor-cores 3 --num-executors 2 

as Docker seems to use all my CPUs despite being told to use just 6 (cores x executors). I have 16 cores so, sticking to multiples of 3, the Spark worker can only use 15, leaving one free for my Spark shell.

Anyway, persisting with Complete gave:

Data source parquet does not support Complete output mode; 

This makes sense: Complete contains all the data, and the only time the data can be frozen in stone is when the stream is no more.

Persisting with Update gave me:

Data source parquet does not support Update output mode;

Since we're updating on the fly with Update, it doesn't make sense to write this to Parquet either, as the results that come earlier are rendered incorrect by results that come later in the same window.

Persisting with Append works but with the aforementioned caveat that you can't see the data until another batch comes in.
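
For completeness, the working combination looks roughly like this; the HDFS paths are made up and the real code lives in the repo linked above:

import org.apache.spark.sql.streaming.{OutputMode, Trigger}

// Parquet sink: only Append mode is accepted, and a path plus checkpointLocation are needed.
val toParquet = ds.writeStream
  .format("parquet")
  .outputMode(OutputMode.Append())
  .option("path",               "hdfs://namenode:8020/tmp/test_datum_parquet")      // made-up path
  .option("checkpointLocation", "hdfs://namenode:8020/tmp/test_datum_checkpoint")   // made-up path
  .trigger(Trigger.ProcessingTime(10000))
  .start()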


Sorting

To make our diagnosis easier, we might like to sort the batches. Unfortunately,

val query = ds.sort('id).writeStream.format("console").outputMode(OutputMode.Append()).start()

gives:

org.apache.spark.sql.AnalysisException: Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;;

which kinda makes sense as you can't sort a (potentially infinite) stream.
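
As the exception message suggests, sorting the aggregated stream is allowed if we switch to Complete mode; a sketch:

// Sorting an aggregated streaming DataFrame is only permitted with Complete output.
val sorted = ds.sort('id).writeStream.format("console").outputMode(OutputMode.Complete()).option("truncate", "false").start()

Complete keeps the full aggregated result for every trigger, so a global sort over it is well defined.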

