Saturday, December 21, 2024

Debugging Polaris in Docker

Iceberg and Polaris sitting in a tree...

I have a proof of concept on GitHub that demonstrates how to use Apache Iceberg. Since I need Apache Polaris as Iceberg's metastore, I have it running in a container. 

If I create a catalog of FILE type, if the file path for storage is X, does this refer to the filesystem of Polaris or Spark?
Michael Collado
it's going to be both, so you shouldn't really use it in that scenario. The Polaris server is going to read/write metadata.json files in its own container's file system and the spark notebook will read/write data files in its own container's filesystem, so... [Discord]
In my PoC, I use a shared filesystem mount to which both the Polaris container and the host's Spark instance write.

However, tests were failing with minimal logging. When running the Polaris container as a non-root user, the error in the Polaris logs looks like:

{"timestamp":1734433498899,"level":"INFO","thread":"dw-51 - POST /api/catalog/v1/manual_spark/namespaces/my_namespace/tables/IcebergCRUDSpec","logger":"org.apache.polaris.service.exception.IcebergExceptionMapper","message":"Handling runtimeException Failed to get file system for path: file:/tmp/polaris/my_namespace/IcebergCRUDSpec/metadata/00000-0daa8a08-5b5d-459a-bdd0-0663534f2007.metadata.json","mdc":{"spanId":"6ea71bffea6af726","traceId":"8b485bf56e7e27aac2e47ede876e02bd","realm":"default-realm","request_id":null},"params":{}}

When running containerised Polaris as root, the tests passed. However, I couldn't clean up the files on the shared filesystem mount afterwards, because I was not running the test suite as root on the host.

Digging Deeper

That string ("Failed to get file system for path") led me to org.apache.iceberg.hadoop.Util.getFs. Unfortunately, the nested exception that holds the real cause is wrapped in the error reported above and never makes it into the logs.

So, we start the container with these flags:

 -eJAVA_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:8788 -p8788:8788

since polaris-dropwizard-service picks up JVM options from the JAVA_OPTS environment variable.
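
You can also assemble the same flags programmatically. This is a minimal Python sketch and not part of the original setup. The function name docker_run_args and the image tag polaris:local are hypothetical. Note the *: in the JDWP address. On JDK 9 and later, a bare port binds only to localhost inside the container, so the published port would not be reachable from the host.

```python
import subprocess
from typing import List


def docker_run_args(image: str, debug_port: int = 8788) -> List[str]:
    """Build a `docker run` argv that starts the image with a JDWP debug agent."""
    # server=y: the JVM listens for a debugger; suspend=n: it starts without waiting for one.
    # "*:" binds on all interfaces (JDK 9+ binds to localhost by default),
    # which lets the port published with -p be reached from the host.
    java_opts = (
        "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,"
        f"address=*:{debug_port}"
    )
    return [
        "docker",
        "run",
        f"-eJAVA_OPTS={java_opts}",
        f"-p{debug_port}:{debug_port}",
        image,
    ]


if __name__ == "__main__":
    # "polaris:local" is a hypothetical image tag; substitute your own.
    subprocess.run(docker_run_args("polaris:local"), check=True)
```

Passing the list straight to subprocess.run, rather than through a shell, also stops the shell from trying to glob the * in the address.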

Great: now we can put a breakpoint in Util.getFs and call printStackTrace() on the nested exception. It shows:

Caused by: javax.security.auth.login.LoginException: java.lang.NullPointerException: invalid null input: name
        at jdk.security.auth/com.sun.security.auth.UnixPrincipal.<init>(UnixPrincipal.java:71)
        at jdk.security.auth/com.sun.security.auth.module.UnixLoginModule.login(UnixLoginModule.java:134)
        at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:754)
        at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:678)
        at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:676)
        at java.base/java.security.AccessController.doPrivileged(AccessController.java:714)
        at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:676)
        at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:587)
        at org.apache.hadoop.security.UserGroupInformation$HadoopLoginContext.login(UserGroupInformation.java:2148)

A quick look at the JDK code shows that UnixSystem.getUsername appears to be returning null. This seems to be because there is no user with my UID in the container - d'oh.
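
The failing lookup can be reproduced outside the JVM. The minimal Python sketch below assumes Python is available wherever you run it, which the Polaris image may not provide. pwd.getpwuid queries the C library's passwd database, roughly the same source the JDK's native UnixSystem code consults, and raises KeyError when the UID is unknown. The helper name username_for_uid is hypothetical.

```python
import os
import pwd
from typing import Optional


def username_for_uid(uid: int) -> Optional[str]:
    """Return the login name for `uid`, or None if the passwd database has no entry for it."""
    try:
        return pwd.getpwuid(uid).pw_name
    except KeyError:
        # No passwd entry: the same situation the JDK's UnixSystem hits inside the container.
        return None


if __name__ == "__main__":
    uid = os.getuid()
    # The name is None when this UID has no passwd entry (e.g. a host UID unknown to the image).
    print(uid, username_for_uid(uid))
```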

A Solution

One solution is a bespoke Docker entrypoint. It takes a user ID passed in by Fabric8's docker-maven-plugin, creates that user if it doesn't already exist, and runs Polaris as that user. If this is the same user that runs the integration tests, both Polaris and Spark can write to the same directory, and tables can be dropped without permission issues.
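
The entrypoint logic might look something like the following. This is a minimal sketch, written in Python for consistency with the rest of this blog; in practice it would more likely be a shell script. Several pieces are assumptions: the POLARIS_UID environment variable, the default user name polaris, and the availability of shadow-utils useradd in the image are all hypothetical. The real Polaris start command is passed as arguments.

```python
import os
import pwd
import subprocess
import sys
from typing import List, Optional, Tuple


def plan_user(uid: int, name: str = "polaris") -> Tuple[str, Optional[List[str]]]:
    """Return the user to run as and, if that UID is unknown, the command that would create it."""
    try:
        return pwd.getpwuid(uid).pw_name, None
    except KeyError:
        # Assumes shadow-utils `useradd`; Alpine/BusyBox images use `adduser -u` instead.
        # --no-create-home: we only need a passwd entry so the UID resolves to a name.
        return name, ["useradd", "--uid", str(uid), "--no-create-home", name]


def main() -> None:
    # POLARIS_UID is a hypothetical variable the docker-maven-plugin config would set,
    # e.g. to the UID of the user running the integration tests.
    uid = int(os.environ["POLARIS_UID"])
    name, create_cmd = plan_user(uid)
    if create_cmd is not None:
        subprocess.run(create_cmd, check=True)
    gid = pwd.getpwnam(name).pw_gid
    # Drop privileges: clear supplementary groups and set the GID while still root.
    os.setgroups([])
    os.setgid(gid)
    os.setuid(uid)
    # Replace this process with the real Polaris command passed as arguments.
    os.execvp(sys.argv[1], sys.argv[1:])


if __name__ == "__main__":
    main()
```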

Monday, December 9, 2024

Pipeline testing

A lot of developers wonder how to test pipelines (see this Discord thread). The best approach we've found is to create fixed, known data so that, when our transform acts on it, we can make reasonable assertions about what comes out.

Synthetic data

We are operating in the healthcare domain. We have a data set of events at hospitals and we want to turn them into a running total of patients who happen to occupy the hospital on any given day. Our test data should be:
precisely x medical events over D days for y different unique patients, distributed evenly over hospitals {a,b,c}, where each patient is admitted on day i mod D and discharged i mod w days later, and i is the unique patient ID in the range [0, y)
Finally, the discharge date is null every z patients because we know we have bad data (urgh).
If we turn this natural language into a (Python) code signature, it looks like this: 

from datetime import datetime

# DataFrame, random_timestamp and random_time_given are defined elsewhere (not shown in this post).
def random_inpatient_data(
    num_rows: int,
    num_patients: int,
    providers: list,
    discharge_null_every=97,
    attendance_date_fn=lambda i: random_timestamp(i, datetime.strptime("2021-11-01", "%Y-%m-%d")),
    start_time_fn=random_time_given,
) -> DataFrame:
    """
    Fake but realistic data for tests.
    :param num_rows: Total number of rows for this data set
    :param num_patients: The number of unique patients in this data set
    :param providers: Possible providers from which a synthetic site will be created
    :param discharge_null_every: Step size for null discharge dates
    :param attendance_date_fn: A function taking a seed and returning an attendance date
    :param start_time_fn: A function taking a date and a seed and generating a timestamp
    :return: Synthetic inpatient data.
    """
    ...
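
The following is an illustration of the natural-language spec above, not the author's elided implementation. It is a minimal, dependency-free Python sketch that returns plain dicts rather than a DataFrame. The name synthetic_admissions and the parameters num_days (D) and max_stay (w) are hypothetical. Unlike the author's random-ish version, this one is fully deterministic.

```python
from datetime import date, timedelta
from typing import Dict, List, Optional


def synthetic_admissions(
    num_rows: int,            # x: total number of events
    num_patients: int,        # y: unique patients, ids in [0, y)
    num_days: int,            # D: admissions spread over D days
    max_stay: int,            # w: patient i stays (i mod w) days
    providers: List[str],     # hospitals, e.g. ["a", "b", "c"]
    discharge_null_every: int = 97,  # z: every z-th patient has no discharge date
    start: date = date(2021, 11, 1),
) -> List[Dict[str, object]]:
    """Deterministic inpatient events following the spec above (hypothetical helper)."""
    rows: List[Dict[str, object]] = []
    for row in range(num_rows):
        i = row % num_patients  # patient id in [0, num_patients)
        admitted = start + timedelta(days=i % num_days)  # admitted on day i mod D
        discharged: Optional[date] = None
        if i % discharge_null_every != 0:  # every z-th patient is bad data: no discharge
            discharged = admitted + timedelta(days=i % max_stay)  # i mod w days later
        rows.append({
            "patient_id": i,
            "provider": providers[i % len(providers)],  # spread evenly over hospitals
            "admitted": admitted,
            "discharged": discharged,
        })
    return rows
```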

There are some things to note. First, there are sensible defaults, so each test creates data with only the peculiarities salient to its needs. Second, the data is random-ish but the same for any given set of parameters.
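
One way to get data that is random-ish yet repeatable is to derive every random value from a seeded, private generator. This is a minimal sketch that assumes the seed is something like the patient ID. seeded_timestamp is a hypothetical helper, not the author's random_timestamp. Values are stable for a given seed within a given Python version.

```python
import random
from datetime import datetime, timedelta


def seeded_timestamp(seed: int, base: datetime, window_days: int = 30) -> datetime:
    """A 'random' timestamp within `window_days` of `base` that is fixed for a given seed."""
    # A private Random instance: the same seed always yields the same value,
    # and tests neither disturb nor depend on the global random state.
    rng = random.Random(seed)
    return base + timedelta(seconds=rng.randrange(window_days * 24 * 60 * 60))
```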

It all sounds a little complicated, but the whole thing is some two dozen lines that are called repeatedly across the test suite.

The assertions

There are certain invariants we can assert on. Some are straightforward. Hospital occupancy must always be greater than or equal to zero, and the number of admissions for a day must always be less than or equal to the running total for that day.
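
Here is a minimal Python sketch of how the running totals and the first two invariants might be computed and checked. It assumes stays arrive as (admitted, discharged) date pairs. daily_occupancy and check_invariants are hypothetical names. Two business rules are also assumptions: rows with a null discharge date are dropped, and a patient still counts as occupying a bed on their discharge day.

```python
from collections import Counter
from datetime import date, timedelta
from typing import Dict, List, Optional, Tuple

Stay = Tuple[date, Optional[date]]  # (admitted, discharged); discharged may be missing


def daily_occupancy(stays: List[Stay]) -> Dict[date, Tuple[int, int]]:
    """Map each day with an event to (admissions that day, running occupancy total)."""
    admissions: Counter = Counter()
    deltas: Counter = Counter()
    for admitted, discharged in stays:
        if discharged is None:
            continue  # assumption: rows with no discharge date are excluded entirely
        admissions[admitted] += 1
        deltas[admitted] += 1
        # Assumption: a patient still occupies a bed on their discharge day,
        # so they leave the count the following day.
        deltas[discharged + timedelta(days=1)] -= 1
    occupancy: Dict[date, Tuple[int, int]] = {}
    total = 0
    for day in sorted(deltas):
        total += deltas[day]
        occupancy[day] = (admissions[day], total)
    return occupancy


def check_invariants(occupancy: Dict[date, Tuple[int, int]]) -> None:
    for day, (admitted, total) in occupancy.items():
        assert total >= 0, f"negative occupancy on {day}"
        assert admitted <= total, f"more admissions than occupants on {day}"
```

That second assumption matters. Suppose a lone patient is admitted and discharged on the same day, and we subtract them on the discharge day itself. That day would then show 1 admission against an occupancy of 0, and the second invariant would fail.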

Some are a little more involved. For instance, the sum of daily occupancy deltas over the time frame is zero (i.e., everybody who is admitted is ultimately discharged). Obviously, the algorithm must be robust and not count zombie patients who appear never to be discharged. Remember that "the discharge date is null every z patients" above?
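
To see why zombie patients matter for the zero-sum invariant, consider the sketch below. It assumes the convention of +1 on admission and -1 on the day after discharge. occupancy_deltas and its skip_missing_discharge flag are hypothetical. Counting a patient with a null discharge date leaves a +1 that is never cancelled.

```python
from collections import Counter
from datetime import date, timedelta
from typing import List, Optional, Tuple


def occupancy_deltas(
    stays: List[Tuple[date, Optional[date]]], skip_missing_discharge: bool = True
) -> Counter:
    """Per-day change in occupancy: +1 on admission, -1 the day after discharge."""
    deltas: Counter = Counter()
    for admitted, discharged in stays:
        if discharged is None:
            if not skip_missing_discharge:
                deltas[admitted] += 1  # naive handling: admitted, never leaves (a "zombie")
            continue
        deltas[admitted] += 1
        deltas[discharged + timedelta(days=1)] -= 1
    return deltas
```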

Another invariant is that we should have a reading for every day in a contiguous block. Remember that the input data is a series of events. Say 10 patients are admitted on Monday, 5 are discharged on Friday, and nothing happens from Tuesday to Thursday. Do we still have readings for those days, even though nothing happened? (We should.)
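
Filling the gaps can be as simple as carrying the last known running total forward over quiet days. This is a minimal sketch that assumes totals arrive as a dict keyed by date, with readings only on days that had events. fill_missing_days is a hypothetical name.

```python
from datetime import date, timedelta
from typing import Dict


def fill_missing_days(totals: Dict[date, int]) -> Dict[date, int]:
    """Return a reading for every day between the first and last event, carrying totals forward."""
    if not totals:
        return {}
    day, end = min(totals), max(totals)
    filled: Dict[date, int] = {}
    current = totals[day]
    while day <= end:
        current = totals.get(day, current)  # a quiet day keeps the previous day's total
        filled[day] = current
        day += timedelta(days=1)
    return filled
```

Suppose there are readings only on Monday 9 December 2024 (10) and Friday 13 December (5). The result then has five entries, with Tuesday to Thursday holding 10.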

Crafting the test data also raised some interesting corner cases that we needed to take back to the business analysts. For example, if a patient is discharged the same day they're admitted, do they show up in that day's occupancy numbers or not? If the discharge date is null, what do we do with this patient?

Conclusion

The use of synthetic data is a powerful tool when building automated regression tests. Tools like Deequ can test data quality but require curated data sets. These are much harder to maintain than it sounds, especially when their schemas and semantics change.
 
Creating code that tests the functionality of your pipeline allows you to refactor your code with confidence. Once you try it, you'll never go back.

Diagnosing Kafka

The Problem

I have an integration test that creates a cluster of 4 Kafka brokers using the KRaft protocol (i.e., no ZooKeeper). It then kills one and, upon sending more messages, expects a consumer to receive them. Approximately 1 in 4 times, it fails because no messages are received. This 1 in 4 figure seems suspicious...

Killing a node sometimes meant the cluster was in a bad state. The other brokers kept regularly barfing UnknownHostException as they tried to talk to the missing node. Looking at the topics shed some light on the problem.

$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test_topic
Topic: test_topic TopicId: MaHwLf2jQ62VNUxtiFUGvw PartitionCount: 2 ReplicationFactor: 1 Configs: 
Topic: test_topic Partition: 0 Leader: 4 Replicas: 4 Isr: 4
Topic: test_topic Partition: 1 Leader: none Replicas: 1 Isr: 1
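
When a cluster has many topics, eyeballing this output gets tedious. Below is a small Python sketch that scans kafka-topics.sh --describe output for partitions whose leader is none. The regex assumes the "Partition: N Leader: X" layout shown above. leaderless_partitions is a hypothetical name.

```python
import re
from typing import List

# "Partition:" followed directly by a number; this does not match "PartitionCount:".
_PARTITION_LEADER = re.compile(r"Partition:\s*(\d+)\s+Leader:\s*(\S+)")


def leaderless_partitions(describe_output: str) -> List[int]:
    """Partition numbers whose Leader is 'none' in kafka-topics.sh --describe output."""
    return [
        int(partition)
        for partition, leader in _PARTITION_LEADER.findall(describe_output)
        if leader == "none"
    ]
```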

The Solution

After modifying the test so that it creates the NewTopic with a replication factor of 2, the same command now gives:

Topic: test_topic TopicId: w37ZKZnqR-2AdmT76oiWsw PartitionCount: 2 ReplicationFactor: 2 Configs: 
Topic: test_topic Partition: 0 Leader: 4 Replicas: 4,1 Isr: 4,1
Topic: test_topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
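
You can model why the extra replica helps in a few lines: a partition is only lost when every broker in its replica list is down. The minimal sketch below uses the replica assignments from the two describe outputs above. partitions_lost is a hypothetical helper and ignores ISR state.

```python
from typing import Dict, List, Set


def partitions_lost(replicas: Dict[int, List[int]], dead: Set[int]) -> List[int]:
    """Partitions with no surviving replica once every broker in `dead` is gone."""
    return sorted(p for p, brokers in replicas.items() if all(b in dead for b in brokers))


# Replica assignments copied from the two describe outputs above.
RF1 = {0: [4], 1: [1]}
RF2 = {0: [4, 1], 1: [1, 2]}
```

It also shows the limits of a replication factor of 2. Losing brokers 1 and 2 together would still take partition 1 offline.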

The test now passes every time (so far). We appear to have fixed the problem.

The Reason

Kafka replicates data across nodes (the Replicas above). These replicas may or may not be in-sync replicas (ISR). Replicas in the ISR have caught up with the leader's latest writes within a configured time window, and only they are considered in a clean leader election.
"Producers write data to and consumers read data from topic partition leaders. Synchronous data replication from leaders to followers ensures that messages are copied to more than one broker. Kafka producers can set the acks configuration parameter to control when a write is considered successful." [Disaster Recovery for Multi-Datacenter Apache Kafka Deployments, Confluent]

Conclusion

But what if this were a production topic rather than an integration test? How would you fix it? Well, if the data is not replicated, you lose it when the broker(s) hosting it die. You can set unclean.leader.election.enable on the topic using a Kafka CLI tool such as kafka-configs.sh. But it's a trade-off: "If we allow out-of-sync replicas to become leaders, we will have data loss and data inconsistencies." [Conduktor]
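
The trade-off can be seen in a simplified Python model of leader election. It assumes the rule is to pick the first live replica (in assignment order) that is in the ISR, falling back to any live replica only when unclean election is allowed. elect_leader is a hypothetical name, and real Kafka has more moving parts than this.

```python
from typing import List, Optional, Set


def elect_leader(
    replicas: List[int], isr: Set[int], alive: Set[int], unclean_allowed: bool = False
) -> Optional[int]:
    """Simplified model: pick a new partition leader, or None if the partition stays offline."""
    # Clean election: only a live replica that is in sync may lead.
    for broker in replicas:
        if broker in alive and broker in isr:
            return broker
    if unclean_allowed:
        # Unclean election: any live replica, even one that missed recent writes (data loss).
        for broker in replicas:
            if broker in alive:
                return broker
    return None
```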