Monday, January 31, 2022

Spark Log Likelihood

Log-likelihood

I wanted to see how the number of iterations in a Generalized Linear Model affected the performance of my model. One way to do this is with the log-likelihood. "Probability is used before data are available to describe plausibility of a future outcome, given a value for the parameter. Likelihood is used after data are available to describe plausibility of a parameter value." [Wikipedia]. In Bayesian inference, if P(C|D) is the probability, then P(D|C) is the likelihood. We take logs to make the maths easier.
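To make that concrete, here is a toy example with entirely made-up data: the likelihood of some Bernoulli observations under a candidate parameter p is a product of per-observation probabilities, and taking logs turns that product into a sum, which is numerically much friendlier:

import numpy as np

y = np.array([1, 0, 1, 1, 0, 1])   # observed 0/1 outcomes (made-up data)
p = 0.6                            # candidate parameter value

likelihood = np.prod(np.where(y == 1, p, 1 - p))                      # product of small numbers
log_likelihood = np.sum(np.where(y == 1, np.log(p), np.log(1 - p)))   # sum of their logs

assert np.isclose(np.log(likelihood), log_likelihood)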

Spark

My model was built using Spark's ML library. With it, I was trying to model the number of hours spent waiting in A&E. If the patient waits more than 4 hours, they have breached our target. The code looks like this:

from pyspark.ml.regression import GeneralizedLinearRegression
glr = GeneralizedLinearRegression(family="binomial",
                                  link="logit",
                                  featuresCol=OUTPUT_COL,
                                  labelCol=self.dependent_column,
                                  predictionCol=PREDICTION_COLUMN)

Since I am really modeling whether the wait breaches the four-hour target (a binary outcome) rather than the raw number of hours, the binomial family with a logit link is an appropriate choice.
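For context, here is a minimal sketch of how such a model gets fitted and how the number of iterations could be varied. Here, train_df and the "breached" label column are stand-ins for my own pipeline's data, not anything prescribed by Spark; OUTPUT_COL is the features column as above:

from pyspark.ml.regression import GeneralizedLinearRegression

# train_df is assumed to have a vector column of features (OUTPUT_COL)
# and a 0/1 label column ("breached").
models = {}
for max_iter in [5, 10, 25, 50]:
    glr = GeneralizedLinearRegression(family="binomial",
                                      link="logit",
                                      featuresCol=OUTPUT_COL,
                                      labelCol="breached",
                                      maxIter=max_iter)
    models[max_iter] = glr.fit(train_df)

# The rest of this post is about how to put a log-likelihood on each fitted model.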

Unfortunately, Spark does not appear to have a log-likelihood function out-of-the-box for GLMs. So, I tried to write my own.

In the Python library StatsModels, the (abbreviated) code for the Logit log-likelihood is:

def loglike(self, params):
    # endog is the vector of observed 0/1 outcomes, mapped here to -1/+1
    q = 2*self.endog - 1
    # exog is the design matrix of explanatory variables
    X = self.exog
    return np.sum(np.log(self.cdf(q*np.dot(X, params))))

def cdf(self, X):
    # the logistic (sigmoid) function
    X = np.asarray(X)
    return 1/(1+np.exp(-X))

from discrete_model.py.
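If you want to see the same calculation outside Spark, StatsModels exposes it through Logit.loglike (and llf on the fitted result). A quick sketch with made-up data, nothing to do with the original model:

import numpy as np
import statsmodels.api as sm

rng = np.random.default_rng(0)
X = sm.add_constant(rng.normal(size=(100, 3)))   # design matrix with an intercept column
y = rng.integers(0, 2, size=100)                 # made-up 0/1 outcomes

logit = sm.Logit(y, X)
result = logit.fit(disp=False)
print(result.llf)                     # log-likelihood at the fitted parameters
print(logit.loglike(result.params))   # same number via the loglike method shown above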

I turned the StatsModels code above into PySpark:

import math

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from scipy import special

    def log_likelihood(self) -> float:
        '''
        PySpark version of the code in StatsModels' discrete_model.Logit.loglike function.
        :return: the log likelihood of the data on which this object was fitted.
        '''

        log_likelihood = "log_likelihood"
        inverse = "inverse"
        lin_pred = "lin_pred"
        df = self.df
        params = self.model.coefficients
        intercept = float(self.model.intercept)

        @F.udf(returnType=DoubleType())
        def dot_params(exog) -> float:
            # linear predictor: dot product of the feature vector with the coefficients, plus the intercept
            vecs = exog.toArray()
            assert len(vecs) == len(params)
            x_dot_params = sum(map(lambda x: x[0] * x[1], zip(vecs, params)))
            return float(x_dot_params) + intercept

        @F.udf(returnType=DoubleType())
        def loglike_obs(endog, mu) -> float:
            # per-observation binomial log-likelihood with n = 1 (i.e. Bernoulli);
            # the 1e-20 guards against log(0)
            n = 1.
            y = endog * n
            x = (special.gammaln(n + 1) - special.gammaln(y + 1) -
                    special.gammaln(n - y + 1) +
                    y * math.log(mu / (1 - mu + 1e-20)) +
                    n * math.log(1 - mu + 1e-20))
            return float(x)

        df = df.withColumn(lin_pred, dot_params(F.col(OUTPUT_COL)))  # + offset_exposure = 0. by default
        df = df.withColumn(inverse, 1. / (1 + F.exp(-1 * F.col(lin_pred))))  # logistic inverse link
        df = df.withColumn(log_likelihood, loglike_obs(F.col(self.dependent_column), F.col(inverse)))
        df = df.agg(F.sum(log_likelihood).alias(log_likelihood))
        rows = df.select(log_likelihood).collect()
        return rows[0][0]

Messy but it works.
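One way to gain some confidence in a hand-rolled calculation like this (just a suggestion, not part of the original pipeline) is to take a small sample of the same data, fit it with StatsModels and compare its llf with the Spark figure:

import statsmodels.api as sm

# pdf is assumed to be a small pandas sample of the training data with the
# feature columns expanded; the column names here are hypothetical.
X = sm.add_constant(pdf[["age", "deprivation_index"]])
y = pdf["breached"]

sm_result = sm.GLM(y, X, family=sm.families.Binomial()).fit()
print(sm_result.llf)   # should be in the same ballpark as log_likelihood() on the same sample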

AIC

As it happens, I didn't need to bother. Log-likelihood can be derived from a method that is already in the Spark API.

Once you call GeneralizedLinearRegression.fit, you'll have a GeneralizedLinearRegressionModel. An attribute of this is the summary (of type GeneralizedLinearRegressionTrainingSummary). From this you can get the AIC:

>>> model.summary.aic
116384.05296519656

The AIC is the Akaike Information Criterion, a metric grounded in Kullback-Leibler divergence. It is a function of the log-likelihood and the number of parameters in the model (including the intercept): AIC = 2k - 2 ln(L), where k is the number of parameters and L is the maximised value of the likelihood.

Note that the AIC is calculated based on the choice of family when we create our GeneralizedLinearRegression object.

To get the log-likelihood, some code like this:

        aic = model.summary.aic
        # degreesOfFreedom is the residual degrees of freedom (rows minus parameters),
        # so subtracting it from the row count recovers k, the number of parameters
        k = (data_frame.count() - model.summary.degreesOfFreedom)
        # rearranging AIC = 2k - 2 ln(L) gives ln(L) = (2k - AIC) / 2
        log_likelihood = ((2 * k) - aic) / 2

should suffice.

I'd previously used log-likelihood as a metric with which to compare different values of the "number of iterations" parameter. In that comparison, the data was the same and so was the model; only the iteration count changed. AIC becomes useful when we're comparing two (or more) different models, since it also accounts for the number of parameters.
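Putting it together, a sketch of the comparison I had in mind, assuming the hypothetical models dictionary from the sketch near the top of this post and that every model was fitted on the same data_frame:

for max_iter, model in models.items():
    aic = model.summary.aic
    k = data_frame.count() - model.summary.degreesOfFreedom
    log_likelihood = ((2 * k) - aic) / 2
    print(f"maxIter={max_iter}: log-likelihood={log_likelihood:.2f}")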

Thursday, January 20, 2022

Impure Evil

"Debugging is like being the detective in a crime movie where you are also the murderer."

Here's a case study of the dangers of using functions that are not pure in a lazily evaluated environment. While it's obvious that the rand function is not pure, it's less obvious with others.

The requirements

We have a table of national accident & emergency events that also contains patient data. We want just the patients from this data. Some patients appear more than once. The question is: how do we reduce multiple rows for the same patient into a single row?

This is not as straightforward as it sounds. The same patient may appear in events that are years apart. In that time, they may have lived in a different area code. So, one solution was to just choose any value, as the choice was not overly important. Therefore, we used the first function. After all, why not? We just want an arbitrary value in the event that we have more than one choice.

Once we have this data, we want to express the ratio of patients with property Z to those without, segmented on hospital attended. It is not relevant what Z actually is, but we will have to make two calculations: one for the set of patients with Z and one for all patients irrespective of Z.

The problem

When we ran this in Spark, we saw some values where the numerator was greater than the denominator.

The diagnosis

The problem arises when we use impure functions, especially with lazy evaluation.

  1. The first function will return a non-deterministic value for patients with multiple attendances. There is no guarantee which it will choose.
  2. Spark is lazy by default.
  3. So, upon calculating the denominators, first is invoked and it gives X for a given patient. Then, when calculating the numerators, first is invoked again (because of #2) but this time returns Y (because of #1) for the same patient. The result is that sometimes more people will fit the criteria for the numerator than the denominator for any given slice-and-dice of the data.

Note that this would also be true of most database implementations. For instance, SQL Server's FIRST_VALUE is also non-deterministic.
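A stripped-down sketch of the shape of the problem; the table and column names are invented for illustration and this is not the production code:

from pyspark.sql import functions as F

# events has one row per A&E attendance: patient_id, hospital, has_z, ...
patients = (events
            .groupBy("patient_id")
            .agg(F.first("hospital").alias("hospital"),
                 F.first("has_z").alias("has_z")))   # non-deterministic choice per patient

denominators = patients.groupBy("hospital").count()
numerators = patients.filter(F.col("has_z")).groupBy("hospital").count()

# When denominators and numerators are eventually evaluated, each evaluation can
# re-run the F.first aggregation and pick a different row for the same patient,
# so a hospital's numerator can end up larger than its denominator.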

Solution

Calling cache() will sometimes help but is not robust. If, for instance, an executor crashes, the data is regenerated by another instance and it may not necessarily be the same as what was generated before.

A better solution is to adopt a deterministic rule when choosing from a collection of values. We used max, but any other deterministic function would do.

However, note that there is no guarantee of consistency between values. For instance, we have a patient's area code expressed both as a code recognised nationally and as a code recognised by the health providers. There should be a one-to-one mapping between them, but given more than one of each, applying a lexical max to each column independently may not produce a valid pair.