diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 2c02abab9dc1d..0bd976420f07d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -111,7 +111,6 @@ import org.elasticsearch.transport.MockTransportClient; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportSettings; -import org.junit.Assert; import java.io.Closeable; import java.io.IOException; @@ -187,6 +186,16 @@ public final class InternalTestCluster extends TestCluster { private final Logger logger = LogManager.getLogger(getClass()); + private static final Predicate DATA_NODE_PREDICATE = + nodeAndClient -> DiscoveryNode.isDataNode(nodeAndClient.node.settings()); + + private static final Predicate NO_DATA_NO_MASTER_PREDICATE = nodeAndClient -> + DiscoveryNode.isMasterNode(nodeAndClient.node.settings()) == false + && DiscoveryNode.isDataNode(nodeAndClient.node.settings()) == false; + + private static final Predicate MASTER_NODE_PREDICATE = + nodeAndClient -> DiscoveryNode.isMasterNode(nodeAndClient.node.settings()); + public static final int DEFAULT_LOW_NUM_MASTER_NODES = 1; public static final int DEFAULT_HIGH_NUM_MASTER_NODES = 3; @@ -197,8 +206,10 @@ public final class InternalTestCluster extends TestCluster { static final int DEFAULT_MIN_NUM_CLIENT_NODES = 0; static final int DEFAULT_MAX_NUM_CLIENT_NODES = 1; - /* sorted map to make traverse order reproducible, concurrent since we do checks on it not within a sync block */ - private final NavigableMap nodes = new TreeMap<>(); + /* Sorted map to make traverse order reproducible. + * The map of nodes is never mutated so individual reads are safe without synchronization. + * Updates are intended to follow a copy-on-write approach. */ + private volatile NavigableMap nodes = Collections.emptyNavigableMap(); private final Set dataDirToClean = new HashSet<>(); @@ -208,7 +219,7 @@ public final class InternalTestCluster extends TestCluster { private final Settings defaultSettings; - private AtomicInteger nextNodeId = new AtomicInteger(0); + private final AtomicInteger nextNodeId = new AtomicInteger(0); /* Each shared node has a node seed that is used to start up the node and get default settings * this is important if a node is randomly shut down in a test since the next test relies on a @@ -240,7 +251,7 @@ public final class InternalTestCluster extends TestCluster { private final Path baseDir; private ServiceDisruptionScheme activeDisruptionScheme; - private Function clientWrapper; + private final Function clientWrapper; private int bootstrapMasterNodeIndex = -1; @@ -405,10 +416,6 @@ private static boolean usingZen1(Settings settings) { return ZEN_DISCOVERY_TYPE.equals(DISCOVERY_TYPE_SETTING.get(settings)); } - public int getBootstrapMasterNodeIndex() { - return bootstrapMasterNodeIndex; - } - /** * Sets {@link #bootstrapMasterNodeIndex} to the given value, see {@link #bootstrapMasterNodeWithSpecifiedIndex(List)} * for the description of how this field is used. @@ -460,7 +467,7 @@ public Collection> getPlugins() { return plugins; } - private Settings getRandomNodeSettings(long seed) { + private static Settings getRandomNodeSettings(long seed) { Random random = new Random(seed); Builder builder = Settings.builder(); builder.put(TransportSettings.TRANSPORT_COMPRESS.getKey(), rarely(random)); @@ -545,8 +552,8 @@ private void ensureOpen() { } } - private synchronized NodeAndClient getOrBuildRandomNode() { - ensureOpen(); + private NodeAndClient getOrBuildRandomNode() { + assert Thread.holdsLock(this); final NodeAndClient randomNodeAndClient = getRandomNodeAndClient(); if (randomNodeAndClient != null) { return randomNodeAndClient; @@ -566,11 +573,10 @@ private synchronized NodeAndClient getOrBuildRandomNode() { return buildNode; } - private synchronized NodeAndClient getRandomNodeAndClient() { + private NodeAndClient getRandomNodeAndClient() { return getRandomNodeAndClient(nc -> true); } - private synchronized NodeAndClient getRandomNodeAndClient(Predicate predicate) { ensureOpen(); List values = nodes.values().stream().filter(predicate).collect(Collectors.toList()); @@ -612,7 +618,7 @@ public synchronized void ensureAtMostNumDataNodes(int n) throws IOException { final Stream collection = n == 0 ? nodes.values().stream() : nodes.values().stream() - .filter(new DataNodePredicate().and(new NodeNamePredicate(getMasterName()).negate())); + .filter(DATA_NODE_PREDICATE.and(new NodeNamePredicate(getMasterName()).negate())); final Iterator values = collection.iterator(); logger.info("changing cluster size from {} data nodes to {}", size, n); @@ -676,20 +682,19 @@ private Settings getNodeSettings(final int nodeId, final long seed, final Settin * the method will return the existing one * @param onTransportServiceStarted callback to run when transport service is started */ - private NodeAndClient buildNode(int nodeId, Settings settings, + private synchronized NodeAndClient buildNode(int nodeId, Settings settings, boolean reuseExisting, Runnable onTransportServiceStarted) { assert Thread.holdsLock(this); ensureOpen(); Collection> plugins = getPlugins(); String name = settings.get("node.name"); - if (reuseExisting && nodes.containsKey(name)) { + final NodeAndClient nodeAndClient = nodes.get(name); + if (reuseExisting && nodeAndClient != null) { onTransportServiceStarted.run(); // reusing an existing node implies its transport service already started - return nodes.get(name); - } else { - assert reuseExisting == true || nodes.containsKey(name) == false : - "node name [" + name + "] already exists but not allowed to use it"; + return nodeAndClient; } + assert reuseExisting == true || nodeAndClient == null : "node name [" + name + "] already exists but not allowed to use it"; SecureSettings secureSettings = Settings.builder().put(settings).getSecureSettings(); if (secureSettings instanceof MockSecureSettings) { @@ -726,7 +731,7 @@ private String buildNodeName(int id, Settings settings) { /** * returns a suffix string based on the node role. If no explicit role is defined, the suffix will be empty */ - private String getRoleSuffix(Settings settings) { + private static String getRoleSuffix(Settings settings) { String suffix = ""; if (Node.NODE_MASTER_SETTING.exists(settings) && Node.NODE_MASTER_SETTING.get(settings)) { suffix = suffix + Role.MASTER.getAbbreviation(); @@ -753,37 +758,32 @@ public synchronized Client client() { * Returns a node client to a data node in the cluster. * Note: use this with care tests should not rely on a certain nodes client. */ - public synchronized Client dataNodeClient() { - ensureOpen(); + public Client dataNodeClient() { /* Randomly return a client to one of the nodes in the cluster */ - return getRandomNodeAndClient(new DataNodePredicate()).client(random); + return getRandomNodeAndClient(DATA_NODE_PREDICATE).client(random); } /** * Returns a node client to the current master node. * Note: use this with care tests should not rely on a certain nodes client. */ - public synchronized Client masterClient() { - ensureOpen(); + public Client masterClient() { NodeAndClient randomNodeAndClient = getRandomNodeAndClient(new NodeNamePredicate(getMasterName())); if (randomNodeAndClient != null) { return randomNodeAndClient.nodeClient(); // ensure node client master is requested } - Assert.fail("No master client found"); - return null; // can't happen + throw new AssertionError("No master client found"); } /** * Returns a node client to random node but not the master. This method will fail if no non-master client is available. */ - public synchronized Client nonMasterClient() { - ensureOpen(); + public Client nonMasterClient() { NodeAndClient randomNodeAndClient = getRandomNodeAndClient(new NodeNamePredicate(getMasterName()).negate()); if (randomNodeAndClient != null) { return randomNodeAndClient.nodeClient(); // ensure node client non-master is requested } - Assert.fail("No non-master client found"); - return null; // can't happen + throw new AssertionError("No non-master client found"); } /** @@ -791,14 +791,14 @@ public synchronized Client nonMasterClient() { */ public synchronized Client coordOnlyNodeClient() { ensureOpen(); - NodeAndClient randomNodeAndClient = getRandomNodeAndClient(new NoDataNoMasterNodePredicate()); + NodeAndClient randomNodeAndClient = getRandomNodeAndClient(NO_DATA_NO_MASTER_PREDICATE); if (randomNodeAndClient != null) { return randomNodeAndClient.client(random); } int nodeId = nextNodeId.getAndIncrement(); Settings settings = getSettings(nodeId, random.nextLong(), Settings.EMPTY); startCoordinatingOnlyNode(settings); - return getRandomNodeAndClient(new NoDataNoMasterNodePredicate()).client(random); + return getRandomNodeAndClient(NO_DATA_NO_MASTER_PREDICATE).client(random); } public synchronized String startCoordinatingOnlyNode(Settings settings) { @@ -812,7 +812,6 @@ public synchronized String startCoordinatingOnlyNode(Settings settings) { * Returns a transport client */ public synchronized Client transportClient() { - ensureOpen(); // randomly return a transport client going to one of the nodes in the cluster return getOrBuildRandomNode().transportClient(); } @@ -820,27 +819,24 @@ public synchronized Client transportClient() { /** * Returns a node client to a given node. */ - public synchronized Client client(String nodeName) { - ensureOpen(); + public Client client(String nodeName) { NodeAndClient nodeAndClient = nodes.get(nodeName); if (nodeAndClient != null) { return nodeAndClient.client(random); } - Assert.fail("No node found with name: [" + nodeName + "]"); - return null; // can't happen + throw new AssertionError("No node found with name: [" + nodeName + "]"); } /** * Returns a "smart" node client to a random node in the cluster */ - public synchronized Client smartClient() { + public Client smartClient() { NodeAndClient randomNodeAndClient = getRandomNodeAndClient(); if (randomNodeAndClient != null) { return randomNodeAndClient.nodeClient(); } - Assert.fail("No smart client found"); - return null; // can't happen + throw new AssertionError("No smart client found"); } @Override @@ -853,13 +849,13 @@ public synchronized void close() throws IOException { try { IOUtils.close(nodes.values()); } finally { - nodes.clear(); + nodes = Collections.emptyNavigableMap(); executor.shutdownNow(); } } } - public static final int REMOVED_MINIMUM_MASTER_NODES = Integer.MAX_VALUE; + private static final int REMOVED_MINIMUM_MASTER_NODES = Integer.MAX_VALUE; private final class NodeAndClient implements Closeable { private MockNode node; @@ -875,7 +871,7 @@ private final class NodeAndClient implements Closeable { this.name = name; this.originalNodeSettings = originalNodeSettings; this.nodeAndClientId = nodeAndClientId; - markNodeDataDirsAsNotEligableForWipe(node); + markNodeDataDirsAsNotEligibleForWipe(node); } Node node() { @@ -898,9 +894,6 @@ public boolean isMasterEligible() { } Client client(Random random) { - if (closed.get()) { - throw new RuntimeException("already closed"); - } double nextDouble = random.nextDouble(); if (nextDouble < transportClientRatio) { if (logger.isTraceEnabled()) { @@ -927,22 +920,32 @@ Client transportClient() { } private Client getOrBuildNodeClient() { - if (nodeClient == null) { - nodeClient = node.client(); + synchronized (InternalTestCluster.this) { + if (closed.get()) { + throw new RuntimeException("already closed"); + } + if (nodeClient == null) { + nodeClient = node.client(); + } + return clientWrapper.apply(nodeClient); } - return clientWrapper.apply(nodeClient); } private Client getOrBuildTransportClient() { - if (transportClient == null) { - /* don't sniff client for now - doesn't work will all tests - * since it might throw NoNodeAvailableException if nodes are - * shut down. we first need support of transportClientRatio - * as annotations or so */ - transportClient = new TransportClientFactory(false, nodeConfigurationSource.transportClientSettings(), + synchronized (InternalTestCluster.this) { + if (closed.get()) { + throw new RuntimeException("already closed"); + } + if (transportClient == null) { + /* don't sniff client for now - doesn't work will all tests + * since it might throw NoNodeAvailableException if nodes are + * shut down. we first need support of transportClientRatio + * as annotations or so */ + transportClient = new TransportClientFactory(nodeConfigurationSource.transportClientSettings(), baseDir, nodeConfigurationSource.transportClientPlugins()).client(node, clusterName); + } + return clientWrapper.apply(transportClient); } - return clientWrapper.apply(transportClient); } void resetClient() { @@ -1034,11 +1037,12 @@ public void afterStart() { } }); closed.set(false); - markNodeDataDirsAsNotEligableForWipe(node); + markNodeDataDirsAsNotEligibleForWipe(node); } @Override public void close() throws IOException { + assert Thread.holdsLock(InternalTestCluster.this); try { resetClient(); } finally { @@ -1047,18 +1051,32 @@ public void close() throws IOException { node.close(); } } + + private void markNodeDataDirsAsPendingForWipe(Node node) { + assert Thread.holdsLock(InternalTestCluster.this); + NodeEnvironment nodeEnv = node.getNodeEnvironment(); + if (nodeEnv.hasNodeFile()) { + dataDirToClean.addAll(Arrays.asList(nodeEnv.nodeDataPaths())); + } + } + + private void markNodeDataDirsAsNotEligibleForWipe(Node node) { + assert Thread.holdsLock(InternalTestCluster.this); + NodeEnvironment nodeEnv = node.getNodeEnvironment(); + if (nodeEnv.hasNodeFile()) { + dataDirToClean.removeAll(Arrays.asList(nodeEnv.nodeDataPaths())); + } + } } public static final String TRANSPORT_CLIENT_PREFIX = "transport_client_"; - static class TransportClientFactory { - private final boolean sniff; + private static class TransportClientFactory { private final Settings settings; private final Path baseDir; private final Collection> plugins; - TransportClientFactory(boolean sniff, Settings settings, Path baseDir, Collection> plugins) { - this.sniff = sniff; + TransportClientFactory(Settings settings, Path baseDir, Collection> plugins) { this.settings = settings != null ? settings : Settings.EMPTY; this.baseDir = baseDir; this.plugins = plugins; @@ -1071,7 +1089,7 @@ public Client client(Node node, String clusterName) { .put("client.transport.nodes_sampler_interval", "1s") .put(Environment.PATH_HOME_SETTING.getKey(), baseDir) .put("node.name", TRANSPORT_CLIENT_PREFIX + node.settings().get("node.name")) - .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName).put("client.transport.sniff", sniff) + .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName).put("client.transport.sniff", false) .put("logger.prefix", nodeSettings.get("logger.prefix", "")) .put("logger.level", nodeSettings.get("logger.level", "INFO")) .put(settings); @@ -1120,8 +1138,7 @@ private synchronized void reset(boolean wipeData) throws IOException { // trash all nodes with id >= sharedNodesSeeds.length - they are non shared final List toClose = new ArrayList<>(); - for (Iterator iterator = nodes.values().iterator(); iterator.hasNext();) { - NodeAndClient nodeAndClient = iterator.next(); + for (NodeAndClient nodeAndClient : nodes.values()) { if (nodeAndClient.nodeAndClientId() >= sharedNodesSeeds.length) { logger.debug("Close Node [{}] not shared", nodeAndClient.name); toClose.add(nodeAndClient); @@ -1213,7 +1230,7 @@ public synchronized void validateClusterFormed() { } /** ensure a cluster is formed with all published nodes, but do so by using the client of the specified node */ - public synchronized void validateClusterFormed(String viaNode) { + private synchronized void validateClusterFormed(String viaNode) { Set expectedNodes = new HashSet<>(); for (NodeAndClient nodeAndClient : nodes.values()) { expectedNodes.add(getInstanceFromNode(ClusterService.class, nodeAndClient.node()).localNode()); @@ -1242,7 +1259,7 @@ public synchronized void validateClusterFormed(String viaNode) { } @Override - public synchronized void afterTest() throws IOException { + public synchronized void afterTest() { wipePendingDataDirectories(); randomlyResetClients(); /* reset all clients - each test gets its own client based on the Random instance created above. */ } @@ -1291,8 +1308,7 @@ private void assertSameSyncIdSameDocs() { private void assertNoPendingIndexOperations() throws Exception { assertBusy(() -> { - final Collection nodesAndClients = nodes.values(); - for (NodeAndClient nodeAndClient : nodesAndClients) { + for (NodeAndClient nodeAndClient : nodes.values()) { IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { @@ -1300,7 +1316,7 @@ private void assertNoPendingIndexOperations() throws Exception { if (operations.size() > 0) { throw new AssertionError( "shard " + indexShard.shardId() + " on node [" + nodeAndClient.name + "] has pending operations:\n --> " + - operations.stream().collect(Collectors.joining("\n --> ")) + String.join("\n --> ", operations) ); } } @@ -1311,8 +1327,7 @@ private void assertNoPendingIndexOperations() throws Exception { private void assertOpenTranslogReferences() throws Exception { assertBusy(() -> { - final Collection nodesAndClients = nodes.values(); - for (NodeAndClient nodeAndClient : nodesAndClients) { + for (NodeAndClient nodeAndClient : nodes.values()) { IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { @@ -1331,8 +1346,7 @@ private void assertOpenTranslogReferences() throws Exception { private void assertNoSnapshottedIndexCommit() throws Exception { assertBusy(() -> { - final Collection nodesAndClients = nodes.values(); - for (NodeAndClient nodeAndClient : nodesAndClients) { + for (NodeAndClient nodeAndClient : nodes.values()) { IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { @@ -1356,8 +1370,7 @@ private void assertNoSnapshottedIndexCommit() throws Exception { * This assertion might be expensive, thus we prefer not to execute on every test but only interesting tests. */ public void assertConsistentHistoryBetweenTranslogAndLuceneIndex() throws IOException { - final Collection nodesAndClients = nodes.values(); - for (NodeAndClient nodeAndClient : nodesAndClients) { + for (NodeAndClient nodeAndClient : nodes.values()) { IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { @@ -1463,6 +1476,7 @@ public void assertSameDocIdsOnShards() throws Exception { } private void randomlyResetClients() { + assert Thread.holdsLock(this); // only reset the clients on nightly tests, it causes heavy load... if (RandomizedTest.isNightly() && rarely(random)) { final Collection nodesAndClients = nodes.values(); @@ -1489,22 +1503,6 @@ public synchronized void wipePendingDataDirectories() { } } - private void markNodeDataDirsAsPendingForWipe(Node node) { - assert Thread.holdsLock(this); - NodeEnvironment nodeEnv = node.getNodeEnvironment(); - if (nodeEnv.hasNodeFile()) { - dataDirToClean.addAll(Arrays.asList(nodeEnv.nodeDataPaths())); - } - } - - private void markNodeDataDirsAsNotEligableForWipe(Node node) { - assert Thread.holdsLock(this); - NodeEnvironment nodeEnv = node.getNodeEnvironment(); - if (nodeEnv.hasNodeFile()) { - dataDirToClean.removeAll(Arrays.asList(nodeEnv.nodeDataPaths())); - } - } - /** * Returns a reference to a random node's {@link ClusterService} */ @@ -1515,26 +1513,22 @@ public ClusterService clusterService() { /** * Returns a reference to a node's {@link ClusterService}. If the given node is null, a random node will be selected. */ - public synchronized ClusterService clusterService(@Nullable String node) { + public ClusterService clusterService(@Nullable String node) { return getInstance(ClusterService.class, node); } /** * Returns an Iterable to all instances for the given class >T< across all nodes in the cluster. */ - public synchronized Iterable getInstances(Class clazz) { - List instances = new ArrayList<>(nodes.size()); - for (NodeAndClient nodeAndClient : nodes.values()) { - instances.add(getInstanceFromNode(clazz, nodeAndClient.node)); - } - return instances; + public Iterable getInstances(Class clazz) { + return nodes.values().stream().map(node -> getInstanceFromNode(clazz, node.node)).collect(Collectors.toList()); } /** * Returns an Iterable to all instances for the given class >T< across all data nodes in the cluster. */ - public synchronized Iterable getDataNodeInstances(Class clazz) { - return getInstances(clazz, new DataNodePredicate()); + public Iterable getDataNodeInstances(Class clazz) { + return getInstances(clazz, DATA_NODE_PREDICATE); } public synchronized T getCurrentMasterNodeInstance(Class clazz) { @@ -1545,11 +1539,11 @@ public synchronized T getCurrentMasterNodeInstance(Class clazz) { * Returns an Iterable to all instances for the given class >T< across all data and master nodes * in the cluster. */ - public synchronized Iterable getDataOrMasterNodeInstances(Class clazz) { - return getInstances(clazz, new DataNodePredicate().or(new MasterNodePredicate())); + public Iterable getDataOrMasterNodeInstances(Class clazz) { + return getInstances(clazz, DATA_NODE_PREDICATE.or(MASTER_NODE_PREDICATE)); } - private synchronized Iterable getInstances(Class clazz, Predicate predicate) { + private Iterable getInstances(Class clazz, Predicate predicate) { Iterable filteredNodes = nodes.values().stream().filter(predicate)::iterator; List instances = new ArrayList<>(); for (NodeAndClient nodeAndClient : filteredNodes) { @@ -1561,16 +1555,16 @@ private synchronized Iterable getInstances(Class clazz, Predicate T getInstance(Class clazz, final String node) { + public T getInstance(Class clazz, final String node) { return getInstance(clazz, nc -> node == null || node.equals(nc.name)); } - public synchronized T getDataNodeInstance(Class clazz) { - return getInstance(clazz, new DataNodePredicate()); + public T getDataNodeInstance(Class clazz) { + return getInstance(clazz, DATA_NODE_PREDICATE); } - public synchronized T getMasterNodeInstance(Class clazz) { - return getInstance(clazz, new MasterNodePredicate()); + public T getMasterNodeInstance(Class clazz) { + return getInstance(clazz, MASTER_NODE_PREDICATE); } private synchronized T getInstance(Class clazz, Predicate predicate) { @@ -1582,17 +1576,17 @@ private synchronized T getInstance(Class clazz, Predicate /** * Returns a reference to a random nodes instances of the given class >T< */ - public synchronized T getInstance(Class clazz) { + public T getInstance(Class clazz) { return getInstance(clazz, nc -> true); } - private synchronized T getInstanceFromNode(Class clazz, Node node) { + private static T getInstanceFromNode(Class clazz, Node node) { return node.injector().getInstance(clazz); } @Override - public synchronized int size() { - return this.nodes.size(); + public int size() { + return nodes.size(); } @Override @@ -1609,7 +1603,7 @@ public InetSocketAddress[] httpAddresses() { */ public synchronized boolean stopRandomDataNode() throws IOException { ensureOpen(); - NodeAndClient nodeAndClient = getRandomNodeAndClient(new DataNodePredicate()); + NodeAndClient nodeAndClient = getRandomNodeAndClient(DATA_NODE_PREDICATE); if (nodeAndClient != null) { logger.info("Closing random node [{}] ", nodeAndClient.name); stopNodesAndClient(nodeAndClient); @@ -1638,9 +1632,10 @@ public synchronized void stopCurrentMasterNode() throws IOException { ensureOpen(); assert size() > 0; String masterNodeName = getMasterName(); - assert nodes.containsKey(masterNodeName); + final NodeAndClient masterNode = nodes.get(masterNodeName); + assert masterNode != null; logger.info("Closing master node [{}] ", masterNodeName); - stopNodesAndClient(nodes.get(masterNodeName)); + stopNodesAndClient(masterNode); } /** @@ -1694,14 +1689,15 @@ private void rebuildUnicastHostFiles(List newNodes) { // cannot be a synchronized method since it's called on other threads from within synchronized startAndPublishNodesAndClients() synchronized (discoveryFileMutex) { try { - Stream unicastHosts = Stream.concat(nodes.values().stream(), newNodes.stream()); + final Collection currentNodes = nodes.values(); + Stream unicastHosts = Stream.concat(currentNodes.stream(), newNodes.stream()); List discoveryFileContents = unicastHosts.map( - nac -> nac.node.injector().getInstance(TransportService.class) - ).filter(Objects::nonNull) + nac -> nac.node.injector().getInstance(TransportService.class) + ).filter(Objects::nonNull) .map(TransportService::getLocalNode).filter(Objects::nonNull).filter(DiscoveryNode::isMasterNode) .map(n -> n.getAddress().toString()) .distinct().collect(Collectors.toList()); - Set configPaths = Stream.concat(nodes.values().stream(), newNodes.stream()) + Set configPaths = Stream.concat(currentNodes.stream(), newNodes.stream()) .map(nac -> nac.node.getEnvironment().configFile()).collect(Collectors.toSet()); logger.debug("configuring discovery with {} at {}", discoveryFileContents, configPaths); for (final Path configPath : configPaths) { @@ -1714,7 +1710,7 @@ private void rebuildUnicastHostFiles(List newNodes) { } } - private synchronized void stopNodesAndClient(NodeAndClient nodeAndClient) throws IOException { + private void stopNodesAndClient(NodeAndClient nodeAndClient) throws IOException { stopNodesAndClients(Collections.singleton(nodeAndClient)); } @@ -1723,7 +1719,7 @@ private synchronized void stopNodesAndClients(Collection nodeAndC for (NodeAndClient nodeAndClient: nodeAndClients) { removeDisruptionSchemeFromNode(nodeAndClient); - NodeAndClient previous = nodes.remove(nodeAndClient.name); + final NodeAndClient previous = removeNode(nodeAndClient); assert previous == nodeAndClient; nodeAndClient.close(); } @@ -1741,16 +1737,9 @@ public void restartRandomDataNode() throws Exception { /** * Restarts a random data node in the cluster and calls the callback during restart. */ - public void restartRandomDataNode(RestartCallback callback) throws Exception { - restartRandomNode(new DataNodePredicate(), callback); - } - - /** - * Restarts a random node in the cluster and calls the callback during restart. - */ - private synchronized void restartRandomNode(Predicate predicate, RestartCallback callback) throws Exception { + public synchronized void restartRandomDataNode(RestartCallback callback) throws Exception { ensureOpen(); - NodeAndClient nodeAndClient = getRandomNodeAndClient(predicate); + NodeAndClient nodeAndClient = getRandomNodeAndClient(InternalTestCluster.DATA_NODE_PREDICATE); if (nodeAndClient != null) { restartNode(nodeAndClient, callback); } @@ -1788,6 +1777,7 @@ public synchronized void rollingRestart(RestartCallback callback) throws Excepti } private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback) throws Exception { + assert Thread.holdsLock(this); logger.info("Restarting node [{}] ", nodeAndClient.name); if (activeDisruptionScheme != null) { @@ -1807,8 +1797,9 @@ private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback) nodeAndClient.startNode(); success = true; } finally { - if (success == false) - nodes.remove(nodeAndClient.name); + if (success == false) { + removeNode(nodeAndClient); + } } if (activeDisruptionScheme != null) { @@ -1828,7 +1819,16 @@ private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback) } } + private NodeAndClient removeNode(NodeAndClient nodeAndClient) { + assert Thread.holdsLock(this); + final NavigableMap newNodes = new TreeMap<>(nodes); + final NodeAndClient previous = newNodes.remove(nodeAndClient.name); + nodes = Collections.unmodifiableNavigableMap(newNodes); + return previous; + } + private Set excludeMasters(Collection nodeAndClients) { + assert Thread.holdsLock(this); final Set excludedNodeIds = new HashSet<>(); if (autoManageMinMasterNodes && nodeAndClients.size() > 0) { @@ -1847,7 +1847,7 @@ private Set excludeMasters(Collection nodeAndClients) { logger.info("adding voting config exclusions {} prior to restart/shutdown", excludedNodeIds); try { client().execute(AddVotingConfigExclusionsAction.INSTANCE, - new AddVotingConfigExclusionsRequest(excludedNodeIds.toArray(new String[0]))).get(); + new AddVotingConfigExclusionsRequest(excludedNodeIds.toArray(Strings.EMPTY_ARRAY))).get(); } catch (InterruptedException | ExecutionException e) { throw new AssertionError("unexpected", e); } @@ -1861,6 +1861,7 @@ private Set excludeMasters(Collection nodeAndClients) { } private void removeExclusions(Set excludedNodeIds) { + assert Thread.holdsLock(this); if (excludedNodeIds.isEmpty() == false) { logger.info("removing voting config exclusions for {} after restart/shutdown", excludedNodeIds); try { @@ -1879,7 +1880,7 @@ public synchronized void fullRestart(RestartCallback callback) throws Exception int numNodesRestarted = 0; final Settings[] newNodeSettings = new Settings[nextNodeId.get()]; Map, List> nodesByRoles = new HashMap<>(); - Set[] rolesOrderedByOriginalStartupOrder = new Set[nextNodeId.get()]; + Set[] rolesOrderedByOriginalStartupOrder = new Set[nextNodeId.get()]; final int minMasterNodes = autoManageMinMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1; for (NodeAndClient nodeAndClient : nodes.values()) { callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient()); @@ -1894,7 +1895,7 @@ public synchronized void fullRestart(RestartCallback callback) throws Exception nodesByRoles.computeIfAbsent(discoveryNode.getRoles(), k -> new ArrayList<>()).add(nodeAndClient); } - assert nodesByRoles.values().stream().collect(Collectors.summingInt(List::size)) == nodes.size(); + assert nodesByRoles.values().stream().mapToInt(List::size).sum() == nodes.size(); // randomize start up order, but making sure that: // 1) A data folder that was assigned to a data node will stay so @@ -1912,7 +1913,7 @@ public synchronized void fullRestart(RestartCallback callback) throws Exception final List nodesByRole = nodesByRoles.get(roles); startUpOrder.add(nodesByRole.remove(0)); } - assert nodesByRoles.values().stream().collect(Collectors.summingInt(List::size)) == 0; + assert nodesByRoles.values().stream().mapToInt(List::size).sum() == 0; for (NodeAndClient nodeAndClient : startUpOrder) { logger.info("creating node [{}] ", nodeAndClient.name); @@ -1949,17 +1950,14 @@ public String getMasterName(@Nullable String viaNode) { } } - synchronized Set allDataNodesButN(int numNodes) { - return nRandomDataNodes(numDataNodes() - numNodes); - } - - private synchronized Set nRandomDataNodes(int numNodes) { + synchronized Set allDataNodesButN(int count) { + final int numNodes = numDataNodes() - count; assert size() >= numNodes; Map dataNodes = nodes .entrySet() .stream() - .filter(new EntryNodePredicate(new DataNodePredicate())) + .filter(entry -> DATA_NODE_PREDICATE.test(entry.getValue())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); final HashSet set = new HashSet<>(); final Iterator iterator = dataNodes.keySet().iterator(); @@ -1996,7 +1994,8 @@ public synchronized Set nodesInclude(String index) { * If {@link #bootstrapMasterNodeIndex} is -1 (default), this method does nothing. */ private List bootstrapMasterNodeWithSpecifiedIndex(List allNodesSettings) { - if (getBootstrapMasterNodeIndex() == -1) { // fast-path + assert Thread.holdsLock(this); + if (bootstrapMasterNodeIndex == -1) { // fast-path return allNodesSettings; } @@ -2040,36 +2039,36 @@ private List bootstrapMasterNodeWithSpecifiedIndex(List allN /** * Starts a node with default settings and returns its name. */ - public synchronized String startNode() { + public String startNode() { return startNode(Settings.EMPTY); } /** * Starts a node with the given settings builder and returns its name. */ - public synchronized String startNode(Settings.Builder settings) { + public String startNode(Settings.Builder settings) { return startNode(settings.build()); } /** * Starts a node with the given settings and returns its name. */ - public synchronized String startNode(Settings settings) { + public String startNode(Settings settings) { return startNodes(settings).get(0); } /** * Starts multiple nodes with default settings and returns their names */ - public synchronized List startNodes(int numOfNodes) { + public List startNodes(int numOfNodes) { return startNodes(numOfNodes, Settings.EMPTY); } /** * Starts multiple nodes with the given settings and returns their names */ - public synchronized List startNodes(int numOfNodes, Settings settings) { - return startNodes(Collections.nCopies(numOfNodes, settings).stream().toArray(Settings[]::new)); + public List startNodes(int numOfNodes, Settings settings) { + return startNodes(Collections.nCopies(numOfNodes, settings).toArray(new Settings[0])); } /** @@ -2127,11 +2126,11 @@ public synchronized List startNodes(Settings... extraSettings) { return nodes.stream().map(NodeAndClient::getName).collect(Collectors.toList()); } - public synchronized List startMasterOnlyNodes(int numNodes) { + public List startMasterOnlyNodes(int numNodes) { return startMasterOnlyNodes(numNodes, Settings.EMPTY); } - public synchronized List startMasterOnlyNodes(int numNodes, Settings settings) { + public List startMasterOnlyNodes(int numNodes, Settings settings) { Settings settings1 = Settings.builder() .put(settings) .put(Node.NODE_MASTER_SETTING.getKey(), true) @@ -2140,17 +2139,11 @@ public synchronized List startMasterOnlyNodes(int numNodes, Settings set return startNodes(numNodes, settings1); } - public synchronized List startDataOnlyNodes(int numNodes) { - return startDataOnlyNodes(numNodes, Settings.EMPTY); - } - - public synchronized List startDataOnlyNodes(int numNodes, Settings settings) { - Settings settings1 = Settings.builder() - .put(settings) - .put(Node.NODE_MASTER_SETTING.getKey(), false) - .put(Node.NODE_DATA_SETTING.getKey(), true) - .build(); - return startNodes(numNodes, settings1); + public List startDataOnlyNodes(int numNodes) { + return startNodes( + numNodes, + Settings.builder().put(Settings.EMPTY).put(Node.NODE_MASTER_SETTING.getKey(), false) + .put(Node.NODE_DATA_SETTING.getKey(), true).build()); } /** @@ -2158,7 +2151,7 @@ public synchronized List startDataOnlyNodes(int numNodes, Settings setti * * @param eligibleMasterNodeCount the number of master eligible nodes to use as basis for the min master node setting */ - private int updateMinMasterNodes(int eligibleMasterNodeCount) { + private void updateMinMasterNodes(int eligibleMasterNodeCount) { assert autoManageMinMasterNodes; final int minMasterNodes = getMinMasterNodes(eligibleMasterNodeCount); if (getMasterNodesCount() > 0) { @@ -2173,23 +2166,22 @@ private int updateMinMasterNodes(int eligibleMasterNodeCount) { minMasterNodes, getMasterNodesCount()); } } - return minMasterNodes; } /** calculates a min master nodes value based on the given number of master nodes */ - private int getMinMasterNodes(int eligibleMasterNodes) { + private static int getMinMasterNodes(int eligibleMasterNodes) { return eligibleMasterNodes / 2 + 1; } private int getMasterNodesCount() { - return (int)nodes.values().stream().filter(n -> Node.NODE_MASTER_SETTING.get(n.node().settings())).count(); + return (int) nodes.values().stream().filter(n -> Node.NODE_MASTER_SETTING.get(n.node().settings())).count(); } - public synchronized String startMasterOnlyNode() { + public String startMasterOnlyNode() { return startMasterOnlyNode(Settings.EMPTY); } - public synchronized String startMasterOnlyNode(Settings settings) { + public String startMasterOnlyNode(Settings settings) { Settings settings1 = Settings.builder() .put(settings) .put(Node.NODE_MASTER_SETTING.getKey(), true) @@ -2198,10 +2190,11 @@ public synchronized String startMasterOnlyNode(Settings settings) { return startNode(settings1); } - public synchronized String startDataOnlyNode() { + public String startDataOnlyNode() { return startDataOnlyNode(Settings.EMPTY); } - public synchronized String startDataOnlyNode(Settings settings) { + + public String startDataOnlyNode(Settings settings) { Settings settings1 = Settings.builder() .put(settings) .put(Node.NODE_MASTER_SETTING.getKey(), false) @@ -2212,7 +2205,9 @@ public synchronized String startDataOnlyNode(Settings settings) { private synchronized void publishNode(NodeAndClient nodeAndClient) { assert !nodeAndClient.node().isClosed(); - nodes.put(nodeAndClient.name, nodeAndClient); + final NavigableMap newNodes = new TreeMap<>(nodes); + newNodes.put(nodeAndClient.name, nodeAndClient); + nodes = Collections.unmodifiableNavigableMap(newNodes); applyDisruptionSchemeToNode(nodeAndClient); } @@ -2227,10 +2222,10 @@ public int numDataNodes() { @Override public int numDataAndMasterNodes() { - return dataAndMasterNodes().size(); + return filterNodes(nodes, DATA_NODE_PREDICATE.or(MASTER_NODE_PREDICATE)).size(); } - public synchronized int numMasterNodes() { + public int numMasterNodes() { return filterNodes(nodes, NodeAndClient::isMasterEligible).size(); } @@ -2245,7 +2240,8 @@ public void clearDisruptionScheme() { clearDisruptionScheme(true); } - public void clearDisruptionScheme(boolean ensureHealthyCluster) { + // synchronized to prevent concurrently modifying the cluster. + public synchronized void clearDisruptionScheme(boolean ensureHealthyCluster) { if (activeDisruptionScheme != null) { TimeValue expectedHealingTime = activeDisruptionScheme.expectedTimeToHeal(); logger.info("Clearing active scheme {}, expected healing time {}", activeDisruptionScheme, expectedHealingTime); @@ -2272,15 +2268,11 @@ private void removeDisruptionSchemeFromNode(NodeAndClient nodeAndClient) { } } - private synchronized Collection dataNodeAndClients() { - return filterNodes(nodes, new DataNodePredicate()); + private Collection dataNodeAndClients() { + return filterNodes(nodes, DATA_NODE_PREDICATE); } - private synchronized Collection dataAndMasterNodes() { - return filterNodes(nodes, new DataNodePredicate().or(new MasterNodePredicate())); - } - - private synchronized Collection filterNodes(Map map, + private static Collection filterNodes(Map map, Predicate predicate) { return map .values() @@ -2289,51 +2281,16 @@ private synchronized Collection filterNodes(Map { - @Override - public boolean test(NodeAndClient nodeAndClient) { - return DiscoveryNode.isDataNode(nodeAndClient.node.settings()); - } - } - - private static final class MasterNodePredicate implements Predicate { - @Override - public boolean test(NodeAndClient nodeAndClient) { - return DiscoveryNode.isMasterNode(nodeAndClient.node.settings()); - } - } - private static final class NodeNamePredicate implements Predicate { - private final HashSet nodeNames; + private final String nodeName; - NodeNamePredicate(String... nodeNames) { - this.nodeNames = Sets.newHashSet(nodeNames); + NodeNamePredicate(String nodeName) { + this.nodeName = nodeName; } @Override public boolean test(NodeAndClient nodeAndClient) { - return nodeNames.contains(nodeAndClient.getName()); - } - } - - private static final class NoDataNoMasterNodePredicate implements Predicate { - @Override - public boolean test(NodeAndClient nodeAndClient) { - return DiscoveryNode.isMasterNode(nodeAndClient.node.settings()) == false && - DiscoveryNode.isDataNode(nodeAndClient.node.settings()) == false; - } - } - - private static final class EntryNodePredicate implements Predicate> { - private final Predicate delegateNodePredicate; - - EntryNodePredicate(Predicate delegateNodePredicate) { - this.delegateNodePredicate = delegateNodePredicate; - } - - @Override - public boolean test(Map.Entry entry) { - return delegateNodePredicate.test(entry.getValue()); + return nodeName.equals(nodeAndClient.getName()); } } @@ -2365,8 +2322,7 @@ synchronized String routingKeyForShard(Index index, int shard, Random random) { } @Override - public synchronized Iterable getClients() { - ensureOpen(); + public Iterable getClients() { return () -> { ensureOpen(); final Iterator iterator = nodes.values().iterator(); @@ -2492,7 +2448,7 @@ public void ensureEstimatedStats() { } @Override - public void assertAfterTest() throws IOException { + public synchronized void assertAfterTest() throws IOException { super.assertAfterTest(); assertRequestsFinished(); for (NodeAndClient nodeAndClient : nodes.values()) { @@ -2509,6 +2465,7 @@ public void assertAfterTest() throws IOException { } private void assertRequestsFinished() { + assert Thread.holdsLock(this); if (size() > 0) { for (NodeAndClient nodeAndClient : nodes.values()) { CircuitBreaker inFlightRequestsBreaker = getInstance(CircuitBreakerService.class, nodeAndClient.name)