Published on 00/00/0000
Last updated on 00/00/0000
At Banzai Cloud we are building Pipeline, a feature-rich, enterprise-grade application platform for containers on top of Kubernetes. Applications deployed to Pipeline (encapsulated in spotguides) automatically inherit the platform's features: enterprise-grade security, observability (centralized log collection, monitoring and tracing), discovery, high availability and resiliency, to name just a few. One of the most popular spotguides we deploy is Spark. Over the past few months we have been working on, and opening many pull requests for, making Spark a first-class citizen on Kubernetes and making it resilient. We have already published a good collection of posts (and contributions) on these topics, so if you are interested in any of these features, read:
Apache Spark on Kubernetes series:
Introduction to Spark on Kubernetes
Scaling Spark made simple on Kubernetes
The anatomy of Spark applications on Kubernetes
Monitoring Apache Spark with Prometheus
Spark History Server on Kubernetes
Spark scheduling on Kubernetes demystified
Spark Streaming Checkpointing on Kubernetes
Deep dive into monitoring Spark and Zeppelin with Prometheus
Apache Spark application resilience on Kubernetes
Collecting Spark History Server event logs in the cloud
Update: the solution described below, which uses ReadWriteOnce storage for the checkpointing directory, works. However, some use cases may require ReadWriteMany or object storage for the checkpointing directory, and in those cases the solution described here is imperfect. One possible alternative is the Spark Operator; another would be StatefulJob, a new Kubernetes feature.
Today's post focuses on Spark driver resiliency on Kubernetes, illustrated by simulating a node failure. Note that, at this point, not all Spark components are resilient upstream; we highlighted these in an earlier post, Apache Spark application resilience on Kubernetes. We have since contributed all of our changes as pull requests, and in the meantime you can use our Spark fork as you see fit. Earlier we proposed a PR to Apache Spark that enables driver resiliency by running the driver as a Kubernetes Job instead of a bare Pod. We will not go into implementation details here, as we have already published two blog posts on that subject. Instead, we will focus on one question: is it a good idea to use Kubernetes Jobs to make Spark resilient to failures? Let's start by keeping in mind that Spark batch processing applications are stateless: if the driver pod dies, all previous computation is lost, and the newly created driver starts the computation over again from the beginning.
This is not a limitation of Spark on Kubernetes; Spark on YARN behaves the same way.
Spark stream processing applications are different, as they are stateful when checkpointing is enabled. With checkpointing enabled, Spark persists data so that it can recover it after a failure. Correctness is vital here, since we cannot afford corrupted checkpoint files. Corruption may occur if two Spark driver pods run simultaneously while working on the same Spark application. When spark-submit creates a bare Pod for the driver, correctness is not an issue: in case of a node failure the Pod simply dies, and no Kubernetes controller reschedules a new one. With a Job, the Job controller creates a new Pod as long as the previous one died without completing the Job. So, is it possible for two Spark driver pods executing the same application to be running simultaneously? Yes, because the Job controller is greedy and, in the event of a network failure, will schedule a new driver. However, based on our observations, such instances do not affect correctness at all.
We will simulate a node failure on a GKE cluster, which can easily be created either with Pipeline or from the Google Cloud Console UI.
We are going to use iptables, so please choose Ubuntu as the node image type. Make sure the cluster has at least 2 nodes, so that a replacement driver pod can be scheduled on a healthy node.
Next, add your SSH key, since we will need to log in to a node to simulate a network failure. Then generate the Kubernetes config:
gcloud container clusters get-credentials <your cluster name> --zone <your zone>
To simulate a network partition we are going to SSH into the node and add some iptables rules so that the kubelet cannot access the Kubernetes API server. Before that, we need to submit a Spark application that runs long enough to encounter the network partition. For the sake of simplicity we are going to use Spark Pi with a large number of slices (200000) and only two executors. To check what happens with the checkpoint directory, we are going to attach a PVC to the driver pod. (We know Spark Pi is not a Spark Streaming application, but, as we will see, that does not really matter.)
kubectl create -f - <<EOF
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: spark-pvc-claim
spec:
  accessModes:
    - "ReadWriteOnce"
  resources:
    requests:
      storage: "1Gi"
EOF
Submit the Spark Pi application:
bin/spark-submit --verbose \
--master k8s://https://35.197.251.160 \
--deploy-mode cluster \
--name spark-pi \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly=false \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-pvc-claim \
--class org.apache.spark.examples.SparkPi \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.driver.job.backofflimit=1 \
--conf spark.kubernetes.container.image=banzaicloud/spark:master_job-dev_0.5_blog \
local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.0-SNAPSHOT.jar 200000
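The three persistentVolumeClaim settings in the command above follow Spark's spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].* naming pattern. If you mount several volumes or reuse this setup, generating the flags may be less error-prone than typing them. A minimal sketch; pvc_conf_args is a hypothetical helper (not part of Spark), and it assumes the mount path and claim name contain no whitespace, because the output is meant to be word-split into the command line.

```shell
# pvc_conf_args (hypothetical helper): print one "--conf key=value" pair per
# line for mounting an existing PVC into the Spark driver pod.
#   $1 = volume name (e.g. checkpointpvc)
#   $2 = mount path inside the driver container
#   $3 = name of an existing PersistentVolumeClaim
pvc_conf_args() {
  _prefix="spark.kubernetes.driver.volumes.persistentVolumeClaim.$1"
  printf '%s\n' \
    "--conf ${_prefix}.mount.path=$2" \
    "--conf ${_prefix}.mount.readOnly=false" \
    "--conf ${_prefix}.options.claimName=$3"
}

# Show the flags for the volume used in this post.
pvc_conf_args checkpointpvc /checkpoint spark-pvc-claim

# Spliced (unquoted, so each line splits into "--conf" and "key=value") into a
# submit command, for example:
#   bin/spark-submit --deploy-mode cluster ... \
#     $(pvc_conf_args checkpointpvc /checkpoint spark-pvc-claim) ...
```

The unquoted substitution is a deliberate trade-off for POSIX sh, which has no arrays; with paths containing spaces you would need a bash array instead.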
Check where the driver pod is scheduled and look up the external IP address of the node in the Google Cloud Console UI.
kubectl describe pod spark-pi-1531855837017-driver-sx77q
Name: spark-pi-1531855837017-driver-sx77q
Namespace: default
Node: gke-blog-cluster-1-default-pool-7484150c-z6wz/10.154.0.3
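If you would rather not read the node out of the describe output by eye, the Node: line can be parsed. A minimal sketch; node_of_pod is a hypothetical helper, and it assumes the Node: <name>/<internal-ip> format shown above. For just the node name, kubectl get pod <pod> -o jsonpath='{.spec.nodeName}' is a more direct alternative.

```shell
# node_of_pod (hypothetical helper): read `kubectl describe pod` output on stdin
# and print "<node-name> <internal-ip>" taken from the "Node:" line.
node_of_pod() {
  awk '/^Node:/ { split($2, parts, "/"); print parts[1], parts[2]; exit }'
}

# Example usage against a live cluster (not run here):
#   kubectl describe pod spark-pi-1531855837017-driver-sx77q | node_of_pod
# On GKE the node name is also the Compute Engine instance name, so its
# external IP should be retrievable with something like:
#   gcloud compute instances describe <node-name> --zone <your zone> \
#     --format='get(networkInterfaces[0].accessConfigs[0].natIP)'
```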
SSH into that node and apply the following iptables rules:
# First, save the iptables rules so they can be restored later
sudo iptables-save > iptables.backup
# Check which port is used by kubelet
sudo netstat -ntp | grep kubelet
tcp 0 0 10.154.0.3:55288 35.197.251.160:443 ESTABLISHED 2191/kubelet
tcp 0 0 10.154.0.3:55282 35.197.251.160:443 ESTABLISHED 2191/kubelet
tcp 0 0 10.154.0.3:35414 169.254.169.254:80 ESTABLISHED 2191/kubelet
tcp6 0 0 10.154.0.3:10255 10.52.0.6:34778 ESTABLISHED 2191/kubelet
tcp6 0 0 10.154.0.3:10255 10.52.1.4:57318 ESTABLISHED 2191/kubelet
tcp6 0 0 10.154.0.3:10250 10.154.0.2:53980 ESTABLISHED 2191/kubelet
# Our Kubernetes API server's external IP is 35.197.251.160.
# 169.254.169.254 is the link-local GCE metadata server, which
# kubelet also talks to. We block both to simulate a network partition
sudo iptables -A OUTPUT -p tcp -d 35.197.251.160 -j DROP
sudo iptables -A OUTPUT -p tcp -d 169.254.169.254 -j DROP
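Instead of copying addresses from the netstat output by hand, they could be extracted and the rules generated as text for review. A minimal sketch, assuming the netstat -ntp column layout shown above (program name in the seventh column) and that kubelet's outbound connections are the ones to remote port 443 or 80; kubelet_upstream_ips and drop_rules are hypothetical helpers. drop_rules only prints the commands, so nothing changes until you run them yourself.

```shell
# kubelet_upstream_ips (hypothetical helper): read `netstat -ntp` output on
# stdin and print the unique remote IPv4 addresses kubelet talks to on 443 or 80.
kubelet_upstream_ips() {
  awk '$1 == "tcp" && $7 ~ /kubelet$/ {
         n = split($5, addr, ":")      # foreign address is "ip:port"
         if (addr[n] == "443" || addr[n] == "80") print addr[1]
       }' | sort -u
}

# drop_rules (hypothetical helper): print, but do not execute, one iptables
# DROP rule per address read on stdin.
drop_rules() {
  while read -r ip; do
    if [ -n "$ip" ]; then
      printf 'sudo iptables -A OUTPUT -p tcp -d %s -j DROP\n' "$ip"
    fi
  done
}

# Example usage on the node (not run here):
#   sudo netstat -ntp | kubelet_upstream_ips | drop_rules
```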
Check the Kubernetes nodes. After a while, because of the iptables rules, the node's status will change to NotReady.
kubectl get nodes -w
NAME STATUS ROLES AGE VERSION
gke-blog-cluster-1-default-pool-7484150c-qhwh Ready <none> 4h v1.10.4-gke.2
gke-blog-cluster-1-default-pool-7484150c-z6wz Ready <none> 4h v1.10.4-gke.2
gke-blog-cluster-1-default-pool-7484150c-qhwh Ready <none> 4h v1.10.4-gke.2
gke-blog-cluster-1-default-pool-7484150c-z6wz NotReady <none> 4h v1.10.4-gke.2
gke-blog-cluster-1-default-pool-7484150c-qhwh Ready <none> 4h v1.10.4-gke.2
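Because kubectl get nodes -w prints a new line each time a node changes, the status that matters is the last one seen for each node. A minimal sketch that keeps the latest STATUS per node name and reports the nodes currently NotReady; not_ready_nodes is a hypothetical helper, and it assumes the NAME STATUS ... column layout shown above.

```shell
# not_ready_nodes (hypothetical helper): read `kubectl get nodes -w` output on
# stdin, remember the most recent STATUS per node, and print the nodes whose
# latest status is NotReady.
not_ready_nodes() {
  awk '$1 != "NAME" && NF >= 2 { status[$1] = $2 }
       END { for (n in status) if (status[n] == "NotReady") print n }' | sort
}

# Example usage (not run here):
#   kubectl get nodes -w | not_ready_nodes   # prints once the watch is interrupted
```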
If we also check the Kubernetes pods, we will see the following:
kubectl get pods -w
NAME READY STATUS RESTARTS AGE
spark-pi-1531855837017-driver-sx77q 1/1 Running 0 1m
spark-application-1531855852300-exec-1 1/1 Running 0 1m
spark-application-1531855852300-exec-2 1/1 Running 0 1m
spark-pi-1531855837017-driver-sx77q 1/1 Unknown 0 7m
After a short while, the driver pod's status goes from Running to Unknown. Remember, that pod is running on a node that is cut off from the cluster. Since the Spark application has been submitted as a Kubernetes Job, the Job controller will create a new driver pod once the old one goes into an Unknown state. This could cause correctness issues: the separated driver pod is still running, because it does not need a connection to the Kubernetes API, and two driver pods running side by side is a problem. Let's check back on the pods.
kubectl get pods -w
NAME READY STATUS RESTARTS AGE
spark-pi-1531855837017-driver-p6cbm 0/1 ContainerCreating 0 7s
spark-pi-1531855837017-driver-sx77q 1/1 Unknown 0 7m
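The state shown above, two driver pods of the same application that may both be active, is exactly the case the correctness discussion is about. A minimal sketch that flags it from kubectl get pods -w output; live_driver_pods and warn_if_concurrent are hypothetical helpers, they assume driver pod names contain "-driver-" as in this post, and treating Running, Unknown, Pending and ContainerCreating as "possibly active" is our assumption (an Unknown pod may still be running; the API server has merely lost contact with its node).

```shell
# live_driver_pods (hypothetical helper): read `kubectl get pods -w` output on
# stdin, keep the latest STATUS per pod, and print driver pods that may be active.
live_driver_pods() {
  awk '$1 ~ /-driver-/ { status[$1] = $3 }
       END {
         for (p in status)
           if (status[p] == "Running" || status[p] == "Unknown" ||
               status[p] == "Pending" || status[p] == "ContainerCreating")
             print p, status[p]
       }' | sort
}

# warn_if_concurrent (hypothetical helper): print a warning when more than one
# driver pod may be active at the same time; print nothing otherwise.
warn_if_concurrent() {
  _live=$(live_driver_pods)
  _count=$(printf '%s\n' "$_live" | grep -c .)
  if [ "$_count" -gt 1 ]; then
    echo "WARNING: $_count driver pods may be running at the same time:"
    printf '%s\n' "$_live"
  fi
}

# Example usage (not run here):
#   kubectl get pods | warn_if_concurrent
```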
If we investigate the newly created driver pod, we can see that it cannot start while the other pod has not terminated, thanks to the ReadWriteOnce PVC (the volume cannot be mounted read-write by a second node), so there are no correctness issues at all.
Warning FailedMount 19s kubelet, gke-blog-cluster-1-default-pool-7484150c-qhwh Unable to mount volumes for pod "spark-pi-1531902241419-driver-p6cbm_default(c00c5519-8a64-11e8-b81d-42010a9a00fa)": timeout expired waiting for volumes to attach or mount for pod "default"/"spark-pi-1531902241419-driver-p6cbm". list of unmounted volumes=[checkpointpvc]. list of unattached volumes=[spark-local-dir-1 checkpointpvc spark-conf-volume default-token-nbpnb]
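To confirm which volume is blocking the new driver, the volume names can be pulled out of the FailedMount message. A minimal sketch; unmounted_volumes is a hypothetical helper, and it assumes the "list of unmounted volumes=[...]" wording shown in the event above, which may differ between Kubernetes versions.

```shell
# unmounted_volumes (hypothetical helper): read kubelet event text on stdin and
# print, one per line, the names inside "list of unmounted volumes=[...]".
unmounted_volumes() {
  sed -n 's/.*list of unmounted volumes=\[\([^]]*\)\].*/\1/p' | tr ' ' '\n'
}

# Example usage (not run here):
#   kubectl describe pod <new driver pod> | unmounted_volumes
```

In this experiment the expected answer is checkpointpvc, the ReadWriteOnce claim still held by the partitioned node.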
Now let's restore the iptables rules so the node can rejoin the cluster.
sudo iptables-restore < iptables.backup
The lost node rejoins the cluster:
kubectl get nodes -w
NAME STATUS ROLES AGE VERSION
gke-blog-cluster-1-default-pool-7484150c-qhwh Ready <none> 4h v1.10.4-gke.2
gke-blog-cluster-1-default-pool-7484150c-z6wz Ready <none> 4h v1.10.4-gke.2
gke-blog-cluster-1-default-pool-7484150c-qhwh Ready <none> 4h v1.10.4-gke.2
gke-blog-cluster-1-default-pool-7484150c-z6wz Ready <none> 4h v1.10.4-gke.2
The pod status shows that once the old driver pod has been terminated (which happens because Kubernetes terminates the pods that were evicted from the node while it was unreachable), the new driver pod starts properly and creates its own executors.
kubectl get pods -w
spark-pi-1531855837017-driver-p6cbm 1/1 Running 0 2m
spark-pi-1531855837017-driver-sx77q 0/1 Terminating 0 9m
spark-application-1531856426866-exec-1 0/1 Pending 0 0s
spark-application-1531856426866-exec-1 0/1 Pending 0 0s
spark-application-1531856426866-exec-1 0/1 ContainerCreating 0 0s
spark-application-1531856426866-exec-2 0/1 Pending 0 0s
spark-application-1531856426866-exec-2 0/1 Pending 0 0s
spark-application-1531856426866-exec-2 0/1 ContainerCreating 0 0s
spark-application-1531856426866-exec-2 1/1 Running 0 1s
spark-application-1531856426866-exec-1 1/1 Running 0 1s
To summarize, let's take a look at the image below. It shows what happens during a network partition.