Wednesday, January 29, 2025

Databricks in an ADF pipeline

ADF is a nasty but ubiquitous technology in the Azure space. It's low-code and that means a maintenance nightmare. What's more, if you try to do anything remotely clever, you'll quickly hit a brick wall.

Fortunately, you can make calls from ADF to Databricks notebooks where you can write code. In our case, this was to grab the indexing SQL from SQL Server and version control it in Git. At time of writing, there was no Activity in ADF to access Git.

To access Git you need a credential. You don't want to hardcode it into the notebook as anybody who has access to the notebook can see it. So, you store it as a secret with something like:

echo GIT_CREDENTIAL | databricks secrets put-secret YOUR_SCOPE YOUR_KEY
databricks secrets put-acl YOUR_SCOPE YOUR_USER READ

where YOUR_SCOPE is the namespace in which the secret lives (can be anything modulo some reserved strings); YOUR_KEY is the key for retrieval and YOUR_USER is the user onto whom you wish to bestow access.

Now, your notebook can retrieve this information with:

git_credential = dbutils.secrets.get(scope="YOUR_SCOPE", key="YOUR_KEY")

and although others might see this code, the value is only accessible at runtime and only when YOUR_USER is running it.

Next, we want to get an argument that is passed to the Databricks notebook from ADF. You can do this in Python with:

dbutils.widgets.text(MY_KEY, "")
parameter = dbutils.widgets.get(MY_KEY)

where MY_KEY is a base parameter in the calling ADF Notebook Activity, for example:

Passing arguments to a Databricks notebook from ADF

Finally, you pass a return value back to ADF with:

dbutils.notebook.exit(YOUR_RETURN_VALUE)

and use it back in the ADF workflow by referencing @activity('YOUR_NOTEBOOK_ACTIVITY_NAME').output.runOutput (ADF expressions take single-quoted strings). In this case, we want to run the SQL the notebook returned:

ADF using the value returned from the Databricks notebook
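
dbutils.notebook.exit takes a string, so if the notebook needs to hand back more than one value, one option is to serialise a small JSON document. This is only a sketch: the helper name build_exit_payload, the "table" and "index_sql" keys and the sample SQL are all made up for illustration and are not from the original notebook.

```python
import json


def build_exit_payload(table: str, index_sql: list[str]) -> str:
    """Serialise the notebook's result as one JSON string (hypothetical shape)."""
    return json.dumps({"table": table, "index_sql": index_sql})


# Sample values, invented for the example.
payload = build_exit_payload(
    "dbo.orders",
    ["CREATE INDEX ix_orders_id ON dbo.orders (id)"],
)

# Inside a Databricks notebook the last statement would then be:
# dbutils.notebook.exit(payload)
```

On the ADF side the string arrives as runOutput. How the individual fields are then read in an expression is worth checking in your own pipeline rather than taking from this sketch.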

That's how ADF talks to Databricks and how Databricks replies. There's still the small matter of how the notebook invokes Git.

This proved unexpectedly complicated. To check the code out was easy:

import subprocess
print(subprocess.run(["git", "clone", f"https://NHSXEI:{git_credential.strip()}@dev.azure.com/YOUR_GIT_URL/{repo_name}"], capture_output=True))

but running arbitrary shell commands in that directory via Python was complicated as I could not cd into the new directory. This is because cd is a shell built-in [SO], not an executable. So, instead I used a hack: my Python dynamically generated some shell script, wrote it to a file and executed it from a static piece of shell script. Bit ick but does the job.
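
A possible alternative to the generated-script hack is the cwd argument of subprocess.run, which starts the child process in a given directory so no cd is needed. The sketch below uses a temporary directory as a stand-in for the cloned repo and a hypothetical helper called run_in. It is untested on a Databricks cluster.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def run_in(directory: Path, command: list[str]) -> str:
    """Run command with directory as its working directory and return stdout."""
    result = subprocess.run(
        command,
        cwd=directory,        # the child process starts in this directory
        capture_output=True,
        text=True,
        check=True,           # raise CalledProcessError on a non-zero exit
    )
    return result.stdout.strip()


# Stand-in for the freshly cloned repo directory.
repo_dir = Path(tempfile.mkdtemp()).resolve()

# Ask a child process where it thinks it is running.
reported_dir = run_in(
    repo_dir,
    [sys.executable, "-c", "import os; print(os.getcwd())"],
)
```

In the notebook the call would look something like run_in(Path(repo_name), ["git", "status"]), assuming repo_name is the directory that git clone created.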

Monday, January 27, 2025

Upgrading Kubernetes

I'm adding a new node to my home K8s cluster. Conveniently, I can find the command to run on the client who wants to join by running this on the master [SO]:

kubeadm token create --print-join-command

However, joining proved a problem because my new machine has Kubernetes 1.32 and my old cluster is still on 1.28. K8s allows a difference of 1 but this was just too big a jump.
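
To make the gap concrete, here is a small sketch that compares the minor versions of two version strings. The helper name minor_skew is made up for illustration, and the limit of one minor version is simply the figure quoted above, not something the code looks up.

```python
def minor_skew(version_a: str, version_b: str) -> int:
    """Absolute difference between the minor parts of two 'major.minor[.patch]' strings."""
    major_a, minor_a = (int(part) for part in version_a.lstrip("v").split(".")[:2])
    major_b, minor_b = (int(part) for part in version_b.lstrip("v").split(".")[:2])
    if major_a != major_b:
        raise ValueError("major versions differ")
    return abs(minor_a - minor_b)


# New machine versus old cluster, using the versions mentioned in this post.
skew = minor_skew("1.32.1", "1.28")
too_far_apart = skew > 1
```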

So, time to upgrade the cluster.

First I had to update the executables by following the official documents to put the correct keyrings in place. I also had to override the held packages with --allow-change-held-packages as all my boxes are Ubuntu and can be upgraded in lockstep. This meant I didn't have to run:

sudo kubeadm upgrade apply 1.32
sudo kubeadm config images pull --kubernetes-version 1.32.1

However, I must have bodged something as getting the nodes showed the master was in a state of Ready,SchedulingDisabled [SO] where uncordon did the trick. I was also getting "Unable to connect to the server: tls: failed to verify certificate: x509" (amongst other errors), so I rolled back [ServerFault] all config with:

sudo kubeadm reset

To be honest, I had to do this a few times until I worked out that my bodge was an incorrect IP address in my /etc/hosts file for one of my nodes - d'oh.

Then I followed the original instructions I used to set up the cluster last year. But don't forget the patch I mention in a previous post. Also note that you must add KUBELET_EXTRA_ARGS="--cgroup-driver=cgroupfs" to /etc/default/kubelet and the JSON to /etc/docker/daemon.json on the worker nodes too.

[Addendum. I upgraded an Ubuntu box from 20 to 22 and had my flannel and proxy pods on that box constantly crashing. Proxy was reporting "nodePortAddresses is unset; NodePort connections will be accepted on all local IPs. Consider using `--nodeport-addresses primary`". Following the instructions in the previous paragraph a second time solved the problem as the upgrade had clearly blatted some config.]

I checked the services [SO] with:

systemctl list-units | grep running | grep -i kube

which showed the kubelet is running (enabled means it will restart upon the next reboot; you can have one without the other) and 

sudo systemctl status kubelet

Things seemed OK:

$ kubectl get pods --all-namespaces 
NAMESPACE      NAME                          READY   STATUS    RESTARTS      AGE
kube-flannel   kube-flannel-ds-7zmz8         1/1     Running   4 (88m ago)   89m
kube-flannel   kube-flannel-ds-sh4nk         1/1     Running   9 (64m ago)   86m
kube-system    coredns-668d6bf9bc-748ql      1/1     Running   0             94m
kube-system    coredns-668d6bf9bc-zcxfp      1/1     Running   0             94m
kube-system    etcd-nuc                      1/1     Running   8             94m
kube-system    kube-apiserver-nuc            1/1     Running   8             94m
kube-system    kube-controller-manager-nuc   1/1     Running   6             94m
kube-system    kube-proxy-dd4gc              1/1     Running   0             94m
kube-system    kube-proxy-hlzhj              1/1     Running   9 (63m ago)   86m
kube-system    kube-scheduler-nuc            1/1     Running   6             94m

Note that the nuc suffix in a pod's name indicates it's running on my cluster's master, and the other pods (flannel, coredns and kube-proxy) have an instance on each node in the cluster.

Note also that we'd expect two Flannel pods as there are two nodes in the cluster.

It's worth noting at this point that kubectl is a client side tool. In fact, you won't be able to see the master until you scp /etc/kubernetes/admin.conf on the master into your local ~/.kube/config.

Contrast this with kubeadm which is a cluster side tool.

Wednesday, January 22, 2025

Notes on LLMs

I've been learning to build LLMs the hard way. These are some notes I made on my journey.

Intro

I'm not going to train my LLM from scratch. Instead, I'm going to use somebody else's ("Pre Training") and tune it ("Supervised Fine Tuning"). The differences can be seen here [Microsoft] but suffice to say that Pre Training takes huge resources whereas Supervised Fine Tuning can be achieved on modest hardware.

The Environment

I'll be extensively using these Python libraries:
  • TRL "is a library created and maintained by Hugging Face to train LLMs using SFT and preference alignment." [1]
  • Unsloth "uses custom kernels to speed up training (2-5x) and reduce memory use (up to 80% less memory)... It is based on TRL and provides many utilities, such as automatically converting models into the GGUF quantization [see below] format." [1]
  • "Accelerate is a library that enables the same PyTorch code to be run across any distributed configuration by adding just four lines of code!" [HuggingFace] It offloads large matrices to disk or CPU
Preference alignment is an umbrella term. It "addresses the shortcomings of SFT by incorporating direct human or AI feedback into the training process" whereas SFT is just trained on a corpus of text learning its structure.

But on Ubuntu 20, my code gives:

Detected kernel version 5.4.0, which is below the recommended minimum of 5.5.0; this can cause the process to hang. It is recommended to upgrade the kernel to the minimum version or higher.

This appears to be referring to the Linux kernel [SO]

OOMEs

Using a newer kernel (6.5.0-1025-oem) gets me further but then barfs with:

  File "/home/henryp/venv/llms/lib/python3.10/site-packages/transformers/activations.py", line 56, in forward
    return 0.5 * input * (1.0 + torch.tanh(math.sqrt(2.0 / math.pi) * (input + 0.044715 * torch.pow(input, 3.0))))
torch.OutOfMemoryError: CUDA out of memory. Tried to allocate 1.17 GiB. GPU 0 has a total capacity of 7.75 GiB of which 529.62 MiB is free. Including non-PyTorch memory, this process has 7.22 GiB memory in use. Of the allocated memory 6.99 GiB is allocated by PyTorch, and 87.96 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

Some advice I received was:
You need to reduce the batch size even more.
You can even try using different optimizers as optimizers hold a lot of memory - Use 8bitadam or adafactor.
You can try LoRA - Low rank adaptation.
You can also quantize the model to load in lower bit precisions [Discord]

"LoRA is a parameter-efficient technique for fine-tuning LLMs... The primary purpose of LoRA is to enable the fine-tuning of LLMs with significantly reduced computational resources."

The idea is that when we fine tune the weights matrix (W), we'd semantically do this:

W' = W + M

But if M is large, that's a lot of memory we're talking about.

The trick with LoRA is that instead of M we use two matrices of much smaller rank, A and B, and replace M with AB. As long as A has the same number of rows as M and B the same number of columns, the dimension they share (referred to as r in the library code) can be much smaller. "Larger ranks may capture more diverse tasks but could lead to overfitting" [1, p214].
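
To get a feel for the saving, here is a back-of-the-envelope count of how many numbers have to be stored for M versus A and B. The layer size (4096 x 4096) and the rank (16) are hypothetical values picked for illustration, not figures from the model used here.

```python
def full_update_params(rows: int, cols: int) -> int:
    """Number of entries in a dense update matrix M of shape rows x cols."""
    return rows * cols


def lora_update_params(rows: int, cols: int, r: int) -> int:
    """Entries in A (rows x r) plus B (r x cols), whose product AB has M's shape."""
    return rows * r + r * cols


# Hypothetical layer size and rank, chosen only for illustration.
rows, cols, r = 4096, 4096, 16

dense = full_update_params(rows, cols)         # 16,777,216 entries in M
low_rank = lora_update_params(rows, cols, r)   # 131,072 entries in A and B
reduction = dense // low_rank                  # 128 times fewer numbers to train
```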

Sure, we lose some accuracy but when my code now reads:

model, tokenizer = FastLanguageModel.from_pretrained(  # Unsloth
    ...
    load_in_4bit=True,  # "You can activate QLoRA by setting load_in_4bit to True"  LLMEngineering, p251
)

it runs to completion. Note that QLoRA combines LoRA with quantization. "The core of QLoRA’s approach involves quantizing the base model parameters to a custom 4-bit NormalFloat (NF4) data type, which significantly reduces memory usage." [1]

Note that I also tried to set a quantization_config. Although the fine-tuning stage worked, the code that later used the model complained about non-zero probabilities for reasons I don't yet understand.

Preference Alignments

Reinforcement Learning from Human Feedback "typically involves training a separate reward model and then using reinforcement learning algorithms like PPO to fine-tune the language model... This is typically done by presenting humans with different answers and asking them to indicate which one they prefer." [1]

Proximal Policy Optimization (PPO) "is one of the most popular RLHF algorithms. Here, the reward model is used to score the text that is generated by the trained model. This reward is regularized by an additional Kullback–Leibler (KL) divergence factor, ensuring that the distribution of tokens stays similar to the model before training (frozen model)." [1]

Direct Preference Optimization (DPO) uses Kullback–Leibler [previous post] to compare an accepted answer with a rejected one, removing the need for an RL algorithm.
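
As a rough illustration of what DPO optimises, the sketch below computes the published DPO loss for a single accepted/rejected pair. The log-probabilities and beta=0.1 are invented numbers, and the function name dpo_loss is hypothetical; in practice a library such as TRL does this over batches of token log-probabilities. The frozen reference model's log-probabilities are what keep the tuned model anchored to the original, which is the job the KL term does in PPO.

```python
import math


def dpo_loss(policy_chosen: float, policy_rejected: float,
             ref_chosen: float, ref_rejected: float,
             beta: float = 0.1) -> float:
    """DPO loss for one preference pair, given summed log-probabilities."""
    # How much more the policy favours each answer than the frozen reference does.
    chosen_gain = policy_chosen - ref_chosen
    rejected_gain = policy_rejected - ref_rejected
    margin = beta * (chosen_gain - rejected_gain)
    # -log(sigmoid(margin)), written as log(1 + exp(-margin)).
    return math.log1p(math.exp(-margin))


# Policy identical to the reference: the margin is zero and the loss is log(2).
untrained = dpo_loss(-12.0, -15.0, -12.0, -15.0)

# Policy now prefers the accepted answer more than the reference does: lower loss.
improved = dpo_loss(-10.0, -18.0, -12.0, -15.0)
```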

The idea with preference alignment is that the adjustment to the underlying model is much smaller. Compare my trained model weights (M in the above equation):

$ du -sh saved_model/model_mlabonne
321M    saved_model/model_mlabonne

with the model weights on which it was based (W in the above equation):

$ du -sh ~/.cache/huggingface/hub/models--mlabonne--TwinLlama-3.1-8B/
15G     /home/henryp/.cache/huggingface/hub/models--mlabonne--TwinLlama-3.1-8B/

and it's tiny.

[1] The LLM Engineer's Handbook

Monday, January 20, 2025

Notes on GPUs and Spark

There are moves afoot to give Spark GPU access out of the box. From Project Hydrogen for Spark: "Although Spark supports [Kubernetes and YARN], Spark itself is not aware of GPUs exposed by them and hence Spark cannot properly request GPUs and schedule them for users. This leaves a critical gap to unify big data and AI workloads and make life simpler for end users."

To play with it, I tried to build Spark Rapids from NVidia. Unfortunately, the tests barfed with:

GpuDeviceManagerSuite:
*** RUN ABORTED ***
  java.lang.OutOfMemoryError: Could not allocate native memory: std::bad_alloc: out_of_memory: RMM failure at:/home/jenkins/agent/workspace/jenkins-spark-rapids-jni-release-10-cuda11/target/libcudf/cmake-build/_deps/rmm-src/include/rmm/mr/device/arena_memory_resource.hpp:181: Maximum pool size exceeded
  at ai.rapids.cudf.Rmm.allocInternal(Native Method)
  at ai.rapids.cudf.Rmm.alloc(Rmm.java:519)
  at ai.rapids.cudf.DeviceMemoryBuffer.allocate(DeviceMemoryBuffer.java:147)
  at ai.rapids.cudf.DeviceMemoryBuffer.allocate(DeviceMemoryBuffer.java:137)
  at com.nvidia.spark.rapids.GpuDeviceManagerSuite.$anonfun$new$4(GpuDeviceManagerSuite.scala:57)
  at com.nvidia.spark.rapids.GpuDeviceManagerSuite.$anonfun$new$4$adapted(GpuDeviceManagerSuite.scala:52)
  at com.nvidia.spark.rapids.TestUtils$.withGpuSparkSession(TestUtils.scala:139)
  at com.nvidia.spark.rapids.GpuDeviceManagerSuite.$anonfun$new$3(GpuDeviceManagerSuite.scala:52)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)

OK, so let's use some NVidia tools to find out what's going on. Running nvidia-smi yields:

| NVIDIA-SMI 470.256.02   Driver Version: 470.256.02   CUDA Version: 11.4     |

So running the NVidia profiler, nsight-sys, means I can only profile the CPU not the GPU since the driver is too old (I'm using Ubuntu 20).

It seems [SO] that the CUDA toolkit v12 has a minimum compute capability (CC) of 5 and my Quadro T2000 has a compute capability of 7.5 [NVIDIA] so I should be good if I had an upgraded driver. (Seems that v535 of the NVIDIA driver may have some issues, though).

But it was about this time I bought a new laptop (Lenovo Thinkpad P1 Gen 7 - awesome machine) which came with Ubuntu 22 preinstalled and most of the software I needed. However, nsight-sys was barfing.


The actual error was: Cannot mix incompatible Qt library (5.15.3) with this library (5.15.2)

So, I reinstalled it from the NVIDIA website and now run the command line:

/usr/local/NVIDIA-Nsight-Compute-2024.3/ncu-ui

to see the NSight Compute GUI. There seem to be a few people on the forums suggesting that installing the NVidia tools by hand leads to them being more reliable.

Anyway, I can now run this little handy script:

$ cat ~/bin/nvidia_prof 
/usr/local/NVIDIA-Nsight-Compute-2024.3/target/linux-desktop-glibc_2_11_3-x64/ncu --verbose --config-file off --export /tmp/nvidia.log --force-overwrite "$@"

appending the command I want to profile, after which I can view the output in ncu-ui and (hopefully) see where the problem is.

Another Java GPU library

It's worth noting that there is another Java library that accesses the GPU, but this time in a different way. DJL appears to use JNA to access the CUDA library. The interface is CudaLibrary, the native implementation of which appears to be a thin wrapper around some CUDA code.

JNA eliminates the boilerplate of JNI. It dynamically maps Java method calls to native functions at runtime. Consequently, JNA has higher runtime overhead compared to JNI because it uses reflection and runtime mapping.

Azure real estate

Azure Data Factory

Think of it as Airflow for moving data around, with connectors to enable ingest/egress. This graphic oozes details.
"The main engine that is responsible for the running of Data Factory is called the integration runtime." [3]

Azure Pipelines

CI/CD in the Azure world. 

External Tables

Rather than a bulk insert, Azure SQL can point to a blob store and use the data there as if it were a DB table. Which strategy you use when the underlying blob storage is updated appears to be configurable. You do this with something like "CREATE EXTERNAL TABLE ... WITH (LOCATION = ..."

"This specifies where Synapse [see below] should load the data files (CSV files) from, and the column delimiters used in the CSV." [1]

This appears to be only available on Azure SQL not SQL Server 2016 according to this SO (nor 2022 if my Docker container is anything to go by). Apparently, PolyBase [SO] is needed.  

The big downsides of external tables include lack of indexes, referential integrity and performance.

PolyBase

"PolyBase enables your SQL Server instance to query data with T-SQL directly from SQL Server, Oracle, Teradata, MongoDB, Hadoop clusters, Cosmos DB, and S3-compatible object storage without separately installing client connection software" [Microsoft] This "is to allow the data to stay in its original location and format".

Synapse
"Azure Synapse Analytics enables you to use SQL and Spark technologies to analyze big data, offers a Data Explorer for log and time series data analytics, and can persist data in its native data warehouse." [1]
"Azure Synapse provides more analytic storage."[2]
"Synapse prefers an ELT (extract, load, transform) process for data ingestion over an ETL (extract, transform, load) process... the process basically moves data into Synapse and “loads” the data before the transforms happen." [2]

Think of Synapse as Azure's Hive.

Managed Instances

"Because of its design, Azure SQL Managed Instance provides many more features that provide parity with SQL Server and yet provides the benefits of a fully managed service." [4]

[1] Azure Cookbook [O'Reilly]
[2] Architecting IoT Solutions on Azure [O'Reilly]
[3] Azure for Architects [O'Reilly]
[4] The Developer's Guide to Azure [Microsoft]

Friday, January 10, 2025

The New Logging

Twenty years ago, everybody was writing their own logging framework. Today, when distributed computing is the norm, distributed logging is a necessity.

Functional programming really comes into its own in such an environment. So, it's no surprise that the FP tools already support distributed logging. In the Cats ecosystem, there is Natchez. This can feed into a distributed tracing system like the open source Jaeger, a Go application that can store the data in Elastic or Cassandra. Zipkin is another if you prefer a Java implementation.

[Regarding Natchez "unless you have a specific reason to want to use Natchez, you may want to look at otel4s instead. I think the community will be migrating to otel4s once it is binary-stable (Natchez is great and still works well but if you're starting fresh you may as well use the library that implements the industry standard for tracing)" - Discord]

MDC

First, some terminology.

"Mapped Diagnostic Context is to provide a way to enrich log messages with pieces of information that could be not available in the scope where the logging actually occurs, but that can be indeed useful to better track the execution of the program." [Baeldung]

Aside: don't use this mechanism in your business logic code.
Fabio Labella @SystemFw Jan 13 17:20
I don't know how many details I can give, but basically it got to the point that C-level executives knew the word "ThreadLocal", which is really bad. But basically someone had built an entire internal framework based essentially on MDC for a lot of business logic in a multitentant environment (falling down the slippery slope that it was "context"), and that at some point there were a crap load of race conditions due to ThreadLocal + asynchrony, risking crosstalk, which in a financial institution can well mean your whole company gets shut down

Spans and Kernels

"The usual tracing approach involves threading spans (aka the context) throughout the application we wish to instrument. On the other hand, distributed tracing requires a so-called kernel to be able to continue the previous tracing span." - Functional Event Driven Architecture, Volpe.

Cats

Within an effects engine, ThreadLocal becomes the wrong tool. You can use Task Local in Monix to create MDC functionality.
Gerry Fletcher @gerryfletch Jan 13 17:17
It's purely to add the request id into every log line
Fabio Labella @SystemFw Jan 13 17:17
log4cats has a withContext that lets you do that without relying on state
In "Practical FP in Scala", Gabriel Volpe says:
Normally, people use the Slf4j implementation, which is created as shown below. 
implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]
In our application, we are going to use Log4cats, which is by now the standard logging library of the Cats ecosystem. Whenever we need logging capability, all we need is to add a Logger constraint to our effect type. For instance:

def program[F[_]: Apply: Logger]: F[Unit] =
  Logger[F].info("starting program") *> doSomething
The idea is that your code logs as normal but transparently these events are sent to some aggregator. What's more, the events are contextual - they might take place over several nodes in a cluster.

The FP magic in the above Logger[F].info("... code is that there is an implicit Logger in the ether (this code's enclosing scope will define [F[_]: Logger: ...). This Logger might just write to disk or might send the message to an external system. This calling code doesn't care. The relevant Logger is merely summoned.

Regarding architecture:
Christopher Davenport @ChristopherDavenport Jan 13 17:49
So I can write middlewares on my logger and clients that enhance them with aspects about the user and then I can queue them into my in memory queue for when its something thats decoupled from response cycle. I keep all the logging across the entire stack, even after submission to a queue as a result.
Including things like request-id, user-information, and can continue to propogate that information to other services for tracing reasons.

Theres also a lifter that can lift a client into a Kleisli in http4s, so if you do that before the rest of your app you can still work in a fully abstract F, just pass in the client.

I extract and enhance my loggers at the point I know user identity and then pass those around with a ton of enhanced information to make debugging simple.
The very nice Gabriel Volpe test trading application uses docker-compose to create some microservices that demonstrate distributed logging. However, to get the full use out of it, you need to set up a Honeycomb account to view the traces properly. (You also need to fudge the classpath if you want to Feed the cluster with some fake trades as it needs access to modules/domain/jvm/target/test-classes).