Thursday, October 23, 2025

Exactly Once Semantics in Iceberg/Kafka

Iceberg has a Kafka Connect component that ensures exactly-once semantics despite there being two transactions per write, one for each of the two systems (Kafka and Iceberg).

So, how is this done? The answer is simple: idempotency. This code here prepares the data to be committed but, by delegating to this code in MergingSnapshotProducer, it ensures there are no dupes.

Note the Coordinator commits metadata to Iceberg.

The Coordinator and the Worker dance a tango. The steps go like this: 

The Worker writes Iceberg data to storage. Having saved its SinkRecords to its RecordWriter, it then goes on to processControlEvents.

This involves polling the internal topic for Kafka ConsumerRecords. If this worker is in the consumer group and the event is a START_COMMIT, it sends to its internal topic all the ContentFiles that it was responsible for writing, wrapped in a DataWritten object. It also sends a DataComplete object, with all of these Events sent as part of a single Kafka transaction.

In turn, when the Coordinator receives a DataComplete object, it calls the code mentioned above that idempotently writes to Iceberg, within Iceberg transactions. That is, if the ContentFiles wrapped in the DataWritten object are already in the metadata, they are essentially ignored.
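
To make that concrete, here is a minimal sketch of the idempotency idea using the public Iceberg API. This is not the connector's actual code (MergingSnapshotProducer does the deduplication internally); the table and filesFromWorkers parameters are stand-ins for whatever the Coordinator has to hand.

import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class IdempotentAppend {

    // A sketch only: skip any data file whose path the table metadata already references.
    public static void appendOnce(Table table, List<DataFile> filesFromWorkers) throws IOException {
        Set<String> existing = new HashSet<>();
        try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
            for (FileScanTask task : tasks) {
                existing.add(task.file().path().toString());
            }
        }

        AppendFiles append = table.newAppend();
        boolean anythingNew = false;
        for (DataFile file : filesFromWorkers) {
            if (!existing.contains(file.path().toString())) {
                append.appendFile(file);
                anythingNew = true;
            }
        }

        if (anythingNew) {
            append.commit(); // a single Iceberg commit; replaying the same files is a no-op
        }
    }
}

Because replaying the same DataWritten events adds nothing new, the Coordinator can safely see them more than once.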

The Coordinator can also trigger a commit if it deems it to have timed out.

The key thing is that the worker only acknowledges the SinkRecords it read from the external Kafka topic as part of the same transaction it uses to send the Events to its internal Kafka topic. That way, if the worker crashes after writing to Iceberg storage, those SinkRecords will be read again from Kafka and written again to Iceberg storage. However, the Iceberg metadata will be updated exactly once - at the potential cost of some orphan files, it seems.
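
The "acknowledge in the same transaction" part is just Kafka's transactional producer API. A rough sketch (not the connector's code - the topic name, transactional id and consumer group below are made up for illustration):

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.util.Map;
import java.util.Properties;

public class WorkerCommitSketch {

    public static void sendEventsAndOffsets(byte[] dataWritten,
                                            byte[] dataComplete,
                                            Map<TopicPartition, OffsetAndMetadata> sinkRecordOffsets) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "iceberg-worker-0"); // stable per worker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();

            // the Events describing what was written to Iceberg storage...
            producer.send(new ProducerRecord<>("iceberg-control-topic", dataWritten));
            producer.send(new ProducerRecord<>("iceberg-control-topic", dataComplete));

            // ...and the acknowledgement of the SinkRecords read from the source topic,
            // committed (or aborted) atomically with them.
            producer.sendOffsetsToTransaction(sinkRecordOffsets,
                                              new ConsumerGroupMetadata("iceberg-connect-group"));
            producer.commitTransaction();
        }
    }
}

If the worker dies before commitTransaction, neither the Events nor the offset acknowledgements become visible, so the SinkRecords are simply redelivered.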

[In addition, the Coordinator sends CommitToTable and CommitComplete objects to its internal Kafka topic in an unrelated transaction. This appears to be for completeness as I can't see what purpose it serves.]

It's the Coordinator that, in a loop, sends the StartCommit object onto the internal Kafka topic in the first place. It only does this having deemed the commit ready (see the previous blog post).

And so the cycle is complete.

Monday, October 20, 2025

Spark and K8s on AWS

This is a high-level overview of my work creating a Spark cluster running in a managed AWS Kubernetes cluster (EKS) and giving Spark the permissions to write to cross-cloud storage. I might write further posts with more detail.

EKS

Create a Kubernetes cluster with:

eksctl create cluster --name spark-cluster --nodes 3

Note that this can take about 15 minutes.

Note that you can have several K8s contexts on your laptop. You can see them with:

kubectl config get-contexts

and choose one with:

kubectl config use-context <your-cluster-context-name>

then you can run kubectl get pods -A and see K8s in the cloud.

A couple of caveats: I had to install aws-iam-authenticator by building it from source, which also required me to install Go. The binaries are installed in ~/go/bin.

Authorization

The hard thing about setting up a Kubernetes cluster in AWS is configuring permissions. This guide helped me, though it was strictly the section Option 2: Using EKS Pod Identity that I needed.

Basically, you have to configure a bridge between the K8s cluster and AWS. This is done through a CSI (Container Storage Interface) driver, a standard K8s mechanism for storage that in this case is used as a means of mounting secrets.

The high-level recipe for using secrets is:
  1. Create the secret with aws secretsmanager create-secret ... 
  2. Create a policy with aws iam create-policy... This references the ARN of the secret from step 1.
  3. Create a role with aws iam create-role... that allows the role to be assumed via STS.
  4. Attach the policy from step 2 to the role from step 3 with aws iam attach-role-policy... 
  5. Create a SecretProviderClass with kubectl apply -f... that references the secret created in step 1.
  6. Associate your K8s Deployment with the SecretProviderClass using its volumes.
Thanks to the installation of the CSI add-ons, secrets can be mounted in the Polaris container and are then used to vend credentials to Spark.

Diagnosing Spark in the Cluster

Connecting to Spark from my laptop produced these logs:

25/09/23 18:15:58 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://a457XXX.eu-west-2.elb.amazonaws.com:7077...
25/09/23 18:15:58 INFO TransportClientFactory: Successfully created connection to a457XXX.eu-west-2.elb.amazonaws.com/3.9.78.227:7077 after 26 ms (0 ms spent in bootstraps)
25/09/23 18:16:18 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://a457XXX.eu-west-2.elb.amazonaws.com:7077...
25/09/23 18:16:38 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://a457XXX.eu-west-2.elb.amazonaws.com:7077...

where the connections were timing out. So I logged in with:

kubectl exec -it prod-master-0 -- /bin/bash

and looked at the actual logs under /opt/spark/ which were far more illuminating.

Spark Kubernetes Operator

I used the Spark operator to configure a Spark cluster for me. However, examples/prod-cluster-with-three-workers.yaml seemed to be out of sync with the CRDs installed by Helm(?). The apiVersion seems to need to be:

apiVersion: spark.apache.org/v1alpha1

This change meant I could then start my Spark cluster.

Aside: to restart the whole cluster, use:

kubectl delete -f spark_cluster.yaml
kubectl apply  -f spark_cluster.yaml

AWS CLI annoyances

Running the aws command rather annoyingly adds control characters to its output. I spent a frustrating hour wondering why the exact text rendered from its output could not be used as input for another command.

To remove the invisible control characters, run something like:

aws secretsmanager list-secrets --query "SecretList[?Name=='CloudDetails'].ARN" --output text --no-cli-pager | tr -d '[:cntrl:]' | tr -d '\n' 

where, in this example, I am trying to find the ARN of a particular secret.

Wednesday, October 15, 2025

Configuring Polaris Part 1

To vend credentials, Polaris needs an AWS (or other cloud provider) account. But what if you want to talk to several AWS accounts? Well, this ticket suggests an interesting workaround. It's saying "yeah, use just one AWS account, but if you need to use others, set up a role that allows access to other AWS accounts - accounts outside the one that role lives in."

We are working in a cross-cloud environment. We talk not just to AWS but to the GCP and Azure clouds too. We happen to host Polaris in AWS but this choice was arbitrary. We can give Polaris the ability to vend credentials for all clouds no matter where it sits.

Integration with Spark

It's the spark.sql.catalog.YOUR_CATALOG.warehouse SparkConf value that identifies the Polaris catalog.

The YOUR_CATALOG part defines the name by which Spark knows the catalog. In fact, the top-level value, spark.sql.catalog.YOUR_CATALOG, tells Spark which catalog implementation to use (Hive, Polaris, etc).

So, basically, your config should look something like:

spark.sql.catalog.azure.oauth2.token                                            POLARIS_ACCESS_TOKEN
spark.sql.catalog.azure.client_secret                                                         s3cr3t
spark.sql.catalog.azure.uri                                        http://localhost:8181/api/catalog
spark.sql.catalog.azure.token                                                  POLARIS_ACCESS_TOKEN
spark.sql.catalog.azure.type                                                                    rest
spark.sql.catalog.azure.scope                                                     PRINCIPAL_ROLE:ALL
spark.sql.catalog.azure.client_id                                                               root
spark.sql.catalog.azure.warehouse                                                              azure
spark.sql.catalog.azure.header.X-Iceberg-Access-Delegation                        vended-credentials
spark.sql.catalog.azure.credential                                                       root:s3cr3t
spark.sql.catalog.azure.cache-enabled                                                          false
spark.sql.catalog.azure.rest.auth.oauth2.scope                                    PRINCIPAL_ROLE:ALL
spark.sql.catalog.azure                                        org.apache.iceberg.spark.SparkCatalog 

This is the config specific to my Azure catalog. AWS and GCP would have very similar config.
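
If you would rather set these programmatically than in spark-defaults.conf, the same properties can go into the SparkSession builder. A sketch, reusing the load balancer address from the logs above as a placeholder master URL, with invented namespace and table names:

import org.apache.spark.sql.SparkSession;

public class PolarisCatalogExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("polaris-azure-catalog")
                .master("spark://a457XXX.eu-west-2.elb.amazonaws.com:7077") // placeholder
                .config("spark.sql.catalog.azure", "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.azure.type", "rest")
                .config("spark.sql.catalog.azure.uri", "http://localhost:8181/api/catalog")
                .config("spark.sql.catalog.azure.warehouse", "azure")
                .config("spark.sql.catalog.azure.credential", "root:s3cr3t")
                .config("spark.sql.catalog.azure.scope", "PRINCIPAL_ROLE:ALL")
                .config("spark.sql.catalog.azure.header.X-Iceberg-Access-Delegation", "vended-credentials")
                .config("spark.sql.catalog.azure.cache-enabled", "false")
                .getOrCreate();

        // The catalog name ("azure") becomes the prefix in SQL identifiers.
        spark.sql("SELECT * FROM azure.my_namespace.my_table").show(); // placeholder names
    }
}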

One small issue [GitHub] is that I needed the Iceberg runtime lib to come first in the Maven dependencies.

Local Debugging

In build.gradle.kts, add the JDWP agent string to the jvmArgs of the quarkusRun task:

tasks.named<QuarkusRun>("quarkusRun") {
  jvmArgs =
    listOf(
      "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005",
    )
}

then run with:

./gradlew --stop && ./gradlew run

then you'll be able to debug remotely by attaching to port 5005.

Configuring Polaris in a remote environment

Note that Polaris heavily uses Quarkus. "Quarkus aggregates configuration properties from multiple sources, applying them in a specific order of precedence." [docs]. In order of precedence: java -D... system properties first, then environment variables, then application.properties (first on the local filepath, then in the dependencies), and finally the hard-coded defaults.
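
Since Quarkus builds on MicroProfile Config (SmallRye), you can also check at runtime what value a property resolved to, whichever source won. A small sketch, using a hypothetical property name:

import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;

public class ConfigPrecedenceCheck {
    public static void main(String[] args) {
        Config config = ConfigProvider.getConfig();
        // Resolves to the -Dpolaris.example.setting system property if set, else the
        // POLARIS_EXAMPLE_SETTING environment variable, else application.properties.
        // The property name is hypothetical.
        config.getOptionalValue("polaris.example.setting", String.class)
              .ifPresentOrElse(
                  value -> System.out.println("resolved to: " + value),
                  () -> System.out.println("not set in any source"));
    }
}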

Polaris in integration tests

Naturally, you're going to want to write a suite of regression tests. This is where the wonderful TestContainers shines. You can fire up a Docker container of Polaris in Java code.

There are some configuration issues. AWS and Azure are easy to configure within Polaris: you just pass the credentials as environment variables. GCP is a little harder as it expects a JSON file containing its credentials (the Application Default Credentials file). Fortunately, TestContainers allows you to copy that file over once the container has started running.

          myContainer = new GenericContainer<>("apache/polaris:1.1.0-incubating")
                    // AWS
                    .withEnv("AWS_ACCESS_KEY_ID",     AWS_ACCESS_KEY_ID)
                    .withEnv("AWS_SECRET_ACCESS_KEY", AWS_SECRET_ACCESS_KEY)
                    // Azure
                    .withEnv("AZURE_CLIENT_SECRET", AZURE_CLIENT_SECRET)
                    .withEnv("AZURE_CLIENT_ID",     AZURE_CLIENT_ID)
                    .withEnv("AZURE_TENANT_ID",     AZURE_TENANT_ID)
                    // Polaris
                    .withEnv("POLARIS_ID",     POLARIS_ID)
                    .withEnv("POLARIS_SECRET", POLARIS_SECRET)
                    .withEnv("POLARIS_BOOTSTRAP_CREDENTIALS", format("POLARIS,%s,%s", POLARIS_ID, POLARIS_SECRET))
                    // GCP
                    .withEnv("GOOGLE_APPLICATION_CREDENTIALS", GOOGLE_FILE)
                    // don't consider the container started until the health check passes
                    .waitingFor(Wait.forHttp("/q/health").forPort(8182).forStatusCode(200));
            myContainer.setPortBindings(List.of("8181:8181", "8182:8182"));
            myContainer.start();
            myContainer.copyFileToContainer(Transferable.of(googleCreds.getBytes()), GOOGLE_FILE);

The other thing you want for a reliable suite of tests is to wait until Polaris starts. Fortunately, Polaris is cloud native and offers a health endpoint which TestContainers can poll.

Polaris in EKS

I found I had to mix AWS's own library (software.amazon.awssdk:eks:2.34.6) with the official Kubernetes library (io.kubernetes:client-java:24.0.0) before I could interrogate the Kubernetes cluster in AWS from my laptop and look at the logs of the Polaris container.

        // Use the AWS SDK to look up the cluster's endpoint and CA certificate.
        EksClient eksClient = EksClient.builder()
                                       .region(REGION)
                                       .credentialsProvider(DefaultCredentialsProvider.create())
                                       .build();

        DescribeClusterResponse clusterInfo = eksClient.describeCluster(
                DescribeClusterRequest.builder().name(clusterName).build());

        // Authenticate the Kubernetes client against that cluster via STS.
        AWSCredentials awsCredentials = new BasicAWSCredentials(
                AWS_ACCESS_KEY_ID,
                AWS_SECRET_ACCESS_KEY);
        var authentication = new EKSAuthentication(new STSSessionCredentialsProvider(awsCredentials),
                                                   REGION.toString(),
                                                   clusterName);

        ApiClient client = new ClientBuilder()
                .setBasePath(clusterInfo.cluster().endpoint())
                .setAuthentication(authentication)
                .setVerifyingSsl(true)
                .setCertificateAuthority(Base64.getDecoder().decode(clusterInfo.cluster().certificateAuthority().data()))
                .build();
        Configuration.setDefaultApiClient(client);

Now you'll be able to query and monitor Polaris from outside AWS's Kubernetes offering, EKS.
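
For example, to tail the Polaris container's logs once the ApiClient above is configured, something like the following should work. It assumes the client library's PodLogs helper; the namespace, pod and container names are placeholders and the exact method signatures may differ between client versions, so treat this as a sketch:

import io.kubernetes.client.PodLogs;
import io.kubernetes.client.openapi.ApiClient;

import java.io.InputStream;

public class PolarisLogs {
    public static void tailLogs(ApiClient client) throws Exception {
        PodLogs podLogs = new PodLogs(client);
        // namespace, pod name and container name below are placeholders
        try (InputStream logStream = podLogs.streamNamespacedPodLog("default", "polaris-0", "polaris")) {
            logStream.transferTo(System.out); // stream the container's log to stdout
        }
    }
}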