Monday, November 9, 2020

Cache in the Azure

Gotcha!

What surprised me about the Azure/Databricks/Spark architecture was the lack of cache coherence. Data is pulled from Azure's Gen2 storage and stored closer to the Spark executors upon calling cache(). However, if another cluster updates that data, don't expect the first cluster to see the change.

In my case, I was using Databricks' Delta format to overwrite data as outlined here in the Delta Lake docs. A business requirement demanded that an override flag must be set before we overwrite anything. However, because of the lack of cache coherence, this check indicated that there was nothing to overwrite even when another cluster had already written that partition. So my code overwrote what was there irrespective of the flag! This significantly changes the semantics of my Big Data application.

Furthermore, any writes after cache() has been called are not reflected in the original Gen2 storage in Azure...

"The Delta cache automatically detects when data files are created or deleted and updates its content accordingly. You can write, modify, and delete table data with no need to explicitly invalidate cached data." [DataBricks Delta Cache docs] This doesn't appear to be the case, at least if it's referring to the original file(s).

"Multiple writers across multiple clusters can simultaneously modify a table partition and see a consistent snapshot view of the table and there will be a serial order for these writes" [DataBrick Delta docs] Again, this might be true but not after cache() has been called, it appears.

The only interesting piece of architecture is that the Azure Gen2 storage was mounted using:

dbutils.fs.mount(
  // dst, account, x and key are defined elsewhere; the account key comes from the "discope" secret scope
  source       = s"wasbs://${dst}@${account}.blob.core.windows.net/",
  mountPoint   = x,
  extraConfigs = Map(s"fs.azure.account.key.$account.blob.core.windows.net" -> dbutils.secrets.get(scope = "discope", key = key))
)

Investigation continues, but a friend at a completely different company that uses Databricks confirmed it's not just me. He too tried to replace a partition with .option("replaceWhere", ...) and, although it appeared to work "locally", it did not change the underlying data store. The problem, upon further reading, is that the Delta log has also been cached and is now stale. You need to run spark.catalog.clearCache() and then reads are OK.
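For what it's worth, here's a minimal PySpark sketch of the workaround (the path and the partition predicate are hypothetical, and this is Python rather than the Scala above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
path = "/mnt/mydata/events"   # hypothetical mount point

df = spark.read.format("delta").load(path)
df.cache()   # blocks are now materialized on this cluster's executors

# ... meanwhile, another cluster overwrites a partition, e.g. with
# .option("replaceWhere", "date = '2020-11-09'") ...

# This cluster still reads the stale cached blocks (and a stale Delta log),
# so drop everything cached before re-reading:
spark.catalog.clearCache()
fresh = spark.read.format("delta").load(path)   # now reflects the other cluster's write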

Sunday, November 8, 2020

Statistical Covariance Part 2

In a previous post I quoted somebody who said that covariance matrices always have a determinant of 0. This isn't strictly true. What can I say? I'm not a very good mathematician.

The argument was that the means are always subtracted from each row, and with simple algebra you could demonstrate that the determinant is 0. But this ignores the fact that covariance is the expected value of the product of two variables with their expected values subtracted: Cov(X, Y) = E[(X - E[X])(Y - E[Y])].
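As a quick NumPy sanity check (my own toy example), the mean of the mean-centred product agrees with what np.cov computes:

import numpy as np

rng = np.random.default_rng(7)
x = rng.normal(size=10_000)
y = 0.5 * x + rng.normal(size=10_000)

# covariance as the expectation of the mean-centred product
manual = np.mean((x - x.mean()) * (y - y.mean()))
print(manual)                           # roughly 0.5
print(np.cov(x, y, bias=True)[0, 1])    # the same number from NumPy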

Say we have two different distributions from which we draw combinations. However, for reasons peculiar to the use case, certain combinations are not allowed. When we tabulate the probabilities for this state space, we'll have zeros in some cells. So even though the inner product of the corresponding two probability vectors might not be zero, the expected value of the pair occurring together is.

Note another caveat. Bearing in mind that correlation is just covariance divided by the root of the product of both variances, "the correlation between A and B is only a measure of the strength of the linear relationship between A and B. Two random variables can be perfectly related, to the point where one is a deterministic function of the other, but still have zero correlation if that function is non-linear." [Daniel Shiebler's blog]
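Here's a small NumPy illustration of that caveat (my example, not Shiebler's): y is a deterministic function of x, yet their correlation is essentially zero:

import numpy as np

rng = np.random.default_rng(42)
x = rng.uniform(-1, 1, 100_000)   # symmetric about zero
y = x ** 2                        # perfectly (but non-linearly) determined by x

# Pearson correlation is ~0 despite the perfect dependence
print(np.corrcoef(x, y)[0, 1])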

Positive Definite

A matrix M is positive definite if x^T M x > 0 for all non-zero vectors x (an equivalent definition is "a symmetric matrix whose eigenvalues are all positive real numbers" - Coding the Matrix, Klein, definition 12.6.1).

Note that all matrices that can be expressed as A^T A are positive semi-definite (see this SO answer). The proof is simple: substitute A^T A for M above.

x^T M x = x^T A^T A x = (A x)^T (A x)

and the inner product of any vector with itself is non-negative. Since our covariance matrix can be expressed as A^T A, it too is at least positive semi-definite (x^T M x ≥ 0). But we know it must also be positive definite whenever it is invertible, since then M x = 0 has no solution for x ≠ 0.
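Here's a quick NumPy check of that (my own sketch): the eigenvalues of a sample covariance matrix are all non-negative, and they're strictly positive when no variable is a linear combination of the others:

import numpy as np

rng = np.random.default_rng(0)
A = rng.normal(size=(100, 3))      # 100 observations of 3 variables
M = np.cov(A, rowvar=False)        # 3x3 covariance matrix

print(np.linalg.eigvalsh(M))       # all eigenvalues > 0: M is positive definite

# make one column a linear combination of the others...
A[:, 2] = A[:, 0] + A[:, 1]
M = np.cov(A, rowvar=False)
print(np.linalg.eigvalsh(M))       # ...and one eigenvalue collapses to ~0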

Why this connection between definiteness and invertibility holds is explained in this elegant SO answer. Basically, if M x = 0 for some x ≠ 0, then the rows of M must be linearly dependent for the equation

Σ_j M_ij x_j = 0 ∀ i

to hold. That is, if you give me (n-1) values in a row, I can tell you the value of the last one. Here's a simple Python/NumPy example where the last value in a row is just the sum of the first two. It's easy to see that the vector [1, 1, -1] is in the null space of this matrix when we multiply them together:

>>> import numpy as np
>>> M = np.asmatrix([[1, 2, 3], [4, 5, 9], [6, 7, 13]])
>>> np.linalg.det(M)
0.0

But if a matrix's rows are linearly dependent, its determinant must be 0. And if a matrix's determinant is 0, it cannot be inverted. QED.

What if my Matrix is Singular?

We add a tiny amount to the diagonal to make it non-singular. This is called conditioning. It sounds like a hack but it does have a basis in maths. In a frequentist interpretation, this is ridge regression. In a Bayesian interpretation, it's the prior.
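For instance, a minimal sketch conditioning the singular matrix from the example above (λ is an arbitrarily small value):

import numpy as np

M = np.array([[1, 2, 3], [4, 5, 9], [6, 7, 13]], dtype=float)

lam = 1e-6                      # a tiny conditioning term
M_c = M + lam * np.eye(3)       # add lam to the diagonal: M + lam*I

print(np.linalg.det(M_c))       # small but non-zero: M_c is now invertible
M_inv = np.linalg.inv(M_c)      # no longer blows up on a singular matrix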

Briefly, the argument goes that for ridge regression, we penalize large model parameters. So, instead of minimizing just our error, (X θ - y_actual)^2, we minimize our error plus the penalty:

L(θ) = (X θ - y_actual)^2 + λ θ^T θ

by differentiating with respect to θ and setting the result to zero. Solve this equation and you'll see a [X^T X + λ I]^{-1}.
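Here's a minimal NumPy sketch of that closed form (my own example, with a deliberately collinear column so that X^T X alone would be singular):

import numpy as np

rng = np.random.default_rng(1)
X = rng.normal(size=(50, 3))
X[:, 2] = X[:, 0] + X[:, 1]      # collinear column: X^T X is singular

theta_true = np.array([1.0, 2.0, 0.0])
y = X @ theta_true + rng.normal(scale=0.1, size=50)

lam = 0.1
# theta = (X^T X + lam*I)^-1 X^T y
theta = np.linalg.solve(X.T @ X + lam * np.eye(3), X.T @ y)
print(theta)                     # a unique, finite solution despite the collinearity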

The argument for the Bayesian prior briefly goes like this: if we take a frequentist view, assume that the error in our data is Gaussian, and plug our loss L(θ) into it, we'll see our familiar equation for a Gaussian multiplied by e^{-λ θ^T θ}. Since the Bayesian posterior, p(θ|Data, Model), must equal the frequentist probability, e^{-λ θ^T θ} is the only term that can map to the prior p(θ|Model), simply because it's the only one that contains θ but not the data. Therefore, our conditioning manifests itself in the prior.

Full derivations appear in Jake VanderPlas' wonderful blog.

Friday, November 6, 2020

Scala Proofs - Part 1

According to the Scaladocs,

An instance of A <:< B witnesses that A is a subtype of B.
Requiring an implicit argument of the type A <:< B encodes the generalized constraint A <: B.
So, for instance:

  class MySuper

  class MySub extends MySuper 

  def demandsProof[A](implicit a: A <:< MySuper): Unit = ???

demands proof that any A is a subtype of MySuper. We can ensure that this is satisfied without explicitly passing an argument:

  demandsProof[MySub] // OK
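  // demandsProof[String] // does not compile: String is not a subtype of MySuper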
Rob Norris (@tpolecat) summarized the two operators on Gitter (Apr 02, 22:26):

A <:< B extends A => B and is available implicitly if A <: B
A =:= B extends A => B and is available implicitly if A is the same type as B
The =:= operator comes in handy when I want to enforce, at compile time, the rule that one can only invert square matrices [Wikipedia]. My code looks like this:

@typeclass trait Invert[M[_ <: Length, _ <: Length]] {
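  // the implicit evidence `a: A =:= B` means invert compiles only when the matrix is square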
  @op("inv") def invert[A <: Length, B <: Length](x: M[A, B])(implicit a: A =:= B): M[A, A]
}

(Note the use of Simulacrum to automatically generate type classes.)

Adam Rosien describes the difference between A <:< B and B >: A at InnerProduct:
There is an analogous implicitly passed value for subtype bounds, but the subtype bound is not syntactic sugar for it. (I thought it was.) That is, the type signature

def widen[A, B >: A](fa: F[A]): F[B] 
is equivalent to the type signature

def widen[A, B](fa: F[A])(implicit ev: A <:< B): F[B] 
but the former is not converted to the latter by the compiler, as is the case for context bounds. [my emphasis]
We can prove this by decompiling the Java bytecode with javap.  The first gives:

  public <A, B> F widen(F);

and the second, with the implicit evidence parameter:

  public <A, B> F widen(F, scala.Predef$$less$colon$less<A, B>);

This highlights a useful equivalence:
Containers of a subtype can be transformed into containers of its supertype, if you can map over the container. The usual definition of covariance emphasizes subtypes, but the ability to map is a more general, and useful, definition.
Anyway, why would you use one over the other? See the source code for Scala's Option:

sealed trait Option[+A] {
  // def flatten[B, A <: Option[B]]: Option[B] = ...
  // won't work, since the A in flatten shadows the class-scoped A.

  def flatten[B](implicit ev: A <:< Option[B]): Option[B]
    = if(isEmpty) None else ev(get)
  // Because (A <:< Option[B]) <: (A => Option[B]), ev can be called to turn the
  // A from get into an Option[B], and because ev is implicit, that call can be
  // left out and inserted automatically.
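  // e.g. Some(Some(42)).flatten == Some(42)
  // Some("x").flatten does not compile: no evidence that String <:< Option[B]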
}