Commit: WIP

Bhumika Saini committed Mar 26, 2024
1 parent 3907ec9 commit 59cd374

Showing 14 changed files with 197 additions and 54 deletions.
@@ -0,0 +1,106 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotemigration;

import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.hamcrest.OpenSearchAssertions;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemotePrimaryLocalRecoveryIT extends MigrationBaseTestCase {
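    // verifies that a shard on a docrep node survives a rolling restart in mixed mode and can
    // then be relocated to a remote store backed node without losing documents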
public void testLocalRecoveryRollingRestart() throws Exception {
String docRepNode = internalCluster().startNode();
Client client = internalCluster().client(docRepNode);

        // create an index with 1 shard and 0 replicas
client().admin().indices().prepareCreate("idx1").setSettings(indexSettings()).setMapping("field", "type=text").get();
ensureGreen("idx1");

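        // keep indexing and deleting docs in the background for the duration of the test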
AtomicInteger numAutoGenDocs = new AtomicInteger();
final AtomicBoolean finished = new AtomicBoolean(false);
Thread indexingThread = getIndexingThread(finished, numAutoGenDocs);
refresh("idx1");

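        // switch the cluster to mixed compatibility mode so remote store backed nodes can join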
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

// add remote node in mixed mode cluster
addRemote = true;
String remoteNode = internalCluster().startNode();
internalCluster().validateClusterFormed();

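        // set the migration direction so that shard copies can move to remote store backed nodes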
updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

        // rolling restart so that shards go through local recovery while in mixed mode
internalCluster().rollingRestart(new InternalTestCluster.RestartCallback());
ensureStableCluster(2);
ensureGreen("idx1");
        assertEquals(2, internalCluster().size());

        // wait for the background indexing thread to add a few more docs after the restart
        final int currentDoc = numAutoGenDocs.get();
        waitUntil(() -> numAutoGenDocs.get() > currentDoc + 5);

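        // move the primary from the docrep node to the remote node and wait for relocation to finish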
logger.info("--> relocating from {} to {} ", docRepNode, remoteNode);
client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("idx1", 0, docRepNode, remoteNode)).execute().actionGet();
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setTimeout(TimeValue.timeValueSeconds(60))
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.execute()
.actionGet();
assertEquals(0, clusterHealthResponse.getRelocatingShards());
assertEquals(remoteNode, primaryNodeName("idx1"));

        // stop the background indexing thread before asserting on the final doc counts
        finished.set(true);
        indexingThread.join();

        OpenSearchAssertions.assertHitCount(client().prepareSearch("idx1").setTrackTotalHits(true).get(), numAutoGenDocs.get());
OpenSearchAssertions.assertHitCount(
client().prepareSearch("idx1").setTrackTotalHits(true).setQuery(QueryBuilders.termQuery("auto", true)).get(),
numAutoGenDocs.get()
);
}

private static Thread getIndexingThread(AtomicBoolean finished, AtomicInteger numAutoGenDocs) {
Thread indexingThread = new Thread(() -> {
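            // each iteration: create and delete a doc with a fixed id, then index one auto-generated doc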
while (finished.get() == false && numAutoGenDocs.get() < 10_000) {
IndexResponse indexResponse = client().prepareIndex("idx1").setId("id").setSource("field", "value").get();
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
DeleteResponse deleteResponse = client().prepareDelete("idx1", "id").get();
assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
client().prepareIndex("idx1").setSource("auto", true).get();
numAutoGenDocs.incrementAndGet();
}
});
indexingThread.start();
return indexingThread;
}
}
@@ -33,6 +33,7 @@
package org.opensearch.common.blobstore;

import org.opensearch.common.Nullable;
+import org.opensearch.common.annotation.PublicApi;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -44,6 +45,7 @@
*
* @opensearch.internal
*/
+@PublicApi(since = "2.14.0")
public class BlobPath implements Iterable<String> {

private static final String SEPARATOR = "/";
@@ -8,6 +8,8 @@

package org.opensearch.index.remote;

+import org.opensearch.common.annotation.PublicApi;

import java.util.Set;

import static org.opensearch.index.remote.RemoteStoreDataEnums.DataType.DATA;
@@ -23,6 +25,7 @@ public class RemoteStoreDataEnums {
/**
* Categories of the data in Remote store.
*/
+    @PublicApi(since = "2.14.0")
public enum DataCategory {
SEGMENTS("segments", Set.of(DataType.values())),
TRANSLOG("translog", Set.of(DATA, METADATA));
@@ -47,6 +50,7 @@ public String getName() {
/**
* Types of data in remote store.
*/
+    @PublicApi(since = "2.14.0")
public enum DataType {
DATA("data"),
METADATA("metadata"),
@@ -8,6 +8,7 @@

package org.opensearch.index.remote;

+import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.index.remote.RemoteStoreDataEnums.DataCategory;
import org.opensearch.index.remote.RemoteStoreDataEnums.DataType;
@@ -20,6 +21,7 @@
*
* @opensearch.internal
*/
+@PublicApi(since = "2.14.0")
public enum RemoteStorePathType {

FIXED {
@@ -2522,7 +2522,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b
syncSegmentsFromRemoteSegmentStore(false);
}
if (shardRouting.primary()) {
-            if (syncFromRemote) {
+            if (syncFromRemote || this.isRemoteSeeded()) {
syncRemoteTranslogAndUpdateGlobalCheckpoint();
} else {
// we will enter this block when we do not want to recover from remote translog.
@@ -34,11 +34,14 @@ public class RemoteBlobStoreInternalTranslogFactory implements TranslogFactory {

private final RemoteTranslogTransferTracker remoteTranslogTransferTracker;

+    private final boolean shouldSeedRemote;

public RemoteBlobStoreInternalTranslogFactory(
Supplier<RepositoriesService> repositoriesServiceSupplier,
ThreadPool threadPool,
String repositoryName,
-        RemoteTranslogTransferTracker remoteTranslogTransferTracker
+        RemoteTranslogTransferTracker remoteTranslogTransferTracker,
+        boolean shouldSeedRemote
) {
Repository repository;
try {
@@ -49,6 +52,7 @@ public RemoteBlobStoreInternalTranslogFactory(
this.repository = repository;
this.threadPool = threadPool;
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
+        this.shouldSeedRemote = shouldSeedRemote;
}

@Override
@@ -74,7 +78,8 @@ public Translog newTranslog(
blobStoreRepository,
threadPool,
startedPrimarySupplier,
-            remoteTranslogTransferTracker
+            remoteTranslogTransferTracker,
+            shouldSeedRemote
);
}

@@ -90,6 +90,8 @@ public class RemoteFsTranslog extends Translog {
private final Semaphore syncPermit = new Semaphore(SYNC_PERMIT);
private final AtomicBoolean pauseSync = new AtomicBoolean(false);

+    private final boolean shouldSeedRemote;

public RemoteFsTranslog(
TranslogConfig config,
String translogUUID,
@@ -100,7 +102,8 @@ public RemoteFsTranslog(
BlobStoreRepository blobStoreRepository,
ThreadPool threadPool,
BooleanSupplier startedPrimarySupplier,
-        RemoteTranslogTransferTracker remoteTranslogTransferTracker
+        RemoteTranslogTransferTracker remoteTranslogTransferTracker,
+        boolean shouldSeedRemote
) throws IOException {
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer);
logger = Loggers.getLogger(getClass(), shardId);
@@ -115,33 +118,38 @@ public RemoteFsTranslog(
remoteTranslogTransferTracker,
indexSettings().getRemoteStorePathType()
);
+        this.shouldSeedRemote = shouldSeedRemote;
try {
-            download(translogTransferManager, location, logger);
-            Checkpoint checkpoint = readCheckpoint(location);
-            logger.info("Downloaded data from remote translog till maxSeqNo = {}", checkpoint.maxSeqNo);
-            this.readers.addAll(recoverFromFiles(checkpoint));
-            if (readers.isEmpty()) {
-                String errorMsg = String.format(Locale.ROOT, "%s at least one reader must be recovered", shardId);
-                logger.error(errorMsg);
-                throw new IllegalStateException(errorMsg);
-            }
-            boolean success = false;
-            current = null;
-            try {
-                current = createWriter(
-                    checkpoint.generation + 1,
-                    getMinFileGeneration(),
-                    checkpoint.globalCheckpoint,
-                    persistedSequenceNumberConsumer
-                );
-                success = true;
-            } finally {
-                // we have to close all the recovered ones otherwise we leak file handles here
-                // for instance if we have a lot of tlog and we can't create the writer we keep
-                // on holding
-                // on to all the uncommitted tlog files if we don't close
-                if (success == false) {
-                    IOUtils.closeWhileHandlingException(readers);
+            if (shouldSeedRemote) {
+                sync();
+            } else {
+                download(translogTransferManager, location, logger);
+                Checkpoint checkpoint = readCheckpoint(location);
+                logger.info("Downloaded data from remote translog till maxSeqNo = {}", checkpoint.maxSeqNo);
+                this.readers.addAll(recoverFromFiles(checkpoint));
+                if (readers.isEmpty()) {
+                    String errorMsg = String.format(Locale.ROOT, "%s at least one reader must be recovered", shardId);
+                    logger.error(errorMsg);
+                    throw new IllegalStateException(errorMsg);
+                }
+                boolean success = false;
+                current = null;
+                try {
+                    current = createWriter(
+                        checkpoint.generation + 1,
+                        getMinFileGeneration(),
+                        checkpoint.globalCheckpoint,
+                        persistedSequenceNumberConsumer
+                    );
+                    success = true;
+                } finally {
+                    // we have to close all the recovered ones otherwise we leak file handles here
+                    // for instance if we have a lot of tlog and we can't create the writer we keep
+                    // on holding
+                    // on to all the uncommitted tlog files if we don't close
+                    if (success == false) {
+                        IOUtils.closeWhileHandlingException(readers);
+                    }
+                }
+            }
} catch (Exception e) {
@@ -386,7 +394,7 @@ private boolean syncToDisk() throws IOException {

@Override
public void sync() throws IOException {
-        if (syncToDisk() || syncNeeded()) {
+        if (syncToDisk() || syncNeeded() || shouldSeedRemote) {
prepareAndUpload(primaryTermSupplier.getAsLong(), null);
}
}
@@ -525,6 +525,7 @@ TranslogWriter createWriter(
tragedy,
persistedSequenceNumberConsumer,
bigArrays,
+                indexSettings.isRemoteTranslogStoreEnabled(),
indexSettings.isRemoteNode()
);
} catch (final IOException e) {
@@ -2043,6 +2044,7 @@ public static String createEmptyTranslog(
throw new UnsupportedOperationException();
},
BigArrays.NON_RECYCLING_INSTANCE,
+            null,
null
);
writer.close();