tag:blogger.com,1999:blog-3724568676280197732024-03-18T03:27:54.563-07:00Agile Java ManMusings on Data Science, Software Architecture, Functional Programming and whatnot.Phillip Henryhttp://www.blogger.com/profile/08460042407514133056noreply@blogger.comBlogger484125tag:blogger.com,1999:blog-372456867628019773.post-33554011021104520002024-03-09T02:42:00.000-08:002024-03-09T02:45:35.982-08:00Big Data and CPU Caches I'd <a href="https://javaagile.blogspot.com/2024/02/spark-and-schemas.html">previously posted</a> about how Spark's data frame schema is an optimization not an enforcement. If you look at Spark's code, schemas save checking whether something is null. That is all. <div><br /></div><div>Can this really make so much of a difference? Surprisingly, omitting a null check can optimize your code by an order of magnitude. </div><div><br /></div><div>As ever, the devil is in the detail. A single null check is hardly likely to make a difference to your code. But when you are checking billions of times, you need to take it seriously. </div><div><br /></div><div>There is another dimension to this problem. If you're checking the same reference (or a small set of them) then you're probably going to be OK. But if you are null checking large numbers of references, this is where you're going to see performance degradation.</div><div><br /></div><div>The reason is that a small number of references can live happily in your CPU cache. As this number grows, they're less likely to be cached and your code will force memory to be loaded from RAM into the CPU.</div><div><br /></div><div>Modern CPUs cache data to avoid hitting RAM. My 2.40GHz Intel Xeon E-2286M has three levels of cache, each bigger (and slower) than the last:<br /><br /><span style="font-family: courier; font-size: xx-small;">$ sudo dmidecode -t cache </span></div><div><div><span style="font-family: courier; font-size: xx-small;">Cache Information </span></div><div><span style="font-family: courier; font-size: xx-small;"><span style="white-space: normal;"><span style="white-space: pre;"> </span>Socket Designation: L1 Cache </span> </span></div><div><span style="white-space: normal;"><span style="font-family: courier; font-size: xx-small;"><span style="white-space: pre;"> </span>Maximum Size: 512 kB </span></span></div><div><span style="font-family: courier; font-size: xx-small;">... </span></div><div><span style="font-family: courier; font-size: xx-small;"><span style="white-space: normal;"><span style="white-space: pre;"> </span>Socket Designation: L2 Cache </span> </span></div><div><span style="white-space: normal;"><span style="font-family: courier; font-size: xx-small;"><span style="white-space: pre;"> </span>Maximum Size: 2048 kB </span></span></div><div><span style="font-family: courier; font-size: xx-small;">... </span></div><div><span style="font-family: courier; font-size: xx-small;"><span style="white-space: normal;"><span style="white-space: pre;"> </span>Socket Designation: L3 Cache </span> </span></div><div><span style="font-family: courier; font-size: xx-small;"><span style="white-space: normal;"><span style="white-space: pre;"> </span></span>Maximum</span><span style="white-space: normal;"><span style="font-family: courier; font-size: xx-small;"> Size: 16384 kB </span> </span></div><div><br /></div></div><div>Consequently, the speed at which we can randomly access an array of 64-bit numbers depends on the size of the array. 
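As a rough illustration (a minimal, unscientific sketch; the JMH benchmark linked below measures this properly), we can time summing the same number of randomly chosen elements from arrays of increasing size:<br /><pre style="font-family: courier; font-size: xx-small;">
// A back-of-the-envelope sketch: the array contents are all zero, we only care about
// how long the random loads take as the array outgrows the CPU caches.
import java.util.Random;

public class RandomAccessTiming {
    public static void main(String[] args) {
        Random random = new Random(42);
        for (int size = 1 << 10; size <= 1 << 26; size <<= 4) {
            long[] data = new long[size];
            int[] indices = new int[10_000_000];
            for (int i = 0; i < indices.length; i++) indices[i] = random.nextInt(size);

            long start = System.nanoTime();
            long sum = 0;
            for (int index : indices) sum += data[index];   // random access defeats the cache as size grows
            long ms = (System.nanoTime() - start) / 1_000_000;
            // print sum so the JIT cannot eliminate the loop as dead code
            System.out.printf("%9d longs: %6d ms (sum=%d)%n", size, ms, sum);
        }
    }
}
</pre>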
The full JMH benchmark is <a href="https://github.com/PhillHenry/JavaPlayground/blob/main/benchmarking/src/test/java/uk/co/odinconsultants/memory/JMH_CacheHits.java">here</a> and its results look like this:</div><div><br /></div><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjSIuo9geATQgyiQ_cc8cAqNM99yZvcQ_omk4h78QFEOoFjOt7TtQVGaHqwhnCw_u3CkT3ugq0VLXBW9IQ0J4z0FZUcYz0_vNErwBSrg5SLWUd7wgjuFc11ccutxB0JRDqV2iI5DUVyxruwYb0vsJoA4nj4IzxPKDwVbH8f_tdRAkegn9AJR49hJcb9XAoO/s640/cpu_cache_timings.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="480" data-original-width="640" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjSIuo9geATQgyiQ_cc8cAqNM99yZvcQ_omk4h78QFEOoFjOt7TtQVGaHqwhnCw_u3CkT3ugq0VLXBW9IQ0J4z0FZUcYz0_vNErwBSrg5SLWUd7wgjuFc11ccutxB0JRDqV2iI5DUVyxruwYb0vsJoA4nj4IzxPKDwVbH8f_tdRAkegn9AJR49hJcb9XAoO/s16000/cpu_cache_timings.png" /></a></div><br /><div>Who would have thought little optimizations on big data can make such a huge difference?</div><div><br /></div>Phillip Henryhttp://www.blogger.com/profile/08460042407514133056noreply@blogger.com0tag:blogger.com,1999:blog-372456867628019773.post-69803311405932221962024-02-24T02:08:00.000-08:002024-02-24T02:08:40.120-08:00Home made Kubernetes cluster<p>When trying to run ArgoCD, I came across <a href="https://github.com/argoproj/argo-cd/issues/11783">this</a> problem that was stopping me from connecting. Using <span style="font-family: courier; font-size: xx-small;">kubectl port-forward...</span>, I was able to finally connect. But even then, if I ran:</p><p><span style="font-family: courier; font-size: xx-small;">$ kubectl get services --namespace argocd<br />NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE<br />argocd-applicationset-controller ClusterIP 10.98.20.142 <none> 7000/TCP,8080/TCP 19h<br />argocd-dex-server ClusterIP 10.109.252.231 <none> 5556/TCP,5557/TCP,5558/TCP 19h<br />argocd-metrics ClusterIP 10.106.130.22 <none> 8082/TCP 19h<br />argocd-notifications-controller-metrics ClusterIP 10.109.57.97 <none> 9001/TCP 19h<br />argocd-redis ClusterIP 10.100.158.58 <none> 6379/TCP 19h<br />argocd-repo-server ClusterIP 10.111.224.112 <none> 8081/TCP,8084/TCP 19h<br />argocd-server LoadBalancer 10.102.214.179 <b><pending></b> 80:30081/TCP,443:30838/TCP 19h<br />argocd-server-metrics ClusterIP 10.96.213.240 <none> 8083/TCP 19h</span></p><div>Why was my <span style="font-family: courier; font-size: x-small;">EXTERNAL-IP</span> still <b style="font-family: courier; font-size: x-small;">pending</b>? It appears that this is a natural consequence of running my K8s cluster in Minikube [<a href="https://stackoverflow.com/questions/44110876/kubernetes-service-external-ip-pending">SO</a>].</div><div><br /></div><div>So, I decided to build my own Kubernetes cluster. <a href="https://phoenixnap.com/kb/install-kubernetes-on-ubuntu">This</a> step-by-step guide proved really useful. I built a small cluster of 2 nodes on heterogeneous hardware. 
Note that although you can use different OSs and hardware, you really need to use the same version of K8s on all boxes (see this <a href="https://stackoverflow.com/questions/55767652/kubernetes-master-worker-node-kubeadm-join-issue">SO</a>).</div><div><br /></div><div><div><span style="font-family: courier; font-size: xx-small;">$ kubectl get nodes -o wide</span></div><div><span style="font-family: courier; font-size: xx-small;">NAME STATUS ROLES AGE VERSION INTERNAL-IP EXTERNAL-IP OS-IMAGE KERNEL-VERSION CONTAINER-RUNTIME</span></div><div><span style="font-family: courier; font-size: xx-small;"><b>adele</b> Ready <none> 18h v1.28.2 192.168.1.177 <none> Ubuntu 18.04.6 LTS 5.4.0-150-generic containerd://1.6.21</span></div><div><span style="font-family: courier; font-size: xx-small;"><b>nuc</b> Ready control-plane 18h v1.28.2 192.168.1.148 <none> Ubuntu 22.04.4 LTS 6.5.0-18-generic containerd://1.7.2</span></div></div><div><br /></div><div>Great! However, Flannel did not seem to be working properly:<br /><br /><div><span style="font-family: courier; font-size: xx-small;">$ kubectl get pods --namespace kube-flannel -o wide </span></div><div><span style="font-family: courier; font-size: xx-small;">NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES</span></div><div><span style="font-family: courier; font-size: xx-small;">kube-flannel-ds-4g8gg 0/1 CrashLoopBackOff 34 (2m53s ago) 152m 192.168.1.148 <b>nuc</b> <none> <none></span></div><div><span style="font-family: courier; font-size: xx-small;">kube-flannel-ds-r4xvt 0/1 CrashLoopBackOff 26 (3m11s ago) 112m 192.168.1.177 <b>adele</b> <none> <none></span></div></div><div><br /></div><div>And <span style="font-family: courier; font-size: xx-small;">journalctl -fu kubelet</span> was puking <span style="font-family: courier; font-size: xx-small;">"Error syncing pod, skipping"</span> messages.</div><div><br /></div><div>Aside: Flannel is a container on each node that coordinates the segmentation of the virtual network. For coordination, it can use etcd, which can be thought of as being like Zookeeper in the Java ecosystem. "Flannel does not control how containers are networked to the host, only how the traffic is transported between hosts." [<a href="https://github.com/flannel-io/flannel?tab=readme-ov-file">GitHub</a>]</div><div><br /></div><div>The guide seemed to omit one detail, which led to the Flannel container puking something like this error:</div><div><br /></div><div><div><span style="font-family: courier; font-size: xx-small;">E0427 06:08:23.685930 13405 memcache.go:265] couldn’t get current server API group list: Get “https://X.X.X.X:6443/api?timeout=32s 2”: dial tcp X.X.X.X:6443: connect: connection refused</span></div></div><div><br /></div><div>Following <a href="https://stackoverflow.com/questions/50833616/kube-flannel-cant-get-cidr-although-podcidr-available-on-node">this</a> SO answer, I found that the cluster's CIDR had not been set. 
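(Apparently the usual way to avoid this in the first place is to pass the pod network CIDR when the control plane is created, e.g. <span style="font-family: courier; font-size: xx-small;">kubeadm init --pod-network-cidr=10.244.0.0/16</span>, which is the range Flannel expects by default.) 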
So, I patched it following <a href="https://stackoverflow.com/questions/52633215/kubernetes-worker-nodes-not-automatically-being-assigned-podcidr-on-kubeadm-join">this</a> [SO] advice, like so:<br /><br /><div><span style="font-family: courier; font-size: xx-small;">kubectl patch node <b>nuc</b> -p '{"spec":{"podCIDR":"10.244.0.0/16"}}'</span></div><div><span style="font-family: courier; font-size: xx-small;">kubectl patch node <b>adele</b> -p '{"spec":{"podCIDR":"10.244.0.0/16"}}'</span></div><div><br /></div>which will work until the next reboot (one of the SO answers describes how to make that permanent, as does <a href="https://stackoverflow.com/questions/61811883/flannel-is-crashing-for-slave-node">this</a> one).</div><div><br />Anyway, this was the puppy and now the cluster seems to be behaving well.</div><div><br /></div><div>Incidentally, this gives a lot of log goodies:</div><div><p><span style="font-family: courier; font-size: xx-small;">kubectl cluster-info dump</span></p></div>Phillip Henryhttp://www.blogger.com/profile/08460042407514133056noreply@blogger.com0tag:blogger.com,1999:blog-372456867628019773.post-6407297228737112192024-02-15T06:02:00.000-08:002024-02-15T06:02:33.668-08:00Spark and Schemas<p>I helped somebody on Discord with a tricksy <a href="https://discord.com/channels/566333122615181327/566334526540742656/1207678606122295329">problem</a>. S/he was using a Python UDF in PySpark and seeing <span style="font-family: courier; font-size: x-small;">NullPointerException</span>s. This suggests a Java problem as the Python error message for an NPE looks more like "<span style="font-family: courier; font-size: x-small;">AttributeError: 'NoneType' object has no attribute</span> ..." But why would Python code cause Spark to throw an NPE?</p><p>The problem was that the UDF defined a <span style="font-family: courier; font-size: x-small;">returnType</span> struct stating that a <span style="font-family: courier; font-size: x-small;">StructField</span> was not nullable.<br /><br /></p><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEh3ceUQPlCqd1hBHg-r9gu4ajQAHbYxnL6nGdgN3lF2AWHd5TC252yb-M7OPHHntbCQrnL-sbk1SNLAFQ10ROP21V_9TNGqr_nAnXuDsuFmljvu0jqwdkL4oE27AYR2zc_MDjvg7WTH7lF78WXm83Be2w5wTubGuO6u6urRsXb5BoI2tTTSiHvI6ltt4m6v/s654/pyspark_udf.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="461" data-original-width="654" height="453" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEh3ceUQPlCqd1hBHg-r9gu4ajQAHbYxnL6nGdgN3lF2AWHd5TC252yb-M7OPHHntbCQrnL-sbk1SNLAFQ10ROP21V_9TNGqr_nAnXuDsuFmljvu0jqwdkL4oE27AYR2zc_MDjvg7WTH7lF78WXm83Be2w5wTubGuO6u6urRsXb5BoI2tTTSiHvI6ltt4m6v/w640-h453/pyspark_udf.png" width="640" /></a></div><br /><div>The line <span style="font-family: courier; font-size: x-small;">charge_type.lower</span> (highlighted) was a red herring as they had clearly changed more than one thing when experimenting (always change <b>one thing at a time!</b>).<br /><br />Note that Spark regards the <span style="font-family: courier; font-size: small;">nullable</span> field as advisory only.</div><div><div></div><blockquote><div>When you define a schema where all columns are declared to not have null values, <b>Spark will not enforce</b> that and will happily let null values into that column. 
The nullable signal is simply to help Spark SQL optimize for handling that column.</div><div>- Spark, The Definitive Guide</div></blockquote><div></div><div>And the reason is in <a href="https://github.com/apache/spark/blob/edf4ac4b518d0d69f7012ff5c0f1428fe45412ba/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala#L130">this</a> code where Spark is generating bespoke code. If <span style="font-family: courier; font-size: x-small;">nullable</span> is false, it does not check the reference unnecessarily. But if the reference is null, Spark barfs like so:<br /><br /><div><span style="font-family: courier; font-size: xx-small;">Caused by: java.lang.NullPointerException</span></div><div><span style="font-family: courier; font-size: xx-small;"> at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:110)</span></div><div><span style="font-family: courier; font-size: xx-small;"> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_2$(Unknown Source)</span></div><div><span style="font-family: courier; font-size: xx-small;"> at org.apache.spark.sql.catalyst.expressions.<b>GeneratedClass</b>$SpecificUnsafeProjection.apply(Unknown Source)</span></div><div><span style="font-family: courier; font-size: xx-small;"> at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$11(<b>EvalPythonExec</b>.scala:148)</span></div><div><br /></div><div>So, the Python code returned without an NPE but caused the JVM code to error as the struct it returned contained nulls when it said it wouldn't.<br /><br /></div></div></div>Phillip Henryhttp://www.blogger.com/profile/08460042407514133056noreply@blogger.com0tag:blogger.com,1999:blog-372456867628019773.post-26429295234284624942024-01-28T03:57:00.000-08:002024-01-28T03:57:33.767-08:00The Death of Data Locality?<div>Data locality is where the computation and the storage are on the same node. This means we don't need to move huge data sets around. But it's a pattern that has fallen out of fashion in recent years.</div><div><br /></div><div>With a lot of cloud offerings, we lose the data locality that made <a href="https://github.com/apache/hadoop/blob/b2fac14828b69c761858dd7cb9ab17313c28b161/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java#L277">Hadoop</a> such a great framework on which to run Spark some 10 years ago. The cloud providers counter this with a "just rent more nodes" argument. But if you have full control over your infra, say you're on prem, throwing away data locality is a huge waste.<div><br /></div><div>Just to recap, data locality gives you doubleplusgood efficiency. Not only does the network not take a hit (as it doesn't need to send huge amounts of data from storage to compute nodes) but we retain OS treats like caching. </div><div><br /></div><div>What? The OS has built in caching? Have you ever <span style="font-family: courier;">grep</span>ped a large directory and then noticed that executing the same command a second time is orders of magnitude faster than the first time? That's because modern operating systems leave pages in memory unless there is a reason to dispose of them. 
So, most of the time, there is no point in putting some caching layer on the same machine as the database - a strange anti-pattern I've seen in the wild.</div><div><br /></div><div>Of course, none of this is available over the network.</div><div><br /></div><div>Another advantage of having the data locally is that apps can employ a pattern called "<a href="https://javaagile.blogspot.com/2023/11/memories-are-made-of-these.html">memory mapping</a>". The idea is that as far as the app is concerned, a file is just a location in memory. You read it just like you would a sequence of bytes in RAM. Hadoop takes advantage of <a href="https://github.com/apache/hadoop/blob/40467519399c0e51beb25d6e557c55859382e8cf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java#L79">this</a>.</div><div><br /></div><div>Why is memory mapping useful? Well, you don't even need to make <a href="https://javaagile.blogspot.com/2012/07/why-system-calls-are-slow.html">kernel calls</a> so there is no context switching and certainly no copying data. <a href="https://github.com/PhillHenry/JavaPlayground/blob/bed20b52d89930e83a279480ef8eb4e5b9171441/src/main/java/uk/co/odinconsultants/memory/MemoryMapMain.java">Here</a> is an example of how to do this in Java. You can prove to yourself that there are no kernel calls by running:</div><div><br /></div><div><span style="font-family: courier; font-size: xx-small;">sudo strace -p $(jstack $(jps | grep MemoryMapMain | awk '{print $1}') | grep ^\"main | perl -pe s/\].*\//g | perl -pe s/.*\\[//g)</span></div><div><br /></div><div>Note there are kernel calls in setting up the memory mapping but after that, there is nothing as we read the entire file.</div><div><br />So, why have many architects largely abandoned data locality? It's generally a matter of economics as the people at MinIO point out <a href="https://min.io/solutions/hdfs-migration">here</a>. The idea is that if your data is not homogeneous, you might be paying for, say, 16 CPUs on a node that's just being used for storage. An example might be that you have a cluster with 10 years of data but you mainly use the last two years. If the data for the first eight years is living on expensive hardware and rarely accessed, that could be a waste of money.</div><div><br /></div><div>So, should you use data locality today? The answer, as ever, is "it depends".<br /><br /></div></div>Phillip Henryhttp://www.blogger.com/profile/08460042407514133056noreply@blogger.com0tag:blogger.com,1999:blog-372456867628019773.post-85056542960323001782024-01-23T04:12:00.000-08:002024-01-23T05:24:01.002-08:00Avoiding Spark OOMEs<div>Spark can process more data than it can fit into memory. So why does it sometimes fail with <span style="font-family: courier;">OutOfMemoryError</span>s when joining unskewed data sets?</div><div><br /></div><div>An interesting way to counter OOMEs in a large <span style="font-family: courier;">join</span> is <a href="https://stackoverflow.com/questions/70317018/pyspark-salting-an-inner-join-in-the-presence-of-skew">here</a> [SO] where rows are given a random integer seed that is used in addition to the usual condition. In theory, this breaks down the data into more manageable chunks.</div><div><br /></div>
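<div>A minimal sketch of that salting idea in Java (the frame names <span style="font-family: courier;">left</span> and <span style="font-family: courier;">right</span>, the <span style="font-family: courier;">id</span> key and the number of salt buckets are all hypothetical; the linked answer does the same in PySpark):</div><div><br /></div><pre style="font-family: courier; font-size: xx-small;">
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;

// ...
int nSalts = 16;
// Each left-hand row lands in one of nSalts buckets at random.
Dataset<Row> saltedLeft  = left.withColumn("salt", floor(rand().multiply(lit(nSalts))));
// Each right-hand row is duplicated once per bucket so every (id, salt) pair can still match.
Dataset<Row> saltedRight = right.withColumn("salt", explode(sequence(lit(0), lit(nSalts - 1))));
// Joining on (id, salt) spreads each key over nSalts smaller tasks.
Dataset<Row> joined = saltedLeft.join(
        saltedRight,
        saltedLeft.col("id").equalTo(saltedRight.col("id"))
                .and(saltedLeft.col("salt").equalTo(saltedRight.col("salt"))));
</pre><div><br /></div>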
<div>Another standard exercise is to <span style="font-family: courier;"><a href="https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html">repartition</a></span> the data. But this causes a shuffle and it may actually be the <b><span style="font-family: courier;">repartition</span> itself</b> that causes the OOME.</div><div><br /></div><div>In practice, I've found persisting the data frame to disk and reading it back yields better results. The number of partitions being written is rarely the number that is read back. That is, you get a more natural partition for free (or almost free. Obviously, some time is taken in writing to disk). And there is no repartition that could throw an OOME.</div><div><br /></div><div><a href="https://discord.com/channels/566333122615181327/566334526540742656/1198309575057739917">This</a> question came up on Discord where somebody is trying to <span style="font-size: medium;">crossJoin</span> a huge amount of data. I suggested a solution that uses <span style="font-family: courier;">mapPartitions</span>. The nice thing about this method is that your code is passed a lazy data structure. As long as you don't try to call something like <span style="font-family: courier;">toList</span> on it, it will pull data into memory as needed and garbage collect it after it's written out.</div><br />By using a lazy <span style="font-family: courier; font-size: small;">Iterator</span>, Spark can write to disk far more data than it can hold in memory. As Spark consumes from the <span style="font-family: courier; font-size: x-small;">Iterator</span>, it measures its memory. When it starts looking a bit full, it flushes to disk. Here is the memory usage of <a href="https://github.com/PhillHenry/SparkPlayground/blob/main/src/main/scala/uk/co/odinconsultants/spark/katas/BigDataSmallMemoryMain.scala">this</a> code that uses <span style="font-family: courier;">mapPartitions</span> to write to <span style="font-family: courier;">/tmp/results_parquet</span> a data set that is much larger than the JVM's heap:<br /><br /><table align="center" cellpadding="0" cellspacing="0" class="tr-caption-container" style="margin-left: auto; margin-right: auto;"><tbody><tr><td style="text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEiIQl9Sw9iV0mvs3p90WA65PNL6BR9ek2GRltJvAD6lUPZa-pNoX9yJ5vp8N04sCm-GzyCHfbtq1pxhh7ay3gK2AFxw2yYakzpVKnDEV5S8KuBmh5SeSzvhrLE1Cy8P4UYQr7okPu55KQvdYZps1cnWx-_NiyYeOAOvu1hjeF7RFjfp18775V7WDK6vM5_N/s1079/Spark512mbWriting1.4gb.png" style="margin-left: auto; margin-right: auto;"><img border="0" data-original-height="932" data-original-width="1079" height="552" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEiIQl9Sw9iV0mvs3p90WA65PNL6BR9ek2GRltJvAD6lUPZa-pNoX9yJ5vp8N04sCm-GzyCHfbtq1pxhh7ay3gK2AFxw2yYakzpVKnDEV5S8KuBmh5SeSzvhrLE1Cy8P4UYQr7okPu55KQvdYZps1cnWx-_NiyYeOAOvu1hjeF7RFjfp18775V7WDK6vM5_N/w640-h552/Spark512mbWriting1.4gb.png" width="640" /></a></td></tr><tr><td class="tr-caption" style="text-align: center;">Spark with 0.5gb heap writing 1.3gb files</td></tr></tbody></table>If we run:<br /><br /><span style="font-family: courier;">watch "du -sh /tmp/results_parquet"</span><div><br /></div><div>we can see that upon each GC, more is written to disk.</div><div><br /></div><div>The result is that a huge dataframe that could not fit into memory can now be <span style="font-family: courier;">join</span>ed with another.</div><div><br /></div><div>As an aside: Uber has been doing some work on dealing with OOMEs in Spark. See their article <a href="https://www.uber.com/en-US/blog/dynamic-executor-core-resizing-in-spark/">here</a>. 
TL;DR; they're proposing that in the event of an OOME, Spark adapts and increases the memory to CPU ratio by asking some cores to step down before it re-attempts the failed stage. Ergo, each compute unit has more memory than before. </div><div><br /></div>Phillip Henryhttp://www.blogger.com/profile/08460042407514133056noreply@blogger.com0tag:blogger.com,1999:blog-372456867628019773.post-85691200919942234672024-01-11T03:46:00.000-08:002024-01-11T03:46:45.682-08:00Hilbert Curves<p>When you want to cluster data together over multiple dimensions, you can use <a href="https://javaagile.blogspot.com/2023/11/z-order.html">Z-Order</a>. But a better algorithm is the Hilbert Curve, a fractal that makes a best attempt to keep adjacent points together in a 1-dimensional space.</p><p>From Databricks' Liquid Clustering design <a href="https://docs.google.com/document/d/1FWR3odjOw4v4-hjFy_hVaNdxHVs4WuK1asfB6M6XEMw/edit">doc</a> we get this graphical representation of what it looks like:<br /></p><table align="center" cellpadding="0" cellspacing="0" class="tr-caption-container" style="margin-left: auto; margin-right: auto;"><tbody><tr><td style="text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgP8l2LB7rvZDnnRwAsEq1QYwtdqr_nSooEPejvLft2m-lY9LlcXYbjZc5yHBoOtQVNdg67dy_bu9k_er8UjyMogSATbrmGK-eeVtt9yS8CyRnPjAACh7kRzJ_DFlLMrVbZ-wlKHKw7Vh8c1CL03LL2cfzTBuvuxLsoduyE8u_i7Q-Zz27oyYLhjRkUsZfJ/s516/hilbert_curve.png" style="margin-left: auto; margin-right: auto;"><img border="0" data-original-height="516" data-original-width="499" height="320" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgP8l2LB7rvZDnnRwAsEq1QYwtdqr_nSooEPejvLft2m-lY9LlcXYbjZc5yHBoOtQVNdg67dy_bu9k_er8UjyMogSATbrmGK-eeVtt9yS8CyRnPjAACh7kRzJ_DFlLMrVbZ-wlKHKw7Vh8c1CL03LL2cfzTBuvuxLsoduyE8u_i7Q-Zz27oyYLhjRkUsZfJ/s320/hilbert_curve.png" width="309" /></a></td></tr><tr><td class="tr-caption" style="text-align: center;">Dotted line squares represent files</td></tr></tbody></table><br /><div>A Hilbert curve has the property that adjacent nodes (on the red line, above) have a distance of 1. Note that a property of the Hilbert curve is that adjacent points on the curve are nearest neighbours in the original n-dimensional space but the opposite is not necessarily true. Not all nearest neighbours in the n-dimensional space are adjacent on the curve. How could they be if points have more than 2 neighbours in the original space?<div><br /></div><div>An algorithm in C for navigating this square can be found <a href="https://www.compuphase.com/hilbert.htm">here</a>. A Python toolkit for handling Hilbert curves can be found <a href="https://github.com/PrincetonLIPS/numpy-hilbert-curve">here</a> [GitHub]. And a Java implementation can be found <a href="https://stackoverflow.com/questions/499166/mapping-n-dimensional-value-to-a-point-on-hilbert-curve">here</a> [SO].</div><div><br /></div><div>The application of this in Big Data is that the data is now sorted. If we were to read through the files following the red line, then each node we encounter is one away from the last. <a href="https://javaagile.blogspot.com/2023/11/z-order.html">Z-Ordering</a> does not have this property. 
</div><div><br /></div><table align="center" cellpadding="0" cellspacing="0" class="tr-caption-container" style="margin-left: auto; margin-right: auto;"><tbody><tr><td style="text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEg4t0uQ95rLklLc1CKu51-hCsV5eRnvVyKAO-Ys6w3yKh6AW0Xy5oqW3rGxIezbJ4HMPfPRvo7YCszQmLV_nqWmVg7GaLsMG8xVQAP_a0JKU4N1oLJhyRmeZawEshlchMhXDogQNsWsO_fo_zKIvz8TSwmTVFhC_39-1bKxraTJ49EgsDn3REvUl2PCdh0Z/s455/zorder_dense_2d.png" imageanchor="1" style="margin-left: auto; margin-right: auto;"><img border="0" data-original-height="455" data-original-width="436" height="400" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEg4t0uQ95rLklLc1CKu51-hCsV5eRnvVyKAO-Ys6w3yKh6AW0Xy5oqW3rGxIezbJ4HMPfPRvo7YCszQmLV_nqWmVg7GaLsMG8xVQAP_a0JKU4N1oLJhyRmeZawEshlchMhXDogQNsWsO_fo_zKIvz8TSwmTVFhC_39-1bKxraTJ49EgsDn3REvUl2PCdh0Z/w384-h400/zorder_dense_2d.png" width="384" /></a></td></tr><tr><td class="tr-caption" style="text-align: center;">Z-ordering. Lines indicate contiguous data. Colours indicate different files.</td></tr></tbody></table><br /><div>Unlike the Hilbert curve at the top of this page, there are some large jumps. In fact, the average step is not 1.0 as for the Hilbert curve but 1.557 in this example - over 50% more!</div><div><br /></div><div>This greater efficiency holds even when we drop the unlikely assumption that the data is tightly packed. Below are examples where the data is more realistic and not every possible point (a red +) is actually associated with data (a blue circle).</div><div><br /></div><table align="center" cellpadding="0" cellspacing="0" class="tr-caption-container" style="margin-left: auto; margin-right: auto;"><tbody><tr><td style="text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjlMqAqOMxR36UdBn6bWBbTro-7McUj2T4AnicNX-ObQxSJSnXNCFFhvK5Cp7pkflag2svTrhZdfi-vlOa_fdgygWTqzjWhrgQ_uQTdecBPB14SbsKtB9EcS-LWgmJUprNWXuxTRtAy7yOE2snuZKJLAANxvVCXNXFVShg3aO_555d4idJvMCT4iWKNJMBs/s455/example_2d_color.png" style="margin-left: auto; margin-right: auto;"><img border="0" data-original-height="455" data-original-width="436" height="400" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjlMqAqOMxR36UdBn6bWBbTro-7McUj2T4AnicNX-ObQxSJSnXNCFFhvK5Cp7pkflag2svTrhZdfi-vlOa_fdgygWTqzjWhrgQ_uQTdecBPB14SbsKtB9EcS-LWgmJUprNWXuxTRtAy7yOE2snuZKJLAANxvVCXNXFVShg3aO_555d4idJvMCT4iWKNJMBs/w384-h400/example_2d_color.png" width="384" /></a></td></tr><tr><td class="tr-caption" style="text-align: center;">A Hilbert curve over sparse data</td></tr></tbody></table><br /><div>To understand what is going on, we need to appreciate <a href="https://en.wikipedia.org/wiki/Gray_code">Gray Codes</a> [Wikipedia], which are an alternative numbering system in binary where adjacent numbers only differ by one bit changing (see that parallel with Hilbert curves?). For each bit, for each dimension, we create a mask from the Gray code and do some bit manipulation found <a href="https://github.com/PrincetonLIPS/numpy-hilbert-curve/blob/main/hilbert/encode.py">here</a> and we'll eventually have a bijective map ℤ<sup>d</sup> → ℤ.</div><div><br /></div></div>
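<div>The Gray code itself is a one-liner and the "bit manipulation" is essentially interleaving. A minimal sketch in Java of those two building blocks (the real encoder linked above also applies per-dimension masks and rotations on top of this, so this is only the flavour of it):</div><div><br /></div><pre style="font-family: courier; font-size: xx-small;">
// Gray code: adjacent integers differ by exactly one bit - the parallel with the Hilbert curve.
static int grayCode(int i) {
    return i ^ (i >>> 1);
}

// Interleave the bits of two 16-bit coordinates into a single value (the z-order style step).
static int interleave(int x, int y) {
    int result = 0;
    for (int bit = 0; bit < 16; bit++) {
        result |= ((x >> bit) & 1) << (2 * bit);       // even bit positions take x's bits
        result |= ((y >> bit) & 1) << (2 * bit + 1);   // odd bit positions take y's bits
    }
    return result;
}
</pre><div><br /></div>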
<div>The jumps between adjacent data points are less extreme in Hilbert curves. You can see this by-eye if you look at a slightly larger space (code <a href="https://github.com/PhillHenry/MathematicalPlayground/blob/master/graphics/hilbert_2d.py">here</a>):</div><div><br /></div><table align="center" cellpadding="0" cellspacing="0" class="tr-caption-container" style="margin-left: auto; margin-right: auto;"><tbody><tr><td style="text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhpdTxYBXYjhIAbRCZPp4HWcybBJQiqLdJ4Sl5erShyf5bP-EjGNzNKAWSfbrhYSQzcoFXgOrsuiaVy_4oqRDajTR8S9n3-VQPC6mcJO_RJY1W8Ac-49IXtnCqO0PPX0GLK7THrcIfB2bo-JRRRdk2Vhd2yuRRlWZbAF8EUfvPJf3ZPASIbmwlaEXYyK4Ke/s455/hilbert_2d.png" imageanchor="1" style="margin-left: auto; margin-right: auto;"><img border="0" data-original-height="455" data-original-width="436" height="400" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhpdTxYBXYjhIAbRCZPp4HWcybBJQiqLdJ4Sl5erShyf5bP-EjGNzNKAWSfbrhYSQzcoFXgOrsuiaVy_4oqRDajTR8S9n3-VQPC6mcJO_RJY1W8Ac-49IXtnCqO0PPX0GLK7THrcIfB2bo-JRRRdk2Vhd2yuRRlWZbAF8EUfvPJf3ZPASIbmwlaEXYyK4Ke/w384-h400/hilbert_2d.png" width="384" /></a></td></tr><tr><td class="tr-caption" style="text-align: center;">A Hilbert curve over sparse data</td></tr></tbody></table><br /><div>Typically, the jumps between data points are rarely more than a couple of positions (average of 1.433). Now, compare this to a similar space using Z-Ordering:</div><table align="center" cellpadding="0" cellspacing="0" class="tr-caption-container" style="margin-left: auto; margin-right: auto;"><tbody><tr><td style="text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgIJDR7pK5y0bkolup4xMOXMJEen9TrTmDB9-XuzUS47Qv7u4Cl2EtQf_KG6sW3Und2P7XHjspruhyszL_I1ZPwAzIb3QdqfmtHCj6TS9pghktKR7lGXJmvHCHv1fEDsC5K-ZnZ6GILBj8l5bdS9krMW5yuIrDEys8NLK3aKNE7j0vaSJhFFmMz6LhgBKRn/s455/zorder_sparse_2d.png" imageanchor="1" style="margin-left: auto; margin-right: auto;"><img border="0" data-original-height="455" data-original-width="436" height="400" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgIJDR7pK5y0bkolup4xMOXMJEen9TrTmDB9-XuzUS47Qv7u4Cl2EtQf_KG6sW3Und2P7XHjspruhyszL_I1ZPwAzIb3QdqfmtHCj6TS9pghktKR7lGXJmvHCHv1fEDsC5K-ZnZ6GILBj8l5bdS9krMW5yuIrDEys8NLK3aKNE7j0vaSJhFFmMz6LhgBKRn/w384-h400/zorder_sparse_2d.png" width="384" /></a></td></tr><tr><td class="tr-caption" style="text-align: center;">Z-Order over a similar sparse space</td></tr></tbody></table><br /><div>and you can see larger jumps between some data points. The average is 2.083 in this run. That's 45% higher than in the Hilbert curve.</div><div><br /></div><div>Hilbert curves are not currently implemented in Apache Iceberg but are in Databricks' Delta Lake.</div>Phillip Henryhttp://www.blogger.com/profile/08460042407514133056noreply@blogger.com0tag:blogger.com,1999:blog-372456867628019773.post-42962169144783359902024-01-03T09:29:00.000-08:002024-01-12T02:24:36.998-08:00GPU vs CPU vs AVX<div class="separator" style="clear: both; text-align: left;"><br />Vector databases are all the rage. So, I looked at three different ways of multiplying vectors: CPU, GPU and Advanced Vector Extensions that leverage <a href="https://en.wikipedia.org/wiki/Single_instruction,_multiple_data">SIMD</a> instructions if your hardware supports them. To access the GPU, I'm using the <a href="https://javaagile.blogspot.com/2023/10/java-and-gpu.html">Tornado Java VM</a>. 
For AVX, I'm using the JVM's <span style="font-family: courier; font-size: xx-small;">jdk.incubator.vector</span> module, available since JDK16.</div><div class="separator" style="clear: both; text-align: left;"><br /></div><div class="separator" style="clear: both; text-align: left;">(Code in my GitHub repo <a href="https://github.com/PhillHenry/Victor">here</a>).</div><div class="separator" style="clear: both; text-align: left;"><br /></div><div class="separator" style="clear: both; text-align: left;">The reason we're looking at vector multiplication is that searching for vectors (what the vector DB is all about) usually uses something like the approximate nearest neighbour algorithm. One way to implement it is something like Ethan Lui's implementation mentioned in a past blogpost <a href="https://javaagile.blogspot.com/2023/08/can-we-apply-ml-to-logging.html">here</a>. Briefly: it multiplies your vector by random vectors resulting in a vector whose bits are on or off depending on the sign of each element in the product.</div><div class="separator" style="clear: both; text-align: left;"><br /></div><div class="separator" style="clear: both; text-align: left;">The results are as follows (note, the GPU is a Quadro T2000 that apparently has 4gb of memory, 1024 cores and a bandwidth of 128 gigabits per second).</div><div class="separator" style="clear: both; text-align: left;"><br /></div><table align="center" cellpadding="0" cellspacing="0" class="tr-caption-container" style="margin-left: auto; margin-right: auto;"><tbody><tr><td style="text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhthzqgk8WI2bxT7dQ96BxVGn6dnG1_KNQpuJdfubtD7DG-p2RJru6j-fCN3nJ7b3yMMpr7AdYB2rM9-mQzdYSyBhQ9m3P8Szfwl6BW2uXbyKDvhmfTIntxS-qSuON-GJy607q-1SazSKzNV4_-AxDmZ9nwV_A9QG3toDHv0AVa57sYSX1ughqfOMvxbYi_/s640/gpu_vs_cpu_avx.png" imageanchor="1" style="margin-left: auto; margin-right: auto;"><img border="0" data-original-height="480" data-original-width="640" height="480" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhthzqgk8WI2bxT7dQ96BxVGn6dnG1_KNQpuJdfubtD7DG-p2RJru6j-fCN3nJ7b3yMMpr7AdYB2rM9-mQzdYSyBhQ9m3P8Szfwl6BW2uXbyKDvhmfTIntxS-qSuON-GJy607q-1SazSKzNV4_-AxDmZ9nwV_A9QG3toDHv0AVa57sYSX1ughqfOMvxbYi_/w640-h480/gpu_vs_cpu_avx.png" width="640" /></a></td></tr><tr><td class="tr-caption" style="text-align: center;"></td></tr></tbody></table><div class="separator" style="clear: both; text-align: left;">You can see that there is a huge fixed cost to using the GPU but once you get sufficiently large vectors, it's worth it. 
But what causes this fixed cost?</div><div><br /></div><div>On my Intel Xeon E-2286M CPU @ 2.40GHz, kernel calls typically take 17.8ns.<br /><br /><div><span style="font-family: courier; font-size: xx-small;"> 17.776 ±(99.9%) 0.229 ns/op [Average]</span></div><div><span style="font-family: courier; font-size: xx-small;"> (min, avg, max) = (17.462, 17.776, 19.040), stdev = 0.306</span></div><div><span style="font-family: courier; font-size: xx-small;"> CI (99.9%): [17.547, 18.005] (assumes normal distribution)</span></div><div><br /></div>JNI calls take a little longer at about 21.9ns:<br /><br /><div><span style="font-family: courier; font-size: xx-small;"> 21.853 ±(99.9%) 0.488 ns/op [Average]</span></div><div><span style="font-family: courier; font-size: xx-small;"> (min, avg, max) = (21.345, 21.853, 23.254), stdev = 0.651</span></div><div><span style="font-family: courier; font-size: xx-small;"> CI (99.9%): [21.365, 22.340] (assumes normal distribution)</span></div><br /><div>So, it doesn't seem that the fixed costs incurred in the GPU vector multiplication are due to context switching when calling the kernel or calls via JNI.</div></div><div><br /></div><div>Note the maximum vector size for this test was 8 388 608 <span style="font-family: courier;">float</span>s. </div><div><br /></div><div>That's 268 435 456 bits or 0.25 gigabits.</div><div><br /></div>Based on just bandwidth alone and ignoring everything else, each call should be about 1.95ms. This matches the average observed time (1.94971ms). <div><br /></div><div>This suggests the actual calculation is incredibly fast and only the low bandwidth is slowing it down. Tornado VM appears to have minimal room for improvement - you really are getting the best you can out of the hardware.</div><div><br /></div>Phillip Henryhttp://www.blogger.com/profile/08460042407514133056noreply@blogger.com0tag:blogger.com,1999:blog-372456867628019773.post-56298778171900854262023-12-23T01:01:00.000-08:002023-12-23T01:09:44.119-08:00Cloud native<p>A cloud native approach to writing code assumes that the instance in which it lives can die at any time.</p><p></p><blockquote>"Users sometimes explicitly send the SIGKILL signal to a process using kill -KILL or kill -9. However, this is generally a mistake. A well-designed application will have a handler for SIGTERM that causes the application to exit gracefully, cleaning up temporary files and releasing other resources beforehand. Killing a process with SIGKILL bypasses the SIGTERM handler." - The Linux Programming Interface (Michael Kerrisk)</blockquote>Using <span style="font-family: courier;">docker stop</span> sends <span style="font-family: courier;">SIGTERM</span>. <br />Using <span style="font-family: courier;">docker kill</span> sends <span style="font-family: courier;">SIGKILL</span>.<br /><br />The latter does not give the JVM a chance to clean up. In fact, no process in any language has the chance to clean up with <span style="font-family: courier;">SIGKILL</span>. (SIGTERM on <i>any</i> thread - not just <span style="font-family: courier;">main</span> - causes the whole JVM process to end and shutdown hooks to execute.) 
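<br /><br />For example, a minimal sketch in Java of a shutdown hook that cleans up a temporary file when the JVM receives SIGTERM (the file here is just for illustration):<br /><pre style="font-family: courier; font-size: xx-small;">
import java.nio.file.Files;
import java.nio.file.Path;

public class GracefulMain {
    public static void main(String[] args) throws Exception {
        Path scratch = Files.createTempFile("scratch", ".tmp");
        // Runs on SIGTERM (docker stop) or normal exit, but never on SIGKILL (docker kill).
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                Files.deleteIfExists(scratch);      // release temporary resources
                System.out.println("Cleaned up " + scratch);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }));
        Thread.sleep(Long.MAX_VALUE);               // pretend to do work until we are terminated
    }
}
</pre>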
<br /><br /><u>A Tini problem...</u><br /><br />If the JVM process creates another process and is then killed with SIGKILL, that child process carries on living but its parent becomes (on Ubuntu 20.04.6 LTS) <span style="font-family: courier;">systemd</span> which in turn is owned by <span style="font-family: courier;">init</span> (PID 1).<p></p><p>Running your JVM directly in a Docker container has some issues. This revolves around Linux treating PID 1 as special. And the <span style="font-family: courier;">ENTRYPOINT</span> for any Docker container is PID 1.<br /><br />In Linux, PID 1 should be <span style="font-family: courier;">init</span>. On my Linux machine, I see:<br /></p><p><span style="font-family: courier; font-size: xx-small;">$ ps -ef | head -2<br />UID PID PPID C STIME TTY TIME CMD<br />root <b>1</b> 0 0 Oct21 ? 00:18:23 /sbin/<b>init</b> splash</span></p><p>This process serves a special purpose. It handles SIGnals and zombie processes. Java is not built with that in mind so it's best to bootstrap it with a small process called <span style="font-family: courier;"><a href="https://github.com/krallin/tini">tini</a></span>. There's a good discussion of why this is important <a href="https://github.com/krallin/tini/issues/8">here</a> on GitHub. Basically, Tini will forward the signal that killed the JVM onto any zombies that are left behind. This gives them the chance to clean up too. </p><p>It also passes the JVM's exit code on so we can know how it failed. Exit codes 0-127 are reserved [<a href="https://stackoverflow.com/questions/17671234/is-there-a-complete-list-of-jvm-exit-codes">SO</a>] and the value of the kill (<span style="font-family: courier;">kill -l</span> lists them) is added to 128. If you want to set the exit code in the shutdown hook, note you need to call <span style="font-family: courier;">Runtime.halt</span> rather than <span style="font-family: courier;">Runtime.exit</span> (to which <span style="font-family: courier;">System.exit</span> delegates). The <span style="font-family: courier;">exit</span> method will cause the JVM to hang in this situation [<a href="https://stackoverflow.com/questions/19798452/what-happens-if-system-exit-is-called-again-while-a-jvm-shutdown-is-already-in">SO</a>].</p>Phillip Henryhttp://www.blogger.com/profile/08460042407514133056noreply@blogger.com0tag:blogger.com,1999:blog-372456867628019773.post-74004887338569213832023-12-12T05:58:00.000-08:002023-12-12T05:58:58.475-08:00ML and Logs (pt2)<p>Further to my <a href="https://javaagile.blogspot.com/2023/08/can-we-apply-ml-to-logging.html">attempt</a> to use machine learning to make sense of huge amounts of logs, I've been looking at the results. My <a href="https://github.com/PhillHenry/KafkaPlayground/tree/main/src/main/python">PoC</a> can:</p><blockquote style="border: none; margin: 0px 0px 0px 40px; padding: 0px; text-align: left;"><p><u>Find log entries with the highest information</u></p><p>When debugging my Kafka cluster, these lines had the highest average entropy:</p><p><span style="font-family: courier; font-size: xx-small;">kafka1: 2023-07-04 14:14:18,861 [RaftManager id=1] Connection to node 3 (kafka3/172.31.0.4:9098) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)</span></p><p><span style="font-family: courier; font-size: xx-small;">kafka1: 2023-07-04 14:17:32,605 [RaftManager id=1] Connection to node 2 (kafka2/172.31.0.3:9098) could not be established. Broker may not be available. 
(org.apache.kafka.clients.NetworkClient)</span></p><p><span style="font-family: courier; font-size: xx-small;">kafka2: 2023-07-04 14:17:31,957 [TransactionCoordinator id=2] Connection to node 3 (localhost/127.0.0.1:9093) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)</span></p><p><span style="font-family: courier; font-size: xx-small;">kafka1: 2023-07-04 14:17:32,605 [RaftManager id=1] Node 2 disconnected. (org.apache.kafka.clients.NetworkClient)</span></p><p><span style="font-family: courier; font-size: xx-small;">kafka2: 2023-07-04 14:17:31,957 [TransactionCoordinator id=2] Node 3 disconnected. (org.apache.kafka.clients.NetworkClient)</span></p><p>As it happened, this correctly highlighted my problem (Docker Compose networking was misconfigured). But I don't know if I got lucky.</p><p><u>Bucket similar-but-different lines</u></p><p>Using the same algorithm as Twitter, we can bucket similar but lexically different lines, for example:</p><p><span style="font-family: courier; font-size: xx-small;">2023-07-04 14:14:21,480 [QuorumController id=3] ConfigResource(type=TOPIC, name='__consumer_offsets'): set configuration <b>cleanup.policy</b> to <b>compact</b> (org.apache.kafka.controller.ConfigurationControlManager)</span></p><p><span style="font-family: courier; font-size: xx-small;">2023-07-04 14:14:21,489 [QuorumController id=3] ConfigResource(type=TOPIC, name='__consumer_offsets'): set configuration <b>compression.type</b> to <b>producer</b> (org.apache.kafka.controller.ConfigurationControlManager)</span></p><p>This means that we can:</p></blockquote><p style="text-align: left;"></p><ul style="text-align: left;"><ul><li>discard boilerplate lines of little value like those above</li><li>check the distribution of all nodes in a given bucket (for example, if one node is under-represented within a bucket - that is, not logging the same as its peers - this might be an issue).</li></ul></ul><blockquote style="border: none; margin: 0px 0px 0px 40px; padding: 0px; text-align: left;"><div>There's one slight gotcha here: in the Kafka example above, we're using the Raft protocol so it's not too surprising that the number of nodes is N-1 for some configurations as one has been elected leader and the others are followers.</div><p><u>Trace high information tokens through the system</u></p><p>Words with high entropy can be traced across my cluster. For instance, my PoC classified <span style="font-family: courier;">wUi1RthMRPabI8rHS_Snig</span> as possessing high information. This happens to be an internal Kafka UUID for a topic and tracing its occurrence through the logs show that despite Docker <a href="https://javaagile.blogspot.com/2023/09/spark-kafka-and-docker.html">network issues</a>, all nodes agreed on the topic ID as did the client. So, clearly some communication was happening despite the misconfiguration.</p></blockquote><p></p><p><u>Investigation</u></p><p>I finally solved my Kafka problem. The Kafka client was running on the host OS and could see the individual Kafka containers but these brokers could not talk to each other. The reason was they needed to advertise themselves both as localhost (for the sake of the Kafka client that lives outside Docker) and also using their internal names (so they could talk within the Docker network).</p><p>My PoC could not tell me exactly what the problem was but it successfully highlighted the suspects.</p><p><u>The PoC</u></p><p>So, how does the PoC work? 
For the entropy, we train the model on a dictionary of English words so it can learn what is a "normal" word, rather than say <span style="font-family: courier;">wUi1RthMRPabI8rHS_Snig</span>. We disregard lines that are fewer than 6 words (including the FQN of the classes - each package being one word); take the average entropy and present the lines that look the most informative.</p><p>For the LSH, we use one-hot encoding of word shingles to create our vectors.</p><p><u>Future plans</u></p><p>I'd like to show the graph of paths the high-entropy words take through the system (node and log line).</p><p>I'd also like to try other systems. Maybe I got lucky with Kafka as there are lovely, high-entropy UUIDs scattered throughout the logs (for example, consumer group IDs).</p><p>Thirdly, this PoC has been great for small amounts of data, but what about big data? It really needs to be rewritten in a JVM language and made to run in Spark.</p>Phillip Henryhttp://www.blogger.com/profile/08460042407514133056noreply@blogger.com0tag:blogger.com,1999:blog-372456867628019773.post-55124431248079336102023-11-30T22:25:00.000-08:002023-11-30T22:25:14.640-08:00Memories are made of these<p>Some notes on new memory models I've been looking at recently.<br /><br /><u>Zero copy</u><br /><br />"Device controllers cannot do DMA directly into user space, but the same effect is achievable by exploiting ... [the fact] more than one virtual address can refer to the same physical memory location. [Thus] the DMA hardware (which can access only physical memory addresses) can fill a buffer that is simultaneously visible to both the kernel and a user space process." - Java NIO, Ron Hitchens</p><p>Virtual memory paging is "often referred to as swapping, though true swapping is done at the process level, not the page level" [ibid].</p><p>An excellent visual representation of what's going on during a zero-copy is <a href="https://2minutestreaming.beehiiv.com/p/apache-kafka-zero-copy-operating-system-optimization">here</a> from Stanislav Kozlovski (who ends with the knock-out punch that it makes very little difference to Kafka since generally costs of network IO and encryption cancel any savings). Anyway, the take-away points are: </p><p></p><ul style="text-align: left;"><li>Zero-copy "<b>doesn’t</b> actually mean you make literally zero copies" it's just that it "does not make unnecessary copies of the data."</li><li>Fewer context switches happen.</li><li>A further optimization to DMA is where the disk "read buffer directly copies data to the NIC buffer - not to the socket buffer. This is the so-called <b>scatter-gather</b> operation (a.k.a Vectorized I/O). [It is] the act of only storing read buffer <i>pointers</i> in the socket buffer, and having the DMA engine read those addresses directly from memory."</li></ul><p></p><p><u>Java's new vector API</u><br /><br />A new way of dealing with vectors is outlined at <a href="https://openjdk.org/jeps/426">JEP426</a> (Vector API). It leverages new CPU features like <a href="https://en.wikipedia.org/wiki/Advanced_Vector_Extensions">Advanced Vector Extensions</a> [Wikipedia] that provide new machine instructions to execute Single Instruction, Multiple Data (SIMD) operations. 
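<br /><br />For a flavour of what the API looks like, here is a minimal sketch (element-wise multiplication of two <span style="font-family: courier;">float</span> arrays; it needs <span style="font-family: courier;">--add-modules jdk.incubator.vector</span>):<br /><pre style="font-family: courier; font-size: xx-small;">
import jdk.incubator.vector.FloatVector;
import jdk.incubator.vector.VectorSpecies;

public class VectorMultiply {
    private static final VectorSpecies<Float> SPECIES = FloatVector.SPECIES_PREFERRED;

    static void multiply(float[] a, float[] b, float[] result) {
        int i = 0;
        int upperBound = SPECIES.loopBound(a.length);
        for (; i < upperBound; i += SPECIES.length()) {   // vectorized main loop, one lane-width at a time
            FloatVector va = FloatVector.fromArray(SPECIES, a, i);
            FloatVector vb = FloatVector.fromArray(SPECIES, b, i);
            va.mul(vb).intoArray(result, i);
        }
        for (; i < a.length; i++) {                       // scalar tail for any leftover elements
            result[i] = a[i] * b[i];
        }
    }
}
</pre>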
<br /><br />Martin Stypinski has an interesting <a href="https://medium.com/@Styp/java-18-vector-api-do-we-get-free-speed-up-c4510eda50d2">article</a> that shows adding two floating point vectors together gains very little from the new API but a linear equation like <span style="font-family: courier;">y = mx + c</span> (which has obvious applications to machine learning) can improve performance by an order of magnitude.<br /><br /><u>Project Panama</u><br /><br /><a href="https://openjdk.org/projects/panama/">Project Panama</a> deals with interconnecting the JVM with native code. Oracle's Gary Frost talks about this in his <a href="https://www.youtube.com/watch?v=lbKBu3lTftc&t=1988s">presentation</a> on accessing the GPU from Java. The difficulty he encountered was allocating heap memory and passing it to the GPU. Unfortunately, the garbage collector might reorganise the heap making the pointer to that memory obsolete. With Project Panama, this would not happen as the allocation would be through the JVM but off the heap. <br /><br /><u>Apache Arrow</u><br /><br />Arrow provides an agreed memory format for data so you can "share data across languages and processes." [<a href="https://arrow.apache.org/use_cases/#:~:text=The%20Arrow%20format%20allows%20serializing,performance%20gains%20when%20transferring%20data.">docs</a>] <br /><br />This differs from Google's Protobuf in that "Protobuf is designed to create a common <i>on the wire</i> or <i>disk</i> format for data." [<a href="https://stackoverflow.com/questions/66521194/comparison-of-protobuf-and-arrow">SO</a>] Any data deserialized from Protobuf is handled in the same way that language always handles it.<br /><br />This inter-process ability allows Spark (which runs in the JVM) to use Pandas (which runs in a Python process).</p><p>"Perhaps the single biggest memory management problem with pandas is the requirement that data must be loaded completely into RAM to be processed... Arrow serialization design provides a “data header” which describes the exact locations and sizes of all the memory buffers for all the columns in a table. This means you can memory map huge, <b>bigger-than-RAM</b> datasets and evaluate pandas-style algorithms on them in-place without loading them into memory like you have to with pandas now. <b>You could read 1 megabyte from the middle of a 1 terabyte table</b>, and you only pay the cost of performing those random reads totalling 1 megabyte... Arrow’s memory-mapping capability also allows multiple processes to work with the same large dataset without moving it or copying it in any way. "[10 Things I hate about Pandas, by Pandas author, <a href="https://wesmckinney.com/blog/apache-arrow-pandas-internals/">Wes McKinney</a>]</p><p>"The ability to memory map files allows you to treat file data on disk as if it was in memory. This exploits the virtual memory capabilities of the operating system to dynamically cache file content without committing memory resources to hold a copy of the file." [NIO - Hitchens].</p><p><u>MySQL vs Postgres</u></p><p>There's a great comparison between the two major open source DBs <a href="https://www.uber.com/en-GB/blog/postgres-to-mysql-migration/">here</a> at Uber. Amongst the many insights, there is a mention that MySQL uses a cache "logically similar to the Linux page cache but implemented in userspace... It results in fewer context switches. Data accessed via the InnoDB buffer pool doesn’t require any user/kernel context switches. 
The worst case behavior is the occurrence of a TLB [Translation Lookaside Buffer] miss, which is relatively cheap and can be minimized by using huge pages."</p><p>"On systems that have large amounts of memory and where applications require large blocks of memory, using huge pages reduces the number of entries required in the hardware memory management unit's translation look-aside buffer (TLB). This is beneficial because <b>entries in the TLB are usually a scarce resource</b>... (For example, x86-32 allows 4mb pages as an alternative to 4kb pages)" [The Linux Programming Interface]</p>Phillip Henryhttp://www.blogger.com/profile/08460042407514133056noreply@blogger.com0tag:blogger.com,1999:blog-372456867628019773.post-26883154475407479722023-11-09T07:10:00.000-08:002024-01-09T07:59:50.678-08:00Z-order<p>Z-ordering is an optimization technique in big data that allows faster access since similar data lives together. We discuss the algorithm that defines what is similar here. </p><p>Imagine a logical grid where all the values of one column run across the top and all the values from another run down the side. If we were to sort this data, every datum can be placed somewhere in that grid. <br /><br />Now, if the squares of the grid were mapped to files and all the data in each cell were to live in those files, we have made searching much easier as we now know the subset of files in which it may live. We've essentially sorted in not just one dimension but two (although we can do higher).</p><p>This can be especially useful when we want to sort the data but don't know exactly what to sort on - a common conundrum when dealing with <a href="https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1703788925597849?thread_ts=1703580174.116649&cid=C025PH0G1D4">events</a>. Say we have <span style="font-family: courier;">event_time</span>, <span style="font-family: courier;">insert_time</span> and <span style="font-family: courier;">update_time</span>. Which do we choose? We could sort the data three times, each time on one column but this is impractical with huge data sets. Enter z-order.</p><p>Note that the Z-Order really needs 2 or more columns on which to act. Only one column is the degenerate case. "Zorder/Hilbert etc on a single dimension are just a hierarchal sort" [Russell Spitzer on <a href="https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1672931369853579?thread_ts=1672924030.653269&cid=C025PH0G1D4">Slack</a>].</p><p>(<a href="https://www.dremio.com/blog/how-z-ordering-in-apache-iceberg-helps-improve-performance/">This</a> is a good article about z-ordering from the perspective of Apache Iceberg.)</p><p>For an example in Delta Lake, we can see <a href="https://github.com/delta-io/delta/blob/616af05e487a9a4ccffe90a9469cb03674607690/spark/src/test/scala/org/apache/spark/sql/delta/optimize/OptimizeZOrderSuite.scala#L195">this</a> code that creates a data set with columns <span style="font-family: courier;">c1</span>, <span style="font-family: courier;">c2</span> and <span style="font-family: courier;">c3</span> whose values are <span style="font-family: courier;">[x, 99-x, x+50 mod 100]</span> for x in <span style="font-family: courier;">[0, 99]</span>. After z-ordering it, these numbers are split into 4 different files. 
<a href="https://github.com/PhillHenry/MathematicalPlayground/blob/master/graphics/plot_3d_points.py">Generating</a> a graphic illustrates how the data points are distributed over those files:</p><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEi0HVJmld3R6VJfNzOrenJaf5t6UezJ2t1G60JwYs0Pwncgn4ZxhghMczAicri09oASwrvzqDLDC6npoRz3m1VfuFC6X708Swxh2AZDlyCPutBHjUkxXewHzbTEe6Hmpj6G9_jIyTVGGSPmuYkETD9y7J-vNeVf9j3nZPP9FwNZgGp6fqedQZF2_wi_Kbcs/s1053/z_ordering.png" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="875" data-original-width="1053" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEi0HVJmld3R6VJfNzOrenJaf5t6UezJ2t1G60JwYs0Pwncgn4ZxhghMczAicri09oASwrvzqDLDC6npoRz3m1VfuFC6X708Swxh2AZDlyCPutBHjUkxXewHzbTEe6Hmpj6G9_jIyTVGGSPmuYkETD9y7J-vNeVf9j3nZPP9FwNZgGp6fqedQZF2_wi_Kbcs/s16000/z_ordering.png" /></a></div>The idea behind how we calculate which cell a datum falls into is best described <a href="https://en.wikipedia.org/wiki/Z-order_curve">here</a> on Wikipedia. But, in brief, the binary representation of the data points is interleaved to give a z-value per tuple. In our example, I see a <span style="font-family: courier;">[0, 99, 50]</span> mapped to the byte array <span style="font-family: courier;">[0, 0, 0, 0, 0, 0, 0, 0, 0, 9, -112, 26]</span>.<br /><p>I took a look at the Delta Lake code <a href="https://github.com/delta-io/delta/blob/616af05e487a9a4ccffe90a9469cb03674607690/spark/src/main/scala/org/apache/spark/sql/delta/skipping/MultiDimClustering.scala#L90">here</a> where a Spark Column object is created that wraps a DL <span style="font-family: courier;">InterleaveBits</span> type which in turn is a subclass of Spark's <a href="https://javaagile.blogspot.com/2023/05/spark-catalysts.html">Catalyst</a> <span style="font-family: courier;">Expression</span> type. This executes on Spark's <span style="font-family: courier;">InternalRow</span>, that is, the raw data on the executors.</p><p>The reason the code is doing this is to add a column with which we can repartition the data with the SQL <span style="font-family: courier; font-size: xx-small;">CAST(interleavebits(rangepartitionid(c1), rangepartitionid(c2), rangepartitionid(c3)) AS STRING)</span>. The <span style="font-family: courier; font-size: x-small;">rangepartitionid</span> keyword is part of the Delta Lake machinery.</p><p>Using this z-order value (plus a random key), the DataFrame the Delta code now calls <span style="font-family: courier;">repartitionByRange</span> which samples the data [<a href="https://stackoverflow.com/questions/65809909/spark-what-is-the-difference-between-repartition-and-repartitionbyrange">SO</a>] and breaks it into discrete ranges.</p><p>Given the interleaving of the columns <span style="font-family: courier;">c1</span>, <span style="font-family: courier;">c2</span> and <span style="font-family: courier;">c3</span> their order has minimal impact on the z-value so it's no surprise to see nearby data clustering into the same files, as we can see in the graphic. 
In fact, if you look at the DataFrame during the repartition process:<br /><br /></p><p><span style="font-family: courier; font-size: xx-small;">+---+---+---+-------------------------------------------+</span><br /><span style="font-family: courier; font-size: xx-small;">| c1| c2| c3|c7b6b480-c678-4686-aa99-283988606159-rpKey1|<br />+---+---+---+-------------------------------------------+<br />| 0| 99| 50| \t�|<br />| 1| 98| 51| \t�|<br />| 2| 97| 52| \t�b|<br />| 3| 96| 53| \t�e|<br />| 4| 95| 54| \b��|<br />| 5| 94| 55| \b��|<br />| 6| 93| 56| \b��|<br />| 7| 92| 57| \b��|<br />| 8| 91| 58| \b�|<br />| 9| 90| 59| \b�|<br />| 10| 89| 60| \b�b|<br />| 11| 88| 61| \b�e|<br />| 12| 87| 62| \b��|<br />| 13| 86| 63| \b��|<br />| 14| 85| 64| \f)�|<br />| 15| 84| 65| \f)�|<br />| 16| 83| 66| \f`|<br />| 17| 82| 67| \f`|<br />| 18| 81| 68| \f`b|<br />| 19| 80| 69| \f`e|<br />+---+---+---+-------------------------------------------+</span><br /><br />you can see the slowly changing values by which things are partitioned (column <span style="font-family: courier; font-size: x-small;">c7b6b480-c678-4686-aa99-283988606159-rpKey1</span> - a random name so it doesn't clash with other column names. It's dropped immediately after the call to <span style="font-family: courier;">repartitionByRange</span>)</p>
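<p>To make the bit interleaving concrete, here is a minimal sketch in Scala. It is my own illustration rather than Delta Lake's <span style="font-family: courier;">InterleaveBits</span> expression, and it assumes MSB-first interleaving of 32-bit ints, but it reproduces the byte array quoted above for <span style="font-family: courier;">[0, 99, 50]</span>:</p><pre style="font-family: courier; font-size: xx-small;">
object InterleaveSketch {
  // Interleave the bits of each 32-bit column value, most significant bit first:
  // bit 31 of c1, bit 31 of c2, bit 31 of c3, then bit 30 of c1, and so on.
  def interleave(cols: Array[Int]): Array[Byte] = {
    val out    = new Array[Byte](cols.length * 4)   // 3 ints -> 96 bits -> 12 bytes
    var outBit = 0
    for (bit <- 31 to 0 by -1; col <- cols) {
      val b = (col >>> bit) & 1                      // take one bit from this column
      out(outBit / 8) = (out(outBit / 8) | (b << (7 - outBit % 8))).toByte
      outBit += 1
    }
    out
  }

  def main(args: Array[String]): Unit =
    // prints [0, 0, 0, 0, 0, 0, 0, 0, 0, 9, -112, 26] for the row (0, 99, 50)
    println(interleave(Array(0, 99, 50)).mkString("[", ", ", "]"))
}
</pre><p>Because every output byte mixes bits from all the columns, rows that are close in all columns end up with nearby z-values and so tend to land in the same file.</p>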
Phillip Henryhttp://www.blogger.com/profile/08460042407514133056noreply@blogger.com0tag:blogger.com,1999:blog-372456867628019773.post-79656918795890528652023-10-25T08:44:00.002-07:002023-10-26T04:15:23.330-07:00Java and the GPU<p>Java was always built to abstract away the hardware on which it runs but its approach to GPUs has been somewhat <a href="https://www.youtube.com/watch?v=lbKBu3lTftc&t=1988s">late to the game</a> [Gary Frost, YouTube].<br /><br />There are projects out there that promise to give Java access to the GPU. I looked at <a href="https://github.com/aparapi/aparapi">Aparapi</a> but it appears to be moribund. So, I gravitated to TornadoVM which Frost describes as "state of the art".<br /><br />The trouble is that TornadoVM runs everything in <a href="https://github.com/beehive-lab/docker-tornadovm">a Docker image</a> that has all the shared objects built in. This is fine for a quick demo - this is the result of running on my Quadro T2000:<br /></p><p><span style="font-family: courier; font-size: xx-small;">docker-tornadovm$ ./run_nvidia_openjdk.sh tornado -cp example/target/example-1.0-SNAPSHOT.jar example.MatrixMultiplication<br />Computing MxM of 512x512<br /><span style="white-space: pre;"> </span>CPU Execution: 1.17 GFlops, Total time = 230 ms<br /><span style="white-space: pre;"> </span>GPU Execution: 268.44 GFlops, Total Time = 1 ms<br /><span style="white-space: pre;"> </span>Speedup: 230x</span></p><div>This demonstrates how the GPU runs a nested for-loop doing matrix multiplication much faster than the same code on the CPU. But it runs it all in a Docker container and I need to package a JAR every time I make a change. How do I run it outside the container?<br /><br />To work this out, I opened a shell in the Docker image and saw that the TornadoVM build it uses was built from Git branch <span style="font-family: courier;">d3062accc</span>. So, the first thing was to check out that branch of <a href="https://github.com/beehive-lab/TornadoVM">TornadoVM</a> and build it.</div><p>I built with:<br /><br /><span style="font-family: courier; font-size: xx-small;">mvn clean install -Pgraal-jdk-11-plus<br /></span><br />using the <span style="font-family: courier;">graalvm-ee-java11-22.3.4</span> JDK.</p><p>Note that you'll need <a href="https://github.com/oracle/graal">Graal</a> as the TornadoVM code has dependencies on it. I built my own Graal JDK by following the instructions <a href="https://github.com/oracle/graal/issues/588">here</a> but using a different branch as I couldn't find the download for the <span style="font-family: courier;">graal.version</span> defined in the TornadoVM <span style="font-family: courier;">pom.xml</span>. Note, you'll also need <a href="https://github.com/graalvm/mx"><span style="font-family: courier;">mx</span></a> and a <a href="https://github.com/oracle/graal/issues/4751">bootstrapping JDK</a> that has the right compiler interface (JVMCI), in my case <span style="font-family: courier;"><a href="https://github.com/graalvm/labs-openjdk-21/releases">labsjdk-ce-21.0.1-jvmci-23.1-b19</a></span>.<br /><br />So far, so good. I ran the <span style="font-family: courier;">tornado</span> script which is just a wrapper around a call to the <span style="font-family: courier;">java</span> executable (don't forget to set your <span style="font-family: courier;">JAVA_HOME</span> environment variable to point at the Graal JDK) but it complained it could not see a <span style="font-family: courier;">tornado.backend</span> file. 
<br /><br />Again, a sneaky look at the Docker container indicated that we have to tell it which driver to use. So, I created the file and told it <span style="font-family: courier;">tornado.backends=opencl-backend</span> but then <span style="font-family: courier;">tornado</span> complained it didn't have the OpenCL drivers. Oops. </p><p>You have to build the drivers you want separately, it seems. But if you try to build Tornado drivers without the native OpenCL dev library, you'll see:<br /><br /><span style="font-family: courier; font-size: xx-small;">TornadoVM/tornado-drivers/opencl-jni$ mvn clean install # <b>yes, Maven cmake via cmake-maven-plugin</b><br />....<br />/usr/bin/ld: cannot find -lOpenCL<br />...</span><br /><br />The Docker image saves you from having to install the OpenCL libraries on your machine. To get it working on bare metal, I played it safe and got an old Ubuntu box and <a href="https://askubuntu.com/questions/796770/how-to-install-libopencl-so-on-ubuntu">installed</a> them there. You'll need to install them with:</p><p><span style="font-family: courier; font-size: xx-small;">sudo apt install ocl-icd-opencl-dev</span></p><div>and then run Maven in the <span style="font-family: courier;">opencl*</span> subdirectories. This time, the Maven build completed successfully. <br /><br />However, running <span style="font-family: courier;">tornado</span> in the subsequent <span style="font-family: courier;">dist</span> folder still pukes but with something like:</div><div><br /><div><span style="font-family: courier; font-size: xx-small;">Caused by: uk.ac.manchester.tornado.api.exceptions.TornadoRuntimeException: OpenCL JNI Library not found</span></div><div><span style="white-space: normal;"><span style="font-family: courier; font-size: xx-small;"><span style="white-space: pre;"> </span>at tornado.drivers.opencl@0.15.1/uk.ac.manchester.tornado.drivers.opencl.OpenCL.<clinit>(OpenCL.java:68)</span></span></div><div><span style="white-space: normal;"><span style="font-family: courier; font-size: xx-small;"><span style="white-space: pre;"> </span>... 11 more</span></span></div><div><br /></div><div>Not what I was expecting. I found I needed to:</div><br /></div><div><span style="font-family: courier; font-size: xx-small;">cp ./tornado-drivers/opencl-jni/target/linux-amd64-release/cmake/libtornado-opencl.so $TORNADO_SDK/lib</span></div><div><br /></div><div>Where <span style="font-family: courier; font-size: x-small;">TORNADO_SDK</span> is pointing at the relevant <span style="font-family: courier;">dist</span> folder.<br /><br />Now, finally, you can run on the bare metal:</div><div><br /><div><span style="font-family: courier; font-size: xx-small;">$ tornado -cp target/classes/ example.MatrixMultiplication</span></div><div><span style="font-family: courier; font-size: xx-small;">Computing MxM of 512x512</span></div><div><span style="font-family: courier; font-size: xx-small;"><span style="white-space: pre;"> </span>CPU Execution: 1.21 GFlops, Total time = 222 ms</span></div><div><span style="font-family: courier; font-size: xx-small;"><span style="white-space: pre;"> </span>GPU Execution: 17.90 GFlops, Total Time = 15 ms</span></div><div><span style="font-family: courier; font-size: xx-small;"><span style="white-space: pre;"> </span>Speedup: 14x</span></div><div><br /></div><div>(Results from an old NVIDIA GeForce GTX 650)</div></div><p>Note, you'll also need to run it with the Graal JVM. 
Set both the <span style="font-family: courier;">PATH</span> and <span style="font-family: courier;">JAVA_HOME</span> environment variables to point to it.<br /><br /><u>Where now?</u><br /><br />This is a nice introduction to running Java on the GPU but it's just the start. There are many caveats. Example: what if your Java code throws an Exception? GPUs have no equivalent of exceptions so what happens then? More to come.</p>Phillip Henryhttp://www.blogger.com/profile/08460042407514133056noreply@blogger.com0tag:blogger.com,1999:blog-372456867628019773.post-14735987371395401352023-10-12T02:12:00.000-07:002023-10-12T02:12:36.639-07:00Dependency hell<p>In these days of ChatGPT, it's easy to forget that most of the time, a developer isn't actually cutting code at all, but debugging it. This is my own personal hell in getting Spark and Kafka in Docker containers talking to a driver on the host.</p><p>Firstly, I was seeing <span style="font-family: courier; font-size: xx-small;">No TypeTag available</span> when my code was trying to use the Spark Encoders. <a href="https://stackoverflow.com/questions/73836319/scala-spark-encoders-productx-where-x-is-a-case-class-keeps-giving-me-no-ty">This SO answer</a> helped. Basically, my code is Scala 3 and "<span style="font-family: courier; font-size: xx-small;">Encoders.product[classa]</span> is a Scala 2 thing. This method accepts an implicit <span style="font-family: courier; font-size: x-small;">TypeTag</span>. There are no <span style="font-family: courier; font-size: x-small;">TypeTag</span>s in Scala 3". Yikes. This is probably one reason the upgrade path in Spark to Scala 3 is proving difficult. The solution I used was to create an SBT sub-<span style="font-family: courier; font-size: xx-small;">Project</span> that was entirely Scala 2 and from there I called Spark.</p><p>The next problem was seeing my Spark jobs fail with:<br /></p><p><span style="font-family: courier; font-size: xx-small;">Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6) (172.30.0.7 executor 0): java.lang.ClassCastException: <b>cannot assign instance of scala.collection.generic.DefaultSerializationProxy to field org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition.inputPartitions</b> of type scala.collection.immutable.Seq in instance of org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition</span></p><p>This is a contender for the error message with the greatest misdirection. You think it's a serialization problem but it isn't directly so.</p><p>Although other Spark users have <a href="https://issues.apache.org/jira/browse/SPARK-19938">reported</a> it, Ryan Blue mentions that it isn't really a Spark issue but a Scala <a href="https://github.com/scala/bug/issues/9237">issue</a>.</p><p>Anyway, I tried all sorts of things like changing my JDK (note the <span style="font-family: courier; font-size: xx-small;">sun.*</span> packages have been <a href="https://advancedweb.hu/a-categorized-list-of-all-java-and-jvm-features-since-jdk-8-to-21/">removed in later JDKs</a> so you need to follow the advice in <a href="https://stackoverflow.com/questions/73465937/apache-spark-3-3-0-breaks-on-java-17-with-cannot-access-class-sun-nio-ch-direct">this</a> SO answer). 
I tried creating an uber jar but was thwarted by duplicated dependencies [<a href="https://stackoverflow.com/questions/25144484/sbt-assembly-deduplication-found-error">SO</a>] and <span style="font-family: courier; font-size: xx-small;">Invalid signature file digest</span> errors as some JARs were signed [<a href="https://stackoverflow.com/questions/34855649/invalid-signature-file-digest-for-manifest-main-attributes-exception-while-tryin">SO</a>], which forced me to strip the signatures out [<a href="https://stackoverflow.com/questions/46040071/how-to-remove-specifics-files-from-maven-shaded-plugin">SO</a>], only to fall foul of Kafka's <span style="font-family: courier; font-size: xx-small;">DataSourceRegister</span> file being stripped out too [<a href="https://stackoverflow.com/questions/48011941/why-does-formatkafka-fail-with-failed-to-find-data-source-kafka-even-wi">SO</a>].</p><p>The first step in the right direction came from <a href="https://stackoverflow.com/questions/73473309/cannot-assign-instance-of-scala-collection-immutable-listserializationproxy-to">here</a> and another <a href="https://stackoverflow.com/questions/39953245/how-to-fix-java-lang-classcastexception-cannot-assign-instance-of-scala-collect">SO question</a> where the SparkSession is recommended to be built by adding <span style="font-family: courier; font-size: xx-small;">.config("spark.jars", PATHS)</span> where <span style="font-family: courier; font-size: x-small;">PATHS</span> is a comma-delimited string of the full paths of all the JARs you want to use. Surprisingly, this turned out to include Spark JARs themselves, including in my case <span style="font-family: courier; font-size: xx-small;">spark-sql-kafka-0-10_2.13</span> which oddly does not come as part of the Spark installation. By adding them as <span style="font-family: courier; font-size: x-small;">spark.jars</span>, they are uploaded into the <span style="font-family: courier; font-size: xx-small;">work</span> subdirectory of a Spark node.</p><p>After this, there were just some minor domain name mapping issues to clear up in both the host and container before the whole stack worked without any further errors being puked.</p>Phillip Henryhttp://www.blogger.com/profile/08460042407514133056noreply@blogger.com0tag:blogger.com,1999:blog-372456867628019773.post-30096040263075661082023-10-09T01:59:00.003-07:002023-10-09T07:02:10.020-07:00My "What data science can learn from software engineering" presentation<p>Dr Chris Monit and I gave <a href="https://www.youtube.com/watch?v=0gkITkmMx3Y#t=5m13s">this</a> presentation at the London MLOps meetup last week. 
TL;DR: maximize your chances of a successful delivery in data science by adopting best practices that the software industry has established.<br /></p><table align="center" cellpadding="0" cellspacing="0" class="tr-caption-container" style="margin-left: auto; margin-right: auto;"><tbody><tr><td style="text-align: center;"><a href="https://blogger.googleusercontent.com/img/a/AVvXsEi0N-HsLWFM3ppjgBnIuAM31WcNh8VfsZJRzJ6ljYcJQqnRTPvEnd9hXjq_0bW371ybXQHkI4BAUQIudN7N_ZufmNibjRxs9fdOA9d8NfhbwEVXjAzoZDSaQRDiBL6FqupipbBca0LlIXMMcSIjl2J3S4CIqyZ7SlXpQtsc6Jq3HLs9QTBYKJok8bHTJx9U" style="margin-left: auto; margin-right: auto;"><img alt="" data-original-height="592" data-original-width="797" height="297" src="https://blogger.googleusercontent.com/img/a/AVvXsEi0N-HsLWFM3ppjgBnIuAM31WcNh8VfsZJRzJ6ljYcJQqnRTPvEnd9hXjq_0bW371ybXQHkI4BAUQIudN7N_ZufmNibjRxs9fdOA9d8NfhbwEVXjAzoZDSaQRDiBL6FqupipbBca0LlIXMMcSIjl2J3S4CIqyZ7SlXpQtsc6Jq3HLs9QTBYKJok8bHTJx9U=w400-h297" width="400" /></a></td></tr><tr><td class="tr-caption" style="text-align: center;">"Think of MLOps as the process of automating machine learning using DevOps methodologies" - Practical MLOps (O'Reilly)</td></tr></tbody></table><br /><p></p>Phillip Henryhttp://www.blogger.com/profile/08460042407514133056noreply@blogger.com0tag:blogger.com,1999:blog-372456867628019773.post-77166302528097897932023-10-02T08:05:00.003-07:002023-10-02T08:05:52.857-07:00Packaging Python<p>Python build tools are unifying behind a common interface of <a href="https://drivendata.co/blog/python-packaging-2023">pyproject.toml</a>.<br /><a href="https://packaging.python.org/en/latest/tutorials/packaging-projects/">This</a> and <a href="https://dev.to/astrojuanlu/best-resources-on-python-packaging-fea">this</a> are great guides. The gist of the former is that you create a TOML file that conforms to a specification then you can use any build tool to run it. The gist of the latter is the whole Python packaging ecosystem.<br /><br />The salient commands for building and deploying with your TOML file are:<br /><br /><span style="font-family: courier; font-size: xx-small;">python3 -m build<br />python3 -m twine upload --repository pypi dist/*</span><br /><br />Note, you want to clean your dist directory first.</p><p><u>The Snag</u></p><p>The idea of using any Python build tool is not quite there yet. Poetry only implements a subset of the specification. Also, the specification has a leaky abstraction. On <a href="https://discord.com/channels/1024276579444072499/1024284272170905641/1143177045858328626">Discord</a>, Prof. Nick Radcliffe explains that the promise of using "any" lead him to naively use <span style="font-family: courier;">setuptools</span>.</p><p></p><blockquote><p>Nick Radcliffe — 08/21/2023 2:37 PM</p><p>Also, in case anyone is interested (related to packaging, above) I'm currently in the process of packaging a fairly large Python codebase using new-style packaging (<span style="font-family: courier;">pyproject.toml</span> rather than <span style="font-family: courier;">setup.py</span>). It wasn't quite my first use of it, but this project is much more complex. Initially, I chose <span style="font-family: courier;">setuptools</span> as the build backend, since (a) it didn't seem like it should matter much and (b) I didn't think I needed anything special. That was a big mistake for me: <b>it turns out the <span style="font-family: courier;">setuptools</span> back-end ignores almost everything except Python code in building your package</b>. 
Whereas my package (which has over 10k files) also have about 1,000 non-python files (everything from .txt and .json to shape files, CSV files, and HTML and markdown and all sorts). Some of these are needed for testing (which for some reason some people think don't need to be distributed...as if people shouldn't care about whether the installed software works in situ, rather than just on the developer's machine in the CI system), but others are needed just in the ordinary course of using the software. <span style="font-family: courier;">setuptools</span> has a way to let you include extra stuff, but it's very manual and would be very error-prone for me. Anyway, the TL;DR is that <b>I switched to Flit as the backend and everything "just worked"</b>. Not saying Flit will work better for you; but it sure as hell worked better for me!</p><p>Also, the reason I chose flit was that the third bullet in "<a href="https://flit.pypa.io/en/stable/rationale.html">Why use Flit?</a>" is "Data files within a package directory are automatically included. Missing data files has been a common packaging mistake with other tools."</p><p>It also says: "The version number is taken from your package’s version attribute, so that always matches the version that tools like pip see." Which also seems extremely sane (and probably I don't need to do the automatic updating of my pyproject.toml to do that.</p></blockquote><p></p><div><u>Success has many parents...</u></div><div><br /></div><div>... but it appears that PyPI packages have only one. Although the <span style="font-family: courier;">authors</span> tag can take a list, adding multiple entries is ignored. The reason is that it's best practise to use a mailing list (see <a href="https://bugs.python.org/msg93284">here</a>).</div><div><br /></div><div>And so my package to facilitate the creation of synthetic data now <a href="https://pypi.org/project/pysynic/">lives in PyPI</a> much like my Java code is deployed to <a href="https://mvnrepository.com/search?q=uk.co.odinconsultants&d=uk.co.odinconsultants">mvnrepository</a>.</div><div><br /></div>Phillip Henryhttp://www.blogger.com/profile/08460042407514133056noreply@blogger.com0tag:blogger.com,1999:blog-372456867628019773.post-30807409316491120392023-09-25T02:06:00.001-07:002023-09-25T02:08:50.059-07:00Spark, Kafka and Docker<p>I want to run a Spark Structured Streaming application that consumes from a Kafka cluster all within Docker. I've finally got it working [messy code <a href="https://github.com/PhillHenry/StreamingPlayground/blob/main/modules/core/src/main/scala/uk/co/odinconsultants/sss/SparkStructuredStreamingMain.scala">here</a> in my GitHub], but it was not without its own pain.</p><p>The biggest problem is getting all the components talking to each other. First, you need a <a href="https://docs.docker.com/network/drivers/bridge/">bridge network</a>. "In terms of Docker, a bridge network uses a software bridge which allows containers connected to the same bridge network to communicate, while providing isolation from containers which are not connected to that bridge network." [docs]. Think of it as giving your containers their own namespace.</p><p>Secondly, the Spark worker needs to connect to Kafka, the Spark master and the Spark driver. The first two are just a matter of mapping the Spark master and Kafka containers in the worker. 
What's harder is getting the worker to talk to the driver that may be running on the computer that hosts Docker.</p><p>One sign you've got it wrong is if you see "<span style="font-family: courier; font-size: xx-small;">Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources</span>" [<a href="https://stackoverflow.com/questions/38118572/initial-job-has-not-accepted-any-resources-check-your-cluster-ui-to-ensure-that">SO</a>] in your driver logs. This message is a little ambiguous as it may have nothing to do with resources but rather with connectivity. </p><p></p><table align="center" cellpadding="0" cellspacing="0" class="tr-caption-container" style="margin-left: auto; margin-right: auto;"><tbody><tr><td style="text-align: center;"><a href="https://blogger.googleusercontent.com/img/a/AVvXsEgx6hM95xx3SelgVoFfPrvnVpWX1Gpl-wlysTrFyxbfssh24bYoi1kHBY4A-dFLHc4ia_s-oU9tlQUmqTN_zE5Vz_bWX6ukGMMUMWc0-Fe0SuKjtSypjnUEW8e8NVQTekpHXJNHV4Teney2km0MS8PVllGwKnxjxgzWBv0kyOpP_otaRgAcw_aADamO54E5" style="margin-left: auto; margin-right: auto;"><img alt="" data-original-height="299" data-original-width="1896" height="101" src="https://blogger.googleusercontent.com/img/a/AVvXsEgx6hM95xx3SelgVoFfPrvnVpWX1Gpl-wlysTrFyxbfssh24bYoi1kHBY4A-dFLHc4ia_s-oU9tlQUmqTN_zE5Vz_bWX6ukGMMUMWc0-Fe0SuKjtSypjnUEW8e8NVQTekpHXJNHV4Teney2km0MS8PVllGwKnxjxgzWBv0kyOpP_otaRgAcw_aADamO54E5=w640-h101" width="640" /></a></td></tr><tr><td class="tr-caption" style="text-align: center;">Resources seem to be fine</td></tr></tbody></table><p></p><p>To solve it, you need the driver to define <span style="font-family: courier; font-size: xx-small;">spark.driver.host</span> and <span style="font-family: courier; font-size: xx-small;">spark.driver.port</span>. For the host, we need it to be the magic address of <span style="font-family: courier; font-size: xx-small;">172.17.0.1</span>. This is the default "IP address of the gateway between the Docker host and the bridge network" [<a href="https://docs.docker.com/network/network-tutorial-standalone/">docs</a>]. The port is arbitrary.</p><p>[Aside: it's also worth ensuring that all the components are running the exact same version of Spark. I saw a rare error <span style="font-family: courier; font-size: xx-small;">ERROR Inbox: Ignoring error java.lang.AssertionError: assertion failed: CPUs per task should be > 0</span> and the only thing Google produced was <a href="https://github.com/bitnami/charts/issues/14391">this</a> Bitnami ticket. Ensuring all versions were the same made it go away.]</p><p>What's more, the worker needs these in its config. You can pass it the host and port with something like <span style="font-family: courier; font-size: xx-small;">SPARK_WORKER_OPTS="-Dspark.driver.host=172.17.0.1 -Dspark.driver.port=SPARK_DRIVER_PORT"</span> in its startup script.</p><p>But there is one last gotcha. If you still can't get things to work, you might want to log in to your worker container and run <span style="font-family: courier; font-size: xx-small;">netstat</span>. If you see the connection to the driver in a state of <span style="font-family: courier; font-size: xx-small;"><a href="https://javaagile.blogspot.com/2012/08/jconsole-and-firewalls.html">SYN_SENT</a></span>, your firewall on the host is probably blocking the connection from the container.</p><p>Annoyingly, you probably won't see any error messages being puked from the Driver. 
It will just hang somewhere near <span style="font-family: courier; font-size: xx-small;">org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:929)</span>. I only started seeing error messages when I aligned all versions of Spark (see above) and it read: <span style="font-family: courier; font-size: xx-small;">java.io.IOException: Connecting to /172.17.0.1:36909 timed out (120000 ms) </span></p><p>Looking in that Docker container showed:<br /></p><p><span style="font-family: courier; font-size: xx-small;">bash-5.0# netstat -nap<br />Active Internet connections (servers and established)<br />Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name <br />...<br />tcp 0 1 192.168.192.7:53040 172.17.0.1:<b>36909</b> <b>SYN_SENT</b> 1109/java</span></p><div>and on the host machine where my process is 29006:<br /><br /><div><span style="font-family: courier; font-size: xx-small;">(base) henryp@adele:~$ netstat -nap | grep 36909</span></div><div><span style="font-family: courier; font-size: xx-small;">tcp6 0 0 172.17.0.1:<b>36909</b> :::* LISTEN 29006/java </span> </div></div><p>Aha, that looks like the problem. It turns out that I have to open the firewall for the block manager too and set a static port for it on the Driver with <span style="font-family: courier; font-size: xx-small;"><a href="https://stackoverflow.com/questions/74206612/how-to-handle-dynamic-port-in-apache-spark-hadoop-yarn">spark.driver.blockManager.port</a></span>.</p><p>Finally, you should be able to have a Spark master and worker plus Kafka instances all running within Docker along with the driver running on the host using your favourite IDE.</p>Phillip Henryhttp://www.blogger.com/profile/08460042407514133056noreply@blogger.com0tag:blogger.com,1999:blog-372456867628019773.post-30944503944875988932023-08-30T04:10:00.000-07:002023-08-30T04:10:10.207-07:00Can we apply ML to logging?<p>Kibana is a Typescript/Javascript product to create visuals of logs. <a href="https://github.com/opensearch-project/OpenSearch">OpenSearch</a>'s <a href="https://github.com/opensearch-project/opensearch-dashboards">Dashboards</a> is the Apache-licensed fork of this. Kibana is great when you know what you are looking for. But what if you don't?</p><p><u>Example</u></p><p>I have a small Kafka cluster of three nodes using the <a href="http://javaagile.blogspot.com/2023/07/kafka-quorum-in-docker.html">Raft</a> protocol. I send messages then check a consumer has read all the messages. This integration test passes every time. There are no ERRORs. However, every so often, this test takes over 2 minutes when it should take about 20 seconds.</p><p>The number of lines on a good run is 4.5k and on the bad run about 20k. Twenty thousand lines is a lot to go through when you don't know what you're looking for.</p><p>I slightly adapted the code <a href="http://ethen8181.github.io/machine-learning/recsys/content_based/lsh_text.html">here</a> to turn my logs into <a href="https://javaagile.blogspot.com/2017/02/tweaking-tf-idf.html">TF-IDF</a> vectors and used <a href="https://javaagile.blogspot.com/2016/02/locality-sensitive-hashing-in-tweets.html">Locality Sensitive Hashing</a> to map them to a lower-dimensional space. Now, we can visualise what's going on. 
</p><p>The good run looks like this:</p><p><br /></p><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/a/AVvXsEgIEsu-09rV09MdN_R_M4AsEdj4n3FHeRzvoQdr37fdSi5Ck9Wm6dm_FhrUJm_BG_GDOlTu_odGjltPsdxIz255UNQMagYv63gx4b_krpESRjgqlaRveSmhLm93K0EBtKW3JWWytz-cViNdVjVqpe40qaNMNgSXIgQXoB93tuZJYMm3cokAQAr31MJZC1kk" style="margin-left: 1em; margin-right: 1em;"><img alt="" data-original-height="544" data-original-width="1375" height="254" src="https://blogger.googleusercontent.com/img/a/AVvXsEgIEsu-09rV09MdN_R_M4AsEdj4n3FHeRzvoQdr37fdSi5Ck9Wm6dm_FhrUJm_BG_GDOlTu_odGjltPsdxIz255UNQMagYv63gx4b_krpESRjgqlaRveSmhLm93K0EBtKW3JWWytz-cViNdVjVqpe40qaNMNgSXIgQXoB93tuZJYMm3cokAQAr31MJZC1kk=w640-h254" width="640" /></a></div><br /><br />Note that there are two dominant lines that map to:<br /><p></p><p><span style="font-family: courier; font-size: xx-small;">[2023-07-04 14:13:10,089] INFO [TransactionCoordinator id=2] Node 1 disconnected. (org.apache.kafka.clients.NetworkClient)<br />[2023-07-04 14:13:10,089] WARN [TransactionCoordinator id=2] Connection to node 1 (localhost/127.0.0.1:9091) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)</span></p><div>repeated over and over for about 10 seconds.</div><p>The bad run looks like this:<br /><br /></p><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/a/AVvXsEiJzJ3JG3ifu0qZVDi7HINpkk0jqdsyqjXOVEg2rIyKI8zXlRqIEuX11V51ghseqKu5A236w3AKBfWpHTkPJ8NIUKScUgKs-MrZ0IUjzhG6W2FAG-dv3tKuO7fluvUrEOAVoWOL1uqovCo-9UJM2ZFxh4sihYfsSEBe_SCymqe4gs9-j1AXjKPpK36YmVZ0" style="margin-left: 1em; margin-right: 1em;"><img alt="" data-original-height="549" data-original-width="1371" height="256" src="https://blogger.googleusercontent.com/img/a/AVvXsEiJzJ3JG3ifu0qZVDi7HINpkk0jqdsyqjXOVEg2rIyKI8zXlRqIEuX11V51ghseqKu5A236w3AKBfWpHTkPJ8NIUKScUgKs-MrZ0IUjzhG6W2FAG-dv3tKuO7fluvUrEOAVoWOL1uqovCo-9UJM2ZFxh4sihYfsSEBe_SCymqe4gs9-j1AXjKPpK36YmVZ0=w640-h256" width="640" /></a></div><br /><br />Here, the dominant lines in the diagram that are from:<p></p><p><span style="font-family: courier; font-size: xx-small;">[2023-07-04 14:16:21,755] WARN [TransactionCoordinator id=2] Connection to node 1 (localhost/127.0.0.1:9091) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)<br />[2023-07-04 14:16:21,805] INFO [TransactionCoordinator id=2] Node 1 disconnected. (org.apache.kafka.clients.NetworkClient)<br />[2023-07-04 14:16:21,805] WARN [TransactionCoordinator id=2] Connection to node 3 (localhost/127.0.0.1:9093) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)<br />[2023-07-04 14:16:21,805] INFO [TransactionCoordinator id=2] Node 3 disconnected. (org.apache.kafka.clients.NetworkClient)</span></p><div>again being repeated but this time, it lasts for about 2 minutes.</div><div><br /></div><div>[The code in Ethen Lui's GitHub is really quite clever. Rather than using MinHashing, he's projecting the feature vectors against some randomly generated vectors and making a bit map from it. This can be turned into a single integer which represents the feature's bucket. 
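<br /><br />As a minimal sketch of that idea (my own illustration in Scala rather than Ethen Lui's Python, and assuming dense TF-IDF vectors), the sign of each random projection contributes one bit to the bucket id:<br /><pre style="font-family: courier; font-size: xx-small;">
import scala.util.Random

object LshSketch {
  // k random Gaussian vectors to project the TF-IDF vectors onto
  def randomVectors(k: Int, dim: Int, seed: Long = 42L): Array[Array[Double]] = {
    val rng = new Random(seed)
    Array.fill(k, dim)(rng.nextGaussian())
  }

  // Pack the sign of each projection into one integer: the bucket id
  def bucket(vector: Array[Double], projections: Array[Array[Double]]): Int =
    projections.zipWithIndex.foldLeft(0) { case (acc, (r, i)) =>
      val dot = vector.zip(r).map { case (a, b) => a * b }.sum  // project onto the random vector
      if (dot > 0) acc | (1 << i) else acc                      // keep only the sign as a bit
    }

  def main(args: Array[String]): Unit = {
    val docs = Array(
      Array(0.1, 0.0, 0.9),  // toy TF-IDF vectors; real ones are much wider
      Array(0.2, 0.0, 0.8),
      Array(0.9, 0.1, 0.0)
    )
    val projs = randomVectors(k = 8, dim = 3)
    docs.foreach(d => println(bucket(d, projs)))  // similar vectors tend to share a bucket
  }
}
</pre>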
Note that the number of vectors does not really change the dimensionality of the space but it does change how consistent different runs are - more vectors leads to greater repeatability]</div><div><br /></div><div>Still at something of a loss, I checked out Bitnami's Kafka instances (<a href="https://github.com/bitnami/containers.git">here</a>), changed the logging in <span style="font-family: courier; font-size: xx-small;">bitnami/kafka/3.5/debian-11/rootfs/opt/bitnami/scripts/kafka/postunpack.sh</span> by adding the line:</div><div><br /></div><div><div><span style="font-family: courier; font-size: x-small;">replace_in_file "${KAFKA_CONF_DIR}/log4j.properties" "INFO" "DEBUG"</span></div></div><div><br /></div><div>and built the Docker image again. Now it gives me DEBUG statements.</div><div><br /></div><div><u>Fix</u></div><div><br /></div><div>The problem of non-determinism is still foxing me but the solution became clear with all these mentions of <span style="font-family: courier; font-size: x-small;">localhost</span>. We need the client to communicate with the cluster on <span style="font-family: courier; font-size: x-small;">localhost</span> because the client is unaware that the Kafka instances are hosted in Docker. However, each broker does need to know it's talking to another Docker container as the ports of its peers are not available within its own sandbox. </div><div><br /></div><div>The solution was to use slightly different values for the listeners as the advertised listeners (<span style="font-family: courier; font-size: xx-small;">KAFKA_CFG_LISTENERS</span> vs <span style="font-family: courier; font-size: xx-small;">KAFKA_CFG_ADVERTISED_LISTENERS</span>. Note that Bitnami expects environment variables prepended with <span style="font-family: courier; font-size: x-small;">KAFKA_CFG_</span> and periods as underscores before it converts them into a Kafka-friendly <span style="font-family: courier; font-size: xx-small;">server.properties</span> file). </div><div><br /></div><div>The listeners were of the form <span style="font-family: courier;">OUTSIDE://:9111</span> while the <i>advertised</i> listeners were of the form <span style="font-family: courier;">OUTSIDE://localhost:9111</span>. The label <span style="font-family: courier;">OUTSIDE</span> apparently is arbitrary. It's just used as a reference, say in <span style="font-family: courier; font-size: xx-small;">listener.security.protocol</span> (in Kafka-speak; munge with the necessary Bitnami mappings to make it appear in <span style="font-family: courier; font-size: x-small;">server.properties</span>) where you'll see something like <span style="font-family: courier; font-size: xx-small;"><b>OUTSIDE</b>:PLAINTEXT</span>. </div><div><br /></div><div><u>Conclusion</u></div><div><br />Although I've fixed the Kafka issue I was facing, applying ML to the logs was only a partial help. I still need to understand the Kafka Raft code better before it can truly be of use.</div><div><br /></div>Phillip Henryhttp://www.blogger.com/profile/08460042407514133056noreply@blogger.com0tag:blogger.com,1999:blog-372456867628019773.post-82959551389541565372023-07-06T03:07:00.005-07:002023-07-07T02:07:48.662-07:00Kafka Raft in Docker<p>These days, you don't need Zookeeper to run a Kafka cluster. 
Instead, when correctly configured, Kafka uses the <a href="https://softwaremill.com/implementing-raft-using-project-loom/">Raft</a> algorithm (where "the nodes trust the elected leader"[<a href="https://en.m.wikipedia.org/wiki/Raft_(algorithm)">Wikipedia</a>]) to coordinate itself.</p><p>I started to follow Gunnar Morling's <a href="https://www.morling.dev/blog/exploring-zookeeper-less-kafka/">blog</a> but it seems his version of the Kafka containers has not been updated so I used <a href="https://hub.docker.com/r/bitnami/kafka/">Bitnami's</a>. However, configuring them to run a Raft cluster proved difficult.</p><p>I want to programmatically create the cluster rather than use docker-compose as I want greater control over it. So, I wrote <a href="https://github.com/PhillHenry/KafkaPlayground/blob/main/modules/core/src/main/scala/uk/co/odinconsultants/kafka/KafkaDemoMain.scala">this</a> code that talks to Docker via its API using a Java library. </p><p>Firstly, the Kafka instances couldn't see each other. <br /><br />Diagnosing the containers proved difficult as I could not install <a href="https://javaagile.blogspot.com/2022/05/the-cli-for-busy-data-scientists-and.html">my favourite Linux tools</a>. When I tried, I was told directory <span style="font-family: courier; font-size: x-small;">/var/lib/apt/lists/partial</span> is missing. This seems to be deliberate as the <a href="https://github.com/bitnami/containers/blob/main/bitnami/kafka/3.5/debian-11/Dockerfile">Dockerfile</a> explicitly deletes it, <a href="https://docs.docker.com/develop/develop-images/dockerfile_best-practices/#run">to keep images slim</a>. So, I took out that line and added:</p><p><span style="font-family: courier; font-size: xx-small;">RUN apt-get update && apt-get upgrade -y && \<br /> apt-get clean && apt-get update && \<br /> apt-get install -y net-tools && \<br /> apt-get install -y iputils-ping && \<br /> apt-get install -y procps && \<br /> apt-get install -y lsof</span></p><div>then rebuilt the containers. [Aside: use <span style="font-family: courier; font-size: x-small;">ps <b>-ax</b></span> to see all the processes in these containers. I was stumped for a while not seeing the Java process that I knew was running].</div><div><br /></div><div>Using these Linux tools, I could see the containers could not even <span style="font-family: courier; font-size: x-small;">ping</span> each other. Oops, I needed to create a <a href="https://stackoverflow.com/questions/59036198/how-to-connect-to-custom-network-when-building-image-container-in-docker-java-ap">Docker network</a> [SO] and add it to the containers. Now, their logs show that the Kafka containers are at least starting and talking to each other. </div><div><br /></div><div>However, the client running on the host machine was puking lots of messages like "<span style="font-family: courier; font-size: xx-small;">Cancelled in-flight API_VERSIONS request with correlation id 1 due to node -1 being disconnected</span>". First, I <a href="https://stackoverflow.com/questions/43217248/exception-on-apache-kafka-error-getting-request-for-apikey-3-and-apiversion">checked</a> [SO] that the Kafka client library and container were both version 3. But the consensus on the internet appears to be that this error is due to a connection failure. </div><div><br /></div><div>Using <span style="font-family: courier; font-size: x-small;">netstat</span> on the host showed that the host port was indeed open. 
But this seemed to be due to Docker opening the port to map it to its container but the container not <span style="font-family: courier; font-size: x-small;">LISTEN</span>ing on that port. It appears you can tell Kafka on which port to listen with an environment variable that looks like: <br /><br /></div><div><span style="font-family: courier; font-size: xx-small;">KAFKA_CFG_LISTENERS=PLAINTEXT://:$hostPort,CONTROLLER://:$controllerPort</span></div><div><br /></div><div>where <span style="font-family: courier; font-size: x-small;">hostPort</span> is what you want Docker to map and <span style="font-family: courier; font-size: x-small;">controllerPort</span> corresponds to what is in the <span style="font-family: courier; font-size: x-small;">KAFKA_CFG_CONTROLLER_QUORUM_VOTERS</span> environment variable.</div><div><br /></div><div>The next problem was that, when my client connected, it could not see the machine called <span style="font-family: courier;">kafka2</span>. What's happening here is that having connected to the bootstrap, the client is asked to contact another machine, in this case something called <span style="font-family: courier;">kafka2</span>. </div><div><br /></div><div>Now, the JVM running on the host knows nothing about a network that is internal to Docker. To solve this, you could have Docker use the <span style="font-family: courier;"><a href="https://docs.docker.com/network/drivers/host/">host</a></span> network (which means that everything running on the machine can see everything else - fine for testing but a security nightmare). You could subvert the JVM's DNS mappings (rather than faffing around with a <a href="https://stackoverflow.com/questions/37242217/access-docker-container-from-host-using-containers-name">DNS proxy</a>) using <a href="https://burningwave.github.io/tools/#configuring-host-resolution">BurningWave</a> or Java 18's <a href="https://docs.oracle.com/en/java/javase/18/docs/api/java.base/java/net/spi/InetAddressResolverProvider.html"><span style="font-family: courier; font-size: x-small;">InetAddressResolverProvider</span></a>. But perhaps the simplest way is configuring Kafka itself to advertise itself as <span style="font-family: courier;"><a href="https://www.confluent.io/blog/kafka-listeners-explained/">localhost</a></span> [Confluent] using the <span style="font-family: courier;">KAFKA_CFG_ADVERTISED_LISTENERS</span> environment variable.</div><div><br /></div><div>And that was it: a Kafka cluster running on my laptop that was reading and writing messages using the Raft algorithm. There are still a few loose ends: why, on some runs, a node drops out of the cluster non-deterministically even though the functionality was correct as far as the client was concerned. I'll solve that another day.</div><div><br /></div>Phillip Henryhttp://www.blogger.com/profile/08460042407514133056noreply@blogger.com0tag:blogger.com,1999:blog-372456867628019773.post-3110598642201156322023-06-25T01:27:00.004-07:002023-09-06T05:18:05.442-07:00Diagnosing K8s Installations<p>Well, this was much harder than I thought. I tried to update Minikube and my local Kubernetes installation.</p><p>First, I installed everything I thought I needed:<br /><br /><span style="font-family: courier; font-size: xx-small;">sudo apt-get install -y kubelet kubeadm kubectl kubernetes-cni</span><br /><br />and then downloaded and installed Minikube per the instructions. 
Unfortunately, when I did, I'd see it complain that possibly "<span style="font-family: courier; font-size: xx-small;">the kubelet is not running</span>".<br /><br />Running:</p><p><span style="font-family: courier; font-size: xx-small;">systemctl status kubelet</span><br /><br />showed that kubelet was exiting with error code 1.</p><p>Running:<br /><br /><span style="font-family: courier; font-size: xx-small;">journalctl -fu kubelet</span></p><p>showed it puking stack traces but this line is the salient one:<br /><br /><span style="font-family: courier; font-size: xx-small;">... failed to run Kubelet: running with swap on is not supported, please disable swap! or set --fail-swap-on flag to false...</span></p><p>But where do I put it? As it happened, I had to add <span style="font-family: courier; font-size: xx-small;">--fail-swap-on=false</span> to the <span style="font-family: courier; font-size: xx-small;"><a href="https://askubuntu.com/questions/1077778/how-do-i-pass-flags-when-starting-a-systemd-service">ExecStart</a></span> line in <span style="font-family: courier; font-size: xx-small;">/etc/systemd/system/kubelet.service.d/10-kubeadm.conf</span> (I found this by <span style="font-family: courier; font-size: xx-small;">grep</span>ping <span style="font-family: courier; font-size: xx-small;">ExecStart</span> in <span style="font-family: courier; font-size: xx-small;">/etc/</span>). You then run:<br /><br /><span style="font-family: courier; font-size: xx-small;">sudo systemctl daemon-reload</span><br /><br />to have changes recognised. Then, it's a matter of configuring Kubernetes system-wide:</p><p><span style="font-family: courier; font-size: xx-small;">sudo kubeadm init --ignore-preflight-errors=Swap</span><br /><br />(I needed to ignore the fact that my system has <span style="font-family: courier; font-size: x-small;">Swap</span> space as I'm only working on proof of concepts, not running a K8s cluster in production. Using or not using swap space is nuanced - see <a href="https://chrisdown.name/2018/01/02/in-defence-of-swap.html?utm_source=pocket_saves">here</a> for more information. Basically, what is best depends on the situation. TL;DR: it can speed things up but also delay the inevitable death of a pathological system). </p><p>You can run <span style="font-family: courier; font-size: xx-small;">kubeadm reset</span> if you foul things up.</p><p>Also, I had to <span style="font-family: courier; font-size: xx-small;">rm -rf <a href="https://stackoverflow.com/questions/72334044/the-connection-to-the-server-localhost8080-was-refused-did-you-specify-the-r">$HOME/.kube</a></span> and <span style="font-family: courier; font-size: xx-small;">$HOME/.minikube</span> since <span style="font-family: courier; font-size: xx-small;">kubectl config view</span> showed me a profile that was literally years out of date. The <span style="font-family: courier; font-size: x-small;">.kube</span> config can be regenerated with:<br /></p><p><span style="font-family: courier; font-size: xx-small;">mkdir -p $HOME/.kube<br />sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config<br />sudo chown $(id -u):$(id -g) $HOME/.kube/config</span><br /><br />(lines that <span style="font-family: courier; font-size: x-small;">kubeadm</span> kindly told me to run). Now, running <span style="font-family: courier; font-size: xx-small;">kubectl get all</span> gives me a sensible output.<br /><br />The story doesn't quite end there. 
After a few hibernations of my laptop, minikube was failing to start again with (apparently) certificate errors. This <a href="https://github.com/kubernetes/minikube/issues/13638">issue</a> helped me: I executed <span style="font-family: courier; font-size: xx-small;">minikube delete && minikube start</span> and all was well with the World once more.</p>Phillip Henryhttp://www.blogger.com/profile/08460042407514133056noreply@blogger.com0tag:blogger.com,1999:blog-372456867628019773.post-87309978016736124952023-06-25T01:04:00.000-07:002023-06-25T01:04:12.615-07:00I've got a fever, and the only prescription is MOR COW (bell)<p>Apache Iceberg deals with updates in two different ways:<br /></p><ul style="text-align: left;"><li>Merge on read</li><li>Copy on write</li></ul><p></p><p>What are these strategies? To illustrate, I've some BDD code that writes and updates 20 rows of data using both <a href="https://github.com/PhillHenry/IcebergPlayground/blob/main/src/test/scala/uk/co/odinconsultants/iceberg/MergeOnReadSpec.scala">MOR</a> and <a href="https://github.com/PhillHenry/IcebergPlayground/blob/main/src/test/scala/uk/co/odinconsultants/iceberg/CopyOnWriteSpec.scala">COW</a>. (There's an interesting <a href="https://www.dremio.com/blog/row-level-changes-on-the-lakehouse-copy-on-write-vs-merge-on-read-in-apache-iceberg/">article</a> at Dremio about this topic but some of the code is Dremio-specific).</p><p>Simply put, <i>copy-on-write</i> replaces the entire parquet file if a single row is updated. </p><p>In <i>merge-on-read</i>, a file containing only the updated data (and a file saying which row was affected) are written. It is the reader that needs to reconcile the data.</p><p>The two strategies are for different audiences. COW makes writes slow and reads fast. MOR makes reads slow and writes fast.</p><p>Focussing on the more complex of the two strategies, MOR, we see that it creates <i>four</i> new files compared to COW's two. This maps to <i>two</i> different parquet files (as each parquet file in both strategies has a <span style="font-family: courier;">.crc</span> file that holds its metadata). One file contains the updated data, the other a <i>textual</i> <i>reference</i> to the original file containing the original data. No files are deleted and the original parquet file remains the same. No data is redundant.</p><p>Iceberg has means of storing the delete data other than the position in the file. It can also check equality. However, this appears to be <a href="https://github.com/apache/iceberg/issues/6196">not supported by Spark</a> at the moment.</p><p>(BTW, the title of this post refers to <a href="https://www.youtube.com/watch?v=tPh7OZew5oo">this</a> SNL sketch).</p>Phillip Henryhttp://www.blogger.com/profile/08460042407514133056noreply@blogger.com0tag:blogger.com,1999:blog-372456867628019773.post-54628907400469951062023-06-19T02:13:00.000-07:002023-06-19T02:13:00.880-07:00Data Contracts<p>What are they? "Data Contracts are first and foremost a cultural change toward data-centric collaboration" [Chad Sanderson's <a href="https://dataproducts.substack.com/p/the-rise-of-data-contracts">blog</a> and <a href="https://dataproducts.substack.com/p/an-engineers-guide-to-data-contracts?utm_source=substack&utm_campaign=post_embed&utm_medium=web">here</a>]</p><p><u>Examples of why they're necessary</u></p><p>A company wants to find the average age of a consumer for a particular product. They find the age is 42. 
They are somewhat surprised by this as it's older than they expected so they check their workings - add all the ages and divide by the number of customers who bought it. After confirming the maths, the value is indeed 42 and they report it to their boss. Unfortunately, the mean age was artificially inflated because a subset of customers had an age of '999', as the system that captured that data used it as a placeholder for 'unknown'.</p><p>The next example actually happened. We were measuring average length of stay (LoS) in hospitals. When sampling the data, everything looked fine. But out of the millions of patients, a very small number (~30) had a discharge date of 1/1/1900. Clearly, the system that captured that data used this value as a token for 'unknown'. This erroneously reduced the overall LoS. The data bug was only caught when, drilling down into individual hospitals, some average LoS figures were negative. Until then, we were merrily reporting the wrong national figure.</p><p><u>Is it a purely cultural problem?</u></p><p>The trouble with cultural solutions is that they depend on unreliable units called "humans". For instance, a friend of mine was the victim of an upstream, breaking change originating in the Zurich office. When he contacted the team, they were unaware of him, of his London team and of the fact they were consumers of this data.</p><p>I asked on the <a href="https://lists.apache.org/thread/z1oyoqdl40wcxwbkmxpkp3hlzfrp661g">Apache dev mailing lists</a> if we could implement a more robust, technical solution for Spark. Watch this space for developments.</p><p><u>Possible solutions</u></p><p>Andrew Jones (who is writing a book on data contracts) <a href="https://medium.com/gocardless-tech/implementing-data-contracts-at-gocardless-3b5c49074d13">uses</a> JSON Schema to validate his data. "JSON Schema is a powerful tool for validating the structure of JSON data" [JSON Schema <a href="https://json-schema.org/understanding-json-schema/">docs</a>]. </p><p><a href="https://medium.com/@teabot">Elliot West</a> of Dremio (see the mailing list traffic) also favours JSON Schema. However, because JSON has only a few data types (strings, arrays, numerics etc) it's not rich enough to enforce constraints like "date X must be before date Y".<br /><u><br />Implementations</u></p><p>This is a new area of development but Databricks' Delta Live Tables (<a href="https://www.databricks.com/product/delta-live-tables">DLT</a>) claims it can “prevent bad data from flowing into tables through validation and integrity checks and avoid data quality errors with predefined error policies (fail, drop, alert or quarantine data).”
<br />Unfortunately, it seems to be Python-only: “Can I use Scala or Java libraries in a Delta Live Tables pipeline? No, Delta Live Tables supports only SQL and Python. You cannot use JVM libraries in a pipeline. Installing JVM libraries will cause unpredictable behavior, and may break with future Delta Live Tables releases.” [<a href="https://docs.databricks.com/delta-live-tables/external-dependencies.html">docs</a>]</p>Phillip Henryhttp://www.blogger.com/profile/08460042407514133056noreply@blogger.com0tag:blogger.com,1999:blog-372456867628019773.post-38227768068973577892023-06-09T02:17:00.002-07:002024-01-03T03:56:34.685-08:00Modern DevOps Tools<p>Some tools that I've had too little time to investigate thoroughly.<br /><br /><u>TestContainers</u><br />The free and open source <a href="https://www.testcontainers.org/features/configuration/">TestContainers</a> offers huge convenience to developers. For instance, you can fire up a <i>very</i> lightweight Postgres container in just a second or two. This ZIO SQL test (<a href="https://github.com/zio/zio-sql/blob/1dc612e2efbdad85526933875847242406df64e8/postgres/src/test/scala/zio/sql/postgresql/AgregationSpec.scala">AgregationSpec</a>) ran in just 3.85s on my laptop. In that time, it started a Docker container, populated the Postgres database in it with test data, ran some Scala code against it then tore down the container. The container can last as long as the JVM so all your tests can use it before it detects the JVM is exiting, whereupon it will kill the container.<br /><br /><u>MinIO</u><br />If you need to run S3 API-compatible storage locally, you can try <a href="https://github.com/minio/minio">MinIO</a>. It's written in Go and open source and allows you to have a local Docker container emulating Amazon storage. <br /><br /><u>DuckDB</u><br />This <a href="https://github.com/duckdb/duckdb">open source</a>, C++ application allows you to run SQL against Parquet files without having to fire up a whole platform. 
You can even run <a href="https://www.reddit.com/r/dataengineering/comments/10wd2e0/comment/j7n52p2/?utm_source=share&utm_medium=web2x&context=3">DBeaver against it</a>.</p><p><u>Crossplane</u><br /><a href="https://github.com/crossplane/crossplane">Crossplane</a> is an open source Go project that "connects your Kubernetes cluster to external, non-Kubernetes resources, and allows platform teams to build custom Kubernetes APIs to consume those resources." [<a href="https://docs.crossplane.io/v1.11/getting-started/introduction/">docs</a>]</p><p><u>Scala Native</u><br />You can now convert Scala code to standalone executable binaries using <a href="https://www.baeldung.com/scala/native-apps-scala-native">Scala Native</a> [baeldung]. It currently only works with single-threaded applications. The output can be converted to WebAssembly...</p><p><u>WebAssembly</u><br /><a href="https://en.wikipedia.org/wiki/WebAssembly">Wikipedia</a> describes WebAssembly as "a portable binary-code format and a corresponding text format for executable programs ... for facilitating interactions between such programs and their host environment." It is an "open standard and aims to support any language on any operating system".</p><p><u>Tapir</u><br />Is a type-safe, Scala library that documents HTTP endpoints.</p><p><u>GraphQL</u><br /><a href="https://github.com/graphql/graphql-spec">GraphQL</a> is a type system, query language, etc, accessible through a single endpoint that only returns what is asked of it and no surplus information. It's a spec and there are implementations in a number of languages. The graph bit comes in insofar as a "query is a path in the graph, going from the root type to its subtypes until we reach scalar types with no subfields." [<a href="https://dev.to/bogdanned/the-graph-in-graphql-1l99">Bogdan Nedelcu</a>]</p><p><u>LLVM</u><br />LLVM is an open source tool chain written in C++. The 'VM' in LLVM originally stood for Virtual Machine but these days this is no longer the case. Instead of being a virtual machine, it turns any major language into a common intermediate code that can then be turned into machine code. </p><p><u>GraalVM</u><br /><a href="https://github.com/oracle/graal">GraalVM</a> is an open source JDK and JRE written in Java itself and has its roots in project Maxine. But it's more than that. It offers compilation to native code as well as supporting polyglot code via its Truffle framework, a language-agnostic AST.</p><p><u>Quarkus</u><br />Based on GraalVM (above), <a href="https://github.com/quarkusio/quarkus">Quarkus</a> is an open source Java framework tailored for Kubernetes. Since the JVM code is natively compiled, startup and memory sizes are small.</p><p><u>Spring Boot</u><br />Is an "opinionated" Java framework that favours convention-over-configuration and runs Spring apps with the minimum of fuss.</p><p><u>Python/Java Interop</u><br />Together, Python and Java dominate the data engineering landscape. These languages can interoperate via <a href="https://github.com/py4j/py4j">Py4J</a>, which uses sockets to allow Python to invoke Java code, and <a href="https://github.com/jython/jython">Jython</a>, which runs Python code wholly inside the JVM. Py4J is used extensively in Spark to allow PySpark devs to talk to Spark JVMs. <br />Jython, unfortunately, does not support Python 3.</p><p><u>Project Nessie</u><br />Nessie is an open source, JVM project that promises to do to big data what Git did to code: versioning, branching etc. 
<p><u>Project Nessie</u><br />Nessie is an open source, JVM project that promises to do to big data what Git did to code: versioning, branching etc. It apparently sits nicely on top of <a href="https://projectnessie.org/features/intro/">Iceberg and Databricks</a>.</p><p>The <a href="https://github.com/treeverse/lakeFS">lakeFS</a> project is an open source, Go project that offers similar functionality.</p><p><u>Cloud native CI/CD</u><br /><a href="https://tekton.dev/">Tekton</a> is a CI/CD framework written in <a href="https://github.com/tektoncd">GoLang</a>.<br /><a href="https://github.com/argoproj/argoproj">Argo</a> is a Go-based, Kubernetes-native tool. For instance, it handles rolling deployments by building on K8s' <span style="font-family: courier; font-size: x-small;">RollingUpdate</span> strategy, which does not natively control traffic flow during an update. <br />CircleCI seems to be mostly closed source.<br /><br /><u>Pipelines</u><br />Interestingly, CI/CD and data pipelines both use directed acyclic graphs (DAGs) but with very different intent. User Han on Discord eloquently spelled out the architectural distinction:</p><p></p><blockquote>Specifically the reason is in batch data [Pipeline] processing, we tend to scale things out horizontally by a whole lot, sometimes using GPUs. This is not a common feature supported by CI/CD workflow tools. In summary: <p>Jenkins, CodePipeline, Github Actions, TeamCity, Argo, etc ==> used to build DAGs for CI/CD, tends to have shorter run time, less compute requirement, and fairly linear in dependencies.</p><p>Airflow, Dagster, Prefect, Flyte, etc ==> used to build data and/or machine learning pipelines. It tend to have longer run time, larger horizontal scaling needs, and sometimes complex dependencies. Data pipelines also sometimes have certain needs, e.g., backfilling, resume, rerun, parameterization, etc that's not common in CI/CD pipelines</p></blockquote><p></p><div>Interestingly, Luigi looks like it's dead, as even its creator says on <a href="https://twitter.com/bernhardsson/status/1508428757874393094">Twitter</a>.</div><div><br /></div><div><u>Istio</u><br /><a href="https://github.com/istio/istio">Istio</a> is an open source GoLang project that transparently provides "a uniform way to integrate microservices, manage traffic flow across microservices, enforce policies and aggregate telemetry data."<br /><br /></div>Phillip Henryhttp://www.blogger.com/profile/08460042407514133056noreply@blogger.com0tag:blogger.com,1999:blog-372456867628019773.post-66145212273278764562023-06-07T02:37:00.001-07:002023-12-05T01:26:04.010-08:00SBT cheat sheet<p>To see the <b>dependency tree</b>, run:<br /></p><pre style="font-family: courier; font-size: x-small;">sbt compile:dependencyTree</pre><div>(see the answer of 13/12/21 on <a href="https://stackoverflow.com/questions/25519926/how-to-see-dependency-tree-in-sbt">this</a> SO question)</div>
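<p>Recent sbt versions bundle this task, so <span style="font-family: courier; font-size: x-small;">Compile / dependencyTree</span> works out of the box. For older sbt versions the same tree comes from the sbt-dependency-graph plugin; a sketch of <span style="font-family: courier; font-size: x-small;">project/plugins.sbt</span> (check the current plugin version before copying this):</p><pre style="font-family: courier; font-size: x-small;">// project/plugins.sbt
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.9.2")</pre>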
<p>You might want to tell SBT to use wider margins if it truncates the tree. Do that with [<a href="https://stackoverflow.com/questions/37412934/how-to-make-sbt-not-truncate-its-output">SO</a>]:</p><pre style="font-family: courier; font-size: x-small;">val defaultWidth = 40
val maxColumn = math.max(JLine.usingTerminal(_.getWidth), defaultWidth) - 8</pre><div>To coax a dependency from Scala 2 to 3, use something like this:</div><div><br /></div><div><span style="font-family: courier; font-size: xx-small;">
libraryDependencies ++= List(
("io.laserdisc" %% "fs2-aws-s3" % "5.0.2").<b>withCrossVersion(CrossVersion.for3Use2_13)</b>
)</span><span>
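</span><div><br /></div><div>An equivalent form uses sbt's <span style="font-family: courier; font-size: x-small;">cross</span> method on the module (a sketch, same library as above):</div><div><br /></div><pre style="font-family: courier; font-size: x-small;">libraryDependencies += ("io.laserdisc" %% "fs2-aws-s3" % "5.0.2")
  .cross(CrossVersion.for3Use2_13)</pre><span>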
</span><div><br /></div><div>To use the classpath of a particular module, use something like this [see <a href="https://stackoverflow.com/questions/62684718/can-not-import-3rd-party-libs-into-sbt-console">SO</a>]:</div></div><div><br /></div><div><span style="font-family: courier; font-size: x-small;">sbt <b>"project core"</b> console</span></div><div><br /></div><div>In this particular case, we're opening a <b>REPL</b> with the classpath of a given module.</div><div><br /></div><div>Use versionScheme (SBT <a href="https://www.scala-sbt.org/1.x/docs/Publishing.html#Version+scheme">docs</a>) if you're writing libraries. This hints at how to handle clashing dependencies. Otherwise, you might see errors when depending on other libraries that "can be overridden using libraryDependencySchemes or evictionErrorLevel" [<a href="https://www.scala-lang.org/blog/2021/02/16/preventing-version-conflicts-with-versionscheme.html">Scala-Lang</a>]</div><div><br /></div><div>One last thing: SBT tests run in parallel <i>by default</i>. This can ruin integration tests so you might want to try <a href="https://stackoverflow.com/questions/54203972/how-to-run-test-suites-sequentially-in-scalatest-sbt">this</a> workaround [SO] to ensure your tests run serialized.</div><div><br /></div>Phillip Henryhttp://www.blogger.com/profile/08460042407514133056noreply@blogger.com0tag:blogger.com,1999:blog-372456867628019773.post-53176989094470468032023-05-31T02:46:00.000-07:002023-05-31T02:46:22.338-07:00Spark Catalysts<p>I've been trying to see how Spark works under the covers. The TL;DR is that it services your queries by dynamically writing Java code on the driver that it then compiles with <a href="http://janino-compiler.github.io/janino/">Janino</a> before sending it over the wire to the executors.<br /><br />Let's take this Scala code on the Spark CLI:<br /></p><p><span style="font-family: courier; font-size: xx-small;">val ds = spark.createDataFrame(List(("a", 1), ("b", 2), ("c", 4)))<br />ds.writeTo("spark_file_test_writeTo").create()<br />spark.sqlContext.sql("update spark_file_test_writeTo set _2=42")</span></p><div>Pretty simple but what goes on deep down is complex. First, Spark uses an <a href="https://www.antlr.org/">Antlr</a> lexer and a parser (a lexer tokenizes; a parser builds an <a href="https://en.wikipedia.org/wiki/Abstract_syntax_tree">AST</a>) to turn that ugly SQL statement into a tree of Scala <span style="font-family: courier; font-size: x-small;">case class</span>es. Then it creates the Java code in <span style="font-family: courier; font-size: x-small;">WholeStageCodegenExec</span> (<a href="https://github.com/apache/spark/blob/400655957812c34b45b923a27eac05455c1c0828/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L663">source</a>). 
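<br /><br />You can dump this generated code from the same Spark CLI session; a sketch, using the implicits in <span style="font-family: courier; font-size: x-small;">org.apache.spark.sql.execution.debug</span> (the exact output varies between Spark versions):<br /><br /><span style="font-family: courier; font-size: xx-small;">import org.apache.spark.sql.execution.debug._<br /><br />// print the whole-stage generated Java for a query on the table created above<br />spark.sqlContext.sql("select _1, _2 from spark_file_test_writeTo where _2 > 1").debugCodegen()</span><br /><br />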
In this Java, you'll see a subclass of <span style="font-family: courier; font-size: x-small;"><a href="https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java">BufferedRowIterator</a></span> that looks something like:</div><div><br /><div><span style="font-family: courier; font-size: xx-small;">columnartorow_mutableStateArray_3[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(4, 64);</span></div></div><div><span style="font-family: courier; font-size: xx-small;">...<br /></span><div><span style="font-family: courier; font-size: xx-small;">columnartorow_mutableStateArray_3[1].<b>write(1, 42)</b>;</span></div></div><div><br /></div><div>in a method called <span style="font-family: courier; font-size: x-small;">processNext</span>. That <span style="font-family: courier; font-size: x-small;">42</span> is Spark setting our value for a row. If we added a <span style="font-family: courier; font-size: x-small;">where</span> clause in our SQL, you'd see the generated code branching. That is, the generated code can access all the other fields in a row. </div><div><br /></div><div>If you <a href="https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-SparkPlan-WholeStageCodegenExec.html">import an implicit</a>, you can run <span style="font-family: courier; font-size: x-small;">debugCodeGen()</span> on the CLI to see the code more easily. </div><div><br /></div><div><br /></div>Phillip Henryhttp://www.blogger.com/profile/08460042407514133056noreply@blogger.com0tag:blogger.com,1999:blog-372456867628019773.post-53913996491831868922023-05-26T03:30:00.004-07:002023-06-09T01:44:26.893-07:00Spark and Iceberg<p>Here are some notes I took when playing with Apache <a href="https://iceberg.apache.org/">Iceberg</a> plugged into Spark. (Update: Iceberg was already supported by <a href="https://cloud.google.com/blog/products/data-analytics/announcing-apache-iceberg-support-for-biglake">Google</a> but is now supported by AWS's Athena for Apache Spark - see <a href="https://aws.amazon.com/about-aws/whats-new/2023/06/amazon-athena-apache-spark-hudi-iceberg-delta-lake/">here</a>).</p><p>I'm running Spark 3.3 and Iceberg 1.2.1 with:<br /></p><p><span style="font-family: courier; font-size: xx-small;">./spark-shell --packages org.apache.iceberg:<b>iceberg-spark-runtime-3.3</b>_2.13:1.2.1\<br /> --conf spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005\<br /> --conf <b>spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions</b> <b>\</b><br /> --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \<br /> --conf spark.sql.catalog.spark_catalog.type=hive \<br /> --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \<br /> --conf spark.sql.catalog.local.type=hadoop \<br /> --conf <b>spark.sql.catalog.local.warehouse=/tmp/warehouse</b> \<br /> --conf spark.sql.defaultCatalog=local</span></p><p>This does a few things.</p><p>First, notice that the version of <span style="font-family: courier; font-size: x-small;">iceberg-spark-runtime</span> must be the same major/minor version of the Spark you're running against. The library will automatically be downloaded by Spark if you don't already have it.</p><p>Data is saved in the directory defined by <span style="font-family: courier; font-size: x-small;">spark.sql.catalog.local.warehouse</span>. 
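<br /><br />For example, with the shell configured as above, a table can be created and populated like this (a sketch; the <span style="font-family: courier; font-size: x-small;">db.events</span> name and its columns are made up for illustration):<br /><br /><span style="font-family: courier; font-size: xx-small;">spark.sql("CREATE TABLE local.db.events (id BIGINT, payload STRING) USING iceberg")<br />spark.sql("INSERT INTO local.db.events VALUES (1, 'a'), (2, 'b')")<br />spark.sql("SELECT * FROM local.db.events").show()</span><br /><br />The table's files then appear under <span style="font-family: courier; font-size: x-small;">/tmp/warehouse/db/events</span>. 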
Inside will be 2 directories, <span style="font-family: courier; font-size: x-small;">data</span> and <span style="font-family: courier; font-size: x-small;">metadata</span>.</p><p><u>Spark code</u></p><p>This took me a few hours to get a good feel for since there are lots of <a href="https://en.wikipedia.org/wiki/Thunk">thunks</a> passed around and functionality is only executed when <span style="font-family: courier; font-size: x-small;">lazy</span> values are touched. This is just the gist of what's going on:</p><p>Code-wise, the interesting functionality appears to be in <span style="font-family: courier; font-size: x-small;">RuleExecutor.execute</span> as each <span style="font-family: courier; font-size: x-small;">Rule[_]</span> is executed via <span style="font-family: courier; font-size: x-small;">apply</span>. The interesting code in the Iceberg Spark extension is <span style="font-family: courier; font-size: x-small;">apply</span>d here as they all extend <span style="font-family: courier; font-size: x-small;">Rule[LogicalPlan]</span>.</p><p><span style="font-family: courier; font-size: x-small;">LogicalPlan</span> is a type of <span style="font-family: courier; font-size: x-small;">QueryPlan</span>. <span style="font-family: courier; font-size: x-small;">QueryPlan</span> can be either logical or physical, the latter being an extension of <span style="font-family: courier; font-size: x-small;">SparkPlan</span>. You can spot these leaves in an AST quite easily as the naming convention says they end with <span style="font-family: courier; font-size: x-small;">Exec</span>. And it appears to be <span style="font-family: courier; font-size: x-small;">V2TableWriteExec</span> where the driver hands control over to the executors.</p><p>The tree of <span style="font-family: courier; font-size: x-small;">LogicalPlans</span> is traversed in <span style="font-family: courier; font-size: x-small;">CheckAnalysis.checkAnalysis</span>. But it's the instantiation of a <span style="font-family: courier; font-size: x-small;">Dataset</span> where its <span style="font-family: courier; font-size: x-small;">logicalPlan</span> references the lazy <span style="font-family: courier; font-size: x-small;">QueryExecution.commandExecuted</span>, causing it to invoke <span style="font-family: courier; font-size: x-small;">eagerlyExecuteCommands(analyzed)</span>. </p><p>A notable sub-type of <span style="font-family: courier; font-size: x-small;">LogicalPlan</span> is <span style="font-family: courier; font-size: x-small;">Command</span>. This represents "a non-query command to be executed by the system".</p><p>Since Spark 3, an interface is available that is "responsible for creating and initializing the actual data writer at executor side. Note that, the writer factory will be serialized and sent to executors, then the data writer will be created on executors and do the actual writing." 
[<span style="font-family: courier; font-size: small;">DataWriterFactory</span> <a href="https://spark.apache.org/docs/3.3.1/api/java/org/apache/spark/sql/connector/write/DataWriterFactory.html">docs</a>]</p><p>Finally, making sense of the AST was problematic when the flow disappeared into <span style="font-family: courier; font-size: x-small;">WholeStageCodeGen</span> as Spark then rolls the tree up and <a href="https://javaagile.blogspot.com/2018/12/more-spark-query-plans.html">converts</a> it to JVM bytecode.</p><p><u>Iceberg code</u></p><p>All the Iceberg initialization happens in <span style="font-family: courier; font-size: x-small;"><a href="https://github.com/apache/iceberg/blob/master/spark/v3.2/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala">IcebergSparkSessionExtensions</a></span>. This subverts Spark's usual functionality and injects Iceberg specific functionality. One of the things it can inject is an <span style="font-family: courier; font-size: x-small;">IcebergSparkSqlExtensionsParser</span> that visits the AST tree as it parses a SQL String to create a <span style="font-family: courier; font-size: small;">LogicalPlan</span>. </p><p>Iceberg also provides its own implementation of <span style="font-family: courier; font-size: x-small;">DataWriterFactory</span> so it can use its own <span style="font-family: courier; font-size: x-small;">Table</span> implementation under the Spark covers that allows (for instance) its own configuration for <span style="font-family: courier; font-size: x-small;">TableScan</span>s.</p><p>It's Iceberg's <span style="font-family: courier; font-size: x-small;">SnapshotProducer.commit()</span> that's been injected into the Spark machinery that creates the manifest files.</p><p><br /></p>Phillip Henryhttp://www.blogger.com/profile/08460042407514133056noreply@blogger.com0