Skip to content

Commit

Permalink
[SPARK-36462][K8S] Add the ability to selectively disable watching or…
Browse files Browse the repository at this point in the history
… polling

### What changes were proposed in this pull request?

Add the ability to selectively disable watching or polling

Updated version of apache#34264

### Why are the changes needed?

Watching or polling for pod status on Kubernetes can place additional load on etcd, with a large number of executors and large number of jobs this can have negative impacts and executors register themselves with the driver under normal operations anyways.

### Does this PR introduce _any_ user-facing change?

Two new config flags.

### How was this patch tested?

New unit tests + manually tested a forked version of this on an internal cluster with both watching and polling disabled.

Closes apache#36433 from holdenk/SPARK-36462-allow-spark-on-kube-to-operate-without-watchers.

Lead-authored-by: Holden Karau <[email protected]>
Co-authored-by: Holden Karau <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
2 people authored and dongjoon-hyun committed Aug 18, 2022
1 parent 6cd9d88 commit 5bffb98
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,25 @@ private[spark] object Config extends Logging {
.checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
.createWithDefaultString("1s")

val KUBERNETES_EXECUTOR_ENABLE_API_POLLING =
ConfigBuilder("spark.kubernetes.executor.enableApiPolling")
.doc("If Spark should poll Kubernetes for executor pod status. " +
"You should leave this enabled unless you're encountering issues with your etcd.")
.version("3.4.0")
.internal()
.booleanConf
.createWithDefault(true)

val KUBERNETES_EXECUTOR_ENABLE_API_WATCHER =
ConfigBuilder("spark.kubernetes.executor.enableApiWatcher")
.doc("If Spark should create watchers for executor pod status. " +
"You should leave this enabled unless you're encountering issues with your etcd.")
.version("3.4.0")
.internal()
.booleanConf
.createWithDefault(true)


val KUBERNETES_EXECUTOR_API_POLLING_INTERVAL =
ConfigBuilder("spark.kubernetes.executor.apiPollingInterval")
.doc("Interval between polls against the Kubernetes API server to inspect the " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,18 @@ class ExecutorPodsPollingSnapshotSource(
pollingExecutor: ScheduledExecutorService) extends Logging {

private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
private val pollingEnabled = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_POLLING)

private var pollingFuture: Future[_] = _

@Since("3.1.3")
def start(applicationId: String): Unit = {
require(pollingFuture == null, "Cannot start polling more than once.")
logDebug(s"Starting to check for executor pod state every $pollingInterval ms.")
pollingFuture = pollingExecutor.scheduleWithFixedDelay(
new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
if (pollingEnabled) {
require(pollingFuture == null, "Cannot start polling more than once.")
logDebug(s"Starting to check for executor pod state every $pollingInterval ms.")
pollingFuture = pollingExecutor.scheduleWithFixedDelay(
new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
}
}

@Since("3.1.3")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClient, Watcher, WatcherException}
import io.fabric8.kubernetes.client.Watcher.Action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.annotation.{DeveloperApi, Since, Stable}
import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_ENABLE_API_WATCHER
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
Expand All @@ -38,19 +40,28 @@ import org.apache.spark.util.Utils
@DeveloperApi
class ExecutorPodsWatchSnapshotSource(
snapshotsStore: ExecutorPodsSnapshotsStore,
kubernetesClient: KubernetesClient) extends Logging {
kubernetesClient: KubernetesClient,
conf: SparkConf) extends Logging {

private var watchConnection: Closeable = _
private val enableWatching = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_WATCHER)

// If we're constructed with the old API get the SparkConf from the running SparkContext.
def this(snapshotsStore: ExecutorPodsSnapshotsStore, kubernetesClient: KubernetesClient) = {
this(snapshotsStore, kubernetesClient, SparkContext.getOrCreate().conf)
}

@Since("3.1.3")
def start(applicationId: String): Unit = {
require(watchConnection == null, "Cannot start the watcher twice.")
logDebug(s"Starting watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," +
s" $SPARK_ROLE_LABEL=$SPARK_POD_EXECUTOR_ROLE.")
watchConnection = kubernetesClient.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.watch(new ExecutorPodsWatcher())
if (enableWatching) {
require(watchConnection == null, "Cannot start the watcher twice.")
logDebug(s"Starting to watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," +
s" $SPARK_ROLE_LABEL=$SPARK_POD_EXECUTOR_ROLE.")
watchConnection = kubernetesClient.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.watch(new ExecutorPodsWatcher())
}
}

@Since("3.1.3")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit

val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource(
snapshotsStore,
kubernetesClient)
kubernetesClient,
sc.conf)

val eventsPollingExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"kubernetes-executor-pod-polling-sync")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import io.fabric8.kubernetes.api.model.{ListOptionsBuilder, PodListBuilder}
import io.fabric8.kubernetes.client.KubernetesClient
import org.jmock.lib.concurrent.DeterministicScheduler
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Mockito.{verify, when}
import org.mockito.Mockito.{never, verify, when}
import org.scalatest.BeforeAndAfter

import org.apache.spark.{SparkConf, SparkFunSuite}
Expand All @@ -33,9 +33,9 @@ import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._

class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter {

private val sparkConf = new SparkConf
private val defaultConf = new SparkConf()

private val pollingInterval = sparkConf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
private val pollingInterval = defaultConf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)

@Mock
private var kubernetesClient: KubernetesClient = _
Expand All @@ -61,12 +61,6 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn
before {
MockitoAnnotations.openMocks(this).close()
pollingExecutor = new DeterministicScheduler()
pollingSourceUnderTest = new ExecutorPodsPollingSnapshotSource(
sparkConf,
kubernetesClient,
eventQueue,
pollingExecutor)
pollingSourceUnderTest.start(TEST_SPARK_APP_ID)
when(kubernetesClient.pods()).thenReturn(podOperations)
when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
.thenReturn(appIdLabeledPods)
Expand All @@ -77,6 +71,13 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn
}

test("Items returned by the API should be pushed to the event queue") {
val sparkConf = new SparkConf()
pollingSourceUnderTest = new ExecutorPodsPollingSnapshotSource(
sparkConf,
kubernetesClient,
eventQueue,
pollingExecutor)
pollingSourceUnderTest.start(TEST_SPARK_APP_ID)
val exec1 = runningExecutor(1)
val exec2 = runningExecutor(2)
when(activeExecutorPods.list())
Expand All @@ -89,13 +90,27 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn
verify(eventQueue).replaceSnapshot(Seq(exec1, exec2))
}

test("SPARK-36462: If polling is disabled we don't call pods() on the client") {
val sparkConf = new SparkConf()
val source = new ExecutorPodsPollingSnapshotSource(
sparkConf.set(KUBERNETES_EXECUTOR_ENABLE_API_POLLING, false),
kubernetesClient,
eventQueue,
pollingExecutor)
source.start(TEST_SPARK_APP_ID)
pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS)
verify(kubernetesClient, never()).pods()
}

test("SPARK-36334: Support pod listing with resource version") {
Seq(true, false).foreach { value =>
val sparkConf = new SparkConf()
val source = new ExecutorPodsPollingSnapshotSource(
sparkConf.set(KUBERNETES_EXECUTOR_API_POLLING_WITH_RESOURCE_VERSION, value),
kubernetesClient,
eventQueue,
pollingExecutor)
source.start(TEST_SPARK_APP_ID)
pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS)
if (value) {
verify(activeExecutorPods).list(new ListOptionsBuilder().withResourceVersion("0").build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
import org.mockito.Mockito.{verify, when}
import org.mockito.Mockito.{never, verify, when}
import org.scalatest.BeforeAndAfter

import org.apache.spark.SparkConf
import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_ENABLE_API_WATCHER
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
Expand Down Expand Up @@ -61,17 +63,27 @@ class ExecutorPodsWatchSnapshotSourceSuite extends SparkFunSuite with BeforeAndA
when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
.thenReturn(executorRoleLabeledPods)
when(executorRoleLabeledPods.watch(watch.capture())).thenReturn(watchConnection)
watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource(
eventQueue, kubernetesClient)
watchSourceUnderTest.start(TEST_SPARK_APP_ID)
}

test("Watch events should be pushed to the snapshots store as snapshot updates.") {
val conf = new SparkConf()
watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource(
eventQueue, kubernetesClient, conf)
watchSourceUnderTest.start(TEST_SPARK_APP_ID)
val exec1 = runningExecutor(1)
val exec2 = runningExecutor(2)
watch.getValue.eventReceived(Action.ADDED, exec1)
watch.getValue.eventReceived(Action.MODIFIED, exec2)
verify(eventQueue).updatePod(exec1)
verify(eventQueue).updatePod(exec2)
}

test("SPARK-36462: Verify if watchers are disabled we don't call pods() on the client") {
val conf = new SparkConf()
conf.set(KUBERNETES_EXECUTOR_ENABLE_API_WATCHER, false)
watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource(
eventQueue, kubernetesClient, conf)
watchSourceUnderTest.start(TEST_SPARK_APP_ID)
verify(kubernetesClient, never()).pods()
}
}

0 comments on commit 5bffb98

Please sign in to comment.