The Environment
Once again, I use Docker containers (as outlined in a previous post here) to give me Kafka, Spark and HDFS.
Things might not work out-of-the-box, so here are some tips:
Check the number of brokers with (SO):
echo dump | nc localhost 2181 | grep brokers
You can check the number of messages in Kafka (SO) with:
$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:32773 --topic test_datum --time -1
test_datum:0:10000
That is, partition 0 of test_datum has a latest offset of 10000 (the output format is topic:partition:offset, and --time -1 asks for the latest offsets).
The Code
The toy code can be found here and looks something like:
import uk.co.odinconsultants.sssplayground.kafka.Consuming._
import uk.co.odinconsultants.sssplayground.joins.RunningAverageMain._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.functions.{count, mean, window}
import spark.implicits._
val stream = streamStringsFromKafka(spark, kafkaUrl = "192.168.6.1:32773", topicName = "test_datum", parsingDatum).withWatermark("ts", "1 minute")
val ds = stream.groupBy(window('ts, "1 minute", "1 minute"), 'id).agg(count('id), mean('amount)).toDF("timestamp", "id", "count", "mean")
The manual states: withWatermark must be called on the same column as the timestamp column used in the aggregate. For example,
df.withWatermark("time", "1 min").groupBy("time2").count()
is invalid in Append output mode, as watermark is defined on a different column from the aggregation column. Simply stated, for Append you need a watermark [StackOverflow].
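Our query above does keep the watermark and the window on the same column, ts, so it satisfies this rule. In the style of the manual's example, the valid shape looks like:
df.withWatermark("ts", "1 minute").groupBy(window('ts, "1 minute")).count()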
We set the trigger processing time to be 10s (see below) which is fine for our quantity of data. If it's too short for your data, you'll see WARNings like:
2020-02-12 14:40:26 WARN ProcessingTimeExecutor:66 - Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 14605 milliseconds
We use Trigger.ProcessingTime as we obviously don't want Once in this use case, and Continuous gives:
Unknown type of trigger: ContinuousTrigger(10000)
if we're writing to Parquet (SO). This makes sense as Parquet is columnar, so we need a relatively large set of data before we can write it.
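For reference, the three trigger flavours we are choosing between look like this (10 seconds being the interval we happen to use):
Trigger.Once()                // a single micro-batch, then stop
Trigger.ProcessingTime(10000) // a micro-batch every 10s - what we use below
Trigger.Continuous(10000)     // continuous processing with a 10s checkpoint interval - rejected by the Parquet sink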
Methodology
Between each test, we delete the Kafka topic with:
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test_datum
where test_datum is the name of our topic.
OutputMode
There are three OutputModes: Complete, Append and Update. We'll start with conceptually the easiest.
Complete
The code to start the query is this:
val query = ds.writeStream.format("console").outputMode(OutputMode.Complete()).option("truncate", "false").trigger(Trigger.ProcessingTime(10000)).start()
Batch: 0
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|1 |334 |500.5|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|0 |333 |501.0|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|2 |333 |500.0|
+------------------------------------------+---+-----+-----+
At 15:59:19, we pump in some more:
Batch: 1
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|2 |333 |500.0|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|1 |334 |500.5|
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|0 |333 |501.0|
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|1 |334 |500.5|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|0 |333 |501.0|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|2 |333 |500.0|
+------------------------------------------+---+-----+-----+
Batch: 2
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|2 |666 |500.0|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|1 |334 |500.5|
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|0 |666 |501.0|
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|1 |668 |500.5|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|0 |333 |501.0|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|2 |333 |500.0|
+------------------------------------------+---+-----+-----+
Sending further messages (at 16:01:55) outside that time window just adds new rows:
Batch: 3
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|2 |666 |500.0|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|1 |334 |500.5|
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|0 |666 |501.0|
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|1 |668 |500.5|
|[2020-02-11 16:01:00, 2020-02-11 16:02:00]|1 |334 |500.5|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|0 |333 |501.0|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|2 |333 |500.0|
|[2020-02-11 16:01:00, 2020-02-11 16:02:00]|0 |333 |501.0|
|[2020-02-11 16:01:00, 2020-02-11 16:02:00]|2 |333 |500.0|
+------------------------------------------+---+-----+-----+
Update
Same code as before, except that now the OutputMode is Update(). At 2020-02-12 06:37:07 we pumped in 1000 messages and we start our query with:
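// The same writeStream call as in the Complete section; only the OutputMode has changed.
val query = ds.writeStream.format("console").outputMode(OutputMode.Update()).option("truncate", "false").trigger(Trigger.ProcessingTime(10000)).start()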
Almost immediately we see:
Batch: 0
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-12 06:37:00, 2020-02-12 06:38:00]|1 |334 |500.5|
|[2020-02-12 06:37:00, 2020-02-12 06:38:00]|0 |333 |501.0|
|[2020-02-12 06:37:00, 2020-02-12 06:38:00]|2 |333 |500.0|
+------------------------------------------+---+-----+-----+
Batch: 4
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp                                 |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-12 06:39:00, 2020-02-12 06:40:00]|1 |334 |500.5|
|[2020-02-12 06:39:00, 2020-02-12 06:40:00]|0 |333 |501.0|
|[2020-02-12 06:39:00, 2020-02-12 06:40:00]|2 |333 |500.0|
+------------------------------------------+---+-----+-----+
and then:
Batch: 6
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-12 06:39:00, 2020-02-12 06:40:00]|1 |668 |500.5|
|[2020-02-12 06:39:00, 2020-02-12 06:40:00]|0 |666 |501.0|
|[2020-02-12 06:39:00, 2020-02-12 06:40:00]|2 |666 |500.0|
+------------------------------------------+---+-----+-----+
So, Update represents a rolling aggregation within the most recent time window.
Append
OutputMode.Append is a bit weird.
We have sent messages to Kafka but Spark only shows three empty batches after starting up. We send another 1000 messages at 2020-02-11 16:05:45:
Batch: 3
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-11 16:03:00, 2020-02-11 16:04:00]|2 |333 |500.0|
|[2020-02-11 16:03:00, 2020-02-11 16:04:00]|1 |334 |500.5|
|[2020-02-11 16:03:00, 2020-02-11 16:04:00]|0 |333 |501.0|
+------------------------------------------+---+-----+-----+
Batch: 7
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-11 16:05:00, 2020-02-11 16:06:00]|2 |333 |500.0|
|[2020-02-11 16:05:00, 2020-02-11 16:06:00]|1 |334 |500.5|
|[2020-02-11 16:05:00, 2020-02-11 16:06:00]|0 |333 |501.0|
+------------------------------------------+---+-----+-----+
First, Spark 2.4.0 [addendum: and v2.4.5 as well] doesn't appear to consume messages until there is a next batch waiting. This appears to be expected but undesirable, as SPARK-24156 notes:
consider a streaming aggregation query with watermark-based state cleanup. The watermark is updated after every batch with new data completes. The updated value is used in the next batch to clean up state, and output finalized aggregates in append mode. However, if there is no data, then the next batch does not occur, and cleanup/output gets delayed unnecessarily. This is true for all stateful streaming operators - aggregation, deduplication, joins, mapGroupsWithState.
Secondly, Spark does not consume the messages until a time equal to our withWatermark duration plus our window function's windowDuration has passed. I didn't see the 16:06:09 messages until after 16:08:28 (16:06-16:07 was the window; add one minute of watermark plus some time for the trigger, and it would be some time in 16:08 at the earliest before I saw the messages).
I tried to solve this with:
val query = ds.writeStream.format("console").outputMode(OutputMode.Append()).option("truncate", "false").trigger(Trigger.Continuous(10000)).start()
but it doesn't work as now the code throws:
org.apache.spark.sql.AnalysisException: Continuous processing does not support EventTimeWatermark operations.;;
and removing the watermark results in:
org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
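So, for Append, we keep the watermark and stay with the micro-batch trigger, which gives us something like:
// The combination that works for Append: a watermark on the aggregation column plus a ProcessingTime trigger.
val query = ds.writeStream.format("console").outputMode(OutputMode.Append()).option("truncate", "false").trigger(Trigger.ProcessingTime(10000)).start()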
Writing as Parquet
I run the code that writes Parquet to HDFS with:
--executor-cores 3 --num-executors 2
as Docker seems to use all my CPUs despite being told to use just 6 (cores x executors). I have 16 cores so, using multiples of 3, the Spark worker can only use 15, leaving one free for my Spark shell.
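The sink itself is the usual writeStream call, just pointed at HDFS rather than the console (the path and checkpoint location below are placeholders, not the ones used in the repo):
val query = ds.writeStream
  .format("parquet")
  .outputMode(OutputMode.Append())                          // the only mode the Parquet sink accepts - see below
  .option("path",               "hdfs:///tmp/test_datum")   // placeholder output directory
  .option("checkpointLocation", "hdfs:///tmp/checkpoints")  // required for file sinks; placeholder path
  .trigger(Trigger.ProcessingTime(10000))
  .start()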
Anyway, persisting with Complete gave:
Data source parquet does not support Complete output mode;
This makes sense as Complete contains all the data and we can't write it incrementally: the only time the data can be frozen in stone is when the stream is no more.
Persisting with Update gave me:
Data source parquet does not support Update output mode;
Since we're updating on the fly with Update, it doesn't make sense to write this to Parquet either, as results written earlier would be rendered incorrect by results that arrive later but fall in the same window.
Persisting with Append works but with the aforementioned caveat that you can't see the data until another batch comes in.
Sorting
To make our diagnosis easier, we might like to sort the batches. Unfortunately,
val query = ds.sort('id).writeStream.format("console").outputMode(OutputMode.Append()).start()
gives:
org.apache.spark.sql.AnalysisException: Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;;
which kinda makes sense as you can't sort a (potentially infinite) stream.
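Following the error message, the sort should be acceptable if we go back to Complete mode, something like:
// Sorting an aggregated stream is allowed in Complete output mode (per the error message above).
val sorted = ds.sort('id).writeStream.format("console").outputMode(OutputMode.Complete()).option("truncate", "false").trigger(Trigger.ProcessingTime(10000)).start()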