diff --git a/plugins/discovery-azure-classic/src/main/java/org/elasticsearch/discovery/azure/classic/AzureUnicastHostsProvider.java b/plugins/discovery-azure-classic/src/main/java/org/elasticsearch/discovery/azure/classic/AzureUnicastHostsProvider.java index 482dafb008fc5..1a9265de2a72f 100644 --- a/plugins/discovery-azure-classic/src/main/java/org/elasticsearch/discovery/azure/classic/AzureUnicastHostsProvider.java +++ b/plugins/discovery-azure-classic/src/main/java/org/elasticsearch/discovery/azure/classic/AzureUnicastHostsProvider.java @@ -132,7 +132,7 @@ public AzureUnicastHostsProvider(Settings settings, AzureComputeService azureCom * Setting `cloud.azure.refresh_interval` to `0` will disable caching (default). */ @Override - public List buildDynamicHosts() { + public List buildDynamicHosts(HostsResolver hostsResolver) { if (refreshInterval.millis() != 0) { if (dynamicHosts != null && (refreshInterval.millis() < 0 || (System.currentTimeMillis() - lastRefresh) < refreshInterval.millis())) { diff --git a/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java b/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java index 396e9f707d404..8f5037042986b 100644 --- a/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java +++ b/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java @@ -92,7 +92,7 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos } @Override - public List buildDynamicHosts() { + public List buildDynamicHosts(HostsResolver hostsResolver) { return dynamicHosts.getOrRefresh(); } diff --git a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java index 9dc2e02edc1b5..295df0c818a91 100644 --- a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java +++ b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java @@ -93,7 +93,7 @@ protected List buildDynamicHosts(Settings nodeSettings, int no protected List buildDynamicHosts(Settings nodeSettings, int nodes, List> tagsList) { try (Ec2DiscoveryPluginMock plugin = new Ec2DiscoveryPluginMock(Settings.EMPTY, nodes, tagsList)) { AwsEc2UnicastHostsProvider provider = new AwsEc2UnicastHostsProvider(nodeSettings, transportService, plugin.ec2Service); - List dynamicHosts = provider.buildDynamicHosts(); + List dynamicHosts = provider.buildDynamicHosts(null); logger.debug("--> addresses found: {}", dynamicHosts); return dynamicHosts; } catch (IOException e) { @@ -307,7 +307,7 @@ protected List fetchDynamicNodes() { } }; for (int i=0; i<3; i++) { - provider.buildDynamicHosts(); + provider.buildDynamicHosts(null); } assertThat(provider.fetchCount, is(3)); } @@ -324,12 +324,12 @@ protected List fetchDynamicNodes() { } }; for (int i=0; i<3; i++) { - provider.buildDynamicHosts(); + provider.buildDynamicHosts(null); } assertThat(provider.fetchCount, is(1)); Thread.sleep(1_000L); // wait for cache to expire for (int i=0; i<3; i++) { - provider.buildDynamicHosts(); + provider.buildDynamicHosts(null); } assertThat(provider.fetchCount, is(2)); } diff --git a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java index fb37b3bc01104..4d26447078597 100644 --- a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java +++ b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java @@ -19,35 +19,17 @@ package org.elasticsearch.discovery.file; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.discovery.zen.UnicastHostsProvider; -import org.elasticsearch.discovery.zen.UnicastZenPing; import org.elasticsearch.env.Environment; -import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.node.Node; import org.elasticsearch.plugins.DiscoveryPlugin; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.script.ScriptService; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.watcher.ResourceWatcherService; -import java.io.IOException; import java.nio.file.Path; -import java.util.Collection; import java.util.Collections; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; import java.util.function.Supplier; /** @@ -57,47 +39,19 @@ */ public class FileBasedDiscoveryPlugin extends Plugin implements DiscoveryPlugin { - private static final Logger logger = Loggers.getLogger(FileBasedDiscoveryPlugin.class); - private final Settings settings; private final Path configPath; - private ExecutorService fileBasedDiscoveryExecutorService; public FileBasedDiscoveryPlugin(Settings settings, Path configPath) { this.settings = settings; this.configPath = configPath; } - @Override - public Collection createComponents(Client client, ClusterService clusterService, ThreadPool threadPool, - ResourceWatcherService resourceWatcherService, ScriptService scriptService, - NamedXContentRegistry xContentRegistry, Environment environment, - NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) { - final int concurrentConnects = UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings); - final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[file_based_discovery_resolve]"); - fileBasedDiscoveryExecutorService = EsExecutors.newScaling( - Node.NODE_NAME_SETTING.get(settings) + "/" + "file_based_discovery_resolve", - 0, - concurrentConnects, - 60, - TimeUnit.SECONDS, - threadFactory, - threadPool.getThreadContext()); - - return Collections.emptyList(); - } - - @Override - public void close() throws IOException { - ThreadPool.terminate(fileBasedDiscoveryExecutorService, 0, TimeUnit.SECONDS); - } - @Override public Map> getZenHostsProviders(TransportService transportService, NetworkService networkService) { return Collections.singletonMap( "file", - () -> new FileBasedUnicastHostsProvider( - new Environment(settings, configPath), transportService, fileBasedDiscoveryExecutorService)); + () -> new FileBasedUnicastHostsProvider(new Environment(settings, configPath))); } } diff --git a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java index 7abcb4454720c..584ae4de5a2b5 100644 --- a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java +++ b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java @@ -23,26 +23,19 @@ import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.zen.UnicastHostsProvider; import org.elasticsearch.env.Environment; -import org.elasticsearch.transport.TransportService; import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; -import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.elasticsearch.discovery.zen.UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT; -import static org.elasticsearch.discovery.zen.UnicastZenPing.resolveHostsLists; - /** * An implementation of {@link UnicastHostsProvider} that reads hosts/ports * from {@link #UNICAST_HOSTS_FILE}. @@ -59,23 +52,15 @@ class FileBasedUnicastHostsProvider extends AbstractComponent implements Unicast static final String UNICAST_HOSTS_FILE = "unicast_hosts.txt"; - private final TransportService transportService; - private final ExecutorService executorService; - private final Path unicastHostsFilePath; - private final TimeValue resolveTimeout; - - FileBasedUnicastHostsProvider(Environment environment, TransportService transportService, ExecutorService executorService) { + FileBasedUnicastHostsProvider(Environment environment) { super(environment.settings()); - this.transportService = transportService; - this.executorService = executorService; this.unicastHostsFilePath = environment.configFile().resolve("discovery-file").resolve(UNICAST_HOSTS_FILE); - this.resolveTimeout = DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings); } @Override - public List buildDynamicHosts() { + public List buildDynamicHosts(HostsResolver hostsResolver) { List hostsList; try (Stream lines = Files.lines(unicastHostsFilePath)) { hostsList = lines.filter(line -> line.startsWith("#") == false) // lines starting with `#` are comments @@ -90,21 +75,8 @@ public List buildDynamicHosts() { hostsList = Collections.emptyList(); } - final List dynamicHosts = new ArrayList<>(); - try { - dynamicHosts.addAll(resolveHostsLists( - executorService, - logger, - hostsList, - 1, - transportService, - resolveTimeout)); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - + final List dynamicHosts = hostsResolver.resolveHosts(hostsList, 1); logger.debug("[discovery-file] Using dynamic discovery nodes {}", dynamicHosts); - return dynamicHosts; } diff --git a/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java b/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java index 860d3537635d5..5837d3bcdfe3f 100644 --- a/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java +++ b/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java @@ -24,7 +24,9 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.discovery.zen.UnicastZenPing; import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; @@ -123,8 +125,10 @@ public void testUnicastHostsDoesNotExist() throws Exception { .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) .build(); final Environment environment = TestEnvironment.newEnvironment(settings); - final FileBasedUnicastHostsProvider provider = new FileBasedUnicastHostsProvider(environment, transportService, executorService); - final List addresses = provider.buildDynamicHosts(); + final FileBasedUnicastHostsProvider provider = new FileBasedUnicastHostsProvider(environment); + final List addresses = provider.buildDynamicHosts((hosts, limitPortCounts) -> + UnicastZenPing.resolveHostsLists(executorService, logger, hosts, limitPortCounts, transportService, + TimeValue.timeValueSeconds(10))); assertEquals(0, addresses.size()); } @@ -163,6 +167,8 @@ private List setupAndRunHostProvider(final List hostEn } return new FileBasedUnicastHostsProvider( - new Environment(settings, configPath), transportService, executorService).buildDynamicHosts(); + new Environment(settings, configPath)).buildDynamicHosts((hosts, limitPortCounts) -> + UnicastZenPing.resolveHostsLists(executorService, logger, hosts, limitPortCounts, transportService, + TimeValue.timeValueSeconds(10))); } } diff --git a/plugins/discovery-gce/src/main/java/org/elasticsearch/discovery/gce/GceUnicastHostsProvider.java b/plugins/discovery-gce/src/main/java/org/elasticsearch/discovery/gce/GceUnicastHostsProvider.java index 790d70a8b99b0..778c38697c5ec 100644 --- a/plugins/discovery-gce/src/main/java/org/elasticsearch/discovery/gce/GceUnicastHostsProvider.java +++ b/plugins/discovery-gce/src/main/java/org/elasticsearch/discovery/gce/GceUnicastHostsProvider.java @@ -93,7 +93,7 @@ public GceUnicastHostsProvider(Settings settings, GceInstancesService gceInstanc * Information can be cached using `cloud.gce.refresh_interval` property if needed. */ @Override - public List buildDynamicHosts() { + public List buildDynamicHosts(HostsResolver hostsResolver) { // We check that needed properties have been set if (this.project == null || this.project.isEmpty() || this.zones == null || this.zones.isEmpty()) { throw new IllegalArgumentException("one or more gce discovery settings are missing. " + diff --git a/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoveryTests.java b/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoveryTests.java index a1944a15d8036..816152186e761 100644 --- a/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoveryTests.java +++ b/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoveryTests.java @@ -108,7 +108,7 @@ protected List buildDynamicNodes(GceInstancesServiceImpl gceIn GceUnicastHostsProvider provider = new GceUnicastHostsProvider(nodeSettings, gceInstancesService, transportService, new NetworkService(Collections.emptyList())); - List dynamicHosts = provider.buildDynamicHosts(); + List dynamicHosts = provider.buildDynamicHosts(null); logger.info("--> addresses found: {}", dynamicHosts); return dynamicHosts; } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index e616613a425a9..478325c66f983 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -56,6 +56,7 @@ import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.FaultDetection; +import org.elasticsearch.discovery.zen.SettingsBasedHostsProvider; import org.elasticsearch.discovery.zen.UnicastZenPing; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.env.Environment; @@ -357,7 +358,7 @@ public void apply(Settings value, Settings current, Settings previous) { ZenDiscovery.MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING, ZenDiscovery.MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING, ZenDiscovery.MAX_PENDING_CLUSTER_STATES_SETTING, - UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING, + SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING, UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING, UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT, SearchService.DEFAULT_KEEPALIVE_SETTING, diff --git a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java index 179692cd516c8..e47fe7a7a70ed 100644 --- a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java @@ -31,7 +31,9 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.discovery.single.SingleNodeDiscovery; +import org.elasticsearch.discovery.zen.SettingsBasedHostsProvider; import org.elasticsearch.discovery.zen.UnicastHostsProvider; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.plugins.DiscoveryPlugin; @@ -42,13 +44,15 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; +import java.util.Set; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; /** * A module for loading classes for node discovery. @@ -57,8 +61,8 @@ public class DiscoveryModule { public static final Setting DISCOVERY_TYPE_SETTING = new Setting<>("discovery.type", "zen", Function.identity(), Property.NodeScope); - public static final Setting> DISCOVERY_HOSTS_PROVIDER_SETTING = - new Setting<>("discovery.zen.hosts_provider", (String)null, Optional::ofNullable, Property.NodeScope); + public static final Setting> DISCOVERY_HOSTS_PROVIDER_SETTING = + Setting.listSetting("discovery.zen.hosts_provider", Collections.emptyList(), Function.identity(), Property.NodeScope); private final Discovery discovery; @@ -66,9 +70,9 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, MasterService masterService, ClusterApplier clusterApplier, ClusterSettings clusterSettings, List plugins, AllocationService allocationService) { - final UnicastHostsProvider hostsProvider; final Collection> joinValidators = new ArrayList<>(); - Map> hostProviders = new HashMap<>(); + final Map> hostProviders = new HashMap<>(); + hostProviders.put("settings", () -> new SettingsBasedHostsProvider(settings, transportService)); for (DiscoveryPlugin plugin : plugins) { plugin.getZenHostsProviders(transportService, networkService).entrySet().forEach(entry -> { if (hostProviders.put(entry.getKey(), entry.getValue()) != null) { @@ -80,17 +84,32 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic joinValidators.add(joinValidator); } } - Optional hostsProviderName = DISCOVERY_HOSTS_PROVIDER_SETTING.get(settings); - if (hostsProviderName.isPresent()) { - Supplier hostsProviderSupplier = hostProviders.get(hostsProviderName.get()); - if (hostsProviderSupplier == null) { - throw new IllegalArgumentException("Unknown zen hosts provider [" + hostsProviderName.get() + "]"); - } - hostsProvider = Objects.requireNonNull(hostsProviderSupplier.get()); - } else { - hostsProvider = Collections::emptyList; + List hostsProviderNames = DISCOVERY_HOSTS_PROVIDER_SETTING.get(settings); + // for bwc purposes, add settings provider even if not explicitly specified + if (hostsProviderNames.contains("settings") == false) { + List extendedHostsProviderNames = new ArrayList<>(); + extendedHostsProviderNames.add("settings"); + extendedHostsProviderNames.addAll(hostsProviderNames); + hostsProviderNames = extendedHostsProviderNames; + } + + final Set missingProviderNames = new HashSet<>(hostsProviderNames); + missingProviderNames.removeAll(hostProviders.keySet()); + if (missingProviderNames.isEmpty() == false) { + throw new IllegalArgumentException("Unknown zen hosts providers " + missingProviderNames); } + List filteredHostsProviders = hostsProviderNames.stream() + .map(hostProviders::get).map(Supplier::get).collect(Collectors.toList()); + + final UnicastHostsProvider hostsProvider = hostsResolver -> { + final List addresses = new ArrayList<>(); + for (UnicastHostsProvider provider : filteredHostsProviders) { + addresses.addAll(provider.buildDynamicHosts(hostsResolver)); + } + return Collections.unmodifiableList(addresses); + }; + Map> discoveryTypes = new HashMap<>(); discoveryTypes.put("zen", () -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier, diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/SettingsBasedHostsProvider.java b/server/src/main/java/org/elasticsearch/discovery/zen/SettingsBasedHostsProvider.java new file mode 100644 index 0000000000000..6d6453c776e68 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/discovery/zen/SettingsBasedHostsProvider.java @@ -0,0 +1,75 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery.zen; + +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.transport.TransportService; + +import java.util.List; +import java.util.function.Function; + +import static java.util.Collections.emptyList; + +/** + * An implementation of {@link UnicastHostsProvider} that reads hosts/ports + * from the "discovery.zen.ping.unicast.hosts" node setting. If the port is + * left off an entry, a default port of 9300 is assumed. + * + * An example unicast hosts setting might look as follows: + * [67.81.244.10, 67.81.244.11:9305, 67.81.244.15:9400] + */ +public class SettingsBasedHostsProvider extends AbstractComponent implements UnicastHostsProvider { + + public static final Setting> DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING = + Setting.listSetting("discovery.zen.ping.unicast.hosts", emptyList(), Function.identity(), Setting.Property.NodeScope); + + // these limits are per-address + public static final int LIMIT_FOREIGN_PORTS_COUNT = 1; + public static final int LIMIT_LOCAL_PORTS_COUNT = 5; + + private final List configuredHosts; + + private final int limitPortCounts; + + public SettingsBasedHostsProvider(Settings settings, TransportService transportService) { + super(settings); + + if (DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.exists(settings)) { + configuredHosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings); + // we only limit to 1 address, makes no sense to ping 100 ports + limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT; + } else { + // if unicast hosts are not specified, fill with simple defaults on the local machine + configuredHosts = transportService.getLocalAddresses(); + limitPortCounts = LIMIT_LOCAL_PORTS_COUNT; + } + + logger.debug("using initial hosts {}", configuredHosts); + } + + @Override + public List buildDynamicHosts(HostsResolver hostsResolver) { + return hostsResolver.resolveHosts(configuredHosts, limitPortCounts); + } + +} diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/UnicastHostsProvider.java b/server/src/main/java/org/elasticsearch/discovery/zen/UnicastHostsProvider.java index d719f9d123b8c..86410005c92bf 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/UnicastHostsProvider.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/UnicastHostsProvider.java @@ -31,5 +31,15 @@ public interface UnicastHostsProvider { /** * Builds the dynamic list of unicast hosts to be used for unicast discovery. */ - List buildDynamicHosts(); + List buildDynamicHosts(HostsResolver hostsResolver); + + /** + * Helper object that allows to resolve a list of hosts to a list of transport addresses. + * Each host is resolved into a transport address (or a collection of addresses if the + * number of ports is greater than one) + */ + interface HostsResolver { + List resolveHosts(List hosts, int limitPortCounts); + } + } diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index cbadbb4a1e09b..9c86fa17e9b06 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -82,11 +82,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; -import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; @@ -94,26 +92,15 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { public static final String ACTION_NAME = "internal:discovery/zen/unicast"; - public static final Setting> DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING = - Setting.listSetting("discovery.zen.ping.unicast.hosts", emptyList(), Function.identity(), - Property.NodeScope); public static final Setting DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING = Setting.intSetting("discovery.zen.ping.unicast.concurrent_connects", 10, 0, Property.NodeScope); public static final Setting DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT = Setting.positiveTimeSetting("discovery.zen.ping.unicast.hosts.resolve_timeout", TimeValue.timeValueSeconds(5), Property.NodeScope); - // these limits are per-address - public static final int LIMIT_FOREIGN_PORTS_COUNT = 1; - public static final int LIMIT_LOCAL_PORTS_COUNT = 5; - private final ThreadPool threadPool; private final TransportService transportService; private final ClusterName clusterName; - private final List configuredHosts; - - private final int limitPortCounts; - private final PingContextProvider contextProvider; private final AtomicInteger pingingRoundIdGenerator = new AtomicInteger(); @@ -141,19 +128,10 @@ public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService this.contextProvider = contextProvider; final int concurrentConnects = DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings); - if (DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.exists(settings)) { - configuredHosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings); - // we only limit to 1 addresses, makes no sense to ping 100 ports - limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT; - } else { - // if unicast hosts are not specified, fill with simple defaults on the local machine - configuredHosts = transportService.getLocalAddresses(); - limitPortCounts = LIMIT_LOCAL_PORTS_COUNT; - } + resolveTimeout = DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings); logger.debug( - "using initial hosts {}, with concurrent_connects [{}], resolve_timeout [{}]", - configuredHosts, + "using concurrent_connects [{}], resolve_timeout [{}]", concurrentConnects, resolveTimeout); @@ -172,9 +150,9 @@ public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService } /** - * Resolves a list of hosts to a list of discovery nodes. Each host is resolved into a transport address (or a collection of addresses - * if the number of ports is greater than one) and the transport addresses are used to created discovery nodes. Host lookups are done - * in parallel using specified executor service up to the specified resolve timeout. + * Resolves a list of hosts to a list of transport addresses. Each host is resolved into a transport address (or a collection of + * addresses if the number of ports is greater than one). Host lookups are done in parallel using specified executor service up + * to the specified resolve timeout. * * @param executorService the executor service used to parallelize hostname lookups * @param logger logger used for logging messages regarding hostname lookups @@ -190,7 +168,7 @@ public static List resolveHostsLists( final List hosts, final int limitPortCounts, final TransportService transportService, - final TimeValue resolveTimeout) throws InterruptedException { + final TimeValue resolveTimeout) { Objects.requireNonNull(executorService); Objects.requireNonNull(logger); Objects.requireNonNull(hosts); @@ -205,8 +183,13 @@ public static List resolveHostsLists( .stream() .map(hn -> (Callable) () -> transportService.addressesFromString(hn, limitPortCounts)) .collect(Collectors.toList()); - final List> futures = - executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS); + final List> futures; + try { + futures = executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return Collections.emptyList(); + } final List transportAddresses = new ArrayList<>(); final Set localAddresses = new HashSet<>(); localAddresses.add(transportService.boundAddress().publishAddress()); @@ -232,6 +215,9 @@ public static List resolveHostsLists( assert e.getCause() != null; final String message = "failed to resolve host [" + hostname + "]"; logger.warn(message, e.getCause()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + // ignore } } else { logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname); @@ -240,6 +226,11 @@ public static List resolveHostsLists( return Collections.unmodifiableList(transportAddresses); } + private UnicastHostsProvider.HostsResolver createHostsResolver() { + return (hosts, limitPortCounts) -> resolveHostsLists(unicastZenPingExecutorService, logger, hosts, + limitPortCounts, transportService, resolveTimeout); + } + @Override public void close() { ThreadPool.terminate(unicastZenPingExecutorService, 10, TimeUnit.SECONDS); @@ -281,18 +272,7 @@ protected void ping(final Consumer resultsConsumer, final TimeValue scheduleDuration, final TimeValue requestDuration) { final List seedAddresses = new ArrayList<>(); - try { - seedAddresses.addAll(resolveHostsLists( - unicastZenPingExecutorService, - logger, - configuredHosts, - limitPortCounts, - transportService, - resolveTimeout)); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - seedAddresses.addAll(hostsProvider.buildDynamicHosts()); + seedAddresses.addAll(hostsProvider.buildDynamicHosts(createHostsResolver())); final DiscoveryNodes nodes = contextProvider.clusterState().nodes(); // add all possible master nodes that were active in the last known cluster configuration for (ObjectCursor masterNode : nodes.getMasterNodes().values()) { diff --git a/server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java b/server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java index 18829d515973d..f2491b2db1f9a 100644 --- a/server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java @@ -137,11 +137,10 @@ public void testDuplicateDiscovery() { public void testHostsProvider() { Settings settings = Settings.builder().put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "custom").build(); - final UnicastHostsProvider provider = Collections::emptyList; AtomicBoolean created = new AtomicBoolean(false); DummyHostsProviderPlugin plugin = () -> Collections.singletonMap("custom", () -> { created.set(true); - return Collections::emptyList; + return hostsResolver -> Collections.emptyList(); }); newModule(settings, Collections.singletonList(plugin)); assertTrue(created.get()); @@ -151,7 +150,7 @@ public void testUnknownHostsProvider() { Settings settings = Settings.builder().put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "dne").build(); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> newModule(settings, Collections.emptyList())); - assertEquals("Unknown zen hosts provider [dne]", e.getMessage()); + assertEquals("Unknown zen hosts providers [dne]", e.getMessage()); } public void testDuplicateHostsProvider() { @@ -162,6 +161,37 @@ public void testDuplicateHostsProvider() { assertEquals("Cannot register zen hosts provider [dup] twice", e.getMessage()); } + public void testSettingsHostsProvider() { + DummyHostsProviderPlugin plugin = () -> Collections.singletonMap("settings", () -> null); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> + newModule(Settings.EMPTY, Arrays.asList(plugin))); + assertEquals("Cannot register zen hosts provider [settings] twice", e.getMessage()); + } + + public void testMultiHostsProvider() { + AtomicBoolean created1 = new AtomicBoolean(false); + DummyHostsProviderPlugin plugin1 = () -> Collections.singletonMap("provider1", () -> { + created1.set(true); + return hostsResolver -> Collections.emptyList(); + }); + AtomicBoolean created2 = new AtomicBoolean(false); + DummyHostsProviderPlugin plugin2 = () -> Collections.singletonMap("provider2", () -> { + created2.set(true); + return hostsResolver -> Collections.emptyList(); + }); + AtomicBoolean created3 = new AtomicBoolean(false); + DummyHostsProviderPlugin plugin3 = () -> Collections.singletonMap("provider3", () -> { + created3.set(true); + return hostsResolver -> Collections.emptyList(); + }); + Settings settings = Settings.builder().putList(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), + "provider1", "provider3").build(); + newModule(settings, Arrays.asList(plugin1, plugin2, plugin3)); + assertTrue(created1.get()); + assertFalse(created2.get()); + assertTrue(created3.get()); + } + public void testLazyConstructionHostsProvider() { DummyHostsProviderPlugin plugin = () -> Collections.singletonMap("custom", () -> { diff --git a/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java b/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java index 33c87ea7f383e..c3ffbb82081b7 100644 --- a/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java @@ -84,7 +84,7 @@ public void testDoesNotRespondToZenPings() throws Exception { internalCluster().getInstance(TransportService.class); // try to ping the single node directly final UnicastHostsProvider provider = - () -> Collections.singletonList(nodeTransport.getLocalNode().getAddress()); + hostsResolver -> Collections.singletonList(nodeTransport.getLocalNode().getAddress()); final CountDownLatch latch = new CountDownLatch(1); final DiscoveryNodes nodes = DiscoveryNodes.builder() .add(nodeTransport.getLocalNode()) diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java b/server/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java index 4aa75077431e7..eef926a1e1238 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java @@ -137,8 +137,6 @@ public void tearDown() throws Exception { } } - private static final UnicastHostsProvider EMPTY_HOSTS_PROVIDER = Collections::emptyList; - public void testSimplePings() throws IOException, InterruptedException, ExecutionException { // use ephemeral ports final Settings settings = Settings.builder().put("cluster.name", "test").put(TcpTransport.PORT.getKey(), 0).build(); @@ -182,7 +180,7 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomNonNegativeLong()).build(); final ClusterState stateMismatch = ClusterState.builder(new ClusterName("mismatch")).version(randomNonNegativeLong()).build(); - Settings hostsSettings = Settings.builder() + final Settings hostsSettings = Settings.builder() .putList("discovery.zen.ping.unicast.hosts", NetworkAddress.format(new InetSocketAddress(handleA.address.address().getAddress(), handleA.address.address().getPort())), NetworkAddress.format(new InetSocketAddress(handleB.address.address().getAddress(), handleB.address.address().getPort())), @@ -196,22 +194,21 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil .blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) .nodes(DiscoveryNodes.builder().add(handleA.node).localNodeId("UZP_A")) .build(); - TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER, () -> stateA); + TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, () -> stateA); zenPingA.start(); closeables.push(zenPingA); ClusterState stateB = ClusterState.builder(state) .nodes(DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B")) .build(); - TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER, () -> stateB); + TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, () -> stateB); zenPingB.start(); closeables.push(zenPingB); ClusterState stateC = ClusterState.builder(stateMismatch) .nodes(DiscoveryNodes.builder().add(handleC.node).localNodeId("UZP_C")) .build(); - TestUnicastZenPing zenPingC = new TestUnicastZenPing(hostsSettingsMismatch, threadPool, handleC, - EMPTY_HOSTS_PROVIDER, () -> stateC) { + TestUnicastZenPing zenPingC = new TestUnicastZenPing(hostsSettingsMismatch, threadPool, handleC, () -> stateC) { @Override protected Version getVersion() { return versionD; @@ -223,8 +220,7 @@ protected Version getVersion() { ClusterState stateD = ClusterState.builder(stateMismatch) .nodes(DiscoveryNodes.builder().add(handleD.node).localNodeId("UZP_D")) .build(); - TestUnicastZenPing zenPingD = new TestUnicastZenPing(hostsSettingsMismatch, threadPool, handleD, - EMPTY_HOSTS_PROVIDER, () -> stateD); + TestUnicastZenPing zenPingD = new TestUnicastZenPing(hostsSettingsMismatch, threadPool, handleD, () -> stateD); zenPingD.start(); closeables.push(zenPingD); @@ -329,21 +325,21 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi .blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) .nodes(DiscoveryNodes.builder().add(handleA.node).localNodeId("UZP_A")) .build(); - final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER, () -> stateA); + final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, () -> stateA); zenPingA.start(); closeables.push(zenPingA); ClusterState stateB = ClusterState.builder(state) .nodes(DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B")) .build(); - TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER, () -> stateB); + TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, () -> stateB); zenPingB.start(); closeables.push(zenPingB); ClusterState stateC = ClusterState.builder(state) .nodes(DiscoveryNodes.builder().add(handleC.node).localNodeId("UZP_C")) .build(); - TestUnicastZenPing zenPingC = new TestUnicastZenPing(hostsSettings, threadPool, handleC, EMPTY_HOSTS_PROVIDER, () -> stateC); + TestUnicastZenPing zenPingC = new TestUnicastZenPing(hostsSettings, threadPool, handleC, () -> stateC); zenPingC.start(); closeables.push(zenPingC); @@ -408,7 +404,7 @@ public BoundTransportAddress boundAddress() { Collections.emptySet()); closeables.push(transportService); final int limitPortCounts = randomIntBetween(1, 10); - final List transportAddresses = TestUnicastZenPing.resolveHostsLists( + final List transportAddresses = UnicastZenPing.resolveHostsLists( executorService, logger, Collections.singletonList("127.0.0.1"), @@ -452,7 +448,7 @@ public BoundTransportAddress boundAddress() { new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, Collections.emptySet()); closeables.push(transportService); - final List transportAddresses = TestUnicastZenPing.resolveHostsLists( + final List transportAddresses = UnicastZenPing.resolveHostsLists( executorService, logger, Collections.singletonList(NetworkAddress.format(loopbackAddress)), @@ -503,7 +499,7 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi Collections.emptySet()); closeables.push(transportService); - final List transportAddresses = TestUnicastZenPing.resolveHostsLists( + final List transportAddresses = UnicastZenPing.resolveHostsLists( executorService, logger, Arrays.asList(hostname), @@ -562,7 +558,7 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi closeables.push(transportService); final TimeValue resolveTimeout = TimeValue.timeValueSeconds(randomIntBetween(1, 3)); try { - final List transportAddresses = TestUnicastZenPing.resolveHostsLists( + final List transportAddresses = UnicastZenPing.resolveHostsLists( executorService, logger, Arrays.asList("hostname1", "hostname2"), @@ -610,6 +606,7 @@ public void testResolveReuseExistingNodeConnections() throws ExecutionException, hostsSettingsBuilder.put("discovery.zen.ping.unicast.hosts", (String) null); } final Settings hostsSettings = hostsSettingsBuilder.build(); + final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomNonNegativeLong()).build(); // connection to reuse @@ -627,14 +624,14 @@ public void onConnectionOpened(Transport.Connection connection) { .blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) .nodes(DiscoveryNodes.builder().add(handleA.node).add(handleB.node).localNodeId("UZP_A")) .build(); - final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER, () -> stateA); + final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, () -> stateA); zenPingA.start(); closeables.push(zenPingA); final ClusterState stateB = ClusterState.builder(state) .nodes(DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B")) .build(); - TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER, () -> stateB); + TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, () -> stateB); zenPingB.start(); closeables.push(zenPingB); @@ -669,19 +666,20 @@ public void testPingingTemporalPings() throws ExecutionException, InterruptedExc .put("cluster.name", "test") .put("discovery.zen.ping.unicast.hosts", (String) null) // use nodes for simplicity .build(); + final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomNonNegativeLong()).build(); final ClusterState stateA = ClusterState.builder(state) .blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) .nodes(DiscoveryNodes.builder().add(handleA.node).add(handleB.node).localNodeId("UZP_A")).build(); - final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER, () -> stateA); + final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, () -> stateA); zenPingA.start(); closeables.push(zenPingA); // Node B doesn't know about A! final ClusterState stateB = ClusterState.builder(state).nodes( DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B")).build(); - TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER, () -> stateB); + TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, () -> stateB); zenPingB.start(); closeables.push(zenPingB); @@ -728,7 +726,7 @@ public BoundTransportAddress boundAddress() { new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, Collections.emptySet()); closeables.push(transportService); - final List transportAddresses = TestUnicastZenPing.resolveHostsLists( + final List transportAddresses = UnicastZenPing.resolveHostsLists( executorService, logger, Arrays.asList("127.0.0.1:9300:9300", "127.0.0.1:9301"), @@ -828,9 +826,10 @@ private static class NetworkHandle { private static class TestUnicastZenPing extends UnicastZenPing { TestUnicastZenPing(Settings settings, ThreadPool threadPool, NetworkHandle networkHandle, - UnicastHostsProvider unicastHostsProvider, PingContextProvider contextProvider) { + PingContextProvider contextProvider) { super(Settings.builder().put("node.name", networkHandle.node.getName()).put(settings).build(), - threadPool, networkHandle.transportService, unicastHostsProvider, contextProvider); + threadPool, networkHandle.transportService, + new SettingsBasedHostsProvider(settings, networkHandle.transportService), contextProvider); } volatile CountDownLatch allTasksCompleted; diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java b/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java index 9273ab1514372..a60a23bcd6d5c 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java @@ -317,7 +317,7 @@ public void onNewClusterState(String source, Supplier clusterState } }; ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), - masterService, clusterApplier, clusterSettings, Collections::emptyList, ESAllocationTestCase.createAllocationService(), + masterService, clusterApplier, clusterSettings, hostsResolver -> Collections.emptyList(), ESAllocationTestCase.createAllocationService(), Collections.emptyList()); zenDiscovery.start(); return zenDiscovery; diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockUncasedHostProvider.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockUncasedHostProvider.java index 2e60a3c518dd3..dc9304637cdca 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockUncasedHostProvider.java +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockUncasedHostProvider.java @@ -56,7 +56,7 @@ public MockUncasedHostProvider(Supplier localNodeSupplier, Cluste } @Override - public List buildDynamicHosts() { + public List buildDynamicHosts(HostsResolver hostsResolver) { final DiscoveryNode localNode = getNode(); assert localNode != null; synchronized (activeNodesPerCluster) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java index 11f9e38e665ff..5387a659aa274 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java @@ -45,7 +45,7 @@ import java.util.Map; import java.util.function.Supplier; -import static org.elasticsearch.discovery.zen.UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING; +import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING; /** * A alternative zen discovery which allows using mocks for things like pings, as well as @@ -84,7 +84,7 @@ public Map> getZenHostsProviders(Transpor final Supplier supplier; if (USE_MOCK_PINGS.get(settings)) { // we have to return something in order for the unicast host provider setting to resolve to something. It will never be used - supplier = () -> () -> { + supplier = () -> hostsResolver -> { throw new UnsupportedOperationException(); }; } else {