diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 1f508b52729e7..b8e9b92159b65 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -483,6 +483,25 @@ private[spark] object Config extends Logging { .checkValue(interval => interval > 0, s"Logging interval must be a positive time value.") .createWithDefaultString("1s") + val KUBERNETES_EXECUTOR_ENABLE_API_POLLING = + ConfigBuilder("spark.kubernetes.executor.enableApiPolling") + .doc("If Spark should poll Kubernetes for executor pod status. " + + "You should leave this enabled unless you're encountering issues with your etcd.") + .version("3.4.0") + .internal() + .booleanConf + .createWithDefault(true) + + val KUBERNETES_EXECUTOR_ENABLE_API_WATCHER = + ConfigBuilder("spark.kubernetes.executor.enableApiWatcher") + .doc("If Spark should create watchers for executor pod status. " + + "You should leave this enabled unless you're encountering issues with your etcd.") + .version("3.4.0") + .internal() + .booleanConf + .createWithDefault(true) + + val KUBERNETES_EXECUTOR_API_POLLING_INTERVAL = ConfigBuilder("spark.kubernetes.executor.apiPollingInterval") .doc("Interval between polls against the Kubernetes API server to inspect the " + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala index 192b5993efe07..10f26bd441ead 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala @@ -45,15 +45,18 @@ class ExecutorPodsPollingSnapshotSource( pollingExecutor: ScheduledExecutorService) extends Logging { private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL) + private val pollingEnabled = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_POLLING) private var pollingFuture: Future[_] = _ @Since("3.1.3") def start(applicationId: String): Unit = { - require(pollingFuture == null, "Cannot start polling more than once.") - logDebug(s"Starting to check for executor pod state every $pollingInterval ms.") - pollingFuture = pollingExecutor.scheduleWithFixedDelay( - new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS) + if (pollingEnabled) { + require(pollingFuture == null, "Cannot start polling more than once.") + logDebug(s"Starting to check for executor pod state every $pollingInterval ms.") + pollingFuture = pollingExecutor.scheduleWithFixedDelay( + new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS) + } } @Since("3.1.3") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala index 06d942eb5b36f..a334ece565377 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala @@ -22,7 +22,9 @@ import io.fabric8.kubernetes.api.model.Pod import io.fabric8.kubernetes.client.{KubernetesClient, Watcher, WatcherException} import io.fabric8.kubernetes.client.Watcher.Action +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.annotation.{DeveloperApi, Since, Stable} +import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_ENABLE_API_WATCHER import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.Logging import org.apache.spark.util.Utils @@ -38,19 +40,28 @@ import org.apache.spark.util.Utils @DeveloperApi class ExecutorPodsWatchSnapshotSource( snapshotsStore: ExecutorPodsSnapshotsStore, - kubernetesClient: KubernetesClient) extends Logging { + kubernetesClient: KubernetesClient, + conf: SparkConf) extends Logging { private var watchConnection: Closeable = _ + private val enableWatching = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_WATCHER) + + // If we're constructed with the old API get the SparkConf from the running SparkContext. + def this(snapshotsStore: ExecutorPodsSnapshotsStore, kubernetesClient: KubernetesClient) = { + this(snapshotsStore, kubernetesClient, SparkContext.getOrCreate().conf) + } @Since("3.1.3") def start(applicationId: String): Unit = { - require(watchConnection == null, "Cannot start the watcher twice.") - logDebug(s"Starting watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," + - s" $SPARK_ROLE_LABEL=$SPARK_POD_EXECUTOR_ROLE.") - watchConnection = kubernetesClient.pods() - .withLabel(SPARK_APP_ID_LABEL, applicationId) - .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) - .watch(new ExecutorPodsWatcher()) + if (enableWatching) { + require(watchConnection == null, "Cannot start the watcher twice.") + logDebug(s"Starting to watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," + + s" $SPARK_ROLE_LABEL=$SPARK_POD_EXECUTOR_ROLE.") + watchConnection = kubernetesClient.pods() + .withLabel(SPARK_APP_ID_LABEL, applicationId) + .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) + .watch(new ExecutorPodsWatcher()) + } } @Since("3.1.3") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index 10ea3a8cb0e46..507d2b310b7cb 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -114,7 +114,8 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource( snapshotsStore, - kubernetesClient) + kubernetesClient, + sc.conf) val eventsPollingExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor( "kubernetes-executor-pod-polling-sync") diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala index 11b604a4d8322..e0016a2ae0503 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala @@ -22,7 +22,7 @@ import io.fabric8.kubernetes.api.model.{ListOptionsBuilder, PodListBuilder} import io.fabric8.kubernetes.client.KubernetesClient import org.jmock.lib.concurrent.DeterministicScheduler import org.mockito.{Mock, MockitoAnnotations} -import org.mockito.Mockito.{verify, when} +import org.mockito.Mockito.{never, verify, when} import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkConf, SparkFunSuite} @@ -33,9 +33,9 @@ import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter { - private val sparkConf = new SparkConf + private val defaultConf = new SparkConf() - private val pollingInterval = sparkConf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL) + private val pollingInterval = defaultConf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL) @Mock private var kubernetesClient: KubernetesClient = _ @@ -61,12 +61,6 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn before { MockitoAnnotations.openMocks(this).close() pollingExecutor = new DeterministicScheduler() - pollingSourceUnderTest = new ExecutorPodsPollingSnapshotSource( - sparkConf, - kubernetesClient, - eventQueue, - pollingExecutor) - pollingSourceUnderTest.start(TEST_SPARK_APP_ID) when(kubernetesClient.pods()).thenReturn(podOperations) when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)) .thenReturn(appIdLabeledPods) @@ -77,6 +71,13 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn } test("Items returned by the API should be pushed to the event queue") { + val sparkConf = new SparkConf() + pollingSourceUnderTest = new ExecutorPodsPollingSnapshotSource( + sparkConf, + kubernetesClient, + eventQueue, + pollingExecutor) + pollingSourceUnderTest.start(TEST_SPARK_APP_ID) val exec1 = runningExecutor(1) val exec2 = runningExecutor(2) when(activeExecutorPods.list()) @@ -89,13 +90,27 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn verify(eventQueue).replaceSnapshot(Seq(exec1, exec2)) } + test("SPARK-36462: If polling is disabled we don't call pods() on the client") { + val sparkConf = new SparkConf() + val source = new ExecutorPodsPollingSnapshotSource( + sparkConf.set(KUBERNETES_EXECUTOR_ENABLE_API_POLLING, false), + kubernetesClient, + eventQueue, + pollingExecutor) + source.start(TEST_SPARK_APP_ID) + pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS) + verify(kubernetesClient, never()).pods() + } + test("SPARK-36334: Support pod listing with resource version") { Seq(true, false).foreach { value => + val sparkConf = new SparkConf() val source = new ExecutorPodsPollingSnapshotSource( sparkConf.set(KUBERNETES_EXECUTOR_API_POLLING_WITH_RESOURCE_VERSION, value), kubernetesClient, eventQueue, pollingExecutor) + source.start(TEST_SPARK_APP_ID) pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS) if (value) { verify(activeExecutorPods).list(new ListOptionsBuilder().withResourceVersion("0").build()) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala index cddb5f6da44f4..8209bee7a02b7 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala @@ -20,10 +20,12 @@ import io.fabric8.kubernetes.api.model.Pod import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher} import io.fabric8.kubernetes.client.Watcher.Action import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations} -import org.mockito.Mockito.{verify, when} +import org.mockito.Mockito.{never, verify, when} import org.scalatest.BeforeAndAfter +import org.apache.spark.SparkConf import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_ENABLE_API_WATCHER import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.Fabric8Aliases._ import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._ @@ -61,12 +63,13 @@ class ExecutorPodsWatchSnapshotSourceSuite extends SparkFunSuite with BeforeAndA when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)) .thenReturn(executorRoleLabeledPods) when(executorRoleLabeledPods.watch(watch.capture())).thenReturn(watchConnection) - watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource( - eventQueue, kubernetesClient) - watchSourceUnderTest.start(TEST_SPARK_APP_ID) } test("Watch events should be pushed to the snapshots store as snapshot updates.") { + val conf = new SparkConf() + watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource( + eventQueue, kubernetesClient, conf) + watchSourceUnderTest.start(TEST_SPARK_APP_ID) val exec1 = runningExecutor(1) val exec2 = runningExecutor(2) watch.getValue.eventReceived(Action.ADDED, exec1) @@ -74,4 +77,13 @@ class ExecutorPodsWatchSnapshotSourceSuite extends SparkFunSuite with BeforeAndA verify(eventQueue).updatePod(exec1) verify(eventQueue).updatePod(exec2) } + + test("SPARK-36462: Verify if watchers are disabled we don't call pods() on the client") { + val conf = new SparkConf() + conf.set(KUBERNETES_EXECUTOR_ENABLE_API_WATCHER, false) + watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource( + eventQueue, kubernetesClient, conf) + watchSourceUnderTest.start(TEST_SPARK_APP_ID) + verify(kubernetesClient, never()).pods() + } }