Monday, September 25, 2023

Spark, Kafka and Docker

I want to run a Spark Structured Streaming application that consumes from a Kafka cluster, all within Docker. I've finally got it working [messy code here in my GitHub], but not without some pain.
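For context, the guts of the application look roughly like this. This is a minimal sketch rather than the actual code linked above, and the bootstrap address and topic name are placeholders:

import org.apache.spark.sql.SparkSession

object KafkaToConsole extends App {
  // Minimal Structured Streaming job: read from Kafka, dump records to the console.
  val spark = SparkSession.builder()
    .appName("kafka-docker-demo")
    .getOrCreate()

  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092") // Kafka's port as published by Docker (placeholder)
    .option("subscribe", "test-topic")                   // placeholder topic name
    .load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream
    .format("console")
    .start()
    .awaitTermination()
}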

The biggest problem is getting all the components talking to each other. First, you need a bridge network. "In terms of Docker, a bridge network uses a software bridge which allows containers connected to the same bridge network to communicate, while providing isolation from containers which are not connected to that bridge network." [docs]. Think of it as giving your containers their own namespace.
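Creating one and attaching containers to it is straightforward. In this sketch the network, container and image names (spark-net, spark-master and so on) are all placeholders of mine, not anything from the repo:

docker network create spark-net

# hypothetical image names; use whatever Spark and Kafka images you prefer
docker run -d --name spark-master --network spark-net -p 8080:8080 -p 7077:7077 your-spark-image
docker run -d --name kafka        --network spark-net -p 9092:9092 your-kafka-image

Containers on spark-net can then reach each other by name, e.g. spark://spark-master:7077.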

Secondly, the Spark worker needs to connect to Kafka, the Spark master and the Spark driver. The first two are just a matter of mapping the Spark master and Kafka containers in the worker's configuration so it can resolve them by name. What's harder is getting the worker to talk to a driver that may be running on the computer that hosts Docker.

One sign you've got it wrong is seeing "Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources" [SO] in your driver logs. This message is a little misleading as it may have nothing to do with resources and everything to do with connectivity.

[Screenshot of the cluster UI: resources seem to be fine]

To solve it, you need the driver to define spark.driver.host and spark.driver.port. For the host, we need the magic address 172.17.0.1. This is the default "IP address of the gateway between the Docker host and the bridge network" [docs]. The port is arbitrary, but it must be fixed because the worker needs to be told what it is (see below).
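In the driver, that might look something like this. The master URL assumes the master's port 7077 is published to the host, and 35000 is just an example port I've picked, not anything special:

val spark = SparkSession.builder()
  .appName("kafka-docker-demo")
  .master("spark://localhost:7077")          // assumes the master's port is published to the host
  .config("spark.driver.host", "172.17.0.1") // the Docker bridge gateway address
  .config("spark.driver.port", "35000")      // arbitrary, but the worker must be told the same value
  .getOrCreate()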

[Aside: it's also worth ensuring that all the components are running the exact same version of Spark. I saw a rare error ("ERROR Inbox: Ignoring error java.lang.AssertionError: assertion failed: CPUs per task should be > 0") and the only thing Google produced was this Bitnami ticket. Ensuring all versions were the same made it go away.]

What's more, the worker needs these in its config. You can pass it the host and port with something like SPARK_WORKER_OPTS="-Dspark.driver.host=172.17.0.1 -Dspark.driver.port=SPARK_DRIVER_PORT" in its start-up script.
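One way to wire that up, assuming the worker container is started with Spark's start-worker.sh (the image name, install path and port below are my placeholders, matching the sketch above):

docker run -d --name spark-worker --network spark-net \
  -e SPARK_NO_DAEMONIZE=true \
  -e SPARK_WORKER_OPTS="-Dspark.driver.host=172.17.0.1 -Dspark.driver.port=35000" \
  your-spark-image \
  /opt/spark/sbin/start-worker.sh spark://spark-master:7077

SPARK_NO_DAEMONIZE keeps the script in the foreground so the container doesn't exit as soon as the worker has been launched.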

But there is one last gotcha. If you still can't get things to work, you might want to log in to your worker container and run netstat. If you see the connection to the driver in a state of SYN_SENT, the firewall on the host is probably blocking the connection from the container.
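Assuming the worker container is called spark-worker, that's just:

docker exec -it spark-worker bash    # or sh, depending on the image
netstat -nap | grep SYN_SENT         # run inside the container; needs net-tools installed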

Annoyingly, you probably won't see any error messages being puked by the driver. It will just hang somewhere near org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:929). I only started seeing error messages when I aligned all versions of Spark (see above), and it read: java.io.IOException: Connecting to /172.17.0.1:36909 timed out (120000 ms)

Looking in the worker's Docker container showed:

bash-5.0# netstat -nap
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name   
...
tcp        0      1 192.168.192.7:53040     172.17.0.1:36909        SYN_SENT    1109/java

and on the host machine, where my driver process has PID 29006:

(base) henryp@adele:~$ netstat -nap | grep 36909
tcp6       0      0 172.17.0.1:36909        :::*                    LISTEN      29006/java  

Aha, that looks like the problem. It turns out that I have to open the firewall for the block manager too, and set a static port for it on the driver with spark.driver.blockManager.port.
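So the driver config gains a spark.driver.blockManager.port entry, set the same way as spark.driver.host and spark.driver.port above, and the host firewall has to allow both ports. How you do that depends entirely on which firewall your host runs; with ufw, for example, it might be something like this (the ports are the placeholders from my earlier sketch):

sudo ufw allow 35000/tcp   # spark.driver.port
sudo ufw allow 35001/tcp   # spark.driver.blockManager.port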

Finally, you should be able to have a Spark master and worker plus Kafka instances all running within Docker, along with the driver running on the host in your favourite IDE.