Sunday, February 19, 2023

Diagnosing Spark Docker issues

I'm trying to talk from my host machine to a Docker container running Spark. Unfortunately, upon connecting I see:

Caused by: java.lang.RuntimeException: java.io.InvalidClassException: org.apache.spark.rpc.netty.RpcEndpointVerifier$CheckExistence; local class incompatible: stream classdesc serialVersionUID = 5378738997755484868, local class serialVersionUID = 7789290765573734431

This appears to be something of a known issue with this container.
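The failure shows up as soon as a driver tries to talk to the master over its RPC endpoint. My client is a Scala 3/SBT project, but as a rough illustration of the shape of the problem: any driver whose bundled Spark jars don't match the master's build can trip the same check. A minimal PySpark sketch, assuming the master's port 7077 is mapped to the host:

from pyspark.sql import SparkSession

# Illustrative only: if the driver's Spark jars differ from the
# master's build (different Spark version or Scala binary version),
# the master fails to deserialise RPC messages such as CheckExistence
# and you get the InvalidClassException above.
spark = (SparkSession.builder
         .master("spark://localhost:7077")  # assumes the master's port is mapped to the host
         .appName("connection-check")
         .getOrCreate())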

We get the client code's classpath using SBT [SO]: 

sbt "export runtime:fullClasspath"

and taking the classpath for my module, we run:

serialver -classpath $FROM_ABOVE org.apache.spark.rpc.netty.RpcEndpointVerifier\$CheckExistence

which yields:

org.apache.spark.rpc.netty.RpcEndpointVerifier$CheckExistence:    private static final long serialVersionUID = 5378738997755484868L;

(The error message comes from the Spark master.)

On the host, we log in to the master container with:

docker exec -it $(docker ps | grep spark-master:3.2.1 | awk '{print $1}') /bin/bash

where we run:

for FILE in $(find spark/jars/ -name '*.jar') ; do { echo $FILE ; unzip -l $FILE | grep CheckExistence ; } done

and discover the class is in spark/jars/spark-core_2.12-3.2.1.jar. Hmm, the name suggests this Docker container has a Spark instance built with Scala 2.12, while I'm using Scala 3, which is compatible with 2.13 but not (apparently) with 2.12.

Mounting a host directory into the container:

docker run -it -v /tmp:/mnt/disk bde2020/spark-master:3.2.1-hadoop3.2 /bin/bash

I copied all the Spark jars to a temporary folder. Then on the host:

CP="" ; for JAR in $(ls /tmp/jars/*.jar) ; do { CP="$CP:$JAR" ; } done
serialver -classpath $CP org.apache.spark.rpc.netty.RpcEndpointVerifier\$CheckExistence

This yielded 7789290765573734431L.

So, at this point it appears I am SOL and need to get a new container.

Friday, February 17, 2023

Spark and Pandas

It seems that not just the PySpark driver but also the Spark JVM workers spawn Python daemons (see Obi Orciuch's blog).

Spark keeps track of Python processes (source). In PythonWorkerFactory we read: "Because forking processes from Java is expensive, we prefer to launch a single Python daemon, pyspark/daemon.py (by default) and tell it to fork new workers for our tasks. This daemon currently only works on UNIX-based systems now because it uses signals for child management, so we can also fall back to launching workers, pyspark/worker.py (by default) directly."

Indeed, start a cluster and a PySpark shell:

$SPARK_HOME/sbin/start-all.sh
$SPARK_HOME/bin/pyspark  --master=spark://127.0.1.1:7077


and then, in that shell, run:

something = 'hello ' * 1000000
another_string = spark.sparkContext.parallelize(something)
another_string.map(lambda a: a.upper()).take(100)

During all this, if you run:

watch -d "ps -ef | grep -i python | grep -v ps | grep -v grep"

you'll see a Python instance started for the PySpark shell itself, but also something like "python3 -m pyspark.daemon". If you find the parent of this process, you'll see it's a Java process called "CoarseGrainedExecutorBackend". This is a cluster worker.
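A rough, Linux-only way to chase the daemon back to its parent programmatically (it just reads /proc; nothing here is Spark-specific):

import os

def cmdline(pid):
    # /proc/<pid>/cmdline is NUL-separated
    with open(f"/proc/{pid}/cmdline", "rb") as f:
        return f.read().replace(b"\x00", b" ").decode(errors="replace").strip()

def ppid(pid):
    with open(f"/proc/{pid}/status") as f:
        for line in f:
            if line.startswith("PPid:"):
                return int(line.split()[1])

for pid in filter(str.isdigit, os.listdir("/proc")):
    try:
        cmd = cmdline(pid)
    except OSError:
        continue  # process went away while we were looking
    if "pyspark.daemon" in cmd:
        parent = ppid(pid)
        print(f"daemon pid={pid}: {cmd}")
        print(f"parent pid={parent}: {cmdline(parent)}")  # expect CoarseGrainedExecutorBackend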

Indeed, if you accidentally use a different Python version in your driver from the one your workers use (the workers will default to the system's version of Python), you'll see errors like this:

Exception: Python in worker has different version 3.6 than that in driver 3.9, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.
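The fix is to point both at the same interpreter before the SparkContext is created, either as shell environment variables or from the driver script itself. A sketch (the interpreter paths are placeholders for whatever is actually installed on the driver and on every worker):

import os

# Must be set before the SparkSession/SparkContext is created.
# These paths are placeholders; use the interpreter that actually
# exists on the driver and on every worker.
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.9"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3.9"

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("spark://127.0.1.1:7077").getOrCreate()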

"Note that all data for a group will be loaded into memory before the function is applied. This can lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied on groups and it is up to the user to ensure that the grouped data will fit into the available memory." (Spark docs)

Also note that Wes McKinney, the author of Pandas, says "my rule of thumb for pandas is that you should have 5 to 10 times as much RAM as the size of your dataset" (albeit in 2017).

The applyInPandas function is the gateway to Pandas in PySpark, but be warned: "This function requires a full shuffle. All the data of a group will be loaded into memory, so the user should be aware of the potential OOM risk if data is skewed and certain groups are too large to fit in memory." (Spark docs)
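A minimal sketch of the call, using a toy DataFrame and grouping key of my own; note that every row for a given key arrives as a single in-memory Pandas DataFrame:

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [("a", 1.0), ("a", 2.0), ("b", 10.0)],
    ["key", "value"],
)

def centre(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf holds every row for one key, all in memory at once
    return pdf.assign(value=pdf.value - pdf.value.mean())

# Requires a full shuffle so that all rows for a key land in one place
df.groupBy("key").applyInPandas(centre, schema="key string, value double").show()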

There is some nice documentation here, but the Spark code that passes the Python process all it needs is here in GitHub.