Skip to content

Commit

Permalink
Fix spotless checks.
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed May 9, 2022
1 parent e77b616 commit 22e7dde
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,8 @@ private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecovery
flush(false, true);
translog.trimUnreferencedReaders();
}
// Package private for testing purposes only

// Package private for testing purposes only
boolean hasSnapshottedCommits() {
return combinedDeletionPolicy.hasSnapshottedCommits();
}
Expand Down Expand Up @@ -543,7 +544,6 @@ public long getWritingBytes() {
return indexWriter.getFlushingBytes() + versionMap.getRefreshingBytes();
}


private ExternalReaderManager createReaderManager(RefreshWarmerListener externalRefreshListener) throws EngineException {
boolean success = false;
OpenSearchReaderManager internalReaderManager = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,25 +50,26 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
try {
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
readerManager = new NRTReplicationReaderManager(OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId));
final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(this.lastCommittedSegmentInfos.getUserData().entrySet());
final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
this.lastCommittedSegmentInfos.getUserData().entrySet()
);
this.localCheckpointTracker = new LocalCheckpointTracker(commitInfo.maxSeqNo, commitInfo.localCheckpoint);
this.completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats"));
this.readerManager.addListener(completionStatsCache);
} catch (IOException e) {
IOUtils.closeWhileHandlingException(store::decRef, translog);
IOUtils.closeWhileHandlingException(store::decRef, translog);
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
}
}

public synchronized void updateSegments(final SegmentInfos infos, long seqNo)
throws IOException {
public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException {
try {
store.incRef();
// Update the current infos reference on the Engine's reader.
readerManager.updateSegments(infos);

// only update the persistedSeqNo and "lastCommitted" infos reference if the incoming segments have a higher
// generation. We can still refresh with incoming SegmentInfos that are not part of a commit point.
// generation. We can still refresh with incoming SegmentInfos that are not part of a commit point.
if (infos.getGeneration() > lastCommittedSegmentInfos.getGeneration()) {
this.lastCommittedSegmentInfos = infos;
localCheckpointTracker.fastForwardPersistedSeqNo(seqNo);
Expand Down Expand Up @@ -106,12 +107,7 @@ public boolean isThrottled() {

@Override
public IndexResult index(Index index) throws IOException {
IndexResult indexResult = new IndexResult(
index.version(),
index.primaryTerm(),
index.seqNo(),
false
);
IndexResult indexResult = new IndexResult(index.version(), index.primaryTerm(), index.seqNo(), false);
addIndexOperationToTranslog(index, indexResult);
indexResult.setTook(System.nanoTime() - index.startTime());
indexResult.freeze();
Expand All @@ -121,12 +117,7 @@ public IndexResult index(Index index) throws IOException {

@Override
public DeleteResult delete(Delete delete) throws IOException {
DeleteResult deleteResult = new DeleteResult(
delete.version(),
delete.primaryTerm(),
delete.seqNo(),
true
);
DeleteResult deleteResult = new DeleteResult(delete.version(), delete.primaryTerm(), delete.seqNo(), true);
addDeleteOperationToTranslog(delete, deleteResult);
deleteResult.setTook(System.nanoTime() - delete.startTime());
deleteResult.freeze();
Expand Down Expand Up @@ -161,7 +152,13 @@ public Closeable acquireHistoryRetentionLock() {
}

@Override
public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange, boolean accurateCount) throws IOException {
public Translog.Snapshot newChangesSnapshot(
String source,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange,
boolean accurateCount
) throws IOException {
throw new UnsupportedOperationException("Not implemented");
}

Expand All @@ -170,7 +167,6 @@ public int countNumberOfHistoryOperations(String source, long fromSeqNo, long to
return 0;
}


@Override
public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) {
return false;
Expand Down Expand Up @@ -229,9 +225,15 @@ public boolean shouldPeriodicallyFlush() {
@Override
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {}


@Override
public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments, String forceMergeUUID) throws EngineException, IOException {}
public void forceMerge(
boolean flush,
int maxNumSegments,
boolean onlyExpungeDeletes,
boolean upgrade,
boolean upgradeOnlyAncientSegments,
String forceMergeUUID
) throws EngineException, IOException {}

@Override
public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) throws EngineException {
Expand Down Expand Up @@ -323,9 +325,6 @@ protected SegmentInfos getLatestSegmentInfos() {

private DirectoryReader getDirectoryReader() throws IOException {
// for segment replication: replicas should create the reader from store, we don't want an open IW on replicas.
return new SoftDeletesDirectoryReaderWrapper(
DirectoryReader.open(store.directory()),
Lucene.SOFT_DELETES_FIELD
);
return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(store.directory()), Lucene.SOFT_DELETES_FIELD);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.apache.lucene.index.StandardDirectoryReader;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.index.shard.ShardId;

import java.io.IOException;
import java.util.ArrayList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,14 @@ protected TranslogAwareEngine(EngineConfig engineConfig) {
customTranslogDeletionPolicy = engineConfig.getCustomTranslogDeletionPolicyFactory()
.create(engineConfig.getIndexSettings(), engineConfig.retentionLeasesSupplier());
}
translogDeletionPolicy = Objects.requireNonNullElseGet(customTranslogDeletionPolicy, () -> new DefaultTranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()
));
translogDeletionPolicy = Objects.requireNonNullElseGet(
customTranslogDeletionPolicy,
() -> new DefaultTranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()
)
);
try {
store.trimUnsafeCommits(engineConfig.getTranslogConfig().getTranslogPath());
translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier(), seqNo -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@

import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.tests.store.BaseDirectoryWrapper;
import org.hamcrest.MatcherAssert;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
Expand All @@ -29,8 +27,10 @@ public class NRTReplicationEngineTest extends EngineTestCase {

public void testCreateEngine() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (final Store nrtEngineStore = createStore();
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore);) {
try (
final Store nrtEngineStore = createStore();
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore);
) {
final SegmentInfos latestSegmentInfos = nrtEngine.getLatestSegmentInfos();
final SegmentInfos lastCommittedSegmentInfos = nrtEngine.getLastCommittedSegmentInfos();
assertEquals(latestSegmentInfos.version, lastCommittedSegmentInfos.version);
Expand All @@ -44,9 +44,16 @@ public void testCreateEngine() throws IOException {
public void testEngineWritesOpsToTranslog() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

try (final Store nrtEngineStore = createStore();
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore);) {
List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean());
try (
final Store nrtEngineStore = createStore();
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore);
) {
List<Engine.Operation> operations = generateHistoryOnReplica(
between(1, 500),
randomBoolean(),
randomBoolean(),
randomBoolean()
);
for (Engine.Operation op : operations) {
applyOperation(engine, op);
applyOperation(nrtEngine, op);
Expand All @@ -70,10 +77,15 @@ public void testEngineWritesOpsToTranslog() throws Exception {
public void testUpdateSegments() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

try (final Store nrtEngineStore = createStore();
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore);) {
try (
final Store nrtEngineStore = createStore();
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore);
) {
// add docs to the primary engine.
List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean()).stream().filter(op -> op.operationType().equals(Engine.Operation.TYPE.INDEX)).collect(Collectors.toList());
List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean())
.stream()
.filter(op -> op.operationType().equals(Engine.Operation.TYPE.INDEX))
.collect(Collectors.toList());
for (Engine.Operation op : operations) {
applyOperation(engine, op);
applyOperation(nrtEngine, op);
Expand Down Expand Up @@ -116,7 +128,15 @@ private void assertSearcherHits(Engine engine, int hits) {
private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store) throws IOException {
Lucene.cleanLuceneIndex(store.directory());
final Path translogDir = createTempDir();
final EngineConfig replicaConfig = config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get);
final EngineConfig replicaConfig = config(
defaultSettings,
store,
translogDir,
NoMergePolicy.INSTANCE,
null,
null,
globalCheckpoint::get
);
if (Lucene.indexExists(store.directory()) == false) {
store.createEmpty(replicaConfig.getIndexSettings().getIndexVersionCreated().luceneVersion);
final String translogUuid = Translog.createEmptyTranslog(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4399,11 +4399,9 @@ public void testReadOnlyReplicaEngineConfig() throws IOException {
assertTrue(replicaShard.getEngine().config().isReadOnlyReplica());
assertEquals(replicaShard.getEngine().getClass(), NRTReplicationEngine.class);


closeShards(primaryShard, replicaShard);
}


public void testCloseShardWhileEngineIsWarming() throws Exception {
CountDownLatch warmerStarted = new CountDownLatch(1);
CountDownLatch warmerBlocking = new CountDownLatch(1);
Expand Down

0 comments on commit 22e7dde

Please sign in to comment.