[SPARK-19659] Fetch big blocks to disk when shuffle-read. #16989

Status: Closed (wants to merge 6 commits)
File: OneForOneStreamManager.java

@@ -23,6 +23,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import scala.Tuple2;

import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import org.slf4j.Logger;
@@ -94,6 +96,25 @@ public ManagedBuffer getChunk(long streamId, int chunkIndex) {
return nextChunk;
}

@Override
public ManagedBuffer openStream(String streamChunkId) {
Member:
@jinxing64 this breaks the old shuffle service. We should avoid changing server-side code. Right now I just disabled this feature in #18467.

Author:
Thanks, I will try to make a PR as soon as possible.

Tuple2<Long, Integer> streamIdAndChunkId = parseStreamChunkId(streamChunkId);
return getChunk(streamIdAndChunkId._1, streamIdAndChunkId._2);
}

public static String genStreamChunkId(long streamId, int chunkId) {
return String.format("%d_%d", streamId, chunkId);
}

public static Tuple2<Long, Integer> parseStreamChunkId(String streamChunkId) {
Contributor:
This method is called in a single place. Because it needs to return two values, you had to import the Scala tuple class into this Java code. You could avoid that by simply inlining this at its sole call site, which I think would make the code simpler to read.

Contributor:
The idea was to ensure that the string parsing and unparsing are localized to a single place (genStreamChunkId and parseStreamChunkId), so that we can modify the format in the future if required, without needing to hunt for every place we did a split by "_".

Contributor:
We can inline it if it becomes a bottleneck in the future.

String[] array = streamChunkId.split("_");
assert array.length == 2 :
"Stream id and chunk index should be specified when opening a stream to fetch a block.";
long streamId = Long.valueOf(array[0]);
int chunkIndex = Integer.valueOf(array[1]);
return new Tuple2<>(streamId, chunkIndex);
}
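For illustration, here is a round trip of the stream-chunk id format introduced by genStreamChunkId/parseStreamChunkId (a standalone sketch, not part of the diff; it assumes only that OneForOneStreamManager and scala.Tuple2 are on the classpath):

    import scala.Tuple2;
    import org.apache.spark.network.server.OneForOneStreamManager;

    // Encode a (streamId, chunkIndex) pair as "streamId_chunkIndex" and parse it
    // back, mirroring what the client sends and what openStream() receives.
    public class StreamChunkIdRoundTrip {
      public static void main(String[] args) {
        String id = OneForOneStreamManager.genStreamChunkId(9L, 3); // "9_3"
        Tuple2<Long, Integer> parsed = OneForOneStreamManager.parseStreamChunkId(id);
        System.out.println("stream " + parsed._1 + ", chunk " + parsed._2);
      }
    }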

@Override
public void connectionTerminated(Channel channel) {
// Close all streams which have been associated with the channel.
File: ExternalShuffleClient.java

@@ -17,6 +17,7 @@

package org.apache.spark.network.shuffle;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
@@ -86,14 +87,16 @@ public void fetchBlocks(
int port,
String execId,
String[] blockIds,
BlockFetchingListener listener) {
BlockFetchingListener listener,
File[] shuffleFiles) {
checkInit();
logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
try {
RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
(blockIds1, listener1) -> {
TransportClient client = clientFactory.createClient(host, port);
new OneForOneBlockFetcher(client, appId, execId, blockIds1, listener1).start();
new OneForOneBlockFetcher(client, appId, execId, blockIds1, listener1, conf,
shuffleFiles).start();
};

int maxRetries = conf.maxIORetries();
File: OneForOneBlockFetcher.java

@@ -17,19 +17,28 @@

package org.apache.spark.network.shuffle;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.ChunkReceivedCallback;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.StreamCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.OpenBlocks;
import org.apache.spark.network.shuffle.protocol.StreamHandle;
import org.apache.spark.network.util.TransportConf;

/**
* Simple wrapper on top of a TransportClient which interprets each chunk as a whole block, and
@@ -48,6 +57,8 @@ public class OneForOneBlockFetcher {
private final String[] blockIds;
private final BlockFetchingListener listener;
private final ChunkReceivedCallback chunkCallback;
private TransportConf transportConf = null;
private File[] shuffleFiles = null;

private StreamHandle streamHandle = null;

@@ -56,12 +67,20 @@ public OneForOneBlockFetcher(
String appId,
String execId,
String[] blockIds,
BlockFetchingListener listener) {
BlockFetchingListener listener,
TransportConf transportConf,
File[] shuffleFiles) {
this.client = client;
this.openMessage = new OpenBlocks(appId, execId, blockIds);
this.blockIds = blockIds;
this.listener = listener;
this.chunkCallback = new ChunkCallback();
this.transportConf = transportConf;
if (shuffleFiles != null) {
this.shuffleFiles = shuffleFiles;
assert this.shuffleFiles.length == blockIds.length :
"Number of shuffle files should equal the number of blocks";
}
}

/** Callback invoked on receipt of each chunk. We equate a single chunk to a single block. */
@@ -100,7 +119,12 @@ public void onSuccess(ByteBuffer response) {
// Immediately request all chunks -- we expect that the total size of the request is
// reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
for (int i = 0; i < streamHandle.numChunks; i++) {
client.fetchChunk(streamHandle.streamId, i, chunkCallback);
if (shuffleFiles != null) {
client.stream(OneForOneStreamManager.genStreamChunkId(streamHandle.streamId, i),
new DownloadCallback(shuffleFiles[i], i));
} else {
client.fetchChunk(streamHandle.streamId, i, chunkCallback);
}
}
} catch (Exception e) {
logger.error("Failed while starting block fetches after success", e);
@@ -126,4 +150,38 @@ private void failRemainingBlocks(String[] failedBlockIds, Throwable e) {
}
}
}

private class DownloadCallback implements StreamCallback {

private WritableByteChannel channel = null;
private File targetFile = null;
private int chunkIndex;

public DownloadCallback(File targetFile, int chunkIndex) throws IOException {
this.targetFile = targetFile;
this.channel = Channels.newChannel(new FileOutputStream(targetFile));
Member:
Does this work with RetryingBlockFetcher? Let's say we have 2 chunks: "chunk 1", "chunk 2". If "chunk 1" fails, it will fail "chunk 2" as well. However, the DownloadCallback for "chunk 2" is still running. In this case, RetryingBlockFetcher will retry "chunk 2" as well. Hence, there will be 2 DownloadCallbacks writing to the same file.

Member:
One possible fix is writing to a temp file and renaming it to the target file.

Author:
👍 I will make a PR for this today.

Author:
@zsxwing @cloud-fan
OneForOneBlockFetcher "opens blocks" asynchronously. If I understand correctly, the retry of start() in OneForOneBlockFetcher is only triggered on a failure to send OpenBlocks; a failure while fetching a chunk cannot trigger the retry in RetryingBlockFetcher. DownloadCallback is not initialized when the "open blocks" failure happens, so there cannot be two DownloadCallbacks for the same stream working at the same time.

Member:
@jinxing64 The retry logic is here:

The issue is that there will be two DownloadCallbacks downloading the same content to the same target file. When the first one finishes, ShuffleBlockFetcherIterator may start to read it while the second DownloadCallback is still running and writing to the target file. That could cause ShuffleBlockFetcherIterator to read a partial result.

Author:
Pardon, I can hardly believe there are two DownloadCallbacks downloading the same content to the same target file. In my understanding:

  1. When RetryingBlockFetcher retries, no DownloadCallback has been initialized;
  2. On a failure while fetching a chunk, a retry from RetryingBlockFetcher will not be triggered.

Author:
@zsxwing
Sorry, I just realized this issue. There can be a conflict between two DownloadCallbacks. I will figure out a way to resolve this.
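A minimal sketch of the temp-file-and-rename approach suggested above (hypothetical; the class and names are illustrative, not the eventual fix):

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.nio.channels.Channels;
    import java.nio.channels.WritableByteChannel;

    // Each callback writes to its own temp file, so two racing callbacks never
    // share an output file; the finished file is published with a rename.
    class TempFileDownload {
      private final File targetFile;
      private final File tmpFile;
      private final WritableByteChannel channel;

      TempFileDownload(File targetFile) throws IOException {
        this.targetFile = targetFile;
        this.tmpFile = File.createTempFile(targetFile.getName(), ".tmp",
          targetFile.getParentFile());
        this.channel = Channels.newChannel(new FileOutputStream(tmpFile));
      }

      void complete() throws IOException {
        channel.close();
        // Readers only ever observe a fully written file under the target name.
        if (!tmpFile.renameTo(targetFile)) {
          throw new IOException("Failed to rename " + tmpFile + " to " + targetFile);
        }
      }
    }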

this.chunkIndex = chunkIndex;
}

@Override
public void onData(String streamId, ByteBuffer buf) throws IOException {
channel.write(buf);
Contributor:
As an implementation detail this works (since the channel wraps a FileOutputStream), but in general channel.write() need not consume all of buf.remaining(), which IIRC actually breaks Spark code, since it expects onData to completely consume the data.

Author:
Sorry, @mridulm, I'm still not quite sure how to refine this. Could you please give more details?

Contributor (@mridulm, May 10, 2017):
I was just noting it for review :-) in case someone else goes through the same concern.
We are relying on an implementation detail of what the channel is here, to avoid checking whether buf.remaining() == 0 after channel.write() returns. StreamCallback expects the buffer to be empty after onData is done.
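For reference, a write loop that does not rely on that implementation detail would drain the buffer explicitly (a sketch in plain NIO, no Spark types):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.WritableByteChannel;

    final class ChannelUtil {
      // A generic WritableByteChannel.write() may perform a partial write, so
      // loop until the buffer is fully consumed, as StreamCallback expects.
      static void writeFully(WritableByteChannel channel, ByteBuffer buf) throws IOException {
        while (buf.hasRemaining()) {
          channel.write(buf);
        }
      }
    }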

Contributor:
I am super late on reviewing this, apologies; just asking questions for my own understanding, and to consider possible future improvements -- this won't do a zero-copy transfer, will it? That ByteBuffer is still in user space?

From my understanding, we'd need special handling to use netty's spliceTo when possible:
https://stackoverflow.com/questions/30322957/is-there-transferfrom-like-functionality-in-netty-for-zero-copy

but I'm still working on putting all the pieces together here, and admittedly this is out of my area of expertise.

Member:
@squito This is a Java Channel. I'm not sure how to call io.netty.channel.epoll.AbstractEpollStreamChannel.spliceTo here.

By the way, I think this is a zero-copy transfer since the underlying buffer is an off-heap buffer.

Anyway, I found a bug here...

Contributor:
Right, I realize there isn't a simple one-line change here to switch to using spliceTo; I was wondering what the behavior is.

I actually thought zero-copy and off-heap were orthogonal -- any time netty gives you direct access to the bytes, they have to be copied to user space, right?

Member (@zsxwing, Jan 31, 2018):
@squito You are right. It needs a copy between user space and kernel space.

}

@Override
public void onComplete(String streamId) throws IOException {
channel.close();
ManagedBuffer buffer = new FileSegmentManagedBuffer(transportConf, targetFile, 0,
targetFile.length());
listener.onBlockFetchSuccess(blockIds[chunkIndex], buffer);
}

@Override
public void onFailure(String streamId, Throwable cause) throws IOException {
Contributor:
Shall we remove the partially written file when failing?

Author:
Yes, that will be good!

channel.close();
// On receipt of a failure, fail every block from chunkIndex onwards.
String[] remainingBlockIds = Arrays.copyOfRange(blockIds, chunkIndex, blockIds.length);
failRemainingBlocks(remainingBlockIds, cause);
}
}
}
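For reference, a sketch of DownloadCallback.onFailure with the cleanup suggested in the thread above (hypothetical; assumes the surrounding DownloadCallback fields shown in this diff plus a logger, and may differ from the actual follow-up change):

    @Override
    public void onFailure(String streamId, Throwable cause) throws IOException {
      channel.close();
      // Delete the partially written file so neither a retry nor a reader
      // ever observes truncated data.
      if (targetFile.exists() && !targetFile.delete()) {
        logger.warn("Failed to delete partially written file {}", targetFile);
      }
      // On receipt of a failure, fail every block from chunkIndex onwards.
      String[] remainingBlockIds = Arrays.copyOfRange(blockIds, chunkIndex, blockIds.length);
      failRemainingBlocks(remainingBlockIds, cause);
    }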
File: ShuffleClient.java

@@ -18,6 +18,7 @@
package org.apache.spark.network.shuffle;

import java.io.Closeable;
import java.io.File;

/** Provides an interface for reading shuffle files, either from an Executor or external service. */
public abstract class ShuffleClient implements Closeable {
@@ -40,5 +41,6 @@ public abstract void fetchBlocks(
int port,
String execId,
String[] blockIds,
BlockFetchingListener listener);
BlockFetchingListener listener,
File[] shuffleFiles);
}
File: SaslIntegrationSuite.java

@@ -204,7 +204,7 @@ public void onBlockFetchFailure(String blockId, Throwable t) {

String[] blockIds = { "shuffle_2_3_4", "shuffle_6_7_8" };
OneForOneBlockFetcher fetcher =
new OneForOneBlockFetcher(client1, "app-2", "0", blockIds, listener);
new OneForOneBlockFetcher(client1, "app-2", "0", blockIds, listener, conf, null);
fetcher.start();
blockFetchLatch.await();
checkSecurityException(exception.get());
File: ExternalShuffleIntegrationSuite.java

@@ -158,7 +158,7 @@ public void onBlockFetchFailure(String blockId, Throwable exception) {
}
}
}
});
}, null);

if (!requestsRemaining.tryAcquire(blockIds.length, 5, TimeUnit.SECONDS)) {
fail("Timeout getting response from the server");
File: OneForOneBlockFetcherSuite.java

@@ -46,8 +46,13 @@
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.OpenBlocks;
import org.apache.spark.network.shuffle.protocol.StreamHandle;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class OneForOneBlockFetcherSuite {

private static final TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);

@Test
public void testFetchOne() {
LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
@@ -126,7 +131,7 @@ private static BlockFetchingListener fetchBlocks(LinkedHashMap<String, ManagedBu
BlockFetchingListener listener = mock(BlockFetchingListener.class);
String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
OneForOneBlockFetcher fetcher =
new OneForOneBlockFetcher(client, "app-id", "exec-id", blockIds, listener);
new OneForOneBlockFetcher(client, "app-id", "exec-id", blockIds, listener, conf, null);

// Respond to the "OpenBlocks" message with an appropriate ShuffleStreamHandle with streamId 123
doAnswer(invocationOnMock -> {
File: config/package.scala

@@ -287,4 +287,10 @@ package object config {
.bytesConf(ByteUnit.BYTE)
.createWithDefault(100 * 1024 * 1024)

private[spark] val REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM =
ConfigBuilder("spark.reducer.maxReqSizeShuffleToMem")
.doc("The blocks of a shuffle request will be fetched to disk when size of the request is " +
"above this threshold. This is to avoid a giant request takes too much memory.")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("200m")
}
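For illustration, the new threshold can be set like any other size config (a sketch; the 100m value is arbitrary, not a recommendation):

    import org.apache.spark.SparkConf;

    // Lower the threshold so any shuffle request larger than 100 MB is
    // streamed to disk instead of being buffered in memory.
    public class ShuffleToMemExample {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf()
          .setAppName("shuffle-to-disk-example")
          .set("spark.reducer.maxReqSizeShuffleToMem", "100m");
        System.out.println(conf.get("spark.reducer.maxReqSizeShuffleToMem"));
      }
    }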
File: BlockTransferService.scala

@@ -17,7 +17,7 @@

package org.apache.spark.network

import java.io.Closeable
import java.io.{Closeable, File}
import java.nio.ByteBuffer

import scala.concurrent.{Future, Promise}
@@ -67,7 +67,8 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
port: Int,
execId: String,
blockIds: Array[String],
listener: BlockFetchingListener): Unit
listener: BlockFetchingListener,
shuffleFiles: Array[File]): Unit

/**
* Upload a single block to a remote node, available only after [[init]] is invoked.
@@ -100,7 +101,7 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
ret.flip()
result.success(new NioManagedBuffer(ret))
}
})
}, shuffleFiles = null)
ThreadUtils.awaitResult(result.future, Duration.Inf)
}

File: NettyBlockTransferService.scala

@@ -17,6 +17,7 @@

package org.apache.spark.network.netty

import java.io.File
import java.nio.ByteBuffer

import scala.collection.JavaConverters._
@@ -88,13 +89,15 @@ private[spark] class NettyBlockTransferService(
port: Int,
execId: String,
blockIds: Array[String],
listener: BlockFetchingListener): Unit = {
listener: BlockFetchingListener,
shuffleFiles: Array[File]): Unit = {
logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
try {
val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
val client = clientFactory.createClient(host, port)
new OneForOneBlockFetcher(client, appId, execId, blockIds.toArray, listener).start()
new OneForOneBlockFetcher(client, appId, execId, blockIds.toArray, listener,
transportConf, shuffleFiles).start()
}
}

File: BlockStoreShuffleReader.scala

@@ -18,7 +18,7 @@
package org.apache.spark.shuffle

import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.serializer.SerializerManager
import org.apache.spark.storage.{BlockManager, ShuffleBlockFetcherIterator}
import org.apache.spark.util.CompletionIterator
@@ -51,6 +51,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
SparkEnv.get.conf.get(config.REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM),
SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))

val serializerInstance = dep.serializer.newInstance()