From 908c67e63d5a43c1e9c403e16a564e53b3ffed96 Mon Sep 17 00:00:00 2001 From: Owais Kazi Date: Mon, 15 Apr 2024 12:32:49 -0700 Subject: [PATCH] Handled default search pipeline for multiple indices Signed-off-by: Owais Kazi --- CHANGELOG.md | 1 + .../action/search/TransportSearchAction.java | 2 +- .../pipeline/SearchPipelineService.java | 25 ++- .../pipeline/SearchPipelineServiceTests.java | 210 +++++++++++++++--- 4 files changed, 202 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5efcfee3d9d9d..eae77efbce3ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959)) - [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174)) - Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179)) +- [Search Pipeline] Handle default pipeline for multiple indices ([#13276](https://github.com/opensearch-project/OpenSearch/pull/13276)) ### Dependencies - Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896)) diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 65cfd35489033..143b01af3f62f 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -457,7 +457,7 @@ private void executeRequest( PipelinedRequest searchRequest; ActionListener listener; try { - searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest); + searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest, indexNameExpressionResolver); listener = searchRequest.transformResponseListener(updatedListener); } catch (Exception e) { updatedListener.onFailure(e); diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java index 2175b5d135394..2bae68b7a25ba 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java @@ -23,6 +23,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateApplier; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterManagerTaskKeys; @@ -35,6 +36,7 @@ import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.index.Index; import org.opensearch.core.service.ReportingService; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; @@ -360,7 +362,7 @@ static ClusterState innerDelete(DeleteSearchPipelineRequest request, ClusterStat return newState.build(); } - public PipelinedRequest resolvePipeline(SearchRequest searchRequest) { + public PipelinedRequest resolvePipeline(SearchRequest searchRequest, IndexNameExpressionResolver indexNameExpressionResolver) { Pipeline pipeline = Pipeline.NO_OP_PIPELINE; if (searchRequest.source() != null && searchRequest.source().searchPipelineSource() != null) { @@ -390,13 +392,22 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) { if (searchRequest.pipeline() != null) { // Named pipeline specified for the request pipelineId = searchRequest.pipeline(); - } else if (state != null && searchRequest.indices() != null && searchRequest.indices().length == 1) { + } else if (state != null && searchRequest.indices() != null) { // Check for index default pipeline - IndexMetadata indexMetadata = state.metadata().index(searchRequest.indices()[0]); - if (indexMetadata != null) { - Settings indexSettings = indexMetadata.getSettings(); - if (IndexSettings.DEFAULT_SEARCH_PIPELINE.exists(indexSettings)) { - pipelineId = IndexSettings.DEFAULT_SEARCH_PIPELINE.get(indexSettings); + Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, searchRequest); + for (Index index : concreteIndices) { + IndexMetadata indexMetadata = state.metadata().index(index); + if (indexMetadata != null) { + Settings indexSettings = indexMetadata.getSettings(); + if (IndexSettings.DEFAULT_SEARCH_PIPELINE.exists(indexSettings)) { + String currentPipelineId = IndexSettings.DEFAULT_SEARCH_PIPELINE.get(indexSettings); + if (NOOP_PIPELINE_ID.equals(pipelineId)) { + pipelineId = currentPipelineId; + } else if (pipelineId.equals(currentPipelineId) == false) { + pipelineId = NOOP_PIPELINE_ID; + break; + } + } } } } diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java index f5851e669a2da..8b903fc717ca1 100644 --- a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java @@ -32,7 +32,9 @@ import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.AliasMetadata; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; @@ -41,6 +43,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.breaker.CircuitBreaker; @@ -96,14 +99,28 @@ public Map> getSearchPhas private ThreadPool threadPool; + private IndexNameExpressionResolver indexNameExpressionResolver; + @Before public void setup() { threadPool = mock(ThreadPool.class); ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService(); + indexNameExpressionResolver = new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)); when(threadPool.generic()).thenReturn(executorService); when(threadPool.executor(anyString())).thenReturn(executorService); } + + private static IndexMetadata.Builder indexBuilder(String index, Settings additionalSettings) { + return IndexMetadata.builder(index).settings(settings(additionalSettings)); + } + + private static Settings.Builder settings(Settings additionalSettings) { + return settings(Version.CURRENT).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(additionalSettings); + } + public void testSearchPipelinePlugin() { Client client = mock(Client.class); SearchPipelineService searchPipelineService = new SearchPipelineService( @@ -162,7 +179,7 @@ public void testResolveSearchPipelineDoesNotExist() { final SearchRequest searchRequest = new SearchRequest("_index").pipeline("bar"); IllegalArgumentException e = expectThrows( IllegalArgumentException.class, - () -> searchPipelineService.resolvePipeline(searchRequest) + () -> searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver) ); assertTrue(e.getMessage(), e.getMessage().contains(" not defined")); } @@ -197,13 +214,13 @@ public void testResolveIndexDefaultPipeline() throws Exception { service.applyClusterState(cce); SearchRequest searchRequest = new SearchRequest("my_index").source(SearchSourceBuilder.searchSource().size(5)); - PipelinedRequest pipelinedRequest = syncTransformRequest(service.resolvePipeline(searchRequest)); + PipelinedRequest pipelinedRequest = syncTransformRequest(service.resolvePipeline(searchRequest, indexNameExpressionResolver)); assertEquals("p1", pipelinedRequest.getPipeline().getId()); assertEquals(10, pipelinedRequest.source().size()); // Bypass the default pipeline searchRequest.pipeline("_none"); - pipelinedRequest = service.resolvePipeline(searchRequest); + pipelinedRequest = service.resolvePipeline(searchRequest, indexNameExpressionResolver); assertEquals("_none", pipelinedRequest.getPipeline().getId()); assertEquals(5, pipelinedRequest.source().size()); } @@ -598,16 +615,18 @@ public void testTransformRequest() throws Exception { int size = 10; SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(new TermQueryBuilder("foo", "bar")).size(size); - SearchRequest request = new SearchRequest("_index").source(sourceBuilder).pipeline("p1"); + SearchRequest request = new SearchRequest("index").source(sourceBuilder).pipeline("p1"); - PipelinedRequest pipelinedRequest = syncTransformRequest(searchPipelineService.resolvePipeline(request)); + PipelinedRequest pipelinedRequest = syncTransformRequest( + searchPipelineService.resolvePipeline(request, indexNameExpressionResolver) + ); assertEquals(2 * size, pipelinedRequest.source().size()); assertEquals(size, request.source().size()); // This request doesn't specify a pipeline, it doesn't get transformed. request = new SearchRequest("_index").source(sourceBuilder); - pipelinedRequest = searchPipelineService.resolvePipeline(request); + pipelinedRequest = searchPipelineService.resolvePipeline(request, indexNameExpressionResolver); assertEquals(size, pipelinedRequest.source().size()); } @@ -643,13 +662,13 @@ public void testTransformResponse() throws Exception { // First try without specifying a pipeline, which should be a no-op. SearchRequest searchRequest = new SearchRequest(); - PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest); + PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); SearchResponse notTransformedResponse = syncTransformResponse(pipelinedRequest, searchResponse); assertSame(searchResponse, notTransformedResponse); // Now apply a pipeline searchRequest = new SearchRequest().pipeline("p1"); - pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest); + pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); SearchResponse transformedResponse = syncTransformResponse(pipelinedRequest, searchResponse); assertEquals(size, transformedResponse.getHits().getHits().length); for (int i = 0; i < size; i++) { @@ -736,7 +755,7 @@ public void testTransformSearchPhase() { // First try without specifying a pipeline, which should be a no-op. SearchRequest searchRequest = new SearchRequest(); - PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest); + PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); AtomicArray notTransformedSearchPhaseResults = searchPhaseResults.getAtomicArray(); pipelinedRequest.transformSearchPhaseResults( searchPhaseResults, @@ -748,7 +767,7 @@ public void testTransformSearchPhase() { // Now set the pipeline as p1 searchRequest = new SearchRequest().pipeline("p1"); - pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest); + pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); pipelinedRequest.transformSearchPhaseResults( searchPhaseResults, @@ -766,7 +785,7 @@ public void testTransformSearchPhase() { // Check Processor doesn't run for between other phases searchRequest = new SearchRequest().pipeline("p1"); - pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest); + pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); AtomicArray notTransformedSearchPhaseResult = searchPhaseResults.getAtomicArray(); pipelinedRequest.transformSearchPhaseResults( searchPhaseResults, @@ -916,7 +935,9 @@ public void testInlinePipeline() throws Exception { SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); // Verify pipeline - PipelinedRequest pipelinedRequest = syncTransformRequest(searchPipelineService.resolvePipeline(searchRequest)); + PipelinedRequest pipelinedRequest = syncTransformRequest( + searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver) + ); Pipeline pipeline = pipelinedRequest.getPipeline(); assertEquals(SearchPipelineService.AD_HOC_PIPELINE_ID, pipeline.getId()); assertEquals(1, pipeline.getSearchRequestProcessors().size()); @@ -961,7 +982,10 @@ public void testExceptionOnPipelineCreation() { SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); // Exception thrown when creating the pipeline - expectThrows(SearchPipelineProcessingException.class, () -> searchPipelineService.resolvePipeline(searchRequest)); + expectThrows( + SearchPipelineProcessingException.class, + () -> searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver) + ); } @@ -989,7 +1013,7 @@ public void testExceptionOnRequestProcessing() { // Exception thrown when processing the request expectThrows( SearchPipelineProcessingException.class, - () -> syncTransformRequest(searchPipelineService.resolvePipeline(searchRequest)) + () -> syncTransformRequest(searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver)) ); } @@ -1014,7 +1038,7 @@ public void testExceptionOnResponseProcessing() throws Exception { SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().size(100).searchPipelineSource(pipelineSourceMap); SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); - PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest); + PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); SearchResponse response = new SearchResponse(null, null, 0, 0, 0, 0, null, null); // Exception thrown when processing response @@ -1052,7 +1076,7 @@ public void testCatchExceptionOnRequestProcessing() throws Exception { "The exception from request processor [throwing_request] in the search pipeline [_ad_hoc_pipeline] was ignored" ) ); - syncTransformRequest(searchPipelineService.resolvePipeline(searchRequest)); + syncTransformRequest(searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver)); mockAppender.assertAllExpectationsMatched(); } } @@ -1078,7 +1102,7 @@ public void testCatchExceptionOnResponseProcessing() throws Exception { SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().size(100).searchPipelineSource(pipelineSourceMap); SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); - PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest); + PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); SearchResponse response = new SearchResponse(null, null, 0, 0, 0, 0, null, null); @@ -1122,15 +1146,27 @@ public void testStats() throws Exception { SearchRequest request = new SearchRequest(); SearchResponse response = new SearchResponse(null, null, 0, 0, 0, 0, null, null); - syncExecutePipeline(searchPipelineService.resolvePipeline(request.pipeline("good_request_pipeline")), response); + syncExecutePipeline( + searchPipelineService.resolvePipeline(request.pipeline("good_request_pipeline"), indexNameExpressionResolver), + response + ); expectThrows( SearchPipelineProcessingException.class, - () -> syncExecutePipeline(searchPipelineService.resolvePipeline(request.pipeline("bad_request_pipeline")), response) + () -> syncExecutePipeline( + searchPipelineService.resolvePipeline(request.pipeline("bad_request_pipeline"), indexNameExpressionResolver), + response + ) + ); + syncExecutePipeline( + searchPipelineService.resolvePipeline(request.pipeline("good_response_pipeline"), indexNameExpressionResolver), + response ); - syncExecutePipeline(searchPipelineService.resolvePipeline(request.pipeline("good_response_pipeline")), response); expectThrows( SearchPipelineProcessingException.class, - () -> syncExecutePipeline(searchPipelineService.resolvePipeline(request.pipeline("bad_response_pipeline")), response) + () -> syncExecutePipeline( + searchPipelineService.resolvePipeline(request.pipeline("bad_response_pipeline"), indexNameExpressionResolver), + response + ) ); SearchPipelineStats stats = searchPipelineService.stats(); @@ -1208,12 +1244,24 @@ public void testStatsEnabledIgnoreFailure() throws Exception { SearchRequest request = new SearchRequest(); SearchResponse response = new SearchResponse(null, null, 0, 0, 0, 0, null, null); - syncExecutePipeline(searchPipelineService.resolvePipeline(request.pipeline("good_request_pipeline")), response); + syncExecutePipeline( + searchPipelineService.resolvePipeline(request.pipeline("good_request_pipeline"), indexNameExpressionResolver), + response + ); // Caught Exception here - syncExecutePipeline(searchPipelineService.resolvePipeline(request.pipeline("bad_request_pipeline")), response); - syncExecutePipeline(searchPipelineService.resolvePipeline(request.pipeline("good_response_pipeline")), response); + syncExecutePipeline( + searchPipelineService.resolvePipeline(request.pipeline("bad_request_pipeline"), indexNameExpressionResolver), + response + ); + syncExecutePipeline( + searchPipelineService.resolvePipeline(request.pipeline("good_response_pipeline"), indexNameExpressionResolver), + response + ); // Caught Exception here - syncExecutePipeline(searchPipelineService.resolvePipeline(request.pipeline("bad_response_pipeline")), response); + syncExecutePipeline( + searchPipelineService.resolvePipeline(request.pipeline("bad_response_pipeline"), indexNameExpressionResolver), + response + ); // when ignoreFailure enabled, the search pipelines will all succeed. SearchPipelineStats stats = searchPipelineService.stats(); @@ -1355,7 +1403,10 @@ public void testAdHocRejectingProcessor() { SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().searchPipelineSource(pipelineSourceMap); SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); - expectThrows(SearchPipelineProcessingException.class, () -> searchPipelineService.resolvePipeline(searchRequest)); + expectThrows( + SearchPipelineProcessingException.class, + () -> searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver) + ); } public void testExtraParameterInProcessorConfig() { @@ -1369,7 +1420,7 @@ public void testExtraParameterInProcessorConfig() { SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().searchPipelineSource(pipelineSourceMap); SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); try { - searchPipelineService.resolvePipeline(searchRequest); + searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); fail("Exception should have been thrown"); } catch (SearchPipelineProcessingException e) { assertTrue( @@ -1462,10 +1513,113 @@ public void testStatefulProcessors() throws Exception { .build(); searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousState)); - PipelinedRequest request = searchPipelineService.resolvePipeline(new SearchRequest().pipeline("p1")); + PipelinedRequest request = searchPipelineService.resolvePipeline(new SearchRequest().pipeline("p1"), indexNameExpressionResolver); assertNull(contextHolder.get()); syncExecutePipeline(request, new SearchResponse(null, null, 0, 0, 0, 0, null, null)); assertNotNull(contextHolder.get()); assertEquals("b", contextHolder.get()); } + + public void testDefaultPipelineForMultipleIndices() throws Exception { + SearchPipelineService service = createWithProcessors(); + + SearchPipelineMetadata metadata = new SearchPipelineMetadata( + Map.of( + "p1", + new PipelineConfiguration( + "p1", + new BytesArray("{\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : 2 } } ] }"), + MediaTypeRegistry.JSON + ) + ) + ); + + Settings defaultPipelineSetting = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) + .put(IndexSettings.DEFAULT_SEARCH_PIPELINE.getKey(), "p1") + .build(); + + Metadata.Builder mdBuilder = Metadata.builder() + .put(indexBuilder("foo", defaultPipelineSetting).putAlias(AliasMetadata.builder("bar"))) + .put(indexBuilder("foobar", defaultPipelineSetting).putAlias(AliasMetadata.builder("bar"))) + .put(indexBuilder("foofoo-closed", defaultPipelineSetting).putAlias(AliasMetadata.builder("bar"))) + .put(indexBuilder("foofoo", defaultPipelineSetting).putAlias(AliasMetadata.builder("bar"))); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousState = clusterState; + + clusterState = ClusterState.builder(clusterState).metadata(mdBuilder.putCustom(SearchPipelineMetadata.TYPE, metadata)).build(); + + ClusterChangedEvent cce = new ClusterChangedEvent("", clusterState, previousState); + service.applyClusterState(cce); + + SearchRequest searchRequest = new SearchRequest("bar").source(SearchSourceBuilder.searchSource().size(5)); + PipelinedRequest pipelinedRequest = syncTransformRequest(service.resolvePipeline(searchRequest, indexNameExpressionResolver)); + assertEquals("p1", pipelinedRequest.getPipeline().getId()); + assertEquals(10, pipelinedRequest.source().size()); + + // Bypass the default pipeline + searchRequest.pipeline("_none"); + pipelinedRequest = service.resolvePipeline(searchRequest, indexNameExpressionResolver); + assertEquals("_none", pipelinedRequest.getPipeline().getId()); + assertEquals(5, pipelinedRequest.source().size()); + } + + public void testDifferentDefaultPipelineForMultipleIndices() throws Exception { + SearchPipelineService service = createWithProcessors(); + + SearchPipelineMetadata metadata = new SearchPipelineMetadata( + Map.of( + "p1", + new PipelineConfiguration( + "p1", + new BytesArray("{\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : 2 } } ] }"), + MediaTypeRegistry.JSON + ), + + "p2", + new PipelineConfiguration( + "p2", + new BytesArray("{\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : 1 } } ] }"), + MediaTypeRegistry.JSON + ) + ) + ); + + Settings defaultPipelineSetting1 = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) + .put(IndexSettings.DEFAULT_SEARCH_PIPELINE.getKey(), "p1") + .build(); + + Settings defaultPipelineSetting2 = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) + .put(IndexSettings.DEFAULT_SEARCH_PIPELINE.getKey(), "p2") + .build(); + + Metadata.Builder mdBuilder = Metadata.builder() + .put(indexBuilder("foo", defaultPipelineSetting1).putAlias(AliasMetadata.builder("bar"))) + .put(indexBuilder("foobar", defaultPipelineSetting1).putAlias(AliasMetadata.builder("bar"))) + .put(indexBuilder("foofoo-closed", defaultPipelineSetting1).putAlias(AliasMetadata.builder("bar"))) + .put(indexBuilder("foofoo", defaultPipelineSetting2).putAlias(AliasMetadata.builder("bar"))); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousState = clusterState; + + clusterState = ClusterState.builder(clusterState).metadata(mdBuilder.putCustom(SearchPipelineMetadata.TYPE, metadata)).build(); + + ClusterChangedEvent cce = new ClusterChangedEvent("", clusterState, previousState); + service.applyClusterState(cce); + + SearchRequest searchRequest = new SearchRequest("bar").source(SearchSourceBuilder.searchSource().size(5)); + PipelinedRequest pipelinedRequest = syncTransformRequest(service.resolvePipeline(searchRequest, indexNameExpressionResolver)); + assertEquals("_none", pipelinedRequest.getPipeline().getId()); + assertEquals(5, pipelinedRequest.source().size()); + } + }