Saturday, August 10, 2019

Spark in Anger


Why doesn't Dataset have reduceByKey?

Well, it does have a reduce these days but it's marked as 'experimental'. StackOverflow suggests using groupByKey instead and then calling reduceGroups on the resulting KeyValueGroupedDataset. Alternatively, this SO post says to use a groupBy followed by an agg.
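
A minimal sketch of both routes, using a toy Dataset of (word, count) pairs (the data and names below are made up for illustration):

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.sum

  val spark = SparkSession.builder.appName("reduceByKeyOnDatasets").master("local[*]").getOrCreate()
  import spark.implicits._

  val ds = Seq(("a", 1), ("b", 2), ("a", 3)).toDS()

  // The typed route: groupByKey, then reduceGroups on the KeyValueGroupedDataset
  val viaReduceGroups = ds
    .groupByKey { case (word, _) => word }
    .reduceGroups((x, y) => (x._1, x._2 + y._2))
    .map { case (word, (_, total)) => (word, total) }

  // The untyped route: groupBy followed by agg
  val viaAgg = ds.groupBy($"_1").agg(sum($"_2"))

  viaReduceGroups.show()
  viaAgg.show()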


Too many files/buckets

If you try to write a Dataset using bucketBy, you might get too many files, as seen on the Spark mailing list here. The solution is that "you need to first repartition ... since each task will write out the buckets/files. If the bucket keys are distributed randomly across the RDD partitions, then you will get multiple files per bucket."
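
A minimal sketch of that advice, assuming a SparkSession spark and a DataFrame df with an id column (the bucket count and table name are made up):

  import org.apache.spark.sql.functions.col

  df.repartition(16, col("id"))      // line the data up by bucket key first...
    .write
    .bucketBy(16, "id")              // ...so each task only writes its own buckets
    .sortBy("id")
    .saveAsTable("bucketed_table")   // bucketBy requires saveAsTable

Without the repartition, rows for any given bucket can be scattered across every task, and each task then writes its own file for that bucket.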


Making the stats of a Parquet file visible

No need for parquet-mr's parquet-tools (which didn't seem to want to show stats no matter how much coaxing, even when setting the parquet.strings.signed-min-max.enabled Hadoop configuration). Using the DataFrame.describe method, you can see the minimum and maximum values. The trick is to load each .parquet file as a DataFrame even if it is actually part of a greater whole, as we do here.
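
For example (a sketch; the path and file name are made up), a single part-file can be read in isolation and described on its own:

  val partFile = spark.read.parquet("/warehouse/bucketed_table/part-00000-c000.snappy.parquet")
  partFile.describe().show()   // count, mean, stddev, min and max per column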

It seemed that 400 files needed to be created before a single file would be shared by different values of the sortBy column. I don't know at the moment which configuration key controls this behaviour.


Deterministic Order?

The method zipWithIndex is odd: the indices it assigns are contiguous but not necessarily deterministic. This latter link shows that the index may change on re-evaluation [briefly: an RDD that is zipWithIndex'ed and then joined to itself does not necessarily yield tuples whose two elements are equal. This is surprising but not a bug, since the RDD was never ordered before the call to zipWithIndex and therefore there are no guarantees].
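
A small sketch of the surprise, assuming a SparkSession spark; the repartition deprives the RDD of any guaranteed order and nothing is cached, so the self-join may evaluate zipWithIndex twice with different results:

  val rdd = spark.sparkContext.parallelize(1 to 100000).repartition(8)   // no guaranteed order

  val indexed    = rdd.zipWithIndex()                    // RDD[(Int, Long)], keyed by the value
  val selfJoined = indexed.join(indexed)                 // re-evaluates `indexed` on each side
  val changed    = selfJoined.filter { case (_, (i, j)) => i != j }.count()

  println(s"values whose index differed between evaluations: $changed")

Caching indexed before the join, or sorting the RDD before calling zipWithIndex, removes the ambiguity.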

None other than Sean Owen comments on StackOverflow:

"An RDD has a guaranteed order if it is the result of a sortBy operation, for example. An RDD is not a set; it can contain duplicates. Partitioning is not opaque to the caller, and can be controlled and queried. Many operations do preserve both partitioning and order, like map. That said I find it a little easy to accidentally violate the assumptions that zip depends on, since they're a little subtle."

This is just like SQL: unless you use an ORDER BY, you can make no guarantees on the order, even if it often presents results in order.


Co-location vs. co-partitioned

"The lack of a shuffle does not mean that no data will have to be moved between nodes. It's possible for two RDDs to have the same partitioner (be co-partitioned) yet have the corresponding partitions located on different nodes (not be co-located).  This situation is still better than doing a shuffle, but it's something to keep in mind. Co-location can improve performance, but is hard to guarantee." [StaclOverflow]


Badly written queries

I come across a lot of "why does Spark run out of memory when it's supposed to be scalable?" questions. This is often because the chosen algorithm simply does not scale. Here are some silly mistakes I've seen:
  • Doing a word count by concatenating the records and, at the end, counting the words in the accumulated corpus (a scalable alternative is sketched after this list).
  • Counting the number of subsets of a collection (that is, the size of its power set) by generating them. Note that this can work fine when the number of elements is small and so can pass testing, only to blow up when faced with real, production data. (The solution is simply to calculate 2^x where x is the size of the original collection.)
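
A sketch of the scalable word count alluded to in the first bullet, assuming a SparkContext sc and a made-up input path:

  val counts = sc.textFile("/data/corpus.txt")
    .flatMap(_.split("\\s+"))
    .filter(_.nonEmpty)
    .map(word => (word, 1L))
    .reduceByKey(_ + _)            // partial aggregation per partition; nothing piles up in one place

  counts.take(10).foreach(println)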

OutOfMemoryErrors in machine learning

...is often down to how the ML libraries have been written - badly, in some cases. A lot of the work is done in the driver (SO). The standard first approach (as mentioned in the official docs here) is as follows:

"Sometimes, you will get an OutOfMemoryError not because your RDDs don’t fit in memory, but because the working set of one of your tasks, such as one of the reduce tasks in groupByKey, was too large. Spark’s shuffle operations (sortByKey, groupByKey, reduceByKey, join, etc) build a hash table within each task to perform the grouping, which can often be large. The simplest fix here is to increase the level of parallelism, so that each task’s input set is smaller. Spark can efficiently support tasks as short as 200 ms, because it reuses one executor JVM across many tasks and it has a low task launching cost, so you can safely increase the level of parallelism to more than the number of cores in your clusters."

