Wednesday, February 12, 2020

Spark Structured Streaming Behaviour - part 1


The Environment

Once again, I use docker containers (as outlined in a previous post here) to give me Kafka, Spark and HDFS.

Things might not work out-of-the-box, so here are some tips:

Check the number of brokers with (SO):

echo dump | nc localhost 2181 | grep brokers

You can check the number of messages in Kafka (SO) with:

$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:32773 --topic test_datum --time -1
test_datum:0:10000

The Code

The toy code can be found here and looks something like:

import uk.co.odinconsultants.sssplayground.kafka.Consuming._
import uk.co.odinconsultants.sssplayground.joins.RunningAverageMain._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

val stream  = streamStringsFromKafka(spark, kafkaUrl = "192.168.6.1:32773", topicName = "test_datum", parsingDatum).withWatermark("ts", "1 minute")

val ds      = stream.groupBy(window('ts, "1 minute", "1 minute"), 'id).agg(count('id), mean('amount)).toDF("timestamp", "id", "count", "mean")

Note that we must groupBy on the window of the watermarked column.
The manual states: "withWatermark must be called on the same column as the timestamp column used in the aggregate. For example,
df.withWatermark("time", "1 min").groupBy("time2").count()
is invalid in Append output mode, as watermark is defined on a different column from the aggregation column." Simply stated, for Append you need a watermark.
[StackOverflow]

We set the trigger processing time to be 10s (see below) which is fine for our quantity of data. If it's too short for your data, you'll see WARNings like:

2020-02-12 14:40:26 WARN  ProcessingTimeExecutor:66 - Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 14605 milliseconds

We use Trigger.ProcessingTime since we obviously don't want Once in this use case, and Continuous gives:

Unknown type of trigger: ContinuousTrigger(10000)

if we're writing to Parquet (SO). This makes sense as Parquet is columnar, so we need a relatively large set of data before we can write it.
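To make the trigger options concrete, here is a sketch using the ds defined above (Trigger was imported earlier; only ProcessingTime suits this job):

ds.writeStream.trigger(Trigger.Once())                // a single micro-batch, then stop - not what we want here
ds.writeStream.trigger(Trigger.ProcessingTime(10000)) // a micro-batch every 10s - what we use below
ds.writeStream.trigger(Trigger.Continuous(10000))     // continuous processing - rejected by the Parquet sink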


Methodology

Between each test, we delete the Kafka topic with:

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic  test_datum

where test_datum is the name of our topic.


OutputMode

There are three OutputModes: Complete, Append and Update. We'll start with conceptually the easiest.


Complete

The code to start the query is this:

val query = ds.writeStream.format("console").outputMode(OutputMode.Complete()).option("truncate", "false").trigger(Trigger.ProcessingTime(10000)).start()

At 2020-02-11 15:57:57, we pumped 1000 messages over 3 keys into Kafka with this script and saw:

Batch: 0
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp                                 |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|1  |334  |500.5|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|0  |333  |501.0|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|2  |333  |500.0|
+------------------------------------------+---+-----+-----+

At 15:59:19, we pump in some more:

Batch: 1
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp                                 |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|2  |333  |500.0|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|1  |334  |500.5|
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|0  |333  |501.0|
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|1  |334  |500.5|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|0  |333  |501.0|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|2  |333  |500.0|
+------------------------------------------+---+-----+-----+

and then quickly and within the same minute (at precisely 15:59:34) we pump in 1000 more:

Batch: 2
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp                                 |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|2  |666  |500.0|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|1  |334  |500.5|
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|0  |666  |501.0|
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|1  |668  |500.5|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|0  |333  |501.0|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|2  |333  |500.0|
+------------------------------------------+---+-----+-----+

Look! Our values for this time window increased.

Sending further messages (at 16:01:55), outside that time window, just adds new rows:

Batch: 3
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp                                 |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|2  |666  |500.0|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|1  |334  |500.5|
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|0  |666  |501.0|
|[2020-02-11 15:59:00, 2020-02-11 16:00:00]|1  |668  |500.5|
|[2020-02-11 16:01:00, 2020-02-11 16:02:00]|1  |334  |500.5|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|0  |333  |501.0|
|[2020-02-11 15:57:00, 2020-02-11 15:58:00]|2  |333  |500.0|
|[2020-02-11 16:01:00, 2020-02-11 16:02:00]|0  |333  |501.0|
|[2020-02-11 16:01:00, 2020-02-11 16:02:00]|2  |333  |500.0|
+------------------------------------------+---+-----+-----+

But with each batch, we show all the data ever collected, albeit aggregated.


Update

Same code as before except that the OutputMode is now Update(). At 2020-02-12 06:37:07, we pumped in 1000 messages and started the query.
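That is, presumably the same writeStream line as in the Complete example but with Update():

val query = ds.writeStream.format("console").outputMode(OutputMode.Update()).option("truncate", "false").trigger(Trigger.ProcessingTime(10000)).start()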

Almost immediately we see:

Batch: 0
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp                                 |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-12 06:37:00, 2020-02-12 06:38:00]|1  |334  |500.5|
|[2020-02-12 06:37:00, 2020-02-12 06:38:00]|0  |333  |501.0|
|[2020-02-12 06:37:00, 2020-02-12 06:38:00]|2  |333  |500.0|
+------------------------------------------+---+-----+-----+

We do some more testing; some batches have similar values and some are empty. But at 06:39:33 and 06:39:50, we pump another 1000 messages each into Kafka and see first:

Batch: 4
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp                                 |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-12 06:39:00, 2020-02-12 06:40:00]|1  |334  |500.5|
|[2020-02-12 06:39:00, 2020-02-12 06:40:00]|0  |333  |501.0|
|[2020-02-12 06:39:00, 2020-02-12 06:40:00]|2  |333  |500.0|
+------------------------------------------+---+-----+-----+

and then:

Batch: 6
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp                                 |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-12 06:39:00, 2020-02-12 06:40:00]|1  |668  |500.5|
|[2020-02-12 06:39:00, 2020-02-12 06:40:00]|0  |666  |501.0|
|[2020-02-12 06:39:00, 2020-02-12 06:40:00]|2  |666  |500.0|
+------------------------------------------+---+-----+-----+

(Batch: 5 was empty)

So, Update shows only the rows whose aggregates changed in the latest trigger - in effect, a rolling aggregation within the most recent time window.


Append

OutputMode.Append is a bit weird.

We have sent messages to Kafka but Spark only shows three empty batches after starting up. We send another 1000 messages at 2020-02-11 16:05:45:

Batch: 3
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp                                 |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-11 16:03:00, 2020-02-11 16:04:00]|2  |333  |500.0|
|[2020-02-11 16:03:00, 2020-02-11 16:04:00]|1  |334  |500.5|
|[2020-02-11 16:03:00, 2020-02-11 16:04:00]|0  |333  |501.0|
+------------------------------------------+---+-----+-----+

But these don't appear to be the most recent messages as their time window is not what we expect. So, we send 1000 more at 16:06:09, but the next 3 batches are empty again. We send 1000 more at 16:07:01 and note that it's our 16:06:09 messages that come through:

Batch: 7
-------------------------------------------
+------------------------------------------+---+-----+-----+
|timestamp                                 |id |count|mean |
+------------------------------------------+---+-----+-----+
|[2020-02-11 16:05:00, 2020-02-11 16:06:00]|2  |333  |500.0|
|[2020-02-11 16:05:00, 2020-02-11 16:06:00]|1  |334  |500.5|
|[2020-02-11 16:05:00, 2020-02-11 16:06:00]|0  |333  |501.0|
+------------------------------------------+---+-----+-----+

There appear to be two things at work here:

First, Spark 2.4.0 [addendum: and v2.4.5 as well] doesn't appear to consume messages until there is a next batch waiting. This appears to be expected but undesirable, as SPARK-24156 notes:
consider a streaming aggregation query with watermark-based state cleanup. The watermark is updated after every batch with new data completes. The updated value is used in the next batch to clean up state, and output finalized aggregates in append mode. However, if there is no data, then the next batch does not occur, and cleanup/output gets delayed unnecessarily. This is true for all stateful streaming operators - aggregation, deduplication, joins, mapGroupsWithState.
Secondly, Spark does not output the aggregates until our withWatermark duration plus our window function's windowDuration has passed. I didn't see the 16:06:09 messages until after 16:08:28 (16:06-16:07 was the window; add one minute of watermark plus some time for the trigger, and it would be some time in 16:08 at the earliest before I saw the messages).

I tried to solve this with:

val query = ds.writeStream.format("console").outputMode(OutputMode.Append()).option("truncate", "false").trigger(Trigger.Continuous(10000)).start()

but it doesn't work as now the code throws:

org.apache.spark.sql.AnalysisException: Continuous processing does not support EventTimeWatermark operations.;;

and removing the watermark results in:

org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;


Writing as Parquet

I run the code that writes Parquet to HDFS with:

--executor-cores 3 --num-executors 2 

as Docker seems to use all my CPUs despite being told to use just 6 (cores x executors). I have 16 cores, so working in multiples of 3, the Spark worker can only use 15, giving me one free for my Spark shell.

Anyway, persisting with Complete gave

Data source parquet does not support Complete output mode; 

This makes sense: Complete contains all the data ever seen, and we can't stream that to files since the only time the data can be frozen in stone is when the stream is no more.

Persisting with Update gave me:

Data source parquet does not support Update output mode;

Since we're updating aggregates on the fly with Update, it doesn't make sense to write this to Parquet either: results written earlier would be rendered incorrect by results that come later in the same window.

Persisting with Append works but with the aforementioned caveat that you can't see the data until another batch comes in.
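For reference, the file-sink query looks something like this - a sketch rather than the repo's exact code, with illustrative HDFS paths:

val query = ds.writeStream
  .format("parquet")
  .option("path", "/streaming_test")                         // where the Parquet files land in HDFS
  .option("checkpointLocation", "/streaming_testcheckpoint") // needed for a file sink
  .outputMode(OutputMode.Append())
  .trigger(Trigger.ProcessingTime(10000))
  .start()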


Sorting

To make our diagnosis easier, we might like to sort the batches. Unfortunately,

val query = ds.sort('id).writeStream.format("console").outputMode(OutputMode.Append()).start()

gives:

org.apache.spark.sql.AnalysisException: Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;;

which kinda makes sense as you can't sort a (potentially infinite) stream.
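The error message itself points at the workaround: ds is an aggregated DataFrame, so sorting should be allowed if we switch to Complete mode. An untested sketch:

val query = ds.sort('id).writeStream.format("console").outputMode(OutputMode.Complete()).option("truncate", "false").trigger(Trigger.ProcessingTime(10000)).start()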


Saturday, February 8, 2020

Cloud devops (part 2)


Kubernetes in 30 seconds

A container is an isolated application. For example, it may be Spark in what looks like its own OS.

A pod is the smallest unit of deployment and has one to many containers.

A replication controller maintains the requisite number of pods.

A service exposes a set of pods behind a single, stable endpoint, handling things like networking and load-balancing. "Headless-services allow us to reach each Pod directly, rather than the service acting as a load-balancer or proxy." [dev.to]

A namespace is a totally isolated logical area in which to run all of the above.

(This [YouTube] is a good 5 minute overview from VMWare that also talks about the kubelets - the primary “node agent” that runs on each node - and how they talk to the apiserver).


Google Kubernetes and Fabric8

Create a cluster in the "Kubernetes Engine" tab of your Google account. This can take about 5 minutes.

You will need to login:

$ gcloud auth login

and get the Google Kubernetes credentials:

$ gcloud container clusters get-credentials standard-cluster-1 --zone us-central1-a --project YOUR_PROJECT_NAME

Now, when you run:

$ kubectl config view

you'll see your Google config (as well as your MiniKube config if you've been using that already).

Now click on your Google Kubernetes cluster in the browser and find the 'Endpoint' value. Paste this into your Java code thus:

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;

        String master = "https://YOUR_ENDPOINT:443/";
        if (args.length == 1) {
            master = args[0];
        }

        Config config = new ConfigBuilder().withMasterUrl(master).build();
        try (final KubernetesClient client = new DefaultKubernetesClient(config)) {
...

and you're good to go. I had Fabric8's FullExample running locally on my laptop creating and destroying pods in my Google hosted Kubernetes cluster.


Kubernetes cheats

Deleting a pod is not quite so straightforward as:

$ kubectl delete pods hello-minikube-797f975945-kntql
pod "hello-minikube-797f975945-kntql" deleted

(see this SO answer). If we've assigned replicas then a new pod may be started.

$ kubectl get pods
NAME                              READY   STATUS    RESTARTS   AGE
hello-minikube-797f975945-rnqwc   1/1     Running   0          4m49s

Eh? Where did that come from?

You need to set the replicas to 0 with something like:

$ kubectl get all
NAME                                  READY   STATUS    RESTARTS   AGE
pod/hello-minikube-797f975945-rnqwc   1/1     Running   0          5m56s

NAME                     TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)          AGE
service/hello-minikube   NodePort    10.96.206.25   <none>        8080:32689/TCP   112m
service/kubernetes       ClusterIP   10.96.0.1      <none>        443/TCP          116m

NAME                             READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/hello-minikube   1/1     1            1           112m

NAME                                        DESIRED   CURRENT   READY   AGE
replicaset.apps/hello-minikube-797f975945   1         1         1       112m
$ kubectl scale --replicas=0 deployment.apps/hello-minikube

(from this SO answer)

In production, it might be useful to log in to running containers. You can do this with:

kubectl exec --stdin --tty shell-demo -- /bin/bash

(From the Kubernetes docs)

Monday, February 3, 2020

Exactly once..?


Late to the party, I read this (Exactly Once Support in Kafka) where the author quotes from this Reddit chat.

"Another objection I’ve heard to this is that it isn’t really “exactly once” but actually “effectively once”. I don’t disagree that that phase is better (though less commonly understood) but I’d point out that we’re still debating the definitions of undefined terms!
"...To my mind this is what people mean by exactly-once delivery in the context of pub/sub messaging [my emphasis]: namely that you can publish messages and they will be delivered one time exactly by one or more receiving application." [Jay Kreps on Medium]

"Kafka did not solve the Two Generals Problem... To achieve exactly-once processing semantics, we must have a closed system with end-to-end support for modeling input, output, and processor state as a single, atomic operation... Kafka provides exactly-once processing semantics because it’s a closed system. There is still a lot of difficulty in ensuring those semantics are maintained across external services, but Confluent attempts to ameliorate this through APIs and tooling. But that’s just it: it’s not exactly-once semantics in a building block that’s the hard thing, it’s building loosely coupled systems that agree on the state of the world." [BraveNewGeek]

Idempotency is the degenerate state, ie an example that actually belongs to another, probably simpler class (a triangle with one side of zero length is a degenerate case in that it's actually a much simpler geometric structure - namely, a line).

Like the Monty Hall problem, the answer depends on the exact semantics of the question.

Confluent have given Kafka "effectively once" semantics by adding a monotonically incrementing ID to each message (much like TCP does). See "what exactly once semantics mean in Apache Kafka, why it is a hard problem, and how the new idempotence and transactions features in Kafka enable correct exactly once stream processing using Kafka’s Streams API." [Confluent Blog]

- The producer send operation is now idempotent.
- Kafka now supports atomic writes across multiple partitions through the new transactions API... on the Consumer side, you have two options for reading transactional messages, expressed through the “isolation.level” consumer config

The problem:

"The broker can crash after writing a message but before it sends an ack back to the producer. It can also crash before even writing the message to the topic. Since there is no way for the producer to know the nature of the failure, it is forced to assume that the message was not written successfully and to retry it. In some cases, this will cause the same message to be duplicated in the Kafka partition log, causing the end consumer to receive it more than once... The producer send operation is now idempotent."

"Lastly, ordering is only guaranteed if max.in.flight.requests.per.connection == 1:

"Producer configuration settings from the Apache Kafka documentation: max.in.flight.requests.per.connection (default: 5): The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled)."
[SO]
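Pulling those settings together, here is a minimal sketch of the producer and consumer configuration being described (the property names come from the Kafka docs; the broker address and transactional id are made up, and serializers etc. are omitted):

import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig

val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")              // idempotent sends
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")     // preserve ordering across retries
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id") // enables atomic multi-partition writes

val consumerProps = new Properties()
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")       // only see committed transactional messages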

But although Confluent has given Kafka exactly once semantics, these semantics do not extend outside of its closed system.

From Confluent again: "Even when they use Kafka as a source for stream processing and need to recover from a failure, they can only rewind their Kafka offset to reconsume and reprocess messages, but cannot rollback the associated state in an external system, leading to incorrect results when the state update is not idempotent.”

"a consumer reads messages from a Kafka topic, some processing logic transforms those messages [and] writes the resulting messages to another Kafka." This is a closed system.

Conclusion

TCP also handles dupes and gives idempotency, but just because your app uses TCP doesn't mean your app is idempotent.


Saturday, February 1, 2020

Kafka, Spark and HDFS in Docker on one Laptop


Starting Spark, HDFS and Kafka all in a Docker-ised environment is very convenient but not without its niggles. Here's what I did to run a Spark Structured Streaming app on my laptop.

Start a Kafka/ZK cluster in Docker following this link [GitHub] and for Spark/HDFS, try here [GitHub]. Note that in the Kafka/ZK config, you will have to change the value for KAFKA_ADVERTISED_HOST_NAME in docker-compose.yml to correspond to your computer each time you fire it up.     

Note that Docker creates a virtual network. You can add virtual networks to containers [SO] but you don't need to worry about that if you use docker-compose up -d. The -d switch prevents the containers' output being spewed to stdout.

You can test Zookeeper is indeed up and running with:

$ echo ruok | nc localhost 2181
imok$

Note that on the host machine, you can run:

$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
test_topic

and see that the internal topic __consumer_offsets has automatically been created to store offsets. The topic test_topic is something I made that we'll need later.

We can even jump onto the Kafka container and watch the logs:

$ docker exec -it kafkadocker_kafka_1 bash
bash-4.4# tail -f /opt/kafka_2.12-2.4.0/logs/server.log

Let's now see who has the Kafka port:

$ docker ps | grep 9092
02fc5122f6e2        kafkadocker_kafka                                "start-kafka.sh"         9 minutes ago       Up 9 minutes             0.0.0.0:32770->9092/tcp                                    kafkadocker_kafka_1

Note that this means that the Kafka server is listening on the host OS on port 32770 not 9092. We'll need this to tell Spark where Kafka is.
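For what it's worth, the Spark side of that conversation looks roughly like this - a sketch rather than the repo's actual code, using my host OS's IP (which appears in the spark-submit later) and the mapped port we just found:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

val raw = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "10.107.222.63:32770") // the host's IP and the mapped port, not 9092
  .option("subscribe", "test_topic")
  .load()
  .selectExpr("CAST(value AS STRING)")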

Note the Spark worker has just 1gb of heap.

docker exec -it dockerhadoopsparkworkbench_spark-worker_1 bash
root@a68aff72a10f:/# jps
229 Worker
1178 Jps
root@a68aff72a10f:/# cat /proc/229/cmdline
/docker-java-home/bin/java-cp/spark//conf/:/spark/jars/*:/etc/hadoop/:/opt/hadoop-2.8.0/share/hadoop/common/lib/*:/opt/hadoop-2.8.0/share/hadoop/common/*:/opt/hadoop-2.8.0/share/hadoop/hdfs/:/opt/hadoop-2.8.0/share/hadoop/hdfs/lib/*:/opt/hadoop-2.8.0/share/hadoop/hdfs/*:/opt/hadoop-2.8.0/share/hadoop/yarn/lib/*:/opt/hadoop-2.8.0/share/hadoop/yarn/*:/opt/hadoop-2.8.0/share/hadoop/mapreduce/lib/*:/opt/hadoop-2.8.0/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar-Xmx1gorg.apache.spark.deploy.worker.Worker--webui-port8081spark://spark-master:7077root@a68aff72a10f:/#

If you want to execute a Spark shell, run:

$ docker exec -it spark-master /bin/bash ./spark/bin/spark-shell --master spark://spark-master:7077

Or, if you want to include a JAR:

docker run --rm -it --network dockerhadoopsparkworkbench_default --env-file ./hadoop.env -e SPARK_MASTER=spark://spark-master:7077 --volume  /home/henryp/Code/Scala/MyCode/SSSPlayground/target/:/example bde2020/spark-base:2.4.0-hadoop2.8-scala2.12 /spark/bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.0  --jars /example/SSSPlayground-1.0-SNAPSHOT-jar-with-dependencies.jar --master spark://spark-master:7077

Docker creates a virtual network and you can see that with:

$ docker network ls
NETWORK ID          NAME                                 DRIVER              SCOPE
2f804fb10173        bridge                               bridge              local
caeab723a6c7        dockerhadoopsparkworkbench_default   bridge              local
dbb8f4df303a        host                                 host                local
b8ba799f4916        kafkadocker_default                  bridge              local
a15ee4bf8c1f        kafkasparkhadoopzk_default           bridge              local
d796a747993d        none                                 null                local

The network bridge is the default but we'll use the network in which the Spark containers sit when we deploy the local uber-jar with:

docker run --rm -it --network dockerhadoopsparkworkbench_default --env-file ./hadoop.env -e SPARK_MASTER=spark://spark-master:7077 --volume  /home/henryp/Code/Scala/MyCode/SSSPlayground/target/:/example bde2020/spark-base:2.4.0-hadoop2.8-scala2.12 /spark/bin/spark-submit --class=uk.co.odinconsultants.sssplayground.windows.ConsumeKafkaMain --master spark://spark-master:7077  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.0  /example/SSSPlayground-1.0-SNAPSHOT-jar-with-dependencies.jar 10.107.222.63:32770 test_topic /streaming_test 600000

using paths relevant to you. This creates another container called spark-base that lives for just the duration of the app. For what it's worth, the code lives here [GitHub].

This command line takes some explaining:

  • We're getting docker to mount a directory with the --volume switch. 
  • To have Spark streaming from Kafka you need the dependency defined with the --packages switch. This is nothing to do with Docker but is essential for Spark and Kafka to talk to each other.
  • The address, 10.107.222.63, is my host OS's IP. 
  • Finally, we need to tell Spark where the Kafka bootstrap servers are and this is port 32770 we saw earlier.
Pumping messages through Kafka from the host OS leads to data accumulating in HDFS and we can see that with:

$ docker exec -it dockerhadoopsparkworkbench_datanode_1  hadoop fs -du -h /
946.7 M  /streaming_test
1.9 K    /streaming_testcheckpoint
47.7 K   /tmp

So, evidently things are working.