Tuesday, July 17, 2018

Spark CLI nohup


You may package a JAR and use spark-submit to submit your code to Spark. But sometimes you want to hack interactively, and sometimes that hack is a long-running query. How do you keep spark-shell running after you have gone home?

It took me some fiddling but this works (with a bit of help from StackExchange).

In Unix shell #1, do:

mkfifo my_pipe
nohup spark-shell YOUR_CONFIG < my_pipe > YOUR_OUTPUT_FILE

Now in Unix shell #2, do:

nohup cat YOUR_SPARK_SCALA > my_pipe 2>/dev/null &

You should now see the Spark shell jump into life.

Now, back in shell #1, press CTRL+z and type:

jobs

identify your application's job number and type

bg %JOB_NUMBER

You may now logoff and go home.


Thursday, July 12, 2018

Are you better?


There is a certain amount of randomness in neural nets. Run it twice and you're unlikely to get the same answer. So, when making changes, how do we know things have improved?

I ran exactly the same Spark neural net code on slightly different data. The data represented the same use case but in one run I used L1 normalization and in the other L2. For each normalization technique, I ran the neural net 9 times. The accuracy looked like this:
[Figure: Accuracy of a neural net. Red: data with L2 normalization; blue: L1.]
At first blush, it looks like L1 is better, but it's not certain. After all, some of the high values for L2 accuracy are higher than the low values for L1. And since each normalization has only 9 data points, could this apparent difference just be due to luck?

      Accuracy (%)    Std. Dev. (%)
L1    94.9            0.36
L2    94.3            0.42


The Classical Approach

The standard deviation in a sample can be used to estimate the standard deviation of the whole population. The standard deviation of the mean is σ/√N (aka the standard error) where σ is the (true) standard deviation of the population and N is the number of measurements per trial [Boaz p772].

To demonstrate, let's take 5 observations 1 million times. We'll compare the calculated value with the results of a simulation in Python:

    import numpy as np

    N = 5
    Nsamp = 10 ** 6
    sigma_x = 2

    x = np.random.normal(0, sigma_x, size=(Nsamp, N))

    mu_samp = x.mean(1)  # 10 ** 6 values that are the mean of each row
    sig_samp = sigma_x * N ** -0.5

    print("{0:.3f} should equal {1:.3f}".format(np.std(mu_samp, ddof=1), sig_samp))

This outputs:

0.894 should equal 0.894

(Code stolen from here).

Using this relationship between the sample standard deviation and the standard error of the mean, we can then conduct a t-test.

"Using a t-test allows you to test  
  • whether the mean of a sample differs significantly from an expected value, or 
  • whether the means of two groups differ significantly from an expected value, or 
  • whether the means of two groups differ significantly from each other."
"[William 'Student'] Gosset's key observation was the dependence on sample size for determining the probability that the mean of the population lies within a given distance of the mean of the sample, if a normal distribution is assumed... Gosset noted that when samples are collected from a normal distribution, and if the number of samples is small, and these are used to estimate the variance, then the distribution (for the variable x):

t = (x̄ - μ) / (σ/√N)

is both flatter, and has more observations appearing in the tails, than a normal distribution, when the samples sizes are less than 30... This distribution is known as the t distribution and approximates a normal distribution if n (and by implication [degrees of freedom]) are large (greater than 30 in practical terms)." [1]

All things being equal, "most of the time, you’d expect to get t-values close to 0. That makes sense, right? Because if you randomly select representative samples from a population, the mean of most of those random samples from the population should be close to the overall population mean, making their differences (and thus the calculated t-values) close to 0... The t-value measures the size of the difference relative to the variation in your sample data... T is simply the calculated difference represented in units of standard error" [from a blog here].

Calculating the t-value in Python is easy:

from scipy import stats

(t_value, p_value) = stats.ttest_ind(data.l1, data.l2, equal_var=False)
print("t = {0:.3f}, p = {1:.3f}".format(t_value, p_value))

t = 2.755, p = 0.014

The t-value and its p-value are related. Given the PDF of the t distribution, p is the area under the curve beyond our t-value (both tails, for a two-sided test): the probability of seeing a t-value at least that extreme by chance alone.
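
To make that concrete, here is a small sketch using scipy. The degrees of freedom come from Welch's approximation; for two samples of nine with the spreads above they work out at roughly 15-16, so the value below is approximate rather than taken from the raw data:

    from scipy import stats

    t = 2.755
    df = 15.6                        # approximate Welch degrees of freedom
    p = 2 * stats.t.sf(abs(t), df)   # area in both tails beyond |t|
    print("p = {0:.3f}".format(p))   # close to the 0.014 reported above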

"Confidence limits are expressed in terms of a confidence coefficient. Although the choice of confidence coefficient is somewhat arbitrary, in practice 90%, 95%, and 99% intervals are often used, with 95% being the most commonly used." [NIST]

Given our data, it is unlikely that the two sets were drawn from the same population because p = 0.014. That is significant at the 95% level, though not at the 99% level.

[Aside: "As a technical note, a 95% confidence interval does not mean that there is a 95% probability that the interval contains the true mean. The interval computed from a given sample either contains the true mean or it does not. Instead, the level of confidence is associated with the method of calculating the interval. The confidence coefficient is simply the proportion of samples of a given size that may be expected to contain the true mean. That is, for a 95% confidence interval, if many samples are collected and the confidence interval computed, in the long run about 95 % of these intervals would contain the true mean." [NIST]]

The Bayesian Approach

"If you have taken a statistics course, you have probably been taught this technique (although not necessarily learned this technique... you may have felt uncomfortable with the derivation" [2].

So, let's now refer to Bayesian Methods for Hackers (at github). I assumed that the true standard deviation was uniformly distributed between 0% and 2%, that the true means lay uniformly anywhere between 90% and 99%, and that the observed accuracies were distributed as a Gaussian.

These assumptions may or may not be correct but they're not outrageous and PyMC should find the optimum answer anyway.
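
A minimal PyMC3 sketch of that model might look like the following. This is not necessarily the code in the repository, and the observed values below are placeholders generated around the means and spreads reported earlier:

    import numpy as np
    import pymc3 as pm

    rng = np.random.RandomState(0)
    l1_obs = rng.normal(94.9, 0.36, 9)   # placeholders for the nine L1 accuracies
    l2_obs = rng.normal(94.3, 0.42, 9)   # placeholders for the nine L2 accuracies

    with pm.Model():
        # uniform priors: means anywhere in [90, 99]%, standard deviations in [0, 2]%
        mu_l1 = pm.Uniform("mu_l1", 90, 99)
        mu_l2 = pm.Uniform("mu_l2", 90, 99)
        sd_l1 = pm.Uniform("sd_l1", 0, 2)
        sd_l2 = pm.Uniform("sd_l2", 0, 2)

        # Gaussian likelihood for the observed accuracies
        pm.Normal("obs_l1", mu=mu_l1, sd=sd_l1, observed=l1_obs)
        pm.Normal("obs_l2", mu=mu_l2, sd=sd_l2, observed=l2_obs)

        # the quantity of interest: how much better is L1 than L2?
        pm.Deterministic("difference", mu_l1 - mu_l2)

        trace = pm.sample(5000, tune=1000)

    print("mean improvement of L1 over L2: {0:.2f}%".format(trace["difference"].mean()))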

The probable difference between the mean accuracies looks like this (code available on my own GitHub repository):
[Figure: Histogram of the difference in accuracy between L1 and L2 normalization using the same ANN.]
This shows that, given what we know, there is most likely an improvement of about 0.5% in using L1 over L2, a quantity the classical approach did not give us. There is a possibility that, despite the data, L2 is actually better than L1, but as we can see from the graph this is very unlikely.

[1] Statistics in a Nutshell, O'Reilly
[2] Bayesian Methods for Hackers, Davidson-Pilon

Python Crib Sheet #2


Context managers - aka 'with'

"Context managers wrap a block and manage requirements on entry and departure from the block and are marked by the with keyword. File objects are context managers... we know that the file will be closed immediately after the last read, whether the operation was successful or not... closure of the file is also assured, because it’s part of the file object’s context management, so we don’t need to write the code. In other words, by using with combined with a context management (in this case a file object), we don’t need to worry about the routine cleanup." [1]

In outline (adapted from effbot's pseudo-code, fleshed out so it runs):

    class controlled_execution:
        def __enter__(self):
            print("setting things up")
            return self                        # whatever is returned here is bound by 'as'
        def __exit__(self, type, value, traceback):
            print("tearing things down")

    with controlled_execution() as thing:
        print("some code that uses", thing)
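
Since file objects are themselves context managers, the same guarantee comes for free with open (the file name here is made up):

    with open("results.txt", "w") as f:
        f.write("hello")
    # f is guaranteed to be closed here, even if the write raised an exception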


* Operator

"A special parameter can be defined that will collect all extra positional arguments in a function call into a tuple" [1]. See the example here where "zip is its own inverse". That is, it can both turn two lists into one list of tuples (as you'd expect) and also turn a list of tuples into two lists.

For example:

>>> def g(*x):
...     for a in x:
...         print(a)
... 
>>> g(*(1, 2))
1
2
>>> g((1, 2))
(1, 2)
>>> g(*[(1, 2), (3, 4)])
(1, 2)
(3, 4)
>>> g([(1, 2), (3, 4)])
[(1, 2), (3, 4)]

That is, adding * unpacks the tuple or list so that its constituents are passed as separate arguments.
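
The same mechanism is what makes "zip its own inverse", as mentioned above:

>>> pairs = list(zip([1, 2, 3], ['a', 'b', 'c']))
>>> pairs
[(1, 'a'), (2, 'b'), (3, 'c')]
>>> nums, letters = zip(*pairs)
>>> nums
(1, 2, 3)
>>> letters
('a', 'b', 'c')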

To summarize:

              def g(*x)    def g(x)
g((1, 2))     [(1, 2)]     [1, 2]
g(*(1, 2))    [1, 2]       TypeError


The future works (certain T&Cs may apply)

"It’s possible to import several features of Python 3, like division, into Python 2.x by using the __future__ module; but even if you imported all the features of the __future__ library, the differences in library structure, the distinction between strings and Unicode, and so on all would make for code that would be hard to maintain and debug." [1]

"With __future__ module's inclusion, you can slowly be accustomed to incompatible changes or to such ones introducing new keywords." (StackOverflow).

An example is the difference between integer division in Python 2 and 3. In Python 2, dividing one integer by another gives an integer; in Python 3 it gives a float. Using __future__ can make Python 2 work like Python 3:

$ python
Python 2.7.12 (default, Nov 19 2016, 06:48:10)
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> 1/3
0
>>> from __future__ import division
>>> 1/3
0.3333333333333333

[1] The Quick Python Book

Monday, June 18, 2018

To Normalize or Not?


I've demonstrated in previous posts that pre-processing our data improved our accuracy more than hyperparameter tuning. But "preprocessing in machine learning is somewhat a very black art" as this StackExchange post says. "It is not written down in papers a lot why several preprocessing steps are essential to make it work... To make things more complicated, it depends heavily on the method you use and also on the problem domain."

All the answers on this page are enlightening - that given 4 points for age and height of people, the clustering is different if we use metric or imperial measurements since the distances between them will be different; that normalizing tends to make clusters in K-Means more distinct; that "if you have a neural network and just apply an affine transformation [x' = ax + b] to your data, the network does not lose or gain anything in theory. In practice, however, a neural network works best if the inputs are centered and white." There is a caveat when regularization is used, since the penalty treats all weights alike and is therefore sensitive to the scale of the inputs.

I looked into this last point and did some (non-exhaustive) experiments. Results are from Spark's MultilayerPerceptronClassifier using 100 epochs. As ever, results are indicative and not to be taken too seriously (for instance, a difference of a percent may just be noise).

Word Vector     Sentence Vector    Feature Vector    Accuracy (%)
L1              None               Standardized      95
L1              L2                 Standardized      95
L2              L2                 Standardized      94
L2              None               None              94
L1              None               None              94
L1              L2                 None              94
None            L2                 Standardized      84
None            None               None              81
None            None               Standardized      81
None            L2                 None              78
None            L1                 None              77
None            L2                 L1                 8
L1              None               L1                 7
None            None               L1                 6
Standardized    None               None               6
L2              L2                 L1                 5
L2              None               L1                 3

All sentence vectors were constructed from simply adding the relevant word vectors together.
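
For orientation, the labels in the table map onto Spark's feature transformers roughly as follows (a PySpark sketch; the column names are made up):

    from pyspark.ml.feature import Normalizer, StandardScaler

    # L1 / L2: scale each vector so that its L1 (or L2) norm is 1
    l1_norm = Normalizer(p=1.0, inputCol="wordVecs", outputCol="wordVecsL1")
    l2_norm = Normalizer(p=2.0, inputCol="wordVecs", outputCol="wordVecsL2")

    # "Standardized": zero mean and unit variance per feature, across the data set
    standardize = StandardScaler(inputCol="features", outputCol="featuresStd",
                                 withMean=True, withStd=True)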

Although I spent less time on TensorFlow, I noticed that pre-processing was important there too. Using the same architecture, unnormalized data gave me about 50% accuracy while L2 normalized words gave me 93% accuracy. Neither accuracy seemed to be increasing after the 1000 epochs I let them run for.

Changing tack completely, I also one-hot encoded my words to form sentence vectors and achieved a respectable but not stellar accuracy of 84%.
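
One way to read that is as summing one-hot word vectors into a per-sentence count vector. A toy sketch (the vocabulary and sentence are invented):

    import numpy as np

    vocab = {"spark": 0, "is": 1, "fast": 2, "not": 3, "slow": 4}

    def one_hot(word, vocab):
        v = np.zeros(len(vocab))
        v[vocab[word]] = 1.0
        return v

    # a sentence vector is the sum of the one-hot vectors of its words
    sentence = sum(one_hot(w, vocab) for w in ["spark", "is", "not", "slow"])
    print(sentence)    # [1. 1. 0. 1. 1.]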

I thought that perhaps I was helping the neural net by pre-processing but it would still learn anyway if given enough time. This appears not to be true. Taking the data with no processing of the word, sentence and column vectors, the accuracy with increasing number of epochs looked like this:

Number of epochs    Accuracy (%)
100                 80.9
200                 82.6
1000                84.7
2000                85.3
5000                85.3

This demonstrates a clear plateau in accuracy at about 85% when we know we can achieve an accuracy as high as 95% using the same classifier and the same data (if rendered somewhat differently).


Thursday, June 7, 2018

Spark Query Plans (Gotcha!)


I was exploring why Spark was taking hours to run a support vector machine on my data when other ML algorithms were taking minutes. I called .explain() on my DataFrame but I found surprisingly little information on the web concerning interpreting Spark query plans.

The best source I found was this at Simon Fraser University. The take-away points are:

  • "Read [query plans] from the bottom up."
  • Exchanges eg "Exchange hashpartitioning(..." are shuffles.
  • InMemoryRelation and InMemoryTableScan "will look in memory for data, calculating and caching if necessary". 
  • Range and Project are from select()s, withColumn(), etc.

Playing around, I found InMemoryRelation and InMemoryTableScan can be generated by adding a .cache().

scala> val df = sc.range(1, 100000).toDF
scala> df.explain()
== Physical Plan ==
*SerializeFromObject [input[0, bigint, false] AS value#2L]
+- Scan ExternalRDDScan[obj#1L]

scala> val add1 = df.map { r => r.getLong(0) + 1 }
scala> add1.explain()
== Physical Plan ==
*SerializeFromObject [input[0, bigint, false] AS value#10L]
+- *MapElements <function1>, obj#9: bigint
   +- *DeserializeToObject createexternalrow(value#2L, StructField(value,LongType,false)), obj#8: org.apache.spark.sql.Row
      +- *SerializeFromObject [input[0, bigint, false] AS value#2L]
         +- Scan ExternalRDDScan[obj#1L]

scala> val cachedAdd1 = add1.cache()
scala> cachedAdd1.explain()
== Physical Plan ==
InMemoryTableScan [value#10L]
   +- InMemoryRelation [value#10L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *SerializeFromObject [input[0, bigint, false] AS value#10L]
            +- *MapElements <function1>, obj#9: bigint
               +- *DeserializeToObject createexternalrow(value#2L, StructField(value,LongType,false)), obj#8: org.apache.spark.sql.Row
                  +- *SerializeFromObject [input[0, bigint, false] AS value#2L]
                     +- Scan ExternalRDDScan[obj#1L]

So, that explained the last two actions in my plan but I was no nearer understanding my problem. However, I did notice that there were lots of User Defined Functions in my DataFrame, presumably from Spark's Transformers that took my raw textual data and helped me turn them into vectors.

Could this be my problem? There appear to have been several problems with UDFs (see SPARK-17728 for example) and no shortage of people warning against them (for example here: "Using a UDF implies deserialization to process the data in classic Scala and then reserialize it. UDFs can be replaced by Spark SQL functions, there are already a lot of them and new ones are regularly added.")

But I could not replicate the issue by testing in the Spark shell:

scala> val df = sc.range(0, 5).toDF
scala> val noisyAdd1UDF = udf{ x: Long => println("noisyAdd1UDF = " + x); x + 1 } // not a pure function
scala> val dfWithUdf = df.withColumn("added", noisyAdd1UDF('value))
scala> dfWithUdf.explain()
== Physical Plan ==
*Project [value#283L, UDF(value#283L) AS added#287L]
+- *SerializeFromObject [input[0, bigint, false] AS value#283L]
   +- Scan ExternalRDDScan[obj#282L]

Now, let's show() it twice. Each time, the UDF is re-calculated:

scala> dfWithUdf.show()
noisyAdd1UDF = 0
...
noisyAdd1UDF = 4
+-----+-----+
|value|added|
+-----+-----+
|    0|    1|
...
|    4|    5|
+-----+-----+

scala> dfWithUdf.show()
noisyAdd1UDF = 0
...
noisyAdd1UDF = 2
+-----+-----+
|value|added|
+-----+-----+
|    0|    1|
...
|    4|    5|
+-----+-----+

So, no surprises. Calling cache() stopped subsequent show()s from calling the UDF again as expected.

The Gotcha

It turns out that the problem is not directly with a UDF but with the DataFrame that is created when we add the UDF. The new DataFrame does not inherit its parent's StorageLevel (note that other operations like randomSplit will do the same thing).

We can see this by doing:

scala> val df = sc.range(0, 100000).toDF
df: org.apache.spark.sql.DataFrame = [value: bigint]

scala> df.storageLevel
res67: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)

scala> df.cache()
res68: df.type = [value: bigint]

scala> df.storageLevel
res69: org.apache.spark.storage.StorageLevel = StorageLevel(disk, memory, deserialized, 1 replicas)

scala> val dfWithUdf = df.withColumn("value", noisyAdd1UDF('value))
dfWithUdf: org.apache.spark.sql.DataFrame = [value: bigint]

scala> dfWithUdf.storageLevel
res70: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)

Now, while other Spark classifiers might also use withColumn, they discard the extra columns that would call the UDF and so avoid recalculating them. OneVsRest does not do this. Indeed, it cannot, as it does not know which columns will be used by the classifiers it wraps.

And we can see this if we again look at the Query Plans. Our cached DataFrame will use InMemory plans:

scala> df.explain()
== Physical Plan ==
InMemoryTableScan [value#744L]
   +- InMemoryRelation [value#744L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *SerializeFromObject [input[0, bigint, false] AS value#744L]
            +- Scan ExternalRDDScan[obj#743L]

but although it sits on top of this cached DataFrame, our non-cached DataFrame will have as its top-layer plan a Project that calls our UDF:

scala> dfWithUdf.explain()
== Physical Plan ==
*Project [UDF(value#744L) AS value#753L]
+- InMemoryTableScan [value#744L]
      +- InMemoryRelation [value#744L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *SerializeFromObject [input[0, bigint, false] AS value#744L]
               +- Scan ExternalRDDScan[obj#743L]

In turn, caching this puts another layer of InMemoryXXX on top:

scala> dfWithUdf.explain()
== Physical Plan ==
InMemoryTableScan [value#753L]
   +- InMemoryRelation [value#753L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Project [UDF(value#744L) AS value#753L]
            +- InMemoryTableScan [value#744L]
                  +- InMemoryRelation [value#744L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- *SerializeFromObject [input[0, bigint, false] AS value#744L]
                           +- Scan ExternalRDDScan[obj#743L]

The solution is to use checkpoint(). This collapses the query plan, freezing the results and obviating the need to recalculate anything.

Wednesday, June 6, 2018

Spark: checkpoint or cache?


Previously, 20k sentences would take a few minutes using Spark's built-in machine learning algorithms. So, I was surprised that 20k larger documents could take far, far longer.

Note: each data-point was reduced to a vector of length 20 irrespective of whether it was a short sentence or a long piece of text. So document size should have been irrelevant.

After over three hours, my One-vs-All, Support Vector Machine still was not finished.

I managed to fix the problem but first, here are the results for the models I used on the larger documents. Again, the figures are indicative as I did not run the models more than once.

Model                             Accuracy (%)
LogisticRegression                96.1
MultilayerPerceptronClassifier    96.0
NaiveBayes                        90.7
RandomForestClassifier            85.2
LinearSVC and OneVsRest           74.2

Despite calling .cache() on every DataFrame in sight and everything apparently fitting into memory with gigabytes to spare, performance was dreadful. (Note that DataSet.cache has a default StorageLevel of MEMORY_AND_DISK whereas RDD.cache defaults to MEMORY_ONLY).

Using jstat, it was clear all the executor threads were involved in some sort of serialization with deep stack-frames in java.io.ObjectInputStream.

So, instead I tried calling checkpoint on the DataFrame before passing it to the machine learning algorithm. This removes all lineage and writes the data to the HDFS directory that you must first set via sc.setCheckpointDir.
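
A sketch of what that looks like (in PySpark here; the paths and the 100-iteration SVM are illustrative rather than the actual job):

    from pyspark.sql import SparkSession
    from pyspark.ml.classification import LinearSVC, OneVsRest

    spark = SparkSession.builder.getOrCreate()
    spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")    # made-up path

    train = spark.read.parquet("hdfs:///data/train.parquet")          # made-up source

    checkpointed = train.checkpoint()   # eagerly materializes the data and truncates its lineage
    ova = OneVsRest(classifier=LinearSVC(maxIter=100))
    model = ova.fit(checkpointed)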

"Broadly speaking, we advise persisting when jobs are slow and checkpointing when they are failing" [1] say Karau and Warren but since my job was so slow it might as well have failed I had nothing to lose.

Indeed, the SVM was now able to process the data in a few minutes. Quite why caching didn't work as expected is something of a mystery that I will look at in another post.

[1] High Performance Spark



Monday, June 4, 2018

Manifold Destiny


Manifolds

Manifolds are "'spaces' of points, together with a notion of distance that looked like Euclidean distance on small scales but which could be quite different at larger scales." [1]

"Although there is a formal mathematical meaning to the term manifold, in machine learning it tends to be used more loosely to designate a connected set of points that can be approximated well by considering only a small number of degrees of freedom, or dimensions, embedded in a higher-dimensional space... In the context of machine learning, we allow the dimensionality of the manifold to vary from one point to another. This often happens when a manifold intersects itself. For example, a figure eight is a manifold that has a single dimension in most places but two dimensions at the intersection at the center.

"Many machine learning problems seem hopeless if we expect the machine learning algorithm to learn functions with interesting variations across all of ℝn. Manifold learning algorithms surmount this obstacle by assuming that most of ℝn consists of invalid inputs, and that interesting inputs occur only along a collection of manifolds containing a small subset of points, with interesting variations in the output of the learned function occurring only along directions that lie on the manifold, or with interesting variations happening only when we move from one manifold to another. Manifold learning was introduced in the case of continuous-valued data and the unsupervised learning setting, although this probability concentration idea can be generalized to both discrete data and the supervised learning setting: the key assumption remains that probability mass is highly concentrated."

The manifold hypothesis is the observation "that the probability distribution over images, text strings, and sounds that occur in real life is highly concentrated." [2]

Transforming the data

"The most logical way to transform hour is into two variables that swing back and forth out of sink. Imagine the position of the end of the hour hand of a 24-hour clock. The x position swings back and forth out of sink with the y position. For a 24-hour clock you can accomplish this with x=sin(2pi*hour/24),y=cos(2pi*hour/24).

"You need both variables or the proper movement through time is lost. This is due to the fact that the derivative of either sin or cos changes in time where as the (x,y) position varies smoothly as it travels around the unit circle.

"Finally, consider whether it is worthwhile to add a third feature to trace linear time, which can be constructed my hours (or minutes or seconds) from the start of the first record or a Unix time stamp or something similar. These three features then provide proxies for both the cyclic and linear progression of time e.g. you can pull out cyclic phenomenon like sleep cycles in people's movement and also linear growth like population vs. time" (StackExchange).

Normalizing the data

"If the input variables are combined linearly, as in an MLP, then it is rarely strictly necessary to standardize the inputs, at least in theory. The reason is that any rescaling of an input vector can be effectively undone by changing the corresponding weights and biases, leaving you with the exact same outputs as you had before. However, there are a variety of practical reasons why standardizing the inputs can make training faster and reduce the chances of getting stuck in local optima. Also, weight decay and Bayesian estimation can be done more conveniently with standardized inputs" (comp.ai.neural-nets FAQ).

Just how normalization and standardization can affect the overall accuracy for different optimizers can be seen on Rinat Maksutov's blog here.

In general, it speeds up training rather than changing what can ultimately be learned. "It is good idea not just to normalize data but also to scale them. This is intended for faster approaching to global minima at error surface" (StackOverflow).

Doing just the opposite, multiplying all elements in all vectors by a random factor between 0.5 and 1.5 made no difference to the accuracy (94.5%) of an ANN on the "20 Newsgroups" data set. By comparison, the choice of optimizer made a huge difference: 15.6% for gradient descent, irrespective of whether the vectors were normalized or not (the default optimizer for Spark is L-BFGS).

[1] The Princeton Companion to Mathematics
[2] Deep Learning (Goodfellow, Bengio, Courville)