Saturday, March 28, 2020

Unit testing in Functional Programming (part 2)


ZIO Test

This is quite beautiful. A few years ago, I blogged about a problem I had where some code made an assertion on a bifunctor, but only on the happy path. If the code returned an effect representing the unhappy path, the assertion was never run and the test erroneously passed.

One could blame a remiss developer, but Dr Matthew Pocock made the point that "if key information about how to use an API is not enforced by that API, then IMHO that API is broken". I'm inclined to agree.

Fast forward five years, and now I have zio-test that will stop such code from ever being committed (see the full code here).

  override def spec: ZSpec[TestEnvironment, Any] = suite("ZIO test")(
    testM("fails on unhappy path even with no assertions"){
      val result:   Option[String]            = None
      val asserted: Option[TestResult]        = result.map(x => assert(x)(equalTo("this is naive as it assumes happy path")))
      val myZio:    zio.IO[Unit, TestResult]  = ZIO.fromOption(asserted)
      myZio
    }
  )

Cleverly, this testing framework fails with:

    Fiber failed.
    A checked error was not handled.
    ()
    
    Fiber:Id(1585237038923,15) was supposed to continue to:
      a future continuation at zio.test.package$ZTest$.apply(package.scala:104)
    
    Fiber:Id(1585237038923,15) execution trace:
      at zio.ZIO$.fromOption(ZIO.scala:2682)
      at uk.co.odinconsultants.fp.zio.ZioSpec$.spec(ZioSpec.scala:24)
      at zio.ZIO$.effectSuspendTotal(ZIO.scala:2260)

No more ScalaTest for me...

The principle is simply that if a None is passed to ZIO.fromOption, then the generated ZIO represents "the moral equivalent of `throw` for pure code" (from the ZIO ScalaDocs). This seems a little opinionated, but that is a stated quality of its creator ("I do think ZIO core should provide one opinionated way to do things to make scaling in larger teams easier", says John de Goes on GitHub).
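To make the mechanism concrete, here is a minimal sketch (assuming the same ZIO version as above, where fromOption's error type is Unit, as the IO[Unit, TestResult] annotation in the test shows):

import zio._

// A None is lifted straight into the error channel, so the effect fails
// without any exception being thrown.
val failing:    IO[Unit, Int] = ZIO.fromOption(None)     // fails with ()
val succeeding: IO[Unit, Int] = ZIO.fromOption(Some(42)) // succeeds with 42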

There's an awful lot of fibre fun going on as well (the tests run on their own fibres). And it's useful to remember that in typical FP style, your code does not run in the testM block. It merely describes what is to be tested by setting up a testing data structure that actually runs at the end-of-the-world.

Cats Effect

Cats Effect has its own testing framework. You can use it with:

    implicit val testContext: TestContext       = TestContext()
    implicit val cs:          ContextShift[IO]  = testContext.contextShift(IO.ioEffect)
    implicit val timer:       Timer[IO]         = testContext.timer(IO.ioEffect)

then fast forward time with:

testContext.tick(60 seconds)

It seems that it is usual to call IO.unsafeToFuture and then make the assertion on the subsequent Future.
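For instance, a test using those implicits might look something like this (a sketch; the IO under test and the assertion are illustrative):

import cats.effect.IO
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Success

// Assumes the implicit TestContext, ContextShift and Timer defined above.
val io:  IO[String]     = IO.sleep(60.seconds).map(_ => "done")
val fut: Future[String] = io.unsafeToFuture()  // nothing has completed yet

testContext.tick(60.seconds)                   // advance the virtual clock
assert(fut.value == Some(Success("done")))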

However, there is a limit to TestContext if you're testing concurrency in the effect engine itself.
"You can test some things with TestContext, which is deterministic, but not everything. You can't test things like 'make interruption happen exactly at this flatMap'. So I use a mix of TestContext, which is deterministic, with tests that sleep appropriately to simulate different scenarios and sometimes tests that run on real concurrency, but run multiple times (ab)using scalacheck properties. I think the work in CE3 that provides a completely deterministic implementation of Concurrent offers some promise in that direction though." - Fabio Labella, Gitter, April 18 2010
Further examples include:

  • Check if the code starts on a proper thread pool
  • Check if elements are sent downstream in the original order
  • Check if effect is synchronous (for async you'd need tick())

(Credit to Piotr Gawryś for cataloguing them).

Counting in FS2

Counting is a pretty typical thing to do in a unit test. What's the best way?
PhillHenry 
I have some code that creates a stream and now I want to test it. I'd like my 'mock' function passed to its .mapAsync to count how many times it is called (the production function will not do this). I could use a java.util.concurrent.atomic.AtomicInteger in my test class but is there a more elegant way?
Fabio Labella @SystemFw
@PhillHenry you can use Ref for that
principle is the same though: 
Stream.eval(Ref[IO].of(0)).flatMap { state =>
  val f: A => IO[B] = state.update(_ + 1) >> yourThing
  yourStream
}
I'd recommend to watch my talk on shared state, it is likely the reason the Ref was not updating is that you are not yet familiar with the share-through-flatMap concept and you are creating two refs by mistake
Refs seem to be the accepted way of unit testing [SO].
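A minimal sketch of the idea (the stream and the 'mock' function below are illustrative, and fs2 2.x with cats-effect 2 is assumed):

import cats.effect.{ContextShift, IO}
import cats.effect.concurrent.Ref
import fs2.Stream
import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

// Share a single Ref via flatMap; the 'mock' passed to mapAsync increments it.
val countedCalls: IO[Int] =
  Ref[IO].of(0).flatMap { counter =>
    val mock: Int => IO[Int] = i => counter.update(_ + 1).map(_ => i * 2)
    Stream.emits(1 to 10)
      .covary[IO]
      .mapAsync(4)(mock)
      .compile
      .drain
      .flatMap(_ => counter.get)
  }

// countedCalls.unsafeRunSync() == 10: one increment per element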

A full example can be found on my GitHub, where I use a SignallingRef to count enqueues and dequeues.

Does Tagless Final help?

Tagless final is often touted as a good way to write testable code but:
"Ultimately, the testability of tagless-final programs requires they code to an interface, not an implementation. Yet, if applications follow this principle, they can be tested even without tagless-final!...Using tagless-final doesn’t provide any inherent benefits to testability. The testability of your application is completely orthogonal to its use of tagless-final, and comes down to whether or not you follow best practices—which you can do with or without tagless-final."
Although De Goes appreciates Tagless Final, he has five excellent criticisms of it here [DZone].

Although Gavin Bisesi (Gitter, Mar 31 18:11) lists the main selling points of Tagless Final as such fine things as:
"If I have F=IO now, but later I have some ApmTracingT[F, *] effect that gives me span/duration tracking, my implementation doesn't change at all, just I just pass a different F when I construct it" and "actual interop (cats-effect vs zio vs monix? All work)"
he also says the obvious benefit to testing is:
"if a test wants a no-op version of Store, then F can be Id[_], and your test now doesn't contain IO concurrency."


Wednesday, March 25, 2020

Spark on ACID


Spark can create the illusion of ACID transactions with an open source library from Databricks called Delta Lake [GitHub]. Just bundle the package with your code and use format("delta") on your reads and writes and you're good to go. Data is stored as Parquet as usual but now we also have associated metadata stored in JSON files. These files indicate what state the data was in at each incremental step.
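Reading and writing look something like this (a sketch; the path and data are illustrative, and the io.delta:delta-core package is assumed to be on the classpath):

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val path = "/tmp/delta-demo"

// Writing is exactly like Parquet, just with format("delta")
Seq(("hello", 1), ("world", 2)).toDF("word", "value")
  .write.format("delta").mode(SaveMode.Append).save(path)

// Reading: again, just swap in format("delta")
val df = spark.read.format("delta").load(path)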

Do note, however, for clients of this data to take advantage of these ACID properties, they too need to be 'delta aware'.

What's happening?

The idea behind Delta Lake is very simple. Along with the Parquet, we record which files are relevant in the JSON files. These files are immutable, but we may add more JSON files with each change.

Note that even if the directory has nested folders (say, from a partitionBy), all the JSON files still seem to live at the top level.

Working example

The code for this example lives in my GitHub repository and can be run out-of-the-box on a single JVM. It saves the files to MiniHDFS, that is, an in-memory Hadoop emulator.

In the example we:

  1. save the initial data
  2. append some more data to it
  3. save some data, this time overwriting the first batches
  4. update the data
  5. vacuum

Save the initial data

After persisting the first batch to HDFS in the delta format with SaveMode.Append, we see this in the Parquet directory:

hdfs://127.0.0.1:36089/DeltaPersistSpec/_delta_log/00000000000000000000.json
hdfs://127.0.0.1:36089/DeltaPersistSpec/part-00000-5a70a0c7-2421-4fda-bf1e-fe584576eac5-c000.snappy.parquet
...

With the judicious help of the Linux CLI tool, jq [SO], the JSON file looks like this:

{
  "commitInfo": {
    "timestamp": 1585046814345,
    "operation": "WRITE",
    "operationParameters": {
      "mode": "Append",
      "partitionBy": "|[|]"
    },
    "isBlindAppend": true
  }
}
{
  "protocol": {
    "minReaderVersion": 1,
    "minWriterVersion": 2
  }
}

Append the data

After writing a second batch again with SaveMode.Append, we see:

hdfs://127.0.0.1:36089/DeltaPersistSpec/_delta_log/00000000000000000000.json
hdfs://127.0.0.1:36089/DeltaPersistSpec/_delta_log/00000000000000000001.json
hdfs://127.0.0.1:36089/DeltaPersistSpec/part-00000-5a70a0c7-2421-4fda-bf1e-fe584576eac5-c000.snappy.parquet
...

The first JSON file is as before but the second looks like:

{
  "commitInfo": {
    "timestamp": 1585046817232,
    "operation": "WRITE",
    "operationParameters": {
      "mode": "Append",
      "partitionBy": "|[|]"
    },
    "readVersion": 0,
    "isBlindAppend": true

  }
...
}
{
  "add": {
    "path": "part-00000-fe3fba1f-5167-49a3-9a13-25d07ebd7fb6-c000.snappy.parquet",
    "partitionValues": {},
    "size": 350,
    "modificationTime": 1585046817219,
    "dataChange": true
  }
}
{
  "add": {
    "path": "part-00001-f1a3eb5f-d392-4f7d-bf16-8aac55517c21-c000.snappy.parquet",
...
}

Overwrite the first batches

Next we save some more data, but this time with SaveMode.Overwrite. Now we see a third JSON file (along with the first two) and it looks like this:

{
  "commitInfo": {
    "timestamp": 1585046819611,
    "operation": "WRITE",
    "operationParameters": {
      "mode": "Overwrite",
      "partitionBy": "|[|]"
    },
    "readVersion": 1,
    "isBlindAppend": false
  }
}
{
  "add": {
    "path": "part-00001-79f7a25d-c87a-4c5b-983e-1696e9f4b165-c000.snappy.parquet",
...
{
  "remove": {
    "path": "part-00000-fe3fba1f-5167-49a3-9a13-25d07ebd7fb6-c000.snappy.parquet",
    "deletionTimestamp": 1585046819610,
    "dataChange": true
  }
}
...

As you can see, our original files are marked as removed. However, they stay where they are.

Update the data

Now, we treat the file like a SQL table and update it with something like:

import io.delta.tables._
import org.apache.spark.sql.functions.{col, lit}

val deltaTable = DeltaTable.forPath(filename)
deltaTable.update(col("value") === 1, Map("word" -> lit("wotcha")))

and a fourth JSON file appears that looks like this:

{
  "commitInfo": {
    "timestamp": 1585046822731,
    "operation": "UPDATE",
    "operationParameters": {
      "predicate": "(value#1417=1)"
    },
    "readVersion": 2,
    "isBlindAppend": false
  }
}
{
  "remove": {
    "path": "part-00008-0b439cc3-a2bc-4d4e-b4a8-4b2a719b4edf-c000.snappy.parquet",
    "deletionTimestamp": 1585046822195,
    "dataChange": true
  }
}
{
  "remove": {
...
{
  "add": {
...

Vacuum

Vacuuming does to the Parquet files what a DB vacuum does to a database (that is, garbage collect).
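The call itself is a one-liner (a sketch assuming the deltaTable from the update step above; the argument is a retention period in hours, and forcing it to zero means first disabling Delta's safety check):

// Allow a zero-hour retention period (normally blocked as a safety measure)
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

// Delete all Parquet files no longer referenced by the current table state
deltaTable.vacuum(0)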

No new JSON file is created for this step. The only change is that Parquet files disappear. In this example in my GitHub repository, the number of parquet files is reduced from 39 to 10.

Only the Parquet files that are not referenced in the later JSON metadata files have disappeared. Any transactions that are still using the old metadata are now in trouble, so this may impact long-running transactions.

Conclusion

Delta Lake is definitely an exciting step in the right direction, but apparently it can be slow [Medium]. The version numbers and dates in the JSON metadata allow you to see how the data looked at particular points in time [databricks.com].
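For example, time travelling back to the very first commit looks something like this (a sketch using the same filename as in the update step above):

// Read the table as it was at version 0; timestampAsOf works similarly
val asOfVersion0 = spark.read.format("delta").option("versionAsOf", "0").load(filename)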


Friday, March 13, 2020

What sorcery is this?


This library is very clever. Imagine you want to listen on a TCP port and you represent that port number with an Int. But what if somebody misconfigures the value and you only realise the mistake at runtime? Wouldn't it be nice to catch it at compile time? Now you can!

import eu.timepit.refined._
import eu.timepit.refined.api.Refined
import eu.timepit.refined.numeric._

  type Port = Int Refined Interval.ClosedOpen[W.`1`.T, W.`65535`.T]

is all we need for compile-time checking that an Int representing a port is in a sensible range. For instance, if we had:

  def listenOn(port: Port) = ??? 

then this works:

    listenOn(1: Port)

but these fail to compile:

    listenOn(0: Port)
    listenOn(65536: Port)

exactly as we expect and desire.

But it gets better. A common error in neural nets, judging by the forums, is not getting the size of matrices correct and only finding out at runtime after they have been running for a while. Wouldn't it be good to know at compile time? Now we can (code taken from this GitHub repository):


import eu.timepit.refined.api.Refined
import eu.timepit.refined.generic.Equal
import shapeless.Witness

type ExactInt = Int Refined Equal[A] forSome { type A <: Int }

case class Matrix2[ROWS <: ExactInt, COLS <: ExactInt](nRows: ROWS, nCols: COLS) {

  val REQUIRED_COLS = Witness.mkWitness(nCols)

  def multiply[T <: ExactInt](x: Matrix2[REQUIRED_COLS.T, T]): Matrix2[ROWS, T] = Matrix2(nRows, x.nCols)

}


Note we don't actually do the multiplication - this is just a PoC and we're just interested in types.

Also note that this forSome is nicely explained by Rob Norris on his blog here where he deals with F-Bound Types (where an "F-bounded type is parameterized over its own subtypes") and uses forSome to enforce that.

Now we can create some matrices:

    val _3x7: Matrix2[Exactly[W.`3`.T], Exactly[W.`7`.T]]   = Matrix2(3: Exactly[W.`3`.T], 7: Exactly[W.`7`.T])
    val _7x8: Matrix2[Exactly[W.`7`.T], Exactly[W.`8`.T]]   = Matrix2(7: Exactly[W.`7`.T], 8: Exactly[W.`8`.T])
    val _8x7: Matrix2[Exactly[W.`8`.T], Exactly[W.`7`.T]]   = Matrix2(8: Exactly[W.`8`.T], 7: Exactly[W.`7`.T])

and this compiles:

    _3x7.multiply(_7x8)

(which is good because a 3x7 matrix can be multiplied by a 7x8 matrix) but this doesn't:

    _3x7.multiply(_8x7) // doesn't compile as expected

which makes sense as a 3x7 matrix cannot be multiplied by an 8x7 matrix. 

To do

I'm still constructing the matrix with both a type and a value that corresponds to that type. I've not yet worked out how I can just use the type and derive the value inside the matrix. If I work it out, I'll write another blog post.


Why FP?


In my last position, I was surprised by the number of programmers who were writing Java in the Scala language. That is, different language, same style and idioms. This to me misses the point of Scala. It is not just a better Java. In the realm of distributed computing, it's essential to fully embrace its functional programming aspects.

I gave a talk that was well received by my colleagues and have since moved on. Unfortunately, the new shop is like the old one: the developers are writing Java in Scala. So, these are the notes for my presentation that I may soon have to dust off and present again.

Intro

  • there are FP design patterns just like GoF in OO
  • Haskell and Scala programmers can talk in FP patterns just as fluently as Java and C++ programmers can talk in OO patterns
  • the groundwork for FP dates back to 1920s-1950s mathematics.
  • I’m assuming you all already know basic FP points (eg, immutability, don’t throw exceptions etc)
  • Disclaimer: I’m not the best FP programmer in the world and although I love Cats, am far from an expert.

Motivation

  • Make programming more mathematical using math proofs. This way, logic errors can be caught by the compiler rather than at runtime.
  • A common maths language allows us to more easily reason about the code (see Semigroups below).
  • Since everybody is singing from the same hymn sheet, code has a lot less boilerplate and is easier to reason about.

For-comprehensions

  • Advantage: calling and called code are decoupled regarding the ‘container’ type.
  • Advantage: which monad doesn’t matter (eg, Try, Either, Option, List etc).
  • Terminates early
  • The obvious structure for sequential operation.
  • Sequence, that is M[T[_]] => T[M[_]] (e.g. List[Option[A]] => Option[List[A]]), can take a list of results and produce a single result containing a list.
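A sketch of those advantages in code (all names here are mine):

import cats.Monad
import cats.implicits._

// The same for-comprehension works for any monad F, so the caller decides
// whether F is Option, Either, IO, etc. It also terminates early: if the
// first step 'fails' (None, Left, ...), the second never runs.
def combine[F[_]: Monad](find: String => F[Int]): F[Int] =
  for {
    a <- find("first")
    b <- find("second")
  } yield a + b

type ErrorOr[A] = Either[String, A]

val withOption: Option[Int]  = combine[Option](s => Some(s.length))   // Some(11)
val withEither: ErrorOr[Int] = combine[ErrorOr](s => Right(s.length)) // Right(11)

// And sequence: a List of results becomes a single result containing a List
val allOrNothing: Option[List[Int]] = List(Option(1), Option(2), Option.empty[Int]).sequence // None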

Which brings me to:

Semigroups

  • Useful in aggregation.
  • They’re associative operations on a type.
  • Question: is f(x1, f(x2, f(x3, x4))) associative for adding integers? Eg, is it the same as f(x1, f(f(x2, x3), x4))?
  • Is it associative for subtracting integers?
  • It allows partitioning
  • If it’s also commutative, you can multi-thread the operations!
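A quick sketch (names are mine): the same aggregation code works for any semigroup, and associativity is what lets us split the fold across partitions and still get the same answer.

import cats.Semigroup
import cats.implicits._

// |+| is the associative combine operation of the Semigroup type class
def aggregate[A: Semigroup](xs: List[A]): Option[A] = xs.reduceOption(_ |+| _)

aggregate(List(1, 2, 3, 4))    // Some(10): addition is associative (and commutative)
aggregate(List("a", "b", "c")) // Some("abc"): concatenation is associative but not commutative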

Applications to big data

  • Associativity allows partitioning
  • Commutativity allows maximum parallelism
  • Addition of real numbers is associative and commutative
  • Concatenating strings is associative but it’s not commutative
  • Subtracting real numbers is neither associative nor commutative

Disadvantages

  • If you use Cats, there are some odd imports and dependencies you need to get used to (not too hard)
  • You need to learn a new ‘pattern language’ but since it’s been about since the mid 20th Century, it’s not going to change overnight.

Conclusion

The following was not in my presentation but is just a great description of why we should prefer an FP approach:
"Consider a service which must read some data from shared state in order to proceed. The shared state is stored on other servers (e.g. Redis, Postgres, some other service, etc). State must be read from multiple of these sources in order to formulate a response. These services have varying latencies and availabilities, and you need to be failover each one individually to alternate primaries.

"Each read requires an asynchronous connect/read/close resource management process, its own retry and independent exponential backoff cycle, its own failover, etc. All of them must be run in parallel such that if any of them fails, they're all aborted as quickly as possible. Meanwhile, the client connection itself might get aborted, which should clean up all associated resources. In the event of any errors, or a success, the response must be returned to the client. All asynchronously.

"Doing the above without functional effects is astonishingly complex even with absolute best-in-class tools. Your best bet outside of IO is to use something like Future with very careful and prescribed use of cooperative explicit cancelation checks and extremely well thought-out refactoring into defs. It would be possible but brittle.

"Now imagine taking the above and adding an extra data source which needs to be aggregated to form a response. With FP this is trivial: you just write it independently and then add it to the parTraverse. With any other approach, even Futures, this is not really all that straightforward.

"To me, the above unequivocally demonstrates the value of functional programming in modern scalable service architectures" - Daniel Spiewak


Tuesday, March 3, 2020

Azure, Docker and K8s


I'm trying to get Spark to work on Azure in a Kubernetes container. With Azure, you provision some boxes that have K8s running on them but it's the provisioned boxes you pay for irrespective of the containers running (or not) in Kubernetes.

This gives us greater control over what is deployed and when. However, talking to Azure storage has not been easy.

Talking to Azure Storage with Java drivers

First, I want to use the latest image of Spark. At the time of writing, this is v3.0.0-preview2 but I couldn't find a Docker image for it so I built my own.

$ git checkout v3.0.0-preview2
$ mvn clean install -DskipTests

I then build a Docker image using Spark's ./bin/docker-image-tool.sh and publish it to my Docker hub account. With some K8s config that looks a little like this, I can start up a Spark cluster. In the Spark REPL, I run some fairly hacky code to give me access to Azure:

val accountName = ???
val accountKey  = ???
val container   = ???
val confKey     = s"fs.azure.account.key.$accountName"
val confKey2    = s"$confKey.blob.core.windows.net"
val confKey3    = s"$confKey.dfs.core.windows.net"
val confKey4    = s"$confKey.file.core.windows.net"
val sas         = ???

spark.conf.set( s"fs.azure.sas.$container.$accountName.dfs.core.windows.net", sas)
spark.conf.set( s"fs.azure.sas.$container.$accountName.file.core.windows.net", sas)
sc.hadoopConfiguration.set( s"fs.azure.sas.$container.$accountName.dfs.core.windows.net", sas)
sc.hadoopConfiguration.set( s"fs.azure.sas.$container.$accountName.file.core.windows.net", sas)

spark.conf.set( confKey,  accountKey)
spark.conf.set( confKey2, accountKey)
spark.conf.set( confKey3, accountKey)
spark.conf.set( confKey4, accountKey)

val clazz = "org.apache.hadoop.fs.azure.NativeAzureFileSystem"
sc.hadoopConfiguration.set("fs.abfs.impl",  clazz)
sc.hadoopConfiguration.set("fs.abfss.impl", clazz)
sc.hadoopConfiguration.set("fs.azure",      clazz)
sc.hadoopConfiguration.set("fs.wasbs.impl", clazz)
sc.hadoopConfiguration.set("fs.wasb.impl",  clazz)
sc.hadoopConfiguration.set(confKey,         accountKey)
sc.hadoopConfiguration.set(confKey2,        accountKey)
sc.hadoopConfiguration.set(confKey3,        accountKey)
sc.hadoopConfiguration.set(confKey4,        accountKey)

You can get the credentials you need by running:

$ az storage account keys list -n ACCOUNT_NAME -g GROUP --subscription YOUR_AZURE_SUBSCRIPTION

and the SAS (Shared Access Signature) from the Azure web console (you may need to allow HTTP). Without the SAS, you may get what is probably the most uninformative error message I have ever seen:

Value for one of the query parameters specified in the request URI is invalid.

Which parameter and why is it invalid? This originates on the server side so there is no chance to debug it.

Anyway, when I tried to read from an Azure File System using something like this:

val df = spark.read.parquet(s"abfs://$container@$accountName.dfs.core.windows.net/MY_PARQUET_FILE")

I saw a stack trace puked with:

... Caused by: com.microsoft.azure.storage.StorageException: The specified Rest Version is Unsupported.

Sniffing the network traffic with:

$ tcpdump -A -nn host MY_BOX_IP and MICROSOFTS_API_IP_ADDRESS -i eth0

was showing:

14:45:32.445119 IP MY_BOX_IP.59274 > MICROSOFTS_API_IP_ADDRESS.80: Flags [P.], seq 1:603, ack 1, win 502, options [nop,nop,TS val 1401020617 ecr 1846141065], length 602: HTTP: HEAD ...
...
Accept: application/xml
Accept-Charset: UTF-8
Content-Type:
x-ms-version: 2014-02-14
User-Agent: Azure-Storage/2.0.0 (JavaJRE 1.8.0_242; Linux 4.15.0-1066-azure)
x-ms-client-request-id: dfd4qall-6657-45f7-9ed5-00e455e95bee
Host: MY_BOX.dfs.core.windows.net
Connection: keep-alive

This is an ancient version. Spark by default depends on Hadoop 2.7.4, which pulls in azure-storage 2.0.0 (see hadoop/hadoop-project/pom.xml).

OK, so let's rebuild Spark with this:

mvn clean install -DskipTests -Phadoop-3.2

The hadoop-3.2 profile gives me a dependency on, you guessed it, a later version of Hadoop, which provides the transitive dependency azure-storage 7.0.0, dating from February 2018.

I push this to my Docker Hub account with:

$ ./bin/docker-image-tool.sh  -r docker.io/ph1ll1phenry -t spark3.0.0-preview2_hadoop3.2.0 build
$ docker images | grep spark3.0.0-preview2_hadoop3.2.0
ph1ll1phenry/spark                         spark3.0.0-preview2_hadoop3.2.0   931173a555b6        About a minute ago   545MB
$ docker tag 931173a555b6 ph1ll1phenry/spark3.0.0-preview2_hadoop3.2.0
$ docker push ph1ll1phenry/spark3.0.0-preview2_hadoop3.2.0

Don't forget to tag it (SO). I then deploy my cluster on Kubernetes as before.

But this too appears to be an old client as running against Azure Storage results in the same exception despite tcpdump now showing the relevant HTTP header as:

x-ms-version: 2017-07-29

Some crumb of comfort is that I can read the file if I use a slightly different URL.

val df = spark.read.parquet(s"wasb://landing@$accountName.blob.core.windows.net/MY_PARQUET")

However, any writing results in:

com.microsoft.azure.storage.StorageException: This operation is not permitted on a non-empty directory.

even though the directory was, in fact, empty.

Another soupçon of good news is that at least I can read from and write to Azure Blob Containers:

val df = spark.read.text(s"wasbs://$container@$accountName.blob.core.windows.net/MY_PARQUET")

This works.

What's the version number, Kenneth?

In desperation, I forced Spark to use a more recent version by running Spark with:

--packages org.apache.hadoop:hadoop-azure:3.2.0,com.microsoft.azure:azure-storage:8.6.0

And sure enough, tcpdump shows:

x-ms-version: 2019-02-02

This seems to pass the x-ms-version check but then results in:

Incorrect Blob type, please use the correct Blob type to access a blob on the server. Expected BLOCK_BLOB, actual UNSPECIFIED.

which (I'm guessing) is due to azure-storage not being the version that hadoop-azure is expecting.

I've left a message on the Hadoop users' mailing list asking for help as I still want to talk to the Azure File System rather than a Blob Container.

Hacky workaround

To get a Linux Azure instance that can mount Gen 2 storage, check out this GitHub repository.

I changed the kubernetes-volume-drivers/flexvolume/blobfuse/deployment/Dockerfile so:

FROM ubuntu:18.04
...
RUN apt update && apt install -y openjdk-8-jdk

giving me a newer OS and installing Java. Then I build the image:

$ cd kubernetes-volume-drivers/flexvolume/blobfuse/deployment
$ docker build  -t blobfuse-jdk8 blobfuse-flexvol-installer/

Now let's tag it and push it to Docker Hub:

$ docker images | grep blobfuse-jdk8
$ docker tag IMAGE_TAG ph1ll1phenry/blobfuse-openjdk-8-jdk-slim
$ docker push ph1ll1phenry/blobfuse-openjdk-8-jdk-slim

and having a look at Docker Hub, I can see my OS image.

Now, we need to get Spark to use this OS, so I slightly bastardise its Dockerfile:

$ git diff
...
-FROM openjdk:8-jdk-slim
+FROM ph1ll1phenry/blobfuse-openjdk-8-jdk-slim

We build it, docker push it and kubectl apply a slightly different YAML file, and now Spark has a mount onto the Azure File System at /mnt/data.

Note that you will have had to apply the secret in K8s that looks something like this:

apiVersion: v1
kind: Secret
metadata:
  name: storage-secret
  namespace: blogdemodeployments
type: Opaque
data:
  azurestorageaccountname: ???
  azurestorageaccountkey:  ???

where the credentials are as we used for Spark's REPL.

The Solution

This came from our friendly Azure sysadmin. Basically, it's to use OAuth, so:

spark.conf.set("fs.azure.account.auth.type",                          "OAuth")
spark.conf.set("fs.azure.account.oauth2.client.secret",               SECRET)
spark.conf.set("fs.azure.account.oauth2.client.id" ,                  APP_ID)
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/" + TENANT + "/oauth2/token")
spark.conf.set("fs.azure.account.auth.type." + accountName + ".dfs.core.windows.net", "SharedKey")
spark.conf.set("fs.azure.account.key."       + accountName + ".dfs.core.windows.net", accountKey)

where 

  • The tenant is the ID of our active directory in Azure
  • The app id (also known as client id) is the ID of the service principal
  • The secret is something you create under the service principal which you use to authenticate (i.e. a password)
Et voilà. You can now use Spark 3 to talk to Azure file systems to both read and write.