Friday, October 23, 2020

Optimizing Spark/DataBricks in Azure

We have about 1.5TB of Parquet data in Gen 2 storage on Azure and we're trying to read it with a DataBricks cluster of 8 nodes, each with 56GB of RAM and 16 cores. All I'm trying to do is some basic select('aColumn).distinct.count() and where('anotherColumn < X).count() queries.
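
Concretely, the workload looks something like this (a minimal sketch: the column names, the literal 100 standing in for X, and the storage account/container in the abfss path are all placeholders):

    // Minimal sketch of the workload. The abfss path, column names and the
    // literal 100 (standing in for X) are placeholders.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.getOrCreate()   // already provided in a Databricks notebook
    import spark.implicits._                         // enables the 'symbol column syntax

    // ~1.5TB of Parquet sitting in ADLS Gen2
    val df = spark.read.parquet(
      "abfss://mycontainer@myaccount.dfs.core.windows.net/path/to/data")

    // Query 1: distinct values of a single column
    df.select('aColumn).distinct.count()

    // Query 2: a filtered count
    df.where('anotherColumn < 100).count()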

Surprisingly, this is running like a dog: the first query takes about 50 minutes and the second about 18 minutes. Tuning is an empirical science, but this felt wrong as 1.5TB of data is not that much these days.

Ganglia Metrics

Ganglia showed that the cluster was running at high capacity although the network usage seemed low. If this is showing the data being pulled from Gen2 into the Spark workers, then at 300MB/s it would take about an hour to pull it all out of storage. Indeed, this is roughly how long it took.

However, when I took thread dumps, the executor threads were not waiting on IO. Typically, they were in Spark's CompressibleColumnBuilder (which, according to the docs, "builds optionally compressed byte buffer for a column"). So, it seemed that the cluster really was CPU-bound on parsing the files and on garbage collection, which was bad but not horrendous:

Spark GC logs

Calling cache() on the data didn't seem to make the queries much faster. Looking at the thread dumps again showed the threads spending a lot of time in CompressibleColumnBuilder for the initial select distinct and in java.io.FileInputStream for the subsequent counts.

Remembering that persisting with a storage level of MEMORY_AND_DISK_SER had given me a quick win in the past, I thought I'd try it again, but without success.
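
For reference, that experiment is just the standard persist call, something along these lines (df comes from the sketch above):

    // Serialized caching that spills to disk when memory runs out. This had
    // been a quick win on other workloads; here it made no noticeable difference.
    import org.apache.spark.storage.StorageLevel

    df.persist(StorageLevel.MEMORY_AND_DISK_SER)
    df.count()   // force materialisation before re-running the queries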

I tried enabling the Delta Cache. This is supposed to cache the Gen2 data in the cluster:
Storage Tab

Compare this to the storage tab when we're not using the Delta Cache and you'll see there isn't any data in the "Parquet IO Cache". (Note that the page says "RDDs" even though we're only using the Dataset API. This is standard.) Anyway, enabling the Delta Cache seemed to make no difference at all.
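
For completeness, turning the cache on is a single Spark conf on clusters whose workers have local SSDs (this is the documented Databricks setting, though your runtime version may behave differently):

    // Turn on the Databricks IO ("Delta") cache so Parquet read from Gen2
    // gets cached on the workers' local SSDs.
    spark.conf.set("spark.databricks.io.cache.enabled", "true")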

Using a "Delta Cache Accelerated" cluster actually made the initial distinct.count() take much longer (1.47 hours) with only marginally reduced time for the next query (16.8 minutes).

Disk is incredibly slow compared to memory. Ultimately, I was trying to process too much data with too little RAM, and there's only so far tuning will get you before your metrics plateau. So, I made a cluster of 25 nodes, each with 28GB of RAM and 8 cores. This did help, but not as much as I was expecting. The distinct.count() now took 38 minutes and the where(...).count() 12 minutes. That's roughly a 25-33% improvement, but it still seems a long time in absolute terms.

I want my Data Locality 

The problem appears to be that all the data is pulled to the executors even if we're only interested in one column [see StackOverflow]. Evidence for this came from another look at Ganglia:

Ganglia Metrics during second query
This was taken after the distinct.count() and during the where(...).count() query. Note that there is no more network activity, suggesting the first query cached the data.
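
As an aside, the physical plan is a quick way to see which columns the scan actually asks for, whatever the storage layer then does with that request; look for ReadSchema on the FileScan node (again using df from the earlier sketch):

    // Print the physical plan; the FileScan's ReadSchema should list only
    // aColumn, even though the data still has to travel from Gen2.
    df.select('aColumn).distinct.explain()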

"In Azure really fast networks compensate for having data and compute separated." [ibid] Indeed, the threads seemed to spend all their time on the first call in code to parse the Parquet rather than blocking on IO. But you can't get over the fact that my two queries are doing much the same thing (no joins) but the first takes three times longer than the second. To demonstrate, instead of two different queries, I ran the distinct.count() query twice. The first time took 42 minutes, the second 15. So, it appears that pulling 1.5TB of data from Gen2 into Spark takes about 25 minutes.

Azure is not the only cloud offering that implements this architectural trick. GCP takes all the data out of storage and puts it into an ephemeral HDFS cluster.

Solution

A new day, a new dawn. Let's start the cluster again and begin from scratch. This time, I won't explicitly enable the IO cache and I won't even call .cache() on my DataFrame. To my amazement, the same query ran in about 30 seconds today! It seems that the .cache() call was the root of my problems, as re-introducing it returned us to a 50-minute wait. This is pretty much the opposite of what you'd expect with an on-prem Hadoop/YARN cluster.
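
In code terms, the fix was removing things rather than adding them; a sketch, again using df from the first snippet:

    // No IO-cache conf, no .cache()/.persist(). If a cached copy is still
    // hanging around from earlier experiments, drop it first.
    df.unpersist()

    // Plain query straight against Gen2; this is the one that came back in
    // about 30 seconds.
    df.select('aColumn).distinct.count()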
