I'm trying to get Spark running on Azure in a Kubernetes container. With Azure, you provision boxes that run K8s, but it's those provisioned boxes you pay for, irrespective of the containers running (or not) in Kubernetes.
This gives us greater control over what is deployed and when. However, talking to Azure Storage has not been easy.
Talking to Azure Storage with Java drivers
First, I want to use the latest version of Spark. At the time of writing, this is v3.0.0-preview2, but I couldn't find a Docker image for it, so I built my own:
$ git clone https://github.com/apache/spark.git && cd spark
$ git checkout v3.0.0-preview2
$ mvn clean install -DskipTests
I then build a Docker image using Spark's ./bin/docker-image-tool.sh and publish it to my Docker Hub account. With some K8s config, I can start up a Spark cluster.
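For orientation, one way to get a REPL against such a cluster looks something like this (a sketch only: the API server address, image tag and executor count are placeholders, not necessarily what I used):

$ ./bin/spark-shell \
    --master k8s://https://API_SERVER:443 \
    --conf spark.kubernetes.container.image=ph1ll1phenry/spark:MY_TAG \
    --conf spark.executor.instances=2

In the Spark REPL, I run some fairly hacky code to give me access to Azure: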
// placeholders for my actual credentials
val accountName = ???
val accountKey = ???
val container = ???

// account-key properties for each Azure Storage endpoint (blob, dfs and file)
val confKey = s"fs.azure.account.key.$accountName"
val confKey2 = s"$confKey.blob.core.windows.net"
val confKey3 = s"$confKey.dfs.core.windows.net"
val confKey4 = s"$confKey.file.core.windows.net"

// the Shared Access Signature for the container
val sas = ???

// register the SAS against the dfs and file endpoints, in both the Spark and Hadoop configs
spark.conf.set(s"fs.azure.sas.$container.$accountName.dfs.core.windows.net", sas)
spark.conf.set(s"fs.azure.sas.$container.$accountName.file.core.windows.net", sas)
sc.hadoopConfiguration.set(s"fs.azure.sas.$container.$accountName.dfs.core.windows.net", sas)
sc.hadoopConfiguration.set(s"fs.azure.sas.$container.$accountName.file.core.windows.net", sas)

// register the account key against every endpoint
spark.conf.set(confKey, accountKey)
spark.conf.set(confKey2, accountKey)
spark.conf.set(confKey3, accountKey)
spark.conf.set(confKey4, accountKey)

// point every scheme at the NativeAzureFileSystem implementation
val clazz = "org.apache.hadoop.fs.azure.NativeAzureFileSystem"
sc.hadoopConfiguration.set("fs.abfs.impl", clazz)
sc.hadoopConfiguration.set("fs.abfss.impl", clazz)
sc.hadoopConfiguration.set("fs.azure", clazz)
sc.hadoopConfiguration.set("fs.wasbs.impl", clazz)
sc.hadoopConfiguration.set("fs.wasb.impl", clazz)
sc.hadoopConfiguration.set(confKey, accountKey)
sc.hadoopConfiguration.set(confKey2, accountKey)
sc.hadoopConfiguration.set(confKey3, accountKey)
sc.hadoopConfiguration.set(confKey4, accountKey)
You can get the credentials you need by running:
$ az storage account keys list -n ACCOUNT_NAME -g GROUP --subscription YOUR_AZURE_SUBSCRIPTION
and the SAS (Shared Access Signature) from the Azure web console (you may need to allow HTTP). Without the SAS, you may get what is probably the most uninformative error message I have ever seen:
Value for one of the query parameters specified in the request URI is invalid.
Which parameter, and why is it invalid? The message originates on the server side, so there is no chance to debug it.
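For what it's worth, a SAS for the container can also be generated from the CLI rather than the web console; something like this (the permissions and expiry here are illustrative):

$ az storage container generate-sas \
    --account-name ACCOUNT_NAME \
    --account-key ACCOUNT_KEY \
    --name CONTAINER \
    --permissions rwdl \
    --expiry 2020-12-31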
Anyway, I tried to read from an Azure File System. The exact snippet is lost, but it was something like this (the path is illustrative):
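// reconstructed: reading Parquet over the ADLS Gen 2 (dfs) endpoint
val df = spark.read.parquet(s"abfss://$container@$accountName.dfs.core.windows.net/MY_PARQUET")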
Spark puked a stack trace at me with:
... Caused by: com.microsoft.azure.storage.StorageException: The specified Rest Version is Unsupported.
Sniffing the network traffic with:
$ tcpdump -A -nn host MY_BOX_IP and MICROSOFTS_API_IP_ADDRESS -i eth0
showed:
14:45:32.445119 IP MY_BOX_IP.59274 > MICROSOFTS_API_IP_ADDRESS.80: Flags [P.], seq 1:603, ack 1, win 502, options [nop,nop,TS val 1401020617 ecr 1846141065], length 602: HTTP: HEAD ...
...
Accept: application/xml
Accept-Charset: UTF-8
Content-Type:
x-ms-version: 2014-02-14
User-Agent: Azure-Storage/2.0.0 (JavaJRE 1.8.0_242; Linux 4.15.0-1066-azure)
x-ms-client-request-id: dfd4qall-6657-45f7-9ed5-00e455e95bee
Host: MY_BOX.dfs.core.windows.net
Connection: keep-alive
This is an ancient version. Spark by default depends on Hadoop 2.7.4, which pulls in the very old azure-storage 2.0.0 (see hadoop/hadoop-project/pom.xml).
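You can confirm where this version comes from with Maven's dependency plugin; run from the Hadoop (or Spark) source tree, something like:

$ mvn dependency:tree -Dincludes=com.microsoft.azure:azure-storage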
OK, so let's rebuild Spark with this:
$ mvn clean install -DskipTests -Phadoop-3.2
The hadoop-3.2 profile gives me a dependency on, you guessed it, a later version of Hadoop, which in turn has a transitive dependency on azure-storage 7.0.0, a version dating to February 2018.
I push this to my Docker Hub account with:
$ ./bin/docker-image-tool.sh -r docker.io/ph1ll1phenry -t spark3.0.0-preview2_hadoop3.2.0 build
$ docker images | grep spark3.0.0-preview2_hadoop3.2.0
ph1ll1phenry/spark spark3.0.0-preview2_hadoop3.2.0 931173a555b6 About a minute ago 545MB
$ docker tag 931173a555b6 ph1ll1phenry/spark3.0.0-preview2_hadoop3.2.0
$ docker push ph1ll1phenry/spark3.0.0-preview2_hadoop3.2.0
Don't forget to tag it (see StackOverflow). I then deploy my cluster on Kubernetes as before.
But this too appears to be an old client as running against Azure Storage results in the same exception despite tcpdump now showing the relevant HTTP header as:
x-ms-version: 2017-07-29
Some crumb of comfort is that I can read the file if I use a slightly different URL:
val df = spark.read.parquet(s"wasb://landing@$accountName.blob.core.windows.net/MY_PARQUET")
However, any writing results in:
com.microsoft.azure.storage.StorageException: This operation is not permitted on a non-empty directory.
even though the directory in question is, in fact, empty.
Another soupçon of good news is that at least I can read from and write to Azure Blob Containers:
val df = spark.read.text(s"wasbs://$container@$accountName.blob.core.windows.net/MY_PARQUET")
This works.
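Writes through the same wasbs:// scheme also succeed; something like this (the output path is illustrative):

df.write.parquet(s"wasbs://$container@$accountName.blob.core.windows.net/MY_OUTPUT")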
What's the version number, Kenneth?
In desperation, I forced Spark to use a more recent version by running Spark with:
--packages org.apache.hadoop:hadoop-azure:3.2.0,com.microsoft.azure:azure-storage:8.6.0
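That is, the shell was launched with something like:

$ ./bin/spark-shell --packages org.apache.hadoop:hadoop-azure:3.2.0,com.microsoft.azure:azure-storage:8.6.0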
And sure enough, tcpdump shows:
x-ms-version: 2019-02-02
This seems to pass the x-ms-version check but then results in:
Incorrect Blob type, please use the correct Blob type to access a blob on the server. Expected BLOCK_BLOB, actual UNSPECIFIED.
which (I'm guessing) is because this azure-storage is not the version that hadoop-azure expects.
I've left a message on the Hadoop users' mailing list asking for help as I still want to talk to the Azure File System rather than a Blob Container.
Hacky workaround
To get a Linux Azure instance that can mount Gen 2 storage, check out the kubernetes-volume-drivers GitHub repository.
I changed the kubernetes-volume-drivers/flexvolume/blobfuse/deployment/Dockerfile like so:
FROM ubuntu:18.04
...
RUN apt update && apt install -y openjdk-8-jdk
giving me a newer OS with Java installed. Then I build the image:
$ cd kubernetes-volume-drivers/flexvolume/blobfuse/deployment
$ docker build -t blobfuse-jdk8 blobfuse-flexvol-installer/
Now let's tag it and push it to Docker Hub:
$ docker images | grep blobfuse-jdk8
$ docker tag IMAGE_ID ph1ll1phenry/blobfuse-openjdk-8-jdk-slim
$ docker push ph1ll1phenry/blobfuse-openjdk-8-jdk-slim
and having a look at Docker Hub, I can see my OS image.
Now, we need to get Spark to use this OS, so I slightly bastardise its Dockerfile:
$ git diff
...
-FROM openjdk:8-jdk-slim
+FROM ph1ll1phenry/blobfuse-openjdk-8-jdk-slim
We build it, docker push it and kubectl apply a slightly different YAML file, and now Spark has a mount onto the Azure File System at /mnt/data. The interesting part of that YAML is the volume stanza, sketched below.
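A minimal sketch of that stanza, assuming the azure/blobfuse flexVolume driver installed above (the container name is illustrative):

spec:
  containers:
    - name: spark
      # ...
      volumeMounts:
        - name: data
          mountPath: /mnt/data
  volumes:
    - name: data
      flexVolume:
        driver: azure/blobfuse
        secretRef:
          name: storage-secret        # the Secret below
        options:
          container: CONTAINER_NAME   # illustrative
          tmppath: /tmp/blobfuse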
Note that you will first have had to apply the Secret in K8s, which looks something like this:
apiVersion: v1
kind: Secret
metadata:
  name: storage-secret
  namespace: blogdemodeployments
type: Opaque
data:
  azurestorageaccountname: ???
  azurestorageaccountkey: ???
where the credentials are as we used for Spark's REPL.
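One gotcha: values under data in a Secret must be base64-encoded, e.g.:

$ echo -n 'MY_ACCOUNT_KEY' | base64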
The Solution
This came from our friendly Azure sysadmin. Basically, it's to use OAuth, so:
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth2.client.secret", SECRET)
spark.conf.set("fs.azure.account.oauth2.client.id" , APP_ID)
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/" + TENANT + "/oauth2/token")
spark.conf.set("fs.azure.account.auth.type." + accountName + ".dfs.core.windows.net", "SharedKey")
spark.conf.set("fs.azure.account.key." + accountName + ".dfs.core.windows.net", accountKey)
where
- The tenant is the ID of our Active Directory in Azure.
- The app ID (also known as the client ID) is the ID of the service principal.
- The secret is something you create under the service principal and use to authenticate (i.e., a password).
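With this configuration in place, both reads and writes over the dfs endpoint work; something like the following (the paths are illustrative):

val df = spark.read.parquet(s"abfss://$container@$accountName.dfs.core.windows.net/MY_PARQUET")
df.write.parquet(s"abfss://$container@$accountName.dfs.core.windows.net/MY_OUTPUT")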
Et voilà! You can now use Spark 3 to both read from and write to Azure file systems.