Here's an example of calculating cosine similarities in Spark using the previously described algorithm.
Imagine we have a matrix where the rows are vectors:
| 0    | 0.5 | 0.5 | 0.4  |
| 0.25 | 0.5 | 0   | 0    |
| 0.4  | 0.8 | 0   | 0.25 |
| 0    | 0   | 1.0 | 0    |
In Spark, this is represented by an RDD of SparseVectors. Each SparseVector represents one row in the matrix.
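As a concrete sketch (the variable names are illustrative and `sc` is assumed to be an existing SparkContext), the matrix above could be built like this:

```scala
import org.apache.spark.mllib.linalg.Vectors

// The example matrix, one SparseVector per row. Vectors.sparse takes the
// vector's size, the indices of its non-zero elements and their values.
val rows = sc.parallelize(Seq(
  Vectors.sparse(4, Array(1, 2, 3), Array(0.5, 0.5, 0.4)),   // row 1
  Vectors.sparse(4, Array(0, 1),    Array(0.25, 0.5)),       // row 2
  Vectors.sparse(4, Array(0, 1, 3), Array(0.4, 0.8, 0.25)),  // row 3
  Vectors.sparse(4, Array(2),       Array(1.0))              // row 4
))
```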
Recall that for each vector, we attach the corresponding row and column index to each non-zero element. We then reduceByKey using the column index as our key.
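A minimal sketch of that keying step, assuming the `rows` RDD from the snippet above (note that zipWithIndex gives 0-based row numbers, whereas the walkthrough below uses 1-based ones):

```scala
import org.apache.spark.mllib.linalg.SparseVector

// Attach the row index to each vector, then emit one record per non-zero
// element, keyed on its column index: (columnIndex, (rowIndex, value)).
val byColumn = rows.zipWithIndex().flatMap { case (vector, rowIdx) =>
  val sv = vector.asInstanceOf[SparseVector]
  sv.indices.zip(sv.values).map { case (colIdx, value) =>
    (colIdx, (rowIdx, value))
  }
}
```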
We do this for all columns but, as an example, let's take the second column (k=2).
| 0    | [0.5] | 0.5 | 0.4  |
| 0.25 | [0.5] | 0   | 0    |
| 0.4  | [0.8] | 0   | 0.25 |
| 0    | [0]   | 1.0 | 0    |
Giving us the tuples { (1, 0.5), (2, 0.5), (3, 0.8), (4, 0) } corresponding to k=2. Note that the first element of each tuple is the row index and the second is the value in the bracketed cell.
Since these tuples are keyed on k, all these values will end up in the same Spark partition.
Partition A | (k=2) -> { (1, 0.5), (2, 0.5), (3, 0.8) } |
Partition B | (k=3) -> ... |
Partition C | ... |
Note that we're not interested in values that are zero, so we can discard the last tuple in the original set (zeroes simply disappear). For all the others, we want a Cartesian product of this set with itself, multiplying out the values for each pair. Furthermore, we're not interested in the product of a tuple with itself, and we only care about co-ordinates (i, j) where i < j because we don't want to do the same work twice.
So the result will look like this:
{ ((1,2), 0.25), ((1,3), 0.4), ((2,3), 0.4) }
At this point, we're no longer interested in k and can discard it. Each element of the above set moves to a partition according to the first element of the tuple. So, we'd expect something like:
Partition A | |
Partition B | ((1,2), 0.25), ((1,3), 0.4) |
Partition C | ((2,3), 0.4) |
Note that all the interesting information in the example so far was (arbitrarily) on Partition A. Now, it's been shuffled to Partitions B and C. This is just an example. The shuffled data could have ended up in any partition.
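Here's a sketch of the pairing and re-keying just described, assuming the `byColumn` RDD from the earlier snippet (a groupByKey is used here so that a whole column's entries are available together for pairing):

```scala
// For each column, take every pair of non-zero entries (i, vi) and (j, vj)
// with i < j, multiply the values, and re-key on the co-ordinate pair (i, j).
// The column index k has served its purpose and is discarded here.
val partialProducts = byColumn.groupByKey().flatMap { case (_, column) =>
  val entries = column.toSeq
  for {
    (i, vi) <- entries
    (j, vj) <- entries
    if i < j
  } yield ((i, j), vi * vj)
}
```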
If we do the same again for, say, the last column, we'd expect the product of the values in rows 1 and 3 (0.4 * 0.25 = 0.1). The other rows have a zero in that column, so we're not interested in them.
Now, our Spark partitions would look something like:
Partition A | |
Partition B | ((1,2), 0.25), ((1,3), 0.4), ((1,3), 0.1) |
Partition C | ((2,3), 0.4) |
When we're done with all the columns, our partitions will look something like:
Partition A | |
Partition B | ((1,2), 0.25), ((1,3), 0.4), ((1,3), 0.1), ((1,4), 0.5) |
Partition C | ((2,3), 0.4), ((2,3), 0.1) |
Finally, if we reduce on each partition by the co-ordinates key, we'll get:
Partition A | |
Partition B | ((1,2), 0.25), ((1,3), 0.5), ((1,4), 0.5) |
Partition C | ((2,3), 0.5) |
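In code, that last step is just a reduceByKey on the co-ordinate pairs, assuming the `partialProducts` RDD from the previous sketch:

```scala
// Sum the partial products for each (i, j) pair. For the example matrix this
// should collect to (in some order, with 0-based indices):
//   ((0,1), 0.25), ((0,2), 0.5), ((0,3), 0.5), ((1,2), 0.5)
val similarities = partialProducts.reduceByKey(_ + _)
similarities.collect().foreach(println)
```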
which, if we imagine it as a matrix, looks like this:
| 0 | 0.25 | 0.5 | 0.5 |
| 0 | 0    | 0.5 | 0   |
| 0 | 0    | 0   | 0   |
| 0 | 0    | 0   | 0   |
But, wait, that's not what our matrix multiplied with its transpose should look like. If we take the matrix at the top of this page and multiply it by its transpose we get:
| 0.66 | 0.25   | 0.5    | 0.5 |
| 0.25 | 0.3125 | 0.5    | 0   |
| 0.5  | 0.5    | 0.8625 | 0   |
| 0.5  | 0      | 0      | 1.0 |
The reason, of course, is that we're not interested in the diagonal as that's the similarity of a vector with itself (in real life, the diagonals would each have value 1.0 but I wanted to keep the numbers simple). What's more, we're only interested in half the matrix (the upper half) as the lower half is the same and we don't want to do the same work twice (in fact, any matrix multiplied by its transpose will be symmetric like this).
Consequently, only the upper half (above the diagonal) is the same as the matrix we calculated. The rest of it is of no use to us.
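As an aside, if the rows were scaled to unit length up front (which true cosine similarity requires, and which is what would make those diagonal values 1.0), the rest of the pipeline stays exactly the same. A minimal sketch of that normalisation, assuming the `rows` RDD from the first snippet:

```scala
import org.apache.spark.mllib.linalg.{SparseVector, Vectors}

// Divide each row by its L2 norm so that every vector has unit length. With
// unit vectors, the summed products computed above are exactly the cosine
// similarities of the corresponding pairs of rows.
val normalised = rows.map { v =>
  val sv   = v.asInstanceOf[SparseVector]
  val norm = Vectors.norm(sv, 2.0)
  if (norm == 0.0) sv
  else Vectors.sparse(sv.size, sv.indices, sv.values.map(_ / norm))
}
```

Spark's mllib also ships a Normalizer (in org.apache.spark.mllib.feature) that does much the same thing.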