Skip to content

Commit

Permalink
Add task attempt id in the APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
mccheah committed Jul 1, 2019
1 parent 4c3d692 commit 2421c92
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ public interface ShuffleWriteSupport {
ShuffleMapOutputWriter createMapOutputWriter(
int shuffleId,
int mapId,
long mapTaskAttemptId,
int numPartitions) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
private final ShuffleWriteMetricsReporter writeMetrics;
private final int shuffleId;
private final int mapId;
private final long mapTaskAttemptId;
private final Serializer serializer;
private final ShuffleWriteSupport shuffleWriteSupport;

Expand All @@ -107,6 +108,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
BlockManager blockManager,
BypassMergeSortShuffleHandle<K, V> handle,
int mapId,
long mapTaskAttemptId,
SparkConf conf,
ShuffleWriteMetricsReporter writeMetrics,
ShuffleWriteSupport shuffleWriteSupport) {
Expand All @@ -116,6 +118,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
this.blockManager = blockManager;
final ShuffleDependency<K, V, V> dep = handle.dependency();
this.mapId = mapId;
this.mapTaskAttemptId = mapTaskAttemptId;
this.shuffleId = dep.shuffleId();
this.partitioner = dep.partitioner();
this.numPartitions = partitioner.numPartitions();
Expand All @@ -128,7 +131,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
public void write(Iterator<Product2<K, V>> records) throws IOException {
assert (partitionWriters == null);
ShuffleMapOutputWriter mapOutputWriter = shuffleWriteSupport
.createMapOutputWriter(shuffleId, mapId, numPartitions);
.createMapOutputWriter(shuffleId, mapId, mapTaskAttemptId, numPartitions);
try {
if (!records.hasNext()) {
partitionLengths = new long[numPartitions];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public DefaultShuffleWriteSupport(
public ShuffleMapOutputWriter createMapOutputWriter(
int shuffleId,
int mapId,
long mapTaskAttemptId,
int numPartitions) {
TaskContext taskContext = TaskContext.get();
if (taskContext == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
env.blockManager,
bypassMergeSortHandle,
mapId,
context.taskAttemptId(),
env.conf,
metrics,
shuffleExecutorComponents.writes())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
blockManager,
shuffleHandle,
0, // MapId
0L,
conf,
taskContext.taskMetrics().shuffleWriteMetrics,
writeSupport
Expand All @@ -173,6 +174,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
blockManager,
shuffleHandle,
0, // MapId
0L,
transferConf,
taskContext.taskMetrics().shuffleWriteMetrics,
writeSupport
Expand All @@ -197,6 +199,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
blockManager,
shuffleHandle,
0, // MapId
0L,
conf,
taskContext.taskMetrics().shuffleWriteMetrics,
writeSupport
Expand Down Expand Up @@ -232,6 +235,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
blockManager,
shuffleHandle,
0, // MapId
0L,
conf,
taskContext.taskMetrics().shuffleWriteMetrics,
writeSupport
Expand All @@ -254,6 +258,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
blockManager,
shuffleHandle,
0, // MapId
0L,
conf,
taskContext.taskMetrics().shuffleWriteMetrics,
writeSupport
Expand Down

0 comments on commit 2421c92

Please sign in to comment.