## Monday, May 26, 2014

### Queues and Statistical Quirks

We use Oracle's Coherence Incubator in our project, in particular the Push Replication pattern to move documents from the primary site to a warm, disaster recovery site located 50 miles away. The Push Replication pattern in turn depends on the Messaging Pattern. This involves having in-memory queues of objects that are waiting to be published.

The trouble is that the distribution across these queues is not even. As a result, some queues can be nearly full (close to causing an OutOfMemoryError and thus forcing us to reject client requests) while others sit empty. We examined the distribution function, but the imbalance appears to be a quirk of probability rather than a flaw in the function itself.
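A quick sanity check makes the point before the full simulation. This is a standalone sketch (the bucket count, document count, and seed are made up for illustration, not taken from our system): even a single round of spreading documents uniformly at random over buckets leaves a noticeable spread.

```scala
import scala.util.Random

// One round only: spread 1000 documents uniformly over 20 buckets.
// The seed is fixed so the run is repeatable; the numbers are illustrative.
val random  = new Random(42)
val buckets = new Array[Int](20)
for (_ <- 0 until 1000) buckets(random.nextInt(buckets.length)) += 1

val mean   = buckets.sum.toDouble / buckets.length  // exactly 50.0 by construction
val stdDev = math.sqrt(buckets.map(b => math.pow(b - mean, 2)).sum / buckets.length)

// Counts per bucket are Binomial(1000, 1/20), so the expected spread is
// sqrt(1000 * (1/20) * (19/20)), roughly 6.9 documents around the mean of 50.
println(f"mean = $mean%.1f, std dev = $stdDev%.1f")
```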

Take this Scala code to demonstrate what I am talking about. We have a given number of buckets, over which X documents are randomly distributed each iteration. Each iteration we also consume a fixed number of documents (the batch size) from every bucket; if a bucket holds fewer than the batch size, we simply take them all. We then let this run for an arbitrary number of iterations.

The simulation code looks like this:

```scala
package com.phenry.stats

import scala.util.Random

object PublisherDistributionMain {

  val random = new Random

  def main(args : Array[String]) : Unit = {
    val numBuckets    = args(0).toInt
    val numIterations = args(1).toInt
    val numNewDocs    = args(2).toInt
    val batchSize     = args(3).toInt
    var buckets       = new Array[Int](numBuckets)

    for (_ <- 0 until numIterations) {
      populate(buckets, numNewDocs)
      buckets = depopulate(buckets, batchSize)
    }
    print(buckets)
  }

  def print(buckets : Array[Int]) : Unit = {
    val largest = max(buckets)

    // Scale each bucket to a bar of at most 50 characters.
    buckets.map(_ * 50 / largest).foldLeft(0)((acc, elem) => {
      println("%2d : ".format(acc) + "#" * elem)
      acc + 1
    })

    val stats = new MeanAndStdDeviation
    println("\nmean = %.3f, std dev = %.3f".format(stats.mean(buckets), stats.stdDev(buckets)))
  }

  def max(buckets : Array[Int]) : Int =
    buckets.reduce((x, y) => if (x > y) x else y)

  // Take up to batchSize documents from every bucket.
  def depopulate(buckets : Array[Int], batchSize : Int) : Array[Int] =
    buckets.map(x => if (x < batchSize) 0 else x - batchSize)

  // Spread numNewDocs documents uniformly at random over the buckets.
  def populate(buckets : Array[Int], numNewDocs : Int) : Unit = {
    for (_ <- 0 until numNewDocs) {
      val bucket = random.nextInt(buckets.length)
      buckets(bucket) = buckets(bucket) + 1
    }
  }
}
```

It is backed by a simple stats class, MeanAndStdDeviation, which looks like this:

```scala
package com.phenry.stats

class MeanAndStdDeviation {

  implicit def integer2Double(x : Integer)          = x.doubleValue()
  implicit def integer2DoubleList(x : List[Int])    = x.map(i => i.doubleValue())
  implicit def intArray2DoubleArray(x : Array[Int]) = x.map(i => i.doubleValue())
  implicit def double2Integer(x : Double)           = x.intValue
  implicit def double2IntegerList(x : List[Double]) = x.map(d => d.intValue)

  val mean = (x : Array[Int]) => sum(x) / x.length

  def sum(results : List[Double]) : Double =
    results.foldRight(0d)((x, y) => x + y)

  def stdDev(results : Array[Int]) : Double = {
    val theMean = mean(results)
    var sumOfSquaredDiffs = 0d
    for (aResult <- results) sumOfSquaredDiffs += math.pow(aResult - theMean, 2)
    math.sqrt(sumOfSquaredDiffs / results.length)
  }

  def sum(results : Array[Double]) : Double =
    results.reduce((x, y) => x + y)

  // ...
}
```

(Scala really is nice for writing succinct functions...)

The results after 100 000 iterations look like this:

```
 0 : ################
 1 : ###############################
 2 : ###############
 3 : ##############
 4 : #################
 5 : ####
 6 : ##########
 7 : ######################
 8 : ##############################
 9 : ##################
10 : ##############
11 : ############
12 : ################
13 : ##############
14 : ##############################
15 : ####
16 : ##################################################
17 : ################
18 : ###################
19 : #############
```

That is, very lumpy. Bucket #15, for instance, holds only a fraction of what bucket #16 does.
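This lumpiness is expected rather than a seed-specific fluke. Because a queue can never go below zero, each bucket's length is a random walk reflected at the origin; when the batch size is close to the average arrivals per bucket, the walk has little or no drift back toward zero, so excursions grow with the number of iterations. A standalone sketch for a single bucket (the parameters here are made up for illustration, not our production numbers):

```scala
import scala.util.Random

// Follow one bucket over many iterations. The batch size (50) exactly matches
// the average arrivals per bucket (1000 docs / 20 buckets), the zero-drift case.
val random     = new Random(7)
val numBuckets = 20
val numNewDocs = 1000
val batchSize  = 50
var length     = 0

val samples = for (_ <- 0 until 10000) yield {
  // Arrivals into this one bucket are Binomial(numNewDocs, 1/numBuckets).
  val arrivals = (0 until numNewDocs).count(_ => random.nextInt(numBuckets) == 0)
  // The queue cannot go negative: a random walk "reflected" at zero.
  length = math.max(0, length + arrivals - batchSize)
  length
}

println(s"max queue length seen = ${samples.max}")
```

With zero drift, a snapshot after many iterations catches some queues deep in an excursion and others near empty, which is exactly the shape of the histogram above.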

As a result, our system rejects user requests, complaining that it has run out of space, even though many nodes in the cluster have plenty of free memory.

Time to write a new consumer algorithm...
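One possible direction (a hypothetical sketch only, not what we ultimately shipped): instead of taking a fixed slice from every queue, spend the same global budget one document at a time on whichever queue is currently fullest, which pulls the outliers back toward the mean.

```scala
// Hypothetical consumer: same total capacity per iteration (batchSize per
// bucket), but each unit of the budget is taken from the currently fullest
// bucket rather than a fixed batch from every bucket.
def drainLargestFirst(buckets : Array[Int], batchSize : Int) : Array[Int] = {
  var budget = batchSize * buckets.length
  val result = buckets.clone()
  while (budget > 0 && result.exists(_ > 0)) {
    val fullest = result.indexOf(result.max)
    result(fullest) -= 1
    budget -= 1
  }
  result
}
```

For example, drainLargestFirst(Array(10, 0, 2), 2) returns Array(4, 0, 2), whereas the fixed-batch depopulate would leave Array(8, 0, 0): the overloaded queue gives up more of its backlog. The per-document indexOf scan is only for clarity; at real sizes a priority queue would be the sensible structure.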