Skip to content

Commit

Permalink
revert ae (apache#140)
Browse files Browse the repository at this point in the history
  • Loading branch information
7mming7 authored and zheniantoushipashi committed Jul 11, 2020
1 parent 9762fc7 commit 86b9059
Show file tree
Hide file tree
Showing 69 changed files with 767 additions and 3,792 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,60 +197,48 @@ public Map<String, Metric> getMetrics() {
}
}

private boolean isShuffleBlock(String[] blockIdParts) {
// length == 4: ShuffleBlockId
// length == 5: ContinuousShuffleBlockId
return (blockIdParts.length == 4 || blockIdParts.length == 5) &&
blockIdParts[0].equals("shuffle");
}

private class ManagedBufferIterator implements Iterator<ManagedBuffer> {

private int index = 0;
private final String appId;
private final String execId;
private final int shuffleId;
// An array containing mapId, reduceId and numBlocks tuple
private final int[] shuffleBlockIds;
// An array containing mapId and reduceId pairs.
private final int[] mapIdAndReduceIds;

ManagedBufferIterator(String appId, String execId, String[] blockIds) {
this.appId = appId;
this.execId = execId;
String[] blockId0Parts = blockIds[0].split("_");
if (!isShuffleBlock(blockId0Parts)) {
if (blockId0Parts.length != 4 || !blockId0Parts[0].equals("shuffle")) {
throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[0]);
}
this.shuffleId = Integer.parseInt(blockId0Parts[1]);
shuffleBlockIds = new int[3 * blockIds.length];
mapIdAndReduceIds = new int[2 * blockIds.length];
for (int i = 0; i < blockIds.length; i++) {
String[] blockIdParts = blockIds[i].split("_");
if (!isShuffleBlock(blockIdParts)) {
if (blockIdParts.length != 4 || !blockIdParts[0].equals("shuffle")) {
throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[i]);
}
if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
", got:" + blockIds[i]);
}
shuffleBlockIds[3 * i] = Integer.parseInt(blockIdParts[2]);
shuffleBlockIds[3 * i + 1] = Integer.parseInt(blockIdParts[3]);
if (blockIdParts.length == 4) {
shuffleBlockIds[3 * i + 2] = 1;
} else {
shuffleBlockIds[3 * i + 2] = Integer.parseInt(blockIdParts[4]);
", got:" + blockIds[i]);
}
mapIdAndReduceIds[2 * i] = Integer.parseInt(blockIdParts[2]);
mapIdAndReduceIds[2 * i + 1] = Integer.parseInt(blockIdParts[3]);
}
}

@Override
public boolean hasNext() {
return index < shuffleBlockIds.length;
return index < mapIdAndReduceIds.length;
}

@Override
public ManagedBuffer next() {
final ManagedBuffer block = blockManager.getBlockData(appId, execId, shuffleId,
shuffleBlockIds[index], shuffleBlockIds[index + 1], shuffleBlockIds[index + 2]);
index += 3;
mapIdAndReduceIds[index], mapIdAndReduceIds[index + 1]);
index += 2;
metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0);
return block;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,22 +162,21 @@ public void registerExecutor(
}

/**
* Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId, numBlocks). We make assumptions
* Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId). We make assumptions
* about how the hash and sort based shuffles store their data.
*/
public ManagedBuffer getBlockData(
String appId,
String execId,
int shuffleId,
int mapId,
int reduceId,
int numBlocks) {
String appId,
String execId,
int shuffleId,
int mapId,
int reduceId) {
ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
if (executor == null) {
throw new RuntimeException(
String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
}
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId, numBlocks);
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
}

/**
Expand Down Expand Up @@ -281,19 +280,19 @@ public boolean accept(File dir, String name) {
* and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
*/
private ManagedBuffer getSortBasedShuffleBlockData(
ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId, int numBlocks) {
ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) {
File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapId + "_0.index");
"shuffle_" + shuffleId + "_" + mapId + "_0.index");

try {
ShuffleIndexInformation shuffleIndexInformation = shuffleIndexCache.get(indexFile);
ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId, numBlocks);
ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId);
return new FileSegmentManagedBuffer(
conf,
getFile(executor.localDirs, executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapId + "_0.data"),
shuffleIndexRecord.getOffset(),
shuffleIndexRecord.getLength());
conf,
getFile(executor.localDirs, executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapId + "_0.data"),
shuffleIndexRecord.getOffset(),
shuffleIndexRecord.getLength());
} catch (ExecutionException e) {
throw new RuntimeException("Failed to open file: " + indexFile, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ public int getSize() {
/**
* Get index offset for a particular reducer.
*/
public ShuffleIndexRecord getIndex(int reduceId, int numBlocks) {
public ShuffleIndexRecord getIndex(int reduceId) {
long offset = offsets.get(reduceId);
long nextOffset = offsets.get(reduceId + numBlocks);
long nextOffset = offsets.get(reduceId + 1);
return new ShuffleIndexRecord(offset, nextOffset - offset);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ public void testOpenShuffleBlocks() {

ManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3]));
ManagedBuffer block1Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
when(blockResolver.getBlockData("app0", "exec1", 0, 0, 0, 1)).thenReturn(block0Marker);
when(blockResolver.getBlockData("app0", "exec1", 0, 0, 1, 1)).thenReturn(block1Marker);
when(blockResolver.getBlockData("app0", "exec1", 0, 0, 0)).thenReturn(block0Marker);
when(blockResolver.getBlockData("app0", "exec1", 0, 0, 1)).thenReturn(block1Marker);
ByteBuffer openBlocks = new OpenBlocks("app0", "exec1",
new String[] { "shuffle_0_0_0", "shuffle_0_0_1" })
.toByteBuffer();
Expand All @@ -106,8 +106,8 @@ public void testOpenShuffleBlocks() {
assertEquals(block0Marker, buffers.next());
assertEquals(block1Marker, buffers.next());
assertFalse(buffers.hasNext());
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 0, 1);
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 1, 1);
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 0);
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 1);

// Verify open block request latency metrics
Timer openBlockRequestLatencyMillis = (Timer) ((ExternalShuffleBlockHandler) handler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void testBadRequests() throws IOException {
ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
// Unregistered executor
try {
resolver.getBlockData("app0", "exec1", 1, 1, 0, 1);
resolver.getBlockData("app0", "exec1", 1, 1, 0);
fail("Should have failed");
} catch (RuntimeException e) {
assertTrue("Bad error message: " + e, e.getMessage().contains("not registered"));
Expand All @@ -75,7 +75,7 @@ public void testBadRequests() throws IOException {
// Invalid shuffle manager
try {
resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
resolver.getBlockData("app0", "exec2", 1, 1, 0, 1);
resolver.getBlockData("app0", "exec2", 1, 1, 0);
fail("Should have failed");
} catch (UnsupportedOperationException e) {
// pass
Expand All @@ -85,7 +85,7 @@ public void testBadRequests() throws IOException {
resolver.registerExecutor("app0", "exec3",
dataContext.createExecutorInfo(SORT_MANAGER));
try {
resolver.getBlockData("app0", "exec3", 1, 1, 0, 1);
resolver.getBlockData("app0", "exec3", 1, 1, 0);
fail("Should have failed");
} catch (Exception e) {
// pass
Expand All @@ -99,25 +99,18 @@ public void testSortShuffleBlocks() throws IOException {
dataContext.createExecutorInfo(SORT_MANAGER));

InputStream block0Stream =
resolver.getBlockData("app0", "exec0", 0, 0, 0, 1).createInputStream();
resolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream();
String block0 = CharStreams.toString(
new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
block0Stream.close();
assertEquals(sortBlock0, block0);

InputStream block1Stream =
resolver.getBlockData("app0", "exec0", 0, 0, 1, 1).createInputStream();
resolver.getBlockData("app0", "exec0", 0, 0, 1).createInputStream();
String block1 = CharStreams.toString(
new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
block1Stream.close();
assertEquals(sortBlock1, block1);

InputStream block01Stream =
resolver.getBlockData("app0", "exec0", 0, 0, 0, 2).createInputStream();
String block01 = CharStreams.toString(
new InputStreamReader(block01Stream, StandardCharsets.UTF_8));
block01Stream.close();
assertEquals(sortBlock0 + sortBlock1, block01);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
if (!records.hasNext()) {
partitionLengths = new long[numPartitions];
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, new long[numPartitions]);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
return;
}
final SerializerInstance serInstance = serializer.newInstance();
Expand Down Expand Up @@ -159,18 +159,15 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {

File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
File tmp = Utils.tempFileWith(output);
MapInfo mapInfo;
try {
mapInfo = writePartitionedFile(tmp);
partitionLengths = mapInfo.lengths;
partitionLengths = writePartitionedFile(tmp);
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
} finally {
if (tmp.exists() && !tmp.delete()) {
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
}
}
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(), mapInfo.lengths, mapInfo.records);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}

@VisibleForTesting
Expand All @@ -183,13 +180,12 @@ long[] getPartitionLengths() {
*
* @return array of lengths, in bytes, of each partition of the file (used by map output tracker).
*/
private MapInfo writePartitionedFile(File outputFile) throws IOException {
private long[] writePartitionedFile(File outputFile) throws IOException {
// Track location of the partition starts in the output file
final long[] lengths = new long[numPartitions];
final long[] records = new long[numPartitions];
if (partitionWriters == null) {
// We were passed an empty iterator
return new MapInfo(lengths, records);
return lengths;
}

final FileOutputStream out = new FileOutputStream(outputFile, true);
Expand All @@ -198,7 +194,6 @@ private MapInfo writePartitionedFile(File outputFile) throws IOException {
try {
for (int i = 0; i < numPartitions; i++) {
final File file = partitionWriterSegments[i].file();
records[i] = partitionWriterSegments[i].record();
if (file.exists()) {
final FileInputStream in = new FileInputStream(file);
boolean copyThrewException = true;
Expand All @@ -219,7 +214,7 @@ private MapInfo writePartitionedFile(File outputFile) throws IOException {
writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
}
partitionWriters = null;
return new MapInfo(lengths, records);
return lengths;
}

@Override
Expand Down
28 changes: 0 additions & 28 deletions core/src/main/java/org/apache/spark/shuffle/sort/MapInfo.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ private void writeSortedFile(boolean isLastFile) {
if (currentPartition != -1) {
final FileSegment fileSegment = writer.commitAndGet();
spillInfo.partitionLengths[currentPartition] = fileSegment.length();
spillInfo.partitionRecords[currentPartition] = fileSegment.record();
}
currentPartition = partition;
}
Expand Down Expand Up @@ -223,7 +222,6 @@ private void writeSortedFile(boolean isLastFile) {
// writeSortedFile() in that case.
if (currentPartition != -1) {
spillInfo.partitionLengths[currentPartition] = committedSegment.length();
spillInfo.partitionRecords[currentPartition] = committedSegment.record();
spills.add(spillInfo);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,11 @@
*/
final class SpillInfo {
final long[] partitionLengths;
final long[] partitionRecords;
final File file;
final TempShuffleBlockId blockId;

SpillInfo(int numPartitions, File file, TempShuffleBlockId blockId) {
this.partitionLengths = new long[numPartitions];
this.partitionRecords = new long[numPartitions];
this.file = file;
this.blockId = blockId;
}
Expand Down
Loading

0 comments on commit 86b9059

Please sign in to comment.