Sunday, April 30, 2023

Yet more Spark tips

Backfilling

You can use inline or inline_outer to explode an array of structs into a table [PySpark docs]; the latter also produces a row of nulls when the array is null or empty [SO]. Combined with sequence, this can generate new rows, which is very useful for backfilling data.
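
For example, a minimal PySpark sketch (the DataFrame, schema and column names here are mine, purely to illustrate the difference):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# one row with an array of structs, one row where the array is null
df = spark.createDataFrame(
    [("a", [(1, "x"), (2, "y")]), ("b", None)],
    "id string, events array<struct<n:int,label:string>>",
)

df.selectExpr("id", "inline(events)").show()        # row "b" disappears
df.selectExpr("id", "inline_outer(events)").show()  # row "b" is kept, with nulls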

Leveraging this StackOverflow answer, I could backfill data with this Spark SQL:

inline_outer(
    transform(
        sequence(0,
                 int(datediff('2022-10-23', END_DATE)),
                 7), 
        i -> (date_add(END_DATE, i) as EVENT_DATE,
              IF(i=0, Y, YHAT) as YHAT)
    )
)

This creates a new row for each week (the step of 7 days) between the date in column END_DATE and 23 October 2022. Each generated row carries that date plus the value of Y if it is the first date in the sequence, or YHAT otherwise.

Combine this with the columns we want to carry over unchanged:

    df = df.selectExpr(LIST_OF_COLUMNS + [SQL_STRING_ABOVE])

The keyword transform maps each element of an array using a lambda function [docs]. The array itself can come from sequence, which generates its elements in a type-aware way - here it produces integers which, added to a date with date_add, yield increasing dates.
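
As a small, hedged illustration of these two functions working together (the dates are arbitrary):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# sequence generates the array, transform maps each element with a lambda
spark.sql("SELECT sequence(0, 21, 7) AS steps").show(truncate=False)
# [0, 7, 14, 21]

spark.sql(
    "SELECT transform(sequence(0, 21, 7), i -> date_add(DATE'2022-10-02', i)) AS dates"
).show(truncate=False)
# [2022-10-02, 2022-10-09, 2022-10-16, 2022-10-23]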

Reading RDBMS from Spark

By default, Spark reads the result of a JDBC query against an RDBMS through a single connection on one JVM - it cannot possibly know a priori how to divide the work. However, you can get it to partition the workload if you tell it how - see here.
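
A hedged sketch of such a partitioned read (the URL, table, column and bounds are illustrative, not from any real system):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Spark opens numPartitions connections, each reading a slice of
# partitionColumn between lowerBound and upperBound
df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://dbhost:5432/mydb")  # hypothetical
    .option("dbtable", "events")                          # hypothetical
    .option("user", "spark")
    .option("password", "secret")
    .option("partitionColumn", "id")  # must be numeric, date or timestamp
    .option("lowerBound", "1")
    .option("upperBound", "1000000")
    .option("numPartitions", "16")
    .load()
)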

Skew

Since version 3.0, Spark can apparently handle skew automatically via Adaptive Query Execution (see the Spark docs).
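
The relevant knobs live under spark.sql.adaptive; a hedged sketch (the values shown are the usual defaults, but check the docs for your version):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# a partition counts as skewed if it is both skewedPartitionFactor times larger
# than the median partition and bigger than the byte threshold below
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")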

Partitions and OOMEs

It's not just calls to .collect() that can produce:

org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 16 tasks (1024.1 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

This can happen when there are too many partitions [StackOverflow] and the amount of metadata overwhelms the driver. The solution is to use fewer partitions or add more driver memory.
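
A hedged sketch of both mitigations (the sizes and partition counts are purely illustrative, not recommendations; note that spark.driver.memory generally has to be set before the driver JVM starts, for example via spark-submit --driver-memory):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.driver.maxResultSize", "2g")   # more headroom for task results
    .getOrCreate()
)

df = spark.range(10_000_000).repartition(20_000)  # pathologically many partitions
df = df.coalesce(200)                             # ...or simply use fewer of them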

Further notes on diagnosing this can be found in a previous post here.

PyArrow does not come as standard

"Pandas UDFs are user defined functions that are executed by Spark using Arrow to transfer data and Pandas to work with the data, which allows vectorized operations. A Pandas UDF is defined using the pandas_udf as a decorator or to wrap the function, and no additional configuration is required. A Pandas UDF behaves as a regular PySpark function API in general." [Spark docs

However, in Palantir's Foundry, code like this gives:

  File "/myproject/datasets/_PH_Test.py", line 12, in test_ph
    @pandas_udf("col1 string, col2 long")
  File "/scratch/asset-install/6add7b36ad350d9f0c07885622f2e3ae/miniconda36/lib/python3.6/site-packages/pyspark/sql/pandas/functions.py", line 332, in pandas_udf
    require_minimum_pyarrow_version()
  File "/scratch/asset-install/6add7b36ad350d9f0c07885622f2e3ae/miniconda36/lib/python3.6/site-packages/pyspark/sql/pandas/utils.py", line 56, in require_minimum_pyarrow_version
    "it was not found." % minimum_pyarrow_version) from raised_error
ImportError: PyArrow >= 1.0.0 must be installed; however, it was not found.

Locally, trying to install it initially met with failure:

$ pip install pyarrow
...
    CMake Error at /home/henryp/Downloads/Temp/cmake-3.23.1-linux-x86_64/share/cmake-3.23/Modules/FindPackageHandleStandardArgs.cmake:230 (message):
      Could NOT find Arrow (missing: ARROW_INCLUDE_DIR ARROW_LIB_DIR
      ARROW_FULL_SO_VERSION ARROW_SO_VERSION)
...

Upgrading pip solved that one (pip install --upgrade pip).