I've been writing some code that performs TF-IDF on a corpus of text made up of M records. Given the frequency of each term, I can build a normalized vector for each record from the terms it contains. To reduce complexity, I ignore terms that appear in more than W records.
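To make that setup concrete, here is a minimal, local (non-Spark) sketch of the TF-IDF step; the names (tfIdfVectors, maxDf standing in for W) are illustrative rather than taken from my real code:

```scala
// Minimal, local sketch of the TF-IDF step: weight each term, drop terms
// appearing in more than maxDf (our 'W') records, and L2-normalize each record.
def tfIdfVectors(records: Seq[Seq[String]], maxDf: Int): Seq[Map[String, Double]] = {
  val m = records.size
  // document frequency: the number of records each term appears in
  val df: Map[String, Int] =
    records.flatMap(_.distinct).groupBy(identity).map { case (t, ts) => t -> ts.size }
  // discard overly common terms
  val kept = df.filter { case (_, n) => n <= maxDf }
  records.map { terms =>
    val tf = terms.filter(kept.contains).groupBy(identity).map { case (t, ts) => t -> ts.size.toDouble }
    // standard TF-IDF weighting: tf * log(M / df)
    val weighted = tf.map { case (t, f) => t -> f * math.log(m.toDouble / kept(t)) }
    val norm = math.sqrt(weighted.values.map(x => x * x).sum)
    if (norm == 0.0) weighted else weighted.map { case (t, x) => t -> x / norm }
  }
}
```

The unit normalization here is what later lets us treat a plain dot product as a cosine similarity.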
Cosine Similarities and Matrix Multiplication
(Note: since Spark 1.6.0, there has been a method, IndexedRowMatrix.columnSimilarities, to compute cosine similarities. However, this only delegates to an old 1.2.0 method that uses DIMSUM, which blows up with OOMEs on anything remotely large. See here for more information.)
If we have a set of vectors that we want to multiply together to find their cosine similarities, we can imagine them as a matrix where each row in that matrix is one of our vectors.
So, imagine we have M vectors { v_{1}, v_{2}, v_{3}, ... v_{M} }. Each vector is made up of N elements so, for instance, vector v_{1} can be expressed as { v_{11}, v_{12}, v_{13}, ... v_{1N} }.
We can express this as a matrix that looks like:
v_{11}  v_{12}  v_{13}  ...  v_{1N}
v_{21}  v_{22}  v_{23}  ...  v_{2N}
v_{31}  v_{32}  v_{33}  ...  v_{3N}
  ...     ...     ...   ...   ...
v_{M1}  v_{M2}  v_{M3}  ...  v_{MN}
(You'll notice that each row represents a record and each column represents a term. If a term appears in that record then v_{ij} is nonzero).
What we want is a matrix that holds every vector multiplied by every other vector. For instance, for v_{1} and v_{2} we want
v_{11}v_{21} + v_{12}v_{22} + v_{13}v_{23} + ... + v_{1N}v_{2N}
Well, this is the same as multiplying the matrix (call it V) by its own transpose:

A = V V^{T}
So, using the brevity of summation notation, if this matrix is called A then:

A_{ij} = Σ_{n=1}^{N} v_{in} v_{jn}
Or, using Einstein notation, we can state it even more succinctly:
A_{ij} = v_{in}v_{jn}
since the n index is repeated.
Given two matrices, one m x n and the other n x p, the complexity of multiplying them is O(mnp). In our case, we're multiplying a matrix by its own transpose, so p = m. Therefore, the complexity is O(m^{2}n): pretty bad.
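For reference, the naive dense version of this product is just three nested loops, which is where the O(m^{2}n) comes from (a throwaway sketch for illustration, not part of the real pipeline):

```scala
// Naive dense A = V * V^T: each of the m x m output cells costs an n-step
// dot product, hence O(m^2 n) overall.
def multiplyByTranspose(v: Array[Array[Double]]): Array[Array[Double]] = {
  val m = v.length
  val n = if (m == 0) 0 else v(0).length
  val a = Array.ofDim[Double](m, m)
  for (i <- 0 until m; j <- 0 until m) {
    var sum = 0.0
    var k   = 0
    while (k < n) { sum += v(i)(k) * v(j)(k); k += 1 }
    a(i)(j) = sum
  }
  a
}
```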
The good news is that our matrix is sparse. How sparse depends on our variable W that determines at what point we start discarding terms in our TFIDF stage.
Spark Implementation
Since we are just multiplying a matrix by its transpose, we might want to use BlockMatrix.multiply. However, the documentation tells us:
"The output BlockMatrix will only consist of blocks of DenseMatrix. This may cause some performance issues until support for multiplying two sparse matrices is added."Since M and N for us is typically hundreds of millions, this is a problem.
So, we might want to just manually multiply any two vectors, i and j, in our equation for A_{ij}. The trouble is that if we have two vectors in a Spark RDD, v_{i} and v_{j}, they may very well live in different partitions. And since we want to calculate v_{i} . v_{j} for all i,j then we'll definitely be hitting all partition combinations and there will be O(M^{2}) multiplications.
We want to reduce the amount of inter-partition shuffling and avoid O(M^{2}) multiplications of N-sized vectors. One solution might be to take each vector and, for each non-zero element, attach its column ID, which we'll call k. We can then reduceByKey (where k is the key) and start multiplying out the set of values for each key.
That is, associated with every column, k, there will be a set of vectors that have non-zero values for their k-th element. There will be a maximum of W of them, since that's the limit we imposed on how many records a term may appear in before we consider it insignificant. We'll call this set K, where in set-builder notation:
K = { v_{xk} ∈ A | v_{xk} ≠ 0 }, where |K| ≤ W
We then take a Cartesian product of this set and attach the tuple (i,j), that is, its coordinates such that we have a set, D, where:
D^{k} = { ((i,j), v_{ik} . v_{jk}) | v_{ik}, v_{jk} ∈ K and i < j }
We ignore the case where i=j since we're not interested in the cosine similarities of a vector with itself. We know it's 1 as they're unit vectors.
Note that this step can be computationally expensive, as it scales as O(W^{2}).
Now we can reduceByKey on the coordinates (i,j) and we'll have obtained the cosine similarity for all i, j.
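Putting the whole column-wise scheme together, here is a local sketch using plain Scala collections standing in for the RDD operations (groupBy plays the role of reduceByKey, and a for-comprehension plays the role of the Cartesian product; all names are illustrative):

```scala
// Local sketch of the column-wise approach: vectors are sparse maps from
// column index k to value, keyed by a Long row ID.
def cosineSimilarities(vectors: Map[Long, Map[Int, Double]]): Map[(Long, Long), Double] = {
  // 1. Emit (k, (rowId, value)) for every non-zero element, then group by k
  //    (the local analogue of keying by column and calling reduceByKey)
  val byColumn: Map[Int, Seq[(Long, Double)]] =
    vectors.toSeq
      .flatMap { case (i, v) => v.map { case (k, x) => (k, (i, x)) } }
      .groupBy(_._1)
      .map { case (k, kvs) => k -> kvs.map(_._2) }

  // 2. Per column k, take the Cartesian product of entries with i < j,
  //    giving the contribution D^k_{i,j} = v_ik * v_jk
  val contributions: Seq[((Long, Long), Double)] =
    byColumn.values.toSeq.flatMap { entries =>
      for {
        (i, vi) <- entries
        (j, vj) <- entries
        if i < j
      } yield ((i, j), vi * vj)
    }

  // 3. Sum the contributions over all k: for unit vectors, the dot product
  //    of rows i and j is exactly their cosine similarity
  contributions.groupBy(_._1).map { case (ij, cs) => ij -> cs.map(_._2).sum }
}
```

Pairs of vectors sharing no non-zero column simply never appear in the output, which is exactly the sparsity we are exploiting.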
Validity of this approach
Mathematically, if our column index is k and we call the contribution of the term corresponding to k to any cosine similarity between any pair of vectors (i,j), D_{i,j}^{k} then:
D_{i,j}^{k} = v_{ik}v_{jk}
which is just our equation above for A_{ij} (with k = n), thus proving that this technique is at least mathematically correct. Whether it performs well is another matter...
By making the key the first element of a tuple, all values for a given key will live in the same partition, since Spark partitions Tuple2s according to the first element of the tuple.
Partitions
We can demonstrate this like so:

import scala.util.Random

type Pair = (Long, Long)

val pairs = (1 to 100).map(x => (x.toLong, Random.nextLong()))
val rdd   = sparkContext.parallelize(pairs)
which gives us an RDD of random pairs of Longs, the first being between 1 and 100. Now we create a function that can map over the RDD assigning each element a number unique to the partition it's in:
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer

type Enumerated[T] = (Int, T)

// note: the counter is only shared between partitions in local mode; on a
// cluster, each executor would increment its own copy of the closure
val idGenerator = new AtomicInteger(0)

def partitionMapper[T]: Iterator[T] => Iterator[Enumerated[T]] = { pairIterator =>
  val partitionId = idGenerator.getAndIncrement()
  val enumerated  = ArrayBuffer[Enumerated[T]]()
  pairIterator.foreach { pair =>
    enumerated += ((partitionId, pair))
  }
  enumerated.toIterator
}
Finally, we see the output:
val partitioned  = rdd mapPartitions partitionMapper[Pair]
val inMem        = partitioned.collect()
val groupedPairs = inMem.groupBy(_._1).toSeq.sortBy(_._1)

groupedPairs foreach { case (partitionId, enumerated) =>
  val vals = enumerated.map(_._2._1)
  println(s"partition #$partitionId: ${vals.min} to ${vals.max}")
}
Which produces:
partition #0: 51 to 75
partition #1: 1 to 25
partition #2: 26 to 50
partition #3: 76 to 100
Very neat.
Conclusion
It looks like I am not the only person rolling his own distributed matrix multiplication solution using Spark (see this link here). It seems others have found you don't get it out-of-the-box, and there appear to be moves afoot to address this.