Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlock when ryuk does not acknowledge filters #843

Merged
merged 6 commits into from
Sep 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 56 additions & 20 deletions core/src/main/java/org/testcontainers/utility/ResourceReaper.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.github.dockerjava.api.model.Network;
import com.github.dockerjava.api.model.Ports;
import com.github.dockerjava.api.model.Volume;
import com.google.common.annotations.VisibleForTesting;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.utils.URLEncodedUtils;
Expand All @@ -23,6 +24,7 @@

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
Expand Down Expand Up @@ -117,8 +119,7 @@ public static String start(String hostIpAddress, DockerClient client, boolean wi
while (true) {
int index = 0;
try(Socket clientSocket = new Socket(hostIpAddress, ryukPort)) {
OutputStream out = clientSocket.getOutputStream();
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
FilterRegistry registry = new FilterRegistry(clientSocket.getInputStream(), clientSocket.getOutputStream());

synchronized (DEATH_NOTE) {
while (true) {
Expand All @@ -131,24 +132,14 @@ public static String start(String hostIpAddress, DockerClient client, boolean wi
}
}
List<Map.Entry<String, String>> filters = DEATH_NOTE.get(index);

String query = URLEncodedUtils.format(
filters.stream()
.map(it -> new BasicNameValuePair(it.getKey(), it.getValue()))
.collect(Collectors.toList()),
(String) null
);

log.debug("Sending '{}' to Ryuk", query);
out.write(query.getBytes());
out.write('\n');
out.flush();

while (!"ACK".equalsIgnoreCase(in.readLine())) {
boolean isAcknowledged = registry.register(filters);
if (isAcknowledged) {
log.debug("Received 'ACK' from Ryuk");
ryukScheduledLatch.countDown();
index++;
} else {
log.debug("Didn't receive 'ACK' from Ryuk. Will retry to send filters.");
}

ryukScheduledLatch.countDown();
index++;
}
}
} catch (IOException e) {
Expand Down Expand Up @@ -179,7 +170,6 @@ public synchronized static ResourceReaper instance() {

/**
* Perform a cleanup.
*
*/
public synchronized void performCleanup() {
registeredContainers.forEach(this::stopContainer);
Expand Down Expand Up @@ -362,4 +352,50 @@ private void setHook() {
Runtime.getRuntime().addShutdownHook(new Thread(DockerClientFactory.TESTCONTAINERS_THREAD_GROUP, this::performCleanup));
}
}

static class FilterRegistry {

@VisibleForTesting
static final String ACKNOWLEDGMENT = "ACK";

private final BufferedReader in;
private final OutputStream out;

FilterRegistry(InputStream ryukInputStream, OutputStream ryukOutputStream) {
this.in = new BufferedReader(new InputStreamReader(ryukInputStream));
this.out = ryukOutputStream;
}

/**
* Registers the given filters with Ryuk
*
* @param filters the filter to register
* @return true if the filters have been registered successfuly, false otherwise
* @throws IOException if communication with Ryuk fails
*/
protected boolean register(List<Map.Entry<String, String>> filters) throws IOException {
String query = URLEncodedUtils.format(
filters.stream()
.map(it -> new BasicNameValuePair(it.getKey(), it.getValue()))
.collect(Collectors.toList()),
(String) null
);

log.debug("Sending '{}' to Ryuk", query);
out.write(query.getBytes());
out.write('\n');
out.flush();

return waitForAcknowledgment(in);
}

private static boolean waitForAcknowledgment(BufferedReader in) throws IOException {
String line = in.readLine();
while (line != null && !ACKNOWLEDGMENT.equalsIgnoreCase(line)) {
line = in.readLine();
}
return ACKNOWLEDGMENT.equalsIgnoreCase(line);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package org.testcontainers.utility;

import org.junit.Test;
import org.testcontainers.utility.ResourceReaper.FilterRegistry;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map.Entry;

import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class FilterRegistryTest {

private static final List<Entry<String, String>> FILTERS = asList(
new SimpleEntry<>("key1!", "value2?"), new SimpleEntry<>("key2#", "value2%")
);
private static final String URL_ENCODED_FILTERS = "key1%21=value2%3F&key2%23=value2%25";
private static final byte[] ACKNOWLEDGEMENT = FilterRegistry.ACKNOWLEDGMENT.getBytes();
private static final byte[] NO_ACKNOWLEDGEMENT = "".getBytes();
private static final String NEW_LINE = "\n";

@Test
public void registerReturnsTrueIfAcknowledgementIsReadFromInputStream() throws IOException {
FilterRegistry registry = new FilterRegistry(inputStream(ACKNOWLEDGEMENT), anyOutputStream());

boolean successful = registry.register(FILTERS);

assertTrue(successful);
}

@Test
public void registerReturnsFalseIfNoAcknowledgementIsReadFromInputStream() throws IOException {
FilterRegistry registry = new FilterRegistry(inputStream(NO_ACKNOWLEDGEMENT), anyOutputStream());

boolean successful = registry.register(FILTERS);

assertFalse(successful);
}

@Test
public void registerWritesUrlEncodedFiltersAndNewlineToOutputStream() throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
FilterRegistry registry = new FilterRegistry(anyInputStream(), outputStream);

registry.register(FILTERS);

assertEquals(URL_ENCODED_FILTERS + NEW_LINE, new String(outputStream.toByteArray()));
}

private static InputStream inputStream(byte[] bytes) {
return new ByteArrayInputStream(bytes);
}

private static InputStream anyInputStream() {
return inputStream(ACKNOWLEDGEMENT);
}

private static OutputStream anyOutputStream() {
return new ByteArrayOutputStream();
}

}