Skip to content

Commit

Permalink
Revert "SegRep with Remote: Add hook for publishing checkpoint notifi…
Browse files Browse the repository at this point in the history
…cations after segment upload to remote store (#7394) (#7718)" (#7839)

This reverts commit 92571b7.

Signed-off-by: Ankit Kala <[email protected]>
  • Loading branch information
ankitkala authored May 31, 2023
1 parent 813b088 commit 3ca1c58
Show file tree
Hide file tree
Showing 6 changed files with 4 additions and 34 deletions.
1 change: 0 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Support to clear filecache using clear indices cache API ([#7498](https://github.com/opensearch-project/OpenSearch/pull/7498))
- Create NamedRoute to map extension routes to a shortened name ([#6870](https://github.com/opensearch-project/OpenSearch/pull/6870))
- Added @dbwiddis as on OpenSearch maintainer ([#7665](https://github.com/opensearch-project/OpenSearch/pull/7665))
- SegRep with Remote: Add hook for publishing checkpoint notifications after segment upload to remote store ([#7394](https://github.com/opensearch-project/OpenSearch/pull/7394))
- [Extensions] Add ExtensionAwarePlugin extension point to add custom settings for extensions ([#7526](https://github.com/opensearch-project/OpenSearch/pull/7526))
- Add new cluster setting to set default index replication type ([#7420](https://github.com/opensearch-project/OpenSearch/pull/7420))

Expand Down
8 changes: 0 additions & 8 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -1030,14 +1030,6 @@ public boolean isSegRepEnabled() {
return ReplicationType.SEGMENT.equals(replicationType);
}

public boolean isSegRepLocalEnabled() {
return isSegRepEnabled() && !isSegRepWithRemoteEnabled();
}

public boolean isSegRepWithRemoteEnabled() {
return isSegRepEnabled() && isRemoteStoreEnabled() && FeatureFlags.isEnabled(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL);
}

/**
* Returns if remote store is enabled for this index.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ public void beforeRefresh() throws IOException {

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh
&& shard.state() == IndexShardState.STARTED
&& shard.getReplicationTracker().isPrimaryMode()
&& !shard.indexSettings.isSegRepWithRemoteEnabled()) {
if (didRefresh && shard.state() == IndexShardState.STARTED && shard.getReplicationTracker().isPrimaryMode()) {
publisher.publish(shard, shard.getLatestReplicationCheckpoint());
}
}
Expand Down
10 changes: 2 additions & 8 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -3552,16 +3552,10 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
if (isRemoteStoreEnabled()) {
internalRefreshListener.add(
new RemoteStoreRefreshListener(
this,
// Add the checkpoint publisher if the Segment Replciation via remote store is enabled.
indexSettings.isSegRepWithRemoteEnabled() ? this.checkpointPublisher : SegmentReplicationCheckpointPublisher.EMPTY,
remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId())
)
new RemoteStoreRefreshListener(this, remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId()))
);
}

if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) {
if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
}
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -108,15 +107,9 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres
*/
private final Map<String, Long> latestFileNameSizeOnLocalMap = ConcurrentCollections.newConcurrentMap();

private final SegmentReplicationCheckpointPublisher checkpointPublisher;

private final FileUploader fileUploader;

public RemoteStoreRefreshListener(
IndexShard indexShard,
SegmentReplicationCheckpointPublisher checkpointPublisher,
RemoteRefreshSegmentTracker segmentTracker
) {
public RemoteStoreRefreshListener(IndexShard indexShard, RemoteRefreshSegmentTracker segmentTracker) {
this.indexShard = indexShard;
this.storeDirectory = indexShard.store().directory();
this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
Expand All @@ -132,7 +125,6 @@ public RemoteStoreRefreshListener(
}
this.segmentTracker = segmentTracker;
resetBackOffDelayIterator();
this.checkpointPublisher = checkpointPublisher;
this.fileUploader = new FileUploader(new UploadTracker() {
@Override
public void beforeUpload(String file) {
Expand Down Expand Up @@ -245,7 +237,6 @@ private synchronized void syncSegments(boolean isRetry) {
clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh);
onSuccessfulSegmentsSync(refreshTimeMs, refreshSeqNo);
((InternalEngine) indexShard.getEngine()).translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1);
checkpointPublisher.publish(indexShard, checkpoint);
// At this point since we have uploaded new segments, segment infos and segment metadata file,
// along with marking minSeqNoToKeep, upload has succeeded completely.
shouldRetry = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.opensearch.index.remote.RemoteRefreshSegmentTracker;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.Store;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -71,7 +70,6 @@ public void setup(boolean primary, int numberOfDocs) throws IOException {
remoteRefreshSegmentPressureService.afterIndexShardCreated(indexShard);
remoteStoreRefreshListener = new RemoteStoreRefreshListener(
indexShard,
SegmentReplicationCheckpointPublisher.EMPTY,
remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId())
);
}
Expand Down Expand Up @@ -415,7 +413,6 @@ private Tuple<RemoteStoreRefreshListener, RemoteRefreshSegmentPressureService> m
remoteRefreshSegmentPressureService.afterIndexShardCreated(shard);
RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(
shard,
SegmentReplicationCheckpointPublisher.EMPTY,
remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId())
);
refreshListener.afterRefresh(true);
Expand Down

0 comments on commit 3ca1c58

Please sign in to comment.