Skip local recovery for closed or frozen indices (#44887)
For closed and frozen indices, we should not recover the shard locally up
to the global checkpoint before performing peer recovery, because that
copy might have been offline when the index was closed or frozen.

Relates #43463
Closes #44855
dnhatn committed Jul 30, 2019
1 parent 6f6aaca commit f8bfcb3
Showing 7 changed files with 195 additions and 60 deletions.
@@ -1400,6 +1400,13 @@ public long recoverLocallyUpToGlobalCheckpoint() {
             recoveryState.getTranslog().totalLocal(0);
             return globalCheckpoint + 1;
         }
+        if (indexSettings.getIndexMetaData().getState() == IndexMetaData.State.CLOSE ||
+            IndexMetaData.INDEX_BLOCKS_WRITE_SETTING.get(indexSettings.getSettings())) {
+            logger.trace("skip local recovery as the index was closed or not allowed to write; safe commit {} global checkpoint {}",
+                safeCommit.get(), globalCheckpoint);
+            recoveryState.getTranslog().totalLocal(0);
+            return safeCommit.get().localCheckpoint + 1;
+        }
         try {
             final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> {
                 recoveryState.getTranslog().totalLocal(snapshot.totalOperations());
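To make the guard above concrete, here is a self-contained toy model of the decision (hypothetical types, not the real Elasticsearch classes): a closed or write-blocked copy skips local translog replay and hands peer recovery the sequence number right after its safe commit, while an open copy still recovers locally up to the global checkpoint.

    // Hypothetical, minimal model of the guard added above; for illustration only.
    public class LocalRecoveryDecision {
        enum IndexState { OPEN, CLOSE }

        /** First sequence number that peer recovery has to fetch from the primary. */
        static long startingSeqNo(IndexState state, boolean writeBlocked,
                                  long safeCommitLocalCheckpoint, long globalCheckpoint) {
            if (state == IndexState.CLOSE || writeBlocked) {
                return safeCommitLocalCheckpoint + 1; // no local replay for read-only copies
            }
            return globalCheckpoint + 1; // ops up to the global checkpoint are replayed locally
        }

        public static void main(String[] args) {
            // A copy that was offline while the index was closed/frozen: its safe
            // commit (local checkpoint 42) lags the global checkpoint (57).
            System.out.println(startingSeqNo(IndexState.CLOSE, false, 42, 57)); // 43
            System.out.println(startingSeqNo(IndexState.OPEN, false, 42, 57));  // 58
        }
    }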
@@ -40,7 +40,8 @@ public void testRecoverFromNoOp() throws IOException {
         indexShard.close("test", true);
 
         final ShardRouting shardRouting = indexShard.routingEntry();
-        IndexShard primary = reinitShard(indexShard, initWithSameId(shardRouting, ExistingStoreRecoverySource.INSTANCE), NoOpEngine::new);
+        IndexShard primary = reinitShard(indexShard, initWithSameId(shardRouting, ExistingStoreRecoverySource.INSTANCE),
+            indexShard.indexSettings().getIndexMetaData(), NoOpEngine::new);
         recoverShardFromStore(primary);
         assertEquals(primary.seqNoStats().getMaxSeqNo(), primary.getMaxSeqNoOfUpdatesOrDeletes());
         assertEquals(nbDocs, primary.docStats().getCount());
@@ -4098,7 +4098,7 @@ public void testDoNotTrimCommitsWhenOpenReadOnlyEngine() throws Exception {
         final ShardRouting replicaRouting = shard.routingEntry();
         ShardRouting readonlyShardRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true,
             ShardRoutingState.INITIALIZING, RecoverySource.ExistingStoreRecoverySource.INSTANCE);
-        final IndexShard readonlyShard = reinitShard(shard, readonlyShardRouting,
+        final IndexShard readonlyShard = reinitShard(shard, readonlyShardRouting, shard.indexSettings.getIndexMetaData(),
             engineConfig -> new ReadOnlyEngine(engineConfig, null, null, true, Function.identity()) {
                 @Override
                 protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) {
@@ -26,15 +26,18 @@
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.ShardRoutingHelper;
-import org.elasticsearch.common.CheckedFunction;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.engine.NoOpEngine;
 import org.elasticsearch.index.mapper.SourceToParse;
+import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
@@ -134,23 +137,24 @@ public void testWriteFileChunksConcurrently() throws Exception {
         closeShards(sourceShard, targetShard);
     }
 
-    public void testPrepareIndexForPeerRecovery() throws Exception {
-        CheckedFunction<IndexShard, Long, Exception> populateData = shard -> {
-            List<Long> seqNos = LongStream.range(0, 100).boxed().collect(Collectors.toList());
-            Randomness.shuffle(seqNos);
-            for (long seqNo : seqNos) {
-                shard.applyIndexOperationOnReplica(seqNo, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, new SourceToParse(
-                    shard.shardId().getIndexName(), "_doc", UUIDs.randomBase64UUID(), new BytesArray("{}"), XContentType.JSON));
-                if (randomInt(100) < 5) {
-                    shard.flush(new FlushRequest().waitIfOngoing(true));
-                }
-            }
-            shard.sync();
-            long globalCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, shard.getLocalCheckpoint());
-            shard.updateGlobalCheckpointOnReplica(globalCheckpoint, "test");
-            shard.sync();
-            return globalCheckpoint;
-        };
+    private SeqNoStats populateRandomData(IndexShard shard) throws IOException {
+        List<Long> seqNos = LongStream.range(0, 100).boxed().collect(Collectors.toList());
+        Randomness.shuffle(seqNos);
+        for (long seqNo : seqNos) {
+            shard.applyIndexOperationOnReplica(seqNo, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, new SourceToParse(
+                shard.shardId().getIndexName(), "_doc", UUIDs.randomBase64UUID(), new BytesArray("{}"), XContentType.JSON));
+            if (randomInt(100) < 5) {
+                shard.flush(new FlushRequest().waitIfOngoing(true));
+            }
+        }
+        shard.sync();
+        long globalCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, shard.getLocalCheckpoint());
+        shard.updateGlobalCheckpointOnReplica(globalCheckpoint, "test");
+        shard.sync();
+        return shard.seqNoStats();
+    }
 
+    public void testPrepareIndexForPeerRecovery() throws Exception {
         DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(),
             Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);

@@ -166,7 +170,7 @@
 
         // good copy
         shard = newStartedShard(false);
-        long globalCheckpoint = populateData.apply(shard);
+        long globalCheckpoint = populateRandomData(shard).getGlobalCheckpoint();
         Optional<SequenceNumbers.CommitInfo> safeCommit = shard.store().findSafeIndexCommit(globalCheckpoint);
         assertTrue(safeCommit.isPresent());
         int expectedTotalLocal = 0;
@@ -191,7 +195,7 @@
         // corrupted copy
         shard = newStartedShard(false);
         if (randomBoolean()) {
-            populateData.apply(shard);
+            populateRandomData(shard);
         }
         shard.store().markStoreCorrupted(new IOException("test"));
         replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(),
@@ -206,7 +210,7 @@
 
         // copy with truncated translog
         shard = newStartedShard(false);
-        globalCheckpoint = populateData.apply(shard);
+        globalCheckpoint = populateRandomData(shard).getGlobalCheckpoint();
         replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(),
             RecoverySource.PeerRecoverySource.INSTANCE));
         String translogUUID = Translog.createEmptyTranslog(replica.shardPath().resolveTranslog(), globalCheckpoint,
@@ -226,4 +230,32 @@
         assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
         closeShards(replica);
     }
+
+    public void testClosedIndexSkipsLocalRecovery() throws Exception {
+        DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(),
+            Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
+        IndexShard shard = newStartedShard(false);
+        long globalCheckpoint = populateRandomData(shard).getGlobalCheckpoint();
+        Optional<SequenceNumbers.CommitInfo> safeCommit = shard.store().findSafeIndexCommit(globalCheckpoint);
+        assertTrue(safeCommit.isPresent());
+        final IndexMetaData indexMetaData;
+        if (randomBoolean()) {
+            indexMetaData = IndexMetaData.builder(shard.indexSettings().getIndexMetaData())
+                .settings(shard.indexSettings().getSettings())
+                .state(IndexMetaData.State.CLOSE).build();
+        } else {
+            indexMetaData = IndexMetaData.builder(shard.indexSettings().getIndexMetaData())
+                .settings(Settings.builder().put(shard.indexSettings().getSettings())
+                    .put(IndexMetaData.SETTING_BLOCKS_WRITE, true)).build();
+        }
+        IndexShard replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(),
+            RecoverySource.PeerRecoverySource.INSTANCE), indexMetaData, NoOpEngine::new);
+        replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
+        replica.prepareForIndexRecovery();
+        assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(safeCommit.get().localCheckpoint + 1));
+        assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(0));
+        assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
+        assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
+        closeShards(replica);
+    }
 }
@@ -426,23 +426,24 @@ protected IndexShard reinitShard(IndexShard current, IndexingOperationListener...
      * @param listeners new listerns to use for the newly created shard
      */
     protected IndexShard reinitShard(IndexShard current, ShardRouting routing, IndexingOperationListener... listeners) throws IOException {
-        return reinitShard(current, routing, current.engineFactory, listeners);
+        return reinitShard(current, routing, current.indexSettings.getIndexMetaData(), current.engineFactory, listeners);
     }
 
     /**
      * Takes an existing shard, closes it and starts a new initialing shard at the same location
      *
-     * @param routing the shard routing to use for the newly created shard.
-     * @param listeners new listerns to use for the newly created shard
+     * @param routing       the shard routing to use for the newly created shard.
+     * @param listeners     new listerns to use for the newly created shard
+     * @param indexMetaData the index metadata to use for the newly created shard
      * @param engineFactory the engine factory for the new shard
      */
-    protected IndexShard reinitShard(IndexShard current, ShardRouting routing, EngineFactory engineFactory,
+    protected IndexShard reinitShard(IndexShard current, ShardRouting routing, IndexMetaData indexMetaData, EngineFactory engineFactory,
                                      IndexingOperationListener... listeners) throws IOException {
         closeShards(current);
         return newShard(
             routing,
             current.shardPath(),
-            current.indexSettings().getIndexMetaData(),
+            indexMetaData,
             null,
             null,
             engineFactory,
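As a usage sketch of the new overload (assuming an IndexShardTestCase context, mirroring the new testClosedIndexSkipsLocalRecovery above), a test can now reinitialize a shard under modified index metadata, for example marking the index closed before rebuilding it:

    // Sketch only: rebuild the shard as a closed-index copy backed by a NoOpEngine.
    IndexMetaData closedMetaData = IndexMetaData.builder(shard.indexSettings().getIndexMetaData())
        .settings(shard.indexSettings().getSettings())
        .state(IndexMetaData.State.CLOSE)
        .build();
    IndexShard replica = reinitShard(shard,
        ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.PeerRecoverySource.INSTANCE),
        closedMetaData, NoOpEngine::new);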
@@ -3,41 +3,92 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */
 
 package org.elasticsearch.index.engine;
 
-import org.elasticsearch.cluster.routing.RecoverySource;
-import org.elasticsearch.cluster.routing.ShardRouting;
-import org.elasticsearch.cluster.routing.ShardRoutingHelper;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.index.shard.IndexShardTestCase;
+import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.indices.recovery.RecoveryState;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.protocol.xpack.frozen.FreezeRequest;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.InternalTestCluster;
+import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction;
+import org.elasticsearch.xpack.frozen.FrozenIndices;
 
-import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
-import static org.hamcrest.Matchers.equalTo;
+import static java.util.stream.Collectors.toList;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.not;
 
-public class FrozenIndexRecoveryTests extends IndexShardTestCase {
+public class FrozenIndexRecoveryTests extends ESIntegTestCase {
 
-    /**
-     * Make sure we can recover from a frozen engine
-     */
-    public void testRecoverFromFrozenPrimary() throws IOException {
-        IndexShard indexShard = newStartedShard(true);
-        indexDoc(indexShard, "_doc", "1");
-        indexDoc(indexShard, "_doc", "2");
-        indexDoc(indexShard, "_doc", "3");
-        indexShard.close("test", true);
-        final ShardRouting shardRouting = indexShard.routingEntry();
-        IndexShard frozenShard = reinitShard(indexShard, ShardRoutingHelper.initWithSameId(shardRouting,
-            shardRouting.primary() ? RecoverySource.ExistingStoreRecoverySource.INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE
-        ), FrozenEngine::new);
-        recoverShardFromStore(frozenShard);
-        assertThat(frozenShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(frozenShard.seqNoStats().getMaxSeqNo()));
-        assertDocCount(frozenShard, 3);
+    @Override
+    protected boolean addMockInternalEngine() {
+        return false;
+    }
 
-        IndexShard replica = newShard(false, Settings.EMPTY, FrozenEngine::new);
-        recoverReplica(replica, frozenShard, true);
-        assertDocCount(replica, 3);
-        closeShards(frozenShard, replica);
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
+        plugins.add(FrozenIndices.class);
+        return plugins;
+    }
+
+    public void testRecoverExistingReplica() throws Exception {
+        final String indexName = "test-recover-existing-replica";
+        internalCluster().ensureAtLeastNumDataNodes(2);
+        List<String> dataNodes = randomSubsetOf(2, Sets.newHashSet(
+            clusterService().state().nodes().getDataNodes().valuesIt()).stream().map(DiscoveryNode::getName).collect(Collectors.toSet()));
+        createIndex(indexName, Settings.builder()
+            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+            .put("index.routing.allocation.include._name", String.join(",", dataNodes))
+            .build());
+        indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(0, 50))
+            .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList()));
+        ensureGreen(indexName);
+        if (randomBoolean()) {
+            client().admin().indices().prepareFlush(indexName).get();
+        } else {
+            client().admin().indices().prepareSyncedFlush(indexName).get();
+        }
+        // index more documents while one shard copy is offline
+        internalCluster().restartNode(dataNodes.get(1), new InternalTestCluster.RestartCallback() {
+            @Override
+            public Settings onNodeStopped(String nodeName) throws Exception {
+                Client client = client(dataNodes.get(0));
+                int moreDocs = randomIntBetween(1, 50);
+                for (int i = 0; i < moreDocs; i++) {
+                    client.prepareIndex(indexName, "_doc").setSource("num", i).get();
+                }
+                assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest(indexName)).actionGet());
+                return super.onNodeStopped(nodeName);
+            }
+        });
+        ensureGreen(indexName);
+        internalCluster().assertSameDocIdsOnShards();
+        for (RecoveryState recovery : client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) {
+            if (recovery.getPrimary() == false) {
+                assertThat(recovery.getIndex().fileDetails(), not(empty()));
+            }
+        }
+        internalCluster().fullRestart();
+        ensureGreen(indexName);
+        internalCluster().assertSameDocIdsOnShards();
+        for (RecoveryState recovery : client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) {
+            if (recovery.getPrimary() == false) {
+                assertThat(recovery.getIndex().fileDetails(), empty());
+            }
+        }
     }
 }
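For reference, the FreezeIndexAction used in onNodeStopped above is the same action behind the freeze API. A minimal sketch of freezing and later unfreezing an index from an ESIntegTestCase follows; the setFreeze(false) call is an assumption about FreezeRequest and is not shown in this diff:

    // Sketch: freeze, then unfreeze, an index from integration-test code.
    assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest("my-index")).actionGet());
    // ... searches still work against the frozen index ...
    assertAcked(client().execute(FreezeIndexAction.INSTANCE,
        new FreezeRequest("my-index").setFreeze(false)).actionGet()); // assumption: setFreeze(false) unfreezes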
@@ -0,0 +1,43 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.index.engine;
+
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardRoutingHelper;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.IndexShardTestCase;
+
+import java.io.IOException;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class FrozenIndexShardTests extends IndexShardTestCase {
+
+    /**
+     * Make sure we can recover from a frozen engine
+     */
+    public void testRecoverFromFrozenPrimary() throws IOException {
+        IndexShard indexShard = newStartedShard(true);
+        indexDoc(indexShard, "_doc", "1");
+        indexDoc(indexShard, "_doc", "2");
+        indexDoc(indexShard, "_doc", "3");
+        indexShard.close("test", true);
+        final ShardRouting shardRouting = indexShard.routingEntry();
+        IndexShard frozenShard = reinitShard(indexShard, ShardRoutingHelper.initWithSameId(shardRouting,
+            shardRouting.primary() ? RecoverySource.ExistingStoreRecoverySource.INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE
+        ), indexShard.indexSettings().getIndexMetaData(), FrozenEngine::new);
+        recoverShardFromStore(frozenShard);
+        assertThat(frozenShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(frozenShard.seqNoStats().getMaxSeqNo()));
+        assertDocCount(frozenShard, 3);
+
+        IndexShard replica = newShard(false, Settings.EMPTY, FrozenEngine::new);
+        recoverReplica(replica, frozenShard, true);
+        assertDocCount(replica, 3);
+        closeShards(frozenShard, replica);
+    }
+}
