From 59100c46ec22f12b3dc9fd4f0b2257a0436ecce2 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 18 Sep 2018 09:43:50 +0200 Subject: [PATCH] [CCR] Changed AutoFollowCoordinator to keep track of certain statistics (#33684) The following stats are being kept track of: 1) The total number of times that auto following a leader index succeeded. 2) The total number of times that auto following a leader index failed. 3) The total number of times that fetching a remote cluster state failed. 4) The most recent 256 auto follow failures per auto leader index (e.g. create_and_follow api call fails) or cluster alias (e.g. fetching remote cluster state fails). Each auto follow run now produces a result that is being used to update the stats being kept track of in AutoFollowCoordinator. Relates to #33007 --- .../xpack/ccr/CcrMultiClusterLicenseIT.java | 2 +- .../xpack/ccr/CcrLicenseChecker.java | 2 +- .../ccr/action/AutoFollowCoordinator.java | 222 ++++++++++++------ .../action/AutoFollowCoordinatorTests.java | 101 +++++++- .../ccr/action/AutoFollowStatsTests.java | 77 ++++++ .../xpack/core/ccr/AutoFollowStats.java | 194 +++++++++++++++ 6 files changed, 516 insertions(+), 82 deletions(-) create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsTests.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java b/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java index 7bc952a3ea8e8..505683b892ca8 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java +++ 
b/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java @@ -64,7 +64,7 @@ public void testAutoFollow() throws Exception { while (it.hasNext()) { final String line = it.next(); if (line.matches(".*\\[WARN\\s*\\]\\[o\\.e\\.x\\.c\\.a\\.AutoFollowCoordinator\\s*\\] \\[node-0\\] " + - "failure occurred during auto-follower coordination")) { + "failure occurred while fetching cluster state in leader cluster \\[leader_cluster\\]")) { warn = true; break; } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index b6c6885b51834..be846fec40802 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -65,7 +65,7 @@ public final class CcrLicenseChecker { * * @param isCcrAllowed a boolean supplier that should return true if CCR is allowed and false otherwise */ - CcrLicenseChecker(final BooleanSupplier isCcrAllowed) { + public CcrLicenseChecker(final BooleanSupplier isCcrAllowed) { this.isCcrAllowed = Objects.requireNonNull(isCcrAllowed); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 180e5e3799098..3a524e5724980 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -7,6 +7,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import 
org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.client.Client; @@ -17,8 +19,10 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.Index; import org.elasticsearch.license.LicenseUtils; @@ -27,15 +31,18 @@ import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; +import org.elasticsearch.xpack.core.ccr.AutoFollowStats; import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction; import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.atomic.AtomicReference; +import java.util.TreeMap; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -47,6 +54,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier { private static final Logger LOGGER = LogManager.getLogger(AutoFollowCoordinator.class); + private static final int MAX_AUTO_FOLLOW_ERRORS = 256; private final Client client; private final TimeValue pollInterval; @@ -56,6 +64,12 @@ public class AutoFollowCoordinator implements ClusterStateApplier { private volatile boolean localNodeMaster = false; + // The following fields are read and updated under a lock: + private long numberOfSuccessfulIndicesAutoFollowed = 0; + private long 
numberOfFailedIndicesAutoFollowed = 0; + private long numberOfFailedRemoteClusterStateRequests = 0; + private final LinkedHashMap recentAutoFollowErrors; + public AutoFollowCoordinator( Settings settings, Client client, @@ -69,6 +83,47 @@ public AutoFollowCoordinator( this.pollInterval = CcrSettings.CCR_AUTO_FOLLOW_POLL_INTERVAL.get(settings); clusterService.addStateApplier(this); + + this.recentAutoFollowErrors = new LinkedHashMap() { + @Override + protected boolean removeEldestEntry(final Map.Entry eldest) { + return size() > MAX_AUTO_FOLLOW_ERRORS; + } + }; + } + + public synchronized AutoFollowStats getStats() { + return new AutoFollowStats( + numberOfFailedIndicesAutoFollowed, + numberOfFailedRemoteClusterStateRequests, + numberOfSuccessfulIndicesAutoFollowed, + new TreeMap<>(recentAutoFollowErrors) + ); + } + + synchronized void updateStats(List results) { + for (AutoFollowResult result : results) { + if (result.clusterStateFetchException != null) { + recentAutoFollowErrors.put(result.clusterAlias, + new ElasticsearchException(result.clusterStateFetchException)); + numberOfFailedRemoteClusterStateRequests++; + LOGGER.warn(new ParameterizedMessage("failure occurred while fetching cluster state in leader cluster [{}]", + result.clusterAlias), result.clusterStateFetchException); + } else { + for (Map.Entry entry : result.autoFollowExecutionResults.entrySet()) { + if (entry.getValue() != null) { + numberOfFailedIndicesAutoFollowed++; + recentAutoFollowErrors.put(result.clusterAlias + ":" + entry.getKey().getName(), + new ElasticsearchException(entry.getValue())); + LOGGER.warn(new ParameterizedMessage("failure occurred while auto following index [{}] in leader cluster [{}]", + entry.getKey(), result.clusterAlias), entry.getValue()); + } else { + numberOfSuccessfulIndicesAutoFollowed++; + } + } + } + + } } private void doAutoFollow() { @@ -94,10 +149,8 @@ private void doAutoFollow() { return; } - Consumer handler = e -> { - if (e != null) { - LOGGER.warn("failure 
occurred during auto-follower coordination", e); - } + Consumer> handler = results -> { + updateStats(results); threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); }; AutoFollower operation = new AutoFollower(handler, followerClusterState) { @@ -178,101 +231,97 @@ public void applyClusterState(ClusterChangedEvent event) { abstract static class AutoFollower { - private final Consumer handler; + private final Consumer> handler; private final ClusterState followerClusterState; private final AutoFollowMetadata autoFollowMetadata; private final CountDown autoFollowPatternsCountDown; - private final AtomicReference autoFollowPatternsErrorHolder = new AtomicReference<>(); + private final AtomicArray autoFollowResults; - AutoFollower(final Consumer handler, final ClusterState followerClusterState) { + AutoFollower(final Consumer> handler, final ClusterState followerClusterState) { this.handler = handler; this.followerClusterState = followerClusterState; this.autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE); this.autoFollowPatternsCountDown = new CountDown(autoFollowMetadata.getPatterns().size()); + this.autoFollowResults = new AtomicArray<>(autoFollowMetadata.getPatterns().size()); } void autoFollowIndices() { + int i = 0; for (Map.Entry entry : autoFollowMetadata.getPatterns().entrySet()) { - String clusterAlias = entry.getKey(); - AutoFollowPattern autoFollowPattern = entry.getValue(); - List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias); + final int slot = i; + final String clusterAlias = entry.getKey(); + final AutoFollowPattern autoFollowPattern = entry.getValue(); getLeaderClusterState(autoFollowPattern.getHeaders(), clusterAlias, (leaderClusterState, e) -> { if (leaderClusterState != null) { assert e == null; - handleClusterAlias(clusterAlias, autoFollowPattern, followedIndices, leaderClusterState); + final List followedIndices = 
autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias); + final List leaderIndicesToFollow = + getLeaderIndicesToFollow(autoFollowPattern, leaderClusterState, followerClusterState, followedIndices); + if (leaderIndicesToFollow.isEmpty()) { + finalise(slot, new AutoFollowResult(clusterAlias)); + } else { + Consumer resultHandler = result -> finalise(slot, result); + checkAutoFollowPattern(clusterAlias, autoFollowPattern, leaderIndicesToFollow, resultHandler); + } } else { - finalise(e); + finalise(slot, new AutoFollowResult(clusterAlias, e)); } }); + i++; } } - private void handleClusterAlias(String clusterAlias, AutoFollowPattern autoFollowPattern, - List followedIndexUUIDs, ClusterState leaderClusterState) { - final List leaderIndicesToFollow = - getLeaderIndicesToFollow(autoFollowPattern, leaderClusterState, followerClusterState, followedIndexUUIDs); - if (leaderIndicesToFollow.isEmpty()) { - finalise(null); - } else { - final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size()); - final AtomicReference leaderIndicesErrorHolder = new AtomicReference<>(); - for (Index indexToFollow : leaderIndicesToFollow) { - final String leaderIndexName = indexToFollow.getName(); - final String followIndexName = getFollowerIndexName(autoFollowPattern, leaderIndexName); - - String leaderIndexNameWithClusterAliasPrefix = clusterAlias.equals("_local_") ? 
leaderIndexName : - clusterAlias + ":" + leaderIndexName; - FollowIndexAction.Request followRequest = - new FollowIndexAction.Request(leaderIndexNameWithClusterAliasPrefix, followIndexName, - autoFollowPattern.getMaxBatchOperationCount(), autoFollowPattern.getMaxConcurrentReadBatches(), - autoFollowPattern.getMaxOperationSizeInBytes(), autoFollowPattern.getMaxConcurrentWriteBatches(), - autoFollowPattern.getMaxWriteBufferSize(), autoFollowPattern.getMaxRetryDelay(), - autoFollowPattern.getIdleShardRetryDelay()); - - // Execute if the create and follow api call succeeds: - Runnable successHandler = () -> { - LOGGER.info("Auto followed leader index [{}] as follow index [{}]", leaderIndexName, followIndexName); - - // This function updates the auto follow metadata in the cluster to record that the leader index has been followed: - // (so that we do not try to follow it in subsequent auto follow runs) - Function function = recordLeaderIndexAsFollowFunction(clusterAlias, indexToFollow); - // The coordinator always runs on the elected master node, so we can update cluster state here: - updateAutoFollowMetadata(function, updateError -> { - if (updateError != null) { - LOGGER.error("Failed to mark leader index [" + leaderIndexName + "] as auto followed", updateError); - if (leaderIndicesErrorHolder.compareAndSet(null, updateError) == false) { - leaderIndicesErrorHolder.get().addSuppressed(updateError); - } - } else { - LOGGER.debug("Successfully marked leader index [{}] as auto followed", leaderIndexName); - } - if (leaderIndicesCountDown.countDown()) { - finalise(leaderIndicesErrorHolder.get()); - } - }); - }; - // Execute if the create and follow apu call fails: - Consumer failureHandler = followError -> { - assert followError != null; - LOGGER.warn("Failed to auto follow leader index [" + leaderIndexName + "]", followError); - if (leaderIndicesCountDown.countDown()) { - finalise(followError); - } - }; - createAndFollow(autoFollowPattern.getHeaders(), followRequest, 
successHandler, failureHandler); - } + private void checkAutoFollowPattern(String clusterAlias, AutoFollowPattern autoFollowPattern, + List leaderIndicesToFollow, Consumer resultHandler) { + + final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size()); + final AtomicArray> results = new AtomicArray<>(leaderIndicesToFollow.size()); + for (int i = 0; i < leaderIndicesToFollow.size(); i++) { + final Index indexToFollow = leaderIndicesToFollow.get(i); + final int slot = i; + followLeaderIndex(clusterAlias, indexToFollow, autoFollowPattern, error -> { + results.set(slot, new Tuple<>(indexToFollow, error)); + if (leaderIndicesCountDown.countDown()) { + resultHandler.accept(new AutoFollowResult(clusterAlias, results.asList())); + } + }); } } - private void finalise(Exception failure) { - if (autoFollowPatternsErrorHolder.compareAndSet(null, failure) == false) { - autoFollowPatternsErrorHolder.get().addSuppressed(failure); - } + private void followLeaderIndex(String clusterAlias, Index indexToFollow, + AutoFollowPattern pattern, Consumer onResult) { + final String leaderIndexName = indexToFollow.getName(); + final String followIndexName = getFollowerIndexName(pattern, leaderIndexName); + + String leaderIndexNameWithClusterAliasPrefix = clusterAlias.equals("_local_") ? 
leaderIndexName : + clusterAlias + ":" + leaderIndexName; + FollowIndexAction.Request request = + new FollowIndexAction.Request(leaderIndexNameWithClusterAliasPrefix, followIndexName, + pattern.getMaxBatchOperationCount(), pattern.getMaxConcurrentReadBatches(), + pattern.getMaxOperationSizeInBytes(), pattern.getMaxConcurrentWriteBatches(), + pattern.getMaxWriteBufferSize(), pattern.getMaxRetryDelay(), + pattern.getIdleShardRetryDelay()); + + // Execute if the create and follow api call succeeds: + Runnable successHandler = () -> { + LOGGER.info("Auto followed leader index [{}] as follow index [{}]", leaderIndexName, followIndexName); + + // This function updates the auto follow metadata in the cluster to record that the leader index has been followed: + // (so that we do not try to follow it in subsequent auto follow runs) + Function function = recordLeaderIndexAsFollowFunction(clusterAlias, indexToFollow); + // The coordinator always runs on the elected master node, so we can update cluster state here: + updateAutoFollowMetadata(function, onResult); + }; + createAndFollow(pattern.getHeaders(), request, successHandler, onResult); + } + private void finalise(int slot, AutoFollowResult result) { + assert autoFollowResults.get(slot) == null; + autoFollowResults.set(slot, result); if (autoFollowPatternsCountDown.countDown()) { - handler.accept(autoFollowPatternsErrorHolder.get()); + handler.accept(autoFollowResults.asList()); } } @@ -347,4 +396,33 @@ abstract void updateAutoFollowMetadata( ); } + + static class AutoFollowResult { + + final String clusterAlias; + final Exception clusterStateFetchException; + final Map autoFollowExecutionResults; + + AutoFollowResult(String clusterAlias, List> results) { + this.clusterAlias = clusterAlias; + + Map autoFollowExecutionResults = new HashMap<>(); + for (Tuple result : results) { + autoFollowExecutionResults.put(result.v1(), result.v2()); + } + + this.clusterStateFetchException = null; + this.autoFollowExecutionResults = 
Collections.unmodifiableMap(autoFollowExecutionResults); + } + + AutoFollowResult(String clusterAlias, Exception e) { + this.clusterAlias = clusterAlias; + this.clusterStateFetchException = e; + this.autoFollowExecutionResults = Collections.emptyMap(); + } + + AutoFollowResult(String clusterAlias) { + this(clusterAlias, (Exception) null); + } + } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 31af326250c3b..218825e41207b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -11,15 +11,20 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; +import org.elasticsearch.xpack.core.ccr.AutoFollowStats; import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -63,9 +68,15 @@ public void testAutoFollower() { .build(); boolean[] invoked = new boolean[]{false}; - Consumer handler = e -> { + Consumer> handler = results -> { invoked[0] = true; - assertThat(e, nullValue()); + + 
assertThat(results.size(), equalTo(1)); + assertThat(results.get(0).clusterStateFetchException, nullValue()); + List> entries = new ArrayList<>(results.get(0).autoFollowExecutionResults.entrySet()); + assertThat(entries.size(), equalTo(1)); + assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); + assertThat(entries.get(0).getValue(), nullValue()); }; AutoFollower autoFollower = new AutoFollower(handler, currentState) { @Override @@ -116,9 +127,12 @@ public void testAutoFollowerClusterStateApiFailure() { Exception failure = new RuntimeException("failure"); boolean[] invoked = new boolean[]{false}; - Consumer handler = e -> { + Consumer> handler = results -> { invoked[0] = true; - assertThat(e, sameInstance(failure)); + + assertThat(results.size(), equalTo(1)); + assertThat(results.get(0).clusterStateFetchException, sameInstance(failure)); + assertThat(results.get(0).autoFollowExecutionResults.entrySet().size(), equalTo(0)); }; AutoFollower autoFollower = new AutoFollower(handler, followerState) { @Override @@ -170,9 +184,15 @@ public void testAutoFollowerUpdateClusterStateFailure() { Exception failure = new RuntimeException("failure"); boolean[] invoked = new boolean[]{false}; - Consumer handler = e -> { + Consumer> handler = results -> { invoked[0] = true; - assertThat(e, sameInstance(failure)); + + assertThat(results.size(), equalTo(1)); + assertThat(results.get(0).clusterStateFetchException, nullValue()); + List> entries = new ArrayList<>(results.get(0).autoFollowExecutionResults.entrySet()); + assertThat(entries.size(), equalTo(1)); + assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); + assertThat(entries.get(0).getValue(), sameInstance(failure)); }; AutoFollower autoFollower = new AutoFollower(handler, followerState) { @Override @@ -225,9 +245,15 @@ public void testAutoFollowerCreateAndFollowApiCallFailure() { Exception failure = new RuntimeException("failure"); boolean[] invoked = new boolean[]{false}; - Consumer 
handler = e -> { + Consumer> handler = results -> { invoked[0] = true; - assertThat(e, sameInstance(failure)); + + assertThat(results.size(), equalTo(1)); + assertThat(results.get(0).clusterStateFetchException, nullValue()); + List> entries = new ArrayList<>(results.get(0).autoFollowExecutionResults.entrySet()); + assertThat(entries.size(), equalTo(1)); + assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); + assertThat(entries.get(0).getValue(), sameInstance(failure)); }; AutoFollower autoFollower = new AutoFollower(handler, followerState) { @Override @@ -317,4 +343,63 @@ public void testGetFollowerIndexName() { assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0")); } + public void testStats() { + AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( + Settings.EMPTY, + null, + null, + mock(ClusterService.class), + new CcrLicenseChecker(() -> true) + ); + + autoFollowCoordinator.updateStats(Collections.singletonList( + new AutoFollowCoordinator.AutoFollowResult("_alias1")) + ); + AutoFollowStats autoFollowStats = autoFollowCoordinator.getStats(); + assertThat(autoFollowStats.getNumberOfFailedFollowIndices(), equalTo(0L)); + assertThat(autoFollowStats.getNumberOfFailedRemoteClusterStateRequests(), equalTo(0L)); + assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(0L)); + assertThat(autoFollowStats.getRecentAutoFollowErrors().size(), equalTo(0)); + + autoFollowCoordinator.updateStats(Collections.singletonList( + new AutoFollowCoordinator.AutoFollowResult("_alias1", new RuntimeException("error"))) + ); + autoFollowStats = autoFollowCoordinator.getStats(); + assertThat(autoFollowStats.getNumberOfFailedFollowIndices(), equalTo(0L)); + assertThat(autoFollowStats.getNumberOfFailedRemoteClusterStateRequests(), equalTo(1L)); + assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(0L)); + 
assertThat(autoFollowStats.getRecentAutoFollowErrors().size(), equalTo(1)); + assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias1").getCause().getMessage(), equalTo("error")); + + autoFollowCoordinator.updateStats(Arrays.asList( + new AutoFollowCoordinator.AutoFollowResult("_alias1", + Collections.singletonList(Tuple.tuple(new Index("index1", "_na_"), new RuntimeException("error")))), + new AutoFollowCoordinator.AutoFollowResult("_alias2", + Collections.singletonList(Tuple.tuple(new Index("index2", "_na_"), new RuntimeException("error")))) + )); + autoFollowStats = autoFollowCoordinator.getStats(); + assertThat(autoFollowStats.getNumberOfFailedFollowIndices(), equalTo(2L)); + assertThat(autoFollowStats.getNumberOfFailedRemoteClusterStateRequests(), equalTo(1L)); + assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(0L)); + assertThat(autoFollowStats.getRecentAutoFollowErrors().size(), equalTo(3)); + assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias1").getCause().getMessage(), equalTo("error")); + assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias1:index1").getCause().getMessage(), equalTo("error")); + assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias2:index2").getCause().getMessage(), equalTo("error")); + + autoFollowCoordinator.updateStats(Arrays.asList( + new AutoFollowCoordinator.AutoFollowResult("_alias1", + Collections.singletonList(Tuple.tuple(new Index("index1", "_na_"), null))), + new AutoFollowCoordinator.AutoFollowResult("_alias2", + Collections.singletonList(Tuple.tuple(new Index("index2", "_na_"), null))) + )); + autoFollowStats = autoFollowCoordinator.getStats(); + assertThat(autoFollowStats.getNumberOfFailedFollowIndices(), equalTo(2L)); + assertThat(autoFollowStats.getNumberOfFailedRemoteClusterStateRequests(), equalTo(1L)); + assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(2L)); + 
assertThat(autoFollowStats.getRecentAutoFollowErrors().size(), equalTo(3)); + assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias1").getCause().getMessage(), equalTo("error")); + assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias1:index1").getCause().getMessage(), equalTo("error")); + assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias2:index2").getCause().getMessage(), equalTo("error")); + } + } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsTests.java new file mode 100644 index 0000000000000..b9ee5bf464616 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsTests.java @@ -0,0 +1,77 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.xpack.core.ccr.AutoFollowStats; + +import java.io.IOException; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; + +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; + +public class AutoFollowStatsTests extends AbstractSerializingTestCase { + + @Override + protected AutoFollowStats doParseInstance(XContentParser parser) throws IOException { + return AutoFollowStats.fromXContent(parser); + } + + @Override + protected AutoFollowStats createTestInstance() { + return new AutoFollowStats( + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomReadExceptions() + ); + } + + private static NavigableMap randomReadExceptions() { + final int count = randomIntBetween(0, 16); + final NavigableMap readExceptions = new TreeMap<>(); + for (int i = 0; i < count; i++) { + readExceptions.put("" + i, new ElasticsearchException(new IllegalStateException("index [" + i + "]"))); + } + return readExceptions; + } + + @Override + protected Writeable.Reader instanceReader() { + return AutoFollowStats::new; + } + + @Override + protected void assertEqualInstances(AutoFollowStats expectedInstance, AutoFollowStats newInstance) { + assertNotSame(expectedInstance, newInstance); + + assertThat(newInstance.getRecentAutoFollowErrors().size(), equalTo(expectedInstance.getRecentAutoFollowErrors().size())); + assertThat(newInstance.getRecentAutoFollowErrors().keySet(), equalTo(expectedInstance.getRecentAutoFollowErrors().keySet())); + for (final Map.Entry entry : 
newInstance.getRecentAutoFollowErrors().entrySet()) { + // x-content loses the exception + final ElasticsearchException expected = expectedInstance.getRecentAutoFollowErrors().get(entry.getKey()); + assertThat(entry.getValue().getMessage(), containsString(expected.getMessage())); + assertNotNull(entry.getValue().getCause()); + assertThat( + entry.getValue().getCause(), + anyOf(instanceOf(ElasticsearchException.class), instanceOf(IllegalStateException.class))); + assertThat(entry.getValue().getCause().getMessage(), containsString(expected.getCause().getMessage())); + } + } + + @Override + protected boolean assertToXContentEquivalence() { + return false; + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java new file mode 100644 index 0000000000000..7133a201f4e2a --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java @@ -0,0 +1,194 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */
+package org.elasticsearch.xpack.core.ccr;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Holds auto follow statistics: three counters (failed follow indices, failed remote cluster state requests and
+ * successful follow indices) plus a sorted map of recent auto follow errors keyed by a string.
+ *
+ * Instances can be written to and read from the transport stream ({@link Writeable}) and rendered to / parsed from
+ * x-content ({@link ToXContentObject} and {@link #fromXContent(XContentParser)}).
+ */
+public class AutoFollowStats implements Writeable, ToXContentObject {
+
+    private static final ParseField NUMBER_OF_SUCCESSFUL_INDICES_AUTO_FOLLOWED = new ParseField("number_of_successful_follow_indices");
+    private static final ParseField NUMBER_OF_FAILED_INDICES_AUTO_FOLLOWED = new ParseField("number_of_failed_follow_indices");
+    private static final ParseField NUMBER_OF_FAILED_REMOTE_CLUSTER_STATE_REQUESTS =
+        new ParseField("number_of_failed_remote_cluster_state_requests");
+    private static final ParseField RECENT_AUTO_FOLLOW_ERRORS = new ParseField("recent_auto_follow_errors");
+    private static final ParseField LEADER_INDEX = new ParseField("leader_index");
+    private static final ParseField AUTO_FOLLOW_EXCEPTION = new ParseField("auto_follow_exception");
+
+    /*
+     * Parser for the top-level stats object. The constructor arguments arrive in the order in which they are declared in the
+     * static initializer below, which matches the parameter order of the public constructor.
+     *
+     * The unchecked cast of args[3] is needed because the parser hands back an untyped array; the list elements are the entries
+     * produced by AUTO_FOLLOW_EXCEPTIONS_PARSER. Note that the two-argument Collectors.toMap rejects duplicate keys.
+     */
+    @SuppressWarnings("unchecked")
+    private static final ConstructingObjectParser<AutoFollowStats, Void> STATS_PARSER = new ConstructingObjectParser<>("auto_follow_stats",
+        args -> new AutoFollowStats(
+            (Long) args[0],
+            (Long) args[1],
+            (Long) args[2],
+            new TreeMap<>(
+                ((List<Map.Entry<String, ElasticsearchException>>) args[3])
+                    .stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
+        ));
+
+    /* Parser for a single element of the recent errors array: a (key, exception) pair. */
+    private static final ConstructingObjectParser<Map.Entry<String, ElasticsearchException>, Void> AUTO_FOLLOW_EXCEPTIONS_PARSER =
+        new ConstructingObjectParser<>(
+            "auto_follow_stats_errors",
+            args -> new AbstractMap.SimpleEntry<>((String) args[0], (ElasticsearchException) args[1]));
+
+    static {
+        AUTO_FOLLOW_EXCEPTIONS_PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_INDEX);
+        AUTO_FOLLOW_EXCEPTIONS_PARSER.declareObject(
+            ConstructingObjectParser.constructorArg(),
+            (p, c) -> ElasticsearchException.fromXContent(p),
+            AUTO_FOLLOW_EXCEPTION);
+
+        // The declaration order below defines the indices of the args array used by STATS_PARSER.
+        STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_INDICES_AUTO_FOLLOWED);
+        STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_REMOTE_CLUSTER_STATE_REQUESTS);
+        STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_INDICES_AUTO_FOLLOWED);
+        STATS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), AUTO_FOLLOW_EXCEPTIONS_PARSER,
+            RECENT_AUTO_FOLLOW_ERRORS);
+    }
+
+    /**
+     * Parses an {@link AutoFollowStats} instance from the given parser.
+     *
+     * @param parser the parser positioned on the x-content representation produced by
+     *               {@link #toXContent(XContentBuilder, Params)}
+     * @return the parsed stats
+     */
+    public static AutoFollowStats fromXContent(final XContentParser parser) {
+        return STATS_PARSER.apply(parser, null);
+    }
+
+    private final long numberOfFailedFollowIndices;
+    private final long numberOfFailedRemoteClusterStateRequests;
+    private final long numberOfSuccessfulFollowIndices;
+    private final NavigableMap<String, ElasticsearchException> recentAutoFollowErrors;
+
+    /**
+     * Creates a new stats instance from the given values.
+     *
+     * @param numberOfFailedFollowIndices              the number of failed follow indices
+     * @param numberOfFailedRemoteClusterStateRequests the number of failed remote cluster state requests
+     * @param numberOfSuccessfulFollowIndices          the number of successful follow indices
+     * @param recentAutoFollowErrors                   the recent auto follow errors by key; the reference is stored as is
+     *                                                 (no copy is made)
+     */
+    public AutoFollowStats(
+        long numberOfFailedFollowIndices,
+        long numberOfFailedRemoteClusterStateRequests,
+        long numberOfSuccessfulFollowIndices,
+        NavigableMap<String, ElasticsearchException> recentAutoFollowErrors
+    ) {
+        this.numberOfFailedFollowIndices = numberOfFailedFollowIndices;
+        this.numberOfFailedRemoteClusterStateRequests = numberOfFailedRemoteClusterStateRequests;
+        this.numberOfSuccessfulFollowIndices = numberOfSuccessfulFollowIndices;
+        this.recentAutoFollowErrors = recentAutoFollowErrors;
+    }
+
+    /**
+     * Reads a stats instance from the stream. The read order must mirror {@link #writeTo(StreamOutput)}.
+     *
+     * @param in the stream to read from
+     * @throws IOException if reading from the stream fails
+     */
+    public AutoFollowStats(StreamInput in) throws IOException {
+        numberOfFailedFollowIndices = in.readVLong();
+        numberOfFailedRemoteClusterStateRequests = in.readVLong();
+        numberOfSuccessfulFollowIndices = in.readVLong();
+        // Copy into a TreeMap so that the entries are ordered by key, as equals and hashCode rely on.
+        recentAutoFollowErrors = new TreeMap<>(in.readMap(StreamInput::readString, StreamInput::readException));
+    }
+
+    /**
+     * Writes the three counters followed by the recent errors map to the stream.
+     *
+     * @param out the stream to write to
+     * @throws IOException if writing to the stream fails
+     */
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeVLong(numberOfFailedFollowIndices);
+        out.writeVLong(numberOfFailedRemoteClusterStateRequests);
+        out.writeVLong(numberOfSuccessfulFollowIndices);
+        out.writeMap(recentAutoFollowErrors, StreamOutput::writeString, StreamOutput::writeException);
+    }
+
+    /**
+     * @return the number of failed follow indices
+     */
+    public long getNumberOfFailedFollowIndices() {
+        return numberOfFailedFollowIndices;
+    }
+
+    /**
+     * @return the number of failed remote cluster state requests
+     */
+    public long getNumberOfFailedRemoteClusterStateRequests() {
+        return numberOfFailedRemoteClusterStateRequests;
+    }
+
+    /**
+     * @return the number of successful follow indices
+     */
+    public long getNumberOfSuccessfulFollowIndices() {
+        return numberOfSuccessfulFollowIndices;
+    }
+
+    /**
+     * @return the recent auto follow errors by key; this is the map held by this instance, not a copy
+     */
+    public NavigableMap<String, ElasticsearchException> getRecentAutoFollowErrors() {
+        return recentAutoFollowErrors;
+    }
+
+    /**
+     * Renders the stats as an object holding the three counters and an array with one object per recent error. Each array
+     * element carries the map key under {@code leader_index} and the rendered exception under {@code auto_follow_exception}.
+     *
+     * @param builder the builder to write to
+     * @param params  the parameters passed on when rendering the exceptions
+     * @return the given builder
+     * @throws IOException if writing to the builder fails
+     */
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        {
+            builder.field(NUMBER_OF_FAILED_INDICES_AUTO_FOLLOWED.getPreferredName(), numberOfFailedFollowIndices);
+            builder.field(NUMBER_OF_FAILED_REMOTE_CLUSTER_STATE_REQUESTS.getPreferredName(), numberOfFailedRemoteClusterStateRequests);
+            builder.field(NUMBER_OF_SUCCESSFUL_INDICES_AUTO_FOLLOWED.getPreferredName(), numberOfSuccessfulFollowIndices);
+            builder.startArray(RECENT_AUTO_FOLLOW_ERRORS.getPreferredName());
+            {
+                for (final Map.Entry<String, ElasticsearchException> entry : recentAutoFollowErrors.entrySet()) {
+                    builder.startObject();
+                    {
+                        builder.field(LEADER_INDEX.getPreferredName(), entry.getKey());
+                        builder.field(AUTO_FOLLOW_EXCEPTION.getPreferredName());
+                        builder.startObject();
+                        {
+                            ElasticsearchException.generateThrowableXContent(builder, params, entry.getValue());
+                        }
+                        builder.endObject();
+                    }
+                    builder.endObject();
+                }
+            }
+            builder.endArray();
+        }
+        builder.endObject();
+        return builder;
+    }
+
+    /**
+     * Two instances are considered equal if their counters are equal, their recent error key sets are equal and the messages
+     * of their recent errors are equal in key order.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        AutoFollowStats that = (AutoFollowStats) o;
+        return numberOfFailedFollowIndices == that.numberOfFailedFollowIndices &&
+            numberOfFailedRemoteClusterStateRequests == that.numberOfFailedRemoteClusterStateRequests &&
+            numberOfSuccessfulFollowIndices == that.numberOfSuccessfulFollowIndices &&
+            /*
+             * ElasticsearchException does not implement equals so we will assume the auto follow errors are equal if they are
+             * equal up to the key set and their messages. Note that we are relying on the fact that the auto follow errors are
+             * ordered by keys.
+             */
+            recentAutoFollowErrors.keySet().equals(that.recentAutoFollowErrors.keySet()) &&
+            getFetchExceptionMessages(this).equals(getFetchExceptionMessages(that));
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+            numberOfFailedFollowIndices,
+            numberOfFailedRemoteClusterStateRequests,
+            numberOfSuccessfulFollowIndices,
+            /*
+             * ElasticsearchException does not implement hash code so we will compute the hash code based on the key set and the
+             * messages. Note that we are relying on the fact that the auto follow errors are ordered by keys.
+             */
+            recentAutoFollowErrors.keySet(),
+            getFetchExceptionMessages(this)
+        );
+    }
+
+    /* Returns the messages of the recent auto follow errors of the given stats, in the iteration order of the map values. */
+    private static List<String> getFetchExceptionMessages(final AutoFollowStats stats) {
+        return stats.getRecentAutoFollowErrors().values().stream().map(ElasticsearchException::getMessage).collect(Collectors.toList());
+    }
+
+    @Override
+    public String toString() {
+        return "AutoFollowStats{" +
+            "numberOfFailedFollowIndices=" + numberOfFailedFollowIndices +
+            ", numberOfFailedRemoteClusterStateRequests=" + numberOfFailedRemoteClusterStateRequests +
+            ", numberOfSuccessfulFollowIndices=" + numberOfSuccessfulFollowIndices +
+            ", recentAutoFollowErrors=" + recentAutoFollowErrors +
+            '}';
+    }
+}