Wednesday, September 19, 2018

Spark and Buckets


Unbalanced Partitions

Deliberately trying to get Spark to have an unbalanced partition

import org.apache.spark.sql.functions.{count, max}
import spark.implicits._

val n  = 10000000
val df = spark.range(n).toDF("id")                                    // ids 0 until n
df.groupByKey(r => r.getLong(0) / n).agg(count("id").as[Long]).show() // every id maps to key 0

seems fine but

val allKeysIn1Partition = df.map { r => 1 -> (r.getLong(0), "A" * 10000) } // every row gets the same key, _1 = 1
val repartitioned       = allKeysIn1Partition.repartition($"_1")           // so hashing on _1 sends every row to the same partition
val pSizes              = repartitioned.mapPartitions { x => Seq(x.size).iterator }
pSizes.agg(max($"value")).show()

shows rapid progress for all but the last partition, and indeed this query shows that all the data lives in one partition.
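To see the skew partition by partition rather than just the maximum, one option (a minimal sketch using the standard RDD mapPartitionsWithIndex; the variable names come from the snippet above) is:

// Count rows per partition: expect one large partition and the rest empty.
repartitioned.rdd
  .mapPartitionsWithIndex { (i, rows) => Iterator(i -> rows.size) }
  .collect()
  .foreach { case (i, size) => println(s"partition $i: $size rows") }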

Buckets

"Buckets can help with the predicate pushdown since every value belonging to one value will end up in one bucket. So if you bucket by 31 days and filter for one day Hive will be able to more or less disregard 30 buckets. Obviously this doesn't need to be good since you often WANT parallel execution like aggregations.

"So to summarize buckets are a bit of an older concept and I wouldn't use them unless I have a clear case for it. The join argument is not that applicable anymore..." (from here).

Note that you can't use bucketing with a simple save. It must be a saveAsTable call (see Laskowski's Mastering Spark SQL for more information), which is for Hive interoperability. Otherwise you'll get a "'save' does not support bucketBy and sortBy right now" error.
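For instance (a minimal sketch; the table name and bucket count are placeholders, and df is the DataFrame from the snippets above):

// Writing a bucketed table through the metastore works...
df.write
  .bucketBy(31, "id")
  .sortBy("id")
  .saveAsTable("bucketed_ids")

// ...but writing to a plain path does not:
// df.write.bucketBy(31, "id").sortBy("id").save("/tmp/bucketed_ids")
// => 'save' does not support bucketBy and sortBy right now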

Partitioning vs. Bucketing

"Bucketing is another technique for decomposing data sets into more manageable parts" (from here). In Hive, for example, "suppose a table using date as the top-level partition and employee_id as the second-level partition leads to too many small partitions. Instead, if we bucket the employee table and use employee_id as the bucketing column, the value of this column will be hashed by a user-defined number into buckets. Records with the same employee_id will always be stored in the same bucket."

In Spark, "partitionBy ... has limited applicability to columns with high cardinality. In contrast bucketBy distributes data across a fixed number of buckets and can be used when a number of unique values is unbounded" (from the Spark documentation).
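Putting the two together, a minimal sketch (the employees DataFrame, its columns, and the bucket count are hypothetical, echoing the Hive example above) might partition by the low-cardinality date and bucket by the high-cardinality employee_id:

// One directory per date (low cardinality), a fixed 16 buckets hashed on employee_id (high cardinality).
employees.write
  .partitionBy("date")
  .bucketBy(16, "employee_id")
  .sortBy("employee_id")
  .saveAsTable("employees_bucketed")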
