Wednesday, November 27, 2024

Cancel culture

Cancellation is a thorny issue in software - especially when we use containers, as they're constantly being killed.

First, let's look at how some Scala frameworks handle it. This Discord chat is about where the cancellation boundaries lie in Cats Effect. Basically, if a chain of flatMaps is cancelled, the function passed to the next flatMap may still be executed (you can see that with a println), but the IO it creates is never run.
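
A rough sketch of that boundary (my own example, not the code from the chat): cancel a fiber while it sleeps, and the IO built by the following flatMap is never run, although an onCancel finalizer is.

import cats.effect.{IO, IOApp}
import scala.concurrent.duration._

object CancellationBoundary extends IOApp.Simple {
  def run: IO[Unit] = for {
    fiber <- IO.sleep(1.second)
               .flatMap(_ => IO.println("never printed: this IO is not run once the fiber is cancelled"))
               .onCancel(IO.println("cancelled: the finalizer runs instead"))
               .start                 // run the chain on its own fiber
    _     <- IO.sleep(100.millis)     // let the fiber reach its sleep
    _     <- fiber.cancel             // cancel it mid-sleep
  } yield ()
}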

We're going to SIGTERM this code in CatsCleanup:

import cats.effect.{IO, IOApp, Resource}

object CatsCleanup extends IOApp.Simple {

  // A Resource whose release action should run even when the app is SIGTERMed
  val resource: Resource[IO, Unit] =
    Resource.make(IO.println("acquire"))(_ => IO.println("release"))

  def run: IO[Unit] = for {
    _ <- resource.use(_ => IO.readLine)
  } yield ()
}
with 

kill -SIGTERM $(jps | grep CatsCleanup | awk '{print $1}')

In Cats Effect 3.5.4 the Resource is released and the output looks like:

acquire
release

Process finished with exit code 143 (interrupted by signal 15:SIGTERM)

(The exit code when a process is killed by a signal is 128 plus the signal number; SIGTERM is signal 15, hence 143. On a Unix-like system you can see the last command's exit code with echo $?.)

The equivalent code in ZIO (2.0.21):

  val release = ZIO.logInfo("release")

  val resource = ZIO.acquireRelease(ZIO.logInfo("acquire"))(_ => release)

  override def run: ZIO[Any & ZIOAppArgs & Scope, Any, Any] = for {
    _ <- resource
    _ <- zio.Console.readLine("press a key")
  } yield ()

does not run the release action when it receives the same SIGTERM - "release" is never logged.

Why it's important

With many workloads moving to, say, Kubernetes, cancellation comes with the territory. 
"What happens when a pod starts up, and what happens when a pod shuts down? 
"When a pod starts in a rolling deployment without the readiness probe configured ... the pod starts receiving traffic even though the pod is not ready. The absence of a readiness probe makes the application unstable. ... 

The problem is"it takes more time to update the iptables rules than for the containers to be terminated by the Kubelet... The Kubelet immediately sends a SIGTERM signal to the container, and the endpoints controller sends a request back to the API server for the pod endpoints to be removed from all service objects... Due to the difference in task completion time, Services still route traffic to the endpoints of the terminating pods

[The solution involves] "adding a preStop hook to the deployment configuration. Before the container shuts down completely, we will configure the container to wait for 20 seconds. It is a synchronous action, which means the container will only shut down when this wait time is complete". 

This gives your application time to clean itself up.
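
In the deployment manifest, that hook looks something like this sketch (the container name and image are placeholders; the 20-second sleep matches the quote above):

  containers:
    - name: my-app                  # placeholder
      image: my-app:1.0             # placeholder
      lifecycle:
        preStop:
          exec:
            command: ["sleep", "20"]   # runs to completion before the kubelet sends SIGTERM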


Monday, November 11, 2024

Databricks in Azure

The Databricks plugin for VSCode is actually quite nice. Once installed, you can log in via your IDE and Sign in to Databricks workspace using the profile in your ~/.databrickscfg file. For my Windows VM, this is just a host string pointing at my cluster and an auth_type of databricks-cli. Signing in opens a browser where I just authorise my session.
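
For reference, that profile amounts to something like the following (the host URL is a placeholder for your own workspace):

[DEFAULT]
host      = https://adb-1234567890123456.7.azuredatabricks.net
auth_type = databricks-cli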

Now, the nice thing is that the code I develop locally in VSCode can be run remotely on the cluster by choosing Run File as Workflow.

This uploads all my code to the Databricks cluster, and I can see the job running by clicking the link, which opens my browser at the job's console.

Interestingly, trying to read (via a notebook) a Spark DataFrame that had been persisted as Parquet by this job led to an obscure error. Apparently, I have to run:

spark.conf.set("spark.databricks.io.cache.enabled", "false")

Databricks as part of the Pipeline

Azure Data Factory is a little like Airflow, but trying to do something clever with it (e.g., use a CLI) is difficult. We found that having it invoke a Databricks notebook let us do anything a little more sophisticated.

Note that you'll have to create a Databricks Linked Service to allow your ADF pipeline access to the notebook. These Microsoft docs proved helpful.

If you want the notebook to read from a parameter ADF passes to it, use something like this:

def read_param(key: str) -> str:
    # Register a text widget with an empty default (so the lookup also works when the
    # notebook is run outside ADF), then read whatever value was passed in
    dbutils.widgets.text(key, "")
    parameter = dbutils.widgets.get(key)
    return parameter

where key corresponds to one of the Base Parameters in the Notebook ADF activity. Note that there is a 10k character limit on these arguments [SO].

If you want the notebook to return something back to the ADF flow, have it terminate with:

dbutils.notebook.exit(YOUR_VALUE_HERE)

[See this SO.]

Connecting Databricks to a Database

If you want to connect a Databricks notebook to a cloud-based SQL Server instance, you can run:

%pip install adal
%pip install openpyxl

servername = "YOUR_MACHINE_NAME"
jdbcHostname = f"{servername}.database.windows.net"
jdbcDatabase = "YOUR_DB" 
driver = "com.microsoft.sqlserver.jdbc.spark"
url = f"jdbc:sqlserver://{jdbcHostname};databaseName={jdbcDatabase}"

as a one off, then:

import adal

authority = "https://login.microsoftonline.com/common"
context = adal.AuthenticationContext(authority)
jdbcPort = 1433
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)

client_id = "YOUR_CLIENT_ID" # Client ID for Azure Cloud Shell (you do not have to replace this)
resource_app_id_url = "https://database.windows.net/" 

# Device-code flow: the printed message gives the login URL and the code to enter;
# acquire_token_with_device_code then blocks until you have authenticated in the browser
code = context.acquire_user_code(resource_app_id_url, client_id)
print(code["message"])
token = context.acquire_token_with_device_code(resource_app_id_url, code, client_id)
access_token = token["accessToken"]

Now you can use Spark to read the DB with the help of this token:

from pyspark.sql import SQLContext

jdbc_db_practice_mapping = (
        spark.read
        .format("com.microsoft.sqlserver.jdbc.spark")
        .option("url", url)
        .option("dbtable", "RightCare_ref.practice_CCG_quarter_lookup")
        .option("accessToken", access_token)
        .option("encrypt", "true")
        .option("hostNameInCertificate", "*.database.windows.net")
        .load()
)

Et voilà. You're reading from SQL Server.