Friday, February 17, 2023

Spark and Pandas

It seems that not just the PySpark driver but also the Spark JVM workers spawn Python daemons (see Obi Orciuch's blog).

Spark keeps track of Python processes (source). In PythonWorkerFactory we read: "Because forking processes from Java is expensive, we prefer to launch a single Python daemon, pyspark/daemon.py (by default) and tell it to fork new workers for our tasks. This daemon currently only works on UNIX-based systems now because it uses signals for child management, so we can also fall back to launching workers, pyspark/worker.py (by default) directly."
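To get a feel for the pattern that comment describes (a long-lived daemon forking a cheap child per task and reaping the children with signals), here is a toy sketch. This is not Spark's actual pyspark/daemon.py, just an illustration of the UNIX mechanics (fork plus SIGCHLD) it relies on, and like the real daemon it only works on UNIX-like systems:

import os
import signal
import sys
import time

def reap_children(signum, frame):
    # Reap finished workers so they don't linger as zombies.
    while True:
        try:
            pid, _ = os.waitpid(-1, os.WNOHANG)
        except ChildProcessError:
            return
        if pid == 0:
            return

if __name__ == "__main__":
    signal.signal(signal.SIGCHLD, reap_children)
    for task_id in range(3):
        pid = os.fork()
        if pid == 0:
            # Child: do the "task" and exit without returning to the loop.
            print(f"worker {os.getpid()} handling task {task_id}")
            time.sleep(1)
            sys.exit(0)
        # Parent (the "daemon"): keep handing out tasks; forking a child is
        # much cheaper than spawning a fresh Python interpreter from the JVM.
        print(f"daemon {os.getpid()} forked worker {pid} for task {task_id}")
    time.sleep(2)  # let the workers finish; SIGCHLD reaps them as they exit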

Indeed, you can see these daemons yourself. Run:

$SPARK_HOME/sbin/start-all.sh
$SPARK_HOME/bin/pyspark  --master=spark://127.0.1.1:7077


and then, in the PySpark shell, run:

something = 'hello ' * 1000000
# parallelize() over a string distributes its characters across partitions
another_string = spark.sparkContext.parallelize(something)
another_string.map(lambda a: a.upper()).take(100)

During all this, if you run:

watch -d "ps -ef | grep -i python | grep -v ps | grep -v grep"

you'll see a Python process that started when the PySpark shell launched, but also something like "python3 -m pyspark.daemon". If you find the parent of this process, you'll see it's a Java process called "CoarseGrainedExecutorBackend". This is an executor running on a cluster worker.
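To walk up the process tree, something like this works (the PIDs below are made up for illustration):

pgrep -f pyspark.daemon      # say it prints 23451
ps -o ppid= -p 23451         # its parent PID, say 23399
ps -o pid,cmd -p 23399       # a java process running CoarseGrainedExecutorBackend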

Indeed, if you accidentally use a different Python version in your driver than on your workers (the workers will default to the system's version of Python), you'll see errors like this:

Exception: Python in worker has different version 3.6 than that in driver 3.9, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.
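The fix is to point both sides at the same interpreter before launching the shell (assuming that version is installed on the workers too), for example:

export PYSPARK_PYTHON=python3.9
export PYSPARK_DRIVER_PYTHON=python3.9
$SPARK_HOME/bin/pyspark --master=spark://127.0.1.1:7077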

"Note that all data for a group will be loaded into memory before the function is applied. This can lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied on groups and it is up to the user to ensure that the grouped data will fit into the available memory." (Spark docs)

Also note that Wes McKinney, the creator of Pandas, says "my rule of thumb for pandas is that you should have 5 to 10 times as much RAM as the size of your dataset" (albeit in 2017).

The applyInPandas function is the gateway to Pandas in PySpark, but be warned: "This function requires a full shuffle. All the data of a group will be loaded into memory, so the user should be aware of the potential OOM risk if data is skewed and certain groups are too large to fit in memory." (Spark docs)
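A minimal sketch of the API (the toy DataFrame, column names and the doubling logic are mine, not from the docs; it also needs pyarrow installed):

import pandas as pd

df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ("id", "v"))

def double_values(pdf: pd.DataFrame) -> pd.DataFrame:
    # Each call receives one entire group as an in-memory pandas DataFrame,
    # hence the OOM warning above for heavily skewed groups.
    pdf["v"] = pdf["v"] * 2
    return pdf

df.groupBy("id").applyInPandas(double_values, schema="id long, v double").show()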

There is some nice documentation here, but the Spark code that passes the Python process everything it needs is here on GitHub.
