Published on 00/00/0000
Last updated on 00/00/0000
Published on 00/00/0000
Last updated on 00/00/0000
Share
Share
PRODUCT
7 min read
Share
Apache Spark on Kubernetes series: Introduction to Spark on Kubernetes Scaling Spark made simple on Kubernetes The anatomy of Spark applications on Kubernetes Monitoring Apache Spark with Prometheus Spark History Server on Kubernetes Spark scheduling on Kubernetes demystified Spark Streaming Checkpointing on Kubernetes Deep dive into monitoring Spark and Zeppelin with Prometheus Apache Spark application resilience on Kubernetes
Apache Zeppelin on Kubernetes series: Running Zeppelin Spark notebooks on Kubernetes Running Zeppelin Spark notebooks on Kubernetes - deep dive
Apache Kafka on Kubernetes series: Kafka on Kubernetes - using etcd
Spark Streaming applications are special
Spark applications capable of processing data continuously, which allows reuse of code for batch processing, joining streams against historical data, or the running of ad-hoc queries on stream data. These streaming scenarios require special considerations when apps run for long periods and without interruption. To begin with, you'll need at least these two things:
For the scheduler, and for Spark in general, we use Spark on Kubernetes. If you need to deploy a Kubernetes cluster to a cloud provider, you can use Pipeline to do the heavy lifting for you. By default, Kubernetes takes care of failing Spark executors and drivers by restarting failing pods. Although this is enough for executors
, for a driver
it is necessary, but insufficient. In order to make such a driver
more resilient to failure, Spark checkpointing must first be enabled.
When speaking about Spark checkpointing, it is necessary that we distinguish between two different varieties:
This blogpost focuses on the Metadata
, because that's the checkpointing needed to recover from failures. If you're interested in a scenario which requires data checkpointing, you should check out the Spark documentation.
Metadata checkpointing
saves the required metadata, so, in case of failure, the application can continue where it left of. Usually, the most common storage layer for the checkpoint is HDFS or S3. For Kubernetes and in the cloud, you'll probably be using S3 in favor of managing your own HDFS cluster. On the other hand, S3 is slow and, if you're working with large Spark streaming applications, you'll face bottlenecks and issues pertaining to slowness. One of your better
options is to use either EFS on Amazon, AzureFile on Azure, or GCE-PD on Google.
For this blogpost we'll use AzureFiles, and showcase the usage of Spark streaming with AzureFiles on Azure AKS, using the latest 0.2.0 version of Pipeline.
Azure Files offers fully managed file shares in the cloud that are accessible via the Server Message Block (SMB) protocol and can be used concurrently by multiple cloud VMs.
It turns out that just Spark checkpointing, alone, doesn't work with the current Kubernetes Spark implementation. In order to support Spark checkpointing, we need to make the following changes:
Pods
are vulnerable because they're only scheduled to one node, so if a node fails its corresponding pods will never be rescheduled.checkpointdir
, this also requires some configuring of spark.kubernetes.checkpointdir.enable
,spark.kubernetes.checkpointdir.path
, spark.kubernetes.checkpointdir.size
We built our own Spark version with all of the above changes, which is available on DockerHub (banzaicloud/spark-driver:v2.2.0-k8s-1.0.207). We are going to contribute these changes back in a PR
soon.
Next, we need a Spark Streaming application that has been properly configured to enable checkpointing. For simplicity's sake, we're going to use a slightly modified version of the Spark Streaming example, NetworkWordCount. This application will consume data from a netcat
server, which, to keep things managable, we'll run on our local machine. Since we're running Kubernetes clusters on AKS, we need to ensure that the netcat
server running on our host is reachable from outside. We'll use ngrok for that.
nc -lk 9999
ngrok tcp 9999
After creating an AKS cluster with Pipeline, we need to change and delete the default storageclass
created by Azure, because it is of the AzureDisk
type, when what we want is AzureFile
. If KUBECONFIG is set correctly (e.g.: export KUBECONFIG=path to kubeconfig), the new storage class can be created with kubectl create
via the following yaml:
kubectl create -f - <<EOF
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
annotations:
storageclass.kubernetes.io/is-default-class: "true"
name: azurefile
provisioner: kubernetes.io/azure-file
parameters:
skuName: Standard_LRS
location: eastus
storageAccount: banzaicloud
EOF
The old storageclass can be deleted with kubectl delete storageclass default
. Feel free to skip this phase, if using AzureDisk
for storage is ammenable to you. However, you should note that we picked AzureFiles in order to leverage some of its comperative benefits. AzureFiles
also requires a new StorageAccount, which should be created in the same resource group as a cluster, and, if you're mimicking the example given above, the account name needs to be banzaicloud
. Next, you've got to deploy the Spark Helm chart, which creates all the required subsystems for your Spark job. To do that, use our Postman collection to create a deployment called banzaicloud-stable/spark
. To submit a Spark application from your computer to a Kubernetes cluster, you need a spark-submit
that supports the previously mentioned configs. We've created a tar.tgz that can be downloaded and used to submit your Spark application. Download the file and untar it to the directory of your choosing. When you're finished, clone or download the example and build it:
ls -lah
total 80
drwxr-xr-x 11 baluchicken staff 352B Feb 21 15:07 .
drwxr-xr-x 24 baluchicken staff 768B Feb 5 16:30 ..
drwxr-xr-x 14 baluchicken staff 448B Feb 21 19:07 .git
-rw-r--r-- 1 baluchicken staff 43B Feb 5 16:55 .gitignore
drwxr-xr-x 11 baluchicken staff 352B Feb 21 19:07 .idea
-rw-r--r-- 1 baluchicken staff 11K Feb 20 14:57 LICENSE
-rw-r--r-- 1 baluchicken staff 2.9K Feb 20 14:57 pom.xml
-rw-r--r-- 1 baluchicken staff 241B Jan 31 14:55 settings.xml
-rw-r--r-- 1 baluchicken staff 14K Feb 5 16:54 spark-network-word-count.iml
drwxr-xr-x 3 baluchicken staff 96B Jan 31 14:55 src
mvn clean package
To submit a Spark app from your local machine, you must use a resource staging server (if you'd like to learn more about spark-submit/resource staging servers read this blogpost). Port-forward to upload your jar to the Kubernetes cluster. In order to do that, find your resource staging server and use kubectl
's port-forwarding feature:
kubectl get pods
NAME READY STATUS RESTARTS AGE
shuffle-ncmw9 1/1 Running 0 30m
falling-monkey-spark-rss-867c7c855d-h82nzj 1/1 Running 0 30m
kubectl port-forward ulterior-dingo-spark-rss-558ff96bb4-ng7sj 31000:10000
Forwarding from 127.0.0.1:31000 -> 10000
Please note that sometimes port-forwarding fails. Restart and try again if you're having trouble.
If port-forward
is running, you can submit your Spark app with the following command:
bin/spark-submit --verbose \
--deploy-mode cluster \
--class com.banzaicloud.SparkNetworkWordCount \
--master k8s://<replace this with the kubernetesendpoint> \
--kubernetes-namespace default \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.app.name=NetworkWordCount \
--conf spark.kubernetes.driver.docker.image=banzaicloud/spark-driver:v2.2.0-k8s-1.0.207 \
--conf spark.kubernetes.executor.docker.image=banzaicloud/spark-executor:v2.2.0-k8s-1.0.207 \
--conf spark.kubernetes.initcontainer.docker.image=banzaicloud/spark-init:v2.2.0-k8s-1.0.207 \
--conf spark.kubernetes.checkpointdir.enable=true \
--conf spark.driver.cores="300m" \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.shuffle.namespace=default \
--conf spark.kubernetes.resourceStagingServer.uri=http://localhost:31000 \
--conf spark.kubernetes.resourceStagingServer.internal.uri=http://spark-rss:10000 \
--conf spark.local.dir=/tmp/spark-local \
file:///<your path to the jar>spark-network-word-count-1.0-SNAPSHOT.jar tcp://0.tcp.ngrok.io <your ngrok port> file:///checkpointdir
Now check and see if your cluster is alive:
kubectl get pods
NAME READY STATUS RESTARTS AGE
networkwordcount-1519234651100-driver-6frsx 1/1 Running 0 4m
networkwordcount-1519234651100-v2wj-exec-1 1/1 Running 0 3m
networkwordcount-1519234651100-v2wj-exec-2 1/1 Running 0 3m
shuffle-ncmw9 1/1 Running 0 1h
falling-monkey-spark-rss-867c7c855d-h82nz 1/1 Running 0 1h
To check if the checkpointing works, kill the driver pod:
kubectl delete pod networkwordcount-1519234651100-driver-6frsx
pod "networkwordcount-1519234651100-driver-6frsx" deleted
The Job
will automatically restart the pod for you:
kubectl get pods
NAME READY STATUS RESTARTS AGE
networkwordcount-1519234651100-driver-lj7pr 1/1 Running 0 1m
networkwordcount-1519234651100-fbd4-exec-1 1/1 Running 0 22s
networkwordcount-1519234651100-fbd4-exec-2 1/1 Running 0 22s
shuffle-ncmw9 1/1 Running 0 1h
falling-monkey-spark-rss-867c7c855d-h82nz 1/1 Running 0 1h
Check the driver logs and look for checkpointing dir usage:
<pre class="language-unknown"><code class="language-unknown">kubectl logs networkwordcount-1519234651100-driver-lj7pr
CheckpointReader:54 - Attempting to load checkpoint from file file:/checkpointdir/checkpoint-1519236375000
Checkpoint:54 - Checkpoint for time 1519236375000 ms validated
CheckpointReader:54 - Checkpoint successfully loaded from file file:/checkpointdir/checkpoint-1519236375000</code></pre>
That's it for now - you can go through this same process on AWS EFS by changing AzureFiles to EFS, and by reading our EFS blog.
Get emerging insights on innovative technology straight to your inbox.
Discover how AI assistants can revolutionize your business, from automating routine tasks and improving employee productivity to delivering personalized customer experiences and bridging the AI skills gap.
The Shift is Outshift’s exclusive newsletter.
The latest news and updates on generative AI, quantum computing, and other groundbreaking innovations shaping the future of technology.