Saturday, December 23, 2023

Cloud native

A cloud native approach to writing code assumes that the instance in which it lives can die at any time.

"Users sometimes explicitly send the SIGKILL signal to a process using kill -KILL or kill -9. However, this is generally a mistak. A well-designed application will have a handler for SIGTERM that causes the application to exit gracefully, cleaning up temporary files and realeasing other resources beforehand. Killing a process with SIGKILL bypasses the SIGTERM handler." - The Linux Programming Interface (Micahel Kerrisk)
Using docker stop sends SIGTERM.
Using docker kill sends SIGKILL.

The latter does not give the JVM a chance to clean up. In fact, no process in any language has the chance to clean up with SIGKILL. (SIGTERM on any thread - not just main - causes the whole JVM process to end and shutdown hooks to execute.) 

A Tini problem...

If the JVM process creates another process and is then killed with SIGKILL, that child process carries on living but its parent becomes (on Ubuntu 20.04.6 LTS) systemd, which in turn is owned by init (PID 1).

Running your JVM directly in a Docker container has some issues. These revolve around Linux treating PID 1 as special, and the ENTRYPOINT of any Docker container runs as PID 1.

In Linux, PID 1 should be init. On my Linux machine, I see:

$ ps -ef | head -2
UID        PID  PPID  C STIME TTY          TIME CMD
root         1     0  0 Oct21 ?        00:18:23 /sbin/init splash

This process serves a special purpose. It handles signals and zombie processes. Java is not built with that in mind so it's best to bootstrap it with a small process called Tini. There's a good discussion of why this is important here on GitHub. Basically, Tini will forward the signal that killed the JVM onto any zombies that are left behind. This gives them the chance to clean up too.

It also passes the JVM's exit code on so we can know how it failed. Exit codes 0-127 are reserved [SO] and the value of the signal that killed the process (kill -l lists them) is added to 128. If you want to set the exit code in the shutdown hook, note that you need to call Runtime.halt rather than Runtime.exit (to which System.exit delegates). The exit method will cause the JVM to hang in this situation [SO].
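A minimal sketch (Scala, with a hypothetical temp file and an assumed exit code) of a shutdown hook that uses halt rather than exit:

import java.nio.file.{Files, Paths}

object GracefulExit extends App {
  Runtime.getRuntime.addShutdownHook(new Thread(() => {
    // clean up: temporary files, buffers, connections... (hypothetical file for illustration)
    Files.deleteIfExists(Paths.get("/tmp/graceful-exit.lock"))
    // halt, not exit: exit waits for the shutdown hooks (including this one) and so would hang
    Runtime.getRuntime.halt(143) // 143 = 128 + 15 (SIGTERM); the choice of code is an assumption
  }))
  Thread.sleep(60000) // simulate work until docker stop delivers SIGTERM
}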

Tuesday, December 12, 2023

ML and Logs (pt2)

Further to my attempt to use machine learning to make sense of huge amounts of logs, I've been looking at the results. My PoC can:

Find log entries with the highest information

When debugging my Kafka cluster, these lines had the highest average entropy:

kafka1: 2023-07-04 14:14:18,861 [RaftManager id=1] Connection to node 3 (kafka3/172.31.0.4:9098) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

kafka1: 2023-07-04 14:17:32,605 [RaftManager id=1] Connection to node 2 (kafka2/172.31.0.3:9098) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

kafka2: 2023-07-04 14:17:31,957 [TransactionCoordinator id=2] Connection to node 3 (localhost/127.0.0.1:9093) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

kafka1: 2023-07-04 14:17:32,605 [RaftManager id=1] Node 2 disconnected. (org.apache.kafka.clients.NetworkClient)

kafka2: 2023-07-04 14:17:31,957 [TransactionCoordinator id=2] Node 3 disconnected. (org.apache.kafka.clients.NetworkClient)

As it happened, this correctly highlighted my problem (Docker Compose networking was misconfigured). But I don't know if I got lucky.

Bucket similar-but-different lines

Using the same algorithm as Twitter, we can bucket similar but lexically different lines, for example:

2023-07-04 14:14:21,480 [QuorumController id=3] ConfigResource(type=TOPIC, name='__consumer_offsets'): set configuration cleanup.policy to compact (org.apache.kafka.controller.ConfigurationControlManager)

2023-07-04 14:14:21,489 [QuorumController id=3] ConfigResource(type=TOPIC, name='__consumer_offsets'): set configuration compression.type to producer (org.apache.kafka.controller.ConfigurationControlManager)

This means that we can:

    • discard boilerplate lines of little value like those above
    • check the distribution of all nodes in a given bucket (for example, if one node is under-represented within a bucket - that is, not logging the same as its peers - this might be an issue).
There's one slight gotcha here: in the Kafka example above, we're using the Raft protocol, so it's not too surprising that only N-1 nodes appear in some buckets, as one node has been elected leader and the others are followers.

Trace high information tokens through the system

Words with high entropy can be traced across my cluster. For instance, my PoC classified wUi1RthMRPabI8rHS_Snig as possessing high information. This happens to be an internal Kafka UUID for a topic and tracing its occurrence through the logs shows that, despite the Docker network issues, all nodes agreed on the topic ID, as did the client. So, clearly some communication was happening despite the misconfiguration.

Investigation

I finally solved my Kafka problem. The Kafka client was running on the host OS and could see the individual Kafka containers but these brokers could not talk to each other. The reason was they needed to advertise themselves both as localhost (for the sake of the Kafka client that lives outside Docker) and also using their internal names (so they could talk within the Docker network).

My PoC could not tell me exactly what the problem was but it successfully highlighted the suspects.

The PoC

So, how does the PoC work? For the entropy, we train the model on a dictionary of English words so it can learn what a "normal" word looks like, rather than, say, wUi1RthMRPabI8rHS_Snig. We disregard lines of fewer than 6 words (including the FQN of the classes, with each package element counting as one word); we then take the average entropy and present the lines that look the most informative.
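For illustration only (this is not the PoC's actual code), a character-bigram model trained on a dictionary gives a much higher average surprisal for a token like wUi1RthMRPabI8rHS_Snig than for an English word; the dictionary path and add-one smoothing are assumptions:

import scala.io.Source
import scala.math.log

object TokenEntropy {
  // learn what "normal" words look like from a dictionary (path is an assumption)
  private val words   = Source.fromFile("/usr/share/dict/words").getLines().map(_.toLowerCase).toSeq
  private val bigrams = words.flatMap(w => w.zip(w.tail)).groupBy(identity).map { case (k, v) => k -> v.size }
  private val total   = bigrams.values.sum.toDouble

  // average surprisal in bits per character pair, with add-one smoothing for unseen pairs
  def surprisal(token: String): Double = {
    val t     = token.toLowerCase
    val pairs = t.zip(t.tail)
    if (pairs.isEmpty) 0.0
    else pairs.map { p =>
      val prob = (bigrams.getOrElse(p, 0) + 1.0) / (total + bigrams.size)
      -log(prob) / log(2)
    }.sum / pairs.size
  }
}

Here surprisal("connection") comes out far lower than surprisal("wUi1RthMRPabI8rHS_Snig").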

For the LSH, we use one-hot encoding of word shingles to create our vectors.
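A rough idea of that encoding (the shingle size and the fixed-vocabulary approach are assumptions, not necessarily what the PoC does):

// break a log line into word shingles, then one-hot encode against a fixed vocabulary
def shingles(line: String, k: Int = 3): Set[Seq[String]] =
  line.split("\\s+").toSeq.sliding(k).toSet

def oneHot(line: String, vocabulary: IndexedSeq[Seq[String]]): Array[Double] = {
  val present = shingles(line)
  vocabulary.map(s => if (present.contains(s)) 1.0 else 0.0).toArray
}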

Future plans

I'd like to show the graph of paths the high-entropy words take through the system (node and log line).

I'd also like to try other systems. Maybe I got lucky with Kafka as there are lovely, high-entropy UUIDs scattered throughout the logs (for example, consumer group IDs).

Thirdly, this PoC has been great for small amounts of data, but what about big data? It really needs to be rewritten in a JVM language and made to run in Spark.

Thursday, November 30, 2023

Memories are made of these

Some notes on new memory models I've been looking at recently.

Zero copy

"Device controllers cannot do DMA directly into user space, but the same effect is achievable by exploiting ... [the fact] more than one virtual address can refer to the same physical memory location. [Thus] the DMA hardware (which can access only physical memory addresses) can fill a buffer that is simultaneously visible to both the kernel and a user space process." - Java NIO, Ron Hitchens

Virtual memory paging is "often referred to as swapping, though true swapping is done at the process level, not the page level" [ibid].

An excellent visual representation of what's going on during a zero-copy comes from Stanislav Kozlovski (who ends with the knock-out punch that it makes very little difference to Kafka since, generally, the costs of network IO and encryption cancel any savings). Anyway, the take-away points are:

  • Zero-copy "doesn’t actually mean you make literally zero copies"; it's just that it "does not make unnecessary copies of the data."
  • Fewer context switches happen.
  • A further optimization to DMA is where the disk "read buffer directly copies data to the NIC buffer - not to the socket buffer.  This is the so-called scatter-gather operation (a.k.a Vectorized I/O).  [It is] the act of only storing read buffer pointers in the socket buffer, and having the DMA engine read those addresses directly from memory."

Java's new vector API

A new way of dealing with vectors is outlined in JEP 426 (Vector API). It leverages new CPU features like Advanced Vector Extensions [Wikipedia] that provide machine instructions to execute Single Instruction, Multiple Data (SIMD) operations.

Martin Stypinski has an interesting article showing that adding two floating point vectors together gains very little from the new API, but a linear equation like y = mx + c (which has obvious applications to machine learning) can improve performance by an order of magnitude.
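A hedged sketch of what the y = mx + c case looks like with the incubating API, called from Scala (it needs --add-modules jdk.incubator.vector, and the loop shape is an assumption modelled on the JEP's examples):

import jdk.incubator.vector.FloatVector

val species = FloatVector.SPECIES_PREFERRED

// y = m*x + c, one SIMD lane-width of floats per iteration, plus a scalar tail
def mxPlusC(x: Array[Float], m: Float, c: Float): Array[Float] = {
  val y     = new Array[Float](x.length)
  val bound = species.loopBound(x.length)
  var i = 0
  while (i < bound) {
    FloatVector.fromArray(species, x, i).mul(m).add(c).intoArray(y, i)
    i += species.length()
  }
  while (i < x.length) { y(i) = m * x(i) + c; i += 1 }
  y
}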

Project Panama

Project Panama deals with interconnecting the JVM with native code. Oracle's Gary Frost talks about this in his presentation on accessing the GPU from Java. The difficulty he encountered was allocating heap memory and passing it to the GPU. Unfortunately, the garbage collector might reorganise the heap making the pointer to that memory obsolete. With Project Panama, this would not happen as the allocation would be through the JVM but off the heap. 

Apache Arrow

Arrow provides an agreed memory format for data so you can "share data across languages and processes." [docs]

This differs from Google's Protobuf in that "Protobuf is designed to create a common on the wire or disk format for data." [SO] Any Protobuf data, once deserialized, is represented in whatever way the consuming language normally handles it.

This inter-process ability allows Spark (which runs in the JVM) to use Pandas (which runs in a Python process).

"Perhaps the single biggest memory management problem with pandas is the requirement that data must be loaded completely into RAM to be processed... Arrow serialization design provides a “data header” which describes the exact locations and sizes of all the memory buffers for all the columns in a table. This means you can memory map huge, bigger-than-RAM datasets and evaluate pandas-style algorithms on them in-place without loading them into memory like you have to with pandas now. You could read 1 megabyte from the middle of a 1 terabyte table, and you only pay the cost of performing those random reads totalling 1 megabyte... Arrow’s memory-mapping capability also allows multiple processes to work with the same large dataset without moving it or copying it in any way. "[10 Things I hate about Pandas, by Pandas author, Wes McKinny]

"The ability to memory map files allows you to treat file data on disk as if it was in memory. This exploits the virtual memory capabilities of the operating system to dynamically cache file content without committing memory resources to hold a copy of the file." [NIO - Hitchens].

MySQL vs Postgres

There's a great comparison between the two major open source DBs here at Uber. Amongst the many insights, there is a mention that MySQL uses a cache "logically similar to the Linux page cache but implemented in userspace... It results in fewer context switches. Data accessed via the InnoDB buffer pool doesn’t require any user/kernel context switches. The worst case behavior is the occurrence of a TLB [Translation Lookaside Buffer] miss, which is relatively cheap and can be minimized by using huge pages."

"On systems that have large amounds of memory and where applications require large blocks of memory, using huge pages reduces the number of entries required in the hardware memory management unit's translation look-aside buffer (TLB). This is beneficial because entries in the TLB are usually a scarce resource... For example, x86-32 allows 4mb pages as an alternative to 4kb pages)" [The Linux Programming Interface]

Thursday, November 9, 2023

Z-order

Z-ordering is an optimization technique in big data that allows faster access since similar data lives together. We discuss the algorithm that defines what is similar here. 

Imagine a logical grid where all the values of one column run across the top and all the values from another run down the side. If we were to sort this data, every datum can be placed somewhere in that grid.

Now, if the squares of the grid were mapped to files and all the data in each cell were to live in those files, we have made searching much easier as we now know the subset of files in which it may live. We've essentially sorted in not just one dimension but two (although we can do higher).

This can be especially useful when we want to sort the data but don't know exactly what to sort on - a common conundrum when dealing with events. Say we have event_time, insert_time and update_time. Which do we choose? We could sort the data three times, each time on one column, but this is impractical with huge data sets. Enter z-order.

Note that the Z-Order really needs 2 or more columns on which to act. Only one column is the degenerate case. "Zorder/Hilbert etc on a single dimension are just a hierarchal sort" [Russell Spitzer on Slack].

(This is a good article about z-ordering from the perspective of Apache Iceberg.)

For an example in Delta Lake, we can see this code that creates a data set with columns c1, c2 and c3 whose values are [x, 99-x, x+50 mod 100] for x in [0, 99]. After z-ordering it, these numbers are split into 4 different files. Generating a graphic illustrates how the data points are distributed over those files.

The idea behind how we calculate which cell a datum falls into is best described here on Wikipedia. But, in brief, the binary representations of the data points are interleaved to give a z-value per tuple. In our example, I see [0, 99, 50] mapped to the byte array [0, 0, 0, 0, 0, 0, 0, 0, 0, 9, -112, 26].
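A toy version of the interleaving (operating on plain Ints rather than Spark's internal row format, which is a simplification):

// interleave the bits of each value, most significant bit first, into one z-value
def interleaveBits(values: Seq[Int], bitsPerValue: Int = 8): BigInt =
  (bitsPerValue - 1 to 0 by -1).foldLeft(BigInt(0)) { (z, bit) =>
    values.foldLeft(z) { (acc, v) => (acc << 1) | ((v >> bit) & 1) }
  }

interleaveBits(Seq(0, 99, 50))   // neighbouring tuples produce neighbouring z-values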

I took a look at the Delta Lake code here where a Spark Column object is created that wraps a DL InterleaveBits type which in turn is a subclass of Spark's Catalyst Expression type. This executes on Spark's InternalRow, that is, the raw data on the executors.

The reason the code is doing this is to add a column with which we can repartition the data with the SQL CAST(interleavebits(rangepartitionid(c1), rangepartitionid(c2), rangepartitionid(c3)) AS STRING). The rangepartitionid keyword is part of the Delta Lake machinery.

Using this z-order value (plus a random key), the Delta code now calls repartitionByRange on the DataFrame, which samples the data [SO] and breaks it into discrete ranges.
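Outside of Delta Lake's own Catalyst expressions, the same shape can be approximated in plain Spark by adding a z-value column and range partitioning on it (the DataFrame df, the UDF, the partition count and the output path are assumptions for illustration):

import org.apache.spark.sql.functions.{col, udf}

// reuse the toy interleaver sketched above
val zValue = udf((c1: Int, c2: Int, c3: Int) => interleaveBits(Seq(c1, c2, c3)).toString)

df.withColumn("z", zValue(col("c1"), col("c2"), col("c3")))
  .repartitionByRange(4, col("z"))
  .write.parquet("/tmp/zordered")   // placeholder output path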

Given the interleaving of the columns c1, c2 and c3, their order has minimal impact on the z-value, so it's no surprise to see nearby data clustering into the same files, as we can see in the graphic. In fact, if you look at the DataFrame during the repartition process:

+---+---+---+-------------------------------------------+
| c1| c2| c3|c7b6b480-c678-4686-aa99-283988606159-rpKey1|
+---+---+---+-------------------------------------------+
|  0| 99| 50|                                       \t�|
|  1| 98| 51|                                       \t�|
|  2| 97| 52|                                       \t�b|
|  3| 96| 53|                                       \t�e|
|  4| 95| 54|                                       \b��|
|  5| 94| 55|                                       \b��|
|  6| 93| 56|                                       \b��|
|  7| 92| 57|                                       \b��|
|  8| 91| 58|                                       \b�|
|  9| 90| 59|                                       \b�|
| 10| 89| 60|                                       \b�b|
| 11| 88| 61|                                       \b�e|
| 12| 87| 62|                                       \b��|
| 13| 86| 63|                                       \b��|
| 14| 85| 64|                                       \f)�|
| 15| 84| 65|                                       \f)�|
| 16| 83| 66|                                       \f`|
| 17| 82| 67|                                       \f`|
| 18| 81| 68|                                       \f`b|
| 19| 80| 69|                                       \f`e|
+---+---+---+-------------------------------------------+


you can see the slowly changing values by which things are partitioned. The column c7b6b480-c678-4686-aa99-283988606159-rpKey1 has a random name so it doesn't clash with other column names; it's dropped immediately after the call to repartitionByRange.

Wednesday, October 25, 2023

Java and the GPU

Java was always built to abstract away the hardware on which it runs, but its approach to the GPU has been somewhat late to the game [Gary Frost, YouTube].

There are projects out there that promise to give Java access to the GPU. I looked at Aparapi but it appears to be moribund. So, I gravitated to TornadoVM, which Frost describes as "state of the art".

The trouble is that TornadoVM runs everything in a Docker image that has all the shared objects built in. This is fine for a quick demo - this is the result of running on my Quadro T2000:

docker-tornadovm$ ./run_nvidia_openjdk.sh tornado -cp example/target/example-1.0-SNAPSHOT.jar example.MatrixMultiplication
Computing MxM of 512x512
CPU Execution: 1.17 GFlops, Total time = 230 ms
GPU Execution: 268.44 GFlops, Total Time = 1 ms
Speedup: 230x

This demonstrates how the GPU runs a nested for-loop doing matrix multiplication much faster than the same code on the CPU. But it runs it all in a Docker container and I need to package a JAR every time I make a change. How do I run it outside the container?

To work this out, I opened a shell in the Docker image and saw that the TornadoVM build it uses was built from Git branch d3062accc. So, the first thing was to checkout that branch of TornadoVM and build it.

I built with:

mvn clean install -Pgraal-jdk-11-plus

using the graalvm-ee-java11-22.3.4 JDK.

Note that you'll need Graal as the TornadoVM code has dependencies on it. I built my own Graal JDK by following the instructions here but using a different branch as I couldn't find the download for the graal.version defined in the TornadoVM pom.xml. Note, you'll also need mx and a bootstrapping JDK that has the right compiler interface (JVMCI), in my case labsjdk-ce-21.0.1-jvmci-23.1-b19.

So far, so good. I ran the tornado script which is just a wrapper around a call to the java executable (don't forget to set your JAVA_HOME environment variable to point at the Graal JDK) but it complained it could not see a tornado.backend file.

Again, a sneaky look at the Docker container indicated that we have to tell it which driver to use. So, I created the file and told it tornado.backends=opencl-backend but then tornado complained it didn't have the OpenCL drivers. Oops. 

You have to build the drivers you want separately, it seems. But if you try to build Tornado drivers without the native OpenCL dev library, you'll see:

TornadoVM/tornado-drivers/opencl-jni$ mvn clean install # yes, Maven cmake via cmake-maven-plugin
....
/usr/bin/ld: cannot find -lOpenCL
...


The Docker image saves you from having to install the OpenCL libraries on your machine. To get it working on bare metal, I played it safe and got an old Ubuntu box and installed them there. You'll need to install them with:

sudo apt install ocl-icd-opencl-dev

and then ran Maven in the opencl* sub directories. This time, the Maven build completed successfully.

However, running tornado in the subsequent dist folder still pukes but with something like:

Caused by: uk.ac.manchester.tornado.api.exceptions.TornadoRuntimeException: OpenCL JNI Library not found
at tornado.drivers.opencl@0.15.1/uk.ac.manchester.tornado.drivers.opencl.OpenCL.<clinit>(OpenCL.java:68)
... 11 more

Not what I was expecting. I found I needed to:

cp ./tornado-drivers/opencl-jni/target/linux-amd64-release/cmake/libtornado-opencl.so $TORNADO_SDK/lib

Where TORNADO_SDK is pointing at the relevant dist folder.

Now, finally, you can run on the bare metal:

$ tornado -cp target/classes/ example.MatrixMultiplication
Computing MxM of 512x512
CPU Execution: 1.21 GFlops, Total time = 222 ms
GPU Execution: 17.90 GFlops, Total Time = 15 ms
Speedup: 14x

(Results from an old NVIDIA GeForce GTX 650)

Note, you'll need to also run it with the Graal JVM. Set both the PATH and JAVA_HOME environment variables to point to it.

Where now?

This is a nice introduction to running Java on the GPU but it's just the start. There are many caveats. Example: what if your Java code throws an Exception? GPUs have no equivalent of exceptions so what happens then? More to come.

Thursday, October 12, 2023

Dependency hell

In these days of ChatGPT, it's easy to forget that most of the time, a developer isn't actually cutting code at all, but debugging it. This is my own personal hell in getting Spark and Kafka in Docker containers talking to a driver on the host.

Firstly, I was seeing No TypeTag available when my code was trying to use the Spark Encoders. This SO answer helped. Basically, my code is Scala 3 and "Encoders.product[classa] is a Scala 2 thing. This method accepts an implicit TypeTag. There are no TypeTags in Scala 3". Yikes. This is probably one reason the upgrade path to Scala 3 in Spark is proving difficult. The solution I used was to create an SBT subproject that was entirely Scala 2 and from there I called Spark.

The next problem was seeing my Spark jobs fail with:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6) (172.30.0.7 executor 0): java.lang.ClassCastException: cannot assign instance of scala.collection.generic.DefaultSerializationProxy to field org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition.inputPartitions of type scala.collection.immutable.Seq in instance of org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition

This is a contender for the error message with the greatest misdirection. You think it's a serialization problem but it isn't directly so.

Although other Spark users have reported it, Ryan Blue mentions that it isn't really a Spark issue but a Scala issue.

Anyway, I tried all sorts of things like changing my JDK (note the sun.* packages have been removed in later JDKs so you need to follow the advice in this SO answer). I tried creating an uber jar but was thwarted by duplicated dependencies [SO] and Invalid signature file digest errors as some were signed [SO], which forced me to strip the signatures out [SO], only to still fall foul of Kafka's DataSourceRegister file being stripped out [SO].

The first step in the right direction came from here and another SO question where the SparkSession is recommended to be built by adding .config("spark.jars", PATHS) where PATHS is a comma delimited string of the full paths of all the JARs you want to use. Surprisingly, this turned out to include Spark JARs themselves, including in my case spark-sql-kafka-0-10_2.13 which oddly does not come as part of the Spark installation. By adding them as spark.jars, they are uploaded into the work subdirectory of a Spark node.
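A sketch of that SparkSession wiring (the master URL and JAR paths are placeholders):

import org.apache.spark.sql.SparkSession

// every JAR the executors need, spark-sql-kafka-0-10 included, listed explicitly
val jars = Seq(
  "/path/to/spark-sql-kafka-0-10_2.13-3.3.0.jar",   // placeholder paths
  "/path/to/kafka-clients-3.3.0.jar"
).mkString(",")

val spark = SparkSession.builder()
  .master("spark://localhost:7077")
  .config("spark.jars", jars)         // these get uploaded to each worker's work/ directory
  .getOrCreate()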

After this, there were just some minor domain name mapping issues to clear up in both the host and container before the whole stack worked without any further errors being puked.

Monday, October 9, 2023

My "What data science can learn from software engineering" presentation

Dr Chris Monit and I gave this presentation at the London MLOps meetup last week. TL;DR: maximize your chances of a successful delivery in data science by adopting best practices that the software industry has established.

"Think of MLOps as the process of automating machine learning using DevOps methodologies" - Practical MLOps (O'Reilly)

Monday, October 2, 2023

Packaging Python

Python build tools are unifying behind a common interface of pyproject.toml.
This and this are great guides. The gist of the former is that you create a TOML file that conforms to a specification then you can use any build tool to run it. The gist of the latter is the whole Python packaging ecosystem.

The salient commands for building and deploying with your TOML file are:

python3 -m build
python3 -m twine upload --repository pypi dist/*


Note, you want to clean your dist directory first.

The Snag

The idea of using any Python build tool is not quite there yet. Poetry only implements a subset of the specification. Also, the specification is a leaky abstraction. On Discord, Prof. Nick Radcliffe explains that the promise of using "any" led him to naively use setuptools.

Nick Radcliffe — 08/21/2023 2:37 PM

Also, in case anyone is interested (related to packaging, above) I'm currently in the process of packaging a fairly large Python codebase using new-style packaging (pyproject.toml rather than setup.py). It wasn't quite my first use of it, but this project is much more complex. Initially, I chose setuptools as the build backend, since (a) it didn't seem like it should matter much and (b) I didn't think I needed anything special. That was a big mistake for me: it turns out the setuptools back-end ignores almost everything except Python code in building your package. Whereas my package (which has over 10k files) also have about 1,000 non-python files (everything from .txt and .json to shape files, CSV files, and HTML and markdown and all sorts). Some of these are needed for testing (which for some reason some people think don't need to be distributed...as if people shouldn't care about whether the installed software works in situ, rather than just on the developer's machine in the CI system), but others are needed just in the ordinary course of using the software.  setuptools has a way to let you include extra stuff, but it's very manual and would be very error-prone for me. Anyway, the TL;DR is that I switched to Flit as the backend and everything "just worked". Not saying Flit will work better for you; but it sure as hell worked better for me!

Also, the reason I chose flit was that the third bullet in "Why use Flit?" is "Data files within a package directory are automatically included. Missing data files has been a common packaging mistake with other tools."

It also says: "The version number is taken from your package’s version attribute, so that always matches the version that tools like pip see." Which also seems extremely sane (and probably I don't need to do the automatic updating of my pyproject.toml to do that.

Success has many parents...

... but it appears that PyPI packages have only one. Although the authors field can take a list, adding multiple entries is ignored. The reason is that it's best practice to use a mailing list (see here).

And so my package to facilitate the creation of synthetic data now lives in PyPI much like my Java code is deployed to mvnrepository.

Monday, September 25, 2023

Spark, Kafka and Docker

I want to run a Spark Structured Streaming application that consumes from a Kafka cluster all within Docker. I've finally got it working [messy code here in my GitHub], but it was not without its own pain.

The biggest problem is getting all the components talking to each other. First, you need a bridge network. "In terms of Docker, a bridge network uses a software bridge which allows containers connected to the same bridge network to communicate, while providing isolation from containers which are not connected to that bridge network." [docs]. Think of it as giving your containers their own namespace.

Secondly, the Spark worker needs to connect to Kafka, the Spark master and the Spark driver. The first two are just a matter of mapping the Spark master and Kafka containers in the worker. What's harder is getting the worker to talk to the driver that may be running on the computer that hosts Docker.

One sign you've got it wrong is if you see "Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources" [SO] in your driver logs. This message is a little ambiguous as it may have nothing to do with resources but connectivity. 

Resources seem to be fine

To solve it, you need the driver to define spark.driver.host and spark.driver.port. For the host, we need it to be the magic address of 172.17.0.1. This is the default "IP address of the gateway between the Docker host and the bridge network" [docs]. The port is arbitrary.
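In code, the driver-side settings look something like this (the master URL and port numbers are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("spark://localhost:7077")
  .config("spark.driver.host", "172.17.0.1")            // Docker's default bridge gateway
  .config("spark.driver.port", "40404")                 // arbitrary but fixed, so it can be firewalled
  .config("spark.driver.blockManager.port", "40405")    // needed too - see the netstat investigation below
  .getOrCreate()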

[Aside: it's also worth ensuring that the all components are running the exact same version of Spark. I saw a rare error ERROR Inbox: Ignoring error java.lang.AssertionError: assertion failed: CPUs per task should be > 0 and the only thing Google produced was this Bitnami ticket. Ensuring all version were the same made it go away.]

What's more, the worker needs these in its config. You can pass it the host and port with something like SPARK_WORKER_OPTS="-Dspark.driver.host=172.17.0.1 -Dspark.driver.port=SPARK_DRIVER_PORT" in its start up script.

But there is one last gotcha. If you still can't get things to work, you might want to log in to your worker container and run netstat. If you see the connection to the driver in a state of SYN_SENT, your firewall on the host is probably blocking the connection from the container.

Annoyingly, you probably won't see any error messages being puked from the driver. It will just hang somewhere near org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:929). I only started seeing error messages when I aligned all versions of Spark (see above) and it read: java.io.IOException: Connecting to /172.17.0.1:36909 timed out (120000 ms)

Looking in that Docker container showed:

bash-5.0# netstat -nap
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name   
...
tcp        0      1 192.168.192.7:53040     172.17.0.1:36909        SYN_SENT    1109/java

and on the host machine where my process is 29006:

(base) henryp@adele:~$ netstat -nap | grep 36909
tcp6       0      0 172.17.0.1:36909        :::*                    LISTEN      29006/java  

Aha, that looks like the problem. It turns out that I have to open the firewall for the block manager too and set a static port for it on the Driver with spark.driver.blockManager.port.

Finally, you should be able to have a Spark master and worker plus Kafka instances all running within Docker along with the driver running on the host using your favourite IDE.

Wednesday, August 30, 2023

Can we apply ML to logging?

Kibana is a Typescript/Javascript product to create visuals of logs. OpenSearch's Dashboards is the Apache licensed fork of this. Kibana is great when you know what you are looking for. But what if you don't?

Example

I have a small Kafka cluster of three nodes using the Raft protocol. I send messages then check a consumer has read all the messages. This integration test passes every time. There are no ERRORs. However, every so often, this test takes over 2 minutes when it should take about 20 seconds.

The number of lines on a good run is 4.5k and on the bad run about 20k. Twenty thousand lines is a lot to go through when you don't know what you're looking for.

I slightly adapted the code here to turn my logs into TF-IDF vectors and used Locality Sensitive Hashing to map them to a lower dimensional space. Now, we can visualise what's going on. 

The good run looks like this:




Note that there are two dominant lines that map to:

[2023-07-04 14:13:10,089] INFO [TransactionCoordinator id=2] Node 1 disconnected. (org.apache.kafka.clients.NetworkClient)
[2023-07-04 14:13:10,089] WARN [TransactionCoordinator id=2] Connection to node 1 (localhost/127.0.0.1:9091) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

repeated over and over for about 10 seconds.

The bad run looks like this:



Here, the dominant lines in the diagram are from:

[2023-07-04 14:16:21,755] WARN [TransactionCoordinator id=2] Connection to node 1 (localhost/127.0.0.1:9091) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2023-07-04 14:16:21,805] INFO [TransactionCoordinator id=2] Node 1 disconnected. (org.apache.kafka.clients.NetworkClient)
[2023-07-04 14:16:21,805] WARN [TransactionCoordinator id=2] Connection to node 3 (localhost/127.0.0.1:9093) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2023-07-04 14:16:21,805] INFO [TransactionCoordinator id=2] Node 3 disconnected. (org.apache.kafka.clients.NetworkClient)

again being repeated but this time, it lasts for about 2 minutes.

[The code in Ethen Lui's GitHub is really quite clever. Rather than using MinHashing, he's projecting the feature vectors against some randomly generated vectors and making a bit map from it. This can be turned into a single integer which represents the feature's bucket. Note that the number of vectors does not really change the dimensionality of the space but it does change how consistent different runs are - more vectors leads to greater repeatability]
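A condensed sketch of that random-projection idea (the vector count of 16 and the dimensionality are assumptions):

import scala.util.Random

val dims   = 1000                                            // assumed TF-IDF vector length
val rng    = new Random(42)
val planes = Seq.fill(16)(Array.fill(dims)(rng.nextGaussian()))

// the sign of each projection becomes one bit; the resulting integer is the bucket id
def bucketOf(vector: Array[Double]): Int =
  planes.zipWithIndex.foldLeft(0) { case (bits, (plane, i)) =>
    val dot = vector.zip(plane).map { case (a, b) => a * b }.sum
    if (dot > 0) bits | (1 << i) else bits
  }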

Still at something of a loss, I checked out Bitnami's Kafka instances (here), changed the logging in bitnami/kafka/3.5/debian-11/rootfs/opt/bitnami/scripts/kafka/postunpack.sh by adding the line:

replace_in_file "${KAFKA_CONF_DIR}/log4j.properties" "INFO" "DEBUG"

and built the Docker image again. Now it gives me DEBUG statements.

Fix

The problem of non-determinism is still foxing me but the solution became clear with all these mentions of localhost. We need the client to communicate with the cluster on localhost because the client is unaware that the Kafka instances are hosted in Docker. However, each broker does need to know it's talking to another Docker container as the ports of its peers are not available within its own sandbox. 

The solution was to use slightly different values for the listeners as the advertised listeners (KAFKA_CFG_LISTENERS vs KAFKA_CFG_ADVERTISED_LISTENERS. Note that Bitnami expects environment variables prepended with KAFKA_CFG_ and periods as underscores before it converts them into a Kafka-friendly server.properties file). 

The listeners were of the form OUTSIDE://:9111 while the advertised listeners were of the form OUTSIDE://localhost:9111. The label OUTSIDE is apparently arbitrary. It's just used as a reference, say in listener.security.protocol.map (in Kafka-speak; munge with the necessary Bitnami mappings to make it appear in server.properties) where you'll see something like OUTSIDE:PLAINTEXT.

Conclusion

Although I've fixed the Kafka issue I was facing, applying ML to the logs was only a partial help. I still need to understand the Kafka Raft code better before it can truly be of use.

Thursday, July 6, 2023

Kafka Raft in Docker

These days, you don't need Zookeeper to run a Kafka cluster. Instead, when correctly configured, Kafka uses the Raft algorithm (where "the nodes trust the elected leader"[Wikipedia]) to coordinate itself.

I started to follow Gunnar Morling's blog but it seems his version of Kafka containers have not been updated so I used Bitnami's. However, configuring them to run a Raft cluster proved difficult.

I want to programmatically create the cluster rather than use docker-compose as I want greater control over it. So, I wrote this code that talks to Docker via its API using a Java library.

Firstly, the Kafka instances couldn't see each other. 

Diagnosing the containers proved difficult as I could not install my favourite Linux tools. When I tried, I was told directory /var/lib/apt/lists/partial is missing. This seems to be deliberate as the Dockerfile explicitly deletes it, to keep images slim. So, I took out that line and added:

RUN apt-get update && apt-get upgrade -y && \
    apt-get clean && apt-get update && \
    apt-get install -y net-tools && \
    apt-get install -y iputils-ping && \
    apt-get install -y  procps && \
    apt-get install -y lsof

then rebuilt the containers. [Aside: use ps -ax to see all the processes in these containers. I was stumped for a while not seeing the Java process that I knew was running].

Using these Linux tools, I could see the containers could not even ping each other. Oops, I need to create a Docker network [SO] and add it to the containers. Now, their logs show that the Kafka containers are at least starting and talking to each other. 

However, the client running on the host machine was puking lots of messages like "Cancelled in-flight API_VERSIONS request with correlation id 1 due to node -1 being disconnected". First, I checked [SO] that the Kafka client library and container were both version 3.  But the consensus on the internet appears to be that this error is due to a connection failure. 

Using netstat on the host showed that the host port was indeed open. But this seemed to be due to Docker opening the port to map it to its container but the container not LISTENing on that port. It appears you can tell Kafka on which port to listen with an environment variable that looks like:

KAFKA_CFG_LISTENERS=PLAINTEXT://:$hostPort,CONTROLLER://:$controllerPort

where hostPort is what you want Docker to map and controllerPort corresponds to what is in the KAFKA_CFG_CONTROLLER_QUORUM_VOTERS environment variable.

The next problem was when my client connects, it cannot see the machine called kafka2. What's happening here is that having connected to the bootstrap, the client is asked to contact another machine, in this case something called kafka2

Now, the JVM running on the host knows nothing about a network that is internal to Docker. To solve this, you could have Docker use the host network (which means that everything running on the machine can see each other - fine for testing but a security nightmare). You could subvert the JVM's DNS mappings (rather than faffing around with a DNS proxy) using BurningWave or Java 18's InetAddressResolverProvider. But perhaps the simplest way is configuring Kafka itself to advertise itself as localhost [Confluent] using the KAFKA_CFG_ADVERTISED_LISTENERS environment variable.

And that was it: a Kafka cluster running on my laptop that was reading and writing messages using the Raft algorithm. There are still a few loose ends: why, on some runs, a node drops out of the cluster non-deterministically even though the functionality was correct as far as the client was concerned. I'll solve that another day.

Sunday, June 25, 2023

Diagnosing K8s Installations

Well, this was much harder than I thought. I tried to update Minikube and my local Kubernetes installation.

First, I installed everything I thought I needed:

sudo apt-get install -y kubelet kubeadm kubectl kubernetes-cni

and then download and install Minikube per the instructions. Unfortunately, when I did, I'd see it complain that possibly "the kubelet is not running".

Running:

systemctl status kubelet

showed that kubelet was exiting with error code 1.

Running:

journalctl -fu kubelet

showed it puking stack traces but this line is the salient one:

... failed to run Kubelet: running with swap on is not supported, please disable swap! or set --fail-swap-on flag to false...

But where do I put it? As it happened, I had to add --fail-swap-on=false to the ExecStart line in /etc/systemd/system/kubelet.service.d/10-kubeadm.conf (I found this by grepping ExecStart in /etc/). You then run:

sudo systemctl daemon-reload

to have changes recognised. Then, it's a matter of configuring Kubernetes system wide:

sudo kubeadm init --ignore-preflight-errors=Swap

(I needed to ignore the fact that my system has Swap space as I'm only working on proof of concepts, not running a K8s cluster in production. Using or not using swap space is nuanced - see here for more information. Basically, what is best depends on the situation. TL;DR; it can speed things up but also delay the inevitable death of a pathological system). 

You can run kubeadm reset if you foul things up.

Also, I had to rm -rf $HOME/.kube and $HOME/.minikube since kubectl config view showed me a profile that was literally years out of date. The .kube config can be regenerated with:

mkdir -p $HOME/.kube
sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
sudo chown $(id -u):$(id -g) $HOME/.kube/config


(lines that kubeadm kindly told me to run). Now, running kubectl get all gives me a sensible output.

The story doesn't quite end there. After a few hibernations of my laptop, minikube was failing to start again with (apparently) certificate errors. This issue helped me: I executed minikube delete && minikube start and all was well with the World once more.

I've got a fever, and the only prescription is MOR COW (bell)

Apache Iceberg deals with updates in two different ways:

  • Merge on read
  • Copy on write

What are these strategies? To illustrate, I've some BDD code that writes and updates 20 rows of data using both MOR and COW. (There's an interesting article at Dremio about this topic but some of the code is Dremio specific).

Simply put, copy-on-write replaces the entire parquet file if a single row is updated.

In merge-on-read, a file containing only the updated data (and a file saying which row was affected) are written. It is the reader that needs to reconcile the data.

The two strategies are for different audiences. COW makes writes slow and reads fast. MOR makes reads slow and writes fast.
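In Spark SQL the strategy is chosen per table through Iceberg's write mode properties (the table name here is a placeholder):

spark.sql("""
  ALTER TABLE local.db.events SET TBLPROPERTIES (
    'write.update.mode' = 'merge-on-read',   -- or 'copy-on-write'
    'write.delete.mode' = 'merge-on-read',
    'write.merge.mode'  = 'merge-on-read'
  )
""")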

Focussing on the more complex of the two strategies, MOR, we see that it creates four new files compared to COW's two. This maps to two different parquet files (as each parquet file in both strategies has a .crc file that holds its metadata). One file contains the updated data, the other a textual reference to the original file containing the original data. No files are deleted and the original parquet file remains the same. No data is redundant.

Iceberg has means of storing the delete data other than the row's position in the file: it can also use equality deletes. However, this appears not to be supported by Spark at the moment.

(BTW, the title of this post refers to this SNL sketch).

Monday, June 19, 2023

Data Contracts

What are they? "Data Contracts are first and foremost a cultural change toward data-centric collaboration" [Chad Sanderson's blog and here]

Examples of why they're necessary

A company wants to find the average age of a consumer of a particular product. They find the age is 42. They are somewhat surprised by this as it's older than they expected, so they check their workings - add all the ages and divide by the number of customers who bought it. After confirming the maths, the value is indeed 42 and they report it to their boss. Unfortunately, the mean age was artificially inflated because a subset of customers had an age of '999', as the system that captured that data used it as a placeholder for 'unknown'.

The next example actually happened. We were measuring average length of stay (LoS) in hospitals. When sampling the data, everything looked fine. But out of the millions of patients, a very small number (~30) had a discharge date of 1/1/1900. Clearly, the system that captured that data used this value as a token for 'unknown'. This erroneously reduced the overall LoS. The data bug was only caught when drilling down into individual hospitals, some average LoS figures were negative. Until then, we were merrily reporting the wrong national figure.

Is it a purely cultural problem?

The trouble with cultural solutions is that they depend on unreliable units called "humans". For instance, a friend of mine was the victim of an upstream, breaking change originating in the Zurich office. When he contacted the team, they were unaware of him, of his London team, and of the fact that they were consumers of this data.

I asked on the Apache dev mailing lists if we could implement a more robust, technical solution for Spark. Watch this space for developments.

Possible solutions

Andrew Jones (who is writing a book on data contracts) uses JSON Schema to validate his data. "JSON Schema is a powerful tool for validating the structure of JSON data" [JSON Schema docs]. 

Elliot West of Dremio (see the mailing list traffic) also favours JSON Schema. However, because JSON has only a few data types (strings, arrays, numerics etc.), it's not rich enough to enforce constraints like "date X must be before date Y".
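As a sketch of the kind of richer check JSON Schema struggles with, a contract could be enforced in Spark before the data is published (the column names and thresholds are assumptions):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

def enforceContract(df: DataFrame): DataFrame = {
  // "admission must precede discharge" and "age is plausible" - checks a type system alone can't express
  val violations = df.filter(col("admission_date") >= col("discharge_date") || col("age") > 120)
  require(violations.isEmpty, s"data contract violated by ${violations.count()} rows")
  df
}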

Implementations

This is a new area of development but Databricks' Delta Live Tables (DLT) claims it can “prevent bad data from flowing into tables through validation and integrity checks and avoid data quality errors with predefined error policies (fail, drop, alert or quarantine data).”
Unfortunately, it seems to be Python-only: “Can I use Scala or Java libraries in a Delta Live Tables pipeline? No, Delta Live Tables supports only SQL and Python. You cannot use JVM libraries in a pipeline. Installing JVM libraries will cause unpredictable behavior, and may break with future Delta Live Tables releases.” [docs]

Friday, June 9, 2023

Modern DevOps Tools

Some tools that I've had too little time to investigate thoroughly.

TestContainers
The free and open source TestContainers offers huge convenience to developers. For instance, you can fire up a very lightweight Postgres container in just a second or two. This ZIO SQL test (AgregatingSpec) ran in just 3.85s on my laptop. In that time, it started a Docker container, populated the Postgres database in it with test data, ran some Scala code against it then tore down the container. The container can last as long as the JVM so all your tests can use it before it detects the JVM is exiting whereupon it will kill the container.

MinIO
If you need to run S3 API compatible storage locally, you can try MinIO. It's written in Go and open source and allows you to have a local Docker container emulating Amazon storage. 

DuckDB
This open source, C++ application allows you to run SQL against Parquet files without having to fire up a whole platform. You can even run DBeaver against it.

Crossplane
Crossplane is an open source Go project that "connects your Kubernetes cluster to external, non-Kubernetes resources, and allows platform teams to build custom Kubernetes APIs to consume those resources." [docs]

Scala Native
You can now convert Scala code to stand alone executable binaries using Scala Native [baeldung]. It currently only works with single threaded applications. The output can be converted to WebAssembly...

WebAssembly
Wikipedia describes WebAssembly as "a portable binary-code format and a corresponding text format for executable programs ... for facilitating interactions between such programs and their host environment." It is an "open standard and aims to support any language on any operating system".

Tapir
Is a type-safe, Scala library that documents HTTP endpoints.

GraphQL
GraphQL is a type system, query language, etc accessible through a single endpoint that only returns what is asked of it and no surplus information. It's a spec and there are implementations in a number of languages. The graph bit comes in insofar a "query is a path in the graph, going from the root type to its subtypes until we reach scalar types with no subfields." [Bogdan Nedelcu]

LLVM
LLVM is an open source tool chain written in C++. The 'VM' in LLVM originally stood for Virtual Machine but this is no longer the case. Instead of being a virtual machine, it turns any major language into a common intermediate representation that can then be turned into machine code.

GraalVM
GraalVM is an open source JDK and JRE written in Java itself and has its roots in project Maxine. But it's more than that. It offers compilation to native code as well as supporting polyglot code via its Truffle framework, a language-agnostic AST.

Quarkus
Based on GraalVM (above), Quarkus is an open source Java Framework tailored for Kubernetes. Since the JVM code is natively compiled, startup and memory sizes are small.

Spring Boot
Is an "opinionated" Java framework that favours convention-over-configuration and runs Spring apps with the minimum of fuss.

Python/Java Interop
Together, Python and Java dominate the data engineering landscape. These languages can interoperate via Py4J, which uses sockets to allow Python to invoke Java code, and Jython, which runs Python code wholly inside the JVM. Py4J is used extensively in Spark to allow PySpark devs to talk to Spark JVMs.
Jython, unfortunately, does not support Python 3.

Project Nessie
Nessie is an open source, JVM project that promises to do to big data what Git did to code: versioning, branching etc. It apparently sits nicely on top of Iceberg and DataBricks.

The lakeFS project is an open source, Go project that offers similar functionality.

Cloud native CI/CD
Tekton is written in GoLang.
Argo is a Python based, Kubernetes native tool. For instance, it handles rolling deployments building on K8's RollingUpdate strategy which does not natively control traffic flow during an update. 
CircleCI seems to be mostly closed source.

Pipelines
Interestingly, CI/CD and data pipelines both use directed acyclic graphs but with very different intent. User Han on Discord eloquently spelled out the architectural distinction:

Specifically the reason is in batch data [Pipeline] processing, we tend to scale things out horizontally by a whole lot, sometimes using GPUs. This is not a common feature supported by CI/CD workflow tools. In summary: 

Jenkins, CodePipeline, Github Actions, TeamCity, Argo, etc ==> used to build DAGs for CI/CD, tends to have shorter run time, less compute requirement, and fairly linear in dependencies.

Airflow, Dagster, Prefect, Flyte, etc ==> used to build data and/or machine learning pipelines. It tend to have longer run time, larger horizontal scaling needs, and sometimes complex dependencies.  Data pipelines also sometimes have certain needs, e.g., backfilling, resume, rerun, parameterization, etc that's not common in CI/CD pipelines

Interestingly, Luigi looks like it's dead as even its creator says on Twitter.

Istio
Istio is an open source GoLang project that transparently provides "a uniform way to integrate microservices, manage traffic flow across microservices, enforce policies and aggregate telemetry data."

Wednesday, June 7, 2023

SBT cheat sheet

To see the dependency tree, run:

sbt compile:dependencyTree

(see the answer of 13/12/21 on this SO question)

You might want to tell SBT to use wider margins if it truncates the tree. Do that with [SO]:

val defaultWidth = 40
val maxColumn = math.max(JLine.usingTerminal(_.getWidth), defaultWidth) - 8

To coax a dependency from Scala 2 to 3, use something like this:

  libraryDependencies ++= List( ("io.laserdisc" %% "fs2-aws-s3" % "5.0.2").withCrossVersion(CrossVersion.for3Use2_13) )

To use the classpath of a particular module, use something like this [see SO]:

sbt "project core" console

In this particular case, we're opening a REPL with the classpath of a given module.

Use versionScheme (SBT docs) if you're writing libraries. This hints at how to handle clashing dependencies. Otherwise, you might see errors when depending on other libraries that "can be overridden using libraryDependencySchemes or evictionErrorLevel" [Scala-Lang]

One last thing: SBT tests run in parallel by default. This can ruin integration tests, so you might want to try this workaround [SO] to ensure your tests run serially.

Wednesday, May 31, 2023

Spark Catalysts

I've been trying to see how Spark works under the covers. The TL;DR is that it services your queries by dynamically writing Java code on the driver that it then compiles with Janino before sending it over the wire to the executors.

Let's take this Scala code on the Spark CLI:

val ds = spark.createDataFrame(List(("a", 1), ("b", 2), ("c", 4)))
ds.writeTo("spark_file_test_writeTo").create()
spark.sqlContext.sql("update spark_file_test_writeTo set _2=42")

Pretty simple but what goes on deep down is complex. First, Spark uses an Antlr lexer and a parser (a lexer tokenizes; a parser builds an AST) to turn that ugly SQL statement into a tree of Scala case classes. Then it creates the Java code in WholeStageCodegenExec (source). In this Java, you'll see a subclass of BufferedRowIterator that looks something like:

columnartorow_mutableStateArray_3[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(4, 64);
...
columnartorow_mutableStateArray_3[1].write(1, 42);

in a method called processNext. That 42 is Spark setting our value for a row. If we added a where clause in our SQL, you'd see the generated code branching. That is, the generated code can access all the other fields in a row. 

If you import an implicit, you can run debugCodeGen() on the CLI to see the code more easily. 


Friday, May 26, 2023

Spark and Iceberg

Here are some notes I took when playing with Apache Iceberg plugged into Spark. (Update: Iceberg was already supported by Google but is now supported by AWS's Athena for Apache Spark - see here).

I'm running Spark 3.3 and Iceberg 1.2.1 with:

./spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.13:1.2.1\
    --conf spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005\
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
    --conf spark.sql.catalog.spark_catalog.type=hive \
    --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.local.type=hadoop \
    --conf spark.sql.catalog.local.warehouse=/tmp/warehouse \
    --conf spark.sql.defaultCatalog=local

This does a few things.

First, notice that the version of iceberg-spark-runtime must be the same major/minor version of the Spark you're running against. The library will automatically be downloaded by Spark if you don't already have it.

Data is saved in the directory defined by spark.sql.catalog.local.warehouse. Inside will be 2 directories, data and metadata.

Spark code

This took me a few hours to get a good feel for since there are lots of thunks passed around and functionality only executed when lazy values are touched. This is just the gist of what's going on:

Code-wise, the interesting functionality appears to be in RuleExecutor.execute as each Rule[_] is executed via apply. The interesting code in the Iceberg Spark extension is applied here, as the extensions all extend Rule[LogicalPlan].

LogicalPlan is a type of QueryPlan. A QueryPlan can be either logical or physical, the latter being an extension of SparkPlan. You can spot these leaves in an AST quite easily as the naming convention says they end with Exec. And it appears to be V2TableWriteExec where the driver hands control over to the executors.

The tree of LogicalPlans is traversed in CheckAnalysis.checkAnalysis. But it's the instantiation of a Dataset where its logicalPlan references the lazy QueryExecution.commandExecuted, causing it to invoke eagerlyExecuteCommands(analyzed).

A notable sub-type of LogicalPlan is Command. This represents "a non-query command to be executed by the system".

Since Spark 3, an interface is available that is "responsible for creating and initializing the actual data writer at executor side. Note that, the writer factory will be serialized and sent to executors, then the data writer will be created on executors and do the actual writing." [DataWriterFactory docs]

Finally, making sense of the AST was problematic when the flow disappeared into WholeStageCodeGen as Spark then rolls the tree up and converts it to JVM bytecode.

Iceberg code

All the Iceberg initialization happens in IcebergSparkSessionExtensions. This subverts Spark's usual functionality and injects Iceberg specific functionality. One of the things it can inject is an IcebergSparkSqlExtensionsParser that visits the AST tree as it parses a SQL String to create a LogicalPlan

Iceberg also provides its own implementation of DataWriterFactory so it can use its own Table implementation under the Spark covers that allows (for instance) its own configuration for TableScans.

It's Iceberg's SnapshotProducer.commit() that's been injected into the Spark machinery that creates the manifest files.


Sunday, May 7, 2023

The Joy of Sets?

The standard Scala Set violates basic algebra. Let's take two sets, x and y:

@ val x = Set(1, 2, 3) 
x: Set[Int] = Set(1, 2, 3)

@ val y = Set(3, 2, 1) 
y: Set[Int] = Set(3, 2, 1)

@ x == y 
res2: Boolean = true

If they're equal, we should be able to substitute one for another. Let's see how that goes:

@ def giveHead(s: Set[Int]): Int = s.head 
defined function giveHead

@ giveHead(x) 
res6: Int = 1

@ giveHead(y) 
res7: Int = 3

So, I get different results depending on which of two equal objects I pass to the method - crazy.

topkek
Isn’t Map.toList impure or something like that?
tpolecat
It's pure but non-congruent with equality. m1 === m2 doesn't imply m1.toList === m2.toList because they could differ in their iteration order. That's why it's not Foldable. You can get those instances from alleycats I believe.
The unordered business is there to guarantee that Eq and Foldable are congruent (i.e., that a === b implies a.toList === b.toList) which is not necessarily true of data types like Set and Map: these are only UnorderedFoldable which requires you to fold into a commutative monoid which destroys any information you might get from observing iteration order. Which is to say "unordered" doesn't necessarily say anything about execution order, it just says any ordering that might be there won't be observable. 
Although often forgotten, sets are not functors.

You cannot parTraverse a Set (or Maps) in Cats. It just won't compile. This is a property of that particular data structure but it's not limited to Sets:
Fabio Labella @SystemFw Nov 16 16:55
You cannot traverse an [FS2] Stream, because it implies it's finite. Look at evalMap instead if the use case is "evaluate an action on each element". 
Instead, you must parUnorderedTraverse a Set.
Adam Rosien @arosien Nov 10 20:17
every Traverse is also an UnorderedTraverse, so they should act the same in that case (and since every CommutativeApplicative is also an Applicative, you can still call unorderedTraverse on something that has a Traverse)
To traverse a set (albeit unordered), the UnorderedTraverse will put the Set into a higher kinded type G[_]. If we want to take advantage of this say for List[_], we need a concrete implementation of a Parallel[List]. Among other things, this defines how to map from List[_] to Set[_] and back again. This brings us back to the problems with set's toList function - but at least now we're explicit in how we want this handled.
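A minimal example of the unordered variant (Cats 2.2+, using Option, whose Applicative is commutative):

import cats.syntax.all._

val s: Set[Int] = Set(1, 2, 3)

// traverse won't compile for Set; unorderedTraverse will, because Option's effect is commutative
val doubled: Option[Set[Int]] = s.unorderedTraverse(i => Option(i * 2))   // Some(Set(2, 4, 6))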