From 2421c928dd35ccaa28825a96bcd3449e94383b59 Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 1 Jul 2019 16:46:33 -0700 Subject: [PATCH] Add task attempt id in the APIs --- .../org/apache/spark/api/shuffle/ShuffleWriteSupport.java | 1 + .../spark/shuffle/sort/BypassMergeSortShuffleWriter.java | 5 ++++- .../spark/shuffle/sort/io/DefaultShuffleWriteSupport.java | 1 + .../org/apache/spark/shuffle/sort/SortShuffleManager.scala | 1 + .../shuffle/sort/BypassMergeSortShuffleWriterSuite.scala | 5 +++++ 5 files changed, 12 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleWriteSupport.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleWriteSupport.java index 7e2b6cf4133fd..7ee1d8a554073 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleWriteSupport.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleWriteSupport.java @@ -32,5 +32,6 @@ public interface ShuffleWriteSupport { ShuffleMapOutputWriter createMapOutputWriter( int shuffleId, int mapId, + long mapTaskAttemptId, int numPartitions) throws IOException; } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index 95b26030b8167..ec828a150235b 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -87,6 +87,7 @@ final class BypassMergeSortShuffleWriter extends ShuffleWriter { private final ShuffleWriteMetricsReporter writeMetrics; private final int shuffleId; private final int mapId; + private final long mapTaskAttemptId; private final Serializer serializer; private final ShuffleWriteSupport shuffleWriteSupport; @@ -107,6 +108,7 @@ final class BypassMergeSortShuffleWriter extends ShuffleWriter { BlockManager blockManager, BypassMergeSortShuffleHandle handle, int mapId, + long mapTaskAttemptId, SparkConf conf, ShuffleWriteMetricsReporter writeMetrics, ShuffleWriteSupport shuffleWriteSupport) { @@ -116,6 +118,7 @@ final class BypassMergeSortShuffleWriter extends ShuffleWriter { this.blockManager = blockManager; final ShuffleDependency dep = handle.dependency(); this.mapId = mapId; + this.mapTaskAttemptId = mapTaskAttemptId; this.shuffleId = dep.shuffleId(); this.partitioner = dep.partitioner(); this.numPartitions = partitioner.numPartitions(); @@ -128,7 +131,7 @@ final class BypassMergeSortShuffleWriter extends ShuffleWriter { public void write(Iterator> records) throws IOException { assert (partitionWriters == null); ShuffleMapOutputWriter mapOutputWriter = shuffleWriteSupport - .createMapOutputWriter(shuffleId, mapId, numPartitions); + .createMapOutputWriter(shuffleId, mapId, mapTaskAttemptId, numPartitions); try { if (!records.hasNext()) { partitionLengths = new long[numPartitions]; diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleWriteSupport.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleWriteSupport.java index 8a583d1bd50a3..be392ecbe8253 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleWriteSupport.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleWriteSupport.java @@ -39,6 +39,7 @@ public DefaultShuffleWriteSupport( public ShuffleMapOutputWriter createMapOutputWriter( int shuffleId, int mapId, + long mapTaskAttemptId, int numPartitions) { TaskContext taskContext = TaskContext.get(); if (taskContext == null) { diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index 49f43be9e2ac5..a2cd942849c89 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -152,6 +152,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager env.blockManager, bypassMergeSortHandle, mapId, + context.taskAttemptId(), env.conf, metrics, shuffleExecutorComponents.writes()) diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala index b16bf5194eeb5..02811cf67e4d1 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala @@ -148,6 +148,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte blockManager, shuffleHandle, 0, // MapId + 0L, conf, taskContext.taskMetrics().shuffleWriteMetrics, writeSupport @@ -173,6 +174,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte blockManager, shuffleHandle, 0, // MapId + 0L, transferConf, taskContext.taskMetrics().shuffleWriteMetrics, writeSupport @@ -197,6 +199,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte blockManager, shuffleHandle, 0, // MapId + 0L, conf, taskContext.taskMetrics().shuffleWriteMetrics, writeSupport @@ -232,6 +235,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte blockManager, shuffleHandle, 0, // MapId + 0L, conf, taskContext.taskMetrics().shuffleWriteMetrics, writeSupport @@ -254,6 +258,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte blockManager, shuffleHandle, 0, // MapId + 0L, conf, taskContext.taskMetrics().shuffleWriteMetrics, writeSupport