Friday, January 10, 2025

The New Logging

Twenty years ago, everybody was writing their own logging framework. Today, when distributed computing is the norm, distributed logging is a necessity.

Functional programming really comes into its own in such an environment. So, it's no surprise that the FP tools already support distributed logging. In the Cats ecosystem, there is Natchez. This can feed into distributed tracing systems such as the open source Jaeger, a Go application that can store the data in Elastic or Cassandra. Zipkin is another option if you prefer a Java implementation.

[Regarding Natchez "unless you have a specific reason to want to use Natchez, you may want to look at otel4s instead. I think the community will be migrating to otel4s once it is binary-stable (Natchez is great and still works well but if you're starting fresh you may as well use the library that implements the industry standard for tracing)" - Discord]

MDC

First, some terminology.

"Mapped Diagnostic Context is to provide a way to enrich log messages with pieces of information that could be not available in the scope where the logging actually occurs, but that can be indeed useful to better track the execution of the program." [Baeldung]

Aside: don't use this mechanism in your business logic code.
Fabio Labella @SystemFw Jan 13 17:20
I don't know how many details I can give, but basically it got to the point that C-level executives knew the word "ThreadLocal", which is really bad. But basically someone had built an entire internal framework based essentially on MDC for a lot of business logic in a multitenant environment (falling down the slippery slope that it was "context"), and that at some point there were a crap load of race conditions due to ThreadLocal + asynchrony, risking crosstalk, which in a financial institution can well mean your whole company gets shut down.

Spans and Kernels

"The usual tracing approach involves threading spans (aka the context) throughout the application we wish to instrument. On the other hand, distributed tracing requires a so-called kernel to be able to continue the previous tracing span." - Functional Event Driven Architecture, Volpe.

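To make the span/kernel distinction concrete, here is a sketch in Python (used only for brevity; none of the libraries above are involved). It assumes the W3C Trace Context `traceparent` header, which OpenTelemetry propagates by default, as the kernel's wire format. `SpanContext`, `to_kernel` and `continue_from_kernel` are hypothetical names, not the Natchez or otel4s API.

```python
import secrets
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class SpanContext:
    trace_id: str                    # 32 lowercase hex chars, shared by every span in the trace
    span_id: str                     # 16 lowercase hex chars, unique to this span
    parent_id: Optional[str] = None  # span id of the caller, if any
    sampled: bool = True


def new_root_span() -> SpanContext:
    return SpanContext(trace_id=secrets.token_hex(16), span_id=secrets.token_hex(8))


def to_kernel(ctx: SpanContext) -> Dict[str, str]:
    """Serialise the span context into headers to send with an outgoing request."""
    flags = "01" if ctx.sampled else "00"
    return {"traceparent": f"00-{ctx.trace_id}-{ctx.span_id}-{flags}"}


def continue_from_kernel(kernel: Dict[str, str]) -> SpanContext:
    """On the receiving service: same trace id, fresh span id, caller recorded as parent."""
    version, trace_id, parent_id, flags = kernel["traceparent"].split("-")
    if version != "00" or len(trace_id) != 32 or len(parent_id) != 16:
        raise ValueError(f"unsupported traceparent: {kernel['traceparent']}")
    return SpanContext(
        trace_id=trace_id,
        span_id=secrets.token_hex(8),
        parent_id=parent_id,
        sampled=bool(int(flags, 16) & 0x01),
    )


if __name__ == "__main__":
    upstream = new_root_span()
    headers = to_kernel(upstream)            # e.g. attached to an outgoing HTTP request
    downstream = continue_from_kernel(headers)
    print(headers)
    print(downstream)
```

Within one service you thread the `SpanContext` itself; only at a process boundary do you flatten it into the kernel and rebuild it on the other side.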
Cats

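Within an effect system, ThreadLocal becomes the wrong tool, because a single fiber may run on several different threads during its lifetime (and one thread serves many fibers). In Monix, you can use TaskLocal to recreate MDC functionality.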
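Python has the same pitfall, which makes for a compact illustration (an analogy, not Monix code): `threading.local()` is shared by every coroutine running on the event loop's single thread, whereas a `contextvars.ContextVar` is copied into each asyncio Task, much as a TaskLocal is scoped to a Task. The handler names are hypothetical.

```python
import asyncio
import contextvars
import threading

# Copied into each asyncio Task when the Task is created, much like a task-local.
request_id_var: contextvars.ContextVar = contextvars.ContextVar("request_id", default=None)

# One instance per OS thread; every coroutine on the event loop's thread shares it.
thread_state = threading.local()


async def handle_with_thread_local(request_id: str) -> str:
    thread_state.request_id = request_id
    await asyncio.sleep(0.01)          # yield: other requests run on this same thread
    return thread_state.request_id     # may now hold another request's id (crosstalk)


async def handle_with_context_var(request_id: str) -> str:
    request_id_var.set(request_id)     # only affects this Task's copy of the context
    await asyncio.sleep(0.01)
    return request_id_var.get()


async def main():
    ids = [f"req-{n}" for n in range(5)]
    # gather wraps each coroutine in its own Task, so they interleave on one thread
    via_thread_local = await asyncio.gather(*(handle_with_thread_local(i) for i in ids))
    via_context_var = await asyncio.gather(*(handle_with_context_var(i) for i in ids))
    return ids, via_thread_local, via_context_var


if __name__ == "__main__":
    ids, tl, cv = asyncio.run(main())
    print("thread-local:", tl)
    print("context var: ", cv)
```

The context-var results line up with the request ids, while the thread-local list comes back with the last id written in every slot: exactly the crosstalk described above.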
Gerry Fletcher @gerryfletch Jan 13 17:17
It's purely to add the request id into every log line
Fabio Labella @SystemFw Jan 13 17:17
log4cats has a withContext that lets you do that without relying on state
In "Practical FP in Scala", Gabriel Volpe says:
Normally, people use the Slf4j implementation, which is created as shown below. 
implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]
In our application, we are going to use Log4cats, which is by now the standard logging library of the Cats ecosystem. Whenever we need logging capability, all we need is to add a Logger constraint to our effect type. For instance:

def program[F[_]: Apply: Logger]: F[Unit] =
  Logger[F].info("starting program") *> doSomething
The idea is that your code logs as normal but, transparently, these events are sent to some aggregator. What's more, the events are contextual: they might take place over several nodes in a cluster.

The FP magic in the Logger[F].info("...") call above is that there is an implicit Logger in scope (the enclosing code declares it with a context bound such as [F[_]: Logger: ...]). This Logger might just write to disk or might send the message to an external system; the calling code doesn't care. The relevant Logger is merely summoned.

Regarding architecture:
Christopher Davenport @ChristopherDavenport Jan 13 17:49
So I can write middlewares on my logger and clients that enhance them with aspects about the user and then I can queue them into my in memory queue for when it's something that's decoupled from the response cycle. I keep all the logging across the entire stack, even after submission to a queue as a result.
Including things like request-id, user-information, and can continue to propagate that information to other services for tracing reasons.

There's also a lifter that can lift a client into a Kleisli in http4s, so if you do that before the rest of your app you can still work in a fully abstract F, just pass in the client.

I extract and enhance my loggers at the point I know user identity and then pass those around with a ton of enhanced information to make debugging simple.
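A rough Python analogue of "enhance the logger once you know who the user is, then pass it around" is `logging.LoggerAdapter`. This is a sketch of the idea only, not log4cats' `withContext`; `ContextLogger`, `enhance` and `ListHandler` are hypothetical names.

```python
import logging
from typing import List


class ContextLogger(logging.LoggerAdapter):
    """Prefixes every message with the key=value pairs captured when it was built."""

    def process(self, msg, kwargs):
        ctx = " ".join(f"{k}={v}" for k, v in sorted(self.extra.items()))
        return f"[{ctx}] {msg}", kwargs


def enhance(logger, **context) -> ContextLogger:
    """Return a new, richer logger; the one passed in is left untouched."""
    if isinstance(logger, ContextLogger):
        return ContextLogger(logger.logger, {**logger.extra, **context})
    return ContextLogger(logger, dict(context))


class ListHandler(logging.Handler):
    """Collects messages in memory so the example can be checked."""

    def __init__(self):
        super().__init__()
        self.messages: List[str] = []

    def emit(self, record):
        self.messages.append(record.getMessage())


def demo() -> List[str]:
    base = logging.getLogger("orders")
    base.setLevel(logging.INFO)
    base.propagate = False
    handler = ListHandler()
    base.addHandler(handler)

    request_log = enhance(base, request_id="abc123")   # at the edge of the service
    user_log = enhance(request_log, user="alice")       # once identity is known
    user_log.info("order placed")
    request_log.info("response sent")

    base.removeHandler(handler)
    return handler.messages


if __name__ == "__main__":
    for line in demo():
        print(line)
```

Because `enhance` builds a new adapter instead of mutating shared state, each part of the code only sees the context it was handed.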
Gabriel Volpe's very nice demo trading application uses docker-compose to spin up some microservices that demonstrate distributed logging. However, to get full use out of it, you need to set up a Honeycomb account to view the traces properly. (You also need to fudge the classpath if you want to feed the cluster some fake trades, as it needs access to modules/domain/jvm/target/test-classes.)

Saturday, December 21, 2024

Debugging Polaris in Docker

Iceberg and Polaris sitting in a tree...

I have a proof of concept on GitHub that demonstrates how to use Apache Iceberg. Since I need Apache Polaris as Iceberg's metastore, I have it running in a container. 

If I create a catalog of FILE type, if the file path for storage is X, does this refer to the filesystem of Polaris or Spark?
Michael Collado
it's going to be both, so you shouldn't really use it in that scenario. The Polaris server is going to read/write metadata.json files in its own container's file system and the spark notebook will read/write data files in its own container's filesystem, so... [Discord]
In my PoC, I use a shared filesystem mount that both the Polaris container and the host's Spark instance write to.

However, tests were failing with only minimal logging to go on. When running Docker as a non-root user, the error in the Polaris logs looks like:

{"timestamp":1734433498899,"level":"INFO","thread":"dw-51 - POST /api/catalog/v1/manual_spark/namespaces/my_namespace/tables/IcebergCRUDSpec","logger":"org.apache.polaris.service.exception.IcebergExceptionMapper","message":"Handling runtimeException Failed to get file system for path: file:/tmp/polaris/my_namespace/IcebergCRUDSpec/metadata/00000-0daa8a08-5b5d-459a-bdd0-0663534f2007.metadata.json","mdc":{"spanId":"6ea71bffea6af726","traceId":"8b485bf56e7e27aac2e47ede876e02bd","realm":"default-realm","request_id":null},"params":{}}

When running containerised Polaris as root, the tests passed, but I couldn't clean up the files on the shared filesystem mount afterwards, as I was not running the test suite as root on the host.

Digging Deeper

That string ("Failed to get file system for path") led me to org.apache.iceberg.hadoop.Util.getFs. Unfortunately, the nested exception that explains the failure is lost: only the wrapping error reported above makes it into the logs.

So, we start the container with these flags:

 -eJAVA_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:8788 -p8788:8788

since polaris-dropwizard-service already expects JAVA_OPTS to be set.

Great, now we can put a breakpoint in Util.getFs and printStackTrace on the nested exception. It shows:

Caused by: javax.security.auth.login.LoginException: java.lang.NullPointerException: invalid null input: name
        at jdk.security.auth/com.sun.security.auth.UnixPrincipal.<init>(UnixPrincipal.java:71)
        at jdk.security.auth/com.sun.security.auth.module.UnixLoginModule.login(UnixLoginModule.java:134)
        at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:754)
        at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:678)
        at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:676)
        at java.base/java.security.AccessController.doPrivileged(AccessController.java:714)
        at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:676)
        at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:587)
        at org.apache.hadoop.security.UserGroupInformation$HadoopLoginContext.login(UserGroupInformation.java:2148)

A quick look at the JDK code shows that UnixSystem.getUsername appears to be returning null. This appears to be because there is no user with my ID in the container - d'oh.

A Solution

One solution is to have a bespoke Docker entrypoint that creates the user (if it doesn't exist) for a user ID passed in by Fabric8's docker-maven-plugin, and then runs Polaris as that user. If it's the same user as the one running the integration tests, both Polaris and Spark can write to the same directory and tables can be dropped without permission issues.
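A minimal sketch of such an entrypoint follows, written in Python to match the other examples here (a shell script would be more usual). It is not the PoC's actual entrypoint. Assumptions: the image contains Python 3 and shadow-utils' `useradd`, the container starts as root (needed to create users and switch to them), and the plugin passes the host user's ID in a hypothetical `RUN_AS_UID` environment variable. The user name `polaris` is also made up.

```python
import os
import pwd
import subprocess
import sys
from typing import List


def ensure_user(uid: int, name: str = "polaris") -> pwd.struct_passwd:
    """Return the passwd entry for uid, creating a user for it first if there is none.

    With no entry, the JDK's UnixSystem cannot find a user name, which is what broke Hadoop's login.
    """
    try:
        return pwd.getpwuid(uid)
    except KeyError:
        # assumes shadow-utils' useradd is in the image (BusyBox-based images use adduser)
        subprocess.run(["useradd", "--uid", str(uid), "--no-create-home", name], check=True)
        return pwd.getpwuid(uid)


def main(argv: List[str]) -> None:
    uid = int(os.environ["RUN_AS_UID"])   # hypothetical variable set in the plugin's run config
    entry = ensure_user(uid)
    os.setgroups([])                      # drop root's supplementary groups
    os.setgid(entry.pw_gid)               # change group first: after setuid we no longer can
    os.setuid(entry.pw_uid)
    os.execvp(argv[0], argv)              # replace this process with Polaris; never returns


if __name__ == "__main__" and len(sys.argv) > 1:   # nothing to launch without a command
    main(sys.argv[1:])
```

The image's ENTRYPOINT would then be something like `python3 /entrypoint.py <the usual Polaris start command>`, so Polaris runs as the same UID as the test suite on the host.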

Monday, December 9, 2024

Pipeline testing

A lot of developers wonder how to test pipelines - see this Discord thread. The best way we've found is to create fixed, known data so that, when our transform acts on it, we can make reasonable assertions about what comes out.

Synthetic data

We are operating in the healthcare domain. We have a data set of events at hospitals and we want to turn them into a running total of patients who happen to occupy the hospital on any given day. Our test data should be:
precisely x medical events over D days for y unique patients, distributed evenly over hospitals {a,b,c}, where each patient is admitted on day i mod D and discharged i mod w days later, and i is the unique patient id in the range [0, y)
Finally, the discharge date is null every z patients because we know we have bad data (urgh).
If we turn this natural language into a (Python) code signature, it looks like this: 

def random_inpatient_data(
    num_rows: int,
    num_patients: int,
    providers: list,
    discharge_null_every=97,
    attendance_date_fn=lambda i: random_timestamp(i, datetime.strptime("2021-11-01", "%Y-%m-%d")),
    start_time_fn=random_time_given,
) -> DataFrame:
    """
    Fake but realistic data for tests.
    :param num_rows: Total number of rows for this data set
    :param num_patients: The number of unique patients in this data set
    :param providers: Possible providers from which a synthetic site will be created
    :param discharge_null_every: step size for null discharge dates
    :param attendance_date_fn: A function taking a seed and returning an attendance date
    :param start_time_fn: A function taking a date and a seed and generating a timestamp
    :return: Synthetic Inpatient data.
    """
...

There are some things to note. First, there are sensible defaults, so each test creates data with only the peculiarities salient to its needs. Second, the data is random-ish but the same for any given set of parameters.

It all sounds a little complicated, but the whole thing is some two dozen lines that are called repeatedly across the test suite.
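For a flavour of what those two dozen lines can look like, here is a pure-Python sketch of the specification above (no Spark, and not our actual implementation). The name `synthetic_inpatients`, the parameters `num_days` (D) and `max_stay` (w), and the random `event_hour` column are assumptions made for illustration. The private, seeded random generator is what makes the data random-ish yet identical for identical parameters.

```python
import random
from datetime import date, timedelta
from typing import Dict, List, Optional, Sequence


def synthetic_inpatients(
    num_rows: int,                   # x: total number of events
    num_patients: int,               # y: unique patients, ids in [0, y)
    providers: Sequence[str],        # hospitals the patients are spread over
    num_days: int,                   # D: admission day is i mod D
    max_stay: int,                   # w: length of stay is i mod w days
    discharge_null_every: int = 97,  # z: every z-th patient has no discharge date
    start: date = date(2021, 11, 1),
    seed: int = 42,
) -> List[Dict[str, object]]:
    rng = random.Random(seed)        # seeded: same parameters, same data
    rows = []
    for row in range(num_rows):
        i = row % num_patients       # spread events evenly over patients
        admitted = start + timedelta(days=i % num_days)
        discharged: Optional[date] = admitted + timedelta(days=i % max_stay)
        if (i + 1) % discharge_null_every == 0:
            discharged = None        # deliberately bad data
        rows.append({
            "patient_id": i,
            "provider": providers[i % len(providers)],
            "admitted": admitted,
            "discharged": discharged,
            "event_hour": rng.randrange(24),  # the "random-ish" part
        })
    return rows
```

A test then overrides only what it cares about; for example, `discharge_null_every=1` makes every discharge date null.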

The assertions

There are certain invariants we can assert on. Some are straightforward: the hospital occupancy must always be greater than or equal to zero, and the number of admissions on a day must always be less than or equal to the running total for that day.

Some are a little more involved; for instance, the sum of daily occupancy deltas over the time frame is zero (ie, everybody who is admitted is ultimately discharged). Obviously the algorithm must be robust and not count zombie patients who appear never to be discharged. Remember that "the discharge date is null every z patients" above?

Another invariant is that we should have a reading for every day in a contiguous block. Remember that the input data is a series of events. If 10 patients are admitted on Monday, 5 are discharged on Friday and nothing happens from Tuesday to Thursday, do we still have readings for those dates even though nothing happened? (We should.)

Crafting the test data also raised some interesting corner cases that we needed to take back to the business analysts. For example, if a patient is discharged the same day they're admitted, do they show up in that day's occupancy numbers or not? If the discharge date is null, what do we do with this patient?
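A sketch of how the invariants above might be checked, in plain Python rather than on a DataFrame. It has to commit to answers for the corner cases, so, purely as assumptions: a patient occupies a bed on every day from admission to discharge inclusive (so a same-day discharge counts once, on that day), and stays with a null discharge date are left out rather than counted as zombies. `daily_occupancy` and `check_invariants` are hypothetical names.

```python
from collections import Counter
from datetime import date, timedelta
from typing import Iterable, List, Optional, Tuple

Row = Tuple[date, int, int, int]  # (day, admissions, occupancy, delta)


def daily_occupancy(stays: Iterable[Tuple[date, Optional[date]]]) -> List[Row]:
    admissions: Counter = Counter()
    deltas: Counter = Counter()
    for admitted, discharged in stays:
        if discharged is None:
            continue                                  # assumed policy: ignore open-ended stays
        admissions[admitted] += 1
        deltas[admitted] += 1                         # occupied from the admission day...
        deltas[discharged + timedelta(days=1)] -= 1   # ...up to and including the discharge day
    if not deltas:
        return []
    rows, occupancy = [], 0
    day, last = min(deltas), max(deltas)
    while day <= last:                                # emit every day, even quiet ones
        occupancy += deltas[day]
        rows.append((day, admissions[day], occupancy, deltas[day]))
        day += timedelta(days=1)
    return rows


def check_invariants(rows: List[Row]) -> None:
    days = [r[0] for r in rows]
    assert all(b - a == timedelta(days=1) for a, b in zip(days, days[1:])), "gap in the days"
    assert all(occ >= 0 for _, _, occ, _ in rows), "negative occupancy"
    assert all(adm <= occ for _, adm, occ, _ in rows), "more admissions than occupants"
    assert sum(delta for *_, delta in rows) == 0, "someone was never discharged"
```

Fed the Monday-to-Friday scenario above, the quiet Tuesday to Thursday still get rows, and changing either assumed policy is a one-line edit that the same invariants will then exercise.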

Conclusion

Synthetic data is a powerful tool when building automated regression tests. Tools like Deequ can test data quality but require curated data sets, and curating them is much harder than it sounds, especially when their schemas and semantics change.
 
Creating code that tests the functionality of your pipeline allows you to refactor your code with confidence. Once you try it, you'll never go back.

Diagnosing Kafka

The Problem

I have an integration test that creates a cluster of 4 Kafka brokers using the KRaft protocol (ie, no ZooKeeper). It then kills one and, upon sending more messages, expects a consumer to receive them. Approximately 1 in 4 times, it fails as no messages are received. This 1 in 4 figure seems suspicious...

Killing a node sometimes left the cluster in a bad state. The other brokers kept barfing UnknownHostException as they tried to talk to the missing node. Looking at the topics shed some light on the problem.

$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test_topic
Topic: test_topic TopicId: MaHwLf2jQ62VNUxtiFUGvw PartitionCount: 2 ReplicationFactor: 1 Configs: 
Topic: test_topic Partition: 0 Leader: 4 Replicas: 4 Isr: 4
Topic: test_topic Partition: 1 Leader: none Replicas: 1 Isr: 1

The Solution

Having modified the test so that it now creates the NewTopic with a replication factor of 2, the same command now gives:

Topic: test_topic TopicId: w37ZKZnqR-2AdmT76oiWsw PartitionCount: 2 ReplicationFactor: 2 Configs: 
Topic: test_topic Partition: 0 Leader: 4 Replicas: 4,1 Isr: 4,1
Topic: test_topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2

The test now passes every time (so far). We appear to have fixed the problem.

The Reason

Kafka replicates each partition's data over several brokers (the Replicas above). These replicas may or may not be in-sync replicas (the Isr column). An in-sync replica has caught up with the leader's latest writes within a configurable time (replica.lag.time.max.ms), and only in-sync replicas are considered for clean leader election.
"Producers write data to and consumers read data from topic partition leaders.  Synchronous data replication from leaders to followers ensures that messages are copied to more than one broker. Kafka producers can set the acks configuration parameter to control when a write is considered successful." [Disaster Recovery for Multi- Datacenter Apache Kafka Deployments, Confluent]
Conclusion

But what if this were a production topic rather than an integration test: how would you fix it? Well, if the data is not replicated, when the broker(s) hosting it die, you've lost the data. You can set unclean.leader.election.enable on the topic using one of the Kafka CLI tools (kafka-configs.sh, for instance), but it's a trade-off: "If we allow out-of-sync replicas to become leaders, we will have data loss and data inconsistencies." [Conduktor]
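A toy model (Python, nothing to do with Kafka's real code) shows why the replication factor mattered in the test. It assumes round-robin replica placement and that every live replica is in sync, then elects the first live replica as leader. Both are simplifications, and `assign_replicas` and `elect_leaders` are hypothetical names.

```python
from typing import Dict, List, Optional, Sequence, Set


def assign_replicas(partitions: int, replication_factor: int,
                    brokers: Sequence[int]) -> Dict[int, List[int]]:
    """Round-robin placement: partition p goes to brokers p, p+1, ... (mod cluster size)."""
    if replication_factor > len(brokers):
        raise ValueError("replication factor larger than the cluster")
    n = len(brokers)
    return {p: [brokers[(p + r) % n] for r in range(replication_factor)]
            for p in range(partitions)}


def elect_leaders(assignment: Dict[int, List[int]],
                  alive: Set[int]) -> Dict[int, Optional[int]]:
    """Clean election: only a surviving (here, assumed in-sync) replica may lead."""
    leaders: Dict[int, Optional[int]] = {}
    for partition, replicas in assignment.items():
        in_sync = [b for b in replicas if b in alive]
        leaders[partition] = in_sync[0] if in_sync else None   # None ~ "Leader: none"
    return leaders


if __name__ == "__main__":
    brokers = [1, 2, 3, 4]
    alive = {2, 3, 4}                        # broker 1 has been killed
    for rf in (1, 2):
        print(rf, elect_leaders(assign_replicas(2, rf, brokers), alive))
```

With one replica per partition, killing the broker that hosts it leaves that partition leaderless, as the `Leader: none` output showed; with two replicas on distinct brokers, any single broker can die.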

Wednesday, November 27, 2024

Cancel culture

Cancellation is a thorny issue in software - especially when we use containers as they're constantly being killed. 

First, let's look at how some Scala frameworks handle it. This Discord chat is about where the boundaries lie in Cats. Basically, if a chain of flatMaps is cancelled, the function passed to the next flatMap may still be executed (you can see that with a println), but the IO it returns is not.

We're going to SIGTERM this code in CatsCleanup:

  import cats.effect.{IO, IOApp, Resource}

  object CatsCleanup extends IOApp.Simple {

    val resource = Resource.make(IO.println("acquire"))(_ => IO.println("release"))

    def run: IO[Unit] = for {
      _ <- resource.use(_ => IO.readLine)
    } yield ()
  }

with 

kill -SIGTERM $(jps | grep CatsCleanup | awk '{print $1}')

In Cats Effect 3.5.4, the Resource is released and the output looks like:

acquire
release

Process finished with exit code 143 (interrupted by signal 15:SIGTERM)

(The exit code when a process is killed by a signal is 128 plus the signal number; SIGTERM is 15, hence 143. You can see the last exit code of a process in a Unix-like shell with echo $?.)

The equivalent code in ZIO (2.0.21):

  import zio._

  object ZIOCleanup extends ZIOAppDefault {

    val release = ZIO.logInfo("release")

    val resource = ZIO.acquireRelease(ZIO.logInfo("acquire"))(_ => release)

    override def run: ZIO[Any & ZIOAppArgs & Scope, Any, Any] = for {
      _ <- resource
      _ <- zio.Console.readLine("press a key")
    } yield ()
  }

does nothing.

Why it's important

With many workloads moving to, say, Kubernetes, cancellation comes with the territory. 
"What happens when a pod starts up, and what happens when a pod shuts down? 
"When a pod starts in a rolling deployment without the readiness probe configured ... the pod starts receiving traffic even though the pod is not ready. The absence of a readiness probe makes the application unstable. ... 

The problem is "it takes more time to update the iptables rules than for the containers to be terminated by the Kubelet... The Kubelet immediately sends a SIGTERM signal to the container, and the endpoints controller sends a request back to the API server for the pod endpoints to be removed from all service objects... Due to the difference in task completion time, Services still route traffic to the endpoints of the terminating pods."

[The solution involves] "adding a preStop hook to the deployment configuration. Before the container shuts down completely, we will configure the container to wait for 20 seconds. It is a synchronous action, which means the container will only shut down when this wait time is complete". 

This gives your application time to clean itself up.


Monday, November 11, 2024

Databricks in Azure

The Databricks plugin for VSCode is actually quite nice. Once installed, you can login via your IDE:


and choose Sign in to Databricks workspace, using a profile in your ~/.databrickscfg file. For my Windows VM, this is just a host string pointing at my cluster and an auth_type of databricks-cli. Signing in opens a browser, and I just authorise my session.

Now, the nice thing is that the code I develop in VSCode locally can be run remotely on the cluster by choosing Run File as Workflow.


This uploads all my code to the Databricks cluster, and I can watch the job run by clicking the link, which opens my browser at the job's console.

Interestingly, trying to read (via a notebook) a Spark DataFrame that this job had persisted as Parquet led to an obscure error. Apparently, I have to run:

spark.conf.set("spark.databricks.io.cache.enabled", "false")

Databricks as part of the Pipeline

Azure Data Factory is a little like Airflow but trying to do something clever with it (eg, use a CLI) is difficult. We found that having it invoke a Databricks notebook allowed us to do anything a little more sophisticated.

Note that you'll have to create a Databricks Linked Service to allow your ADF pipeline access to the notebook. These Microsoft docs proved helpful.

If you want the notebook to read from a parameter ADF passes to it, use something like this:

def read_param(key: str) -> str:
    dbutils.widgets.text(key, "")
    parameter = dbutils.widgets.get(key)
    return parameter

where key corresponds to one of the Base Parameters in the Notebook ADF activity. Note that there is a 10k character limit on these arguments [SO].

If you want the notebook to return something back to the ADF flow, have it terminate with:

dbutils.notebook.exit(YOUR_VALUE_HERE)

[See this SO.]
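`dbutils.notebook.exit` hands back a single string, so one way to return several values is to serialise them as JSON; per the ADF documentation, the value then appears under the Notebook activity's `output.runOutput`. The helper below is a hypothetical sketch, and since `dbutils` only exists inside Databricks, that call is left as a comment.

```python
import json
from datetime import date
from typing import Any, Dict


def exit_payload(values: Dict[str, Any]) -> str:
    """Serialise the notebook's results into the one string ADF receives."""
    # default=str turns dates (and other non-JSON types) into strings rather than failing
    return json.dumps(values, default=str, sort_keys=True)


payload = exit_payload({"rows_written": 1234, "run_date": date(2024, 11, 11)})
# Inside the Databricks notebook this would be the last statement:
# dbutils.notebook.exit(payload)
```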

Connecting Databricks to a Database

If you want to connect a Databricks notebook to a cloud-based SQL Server instance, you can run:

%pip install adal
%pip install openpyxl

servername = "YOUR_MACHINE_NAME"
jdbcHostname = f"{servername}.database.windows.net"
jdbcDatabase = "YOUR_DB" 
driver = "com.microsoft.sqlserver.jdbc.spark"
url = f"jdbc:sqlserver://{jdbcHostname};databaseName={jdbcDatabase}"

as a one off, then:

import adal 
authority = "https://login.microsoftonline.com/common"
context = adal.AuthenticationContext(authority)
jdbcPort = 1433
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)

client_id = "YOUR_CLIENT_ID" # Client ID for Azure Cloud Shell (you do not have to replace this)
resource_app_id_url = "https://database.windows.net/" 

code = context.acquire_user_code(resource_app_id_url, client_id)
token = context.acquire_token_with_device_code(resource_app_id_url, code, client_id)
access_token = token["accessToken"]

Now you can use Spark to read the DB with the help of this token:

jdbc_db_practice_mapping = (
        spark.read
        .format("com.microsoft.sqlserver.jdbc.spark")
        .option("url", url)
        .option("dbtable", "RightCare_ref.practice_CCG_quarter_lookup")
        .option("accessToken", access_token)
        .option("encrypt", "true")
        .option("hostNameInCertificate", "*.database.windows.net")
        .load()
)

Et voila. You're reading from SQL Server.

Saturday, October 26, 2024

NVIDIA Rapids

You can make Spark go faster by offloading some of the work to the GPU. There is an NVIDIA library (spark-rapids) to do this. 

A quick introduction

There are quite a few references to UCX in Spark Rapids. Assuming you have the hardware, UCX allows remote direct memory access (RDMA): basically, sharing data in memory in a way that bypasses the kernel.

"UVM or universal memory can allow main host memory to act essentially as swap for device(GPU) memory. This allows the GPU to process more data than fits in memory, but  can result in slower processing. This is an experimental feature." (from RapidsConf)

The spillStorageSize is the "Amount of off-heap host memory to use for buffering spilled GPU data before spilling  to local disk. Use -1 to set the amount to the combined size of pinned and pageable memory pools."

Old Ubuntu

Unfortunately, when running a test on Ubuntu 18 (I know, I know) I saw:

Caused by: java.lang.UnsatisfiedLinkError: /home/henryp/Code/Scala/SparkEcosystem/spark-rapids/integration_tests/target/tmp/cudf3561040550923512030.so: /lib/x86_64-linux-gnu/libc.so.6: version `GLIBC_2.28' not found (required by /home/henryp/Code/Scala/SparkEcosystem/spark-rapids/integration_tests/target/tmp/cudf3561040550923512030.so)
...
        at ai.rapids.cudf.NativeDepsLoader.loadDep(NativeDepsLoader.java:246)

After installing the latest Ubuntu on VMWare, it appears that you cannot access the GPU using VMWare Workstation.

You can, however, use a Docker image. Just check that your Docker daemon has the NVIDIA runtime installed by running:

henryp@adele: docker info | grep Runtimes 
Runtimes: io.containerd.runc.v2 nvidia runc

or 

henryp@adele:~$ grep -A2 -i nvidia /etc/docker/daemon.json
        "nvidia": {
            "path": "/usr/bin/nvidia-container-runtime",
            "runtimeArgs": []
        }

and download an image from here.
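If you'd rather script the second check, a small sketch like this parses the daemon configuration and looks for an `nvidia` entry under `runtimes`. It assumes the usual `/etc/docker/daemon.json` location and the layout shown above; the function name is made up.

```python
import json
from pathlib import Path


def has_nvidia_runtime(daemon_json: str) -> bool:
    """True if Docker's daemon config registers a runtime called "nvidia"."""
    config = json.loads(daemon_json) if daemon_json.strip() else {}
    return "nvidia" in config.get("runtimes", {})


if __name__ == "__main__":
    path = Path("/etc/docker/daemon.json")
    print(has_nvidia_runtime(path.read_text()) if path.exists() else False)
```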

Now, I just need to run something like:

docker run --rm --gpus all  --runtime=nvidia  -it   -v /home/henryp/Code/Scala/SparkEcosystem/spark-rapids/:/home/henryp/Code/Scala/SparkEcosystem/spark-rapids/ -v /home/henryp/Tools:/home/henryp/Tools -v /home/henryp/.m2:/.m2  -v /usr/local/bin/Java:/usr/local/bin/Java  --user root  nvidia/cuda:12.6.1-devel-ubi8 /bin/bash

and once I'm in, run:

yum -y install git
yum -y install diffutils
yum -y install rsync
export MAVEN_HOME=/home/henryp/Tools/Build/Maven/Latest
export PATH=$MAVEN_HOME/bin:$PATH
export JAVA_HOME=/usr/local/bin/Java/Latest17
export PATH=$JAVA_HOME/bin:$PATH

Constantly installing some tools for Rapids to build proved a bit tedious, so I extended the NVIDIA docker image with this Dockerfile:

FROM nvidia/cuda:12.6.1-devel-ubi8

RUN yum -y install git
RUN yum -y install diffutils
RUN yum -y install rsync

Also, if I wanted to use mvnDebug [SO], I had to set the network of the container to host using --network host [SO]. Then, it's just a matter of running:

mvnDebug scalatest:test -DforkMode=never

and attaching a debugger from the host machine.

Unfortunately, sometimes when I close and re-open my laptop, the tests start failing with:

...
WindowedBlockIteratorSuite:
terminate called after throwing an instance of 'cudf::jni::jni_exception'
  what():  CUDA ERROR: code 999
/home/henryp/Tools/Build/Maven/Latest/bin/mvnDebug: line 36:   216 Aborted                 (core dumped) env MAVEN_OPTS="$MAVEN_OPTS" MAVEN_DEBUG_OPTS="$MAVEN_DEBUG_OPTS" "`dirname "$0"`/mvn" "$@"

Apparently, I must reboot my machine :(

Some miscellaneous code notes

Finally, I get to look at the code in action. Rapids adds RapidsExecutorPlugin (which extends the Spark ExecutorPlugin interface) that immediately initializes the GPU and memory (see GpuDeviceManager.initializeGpuAndMemory). Note that in setGpuDeviceAndAcquire we see a comment that says: 
"cudaFree(0) to actually allocate the set device - no process exclusive required since we are relying on Spark to schedule it properly and not give it to multiple executors"
This is why the tests (HashAggregatesSuite) are hard-coded to use just one CPU core.

Rapids then has a parallel set of classes that look a lot like the Spark classes representing columnar data structures. For example, there is a ColumnVector abstraction in both Spark and Rapids. The interesting Rapids one is GpuColumnVector, which builds on this Spark abstraction and can be instantiated by a GpuShuffleExchangeExecBase. Amongst other things, objects of these classes contain the address of their off-heap data and a reference counter.
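The reference counter is what lets several objects share one block of off-heap memory and free it exactly once. Here is a sketch of that pattern in Python (not Rapids' or cuDF's code; `OffHeapBuffer`, `retain` and `close` are hypothetical names, and the "free" is simulated by a callback):

```python
class OffHeapBuffer:
    """Stand-in for a column whose data lives outside the managed heap."""

    def __init__(self, address: int, on_free):
        self.address = address      # where the data lives (simulated)
        self._refs = 1              # the creator holds the first reference
        self._on_free = on_free     # called exactly once, when the count reaches zero

    def retain(self) -> "OffHeapBuffer":
        if self._refs == 0:
            raise RuntimeError("buffer already freed")
        self._refs += 1
        return self

    def close(self) -> None:
        if self._refs == 0:
            raise RuntimeError("close called too many times")
        self._refs -= 1
        if self._refs == 0:
            self._on_free(self.address)

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.close()
        return False
```

Every holder calls `close` when done; only the last one actually releases the memory, and a stray extra `close` fails loudly instead of double-freeing.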

Still playing.