Skip to content

Commit

Permalink
Adding full CompatibilityVersions to NodeInfo (#116577)
Browse files Browse the repository at this point in the history
Extracting the Transport protocol related changes from
#115771 to make backport
easier.
  • Loading branch information
ldematte authored Nov 11, 2024
1 parent 3ebc1f4 commit 0ce310a
Show file tree
Hide file tree
Showing 14 changed files with 81 additions and 31 deletions.
2 changes: 2 additions & 0 deletions server/src/main/java/org/elasticsearch/TransportVersions.java
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ML_INFERENCE_ATTACH_TO_EXISTSING_DEPLOYMENT = def(8_771_00_0);
public static final TransportVersion CONVERT_FAILURE_STORE_OPTIONS_TO_SELECTOR_OPTIONS_INTERNALLY = def(8_772_00_0);
public static final TransportVersion INFERENCE_DONT_PERSIST_ON_READ_BACKPORT_8_16 = def(8_772_00_1);
public static final TransportVersion ADD_COMPATIBILITY_VERSIONS_TO_NODE_INFO_BACKPORT_8_16 = def(8_772_00_2);
public static final TransportVersion REMOVE_MIN_COMPATIBLE_SHARD_NODE = def(8_773_00_0);
public static final TransportVersion REVERT_REMOVE_MIN_COMPATIBLE_SHARD_NODE = def(8_774_00_0);
public static final TransportVersion ESQL_FIELD_ATTRIBUTE_PARENT_SIMPLIFIED = def(8_775_00_0);
Expand All @@ -191,6 +192,7 @@ static TransportVersion def(int id) {
public static final TransportVersion KQL_QUERY_ADDED = def(8_786_00_0);
public static final TransportVersion ROLE_MONITOR_STATS = def(8_787_00_0);
public static final TransportVersion DATA_STREAM_INDEX_VERSION_DEPRECATION_CHECK = def(8_788_00_0);
public static final TransportVersion ADD_COMPATIBILITY_VERSIONS_TO_NODE_INFO = def(8_789_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.version.CompatibilityVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.http.HttpInfo;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.ingest.IngestInfo;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.monitor.os.OsInfo;
Expand All @@ -42,7 +44,7 @@
public class NodeInfo extends BaseNodeResponse {

private final String version;
private final TransportVersion transportVersion;
private final CompatibilityVersions compatibilityVersions;
private final IndexVersion indexVersion;
private final Map<String, Integer> componentVersions;
private final Build build;
Expand All @@ -64,15 +66,20 @@ public NodeInfo(StreamInput in) throws IOException {
super(in);
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
version = in.readString();
transportVersion = TransportVersion.readVersion(in);
if (in.getTransportVersion().isPatchFrom(TransportVersions.ADD_COMPATIBILITY_VERSIONS_TO_NODE_INFO_BACKPORT_8_16)
|| in.getTransportVersion().onOrAfter(TransportVersions.ADD_COMPATIBILITY_VERSIONS_TO_NODE_INFO)) {
compatibilityVersions = CompatibilityVersions.readVersion(in);
} else {
compatibilityVersions = new CompatibilityVersions(TransportVersion.readVersion(in), Map.of()); // unknown mappings versions
}
indexVersion = IndexVersion.readVersion(in);
} else {
Version legacyVersion = Version.readVersion(in);
version = legacyVersion.toString();
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_8_0)) {
transportVersion = TransportVersion.readVersion(in);
compatibilityVersions = new CompatibilityVersions(TransportVersion.readVersion(in), Map.of()); // unknown mappings versions
} else {
transportVersion = TransportVersion.fromId(legacyVersion.id);
compatibilityVersions = new CompatibilityVersions(TransportVersion.fromId(legacyVersion.id), Map.of());
}
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_11_X)) {
indexVersion = IndexVersion.readVersion(in);
Expand Down Expand Up @@ -114,7 +121,7 @@ public NodeInfo(StreamInput in) throws IOException {

public NodeInfo(
String version,
TransportVersion transportVersion,
CompatibilityVersions compatibilityVersions,
IndexVersion indexVersion,
Map<String, Integer> componentVersions,
Build build,
Expand All @@ -134,7 +141,7 @@ public NodeInfo(
) {
super(node);
this.version = version;
this.transportVersion = transportVersion;
this.compatibilityVersions = compatibilityVersions;
this.indexVersion = indexVersion;
this.componentVersions = componentVersions;
this.build = build;
Expand Down Expand Up @@ -171,7 +178,7 @@ public String getVersion() {
* The most recent transport version that can be used by this node
*/
public TransportVersion getTransportVersion() {
return transportVersion;
return compatibilityVersions.transportVersion();
}

/**
Expand All @@ -188,6 +195,13 @@ public Map<String, Integer> getComponentVersions() {
return componentVersions;
}

/**
* A map of system index names to versions for their mappings supported by this node.
*/
public Map<String, SystemIndexDescriptor.MappingsVersion> getCompatibilityVersions() {
return compatibilityVersions.systemIndexMappingsVersion();
}

/**
* The build version of the node.
*/
Expand Down Expand Up @@ -240,8 +254,11 @@ public void writeTo(StreamOutput out) throws IOException {
} else {
Version.writeVersion(Version.fromString(version), out);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_8_0)) {
TransportVersion.writeVersion(transportVersion, out);
if (out.getTransportVersion().isPatchFrom(TransportVersions.ADD_COMPATIBILITY_VERSIONS_TO_NODE_INFO_BACKPORT_8_16)
|| out.getTransportVersion().onOrAfter(TransportVersions.ADD_COMPATIBILITY_VERSIONS_TO_NODE_INFO)) {
compatibilityVersions.writeTo(out);
} else if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_8_0)) {
TransportVersion.writeVersion(compatibilityVersions.transportVersion(), out);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_11_X)) {
IndexVersion.writeVersion(indexVersion, out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1077,7 +1077,8 @@ private void construct(
searchTransportService,
indexingLimits,
searchModule.getValuesSourceRegistry().getUsageService(),
repositoriesService
repositoriesService,
compatibilityVersions
);

final TimeValue metricsInterval = settings.getAsTime("telemetry.agent.metrics_interval", TimeValue.timeValueSeconds(10));
Expand Down
9 changes: 6 additions & 3 deletions server/src/main/java/org/elasticsearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
package org.elasticsearch.node;

import org.elasticsearch.Build;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.node.info.ComponentVersionNumber;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
Expand All @@ -19,6 +18,7 @@
import org.elasticsearch.action.search.SearchTransportService;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.version.CompatibilityVersions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.unit.ByteSizeValue;
Expand Down Expand Up @@ -65,6 +65,7 @@ public class NodeService implements Closeable {
private final Coordinator coordinator;
private final RepositoriesService repositoriesService;
private final Map<String, Integer> componentVersions;
private final CompatibilityVersions compatibilityVersions;

NodeService(
Settings settings,
Expand All @@ -84,7 +85,8 @@ public class NodeService implements Closeable {
SearchTransportService searchTransportService,
IndexingPressure indexingPressure,
AggregationUsageService aggregationUsageService,
RepositoriesService repositoriesService
RepositoriesService repositoriesService,
CompatibilityVersions compatibilityVersions
) {
this.settings = settings;
this.threadPool = threadPool;
Expand All @@ -104,6 +106,7 @@ public class NodeService implements Closeable {
this.aggregationUsageService = aggregationUsageService;
this.repositoriesService = repositoriesService;
this.componentVersions = findComponentVersions(pluginService);
this.compatibilityVersions = compatibilityVersions;
clusterService.addStateApplier(ingestService);
}

Expand All @@ -124,7 +127,7 @@ public NodeInfo info(
return new NodeInfo(
// TODO: revert to Build.current().version() when Kibana is updated
Version.CURRENT.toString(),
TransportVersion.current(),
compatibilityVersions,
IndexVersion.current(),
componentVersions,
Build.current(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.Build;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.version.CompatibilityVersions;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.monitor.jvm.JvmInfo;
Expand Down Expand Up @@ -40,7 +41,7 @@ public class NodeInfoTests extends ESTestCase {
public void testGetInfo() {
NodeInfo nodeInfo = new NodeInfo(
Build.current().version(),
TransportVersion.current(),
new CompatibilityVersions(TransportVersion.current(), Map.of()),
IndexVersion.current(),
Map.of(),
Build.current(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.version.CompatibilityVersions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
Expand Down Expand Up @@ -78,7 +79,7 @@ public void testDoExecuteForRemoteServerNodes() {
nodeInfos.add(
new NodeInfo(
Build.current().version(),
TransportVersion.current(),
new CompatibilityVersions(TransportVersion.current(), Map.of()),
IndexVersion.current(),
Map.of(),
null,
Expand Down Expand Up @@ -156,7 +157,7 @@ public void testDoExecuteForRemoteNodes() {
nodeInfos.add(
new NodeInfo(
Build.current().version(),
TransportVersion.current(),
new CompatibilityVersions(TransportVersion.current(), Map.of()),
IndexVersion.current(),
Map.of(),
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStatsTests;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.version.CompatibilityVersions;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -327,7 +328,7 @@ private static NodeInfo createNodeInfo(String nodeId, String transportType, Stri
}
return new NodeInfo(
Build.current().version(),
TransportVersion.current(),
new CompatibilityVersions(TransportVersion.current(), Map.of()),
IndexVersion.current(),
Map.of(),
Build.current(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.version.CompatibilityVersions;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
Expand Down Expand Up @@ -103,7 +104,7 @@ public void setup() {

NodeInfo nodeInfo = new NodeInfo(
Build.current().version(),
TransportVersion.current(),
new CompatibilityVersions(TransportVersion.current(), Map.of()),
IndexVersion.current(),
Map.of(),
Build.current(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,22 @@
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.Scheduler;
import org.mockito.ArgumentCaptor;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;

import static java.util.Map.entry;
import static org.elasticsearch.test.LambdaMatchers.transformedMatch;
import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.same;
Expand Down Expand Up @@ -77,7 +81,7 @@ private static <T> Map<String, T> versions(T... versions) {
return tvs;
}

private static NodesInfoResponse getResponse(Map<String, TransportVersion> responseData) {
private static NodesInfoResponse getResponse(Map<String, CompatibilityVersions> responseData) {
return new NodesInfoResponse(
ClusterName.DEFAULT,
responseData.entrySet()
Expand Down Expand Up @@ -207,10 +211,19 @@ public void testVersionsAreFixed() {
argThat(transformedMatch(NodesInfoRequest::nodesIds, arrayContainingInAnyOrder("node1", "node2"))),
action.capture()
);
action.getValue().onResponse(getResponse(Map.of("node1", NEXT_TRANSPORT_VERSION, "node2", NEXT_TRANSPORT_VERSION)));
action.getValue()
.onResponse(
getResponse(
Map.ofEntries(
entry("node1", new CompatibilityVersions(NEXT_TRANSPORT_VERSION, Map.of())),
entry("node2", new CompatibilityVersions(NEXT_TRANSPORT_VERSION, Map.of()))
)
)
);
verify(taskQueue).submitTask(anyString(), task.capture(), any());

assertThat(task.getValue().results(), equalTo(Map.of("node1", NEXT_TRANSPORT_VERSION, "node2", NEXT_TRANSPORT_VERSION)));
assertThat(task.getValue().results().keySet(), equalTo(Set.of("node1", "node2")));
assertThat(task.getValue().results().values(), everyItem(equalTo(NEXT_TRANSPORT_VERSION)));
}

public void testConcurrentChangesDoNotOverlap() {
Expand Down Expand Up @@ -259,12 +272,17 @@ public void testFailedRequestsAreRetried() {
Scheduler scheduler = mock(Scheduler.class);
Executor executor = mock(Executor.class);

var compatibilityVersions = new CompatibilityVersions(
TransportVersion.current(),
Map.of(".system-index-1", new SystemIndexDescriptor.MappingsVersion(1, 1234))
);
ClusterState testState1 = ClusterState.builder(ClusterState.EMPTY_STATE)
.nodes(node(NEXT_VERSION, NEXT_VERSION, NEXT_VERSION))
.nodes(node(Version.CURRENT, Version.CURRENT, Version.CURRENT))
.nodeIdsToCompatibilityVersions(
Maps.transformValues(
versions(NEXT_TRANSPORT_VERSION, TransportVersions.V_8_8_0, TransportVersions.V_8_8_0),
transportVersion -> new CompatibilityVersions(transportVersion, Map.of())
Map.ofEntries(
entry("node0", compatibilityVersions),
entry("node1", new CompatibilityVersions(TransportVersions.V_8_8_0, Map.of())),
entry("node2", new CompatibilityVersions(TransportVersions.V_8_8_0, Map.of()))
)
)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.admin.cluster.node.info.PluginsAndModules;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.version.CompatibilityVersions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -241,7 +242,7 @@ private static NodeInfo createNodeInfo() {
}
return new NodeInfo(
randomAlphaOfLengthBetween(6, 32),
TransportVersionUtils.randomVersion(random()),
new CompatibilityVersions(TransportVersionUtils.randomVersion(random()), Map.of()),
IndexVersionUtils.randomVersion(random()),
componentVersions,
build,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.version.CompatibilityVersions;
import org.elasticsearch.common.Table;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.plugins.PluginDescriptor;
Expand Down Expand Up @@ -66,7 +67,7 @@ private Table buildTable(List<PluginDescriptor> pluginDescriptor) {
nodeInfos.add(
new NodeInfo(
Build.current().version(),
TransportVersion.current(),
new CompatibilityVersions(TransportVersion.current(), Map.of()),
IndexVersion.current(),
Map.of(),
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.version.CompatibilityVersions;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
Expand Down Expand Up @@ -452,7 +453,7 @@ private static org.elasticsearch.action.admin.cluster.node.info.NodeInfo infoFor
OsInfo osInfo = new OsInfo(randomLong(), processors, Processors.of((double) processors), null, null, null, null);
return new org.elasticsearch.action.admin.cluster.node.info.NodeInfo(
Build.current().version(),
TransportVersion.current(),
new CompatibilityVersions(TransportVersion.current(), Map.of()),
IndexVersion.current(),
Map.of(),
Build.current(),
Expand Down
Loading

0 comments on commit 0ce310a

Please sign in to comment.