Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-37497][K8S] Promote ExecutorPods[PollingSnapshot|WatchSnapshot]Source to DeveloperApi #34751

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,21 @@ import io.fabric8.kubernetes.client.KubernetesClient
import scala.collection.JavaConverters._

import org.apache.spark.SparkConf
import org.apache.spark.annotation.{DeveloperApi, Since, Stable}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.util.{ThreadUtils, Utils}

private[spark] class ExecutorPodsPollingSnapshotSource(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's true technically in general, @HyukjinKwon . For this one, it's a little awkward situation because this has been used for a long time to implement a new cluster manager having ExternalClusterManager interface which is also private[spark].

After we make K8s GA at 3.1.1, we noticed that K8s eco-system is fragile because some new contributions easily broke the others because everything is private. That's the reason why I used the last signature change to show the historic version at 4e8701a, SPARK-35280 Promote KubernetesUtils to DeveloperApi. This is very specific situation in K8s domain. So, if you don't mind, I'd want to show the original version here.

/**
* :: DeveloperApi ::
*
* A class used for polling K8s executor pods by ExternalClusterManagers.
* @since 3.1.3
*/
@Stable
@DeveloperApi
class ExecutorPodsPollingSnapshotSource(
conf: SparkConf,
kubernetesClient: KubernetesClient,
snapshotsStore: ExecutorPodsSnapshotsStore,
Expand All @@ -39,13 +48,15 @@ private[spark] class ExecutorPodsPollingSnapshotSource(

private var pollingFuture: Future[_] = _

@Since("3.1.3")
def start(applicationId: String): Unit = {
require(pollingFuture == null, "Cannot start polling more than once.")
logDebug(s"Starting to check for executor pod state every $pollingInterval ms.")
pollingFuture = pollingExecutor.scheduleWithFixedDelay(
new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
}

@Since("3.1.3")
def stop(): Unit = {
if (pollingFuture != null) {
pollingFuture.cancel(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,27 @@ import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClient, Watcher, WatcherException}
import io.fabric8.kubernetes.client.Watcher.Action

import org.apache.spark.annotation.{DeveloperApi, Since, Stable}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

private[spark] class ExecutorPodsWatchSnapshotSource(
/**
* :: DeveloperApi ::
*
* A class used for watching K8s executor pods by ExternalClusterManagers.
*
* @since 3.1.3
*/
@Stable
@DeveloperApi
class ExecutorPodsWatchSnapshotSource(
snapshotsStore: ExecutorPodsSnapshotsStore,
kubernetesClient: KubernetesClient) extends Logging {

private var watchConnection: Closeable = _

@Since("3.1.3")
def start(applicationId: String): Unit = {
require(watchConnection == null, "Cannot start the watcher twice.")
logDebug(s"Starting watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," +
Expand All @@ -42,6 +53,7 @@ private[spark] class ExecutorPodsWatchSnapshotSource(
.watch(new ExecutorPodsWatcher())
}

@Since("3.1.3")
def stop(): Unit = {
if (watchConnection != null) {
Utils.tryLogNonFatalError {
Expand Down