From e3fbd100b6c9fba29588f2b661478860b04d0ff5 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 13 Sep 2018 20:09:01 +0200 Subject: [PATCH 1/7] [CCR] Changed AutoFollowCoordinator to keep track of certain statistics The following stats are being kept track of: 1) The total number of times that auto following a leader index succeed. 2) The total number of times that auto following a leader index failed. 3) The total number of times that fetching a remote cluster state failed. 4) The most recent 256 auto follow failures per auto leader index (e.g. create_and_follow api call fails) or cluster alias (e.g. fetching remote cluster state fails). Each auto follow run now produces a result that is being used to update the stats being kept track of in AutoFollowCoordinator. Relates to #33007 --- .../xpack/ccr/CcrMultiClusterLicenseIT.java | 2 +- .../xpack/ccr/CcrLicenseChecker.java | 2 +- .../ccr/action/AutoFollowCoordinator.java | 156 +++++++++++--- .../action/AutoFollowCoordinatorTests.java | 101 ++++++++- .../ccr/action/AutoFollowStatsTests.java | 77 +++++++ .../xpack/core/ccr/AutoFollowStats.java | 194 ++++++++++++++++++ 6 files changed, 494 insertions(+), 38 deletions(-) create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsTests.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java b/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java index 7bc952a3ea8e8..505683b892ca8 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java @@ -64,7 +64,7 @@ public void testAutoFollow() throws Exception { while (it.hasNext()) { final String line = it.next(); if (line.matches(".*\\[WARN\\s*\\]\\[o\\.e\\.x\\.c\\.a\\.AutoFollowCoordinator\\s*\\] \\[node-0\\] " + - "failure occurred during auto-follower coordination")) { + "failure occurred while fetching cluster state in leader cluster \\[leader_cluster\\]")) { warn = true; break; } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index 2161d0a14237a..067f4a602aff1 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -54,7 +54,7 @@ public final class CcrLicenseChecker { * * @param isCcrAllowed a boolean supplier that should return true if CCR is allowed and false otherwise */ - CcrLicenseChecker(final BooleanSupplier isCcrAllowed) { + public CcrLicenseChecker(final BooleanSupplier isCcrAllowed) { this.isCcrAllowed = Objects.requireNonNull(isCcrAllowed); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 722cbddde1891..5145e5960b608 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -7,6 +7,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.client.Client; @@ -17,8 +19,10 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.Index; import org.elasticsearch.license.LicenseUtils; @@ -27,15 +31,18 @@ import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; +import org.elasticsearch.xpack.core.ccr.AutoFollowStats; import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction; import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.atomic.AtomicReference; +import java.util.TreeMap; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -47,6 +54,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier { private static final Logger LOGGER = LogManager.getLogger(AutoFollowCoordinator.class); + private static final int MAX_AUTO_FOLLOW_ERRORS = 256; private final Client client; private final TimeValue pollInterval; @@ -56,6 +64,12 @@ public class AutoFollowCoordinator implements ClusterStateApplier { private volatile boolean localNodeMaster = false; + // The following fields are read and updated under a lock: + private long numberOfSuccessfulIndicesAutoFollowed = 0; + private long numberOfFailedIndicesAutoFollowed = 0; + private long numberOfFailedRemoteClusterStateRequests = 0; + private final LinkedHashMap recentAutoFollowErrors; + public AutoFollowCoordinator( Settings settings, Client client, @@ -69,6 +83,43 @@ public AutoFollowCoordinator( this.pollInterval = CcrSettings.CCR_AUTO_FOLLOW_POLL_INTERVAL.get(settings); clusterService.addStateApplier(this); + + this.recentAutoFollowErrors = new LinkedHashMap() { + @Override + protected boolean removeEldestEntry(final Map.Entry eldest) { + return size() > MAX_AUTO_FOLLOW_ERRORS; + } + }; + } + + public synchronized AutoFollowStats getStats() { + return new AutoFollowStats( + numberOfFailedIndicesAutoFollowed, + numberOfFailedRemoteClusterStateRequests, + numberOfSuccessfulIndicesAutoFollowed, + new TreeMap<>(recentAutoFollowErrors) + ); + } + + synchronized void updateStats(List results) { + for (AutoFollowResult result : results) { + if (result.clusterStateFetchException != null) { + recentAutoFollowErrors.put(result.clusterAlias, + new ElasticsearchException(result.clusterStateFetchException)); + numberOfFailedRemoteClusterStateRequests++; + } else { + for (Map.Entry entry : result.autoFollowExecutionResults.entrySet()) { + if (entry.getValue() != null) { + numberOfFailedIndicesAutoFollowed++; + recentAutoFollowErrors.put(result.clusterAlias + ":" + entry.getKey().getName(), + new ElasticsearchException(entry.getValue())); + } else { + numberOfSuccessfulIndicesAutoFollowed++; + } + } + } + + } } private void doAutoFollow() { @@ -94,10 +145,20 @@ private void doAutoFollow() { return; } - Consumer handler = e -> { - if (e != null) { - LOGGER.warn("failure occurred during auto-follower coordination", e); + Consumer> handler = results -> { + for (AutoFollowResult result : results) { + if (result.clusterStateFetchException != null) { + LOGGER.warn(new ParameterizedMessage("failure occurred while fetching cluster state in leader cluster [{}]", + result.clusterAlias), result.clusterStateFetchException); + } + for (Map.Entry entry : result.autoFollowExecutionResults.entrySet()) { + if (entry.getValue() != null) { + LOGGER.warn(new ParameterizedMessage("failure occurred while auto following index [{}] in leader cluster [{}]", + entry.getKey(), result.clusterAlias), entry.getValue()); + } + } } + updateStats(results); threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); }; AutoFollower operation = new AutoFollower(handler, followerClusterState) { @@ -168,47 +229,58 @@ public void applyClusterState(ClusterChangedEvent event) { abstract static class AutoFollower { - private final Consumer handler; + private final Consumer> handler; private final ClusterState followerClusterState; private final AutoFollowMetadata autoFollowMetadata; private final CountDown autoFollowPatternsCountDown; - private final AtomicReference autoFollowPatternsErrorHolder = new AtomicReference<>(); + private final AtomicArray clusterAliasResults; - AutoFollower(final Consumer handler, final ClusterState followerClusterState) { + AutoFollower(final Consumer> handler, final ClusterState followerClusterState) { this.handler = handler; this.followerClusterState = followerClusterState; this.autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE); this.autoFollowPatternsCountDown = new CountDown(autoFollowMetadata.getPatterns().size()); + this.clusterAliasResults = new AtomicArray<>(autoFollowMetadata.getPatterns().size()); } void autoFollowIndices() { + int i = 0; for (Map.Entry entry : autoFollowMetadata.getPatterns().entrySet()) { - String clusterAlias = entry.getKey(); - AutoFollowPattern autoFollowPattern = entry.getValue(); - List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias); + final int slot = i; + final String clusterAlias = entry.getKey(); + final AutoFollowPattern autoFollowPattern = entry.getValue(); + final List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias); getLeaderClusterState(clusterAlias, (leaderClusterState, e) -> { if (leaderClusterState != null) { assert e == null; - handleClusterAlias(clusterAlias, autoFollowPattern, followedIndices, leaderClusterState); + handleClusterAlias(slot, clusterAlias, autoFollowPattern, followedIndices, leaderClusterState); } else { - finalise(e); + finalise(slot, new AutoFollowResult(clusterAlias, e)); } }); + i++; } } - private void handleClusterAlias(String clusterAlias, AutoFollowPattern autoFollowPattern, - List followedIndexUUIDs, ClusterState leaderClusterState) { + private void handleClusterAlias( + int clusterAliasSlot, + String clusterAlias, + AutoFollowPattern autoFollowPattern, + List followedIndexUUIDs, + ClusterState leaderClusterState + ) { final List leaderIndicesToFollow = getLeaderIndicesToFollow(autoFollowPattern, leaderClusterState, followerClusterState, followedIndexUUIDs); if (leaderIndicesToFollow.isEmpty()) { - finalise(null); + finalise(clusterAliasSlot, new AutoFollowResult(clusterAlias)); } else { final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size()); - final AtomicReference leaderIndicesErrorHolder = new AtomicReference<>(); - for (Index indexToFollow : leaderIndicesToFollow) { + final AtomicArray> results = new AtomicArray<>(leaderIndicesToFollow.size()); + for (int i = 0; i < leaderIndicesToFollow.size(); i++) { + final int slot = i; + final Index indexToFollow = leaderIndicesToFollow.get(i); final String leaderIndexName = indexToFollow.getName(); final String followIndexName = getFollowerIndexName(autoFollowPattern, leaderIndexName); @@ -232,14 +304,13 @@ private void handleClusterAlias(String clusterAlias, AutoFollowPattern autoFollo updateAutoFollowMetadata(function, updateError -> { if (updateError != null) { LOGGER.error("Failed to mark leader index [" + leaderIndexName + "] as auto followed", updateError); - if (leaderIndicesErrorHolder.compareAndSet(null, updateError) == false) { - leaderIndicesErrorHolder.get().addSuppressed(updateError); - } + results.set(slot, new Tuple<>(indexToFollow, updateError)); } else { + results.set(slot, new Tuple<>(indexToFollow, null)); LOGGER.debug("Successfully marked leader index [{}] as auto followed", leaderIndexName); } if (leaderIndicesCountDown.countDown()) { - finalise(leaderIndicesErrorHolder.get()); + finalise(clusterAliasSlot, new AutoFollowResult(clusterAlias, results.asList())); } }); }; @@ -247,8 +318,9 @@ private void handleClusterAlias(String clusterAlias, AutoFollowPattern autoFollo Consumer failureHandler = followError -> { assert followError != null; LOGGER.warn("Failed to auto follow leader index [" + leaderIndexName + "]", followError); + results.set(slot, new Tuple<>(indexToFollow, followError)); if (leaderIndicesCountDown.countDown()) { - finalise(followError); + finalise(clusterAliasSlot, new AutoFollowResult(clusterAlias, results.asList())); } }; createAndFollow(followRequest, successHandler, failureHandler); @@ -256,13 +328,10 @@ private void handleClusterAlias(String clusterAlias, AutoFollowPattern autoFollo } } - private void finalise(Exception failure) { - if (autoFollowPatternsErrorHolder.compareAndSet(null, failure) == false) { - autoFollowPatternsErrorHolder.get().addSuppressed(failure); - } - + private void finalise(int slot, AutoFollowResult result) { + clusterAliasResults.set(slot, result); if (autoFollowPatternsCountDown.countDown()) { - handler.accept(autoFollowPatternsErrorHolder.get()); + handler.accept(clusterAliasResults.asList()); } } @@ -324,4 +393,35 @@ static Function recordLeaderIndexAsFollowFunction(St abstract void updateAutoFollowMetadata(Function updateFunction, Consumer handler); } + + static class AutoFollowResult { + + final String clusterAlias; + final Exception clusterStateFetchException; + final Map autoFollowExecutionResults; + + AutoFollowResult(String clusterAlias, List> results) { + this.clusterAlias = clusterAlias; + + Map autoFollowExecutionResults = new HashMap<>(); + for (Tuple result : results) { + autoFollowExecutionResults.put(result.v1(), result.v2()); + } + + this.clusterStateFetchException = null; + this.autoFollowExecutionResults = Collections.unmodifiableMap(autoFollowExecutionResults); + } + + AutoFollowResult(String clusterAlias, Exception e) { + this.clusterAlias = clusterAlias; + this.clusterStateFetchException = e; + this.autoFollowExecutionResults = Collections.emptyMap(); + } + + AutoFollowResult(String clusterAlias) { + this.clusterAlias = clusterAlias; + this.clusterStateFetchException = null; + this.autoFollowExecutionResults = Collections.emptyMap(); + } + } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 5ab11cf5b0c81..2756b455bfe3d 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -11,15 +11,20 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; +import org.elasticsearch.xpack.core.ccr.AutoFollowStats; import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -63,9 +68,15 @@ public void testAutoFollower() { .build(); boolean[] invoked = new boolean[]{false}; - Consumer handler = e -> { + Consumer> handler = results -> { invoked[0] = true; - assertThat(e, nullValue()); + + assertThat(results.size(), equalTo(1)); + assertThat(results.get(0).clusterStateFetchException, nullValue()); + List> entries = new ArrayList<>(results.get(0).autoFollowExecutionResults.entrySet()); + assertThat(entries.size(), equalTo(1)); + assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); + assertThat(entries.get(0).getValue(), nullValue()); }; AutoFollower autoFollower = new AutoFollower(handler, currentState) { @Override @@ -110,9 +121,12 @@ public void testAutoFollowerClusterStateApiFailure() { Exception failure = new RuntimeException("failure"); boolean[] invoked = new boolean[]{false}; - Consumer handler = e -> { + Consumer> handler = results -> { invoked[0] = true; - assertThat(e, sameInstance(failure)); + + assertThat(results.size(), equalTo(1)); + assertThat(results.get(0).clusterStateFetchException, sameInstance(failure)); + assertThat(results.get(0).autoFollowExecutionResults.entrySet().size(), equalTo(0)); }; AutoFollower autoFollower = new AutoFollower(handler, followerState) { @Override @@ -158,9 +172,15 @@ public void testAutoFollowerUpdateClusterStateFailure() { Exception failure = new RuntimeException("failure"); boolean[] invoked = new boolean[]{false}; - Consumer handler = e -> { + Consumer> handler = results -> { invoked[0] = true; - assertThat(e, sameInstance(failure)); + + assertThat(results.size(), equalTo(1)); + assertThat(results.get(0).clusterStateFetchException, nullValue()); + List> entries = new ArrayList<>(results.get(0).autoFollowExecutionResults.entrySet()); + assertThat(entries.size(), equalTo(1)); + assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); + assertThat(entries.get(0).getValue(), sameInstance(failure)); }; AutoFollower autoFollower = new AutoFollower(handler, followerState) { @Override @@ -208,9 +228,15 @@ public void testAutoFollowerCreateAndFollowApiCallFailure() { Exception failure = new RuntimeException("failure"); boolean[] invoked = new boolean[]{false}; - Consumer handler = e -> { + Consumer> handler = results -> { invoked[0] = true; - assertThat(e, sameInstance(failure)); + + assertThat(results.size(), equalTo(1)); + assertThat(results.get(0).clusterStateFetchException, nullValue()); + List> entries = new ArrayList<>(results.get(0).autoFollowExecutionResults.entrySet()); + assertThat(entries.size(), equalTo(1)); + assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); + assertThat(entries.get(0).getValue(), sameInstance(failure)); }; AutoFollower autoFollower = new AutoFollower(handler, followerState) { @Override @@ -294,4 +320,63 @@ public void testGetFollowerIndexName() { assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0")); } + public void testStats() { + AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( + Settings.EMPTY, + null, + null, + mock(ClusterService.class), + new CcrLicenseChecker(() -> true) + ); + + autoFollowCoordinator.updateStats(Collections.singletonList( + new AutoFollowCoordinator.AutoFollowResult("_alias1")) + ); + AutoFollowStats autoFollowStats = autoFollowCoordinator.getStats(); + assertThat(autoFollowStats.getNumberOfFailedIndicesAutoFollowed(), equalTo(0L)); + assertThat(autoFollowStats.getNumberOfFailedRemoteClusterStateRequests(), equalTo(0L)); + assertThat(autoFollowStats.getNumberOfSuccessfulIndicesAutoFollowed(), equalTo(0L)); + assertThat(autoFollowStats.getRecentAutoFollowErrors().size(), equalTo(0)); + + autoFollowCoordinator.updateStats(Collections.singletonList( + new AutoFollowCoordinator.AutoFollowResult("_alias1", new RuntimeException("error"))) + ); + autoFollowStats = autoFollowCoordinator.getStats(); + assertThat(autoFollowStats.getNumberOfFailedIndicesAutoFollowed(), equalTo(0L)); + assertThat(autoFollowStats.getNumberOfFailedRemoteClusterStateRequests(), equalTo(1L)); + assertThat(autoFollowStats.getNumberOfSuccessfulIndicesAutoFollowed(), equalTo(0L)); + assertThat(autoFollowStats.getRecentAutoFollowErrors().size(), equalTo(1)); + assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias1").getCause().getMessage(), equalTo("error")); + + autoFollowCoordinator.updateStats(Arrays.asList( + new AutoFollowCoordinator.AutoFollowResult("_alias1", + Collections.singletonList(Tuple.tuple(new Index("index1", "_na_"), new RuntimeException("error")))), + new AutoFollowCoordinator.AutoFollowResult("_alias2", + Collections.singletonList(Tuple.tuple(new Index("index2", "_na_"), new RuntimeException("error")))) + )); + autoFollowStats = autoFollowCoordinator.getStats(); + assertThat(autoFollowStats.getNumberOfFailedIndicesAutoFollowed(), equalTo(2L)); + assertThat(autoFollowStats.getNumberOfFailedRemoteClusterStateRequests(), equalTo(1L)); + assertThat(autoFollowStats.getNumberOfSuccessfulIndicesAutoFollowed(), equalTo(0L)); + assertThat(autoFollowStats.getRecentAutoFollowErrors().size(), equalTo(3)); + assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias1").getCause().getMessage(), equalTo("error")); + assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias1:index1").getCause().getMessage(), equalTo("error")); + assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias2:index2").getCause().getMessage(), equalTo("error")); + + autoFollowCoordinator.updateStats(Arrays.asList( + new AutoFollowCoordinator.AutoFollowResult("_alias1", + Collections.singletonList(Tuple.tuple(new Index("index1", "_na_"), null))), + new AutoFollowCoordinator.AutoFollowResult("_alias2", + Collections.singletonList(Tuple.tuple(new Index("index2", "_na_"), null))) + )); + autoFollowStats = autoFollowCoordinator.getStats(); + assertThat(autoFollowStats.getNumberOfFailedIndicesAutoFollowed(), equalTo(2L)); + assertThat(autoFollowStats.getNumberOfFailedRemoteClusterStateRequests(), equalTo(1L)); + assertThat(autoFollowStats.getNumberOfSuccessfulIndicesAutoFollowed(), equalTo(2L)); + assertThat(autoFollowStats.getRecentAutoFollowErrors().size(), equalTo(3)); + assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias1").getCause().getMessage(), equalTo("error")); + assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias1:index1").getCause().getMessage(), equalTo("error")); + assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias2:index2").getCause().getMessage(), equalTo("error")); + } + } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsTests.java new file mode 100644 index 0000000000000..b9ee5bf464616 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsTests.java @@ -0,0 +1,77 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.xpack.core.ccr.AutoFollowStats; + +import java.io.IOException; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; + +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; + +public class AutoFollowStatsTests extends AbstractSerializingTestCase { + + @Override + protected AutoFollowStats doParseInstance(XContentParser parser) throws IOException { + return AutoFollowStats.fromXContent(parser); + } + + @Override + protected AutoFollowStats createTestInstance() { + return new AutoFollowStats( + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomReadExceptions() + ); + } + + private static NavigableMap randomReadExceptions() { + final int count = randomIntBetween(0, 16); + final NavigableMap readExceptions = new TreeMap<>(); + for (int i = 0; i < count; i++) { + readExceptions.put("" + i, new ElasticsearchException(new IllegalStateException("index [" + i + "]"))); + } + return readExceptions; + } + + @Override + protected Writeable.Reader instanceReader() { + return AutoFollowStats::new; + } + + @Override + protected void assertEqualInstances(AutoFollowStats expectedInstance, AutoFollowStats newInstance) { + assertNotSame(expectedInstance, newInstance); + + assertThat(newInstance.getRecentAutoFollowErrors().size(), equalTo(expectedInstance.getRecentAutoFollowErrors().size())); + assertThat(newInstance.getRecentAutoFollowErrors().keySet(), equalTo(expectedInstance.getRecentAutoFollowErrors().keySet())); + for (final Map.Entry entry : newInstance.getRecentAutoFollowErrors().entrySet()) { + // x-content loses the exception + final ElasticsearchException expected = expectedInstance.getRecentAutoFollowErrors().get(entry.getKey()); + assertThat(entry.getValue().getMessage(), containsString(expected.getMessage())); + assertNotNull(entry.getValue().getCause()); + assertThat( + entry.getValue().getCause(), + anyOf(instanceOf(ElasticsearchException.class), instanceOf(IllegalStateException.class))); + assertThat(entry.getValue().getCause().getMessage(), containsString(expected.getCause().getMessage())); + } + } + + @Override + protected boolean assertToXContentEquivalence() { + return false; + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java new file mode 100644 index 0000000000000..afd863fb1b623 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java @@ -0,0 +1,194 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ccr; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.AbstractMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Objects; +import java.util.TreeMap; +import java.util.stream.Collectors; + +public class AutoFollowStats implements Writeable, ToXContentObject { + + private static final ParseField NUMBER_OF_SUCCESSFUL_INDICES_AUTO_FOLLOWED = + new ParseField("number_of_successful_indices_auto_followed"); + private static final ParseField NUMBER_OF_FAILED_INDICES_AUTO_FOLLOWED = new ParseField("number_of_failed_indices_auto_followed"); + private static final ParseField NUMBER_OF_FAILED_REMOTE_CLUSTER_STATE_REQUESTS = + new ParseField("number_of_failed_remote_cluster_state_requests"); + private static final ParseField RECENT_AUTO_FOLLOW_ERRORS = new ParseField("recent_auto_follow_errors"); + private static final ParseField LEADER_INDEX = new ParseField("leader_index"); + private static final ParseField AUTO_FOLLOW_EXCEPTION = new ParseField("auto_follow_exception"); + + @SuppressWarnings("unchecked") + private static final ConstructingObjectParser STATS_PARSER = new ConstructingObjectParser<>("auto_follow_stats", + args -> new AutoFollowStats( + (Long) args[0], + (Long) args[1], + (Long) args[2], + new TreeMap<>( + ((List>) args[3]) + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) + )); + + private static final ConstructingObjectParser, Void> AUTO_FOLLOW_EXCEPTIONS_PARSER = + new ConstructingObjectParser<>( + "auto_follow_stats_errors", + args -> new AbstractMap.SimpleEntry<>((String) args[0], (ElasticsearchException) args[1])); + + static { + AUTO_FOLLOW_EXCEPTIONS_PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_INDEX); + AUTO_FOLLOW_EXCEPTIONS_PARSER.declareObject( + ConstructingObjectParser.constructorArg(), + (p, c) -> ElasticsearchException.fromXContent(p), + AUTO_FOLLOW_EXCEPTION); + + STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_INDICES_AUTO_FOLLOWED); + STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_REMOTE_CLUSTER_STATE_REQUESTS); + STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_INDICES_AUTO_FOLLOWED); + STATS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), AUTO_FOLLOW_EXCEPTIONS_PARSER, RECENT_AUTO_FOLLOW_ERRORS); + } + + public static AutoFollowStats fromXContent(final XContentParser parser) { + return STATS_PARSER.apply(parser, null); + } + + private final long numberOfFailedIndicesAutoFollowed; + private final long numberOfFailedRemoteClusterStateRequests; + private final long numberOfSuccessfulIndicesAutoFollowed; + private final NavigableMap recentAutoFollowErrors; + + public AutoFollowStats( + long numberOfFailedIndicesAutoFollowed, + long numberOfFailedRemoteClusterStateRequests, + long numberOfSuccessfulIndicesAutoFollowed, + NavigableMap recentAutoFollowErrors + ) { + this.numberOfFailedIndicesAutoFollowed = numberOfFailedIndicesAutoFollowed; + this.numberOfFailedRemoteClusterStateRequests = numberOfFailedRemoteClusterStateRequests; + this.numberOfSuccessfulIndicesAutoFollowed = numberOfSuccessfulIndicesAutoFollowed; + this.recentAutoFollowErrors = recentAutoFollowErrors; + } + + public AutoFollowStats(StreamInput in) throws IOException { + numberOfFailedIndicesAutoFollowed = in.readVLong(); + numberOfFailedRemoteClusterStateRequests = in.readVLong(); + numberOfSuccessfulIndicesAutoFollowed = in.readVLong(); + recentAutoFollowErrors= new TreeMap<>(in.readMap(StreamInput::readString, StreamInput::readException)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(numberOfFailedIndicesAutoFollowed); + out.writeVLong(numberOfFailedRemoteClusterStateRequests); + out.writeVLong(numberOfSuccessfulIndicesAutoFollowed); + out.writeMap(recentAutoFollowErrors, StreamOutput::writeString, StreamOutput::writeException); + } + + public long getNumberOfFailedIndicesAutoFollowed() { + return numberOfFailedIndicesAutoFollowed; + } + + public long getNumberOfFailedRemoteClusterStateRequests() { + return numberOfFailedRemoteClusterStateRequests; + } + + public long getNumberOfSuccessfulIndicesAutoFollowed() { + return numberOfSuccessfulIndicesAutoFollowed; + } + + public NavigableMap getRecentAutoFollowErrors() { + return recentAutoFollowErrors; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + { + builder.field(NUMBER_OF_FAILED_INDICES_AUTO_FOLLOWED.getPreferredName(), numberOfFailedIndicesAutoFollowed); + builder.field(NUMBER_OF_FAILED_REMOTE_CLUSTER_STATE_REQUESTS.getPreferredName(), numberOfFailedRemoteClusterStateRequests); + builder.field(NUMBER_OF_SUCCESSFUL_INDICES_AUTO_FOLLOWED.getPreferredName(), numberOfSuccessfulIndicesAutoFollowed); + builder.startArray(RECENT_AUTO_FOLLOW_ERRORS.getPreferredName()); + { + for (final Map.Entry entry : recentAutoFollowErrors.entrySet()) { + builder.startObject(); + { + builder.field(LEADER_INDEX.getPreferredName(), entry.getKey()); + builder.field(AUTO_FOLLOW_EXCEPTION.getPreferredName()); + builder.startObject(); + { + ElasticsearchException.generateThrowableXContent(builder, params, entry.getValue()); + } + builder.endObject(); + } + builder.endObject(); + } + } + builder.endArray(); + } + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AutoFollowStats that = (AutoFollowStats) o; + return numberOfFailedIndicesAutoFollowed == that.numberOfFailedIndicesAutoFollowed && + numberOfFailedRemoteClusterStateRequests == that.numberOfFailedRemoteClusterStateRequests && + numberOfSuccessfulIndicesAutoFollowed == that.numberOfSuccessfulIndicesAutoFollowed && + /* + * ElasticsearchException does not implement equals so we will assume the fetch exceptions are equal if they are equal + * up to the key set and their messages. Note that we are relying on the fact that the auto follow exceptions are ordered by + * keys. + */ + recentAutoFollowErrors.keySet().equals(that.recentAutoFollowErrors.keySet()) && + getFetchExceptionMessages(this).equals(getFetchExceptionMessages(that)); + } + + @Override + public int hashCode() { + return Objects.hash( + numberOfFailedIndicesAutoFollowed, + numberOfFailedRemoteClusterStateRequests, + numberOfSuccessfulIndicesAutoFollowed, + /* + * ElasticsearchException does not implement hash code so we will compute the hash code based on the key set and the + * messages. Note that we are relying on the fact that the auto follow exceptions are ordered by keys. + */ + recentAutoFollowErrors.keySet(), + getFetchExceptionMessages(this) + ); + } + + private static List getFetchExceptionMessages(final AutoFollowStats status) { + return status.getRecentAutoFollowErrors().values().stream().map(ElasticsearchException::getMessage).collect(Collectors.toList()); + } + + @Override + public String toString() { + return "AutoFollowStats{" + + "numberOfFailedIndicesAutoFollowed=" + numberOfFailedIndicesAutoFollowed + + ", numberOfFailedRemoteClusterStateRequests=" + numberOfFailedRemoteClusterStateRequests + + ", numberOfSuccessfulIndicesAutoFollowed=" + numberOfSuccessfulIndicesAutoFollowed + + ", recentAutoFollowErrors=" + recentAutoFollowErrors + + '}'; + } +} From c30d440265f906ebeb5cb26f0b466b4a513dcbc4 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 13 Sep 2018 20:46:41 +0200 Subject: [PATCH 2/7] fixed checkstyle violation --- .../java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java index afd863fb1b623..f7868bc6d0bde 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java @@ -62,7 +62,8 @@ public class AutoFollowStats implements Writeable, ToXContentObject { STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_INDICES_AUTO_FOLLOWED); STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_REMOTE_CLUSTER_STATE_REQUESTS); STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_INDICES_AUTO_FOLLOWED); - STATS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), AUTO_FOLLOW_EXCEPTIONS_PARSER, RECENT_AUTO_FOLLOW_ERRORS); + STATS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), AUTO_FOLLOW_EXCEPTIONS_PARSER, + RECENT_AUTO_FOLLOW_ERRORS); } public static AutoFollowStats fromXContent(final XContentParser parser) { From 59cde1a280b32ebb564df4094d3e2606f1638629 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 17 Sep 2018 09:20:46 +0200 Subject: [PATCH 3/7] iter --- .../ccr/action/AutoFollowCoordinator.java | 30 +++++++------------ 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index ae33b3349c944..27ff7b20b0045 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -107,12 +107,16 @@ synchronized void updateStats(List results) { recentAutoFollowErrors.put(result.clusterAlias, new ElasticsearchException(result.clusterStateFetchException)); numberOfFailedRemoteClusterStateRequests++; + LOGGER.warn(new ParameterizedMessage("failure occurred while fetching cluster state in leader cluster [{}]", + result.clusterAlias), result.clusterStateFetchException); } else { for (Map.Entry entry : result.autoFollowExecutionResults.entrySet()) { if (entry.getValue() != null) { numberOfFailedIndicesAutoFollowed++; recentAutoFollowErrors.put(result.clusterAlias + ":" + entry.getKey().getName(), new ElasticsearchException(entry.getValue())); + LOGGER.warn(new ParameterizedMessage("failure occurred while auto following index [{}] in leader cluster [{}]", + entry.getKey(), result.clusterAlias), entry.getValue()); } else { numberOfSuccessfulIndicesAutoFollowed++; } @@ -146,18 +150,6 @@ private void doAutoFollow() { } Consumer> handler = results -> { - for (AutoFollowResult result : results) { - if (result.clusterStateFetchException != null) { - LOGGER.warn(new ParameterizedMessage("failure occurred while fetching cluster state in leader cluster [{}]", - result.clusterAlias), result.clusterStateFetchException); - } - for (Map.Entry entry : result.autoFollowExecutionResults.entrySet()) { - if (entry.getValue() != null) { - LOGGER.warn(new ParameterizedMessage("failure occurred while auto following index [{}] in leader cluster [{}]", - entry.getKey(), result.clusterAlias), entry.getValue()); - } - } - } updateStats(results); threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); }; @@ -244,14 +236,14 @@ abstract static class AutoFollower { private final AutoFollowMetadata autoFollowMetadata; private final CountDown autoFollowPatternsCountDown; - private final AtomicArray clusterAliasResults; + private final AtomicArray autoFollowResults; AutoFollower(final Consumer> handler, final ClusterState followerClusterState) { this.handler = handler; this.followerClusterState = followerClusterState; this.autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE); this.autoFollowPatternsCountDown = new CountDown(autoFollowMetadata.getPatterns().size()); - this.clusterAliasResults = new AtomicArray<>(autoFollowMetadata.getPatterns().size()); + this.autoFollowResults = new AtomicArray<>(autoFollowMetadata.getPatterns().size()); } void autoFollowIndices() { @@ -312,6 +304,7 @@ private void handleClusterAlias( Function function = recordLeaderIndexAsFollowFunction(clusterAlias, indexToFollow); // The coordinator always runs on the elected master node, so we can update cluster state here: updateAutoFollowMetadata(function, updateError -> { + assert results.get(slot) == null; if (updateError != null) { LOGGER.error("Failed to mark leader index [" + leaderIndexName + "] as auto followed", updateError); results.set(slot, new Tuple<>(indexToFollow, updateError)); @@ -339,9 +332,10 @@ private void handleClusterAlias( } private void finalise(int slot, AutoFollowResult result) { - clusterAliasResults.set(slot, result); + assert autoFollowResults.get(slot) == null; + autoFollowResults.set(slot, result); if (autoFollowPatternsCountDown.countDown()) { - handler.accept(clusterAliasResults.asList()); + handler.accept(autoFollowResults.asList()); } } @@ -442,9 +436,7 @@ static class AutoFollowResult { } AutoFollowResult(String clusterAlias) { - this.clusterAlias = clusterAlias; - this.clusterStateFetchException = null; - this.autoFollowExecutionResults = Collections.emptyMap(); + this(clusterAlias, (Exception) null); } } } From 90fb198b20468bba0f1dbcfd5b8827c48b14fba0 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 17 Sep 2018 09:55:16 +0200 Subject: [PATCH 4/7] let autoFollowIndices() handle passing down the result to finilise() method, this avoids handleClusterAlias() having to know about clusterAliasSlot parameter. --- .../xpack/ccr/action/AutoFollowCoordinator.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 27ff7b20b0045..853b30b96da6a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -257,7 +257,8 @@ void autoFollowIndices() { getLeaderClusterState(autoFollowPattern.getHeaders(), clusterAlias, (leaderClusterState, e) -> { if (leaderClusterState != null) { assert e == null; - handleClusterAlias(slot, clusterAlias, autoFollowPattern, followedIndices, leaderClusterState); + Consumer resultHandler = result -> finalise(slot, result); + handleClusterAlias(clusterAlias, autoFollowPattern, followedIndices, leaderClusterState, resultHandler); } else { finalise(slot, new AutoFollowResult(clusterAlias, e)); } @@ -267,16 +268,16 @@ void autoFollowIndices() { } private void handleClusterAlias( - int clusterAliasSlot, String clusterAlias, AutoFollowPattern autoFollowPattern, List followedIndexUUIDs, - ClusterState leaderClusterState + ClusterState leaderClusterState, + Consumer resultHandler ) { final List leaderIndicesToFollow = getLeaderIndicesToFollow(autoFollowPattern, leaderClusterState, followerClusterState, followedIndexUUIDs); if (leaderIndicesToFollow.isEmpty()) { - finalise(clusterAliasSlot, new AutoFollowResult(clusterAlias)); + resultHandler.accept(new AutoFollowResult(clusterAlias)); } else { final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size()); final AtomicArray> results = new AtomicArray<>(leaderIndicesToFollow.size()); @@ -313,7 +314,7 @@ private void handleClusterAlias( LOGGER.debug("Successfully marked leader index [{}] as auto followed", leaderIndexName); } if (leaderIndicesCountDown.countDown()) { - finalise(clusterAliasSlot, new AutoFollowResult(clusterAlias, results.asList())); + resultHandler.accept(new AutoFollowResult(clusterAlias, results.asList())); } }); }; @@ -323,7 +324,7 @@ private void handleClusterAlias( LOGGER.warn("Failed to auto follow leader index [" + leaderIndexName + "]", followError); results.set(slot, new Tuple<>(indexToFollow, followError)); if (leaderIndicesCountDown.countDown()) { - finalise(clusterAliasSlot, new AutoFollowResult(clusterAlias, results.asList())); + resultHandler.accept(new AutoFollowResult(clusterAlias, results.asList())); } }; createAndFollow(autoFollowPattern.getHeaders(), followRequest, successHandler, failureHandler); From 3a15fe6ded516723ef7bda721012cf990677dda0 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 17 Sep 2018 14:34:14 +0200 Subject: [PATCH 5/7] rename --- .../action/AutoFollowCoordinatorTests.java | 16 +++--- .../xpack/core/ccr/AutoFollowStats.java | 49 +++++++++---------- 2 files changed, 32 insertions(+), 33 deletions(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 9448fe3fe7e46..218825e41207b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -356,18 +356,18 @@ public void testStats() { new AutoFollowCoordinator.AutoFollowResult("_alias1")) ); AutoFollowStats autoFollowStats = autoFollowCoordinator.getStats(); - assertThat(autoFollowStats.getNumberOfFailedIndicesAutoFollowed(), equalTo(0L)); + assertThat(autoFollowStats.getNumberOfFailedFollowIndices(), equalTo(0L)); assertThat(autoFollowStats.getNumberOfFailedRemoteClusterStateRequests(), equalTo(0L)); - assertThat(autoFollowStats.getNumberOfSuccessfulIndicesAutoFollowed(), equalTo(0L)); + assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(0L)); assertThat(autoFollowStats.getRecentAutoFollowErrors().size(), equalTo(0)); autoFollowCoordinator.updateStats(Collections.singletonList( new AutoFollowCoordinator.AutoFollowResult("_alias1", new RuntimeException("error"))) ); autoFollowStats = autoFollowCoordinator.getStats(); - assertThat(autoFollowStats.getNumberOfFailedIndicesAutoFollowed(), equalTo(0L)); + assertThat(autoFollowStats.getNumberOfFailedFollowIndices(), equalTo(0L)); assertThat(autoFollowStats.getNumberOfFailedRemoteClusterStateRequests(), equalTo(1L)); - assertThat(autoFollowStats.getNumberOfSuccessfulIndicesAutoFollowed(), equalTo(0L)); + assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(0L)); assertThat(autoFollowStats.getRecentAutoFollowErrors().size(), equalTo(1)); assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias1").getCause().getMessage(), equalTo("error")); @@ -378,9 +378,9 @@ public void testStats() { Collections.singletonList(Tuple.tuple(new Index("index2", "_na_"), new RuntimeException("error")))) )); autoFollowStats = autoFollowCoordinator.getStats(); - assertThat(autoFollowStats.getNumberOfFailedIndicesAutoFollowed(), equalTo(2L)); + assertThat(autoFollowStats.getNumberOfFailedFollowIndices(), equalTo(2L)); assertThat(autoFollowStats.getNumberOfFailedRemoteClusterStateRequests(), equalTo(1L)); - assertThat(autoFollowStats.getNumberOfSuccessfulIndicesAutoFollowed(), equalTo(0L)); + assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(0L)); assertThat(autoFollowStats.getRecentAutoFollowErrors().size(), equalTo(3)); assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias1").getCause().getMessage(), equalTo("error")); assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias1:index1").getCause().getMessage(), equalTo("error")); @@ -393,9 +393,9 @@ public void testStats() { Collections.singletonList(Tuple.tuple(new Index("index2", "_na_"), null))) )); autoFollowStats = autoFollowCoordinator.getStats(); - assertThat(autoFollowStats.getNumberOfFailedIndicesAutoFollowed(), equalTo(2L)); + assertThat(autoFollowStats.getNumberOfFailedFollowIndices(), equalTo(2L)); assertThat(autoFollowStats.getNumberOfFailedRemoteClusterStateRequests(), equalTo(1L)); - assertThat(autoFollowStats.getNumberOfSuccessfulIndicesAutoFollowed(), equalTo(2L)); + assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(2L)); assertThat(autoFollowStats.getRecentAutoFollowErrors().size(), equalTo(3)); assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias1").getCause().getMessage(), equalTo("error")); assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias1:index1").getCause().getMessage(), equalTo("error")); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java index f7868bc6d0bde..7133a201f4e2a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java @@ -26,9 +26,8 @@ public class AutoFollowStats implements Writeable, ToXContentObject { - private static final ParseField NUMBER_OF_SUCCESSFUL_INDICES_AUTO_FOLLOWED = - new ParseField("number_of_successful_indices_auto_followed"); - private static final ParseField NUMBER_OF_FAILED_INDICES_AUTO_FOLLOWED = new ParseField("number_of_failed_indices_auto_followed"); + private static final ParseField NUMBER_OF_SUCCESSFUL_INDICES_AUTO_FOLLOWED = new ParseField("number_of_successful_follow_indices"); + private static final ParseField NUMBER_OF_FAILED_INDICES_AUTO_FOLLOWED = new ParseField("number_of_failed_follow_indices"); private static final ParseField NUMBER_OF_FAILED_REMOTE_CLUSTER_STATE_REQUESTS = new ParseField("number_of_failed_remote_cluster_state_requests"); private static final ParseField RECENT_AUTO_FOLLOW_ERRORS = new ParseField("recent_auto_follow_errors"); @@ -70,48 +69,48 @@ public static AutoFollowStats fromXContent(final XContentParser parser) { return STATS_PARSER.apply(parser, null); } - private final long numberOfFailedIndicesAutoFollowed; + private final long numberOfFailedFollowIndices; private final long numberOfFailedRemoteClusterStateRequests; - private final long numberOfSuccessfulIndicesAutoFollowed; + private final long numberOfSuccessfulFollowIndices; private final NavigableMap recentAutoFollowErrors; public AutoFollowStats( - long numberOfFailedIndicesAutoFollowed, + long numberOfFailedFollowIndices, long numberOfFailedRemoteClusterStateRequests, - long numberOfSuccessfulIndicesAutoFollowed, + long numberOfSuccessfulFollowIndices, NavigableMap recentAutoFollowErrors ) { - this.numberOfFailedIndicesAutoFollowed = numberOfFailedIndicesAutoFollowed; + this.numberOfFailedFollowIndices = numberOfFailedFollowIndices; this.numberOfFailedRemoteClusterStateRequests = numberOfFailedRemoteClusterStateRequests; - this.numberOfSuccessfulIndicesAutoFollowed = numberOfSuccessfulIndicesAutoFollowed; + this.numberOfSuccessfulFollowIndices = numberOfSuccessfulFollowIndices; this.recentAutoFollowErrors = recentAutoFollowErrors; } public AutoFollowStats(StreamInput in) throws IOException { - numberOfFailedIndicesAutoFollowed = in.readVLong(); + numberOfFailedFollowIndices = in.readVLong(); numberOfFailedRemoteClusterStateRequests = in.readVLong(); - numberOfSuccessfulIndicesAutoFollowed = in.readVLong(); + numberOfSuccessfulFollowIndices = in.readVLong(); recentAutoFollowErrors= new TreeMap<>(in.readMap(StreamInput::readString, StreamInput::readException)); } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeVLong(numberOfFailedIndicesAutoFollowed); + out.writeVLong(numberOfFailedFollowIndices); out.writeVLong(numberOfFailedRemoteClusterStateRequests); - out.writeVLong(numberOfSuccessfulIndicesAutoFollowed); + out.writeVLong(numberOfSuccessfulFollowIndices); out.writeMap(recentAutoFollowErrors, StreamOutput::writeString, StreamOutput::writeException); } - public long getNumberOfFailedIndicesAutoFollowed() { - return numberOfFailedIndicesAutoFollowed; + public long getNumberOfFailedFollowIndices() { + return numberOfFailedFollowIndices; } public long getNumberOfFailedRemoteClusterStateRequests() { return numberOfFailedRemoteClusterStateRequests; } - public long getNumberOfSuccessfulIndicesAutoFollowed() { - return numberOfSuccessfulIndicesAutoFollowed; + public long getNumberOfSuccessfulFollowIndices() { + return numberOfSuccessfulFollowIndices; } public NavigableMap getRecentAutoFollowErrors() { @@ -122,9 +121,9 @@ public NavigableMap getRecentAutoFollowErrors() public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); { - builder.field(NUMBER_OF_FAILED_INDICES_AUTO_FOLLOWED.getPreferredName(), numberOfFailedIndicesAutoFollowed); + builder.field(NUMBER_OF_FAILED_INDICES_AUTO_FOLLOWED.getPreferredName(), numberOfFailedFollowIndices); builder.field(NUMBER_OF_FAILED_REMOTE_CLUSTER_STATE_REQUESTS.getPreferredName(), numberOfFailedRemoteClusterStateRequests); - builder.field(NUMBER_OF_SUCCESSFUL_INDICES_AUTO_FOLLOWED.getPreferredName(), numberOfSuccessfulIndicesAutoFollowed); + builder.field(NUMBER_OF_SUCCESSFUL_INDICES_AUTO_FOLLOWED.getPreferredName(), numberOfSuccessfulFollowIndices); builder.startArray(RECENT_AUTO_FOLLOW_ERRORS.getPreferredName()); { for (final Map.Entry entry : recentAutoFollowErrors.entrySet()) { @@ -152,9 +151,9 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; AutoFollowStats that = (AutoFollowStats) o; - return numberOfFailedIndicesAutoFollowed == that.numberOfFailedIndicesAutoFollowed && + return numberOfFailedFollowIndices == that.numberOfFailedFollowIndices && numberOfFailedRemoteClusterStateRequests == that.numberOfFailedRemoteClusterStateRequests && - numberOfSuccessfulIndicesAutoFollowed == that.numberOfSuccessfulIndicesAutoFollowed && + numberOfSuccessfulFollowIndices == that.numberOfSuccessfulFollowIndices && /* * ElasticsearchException does not implement equals so we will assume the fetch exceptions are equal if they are equal * up to the key set and their messages. Note that we are relying on the fact that the auto follow exceptions are ordered by @@ -167,9 +166,9 @@ public boolean equals(Object o) { @Override public int hashCode() { return Objects.hash( - numberOfFailedIndicesAutoFollowed, + numberOfFailedFollowIndices, numberOfFailedRemoteClusterStateRequests, - numberOfSuccessfulIndicesAutoFollowed, + numberOfSuccessfulFollowIndices, /* * ElasticsearchException does not implement hash code so we will compute the hash code based on the key set and the * messages. Note that we are relying on the fact that the auto follow exceptions are ordered by keys. @@ -186,9 +185,9 @@ private static List getFetchExceptionMessages(final AutoFollowStats stat @Override public String toString() { return "AutoFollowStats{" + - "numberOfFailedIndicesAutoFollowed=" + numberOfFailedIndicesAutoFollowed + + "numberOfFailedFollowIndices=" + numberOfFailedFollowIndices + ", numberOfFailedRemoteClusterStateRequests=" + numberOfFailedRemoteClusterStateRequests + - ", numberOfSuccessfulIndicesAutoFollowed=" + numberOfSuccessfulIndicesAutoFollowed + + ", numberOfSuccessfulFollowIndices=" + numberOfSuccessfulFollowIndices + ", recentAutoFollowErrors=" + recentAutoFollowErrors + '}'; } From f501c10c698bc5eb77ddcdaef8640531e268d6f7 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 17 Sep 2018 20:44:55 +0200 Subject: [PATCH 6/7] split up handleClusterAlias() method --- .../ccr/action/AutoFollowCoordinator.java | 115 ++++++++---------- 1 file changed, 50 insertions(+), 65 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 853b30b96da6a..a1545a38f42f8 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -252,13 +252,19 @@ void autoFollowIndices() { final int slot = i; final String clusterAlias = entry.getKey(); final AutoFollowPattern autoFollowPattern = entry.getValue(); - final List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias); getLeaderClusterState(autoFollowPattern.getHeaders(), clusterAlias, (leaderClusterState, e) -> { if (leaderClusterState != null) { assert e == null; - Consumer resultHandler = result -> finalise(slot, result); - handleClusterAlias(clusterAlias, autoFollowPattern, followedIndices, leaderClusterState, resultHandler); + final List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias); + final List leaderIndicesToFollow = + getLeaderIndicesToFollow(autoFollowPattern, leaderClusterState, followerClusterState, followedIndices); + if (leaderIndicesToFollow.isEmpty()) { + finalise(slot, new AutoFollowResult(clusterAlias)); + }else { + Consumer resultHandler = result -> finalise(slot, result); + checkAutoFollowPattern(clusterAlias, autoFollowPattern, leaderIndicesToFollow, resultHandler); + } } else { finalise(slot, new AutoFollowResult(clusterAlias, e)); } @@ -267,71 +273,50 @@ void autoFollowIndices() { } } - private void handleClusterAlias( - String clusterAlias, - AutoFollowPattern autoFollowPattern, - List followedIndexUUIDs, - ClusterState leaderClusterState, - Consumer resultHandler - ) { - final List leaderIndicesToFollow = - getLeaderIndicesToFollow(autoFollowPattern, leaderClusterState, followerClusterState, followedIndexUUIDs); - if (leaderIndicesToFollow.isEmpty()) { - resultHandler.accept(new AutoFollowResult(clusterAlias)); - } else { - final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size()); - final AtomicArray> results = new AtomicArray<>(leaderIndicesToFollow.size()); - for (int i = 0; i < leaderIndicesToFollow.size(); i++) { - final int slot = i; - final Index indexToFollow = leaderIndicesToFollow.get(i); - final String leaderIndexName = indexToFollow.getName(); - final String followIndexName = getFollowerIndexName(autoFollowPattern, leaderIndexName); - - String leaderIndexNameWithClusterAliasPrefix = clusterAlias.equals("_local_") ? leaderIndexName : - clusterAlias + ":" + leaderIndexName; - FollowIndexAction.Request followRequest = - new FollowIndexAction.Request(leaderIndexNameWithClusterAliasPrefix, followIndexName, - autoFollowPattern.getMaxBatchOperationCount(), autoFollowPattern.getMaxConcurrentReadBatches(), - autoFollowPattern.getMaxOperationSizeInBytes(), autoFollowPattern.getMaxConcurrentWriteBatches(), - autoFollowPattern.getMaxWriteBufferSize(), autoFollowPattern.getMaxRetryDelay(), - autoFollowPattern.getIdleShardRetryDelay()); - - // Execute if the create and follow api call succeeds: - Runnable successHandler = () -> { - LOGGER.info("Auto followed leader index [{}] as follow index [{}]", leaderIndexName, followIndexName); - - // This function updates the auto follow metadata in the cluster to record that the leader index has been followed: - // (so that we do not try to follow it in subsequent auto follow runs) - Function function = recordLeaderIndexAsFollowFunction(clusterAlias, indexToFollow); - // The coordinator always runs on the elected master node, so we can update cluster state here: - updateAutoFollowMetadata(function, updateError -> { - assert results.get(slot) == null; - if (updateError != null) { - LOGGER.error("Failed to mark leader index [" + leaderIndexName + "] as auto followed", updateError); - results.set(slot, new Tuple<>(indexToFollow, updateError)); - } else { - results.set(slot, new Tuple<>(indexToFollow, null)); - LOGGER.debug("Successfully marked leader index [{}] as auto followed", leaderIndexName); - } - if (leaderIndicesCountDown.countDown()) { - resultHandler.accept(new AutoFollowResult(clusterAlias, results.asList())); - } - }); - }; - // Execute if the create and follow apu call fails: - Consumer failureHandler = followError -> { - assert followError != null; - LOGGER.warn("Failed to auto follow leader index [" + leaderIndexName + "]", followError); - results.set(slot, new Tuple<>(indexToFollow, followError)); - if (leaderIndicesCountDown.countDown()) { - resultHandler.accept(new AutoFollowResult(clusterAlias, results.asList())); - } - }; - createAndFollow(autoFollowPattern.getHeaders(), followRequest, successHandler, failureHandler); - } + private void checkAutoFollowPattern(String clusterAlias, AutoFollowPattern autoFollowPattern, + List leaderIndicesToFollow, Consumer resultHandler) { + + final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size()); + final AtomicArray> results = new AtomicArray<>(leaderIndicesToFollow.size()); + for (int i = 0; i < leaderIndicesToFollow.size(); i++) { + final Index indexToFollow = leaderIndicesToFollow.get(i); + final int slot = i; + followLeaderIndex(clusterAlias, indexToFollow, autoFollowPattern, error -> { + results.set(slot, new Tuple<>(indexToFollow, error)); + if (leaderIndicesCountDown.countDown()) { + resultHandler.accept(new AutoFollowResult(clusterAlias, results.asList())); + } + }); } } + private void followLeaderIndex(String clusterAlias, Index indexToFollow, + AutoFollowPattern pattern, Consumer onResult) { + final String leaderIndexName = indexToFollow.getName(); + final String followIndexName = getFollowerIndexName(pattern, leaderIndexName); + + String leaderIndexNameWithClusterAliasPrefix = clusterAlias.equals("_local_") ? leaderIndexName : + clusterAlias + ":" + leaderIndexName; + FollowIndexAction.Request request = + new FollowIndexAction.Request(leaderIndexNameWithClusterAliasPrefix, followIndexName, + pattern.getMaxBatchOperationCount(), pattern.getMaxConcurrentReadBatches(), + pattern.getMaxOperationSizeInBytes(), pattern.getMaxConcurrentWriteBatches(), + pattern.getMaxWriteBufferSize(), pattern.getMaxRetryDelay(), + pattern.getIdleShardRetryDelay()); + + // Execute if the create and follow api call succeeds: + Runnable successHandler = () -> { + LOGGER.info("Auto followed leader index [{}] as follow index [{}]", leaderIndexName, followIndexName); + + // This function updates the auto follow metadata in the cluster to record that the leader index has been followed: + // (so that we do not try to follow it in subsequent auto follow runs) + Function function = recordLeaderIndexAsFollowFunction(clusterAlias, indexToFollow); + // The coordinator always runs on the elected master node, so we can update cluster state here: + updateAutoFollowMetadata(function, onResult); + }; + createAndFollow(pattern.getHeaders(), request, successHandler, onResult); + } + private void finalise(int slot, AutoFollowResult result) { assert autoFollowResults.get(slot) == null; autoFollowResults.set(slot, result); From 484e57640fbf6cb0ae299cff340b374d1173e68a Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 18 Sep 2018 07:18:57 +0200 Subject: [PATCH 7/7] added space --- .../elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index a1545a38f42f8..3a524e5724980 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -261,7 +261,7 @@ void autoFollowIndices() { getLeaderIndicesToFollow(autoFollowPattern, leaderClusterState, followerClusterState, followedIndices); if (leaderIndicesToFollow.isEmpty()) { finalise(slot, new AutoFollowResult(clusterAlias)); - }else { + } else { Consumer resultHandler = result -> finalise(slot, result); checkAutoFollowPattern(clusterAlias, autoFollowPattern, leaderIndicesToFollow, resultHandler); }