Wednesday, November 18, 2015

Interpreting Spark

Spark has a nice web interface that allows you to find problems in the jobs you submit to it. Here are some notes I made on using it. First, let's clarify some terminology:

Task, Stage, Job

"Invoking an action inside a Spark application triggers the launch of a Spark job to fulfill it... The execution plan consists of assembling the job’s transformations into stages. A stage corresponds to a collection of tasks [one per partition, each running the stage's pipelined transformations such as map and flatMap] that all execute the same code, each on a different subset of the data. Each stage contains a sequence of transformations that can be completed without shuffling the full data.

"At each stage boundary, data is written to disk by tasks in the parent stages and then fetched over the network by tasks in the child stage. Because they incur heavy disk and network I/O, stage boundaries can be expensive and should be avoided when possible." [1]
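To make the stage idea concrete, here is a toy Python sketch (not Spark's scheduler) that cuts a linear chain of operations into stages, starting a new stage at each wide transformation. The WIDE set is an illustrative assumption: real jobs form a DAG rather than a list, and a join, for example, can avoid the shuffle when both inputs are already co-partitioned. The action that actually triggers the job (e.g. count) is not modeled.

```python
# Toy model: cut a linear chain of operations into stages.
# Not Spark's DAGScheduler; real jobs form a DAG, not a list.
WIDE = {"groupByKey", "reduceByKey", "join", "repartition"}  # illustrative only

def split_into_stages(ops):
    """Pipeline consecutive narrow operations into one stage and begin a
    new stage at every wide operation (the shuffle boundary)."""
    stages = [[]]
    for op in ops:
        if op in WIDE and stages[-1]:
            stages.append([])  # parent stage writes shuffle data; child fetches it
        stages[-1].append(op)
    return stages

pipeline = ["textFile", "map", "filter", "reduceByKey", "map",
            "groupByKey", "mapValues"]
for n, stage in enumerate(split_into_stages(pipeline)):
    print(f"stage {n}: {stage}")
```

This prints three stages: the narrow map and filter are pipelined with the input, and each of reduceByKey and groupByKey opens a new stage.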

Narrow and Wide Transformations

"For the RDDs returned by so-called narrow transformations like map and filter, the records required to compute the records in a single partition reside in a single partition in the parent RDD...  Spark also supports transformations with wide dependencies such as groupByKey and reduceByKey. In these dependencies, the data required to compute the records in a single partition may reside in many partitions of the parent RDD" [1]

Wide dependencies are what can induce a shuffle. Imagine a diagram with time running down the Y-axis. Each vertical column represents one partition at successive points in time, and each horizontal slice represents an RDD in which those partitions live, with parents (naturally) stacked above their children. Now draw arrows from the data in each RDD/partition cell to the data it relies on. We'd hope those arrows remain in the same vertical partition stream. If they cross streams, a shuffle ensues.
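The arrow picture can be modeled in a few lines of Python. This is a toy, not Spark's API: an "RDD" is a list of partitions of (key, value) records, each hypothetical transformation also returns which parent partitions every output partition drew from, and crosses_streams checks whether any arrow leaves its column. Small integer keys keep Python's hash deterministic.

```python
from collections import defaultdict

# Toy model, not Spark's API: an "RDD" is a list of partitions and each
# partition is a list of (key, value) records. Each transformation also
# returns deps[i] = the parent partitions that output partition i reads.

def map_values(partitions, f):
    """Narrow: output partition i depends only on parent partition i."""
    out = [[(k, f(v)) for k, v in part] for part in partitions]
    deps = [{i} for i in range(len(partitions))]
    return out, deps

def group_by_key(partitions, num_out):
    """Wide: records are hash-partitioned by key, so one output partition
    can need records from many parent partitions."""
    buckets = [defaultdict(list) for _ in range(num_out)]
    deps = [set() for _ in range(num_out)]
    for i, part in enumerate(partitions):
        for k, v in part:
            j = hash(k) % num_out  # small non-negative ints hash to themselves
            buckets[j][k].append(v)
            deps[j].add(i)
    return [sorted(b.items()) for b in buckets], deps

def crosses_streams(deps):
    """True if any arrow leaves its own vertical column."""
    return any(d - {i} for i, d in enumerate(deps))

prices = [[(1, 10.0), (2, 11.0)], [(1, 10.5), (2, 11.5)]]  # (day, price)
_, narrow = map_values(prices, lambda p: p * 2)
grouped, wide = group_by_key(prices, 2)
print(crosses_streams(narrow))  # False
print(crosses_streams(wide))    # True
```

The column test is only a heuristic: Spark also has narrow operations, such as union and coalesce, whose arrows leave the column without a shuffle, and this toy ignores them.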

The Web GUI

You may see jobs that have skipped stages. This is nothing to worry about. From a comment in the Spark source code:

    This may be an underestimate because the job start event references all of the result
    stages' transitive stage dependencies, but some of these stages might be skipped if their
    output is available from earlier runs.
    See for a more extensive discussion.
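The effect that comment describes can be mimicked with a toy scheduler in Python. Everything here is hypothetical: stages are identified by name, and we assume every stage leaves reusable output, whereas in Spark only shuffle map stages do and the final result stage of a job always runs.

```python
# Toy model of skipped stages. Hypothetical simplification: stages are
# identified by name and every stage leaves reusable output.

def run_job(stages, available, compute):
    """Run stages in order, skipping any whose output is already available."""
    ran, skipped = [], []
    for stage in stages:
        if stage in available:
            skipped.append(stage)  # output exists from an earlier job
        else:
            available[stage] = compute(stage)
            ran.append(stage)
    return ran, skipped

def compute(stage):
    return f"output of {stage}"

available = {}  # survives across jobs, loosely like shuffle files on executors
print(run_job(["read A", "read B", "join"], available, compute))
print(run_job(["read A", "read B", "join", "aggregate"], available, compute))
```

The second job reports three skipped stages and runs only the new one, which is the situation the UI is flagging.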

When looking at my own application's performance (an open-source finance toy and tool hosted on GitHub), the DAG (directed acyclic graph) visualization of a job may look like this:

My application takes the end-of-day stock prices of two different companies and computes the Pearson correlation between them.
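For reference, here is the textbook Pearson correlation coefficient for n paired observations (x_i, y_i), first in its definitional form and then rewritten in terms of running sums. The second form suits a distributed aggregation because each sum can be computed per partition and then added up; it is not necessarily how MLlib arranges the arithmetic.

```latex
r = \frac{\sum_{i=1}^{n}(x_i - \bar{x})(y_i - \bar{y})}
         {\sqrt{\sum_{i=1}^{n}(x_i - \bar{x})^2}\,\sqrt{\sum_{i=1}^{n}(y_i - \bar{y})^2}}
  = \frac{n\sum x_i y_i - \sum x_i \sum y_i}
         {\sqrt{n\sum x_i^2 - \left(\sum x_i\right)^2}\,\sqrt{n\sum y_i^2 - \left(\sum y_i\right)^2}}
```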

In the picture above, we see the details of Job 1. Stages 4, 5 and 6 are skipped because Job 0 (not shown here) had already computed their output.

Stages 7 and 8 are more interesting. I join the two data sets (keyed on date) and then map over the result to extract each stock's prices, discarding the date. Note that this is done for both stock data series in parallel.
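The join-and-map step can be sketched on a single machine in Python. join_on_date and price_series are hypothetical helpers standing in for a pair-RDD join and the maps that follow it, and the dates and prices are made-up sample values. Unlike Spark, the output order here simply follows the first list.

```python
# Single-machine sketch: join two (date, price) series on date, then drop
# the date and pull out one price series per stock.
# join_on_date and price_series are hypothetical helpers.

def join_on_date(a, b):
    """Inner join of two lists of (date, price) pairs, like a pair-RDD join."""
    b_by_date = {}
    for date, price in b:
        b_by_date.setdefault(date, []).append(price)
    return [(date, (pa, pb)) for date, pa in a for pb in b_by_date.get(date, [])]

def price_series(joined):
    """Discard the date key, returning the two aligned price series."""
    xs = [pa for _date, (pa, _pb) in joined]
    ys = [pb for _date, (_pa, pb) in joined]
    return xs, ys

stock_a = [("2015-11-16", 10.0), ("2015-11-17", 10.5), ("2015-11-18", 10.2)]
stock_b = [("2015-11-17", 20.1), ("2015-11-16", 20.0)]  # no 2015-11-18 quote
print(price_series(join_on_date(stock_a, stock_b)))  # ([10.0, 10.5], [20.0, 20.1])
```

Days missing from either series drop out of the inner join, so the two resulting price series stay aligned by date.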

From then on, all work is done in Spark's MLlib library. First, the zip and map are done in Correlation.computeCorrelationWithMatrixImpl. Then, in calculating the Pearson correlation from the data, it calls RDD.treeAggregate twice.
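Zipping two series is itself a narrow operation, because partition i of the result needs only partition i of each input. The toy Python sketch below uses nested lists as stand-in partitions (it is not MLlib's code) and enforces the same preconditions that RDD.zip documents: equal numbers of partitions and equal numbers of elements per partition. Each zipped pair becomes a two-element row, playing the role of a two-element vector.

```python
# Toy sketch: nested lists stand in for partitions. Enforces the same
# preconditions RDD.zip documents before pairing elements up.

def zip_partitions(xs, ys):
    """Pair element j of partition i in xs with element j of partition i in ys."""
    if len(xs) != len(ys):
        raise ValueError("inputs must have the same number of partitions")
    zipped = []
    for px, py in zip(xs, ys):
        if len(px) != len(py):
            raise ValueError("partitions must have the same number of elements")
        # Narrow: output partition i reads only partition i of each input.
        zipped.append([[x, y] for x, y in zip(px, py)])  # one 2-element row
    return zipped

a_prices = [[10.0, 10.5], [10.2]]
b_prices = [[20.0, 20.1], [20.3]]
print(zip_partitions(a_prices, b_prices))
# [[[10.0, 20.0], [10.5, 20.1]], [[10.2, 20.3]]]
```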

This method takes a zero value (an identity for the aggregation) and two functions. The first, seqOp, aggregates the data within a single partition. The second, combOp, then merges those per-partition results across all partitions. Since this latter function requires "crossing the streams", a stage finishes and a new one begins (Stage 8, which happens to be another call to treeAggregate).
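The two-function pattern can be mimicked in plain Python. This is a simplification, not Spark's treeAggregate: the zero value is a tuple of running sums, seq_op folds one (x, y) price pair into it within a partition, and comb_op adds two partial tuples, which are merged pairwise in a tree. The real method's depth parameter and where each merge runs are not modeled, and computing Pearson's r from these six sums is one textbook approach rather than MLlib's exact arithmetic.

```python
import math

# Toy sketch of the treeAggregate pattern, not Spark's implementation.
# Accumulator: (n, sum x, sum y, sum x^2, sum y^2, sum x*y).
ZERO = (0, 0.0, 0.0, 0.0, 0.0, 0.0)

def seq_op(acc, pair):
    """Fold one (x, y) record into a partition's running sums."""
    n, sx, sy, sxx, syy, sxy = acc
    x, y = pair
    return (n + 1, sx + x, sy + y, sxx + x * x, syy + y * y, sxy + x * y)

def comb_op(a, b):
    """Merge two partial accumulators by adding them field by field."""
    return tuple(u + v for u, v in zip(a, b))

def tree_aggregate(partitions, zero, seq_op, comb_op):
    # Step 1: aggregate inside each partition; nothing crosses partitions.
    level = []
    for part in partitions:
        acc = zero
        for record in part:
            acc = seq_op(acc, record)
        level.append(acc)
    # Step 2: merge partial results pairwise, level by level, like a tree.
    while len(level) > 1:
        level = [comb_op(level[i], level[i + 1]) if i + 1 < len(level) else level[i]
                 for i in range(0, len(level), 2)]
    return level[0] if level else zero

def pearson(partitions):
    """Pearson's r from the six aggregated sums (single-pass formula)."""
    n, sx, sy, sxx, syy, sxy = tree_aggregate(partitions, ZERO, seq_op, comb_op)
    num = n * sxy - sx * sy
    den = math.sqrt(n * sxx - sx * sx) * math.sqrt(n * syy - sy * sy)
    return num / den

pairs = [[(1.0, 2.0), (2.0, 4.0)], [(3.0, 6.0)], [(4.0, 8.0)]]
print(pearson(pairs))  # approximately 1.0: y is exactly 2x
```

The single-pass sum-of-squares formula is compact but can lose precision when values are large relative to their spread, which is worth keeping in mind for real price data.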

This visualization will also tell you when one of the RDDs is cached: it is denoted by a green highlighted dot [2], although we do not see one in the picture above.

[1] Cloudera blog.

[2] Databricks blog.
