[SPARK-35125][K8S] Upgrade K8s client to 5.3.0 to support K8s 1.20
### What changes were proposed in this pull request?

Although the master branch already works with K8s 1.20 as-is, this PR upgrades the K8s client to 5.3.0 to support K8s 1.20 officially.
- https://github.com/fabric8io/kubernetes-client#compatibility-matrix

The following are the notable breaking API changes.

1. Remove Doneable (5.0+):
    - fabric8io/kubernetes-client#2571
2. Change Watcher.onClose signature (5.0+):
    - fabric8io/kubernetes-client#2616
3. Change Readiness (5.1+):
    - fabric8io/kubernetes-client#2796
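To make changes 1 and 2 concrete, here is a minimal, self-contained Scala sketch of what they mean for calling code. The classes below are simplified stand-ins, not the real fabric8 types: `isHttpGone` models the 5.x helper that replaces the old `e.getCode == HTTP_GONE` check, and `FakeClient.create` models the 5.x style where a fully built object replaces the 4.x `createNew()...done()` Doneable chain.

```scala
// Stand-in for io.fabric8.kubernetes.client.WatcherException (5.x).
class WatcherException(msg: String, val isHttpGone: Boolean) extends Exception(msg)

// Change 2: onClose takes a WatcherException instead of
// KubernetesClientException, and a no-arg onClose() overload is
// invoked on graceful closure.
trait Watcher[T] {
  def eventReceived(resource: T): Unit
  def onClose(e: WatcherException): Unit // 4.x took KubernetesClientException
  def onClose(): Unit                    // new in 5.x
}

// Change 1: Doneable types are removed. In 4.x, createNew() returned a
// DoneablePod builder terminated by .done(); in 5.x the caller builds the
// object first and passes it to create().
final case class Pod(name: String)

class PodBuilder {
  private var name = ""
  def withName(n: String): PodBuilder = { name = n; this }
  def build(): Pod = Pod(name)
}

class FakeClient {
  def create(pod: Pod): Pod = pod // 5.x style: accepts an already built object
}

// A watcher in the post-upgrade style, mirroring the shape of the
// LoggingPodStatusWatcherImpl change in the diff below.
class LoggingWatcher extends Watcher[Pod] {
  var resourceTooOld = false
  var closedGracefully = false
  override def eventReceived(resource: Pod): Unit = ()
  override def onClose(e: WatcherException): Unit =
    if (e != null && e.isHttpGone) resourceTooOld = true
  override def onClose(): Unit = closedGracefully = true
}
```

Both overloads must be implemented after the upgrade, which is why the diffs below add a no-arg `onClose()` next to each existing `onClose(e)`.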

### Why are the changes needed?

According to the compatibility matrix, this makes Apache Spark and its external cluster manager extension support all K8s 1.20 features officially for Apache Spark 3.2.0.

### Does this PR introduce _any_ user-facing change?

Yes. This is a dev dependency change that affects users of the K8s cluster manager extension.

### How was this patch tested?

Pass the CIs.

In addition, this was manually tested with the K8s integration tests (IT):
```
KubernetesSuite:
- Run SparkPi with no resources
- Run SparkPi with a very long application name.
- Use SparkLauncher.NO_RESOURCE
- Run SparkPi with a master URL without a scheme.
- Run SparkPi with an argument.
- Run SparkPi with custom labels, annotations, and environment variables.
- All pods have the same service account by default
- Run extraJVMOptions check on driver
- Run SparkRemoteFileTest using a remote data file
- Verify logging configuration is picked from the provided SPARK_CONF_DIR/log4j.properties
- Run SparkPi with env and mount secrets.
- Run PySpark on simple pi.py example
- Run PySpark to test a pyfiles example
- Run PySpark with memory customization
- Run in client mode.
- Start pod creation from template
- PVs with local storage
- Launcher client dependencies
- SPARK-33615: Launcher client archives
- SPARK-33748: Launcher python client respecting PYSPARK_PYTHON
- SPARK-33748: Launcher python client respecting spark.pyspark.python and spark.pyspark.driver.python
- Launcher python client dependencies using a zip file
- Test basic decommissioning
- Test basic decommissioning with shuffle cleanup
- Test decommissioning with dynamic allocation & shuffle cleanups
- Test decommissioning timeouts
- Run SparkR on simple dataframe.R example
Run completed in 17 minutes, 44 seconds.
Total number of tests run: 27
Suites: completed 2, aborted 0
Tests: succeeded 27, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```
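For reference, a K8s IT run like the one above can be reproduced roughly as follows. The paths and flags are assumptions based on the scripts in the Spark repository, and the run requires a reachable K8s cluster (e.g. minikube) plus a built distribution; treat this as a sketch, not the authoritative invocation.

```shell
# Build a Spark distribution with the Kubernetes profile enabled
# (run from the Spark source root).
./dev/make-distribution.sh --tgz -Pkubernetes

# Point the integration-test runner at the resulting tarball.
resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh \
  --spark-tgz "$PWD"/spark-*.tgz
```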

Closes #32221 from dongjoon-hyun/SPARK-K8S-530.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun committed Apr 19, 2021
1 parent 7f34035 commit 425dc58
Showing 17 changed files with 94 additions and 89 deletions.
43 changes: 20 additions & 23 deletions dev/deps/spark-deps-hadoop-2.7-hive-2.3
@@ -112,10 +112,8 @@ jackson-dataformat-yaml/2.12.2//jackson-dataformat-yaml-2.12.2.jar
 jackson-datatype-jsr310/2.11.2//jackson-datatype-jsr310-2.11.2.jar
 jackson-jaxrs/1.9.13//jackson-jaxrs-1.9.13.jar
 jackson-mapper-asl/1.9.13//jackson-mapper-asl-1.9.13.jar
-jackson-module-jaxb-annotations/2.12.2//jackson-module-jaxb-annotations-2.12.2.jar
 jackson-module-scala_2.12/2.12.2//jackson-module-scala_2.12-2.12.2.jar
 jackson-xc/1.9.13//jackson-xc-1.9.13.jar
-jakarta.activation-api/1.2.1//jakarta.activation-api-1.2.1.jar
 jakarta.annotation-api/1.3.5//jakarta.annotation-api-1.3.5.jar
 jakarta.inject/2.6.1//jakarta.inject-2.6.1.jar
 jakarta.servlet-api/4.0.3//jakarta.servlet-api-4.0.3.jar
@@ -155,27 +153,26 @@ jsr305/3.0.0//jsr305-3.0.0.jar
 jta/1.1//jta-1.1.jar
 jul-to-slf4j/1.7.30//jul-to-slf4j-1.7.30.jar
 kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar
-kubernetes-client/4.13.2//kubernetes-client-4.13.2.jar
-kubernetes-model-admissionregistration/4.13.2//kubernetes-model-admissionregistration-4.13.2.jar
-kubernetes-model-apiextensions/4.13.2//kubernetes-model-apiextensions-4.13.2.jar
-kubernetes-model-apps/4.13.2//kubernetes-model-apps-4.13.2.jar
-kubernetes-model-autoscaling/4.13.2//kubernetes-model-autoscaling-4.13.2.jar
-kubernetes-model-batch/4.13.2//kubernetes-model-batch-4.13.2.jar
-kubernetes-model-certificates/4.13.2//kubernetes-model-certificates-4.13.2.jar
-kubernetes-model-common/4.13.2//kubernetes-model-common-4.13.2.jar
-kubernetes-model-coordination/4.13.2//kubernetes-model-coordination-4.13.2.jar
-kubernetes-model-core/4.13.2//kubernetes-model-core-4.13.2.jar
-kubernetes-model-discovery/4.13.2//kubernetes-model-discovery-4.13.2.jar
-kubernetes-model-events/4.13.2//kubernetes-model-events-4.13.2.jar
-kubernetes-model-extensions/4.13.2//kubernetes-model-extensions-4.13.2.jar
-kubernetes-model-metrics/4.13.2//kubernetes-model-metrics-4.13.2.jar
-kubernetes-model-networking/4.13.2//kubernetes-model-networking-4.13.2.jar
-kubernetes-model-node/4.13.2//kubernetes-model-node-4.13.2.jar
-kubernetes-model-policy/4.13.2//kubernetes-model-policy-4.13.2.jar
-kubernetes-model-rbac/4.13.2//kubernetes-model-rbac-4.13.2.jar
-kubernetes-model-scheduling/4.13.2//kubernetes-model-scheduling-4.13.2.jar
-kubernetes-model-settings/4.13.2//kubernetes-model-settings-4.13.2.jar
-kubernetes-model-storageclass/4.13.2//kubernetes-model-storageclass-4.13.2.jar
+kubernetes-client/5.3.0//kubernetes-client-5.3.0.jar
+kubernetes-model-admissionregistration/5.3.0//kubernetes-model-admissionregistration-5.3.0.jar
+kubernetes-model-apiextensions/5.3.0//kubernetes-model-apiextensions-5.3.0.jar
+kubernetes-model-apps/5.3.0//kubernetes-model-apps-5.3.0.jar
+kubernetes-model-autoscaling/5.3.0//kubernetes-model-autoscaling-5.3.0.jar
+kubernetes-model-batch/5.3.0//kubernetes-model-batch-5.3.0.jar
+kubernetes-model-certificates/5.3.0//kubernetes-model-certificates-5.3.0.jar
+kubernetes-model-common/5.3.0//kubernetes-model-common-5.3.0.jar
+kubernetes-model-coordination/5.3.0//kubernetes-model-coordination-5.3.0.jar
+kubernetes-model-core/5.3.0//kubernetes-model-core-5.3.0.jar
+kubernetes-model-discovery/5.3.0//kubernetes-model-discovery-5.3.0.jar
+kubernetes-model-events/5.3.0//kubernetes-model-events-5.3.0.jar
+kubernetes-model-extensions/5.3.0//kubernetes-model-extensions-5.3.0.jar
+kubernetes-model-metrics/5.3.0//kubernetes-model-metrics-5.3.0.jar
+kubernetes-model-networking/5.3.0//kubernetes-model-networking-5.3.0.jar
+kubernetes-model-node/5.3.0//kubernetes-model-node-5.3.0.jar
+kubernetes-model-policy/5.3.0//kubernetes-model-policy-5.3.0.jar
+kubernetes-model-rbac/5.3.0//kubernetes-model-rbac-5.3.0.jar
+kubernetes-model-scheduling/5.3.0//kubernetes-model-scheduling-5.3.0.jar
+kubernetes-model-storageclass/5.3.0//kubernetes-model-storageclass-5.3.0.jar
 leveldbjni-all/1.8//leveldbjni-all-1.8.jar
 libfb303/0.9.3//libfb303-0.9.3.jar
 libthrift/0.12.0//libthrift-0.12.0.jar
43 changes: 20 additions & 23 deletions dev/deps/spark-deps-hadoop-3.2-hive-2.3
@@ -89,9 +89,7 @@ jackson-databind/2.12.2//jackson-databind-2.12.2.jar
 jackson-dataformat-yaml/2.12.2//jackson-dataformat-yaml-2.12.2.jar
 jackson-datatype-jsr310/2.11.2//jackson-datatype-jsr310-2.11.2.jar
 jackson-mapper-asl/1.9.13//jackson-mapper-asl-1.9.13.jar
-jackson-module-jaxb-annotations/2.12.2//jackson-module-jaxb-annotations-2.12.2.jar
 jackson-module-scala_2.12/2.12.2//jackson-module-scala_2.12-2.12.2.jar
-jakarta.activation-api/1.2.1//jakarta.activation-api-1.2.1.jar
 jakarta.annotation-api/1.3.5//jakarta.annotation-api-1.3.5.jar
 jakarta.inject/2.6.1//jakarta.inject-2.6.1.jar
 jakarta.servlet-api/4.0.3//jakarta.servlet-api-4.0.3.jar
@@ -126,27 +124,26 @@ jsr305/3.0.0//jsr305-3.0.0.jar
 jta/1.1//jta-1.1.jar
 jul-to-slf4j/1.7.30//jul-to-slf4j-1.7.30.jar
 kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar
-kubernetes-client/4.13.2//kubernetes-client-4.13.2.jar
-kubernetes-model-admissionregistration/4.13.2//kubernetes-model-admissionregistration-4.13.2.jar
-kubernetes-model-apiextensions/4.13.2//kubernetes-model-apiextensions-4.13.2.jar
-kubernetes-model-apps/4.13.2//kubernetes-model-apps-4.13.2.jar
-kubernetes-model-autoscaling/4.13.2//kubernetes-model-autoscaling-4.13.2.jar
-kubernetes-model-batch/4.13.2//kubernetes-model-batch-4.13.2.jar
-kubernetes-model-certificates/4.13.2//kubernetes-model-certificates-4.13.2.jar
-kubernetes-model-common/4.13.2//kubernetes-model-common-4.13.2.jar
-kubernetes-model-coordination/4.13.2//kubernetes-model-coordination-4.13.2.jar
-kubernetes-model-core/4.13.2//kubernetes-model-core-4.13.2.jar
-kubernetes-model-discovery/4.13.2//kubernetes-model-discovery-4.13.2.jar
-kubernetes-model-events/4.13.2//kubernetes-model-events-4.13.2.jar
-kubernetes-model-extensions/4.13.2//kubernetes-model-extensions-4.13.2.jar
-kubernetes-model-metrics/4.13.2//kubernetes-model-metrics-4.13.2.jar
-kubernetes-model-networking/4.13.2//kubernetes-model-networking-4.13.2.jar
-kubernetes-model-node/4.13.2//kubernetes-model-node-4.13.2.jar
-kubernetes-model-policy/4.13.2//kubernetes-model-policy-4.13.2.jar
-kubernetes-model-rbac/4.13.2//kubernetes-model-rbac-4.13.2.jar
-kubernetes-model-scheduling/4.13.2//kubernetes-model-scheduling-4.13.2.jar
-kubernetes-model-settings/4.13.2//kubernetes-model-settings-4.13.2.jar
-kubernetes-model-storageclass/4.13.2//kubernetes-model-storageclass-4.13.2.jar
+kubernetes-client/5.3.0//kubernetes-client-5.3.0.jar
+kubernetes-model-admissionregistration/5.3.0//kubernetes-model-admissionregistration-5.3.0.jar
+kubernetes-model-apiextensions/5.3.0//kubernetes-model-apiextensions-5.3.0.jar
+kubernetes-model-apps/5.3.0//kubernetes-model-apps-5.3.0.jar
+kubernetes-model-autoscaling/5.3.0//kubernetes-model-autoscaling-5.3.0.jar
+kubernetes-model-batch/5.3.0//kubernetes-model-batch-5.3.0.jar
+kubernetes-model-certificates/5.3.0//kubernetes-model-certificates-5.3.0.jar
+kubernetes-model-common/5.3.0//kubernetes-model-common-5.3.0.jar
+kubernetes-model-coordination/5.3.0//kubernetes-model-coordination-5.3.0.jar
+kubernetes-model-core/5.3.0//kubernetes-model-core-5.3.0.jar
+kubernetes-model-discovery/5.3.0//kubernetes-model-discovery-5.3.0.jar
+kubernetes-model-events/5.3.0//kubernetes-model-events-5.3.0.jar
+kubernetes-model-extensions/5.3.0//kubernetes-model-extensions-5.3.0.jar
+kubernetes-model-metrics/5.3.0//kubernetes-model-metrics-5.3.0.jar
+kubernetes-model-networking/5.3.0//kubernetes-model-networking-5.3.0.jar
+kubernetes-model-node/5.3.0//kubernetes-model-node-5.3.0.jar
+kubernetes-model-policy/5.3.0//kubernetes-model-policy-5.3.0.jar
+kubernetes-model-rbac/5.3.0//kubernetes-model-rbac-5.3.0.jar
+kubernetes-model-scheduling/5.3.0//kubernetes-model-scheduling-5.3.0.jar
+kubernetes-model-storageclass/5.3.0//kubernetes-model-storageclass-5.3.0.jar
 leveldbjni-all/1.8//leveldbjni-all-1.8.jar
 libfb303/0.9.3//libfb303-0.9.3.jar
 libthrift/0.12.0//libthrift-0.12.0.jar
2 changes: 1 addition & 1 deletion resource-managers/kubernetes/core/pom.xml
@@ -30,7 +30,7 @@
   <properties>
     <sbt.project.name>kubernetes</sbt.project.name>
     <!-- Note: Please update the kubernetes client version in kubernetes/integration-tests/pom.xml -->
-    <kubernetes.client.version>4.13.2</kubernetes.client.version>
+    <kubernetes.client.version>5.3.0</kubernetes.client.version>
   </properties>

   <dependencies>
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.k8s.submit
 import scala.collection.JavaConverters._

 import K8SSparkSubmitOperation.getGracePeriod
-import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodList}
+import io.fabric8.kubernetes.api.model.{Pod, PodList}
 import io.fabric8.kubernetes.client.KubernetesClient
 import io.fabric8.kubernetes.client.dsl.{NonNamespaceOperation, PodResource}

@@ -33,7 +33,7 @@ import org.apache.spark.util.{CommandLineLoggingUtils, Utils}

 private sealed trait K8sSubmitOp extends CommandLineLoggingUtils {
   type NON_NAMESPACED_PODS =
-    NonNamespaceOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
+    NonNamespaceOperation[Pod, PodList, PodResource[Pod]]
   def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
     (implicit client: KubernetesClient): Unit
   def executeOnGlob(pods: List[Pod], ns: Option[String], sparkConf: SparkConf)
@@ -17,9 +17,8 @@
 package org.apache.spark.deploy.k8s.submit

 import io.fabric8.kubernetes.api.model.Pod
-import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.{Watcher, WatcherException}
 import io.fabric8.kubernetes.client.Watcher.Action
-import java.net.HttpURLConnection.HTTP_GONE

 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.KubernetesDriverConf
@@ -68,16 +67,21 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
     }
   }

-  override def onClose(e: KubernetesClientException): Unit = {
+  override def onClose(e: WatcherException): Unit = {
     logDebug(s"Stopping watching application $appId with last-observed phase $phase")
-    if(e != null && e.getCode == HTTP_GONE) {
+    if(e != null && e.isHttpGone) {
       resourceTooOldReceived = true
       logDebug(s"Got HTTP Gone code, resource version changed in k8s api: $e")
     } else {
       closeWatch()
     }
   }
+
+  override def onClose(): Unit = {
+    logDebug(s"Stopping watching application $appId with last-observed phase $phase")
+    closeWatch()
+  }

   private def logLongStatus(): Unit = {
     logInfo("State changed, new state: " + pod.map(formatPodState).getOrElse("unknown"))
   }
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster.k8s
 import java.io.Closeable

 import io.fabric8.kubernetes.api.model.Pod
-import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.{KubernetesClient, Watcher, WatcherException}
 import io.fabric8.kubernetes.client.Watcher.Action

 import org.apache.spark.deploy.k8s.Constants._
@@ -58,10 +58,14 @@ private[spark] class ExecutorPodsWatchSnapshotSource(
       snapshotsStore.updatePod(pod)
     }

-    override def onClose(e: KubernetesClientException): Unit = {
+    override def onClose(e: WatcherException): Unit = {
       logWarning("Kubernetes client has been closed (this is expected if the application is" +
         " shutting down.)", e)
     }
+
+    override def onClose(): Unit = {
+      logWarning("Kubernetes client has been closed.")
+    }
   }

 }
@@ -16,19 +16,16 @@
  */
 package org.apache.spark.deploy.k8s

-import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapList, DoneableConfigMap, DoneablePod, HasMetadata, Pod, PodList}
-import io.fabric8.kubernetes.client.Watch
+import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapList, HasMetadata, Pod, PodList}
 import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource, Resource}

 object Fabric8Aliases {
-  type PODS = MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
+  type PODS = MixedOperation[Pod, PodList, PodResource[Pod]]
   type CONFIG_MAPS = MixedOperation[
-    ConfigMap, ConfigMapList, DoneableConfigMap, Resource[ConfigMap, DoneableConfigMap]]
-  type LABELED_PODS = FilterWatchListDeletable[
-    Pod, PodList, java.lang.Boolean, Watch]
-  type LABELED_CONFIG_MAPS = FilterWatchListDeletable[
-    ConfigMap, ConfigMapList, java.lang.Boolean, Watch]
-  type SINGLE_POD = PodResource[Pod, DoneablePod]
+    ConfigMap, ConfigMapList, Resource[ConfigMap]]
+  type LABELED_PODS = FilterWatchListDeletable[Pod, PodList]
+  type LABELED_CONFIG_MAPS = FilterWatchListDeletable[ConfigMap, ConfigMapList]
+  type SINGLE_POD = PodResource[Pod]
   type RESOURCE_LIST = NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[
-    HasMetadata, Boolean]
+    HasMetadata]
 }
@@ -83,8 +83,8 @@ abstract class PodBuilderSuite extends SparkFunSuite {
   private def mockKubernetesClient(pod: Pod = podWithSupportedFeatures()): KubernetesClient = {
     val kubernetesClient = mock(classOf[KubernetesClient])
     val pods =
-      mock(classOf[MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]])
-    val podResource = mock(classOf[PodResource[Pod, DoneablePod]])
+      mock(classOf[MixedOperation[Pod, PodList, PodResource[Pod]]])
+    val podResource = mock(classOf[PodResource[Pod]])
     when(kubernetesClient.pods()).thenReturn(pods)
     when(pods.load(any(classOf[File]))).thenReturn(podResource)
     when(podResource.get()).thenReturn(pod)
@@ -125,7 +125,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
   private var podOperations: PODS = _

   @Mock
-  private var namedPods: PodResource[Pod, DoneablePod] = _
+  private var namedPods: PodResource[Pod] = _

   @Mock
   private var loggingPodStatusWatcher: LoggingPodStatusWatcher = _
@@ -45,10 +45,10 @@ class K8sSubmitOpSuite extends SparkFunSuite with BeforeAndAfter {
   private var podOperations: PODS = _

   @Mock
-  private var driverPodOperations1: PodResource[Pod, DoneablePod] = _
+  private var driverPodOperations1: PodResource[Pod] = _

   @Mock
-  private var driverPodOperations2: PodResource[Pod, DoneablePod] = _
+  private var driverPodOperations2: PodResource[Pod] = _

   @Mock
   private var kubernetesClient: KubernetesClient = _
@@ -18,7 +18,7 @@ package org.apache.spark.scheduler.cluster.k8s

 import java.time.Instant

-import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder}
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
 import io.fabric8.kubernetes.client.dsl.PodResource
 import org.mockito.{Mock, MockitoAnnotations}
@@ -76,7 +76,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
   private var labeledPods: LABELED_PODS = _

   @Mock
-  private var driverPodOperations: PodResource[Pod, DoneablePod] = _
+  private var driverPodOperations: PodResource[Pod] = _

   @Mock
   private var executorBuilder: KubernetesExecutorBuilder = _
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.scheduler.cluster.k8s

-import io.fabric8.kubernetes.api.model.{DoneablePod, Pod}
+import io.fabric8.kubernetes.api.model.Pod
 import io.fabric8.kubernetes.client.KubernetesClient
 import io.fabric8.kubernetes.client.dsl.PodResource
 import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
@@ -37,7 +37,7 @@ import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._

 class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfter {

-  private var namedExecutorPods: mutable.Map[String, PodResource[Pod, DoneablePod]] = _
+  private var namedExecutorPods: mutable.Map[String, PodResource[Pod]] = _

   @Mock
   private var kubernetesClient: KubernetesClient = _
@@ -54,7 +54,7 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfter
   before {
     MockitoAnnotations.openMocks(this).close()
     snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()
-    namedExecutorPods = mutable.Map.empty[String, PodResource[Pod, DoneablePod]]
+    namedExecutorPods = mutable.Map.empty[String, PodResource[Pod]]
     when(schedulerBackend.getExecutorsWithRegistrationTs()).thenReturn(Map.empty[String, Long])
     when(kubernetesClient.pods()).thenReturn(podOperations)
     when(podOperations.withName(any(classOf[String]))).thenAnswer(namedPodsAnswer())
@@ -139,10 +139,10 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfter
     """.stripMargin
   }

-  private def namedPodsAnswer(): Answer[PodResource[Pod, DoneablePod]] =
+  private def namedPodsAnswer(): Answer[PodResource[Pod]] =
     (invocation: InvocationOnMock) => {
       val podName: String = invocation.getArgument(0)
       namedExecutorPods.getOrElseUpdate(
-        podName, mock(classOf[PodResource[Pod, DoneablePod]]))
+        podName, mock(classOf[PodResource[Pod]]))
     }
 }
2 changes: 1 addition & 1 deletion resource-managers/kubernetes/integration-tests/pom.xml
@@ -28,7 +28,7 @@
   <properties>
     <download-maven-plugin.version>1.3.0</download-maven-plugin.version>
     <extraScalaTestArgs></extraScalaTestArgs>
-    <kubernetes-client.version>4.13.2</kubernetes-client.version>
+    <kubernetes-client.version>5.3.0</kubernetes-client.version>
    <sbt.project.name>kubernetes-integration-tests</sbt.project.name>

    <!-- Integration Test Configuration Properties -->
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest

+import io.fabric8.kubernetes.api.model.{PodBuilder, ServiceBuilder}
 import org.scalatest.concurrent.Eventually
 import scala.collection.JavaConverters._

@@ -32,7 +33,7 @@ private[spark] trait ClientModeTestsSuite { k8sSuite: KubernetesSuite =>
       .getKubernetesClient
       .services()
       .inNamespace(kubernetesTestComponents.namespace)
-      .createNew()
+      .create(new ServiceBuilder()
       .withNewMetadata()
         .withName(s"$driverPodName-svc")
         .endMetadata()
@@ -50,13 +51,13 @@ private[spark] trait ClientModeTestsSuite { k8sSuite: KubernetesSuite =>
           .withNewTargetPort(blockManagerPort)
           .endPort()
         .endSpec()
-      .done()
+      .build())
     try {
       val driverPod = testBackend
         .getKubernetesClient
         .pods()
         .inNamespace(kubernetesTestComponents.namespace)
-        .createNew()
+        .create(new PodBuilder()
         .withNewMetadata()
           .withName(driverPodName)
           .withLabels(labels.asJava)
@@ -93,7 +94,7 @@ private[spark] trait ClientModeTestsSuite { k8sSuite: KubernetesSuite =>
           .addToArgs("10")
           .endContainer()
         .endSpec()
-        .done()
+        .build())
       Eventually.eventually(TIMEOUT, INTERVAL) {
         assert(kubernetesTestComponents.kubernetesClient
           .pods()
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
 import com.google.common.base.Charsets
 import com.google.common.io.Files
 import io.fabric8.kubernetes.api.model.Pod
-import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.{Watcher, WatcherException}
 import io.fabric8.kubernetes.client.Watcher.Action
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Tag}
 import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
@@ -343,7 +343,8 @@ class KubernetesSuite extends SparkFunSuite
       .withLabel("spark-role", "executor")
       .watch(new Watcher[Pod] {
         logDebug("Beginning watch of executors")
-        override def onClose(cause: KubernetesClientException): Unit =
+        override def onClose(): Unit = logInfo("Ending watch of executors")
+        override def onClose(cause: WatcherException): Unit =
           logInfo("Ending watch of executors")
         override def eventReceived(action: Watcher.Action, resource: Pod): Unit = {
           val name = resource.getMetadata.getName