Monday, June 18, 2018

To Normalize or Not?


I've demonstrated in previous posts that pre-processing our data improved our accuracy more than hyperparameter tuning. But "preprocessing in machine learning is somewhat a very black art" as this StackExchange post says. "It is not written down in papers a lot why several preprocessing steps are essential to make it work... To make things more complicated, it depends heavily on the method you use and also on the problem domain."

Clearly, I'm not the only person who is confused, as this StackOverflow question makes clear.

All the answers on this page are enlightening: that, given four points representing people's age and height, the clustering differs depending on whether we use metric or imperial measurements, since the distances between the points change; that normalizing tends to make K-Means clusters more distinct; and that "if you have a neural network and just apply an affine transformation [x' = ax + b] to your data, the network does not lose or gain anything in theory. In practice, however, a neural network works best if the inputs are centered and white." There is a caveat for regularization: a penalty on the size of the weights is not invariant to rescaling the inputs, so the transformation is no longer free.
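To make the units point concrete, here is a minimal Scala sketch using three made-up people (the names, ages and heights are hypothetical, and a plain nearest-neighbour search stands in for clustering). Expressing height in feet instead of centimetres changes who is closest to person A.

```scala
// Illustrative only: hypothetical people, with nearest-neighbour search standing in for clustering.
object UnitsDemo {
  val CmPerFoot = 30.48

  // Hypothetical data: (name, age in years, height in cm)
  val people: Seq[(String, Double, Double)] =
    Seq(("A", 30.0, 170.0), ("B", 31.0, 150.0), ("C", 40.0, 172.0))

  def euclidean(p: (Double, Double), q: (Double, Double)): Double =
    math.hypot(p._1 - q._1, p._2 - q._2)

  // Nearest neighbour of `target` once every height is divided by `heightDivisor`
  // (1.0 keeps centimetres; CmPerFoot converts to feet).
  def nearest(target: String, heightDivisor: Double): String = {
    val points: Map[String, (Double, Double)] =
      people.map { case (name, age, cm) => name -> ((age, cm / heightDivisor)) }.toMap
    val origin = points(target)
    points
      .filter { case (name, _) => name != target }
      .minBy { case (_, p) => euclidean(origin, p) }
      ._1
  }
}
```

In centimetres the 20 cm height gap between A and B dominates, so A's nearest neighbour is C; in feet that gap shrinks to about 0.66, the 10-year age gap to C dominates instead, and the nearest neighbour becomes B.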

I looked into this last point and did some (non-exhaustive) experiments. Results are from Spark's MultilayerPerceptronClassifier using 100 epochs. As ever, results are indicative and not to be taken too seriously (for instance, a difference of a percent may just be noise).

Word vector    Sentence vector   Feature vector   Accuracy (%)
L1             None              Standardized     95
L1             L2                Standardized     95
L2             L2                Standardized     94
L2             None              None             94
L1             None              None             94
L1             L2                None             94
None           L2                Standardized     84
None           None              None             81
None           None              Standardized     81
None           L2                None             78
None           L1                None             77
None           L2                L1               8
L1             None              L1               7
None           None              L1               6
Standardized   None              None             6
L2             L2                L1               5
L2             None              L1               3

All sentence vectors were constructed simply by adding the relevant word vectors together.
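A minimal Scala sketch of the three kinds of pre-processing in the table. It assumes plain arrays rather than Spark vectors; the object and method names are hypothetical, and this is not necessarily how the figures above were produced. It covers L1 or L2 normalization of each word vector, summation into a sentence vector, and column-wise standardization of the resulting feature vectors. In Spark itself, the corresponding transformers are Normalizer (with its p parameter) and StandardScaler. Note that StandardScaler by default scales to unit standard deviation but does not subtract the mean unless withMean is set.

```scala
// Hypothetical helpers on plain Array[Double]; not a Spark API.
object VectorPrep {
  type Vec = Array[Double]

  // L1: divide by the sum of absolute values so the result has L1 norm 1
  def l1Normalize(v: Vec): Vec = {
    val norm = v.map(x => math.abs(x)).sum
    if (norm == 0.0) v else v.map(_ / norm)
  }

  // L2: divide by the Euclidean length so the result has L2 norm 1
  def l2Normalize(v: Vec): Vec = {
    val norm = math.sqrt(v.map(x => x * x).sum)
    if (norm == 0.0) v else v.map(_ / norm)
  }

  // Sentence vector = element-wise sum of its (optionally normalized) word vectors
  def sentenceVector(words: Seq[Vec], wordNorm: Vec => Vec): Vec =
    words.map(wordNorm).reduce((a, b) => a.zip(b).map { case (x, y) => x + y })

  // Column-wise standardization over all rows: subtract the column mean and divide
  // by the population standard deviation; constant columns are only centred.
  def standardize(rows: Seq[Vec]): Seq[Vec] = {
    val n = rows.size.toDouble
    val dims = rows.head.length
    val means = Array.tabulate(dims)(j => rows.map(_(j)).sum / n)
    val stds = Array.tabulate(dims)(j =>
      math.sqrt(rows.map(r => math.pow(r(j) - means(j), 2)).sum / n))
    rows.map(r => Array.tabulate(dims) { j =>
      if (stds(j) == 0.0) r(j) - means(j) else (r(j) - means(j)) / stds(j)
    })
  }
}
```

For the "None" rows, the word-level normalization would simply be the identity function, e.g. `VectorPrep.sentenceVector(words, v => v)`.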

Although I spent less time on TensorFlow, I noticed that pre-processing was important there too. Using the same architecture, unnormalized data gave me about 50% accuracy while L2-normalized word vectors gave me 93%. Neither accuracy seemed to be increasing by the end of the 1000 epochs I let them run for.

Changing tack completely, I also one-hot encoded my words to form sentence vectors and achieved a respectable but not stellar accuracy of 84%.
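A minimal Scala sketch of one-hot sentence vectors. It assumes that summing the one-hot vectors of a sentence's words (which yields a bag-of-words count vector) is what is meant; the post does not spell out the details, and the names are hypothetical.

```scala
// Hypothetical one-hot / bag-of-words helpers on plain arrays.
object OneHotSentences {
  // Each distinct word in the corpus gets its own column, in order of first appearance
  def vocabulary(sentences: Seq[Seq[String]]): Map[String, Int] =
    sentences.flatten.distinct.zipWithIndex.toMap

  // One-hot vector for a single word; words outside the vocabulary map to all zeros
  def oneHot(word: String, vocab: Map[String, Int]): Array[Double] = {
    val v = Array.fill(vocab.size)(0.0)
    vocab.get(word).foreach(i => v(i) = 1.0)
    v
  }

  // Sentence vector = sum of its words' one-hot vectors, i.e. per-word counts
  def sentenceVector(sentence: Seq[String], vocab: Map[String, Int]): Array[Double] =
    sentence.map(oneHot(_, vocab)).foldLeft(Array.fill(vocab.size)(0.0)) { (acc, v) =>
      acc.zip(v).map { case (a, b) => a + b }
    }
}
```

Unlike the summed word vectors above, these vectors are as long as the vocabulary, so on a real corpus a sparse representation would be the more practical choice.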

I thought that perhaps pre-processing was merely helping the neural net along, and that it would learn just as well if given enough time. This appears not to be true. Taking the data with no processing of the word, sentence or feature (column) vectors, the accuracy with an increasing number of epochs looked like this:

Number of epochs   Accuracy (%)
100                80.9
200                82.6
1000               84.7
2000               85.3
5000               85.3

This demonstrates a clear plateau in accuracy at about 85% when we know we can achieve an accuracy as high as 95% using the same classifier and the same data (if rendered somewhat differently).


Thursday, June 7, 2018

Spark Query Plans (Gotcha!)


I was exploring why Spark was taking hours to run a support vector machine on my data when other ML algorithms were taking minutes. I called .explain() on my DataFrame but found surprisingly little information on the web about interpreting Spark query plans.

The best source I found was this one from Simon Fraser University. The take-away points are:

  • "Read [query plans] from the bottom up."
  • Exchanges, e.g. "Exchange hashpartitioning(...", are shuffles.
  • InMemoryRelation and InMemoryTableScan "will look in memory for data, calculating and caching if necessary".
  • Range and Project come from select()s, withColumn(), etc.

Playing around, I found InMemoryRelation and InMemoryTableScan can be generated by adding a .cache().

scala> val df = sc.range(1, 100000).toDF
scala> df.explain()
== Physical Plan ==
*SerializeFromObject [input[0, bigint, false] AS value#2L]
+- Scan ExternalRDDScan[obj#1L]

scala> val add1 = df.map { r => r.getLong(0) + 1 }
scala> add1.explain()
== Physical Plan ==
*SerializeFromObject [input[0, bigint, false] AS value#10L]
+- *MapElements <function1>, obj#9: bigint
   +- *DeserializeToObject createexternalrow(value#2L, StructField(value,LongType,false)), obj#8: org.apache.spark.sql.Row
      +- *SerializeFromObject [input[0, bigint, false] AS value#2L]
         +- Scan ExternalRDDScan[obj#1L]

scala> val cachedAdd1 = add1.cache()
scala> cachedAdd1.explain()
== Physical Plan ==
InMemoryTableScan [value#10L]
   +- InMemoryRelation [value#10L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *SerializeFromObject [input[0, bigint, false] AS value#10L]
            +- *MapElements <function1>, obj#9: bigint
               +- *DeserializeToObject createexternalrow(value#2L, StructField(value,LongType,false)), obj#8: org.apache.spark.sql.Row
                  +- *SerializeFromObject [input[0, bigint, false] AS value#2L]
                     +- Scan ExternalRDDScan[obj#1L]

So that explained the last two steps in my plan (the top two lines, since plans read from the bottom up), but I was no nearer to understanding my problem. However, I did notice that there were lots of user-defined functions (UDFs) in my DataFrame, presumably from the Spark Transformers that turned my raw text into vectors.

Could this be my problem? There appear to have been several problems with UDFs (see SPARK-17728 for example) and no shortage of people warning against them (for example here: "Using a UDF implies deserialization to process the data in classic Scala and then reserialize it. UDFs can be replaced by Spark SQL functions, there are already a lot of them and new ones are regularly added.").

But I could not replicate the issue by testing in the Spark shell:

scala> val df = sc.range(0, 5).toDF
scala> val noisyAdd1UDF = udf{ x: Long => println("noisyAdd1UDF = " + x); x + 1 } // not a pure function
scala> val dfWithUdf = df.withColumn("added", noisyAdd1UDF('value))
scala> dfWithUdf.explain()
== Physical Plan ==
*Project [value#283L, UDF(value#283L) AS added#287L]
+- *SerializeFromObject [input[0, bigint, false] AS value#283L]
   +- Scan ExternalRDDScan[obj#282L]

Now, let's show() it twice. Each time, the UDF is re-calculated:

scala> dfWithUdf.show()
noisyAdd1UDF = 0
...
noisyAdd1UDF = 4
+-----+-----+
|value|added|
+-----+-----+
|    0|    1|
...
|    4|    5|
+-----+-----+

scala> dfWithUdf.show()
noisyAdd1UDF = 0
...
noisyAdd1UDF = 2
+-----+-----+
|value|added|
+-----+-----+
|    0|    1|
...
|    4|    5|
+-----+-----+

So, no surprises. Calling cache() stopped subsequent show()s from calling the UDF again as expected.

The Gotcha

It turns out that the problem is not directly with a UDF but with the DataFrame that is created when we add the UDF. The new DataFrame does not inherit its parent's StorageLevel (note that other operations like randomSplit will do the same thing).

We can see this by doing:

scala> val df = sc.range(0, 100000).toDF
df: org.apache.spark.sql.DataFrame = [value: bigint]

scala> df.storageLevel
res67: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)

scala> df.cache()
res68: df.type = [value: bigint]

scala> df.storageLevel
res69: org.apache.spark.storage.StorageLevel = StorageLevel(disk, memory, deserialized, 1 replicas)

scala> val dfWithUdf = df.withColumn("value", noisyAdd1UDF('value))
dfWithUdf: org.apache.spark.sql.DataFrame = [value: bigint]

scala> dfWithUdf.storageLevel
res70: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)

Now, while other Spark classifiers might also use withColumn, they discard the columns they don't need, so the UDFs behind those columns are never called and the DataFrame is not recalculated. OneVsRest, however, keeps every column. Indeed, it has to, as it does not know whether those columns will be used by the classifier it wraps.

We can see this if we look at the query plans again. Our cached DataFrame will use InMemory plans:

scala> df.explain()
== Physical Plan ==
InMemoryTableScan [value#744L]
   +- InMemoryRelation [value#744L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *SerializeFromObject [input[0, bigint, false] AS value#744L]
            +- Scan ExternalRDDScan[obj#743L]

but our uncached DataFrame, although it sits on top of this cached one, has as its top layer a Project that calls our UDF:

scala> dfWithUdf.explain()
== Physical Plan ==
*Project [UDF(value#744L) AS value#753L]
+- InMemoryTableScan [value#744L]
      +- InMemoryRelation [value#744L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *SerializeFromObject [input[0, bigint, false] AS value#744L]
               +- Scan ExternalRDDScan[obj#743L]

In turn, caching this puts another layer of InMemoryXXX on top:

scala> dfWithUdf.cache()
scala> dfWithUdf.explain()
== Physical Plan ==
InMemoryTableScan [value#753L]
   +- InMemoryRelation [value#753L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Project [UDF(value#744L) AS value#753L]
            +- InMemoryTableScan [value#744L]
                  +- InMemoryRelation [value#744L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- *SerializeFromObject [input[0, bigint, false] AS value#744L]
                           +- Scan ExternalRDDScan[obj#743L]

The solution is to use checkpoint(). This collapses the query plan, freezing the results so that nothing needs to be recalculated.

Wednesday, June 6, 2018

Spark: checkpoint or cache?


Previously, 20k sentences would take a few minutes using Spark's built-in machine learning algorithms. So, I was surprised that 20k larger documents could take far, far longer.

Note: each data-point was reduced to a vector of length 20 irrespective of whether it was a short sentence or a long piece of text. So document size should have been irrelevant.

After over three hours, my one-vs-all support vector machine still had not finished.

I managed to fix the problem but first, here are the results for the models I used on the larger documents. Again, the figures are indicative as I did not run the models more than once.

Model                             Accuracy (%)
LogisticRegression                96.1
MultilayerPerceptronClassifier    96.0
NaiveBayes                        90.7
RandomForestClassifier            85.2
LinearSVC and OneVsRest           74.2

Despite calling .cache() on every DataFrame in sight, and everything apparently fitting into memory with gigabytes to spare, performance was dreadful. (Note that Dataset.cache has a default StorageLevel of MEMORY_AND_DISK, whereas RDD.cache defaults to MEMORY_ONLY.)

Using jstack, I could see that all the executor threads were busy with some sort of serialization, with deep stack frames in java.io.ObjectInputStream.

So instead I tried calling checkpoint() on the DataFrame before passing it to the machine learning algorithm. This removes all lineage and writes the data to the HDFS directory that you must first set with sc.setCheckpointDir.

"Broadly speaking, we advise persisting when jobs are slow and checkpointing when they are failing," say Karau and Warren [1], but since my job was so slow it might as well have failed, I had nothing to lose.

Indeed, the SVM was now able to process the data in a few minutes. Quite why caching didn't work as expected is something of a mystery that I will look at in another post.

[1] High Performance Spark



Monday, June 4, 2018

Manifold Destiny


Manifolds

Manifolds are "'spaces' of points, together with a notion of distance that looked like Euclidean distance on small scales but which could be quite different at larger scales." [1]

"Although there is a formal mathematical meaning to the term manifold, in machine learning it tends to be used more loosely to designate a connected set of points that can be approximated well by considering only a small number of degrees of freedom, or dimensions, embedded in a higher-dimensional space... In the context of machine learning, we allow the dimensionality of the manifold to vary from one point to another. This often happens when a manifold intersects itself. For example, a figure eight is a manifold that has a single dimension in most places but two dimensions at the intersection at the center.

"Many machine learning problems seem hopeless if we expect the machine learning algorithm to learn functions with interesting variations across all of ℝⁿ. Manifold learning algorithms surmount this obstacle by assuming that most of ℝⁿ consists of invalid inputs, and that interesting inputs occur only along a collection of manifolds containing a small subset of points, with interesting variations in the output of the learned function occurring only along directions that lie on the manifold, or with interesting variations happening only when we move from one manifold to another. Manifold learning was introduced in the case of continuous-valued data and the unsupervised learning setting, although this probability concentration idea can be generalized to both discrete data and the supervised learning setting: the key assumption remains that probability mass is highly concentrated." [2]

The manifold hypothesis is the observation "that the probability distribution over images, text strings, and sounds that occur in real life is highly concentrated." [2]

Transforming the data

"The most logical way to transform hour is into two variables that swing back and forth out of sync. Imagine the position of the end of the hour hand of a 24-hour clock. The x position swings back and forth out of sync with the y position. For a 24-hour clock you can accomplish this with x = sin(2π·hour/24), y = cos(2π·hour/24).

"You need both variables or the proper movement through time is lost. This is due to the fact that the derivative of either sin or cos changes in time, whereas the (x, y) position varies smoothly as it travels around the unit circle.

"Finally, consider whether it is worthwhile to add a third feature to trace linear time, which can be constructed from hours (or minutes or seconds) since the start of the first record, or from a Unix time stamp or something similar. These three features then provide proxies for both the cyclic and linear progression of time, e.g. you can pull out cyclic phenomena like sleep cycles in people's movement and also linear growth like population vs. time" (StackExchange).

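The quoted recipe is easy to sketch in Scala. This is a minimal version under stated assumptions: the object and method names are hypothetical, and the linear feature assumes Unix timestamps in seconds.

```scala
// Hypothetical time-feature helpers following the StackExchange recipe above.
object TimeFeatures {
  // Cyclic encoding: position of the hour hand on a 24-hour clock face
  def cyclicHour(hour: Double): (Double, Double) =
    (math.sin(2 * math.Pi * hour / 24), math.cos(2 * math.Pi * hour / 24))

  // Linear time: hours elapsed since the first record (Unix seconds assumed)
  def hoursSinceStart(epochSeconds: Long, firstEpochSeconds: Long): Double =
    (epochSeconds - firstEpochSeconds) / 3600.0

  // Euclidean distance between two encoded hours
  def distance(p: (Double, Double), q: (Double, Double)): Double =
    math.hypot(p._1 - q._1, p._2 - q._2)
}
```

With this encoding, hours 23 and 0 end up exactly as far apart as hours 0 and 1, whereas the raw hour values differ by 23. Using sin alone would map hour 1 and hour 11 to the same value, which is why both coordinates are needed.
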
Normalizing the data

"If the input variables are combined linearly, as in an MLP, then it is rarely strictly necessary to standardize the inputs, at least in theory. The reason is that any rescaling of an input vector can be effectively undone by changing the corresponding weights and biases, leaving you with the exact same outputs as you had before. However, there are a variety of practical reasons why standardizing the inputs can make training faster and reduce the chances of getting stuck in local optima. Also, weight decay and Bayesian estimation can be done more conveniently with standardized inputs" (comp.ai.neural-nets FAQ).

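The FAQ's claim that a rescaling "can be effectively undone by changing the corresponding weights and biases" can be checked for a single linear unit. This is a minimal sketch, and the object and method names are hypothetical. If each input is transformed as x'(i) = a(i)·x(i) + c(i), then weights w'(i) = w(i)/a(i) and bias b' = b - Σ w'(i)·c(i) reproduce the original output exactly.

```scala
// Sketch: a single linear unit, w · x + b, and the weights/bias that undo an
// element-wise affine rescaling of its inputs. Names are illustrative, not a library API.
object AffineUndo {
  def unit(w: Array[Double], b: Double)(x: Array[Double]): Double =
    w.zip(x).map { case (wi, xi) => wi * xi }.sum + b

  // Inputs are rescaled as x'(i) = a(i) * x(i) + c(i), with every a(i) != 0.
  // Returns (w', b') such that unit(w', b')(x') equals unit(w, b)(x), up to rounding.
  def compensate(w: Array[Double], b: Double,
                 a: Array[Double], c: Array[Double]): (Array[Double], Double) = {
    val wPrime = w.zip(a).map { case (wi, ai) => wi / ai }
    val bPrime = b - wPrime.zip(c).map { case (wi, ci) => wi * ci }.sum
    (wPrime, bPrime)
  }
}
```

For example, with w = (0.5, -2), b = 1, a = (10, 0.1) and c = (-3, 7), the compensating weights are (0.05, -20) and b' = 141.15. The second weight grows tenfold, which hints at why weight decay interacts with input scaling: the same function can need much larger weights, and so pay a larger penalty, on unscaled inputs.
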
Just how normalization and standardization can affect the overall accuracy for different optimizers can be seen on Rinat Maksutov's blog here.

In general, it improves performance (mainly how quickly training converges) rather than being strictly necessary. "It is good idea not just to normalize data but also to scale them. This is intended for faster approaching to global minima at error surface" (StackOverflow).

Doing just the opposite, multiplying all elements in all vectors by a random factor between 0.5 and 1.5, made no difference to the accuracy (94.5%) of an ANN on the "20 Newsgroups" data set. By comparison, the choice of optimizer made a huge difference: gradient descent gave 15.6% irrespective of whether the vectors were normalized or not. (The default optimizer for Spark's MultilayerPerceptronClassifier is L-BFGS.)

[1] The Princeton Companion to Mathematics
[2] Deep Learning (Goodfellow, Bengio, Courville)