Saturday, November 26, 2022

MLOps: debugging a pipeline

The domain

Healthcare in England is broken down into about 40 regions. For each region, we want to measure the differences in clinical outcomes conditioned on the ethnic and socioeconomic categories of the patients. To do this, we feed the data for each health region into a Spark GLM.
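
A minimal sketch of that per-region fit (the column names and the binomial family are my assumptions, not the real pipeline):

import org.apache.spark.ml.regression.GeneralizedLinearRegression

val glm = new GeneralizedLinearRegression()
  .setFamily("binomial")        // assumption: a binary clinical outcome
  .setLink("logit")
  .setFeaturesCol("features")   // ethnicity + socioeconomic category, encoded as a vector
  .setLabelCol("outcome")

val model = glm.fit(regionDf)   // regionDf: the rows for a single health region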

The problem

Everything was fine with our pipeline for six months before it started to blow up with:

Caused by: org.apache.spark.SparkException: Failed to execute user defined function(GeneralizedLinearRegressionModel$$Lambda$4903/0x0000000101ee9840: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>, double) => double)

Now, before we do a deep dive, the first thing to note is that we have a robust suite of tests that use synthetic data, and they were all passing.

Secondly, the code that was blowing up was shared with five other data sets, all of which were working fine in production.

If the code seemed OK but one path through the ML pipeline was blowing up in code common to the other paths, what does that suggest? Well, if it's not the code, there must be something suspicious about the data, right? The tests use synthetic data, so of course they would pass.

The investigation

The first course of action when debugging is to take a good, long stare at the error. This might be obvious, but many devs pay it insufficient attention because the error is generally a hundred lines of hard-to-read stack trace. This is like a detective who disregards the crime scene because there's too much evidence to collect.

Anyway, our murder scene was full of Scala and Python stack traces, but if we persevere, we find that the line triggering the error was a call to DataFrame.collect(). This is generally suspicious, but on this occasion we knew we were dealing with a very small data set, so it seemed safe. Indeed, there were no OOMEs, which are the most common problem with calls to collect().

But remember that Spark is lazily evaluated, so the root cause could be something deeper in the stack. Navigating a few stack frames back, we see some one-hot encoding of ethnic groups. Hmm, what can go wrong with one-hot encoding? Well, one potential gotcha is that when there is only one category, an exception will be raised.
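
The suspect step looks roughly like this (Spark 3.x, with invented column names). Note that with the encoder's default dropLast = true, a column containing a single category encodes to a zero-length vector, exactly the sort of degenerate input that only blows up further downstream:

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

val indexer = new StringIndexer()
  .setInputCol("ethnicity")
  .setOutputCol("ethnicityIdx")

val encoder = new OneHotEncoder()
  .setInputCol("ethnicityIdx")
  .setOutputCol("ethnicityVec")   // vector size = number of categories - 1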

However, this seemed unlikely. We break ethnicities down into only five groups, and there are over a million people in each health region. It would be extraordinarily unlikely for a region to contain patients of only a single ethnicity.

Time to look at the data.

Any region with such homogeneous patient data probably has very little data to begin with, so let's count the number of rows per region. And bingo! There it is: a region called null that has a single (white) patient. This was a recent development in the data being fed into the model, which explained why things had worked so well for so long.
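
The count in question is a one-liner along these lines (the column name is hypothetical):

df.groupBy("region").count().orderBy("count").show(50, truncate = false)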

The offending row comes from upstream data sets curated by a different department entirely, so we're still considering what to do. For now, we could apply a band-aid and filter out any region called null, or better still, any region with fewer than a few thousand patients (as otherwise we're likely to get single-category cohorts).
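
The band-aid could be as small as this sketch (the threshold and column name are illustrative):

import org.apache.spark.sql.functions.col

val regionCounts = df.groupBy("region").count()
val bigEnough    = regionCounts.filter(col("count") >= 5000).select("region")
val usable       = df.join(bigEnough, Seq("region"))   // tiny regions, including the lone null one, fall away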

One model to rule them?

At the end of the day, the code, the model and the data need to be considered holistically. For instance, which data sets you feed into a model must be evaluated beforehand. 

As an example, we also condition on age bands in this particular GLM, so if we were to feed neonatal or paediatric data into it, the model would blow up as all patients would fall into the 0-18 age band. Obvious when you think about it, but perhaps surprising if you've inherited somebody else's code.

Saturday, November 12, 2022

Architectural patterns

Some architectural terms (old and new) that I keep bumping into.

Eventual Consistency
"Eventual consistency — also called optimistic replication — is a consistency model used in distributed computing to achieve high availability that informally guarantees that, if no new updates are made to a given data item, ultimately all accesses to that item will return the last updated value.  Eventually-consistent services are often classified as providing BASE semantics (basically-available, soft-state, eventual consistency), in contrast to traditional ACID ... Another great model that can be directly implemented in the application layer is strong eventual consistency (SEC), which can be achieved via conflict-free replicated data types (CRDT), giving us the missing safety property of eventual consistency.  

"Event-driven applications usually favor eventual consistency, for the most part. However, we could also opt-in for strong consistency in particular system areas. Thus, it is fair to say we can combine both consistency models depending on our use case." - Functional Event-Driven Architecture, Volpe

The impacts of consistency on Microservices

Microservices should ideally be totally independent. For example, in a highway management system, the weather service is totally orthogonal to the roadworks service even though both have an impact on congestion. However, microservices in the real world often have soft dependencies. As a result, "in a microservices world, we don’t have the luxury of relying on a single strongly consistent database. In that world, inconsistency is a given." [James Roper]

Hugo Oliviera Rocha outlines some antipatterns here. The first is "events as simple notifications. The source system publishes an event notifying the consumers that something changed in its domain. Then the consumers will request additional information to the source system... The main issue and the main reason why this option should be seldom used is when you apply it to a larger scale.

"[I]nstead of requesting the source system for additional information, it is possible to save the data internally as a materialized read model... The main issue isn’t the disk space, it is the initialization, maintenance, and keeping that data accurate."

He says event sourcing is just a band-aid and suggests using fat (i.e., denormalised) messages. The downside is that they can be chunky.

CRDT
"To implement eventually consistent counting correctly, you need to make use of structures called conflict-free replicated data types (commonly referred to as CRDTs). There are a number of CRDTs for a variety of values and operations: sets that support only addition, sets that support addition and removal, numbers that support increments, numbers that support increments and decrements, and so forth." - Big Data, Nathan Marz

To a functional programmer, this looks a lot like semigroups and reducing.
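
For example, a grow-only counter (G-Counter) is just a map from replica to count whose merge is a pointwise max: associative, commutative and idempotent, so replicas can be combined in any order and still converge. A rough sketch (mine, not Marz's):

final case class GCounter(counts: Map[String, Long]) {
  def increment(replica: String): GCounter =
    GCounter(counts.updated(replica, counts.getOrElse(replica, 0L) + 1))

  def value: Long = counts.values.sum

  // merge is the semigroup combine: pointwise max over replica counts
  def merge(that: GCounter): GCounter =
    GCounter((counts.keySet ++ that.counts.keySet).map { k =>
      k -> math.max(counts.getOrElse(k, 0L), that.counts.getOrElse(k, 0L))
    }.toMap)
}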

Data Mesh
"Unlike traditional monolithic data infrastructures that handle the consumption, storage, transformation, and output of data in one central data lake, a data mesh supports distributed, domain-specific data consumers and views “data-as-a-product,” with each domain handling their own data pipelines. The tissue connecting these domains and their associated data assets is a universal interoperability layer that applies the same syntax and data standards." [TowardsDataScience]

"Data Mesh is a journey so you cannot implement Data Mesh per-se, you need to adopt the principles and start to make incremental changes." Adidas's journey [Medium]. Of the seven points given, two (decentralization and self-service) are the antithesis of ontologies.

Batch Views
"The batch views are like denormalized tables in that one piece of data from the master dataset may get indexed into many batch views. The key difference is that the batch views are defined as functions on the master dataset. Accordingly, there is no need to update a batch view because it will be continually rebuilt from the master dataset. This has the additional benefit that the batch views and master dataset will never be out of sync."  Big Data, Nathan Marz

Saga Pattern
"The Saga Pattern is as microservices architectural pattern to implement a transaction that spans multiple services. A saga is a sequence of local transactions. Each service in a saga performs its own transaction and publishes an event. The other services listen to that event and perform the next local transaction" [DZone]

Example in Cats here.
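
For intuition, here is a minimal, library-free sketch (not the Cats example linked above): run each local transaction in order and, if one fails, run the compensations for everything that has already committed, in reverse order.

import scala.util.{Failure, Success, Try}

final case class Step[A](run: () => Try[A], compensate: A => Unit)

def runSaga[A](steps: List[Step[A]]): Try[List[A]] = {
  @annotation.tailrec
  def loop(remaining: List[Step[A]], done: List[(Step[A], A)]): Try[List[A]] =
    remaining match {
      case Nil => Success(done.reverse.map(_._2))
      case step :: rest =>
        step.run() match {
          case Success(a) => loop(rest, (step, a) :: done)
          case Failure(e) =>
            done.foreach { case (s, a) => s.compensate(a) } // undo in reverse commit order
            Failure(e)
        }
    }
  loop(steps, Nil)
}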

Type 1 and 2 data evolution
Slowly changing dimensions [Wikipedia] is a "concept that was introduced by Kimball and Ross in The Data Warehouse Toolkit." A strategy could be that the data source "tracks historical data by creating multiple records. This is called a type 2 dimension." [The Enterprise Big Data Lake - Gorelik]

Type 1 overwrites a row's data, as opposed to type 2, which adds a new row.
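
A toy illustration (the fields are invented) of what the two strategies mean for, say, a changed address:

final case class CustomerDim(id: Int, address: String, validFrom: String, validTo: Option[String])

// Type 1: overwrite the row in place; history is lost
val type1 = CustomerDim(42, "New Street", "2020-01-01", None)

// Type 2: close the old row and append a new one; history is preserved
val type2 = List(
  CustomerDim(42, "Old Street", "2020-01-01", Some("2022-11-01")),
  CustomerDim(42, "New Street", "2022-11-01", None)
)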

Data Marts
Definitions of data marts tend to be a bit woolly, but the best I've heard was from a colleague who defined it as "data structured for use cases and particularly queries."

Data Marts tend to use type 2 dimensions (see above). 

Hexagon Architecture
Hexagon, a.k.a. Onion, a.k.a. Ports and Adapters, "give us patterns on how to separate our domain from the ugliness of implementation." [Scala Pet Store on GitHub] This is an old pattern, as anybody who has written microservices will know, but the name was new to me. The idea is that the app shows the outside world many faces for communication, but the kernel inside "is blissfully ignorant of the nature of the input device." [Alistair Cockburn] This facilitates testing and reduces the cognitive overhead that comes from having business logic scattered over many tiers and codebases.
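
A minimal sketch of the idea in Scala (all the names are invented): the domain kernel depends only on the port, so swapping the adapter (a real database, an in-memory test double, an HTTP client) never touches the business logic.

trait PatientRepository {                                   // the port
  def find(id: String): Option[String]
}

final class InMemoryPatientRepository(data: Map[String, String]) extends PatientRepository {
  def find(id: String): Option[String] = data.get(id)       // a test adapter
}

final class OutcomeService(patients: PatientRepository) {   // the domain kernel
  def describe(id: String): String =
    patients.find(id).fold("unknown patient")(name => s"outcomes for $name")
}

val service = new OutcomeService(new InMemoryPatientRepository(Map("1" -> "Alice")))
println(service.describe("1"))   // outcomes for Alice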

Microservices
This is a huge area but here are some miscellaneous notes.

Before you jump on board with the Java-based Lagom, it's worth noting that Martin Fowler wrote "Don't start with microservices – monoliths are your friend". This provoked a whole debate here. It's all worth reading, but the comment that stuck out for me was:
"Former Netflix engineer and manager here. My advice:
Start a greenfield project using what you know ... Microservices is more often an organization hack than a scaling hack. Refactor to separate microservices when either: 1) the team is growing and needs to split into multiple teams, or 2) high traffic forces you to scale horizontally. #1 is more likely to happen first. At 35-50 people a common limiting factor is coordination between engineers. A set of teams with each team developing 1 or more services is a great way to keep all teams unblocked because each team can deploy separately. You can also partition the business complexity into those separate teams to further reduce the coordination burden."
A fine example of Conway's Law.

Builds in large organisations

Interestingly, Facebook reports that Git does not scale for them. Meanwhile, Google uses Bazel, which is supposed to be polyglot and very scalable.

Strangler Pattern
This is one of those obvious patterns that I never knew had a name.

"The Strangler pattern is one in which an “old” system is put behind an intermediary facade. Then, over time external replacement services for the old system are added behind the facade... Behind the scenes, services within the old system are refactored into a new set of services." [RedHat]

One downside can be the maintenance effort of running the facade alongside both the old and new systems during the migration.

Medallion architecture

This [DataBricks] divides data sets into bronze (raw), silver (cleaned) and gold (application-ready).
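
In Spark terms, a toy version might look like this (assuming a SparkSession named spark; the paths and column names are invented):

import org.apache.spark.sql.functions.col

val bronze = spark.read.json("s3://lake/raw/events/")                              // raw, as landed
val silver = bronze.dropDuplicates("eventId").filter(col("eventTime").isNotNull)   // cleaned
val gold   = silver.groupBy("region").count()                                      // shaped for the application

gold.write.mode("overwrite").parquet("s3://lake/gold/region_counts/")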

(GitHub) Action stations

Here are some notes I made on learning GitHub Actions:

There are some implicit environment variables. For instance, GITHUB_ENV (docs) points to a temporary file; environment variables written to it become available to later steps, like this:

          echo "ENVIRONMENT=develop" >> $GITHUB_ENV

This only takes effect from the next run block onwards, not in the step that writes it.
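
For example (the step names are invented), a variable written in one step is only visible from the following step onwards:

      - name: Choose environment
        run: echo "ENVIRONMENT=develop" >> $GITHUB_ENV
      - name: Use it
        run: echo "Deploying to $ENVIRONMENT"   # visible here, not in the step that wrote it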

In addition to these, there are contexts, which are "a way to access information about workflow runs, runner environments, jobs, and steps." For instance, github.ref refers to "the branch or tag ref that triggered the workflow run" (docs) and you use it with something like:

        if: endsWith(github.ref, '/develop')

To set up secrets, you follow the instructions here. They ask you to go to the Settings tab of the repository's GitHub page; if you can't see it, you don't have permission to change them. You can reference these secrets like any other context. For example, to log in to AWS:

      - name: Configure AWS Credentials
        uses: aws-actions/configure-aws-credentials@v1
        with:
          aws-access-key-id: '${{ secrets.AWS_ACCESS_KEY_ID }}'
          aws-secret-access-key: '${{ secrets.AWS_SECRET_ACCESS_KEY }}'
          aws-region: eu-west-2


Where aws-actions/configure-aws-credentials@v1 (and its ilk) are plugins that facilitate access to third-party tools.

Contexts can also reference the output of actions. For example:

      - name: Login to Amazon ECR
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@v1
      - name: 'Build, tag, and push image to Amazon ECR'
        env:
          ECR_REGISTRY: '${{ steps.login-ecr.outputs.registry }}'


Where login-ecr is an arbitrary ID but outputs.registry is part of the action's data structure.

Saturday, November 5, 2022

The semantics of Streaming

A clever but non-technical colleague asked why a batch system could not simply stream its data. The reason this is a big ask is that the semantics of batch and streaming are different, no matter how we try to pretend they are not. 

A file has a notion of completeness: it has an end. A stream does not, necessarily. You might like to send a message in a stream that indicates it has finished, but now you have imposed an ordering constraint that the file did not necessarily have.

And if you impose a constraint on order, you can no longer parallelize reading the stream. Again, no such constraint exists with a file. 

Note that these semantic objections are orthogonal to the argument that streams can be viewed as tables [Confluent]. That argument is merely an abstraction, whereas the rest of this post focuses on the real differences between streams and batches.

Size

Using Scala 2.13's built-in streams (Stream, nowadays deprecated in favour of LazyList), we can create a stream of Fibonacci numbers with:

val fibs: Stream[Int] = 0 #:: fibs.scanLeft(1)(_ + _) // from the docs: `scanLeft` is analogous to `foldLeft`

We can then pretend that this stream is a Seq just like any other.

val seq: Seq[Int] = fibs
println(seq.take(5).mkString(", ")) // 0, 1, 1, 2, 3

But what kind of Seq never terminates when you call a simple .size on it?

Aside from the fact that Seq is generally frowned upon (unlike Vector and List, it makes no performance guarantees; Cats incidentally eschews its use and you can't do things like call sequence on it), we can't pretend that potentially infinite streams are the same as strictly finite sequences.

Empty Streams

... present problems. Paul Snively on the FS2 chat said:
I don't know if it matters, but keep in mind that the types of Stream.empty and Stream.emits(List.empty[A]) are not the same.
You can see in the REPL that this is true:

scala> Stream.emits(List.empty[String])
val res0: fs2.Stream[[x] =>> fs2.Pure[x], String] = Stream(..)
scala> Stream.empty
val res1: fs2.Stream[fs2.Pure, fs2.INothing] = Stream(..)

Things are even worse if you try to "run" the stream:

scala> Stream.emits(List.empty[String]).repeat(10)

This just hangs while also using an entire core. So does this:

scala> Stream.empty.repeat(10)

Effectful streams
Lucas Kasser @lkasser1 Jul 03 06:22
If I have a Stream[IO, A], is there a way to access the individual IOs? I'd like to be able to get a Stream[IO, IO[A]] so that I can retry individual elements in the stream.
I've looked through the docs, but I didn't see any function like uneval

Fabio Labella @SystemFw Jul 03 09:58
No, it's not possible because a Stream is not just a List of IOs
it's monadic, so it's more like a tree (some of the IOs depends on the result of previous ones)
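
A small illustration (mine, not from the chat) of why a Stream[IO, A] is not just a List[IO[A]]: the second effect below only exists once the first has produced a value, so the effects cannot be enumerated up front.

import cats.effect.IO
import fs2.Stream

val s: Stream[IO, Int] =
  Stream.eval(IO(scala.util.Random.nextInt(10))).flatMap { n =>
    Stream.eval(IO(n * 2))   // this effect depends on n, which is only known at runtime
  }
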
Complete vs Incomplete Data

Some ciphers (for instance, RSA) need the whole data to de/encrypt. "Some modes of operation can make block ciphers [like AES] act as stream ciphers." [SO] This differs from a true stream cipher like ChaCha20, but by using Chunks we can simulate it.
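
A sketch of that simulation with fs2 (fs2 3.x assumed; encryptBlock is a hypothetical stand-in for a real cipher call):

import fs2.{Chunk, Stream}

def encryptBlock(block: Chunk[Byte]): Chunk[Byte] = block   // placeholder: a real AES block/mode call would go here

val ciphertext =
  Stream.emits("hello streaming world".getBytes.toList)
    .chunkN(16, allowFewer = true)                          // 16-byte blocks, as AES expects
    .flatMap(block => Stream.chunk(encryptBlock(block)))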

Grouping & Streaming in Spark
"Developing the translation layer (called runner) from Apache Beam to Apache Spark we faced an issue with the Spark Structured Streaming framework: the problem is that this framework does not support more than one aggregation in a streaming pipeline. For example, you cannot do a group by then a reduce by in a streaming pipeline. There is an open ticket in the Spark project, an ongoing design and an ongoing PR, but, as for now, they received no update since the summer 2019. As a consequence, the Beam runner based on this framework is on hold waiting for this feature from the Spark project." [Etienne Chauchot's blog]
Basically, if there are two grouping operations, op1 and op2, the grouping in op1 might make the data fed into op2 out of date: it might have gone stale while it was sitting in op1's buffer.
"[S]treaming systems define the notion of watermark. It is what gives the system the notion of completeness of data in a constant flow of streaming data. It is the point in time when the system should not receive older elements. As streaming systems rely on windowing to divide this stream of data, the watermark can also be defined as the system notion of when all the data in a certain window can be expected to have arrived in the streaming pipeline. When the watermark passes the end of the window, the system outputs data." [ibid]