Skip to content

Commit

Permalink
Changed logic to do flush instead of refresh
Browse files Browse the repository at this point in the history
Signed-off-by: Shubh Sahu <[email protected]>
  • Loading branch information
Shubh Sahu committed Apr 15, 2024
1 parent d1ffa62 commit d0c2dd2
Show file tree
Hide file tree
Showing 11 changed files with 55 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -790,32 +790,34 @@ public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionExcep
);
}

public void testRefreshOnTooManyRemoteTranslogFiles() throws Exception {

public void testFlushOnTooManyRemoteTranslogFiles() throws Exception {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNodes(1).get(0);
String datanode = internalCluster().startDataOnlyNodes(1).get(0);
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1));
ensureGreen(INDEX_NAME);

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "5")
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "100")
);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

// indexing 35 documents (7 bulk requests), which should trigger refresh, and hence number of documents(searchable) should be 35.
// Here refresh will be triggered on 6th and 7th bulk request. One extra since translogs will be marked
// unreferenced after 6th refresh completes and will be trimmed on 7th bulk request call.
for (int i = 0; i < 7; i++) {
indexBulk(INDEX_NAME, 5);
IndexShard indexShard = getIndexShard(datanode, INDEX_NAME);

assertFalse(indexShard.shouldPeriodicallyFlush());
assertEquals(0, indexShard.getNumberofTranslogReaders());

// indexing 100 documents (100 bulk requests), no flush will be triggered yet
for (int i = 0; i < 100; i++) {
indexBulk(INDEX_NAME, 1);
}

assertBusy(() -> assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), 35), 30, TimeUnit.SECONDS);
assertEquals(100, indexShard.getNumberofTranslogReaders());

// refresh will not trigger here, hence total searchable documents will be 35 (not 40)
indexBulk(INDEX_NAME, 5);
// Will flush and trim the translog readers
indexBulk(INDEX_NAME, 1);

long currentDocCount = client().prepareSearch(INDEX_NAME).setSize(0).get().getHits().getTotalHits().value;
assertEquals(35, currentDocCount);
assertBusy(() -> assertEquals(0, indexShard.getNumberofTranslogReaders()), 30, TimeUnit.SECONDS);
assertFalse(indexShard.shouldPeriodicallyFlush());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1875,7 +1875,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
logger.trace("finished commit for flush");

// a temporary debugging to investigate test failure - issue#32827. Remove when the issue is resolved
logger.debug(
logger.info(
"new commit on flush, hasUncommittedChanges:{}, force:{}, shouldPeriodicallyFlush:{}",
hasUncommittedChanges,
force,
Expand Down
35 changes: 6 additions & 29 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2911,7 +2911,7 @@ public void restoreFromRepository(Repository repository, ActionListener<Boolean>
*
* @return {@code true} if the engine should be flushed
*/
boolean shouldPeriodicallyFlush() {
public boolean shouldPeriodicallyFlush() {
final Engine engine = getEngineOrNull();
if (engine != null) {
try {
Expand Down Expand Up @@ -4484,26 +4484,17 @@ public Durability getTranslogDurability() {
// we can not protect with a lock since we "release" on a different thread
private final AtomicBoolean flushOrRollRunning = new AtomicBoolean();

/**
* Checks if the shard needs to be refreshed depending on translog constraints.
* Each translog type can have its own decider.
* @return {@code true} if the shard should be refreshed
*/
public boolean shouldRefreshShard() {
// For testing purpose
public int getNumberofTranslogReaders() {
final Engine engine = getEngineOrNull();
if (engine != null) {
try {
return engine.translogManager().shouldRefreshShard();
return engine.translogManager().getNumberofTranslogReaders();
} catch (final AlreadyClosedException e) {
// we are already closed, no need to Refresh
// we are already closed
}
}
return false;
}

private void maybeRefreshShard(String source) {
verifyNotClosed();
getEngine().maybeRefresh(source);
return -1;
}

/**
Expand Down Expand Up @@ -4571,20 +4562,6 @@ public void onAfter() {
flushOrRollRunning.compareAndSet(true, false);
}
}
} else if (shouldRefreshShard()) {
logger.debug("submitting async Refresh request");
final AbstractRunnable refreshRunnable = new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn("refresh failed after translog manager decided to refresh the shard", e);
}

@Override
protected void doRun() throws Exception {
maybeRefreshShard("Translog manager decided to refresh the shard");
}
};
threadPool.executor(ThreadPool.Names.REFRESH).execute(refreshRunnable);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,13 @@ public String getTranslogUUID() {
* @return if the translog should be flushed
*/
public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long flushThreshold) {
/*
* This triggers a flush if the number of translog files has breached a threshold.
* Each translog type can have its own decider.
*/
if (translog.shouldFlushOnMaxTranslogFiles()) {
return true;
}
// This is the minimum seqNo that is referred in translog and considered for calculating translog size
long minTranslogRefSeqNo = translog.getMinUnreferencedSeqNoInSegments(localCheckpointOfLastCommit + 1);
final long minReferencedTranslogGeneration = translog.getMinGenerationForSeqNo(minTranslogRefSeqNo).translogFileGeneration;
Expand Down Expand Up @@ -470,8 +477,11 @@ public void close() throws IOException {
IOUtils.closeWhileHandlingException(translog);
}

@Override
public boolean shouldRefreshShard() {
return getTranslog(true).shouldRefreshShard();
/**
* Retrieves the number of translog readers
* @return number of translog readers
*/
public int getNumberofTranslogReaders() {
return translog.getNumberofTranslogReaders();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public Translog.TranslogGeneration getTranslogGeneration() {
}

@Override
public boolean shouldRefreshShard() {
return false;
public int getNumberofTranslogReaders() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -654,13 +654,13 @@ int availablePermits() {
}

/**
* Checks whether or not the shard should be refreshed.
* Checks whether or not the shard should be flushed based on translog files.
* This checks if the number of translog files breaches the threshold count determined by
* {@code cluster.remote_store.translog.max_readers} setting
* @return {@code true} if the shard should be refreshed
* @return {@code true} if the shard should be flushed
*/
@Override
public boolean shouldRefreshShard() {
public boolean shouldFlushOnMaxTranslogFiles() {
return readers.size() >= translogTransferManager.getMaxRemoteTranslogReadersSettings();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2054,11 +2054,15 @@ public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommi
}

/**
* Checks whether or not the shard should be refreshed.
* Checks whether or not the shard should be flushed based on translog files.
* Each translog type can have its own decider.
* @return {@code true} if the shard should be flushed
*/
public boolean shouldRefreshShard() {
public boolean shouldFlushOnMaxTranslogFiles() {
return false;
}

public int getNumberofTranslogReaders() {
return readers.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,5 +143,5 @@ public interface TranslogManager {

Translog.TranslogGeneration getTranslogGeneration();

boolean shouldRefreshShard();
int getNumberofTranslogReaders();
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ public class RemoteStoreSettings {
*/
public static final Setting<Integer> CLUSTER_REMOTE_MAX_TRANSLOG_READERS = Setting.intSetting(
"cluster.remote_store.translog.max_readers",
300,
1,
1000,
100,
Property.Dynamic,
Property.NodeScope
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.index.translog.listener.TranslogEventListener;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
Expand Down Expand Up @@ -4958,39 +4957,4 @@ private static void assertRemoteSegmentStats(
assertTrue(remoteSegmentStats.getTotalRejections() > 0);
assertEquals(remoteSegmentTransferTracker.getRejectionCount(), remoteSegmentStats.getTotalRejections());
}

public void testShouldRefreshOnTooManyRemoteTranslogFiles() throws Exception {

Settings primarySettings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, "seg-test")
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "txlog-test")
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1)
.build();

final IndexShard primaryShard = newStartedShard(true, primarySettings, new NRTReplicationEngineFactory());
RemoteStoreSettings remoteStoreSettings = primaryShard.getRemoteStoreSettings();
final long numDocs = remoteStoreSettings.getMaxRemoteTranslogReaders();

assertFalse(primaryShard.shouldRefreshShard());

for (long i = 0; i < numDocs; i++) {
indexDoc(primaryShard, "_doc", Long.toString(i), "{}");
}

assertTrue(primaryShard.shouldRefreshShard());
assertBusy(() -> {
primaryShard.afterWriteOperation();
try (Engine.Searcher searcher = primaryShard.acquireSearcher("test")) {
assertEquals(numDocs, searcher.getIndexReader().numDocs());
}
});

closeShards(primaryShard);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -99,21 +99,21 @@ public void testClusterRemoteTranslogTransferTimeout() {

public void testMaxRemoteReferencedTranslogFiles() {
// Test default value
assertEquals(300, remoteStoreSettings.getMaxRemoteTranslogReaders());
assertEquals(1000, remoteStoreSettings.getMaxRemoteTranslogReaders());

// Test override with valid value
clusterSettings.applySettings(
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "100").build()
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "500").build()
);
assertEquals(100, remoteStoreSettings.getMaxRemoteTranslogReaders());
assertEquals(500, remoteStoreSettings.getMaxRemoteTranslogReaders());

// Test override with value less than minimum
assertThrows(
IllegalArgumentException.class,
() -> clusterSettings.applySettings(
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "0").build()
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "99").build()
)
);
assertEquals(100, remoteStoreSettings.getMaxRemoteTranslogReaders());
assertEquals(500, remoteStoreSettings.getMaxRemoteTranslogReaders());
}
}

0 comments on commit d0c2dd2

Please sign in to comment.