Saturday, April 6, 2024

When adding more CPUs does not help distressed CPUs

This is an interesting problem on Discourse where the symptoms belie the cause. Here, a very beefy Spark cluster is taking a long time process (admittedly) a large amount of data. However, it's the CPUs that are getting hammered. 

Insanely high CPU usage

The temptation at this point is to add more CPU resources but this won't help much.

When your Spark jobs that are not computationally intensive are using large amounts of CPU, there's an obvious suspect. Let's check time spent in Garbage Collection:


Insanely large GC Times

Shuffle per worker seems modest but look at those GC Times. In a five hours job, nearly two hours is spent just garbage collecting. 

And this is something of a surprise to people new to Spark. Sure, it delivers on its promise to process more data than can fit in memory but if you want it to be performant, you need to give it as much memory as possible.  

Friday, April 5, 2024

Network Adventures in Azure Databricks

My Azure Databricks cluster could not see one of my Blob containers although it could see others in the same subscription. The error in Databricks looked something like this: 

ExecutionError: An error occurred while calling o380.ls.
: Status code: -1 error code: null error message: java.net.SocketTimeoutException: connect timed outjava.net.SocketTimeoutException: connect timed out
        at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.executeHttpOperation(AbfsRestOperation.java:423)
        at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.completeExecute(AbfsRestOperation.java:274)
        at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.lambda$execute$0(AbfsRestOperation.java:214)
        at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation(IOStatisticsBinding.java:464)
...

My first suspicion was that because they were in different resource groups, this could explain things.

Resource groups
"Resource groups are units of deployment in ARM [Azure Resource Manager]. 
"They are containers grouping multiple resource instances in a security and management boundary. 
"A resource group is uniquely named in a subscription. 
"Resources can be provisioned on different Azure regions and yet belong to the same resource group. 
"Resource groups provide additional services to all the resources within them. Resource groups provide metadata services, such as tagging, which enables the categorization of resources; the policy-based management of resources; RBAC; the protection of resources from accidental deletion or updates; and more... 
"They have a security boundary, and users that don't have access to a resource group cannot access resources contained within it.  Every resource instance needs to be part of a resource group; otherwise, it cannot be deployed." [Azure for Architects]
That last paragraph is interesting because I can access the container I want via the Azure portal. So, a friendly sysadmin suggested this was barking up the wrong tree and instead looked at:

Virtual Networks
"A VNet is required to host a virtual machine. It provides a secure communication mechanism between Azure resources so that they can connect to each other. 
"The VNets provide internal IP addresses to the resources, facilitate access and connectivity to other resources (including virtual machines on the same virtual network), route requests, and provide connectivity to other networks. 
"A virtual network is contained within a resource group and is hosted within a region, for example, West Europe. It cannot span multiple regions but can span all datacenters within a region, which means we can span virtual networks across multiple Availability Zones in a region. For connectivity across regions, virtual networks can be connected using VNet-to-VNet connectivity." [Azure for Architects]
Nothing obvious here. Both Databricks and the container were on the same network. However, they weren't on the same subnet.

Network Security Groups
"Subnets provide isolation within a virtual network. They can also provide a security boundary. Network security groups (NSGs) can be associated with subnets, thereby restricting or allowing specific access to IP addresses and ports. Application components with separate security and accessibility requirements should be placed within separate subnets." [Azure for Architects]
And this proved to be the problem. Databricks and the container are on the same virtual network but not the same subnet and there was an NSG blocking communication between these subnets.

Note that changes can take a few minutes to propagate, sometimes faster but sometimes slower. My sysadmin says he has seen it take up to an hour.

AWS Real Estate

Just some notes I've made playing around with AWS real estate.

ECS
Amazon's offering that scales Docker containers. Whereas EC2 is simply a remote VM, ECS is a "logical grouping of EC2 machines" [SO]

Fargate
Is a serverless version of EC2 [SO].
 
Kinesis
A propriertary Amazon Kafka replacement. While Kafka writes data locally, Kinesis uses a quorum of shards.

MSK
Amazon also offers a hosted Kafka solution called MSK (Managed Streaming for Kafka). 

Lambda
Runs containers like Docker that exists for up to 15 minutes and whose storage is ephemeral.

Glue
A little like Hive. It has crawlers that are batch jobs that compile metadata, thus doing some of the job of Hive's metastore. In fact, you can delegate the meta store that Spark uses to use Glue as its backing store. See:

EMR
EMR is AWS's MapReduce tool on which we can run Spark. "You can configure Hive to use the AWS Glue Data Catalog as its metastore." [docs] If you want to run Spark locally but still take advantage of Glue, follow these instructions.

Athena
Athena is AWS's hosted Trino offering. You can make data in S3 buckets available to Athena by using Glue crawlers.

Step Functions
AWS's orchestration of different services within Amazon's cloud.

CodePipeline
...is AWS's CI/CD offering.

Databases
DynamoDB is a key/value store and Aurora is a distributed relational DB.

Sunday, March 24, 2024

Iceberg locks and catalogs

Although Hadoop Meta Store is used for most Spark implementations, it's not recommended for Iceberg. HMS does not support retries and deconflicting commits.

"HadoopCatalog has a number of drawbacks and we strongly discourage it for production use.  There are certain features like rename and drop that may not be safe depending on the storage layer and you may also require a lock manager to get atomic behavior.  JdbcCatalog is a much better alternative for the backing catalog." [Iceberg Slack]

Iceberg comes with a DynamoDb (AWS) implementation of the lock manager. Looking at the code, it appears that acquiring the lock uses an optimistic strategy. You can tell DynamoDB to put a row in the table iff it doesn't exist already. If it does, the underlying AWS library throws a  software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException. There's a test for this in the AWS module here. It needs an AWS account to run.

"This is necessary for a file system-based catalog to ensure atomic transaction in storages like S3 that do not provide file write mutual exclusion." [Iceberg docs] This is a sentiment echoed in this blog.

The issue is the rename, not the data transfer. "Each object transfer is atomic. That is, either a whole file is transferred, or none of it is. But the directory structure is not atomic and a failure can cause mv to fail mid-way." [AWS Questions]

In the old world of HDFS, Spark would write its output to a temporary directory then atomically rename that directory to that of the final destination. However, S3 is not a file system but a blob store and the notion of a directory is just that: notional. When we change a "directory's" name, all the files in a directory need to be renamed one-by-one and renaming all the files Spark outputs is not atomic in S3. Implementations that talk to their own file system must implement Hadoop's OutputCommitter and Spark will call these when preparing to write etc.

The only mention of the lock manager in "Apache Iceberg: The Definitive Guide" is:

"If you are using AWS Glue 3.0 with Iceberg 0.13.1, you must also set the additional configurations for using the Amazon DynamoDB lock manager to ensure atomic transactions. AWS Glue 4.0, on the other hand, uses optimistic locking by default."

which is a bit too cryptic for me apparently because Glue 4.0 has a different version of Iceberg that uses optimistic locking [Discourse].

Catalogs

Catalogs "allows [Iceberg] to ensure consistency with multiple readers and writers and discover what tables are available in the environment... the primary high level requirement for a catalog implementation to work as an Iceberg catalog is to map a table path (e.g., “db1.table1”) to the file path of the metadata file that has the table’s current state."

The Catalogs are:

  • Hadoop. Note that Hadoop is used loosely. "Note anytime you use a distributed file system (or something that looks like one) to store the current metadata pointer, the catalog used is actually called the 'hadoop' catalog." [1] The most important potential downside of this Catalog is "It requires the file system to provide a file/object rename operation that is atomic to prevent data loss when concurrent writes occur." And there are others. "A Hadoop catalog doesn’t need to connect to a Hive MetaStore, but can only be used with HDFS or similar file systems that support atomic rename. Concurrent writes with a Hadoop catalog are not safe with a local FS or S3." [Iceberg docs]
  • Hive. Apart from running an additional process (unlike the Hadoop catalog), "It requires the file system to provide a file/object rename operation that is atomic to prevent data loss when concurrent writes occur." [1]
  • AWS Glue. "Like the Hive catalog, it does not support multi-table transactions" [1]
  • Nessie gives a Git-like experience for data lakes but the two main disadvantages are that you must run the infrastructure yourself (like Hive) and it's not compatible with all engines.
  • REST is by nature simple, is implementation agnostic and "the REST catalog supports multi- table transactions"  "REST Catalog is actually a protocol with a client implementation in the library.  There are examples of how to adapt that protocol to different catalog backends (like HMS or JDBC)... The REST protocol allows for advanced features that other catalogs cannot support, but that doesn't mean all of those features will be available for every REST implementation" [Slack]
  • JDBC is near ubiquitous but "it doesn’t support multi-table transactions". "With JDBC the database does the locking, so no external lock manager is required" [Slack]

So, which should you use? From contributor, Daniel Weeks in Slack:

"If you're not using HMS currently, I would suggest going with JdbcCatalog, which you can also use directly or with a REST frontend... I would strongly suggest using JDBC Catalog unless there's something specific you need. HMS is built for hive and iceberg is not hive.  There is both a lot of completely and baggage that comes with hive.  For example, if you change the table scheme directly in hive, it does not change the schema in your iceberg table.  Same with setting table properties. JDBC is super lightweight and native to iceberg, so if you don't have hive, I would avoid using it.

"There are multiple projects that are starting to adopt REST and I expect that only to grow, but that doesn't mean you necessarily need it right now.  The main thing to think about is using multiple catalogs (not limit yourself to a single one). You can use JDBC directly now (most engines support it), but you can always add a REST frontend later.  They can co-exist and REST can even proxy to your JDBC backend"

[1] "Apache Iceberg: The Definitive Guide"

Saturday, March 9, 2024

Big Data and CPU Caches

I'd previously posted about how Spark's data frame schema is an optimization not an enforcement. If you look at Spark's code, schemas save checking whether something is null. That is all. 

Can this really make so much of a diffence? Surprisingly, omitting a null check can optimize your code by an order of magnitude. 

As ever, the devil is in the detail. A single null check is hardly likely to make a difference to your code. But when you are checking billions of times, you need to take it seriously. 

There is another dimension to this problem. If you're checking the same reference (or a small set of them) then you're probably going to be OK. But if you are null checking large numbers of references, this is where you're going to see performance degradation.

The reason is that a small number of references can live happily in your CPU cache. As this number grows, they're less likely to be cached and your code will force memory to be loaded from RAM into the CPU.

Modern CPUs cache data to avoid hitting RAM. My 2.40GHz Intel Xeon E-2286M has three levels of cache, each bigger (and slower) than the next:

$ sudo dmidecode -t cache  
Cache Information                       
Socket Designation: L1 Cache                     
Maximum Size: 512 kB                 
...                  
Socket Designation: L2 Cache                   
Maximum Size: 2048 kB                
...                      
Socket Designation: L3 Cache                      
Maximum Size: 16384 kB             

Consequently, the speed we can randomly access an array of 64-bit numbers depends on the size of the array. To demonstrate, here is some code that demonstrates. The results look like this:


Who would have thought little optimizations on big data can make such a huge difference?

Saturday, February 24, 2024

Home made Kubernetes cluster

When trying to run ArgoCD, I came across this problem that was stopping me from connecting. Using kubectl port-forward..., I was able to finally connect. But even then, if I ran:

$ kubectl get services --namespace argocd
NAME                                      TYPE           CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE
argocd-applicationset-controller          ClusterIP      10.98.20.142     <none>        7000/TCP,8080/TCP            19h
argocd-dex-server                         ClusterIP      10.109.252.231   <none>        5556/TCP,5557/TCP,5558/TCP   19h
argocd-metrics                            ClusterIP      10.106.130.22    <none>        8082/TCP                     19h
argocd-notifications-controller-metrics   ClusterIP      10.109.57.97     <none>        9001/TCP                     19h
argocd-redis                              ClusterIP      10.100.158.58    <none>        6379/TCP                     19h
argocd-repo-server                        ClusterIP      10.111.224.112   <none>        8081/TCP,8084/TCP            19h
argocd-server                             LoadBalancer   10.102.214.179   <pending>     80:30081/TCP,443:30838/TCP   19h
argocd-server-metrics                     ClusterIP      10.96.213.240    <none>        8083/TCP                     19h

Why was my EXTERNAL-IP still pending? It appears that this is a natural consequence of running my K8s cluster in Minikube [SO].

So, I decided to build my own Kubernetes cluster. This step-by-step guide proved really useful. I built a small cluster of 2 nodes on heterogeneous hardware. Note that although you can use different OSs and hardware, you really need to use the same version of K8s on all boxes (see this SO).

$ kubectl get nodes -o wide
NAME    STATUS   ROLES           AGE   VERSION   INTERNAL-IP     EXTERNAL-IP   OS-IMAGE             KERNEL-VERSION      CONTAINER-RUNTIME
adele   Ready    <none>          18h   v1.28.2   192.168.1.177   <none>        Ubuntu 18.04.6 LTS   5.4.0-150-generic   containerd://1.6.21
nuc     Ready    control-plane   18h   v1.28.2   192.168.1.148   <none>        Ubuntu 22.04.4 LTS   6.5.0-18-generic    containerd://1.7.2

Great! However, Flannel did not seem to be working properly:

$ kubectl get pods --namespace kube-flannel -o wide 
NAME                    READY   STATUS             RESTARTS         AGE    IP              NODE    NOMINATED NODE   READINESS GATES
kube-flannel-ds-4g8gg   0/1     CrashLoopBackOff   34 (2m53s ago)   152m   192.168.1.148   nuc     <none>           <none>
kube-flannel-ds-r4xvt   0/1     CrashLoopBackOff   26 (3m11s ago)   112m   192.168.1.177   adele   <none>           <none>

And journalctl -fu kubelet was puking  "Error syncing pod, skipping" messages.

Aside: Flannel is a container on each node that coordinates the segmentation of the virtual network. For coordination, it can use etcd, which can be thought of like Zookeeper in the Java ecosystem. "Flannel does not control how containers are networked to the host, only how the traffic is transported between hosts." [GitHub]

The guide seemed to omit one detail that lead to me to see the Flannel container puking something like this error:

E0427 06:08:23.685930 13405 memcache.go:265] couldn’t get current server API group list: Get “https://X.X.X.X:6443/api?timeout=32s 2”: dial tcp X.X.X.X:6443: connect: connection refused

Following this SO answer revealed that the cluster's CIDR had not been set. So, I patched it following this [SO] advice so:

kubectl patch node nuc -p '{"spec":{"podCIDR":"10.244.0.0/16"}}'
kubectl patch node adele -p '{"spec":{"podCIDR":"10.244.0.0/16"}}'

which will work until the next reboot (one of the SO answers describes how to make that permanent as does this one).

Anyway, this was the puppy and now the cluster seems to be behaving well.

Incidentally, this gives a lot of log goodies:

kubectl cluster-info dump

Thursday, February 15, 2024

Spark and Schemas

I helped somebody on Discord with a tricksy problem. S/he was using a Python UDF in PySpark and seeing NullPointerExceptions. This suggests a Java problem as the Python error message for an NPE looks more like "AttributeError: 'NoneType' object has no attribute ..." But why would Python code cause Spark to throw an NPE?

The problem was the UDF was defining a returnType struct that stated a StructField was not nullable.


The line charge_type.lower (highlighted) was a red herring as they had clearly changed more than one thing when experimenting (always change one thing at a time!)

Note that Spark regards the nullable field as advisory only.
When you define a schema where all columns are declared to not have null values , Spark will not enforce that and will happily let null values into that column. The nullable signal is simply to help Spark SQL optimize for handling that column.
- Spark, The Definitive Guide
And the reason is in this code where Spark is generating bespoke code. If nullable is false, it does not check the reference unnecessarily. But if there reference is null, Spark barfs like so:

Caused by: java.lang.NullPointerException
        at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:110)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_2$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$11(EvalPythonExec.scala:148)

So, the Python returned without an NPE but caused the JVM code to error as the struct it returns contains nulls when it said it wouldn't.