Skip to content

Commit

Permalink
[CCR] Make auto follow patterns work with security (#33501)
Browse files Browse the repository at this point in the history
Relates to #33007
  • Loading branch information
martijnvg authored Sep 17, 2018
1 parent 3046656 commit 481f8a9
Show file tree
Hide file tree
Showing 12 changed files with 255 additions and 107 deletions.
2 changes: 1 addition & 1 deletion x-pack/plugin/ccr/qa/multi-cluster-with-security/roles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ ccruser:
cluster:
- manage_ccr
indices:
- names: [ 'allowed-index' ]
- names: [ 'allowed-index', 'logs-eu-*' ]
privileges:
- monitor
- read
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ccr;

import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
Expand Down Expand Up @@ -119,6 +120,45 @@ public void testFollowIndex() throws Exception {
}
}

public void testAutoFollowPatterns() throws Exception {
assumeFalse("Test should only run when both clusters are running", runningAgainstLeaderCluster);
String allowedIndex = "logs-eu-20190101";
String disallowedIndex = "logs-us-20190101";

Request request = new Request("PUT", "/_ccr/auto_follow/leader_cluster");
request.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"]}");
assertOK(client().performRequest(request));

try (RestClient leaderClient = buildLeaderClient()) {
for (String index : new String[]{allowedIndex, disallowedIndex}) {
Settings settings = Settings.builder()
.put("index.soft_deletes.enabled", true)
.build();
String requestBody = "{\"settings\": " + Strings.toString(settings) +
", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }";
request = new Request("PUT", "/" + index);
request.setJsonEntity(requestBody);
assertOK(leaderClient.performRequest(request));

for (int i = 0; i < 5; i++) {
String id = Integer.toString(i);
index(leaderClient, index, id, "field", i, "filtered_field", "true");
}
}
}

assertBusy(() -> {
ensureYellow(allowedIndex);
verifyDocuments(adminClient(), allowedIndex, 5);
});
assertThat(indexExists(adminClient(), disallowedIndex), is(false));

// Cleanup by deleting auto follow pattern and unfollowing:
request = new Request("DELETE", "/_ccr/auto_follow/leader_cluster");
assertOK(client().performRequest(request));
unfollowIndex(allowedIndex);
}

private int countCcrNodeTasks() throws IOException {
final Request request = new Request("GET", "/_tasks");
request.addParameter("detailed", "true");
Expand All @@ -139,14 +179,18 @@ private int countCcrNodeTasks() throws IOException {
}

private static void index(String index, String id, Object... fields) throws IOException {
index(adminClient(), index, id, fields);
}

private static void index(RestClient client, String index, String id, Object... fields) throws IOException {
XContentBuilder document = jsonBuilder().startObject();
for (int i = 0; i < fields.length; i += 2) {
document.field((String) fields[i], fields[i + 1]);
}
document.endObject();
final Request request = new Request("POST", "/" + index + "/_doc/" + id);
request.setJsonEntity(Strings.toString(document));
assertOK(adminClient().performRequest(request));
assertOK(client.performRequest(request));
}

private static void refresh(String index) throws IOException {
Expand Down Expand Up @@ -201,11 +245,34 @@ protected static void createIndex(String name, Settings settings, String mapping
assertOK(adminClient().performRequest(request));
}

private static void ensureYellow(String index) throws IOException {
Request request = new Request("GET", "/_cluster/health/" + index);
request.addParameter("wait_for_status", "yellow");
request.addParameter("wait_for_no_relocating_shards", "true");
request.addParameter("wait_for_no_initializing_shards", "true");
request.addParameter("timeout", "70s");
request.addParameter("level", "shards");
adminClient().performRequest(request);
}

private RestClient buildLeaderClient() throws IOException {
assert runningAgainstLeaderCluster == false;
String leaderUrl = System.getProperty("tests.leader_host");
int portSeparator = leaderUrl.lastIndexOf(':');
HttpHost httpHost = new HttpHost(leaderUrl.substring(0, portSeparator),
Integer.parseInt(leaderUrl.substring(portSeparator + 1)), getProtocol());
return buildClient(restAdminSettings(), new HttpHost[]{httpHost});
}

private static boolean indexExists(RestClient client, String index) throws IOException {
Response response = client.performRequest(new Request("HEAD", "/" + index));
return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode();
}

private static void unfollowIndex(String followIndex) throws IOException {
assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/unfollow")));
}

private static void verifyCcrMonitoring(String expectedLeaderIndex, String expectedFollowerIndex) throws IOException {
ensureYellow(".monitoring-*");

Expand Down Expand Up @@ -239,14 +306,4 @@ private static void verifyCcrMonitoring(String expectedLeaderIndex, String expec
assertThat(numberOfOperationsIndexed, greaterThanOrEqualTo(1));
}

private static void ensureYellow(String index) throws IOException {
Request request = new Request("GET", "/_cluster/health/" + index);
request.addParameter("wait_for_status", "yellow");
request.addParameter("wait_for_no_relocating_shards", "true");
request.addParameter("wait_for_no_initializing_shards", "true");
request.addParameter("timeout", "70s");
request.addParameter("level", "shards");
adminClient().performRequest(request);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,43 @@
package org.elasticsearch.xpack.ccr;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.FilterClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.core.XPackPlugin;

import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
* Encapsulates licensing checking for CCR.
Expand Down Expand Up @@ -93,6 +103,7 @@ public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUU
request.indices(leaderIndex);
checkRemoteClusterLicenseAndFetchClusterState(
client,
Collections.emptyMap(),
clusterAlias,
request,
onFailure,
Expand All @@ -115,19 +126,22 @@ public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUU
*
* @param client the client
* @param clusterAlias the remote cluster alias
* @param headers the headers to use for leader client
* @param request the cluster state request
* @param onFailure the failure consumer
* @param leaderClusterStateConsumer the leader cluster state consumer
* @param <T> the type of response the listener is waiting for
*/
public <T> void checkRemoteClusterLicenseAndFetchClusterState(
final Client client,
final Map<String, String> headers,
final String clusterAlias,
final ClusterStateRequest request,
final Consumer<Exception> onFailure,
final Consumer<ClusterState> leaderClusterStateConsumer) {
checkRemoteClusterLicenseAndFetchClusterState(
client,
headers,
clusterAlias,
request,
onFailure,
Expand All @@ -144,6 +158,7 @@ public <T> void checkRemoteClusterLicenseAndFetchClusterState(
*
* @param client the client
* @param clusterAlias the remote cluster alias
* @param headers the headers to use for leader client
* @param request the cluster state request
* @param onFailure the failure consumer
* @param leaderClusterStateConsumer the leader cluster state consumer
Expand All @@ -153,6 +168,7 @@ public <T> void checkRemoteClusterLicenseAndFetchClusterState(
*/
private <T> void checkRemoteClusterLicenseAndFetchClusterState(
final Client client,
final Map<String, String> headers,
final String clusterAlias,
final ClusterStateRequest request,
final Consumer<Exception> onFailure,
Expand All @@ -167,7 +183,7 @@ private <T> void checkRemoteClusterLicenseAndFetchClusterState(
@Override
public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
if (licenseCheck.isSuccess()) {
final Client leaderClient = client.getRemoteClusterClient(clusterAlias);
final Client leaderClient = wrapClient(client.getRemoteClusterClient(clusterAlias), headers);
final ActionListener<ClusterStateResponse> clusterStateListener =
ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure);
// following an index in remote cluster, so use remote client to fetch leader index metadata
Expand Down Expand Up @@ -237,6 +253,33 @@ public void fetchLeaderHistoryUUIDs(
leaderClient.admin().indices().stats(request, ActionListener.wrap(indicesStatsHandler, onFailure));
}

public static Client wrapClient(Client client, Map<String, String> headers) {
if (headers.isEmpty()) {
return client;
} else {
final ThreadContext threadContext = client.threadPool().getThreadContext();
Map<String, String> filteredHeaders = headers.entrySet().stream()
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return new FilterClient(client) {
@Override
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
try (ThreadContext.StoredContext ignore = stashWithHeaders(threadContext, filteredHeaders)) {
super.doExecute(action, request, new ContextPreservingActionListener<>(supplier, listener));
}
}
};
}
}

private static ThreadContext.StoredContext stashWithHeaders(ThreadContext threadContext, Map<String, String> headers) {
final ThreadContext.StoredContext storedContext = threadContext.stashContext();
threadContext.copyHeaders(headers.entrySet());
return storedContext;
}

private static ElasticsearchStatusException indexMetadataNonCompliantRemoteLicense(
final String leaderIndex, final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
final String clusterAlias = licenseCheck.remoteClusterLicenseInfo().clusterAlias();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,22 @@ private void doAutoFollow() {
AutoFollower operation = new AutoFollower(handler, followerClusterState) {

@Override
void getLeaderClusterState(final String leaderClusterAlias, final BiConsumer<ClusterState, Exception> handler) {
void getLeaderClusterState(final Map<String, String> headers,
final String leaderClusterAlias,
final BiConsumer<ClusterState, Exception> handler) {
final ClusterStateRequest request = new ClusterStateRequest();
request.clear();
request.metaData(true);

if ("_local_".equals(leaderClusterAlias)) {
Client client = CcrLicenseChecker.wrapClient(AutoFollowCoordinator.this.client, headers);
client.admin().cluster().state(
request, ActionListener.wrap(r -> handler.accept(r.getState(), null), e -> handler.accept(null, e)));
} else {
final Client leaderClient = client.getRemoteClusterClient(leaderClusterAlias);
// TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
leaderClient,
client,
headers,
leaderClusterAlias,
request,
e -> handler.accept(null, e),
Expand All @@ -125,15 +128,22 @@ void getLeaderClusterState(final String leaderClusterAlias, final BiConsumer<Clu
}

@Override
void createAndFollow(FollowIndexAction.Request followRequest,
void createAndFollow(Map<String, String> headers,
FollowIndexAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler) {
client.execute(CreateAndFollowIndexAction.INSTANCE, new CreateAndFollowIndexAction.Request(followRequest),
ActionListener.wrap(r -> successHandler.run(), failureHandler));
Client followerClient = CcrLicenseChecker.wrapClient(client, headers);
CreateAndFollowIndexAction.Request request = new CreateAndFollowIndexAction.Request(followRequest);
followerClient.execute(
CreateAndFollowIndexAction.INSTANCE,
request,
ActionListener.wrap(r -> successHandler.run(), failureHandler)
);
}

@Override
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction, Consumer<Exception> handler) {
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction,
Consumer<Exception> handler) {
clusterService.submitStateUpdateTask("update_auto_follow_metadata", new ClusterStateUpdateTask() {

@Override
Expand Down Expand Up @@ -188,7 +198,7 @@ void autoFollowIndices() {
AutoFollowPattern autoFollowPattern = entry.getValue();
List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias);

getLeaderClusterState(clusterAlias, (leaderClusterState, e) -> {
getLeaderClusterState(autoFollowPattern.getHeaders(), clusterAlias, (leaderClusterState, e) -> {
if (leaderClusterState != null) {
assert e == null;
handleClusterAlias(clusterAlias, autoFollowPattern, followedIndices, leaderClusterState);
Expand Down Expand Up @@ -251,7 +261,7 @@ private void handleClusterAlias(String clusterAlias, AutoFollowPattern autoFollo
finalise(followError);
}
};
createAndFollow(followRequest, successHandler, failureHandler);
createAndFollow(autoFollowPattern.getHeaders(), followRequest, successHandler, failureHandler);
}
}
}
Expand Down Expand Up @@ -314,14 +324,27 @@ static Function<ClusterState, ClusterState> recordLeaderIndexAsFollowFunction(St
/**
* Fetch the cluster state from the leader with the specified cluster alias
*
* @param headers the client headers
* @param leaderClusterAlias the cluster alias of the leader
* @param handler the callback to invoke
*/
abstract void getLeaderClusterState(String leaderClusterAlias, BiConsumer<ClusterState, Exception> handler);

abstract void createAndFollow(FollowIndexAction.Request followRequest, Runnable successHandler, Consumer<Exception> failureHandler);

abstract void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction, Consumer<Exception> handler);
abstract void getLeaderClusterState(
Map<String, String> headers,
String leaderClusterAlias,
BiConsumer<ClusterState, Exception> handler
);

abstract void createAndFollow(
Map<String, String> headers,
FollowIndexAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler
);

abstract void updateAutoFollowMetadata(
Function<ClusterState, ClusterState> updateFunction,
Consumer<Exception> handler
);

}
}
Loading

0 comments on commit 481f8a9

Please sign in to comment.