diff --git a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java
index 70c112b78911d..09ab3ae527f7d 100644
--- a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java
+++ b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java
@@ -39,16 +39,20 @@ public interface ShuffleExecutorComponents {
/**
* Called once per map task to create a writer that will be responsible for persisting all the
* partitioned bytes written by that map task.
- * @param shuffleId Unique identifier for the shuffle the map task is a part of
+ * @param shuffleId Unique identifier for the shuffle the map task is a part of
+ * @param shuffleGenerationId The shuffle generation ID of the stage that this task belongs to,
+ * it equals the stage attempt number while the stage is indeterminate
+ * and -1 on the contrary.
* @param mapId Within the shuffle, the identifier of the map task
* @param mapTaskAttemptId Identifier of the task attempt. Multiple attempts of the same map task
- * with the same (shuffleId, mapId) pair can be distinguished by the
- * different values of mapTaskAttemptId.
+ * with the same (shuffleId, mapId) pair can be distinguished by the
+ * different values of mapTaskAttemptId.
* @param numPartitions The number of partitions that will be written by the map task. Some of
-* these partitions may be empty.
+ * these partitions may be empty.
*/
ShuffleMapOutputWriter createMapOutputWriter(
int shuffleId,
+ int shuffleGenerationId,
int mapId,
long mapTaskAttemptId,
int numPartitions) throws IOException;
diff --git a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleMapOutputWriter.java
index 45a593c25a93c..e3d60880765d2 100644
--- a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleMapOutputWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleMapOutputWriter.java
@@ -39,7 +39,7 @@ public interface ShuffleMapOutputWriter {
* for the same partition within any given map task. The partition identifier will be in the
* range of precisely 0 (inclusive) to numPartitions (exclusive), where numPartitions was
* provided upon the creation of this map output writer via
- * {@link ShuffleExecutorComponents#createMapOutputWriter(int, int, long, int)}.
+ * {@link ShuffleExecutorComponents#createMapOutputWriter(int, int, int, long, int)}.
*
* Calls to this method will be invoked with monotonically increasing reducePartitionIds; each
* call to this method will be called with a reducePartitionId that is strictly greater than
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index 3ccee703619b4..a80acdcf3da56 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -129,8 +129,8 @@ final class BypassMergeSortShuffleWriter extends ShuffleWriter {
@Override
public void write(Iterator> records) throws IOException {
assert (partitionWriters == null);
- ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
- .createMapOutputWriter(shuffleId, mapId, mapTaskAttemptId, numPartitions);
+ ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
+ shuffleId, -1, mapId, mapTaskAttemptId, numPartitions);
try {
if (!records.hasNext()) {
partitionLengths = new long[numPartitions];
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
index 02eb710737285..c50bc3cb738c5 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
@@ -58,6 +58,7 @@ public void initializeExecutor(String appId, String execId) {
@Override
public ShuffleMapOutputWriter createMapOutputWriter(
int shuffleId,
+ int shuffleGenerationId,
int mapId,
long mapTaskAttemptId,
int numPartitions) {
@@ -66,6 +67,6 @@ public ShuffleMapOutputWriter createMapOutputWriter(
"Executor components must be initialized before getting writers.");
}
return new LocalDiskShuffleMapOutputWriter(
- shuffleId, mapId, numPartitions, blockResolver, sparkConf);
+ shuffleId, shuffleGenerationId, mapId, numPartitions, blockResolver, sparkConf);
}
}
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
index add4634a61fb5..90c54f8800d51 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
@@ -48,6 +48,7 @@ public class LocalDiskShuffleMapOutputWriter implements ShuffleMapOutputWriter {
LoggerFactory.getLogger(LocalDiskShuffleMapOutputWriter.class);
private final int shuffleId;
+ private final int shuffleGenerationId;
private final int mapId;
private final IndexShuffleBlockResolver blockResolver;
private final long[] partitionLengths;
@@ -63,11 +64,13 @@ public class LocalDiskShuffleMapOutputWriter implements ShuffleMapOutputWriter {
public LocalDiskShuffleMapOutputWriter(
int shuffleId,
+ int shuffleGenerationId,
int mapId,
int numPartitions,
IndexShuffleBlockResolver blockResolver,
SparkConf sparkConf) {
this.shuffleId = shuffleId;
+ this.shuffleGenerationId = shuffleGenerationId;
this.mapId = mapId;
this.blockResolver = blockResolver;
this.bufferSize =
@@ -99,7 +102,8 @@ public ShufflePartitionWriter getPartitionWriter(int reducePartitionId) throws I
public void commitAllPartitions() throws IOException {
cleanUp();
File resolvedTmp = outputTempFile != null && outputTempFile.isFile() ? outputTempFile : null;
- blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, resolvedTmp);
+ blockResolver.writeIndexFileAndCommit(
+ shuffleId, mapId, partitionLengths, resolvedTmp);
}
@Override
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
index 5693b9824523a..bb500fbdc2b37 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
@@ -87,6 +87,7 @@ class LocalDiskShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndA
}
mapOutputWriter = new LocalDiskShuffleMapOutputWriter(
0,
+ -1,
0,
NUM_PARTITIONS,
blockResolver,