Skip to content

Commit

Permalink
Use global checkpoint as base for seq based recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Jun 21, 2019
1 parent f879e84 commit f8c0b15
Show file tree
Hide file tree
Showing 25 changed files with 382 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1877,11 +1877,6 @@ public interface Warmer {
*/
public abstract Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException;

/**
* Do not replay translog operations, but make the engine be ready.
*/
public abstract void skipTranslogRecovery();

/**
* Returns <code>true</code> iff this engine is currently recovering from translog.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,12 +447,6 @@ public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecover
return this;
}

@Override
public void skipTranslogRecovery() {
assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
pendingTranslogRecovery.set(false); // we are good - now we can commit
}

private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,10 +444,6 @@ public Engine recoverFromTranslog(final TranslogRecoveryRunner translogRecoveryR
return this;
}

@Override
public void skipTranslogRecovery() {
}

@Override
public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) {
}
Expand Down
21 changes: 8 additions & 13 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1393,8 +1393,10 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operat
/**
* opens the engine on top of the existing lucene engine and translog.
* Operations from the translog will be replayed to bring lucene up to date.
*
* @param recoverUpToSeqNo the upper bound of sequence number to be recovered (inclusive)
**/
public void openEngineAndRecoverFromTranslog() throws IOException {
public void openEngineAndRecoverFromTranslog(long recoverUpToSeqNo) throws IOException {
final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog();
final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> {
translogRecoveryStats.totalOperations(snapshot.totalOperations());
Expand All @@ -1403,16 +1405,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
translogRecoveryStats::incrementRecoveredOperations);
};
innerOpenEngineAndTranslog();
getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
}

/**
* Opens the engine on top of the existing lucene engine and translog.
* The translog is kept but its operations won't be replayed.
*/
public void openEngineAndSkipTranslogRecovery() throws IOException {
innerOpenEngineAndTranslog();
getEngine().skipTranslogRecovery();
getEngine().recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo);
}

private void innerOpenEngineAndTranslog() throws IOException {
Expand Down Expand Up @@ -1788,8 +1781,10 @@ public List<Segment> segments(boolean verbose) {
return getEngine().segments(verbose);
}

public void flushAndCloseEngine() throws IOException {
getEngine().flushAndClose();
public void closeEngine() throws IOException {
synchronized (mutex) {
IOUtils.close(this.currentEngineReference.getAndSet(null));
}
}

public String getHistoryUUID() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
store.associateIndexWithNewTranslog(translogUUID);
writeEmptyRetentionLeasesFile(indexShard);
}
indexShard.openEngineAndRecoverFromTranslog();
indexShard.openEngineAndRecoverFromTranslog(Long.MAX_VALUE);
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
indexShard.finalizeRecovery();
indexShard.postRecovery("post recovery from shard_store");
Expand Down Expand Up @@ -480,7 +480,7 @@ private void restore(final IndexShard indexShard, final Repository repository, f
store.associateIndexWithNewTranslog(translogUUID);
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
writeEmptyRetentionLeasesFile(indexShard);
indexShard.openEngineAndRecoverFromTranslog();
indexShard.openEngineAndRecoverFromTranslog(Long.MAX_VALUE);
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
indexShard.finalizeRecovery();
indexShard.postRecovery("restore done");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public PeerRecoverySourceService(TransportService transportService, IndicesServi
this.transportService = transportService;
this.indicesService = indicesService;
this.recoverySettings = recoverySettings;
transportService.registerRequestHandler(Actions.START_RECOVERY, StartRecoveryRequest::new, ThreadPool.Names.GENERIC,
transportService.registerRequestHandler(Actions.START_RECOVERY, ThreadPool.Names.GENERIC, StartRecoveryRequest::new,
new StartRecoveryTransportRequestHandler());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -119,7 +118,7 @@ public PeerRecoveryTargetService(ThreadPool threadPool, TransportService transpo
transportService.registerRequestHandler(Actions.CLEAN_FILES, ThreadPool.Names.GENERIC,
RecoveryCleanFilesRequest::new, new CleanFilesRequestHandler());
transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, ThreadPool.Names.GENERIC,
RecoveryPrepareForTranslogOperationsRequest::new, new PrepareForTranslogOperationsRequestHandler());
RecoveryPrepareForTranslogRequest::new, new PrepareForTranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.TRANSLOG_OPS, ThreadPool.Names.GENERIC, RecoveryTranslogOperationsRequest::new,
new TranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new
Expand Down Expand Up @@ -345,9 +344,10 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget);
logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());

final long globalCheckpoint = recoveryTarget.readGlobalCheckpointFromTranslog();
final long startingSeqNo;
if (metadataSnapshot.size() > 0) {
startingSeqNo = getStartingSeqNo(logger, recoveryTarget);
startingSeqNo = getStartingSeqNo(logger, recoveryTarget, globalCheckpoint);
} else {
startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
Expand All @@ -370,7 +370,8 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
metadataSnapshot,
recoveryTarget.state().getPrimary(),
recoveryTarget.recoveryId(),
startingSeqNo);
startingSeqNo,
globalCheckpoint);
return request;
}

Expand All @@ -381,11 +382,9 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
* @return the starting sequence number or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if obtaining the starting sequence number
* failed
*/
public static long getStartingSeqNo(final Logger logger, final RecoveryTarget recoveryTarget) {
public static long getStartingSeqNo(final Logger logger, final RecoveryTarget recoveryTarget, final long globalCheckpoint) {
try {
final Store store = recoveryTarget.store();
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), translogUUID);
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint);
final SequenceNumbers.CommitInfo seqNoStats = Store.loadSeqNoInfo(safeCommit);
Expand Down Expand Up @@ -424,14 +423,15 @@ public interface RecoveryListener {
void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure);
}

class PrepareForTranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryPrepareForTranslogOperationsRequest> {
class PrepareForTranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryPrepareForTranslogRequest> {

@Override
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel, Task task) {
public void messageReceived(RecoveryPrepareForTranslogRequest request, TransportChannel channel, Task task) {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
final ActionListener<TransportResponse> listener = new ChannelActionListener<>(channel, Actions.PREPARE_TRANSLOG, request);
final ActionListener<RecoveryPrepareForTranslogOperationsResponse> listener = new ChannelActionListener<>(
channel, Actions.PREPARE_TRANSLOG, request);
recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps(),
ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
request.recoverUpToSeqNo(), ActionListener.map(listener, RecoveryPrepareForTranslogOperationsResponse::new));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.indices.recovery;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.transport.TransportResponse;

import java.io.IOException;
import java.util.Optional;

final class RecoveryPrepareForTranslogOperationsResponse extends TransportResponse {
final Optional<Store.MetadataSnapshot> targetMetadata;

RecoveryPrepareForTranslogOperationsResponse(Optional<Store.MetadataSnapshot> targetMetadata) {
this.targetMetadata = targetMetadata;
}

RecoveryPrepareForTranslogOperationsResponse(final StreamInput in) throws IOException {
super(in);
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
targetMetadata = Optional.ofNullable(in.readOptionalWriteable(Store.MetadataSnapshot::new));
} else {
targetMetadata = Optional.empty();
}
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeOptionalWriteable(targetMetadata.orElse(null));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,43 @@

package org.elasticsearch.indices.recovery;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.TransportRequest;

import java.io.IOException;

class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {
final class RecoveryPrepareForTranslogRequest extends TransportRequest {

private final long recoveryId;
private final ShardId shardId;
private final int totalTranslogOps;
private final boolean fileBasedRecovery;
private final long recoverUpToSeqNo;

RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps, boolean fileBasedRecovery) {
RecoveryPrepareForTranslogRequest(long recoveryId, ShardId shardId, int totalTranslogOps,
boolean fileBasedRecovery, long recoverUpToSeqNo) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.totalTranslogOps = totalTranslogOps;
this.fileBasedRecovery = fileBasedRecovery;
this.recoverUpToSeqNo = recoverUpToSeqNo;
}

RecoveryPrepareForTranslogOperationsRequest(StreamInput in) throws IOException {
RecoveryPrepareForTranslogRequest(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
totalTranslogOps = in.readVInt();
fileBasedRecovery = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
recoverUpToSeqNo = in.readZLong();
} else {
recoverUpToSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
}

public long recoveryId() {
Expand All @@ -67,12 +77,19 @@ public boolean isFileBasedRecovery() {
return fileBasedRecovery;
}

public long recoverUpToSeqNo() {
return recoverUpToSeqNo;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(recoveryId);
shardId.writeTo(out);
out.writeVInt(totalTranslogOps);
out.writeBoolean(fileBasedRecovery);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeZLong(recoverUpToSeqNo);
}
}
}
Loading

0 comments on commit f8c0b15

Please sign in to comment.