Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs #3948

Merged
merged 3 commits into from
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Plugin ZIP publication groupId value is configurable ([#4156](https://github.com/opensearch-project/OpenSearch/pull/4156))
- Add index specific setting for remote repository ([#4253](https://github.com/opensearch-project/OpenSearch/pull/4253))
- [Segment Replication] Update replicas to commit SegmentInfos instead of relying on SIS files from primary shards. ([#4402](https://github.com/opensearch-project/OpenSearch/pull/4402))
- [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948))

### Deprecated

Expand Down
11 changes: 11 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2357,6 +2357,17 @@ public Translog.Snapshot getHistoryOperations(String reason, long startingSeqNo,
return getEngine().newChangesSnapshot(reason, startingSeqNo, endSeqNo, true, accurateCount);
}

/**
* Creates a new history snapshot from the translog instead of the lucene index. Required for cross cluster replication.
* Use the recommended {@link #getHistoryOperations(String, long, long, boolean)} method for other cases.
* This method should only be invoked if Segment Replication or Remote Store is not enabled.
*/
public Translog.Snapshot getHistoryOperationsFromTranslog(long startingSeqNo, long endSeqNo) throws IOException {
assert (indexSettings.isSegRepEnabled() || indexSettings.isRemoteStoreEnabled()) == false
: "unsupported operation for segment replication enabled indices or remote store backed indices";
return getEngine().translogManager().newChangesSnapshot(startingSeqNo, endSeqNo, true);
}

/**
* Checks if we have a completed history of operations since the given starting seqno (inclusive).
* This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ public void rollTranslogGeneration() throws TranslogException {
}
}

@Override
public Translog.Snapshot newChangesSnapshot(long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
return translog.newSnapshot(fromSeqNo, toSeqNo, requiredFullRange);
}

/**
* Performs recovery from the transaction log up to {@code recoverUpToSeqNo} (inclusive).
* This operation will close the engine if the recovery fails.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,9 @@ public Translog.Operation readOperation(Translog.Location location) throws IOExc
public Translog.Location add(Translog.Operation operation) throws IOException {
return new Translog.Location(0, 0, 0);
}

@Override
public Translog.Snapshot newChangesSnapshot(long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
throw new UnsupportedOperationException("Translog snapshot unsupported with no-op translogs");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public interface TranslogManager {
*/
int recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long localCheckpoint, long recoverUpToSeqNo) throws IOException;

/**
* Creates a new history snapshot from the translog file instead of the lucene index.
ankitkala marked this conversation as resolved.
Show resolved Hide resolved
*/
Translog.Snapshot newChangesSnapshot(long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException;

/**
* Checks if the underlying storage sync is required.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,9 @@ public int recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, lo
public void skipTranslogRecovery() {
// Do nothing.
}

@Override
public Translog.Snapshot newChangesSnapshot(long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
throw new UnsupportedOperationException("Translog snapshot unsupported with no-op translogs");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1196,6 +1196,66 @@ public void testAcquireReplicaPermitAdvanceMaxSeqNoOfUpdates() throws Exception
closeShards(replica);
}

public void testGetChangesSnapshotThrowsAssertForSegRep() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 0);
final ShardRouting shardRouting = TestShardRouting.newShardRouting(
shardId,
randomAlphaOfLength(8),
true,
ShardRoutingState.INITIALIZING,
RecoverySource.EmptyStoreRecoverySource.INSTANCE
);
final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT.toString())
.build();
final IndexMetadata.Builder indexMetadata = IndexMetadata.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1);
final AtomicBoolean synced = new AtomicBoolean();
final IndexShard primaryShard = newShard(
shardRouting,
indexMetadata.build(),
null,
new InternalEngineFactory(),
() -> synced.set(true),
RetentionLeaseSyncer.EMPTY,
null
);
expectThrows(AssertionError.class, () -> primaryShard.getHistoryOperationsFromTranslog(0, 1));
closeShard(primaryShard, false);
}

public void testGetChangesSnapshotThrowsAssertForRemoteStore() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 0);
final ShardRouting shardRouting = TestShardRouting.newShardRouting(
shardId,
randomAlphaOfLength(8),
true,
ShardRoutingState.INITIALIZING,
RecoverySource.EmptyStoreRecoverySource.INSTANCE
);
final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.build();
final IndexMetadata.Builder indexMetadata = IndexMetadata.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1);
final AtomicBoolean synced = new AtomicBoolean();
final IndexShard primaryShard = newShard(
shardRouting,
indexMetadata.build(),
null,
new InternalEngineFactory(),
() -> synced.set(true),
RetentionLeaseSyncer.EMPTY,
null
);
expectThrows(AssertionError.class, () -> primaryShard.getHistoryOperationsFromTranslog(0, 1));
closeShard(primaryShard, false);
}

public void testGlobalCheckpointSync() throws IOException {
// create the primary shard with a callback that sets a boolean when the global checkpoint sync is invoked
final ShardId shardId = new ShardId("index", "_na_", 0);
Expand Down