Skip to content

Commit

Permalink
[CCR] Change AutofollowCoordinator to use wait_for_metadata_version
Browse files Browse the repository at this point in the history
Changed AutofollowCoordinator makes use of the wait_for_metadata_version
feature in cluster state API and removed hard coded poll interval.

Originates from elastic#35895
Relates to elastic#33007
  • Loading branch information
martijnvg committed Dec 5, 2018
1 parent 3d85e8c commit c2bdb4e
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public Collection<Object> createComponents(

return Arrays.asList(
ccrLicenseChecker,
new AutoFollowCoordinator(client, threadPool, clusterService, ccrLicenseChecker)
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,9 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
client.getRemoteClusterClient(clusterAlias),
request,
onFailure,
leaderClusterState -> {
IndexMetaData leaderIndexMetaData = leaderClusterState.getMetaData().index(leaderIndex);
remoteClusterStateRsp -> {
ClusterState remoteClusterState = remoteClusterStateRsp.getState();
IndexMetaData leaderIndexMetaData = remoteClusterState.getMetaData().index(leaderIndex);
if (leaderIndexMetaData == null) {
onFailure.accept(new IndexNotFoundException(leaderIndex));
return;
Expand Down Expand Up @@ -159,7 +160,7 @@ public void checkRemoteClusterLicenseAndFetchClusterState(
final String clusterAlias,
final ClusterStateRequest request,
final Consumer<Exception> onFailure,
final Consumer<ClusterState> leaderClusterStateConsumer) {
final Consumer<ClusterStateResponse> leaderClusterStateConsumer) {
try {
Client remoteClient = systemClient(client.getRemoteClusterClient(clusterAlias));
checkRemoteClusterLicenseAndFetchClusterState(
Expand Down Expand Up @@ -199,7 +200,7 @@ private void checkRemoteClusterLicenseAndFetchClusterState(
final Client remoteClient,
final ClusterStateRequest request,
final Consumer<Exception> onFailure,
final Consumer<ClusterState> leaderClusterStateConsumer,
final Consumer<ClusterStateResponse> leaderClusterStateConsumer,
final Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,
final Function<Exception, ElasticsearchStatusException> unknownLicense) {
// we have to check the license on the remote cluster
Expand All @@ -211,7 +212,7 @@ private void checkRemoteClusterLicenseAndFetchClusterState(
public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
if (licenseCheck.isSuccess()) {
final ActionListener<ClusterStateResponse> clusterStateListener =
ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure);
ActionListener.wrap(leaderClusterStateConsumer::accept, onFailure);
// following an index in remote cluster, so use remote client to fetch leader index metadata
remoteClient.admin().cluster().state(request, clusterStateListener);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -23,13 +24,11 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.CopyOnWriteHashMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
Expand Down Expand Up @@ -62,7 +61,6 @@ public class AutoFollowCoordinator implements ClusterStateListener {
private static final int MAX_AUTO_FOLLOW_ERRORS = 256;

private final Client client;
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final CcrLicenseChecker ccrLicenseChecker;

Expand All @@ -76,11 +74,9 @@ public class AutoFollowCoordinator implements ClusterStateListener {

public AutoFollowCoordinator(
Client client,
ThreadPool threadPool,
ClusterService clusterService,
CcrLicenseChecker ccrLicenseChecker) {
this.client = client;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker, "ccrLicenseChecker");
clusterService.addListener(this);
Expand Down Expand Up @@ -146,22 +142,24 @@ void updateAutoFollowers(ClusterState followerClusterState) {

Map<String, AutoFollower> newAutoFollowers = new HashMap<>(newRemoteClusters.size());
for (String remoteCluster : newRemoteClusters) {
AutoFollower autoFollower = new AutoFollower(remoteCluster, threadPool, this::updateStats, clusterService::state) {
AutoFollower autoFollower = new AutoFollower(remoteCluster, this::updateStats, clusterService::state) {

@Override
void getLeaderClusterState(final String remoteCluster,
final BiConsumer<ClusterState, Exception> handler) {
void getRemoteClusterState(final String remoteCluster,
final long metadataVersion,
final BiConsumer<ClusterStateResponse, Exception> handler) {
final ClusterStateRequest request = new ClusterStateRequest();
request.clear();
request.metaData(true);
request.routingTable(true);
request.waitForMetaDataVersion(metadataVersion);
// TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
client,
remoteCluster,
request,
e -> handler.accept(null, e),
leaderClusterState -> handler.accept(leaderClusterState, null));
remoteClusterStateRsp -> handler.accept(remoteClusterStateRsp, null));
}

@Override
Expand Down Expand Up @@ -235,19 +233,17 @@ public void clusterChanged(ClusterChangedEvent event) {
abstract static class AutoFollower {

private final String remoteCluster;
private final ThreadPool threadPool;
private final Consumer<List<AutoFollowResult>> statsUpdater;
private final Supplier<ClusterState> followerClusterStateSupplier;

private volatile long metadataVersion = 0;
private volatile CountDown autoFollowPatternsCountDown;
private volatile AtomicArray<AutoFollowResult> autoFollowResults;

AutoFollower(final String remoteCluster,
final ThreadPool threadPool,
final Consumer<List<AutoFollowResult>> statsUpdater,
final Supplier<ClusterState> followerClusterStateSupplier) {
this.remoteCluster = remoteCluster;
this.threadPool = threadPool;
this.statsUpdater = statsUpdater;
this.followerClusterStateSupplier = followerClusterStateSupplier;
}
Expand All @@ -272,10 +268,15 @@ void autoFollowIndices() {
this.autoFollowPatternsCountDown = new CountDown(patterns.size());
this.autoFollowResults = new AtomicArray<>(patterns.size());

getLeaderClusterState(remoteCluster, (leaderClusterState, e) -> {
if (leaderClusterState != null) {
getRemoteClusterState(remoteCluster, metadataVersion + 1, (leaderClusterStateRsp, e) -> {
if (leaderClusterStateRsp != null) {
assert e == null;
if (leaderClusterStateRsp.isWaitForTimedOut()) {
autoFollowIndices();
return;
}

ClusterState leaderClusterState = leaderClusterStateRsp.getState();
int i = 0;
for (String autoFollowPatternName : patterns) {
final int slot = i;
Expand Down Expand Up @@ -392,8 +393,7 @@ private void finalise(int slot, AutoFollowResult result) {
autoFollowResults.set(slot, result);
if (autoFollowPatternsCountDown.countDown()) {
statsUpdater.accept(autoFollowResults.asList());
// TODO: Remove scheduling here with using cluster state API's waitForMetadataVersion:
threadPool.schedule(TimeValue.timeValueMillis(2500), ThreadPool.Names.GENERIC, this::autoFollowIndices);
autoFollowIndices();
}
}

Expand Down Expand Up @@ -461,13 +461,15 @@ static Function<ClusterState, ClusterState> recordLeaderIndexAsFollowFunction(St
}

/**
* Fetch the cluster state from the leader with the specified cluster alias
* Fetch a remote cluster state from with the specified cluster alias
* @param remoteCluster the name of the leader cluster
* @param metadataVersion the last seen metadata version
* @param handler the callback to invoke
*/
abstract void getLeaderClusterState(
abstract void getRemoteClusterState(
String remoteCluster,
BiConsumer<ClusterState, Exception> handler
long metadataVersion,
BiConsumer<ClusterStateResponse, Exception> handler
);

abstract void createAndFollow(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
Expand Down Expand Up @@ -80,7 +81,7 @@ protected void masterOperation(PutAutoFollowPatternAction.Request request,
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

Consumer<ClusterState> consumer = remoteClusterState -> {
Consumer<ClusterStateResponse> consumer = remoteClusterState -> {
String[] indices = request.getLeaderIndexPatterns().toArray(new String[0]);
ccrLicenseChecker.hasPrivilegesToFollowIndices(remoteClient, indices, e -> {
if (e == null) {
Expand All @@ -94,7 +95,7 @@ protected AcknowledgedResponse newResponse(boolean acknowledged) {

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return innerPut(request, filteredHeaders, currentState, remoteClusterState);
return innerPut(request, filteredHeaders, currentState, remoteClusterState.getState());
}
});
} else {
Expand Down
Loading

0 comments on commit c2bdb4e

Please sign in to comment.