From 24948aac236b4cdb5b1da77e87d977d7ced5fe2e Mon Sep 17 00:00:00 2001 From: Colin Goodheart-Smithe Date: Wed, 16 May 2018 11:38:24 +0100 Subject: [PATCH] Refactors ClientHelper to combine header logic (#30620) * Refactors ClientHelper to combine header logic This change removes all the `*ClientHelper` classes which were repeating logic between plugins and instead adds `ClientHelper.executeWithHeaders()` and `ClientHelper.executeWithHeadersAsync()` methods to centralise the logic for executing requests with stored security headers. * Removes Watcher headers constant x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/tran sport/actions/put/TransportPutWatchActionTests.java /Users/colings86/dev/work/git/elasticsearch/.git/worktrees/elasticsearch -6.x/CHERRY_PICK_HEAD x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelp er.java x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlClien tHelper.java x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetad ata.java x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafee d/DatafeedUpdate.java x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ClientHelp erTests.java x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/Transpo rtPreviewDatafeedAction.java x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extra ctor/aggregation/AggregationDataExtractor.java x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extra ctor/chunked/ChunkedDataExtractor.java x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extra ctor/scroll/ScrollDataExtractor.java x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extra ctor/scroll/ScrollDataExtractorFactory.java x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlClientHelper Tests.java x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/Ro llupClientHelper.java x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/Ro llupJobTask.java x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/Ro llupClientHelperTests.java x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watc her.java x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watc herClientHelper.java x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/acti ons/index/ExecutableIndexAction.java x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/inpu t/search/ExecutableSearchInput.java x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/tran sform/search/ExecutableSearchTransform.java x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/tran sport/actions/put/TransportPutWatchAction.java x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/Watc herClientHelperTests.java x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/tran sport/actions/put/TransportPutWatchActionTests.java --- .../xpack/core/ClientHelper.java | 89 +++++++- .../xpack/core/ml/MlClientHelper.java | 71 ------- .../xpack/core/ml/MlMetadata.java | 7 +- .../core/ml/datafeed/DatafeedUpdate.java | 4 +- .../xpack/core/ClientHelperTests.java | 199 +++++++++++++++++- .../TransportPreviewDatafeedAction.java | 4 +- .../aggregation/AggregationDataExtractor.java | 4 +- .../chunked/ChunkedDataExtractor.java | 6 +- .../extractor/scroll/ScrollDataExtractor.java | 10 +- .../scroll/ScrollDataExtractorFactory.java | 6 +- .../xpack/ml/MlClientHelperTests.java | 118 ----------- .../xpack/rollup/job/RollupClientHelper.java | 59 ------ .../xpack/rollup/job/RollupJobTask.java | 7 +- .../rollup/job/RollupClientHelperTests.java | 133 ------------ .../elasticsearch/xpack/watcher/Watcher.java | 4 - .../xpack/watcher/WatcherClientHelper.java | 51 ----- .../actions/index/ExecutableIndexAction.java | 6 +- .../input/search/ExecutableSearchInput.java | 6 +- .../search/ExecutableSearchTransform.java | 5 +- .../actions/put/TransportPutWatchAction.java | 4 +- .../watcher/WatcherClientHelperTests.java | 119 ----------- .../put/TransportPutWatchActionTests.java | 4 +- 22 files changed, 324 insertions(+), 592 deletions(-) delete mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlClientHelper.java delete mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlClientHelperTests.java delete mode 100644 x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupClientHelper.java delete mode 100644 x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupClientHelperTests.java delete mode 100644 x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherClientHelper.java delete mode 100644 x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherClientHelperTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java index d657d4df809c4..c73bb8576a7ad 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java @@ -14,9 +14,15 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.FilterClient; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.xpack.core.security.authc.AuthenticationField; +import org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField; +import java.util.Map; +import java.util.Set; import java.util.function.BiConsumer; import java.util.function.Supplier; +import java.util.stream.Collectors; /** * Utility class to help with the execution of requests made using a {@link Client} such that they @@ -24,6 +30,12 @@ */ public final class ClientHelper { + /** + * List of headers that are related to security + */ + public static final Set SECURITY_HEADER_FILTERS = Sets.newHashSet(AuthenticationServiceField.RUN_AS_USER_HEADER, + AuthenticationField.AUTHENTICATION_KEY); + public static final String ACTION_ORIGIN_TRANSIENT_NAME = "action.origin"; public static final String SECURITY_ORIGIN = "security"; public static final String WATCHER_ORIGIN = "watcher"; @@ -78,6 +90,82 @@ RequestBuilder extends ActionRequestBuilder> } } + /** + * Execute a client operation and return the response, try to run an action + * with least privileges, when headers exist + * + * @param headers + * Request headers, ideally including security headers + * @param origin + * The origin to fall back to if there are no security headers + * @param client + * The client used to query + * @param supplier + * The action to run + * @return An instance of the response class + */ + public static T executeWithHeaders(Map headers, String origin, Client client, + Supplier supplier) { + Map filteredHeaders = headers.entrySet().stream().filter(e -> SECURITY_HEADER_FILTERS.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + // no security headers, we will have to use the xpack internal user for + // our execution by specifying the origin + if (filteredHeaders.isEmpty()) { + try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), origin)) { + return supplier.get(); + } + } else { + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { + client.threadPool().getThreadContext().copyHeaders(filteredHeaders.entrySet()); + return supplier.get(); + } + } + } + + /** + * Execute a client operation asynchronously, try to run an action with + * least privileges, when headers exist + * + * @param headers + * Request headers, ideally including security headers + * @param origin + * The origin to fall back to if there are no security headers + * @param action + * The action to execute + * @param request + * The request object for the action + * @param listener + * The listener to call when the action is complete + */ + public static > void executeWithHeadersAsync( + Map headers, String origin, Client client, Action action, Request request, + ActionListener listener) { + + Map filteredHeaders = headers.entrySet().stream().filter(e -> SECURITY_HEADER_FILTERS.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + final ThreadContext threadContext = client.threadPool().getThreadContext(); + + // No headers (e.g. security not installed/in use) so execute as origin + if (filteredHeaders.isEmpty()) { + ClientHelper.executeAsyncWithOrigin(client, origin, action, request, listener); + } else { + // Otherwise stash the context and copy in the saved headers before executing + final Supplier supplier = threadContext.newRestorableContext(false); + try (ThreadContext.StoredContext ignore = stashWithHeaders(threadContext, filteredHeaders)) { + client.execute(action, request, new ContextPreservingActionListener<>(supplier, listener)); + } + } + } + + private static ThreadContext.StoredContext stashWithHeaders(ThreadContext threadContext, Map headers) { + final ThreadContext.StoredContext storedContext = threadContext.stashContext(); + threadContext.copyHeaders(headers.entrySet()); + return storedContext; + } + private static final class ClientWithOrigin extends FilterClient { private final String origin; @@ -98,5 +186,4 @@ RequestBuilder extends ActionRequestBuilder> } } } - } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlClientHelper.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlClientHelper.java deleted file mode 100644 index a76c5c51e8d7f..0000000000000 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlClientHelper.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.core.ml; - -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.common.util.set.Sets; -import org.elasticsearch.xpack.core.ClientHelper; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; -import org.elasticsearch.xpack.core.security.authc.AuthenticationField; -import org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField; - -import java.util.Map; -import java.util.Set; -import java.util.function.Supplier; -import java.util.stream.Collectors; - -/** - * A helper class for actions which decides if we should run via the _xpack user and set ML as origin - * or if we should use the run_as functionality by setting the correct headers - */ -public class MlClientHelper { - - /** - * List of headers that are related to security - */ - public static final Set SECURITY_HEADER_FILTERS = Sets.newHashSet(AuthenticationServiceField.RUN_AS_USER_HEADER, - AuthenticationField.AUTHENTICATION_KEY); - - /** - * Execute a client operation and return the response, try to run a datafeed search with least privileges, when headers exist - * - * @param datafeedConfig The config for a datafeed - * @param client The client used to query - * @param supplier The action to run - * @return An instance of the response class - */ - public static T execute(DatafeedConfig datafeedConfig, Client client, Supplier supplier) { - return execute(datafeedConfig.getHeaders(), client, supplier); - } - - /** - * Execute a client operation and return the response, try to run an action with least privileges, when headers exist - * - * @param headers Request headers, ideally including security headers - * @param client The client used to query - * @param supplier The action to run - * @return An instance of the response class - */ - public static T execute(Map headers, Client client, Supplier supplier) { - // no headers, we will have to use the xpack internal user for our execution by specifying the ml origin - if (headers == null || headers.isEmpty()) { - try (ThreadContext.StoredContext ignore = ClientHelper.stashWithOrigin(client.threadPool().getThreadContext(), - ClientHelper.ML_ORIGIN)) { - return supplier.get(); - } - } else { - try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { - Map filteredHeaders = headers.entrySet().stream() - .filter(e -> SECURITY_HEADER_FILTERS.contains(e.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - client.threadPool().getThreadContext().copyHeaders(filteredHeaders.entrySet()); - return supplier.get(); - } - } - } -} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index b09a7463ffdb1..b709e32946ec6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -23,6 +23,9 @@ import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; @@ -35,8 +38,6 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.NameResolver; import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import java.io.IOException; import java.util.Collection; @@ -303,7 +304,7 @@ public Builder putDatafeed(DatafeedConfig datafeedConfig, ThreadContext threadCo // Adjust the request, adding security headers from the current thread context DatafeedConfig.Builder builder = new DatafeedConfig.Builder(datafeedConfig); Map headers = threadContext.getHeaders().entrySet().stream() - .filter(e -> MlClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) + .filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); builder.setHeaders(headers); datafeedConfig = builder.build(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java index 6255be9f4383a..444532a7e3f15 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java @@ -21,7 +21,7 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.xpack.core.ml.MlClientHelper; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; @@ -304,7 +304,7 @@ public DatafeedConfig apply(DatafeedConfig datafeedConfig, ThreadContext threadC if (threadContext != null) { // Adjust the request, adding security headers from the current thread context Map headers = threadContext.getHeaders().entrySet().stream() - .filter(e -> MlClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) + .filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); builder.setHeaders(headers); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ClientHelperTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ClientHelperTests.java index a243b8c995d23..95361dbff42b0 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ClientHelperTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ClientHelperTests.java @@ -9,15 +9,33 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; +import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.security.authc.AuthenticationField; +import org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.function.Consumer; +import static org.elasticsearch.xpack.core.ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -97,7 +115,7 @@ public void testExecuteWithClient() throws Exception { assertEquals(origin, threadContext.getTransient(ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME)); assertNull(threadContext.getHeader(headerName)); latch.countDown(); - ((ActionListener)invocationOnMock.getArguments()[2]).onResponse(null); + ((ActionListener) invocationOnMock.getArguments()[2]).onResponse(null); return null; }).when(client).execute(anyObject(), anyObject(), anyObject()); @@ -130,7 +148,7 @@ public void testClientWithOrigin() throws Exception { assertEquals(origin, threadContext.getTransient(ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME)); assertNull(threadContext.getHeader(headerName)); latch.countDown(); - ((ActionListener)invocationOnMock.getArguments()[2]).onResponse(null); + ((ActionListener) invocationOnMock.getArguments()[2]).onResponse(null); return null; }).when(client).execute(anyObject(), anyObject(), anyObject()); @@ -139,4 +157,179 @@ public void testClientWithOrigin() throws Exception { clientWithOrigin.execute(null, null, listener); latch.await(); } + + public void testExecuteWithHeadersAsyncNoHeaders() throws InterruptedException { + final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + final Client client = mock(Client.class); + final ThreadPool threadPool = mock(ThreadPool.class); + when(client.threadPool()).thenReturn(threadPool); + when(threadPool.getThreadContext()).thenReturn(threadContext); + + final CountDownLatch latch = new CountDownLatch(2); + final ActionListener listener = ActionListener.wrap(v -> { + assertTrue(threadContext.getHeaders().isEmpty()); + latch.countDown(); + }, e -> fail(e.getMessage())); + + doAnswer(invocationOnMock -> { + assertTrue(threadContext.getHeaders().isEmpty()); + latch.countDown(); + ((ActionListener) invocationOnMock.getArguments()[2]).onResponse(null); + return null; + }).when(client).execute(anyObject(), anyObject(), anyObject()); + + SearchRequest request = new SearchRequest("foo"); + + String originName = randomFrom(ClientHelper.ML_ORIGIN, ClientHelper.WATCHER_ORIGIN, ClientHelper.ROLLUP_ORIGIN); + ClientHelper.executeWithHeadersAsync(Collections.emptyMap(), originName, client, SearchAction.INSTANCE, request, listener); + + latch.await(); + } + + public void testExecuteWithHeadersAsyncWrongHeaders() throws InterruptedException { + final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + final Client client = mock(Client.class); + final ThreadPool threadPool = mock(ThreadPool.class); + when(client.threadPool()).thenReturn(threadPool); + when(threadPool.getThreadContext()).thenReturn(threadContext); + + final CountDownLatch latch = new CountDownLatch(2); + final ActionListener listener = ActionListener.wrap(v -> { + assertTrue(threadContext.getHeaders().isEmpty()); + latch.countDown(); + }, e -> fail(e.getMessage())); + + doAnswer(invocationOnMock -> { + assertTrue(threadContext.getHeaders().isEmpty()); + latch.countDown(); + ((ActionListener) invocationOnMock.getArguments()[2]).onResponse(null); + return null; + }).when(client).execute(anyObject(), anyObject(), anyObject()); + + SearchRequest request = new SearchRequest("foo"); + Map headers = new HashMap<>(1); + headers.put("foo", "foo"); + headers.put("bar", "bar"); + + String originName = randomFrom(ClientHelper.ML_ORIGIN, ClientHelper.WATCHER_ORIGIN, ClientHelper.ROLLUP_ORIGIN); + ClientHelper.executeWithHeadersAsync(headers, originName, client, SearchAction.INSTANCE, request, listener); + + latch.await(); + } + + public void testExecuteWithHeadersAsyncWithHeaders() throws Exception { + final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + final Client client = mock(Client.class); + final ThreadPool threadPool = mock(ThreadPool.class); + when(client.threadPool()).thenReturn(threadPool); + when(threadPool.getThreadContext()).thenReturn(threadContext); + + final CountDownLatch latch = new CountDownLatch(2); + final ActionListener listener = ActionListener.wrap(v -> { + assertTrue(threadContext.getHeaders().isEmpty()); + latch.countDown(); + }, e -> fail(e.getMessage())); + + doAnswer(invocationOnMock -> { + assertThat(threadContext.getHeaders().size(), equalTo(2)); + assertThat(threadContext.getHeaders().get("es-security-runas-user"), equalTo("foo")); + assertThat(threadContext.getHeaders().get("_xpack_security_authentication"), equalTo("bar")); + latch.countDown(); + ((ActionListener) invocationOnMock.getArguments()[2]).onResponse(null); + return null; + }).when(client).execute(anyObject(), anyObject(), anyObject()); + + SearchRequest request = new SearchRequest("foo"); + Map headers = new HashMap<>(1); + headers.put("es-security-runas-user", "foo"); + headers.put("_xpack_security_authentication", "bar"); + + String originName = randomFrom(ClientHelper.ML_ORIGIN, ClientHelper.WATCHER_ORIGIN, ClientHelper.ROLLUP_ORIGIN); + ClientHelper.executeWithHeadersAsync(headers, originName, client, SearchAction.INSTANCE, request, listener); + + latch.await(); + } + + public void testExecuteWithHeadersNoHeaders() { + Client client = mock(Client.class); + ThreadPool threadPool = mock(ThreadPool.class); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + when(threadPool.getThreadContext()).thenReturn(threadContext); + when(client.threadPool()).thenReturn(threadPool); + + PlainActionFuture searchFuture = PlainActionFuture.newFuture(); + searchFuture.onResponse(new SearchResponse()); + when(client.search(any())).thenReturn(searchFuture); + assertExecutionWithOrigin(Collections.emptyMap(), client); + } + + public void testExecuteWithHeaders() { + Client client = mock(Client.class); + ThreadPool threadPool = mock(ThreadPool.class); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + when(threadPool.getThreadContext()).thenReturn(threadContext); + when(client.threadPool()).thenReturn(threadPool); + + PlainActionFuture searchFuture = PlainActionFuture.newFuture(); + searchFuture.onResponse(new SearchResponse()); + when(client.search(any())).thenReturn(searchFuture); + Map headers = MapBuilder. newMapBuilder().put(AuthenticationField.AUTHENTICATION_KEY, "anything") + .put(AuthenticationServiceField.RUN_AS_USER_HEADER, "anything").map(); + + assertRunAsExecution(headers, h -> { + assertThat(h.keySet(), hasSize(2)); + assertThat(h, hasEntry(AuthenticationField.AUTHENTICATION_KEY, "anything")); + assertThat(h, hasEntry(AuthenticationServiceField.RUN_AS_USER_HEADER, "anything")); + }, client); + } + + public void testExecuteWithHeadersNoSecurityHeaders() { + Client client = mock(Client.class); + ThreadPool threadPool = mock(ThreadPool.class); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + when(threadPool.getThreadContext()).thenReturn(threadContext); + when(client.threadPool()).thenReturn(threadPool); + + PlainActionFuture searchFuture = PlainActionFuture.newFuture(); + searchFuture.onResponse(new SearchResponse()); + when(client.search(any())).thenReturn(searchFuture); + Map unrelatedHeaders = MapBuilder. newMapBuilder().put(randomAlphaOfLength(10), "anything").map(); + + assertExecutionWithOrigin(unrelatedHeaders, client); + } + + /** + * This method executes a search and checks if the thread context was + * enriched with the ml origin + */ + private void assertExecutionWithOrigin(Map storedHeaders, Client client) { + String originName = randomFrom(ClientHelper.ML_ORIGIN, ClientHelper.WATCHER_ORIGIN, ClientHelper.ROLLUP_ORIGIN); + ClientHelper.executeWithHeaders(storedHeaders, originName, client, () -> { + Object origin = client.threadPool().getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME); + assertThat(origin, is(originName)); + + // Check that headers are not set + Map headers = client.threadPool().getThreadContext().getHeaders(); + assertThat(headers, not(hasEntry(AuthenticationField.AUTHENTICATION_KEY, "anything"))); + assertThat(headers, not(hasEntry(AuthenticationServiceField.RUN_AS_USER_HEADER, "anything"))); + + return client.search(new SearchRequest()).actionGet(); + }); + } + + /** + * This method executes a search and ensures no stashed origin thread + * context was created, so that the regular node client was used, to emulate + * a run_as function + */ + public void assertRunAsExecution(Map storedHeaders, Consumer> consumer, Client client) { + String originName = randomFrom(ClientHelper.ML_ORIGIN, ClientHelper.WATCHER_ORIGIN, ClientHelper.ROLLUP_ORIGIN); + ClientHelper.executeWithHeaders(storedHeaders, originName, client, () -> { + Object origin = client.threadPool().getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME); + assertThat(origin, is(nullValue())); + + consumer.accept(client.threadPool().getThreadContext().getHeaders()); + return client.search(new SearchRequest()).actionGet(); + }); + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java index 98ba2caa408ed..2ffb318dc4fb2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java @@ -16,8 +16,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.MLMetadataField; -import org.elasticsearch.xpack.core.ml.MlClientHelper; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig; @@ -64,7 +64,7 @@ protected void doExecute(PreviewDatafeedAction.Request request, ActionListener

headers = threadPool.getThreadContext().getHeaders().entrySet().stream() - .filter(e -> MlClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) + .filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); previewDatafeed.setHeaders(headers); // NB: this is using the client from the transport layer, NOT the internal client. diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java index f9089b6bc1704..d83865b751f50 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java @@ -14,7 +14,7 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.Aggregations; -import org.elasticsearch.xpack.core.ml.MlClientHelper; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils; @@ -112,7 +112,7 @@ private void initAggregationProcessor(Aggregations aggs) throws IOException { } protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) { - return MlClientHelper.execute(context.headers, client, searchRequestBuilder::get); + return ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client, searchRequestBuilder::get); } private SearchRequestBuilder buildSearchRequest() { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java index 61298f16abd14..2e157c3d1e95a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java @@ -15,10 +15,10 @@ import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.metrics.max.Max; import org.elasticsearch.search.aggregations.metrics.min.Min; -import org.elasticsearch.xpack.core.ml.MlClientHelper; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; -import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils; +import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import java.io.IOException; import java.io.InputStream; @@ -135,7 +135,7 @@ private DataSummary requestDataSummary() throws IOException { } protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) { - return MlClientHelper.execute(context.headers, client, searchRequestBuilder::get); + return ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client, searchRequestBuilder::get); } private Optional getNextStream() throws IOException { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java index 57601406e7117..24174730e2d3b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java @@ -20,7 +20,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.fetch.StoredFieldsContext; import org.elasticsearch.search.sort.SortOrder; -import org.elasticsearch.xpack.core.ml.MlClientHelper; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils; import org.elasticsearch.xpack.ml.utils.DomainSplitFunction; @@ -100,7 +100,7 @@ protected InputStream initScroll(long startTimestamp) throws IOException { } protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) { - return MlClientHelper.execute(context.headers, client, searchRequestBuilder::get); + return ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client, searchRequestBuilder::get); } private SearchRequestBuilder buildSearchRequest(long start) { @@ -211,7 +211,8 @@ private void markScrollAsErrored() { } protected SearchResponse executeSearchScrollRequest(String scrollId) { - return MlClientHelper.execute(context.headers, client, () -> SearchScrollAction.INSTANCE.newRequestBuilder(client) + return ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client, + () -> SearchScrollAction.INSTANCE.newRequestBuilder(client) .setScroll(SCROLL_TIMEOUT) .setScrollId(scrollId) .get()); @@ -226,7 +227,8 @@ private void clearScroll(String scrollId) { if (scrollId != null) { ClearScrollRequest request = new ClearScrollRequest(); request.addScrollId(scrollId); - MlClientHelper.execute(context.headers, client, () -> client.execute(ClearScrollAction.INSTANCE, request).actionGet()); + ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client, + () -> client.execute(ClearScrollAction.INSTANCE, request).actionGet()); } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java index f4f359580db8e..2c6e0deaebd9f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java @@ -12,12 +12,12 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; import org.elasticsearch.client.Client; import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.xpack.core.ml.MlClientHelper; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; -import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.utils.MlStrings; +import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import java.util.Objects; @@ -76,7 +76,7 @@ public static void create(Client client, DatafeedConfig datafeed, Job job, Actio String[] requestFields = job.allInputFields().stream().map(f -> MlStrings.getParentField(f) + "*") .toArray(size -> new String[size]); fieldCapabilitiesRequest.fields(requestFields); - MlClientHelper.execute(datafeed, client, () -> { + ClientHelper. executeWithHeaders(datafeed.getHeaders(), ClientHelper.ML_ORIGIN, client, () -> { client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, fieldCapabilitiesHandler); // This response gets discarded - the listener handles the real response return null; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlClientHelperTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlClientHelperTests.java deleted file mode 100644 index 284e746e67db2..0000000000000 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlClientHelperTests.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml; - -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.collect.MapBuilder; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.MlClientHelper; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; -import org.elasticsearch.xpack.core.security.authc.AuthenticationField; -import org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField; -import org.junit.Before; - -import java.util.Collections; -import java.util.Map; -import java.util.function.Consumer; - -import static org.elasticsearch.xpack.core.ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME; -import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; -import static org.hamcrest.Matchers.hasEntry; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class MlClientHelperTests extends ESTestCase { - - private Client client = mock(Client.class); - - @Before - public void setupMocks() { - ThreadPool threadPool = mock(ThreadPool.class); - ThreadContext threadContext = new ThreadContext(Settings.EMPTY); - when(threadPool.getThreadContext()).thenReturn(threadContext); - when(client.threadPool()).thenReturn(threadPool); - - PlainActionFuture searchFuture = PlainActionFuture.newFuture(); - searchFuture.onResponse(new SearchResponse()); - when(client.search(any())).thenReturn(searchFuture); - } - - public void testEmptyHeaders() { - DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed-foo", "foo"); - builder.setIndices(Collections.singletonList("foo-index")); - - assertExecutionWithOrigin(builder.build()); - } - - public void testWithHeaders() { - DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed-foo", "foo"); - builder.setIndices(Collections.singletonList("foo-index")); - Map headers = MapBuilder.newMapBuilder() - .put(AuthenticationField.AUTHENTICATION_KEY, "anything") - .put(AuthenticationServiceField.RUN_AS_USER_HEADER, "anything") - .map(); - builder.setHeaders(headers); - - assertRunAsExecution(builder.build(), h -> { - assertThat(h.keySet(), hasSize(2)); - assertThat(h, hasEntry(AuthenticationField.AUTHENTICATION_KEY, "anything")); - assertThat(h, hasEntry(AuthenticationServiceField.RUN_AS_USER_HEADER, "anything")); - }); - } - - public void testFilteredHeaders() { - DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed-foo", "foo"); - builder.setIndices(Collections.singletonList("foo-index")); - Map unrelatedHeaders = MapBuilder.newMapBuilder() - .put(randomAlphaOfLength(10), "anything") - .map(); - builder.setHeaders(unrelatedHeaders); - - assertRunAsExecution(builder.build(), h -> assertThat(h.keySet(), hasSize(0))); - } - - /** - * This method executes a search and checks if the thread context was enriched with the ml origin - */ - private void assertExecutionWithOrigin(DatafeedConfig datafeedConfig) { - MlClientHelper.execute(datafeedConfig, client, () -> { - Object origin = client.threadPool().getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME); - assertThat(origin, is(ML_ORIGIN)); - - // Check that headers are not set - Map headers = client.threadPool().getThreadContext().getHeaders(); - assertThat(headers, not(hasEntry(AuthenticationField.AUTHENTICATION_KEY, "anything"))); - assertThat(headers, not(hasEntry(AuthenticationServiceField.RUN_AS_USER_HEADER, "anything"))); - - return client.search(new SearchRequest()).actionGet(); - }); - } - - /** - * This method executes a search and ensures no stashed origin thread context was created, so that the regular node - * client was used, to emulate a run_as function - */ - public void assertRunAsExecution(DatafeedConfig datafeedConfig, Consumer> consumer) { - MlClientHelper.execute(datafeedConfig, client, () -> { - Object origin = client.threadPool().getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME); - assertThat(origin, is(nullValue())); - - consumer.accept(client.threadPool().getThreadContext().getHeaders()); - return client.search(new SearchRequest()).actionGet(); - }); - } -} diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupClientHelper.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupClientHelper.java deleted file mode 100644 index 20e4ba120cd8c..0000000000000 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupClientHelper.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.rollup.job; - -import org.elasticsearch.action.Action; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionRequestBuilder; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.support.ContextPreservingActionListener; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.xpack.core.ClientHelper; -import org.elasticsearch.xpack.core.rollup.job.RollupJob; -import org.elasticsearch.xpack.rollup.Rollup; - -import java.util.Map; -import java.util.function.Supplier; -import java.util.stream.Collectors; - - -/** - * Helper class to execute actions with authentication headers cached in the rollup job (if they exist, otherwise Origin) - */ -public class RollupClientHelper { - - @SuppressWarnings("try") - public static > void executeAsync( - Client client, RollupJob job, Action action, Request request, - ActionListener listener) { - - Map filteredHeaders = job.getHeaders().entrySet().stream() - .filter(e -> Rollup.HEADER_FILTERS.contains(e.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - - final ThreadContext threadContext = client.threadPool().getThreadContext(); - - // No headers (e.g. security not installed/in use) so execute as rollup origin - if (filteredHeaders.isEmpty()) { - ClientHelper.executeAsyncWithOrigin(client, ClientHelper.ROLLUP_ORIGIN, action, request, listener); - } else { - // Otherwise stash the context and copy in the saved headers before executing - final Supplier supplier = threadContext.newRestorableContext(false); - try (ThreadContext.StoredContext ignore = stashWithHeaders(threadContext, filteredHeaders)) { - client.execute(action, request, new ContextPreservingActionListener<>(supplier, listener)); - } - } - } - - private static ThreadContext.StoredContext stashWithHeaders(ThreadContext threadContext, Map headers) { - final ThreadContext.StoredContext storedContext = threadContext.stashContext(); - threadContext.copyHeaders(headers.entrySet()); - return storedContext; - } -} diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java index f357d579c82c5..425629c248c9c 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java @@ -23,6 +23,7 @@ import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.rollup.RollupField; import org.elasticsearch.xpack.core.rollup.action.StartRollupJobAction; import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction; @@ -103,12 +104,14 @@ protected class ClientRollupPageManager extends RollupIndexer { @Override protected void doNextSearch(SearchRequest request, ActionListener nextPhase) { - RollupClientHelper.executeAsync(client, job, SearchAction.INSTANCE, request, nextPhase); + ClientHelper.executeWithHeadersAsync(job.getHeaders(), ClientHelper.ROLLUP_ORIGIN, client, SearchAction.INSTANCE, request, + nextPhase); } @Override protected void doNextBulk(BulkRequest request, ActionListener nextPhase) { - RollupClientHelper.executeAsync(client, job, BulkAction.INSTANCE, request, nextPhase); + ClientHelper.executeWithHeadersAsync(job.getHeaders(), ClientHelper.ROLLUP_ORIGIN, client, BulkAction.INSTANCE, request, + nextPhase); } @Override diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupClientHelperTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupClientHelperTests.java deleted file mode 100644 index b2d098d458ea0..0000000000000 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupClientHelperTests.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.rollup.job; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.SearchAction; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.rollup.job.RollupJob; -import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig; -import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.CountDownLatch; - -import static org.hamcrest.Matchers.equalTo; -import static org.mockito.Matchers.anyObject; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class RollupClientHelperTests extends ESTestCase { - - @SuppressWarnings("unchecked") - public void testNoHeaders() throws InterruptedException { - final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); - final Client client = mock(Client.class); - final ThreadPool threadPool = mock(ThreadPool.class); - when(client.threadPool()).thenReturn(threadPool); - when(threadPool.getThreadContext()).thenReturn(threadContext); - - final CountDownLatch latch = new CountDownLatch(2); - final ActionListener listener = ActionListener.wrap(v -> { - assertTrue(threadContext.getHeaders().isEmpty()); - latch.countDown(); - }, e -> fail(e.getMessage())); - - doAnswer(invocationOnMock -> { - assertTrue(threadContext.getHeaders().isEmpty()); - latch.countDown(); - ((ActionListener)invocationOnMock.getArguments()[2]).onResponse(null); - return null; - }).when(client).execute(anyObject(), anyObject(), anyObject()); - - SearchRequest request = new SearchRequest("foo"); - - RollupJobConfig config = ConfigTestHelpers.getRollupJob(randomAlphaOfLength(5)).build(); - RollupJob job = new RollupJob(config, Collections.emptyMap()); - - RollupClientHelper.executeAsync(client, job, SearchAction.INSTANCE, request, listener); - - latch.await(); - } - - @SuppressWarnings("unchecked") - public void testWrongHeaders() throws InterruptedException { - final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); - final Client client = mock(Client.class); - final ThreadPool threadPool = mock(ThreadPool.class); - when(client.threadPool()).thenReturn(threadPool); - when(threadPool.getThreadContext()).thenReturn(threadContext); - - final CountDownLatch latch = new CountDownLatch(2); - final ActionListener listener = ActionListener.wrap(v -> { - assertTrue(threadContext.getHeaders().isEmpty()); - latch.countDown(); - }, e -> fail(e.getMessage())); - - doAnswer(invocationOnMock -> { - assertTrue(threadContext.getHeaders().isEmpty()); - latch.countDown(); - ((ActionListener)invocationOnMock.getArguments()[2]).onResponse(null); - return null; - }).when(client).execute(anyObject(), anyObject(), anyObject()); - - SearchRequest request = new SearchRequest("foo"); - Map headers = new HashMap<>(1); - headers.put("foo", "foo"); - headers.put("bar", "bar"); - RollupJobConfig config = ConfigTestHelpers.getRollupJob(randomAlphaOfLength(5)).build(); - RollupJob job = new RollupJob(config, headers); - - RollupClientHelper.executeAsync(client, job, SearchAction.INSTANCE, request, listener); - - latch.await(); - } - - @SuppressWarnings("unchecked") - public void testWithHeaders() throws Exception { - final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); - final Client client = mock(Client.class); - final ThreadPool threadPool = mock(ThreadPool.class); - when(client.threadPool()).thenReturn(threadPool); - when(threadPool.getThreadContext()).thenReturn(threadContext); - - final CountDownLatch latch = new CountDownLatch(2); - final ActionListener listener = ActionListener.wrap(v -> { - assertTrue(threadContext.getHeaders().isEmpty()); - latch.countDown(); - }, e -> fail(e.getMessage())); - - doAnswer(invocationOnMock -> { - assertThat(threadContext.getHeaders().size(), equalTo(2)); - assertThat(threadContext.getHeaders().get("es-security-runas-user"), equalTo("foo")); - assertThat(threadContext.getHeaders().get("_xpack_security_authentication"), equalTo("bar")); - latch.countDown(); - ((ActionListener)invocationOnMock.getArguments()[2]).onResponse(null); - return null; - }).when(client).execute(anyObject(), anyObject(), anyObject()); - - SearchRequest request = new SearchRequest("foo"); - Map headers = new HashMap<>(1); - headers.put("es-security-runas-user", "foo"); - headers.put("_xpack_security_authentication", "bar"); - RollupJobConfig config = ConfigTestHelpers.getRollupJob(randomAlphaOfLength(5)).build(); - RollupJob job = new RollupJob(config, headers); - - RollupClientHelper.executeAsync(client, job, SearchAction.INSTANCE, request, listener); - - latch.await(); - } - -} diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java index 7f42777bb5faf..39760bb4dc7b9 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java @@ -206,10 +206,6 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin { public static final Setting MAX_STOP_TIMEOUT_SETTING = Setting.timeSetting("xpack.watcher.stop.timeout", TimeValue.timeValueSeconds(30), Setting.Property.NodeScope); - // list of headers that will be stored when a watch is stored - public static final Set HEADER_FILTERS = - new HashSet<>(Arrays.asList("es-security-runas-user", "_xpack_security_authentication")); - public static final ScriptContext SCRIPT_SEARCH_CONTEXT = new ScriptContext<>("xpack", SearchScript.Factory.class); // TODO: remove this context when each xpack script use case has their own contexts diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherClientHelper.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherClientHelper.java deleted file mode 100644 index 1019f5a423e98..0000000000000 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherClientHelper.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.watcher; - -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.xpack.core.watcher.watch.Watch; - -import java.util.Map; -import java.util.function.Supplier; -import java.util.stream.Collectors; - -import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN; -import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin; - -/** - * A helper class which decides if we should run via the xpack user and set watcher as origin or - * if we should use the run_as functionality by setting the correct headers - */ -public class WatcherClientHelper { - - /** - * Execute a client operation and return the response, try to run with least privileges, when headers exist - * - * @param watch The watch in which context this method gets executed in - * @param client The client used to query - * @param supplier The action to run - * @param The client response class this should return - * @return An instance of the response class - */ - public static T execute(Watch watch, Client client, Supplier supplier) { - // no headers, we will have to use the xpack internal user for our execution by specifying the watcher origin - if (watch.status().getHeaders().isEmpty()) { - try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { - return supplier.get(); - } - } else { - try (ThreadContext.StoredContext ignored = client.threadPool().getThreadContext().stashContext()) { - Map filteredHeaders = watch.status().getHeaders().entrySet().stream() - .filter(e -> Watcher.HEADER_FILTERS.contains(e.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - client.threadPool().getThreadContext().copyHeaders(filteredHeaders.entrySet()); - return supplier.get(); - } - } - } -} diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java index e49732f0cb543..a156e68a4b1ed 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.watcher.actions.Action; import org.elasticsearch.xpack.core.watcher.actions.Action.Result.Status; import org.elasticsearch.xpack.core.watcher.actions.ExecutableAction; @@ -24,7 +25,6 @@ import org.elasticsearch.xpack.core.watcher.support.WatcherDateTimeUtils; import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource; import org.elasticsearch.xpack.core.watcher.watch.Payload; -import org.elasticsearch.xpack.watcher.WatcherClientHelper; import org.elasticsearch.xpack.watcher.support.ArrayObjectIterator; import org.joda.time.DateTime; @@ -96,7 +96,7 @@ public Action.Result execute(String actionId, WatchExecutionContext ctx, Payload new XContentSource(indexRequest.source(), XContentType.JSON)); } - IndexResponse response = WatcherClientHelper.execute(ctx.watch(), client, + IndexResponse response = ClientHelper.executeWithHeaders(ctx.watch().status().getHeaders(), ClientHelper.WATCHER_ORIGIN, client, () -> client.index(indexRequest).actionGet(indexDefaultTimeout)); try (XContentBuilder builder = jsonBuilder()) { indexResponseToXContent(builder, response); @@ -137,7 +137,7 @@ Action.Result indexBulk(Iterable list, String actionId, WatchExecutionContext ct } bulkRequest.add(indexRequest); } - BulkResponse bulkResponse = WatcherClientHelper.execute(ctx.watch(), client, + BulkResponse bulkResponse = ClientHelper.executeWithHeaders(ctx.watch().status().getHeaders(), ClientHelper.WATCHER_ORIGIN, client, () -> client.bulk(bulkRequest).actionGet(bulkDefaultTimeout)); try (XContentBuilder jsonBuilder = jsonBuilder().startArray()) { for (BulkItemResponse item : bulkResponse) { diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/input/search/ExecutableSearchInput.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/input/search/ExecutableSearchInput.java index 83a4f1f85e732..4aced1b6c0398 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/input/search/ExecutableSearchInput.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/input/search/ExecutableSearchInput.java @@ -20,10 +20,10 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.script.Script; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext; import org.elasticsearch.xpack.core.watcher.input.ExecutableInput; import org.elasticsearch.xpack.core.watcher.watch.Payload; -import org.elasticsearch.xpack.watcher.WatcherClientHelper; import org.elasticsearch.xpack.watcher.support.XContentFilterKeysUtils; import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest; import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService; @@ -71,8 +71,8 @@ SearchInput.Result doExecute(WatchExecutionContext ctx, WatcherSearchTemplateReq } SearchRequest searchRequest = searchTemplateService.toSearchRequest(request); - final SearchResponse response = WatcherClientHelper.execute(ctx.watch(), client, - () -> client.search(searchRequest).actionGet(timeout)); + final SearchResponse response = ClientHelper.executeWithHeaders(ctx.watch().status().getHeaders(), ClientHelper.WATCHER_ORIGIN, + client, () -> client.search(searchRequest).actionGet(timeout)); if (logger.isDebugEnabled()) { logger.debug("[{}] found [{}] hits", ctx.id(), response.getHits().getTotalHits()); diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transform/search/ExecutableSearchTransform.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transform/search/ExecutableSearchTransform.java index 03dbf88fb0d80..1b408bc5e6463 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transform/search/ExecutableSearchTransform.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transform/search/ExecutableSearchTransform.java @@ -15,10 +15,10 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.script.Script; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext; import org.elasticsearch.xpack.core.watcher.transform.ExecutableTransform; import org.elasticsearch.xpack.core.watcher.watch.Payload; -import org.elasticsearch.xpack.watcher.WatcherClientHelper; import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest; import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService; @@ -49,7 +49,8 @@ public SearchTransform.Result execute(WatchExecutionContext ctx, Payload payload // We need to make a copy, so that we don't modify the original instance that we keep around in a watch: request = new WatcherSearchTemplateRequest(transform.getRequest(), new BytesArray(renderedTemplate)); SearchRequest searchRequest = searchTemplateService.toSearchRequest(request); - SearchResponse resp = WatcherClientHelper.execute(ctx.watch(), client, () -> client.search(searchRequest).actionGet(timeout)); + SearchResponse resp = ClientHelper.executeWithHeaders(ctx.watch().status().getHeaders(), ClientHelper.WATCHER_ORIGIN, client, + () -> client.search(searchRequest).actionGet(timeout)); return new SearchTransform.Result(request, new Payload.XContent(resp)); } catch (Exception e) { logger.error((Supplier) () -> new ParameterizedMessage("failed to execute [{}] transform for [{}]", TYPE, ctx.id()), e); diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java index d836507596b74..7dcca20e2019e 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java @@ -22,12 +22,12 @@ import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.watcher.support.xcontent.WatcherParams; import org.elasticsearch.xpack.core.watcher.transport.actions.put.PutWatchAction; import org.elasticsearch.xpack.core.watcher.transport.actions.put.PutWatchRequest; import org.elasticsearch.xpack.core.watcher.transport.actions.put.PutWatchResponse; import org.elasticsearch.xpack.core.watcher.watch.Watch; -import org.elasticsearch.xpack.watcher.Watcher; import org.elasticsearch.xpack.watcher.transport.actions.WatcherTransportAction; import org.elasticsearch.xpack.watcher.trigger.TriggerService; import org.elasticsearch.xpack.watcher.watch.WatchParser; @@ -90,7 +90,7 @@ protected void masterOperation(PutWatchRequest request, ClusterState state, // ensure we only filter for the allowed headers Map filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream() - .filter(e -> Watcher.HEADER_FILTERS.contains(e.getKey())) + .filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); watch.status().setHeaders(filteredHeaders); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherClientHelperTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherClientHelperTests.java deleted file mode 100644 index f1908ccefc2ec..0000000000000 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherClientHelperTests.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.watcher; - -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.collect.MapBuilder; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext; -import org.elasticsearch.xpack.watcher.test.WatchExecutionContextMockBuilder; -import org.junit.Before; - -import java.util.Collections; -import java.util.Map; -import java.util.function.Consumer; - -import static org.elasticsearch.xpack.core.ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME; -import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN; -import static org.hamcrest.Matchers.hasEntry; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class WatcherClientHelperTests extends ESTestCase { - - private Client client = mock(Client.class); - - @Before - public void setupMocks() { - PlainActionFuture searchFuture = PlainActionFuture.newFuture(); - searchFuture.onResponse(new SearchResponse()); - when(client.search(any())).thenReturn(searchFuture); - - ThreadPool threadPool = mock(ThreadPool.class); - ThreadContext threadContext = new ThreadContext(Settings.EMPTY); - when(threadPool.getThreadContext()).thenReturn(threadContext); - when(client.threadPool()).thenReturn(threadPool); - } - - public void testEmptyHeaders() { - WatchExecutionContext ctx = new WatchExecutionContextMockBuilder("_id").buildMock(); - when(ctx.watch().status().getHeaders()).thenReturn(Collections.emptyMap()); - - assertExecutionWithOrigin(ctx); - } - - public void testWithHeaders() { - WatchExecutionContext ctx = new WatchExecutionContextMockBuilder("_id").buildMock(); - Map watchStatusHeaders = MapBuilder.newMapBuilder() - .put("es-security-runas-user", "anything") - .put("_xpack_security_authentication", "anything") - .map(); - when(ctx.watch().status().getHeaders()).thenReturn(watchStatusHeaders); - - assertRunAsExecution(ctx, headers -> { - assertThat(headers.keySet(), hasSize(2)); - assertThat(headers, hasEntry("es-security-runas-user", "anything")); - assertThat(headers, hasEntry("_xpack_security_authentication", "anything")); - }); - } - - public void testFilteredHeaders() { - WatchExecutionContext ctx = new WatchExecutionContextMockBuilder("_id").buildMock(); - Map watchStatusHeaders = MapBuilder.newMapBuilder() - .put(randomAlphaOfLength(10), "anything") - .map(); - when(ctx.watch().status().getHeaders()).thenReturn(watchStatusHeaders); - - assertRunAsExecution(ctx, headers -> { - assertThat(headers.keySet(), hasSize(0)); - }); - } - - /** - * This method executes a search and checks if the thread context was enriched with the watcher origin - */ - private void assertExecutionWithOrigin(WatchExecutionContext ctx) { - WatcherClientHelper.execute(ctx.watch(), client, () -> { - Object origin = client.threadPool().getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME); - assertThat(origin, is(WATCHER_ORIGIN)); - - // check that headers are not set - Map headers = client.threadPool().getThreadContext().getHeaders(); - assertThat(headers, not(hasEntry("es-security-runas-user", "anything"))); - assertThat(headers, not(hasEntry("_xpack_security_authentication", "anything"))); - - return client.search(new SearchRequest()).actionGet(); - }); - - } - - /** - * This method executes a search and ensures no stashed origin thread context was created, so that the regular node - * client was used, to emulate a run_as function - */ - public void assertRunAsExecution(WatchExecutionContext ctx, Consumer> consumer) { - WatcherClientHelper.execute(ctx.watch(), client, () -> { - Object origin = client.threadPool().getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME); - assertThat(origin, is(nullValue())); - - Map headers = client.threadPool().getThreadContext().getHeaders(); - consumer.accept(headers); - return client.search(new SearchRequest()).actionGet(); - }); - - } -} \ No newline at end of file diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchActionTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchActionTests.java index 9a41c9b7aabbf..ce223b1c9fd15 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchActionTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchActionTests.java @@ -21,10 +21,10 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.watcher.transport.actions.put.PutWatchRequest; import org.elasticsearch.xpack.core.watcher.watch.ClockMock; import org.elasticsearch.xpack.core.watcher.watch.Watch; -import org.elasticsearch.xpack.watcher.Watcher; import org.elasticsearch.xpack.watcher.test.WatchExecutionContextMockBuilder; import org.elasticsearch.xpack.watcher.trigger.TriggerService; import org.elasticsearch.xpack.watcher.watch.WatchParser; @@ -86,7 +86,7 @@ public void setupAction() throws Exception { public void testHeadersAreFilteredWhenPuttingWatches() throws Exception { ClusterState state = mock(ClusterState.class); // set up threadcontext with some arbitrary info - String headerName = randomFrom(Watcher.HEADER_FILTERS); + String headerName = randomFrom(ClientHelper.SECURITY_HEADER_FILTERS); threadContext.putHeader(headerName, randomAlphaOfLength(10)); threadContext.putHeader(randomAlphaOfLength(10), "doesntmatter");