Skip to content

Commit

Permalink
Perform port checks in parallel (#4463)
Browse files Browse the repository at this point in the history
The checks will now also incorporate Awaitility.
  • Loading branch information
bsideup authored Oct 13, 2021
1 parent b29716d commit 69cf13c
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,26 @@

import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public abstract class AbstractWaitStrategy implements WaitStrategy {

static final ExecutorService EXECUTOR = Executors.newCachedThreadPool(new ThreadFactory() {

private final AtomicLong COUNTER = new AtomicLong(0);

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "testcontainers-wait-" + COUNTER.getAndIncrement());
thread.setDaemon(true);
return thread;
}
});

private static final RateLimiter DOCKER_CLIENT_RATE_LIMITER = RateLimiterBuilder
.newBuilder()
.withRate(1, TimeUnit.SECONDS)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
package org.testcontainers.containers.wait.strategy;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.rnorth.ducttape.TimeoutException;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.awaitility.Awaitility;
import org.testcontainers.containers.ContainerLaunchException;
import org.testcontainers.containers.wait.internal.ExternalPortListeningCheck;
import org.testcontainers.containers.wait.internal.InternalCommandPortListeningCheck;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

/**
Expand All @@ -22,6 +29,7 @@
public class HostPortWaitStrategy extends AbstractWaitStrategy {

@Override
@SneakyThrows(InterruptedException.class)
protected void waitUntilReady() {
final Set<Integer> externalLivenessCheckPorts = getLivenessCheckPorts();
if (externalLivenessCheckPorts.isEmpty()) {
Expand All @@ -31,7 +39,6 @@ protected void waitUntilReady() {
return;
}

@SuppressWarnings("unchecked")
List<Integer> exposedPorts = waitStrategyTarget.getExposedPorts();

final Set<Integer> internalPorts = getInternalPorts(externalLivenessCheckPorts, exposedPorts);
Expand All @@ -41,10 +48,43 @@ protected void waitUntilReady() {
Callable<Boolean> externalCheck = new ExternalPortListeningCheck(waitStrategyTarget, externalLivenessCheckPorts);

try {
Unreliables.retryUntilTrue((int) startupTimeout.getSeconds(), TimeUnit.SECONDS,
() -> getRateLimiter().getWhenReady(() -> internalCheck.call() && externalCheck.call()));
List<Future<Boolean>> futures = EXECUTOR.invokeAll(Arrays.asList(
// Blocking
() -> {
Instant now = Instant.now();
Boolean result = internalCheck.call();
log.debug(
"Internal port check {} for {} in {}",
Boolean.TRUE.equals(result) ? "passed" : "failed",
internalPorts,
Duration.between(now, Instant.now())
);
return result;
},
// Polling
() -> {
Instant now = Instant.now();
Awaitility.await()
.pollInSameThread()
.pollInterval(Duration.ofMillis(100))
.pollDelay(Duration.ZERO)
.forever()
.until(externalCheck);

} catch (TimeoutException e) {
log.debug(
"External port check passed for {} mapped as {} in {}",
internalPorts,
externalLivenessCheckPorts,
Duration.between(now, Instant.now())
);
return true;
}
), startupTimeout.getSeconds(), TimeUnit.SECONDS);

for (Future<Boolean> future : futures) {
future.get(0, TimeUnit.SECONDS);
}
} catch (CancellationException | ExecutionException | TimeoutException e) {
throw new ContainerLaunchException("Timed out waiting for container port to open (" +
waitStrategyTarget.getHost() +
" ports: " +
Expand Down

0 comments on commit 69cf13c

Please sign in to comment.