Skip to content

Commit

Permalink
[SPARK-44953][CORE] Log a warning when shuffle tracking is enabled al…
Browse files Browse the repository at this point in the history
…ong side another DA supported mechanism

### What changes were proposed in this pull request?

Log a warning when shuffle tracking is enabled along side another DA supported mechanism

### Why are the changes needed?

Some users enable both shuffle tracking and another mechanism (like migration) and then are confused when their jobs don't scale down.

https://issues.apache.org/jira/browse/SPARK-44953

### Does this PR introduce _any_ user-facing change?

Yes, user can find the warning log when enabled both shuffle tracking and another DA supported mechanism(shuffle decommission).

### How was this patch tested?

No

### Was this patch authored or co-authored using generative AI tooling?

NO

Closes #45454 from zwangsheng/SPARK-44953.

Authored-by: zwangsheng <[email protected]>
Signed-off-by: Holden Karau <[email protected]>
  • Loading branch information
zwangsheng authored and holdenk committed May 13, 2024
1 parent 8d8cc62 commit a101c48
Showing 1 changed file with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,13 @@ private[spark] class ExecutorAllocationManager(
throw new SparkException(
s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!")
}
val shuffleTrackingEnabled = conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)
val shuffleDecommissionEnabled = decommissionEnabled &&
conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)
if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !reliableShuffleStorage) {
if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
if (shuffleTrackingEnabled) {
logInfo("Dynamic allocation is enabled without a shuffle service.")
} else if (decommissionEnabled &&
conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) {
} else if (shuffleDecommissionEnabled) {
logInfo("Shuffle data decommission is enabled without a shuffle service.")
} else if (!testing) {
throw new SparkException("Dynamic allocation of executors requires one of the " +
Expand All @@ -224,6 +226,12 @@ private[spark] class ExecutorAllocationManager(
}
}

if (shuffleTrackingEnabled && (shuffleDecommissionEnabled || reliableShuffleStorage)) {
logWarning("You are enabling both shuffle tracking and other DA supported mechanism, " +
"which will cause idle executors not to be released in a timely, " +
"please check the configurations.")
}

if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) {
throw new SparkException(
s"${DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO.key} must be > 0 and <= 1.0")
Expand Down

0 comments on commit a101c48

Please sign in to comment.