[SPARK-31179] Fast fail the connection while last connection failed in fast fail time window

Squashed commit of the following:

commit aeedd82
Author: hustfeiwang <[email protected]>
Date:   Tue May 21 13:45:42 2019 +0800

    [SPARK-27637][SHUFFLE] For nettyBlockTransferService, if IOException occurred while fetching data, check whether relative executor is alive before retry

    There are several kinds of shuffle clients: blockTransferService and externalShuffleClient.

    For the externalShuffleClient, there is a corresponding external shuffle service, which serves the shuffle block data regardless of the state of executors.

    The blockTransferService is used to fetch broadcast blocks, and to fetch shuffle data when the external shuffle service is not enabled.

    When fetching data via blockTransferService, the shuffle client connects to the relevant executor's BlockManager, so if that executor is dead, the fetch can never succeed.

    When spark.shuffle.service.enabled and spark.dynamicAllocation.enabled are both true, an executor is removed once it has been idle for more than idleTimeout.

    If a blockTransferService successfully creates a connection to the relevant executor, but that executor is removed just as the broadcast block fetch begins, it retries (see RetryingBlockFetcher), which is ineffective.

    If spark.shuffle.io.retryWait and spark.shuffle.io.maxRetries are large, such as 30s and 10 retries, this wastes 5 minutes.

    In this PR, we check whether the relevant executor is alive before retrying (see the sketch before the file diffs below).
    Tested with a unit test.

    Closes apache#24533 from turboFei/SPARK-27637.

    Authored-by: hustfeiwang <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
hustfeiwang authored and pan3793 committed Feb 9, 2022
1 parent 28c7c16 commit f3b7ebc
Showing 5 changed files with 76 additions and 11 deletions.
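
The executor-liveness check described in the squashed SPARK-27637 message is not visible in the hunks below, which cover only the fast-fail time window. A minimal sketch of the liveness idea in Java, assuming a hypothetical isExecutorAlive probe (for example an RPC to the driver); the class and method names are illustrative and are not the code added by that PR:

// Sketch only: decide whether a failed block fetch is worth retrying.
// isExecutorAlive is a hypothetical probe; the real change wires an equivalent
// check into the retry path around RetryingBlockFetcher.
public final class LivenessAwareRetrySketch {

  public interface LivenessProbe {
    boolean isExecutorAlive(String execId);
  }

  private LivenessAwareRetrySketch() {}

  /** Retry only while budget remains and the remote executor still exists. */
  public static boolean shouldRetry(
      Throwable t, int retryCount, int maxRetries, String execId, LivenessProbe probe) {
    boolean ioFailure = t instanceof java.io.IOException
        || (t.getCause() instanceof java.io.IOException);
    boolean hasBudget = retryCount < maxRetries;
    // If the executor was removed (e.g. dynamic allocation idle timeout), retrying can
    // never succeed; give up instead of sleeping retryWait for every remaining attempt.
    return ioFailure && hasBudget && probe.isExecutorAlive(execId);
  }
}

The point of the check is the last line: an IOException alone no longer justifies a retry once the executor is known to be gone.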
TransportClientFactory.java
@@ -61,13 +61,15 @@ public class TransportClientFactory implements Closeable {
private static class ClientPool {
TransportClient[] clients;
Object[] locks;
volatile long lastConnectionFailed;

ClientPool(int size) {
clients = new TransportClient[size];
locks = new Object[size];
for (int i = 0; i < size; i++) {
locks[i] = new Object();
}
lastConnectionFailed = 0;
}
}

@@ -86,6 +88,7 @@ private static class ClientPool {
private EventLoopGroup workerGroup;
private PooledByteBufAllocator pooledAllocator;
private final NettyMemoryMetrics metrics;
private final int fastFailTimeWindow;

public TransportClientFactory(
TransportContext context,
@@ -107,6 +110,7 @@ public TransportClientFactory(
conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
this.metrics = new NettyMemoryMetrics(
this.pooledAllocator, conf.getModuleName() + "-client", conf);
fastFailTimeWindow = (int)(conf.ioRetryWaitTimeMs() * 0.95);
}

public MetricSet getAllMetrics() {
@@ -116,18 +120,27 @@ public MetricSet getAllMetrics() {
/**
* Create a {@link TransportClient} connecting to the given remote host / port.
*
* We maintains an array of clients (size determined by spark.shuffle.io.numConnectionsPerPeer)
* We maintain an array of clients (size determined by spark.shuffle.io.numConnectionsPerPeer)
* and randomly picks one to use. If no client was previously created in the randomly selected
* spot, this function creates a new client and places it there.
*
* If the fastFail parameter is true, fail immediately when the last attempt to the same address
* failed within the fast fail time window (95 percent of the io wait retry timeout). The
* assumption is the caller will handle retrying.
*
* Prior to the creation of a new TransportClient, we will execute all
* {@link TransportClientBootstrap}s that are registered with this factory.
*
* This blocks until a connection is successfully established and fully bootstrapped.
*
* Concurrency: This method is safe to call from multiple threads.
*
* @param remoteHost remote address host
* @param remotePort remote address port
* @param fastFail whether this call should fail immediately when the last attempt to the same
*                 address failed within the fast fail time window.
*/
public TransportClient createClient(String remoteHost, int remotePort)
public TransportClient createClient(String remoteHost, int remotePort, boolean fastFail)
throws IOException, InterruptedException {
// Get connection from the connection pool first.
// If it is not found or not active, create a new one.
@@ -184,11 +197,30 @@ public TransportClient createClient(String remoteHost, int remotePort)
logger.info("Found inactive connection to {}, creating a new one.", resolvedAddress);
}
}
clientPool.clients[clientIndex] = createClient(resolvedAddress);
// If fast fail is requested and the last connection attempt to this address failed
// within the fast fail time window, fail this connection directly.
if (fastFail && System.currentTimeMillis() - clientPool.lastConnectionFailed <
fastFailTimeWindow) {
throw new IOException(
String.format("Connecting to %s failed in the last %s ms, fail this connection directly",
resolvedAddress, fastFailTimeWindow));
}
try {
clientPool.clients[clientIndex] = createClient(resolvedAddress);
clientPool.lastConnectionFailed = 0;
} catch (IOException e) {
clientPool.lastConnectionFailed = System.currentTimeMillis();
throw e;
}
return clientPool.clients[clientIndex];
}
}

public TransportClient createClient(String remoteHost, int remotePort)
throws IOException, InterruptedException {
return createClient(remoteHost, remotePort, false);
}

/**
* Create a completely new {@link TransportClient} to the given remote host / port.
* This connection is not pooled.
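As a usage illustration of the new createClient overload above (a sketch, not code from this commit; host and port are placeholders, and the timing comment assumes the default spark.shuffle.io.retryWait of 5s, i.e. a window of roughly 4750 ms). The fastFailConnectionInTimeWindow test added below exercises the same behavior:

import java.io.IOException;
import org.apache.spark.network.client.TransportClientFactory;

public class FastFailUsageSketch {
  // Illustrative caller: the first attempt pays the full connect cost; a second attempt
  // inside the fast-fail window (0.95 * spark.shuffle.io.retryWait) throws immediately
  // with "fail this connection directly", leaving any retries to the caller.
  static void fetchTwice(TransportClientFactory factory, String host, int port)
      throws InterruptedException {
    try {
      factory.createClient(host, port, true);      // real connection attempt
    } catch (IOException firstFailure) {
      long start = System.currentTimeMillis();
      try {
        factory.createClient(host, port, true);    // inside the window: fast fail
      } catch (IOException fastFailed) {
        // Elapsed time here is close to zero. RetryingBlockFetcher sleeps retryWait
        // between attempts, so a legitimate retry normally falls outside the window.
        System.out.println("fast failed after " + (System.currentTimeMillis() - start) + " ms");
      }
    }
  }
}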
TransportClientFactorySuite.java
@@ -26,10 +26,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.*;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotSame;
@@ -44,6 +41,7 @@
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.TransportConf;
import org.junit.rules.ExpectedException;

public class TransportClientFactorySuite {
private TransportConf conf;
@@ -215,4 +213,24 @@ public Iterable<Map.Entry<String, String>> getAll() {
assertFalse(c1.isActive());
}
}

@Rule
public ExpectedException expectedException = ExpectedException.none();

@Test
public void fastFailConnectionInTimeWindow() throws IOException, InterruptedException {
TransportClientFactory factory = context.createClientFactory();
TransportServer server = context.createServer();
int unreachablePort = server.getPort();
server.close();
try {
factory.createClient(TestUtils.getLocalHost(), unreachablePort, true);
} catch (Exception e) {
assert(e instanceof IOException);
}
expectedException.expect(IOException.class);
expectedException.expectMessage("fail this connection directly");
factory.createClient(TestUtils.getLocalHost(), unreachablePort, true);
expectedException = ExpectedException.none();
}
}
ExternalShuffleClient.java
@@ -95,14 +95,14 @@ public void fetchBlocks(
checkInit();
logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
try {
int maxRetries = conf.maxIORetries();
RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
(blockIds1, listener1) -> {
TransportClient client = clientFactory.createClient(host, port);
TransportClient client = clientFactory.createClient(host, port, maxRetries > 0);
new OneForOneBlockFetcher(client, appId, execId,
blockIds1, listener1, conf, downloadFileManager).start();
};

int maxRetries = conf.maxIORetries();
if (maxRetries > 0) {
// Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
// a bug in this code. We should remove the if statement once we're sure of the stability.
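A reading note on the hunk above and the NettyBlockTransferService change below: fastFail is passed as maxRetries > 0 because fast failing is only safe when an outer retry loop exists. A compact restatement, not code from the commit (conf is the TransportConf already in scope):

// fastFail == true : RetryingBlockFetcher owns the retry loop, so an individual
//                     createClient call may give up immediately inside the window.
// fastFail == false: spark.shuffle.io.maxRetries == 0 means this is the only attempt,
//                     so it must perform a real connect rather than fast failing.
boolean fastFail = conf.maxIORetries() > 0;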
NettyBlockTransferService.scala
@@ -108,6 +108,7 @@ private[spark] class NettyBlockTransferService(
tempFileManager: DownloadFileManager): Unit = {
logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
try {
val maxRetries = transportConf.maxIORetries()
val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
val client = clientFactory.createClient(host, port)
@@ -116,7 +117,6 @@
}
}

val maxRetries = transportConf.maxIORetries()
if (maxRetries > 0) {
// Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
// a bug in this code. We should remove the if statement once we're sure of the stability.
17 changes: 16 additions & 1 deletion pom.xml
@@ -228,6 +228,21 @@
<CodeCacheSize>512m</CodeCacheSize>
</properties>
<repositories>
<repository>
<id>gcs-maven-central-mirror</id>
<!--
Google Mirror of Maven Central, placed first so that it's used instead of flaky Maven Central.
See https://storage-download.googleapis.com/maven-central/index.html
-->
<name>GCS Maven Central mirror</name>
<url>https://maven-central.storage-download.googleapis.com/maven2/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>central</id>
<!-- This should be at top, it makes maven try the central repo first and then others and hence faster dep resolution -->
@@ -2047,7 +2062,7 @@
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<!-- 3.3.1 won't work with zinc; fails to find javac from java.home -->
<version>3.2.2</version>
<version>4.3.0</version>
<executions>
<execution>
<id>eclipse-add-source</id>
