Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CCR] Changed AutoFollowCoordinator to keep track of certain statistics #33684

Merged
merged 10 commits into from
Sep 18, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void testAutoFollow() throws Exception {
while (it.hasNext()) {
final String line = it.next();
if (line.matches(".*\\[WARN\\s*\\]\\[o\\.e\\.x\\.c\\.a\\.AutoFollowCoordinator\\s*\\] \\[node-0\\] " +
"failure occurred during auto-follower coordination")) {
"failure occurred while fetching cluster state in leader cluster \\[leader_cluster\\]")) {
warn = true;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public final class CcrLicenseChecker {
*
* @param isCcrAllowed a boolean supplier that should return true if CCR is allowed and false otherwise
*/
CcrLicenseChecker(final BooleanSupplier isCcrAllowed) {
public CcrLicenseChecker(final BooleanSupplier isCcrAllowed) {
this.isCcrAllowed = Objects.requireNonNull(isCcrAllowed);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.client.Client;
Expand All @@ -17,8 +19,10 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index;
import org.elasticsearch.license.LicenseUtils;
Expand All @@ -27,15 +31,18 @@
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
Expand All @@ -47,6 +54,7 @@
public class AutoFollowCoordinator implements ClusterStateApplier {

private static final Logger LOGGER = LogManager.getLogger(AutoFollowCoordinator.class);
private static final int MAX_AUTO_FOLLOW_ERRORS = 256;

private final Client client;
private final TimeValue pollInterval;
Expand All @@ -56,6 +64,12 @@ public class AutoFollowCoordinator implements ClusterStateApplier {

private volatile boolean localNodeMaster = false;

// The following fields are read and updated under a lock:
private long numberOfSuccessfulIndicesAutoFollowed = 0;
private long numberOfFailedIndicesAutoFollowed = 0;
private long numberOfFailedRemoteClusterStateRequests = 0;
private final LinkedHashMap<String, ElasticsearchException> recentAutoFollowErrors;

public AutoFollowCoordinator(
Settings settings,
Client client,
Expand All @@ -69,6 +83,47 @@ public AutoFollowCoordinator(

this.pollInterval = CcrSettings.CCR_AUTO_FOLLOW_POLL_INTERVAL.get(settings);
clusterService.addStateApplier(this);

this.recentAutoFollowErrors = new LinkedHashMap<String, ElasticsearchException>() {
@Override
protected boolean removeEldestEntry(final Map.Entry<String, ElasticsearchException> eldest) {
return size() > MAX_AUTO_FOLLOW_ERRORS;
}
};
}

public synchronized AutoFollowStats getStats() {
return new AutoFollowStats(
numberOfFailedIndicesAutoFollowed,
numberOfFailedRemoteClusterStateRequests,
numberOfSuccessfulIndicesAutoFollowed,
new TreeMap<>(recentAutoFollowErrors)
);
}

synchronized void updateStats(List<AutoFollowResult> results) {
for (AutoFollowResult result : results) {
if (result.clusterStateFetchException != null) {
recentAutoFollowErrors.put(result.clusterAlias,
new ElasticsearchException(result.clusterStateFetchException));
numberOfFailedRemoteClusterStateRequests++;
LOGGER.warn(new ParameterizedMessage("failure occurred while fetching cluster state in leader cluster [{}]",
result.clusterAlias), result.clusterStateFetchException);
} else {
for (Map.Entry<Index, Exception> entry : result.autoFollowExecutionResults.entrySet()) {
if (entry.getValue() != null) {
numberOfFailedIndicesAutoFollowed++;
recentAutoFollowErrors.put(result.clusterAlias + ":" + entry.getKey().getName(),
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
new ElasticsearchException(entry.getValue()));
LOGGER.warn(new ParameterizedMessage("failure occurred while auto following index [{}] in leader cluster [{}]",
entry.getKey(), result.clusterAlias), entry.getValue());
} else {
numberOfSuccessfulIndicesAutoFollowed++;
}
}
}

}
}

private void doAutoFollow() {
Expand All @@ -94,10 +149,8 @@ private void doAutoFollow() {
return;
}

Consumer<Exception> handler = e -> {
if (e != null) {
LOGGER.warn("failure occurred during auto-follower coordination", e);
}
Consumer<List<AutoFollowResult>> handler = results -> {
updateStats(results);
threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow);
};
AutoFollower operation = new AutoFollower(handler, followerClusterState) {
Expand Down Expand Up @@ -178,101 +231,97 @@ public void applyClusterState(ClusterChangedEvent event) {

abstract static class AutoFollower {

private final Consumer<Exception> handler;
private final Consumer<List<AutoFollowResult>> handler;
private final ClusterState followerClusterState;
private final AutoFollowMetadata autoFollowMetadata;

private final CountDown autoFollowPatternsCountDown;
private final AtomicReference<Exception> autoFollowPatternsErrorHolder = new AtomicReference<>();
private final AtomicArray<AutoFollowResult> autoFollowResults;

AutoFollower(final Consumer<Exception> handler, final ClusterState followerClusterState) {
AutoFollower(final Consumer<List<AutoFollowResult>> handler, final ClusterState followerClusterState) {
this.handler = handler;
this.followerClusterState = followerClusterState;
this.autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE);
this.autoFollowPatternsCountDown = new CountDown(autoFollowMetadata.getPatterns().size());
this.autoFollowResults = new AtomicArray<>(autoFollowMetadata.getPatterns().size());
}

void autoFollowIndices() {
int i = 0;
for (Map.Entry<String, AutoFollowPattern> entry : autoFollowMetadata.getPatterns().entrySet()) {
String clusterAlias = entry.getKey();
AutoFollowPattern autoFollowPattern = entry.getValue();
List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias);
final int slot = i;
final String clusterAlias = entry.getKey();
final AutoFollowPattern autoFollowPattern = entry.getValue();

getLeaderClusterState(autoFollowPattern.getHeaders(), clusterAlias, (leaderClusterState, e) -> {
if (leaderClusterState != null) {
assert e == null;
handleClusterAlias(clusterAlias, autoFollowPattern, followedIndices, leaderClusterState);
final List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias);
final List<Index> leaderIndicesToFollow =
getLeaderIndicesToFollow(autoFollowPattern, leaderClusterState, followerClusterState, followedIndices);
if (leaderIndicesToFollow.isEmpty()) {
finalise(slot, new AutoFollowResult(clusterAlias));
}else {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: a space before else.

Consumer<AutoFollowResult> resultHandler = result -> finalise(slot, result);
checkAutoFollowPattern(clusterAlias, autoFollowPattern, leaderIndicesToFollow, resultHandler);
}
} else {
finalise(e);
finalise(slot, new AutoFollowResult(clusterAlias, e));
}
});
i++;
}
}

private void handleClusterAlias(String clusterAlias, AutoFollowPattern autoFollowPattern,
List<String> followedIndexUUIDs, ClusterState leaderClusterState) {
final List<Index> leaderIndicesToFollow =
getLeaderIndicesToFollow(autoFollowPattern, leaderClusterState, followerClusterState, followedIndexUUIDs);
if (leaderIndicesToFollow.isEmpty()) {
finalise(null);
} else {
final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size());
final AtomicReference<Exception> leaderIndicesErrorHolder = new AtomicReference<>();
for (Index indexToFollow : leaderIndicesToFollow) {
final String leaderIndexName = indexToFollow.getName();
final String followIndexName = getFollowerIndexName(autoFollowPattern, leaderIndexName);

String leaderIndexNameWithClusterAliasPrefix = clusterAlias.equals("_local_") ? leaderIndexName :
clusterAlias + ":" + leaderIndexName;
FollowIndexAction.Request followRequest =
new FollowIndexAction.Request(leaderIndexNameWithClusterAliasPrefix, followIndexName,
autoFollowPattern.getMaxBatchOperationCount(), autoFollowPattern.getMaxConcurrentReadBatches(),
autoFollowPattern.getMaxOperationSizeInBytes(), autoFollowPattern.getMaxConcurrentWriteBatches(),
autoFollowPattern.getMaxWriteBufferSize(), autoFollowPattern.getMaxRetryDelay(),
autoFollowPattern.getIdleShardRetryDelay());

// Execute if the create and follow api call succeeds:
Runnable successHandler = () -> {
LOGGER.info("Auto followed leader index [{}] as follow index [{}]", leaderIndexName, followIndexName);

// This function updates the auto follow metadata in the cluster to record that the leader index has been followed:
// (so that we do not try to follow it in subsequent auto follow runs)
Function<ClusterState, ClusterState> function = recordLeaderIndexAsFollowFunction(clusterAlias, indexToFollow);
// The coordinator always runs on the elected master node, so we can update cluster state here:
updateAutoFollowMetadata(function, updateError -> {
if (updateError != null) {
LOGGER.error("Failed to mark leader index [" + leaderIndexName + "] as auto followed", updateError);
if (leaderIndicesErrorHolder.compareAndSet(null, updateError) == false) {
leaderIndicesErrorHolder.get().addSuppressed(updateError);
}
} else {
LOGGER.debug("Successfully marked leader index [{}] as auto followed", leaderIndexName);
}
if (leaderIndicesCountDown.countDown()) {
finalise(leaderIndicesErrorHolder.get());
}
});
};
// Execute if the create and follow apu call fails:
Consumer<Exception> failureHandler = followError -> {
assert followError != null;
LOGGER.warn("Failed to auto follow leader index [" + leaderIndexName + "]", followError);
if (leaderIndicesCountDown.countDown()) {
finalise(followError);
}
};
createAndFollow(autoFollowPattern.getHeaders(), followRequest, successHandler, failureHandler);
}
private void checkAutoFollowPattern(String clusterAlias, AutoFollowPattern autoFollowPattern,
List<Index> leaderIndicesToFollow, Consumer<AutoFollowResult> resultHandler) {

final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size());
final AtomicArray<Tuple<Index, Exception>> results = new AtomicArray<>(leaderIndicesToFollow.size());
for (int i = 0; i < leaderIndicesToFollow.size(); i++) {
final Index indexToFollow = leaderIndicesToFollow.get(i);
final int slot = i;
followLeaderIndex(clusterAlias, indexToFollow, autoFollowPattern, error -> {
results.set(slot, new Tuple<>(indexToFollow, error));
if (leaderIndicesCountDown.countDown()) {
resultHandler.accept(new AutoFollowResult(clusterAlias, results.asList()));
}
});
}
}

private void finalise(Exception failure) {
if (autoFollowPatternsErrorHolder.compareAndSet(null, failure) == false) {
autoFollowPatternsErrorHolder.get().addSuppressed(failure);
}
private void followLeaderIndex(String clusterAlias, Index indexToFollow,
AutoFollowPattern pattern, Consumer<Exception> onResult) {
final String leaderIndexName = indexToFollow.getName();
final String followIndexName = getFollowerIndexName(pattern, leaderIndexName);

String leaderIndexNameWithClusterAliasPrefix = clusterAlias.equals("_local_") ? leaderIndexName :
clusterAlias + ":" + leaderIndexName;
FollowIndexAction.Request request =
new FollowIndexAction.Request(leaderIndexNameWithClusterAliasPrefix, followIndexName,
pattern.getMaxBatchOperationCount(), pattern.getMaxConcurrentReadBatches(),
pattern.getMaxOperationSizeInBytes(), pattern.getMaxConcurrentWriteBatches(),
pattern.getMaxWriteBufferSize(), pattern.getMaxRetryDelay(),
pattern.getIdleShardRetryDelay());

// Execute if the create and follow api call succeeds:
Runnable successHandler = () -> {
LOGGER.info("Auto followed leader index [{}] as follow index [{}]", leaderIndexName, followIndexName);

// This function updates the auto follow metadata in the cluster to record that the leader index has been followed:
// (so that we do not try to follow it in subsequent auto follow runs)
Function<ClusterState, ClusterState> function = recordLeaderIndexAsFollowFunction(clusterAlias, indexToFollow);
// The coordinator always runs on the elected master node, so we can update cluster state here:
updateAutoFollowMetadata(function, onResult);
};
createAndFollow(pattern.getHeaders(), request, successHandler, onResult);
}

private void finalise(int slot, AutoFollowResult result) {
assert autoFollowResults.get(slot) == null;
autoFollowResults.set(slot, result);
if (autoFollowPatternsCountDown.countDown()) {
handler.accept(autoFollowPatternsErrorHolder.get());
handler.accept(autoFollowResults.asList());
}
}

Expand Down Expand Up @@ -347,4 +396,33 @@ abstract void updateAutoFollowMetadata(
);

}

static class AutoFollowResult {

final String clusterAlias;
final Exception clusterStateFetchException;
final Map<Index, Exception> autoFollowExecutionResults;

AutoFollowResult(String clusterAlias, List<Tuple<Index, Exception>> results) {
this.clusterAlias = clusterAlias;

Map<Index, Exception> autoFollowExecutionResults = new HashMap<>();
for (Tuple<Index, Exception> result : results) {
autoFollowExecutionResults.put(result.v1(), result.v2());
}

this.clusterStateFetchException = null;
this.autoFollowExecutionResults = Collections.unmodifiableMap(autoFollowExecutionResults);
}

AutoFollowResult(String clusterAlias, Exception e) {
this.clusterAlias = clusterAlias;
this.clusterStateFetchException = e;
this.autoFollowExecutionResults = Collections.emptyMap();
}

AutoFollowResult(String clusterAlias) {
this(clusterAlias, (Exception) null);
}
}
}
Loading