[SPARK-28625][Core] Indeterminate shuffle support in Shuffle Writer API #25361

Closed · wants to merge 2 commits
@@ -39,16 +39,20 @@ public interface ShuffleExecutorComponents {
/**
* Called once per map task to create a writer that will be responsible for persisting all the
* partitioned bytes written by that map task.
* @param shuffleId Unique identifier for the shuffle the map task is a part of
* @param shuffleGenerationId The shuffle generation ID of the stage that this task belongs to,
mccheah (Contributor) commented on Aug 5, 2019:

Why can't we just use the mapTaskAttemptId? (In fact I wonder if we can just remove shuffleId and mapId and just use mapTaskAttemptId as a global identifier, but that might be a bit ambitious.)

Contributor:

Specifically, taking a look at the linked PR for indeterminate retries: I'd expect that on a rolled-back map stage, the implementation of this plugin will be given a different mapTaskAttemptId anyway, since that's going to be updated on the resubmit. So we'll end up opening a new writer regardless, but we could have gotten the same behavior just by using the mapTaskAttemptId.

Contributor:

Actually, let's move discussion over to #24892.

Member Author:

Yep, thanks for your fast review; I describe the requirement in #24892 (comment).

*                    it equals the stage attempt number while the stage is indeterminate,
*                    and -1 otherwise.
* @param mapId Within the shuffle, the identifier of the map task
* @param mapTaskAttemptId Identifier of the task attempt. Multiple attempts of the same map task
*                         with the same (shuffleId, mapId) pair can be distinguished by the
*                         different values of mapTaskAttemptId.
* @param numPartitions The number of partitions that will be written by the map task. Some of
*                      these partitions may be empty.
*/
ShuffleMapOutputWriter createMapOutputWriter(
int shuffleId,
int shuffleGenerationId,
int mapId,
long mapTaskAttemptId,
int numPartitions) throws IOException;
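The new shuffleGenerationId parameter lets a plugin keep output from different generations of an indeterminate stage from colliding. One way an implementation could use it is to fold the generation into the shuffle file name; the class and naming scheme below are an illustrative sketch, not Spark's actual file layout:

```java
// Hypothetical sketch: fold the shuffle generation ID into the data file name
// so a re-run of an indeterminate stage never overwrites the previous
// generation's output. A generation ID of -1 marks a deterministic stage,
// which keeps the legacy name (retries may safely overwrite it).
public final class ShuffleFileNames {
    private ShuffleFileNames() {}

    public static String dataFileName(int shuffleId, int shuffleGenerationId, int mapId) {
        String base = "shuffle_" + shuffleId + "_" + mapId + "_0";
        // Deterministic stages (generation -1) reuse the same name on retry;
        // indeterminate stages get a per-generation suffix.
        return shuffleGenerationId >= 0 ? base + "_gen" + shuffleGenerationId : base;
    }
}
```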
@@ -39,7 +39,7 @@ public interface ShuffleMapOutputWriter {
* for the same partition within any given map task. The partition identifier will be in the
* range of precisely 0 (inclusive) to numPartitions (exclusive), where numPartitions was
* provided upon the creation of this map output writer via
- * {@link ShuffleExecutorComponents#createMapOutputWriter(int, int, long, int)}.
+ * {@link ShuffleExecutorComponents#createMapOutputWriter(int, int, int, long, int)}.
* <p>
* Calls to this method will be invoked with monotonically increasing reducePartitionIds; each
* call to this method will be called with a reducePartitionId that is strictly greater than
@@ -129,8 +129,8 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
@Override
public void write(Iterator<Product2<K, V>> records) throws IOException {
assert (partitionWriters == null);
- ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
-     .createMapOutputWriter(shuffleId, mapId, mapTaskAttemptId, numPartitions);
+ ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
+     shuffleId, -1, mapId, mapTaskAttemptId, numPartitions);
Contributor:

Why are we always passing in -1 here?

Member Author:

This PR was created for quick review of the API changes; you can see the real scenario of shuffleGenerationId here :)

try {
if (!records.hasNext()) {
partitionLengths = new long[numPartitions];
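As the author notes above, BypassMergeSortShuffleWriter hard-codes -1 only because this PR previews the API. A caller that knows the stage's determinism would derive the argument from the contract stated in the createMapOutputWriter javadoc; the helper below is an illustrative sketch, not Spark code:

```java
// Sketch of the rule from the createMapOutputWriter javadoc: the shuffle
// generation ID equals the stage attempt number while the stage is
// indeterminate, and is -1 otherwise.
public final class GenerationIds {
    private GenerationIds() {}

    public static int shuffleGenerationId(boolean stageIsIndeterminate, int stageAttemptNumber) {
        return stageIsIndeterminate ? stageAttemptNumber : -1;
    }
}
```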
@@ -58,6 +58,7 @@ public void initializeExecutor(String appId, String execId) {
@Override
public ShuffleMapOutputWriter createMapOutputWriter(
int shuffleId,
+ int shuffleGenerationId,
int mapId,
long mapTaskAttemptId,
int numPartitions) {
@@ -66,6 +67,6 @@ public ShuffleMapOutputWriter createMapOutputWriter(
"Executor components must be initialized before getting writers.");
}
return new LocalDiskShuffleMapOutputWriter(
- shuffleId, mapId, numPartitions, blockResolver, sparkConf);
+ shuffleId, shuffleGenerationId, mapId, numPartitions, blockResolver, sparkConf);
}
}
@@ -48,6 +48,7 @@ public class LocalDiskShuffleMapOutputWriter implements ShuffleMapOutputWriter {
LoggerFactory.getLogger(LocalDiskShuffleMapOutputWriter.class);

private final int shuffleId;
+ private final int shuffleGenerationId;
private final int mapId;
private final IndexShuffleBlockResolver blockResolver;
private final long[] partitionLengths;
@@ -63,11 +64,13 @@ public class LocalDiskShuffleMapOutputWriter implements ShuffleMapOutputWriter {

public LocalDiskShuffleMapOutputWriter(
int shuffleId,
+ int shuffleGenerationId,
int mapId,
int numPartitions,
IndexShuffleBlockResolver blockResolver,
SparkConf sparkConf) {
this.shuffleId = shuffleId;
+ this.shuffleGenerationId = shuffleGenerationId;
this.mapId = mapId;
this.blockResolver = blockResolver;
this.bufferSize =
@@ -99,7 +102,8 @@ public ShufflePartitionWriter getPartitionWriter(int reducePartitionId) throws IOException
public void commitAllPartitions() throws IOException {
cleanUp();
File resolvedTmp = outputTempFile != null && outputTempFile.isFile() ? outputTempFile : null;
- blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, resolvedTmp);
+ blockResolver.writeIndexFileAndCommit(
+     shuffleId, mapId, partitionLengths, resolvedTmp);
}

@Override
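commitAllPartitions hands writeIndexFileAndCommit a temp file, which is the write-then-rename commit pattern: write the complete index to a temporary file, then atomically move it into place so concurrent readers never observe a half-written index. A minimal standalone sketch of that pattern (not Spark's actual implementation):

```java
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Write-then-rename commit: the temp file holds the complete contents before
// the rename, so the target path is either absent or fully written.
public final class AtomicCommit {
    private AtomicCommit() {}

    public static void commit(Path tmp, Path target) throws IOException {
        try {
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            // Some filesystems cannot move atomically; fall back to a plain replace.
            Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }
}
```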
@@ -87,6 +87,7 @@ class LocalDiskShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndA
}
mapOutputWriter = new LocalDiskShuffleMapOutputWriter(
0,
+ -1,
0,
NUM_PARTITIONS,
blockResolver,