Skip to content

Commit

Permalink
Add the ability to selectively disable watching or polling for pods o…
Browse files Browse the repository at this point in the history
…n Kubernetes for environments where etcd may be under a high load or otherwise not support polling/watching.
  • Loading branch information
holdenk committed Oct 12, 2021
1 parent 1af7072 commit 65d7cc7
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,23 @@ private[spark] object Config extends Logging {
.checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
.createWithDefaultString("1s")

val KUBERNETES_EXECUTOR_ENABLE_API_POLLING =
ConfigBuilder("spark.kubernetes.executor.enableApiPolling")
.doc("If Spark should poll Kubernetes for executor pod status. " +
"You should leave this enabled unless your encountering performance issues with your etcd.")
.version("3.3.0")
.booleanConf
.createWithDefault(true)

val KUBERNETES_EXECUTOR_ENABLE_API_WATCHER =
ConfigBuilder("spark.kubernetes.executor.enableApiWatcher")
.doc("If Spark should create watchers for executor pod status. " +
"You should leave this enabled unless your encountering performance issues with your etcd.")
.version("3.3.0")
.booleanConf
.createWithDefault(true)


val KUBERNETES_EXECUTOR_API_POLLING_INTERVAL =
ConfigBuilder("spark.kubernetes.executor.apiPollingInterval")
.doc("Interval between polls against the Kubernetes API server to inspect the " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,17 @@ private[spark] class ExecutorPodsPollingSnapshotSource(
pollingExecutor: ScheduledExecutorService) extends Logging {

private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
private val pollingEnabled = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_POLLING)

private var pollingFuture: Future[_] = _

def start(applicationId: String): Unit = {
require(pollingFuture == null, "Cannot start polling more than once.")
logDebug(s"Starting to check for executor pod state every $pollingInterval ms.")
pollingFuture = pollingExecutor.scheduleWithFixedDelay(
new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
if (pollingEnabled) {
require(pollingFuture == null, "Cannot start polling more than once.")
logDebug(s"Starting to check for executor pod state every $pollingInterval ms.")
pollingFuture = pollingExecutor.scheduleWithFixedDelay(
new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
}
}

def stop(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,30 @@ import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClient, Watcher, WatcherException}
import io.fabric8.kubernetes.client.Watcher.Action

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_ENABLE_API_WATCHER
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

private[spark] class ExecutorPodsWatchSnapshotSource(
snapshotsStore: ExecutorPodsSnapshotsStore,
kubernetesClient: KubernetesClient) extends Logging {
kubernetesClient: KubernetesClient,
conf: SparkConf) extends Logging {

private var watchConnection: Closeable = _
private val enablePolling = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_WATCHER)

def start(applicationId: String): Unit = {
require(watchConnection == null, "Cannot start the watcher twice.")
logDebug(s"Starting watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," +
s" $SPARK_ROLE_LABEL=$SPARK_POD_EXECUTOR_ROLE.")
watchConnection = kubernetesClient.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.watch(new ExecutorPodsWatcher())
if (enablePolling) {
require(watchConnection == null, "Cannot start the watcher twice.")
logDebug(s"Starting watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," +
s" $SPARK_ROLE_LABEL=$SPARK_POD_EXECUTOR_ROLE.")
watchConnection = kubernetesClient.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.watch(new ExecutorPodsWatcher())
}
}

def stop(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit

val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource(
snapshotsStore,
kubernetesClient)
kubernetesClient,
sc.conf)

val eventsPollingExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"kubernetes-executor-pod-polling-sync")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import io.fabric8.kubernetes.api.model.{ListOptionsBuilder, PodListBuilder}
import io.fabric8.kubernetes.client.KubernetesClient
import org.jmock.lib.concurrent.DeterministicScheduler
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Mockito.{verify, when}
import org.mockito.Mockito.{never, verify, when}
import org.scalatest.BeforeAndAfter

import org.apache.spark.{SparkConf, SparkFunSuite}
Expand All @@ -33,9 +33,9 @@ import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._

class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter {

private val sparkConf = new SparkConf
private val defaultConf = new SparkConf()

private val pollingInterval = sparkConf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
private val pollingInterval = defaultConf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)

@Mock
private var kubernetesClient: KubernetesClient = _
Expand All @@ -61,12 +61,6 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn
before {
MockitoAnnotations.openMocks(this).close()
pollingExecutor = new DeterministicScheduler()
pollingSourceUnderTest = new ExecutorPodsPollingSnapshotSource(
sparkConf,
kubernetesClient,
eventQueue,
pollingExecutor)
pollingSourceUnderTest.start(TEST_SPARK_APP_ID)
when(kubernetesClient.pods()).thenReturn(podOperations)
when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
.thenReturn(appIdLabeledPods)
Expand All @@ -77,6 +71,13 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn
}

test("Items returned by the API should be pushed to the event queue") {
val sparkConf = new SparkConf()
pollingSourceUnderTest = new ExecutorPodsPollingSnapshotSource(
sparkConf,
kubernetesClient,
eventQueue,
pollingExecutor)
pollingSourceUnderTest.start(TEST_SPARK_APP_ID)
val exec1 = runningExecutor(1)
val exec2 = runningExecutor(2)
when(activeExecutorPods.list())
Expand All @@ -89,13 +90,27 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn
verify(eventQueue).replaceSnapshot(Seq(exec1, exec2))
}

test("If polling is disabled we don't call pods() on the client") {
val sparkConf = new SparkConf()
val source = new ExecutorPodsPollingSnapshotSource(
sparkConf.set(KUBERNETES_EXECUTOR_ENABLE_API_POLLING, false),
kubernetesClient,
eventQueue,
pollingExecutor)
source.start(TEST_SPARK_APP_ID)
pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS)
verify(kubernetesClient, never()).pods()
}

test("SPARK-36334: Support pod listing with resource version") {
Seq(true, false).foreach { value =>
val sparkConf = new SparkConf()
val source = new ExecutorPodsPollingSnapshotSource(
sparkConf.set(KUBERNETES_EXECUTOR_API_POLLING_WITH_RESOURCE_VERSION, value),
kubernetesClient,
eventQueue,
pollingExecutor)
source.start(TEST_SPARK_APP_ID)
pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS)
if (value) {
verify(activeExecutorPods).list(new ListOptionsBuilder().withResourceVersion("0").build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
import org.mockito.Mockito.{verify, when}
import org.mockito.Mockito.{never, verify, when}
import org.scalatest.BeforeAndAfter

import org.apache.spark.SparkConf
import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_ENABLE_API_WATCHER
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
Expand Down Expand Up @@ -61,17 +63,27 @@ class ExecutorPodsWatchSnapshotSourceSuite extends SparkFunSuite with BeforeAndA
when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
.thenReturn(executorRoleLabeledPods)
when(executorRoleLabeledPods.watch(watch.capture())).thenReturn(watchConnection)
watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource(
eventQueue, kubernetesClient)
watchSourceUnderTest.start(TEST_SPARK_APP_ID)
}

test("Watch events should be pushed to the snapshots store as snapshot updates.") {
val conf = new SparkConf()
watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource(
eventQueue, kubernetesClient, conf)
watchSourceUnderTest.start(TEST_SPARK_APP_ID)
val exec1 = runningExecutor(1)
val exec2 = runningExecutor(2)
watch.getValue.eventReceived(Action.ADDED, exec1)
watch.getValue.eventReceived(Action.MODIFIED, exec2)
verify(eventQueue).updatePod(exec1)
verify(eventQueue).updatePod(exec2)
}

test("Verify if watchers are disabled we don't call pods() on the client") {
val conf = new SparkConf()
conf.set(KUBERNETES_EXECUTOR_ENABLE_API_WATCHER, false)
watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource(
eventQueue, kubernetesClient, conf)
watchSourceUnderTest.start(TEST_SPARK_APP_ID)
verify(kubernetesClient, never()).pods()
}
}

0 comments on commit 65d7cc7

Please sign in to comment.