From fe69c6046f272b6adf440c593a0c95682a65df9f Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 25 Nov 2019 11:42:11 +0100 Subject: [PATCH] Enhance SnapshotResiliencyTests (#49514) A few enhancements to `SnapshotResiliencyTests`: 1. Test running requests from random nodes in more spots to enhance coverage (this is particularly motivated by #49060 where the additional number of cluster state updates makes it more interesting to fully cover all kinds of network failures) 2. Fix issue with restarting only master node in one test (doing so breaks the test at an incredibly low frequency, that becomes not so low in #49060 with the additional cluster state updates between request and response) 3. Improved cluster formation checks (now properly checks the term as well when forming cluster) + makes sure all nodes are connected to all other nodes (previously the data nodes would at times not be connected to other data nodes, which was shaken out now by adding the `client()` method 4. Make sure the cluster left behind by the test makes sense by running the repo cleanup action on it (this also increases coverage of the repository cleanup action obviously and adds the basis of making it part of more resiliency tests) --- .../snapshots/SnapshotResiliencyTests.java | 139 +++++++++++------- 1 file changed, 85 insertions(+), 54 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index b4ba7fbc9269b..c4d27ff13fca8 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -21,11 +21,16 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.RequestValidators; import org.elasticsearch.action.StepListener; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryAction; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.TransportCleanupRepositoryAction; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryAction; import org.elasticsearch.action.admin.cluster.repositories.put.TransportPutRepositoryAction; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteAction; @@ -235,6 +240,16 @@ public void createServices() { @After public void verifyReposThenStopServices() { try { + clearDisruptionsAndAwaitSync(); + + final StepListener cleanupResponse = new StepListener<>(); + client().admin().cluster().cleanupRepository( + new CleanupRepositoryRequest("repo"), cleanupResponse); + final AtomicBoolean cleanedUp = new AtomicBoolean(false); + continueOrDie(cleanupResponse, r -> cleanedUp.set(true)); + + runUntil(cleanedUp::get, TimeUnit.MINUTES.toMillis(1L)); + if (blobStoreContext != null) { blobStoreContext.forceConsistent(); } @@ -260,8 +275,8 @@ public void testSuccessfulSnapshotAndRestore() { final StepListener createSnapshotResponseListener = new StepListener<>(); - continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> { - final Runnable afterIndexing = () -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName) + continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> { + final Runnable afterIndexing = () -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName) .setWaitForCompletion(true).execute(createSnapshotResponseListener); if (documents == 0) { afterIndexing.run(); @@ -271,7 +286,7 @@ public void testSuccessfulSnapshotAndRestore() { bulkRequest.add(new IndexRequest(index).source(Collections.singletonMap("foo", "bar" + i))); } final StepListener bulkResponseStepListener = new StepListener<>(); - masterNode.client.bulk(bulkRequest, bulkResponseStepListener); + client().bulk(bulkRequest, bulkResponseStepListener); continueOrDie(bulkResponseStepListener, bulkResponse -> { assertFalse("Failures in bulk response: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures()); assertEquals(documents, bulkResponse.getItems().length); @@ -283,16 +298,16 @@ public void testSuccessfulSnapshotAndRestore() { final StepListener deleteIndexListener = new StepListener<>(); continueOrDie(createSnapshotResponseListener, - createSnapshotResponse -> masterNode.client.admin().indices().delete(new DeleteIndexRequest(index), deleteIndexListener)); + createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index), deleteIndexListener)); final StepListener restoreSnapshotResponseListener = new StepListener<>(); - continueOrDie(deleteIndexListener, ignored -> masterNode.client.admin().cluster().restoreSnapshot( + continueOrDie(deleteIndexListener, ignored -> client().admin().cluster().restoreSnapshot( new RestoreSnapshotRequest(repoName, snapshotName).waitForCompletion(true), restoreSnapshotResponseListener)); final StepListener searchResponseListener = new StepListener<>(); continueOrDie(restoreSnapshotResponseListener, restoreSnapshotResponse -> { assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards()); - masterNode.client.search( + client().search( new SearchRequest(index).source(new SearchSourceBuilder().size(0).trackTotalHits(true)), searchResponseListener); }); @@ -321,33 +336,33 @@ public void testSuccessfulSnapshotAndRestore() { public void testSnapshotWithNodeDisconnects() { final int dataNodes = randomIntBetween(2, 10); - setupTestCluster(randomFrom(1, 3, 5), dataNodes); + final int masterNodes = randomFrom(1, 3, 5); + setupTestCluster(masterNodes, dataNodes); String repoName = "repo"; String snapshotName = "snapshot"; final String index = "test"; final int shards = randomIntBetween(1, 10); - TestClusterNodes.TestClusterNode masterNode = - testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state()); - final StepListener createSnapshotResponseStepListener = new StepListener<>(); - continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> { + continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> { for (int i = 0; i < randomIntBetween(0, dataNodes); ++i) { scheduleNow(this::disconnectRandomDataNode); } if (randomBoolean()) { scheduleNow(() -> testClusterNodes.clearNetworkDisruptions()); } - masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName).execute(createSnapshotResponseStepListener); + testClusterNodes.randomMasterNodeSafe().client.admin().cluster() + .prepareCreateSnapshot(repoName, snapshotName).execute(createSnapshotResponseStepListener); }); continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> { for (int i = 0; i < randomIntBetween(0, dataNodes); ++i) { scheduleNow(this::disconnectOrRestartDataNode); } - final boolean disconnectedMaster = randomBoolean(); + // Only disconnect master if we have more than a single master and can simulate a failover + final boolean disconnectedMaster = randomBoolean() && masterNodes > 1; if (disconnectedMaster) { scheduleNow(this::disconnectOrRestartMasterNode); } @@ -387,18 +402,18 @@ public void testConcurrentSnapshotCreateAndDelete() { final StepListener createSnapshotResponseStepListener = new StepListener<>(); - continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), - createIndexResponse -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName) + continueOrDie(createRepoAndIndex(repoName, index, shards), + createIndexResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName) .execute(createSnapshotResponseStepListener)); final StepListener deleteSnapshotStepListener = new StepListener<>(); - continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> masterNode.client.admin().cluster().deleteSnapshot( + continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> client().admin().cluster().deleteSnapshot( new DeleteSnapshotRequest(repoName, snapshotName), deleteSnapshotStepListener)); final StepListener createAnotherSnapshotResponseStepListener = new StepListener<>(); - continueOrDie(deleteSnapshotStepListener, acknowledgedResponse -> masterNode.client.admin().cluster() + continueOrDie(deleteSnapshotStepListener, acknowledgedResponse -> client().admin().cluster() .prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true).execute(createAnotherSnapshotResponseStepListener)); continueOrDie(createAnotherSnapshotResponseStepListener, createSnapshotResponse -> assertEquals(createSnapshotResponse.getSnapshotInfo().state(), SnapshotState.SUCCESS)); @@ -432,24 +447,26 @@ public void testConcurrentSnapshotCreateAndDeleteOther() { final StepListener createSnapshotResponseStepListener = new StepListener<>(); - continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), - createIndexResponse -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName) + continueOrDie(createRepoAndIndex(repoName, index, shards), + createIndexResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName) .setWaitForCompletion(true).execute(createSnapshotResponseStepListener)); final StepListener createOtherSnapshotResponseStepListener = new StepListener<>(); continueOrDie(createSnapshotResponseStepListener, - createSnapshotResponse -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, "snapshot-2") + createSnapshotResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, "snapshot-2") .execute(createOtherSnapshotResponseStepListener)); final StepListener deleteSnapshotStepListener = new StepListener<>(); continueOrDie(createOtherSnapshotResponseStepListener, - createSnapshotResponse -> masterNode.client.admin().cluster().deleteSnapshot( + createSnapshotResponse -> client().admin().cluster().deleteSnapshot( new DeleteSnapshotRequest(repoName, snapshotName), ActionListener.wrap( resp -> deleteSnapshotStepListener.onResponse(true), e -> { - assertThat(e, instanceOf(ConcurrentSnapshotExecutionException.class)); + final Throwable unwrapped = + ExceptionsHelper.unwrap(e, ConcurrentSnapshotExecutionException.class); + assertThat(unwrapped, instanceOf(ConcurrentSnapshotExecutionException.class)); deleteSnapshotStepListener.onResponse(false); }))); @@ -458,8 +475,7 @@ public void testConcurrentSnapshotCreateAndDeleteOther() { continueOrDie(deleteSnapshotStepListener, deleted -> { if (deleted) { // The delete worked out, creating a third snapshot - masterNode.client.admin().cluster() - .prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true) + client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true) .execute(createAnotherSnapshotResponseStepListener); continueOrDie(createAnotherSnapshotResponseStepListener, createSnapshotResponse -> assertEquals(createSnapshotResponse.getSnapshotInfo().state(), SnapshotState.SUCCESS)); @@ -470,13 +486,12 @@ public void testConcurrentSnapshotCreateAndDeleteOther() { deterministicTaskQueue.runAllRunnableTasks(); - final CreateSnapshotResponse thirdSnapshotResponse = createAnotherSnapshotResponseStepListener.result(); - SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE); assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); final Repository repository = masterNode.repositoriesService.repository(repoName); Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); - assertThat(snapshotIds, hasSize(thirdSnapshotResponse == null ? 2 : 3)); + // We end up with two snapshots no matter if the delete worked out or not + assertThat(snapshotIds, hasSize(2)); for (SnapshotId snapshotId : snapshotIds) { final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId); @@ -508,8 +523,8 @@ public void testSnapshotPrimaryRelocations() { final StepListener clusterStateResponseStepListener = new StepListener<>(); - continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), - createIndexResponse -> masterAdminClient.cluster().state(new ClusterStateRequest(), clusterStateResponseStepListener)); + continueOrDie(createRepoAndIndex(repoName, index, shards), + createIndexResponse -> client().admin().cluster().state(new ClusterStateRequest(), clusterStateResponseStepListener)); continueOrDie(clusterStateResponseStepListener, clusterStateResponse -> { final ShardRouting shardToRelocate = clusterStateResponse.getState().routingTable().allShards(index).get(0); @@ -579,19 +594,18 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() { final StepListener createSnapshotResponseStepListener = new StepListener<>(); - continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> { + continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> { final AtomicBoolean initiatedSnapshot = new AtomicBoolean(false); for (int i = 0; i < documents; ++i) { // Index a few documents with different field names so we trigger a dynamic mapping update for each of them - masterNode.client.bulk( - new BulkRequest().add(new IndexRequest(index).source(Map.of("foo" + i, "bar"))) + client().bulk(new BulkRequest().add(new IndexRequest(index).source(Map.of("foo" + i, "bar"))) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), assertNoFailureListener( bulkResponse -> { assertFalse("Failures in bulkresponse: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures()); if (initiatedSnapshot.compareAndSet(false, true)) { - masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName) - .setWaitForCompletion(true).execute(createSnapshotResponseStepListener); + client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true) + .execute(createSnapshotResponseStepListener); } })); } @@ -601,7 +615,7 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() { final StepListener restoreSnapshotResponseStepListener = new StepListener<>(); - continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> masterNode.client.admin().cluster().restoreSnapshot( + continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> client().admin().cluster().restoreSnapshot( new RestoreSnapshotRequest(repoName, snapshotName) .renamePattern(index).renameReplacement(restoredIndex).waitForCompletion(true), restoreSnapshotResponseStepListener)); @@ -609,8 +623,7 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() { continueOrDie(restoreSnapshotResponseStepListener, restoreSnapshotResponse -> { assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards()); - masterNode.client.search( - new SearchRequest(restoredIndex).source(new SearchSourceBuilder().size(documents).trackTotalHits(true)), + client().search(new SearchRequest(restoredIndex).source(new SearchSourceBuilder().size(documents).trackTotalHits(true)), searchResponseStepListener); }); @@ -652,18 +665,15 @@ private RepositoryData getRepositoryData(Repository repository) { return res.actionGet(); } - private StepListener createRepoAndIndex(TestClusterNodes.TestClusterNode masterNode, String repoName, - String index, int shards) { - final AdminClient adminClient = masterNode.client.admin(); - + private StepListener createRepoAndIndex(String repoName, String index, int shards) { final StepListener createRepositoryListener = new StepListener<>(); - adminClient.cluster().preparePutRepository(repoName).setType(FsRepository.TYPE) + client().admin().cluster().preparePutRepository(repoName).setType(FsRepository.TYPE) .setSettings(Settings.builder().put("location", randomAlphaOfLength(10))).execute(createRepositoryListener); final StepListener createIndexResponseStepListener = new StepListener<>(); - continueOrDie(createRepositoryListener, acknowledgedResponse -> adminClient.indices().create( + continueOrDie(createRepositoryListener, acknowledgedResponse -> client().admin().indices().create( new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL).settings(defaultIndexSettings(shards)), createIndexResponseStepListener)); @@ -672,11 +682,7 @@ private StepListener createRepoAndIndex(TestClusterNodes.Te private void clearDisruptionsAndAwaitSync() { testClusterNodes.clearNetworkDisruptions(); - runUntil(() -> { - final List versions = testClusterNodes.nodes.values().stream() - .map(n -> n.clusterService.state().version()).distinct().collect(Collectors.toList()); - return versions.size() == 1L; - }, TimeUnit.MINUTES.toMillis(1L)); + stabilize(); } private void disconnectOrRestartDataNode() { @@ -713,15 +719,25 @@ private void startCluster() { .filter(DiscoveryNode::isMasterNode).map(DiscoveryNode::getId).collect(Collectors.toSet())); testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode()).forEach( testClusterNode -> testClusterNode.coordinator.setInitialConfiguration(votingConfiguration)); + // Connect all nodes to each other + testClusterNodes.nodes.values().forEach(node -> testClusterNodes.nodes.values().forEach( + n -> n.transportService.connectToNode(node.node, null, + ActionTestUtils.assertNoFailureListener(c -> logger.info("--> Connected [{}] to [{}]", n.node, node.node))))); + stabilize(); + } + private void stabilize() { runUntil( () -> { - List masterNodeIds = testClusterNodes.nodes.values().stream() - .map(node -> node.clusterService.state().nodes().getMasterNodeId()) - .distinct().collect(Collectors.toList()); - return masterNodeIds.size() == 1 && masterNodeIds.contains(null) == false; + final Collection clusterStates = + testClusterNodes.nodes.values().stream().map(node -> node.clusterService.state()).collect(Collectors.toList()); + final Set masterNodeIds = clusterStates.stream() + .map(clusterState -> clusterState.nodes().getMasterNodeId()).collect(Collectors.toSet()); + final Set terms = clusterStates.stream().map(ClusterState::term).collect(Collectors.toSet()); + final List versions = clusterStates.stream().map(ClusterState::version).distinct().collect(Collectors.toList()); + return versions.size() == 1 && masterNodeIds.size() == 1 && masterNodeIds.contains(null) == false && terms.size() == 1; }, - TimeUnit.SECONDS.toMillis(30L) + TimeUnit.MINUTES.toMillis(1L) ); } @@ -767,6 +783,17 @@ private static ActionListener noopListener() { return ActionListener.wrap(() -> {}); } + public NodeClient client() { + // Select from sorted list of nodes + final List nodes = testClusterNodes.nodes.values().stream() + .filter(n -> testClusterNodes.disconnectedNodes.contains(n.node.getName()) == false) + .sorted(Comparator.comparing(n -> n.node.getName())).collect(Collectors.toList()); + if (nodes.isEmpty()) { + throw new AssertionError("No nodes available"); + } + return randomFrom(nodes).client; + } + /** * Create a {@link Environment} with random path.home and path.repo **/ @@ -842,7 +869,9 @@ public TestClusterNode randomMasterNodeSafe() { public Optional randomMasterNode() { // Select from sorted list of data-nodes here to not have deterministic behaviour - final List masterNodes = testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode()) + final List masterNodes = testClusterNodes.nodes.values().stream() + .filter(n -> n.node.isMasterNode()) + .filter(n -> disconnectedNodes.contains(n.node.getName()) == false) .sorted(Comparator.comparing(n -> n.node.getName())).collect(Collectors.toList()); return masterNodes.isEmpty() ? Optional.empty() : Optional.of(randomFrom(masterNodes)); } @@ -1166,6 +1195,8 @@ searchTransportService, new SearchPhaseController(searchService::createReduceCon transportService, clusterService, repositoriesService, threadPool, actionFilters, indexNameExpressionResolver )); + actions.put(CleanupRepositoryAction.INSTANCE, new TransportCleanupRepositoryAction(transportService, clusterService, + repositoriesService, threadPool, actionFilters, indexNameExpressionResolver)); actions.put(CreateSnapshotAction.INSTANCE, new TransportCreateSnapshotAction( transportService, clusterService, threadPool,