Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Add capability of doing refresh as determined by the translog #12992

Merged
merged 18 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add cluster primary balance contraint for rebalancing with buffer ([#12656](https://github.com/opensearch-project/OpenSearch/pull/12656))
- [Remote Store] Make translog transfer timeout configurable ([#12704](https://github.com/opensearch-project/OpenSearch/pull/12704))
- Reject Resize index requests (i.e, split, shrink and clone), While DocRep to SegRep migration is in progress.([#12686](https://github.com/opensearch-project/OpenSearch/pull/12686))
- [Remote Store] Add capability of doing refresh as determined by the translog ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992))
astute-decipher marked this conversation as resolved.
Show resolved Hide resolved
- Add support for more than one protocol for transport ([#12967](https://github.com/opensearch-project/OpenSearch/pull/12967))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.refresh;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.common.settings.Settings;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RefreshRemoteTranslogFilesIT extends RemoteStoreBaseIntegTestCase {
astute-decipher marked this conversation as resolved.
Show resolved Hide resolved

protected final String INDEX_NAME = "remote-store-test-idx-1";

public void testRefreshOnTooManyRemoteTranslogFiles() throws Exception {

internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNodes(1).get(0);
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1));
ensureGreen(INDEX_NAME);

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "5")
);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

// indexing 35 documents (7 bulk requests), which should trigger refresh, and hence number of documents(searchable) should be 35.
// Here refresh will be triggered on 6th and 7th bulk request. One extra since translogs will be marked
// unreferenced after 6th refresh completes and will be trimmed on 7th bulk request call.
for (int i = 0; i < 7; i++) {
indexBulk(INDEX_NAME, 5);
}

// refresh will not trigger here, hence total searchable documents will be 35 (not 40)
indexBulk(INDEX_NAME, 5);

long currentDocCount = client().prepareSearch(INDEX_NAME).setSize(0).get().getHits().getTotalHits().value;
assertEquals(35, currentDocCount);
astute-decipher marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,8 @@ public void apply(Settings value, Settings current, Settings previous) {

RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS
)
)
);
Expand Down
37 changes: 37 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -4484,9 +4484,32 @@
// we can not protect with a lock since we "release" on a different thread
private final AtomicBoolean flushOrRollRunning = new AtomicBoolean();

/**
* Checks if the shard need to be refreshed depending on translog constraints.
* each translog type can have it's own decider
* @return {@code true} if the shard should be refreshed
*/
public boolean shouldRefreshShard() {
final Engine engine = getEngineOrNull();
if (engine != null) {
try {
return engine.translogManager().shouldRefreshShard();
} catch (final AlreadyClosedException e) {

Check warning on line 4497 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L4497

Added line #L4497 was not covered by tests
astute-decipher marked this conversation as resolved.
Show resolved Hide resolved
// we are already closed, no need to Refresh
}
}
return false;

Check warning on line 4501 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L4501

Added line #L4501 was not covered by tests
}

private void maybeRefreshShard(String source) {
verifyNotClosed();
getEngine().maybeRefresh(source);
}

/**
* Schedules a flush or translog generation roll if needed but will not schedule more than one concurrently. The operation will be
* executed asynchronously on the flush thread pool.
* Also schedules a refresh if required, decided by translog manager
*/
public void afterWriteOperation() {
if (shouldPeriodicallyFlush() || shouldRollTranslogGeneration()) {
Expand Down Expand Up @@ -4548,6 +4571,20 @@
flushOrRollRunning.compareAndSet(true, false);
}
}
} else if (shouldRefreshShard()) {
logger.debug("submitting async Refresh request");
final AbstractRunnable refreshRunnable = new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn("refresh failed after translog manager decided to refresh the shard", e);
}

Check warning on line 4580 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L4579-L4580

Added lines #L4579 - L4580 were not covered by tests

@Override
protected void doRun() throws Exception {
maybeRefreshShard("Translog manager decided to refresh the shard");
}
};
threadPool.executor(ThreadPool.Names.REFRESH).execute(refreshRunnable);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,4 +469,9 @@ public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long fl
public void close() throws IOException {
IOUtils.closeWhileHandlingException(translog);
}

@Override
public boolean shouldRefreshShard() {
return getTranslog(true).shouldRefreshShard();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,9 @@ public Releasable drainSync() {
public Translog.TranslogGeneration getTranslogGeneration() {
return null;
}

@Override
public boolean shouldRefreshShard() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -652,4 +652,15 @@ public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommi
int availablePermits() {
return syncPermit.availablePermits();
}

/**
* Checks whether or not the shard should be refreshed.
* This checks if number of translog files breaches the threshold count determined by
* {@code cluster.remote_store.translog.max_readers} setting
* @return {@code true} if the shard should be refreshed
*/
@Override
public boolean shouldRefreshShard() {
return readers.size() >= translogTransferManager.getMaxRemoteTranslogReadersSettings();
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2052,4 +2052,13 @@ public static String createEmptyTranslog(
public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) {
return minUnrefCheckpointInLastCommit;
}

/**
* Checks whether or not the shard should be refreshed.
* each translog type can have it's own decider
* @return {@code true} if the shard should be refreshed
*/
public boolean shouldRefreshShard() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,6 @@ public interface TranslogManager {
Releasable drainSync();

Translog.TranslogGeneration getTranslogGeneration();

boolean shouldRefreshShard();
}
Original file line number Diff line number Diff line change
Expand Up @@ -585,4 +585,8 @@ public void onFailure(Exception e) {
throw e;
}
}

public int getMaxRemoteTranslogReadersSettings() {
return this.remoteStoreSettings.getMaxRemoteTranslogReaders();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,21 @@ public class RemoteStoreSettings {
Property.Dynamic
);

/**
* Controls the maximum referenced remote translog files. If breached the shard will be Refreshed.
astute-decipher marked this conversation as resolved.
Show resolved Hide resolved
*/
public static final Setting<Integer> CLUSTER_REMOTE_MAX_TRANSLOG_READERS = Setting.intSetting(
"cluster.remote_store.translog.max_readers",
300,
1,
astute-decipher marked this conversation as resolved.
Show resolved Hide resolved
Property.Dynamic,
Property.NodeScope
);

private volatile TimeValue clusterRemoteTranslogBufferInterval;
private volatile int minRemoteSegmentMetadataFiles;
private volatile TimeValue clusterRemoteTranslogTransferTimeout;
private volatile int maxRemoteTranslogReaders;

public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
this.clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings);
Expand All @@ -87,6 +99,9 @@ public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING,
this::setClusterRemoteTranslogTransferTimeout
);

maxRemoteTranslogReaders = CLUSTER_REMOTE_MAX_TRANSLOG_READERS.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_MAX_TRANSLOG_READERS, this::setMaxRemoteTranslogReaders);
}

public TimeValue getClusterRemoteTranslogBufferInterval() {
Expand All @@ -112,4 +127,12 @@ public TimeValue getClusterRemoteTranslogTransferTimeout() {
private void setClusterRemoteTranslogTransferTimeout(TimeValue clusterRemoteTranslogTransferTimeout) {
this.clusterRemoteTranslogTransferTimeout = clusterRemoteTranslogTransferTimeout;
}

public int getMaxRemoteTranslogReaders() {
return maxRemoteTranslogReaders;
}

private void setMaxRemoteTranslogReaders(int maxRemoteTranslogReaders) {
this.maxRemoteTranslogReaders = maxRemoteTranslogReaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.index.translog.listener.TranslogEventListener;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
Expand Down Expand Up @@ -4957,4 +4958,39 @@ private static void assertRemoteSegmentStats(
assertTrue(remoteSegmentStats.getTotalRejections() > 0);
assertEquals(remoteSegmentTransferTracker.getRejectionCount(), remoteSegmentStats.getTotalRejections());
}

public void testShouldRefreshOnTooManyRemoteTranslogFiles() throws Exception {
gbbafna marked this conversation as resolved.
Show resolved Hide resolved

Settings primarySettings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, "seg-test")
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "txlog-test")
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1)
.build();

final IndexShard primaryShard = newStartedShard(true, primarySettings, new NRTReplicationEngineFactory());
RemoteStoreSettings remoteStoreSettings = primaryShard.getRemoteStoreSettings();
final long numDocs = remoteStoreSettings.getMaxRemoteTranslogReaders();

assertFalse(primaryShard.shouldRefreshShard());

for (long i = 0; i < numDocs; i++) {
indexDoc(primaryShard, "_doc", Long.toString(i), "{}");
}

assertTrue(primaryShard.shouldRefreshShard());
assertBusy(() -> {
primaryShard.afterWriteOperation();
try (Engine.Searcher searcher = primaryShard.acquireSearcher("test")) {
assertEquals(numDocs, searcher.getIndexReader().numDocs());
}
});

closeShards(primaryShard);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,24 @@ public void testClusterRemoteTranslogTransferTimeout() {
);
assertEquals(TimeValue.timeValueSeconds(40), remoteStoreSettings.getClusterRemoteTranslogTransferTimeout());
}

public void testMaxRemoteReferencedTranslogFiles() {
// Test default value
assertEquals(300, remoteStoreSettings.getMaxRemoteTranslogReaders());

// Test override with valid value
clusterSettings.applySettings(
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "100").build()
);
assertEquals(100, remoteStoreSettings.getMaxRemoteTranslogReaders());

// Test override with value less than minimum
assertThrows(
IllegalArgumentException.class,
() -> clusterSettings.applySettings(
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "0").build()
)
);
assertEquals(100, remoteStoreSettings.getMaxRemoteTranslogReaders());
}
}
Loading