From dc9759b9c566e557cdfe28a8bccd39917a1e7f44 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Mon, 5 Aug 2019 18:55:36 +0800 Subject: [PATCH 1/2] Resolve conflict with SPARK-28209 Add indeterminate stage rerun support in shuffle writer api (cherry picked from commit 99c2b4ab0d3d2b7b0b16ab1348a190e191b43c92) Signed-off-by: Yuanjian Li --- .../spark/shuffle/api/ShuffleExecutorComponents.java | 12 ++++++++---- .../shuffle/sort/BypassMergeSortShuffleWriter.java | 4 ++-- .../sort/io/LocalDiskShuffleExecutorComponents.java | 3 ++- .../sort/io/LocalDiskShuffleMapOutputWriter.java | 6 +++++- .../io/LocalDiskShuffleMapOutputWriterSuite.scala | 1 + 5 files changed, 18 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java index 70c112b78911d..09ab3ae527f7d 100644 --- a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java +++ b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java @@ -39,16 +39,20 @@ public interface ShuffleExecutorComponents { /** * Called once per map task to create a writer that will be responsible for persisting all the * partitioned bytes written by that map task. - * @param shuffleId Unique identifier for the shuffle the map task is a part of + * @param shuffleId Unique identifier for the shuffle the map task is a part of + * @param shuffleGenerationId The shuffle generation ID of the stage that this task belongs to: + * the stage attempt number when the stage is indeterminate, + * and -1 otherwise. * @param mapId Within the shuffle, the identifier of the map task * @param mapTaskAttemptId Identifier of the task attempt. Multiple attempts of the same map task - * with the same (shuffleId, mapId) pair can be distinguished by the - * different values of mapTaskAttemptId. 
+ * with the same (shuffleId, mapId) pair can be distinguished by the + * different values of mapTaskAttemptId. * @param numPartitions The number of partitions that will be written by the map task. Some of -* these partitions may be empty. + * these partitions may be empty. */ ShuffleMapOutputWriter createMapOutputWriter( int shuffleId, + int shuffleGenerationId, int mapId, long mapTaskAttemptId, int numPartitions) throws IOException; diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index 3ccee703619b4..a80acdcf3da56 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -129,8 +129,8 @@ final class BypassMergeSortShuffleWriter extends ShuffleWriter { @Override public void write(Iterator> records) throws IOException { assert (partitionWriters == null); - ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents - .createMapOutputWriter(shuffleId, mapId, mapTaskAttemptId, numPartitions); + ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter( + shuffleId, -1, mapId, mapTaskAttemptId, numPartitions); try { if (!records.hasNext()) { partitionLengths = new long[numPartitions]; diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java index 02eb710737285..c50bc3cb738c5 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java @@ -58,6 +58,7 @@ public void initializeExecutor(String appId, String execId) { @Override public ShuffleMapOutputWriter createMapOutputWriter( int shuffleId, + 
int shuffleGenerationId, int mapId, long mapTaskAttemptId, int numPartitions) { @@ -66,6 +67,6 @@ public ShuffleMapOutputWriter createMapOutputWriter( "Executor components must be initialized before getting writers."); } return new LocalDiskShuffleMapOutputWriter( - shuffleId, mapId, numPartitions, blockResolver, sparkConf); + shuffleId, shuffleGenerationId, mapId, numPartitions, blockResolver, sparkConf); } } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java index add4634a61fb5..90c54f8800d51 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java @@ -48,6 +48,7 @@ public class LocalDiskShuffleMapOutputWriter implements ShuffleMapOutputWriter { LoggerFactory.getLogger(LocalDiskShuffleMapOutputWriter.class); private final int shuffleId; + private final int shuffleGenerationId; private final int mapId; private final IndexShuffleBlockResolver blockResolver; private final long[] partitionLengths; @@ -63,11 +64,13 @@ public class LocalDiskShuffleMapOutputWriter implements ShuffleMapOutputWriter { public LocalDiskShuffleMapOutputWriter( int shuffleId, + int shuffleGenerationId, int mapId, int numPartitions, IndexShuffleBlockResolver blockResolver, SparkConf sparkConf) { this.shuffleId = shuffleId; + this.shuffleGenerationId = shuffleGenerationId; this.mapId = mapId; this.blockResolver = blockResolver; this.bufferSize = @@ -99,7 +102,8 @@ public ShufflePartitionWriter getPartitionWriter(int reducePartitionId) throws I public void commitAllPartitions() throws IOException { cleanUp(); File resolvedTmp = outputTempFile != null && outputTempFile.isFile() ? 
outputTempFile : null; - blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, resolvedTmp); + blockResolver.writeIndexFileAndCommit( + shuffleId, mapId, partitionLengths, resolvedTmp); } @Override diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala index 5693b9824523a..bb500fbdc2b37 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala @@ -87,6 +87,7 @@ class LocalDiskShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndA } mapOutputWriter = new LocalDiskShuffleMapOutputWriter( 0, + -1, 0, NUM_PARTITIONS, blockResolver, From 6e9bab29be47b1479a5893c108dc8d0ca47a97ea Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Mon, 5 Aug 2019 22:27:22 +0800 Subject: [PATCH 2/2] fix --- .../org/apache/spark/shuffle/api/ShuffleMapOutputWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleMapOutputWriter.java index 45a593c25a93c..e3d60880765d2 100644 --- a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleMapOutputWriter.java @@ -39,7 +39,7 @@ public interface ShuffleMapOutputWriter { * for the same partition within any given map task. The partition identifier will be in the * range of precisely 0 (inclusive) to numPartitions (exclusive), where numPartitions was * provided upon the creation of this map output writer via - * {@link ShuffleExecutorComponents#createMapOutputWriter(int, int, long, int)}. + * {@link ShuffleExecutorComponents#createMapOutputWriter(int, int, int, long, int)}. *

* Calls to this method will be invoked with monotonically increasing reducePartitionIds; each * call to this method will be called with a reducePartitionId that is strictly greater than