Typically, in Spark and similar frameworks, a small amount of code is executed against a large amount of data, so your code needs to be efficient. I've just come across code that was taking more time to read and parse the data than to actually do any computations on it!
Don't be lazy
The problem showed up when I looked at the threads of the cluster's executors (you don't need jstack for this; you can do it from the Spark GUI). Most executor threads were BLOCKED trying to access a lazy val while only one was RUNNABLE inside it. Oops - lazy vals use synchronization under the covers (see here for why lazy can be expensive).
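To see why, here's a minimal sketch (the names are hypothetical, not the code from the cluster). In Scala 2, a lazy val compiles to roughly a flag check plus a synchronized block guarding initialization, so threads that race to read it before initialization completes all queue on the same monitor:

object LazyContention {
  // Initialization runs inside this.synchronized; every thread that arrives
  // before it finishes blocks on the monitor.
  lazy val table: Array[Long] = {
    Thread.sleep(2000) // stand-in for expensive parsing or IO
    Array.tabulate(1024)(_.toLong)
  }

  def main(args: Array[String]): Unit = {
    // Take a thread dump while this runs: one thread is RUNNABLE inside the
    // initializer, the rest are BLOCKED waiting to enter the monitor.
    val threads = (1 to 8).map(i =>
      new Thread(() => println(s"thread $i read ${table.length} entries")))
    threads.foreach(_.start())
    threads.foreach(_.join())
  }
}

If the value is only needed once per partition, computing it eagerly (or reading the lazy val once, outside the per-record path) avoids the contention.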
Not always obvious
Another innocuous-looking piece of code might be this:
import java.util.Random;

private static final Random random = new Random();

// Generates num distinct values in [0, maxExclusive). XORing an incrementing
// counter with a fixed mask is a bijection on [0, 2^k), so no value repeats;
// anything falling outside [0, maxExclusive) is skipped.
static long[] generateRandom(int num, long maxExclusive) {
    long bitMask = randomBitMask(maxExclusive);
    int index = 0;
    long count = 0;
    long[] results = new long[num];
    while (index < num) {
        long nextVal = count ^ bitMask;
        if (nextVal < maxExclusive) {
            results[index] = nextVal;
            index++;
        }
        count++;
    }
    return results;
}

// A random mask confined to the bits that maxExclusive can occupy.
private static long randomBitMask(long maxExclusive) {
    long seed = random.nextLong();
    return seed & createMask(maxExclusive);
}

// All-ones mask up to and including the highest set bit of highest.
private static long createMask(long highest) {
    long leftMost = Long.highestOneBit(highest);
    return (leftMost << 1) - 1;
}
This is an attempt at generating a non-repeating sequence of random numbers using a Linear Feedback Shift Register.
Basically, I wanted a list of (semi-)random numbers that don't repeat. If you want only a few numbers from a large set, you might just pick one at random and add it to the list, retrying if you've seen it before. That's fine, but if you want lots of numbers out of a set only slightly larger (that is, when num is comparable to maxExclusive), you'll be doing a lot of retrying, and this is where the code above comes in. Unfortunately, it is considerably slower than the "try again" approach when num is a good deal less than maxExclusive.
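For reference, the "try again" approach looks something like this sketch (the actual Java method, uniqueRandomNumbersSmallNumLargeMax, isn't reproduced here, so the names and details below are my own):

import java.util.concurrent.ThreadLocalRandom
import scala.collection.mutable

object RetryApproach {
  // Draw, record, and reject duplicates. The set makes retries cheap while
  // num is much smaller than maxExclusive; as num approaches maxExclusive,
  // nearly every draw collides and the loop degenerates.
  def uniqueRandomNumbers(num: Int, maxExclusive: Long): Array[Long] = {
    val seen = mutable.LinkedHashSet.empty[Long]
    while (seen.size < num) {
      seen += ThreadLocalRandom.current().nextLong(maxExclusive)
    }
    seen.toArray
  }
}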
So, benchmarking with this in the pom.xml:
<dependency>
    <groupId>org.openjdk.jmh</groupId>
    <artifactId>jmh-core</artifactId>
    <version>${jmh.version}</version>
</dependency>
<dependency>
    <groupId>org.openjdk.jmh</groupId>
    <artifactId>jmh-generator-annprocess</artifactId>
    <version>${jmh.version}</version>
</dependency>
(where jmh.version is 1.11.3) and a benchmark class in src/test/java that looks something like this:
public static final int LARGE = 100000000;

/**
 Result "javaRandomNumbersBenchmark_LARGE":
   198.036 ±(99.9%) 37.176 ops/s [Average]
   (min, avg, max) = (123.962, 198.036, 290.256), stdev = 42.812
   CI (99.9%): [160.861, 235.212] (assumes normal distribution)

 Benchmark                                               Mode  Cnt    Score    Error  Units
 RandomStatsBenchmark.javaRandomNumbersBenchmark_LARGE  thrpt   20  197.703 ± 36.994  ops/s
 */
@Benchmark
@Fork(1)
public void javaRandomNumbersBenchmark_LARGE() {
    JavaRandomStats.generateRandom(10, LARGE);
}

/**
 Result "simpleUniqueRandomNumbersBenchmark_LARGE":
   3103855.467 ±(99.9%) 24691.158 ops/s [Average]
   (min, avg, max) = (2845502.900, 3103855.467, 3277692.295), stdev = 104543.910
   CI (99.9%): [3079164.308, 3128546.625] (assumes normal distribution)
 */
@Benchmark
@Fork(1)
public void simpleUniqueRandomNumbersBenchmark_LARGE() {
    uniqueRandomNumbersSmallNumLargeMax(10, LARGE);
}
More contention
Spark's Broadcast variables are an efficient way to share read-only data with executors without shipping it inside every task. However, in the documentation examples, you'll see code like this:
records mapPartitions { rowIter =>
  rowIter map { case (record) =>
    val x = broadcastX.value
    // ... do something with x
  }
}
But I noticed a lot of contention when reading the value deep down in TorrentBroadcast.readBroadcastBlock (Spark 1.3.1). Rewriting the code slightly to:
records mapPartitions { rowIter =>
  val x = broadcastX.value
  rowIter map { case (record) =>
    // ... do something with x
  }
}
meant the executor threads suffered far less contention, since the broadcast value is now read once per partition rather than once per record. Doing this saved another 10% or so of the total time.