Sunday, June 25, 2023

Diagnosing K8s Installations

Well, this was much harder than I thought. I tried to update Minikube and my local Kubernetes installation.

First, I installed everything I thought I needed:

sudo apt-get install -y kubelet kubeadm kubectl kubernetes-cni

and then downloaded and installed Minikube per the instructions. Unfortunately, when I ran it, it complained that possibly "the kubelet is not running".

Running:

systemctl status kubelet

showed that kubelet was exiting with error code 1.

Running:

journalctl -fu kubelet

showed it puking stack traces, but this line was the salient one:

... failed to run Kubelet: running with swap on is not supported, please disable swap! or set --fail-swap-on flag to false...

But where does that flag go? As it happened, I had to add --fail-swap-on=false to the ExecStart line in /etc/systemd/system/kubelet.service.d/10-kubeadm.conf (I found this by grepping for ExecStart in /etc/). You then run:

sudo systemctl daemon-reload

to have the changes recognised. Then it's a matter of configuring Kubernetes system-wide:

sudo kubeadm init --ignore-preflight-errors=Swap

(I needed to ignore the fact that my system has swap space as I'm only working on proofs of concept, not running a K8s cluster in production. Whether or not to use swap space is nuanced - see here for more information. Basically, what is best depends on the situation. TL;DR: it can speed things up but also delay the inevitable death of a pathological system). 

You can run kubeadm reset if you foul things up.

Also, I had to rm -rf $HOME/.kube and $HOME/.minikube since kubectl config view showed me a profile that was literally years out of date. The .kube config can be regenerated with:

mkdir -p $HOME/.kube
sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
sudo chown $(id -u):$(id -g) $HOME/.kube/config


(lines that kubeadm kindly told me to run). Now, running kubectl get all gives me a sensible output.

The story doesn't quite end there. After a few hibernations of my laptop, minikube was failing to start again with (apparently) certificate errors. This issue helped me: I executed minikube delete && minikube start and all was well with the World once more.

I've got a fever, and the only prescription is MOR COW (bell)

Apache Iceberg deals with updates in two different ways:

  • Merge on read
  • Copy on write

What are these strategies? To illustrate, I've some BDD code that writes and updates 20 rows of data using both MOR and COW. (There's an interesting article at Dremio about this topic but some of the code is Dremio specific).

Simply put, copy-on-write replaces the entire parquet file if a single row is updated. 

In merge-on-read, a file containing only the updated data is written, along with a file saying which rows in the original file were affected. It is the reader that must reconcile the two.

The two strategies are for different audiences. COW makes writes slow and reads fast. MOR makes reads slow and writes fast.
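
The strategy is chosen per table via Iceberg's write properties. Here's a rough sketch, assuming a Spark session wired up to an Iceberg catalog called demo and a made-up table name (merge-on-read also needs a format-version 2 table):

spark.sql("""
  ALTER TABLE demo.db.events SET TBLPROPERTIES (
    'write.update.mode' = 'merge-on-read',
    'write.delete.mode' = 'merge-on-read',
    'write.merge.mode'  = 'copy-on-write'
  )
""")

With these settings, UPDATEs and DELETEs write delete files for the reader to reconcile, while MERGE INTO still rewrites whole data files.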

Focussing on the more complex of the two strategies, MOR, we see that it creates four new files compared to COW's two. That corresponds to two new parquet files (each parquet file in both strategies comes with a .crc file holding its metadata). One contains the updated data; the other, a textual reference to the original file that holds the superseded row. No files are deleted and the original parquet file remains the same, so, unlike COW, no unchanged data is rewritten.

Iceberg has another way of recording deleted rows besides their position in the file: it can also identify them by equality on column values. However, this appears not to be supported by Spark at the moment.

(BTW, the title of this post refers to this SNL sketch).

Monday, June 19, 2023

Data Contracts

What are they? "Data Contracts are first and foremost a cultural change toward data-centric collaboration" [Chad Sanderson's blog and here]

Examples of why they're necessary

A company wants to find the average age of the consumers of a particular product. They find the age is 42. They are somewhat surprised by this as it's older than they expected, so they check their workings - add all the ages and divide by the number of customers who bought it. After confirming the maths, the value is indeed 42 and they report it to their boss. Unfortunately, the mean age was artificially inflated because a subset of customers had an age of '999', since the system that captured the data used that value as a placeholder for 'unknown'.

The next example actually happened. We were measuring average length of stay (LoS) in hospitals. When sampling the data, everything looked fine. But out of the millions of patients, a very small number (~30) had a discharge date of 1/1/1900. Clearly, the system that captured that data used this value as a token for 'unknown'. This erroneously reduced the overall LoS. The data bug was only caught when, drilling down into individual hospitals, some average LoS figures turned out to be negative. Until then, we were merrily reporting the wrong national figure.
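
To make the first example concrete, here's a tiny Scala sketch (the ages are made up) showing how a sentinel value poisons a mean and how even a crude range check catches it:

// 999 is an upstream placeholder for 'unknown'
val ages = Seq(25, 31, 38, 999, 29)

val naiveMean = ages.sum.toDouble / ages.size             // 224.4 - inflated by the sentinel
val plausible = ages.filter(a => a >= 0 && a <= 120)      // a crude range check
val realMean  = plausible.sum.toDouble / plausible.size   // 30.75

A data contract would push that range check upstream to the producer, instead of leaving every consumer to rediscover it.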

Is it a purely cultural problem?

The trouble with cultural solutions is that they depend on unreliable units called "humans". For instance, a friend of mine was the victim of an upstream breaking change originating in the Zurich office. When he contacted the team, they were unaware of him, of his London team, and of the fact that they were consumers of this data.

I asked on the Apache dev mailing lists if we could implement a more robust, technical solution for Spark. Watch this space for developments.

Possible solutions

Andrew Jones (who is writing a book on data contracts) uses JSON Schema to validate his data. "JSON Schema is a powerful tool for validating the structure of JSON data" [JSON Schema docs]. 

Elliot West of Dremio (see the mailing list traffic) also favours JSON Schema. However, because JSON has only a few data types (strings, arrays, numbers etc), it's not rich enough to enforce constraints like "date X must be before date Y".
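
To give a flavour, here's a sketch of JSON Schema validation on the JVM. I'm using the networknt json-schema-validator library purely as an example (neither blog names a specific implementation), and the schema and document are made up:

import com.fasterxml.jackson.databind.ObjectMapper
import com.networknt.schema.{JsonSchemaFactory, SpecVersion}
import scala.jdk.CollectionConverters._

object ContractCheck extends App {
  // Made-up contract: age must be an integer between 0 and 120
  val schemaJson =
    """{ "type": "object",
         "properties": { "age": { "type": "integer", "minimum": 0, "maximum": 120 } },
         "required": ["age"] }"""

  val factory = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V7)
  val schema  = factory.getSchema(schemaJson)
  val errors  = schema.validate(new ObjectMapper().readTree("""{ "age": 999 }""")).asScala

  errors.foreach(println)   // reports that $.age exceeds the maximum
}

A cross-field rule like "date X must be before date Y" would still need custom code on top of this.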

Implementations

This is a new area of development but Databricks' Delta Live Tables (DLT) claims it can “prevent bad data from flowing into tables through validation and integrity checks and avoid data quality errors with predefined error policies (fail, drop, alert or quarantine data).”
Unfortunately, it seems there is no JVM support: “Can I use Scala or Java libraries in a Delta Live Tables pipeline? No, Delta Live Tables supports only SQL and Python. You cannot use JVM libraries in a pipeline. Installing JVM libraries will cause unpredictable behavior, and may break with future Delta Live Tables releases.” [docs]

Friday, June 9, 2023

Modern DevOps Tools

Some tools that I've had too little time to investigate thoroughly.

TestContainers
The free and open source TestContainers offers huge convenience to developers. For instance, you can fire up a very lightweight Postgres container in just a second or two. This ZIO SQL test (AgregatingSpec) ran in just 3.85s on my laptop. In that time, it started a Docker container, populated the Postgres database in it with test data, ran some Scala code against it, then tore down the container. A container can also live as long as the JVM, so all your tests can share it; when TestContainers detects the JVM is exiting, it kills the container.
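
A minimal sketch using the Java TestContainers API from Scala (the image tag and query are my own choices, not what the ZIO SQL test does):

import java.sql.DriverManager
import org.testcontainers.containers.PostgreSQLContainer
import org.testcontainers.utility.DockerImageName

// PostgreSQLContainer has a recursive Java generic, so give Scala a concrete subclass
class Postgres extends PostgreSQLContainer[Postgres](DockerImageName.parse("postgres:15-alpine"))

object TestcontainersDemo extends App {
  val pg = new Postgres
  pg.start()                        // pulls the image on first run, then boots the container
  val conn = DriverManager.getConnection(pg.getJdbcUrl, pg.getUsername, pg.getPassword)
  val rs   = conn.createStatement().executeQuery("SELECT 1")
  rs.next()
  println(rs.getInt(1))             // 1: the database is really there
  conn.close()
  pg.stop()                         // Ryuk reaps the container anyway when the JVM exits
}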

MinIO
If you need to run S3 API compatible storage locally, you can try MinIO. It's open source, written in Go, and lets you run a local Docker container that emulates Amazon storage. 

DuckDB
This open source, C++ application allows you to run SQL against Parquet files without having to fire up a whole platform. You can even run DBeaver against it.
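
DuckDB also ships a JDBC driver (org.duckdb:duckdb_jdbc), so you can query a parquet file from the JVM too. A sketch, with a made-up file path:

import java.sql.DriverManager

object DuckDbDemo extends App {
  // "jdbc:duckdb:" gives an in-memory database; there is no server to start
  val conn = DriverManager.getConnection("jdbc:duckdb:")
  val rs   = conn.createStatement().executeQuery(
    "SELECT count(*) FROM read_parquet('/tmp/my_data.parquet')")
  rs.next()
  println(rs.getLong(1))
  conn.close()
}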

Crossplane
Crossplane is an open source Go project that "connects your Kubernetes cluster to external, non-Kubernetes resources, and allows platform teams to build custom Kubernetes APIs to consume those resources." [docs]

Scala Native
You can now convert Scala code to stand-alone executable binaries using Scala Native [baeldung]. It currently only works with single-threaded applications. The output can be converted to WebAssembly...
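
Getting started is mostly SBT plumbing. A sketch (the plugin version is just whatever was current when I looked):

// project/plugins.sbt
addSbtPlugin("org.scala-native" % "sbt-scala-native" % "0.4.14")

// build.sbt
enablePlugins(ScalaNativePlugin)

Then sbt nativeLink produces the stand-alone binary.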

WebAssembly
Wikipedia describes WebAssembly as "a portable binary-code format and a corresponding text format for executable programs ... for facilitating interactions between such programs and their host environment." It is an "open standard and aims to support any language on any operating system".

Tapir
Is a type-safe Scala library for describing and documenting HTTP endpoints.
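
For instance, a made-up endpoint described as a plain Scala value, which can then be interpreted as a server route or as OpenAPI documentation:

import sttp.tapir._

// GET /hello/{name} returning a plain-text body
val hello = endpoint.get
  .in("hello" / path[String]("name"))
  .out(stringBody)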

GraphQL
GraphQL is a type system, query language, etc., accessible through a single endpoint that returns only what is asked of it and no surplus information. It's a spec and there are implementations in a number of languages. The graph bit comes in insofar as a "query is a path in the graph, going from the root type to its subtypes until we reach scalar types with no subfields." [Bogdan Nedelcu]

LLVM
LLVM is an open source tool chain written in C++. The 'VM' in LLVM originally stood for Virtual Machine but this is no longer the case. Rather than being a virtual machine, it turns any major language into a common intermediate representation that can then be turned into machine code. 

GraalVM
GraalVM is an open source JDK and JRE that is written in Java itself and has its roots in the Maxine project. But it's more than that. It offers compilation to native code as well as supporting polyglot code via its Truffle framework, a language-agnostic AST.

Quarkus
Based on GraalVM (above), Quarkus is an open source Java framework tailored for Kubernetes. Since the JVM code is natively compiled, startup times and memory footprints are small.

Spring Boot
Is an "opinionated" Java framework that favours convention-over-configuration and runs Spring apps with the minimum of fuss.

Python/Java Interop
Python and Java between them dominate the data engineering landscape. The two languages can interoperate via Py4J, which uses sockets to allow Python to invoke Java code, and Jython, which runs Python code wholly inside the JVM. Py4J is used extensively in Spark to allow PySpark devs to talk to Spark JVMs.
Jython, unfortunately, does not support Python 3.
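
The JVM side of Py4J is tiny. A sketch (the class and method names are made up):

import py4j.GatewayServer

class Calculator {
  def add(a: Int, b: Int): Int = a + b
}

object GatewayMain extends App {
  // Expose Calculator to Python; GatewayServer listens on port 25333 by default
  val server = new GatewayServer(new Calculator)
  server.start()
  println("Py4J gateway started")
}

A Python process then connects with py4j.java_gateway.JavaGateway and calls add as if it were local.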

Project Nessie
Nessie is an open source JVM project that promises to do for big data what Git did for code: versioning, branching etc. It apparently sits nicely on top of Iceberg and Databricks.

The lakeFS project is an open source Go project that offers similar functionality.

Cloud native CI/CD
Tekton, which is written in Go.
Argo is a Go-based, Kubernetes-native tool. For instance, it handles rolling deployments, building on K8s's RollingUpdate strategy, which does not natively control traffic flow during an update. 
CircleCI seems to be mostly closed source.

Pipelines
Interestingly, CI/CD and data pipelines both use directed acyclic graphs but with very different intent. User Han on Discord eloquently spelled out the architectural distinction:

Specifically the reason is in batch data [Pipeline] processing, we tend to scale things out horizontally by a whole lot, sometimes using GPUs. This is not a common feature supported by CI/CD workflow tools. In summary: 

Jenkins, CodePipeline, Github Actions, TeamCity, Argo, etc ==> used to build DAGs for CI/CD, tends to have shorter run time, less compute requirement, and fairly linear in dependencies.

Airflow, Dagster, Prefect, Flyte, etc ==> used to build data and/or machine learning pipelines. It tend to have longer run time, larger horizontal scaling needs, and sometimes complex dependencies.  Data pipelines also sometimes have certain needs, e.g., backfilling, resume, rerun, parameterization, etc that's not common in CI/CD pipelines

Interestingly, Luigi looks like it's dead, as even its creator says on Twitter.

Istio
Istio is an open source Go project that transparently provides "a uniform way to integrate microservices, manage traffic flow across microservices, enforce policies and aggregate telemetry data."

Wednesday, June 7, 2023

SBT cheat sheet

To see the dependency tree, run:

sbt compile:dependencyTree

(see the answer of 13/12/21 on this SO question)

You might want to tell SBT to use wider margins if it truncates the tree. Do that with [SO]:

val defaultWidth = 40
val maxColumn = math.max(JLine.usingTerminal(_.getWidth), defaultWidth) - 8

To coax a Scala 2.13 dependency into a Scala 3 build, use something like this:

  libraryDependencies ++= List( ("io.laserdisc" %% "fs2-aws-s3" % "5.0.2").withCrossVersion(CrossVersion.for3Use2_13) )

To use the classpath of a particular module, use something like this [see SO]:

sbt "project core" console

In this particular case, we're opening a REPL with the classpath of a given module.

Use versionScheme (SBT docs) if you're writing libraries. This hints at how clashing versions of your library should be handled. Otherwise, you might see errors when depending on other libraries that "can be overridden using libraryDependencySchemes or evictionErrorLevel" [Scala-Lang]
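
For example, to declare that your library follows early semantic versioning (pick the scheme that matches your release policy):

ThisBuild / versionScheme := Some("early-semver")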

One last thing: SBT tests run in parallel by default. This can ruin integration tests, so you might want to try this workaround [SO] to ensure your tests run serially.
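
One blunt instrument (not necessarily what that SO answer suggests) is to turn parallel test execution off entirely:

Test / parallelExecution := false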