[SPARK-40458][K8S] Bump Kubernetes Client Version to 6.1.1
### What changes were proposed in this pull request?

Bump kubernetes-client version from 5.12.3 to 6.1.1 and clean up all the deprecations.

### Why are the changes needed?

To keep up with kubernetes-client [changes](fabric8io/kubernetes-client@v5.12.3...v6.1.1).
As this upgrade changes the major version, I have also cleaned up all the deprecations.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

#### Unit tests

#### Manual tests for submit and application management

Started an application in a non-default namespace (`bla`):

```
➜  spark git:(SPARK-40458) ✗ ./bin/spark-submit \
    --master k8s://http://127.0.0.1:8001 \
    --deploy-mode cluster \
    --name spark-pi \
    --class org.apache.spark.examples.SparkPi \
    --conf spark.executor.instances=5 \
    --conf spark.kubernetes.namespace=bla \
    --conf spark.kubernetes.container.image=docker.io/kubespark/spark:3.4.0-SNAPSHOT_064A99CC-57AF-46D5-B743-5B12692C260D \
    local:///opt/spark/examples/jars/spark-examples_2.12-3.4.0-SNAPSHOT.jar 200000
```

Check that we cannot find it from the default namespace, even with a glob, when no namespace is specified:

```
➜  spark git:(SPARK-40458) ✗ minikube kubectl -- config set-context --current --namespace=default
Context "minikube" modified.
➜  spark git:(SPARK-40458) ✗ ./bin/spark-submit --status "spark-pi-*" --master k8s://http://127.0.0.1:8001
Submitting a request for the status of submission spark-pi-* in k8s://http://127.0.0.1:8001.
No applications found.
```

Then check that we can find it by specifying the namespace:
```
➜  spark git:(SPARK-40458) ✗ ./bin/spark-submit --status "bla:spark-pi-*" --master k8s://http://127.0.0.1:8001
Submitting a request for the status of submission bla:spark-pi-* in k8s://http://127.0.0.1:8001.
Application status (driver):
         pod name: spark-pi-4c4e70837c86ae1a-driver
         namespace: bla
         labels: spark-app-name -> spark-pi, spark-app-selector -> spark-c95a9a0888214c01a286eb7ba23980a0, spark-role -> driver, spark-version -> 3.4.0-SNAPSHOT
         pod uid: 0be8952e-3e00-47a3-9082-9cb45278ed6d
         creation time: 2022-09-27T01:19:06Z
         service account name: default
         volumes: spark-local-dir-1, spark-conf-volume-driver, kube-api-access-wxnqw
         node name: minikube
         start time: 2022-09-27T01:19:06Z
         phase: Running
         container status:
                 container name: spark-kubernetes-driver
                 container image: kubespark/spark:3.4.0-SNAPSHOT_064A99CC-57AF-46D5-B743-5B12692C260D
                 container state: running
                 container started at: 2022-09-27T01:19:07Z
```
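The submission ID accepted by `--status` and `--kill` can carry an optional `namespace:` prefix, as in `bla:spark-pi-*` above. A minimal sketch of that split (hypothetical helper, not Spark's actual parser, which lives in `K8SSparkSubmitOperation`):

```java
public class SubmissionId {
    // Split an ID like "bla:spark-pi-*" into {namespace, glob};
    // a missing prefix yields a null namespace: "spark-pi-*" -> {null, "spark-pi-*"}.
    static String[] parse(String id) {
        int idx = id.indexOf(':');
        if (idx < 0) {
            return new String[] {null, id};
        }
        return new String[] {id.substring(0, idx), id.substring(idx + 1)};
    }

    public static void main(String[] args) {
        String[] qualified = parse("bla:spark-pi-*");
        System.out.println(qualified[0] + " / " + qualified[1]); // bla / spark-pi-*
    }
}
```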

Changing the namespace to `bla` with `kubectl`:

```
➜  spark git:(SPARK-40458) ✗  minikube kubectl -- config set-context --current --namespace=bla
Context "minikube" modified.
```

Check that we can now find it with a glob alone, without specifying the namespace:
```
➜  spark git:(SPARK-40458) ✗  ./bin/spark-submit --status "spark-pi-*" --master k8s://http://127.0.0.1:8001
Submitting a request for the status of submission spark-pi-* in k8s://http://127.0.0.1:8001.
Application status (driver):
         pod name: spark-pi-4c4e70837c86ae1a-driver
         namespace: bla
         labels: spark-app-name -> spark-pi, spark-app-selector -> spark-c95a9a0888214c01a286eb7ba23980a0, spark-role -> driver, spark-version -> 3.4.0-SNAPSHOT
         pod uid: 0be8952e-3e00-47a3-9082-9cb45278ed6d
         creation time: 2022-09-27T01:19:06Z
         service account name: default
         volumes: spark-local-dir-1, spark-conf-volume-driver, kube-api-access-wxnqw
         node name: minikube
         start time: 2022-09-27T01:19:06Z
         phase: Running
         container status:
                 container name: spark-kubernetes-driver
                 container image: kubespark/spark:3.4.0-SNAPSHOT_064A99CC-57AF-46D5-B743-5B12692C260D
                 container state: running
                 container started at: 2022-09-27T01:19:07Z
```

Killing the app:
```
➜  spark git:(SPARK-40458) ✗  ./bin/spark-submit --kill "spark-pi-*" --master k8s://http://127.0.0.1:8001
Submitting a request to kill submission spark-pi-* in k8s://http://127.0.0.1:8001. Grace period in secs: not set.
Deleting driver pod: spark-pi-4c4e70837c86ae1a-driver.
```

Closes apache#37990 from attilapiros/SPARK-40458.

Authored-by: attilapiros <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
attilapiros authored and dongjoon-hyun committed Sep 29, 2022
1 parent 03ef022 commit fa88651
Showing 23 changed files with 455 additions and 349 deletions.
47 changes: 24 additions & 23 deletions dev/deps/spark-deps-hadoop-2-hive-2.3
@@ -24,7 +24,6 @@ arrow-memory-core/9.0.0//arrow-memory-core-9.0.0.jar
arrow-memory-netty/9.0.0//arrow-memory-netty-9.0.0.jar
arrow-vector/9.0.0//arrow-vector-9.0.0.jar
audience-annotations/0.5.0//audience-annotations-0.5.0.jar
automaton/1.11-8//automaton-1.11-8.jar
avro-ipc/1.11.1//avro-ipc-1.11.1.jar
avro-mapred/1.11.1//avro-mapred-1.11.1.jar
avro/1.11.1//avro-1.11.1.jar
@@ -69,7 +68,6 @@ error_prone_annotations/2.10.0//error_prone_annotations-2.10.0.jar
failureaccess/1.0.1//failureaccess-1.0.1.jar
flatbuffers-java/1.12.0//flatbuffers-java-1.12.0.jar
gcs-connector/hadoop2-2.2.7/shaded/gcs-connector-hadoop2-2.2.7-shaded.jar
generex/1.0.2//generex-1.0.2.jar
gmetric4j/1.0.10//gmetric4j-1.0.10.jar
grpc-api/1.47.0//grpc-api-1.47.0.jar
grpc-context/1.47.0//grpc-context-1.47.0.jar
@@ -175,27 +173,30 @@ jsr305/3.0.0//jsr305-3.0.0.jar
jta/1.1//jta-1.1.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar
kubernetes-client/5.12.3//kubernetes-client-5.12.3.jar
kubernetes-model-admissionregistration/5.12.3//kubernetes-model-admissionregistration-5.12.3.jar
kubernetes-model-apiextensions/5.12.3//kubernetes-model-apiextensions-5.12.3.jar
kubernetes-model-apps/5.12.3//kubernetes-model-apps-5.12.3.jar
kubernetes-model-autoscaling/5.12.3//kubernetes-model-autoscaling-5.12.3.jar
kubernetes-model-batch/5.12.3//kubernetes-model-batch-5.12.3.jar
kubernetes-model-certificates/5.12.3//kubernetes-model-certificates-5.12.3.jar
kubernetes-model-common/5.12.3//kubernetes-model-common-5.12.3.jar
kubernetes-model-coordination/5.12.3//kubernetes-model-coordination-5.12.3.jar
kubernetes-model-core/5.12.3//kubernetes-model-core-5.12.3.jar
kubernetes-model-discovery/5.12.3//kubernetes-model-discovery-5.12.3.jar
kubernetes-model-events/5.12.3//kubernetes-model-events-5.12.3.jar
kubernetes-model-extensions/5.12.3//kubernetes-model-extensions-5.12.3.jar
kubernetes-model-flowcontrol/5.12.3//kubernetes-model-flowcontrol-5.12.3.jar
kubernetes-model-metrics/5.12.3//kubernetes-model-metrics-5.12.3.jar
kubernetes-model-networking/5.12.3//kubernetes-model-networking-5.12.3.jar
kubernetes-model-node/5.12.3//kubernetes-model-node-5.12.3.jar
kubernetes-model-policy/5.12.3//kubernetes-model-policy-5.12.3.jar
kubernetes-model-rbac/5.12.3//kubernetes-model-rbac-5.12.3.jar
kubernetes-model-scheduling/5.12.3//kubernetes-model-scheduling-5.12.3.jar
kubernetes-model-storageclass/5.12.3//kubernetes-model-storageclass-5.12.3.jar
kubernetes-client-api/6.1.1//kubernetes-client-api-6.1.1.jar
kubernetes-client/6.1.1//kubernetes-client-6.1.1.jar
kubernetes-httpclient-okhttp/6.1.1//kubernetes-httpclient-okhttp-6.1.1.jar
kubernetes-model-admissionregistration/6.1.1//kubernetes-model-admissionregistration-6.1.1.jar
kubernetes-model-apiextensions/6.1.1//kubernetes-model-apiextensions-6.1.1.jar
kubernetes-model-apps/6.1.1//kubernetes-model-apps-6.1.1.jar
kubernetes-model-autoscaling/6.1.1//kubernetes-model-autoscaling-6.1.1.jar
kubernetes-model-batch/6.1.1//kubernetes-model-batch-6.1.1.jar
kubernetes-model-certificates/6.1.1//kubernetes-model-certificates-6.1.1.jar
kubernetes-model-common/6.1.1//kubernetes-model-common-6.1.1.jar
kubernetes-model-coordination/6.1.1//kubernetes-model-coordination-6.1.1.jar
kubernetes-model-core/6.1.1//kubernetes-model-core-6.1.1.jar
kubernetes-model-discovery/6.1.1//kubernetes-model-discovery-6.1.1.jar
kubernetes-model-events/6.1.1//kubernetes-model-events-6.1.1.jar
kubernetes-model-extensions/6.1.1//kubernetes-model-extensions-6.1.1.jar
kubernetes-model-flowcontrol/6.1.1//kubernetes-model-flowcontrol-6.1.1.jar
kubernetes-model-gatewayapi/6.1.1//kubernetes-model-gatewayapi-6.1.1.jar
kubernetes-model-metrics/6.1.1//kubernetes-model-metrics-6.1.1.jar
kubernetes-model-networking/6.1.1//kubernetes-model-networking-6.1.1.jar
kubernetes-model-node/6.1.1//kubernetes-model-node-6.1.1.jar
kubernetes-model-policy/6.1.1//kubernetes-model-policy-6.1.1.jar
kubernetes-model-rbac/6.1.1//kubernetes-model-rbac-6.1.1.jar
kubernetes-model-scheduling/6.1.1//kubernetes-model-scheduling-6.1.1.jar
kubernetes-model-storageclass/6.1.1//kubernetes-model-storageclass-6.1.1.jar
lapack/3.0.2//lapack-3.0.2.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
libfb303/0.9.3//libfb303-0.9.3.jar
47 changes: 24 additions & 23 deletions dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -23,7 +23,6 @@ arrow-memory-core/9.0.0//arrow-memory-core-9.0.0.jar
arrow-memory-netty/9.0.0//arrow-memory-netty-9.0.0.jar
arrow-vector/9.0.0//arrow-vector-9.0.0.jar
audience-annotations/0.5.0//audience-annotations-0.5.0.jar
automaton/1.11-8//automaton-1.11-8.jar
avro-ipc/1.11.1//avro-ipc-1.11.1.jar
avro-mapred/1.11.1//avro-mapred-1.11.1.jar
avro/1.11.1//avro-1.11.1.jar
@@ -66,7 +65,6 @@ error_prone_annotations/2.10.0//error_prone_annotations-2.10.0.jar
failureaccess/1.0.1//failureaccess-1.0.1.jar
flatbuffers-java/1.12.0//flatbuffers-java-1.12.0.jar
gcs-connector/hadoop3-2.2.7/shaded/gcs-connector-hadoop3-2.2.7-shaded.jar
generex/1.0.2//generex-1.0.2.jar
gmetric4j/1.0.10//gmetric4j-1.0.10.jar
grpc-api/1.47.0//grpc-api-1.47.0.jar
grpc-context/1.47.0//grpc-context-1.47.0.jar
@@ -159,27 +157,30 @@ jsr305/3.0.0//jsr305-3.0.0.jar
jta/1.1//jta-1.1.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar
kubernetes-client/5.12.3//kubernetes-client-5.12.3.jar
kubernetes-model-admissionregistration/5.12.3//kubernetes-model-admissionregistration-5.12.3.jar
kubernetes-model-apiextensions/5.12.3//kubernetes-model-apiextensions-5.12.3.jar
kubernetes-model-apps/5.12.3//kubernetes-model-apps-5.12.3.jar
kubernetes-model-autoscaling/5.12.3//kubernetes-model-autoscaling-5.12.3.jar
kubernetes-model-batch/5.12.3//kubernetes-model-batch-5.12.3.jar
kubernetes-model-certificates/5.12.3//kubernetes-model-certificates-5.12.3.jar
kubernetes-model-common/5.12.3//kubernetes-model-common-5.12.3.jar
kubernetes-model-coordination/5.12.3//kubernetes-model-coordination-5.12.3.jar
kubernetes-model-core/5.12.3//kubernetes-model-core-5.12.3.jar
kubernetes-model-discovery/5.12.3//kubernetes-model-discovery-5.12.3.jar
kubernetes-model-events/5.12.3//kubernetes-model-events-5.12.3.jar
kubernetes-model-extensions/5.12.3//kubernetes-model-extensions-5.12.3.jar
kubernetes-model-flowcontrol/5.12.3//kubernetes-model-flowcontrol-5.12.3.jar
kubernetes-model-metrics/5.12.3//kubernetes-model-metrics-5.12.3.jar
kubernetes-model-networking/5.12.3//kubernetes-model-networking-5.12.3.jar
kubernetes-model-node/5.12.3//kubernetes-model-node-5.12.3.jar
kubernetes-model-policy/5.12.3//kubernetes-model-policy-5.12.3.jar
kubernetes-model-rbac/5.12.3//kubernetes-model-rbac-5.12.3.jar
kubernetes-model-scheduling/5.12.3//kubernetes-model-scheduling-5.12.3.jar
kubernetes-model-storageclass/5.12.3//kubernetes-model-storageclass-5.12.3.jar
kubernetes-client-api/6.1.1//kubernetes-client-api-6.1.1.jar
kubernetes-client/6.1.1//kubernetes-client-6.1.1.jar
kubernetes-httpclient-okhttp/6.1.1//kubernetes-httpclient-okhttp-6.1.1.jar
kubernetes-model-admissionregistration/6.1.1//kubernetes-model-admissionregistration-6.1.1.jar
kubernetes-model-apiextensions/6.1.1//kubernetes-model-apiextensions-6.1.1.jar
kubernetes-model-apps/6.1.1//kubernetes-model-apps-6.1.1.jar
kubernetes-model-autoscaling/6.1.1//kubernetes-model-autoscaling-6.1.1.jar
kubernetes-model-batch/6.1.1//kubernetes-model-batch-6.1.1.jar
kubernetes-model-certificates/6.1.1//kubernetes-model-certificates-6.1.1.jar
kubernetes-model-common/6.1.1//kubernetes-model-common-6.1.1.jar
kubernetes-model-coordination/6.1.1//kubernetes-model-coordination-6.1.1.jar
kubernetes-model-core/6.1.1//kubernetes-model-core-6.1.1.jar
kubernetes-model-discovery/6.1.1//kubernetes-model-discovery-6.1.1.jar
kubernetes-model-events/6.1.1//kubernetes-model-events-6.1.1.jar
kubernetes-model-extensions/6.1.1//kubernetes-model-extensions-6.1.1.jar
kubernetes-model-flowcontrol/6.1.1//kubernetes-model-flowcontrol-6.1.1.jar
kubernetes-model-gatewayapi/6.1.1//kubernetes-model-gatewayapi-6.1.1.jar
kubernetes-model-metrics/6.1.1//kubernetes-model-metrics-6.1.1.jar
kubernetes-model-networking/6.1.1//kubernetes-model-networking-6.1.1.jar
kubernetes-model-node/6.1.1//kubernetes-model-node-6.1.1.jar
kubernetes-model-policy/6.1.1//kubernetes-model-policy-6.1.1.jar
kubernetes-model-rbac/6.1.1//kubernetes-model-rbac-6.1.1.jar
kubernetes-model-scheduling/6.1.1//kubernetes-model-scheduling-6.1.1.jar
kubernetes-model-storageclass/6.1.1//kubernetes-model-storageclass-6.1.1.jar
lapack/3.0.2//lapack-3.0.2.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
libfb303/0.9.3//libfb303-0.9.3.jar
2 changes: 1 addition & 1 deletion pom.xml
@@ -216,7 +216,7 @@
<arrow.version>9.0.0</arrow.version>
<!-- org.fusesource.leveldbjni will be used except on arm64 platform. -->
<leveldbjni.group>org.fusesource.leveldbjni</leveldbjni.group>
<kubernetes-client.version>5.12.3</kubernetes-client.version>
<kubernetes-client.version>6.1.1</kubernetes-client.version>

<test.java.home>${java.home}</test.java.home>

5 changes: 5 additions & 0 deletions resource-managers/kubernetes/core/pom.xml
@@ -75,6 +75,11 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-httpclient-okhttp</artifactId>
<version>${kubernetes-client.version}</version>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
@@ -21,7 +21,7 @@ import java.io.File
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.base.Charsets
import com.google.common.io.Files
import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
import io.fabric8.kubernetes.client.{ConfigBuilder, KubernetesClient, KubernetesClientBuilder}
import io.fabric8.kubernetes.client.Config.KUBERNETES_REQUEST_RETRY_BACKOFFLIMIT_SYSTEM_PROPERTY
import io.fabric8.kubernetes.client.Config.autoConfigure
import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory
@@ -115,7 +115,10 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
}
logDebug("Kubernetes client config: " +
new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(config))
new DefaultKubernetesClient(factoryWithCustomDispatcher.createHttpClient(config), config)
new KubernetesClientBuilder()
.withHttpClientFactory(factoryWithCustomDispatcher)
.withConfig(config)
.build()
}

private implicit class OptionConfigurableConfigBuilder(val configBuilder: ConfigBuilder)
@@ -19,9 +19,9 @@ package org.apache.spark.deploy.k8s.submit
import scala.collection.JavaConverters._

import K8SSparkSubmitOperation.getGracePeriod
import io.fabric8.kubernetes.api.model.{Pod, PodList}
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.{NonNamespaceOperation, PodResource}
import io.fabric8.kubernetes.client.dsl.PodResource

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmitOperation
@@ -32,25 +32,23 @@ import org.apache.spark.deploy.k8s.KubernetesUtils.formatPodState
import org.apache.spark.util.{CommandLineLoggingUtils, Utils}

private sealed trait K8sSubmitOp extends CommandLineLoggingUtils {
type NON_NAMESPACED_PODS =
NonNamespaceOperation[Pod, PodList, PodResource[Pod]]
def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
(implicit client: KubernetesClient): Unit
def executeOnGlob(pods: List[Pod], ns: Option[String], sparkConf: SparkConf)
(implicit client: KubernetesClient): Unit
def listPodsInNameSpace(namespace: Option[String])
(implicit client: KubernetesClient): NON_NAMESPACED_PODS = {
def getPod(namespace: Option[String], name: String)
(implicit client: KubernetesClient): PodResource = {
namespace match {
case Some(ns) => client.pods.inNamespace(ns)
case None => client.pods
case Some(ns) => client.pods.inNamespace(ns).withName(name)
case None => client.pods.withName(name)
}
}
}

private class KillApplication extends K8sSubmitOp {
override def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
(implicit client: KubernetesClient): Unit = {
val podToDelete = listPodsInNameSpace(namespace).withName(pName)
val podToDelete = getPod(namespace, pName)

if (Option(podToDelete).isDefined) {
getGracePeriod(sparkConf) match {
@@ -66,19 +64,11 @@ private class KillApplication {
(implicit client: KubernetesClient): Unit = {
if (pods.nonEmpty) {
pods.foreach { pod => printMessage(s"Deleting driver pod: ${pod.getMetadata.getName}.") }
val listedPods = listPodsInNameSpace(namespace)

getGracePeriod(sparkConf) match {
case Some(period) =>
// this is not using the batch api because no option is provided
// when using the grace period.
pods.foreach { pod =>
listedPods
.withName(pod.getMetadata.getName)
.withGracePeriod(period)
.delete()
}
case _ => listedPods.delete(pods.asJava)
client.resourceList(pods.asJava).withGracePeriod(period).delete()
case _ =>
client.resourceList(pods.asJava).delete()
}
} else {
printMessage("No applications found.")
@@ -89,7 +79,7 @@ private class ListStatus extends K8sSubmitOp {
private class ListStatus extends K8sSubmitOp {
override def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
(implicit client: KubernetesClient): Unit = {
val pod = listPodsInNameSpace(namespace).withName(pName).get()
val pod = getPod(namespace, pName).get()
if (Option(pod).isDefined) {
printMessage("Application status (driver): " +
Option(pod).map(formatPodState).getOrElse("unknown."))
@@ -145,13 +135,12 @@ private[spark] class K8SSparkSubmitOperation extends SparkSubmitOperation
.pods
}
val pods = ops
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE)
.list()
.getItems
.asScala
.filter { pod =>
val meta = pod.getMetadata
meta.getName.startsWith(pName.stripSuffix("*")) &&
meta.getLabels.get(SPARK_ROLE_LABEL) == SPARK_POD_DRIVER_ROLE
pod.getMetadata.getName.startsWith(pName.stripSuffix("*"))
}.toList
op.executeOnGlob(pods, namespace, sparkConf)
} else {
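The glob handling above reduces to a prefix match: a trailing `*` is stripped (`pName.stripSuffix("*")`) and driver pod names are compared with `startsWith`. A standalone sketch of that check (illustrative only; the real code also pre-selects pods via the `spark-role=driver` label):

```java
public class DriverGlob {
    // Strip one trailing '*' from the glob and match pod names by prefix,
    // mirroring pName.stripSuffix("*") + startsWith in the Scala code.
    static boolean matches(String glob, String podName) {
        String prefix = glob.endsWith("*")
            ? glob.substring(0, glob.length() - 1)
            : glob;
        return podName.startsWith(prefix);
    }

    public static void main(String[] args) {
        System.out.println(matches("spark-pi-*", "spark-pi-4c4e70837c86ae1a-driver")); // true
        System.out.println(matches("spark-pi-*", "other-app-driver"));                 // false
    }
}
```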
@@ -149,7 +149,8 @@ private[spark] class Client(
var watch: Watch = null
var createdDriverPod: Pod = null
try {
createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
createdDriverPod =
kubernetesClient.pods().inNamespace(conf.namespace).resource(resolvedDriverPod).create()
} catch {
case NonFatal(e) =>
kubernetesClient.resourceList(preKubernetesResources: _*).delete()
@@ -163,7 +164,7 @@
kubernetesClient.resourceList(preKubernetesResources: _*).createOrReplace()
} catch {
case NonFatal(e) =>
kubernetesClient.pods().delete(createdDriverPod)
kubernetesClient.pods().resource(createdDriverPod).delete()
kubernetesClient.resourceList(preKubernetesResources: _*).delete()
throw e
}
@@ -175,7 +176,7 @@
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
} catch {
case NonFatal(e) =>
kubernetesClient.pods().delete(createdDriverPod)
kubernetesClient.pods().resource(createdDriverPod).delete()
throw e
}

@@ -185,6 +186,7 @@
while (true) {
val podWithName = kubernetesClient
.pods()
.inNamespace(conf.namespace)
.withName(driverPodName)
// Reset resource to old before we start the watch, this is important for race conditions
watcher.reset()
@@ -76,6 +76,7 @@ class ExecutorPodsAllocator(

val driverPod = kubernetesDriverPodName
.map(name => Option(kubernetesClient.pods()
.inNamespace(namespace)
.withName(name)
.get())
.getOrElse(throw new SparkException(
@@ -112,6 +113,7 @@ class ExecutorPodsAllocator(
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.inNamespace(namespace)
.withName(pod.getMetadata.getName)
.waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
}
@@ -185,6 +187,7 @@ class ExecutorPodsAllocator(
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.inNamespace(namespace)
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, timedOut.toSeq.map(_.toString): _*)
@@ -299,6 +302,7 @@ class ExecutorPodsAllocator(
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.inNamespace(namespace)
.withField("status.phase", "Pending")
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
@@ -363,6 +367,7 @@ class ExecutorPodsAllocator(
try {
val createdPVCs = kubernetesClient
.persistentVolumeClaims
.inNamespace(namespace)
.withLabel("spark-app-selector", applicationId)
.list()
.getItems
@@ -406,7 +411,8 @@
.build()
val resources = replacePVCsIfNeeded(
podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)
val createdExecutorPod = kubernetesClient.pods().create(podWithAttachedContainer)
val createdExecutorPod =
kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()
try {
addOwnerReference(createdExecutorPod, resources)
resources
@@ -418,13 +424,16 @@
val pvc = resource.asInstanceOf[PersistentVolumeClaim]
logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " +
s"StorageClass ${pvc.getSpec.getStorageClassName}")
kubernetesClient.persistentVolumeClaims().create(pvc)
kubernetesClient.persistentVolumeClaims().inNamespace(namespace).resource(pvc).create()
}
newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis())
logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
} catch {
case NonFatal(e) =>
kubernetesClient.pods().delete(createdExecutorPod)
kubernetesClient.pods()
.inNamespace(namespace)
.resource(createdExecutorPod)
.delete()
throw e
}
}
@@ -475,6 +484,7 @@
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.inNamespace(namespace)
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.delete()