Published on 00/00/0000
Last updated on 00/00/0000
Apache Spark on Kubernetes series: Introduction to Spark on Kubernetes; Scaling Spark made simple on Kubernetes; The anatomy of Spark applications on Kubernetes; Monitoring Apache Spark with Prometheus; Spark History Server on Kubernetes; Spark scheduling on Kubernetes demystified; Spark Streaming Checkpointing on Kubernetes; Deep dive into monitoring Spark and Zeppelin with Prometheus; Apache Spark application resilience on Kubernetes
Apache Zeppelin on Kubernetes series: Running Zeppelin Spark notebooks on Kubernetes; Running Zeppelin Spark notebooks on Kubernetes - deep dive
Apache Kafka on Kubernetes series: Kafka on Kubernetes - using etcd
One of the most popular spotguides for our open source, next generation CloudFoundry/Heroku-like Pipeline PaaS is for Apache Spark. In the last few weeks, we've been evangelizing for Spark on Kubernetes, and the feedback we've received has been outstanding. Thank you for pointing out issues, bringing us ideas and making feature requests. Keep them coming!
In this post we pick up where we left off and dig into the details of how the Spark scheduler works on Kubernetes. It complements our anatomy of Spark applications on Kubernetes blog post and focuses on how executor scheduling is linked to Kubernetes.
The scheduler that ships with Spark on Kubernetes is org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend, a coarse-grained scheduler derived from org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend. To recap from the anatomy of Spark applications on Kubernetes blog post, this is how the Spark scheduler is initialized:
Once SparkContext has the cluster manager, it creates a task scheduler and a backend scheduler by invoking org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler).
The backend scheduler runs an allocation cycle periodically, waiting spark.kubernetes.allocation.batch.delay between cycles (if unspecified this defaults to 1 second). Initially, both the number of requested executor pods and the number of registered executors are 0, which is less than the total number of expected executors, so the scheduler asks Kubernetes to allocate (schedule) executor pods. It doesn't request the total expected number of executor pods at once; instead it requests pods in batches of spark.kubernetes.allocation.batch.size (if unspecified this defaults to 5). In the next scheduling cycle, the total number of registered executors is compared to the total number of requested executors. If there are fewer registered executors than requested executors, not all requested executors have been created by Kubernetes yet, so the scheduler won't request new executors in this cycle.
if total_registered_executors < total_requested_executors {
    // don't request new executors as there are still pending executor pods
}
If all executor pods requested so far were created by Kubernetes, and the executors registered with the driver, the scheduler checks whether the total number of expected executors has been reached.
if total_registered_executors < total_requested_executors {
    // there are still pending executor pods to be created by Kubernetes,
    // don't request new executor pods
} else if total_expected_executors <= total_registered_executors {
    // maximum allowed executor limit reached. Not scaling up further.
}
If there are no pending executors and the total expected executor count hasn't been reached, the scheduler requests the next batch of executor pods.
if total_registered_executors < total_requested_executors {
    // there are still pending executor pods to be created by Kubernetes,
    // don't request new executor pods
} else if total_expected_executors <= total_registered_executors {
    // maximum allowed executor limit reached. Not scaling up further.
} else {
    // request at most one batch, and never more than what is still missing
    int to_be_requested_num = min(total_expected_executors - total_registered_executors, allocation_batch_size)
    for (i = 0; i < to_be_requested_num; i++) {
        request_new_executor_from_kubernetes()
        total_requested_executors++
    }
}
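The three branches above can be tried out with a small, runnable Python sketch. It is not Spark's implementation: the AllocatorState class, the function names and the simulate loop are hypothetical, and the loop assumes that every requested pod is created and its executor registers with the driver before the next cycle starts.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class AllocatorState:
    """Counters from the pseudocode above (hypothetical names, not Spark's fields)."""
    total_expected_executors: int
    allocation_batch_size: int = 5  # default of spark.kubernetes.allocation.batch.size
    total_requested_executors: int = 0
    total_registered_executors: int = 0


def run_allocation_cycle(state: AllocatorState) -> int:
    """Run one scheduling cycle and return how many executor pods were requested."""
    if state.total_registered_executors < state.total_requested_executors:
        # Pending executor pods exist: wait for them before requesting more.
        return 0
    if state.total_expected_executors <= state.total_registered_executors:
        # Expected executor count reached: do not scale up further.
        return 0
    to_request = min(
        state.total_expected_executors - state.total_registered_executors,
        state.allocation_batch_size,
    )
    state.total_requested_executors += to_request
    return to_request


def simulate(total_expected: int, batch_size: int = 5) -> List[int]:
    """Return the batch sizes requested until the expected count is reached."""
    state = AllocatorState(total_expected, batch_size)
    batches: List[int] = []
    while True:
        requested = run_allocation_cycle(state)
        if requested == 0:
            return batches
        batches.append(requested)
        # Simplifying assumption: all requested executors register with the
        # driver within one allocation delay.
        state.total_registered_executors = state.total_requested_executors


if __name__ == "__main__":
    print(simulate(12))  # [5, 5, 2]
```

Under these assumptions, 12 expected executors with the default batch size take three allocation cycles, so at least roughly three times spark.kubernetes.allocation.batch.delay passes before all pods have been requested.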
To recap, this is how a Spark application submission works behind the scenes:
Fixed number of executors
To run a Spark job on a fixed number of Spark executors, pass --conf spark.dynamicAllocation.enabled=false (the default if this config is not passed to spark-submit) and --conf spark.executor.instances=<number of executors> (which defaults to 1 if unspecified) to spark-submit. In this case the total number of expected executors is the value of spark.executor.instances, and this drives the executor pod allocation logic presented above.
Dynamic executor allocation
Dynamic executor allocation can be enabled by passing --conf spark.dynamicAllocation.enabled=true to spark-submit. When enabled, the scheduler dynamically scales the number of executor pods to match the workload. The initial number of executors is derived from:
spark.dynamicAllocation.minExecutors (defaults to 0 if not specified)
spark.dynamicAllocation.initialExecutors (defaults to spark.dynamicAllocation.minExecutors if not specified)
spark.executor.instances (defaults to 0 if not specified)
The initial number of executors is the maximum of the above three values, and it must be less than or equal to spark.dynamicAllocation.maxExecutors.
An executor allocation manager (org.apache.spark.ExecutorAllocationManager) is instantiated only when dynamic allocation is enabled. It is responsible for dynamically computing the total number of expected executors, and it runs every 100 ms to update that number. It initially sets the total to the initial number of executors (see above). After that, it continuously updates the number of expected executors to whatever the current load requires to satisfy all running and pending tasks. Simultaneously and continuously, the KubernetesClusterSchedulerBackend handles pod creation according to the logic described above.
Once the initial target is set, the allocation manager doesn't raise it until tasks have been backlogged for at least spark.dynamicAllocation.schedulerBacklogTimeout seconds (default 1), which indicates that the current total number of expected executors is not enough to handle the load. It first requests 1 more executor by increasing the total number of expected executors. Going forward, it checks whether the backlog persists for the duration of spark.dynamicAllocation.sustainedSchedulerBacklogTimeout (by default the same as spark.dynamicAllocation.schedulerBacklogTimeout) before requesting additional executors. Each time, the executor allocation manager requests double what it requested in its previous iteration (e.g. 2, 4, 8 ... executors), until the number of expected executors reaches the number of executors needed under the current load.
As tasks complete, executors run out of work and go idle. The allocation manager downscales executors that have been idle for at least spark.dynamicAllocation.executorIdleTimeout seconds (default 60 if unspecified). It passes the ids of the executors to be downscaled to KubernetesClusterSchedulerBackend, which asks Kubernetes to delete the corresponding executor pods.
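The dynamic allocation rules described in this section can be sketched in a few lines of Python. This is an illustration under stated assumptions rather than the ExecutorAllocationManager code: all function and parameter names are hypothetical, the number of executors the load needs is treated as a fixed input, a missing maxExecutors is modeled as "no cap", and raising an error when the cap is exceeded is this sketch's own choice.

```python
from typing import Dict, List, Optional


def initial_executors(min_executors: int = 0,
                      initial: Optional[int] = None,
                      executor_instances: int = 0,
                      max_executors: Optional[int] = None) -> int:
    """Initial target: the maximum of the three settings listed above."""
    if initial is None:
        # initialExecutors falls back to minExecutors when unspecified.
        initial = min_executors
    target = max(min_executors, initial, executor_instances)
    if max_executors is not None and target > max_executors:
        raise ValueError("initial executors must not exceed maxExecutors")
    return target


def ramp_up_targets(initial_target: int, needed: int) -> List[int]:
    """Expected-executor totals after each backlog timeout expires.

    The increment starts at 1 and doubles every round (1, 2, 4, 8, ...),
    capped at `needed`, a hypothetical fixed demand of the current load.
    """
    targets: List[int] = []
    target = initial_target
    to_add = 1
    while target < needed:
        target = min(target + to_add, needed)
        targets.append(target)
        to_add *= 2
    return targets


def executors_to_remove(idle_seconds: Dict[str, float],
                        idle_timeout: float = 60.0) -> List[str]:
    """Ids of executors idle for at least the idle timeout (default 60 s)."""
    return sorted(eid for eid, idle in idle_seconds.items() if idle >= idle_timeout)


if __name__ == "__main__":
    start = initial_executors(min_executors=2, executor_instances=4)
    print(start)                       # 4
    print(ramp_up_targets(start, 20))  # [5, 7, 11, 19, 20]
    print(executors_to_remove({"1": 75.0, "2": 10.0, "3": 60.0}))  # ['1', '3']
```

Each value returned by ramp_up_targets is a new total of expected executors; the pods themselves are still requested in batches by the KubernetesClusterSchedulerBackend allocation cycle described earlier.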