Friday, July 1, 2016

Cosine Similarities on an AWS cluster

De-duping data is a perennial problem that must be solved before you can do anything interesting with big data. In our case, we have about a terabyte of data that represents all the companies in the world from various sources. There is obviously duplication between sources but also within sources as their data is dirty (aside: I know one startup that's buying data for their domain, de-duping it and selling it back to the vendor thus cutting costs. There's money in them there dupes).

The algorithms used here are TF-IDF and Cosine similarity. TF-IDF weights a term by how often it appears in a single document (the term frequency), scaled down by how common that term is across all the documents that make up the corpus (the inverse document frequency), so terms that appear everywhere count for little. Cosine similarity is a little harder but not beyond anybody with a secondary education in maths. However, other people have explained it better than me (see here) so I won't dwell on it.
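To make the two ideas concrete, here is a minimal plain-Scala sketch (no Spark). It is not the code we ran on the cluster; the object name `TfIdfCosine`, the tokenizer and the particular weighting (term count over document length, times the natural log of N over document frequency) are assumptions chosen for illustration, and there are several other common TF-IDF variants.

```scala
object TfIdfCosine {
  // A sparse vector: only terms with a weight are stored.
  type SparseVector = Map[String, Double]

  // Hypothetical tokenizer: lower-case and split on non-word characters.
  def tokenize(text: String): Seq[String] =
    text.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq

  /** One TF-IDF vector per document.
    * tf  = occurrences of the term in the document / document length
    * idf = ln(number of documents / number of documents containing the term)
    */
  def tfIdf(docs: Seq[String]): Seq[SparseVector] = {
    val tokenized = docs.map(tokenize)
    val n = docs.size.toDouble
    // In how many documents does each term appear?
    val docFreq: Map[String, Int] =
      tokenized.flatMap(_.distinct).groupBy(identity).map { case (t, ts) => t -> ts.size }
    tokenized.map { tokens =>
      tokens.groupBy(identity).map { case (term, occurrences) =>
        val tf = occurrences.size.toDouble / tokens.size
        val idf = math.log(n / docFreq(term))
        term -> tf * idf
      }
    }
  }

  /** Cosine of the angle between two sparse vectors: dot product over the product of norms. */
  def cosine(a: SparseVector, b: SparseVector): Double = {
    val dot = a.iterator.map { case (t, w) => w * b.getOrElse(t, 0.0) }.sum
    val normA = math.sqrt(a.values.map(w => w * w).sum)
    val normB = math.sqrt(b.values.map(w => w * w).sum)
    if (normA == 0.0 || normB == 0.0) 0.0 else dot / (normA * normB)
  }
}
```

With three made-up company names such as "Acme Widgets Ltd", "ACME Widgets Limited" and "Globex Corporation", the first two share terms and so score above zero, while the first and third share nothing and score exactly zero.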

First, we did a Proof of Concept on an AWS cluster. This was just randomly generated data on a Spark cluster on five c4.8xlarge boxes. The random data was a 100 million x 100 million sparse matrix (by sparse, I mean one data point every hundredth element). There were a few hiccups (see here and here for performance tuning hints) but we eventually got it to work. Processing the Cosine similarity took a mere 28 minutes. Cool.

But like any PoC, it had the wind knocked out of its sails when it met the real world. The first problem was that the dictionary for the TF-IDF was too large to be broadcast to all the nodes, since there is a built-in memory limit on any single object that is sent across the wire between nodes (see here and here). So, we needed somewhere we could store the dictionary and chose HBase.

The next performance problem that my PoC did not prepare me for was how long it took to load (and parse) a terabyte of data that happened to be in XML format on HDFS. I hit two problems here and both were discovered by judicious use of jstack running on the executor nodes. Something like:

while true ; do jstack $(jps | grep CoarseGrainedExecutorBackend | awk '{print $1}') | grep -A35 '^"Execut' ; uptime ; sleep 20 ; done

(I recommend this approach no matter which Big Data framework you are using).

The first problem was lock contention coming from Xerces. In Scala, parse your XML with something like:

scala.xml.XML.loadXML(scala.xml.Source.fromString(xmlString), saxParser)

where saxParser is cached rather than created afresh for each document. This will save a huge amount of contention.
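One way to do the caching is a parser per thread, since a SAXParser is not safe to share between threads. The sketch below is an assumption about how you might do it, not the exact code from our job: the object name `CachedSaxParser` and the element-counting handler are made up for illustration, and it uses only the JDK's SAX API so that it runs without the scala-xml library. The cached `parser` is what you would hand to `loadXML` as the second argument.

```scala
import java.io.StringReader
import javax.xml.parsers.{SAXParser, SAXParserFactory}
import org.xml.sax.{Attributes, InputSource}
import org.xml.sax.helpers.DefaultHandler

object CachedSaxParser {
  // One parser per thread: the factory lookup and parser construction
  // happen once per thread instead of once per document.
  private val parsers = new ThreadLocal[SAXParser] {
    override def initialValue(): SAXParser =
      SAXParserFactory.newInstance().newSAXParser()
  }

  /** The calling thread's cached parser. */
  def parser: SAXParser = parsers.get()

  /** Stand-in for real work: counts the elements in an XML string
    * using the cached parser.
    */
  def countElements(xml: String): Int = {
    var count = 0
    val handler = new DefaultHandler {
      override def startElement(uri: String, localName: String,
                                qName: String, attrs: Attributes): Unit =
        count += 1
    }
    parser.parse(new InputSource(new StringReader(xml)), handler)
    count
  }
}
```

In a Spark job the same effect can be had by creating the parser once per partition (inside `mapPartitions`, say) instead of once per record.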

The second problem was thread contention in Spark's use of Kryo. It was instantiating a serializer on each deserialization it seemed (see here) resulting in shuffles taking about 1.2 hours (max. 5.1 hours). When I used plain old Java serialization, the average was about 3.5 minutes (max. 8 minutes). This made Java serialization faster than Kryo! Upgrading from Spark 1.3.1 to 1.6.1 is expected to fix this.

In both cases, the load average went up from about 1.0 to 20.0 after the fix was implemented. This is a good thing (these are 32-core machines). If the beefy boxes in your cluster have a load average of about 1.0 then you're clearly not using all their processing ability.

The Cosine similarities result in pairs of IDs. Running Spark's GraphX over these pairs finds a graph of entities that are similar. However, there is a problem here that manifested itself by one partition taking what felt like forever to finish when all the other partitions had finished (resulting in the job timing out). Here, jstack wasn't so useful. I could see all the time was taken in Scala's Seq.toSet function, which led me on a wild goose chase wondering why Scala's collection library was so damned slow. In fact, it was the data that was causing the problem.

All my tests looked pretty good but when faced with huge amounts of realistic data, this strange phenomenon (typically experienced in Single-linkage Clustering as we're doing here) was seen. Imagine we are making chains of similar words and putting matching clusters together. Imagine this chain:

cat -> mat -> mate -> make -> Mike

(where -> means "is similar to") then they're all grouped together and cat is similar to Mike - clearly nonsense. My code was then doing a 2nd round check (nothing to do with Cosine similarities this time) and comparing each element with every other. With X elements this led to X(X-1) comparisons. In my tests, this was fine but in the real world, X was about 10 000 and so there were roughly 100 million comparisons for that one group.
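The chaining effect is easy to reproduce without a cluster. The sketch below assumes the grouping amounts to finding connected components over the similar pairs, and uses a small union-find in plain Scala as a stand-in for GraphX; the names `SingleLinkage`, `components` and `comparisons` are invented for illustration.

```scala
object SingleLinkage {
  /** Groups items into connected components given "is similar to" pairs,
    * using a simple union-find with path compression.
    */
  def components(pairs: Seq[(String, String)]): Set[Set[String]] = {
    val parent = scala.collection.mutable.Map.empty[String, String]

    // Returns the representative of x's group, registering x if unseen.
    def find(x: String): String = {
      val p = parent.getOrElseUpdate(x, x)
      if (p == x) x
      else {
        val root = find(p)
        parent(x) = root // path compression
        root
      }
    }

    // Each similar pair merges the two groups it touches.
    pairs.foreach { case (a, b) =>
      val (ra, rb) = (find(a), find(b))
      if (ra != rb) parent(ra) = rb
    }

    parent.keys.toSeq.groupBy(find).values.map(_.toSet).toSet
  }

  /** Comparisons needed to check every element of a group of size x
    * against every other element: X(X-1).
    */
  def comparisons(x: Long): Long = x * (x - 1)
}
```

Feeding in the four pairs of the chain above yields a single group of five words, even though cat and Mike were never directly compared, and `SingleLinkage.comparisons(10000L)` shows why one oversized group was enough to stall a partition.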

For the time being, I've increased the threshold of similarity to 0.9 (out of a maximum of 1.0) and I don't see this phenomenon any more. However, I have to think of a more robust solution for the long term, such as Louvain Modularity.

[Aside - final note on Cosine similarities: Cosine Similarity is not a distance metric (as pointed out in this StackOverflow answer)]
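One way to see this (my own illustration, not taken from the linked answer): if you turn the similarity into a "distance" as 1 minus the cosine, the triangle inequality that a true metric must satisfy can fail. A minimal sketch, with the object name `CosineDistance` and the three vectors chosen purely for illustration:

```scala
object CosineDistance {
  /** Cosine similarity of two dense vectors of equal length. */
  def cosine(a: Array[Double], b: Array[Double]): Double = {
    val dot = a.zip(b).map { case (x, y) => x * y }.sum
    def norm(v: Array[Double]): Double = math.sqrt(v.map(x => x * x).sum)
    dot / (norm(a) * norm(b))
  }

  /** The naive "distance": 1 - cosine similarity. */
  def distance(a: Array[Double], b: Array[Double]): Double =
    1.0 - cosine(a, b)
}

object TriangleCheck {
  val a = Array(1.0, 0.0)
  val b = Array(1.0, 1.0)
  val c = Array(0.0, 1.0)

  // A metric requires d(a, c) <= d(a, b) + d(b, c); here that does not hold.
  val direct: Double = CosineDistance.distance(a, c)
  val viaB: Double = CosineDistance.distance(a, b) + CosineDistance.distance(b, c)
  val violatesTriangleInequality: Boolean = direct > viaB
}
```

Going from a to c via b is "shorter" than going directly, which cannot happen with a genuine distance metric.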
