Skip to content

Commit

Permalink
[CCR] Changed AutoFollowCoordinator to keep track of certain statistics
Browse files Browse the repository at this point in the history
The following stats are being kept track of:
1) The total number of times that auto following a leader index succeed.
2) The total number of times that auto following a leader index failed.
3) The total number of times that fetching a remote cluster state failed.
4) The most recent 256 auto follow failures per auto leader index
   (e.g. create_and_follow api call fails) or cluster alias
   (e.g. fetching remote cluster state fails).

Each auto follow run now produces a result that is being used to update
the stats being kept track of in AutoFollowCoordinator.

Relates to elastic#33007
  • Loading branch information
martijnvg committed Sep 13, 2018
1 parent a69ae6b commit e3fbd10
Show file tree
Hide file tree
Showing 6 changed files with 494 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void testAutoFollow() throws Exception {
while (it.hasNext()) {
final String line = it.next();
if (line.matches(".*\\[WARN\\s*\\]\\[o\\.e\\.x\\.c\\.a\\.AutoFollowCoordinator\\s*\\] \\[node-0\\] " +
"failure occurred during auto-follower coordination")) {
"failure occurred while fetching cluster state in leader cluster \\[leader_cluster\\]")) {
warn = true;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public final class CcrLicenseChecker {
*
* @param isCcrAllowed a boolean supplier that should return true if CCR is allowed and false otherwise
*/
CcrLicenseChecker(final BooleanSupplier isCcrAllowed) {
public CcrLicenseChecker(final BooleanSupplier isCcrAllowed) {
this.isCcrAllowed = Objects.requireNonNull(isCcrAllowed);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.client.Client;
Expand All @@ -17,8 +19,10 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index;
import org.elasticsearch.license.LicenseUtils;
Expand All @@ -27,15 +31,18 @@
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
Expand All @@ -47,6 +54,7 @@
public class AutoFollowCoordinator implements ClusterStateApplier {

private static final Logger LOGGER = LogManager.getLogger(AutoFollowCoordinator.class);
private static final int MAX_AUTO_FOLLOW_ERRORS = 256;

private final Client client;
private final TimeValue pollInterval;
Expand All @@ -56,6 +64,12 @@ public class AutoFollowCoordinator implements ClusterStateApplier {

private volatile boolean localNodeMaster = false;

// The following fields are read and updated under a lock:
private long numberOfSuccessfulIndicesAutoFollowed = 0;
private long numberOfFailedIndicesAutoFollowed = 0;
private long numberOfFailedRemoteClusterStateRequests = 0;
private final LinkedHashMap<String, ElasticsearchException> recentAutoFollowErrors;

public AutoFollowCoordinator(
Settings settings,
Client client,
Expand All @@ -69,6 +83,43 @@ public AutoFollowCoordinator(

this.pollInterval = CcrSettings.CCR_AUTO_FOLLOW_POLL_INTERVAL.get(settings);
clusterService.addStateApplier(this);

this.recentAutoFollowErrors = new LinkedHashMap<String, ElasticsearchException>() {
@Override
protected boolean removeEldestEntry(final Map.Entry<String, ElasticsearchException> eldest) {
return size() > MAX_AUTO_FOLLOW_ERRORS;
}
};
}

public synchronized AutoFollowStats getStats() {
return new AutoFollowStats(
numberOfFailedIndicesAutoFollowed,
numberOfFailedRemoteClusterStateRequests,
numberOfSuccessfulIndicesAutoFollowed,
new TreeMap<>(recentAutoFollowErrors)
);
}

synchronized void updateStats(List<AutoFollowResult> results) {
for (AutoFollowResult result : results) {
if (result.clusterStateFetchException != null) {
recentAutoFollowErrors.put(result.clusterAlias,
new ElasticsearchException(result.clusterStateFetchException));
numberOfFailedRemoteClusterStateRequests++;
} else {
for (Map.Entry<Index, Exception> entry : result.autoFollowExecutionResults.entrySet()) {
if (entry.getValue() != null) {
numberOfFailedIndicesAutoFollowed++;
recentAutoFollowErrors.put(result.clusterAlias + ":" + entry.getKey().getName(),
new ElasticsearchException(entry.getValue()));
} else {
numberOfSuccessfulIndicesAutoFollowed++;
}
}
}

}
}

private void doAutoFollow() {
Expand All @@ -94,10 +145,20 @@ private void doAutoFollow() {
return;
}

Consumer<Exception> handler = e -> {
if (e != null) {
LOGGER.warn("failure occurred during auto-follower coordination", e);
Consumer<List<AutoFollowResult>> handler = results -> {
for (AutoFollowResult result : results) {
if (result.clusterStateFetchException != null) {
LOGGER.warn(new ParameterizedMessage("failure occurred while fetching cluster state in leader cluster [{}]",
result.clusterAlias), result.clusterStateFetchException);
}
for (Map.Entry<Index, Exception> entry : result.autoFollowExecutionResults.entrySet()) {
if (entry.getValue() != null) {
LOGGER.warn(new ParameterizedMessage("failure occurred while auto following index [{}] in leader cluster [{}]",
entry.getKey(), result.clusterAlias), entry.getValue());
}
}
}
updateStats(results);
threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow);
};
AutoFollower operation = new AutoFollower(handler, followerClusterState) {
Expand Down Expand Up @@ -168,47 +229,58 @@ public void applyClusterState(ClusterChangedEvent event) {

abstract static class AutoFollower {

private final Consumer<Exception> handler;
private final Consumer<List<AutoFollowResult>> handler;
private final ClusterState followerClusterState;
private final AutoFollowMetadata autoFollowMetadata;

private final CountDown autoFollowPatternsCountDown;
private final AtomicReference<Exception> autoFollowPatternsErrorHolder = new AtomicReference<>();
private final AtomicArray<AutoFollowResult> clusterAliasResults;

AutoFollower(final Consumer<Exception> handler, final ClusterState followerClusterState) {
AutoFollower(final Consumer<List<AutoFollowResult>> handler, final ClusterState followerClusterState) {
this.handler = handler;
this.followerClusterState = followerClusterState;
this.autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE);
this.autoFollowPatternsCountDown = new CountDown(autoFollowMetadata.getPatterns().size());
this.clusterAliasResults = new AtomicArray<>(autoFollowMetadata.getPatterns().size());
}

void autoFollowIndices() {
int i = 0;
for (Map.Entry<String, AutoFollowPattern> entry : autoFollowMetadata.getPatterns().entrySet()) {
String clusterAlias = entry.getKey();
AutoFollowPattern autoFollowPattern = entry.getValue();
List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias);
final int slot = i;
final String clusterAlias = entry.getKey();
final AutoFollowPattern autoFollowPattern = entry.getValue();
final List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias);

getLeaderClusterState(clusterAlias, (leaderClusterState, e) -> {
if (leaderClusterState != null) {
assert e == null;
handleClusterAlias(clusterAlias, autoFollowPattern, followedIndices, leaderClusterState);
handleClusterAlias(slot, clusterAlias, autoFollowPattern, followedIndices, leaderClusterState);
} else {
finalise(e);
finalise(slot, new AutoFollowResult(clusterAlias, e));
}
});
i++;
}
}

private void handleClusterAlias(String clusterAlias, AutoFollowPattern autoFollowPattern,
List<String> followedIndexUUIDs, ClusterState leaderClusterState) {
private void handleClusterAlias(
int clusterAliasSlot,
String clusterAlias,
AutoFollowPattern autoFollowPattern,
List<String> followedIndexUUIDs,
ClusterState leaderClusterState
) {
final List<Index> leaderIndicesToFollow =
getLeaderIndicesToFollow(autoFollowPattern, leaderClusterState, followerClusterState, followedIndexUUIDs);
if (leaderIndicesToFollow.isEmpty()) {
finalise(null);
finalise(clusterAliasSlot, new AutoFollowResult(clusterAlias));
} else {
final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size());
final AtomicReference<Exception> leaderIndicesErrorHolder = new AtomicReference<>();
for (Index indexToFollow : leaderIndicesToFollow) {
final AtomicArray<Tuple<Index, Exception>> results = new AtomicArray<>(leaderIndicesToFollow.size());
for (int i = 0; i < leaderIndicesToFollow.size(); i++) {
final int slot = i;
final Index indexToFollow = leaderIndicesToFollow.get(i);
final String leaderIndexName = indexToFollow.getName();
final String followIndexName = getFollowerIndexName(autoFollowPattern, leaderIndexName);

Expand All @@ -232,37 +304,34 @@ private void handleClusterAlias(String clusterAlias, AutoFollowPattern autoFollo
updateAutoFollowMetadata(function, updateError -> {
if (updateError != null) {
LOGGER.error("Failed to mark leader index [" + leaderIndexName + "] as auto followed", updateError);
if (leaderIndicesErrorHolder.compareAndSet(null, updateError) == false) {
leaderIndicesErrorHolder.get().addSuppressed(updateError);
}
results.set(slot, new Tuple<>(indexToFollow, updateError));
} else {
results.set(slot, new Tuple<>(indexToFollow, null));
LOGGER.debug("Successfully marked leader index [{}] as auto followed", leaderIndexName);
}
if (leaderIndicesCountDown.countDown()) {
finalise(leaderIndicesErrorHolder.get());
finalise(clusterAliasSlot, new AutoFollowResult(clusterAlias, results.asList()));
}
});
};
// Execute if the create and follow apu call fails:
Consumer<Exception> failureHandler = followError -> {
assert followError != null;
LOGGER.warn("Failed to auto follow leader index [" + leaderIndexName + "]", followError);
results.set(slot, new Tuple<>(indexToFollow, followError));
if (leaderIndicesCountDown.countDown()) {
finalise(followError);
finalise(clusterAliasSlot, new AutoFollowResult(clusterAlias, results.asList()));
}
};
createAndFollow(followRequest, successHandler, failureHandler);
}
}
}

private void finalise(Exception failure) {
if (autoFollowPatternsErrorHolder.compareAndSet(null, failure) == false) {
autoFollowPatternsErrorHolder.get().addSuppressed(failure);
}

private void finalise(int slot, AutoFollowResult result) {
clusterAliasResults.set(slot, result);
if (autoFollowPatternsCountDown.countDown()) {
handler.accept(autoFollowPatternsErrorHolder.get());
handler.accept(clusterAliasResults.asList());
}
}

Expand Down Expand Up @@ -324,4 +393,35 @@ static Function<ClusterState, ClusterState> recordLeaderIndexAsFollowFunction(St
abstract void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction, Consumer<Exception> handler);

}

static class AutoFollowResult {

final String clusterAlias;
final Exception clusterStateFetchException;
final Map<Index, Exception> autoFollowExecutionResults;

AutoFollowResult(String clusterAlias, List<Tuple<Index, Exception>> results) {
this.clusterAlias = clusterAlias;

Map<Index, Exception> autoFollowExecutionResults = new HashMap<>();
for (Tuple<Index, Exception> result : results) {
autoFollowExecutionResults.put(result.v1(), result.v2());
}

this.clusterStateFetchException = null;
this.autoFollowExecutionResults = Collections.unmodifiableMap(autoFollowExecutionResults);
}

AutoFollowResult(String clusterAlias, Exception e) {
this.clusterAlias = clusterAlias;
this.clusterStateFetchException = e;
this.autoFollowExecutionResults = Collections.emptyMap();
}

AutoFollowResult(String clusterAlias) {
this.clusterAlias = clusterAlias;
this.clusterStateFetchException = null;
this.autoFollowExecutionResults = Collections.emptyMap();
}
}
}
Loading

0 comments on commit e3fbd10

Please sign in to comment.