Force merge API supports performing only on primary shards #11269

Merged · 21 commits · Mar 12, 2024
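
A minimal usage sketch of the option this PR adds, written in the style of the Java integration tests below (the index name is illustrative, and client() assumes an OpenSearchIntegTestCase-style test); the REST equivalent would pass primary_only=true as a query parameter on the _forcemerge endpoint:

    // Force-merge only the primary shard copies of "my-index" down to a single segment.
    // setPrimaryOnly(true) is the flag introduced by this PR; it defaults to false.
    ForceMergeResponse response = client().admin()
        .indices()
        .prepareForceMerge("my-index")
        .setPrimaryOnly(true)
        .setMaxNumSegments(1)
        .get();
    // Replica copies are skipped, so only the primaries count toward the successful-shard total.
    assert response.getFailedShards() == 0;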
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -104,6 +104,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add toString methods to MultiSearchRequest, MultiGetRequest and CreateIndexRequest ([#12163](https://github.com/opensearch-project/OpenSearch/pull/12163))
- Support for returning scores in matched queries ([#11626](https://github.com/opensearch-project/OpenSearch/pull/11626))
- Add shard id property to SearchLookup for use in field types provided by plugins ([#1063](https://github.com/opensearch-project/OpenSearch/pull/1063))
- Force merge API supports performing on primary shards only ([#11269](https://github.com/opensearch-project/OpenSearch/pull/11269))
- [Tiered caching] Make IndicesRequestCache implementation configurable [EXPERIMENTAL] ([#12533](https://github.com/opensearch-project/OpenSearch/pull/12533))
- Add kuromoji_completion analyzer and filter ([#4835](https://github.com/opensearch-project/OpenSearch/issues/4835))
- The org.opensearch.bootstrap.Security should support codebase for JAR files with classifiers ([#12586](https://github.com/opensearch-project/OpenSearch/issues/12586))
@@ -63,6 +63,10 @@
"wait_for_completion": {
"type" : "boolean",
"description" : "If false, the request will return a task immediately and the operation will run in background. Defaults to true."
},
"primary_only": {
"type" : "boolean",
"description" : "Specify whether the operation should only perform on primary shards. Defaults to false."
}
}
}
@@ -27,3 +27,23 @@
index: test
max_num_segments: 10
only_expunge_deletes: true

---
"Test primary_only parameter":
- skip:
version: " - 2.99.99"
reason: "primary_only is available in 3.0+"

- do:
indices.create:
index: test
body:
settings:
index.number_of_shards: 2
index.number_of_replicas: 1

- do:
indices.forcemerge:
index: test
primary_only: true
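# With 2 primary shards and 1 replica each, primary_only limits the merge to the 2 primaries, so _shards.total is 2.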
- match: { _shards.total: 2 }
@@ -25,7 +25,7 @@
wait_for_completion: true
task_id: $taskId
- match: { task.action: "indices:admin/forcemerge" }
- match: { task.description: "Force-merge indices [test_index], maxSegments[1], onlyExpungeDeletes[false], flush[true]" }
- match: { task.description: "Force-merge indices [test_index], maxSegments[1], onlyExpungeDeletes[false], flush[true], primaryOnly[false]" }

# .tasks index is created when the force-merge operation completes, so we should delete .tasks index finally,
# if not, the .tasks index may introduce unexpected warnings and then cause other test cases to fail.
@@ -100,6 +100,24 @@ public void testForceMergeUUIDConsistent() throws IOException {
assertThat(primaryForceMergeUUID, is(replicaForceMergeUUID));
}

public void testForceMergeOnlyOnPrimaryShards() throws IOException {
internalCluster().ensureAtLeastNumDataNodes(2);
final String index = "test-index";
createIndex(
index,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()
);
ensureGreen(index);
final ForceMergeResponse forceMergeResponse = client().admin()
.indices()
.prepareForceMerge(index)
.setMaxNumSegments(1)
.setPrimaryOnly(true)
.get();
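// The index has 1 primary and 1 replica; with primaryOnly set, only the single primary copy is merged.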
assertThat(forceMergeResponse.getFailedShards(), is(0));
assertThat(forceMergeResponse.getSuccessfulShards(), is(1));
}

private static String getForceMergeUUID(IndexShard indexShard) throws IOException {
try (GatedCloseable<IndexCommit> wrappedIndexCommit = indexShard.acquireLastIndexCommit(true)) {
return wrappedIndexCommit.get().getUserData().get(Engine.FORCE_MERGE_UUID_KEY);
@@ -25,6 +25,7 @@
import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.get.GetResponse;
@@ -400,6 +401,14 @@ public void testMultipleShards() throws Exception {
}

public void testReplicationAfterForceMerge() throws Exception {
performReplicationAfterForceMerge(false, SHARD_COUNT * (1 + REPLICA_COUNT));
}

public void testReplicationAfterForceMergeOnPrimaryShardsOnly() throws Exception {
performReplicationAfterForceMerge(true, SHARD_COUNT);
}

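// Expected successful shards: SHARD_COUNT * (1 + REPLICA_COUNT) when primaryOnly is false (every shard copy
// is merged), and only SHARD_COUNT when primaryOnly is true (the replica copies are skipped).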
private void performReplicationAfterForceMerge(boolean primaryOnly, int expectedSuccessfulShards) throws Exception {
final String nodeA = internalCluster().startDataOnlyNode();
final String nodeB = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
@@ -430,8 +439,16 @@ public void testReplicationAfterForceMerge() throws Exception {
waitForDocs(expectedHitCount, indexer);
waitForSearchableDocs(expectedHitCount, nodeA, nodeB);

// Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
// Perform the force merge, optionally restricted to the primary shards.
final ForceMergeResponse forceMergeResponse = client().admin()
.indices()
.prepareForceMerge(INDEX_NAME)
.setPrimaryOnly(primaryOnly)
.setMaxNumSegments(1)
.setFlush(false)
.get();
assertThat(forceMergeResponse.getFailedShards(), is(0));
assertThat(forceMergeResponse.getSuccessfulShards(), is(expectedSuccessfulShards));
refresh(INDEX_NAME);
verifyStoreContent();
}
@@ -69,11 +69,13 @@ public static final class Defaults {
public static final int MAX_NUM_SEGMENTS = -1;
public static final boolean ONLY_EXPUNGE_DELETES = false;
public static final boolean FLUSH = true;
public static final boolean PRIMARY_ONLY = false;
}

private int maxNumSegments = Defaults.MAX_NUM_SEGMENTS;
private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES;
private boolean flush = Defaults.FLUSH;
private boolean primaryOnly = Defaults.PRIMARY_ONLY;

private static final Version FORCE_MERGE_UUID_VERSION = Version.V_3_0_0;

@@ -100,6 +102,9 @@ public ForceMergeRequest(StreamInput in) throws IOException {
maxNumSegments = in.readInt();
onlyExpungeDeletes = in.readBoolean();
flush = in.readBoolean();
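// primaryOnly is only on the wire for 3.0+ streams; when reading from an older node it keeps its default (false).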
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
primaryOnly = in.readBoolean();
}
if (in.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) {
forceMergeUUID = in.readString();
} else if ((forceMergeUUID = in.readOptionalString()) == null) {
@@ -166,6 +171,21 @@ public ForceMergeRequest flush(boolean flush) {
return this;
}

/**
* Whether the force merge should be performed only on primary shards. Defaults to {@code false}.
*/
public boolean primaryOnly() {
return primaryOnly;
}

/**
* Sets whether the force merge should be performed only on primary shards. Defaults to {@code false}.
*/
public ForceMergeRequest primaryOnly(boolean primaryOnly) {
this.primaryOnly = primaryOnly;
return this;
}

/**
* Should this task store its result after it has finished?
*/
@@ -188,6 +208,8 @@ public String getDescription() {
+ onlyExpungeDeletes
+ "], flush["
+ flush
+ "], primaryOnly["
+ primaryOnly
+ "]";
}

@@ -197,6 +219,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeInt(maxNumSegments);
out.writeBoolean(onlyExpungeDeletes);
out.writeBoolean(flush);
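// Only send primaryOnly to nodes that understand it (3.0+); older nodes simply never see the flag.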
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(primaryOnly);
}
if (out.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) {
out.writeString(forceMergeUUID);
} else {
@@ -213,6 +238,8 @@ public String toString() {
+ onlyExpungeDeletes
+ ", flush="
+ flush
+ ", primaryOnly="
+ primaryOnly
+ '}';
}
}
@@ -81,4 +81,12 @@
request.flush(flush);
return this;
}

/**
* Sets whether the force merge should be performed only on primary shards. Defaults to {@code false}.
*/
public ForceMergeRequestBuilder setPrimaryOnly(boolean primaryOnly) {
request.primaryOnly(primaryOnly);
return this;
}
}
@@ -115,11 +115,16 @@
}

/**
* The refresh request works against *all* shards.
* The force merge request works against *all* shards by default, but it can be limited to the primary shards only
* by setting primary_only to true.
*/
@Override
protected ShardsIterator shards(ClusterState clusterState, ForceMergeRequest request, String[] concreteIndices) {
return clusterState.routingTable().allShards(concreteIndices);
if (request.primaryOnly()) {
return clusterState.routingTable().allShardsSatisfyingPredicate(concreteIndices, ShardRouting::primary);
} else {
return clusterState.routingTable().allShards(concreteIndices);
}
}

@Override
@@ -307,6 +307,16 @@ public ShardsIterator allShardsSatisfyingPredicate(Predicate<ShardRouting> predicate) {
return allShardsSatisfyingPredicate(indices, predicate, false);
}

/**
* All the shards for the provided indices which match the predicate
* @param indices the indices whose shards should be returned
* @param predicate condition the shards must match
* @return iterator over the shards of the specified indices matching the predicate
*/
public ShardsIterator allShardsSatisfyingPredicate(String[] indices, Predicate<ShardRouting> predicate) {
return allShardsSatisfyingPredicate(indices, predicate, false);
}

private ShardsIterator allShardsSatisfyingPredicate(
String[] indices,
Predicate<ShardRouting> predicate,
@@ -76,6 +76,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
mergeRequest.maxNumSegments(request.paramAsInt("max_num_segments", mergeRequest.maxNumSegments()));
mergeRequest.onlyExpungeDeletes(request.paramAsBoolean("only_expunge_deletes", mergeRequest.onlyExpungeDeletes()));
mergeRequest.flush(request.paramAsBoolean("flush", mergeRequest.flush()));
mergeRequest.primaryOnly(request.paramAsBoolean("primary_only", mergeRequest.primaryOnly()));
if (mergeRequest.onlyExpungeDeletes() && mergeRequest.maxNumSegments() != ForceMergeRequest.Defaults.MAX_NUM_SEGMENTS) {
deprecationLogger.deprecate(
"force_merge_expunge_deletes_and_max_num_segments_deprecation",
@@ -31,21 +31,137 @@

package org.opensearch.action.admin.indices.forcemerge;

import org.opensearch.Version;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.VersionUtils;

public class ForceMergeRequestTests extends OpenSearchTestCase {

public void testDescription() {
ForceMergeRequest request = new ForceMergeRequest();
assertEquals("Force-merge indices [], maxSegments[-1], onlyExpungeDeletes[false], flush[true]", request.getDescription());
assertEquals(
"Force-merge indices [], maxSegments[-1], onlyExpungeDeletes[false], flush[true], primaryOnly[false]",
request.getDescription()
);

request = new ForceMergeRequest("shop", "blog");
assertEquals("Force-merge indices [shop, blog], maxSegments[-1], onlyExpungeDeletes[false], flush[true]", request.getDescription());
assertEquals(
"Force-merge indices [shop, blog], maxSegments[-1], onlyExpungeDeletes[false], flush[true], primaryOnly[false]",
request.getDescription()
);

request = new ForceMergeRequest();
request.maxNumSegments(12);
request.onlyExpungeDeletes(true);
request.flush(false);
assertEquals("Force-merge indices [], maxSegments[12], onlyExpungeDeletes[true], flush[false]", request.getDescription());
request.primaryOnly(true);
assertEquals(
"Force-merge indices [], maxSegments[12], onlyExpungeDeletes[true], flush[false], primaryOnly[true]",
request.getDescription()
);
}

public void testToString() {
ForceMergeRequest request = new ForceMergeRequest();
assertEquals("ForceMergeRequest{maxNumSegments=-1, onlyExpungeDeletes=false, flush=true, primaryOnly=false}", request.toString());

request = new ForceMergeRequest();
request.maxNumSegments(12);
request.onlyExpungeDeletes(true);
request.flush(false);
request.primaryOnly(true);
assertEquals("ForceMergeRequest{maxNumSegments=12, onlyExpungeDeletes=true, flush=false, primaryOnly=true}", request.toString());
}

public void testSerialization() throws Exception {
final ForceMergeRequest request = randomRequest();
try (BytesStreamOutput out = new BytesStreamOutput()) {
request.writeTo(out);

final ForceMergeRequest deserializedRequest;
try (StreamInput in = out.bytes().streamInput()) {
deserializedRequest = new ForceMergeRequest(in);
}
assertEquals(request.maxNumSegments(), deserializedRequest.maxNumSegments());
assertEquals(request.onlyExpungeDeletes(), deserializedRequest.onlyExpungeDeletes());
assertEquals(request.flush(), deserializedRequest.flush());
assertEquals(request.primaryOnly(), deserializedRequest.primaryOnly());
assertEquals(request.forceMergeUUID(), deserializedRequest.forceMergeUUID());
}
}

public void testBwcSerialization() throws Exception {
{
final ForceMergeRequest sample = randomRequest();
final Version compatibleVersion = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT);
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.setVersion(compatibleVersion);
sample.writeTo(out);

final ForceMergeRequest deserializedRequest;
try (StreamInput in = out.bytes().streamInput()) {
in.setVersion(Version.CURRENT);
deserializedRequest = new ForceMergeRequest(in);
}

assertEquals(sample.maxNumSegments(), deserializedRequest.maxNumSegments());
assertEquals(sample.onlyExpungeDeletes(), deserializedRequest.onlyExpungeDeletes());
assertEquals(sample.flush(), deserializedRequest.flush());
if (compatibleVersion.onOrAfter(Version.V_3_0_0)) {
assertEquals(sample.primaryOnly(), deserializedRequest.primaryOnly());
assertEquals(sample.forceMergeUUID(), deserializedRequest.forceMergeUUID());
}
}
}

{
final ForceMergeRequest sample = randomRequest();
final Version compatibleVersion = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT);
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.setVersion(Version.CURRENT);
sample.getParentTask().writeTo(out);
out.writeStringArray(sample.indices());
sample.indicesOptions().writeIndicesOptions(out);
out.writeInt(sample.maxNumSegments());
out.writeBoolean(sample.onlyExpungeDeletes());
out.writeBoolean(sample.flush());
if (compatibleVersion.onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(sample.primaryOnly());
}
if (compatibleVersion.onOrAfter(Version.V_3_0_0)) {
out.writeString(sample.forceMergeUUID());
} else {
out.writeOptionalString(sample.forceMergeUUID());
}

final ForceMergeRequest deserializedRequest;
try (StreamInput in = out.bytes().streamInput()) {
in.setVersion(compatibleVersion);
deserializedRequest = new ForceMergeRequest(in);
}

assertEquals(sample.maxNumSegments(), deserializedRequest.maxNumSegments());
assertEquals(sample.onlyExpungeDeletes(), deserializedRequest.onlyExpungeDeletes());
assertEquals(sample.flush(), deserializedRequest.flush());
if (compatibleVersion.onOrAfter(Version.V_3_0_0)) {
assertEquals(sample.primaryOnly(), deserializedRequest.primaryOnly());
}
assertEquals(sample.forceMergeUUID(), deserializedRequest.forceMergeUUID());

}
}
}

private ForceMergeRequest randomRequest() {
ForceMergeRequest request = new ForceMergeRequest();
if (randomBoolean()) {
request.maxNumSegments(randomIntBetween(1, 10));
}
request.onlyExpungeDeletes(true);
request.flush(randomBoolean());
request.primaryOnly(randomBoolean());
return request;
}
}