Skip to content

Commit

Permalink
Testclusters: convert ccr tests (#42313)
Browse files Browse the repository at this point in the history
  • Loading branch information
alpar-t authored May 29, 2019
1 parent 665b656 commit 5e0a162
Show file tree
Hide file tree
Showing 14 changed files with 282 additions and 214 deletions.
2 changes: 1 addition & 1 deletion buildSrc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ if (project != rootProject) {
if (isLuceneSnapshot) {
systemProperty 'test.lucene-snapshot-revision', isLuceneSnapshot[0][1]
}
// Honour the `tests.jvms` system property when set, otherwise fall back to the
// project-wide default degree of parallelism.
maxParallelForks System.getProperty('tests.jvms', project.rootProject.ext.defaultParallel.toString()) as Integer
}
check.dependsOn(integTest)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ class RestIntegTestTask extends DefaultTask {

// disable the build cache for rest test tasks
// there are a number of inputs we aren't properly tracking here so we'll just not cache these for now
// NOTE: doNotCacheIf disables caching when the spec returns true, so the spec must
// always return true here — returning false would silently re-enable caching.
runner.getOutputs().doNotCacheIf(
    "Caching is disabled for REST integration tests",
    { true }
);

// override/add more for rest tests
runner.maxParallelForks = 1
Expand Down Expand Up @@ -285,4 +288,5 @@ class RestIntegTestTask extends DefaultTask {
}
return copyRestSpec
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,16 @@ public void environment(String key, Supplier<CharSequence> valueSupplier) {
nodes.all(each -> each.environment(key, valueSupplier));
}

@Override
public void jvmArgs(String... values) {
    // Fan the JVM arguments out to every node in the cluster.
    nodes.all(node -> node.jvmArgs(values));
}

@Override
public void jvmArgs(Supplier<String[]> valueSupplier) {
    // Fan the lazily-evaluated JVM arguments out to every node in the cluster.
    nodes.all(node -> node.jvmArgs(valueSupplier));
}

@Override
public void freeze() {
nodes.forEach(ElasticsearchNode::freeze);
Expand Down Expand Up @@ -216,6 +226,11 @@ public void start() {
}
}

@Override
public void restart() {
    // Restart each node of the cluster in turn.
    nodes.forEach(node -> node.restart());
}

@Override
public void extraConfigFile(String destination, File from) {
nodes.all(node -> node.extraConfigFile(destination, from));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -86,6 +87,7 @@ public class ElasticsearchNode implements TestClusterConfiguration {
private final Map<String, FileSupplier> keystoreFiles = new LinkedHashMap<>();
private final Map<String, Supplier<CharSequence>> systemProperties = new LinkedHashMap<>();
private final Map<String, Supplier<CharSequence>> environment = new LinkedHashMap<>();
private final List<Supplier<List<CharSequence>>> jvmArgs = new ArrayList<>();
private final Map<String, File> extraConfigFiles = new HashMap<>();
final LinkedHashMap<String, String> defaultConfig = new LinkedHashMap<>();
private final List<Map<String, String>> credentials = new ArrayList<>();
Expand All @@ -105,6 +107,7 @@ public class ElasticsearchNode implements TestClusterConfiguration {
private File javaHome;
private volatile Process esProcess;
private Function<String, String> nameCustomization = Function.identity();
private boolean isWorkingDirConfigured = false;

ElasticsearchNode(String path, String name, GradleServicesAdapter services, File artifactsExtractDir, File workingDirBase) {
this.path = path;
Expand Down Expand Up @@ -220,6 +223,19 @@ public void environment(String key, Supplier<CharSequence> valueSupplier) {
addSupplier("Environment variable", environment, key, valueSupplier);
}


public void jvmArgs(String... values) {
    // Fail fast on null entries now; the list itself is built lazily when the
    // JVM options are assembled.
    Arrays.stream(values).forEach(value ->
        requireNonNull(value, "jvm argument was null when configuring test cluster `" + this + "`")
    );
    jvmArgs.add(() -> Arrays.asList(values));
}

public void jvmArgs(Supplier<String[]> valueSupplier) {
    requireNonNull(valueSupplier, "jvm argument supplier was null when configuring test cluster `" + this + "`");
    // Defer calling the supplier until the JVM options are actually assembled.
    jvmArgs.add(() -> {
        String[] supplied = valueSupplier.get();
        return Arrays.asList(supplied);
    });
}

private void addSupplier(String name, Map<String, Supplier<CharSequence>> collector, String key, Supplier<CharSequence> valueSupplier) {
requireNonNull(key, name + " key was null when configuring test cluster `" + this + "`");
requireNonNull(valueSupplier, name + " value supplier was null when configuring test cluster `" + this + "`");
Expand All @@ -231,10 +247,13 @@ private void addSupplier(String name, Map<String, Supplier<CharSequence>> collec
addSupplier(name, collector, key, () -> actualValue);
}

private void checkSuppliers(String name, Map<String, Supplier<CharSequence>> collector) {
collector.forEach((key, value) -> {
requireNonNull(value.get().toString(), name + " supplied value was null when configuring test cluster `" + this + "`");
});
// Eagerly resolve every supplier so a null value fails fast with a descriptive message.
private void checkSuppliers(String name, Collection<Supplier<CharSequence>> collector) {
    for (Supplier<CharSequence> supplier : collector) {
        requireNonNull(
            supplier.get().toString(),
            name + " supplied value was null when configuring test cluster `" + this + "`"
        );
    }
}

public Path getConfigDir() {
Expand Down Expand Up @@ -289,7 +308,11 @@ public synchronized void start() {
}

try {
createWorkingDir(distroArtifact);
if (isWorkingDirConfigured == false) {
// Only configure working dir once so we don't loose data on restarts
isWorkingDirConfigured = true;
createWorkingDir(distroArtifact);
}
} catch (IOException e) {
throw new UncheckedIOException("Failed to create working directory for " + this, e);
}
Expand All @@ -303,7 +326,7 @@ public synchronized void start() {
if (keystoreSettings.isEmpty() == false || keystoreFiles.isEmpty() == false) {
runElaticsearchBinScript("elasticsearch-keystore", "create");

checkSuppliers("Keystore", keystoreSettings);
checkSuppliers("Keystore", keystoreSettings.values());
keystoreSettings.forEach((key, value) ->
runElaticsearchBinScriptWithInput(value.get().toString(), "elasticsearch-keystore", "add", "-x", key)
);
Expand Down Expand Up @@ -337,6 +360,20 @@ public synchronized void start() {
startElasticsearchProcess();
}

@Override
public void restart() {
    LOGGER.info("Restarting {}", this);
    stop(false);
    try {
        // Use deleteIfExists: the port files are only written once the node has
        // fully started, so they may be absent if the previous start never
        // completed — plain Files.delete would throw NoSuchFileException then.
        Files.deleteIfExists(httpPortsFile);
        Files.deleteIfExists(transportPortFile);
    } catch (IOException e) {
        throw new UncheckedIOException("Failed to clean up port files while restarting " + this, e);
    }

    start();
}

// Returns true iff the setting resolves to the string "true" (case-insensitive).
// NOTE(review): the name suggests a missing setting should count as true, but the
// default supplied here is "false" — confirm the intended semantics with callers.
private boolean isSettingMissingOrTrue(String name) {
    // parseBoolean avoids the boxed Boolean that Boolean.valueOf creates.
    return Boolean.parseBoolean(settings.getOrDefault(name, () -> "false").get().toString());
}
Expand All @@ -349,7 +386,7 @@ private void copyExtraConfigFiles() {
}
Path dst = configFile.getParent().resolve(destination);
try {
Files.createDirectories(dst);
Files.createDirectories(dst.getParent());
Files.copy(from.toPath(), dst, StandardCopyOption.REPLACE_EXISTING);
LOGGER.info("Added extra config file {} for {}", destination, this);
} catch (IOException e) {
Expand Down Expand Up @@ -453,12 +490,30 @@ private Map<String, String> getESEnvironment() {
defaultEnv.put("ES_PATH_CONF", configFile.getParent().toString());
String systemPropertiesString = "";
if (systemProperties.isEmpty() == false) {
checkSuppliers("Java System property", systemProperties);
checkSuppliers("Java System property", systemProperties.values());
systemPropertiesString = " " + systemProperties.entrySet().stream()
.map(entry -> "-D" + entry.getKey() + "=" + entry.getValue().get())
.collect(Collectors.joining(" "));
}
defaultEnv.put("ES_JAVA_OPTS", "-Xms512m -Xmx512m -ea -esa" + systemPropertiesString);
String jvmArgsString = "";
if (jvmArgs.isEmpty() == false) {
jvmArgsString = " " + jvmArgs.stream()
.map(Supplier::get)
.peek(charSequences -> requireNonNull(charSequences, "Jvm argument supplier returned null while configuring " + this))
.flatMap(Collection::stream)
.peek(argument -> {
requireNonNull(argument, "Jvm argument supplier returned null while configuring " + this);
if (argument.toString().startsWith("-D")) {
throw new TestClustersException("Invalid jvm argument `" + argument +
"` configure as systemProperty instead for " + this
);
}
})
.collect(Collectors.joining(" "));
}
defaultEnv.put("ES_JAVA_OPTS", "-Xms512m -Xmx512m -ea -esa" +
systemPropertiesString + jvmArgsString
);
defaultEnv.put("ES_TMPDIR", tmpDir.toString());
// Windows requires this as it defaults to `c:\windows` despite ES_TMPDIR
defaultEnv.put("TMP", tmpDir.toString());
Expand All @@ -471,7 +526,7 @@ private Map<String, String> getESEnvironment() {
);
}

checkSuppliers("Environment variable", environment);
checkSuppliers("Environment variable", environment.values());
environment.forEach((key, value) -> defaultEnv.put(key, value.get().toString()));
return defaultEnv;
}
Expand Down Expand Up @@ -520,6 +575,10 @@ public List<String> getAllTransportPortURI() {
return getTransportPortInternal();
}

// Returns the node's JSON server log file. The trailing "-<ordinal>" suffix is
// stripped from the sanitized node name, so all nodes of a cluster map to the
// same "<cluster>_server.json" file name.
// NOTE(review): assumes confPathLogs points at this node's log directory — confirm.
public File getServerLog() {
    return confPathLogs.resolve(safeName(getName()).replaceAll("-[0-9]+$", "") + "_server.json").toFile();
}

@Override
public synchronized void stop(boolean tailLogs) {
if (esProcess == null && tailLogs) {
Expand Down Expand Up @@ -693,7 +752,7 @@ private void createConfiguration() {
// Don't wait for state, just start up quickly. This will also allow new and old nodes in the BWC case to become the master
defaultConfig.put("discovery.initial_state_timeout", "0s");

checkSuppliers("Settings", settings);
checkSuppliers("Settings", settings.values());
Map<String, String> userConfig = settings.entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue().get().toString()));
HashSet<String> overriden = new HashSet<>(defaultConfig.keySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,18 @@ public interface TestClusterConfiguration {

void environment(String key, Supplier<CharSequence> valueSupplier);

/** Adds JVM arguments applied to every node; must not contain -D properties (use systemProperty). */
void jvmArgs(String... values);

/** Adds lazily-evaluated JVM arguments; the supplier is invoked when the node environment is built. */
void jvmArgs(Supplier<String[]> valueSupplier);

/** Freezes the configuration; further mutation is rejected. */
void freeze();

/** Sets the Java home the cluster processes run with. */
void setJavaHome(File javaHome);

/** Starts the cluster. */
void start();

/** Stops the cluster and starts it again, preserving the working directory. */
void restart();

void extraConfigFile(String destination, File from);

void user(Map<String, String> userSpec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ public class TestClustersPlugin implements Plugin<Project> {
private static final TimeUnit EXECUTOR_SHUTDOWN_TIMEOUT_UNIT = TimeUnit.MINUTES;

private static final Logger logger = Logging.getLogger(TestClustersPlugin.class);
private static final String TESTCLUSTERS_INSPECT_FAILURE = "testclusters.inspect.failure";

private final Map<Task, List<ElasticsearchCluster>> usedClusters = new HashMap<>();
private final Map<ElasticsearchCluster, Integer> claimsInventory = new HashMap<>();
private final Set<ElasticsearchCluster> runningClusters =new HashSet<>();
private final Thread shutdownHook = new Thread(this::shutDownAllClusters);
private final Boolean allowClusterToSurvive = Boolean.valueOf(System.getProperty(TESTCLUSTERS_INSPECT_FAILURE, "false"));
private ExecutorService executorService = Executors.newSingleThreadExecutor();

public static String getHelperConfigurationName(String version) {
Expand Down Expand Up @@ -195,7 +197,7 @@ private void configureStartClustersHook(Project project) {
public void beforeActions(Task task) {
// we only start the cluster before the actions, so we'll not start it if the task is up-to-date
usedClusters.getOrDefault(task, Collections.emptyList()).stream()
.filter(each -> runningClusters.contains(each) == false)
.filter(cluster -> runningClusters.contains(cluster) == false)
.forEach(elasticsearchCluster -> {
elasticsearchCluster.start();
runningClusters.add(elasticsearchCluster);
Expand All @@ -221,18 +223,18 @@ public void afterExecute(Task task, TaskState state) {
if (state.getFailure() != null) {
// If the task fails, and other tasks use this cluster, the other task will likely never be
// executed at all, so we will never get to un-claim and terminate it.
clustersUsedByTask.forEach(each -> each.stop(true));
clustersUsedByTask.forEach(cluster -> stopCluster(cluster, true));
} else {
clustersUsedByTask.forEach(
each -> claimsInventory.put(each, claimsInventory.getOrDefault(each, 0) - 1)
cluster -> claimsInventory.put(cluster, claimsInventory.getOrDefault(cluster, 0) - 1)
);
claimsInventory.entrySet().stream()
.filter(entry -> entry.getValue() == 0)
.filter(entry -> runningClusters.contains(entry.getKey()))
.map(Map.Entry::getKey)
.forEach(each -> {
each.stop(false);
runningClusters.remove(each);
.forEach(cluster -> {
stopCluster(cluster, false);
runningClusters.remove(cluster);
});
}
}
Expand All @@ -242,6 +244,28 @@ public void beforeExecute(Task task) {}
);
}

/**
 * Stops the given cluster, unless the {@code testclusters.inspect.failure} system
 * property is set AND the owning task failed — in that case the build blocks
 * forever (with exponentially growing sleeps) so the cluster can be inspected,
 * until the user interrupts with ^C.
 */
private void stopCluster(ElasticsearchCluster cluster, boolean taskFailed) {
    if (allowClusterToSurvive) {
        logger.info("Not stopping clusters, disabled by property");
        if (taskFailed) {
            // task failed or this is the last one to stop
            // Intentionally infinite: i doubles each iteration (1, 2, 4, ...),
            // so the sleep backs off exponentially. Only an interrupt exits.
            for (int i=1 ; ; i += i) {
                logger.lifecycle(
                    "No more test clusters left to run, going to sleep because {} was set," +
                    " interrupt (^C) to stop clusters.", TESTCLUSTERS_INSPECT_FAILURE
                );
                try {
                    Thread.sleep(1000 * i);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and give up waiting.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
    // Normal path: actually stop the cluster; tail its logs if the task failed.
    cluster.stop(taskFailed);
}

/**
* Boilerplate to get testClusters container extension
*
Expand Down Expand Up @@ -428,13 +452,16 @@ private void shutdownExecutorService() {

/**
 * Shutdown-hook path: tears down every cluster that is still running.
 * Each cluster is removed from the running set before being stopped.
 */
private void shutDownAllClusters() {
    synchronized (runningClusters) {
        if (runningClusters.isEmpty()) {
            return;
        }
        Iterator<ElasticsearchCluster> iterator = runningClusters.iterator();
        while (iterator.hasNext()) {
            ElasticsearchCluster next = iterator.next();
            iterator.remove();
            // Stop only the element we already fetched: advancing the iterator a
            // second time per pass would skip every other cluster and can throw
            // NoSuchElementException when the set size is odd.
            next.stop(false);
        }
    }
}


}
1 change: 1 addition & 0 deletions x-pack/plugin/ccr/qa/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ subprojects {
include 'rest-api-spec/api/**'
}
}

}
Loading

0 comments on commit 5e0a162

Please sign in to comment.