It seems that not just the PySpark driver but also the Spark JVM workers spawn Python daemons (see Obi Orciuch's blog).
Spark keeps track of Python processes (source). In PythonWorkerFactory we read: "Because forking processes from Java is expensive, we prefer to launch a single Python daemon, pyspark/daemon.py (by default) and tell it to fork new workers for our tasks. This daemon currently only works on UNIX-based systems now because it uses signals for child management, so we can also fall back to launching workers, pyspark/worker.py (by default) directly."
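If you want to experiment with the fallback that comment mentions (forking pyspark/worker.py directly rather than going through the daemon), my reading of PythonWorkerFactory is that the internal, undocumented setting spark.python.use.daemon controls it. A minimal sketch, assuming that setting behaves as the source suggests:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("spark://127.0.1.1:7077")
         # internal flag read by PythonWorkerFactory; false should make Spark fork
         # pyspark/worker.py per task instead of using pyspark/daemon.py
         # (my reading of the source, not a documented public option)
         .config("spark.python.use.daemon", "false")
         .getOrCreate())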
Indeed, running:
$SPARK_HOME/sbin/start-all.sh
$SPARK_HOME/bin/pyspark --master=spark://127.0.1.1:7077
and then running in this shell:
something = 'hello ' * 1000000
another_string = spark.sparkContext.parallelize(something)
another_string.map(lambda a: a.upper()).take(100)
while watching the processes from another terminal with:
watch -d "ps -ef | grep -i python | grep -v ps | grep -v grep"
you'll see the Python instance started by the PySpark shell but also something like "python3 -m pyspark.daemon". If you find the parent of this process, you'll see it's a Java process called "CoarseGrainedExecutorBackend". This is an executor running on a cluster worker.
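To confirm the parentage programmatically, here is a rough sketch (Linux only, since it reads /proc; run it from any Python on the worker box) that finds processes whose command line mentions pyspark.daemon and prints each one's parent command line; you should see the CoarseGrainedExecutorBackend JVM:

import glob

# Linux-only: scan /proc for command lines mentioning pyspark.daemon,
# then print each matching process's parent command line
for path in glob.glob("/proc/[0-9]*/cmdline"):
    try:
        cmdline = open(path, "rb").read().replace(b"\0", b" ")
    except OSError:
        continue
    if b"pyspark.daemon" in cmdline:
        pid = path.split("/")[2]
        with open("/proc/%s/status" % pid) as status:
            ppid = next(line.split()[1] for line in status if line.startswith("PPid:"))
        parent = open("/proc/%s/cmdline" % ppid, "rb").read().replace(b"\0", b" ")
        print(pid, "<-", ppid, parent.decode(errors="replace")[:120])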
"Note that all data for a group will be loaded into memory before the function is applied. This can lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied on groups and it is up to the user to ensure that the grouped data will fit into the available memory." (Spark docs)
Also note that Wes McKinney, the author of Pandas, says "my rule of thumb for pandas is that you should have 5 to 10 times as much RAM as the size of your dataset" (albeit in 2017).
The applyInPandas function is the gateway to Pandas in PySpark, but be warned: "This function requires a full shuffle. All the data of a group will be loaded into memory, so the user should be aware of the potential OOM risk if data is skewed and certain groups are too large to fit in memory." (Spark docs)
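A minimal sketch of that API (a toy example of my own, not from the docs) shows why: each group arrives at the Python worker as one complete pandas DataFrame, so a single oversized group can exhaust the worker's memory:

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a", 1.0), ("a", 2.0), ("b", 5.0)], ["key", "value"])

# each invocation receives one whole group as an in-memory pandas DataFrame
def demean(pdf: pd.DataFrame) -> pd.DataFrame:
    return pdf.assign(value=pdf.value - pdf.value.mean())

df.groupBy("key").applyInPandas(demean, schema="key string, value double").show()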
There is some nice documentation here, but the Spark code that passes the Python process all it needs is here on GitHub.