[SPARK-48268][CORE] Add a configuration for SparkContext.setCheckpointDir

### What changes were proposed in this pull request?

This PR adds a `spark.checkpoint.dir` configuration so users can set the checkpoint directory when they submit their application.
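For example, the directory can be supplied at submission time (e.g. `--conf spark.checkpoint.dir=/tmp/checkpoints`) or programmatically. A minimal sketch, assuming Spark 4.0.0+ (the app name and path are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hedged sketch: "/tmp/checkpoints" and the app name are illustrative.
val conf = new SparkConf()
  .setAppName("checkpoint-dir-example")
  .setMaster("local")
  .set("spark.checkpoint.dir", "/tmp/checkpoints")

val sc = new SparkContext(conf)
// No sc.setCheckpointDir(...) call is needed; the directory comes from the configuration.
val rdd = sc.makeRDD(1 to 4).flatMap(x => 1 to x)
rdd.checkpoint() // marks the RDD for checkpointing under the configured directory
rdd.collect()    // materializes the RDD, writing the checkpoint files
sc.stop()
```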

### Why are the changes needed?

It separates the configuration logic from the application code, so the same application can run with a different checkpoint directory without modification.
In addition, this would be useful for Spark Connect with #46570.

### Does this PR introduce _any_ user-facing change?

Yes, it adds a new user-facing configuration.

### How was this patch tested?

A unit test was added.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46571 from HyukjinKwon/SPARK-48268.

Lead-authored-by: Hyukjin Kwon <[email protected]>
Co-authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
HyukjinKwon and HyukjinKwon committed May 16, 2024
1 parent e9d4152 commit 153053f
Showing 4 changed files with 36 additions and 0 deletions.
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -601,6 +601,8 @@ class SparkContext(config: SparkConf) extends Logging {
        .foreach(logLevel => _schedulerBackend.updateExecutorsLogLevel(logLevel))
    }

    _conf.get(CHECKPOINT_DIR).foreach(setCheckpointDir)

    val _executorMetricsSource =
      if (_conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) {
        Some(new ExecutorMetricsSource)
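Note that `CHECKPOINT_DIR` is declared with `createOptional` (next hunk), so `_conf.get(CHECKPOINT_DIR)` returns an `Option[String]` and `foreach` calls `setCheckpointDir` only when `spark.checkpoint.dir` was actually set. Because this runs during `SparkContext` initialization, an explicit call made later still takes precedence. A hedged sketch (names and paths illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf()
    .setAppName("precedence-example")       // illustrative name
    .setMaster("local")
    .set("spark.checkpoint.dir", "/tmp/a")) // applied while the context starts up

sc.setCheckpointDir("/tmp/b") // the later explicit call wins; checkpoints go under /tmp/b
```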
9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1317,6 +1317,15 @@ package object config {
        s" be less than or equal to ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
      .createWithDefault(64 * 1024 * 1024)

  private[spark] val CHECKPOINT_DIR =
    ConfigBuilder("spark.checkpoint.dir")
      .doc(
        "Set the default directory for checkpointing. It can be overwritten by " +
          "SparkContext.setCheckpointDir.")
      .version("4.0.0")
      .stringConf
      .createOptional

  private[spark] val CHECKPOINT_COMPRESS =
    ConfigBuilder("spark.checkpoint.compress")
      .doc("Whether to compress RDD checkpoints. Generally a good idea. Compression will use " +
16 changes: 16 additions & 0 deletions core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -669,4 +669,20 @@ class CheckpointStorageSuite extends SparkFunSuite with LocalSparkContext {
      assert(rdd.firstParent.isInstanceOf[ReliableCheckpointRDD[_]])
    }
  }

  test("SPARK-48268: checkpoint directory via configuration") {
    withTempDir { checkpointDir =>
      val conf = new SparkConf()
        .set("spark.checkpoint.dir", checkpointDir.toString)
        .set(UI_ENABLED.key, "false")
      sc = new SparkContext("local", "test", conf)
      val parCollection = sc.makeRDD(1 to 4)
      val flatMappedRDD = parCollection.flatMap(x => 1 to x)
      flatMappedRDD.checkpoint()
      assert(flatMappedRDD.dependencies.head.rdd === parCollection)
      val result = flatMappedRDD.collect()
      assert(flatMappedRDD.dependencies.head.rdd != parCollection)
      assert(flatMappedRDD.collect() === result)
    }
  }
}
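For readers new to checkpointing internals: before `collect()`, the RDD still depends on its original parent, so `dependencies.head.rdd === parCollection` holds. Materializing the RDD writes the checkpoint into the configured directory and truncates the lineage, replacing the parent with a `ReliableCheckpointRDD`, so the equality no longer holds while the recomputed result stays the same.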
9 changes: 9 additions & 0 deletions docs/configuration.md
@@ -1795,6 +1795,15 @@ Apart from these, the following properties are also available, and may be useful
  </td>
  <td>0.6.0</td>
</tr>
<tr>
  <td><code>spark.checkpoint.dir</code></td>
  <td>(none)</td>
  <td>
    Set the default directory for checkpointing. It can be overwritten by
    SparkContext.setCheckpointDir.
  </td>
  <td>4.0.0</td>
</tr>
<tr>
  <td><code>spark.checkpoint.compress</code></td>
  <td>false</td>