From 9f0cfaaca0b68823e8cc14bfc8f1f10cef992333 Mon Sep 17 00:00:00 2001 From: Gao Binlong Date: Mon, 20 Nov 2023 11:40:54 +0800 Subject: [PATCH 01/11] Force merge API supports performing on primary shards only Signed-off-by: Gao Binlong --- CHANGELOG.md | 1 + .../test/indices.forcemerge/10_basic.yml | 20 +++++++++++++ .../indices/forcemerge/ForceMergeRequest.java | 23 +++++++++++++++ .../forcemerge/TransportForceMergeAction.java | 9 ++++-- .../cluster/routing/RoutingTable.java | 10 +++++++ .../admin/indices/RestForceMergeAction.java | 1 + .../forcemerge/ForceMergeRequestTests.java | 28 +++++++++++++++++-- .../cluster/routing/RoutingTableTests.java | 25 +++++++++++++++++ 8 files changed, 112 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b9d3ff90cd32..5f497425d8456 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -106,6 +106,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Adding slf4j license header to LoggerMessageFormat.java ([#11069](https://github.com/opensearch-project/OpenSearch/pull/11069)) - [Streaming Indexing] Introduce new experimental server HTTP transport based on Netty 4 and Project Reactor (Reactor Netty) ([#9672](https://github.com/opensearch-project/OpenSearch/pull/9672)) - Allowing pipeline processors to access index mapping info by passing ingest service ref as part of the processor factory parameters ([#10307](https://github.com/opensearch-project/OpenSearch/pull/10307)) +- Force merge API supports performing on primary shards only ### Dependencies - Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298)) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.forcemerge/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.forcemerge/10_basic.yml index d62c4c8882b13..caf5fd4f8d64f 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.forcemerge/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.forcemerge/10_basic.yml @@ -27,3 +27,23 @@ index: test max_num_segments: 10 only_expunge_deletes: true + +--- +"Test primary_only parameter": + - skip: + version: " - 2.11.99" + reason: "primary_only was added in 2.12" + + - do: + indices.create: + index: test + body: + settings: + index.number_of_shards: 2 + index.number_of_replicas: 1 + + - do: + indices.forcemerge: + index: test + primary_only: true + - match: { _shards.total: 2 } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java index f38b49f434261..659f82df9fcf6 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java @@ -69,11 +69,13 @@ public static final class Defaults { public static final int MAX_NUM_SEGMENTS = -1; public static final boolean ONLY_EXPUNGE_DELETES = false; public static final boolean FLUSH = true; + public static final boolean PRIMARY_ONLY = false; } private int maxNumSegments = Defaults.MAX_NUM_SEGMENTS; private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES; private boolean flush = Defaults.FLUSH; + private boolean primaryOnly = Defaults.PRIMARY_ONLY; private static final Version FORCE_MERGE_UUID_VERSION = Version.V_3_0_0; @@ -100,6 +102,7 @@ public ForceMergeRequest(StreamInput in) throws IOException { maxNumSegments = in.readInt(); onlyExpungeDeletes = in.readBoolean(); flush = in.readBoolean(); + shouldStoreResult = in.readBoolean(); if (in.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) { forceMergeUUID = in.readString(); } else if ((forceMergeUUID = in.readOptionalString()) == null) { @@ -166,6 +169,21 @@ public ForceMergeRequest flush(boolean flush) { return this; } + /** + * Should force merge only performed on primary shards. Defaults to {@code false}. + */ + public boolean primaryOnly() { + return primaryOnly; + } + + /** + * Should force merge only performed on primary shards. Defaults to {@code false}. + */ + public ForceMergeRequest primaryOnly(boolean primaryOnly) { + this.primaryOnly = primaryOnly; + return this; + } + /** * Should this task store its result after it has finished? */ @@ -188,6 +206,8 @@ public String getDescription() { + onlyExpungeDeletes + "], flush[" + flush + + "], primaryOnly[" + + primaryOnly + "]"; } @@ -197,6 +217,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeInt(maxNumSegments); out.writeBoolean(onlyExpungeDeletes); out.writeBoolean(flush); + out.writeBoolean(primaryOnly); if (out.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) { out.writeString(forceMergeUUID); } else { @@ -213,6 +234,8 @@ public String toString() { + onlyExpungeDeletes + ", flush=" + flush + + ", primaryOnly=" + + primaryOnly + '}'; } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/TransportForceMergeAction.java b/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/TransportForceMergeAction.java index fb8eb86c12269..b71c75462900a 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/TransportForceMergeAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/TransportForceMergeAction.java @@ -115,11 +115,16 @@ protected EmptyResult shardOperation(ForceMergeRequest request, ShardRouting sha } /** - * The refresh request works against *all* shards. + * The force merge request works against *all* shards by default, but it can work against all primary shards only + * by setting primary_only to true. */ @Override protected ShardsIterator shards(ClusterState clusterState, ForceMergeRequest request, String[] concreteIndices) { - return clusterState.routingTable().allShards(concreteIndices); + if (request.primaryOnly()) { + return clusterState.routingTable().allShardsSatisfyingPredicate(concreteIndices, ShardRouting::primary); + } else { + return clusterState.routingTable().allShards(concreteIndices); + } } @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java index 7f2382f8b4910..e4095a84be081 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java @@ -307,6 +307,16 @@ public ShardsIterator allShardsSatisfyingPredicate(Predicate predi return allShardsSatisfyingPredicate(indices, predicate, false); } + /** + * All the shards for the provided indices on the node which match the predicate + * @param indices indices to return all the shards. + * @param predicate condition to match + * @return iterator over shards matching the predicate for the specific indices + */ + public ShardsIterator allShardsSatisfyingPredicate(String[] indices, Predicate predicate) { + return allShardsSatisfyingPredicate(indices, predicate, false); + } + private ShardsIterator allShardsSatisfyingPredicate( String[] indices, Predicate predicate, diff --git a/server/src/main/java/org/opensearch/rest/action/admin/indices/RestForceMergeAction.java b/server/src/main/java/org/opensearch/rest/action/admin/indices/RestForceMergeAction.java index 06f1d5f46f90b..f3e66bd20cd86 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/indices/RestForceMergeAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/indices/RestForceMergeAction.java @@ -76,6 +76,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC mergeRequest.maxNumSegments(request.paramAsInt("max_num_segments", mergeRequest.maxNumSegments())); mergeRequest.onlyExpungeDeletes(request.paramAsBoolean("only_expunge_deletes", mergeRequest.onlyExpungeDeletes())); mergeRequest.flush(request.paramAsBoolean("flush", mergeRequest.flush())); + mergeRequest.primaryOnly(request.paramAsBoolean("primary_only", mergeRequest.primaryOnly())); if (mergeRequest.onlyExpungeDeletes() && mergeRequest.maxNumSegments() != ForceMergeRequest.Defaults.MAX_NUM_SEGMENTS) { deprecationLogger.deprecate( "force_merge_expunge_deletes_and_max_num_segments_deprecation", diff --git a/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestTests.java b/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestTests.java index 87ba6110447c1..93296a6b22533 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestTests.java @@ -37,15 +37,37 @@ public class ForceMergeRequestTests extends OpenSearchTestCase { public void testDescription() { ForceMergeRequest request = new ForceMergeRequest(); - assertEquals("Force-merge indices [], maxSegments[-1], onlyExpungeDeletes[false], flush[true]", request.getDescription()); + assertEquals( + "Force-merge indices [], maxSegments[-1], onlyExpungeDeletes[false], flush[true], primaryOnly[false]", + request.getDescription() + ); request = new ForceMergeRequest("shop", "blog"); - assertEquals("Force-merge indices [shop, blog], maxSegments[-1], onlyExpungeDeletes[false], flush[true]", request.getDescription()); + assertEquals( + "Force-merge indices [shop, blog], maxSegments[-1], onlyExpungeDeletes[false], flush[true], primaryOnly[false]", + request.getDescription() + ); request = new ForceMergeRequest(); request.maxNumSegments(12); request.onlyExpungeDeletes(true); request.flush(false); - assertEquals("Force-merge indices [], maxSegments[12], onlyExpungeDeletes[true], flush[false]", request.getDescription()); + request.primaryOnly(true); + assertEquals( + "Force-merge indices [], maxSegments[12], onlyExpungeDeletes[true], flush[false], primaryOnly[true]", + request.getDescription() + ); + } + + public void testToString() { + ForceMergeRequest request = new ForceMergeRequest(); + assertEquals("ForceMergeRequest{maxNumSegments=-1, onlyExpungeDeletes=false, flush=true, primaryOnly=false}", request.toString()); + + request = new ForceMergeRequest(); + request.maxNumSegments(12); + request.onlyExpungeDeletes(true); + request.flush(false); + request.primaryOnly(true); + assertEquals("ForceMergeRequest{maxNumSegments=12, onlyExpungeDeletes=true, flush=false, primaryOnly=true}", request.toString()); } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java b/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java index 8542ff53c6ff1..97283f561d6d4 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java @@ -279,6 +279,31 @@ public void testAllShardsMatchingPredicate() { ); } + public void testAllShardsMatchingPredicateWithSpecificIndices() { + MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator()); + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .put(IndexMetadata.builder("test2").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(RoutingTable.builder().addAsNew(metadata.index("test1")).addAsNew(metadata.index("test2")).build()) + .build(); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))) + .build(); + clusterState = allocation.reroute(clusterState, "reroute"); + + String[] indices = new String[] { "test1", "test2" }; + // Verifies against all primary shards on the node + assertThat(clusterState.routingTable().allShardsSatisfyingPredicate(indices, ShardRouting::primary).size(), is(2)); + // Verifies against all replica shards on the node + assertThat( + clusterState.routingTable().allShardsSatisfyingPredicate(indices, shardRouting -> !shardRouting.primary()).size(), + is(2) + ); + } + public void testActivePrimaryShardsGrouped() { assertThat(this.emptyRoutingTable.activePrimaryShardsGrouped(new String[0], true).size(), is(0)); assertThat(this.emptyRoutingTable.activePrimaryShardsGrouped(new String[0], false).size(), is(0)); From 0445b4617b32857cf24c894cc0c927497fc3d41c Mon Sep 17 00:00:00 2001 From: Gao Binlong Date: Mon, 20 Nov 2023 14:55:13 +0800 Subject: [PATCH 02/11] Modify change log Signed-off-by: Gao Binlong --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f497425d8456..30942a8c2567c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -106,7 +106,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Adding slf4j license header to LoggerMessageFormat.java ([#11069](https://github.com/opensearch-project/OpenSearch/pull/11069)) - [Streaming Indexing] Introduce new experimental server HTTP transport based on Netty 4 and Project Reactor (Reactor Netty) ([#9672](https://github.com/opensearch-project/OpenSearch/pull/9672)) - Allowing pipeline processors to access index mapping info by passing ingest service ref as part of the processor factory parameters ([#10307](https://github.com/opensearch-project/OpenSearch/pull/10307)) -- Force merge API supports performing on primary shards only +- Force merge API supports performing on primary shards only ([#11269](https://github.com/opensearch-project/OpenSearch/pull/11269)) ### Dependencies - Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298)) From 2ba5e5d0f8b172245834a06e96c73c34eb05f284 Mon Sep 17 00:00:00 2001 From: Gao Binlong Date: Mon, 20 Nov 2023 15:51:48 +0800 Subject: [PATCH 03/11] Fix test failure Signed-off-by: Gao Binlong --- .../main/resources/rest-api-spec/api/indices.forcemerge.json | 4 ++++ .../test/indices.forcemerge/20_wait_for_completion.yml | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.forcemerge.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.forcemerge.json index 02fbcc36dfe64..986bce55f41e5 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.forcemerge.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.forcemerge.json @@ -63,6 +63,10 @@ "wait_for_completion": { "type" : "boolean", "description" : "If false, the request will return a task immediately and the operation will run in background. Defaults to true." + }, + "primary_only": { + "type" : "boolean", + "description" : "Specify whether the operation should only perform on primary shards. Defaults to false." } } } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.forcemerge/20_wait_for_completion.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.forcemerge/20_wait_for_completion.yml index 9561ecd89fdad..efa239547e84a 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.forcemerge/20_wait_for_completion.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.forcemerge/20_wait_for_completion.yml @@ -25,7 +25,7 @@ wait_for_completion: true task_id: $taskId - match: { task.action: "indices:admin/forcemerge" } - - match: { task.description: "Force-merge indices [test_index], maxSegments[1], onlyExpungeDeletes[false], flush[true]" } + - match: { task.description: "Force-merge indices [test_index], maxSegments[1], onlyExpungeDeletes[false], flush[true], primaryOnly[false]" } # .tasks index is created when the force-merge operation completes, so we should delete .tasks index finally, # if not, the .tasks index may introduce unexpected warnings and then cause other test cases to fail. From dfe12a9c08462f628bffb7121706704c8b20cf69 Mon Sep 17 00:00:00 2001 From: Gao Binlong Date: Mon, 20 Nov 2023 17:55:33 +0800 Subject: [PATCH 04/11] Fix typo Signed-off-by: Gao Binlong --- .../action/admin/indices/forcemerge/ForceMergeRequest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java index 659f82df9fcf6..83b3fc5d49526 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java @@ -102,7 +102,7 @@ public ForceMergeRequest(StreamInput in) throws IOException { maxNumSegments = in.readInt(); onlyExpungeDeletes = in.readBoolean(); flush = in.readBoolean(); - shouldStoreResult = in.readBoolean(); + primaryOnly = in.readBoolean(); if (in.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) { forceMergeUUID = in.readString(); } else if ((forceMergeUUID = in.readOptionalString()) == null) { From be1079bc22f41a5859d8a76493b47ec4f2cac8c6 Mon Sep 17 00:00:00 2001 From: Gao Binlong Date: Tue, 21 Nov 2023 22:30:29 +0800 Subject: [PATCH 05/11] Modify skip version Signed-off-by: Gao Binlong --- .../rest-api-spec/test/indices.forcemerge/10_basic.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.forcemerge/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.forcemerge/10_basic.yml index caf5fd4f8d64f..39fb1604d9596 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.forcemerge/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.forcemerge/10_basic.yml @@ -31,8 +31,8 @@ --- "Test primary_only parameter": - skip: - version: " - 2.11.99" - reason: "primary_only was added in 2.12" + version: " - 2.99.99" + reason: "primary_only is available in 3.0+" - do: indices.create: From 74006a23cb76ac376420c9976e7b8596296d07d9 Mon Sep 17 00:00:00 2001 From: Gao Binlong Date: Fri, 1 Dec 2023 16:29:10 +0800 Subject: [PATCH 06/11] Add version check and more tests Signed-off-by: Gao Binlong --- .../indices/forcemerge/ForceMergeRequest.java | 8 +- .../forcemerge/ForceMergeRequestBuilder.java | 8 ++ .../forcemerge/ForceMergeRequestTests.java | 97 +++++++++++++++++++ 3 files changed, 111 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java index 83b3fc5d49526..0f2eb6658864a 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java @@ -102,7 +102,9 @@ public ForceMergeRequest(StreamInput in) throws IOException { maxNumSegments = in.readInt(); onlyExpungeDeletes = in.readBoolean(); flush = in.readBoolean(); - primaryOnly = in.readBoolean(); + if (in.getVersion().onOrAfter(Version.V_2_12_0)){ + primaryOnly = in.readBoolean(); + } if (in.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) { forceMergeUUID = in.readString(); } else if ((forceMergeUUID = in.readOptionalString()) == null) { @@ -217,7 +219,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeInt(maxNumSegments); out.writeBoolean(onlyExpungeDeletes); out.writeBoolean(flush); - out.writeBoolean(primaryOnly); + if (out.getVersion().onOrAfter(Version.V_2_12_0)){ + out.writeBoolean(primaryOnly); + } if (out.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) { out.writeString(forceMergeUUID); } else { diff --git a/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestBuilder.java index d8a618a1828ad..10b9749f16b27 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestBuilder.java @@ -81,4 +81,12 @@ public ForceMergeRequestBuilder setFlush(boolean flush) { request.flush(flush); return this; } + + /** + * Should force merge only performed on primary shards. Defaults to {@code false}. + */ + public ForceMergeRequestBuilder setPrimaryOnly(boolean primaryOnly) { + request.primaryOnly(primaryOnly); + return this; + } } diff --git a/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestTests.java b/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestTests.java index 93296a6b22533..bc65bdc5a9682 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestTests.java @@ -31,7 +31,11 @@ package org.opensearch.action.admin.indices.forcemerge; +import org.opensearch.Version; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.VersionUtils; public class ForceMergeRequestTests extends OpenSearchTestCase { @@ -70,4 +74,97 @@ public void testToString() { request.primaryOnly(true); assertEquals("ForceMergeRequest{maxNumSegments=12, onlyExpungeDeletes=true, flush=false, primaryOnly=true}", request.toString()); } + + public void testSerialization() throws Exception { + final ForceMergeRequest request = randomRequest(); + try (BytesStreamOutput out = new BytesStreamOutput()) { + request.writeTo(out); + + final ForceMergeRequest deserializedRequest; + try (StreamInput in = out.bytes().streamInput()) { + deserializedRequest = new ForceMergeRequest(in); + } + assertEquals(request.maxNumSegments(), deserializedRequest.maxNumSegments()); + assertEquals(request.onlyExpungeDeletes(), deserializedRequest.onlyExpungeDeletes()); + assertEquals(request.flush(), deserializedRequest.flush()); + assertEquals(request.primaryOnly(), deserializedRequest.primaryOnly()); + assertEquals(request.forceMergeUUID(), deserializedRequest.forceMergeUUID()); + } + } + + public void testBwcSerialization() throws Exception { + { + final ForceMergeRequest sample = randomRequest(); + final Version compatibleVersion = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT); + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.setVersion(compatibleVersion); + sample.writeTo(out); + + System.out.println(compatibleVersion); + + final ForceMergeRequest deserializedRequest; + try (StreamInput in = out.bytes().streamInput()) { + in.setVersion(Version.CURRENT); + deserializedRequest = new ForceMergeRequest(in); + } + + assertEquals(sample.maxNumSegments(), deserializedRequest.maxNumSegments()); + assertEquals(sample.onlyExpungeDeletes(), deserializedRequest.onlyExpungeDeletes()); + assertEquals(sample.flush(), deserializedRequest.flush()); + if (compatibleVersion.onOrAfter(Version.V_2_12_0)) { + assertEquals(sample.primaryOnly(), deserializedRequest.primaryOnly()); + if (compatibleVersion.onOrAfter(Version.V_3_0_0)) { + assertEquals(sample.forceMergeUUID(), deserializedRequest.forceMergeUUID()); + } + } + } + } + + { + final ForceMergeRequest sample = randomRequest(); + final Version compatibleVersion = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT); + System.out.println(compatibleVersion); + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.setVersion(Version.CURRENT); + sample.getParentTask().writeTo(out); + out.writeStringArray(sample.indices()); + sample.indicesOptions().writeIndicesOptions(out); + out.writeInt(sample.maxNumSegments()); + out.writeBoolean(sample.onlyExpungeDeletes()); + out.writeBoolean(sample.flush()); + if (compatibleVersion.onOrAfter(Version.V_2_12_0)) { + out.writeBoolean(sample.primaryOnly()); + } + if (compatibleVersion.onOrAfter(Version.V_3_0_0)) { + out.writeString(sample.forceMergeUUID()); + } else { + out.writeOptionalString(sample.forceMergeUUID()); + } + + final ForceMergeRequest deserializedRequest; + try (StreamInput in = out.bytes().streamInput()) { + in.setVersion(compatibleVersion); + deserializedRequest = new ForceMergeRequest(in); + } + + assertEquals(sample.maxNumSegments(), deserializedRequest.maxNumSegments()); + assertEquals(sample.onlyExpungeDeletes(), deserializedRequest.onlyExpungeDeletes()); + assertEquals(sample.flush(), deserializedRequest.flush()); + assertEquals(sample.primaryOnly(), deserializedRequest.primaryOnly()); + assertEquals(sample.forceMergeUUID(), deserializedRequest.forceMergeUUID()); + + } + } + } + + private ForceMergeRequest randomRequest() { + ForceMergeRequest request = new ForceMergeRequest(); + if (randomBoolean()) { + request.maxNumSegments(randomIntBetween(1, 10)); + } + request.onlyExpungeDeletes(true); + request.flush(randomBoolean()); + request.primaryOnly(randomBoolean()); + return request; + } } From 2e990e3d281e653bde31a1af3211e661ab9c1cb8 Mon Sep 17 00:00:00 2001 From: Gao Binlong Date: Fri, 1 Dec 2023 17:16:06 +0800 Subject: [PATCH 07/11] Format code Signed-off-by: Gao Binlong --- .../action/admin/indices/forcemerge/ForceMergeRequest.java | 4 ++-- .../admin/indices/forcemerge/ForceMergeRequestTests.java | 3 --- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java index 0f2eb6658864a..0a6aba039c473 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java @@ -102,7 +102,7 @@ public ForceMergeRequest(StreamInput in) throws IOException { maxNumSegments = in.readInt(); onlyExpungeDeletes = in.readBoolean(); flush = in.readBoolean(); - if (in.getVersion().onOrAfter(Version.V_2_12_0)){ + if (in.getVersion().onOrAfter(Version.V_2_12_0)) { primaryOnly = in.readBoolean(); } if (in.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) { @@ -219,7 +219,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeInt(maxNumSegments); out.writeBoolean(onlyExpungeDeletes); out.writeBoolean(flush); - if (out.getVersion().onOrAfter(Version.V_2_12_0)){ + if (out.getVersion().onOrAfter(Version.V_2_12_0)) { out.writeBoolean(primaryOnly); } if (out.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) { diff --git a/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestTests.java b/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestTests.java index bc65bdc5a9682..96d19b6879ba1 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestTests.java @@ -100,8 +100,6 @@ public void testBwcSerialization() throws Exception { out.setVersion(compatibleVersion); sample.writeTo(out); - System.out.println(compatibleVersion); - final ForceMergeRequest deserializedRequest; try (StreamInput in = out.bytes().streamInput()) { in.setVersion(Version.CURRENT); @@ -123,7 +121,6 @@ public void testBwcSerialization() throws Exception { { final ForceMergeRequest sample = randomRequest(); final Version compatibleVersion = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT); - System.out.println(compatibleVersion); try (BytesStreamOutput out = new BytesStreamOutput()) { out.setVersion(Version.CURRENT); sample.getParentTask().writeTo(out); From 709621f2d97fe035924f1a7b8ff9079e3af284cf Mon Sep 17 00:00:00 2001 From: Gao Binlong Date: Fri, 1 Mar 2024 18:30:55 +0800 Subject: [PATCH 08/11] Modify supported version and add more test Signed-off-by: Gao Binlong --- .../admin/indices/forcemerge/ForceMergeIT.java | 18 ++++++++++++++++++ .../indices/forcemerge/ForceMergeRequest.java | 4 ++-- .../forcemerge/ForceMergeRequestTests.java | 4 ++-- 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeIT.java index 09af533292e9a..5090af1706d5a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeIT.java @@ -100,6 +100,24 @@ public void testForceMergeUUIDConsistent() throws IOException { assertThat(primaryForceMergeUUID, is(replicaForceMergeUUID)); } + public void testForceMergeOnlyOnPrimaryShards() throws IOException { + internalCluster().ensureAtLeastNumDataNodes(2); + final String index = "test-index"; + createIndex( + index, + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build() + ); + ensureGreen(index); + final ForceMergeResponse forceMergeResponse = client().admin() + .indices() + .prepareForceMerge(index) + .setMaxNumSegments(1) + .setPrimaryOnly(true) + .get(); + assertThat(forceMergeResponse.getFailedShards(), is(0)); + assertThat(forceMergeResponse.getSuccessfulShards(), is(1)); + } + private static String getForceMergeUUID(IndexShard indexShard) throws IOException { try (GatedCloseable wrappedIndexCommit = indexShard.acquireLastIndexCommit(true)) { return wrappedIndexCommit.get().getUserData().get(Engine.FORCE_MERGE_UUID_KEY); diff --git a/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java index 0a6aba039c473..3efc4db21afbc 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java @@ -102,7 +102,7 @@ public ForceMergeRequest(StreamInput in) throws IOException { maxNumSegments = in.readInt(); onlyExpungeDeletes = in.readBoolean(); flush = in.readBoolean(); - if (in.getVersion().onOrAfter(Version.V_2_12_0)) { + if (in.getVersion().onOrAfter(Version.V_2_13_0)) { primaryOnly = in.readBoolean(); } if (in.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) { @@ -219,7 +219,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeInt(maxNumSegments); out.writeBoolean(onlyExpungeDeletes); out.writeBoolean(flush); - if (out.getVersion().onOrAfter(Version.V_2_12_0)) { + if (out.getVersion().onOrAfter(Version.V_2_13_0)) { out.writeBoolean(primaryOnly); } if (out.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) { diff --git a/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestTests.java b/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestTests.java index 96d19b6879ba1..3ac044d23f191 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestTests.java @@ -109,7 +109,7 @@ public void testBwcSerialization() throws Exception { assertEquals(sample.maxNumSegments(), deserializedRequest.maxNumSegments()); assertEquals(sample.onlyExpungeDeletes(), deserializedRequest.onlyExpungeDeletes()); assertEquals(sample.flush(), deserializedRequest.flush()); - if (compatibleVersion.onOrAfter(Version.V_2_12_0)) { + if (compatibleVersion.onOrAfter(Version.V_2_13_0)) { assertEquals(sample.primaryOnly(), deserializedRequest.primaryOnly()); if (compatibleVersion.onOrAfter(Version.V_3_0_0)) { assertEquals(sample.forceMergeUUID(), deserializedRequest.forceMergeUUID()); @@ -129,7 +129,7 @@ public void testBwcSerialization() throws Exception { out.writeInt(sample.maxNumSegments()); out.writeBoolean(sample.onlyExpungeDeletes()); out.writeBoolean(sample.flush()); - if (compatibleVersion.onOrAfter(Version.V_2_12_0)) { + if (compatibleVersion.onOrAfter(Version.V_2_13_0)) { out.writeBoolean(sample.primaryOnly()); } if (compatibleVersion.onOrAfter(Version.V_3_0_0)) { From 53c331ab392434af60d2e902374e510503b995ba Mon Sep 17 00:00:00 2001 From: Gao Binlong Date: Mon, 4 Mar 2024 17:37:26 +0800 Subject: [PATCH 09/11] Change the supported version to 3.0.0 Signed-off-by: Gao Binlong --- .../admin/indices/forcemerge/ForceMergeRequest.java | 4 ++-- .../indices/forcemerge/ForceMergeRequestTests.java | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java index 3efc4db21afbc..bf6ee9ca43755 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java @@ -102,7 +102,7 @@ public ForceMergeRequest(StreamInput in) throws IOException { maxNumSegments = in.readInt(); onlyExpungeDeletes = in.readBoolean(); flush = in.readBoolean(); - if (in.getVersion().onOrAfter(Version.V_2_13_0)) { + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { primaryOnly = in.readBoolean(); } if (in.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) { @@ -219,7 +219,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeInt(maxNumSegments); out.writeBoolean(onlyExpungeDeletes); out.writeBoolean(flush); - if (out.getVersion().onOrAfter(Version.V_2_13_0)) { + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { out.writeBoolean(primaryOnly); } if (out.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) { diff --git a/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestTests.java b/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestTests.java index 3ac044d23f191..a80141c52b6b4 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestTests.java @@ -109,11 +109,9 @@ public void testBwcSerialization() throws Exception { assertEquals(sample.maxNumSegments(), deserializedRequest.maxNumSegments()); assertEquals(sample.onlyExpungeDeletes(), deserializedRequest.onlyExpungeDeletes()); assertEquals(sample.flush(), deserializedRequest.flush()); - if (compatibleVersion.onOrAfter(Version.V_2_13_0)) { + if (compatibleVersion.onOrAfter(Version.V_3_0_0)) { assertEquals(sample.primaryOnly(), deserializedRequest.primaryOnly()); - if (compatibleVersion.onOrAfter(Version.V_3_0_0)) { - assertEquals(sample.forceMergeUUID(), deserializedRequest.forceMergeUUID()); - } + assertEquals(sample.forceMergeUUID(), deserializedRequest.forceMergeUUID()); } } } @@ -129,7 +127,7 @@ public void testBwcSerialization() throws Exception { out.writeInt(sample.maxNumSegments()); out.writeBoolean(sample.onlyExpungeDeletes()); out.writeBoolean(sample.flush()); - if (compatibleVersion.onOrAfter(Version.V_2_13_0)) { + if (compatibleVersion.onOrAfter(Version.V_3_0_0)) { out.writeBoolean(sample.primaryOnly()); } if (compatibleVersion.onOrAfter(Version.V_3_0_0)) { @@ -147,7 +145,9 @@ public void testBwcSerialization() throws Exception { assertEquals(sample.maxNumSegments(), deserializedRequest.maxNumSegments()); assertEquals(sample.onlyExpungeDeletes(), deserializedRequest.onlyExpungeDeletes()); assertEquals(sample.flush(), deserializedRequest.flush()); - assertEquals(sample.primaryOnly(), deserializedRequest.primaryOnly()); + if (compatibleVersion.onOrAfter(Version.V_3_0_0)) { + assertEquals(sample.primaryOnly(), deserializedRequest.primaryOnly()); + } assertEquals(sample.forceMergeUUID(), deserializedRequest.forceMergeUUID()); } From a7c3b16a75abdad898e88fd10be21e049cbf4be1 Mon Sep 17 00:00:00 2001 From: Gao Binlong Date: Thu, 7 Mar 2024 22:37:42 +0800 Subject: [PATCH 10/11] Add test case in SegmentReplicationIT Signed-off-by: Gao Binlong --- .../replication/SegmentReplicationIT.java | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 87dd48de38d3e..99395caeec12c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -25,6 +25,7 @@ import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse; import org.opensearch.action.admin.indices.alias.Alias; import org.opensearch.action.admin.indices.flush.FlushRequest; +import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; import org.opensearch.action.get.GetResponse; @@ -437,6 +438,52 @@ public void testReplicationAfterForceMerge() throws Exception { } } + public void testReplicationAfterForceMergeOnPrimaryShardsOnly() throws Exception { + final String nodeA = internalCluster().startDataOnlyNode(); + final String nodeB = internalCluster().startDataOnlyNode(); + createIndex(INDEX_NAME); + ensureGreen(INDEX_NAME); + + final int initialDocCount = scaledRandomIntBetween(0, 10); + final int additionalDocCount = scaledRandomIntBetween(0, 10); + final int expectedHitCount = initialDocCount + additionalDocCount; + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(initialDocCount); + waitForDocs(initialDocCount, indexer); + + flush(INDEX_NAME); + waitForSearchableDocs(initialDocCount, nodeA, nodeB); + + // Index a second set of docs so we can merge into one segment. + indexer.start(additionalDocCount); + waitForDocs(expectedHitCount, indexer); + waitForSearchableDocs(expectedHitCount, nodeA, nodeB); + + // Perform force merge only on the primary shards. + final ForceMergeResponse forceMergeResponse = client().admin() + .indices() + .prepareForceMerge(INDEX_NAME) + .setPrimaryOnly(true) + .setMaxNumSegments(1) + .setFlush(false) + .get(); + assertThat(forceMergeResponse.getFailedShards(), is(0)); + assertThat(forceMergeResponse.getSuccessfulShards(), is(1)); + refresh(INDEX_NAME); + verifyStoreContent(); + } + } + /** * This test verifies that segment replication does not fail for closed indices */ From 37bf12028ee0299710181c799cef1e2d964ae3eb Mon Sep 17 00:00:00 2001 From: Gao Binlong Date: Mon, 11 Mar 2024 13:58:57 +0800 Subject: [PATCH 11/11] Optimize the test code Signed-off-by: Gao Binlong --- .../replication/SegmentReplicationIT.java | 44 +++---------------- 1 file changed, 7 insertions(+), 37 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 99395caeec12c..70da3b0e38472 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -401,44 +401,14 @@ public void testMultipleShards() throws Exception { } public void testReplicationAfterForceMerge() throws Exception { - final String nodeA = internalCluster().startDataOnlyNode(); - final String nodeB = internalCluster().startDataOnlyNode(); - createIndex(INDEX_NAME); - ensureGreen(INDEX_NAME); - - final int initialDocCount = scaledRandomIntBetween(0, 10); - final int additionalDocCount = scaledRandomIntBetween(0, 10); - final int expectedHitCount = initialDocCount + additionalDocCount; - try ( - BackgroundIndexer indexer = new BackgroundIndexer( - INDEX_NAME, - "_doc", - client(), - -1, - RandomizedTest.scaledRandomIntBetween(2, 5), - false, - random() - ) - ) { - indexer.start(initialDocCount); - waitForDocs(initialDocCount, indexer); - - flush(INDEX_NAME); - waitForSearchableDocs(initialDocCount, nodeA, nodeB); - - // Index a second set of docs so we can merge into one segment. - indexer.start(additionalDocCount); - waitForDocs(expectedHitCount, indexer); - waitForSearchableDocs(expectedHitCount, nodeA, nodeB); - - // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk. - client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get(); - refresh(INDEX_NAME); - verifyStoreContent(); - } + performReplicationAfterForceMerge(false, SHARD_COUNT * (1 + REPLICA_COUNT)); } public void testReplicationAfterForceMergeOnPrimaryShardsOnly() throws Exception { + performReplicationAfterForceMerge(true, SHARD_COUNT); + } + + private void performReplicationAfterForceMerge(boolean primaryOnly, int expectedSuccessfulShards) throws Exception { final String nodeA = internalCluster().startDataOnlyNode(); final String nodeB = internalCluster().startDataOnlyNode(); createIndex(INDEX_NAME); @@ -473,12 +443,12 @@ public void testReplicationAfterForceMergeOnPrimaryShardsOnly() throws Exception final ForceMergeResponse forceMergeResponse = client().admin() .indices() .prepareForceMerge(INDEX_NAME) - .setPrimaryOnly(true) + .setPrimaryOnly(primaryOnly) .setMaxNumSegments(1) .setFlush(false) .get(); assertThat(forceMergeResponse.getFailedShards(), is(0)); - assertThat(forceMergeResponse.getSuccessfulShards(), is(1)); + assertThat(forceMergeResponse.getSuccessfulShards(), is(expectedSuccessfulShards)); refresh(INDEX_NAME); verifyStoreContent(); }