diff --git a/CHANGELOG.md b/CHANGELOG.md index 1524e9fbe4e9b..15ca6605921d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -84,6 +84,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Extensions] Add IdentityPlugin into core to support Extension identities ([#7246](https://github.com/opensearch-project/OpenSearch/pull/7246)) - Add connectToNodeAsExtension in TransportService ([#6866](https://github.com/opensearch-project/OpenSearch/pull/6866)) - [Search Pipelines] Accept pipelines defined in search source ([#7253](https://github.com/opensearch-project/OpenSearch/pull/7253)) +- [Search Pipelines] Add `default_search_pipeline` index setting ([#7470](https://github.com/opensearch-project/OpenSearch/pull/7470)) - Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244)) - Add 'unsigned_long' numeric field type ([#6237](https://github.com/opensearch-project/OpenSearch/pull/6237)) - Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375)) @@ -128,4 +129,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Security [Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD -[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.7...2.x \ No newline at end of file +[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.7...2.x diff --git a/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/30_filter_query.yml b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/30_filter_query.yml index 6a69e3893ed4a..730b10d87e6a8 100644 --- a/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/30_filter_query.yml +++ b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/30_filter_query.yml @@ -101,3 +101,24 @@ teardown: } - match: { hits.total.value: 1 } - match: { hits.hits.0._id: "1" } + + # Make it the default for the index + - do: + indices.put_settings: + index: test + body: + index.search.default_pipeline: my_pipeline + + - do: + search: + index: test + body: { } + - match: { hits.total.value: 1 } + + # Explicitly bypass the pipeline to match both docs + - do: + search: + search_pipeline: _none + index: test + body: { } + - match: { hits.total.value: 2 } diff --git a/qa/mixed-cluster/build.gradle b/qa/mixed-cluster/build.gradle index 90aeb8faadf80..55f900c52f2c2 100644 --- a/qa/mixed-cluster/build.gradle +++ b/qa/mixed-cluster/build.gradle @@ -64,6 +64,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { numberOfNodes = 4 setting 'path.repo', "${buildDir}/cluster/shared/repo/${baseName}" + setting 'opensearch.experimental.feature.search_pipeline.enabled', 'true' } } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search_pipeline/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search_pipeline/10_basic.yml index 415501ded532e..46a63b79afb31 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search_pipeline/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search_pipeline/10_basic.yml @@ -1,8 +1,8 @@ --- "Test basic pipeline crud": - skip: - version: " - 2.99.99" - reason: "to be introduced in future release / TODO: change if/when we backport to 2.x" + version: " - 2.7.0" + reason: "Added in 2.7.0" - do: search_pipeline.put: id: "my_pipeline" @@ -32,8 +32,8 @@ --- "Test Put Versioned Pipeline": - skip: - version: " - 2.99.99" - reason: "to be introduced in future release / TODO: change if/when we backport to 2.x" + version: " - 2.7.0" + reason: "Added in 2.7.0" - do: search_pipeline.put: id: "my_pipeline" @@ -125,8 +125,8 @@ --- "Test Get All Pipelines": - skip: - version: " - 2.99.99" - reason: "to be introduced in future release / TODO: change if/when we backport to 2.x" + version: " - 2.7.0" + reason: "Added in 2.7.0" - do: search_pipeline.put: id: "first_pipeline" @@ -152,8 +152,8 @@ --- "Test invalid config": - skip: - version: " - 2.99.99" - reason: "to be introduced in future release / TODO: change if/when we backport to 2.x" + version: " - 2.7.0" + reason: "Added in 2.7.0" - do: catch: /parse_exception/ search_pipeline.put: diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 6ef12b550fc5e..734830f99e6fb 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -197,6 +197,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED, IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME, IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY, + IndexSettings.DEFAULT_SEARCH_PIPELINE, // Settings for Searchable Snapshots IndexSettings.SEARCHABLE_SNAPSHOT_REPOSITORY, diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 04c97a2f41aaa..b32b667df36a9 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -42,6 +42,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsException; import org.opensearch.common.unit.ByteSizeUnit; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; @@ -51,6 +52,7 @@ import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.ingest.IngestService; import org.opensearch.node.Node; +import org.opensearch.search.pipeline.SearchPipelineService; import java.util.Collections; import java.util.List; @@ -62,6 +64,7 @@ import java.util.function.UnaryOperator; import static org.opensearch.common.util.FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY; +import static org.opensearch.common.util.FeatureFlags.SEARCH_PIPELINE; import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING; import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING; import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING; @@ -578,6 +581,14 @@ public final class IndexSettings { Property.InternalIndex ); + public static final Setting DEFAULT_SEARCH_PIPELINE = new Setting<>( + "index.search.default_pipeline", + SearchPipelineService.NOOP_PIPELINE_ID, + Function.identity(), + Property.Dynamic, + Property.IndexScope + ); + private final Index index; private final Version version; private final Logger logger; @@ -619,6 +630,8 @@ public final class IndexSettings { private volatile long retentionLeaseMillis; + private volatile String defaultSearchPipeline; + /** * The maximum age of a retention lease before it is considered expired. * @@ -823,6 +836,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti maxFullFlushMergeWaitTime = scopedSettings.get(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME); mergeOnFlushEnabled = scopedSettings.get(INDEX_MERGE_ON_FLUSH_ENABLED); setMergeOnFlushPolicy(scopedSettings.get(INDEX_MERGE_ON_FLUSH_POLICY)); + defaultSearchPipeline = scopedSettings.get(DEFAULT_SEARCH_PIPELINE); scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio); scopedSettings.addSettingsUpdateConsumer( @@ -896,6 +910,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME, this::setMaxFullFlushMergeWaitTime); scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_ENABLED, this::setMergeOnFlushEnabled); scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_POLICY, this::setMergeOnFlushPolicy); + scopedSettings.addSettingsUpdateConsumer(DEFAULT_SEARCH_PIPELINE, this::setDefaultSearchPipeline); } private void setSearchSegmentOrderReversed(boolean reversed) { @@ -1577,4 +1592,16 @@ private void setMergeOnFlushPolicy(String policy) { public Optional> getMergeOnFlushPolicy() { return Optional.ofNullable(mergeOnFlushPolicy); } + + public String getDefaultSearchPipeline() { + return defaultSearchPipeline; + } + + public void setDefaultSearchPipeline(String defaultSearchPipeline) { + if (FeatureFlags.isEnabled(SEARCH_PIPELINE)) { + this.defaultSearchPipeline = defaultSearchPipeline; + } else { + throw new SettingsException("Unsupported setting: " + DEFAULT_SEARCH_PIPELINE.getKey()); + } + } } diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java index 845126aa788ea..f96a6eb4a6b76 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java @@ -23,6 +23,7 @@ import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateApplier; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterManagerTaskKeys; @@ -30,11 +31,13 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.regex.Regex; +import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.gateway.GatewayService; +import org.opensearch.index.IndexSettings; import org.opensearch.index.analysis.AnalysisRegistry; import org.opensearch.ingest.ConfigurationUtils; import org.opensearch.node.ReportingService; @@ -339,6 +342,7 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) throws Exce return new PipelinedRequest(pipeline, searchRequest); } if (searchRequest.source() != null && searchRequest.source().searchPipelineSource() != null) { + // Pipeline defined in search request (ad hoc pipeline). if (searchRequest.pipeline() != null) { throw new IllegalArgumentException( "Both named and inline search pipeline were specified. Please only specify one or the other." @@ -354,13 +358,28 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) throws Exce } catch (Exception e) { throw new SearchPipelineProcessingException(e); } - } else if (searchRequest.pipeline() != null) { - String pipelineId = searchRequest.pipeline(); - PipelineHolder pipelineHolder = pipelines.get(pipelineId); - if (pipelineHolder == null) { - throw new IllegalArgumentException("Pipeline " + pipelineId + " is not defined"); + } else { + String pipelineId = NOOP_PIPELINE_ID; + if (searchRequest.pipeline() != null) { + // Named pipeline specified for the request + pipelineId = searchRequest.pipeline(); + } else if (searchRequest.indices() != null && searchRequest.indices().length == 1) { + // Check for index default pipeline + IndexMetadata indexMetadata = state.metadata().index(searchRequest.indices()[0]); + if (indexMetadata != null) { + Settings indexSettings = indexMetadata.getSettings(); + if (IndexSettings.DEFAULT_SEARCH_PIPELINE.exists(indexSettings)) { + pipelineId = IndexSettings.DEFAULT_SEARCH_PIPELINE.get(indexSettings); + } + } + } + if (NOOP_PIPELINE_ID.equals(pipelineId) == false) { + PipelineHolder pipelineHolder = pipelines.get(pipelineId); + if (pipelineHolder == null) { + throw new IllegalArgumentException("Pipeline " + pipelineId + " is not defined"); + } + pipeline = pipelineHolder.pipeline; } - pipeline = pipelineHolder.pipeline; } SearchRequest transformedRequest = pipeline.transformRequest(searchRequest); return new PipelinedRequest(pipeline, transformedRequest); diff --git a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java index 8f65271b9fcc9..bf3c6e15bde6e 100644 --- a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java @@ -46,6 +46,7 @@ import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.translog.Translog; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.search.pipeline.SearchPipelineService; import org.opensearch.test.FeatureFlagSetter; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; @@ -1083,4 +1084,41 @@ public void testExtendedCompatibilityVersionWithoutFeatureFlag() { assertTrue(settings.isRemoteSnapshot()); assertEquals(Version.CURRENT.minimumIndexCompatibilityVersion(), settings.getExtendedCompatibilitySnapshotVersion()); } + + @SuppressForbidden(reason = "sets the SEARCH_PIPELINE feature flag") + public void testDefaultSearchPipeline() throws Exception { + FeatureFlagSetter.set(FeatureFlags.SEARCH_PIPELINE); + IndexMetadata metadata = newIndexMeta( + "index", + Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT).build() + ); + IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY); + assertEquals(SearchPipelineService.NOOP_PIPELINE_ID, settings.getDefaultSearchPipeline()); + metadata = newIndexMeta( + "index", + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexSettings.DEFAULT_SEARCH_PIPELINE.getKey(), "foo") + .build() + ); + settings.updateIndexMetadata(metadata); + assertEquals("foo", settings.getDefaultSearchPipeline()); + } + + public void testDefaultSearchPipelineWithoutFeatureFlag() { + IndexMetadata metadata = newIndexMeta( + "index", + Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT).build() + ); + IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY); + assertEquals(SearchPipelineService.NOOP_PIPELINE_ID, settings.getDefaultSearchPipeline()); + IndexMetadata updatedMetadata = newIndexMeta( + "index", + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexSettings.DEFAULT_SEARCH_PIPELINE.getKey(), "foo") + .build() + ); + assertThrows(SettingsException.class, () -> settings.updateIndexMetadata(updatedMetadata)); + } } diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java index 197ef1f160170..36978d5310810 100644 --- a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java @@ -24,6 +24,7 @@ import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; @@ -32,6 +33,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.index.IndexSettings; import org.opensearch.index.query.TermQueryBuilder; import org.opensearch.plugins.SearchPipelinePlugin; import org.opensearch.search.SearchHit; @@ -134,6 +136,47 @@ public void testResolveSearchPipelineDoesNotExist() { assertTrue(e.getMessage(), e.getMessage().contains(" not defined")); } + public void testResolveIndexDefaultPipeline() throws Exception { + SearchPipelineService service = createWithProcessors(); + + SearchPipelineMetadata metadata = new SearchPipelineMetadata( + Map.of( + "p1", + new PipelineConfiguration( + "p1", + new BytesArray("{\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : 2 } } ] }"), + XContentType.JSON + ) + ) + ); + Settings defaultPipelineSetting = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) + .put(IndexSettings.DEFAULT_SEARCH_PIPELINE.getKey(), "p1") + .build(); + IndexMetadata indexMetadata = new IndexMetadata.Builder("my_index").settings(defaultPipelineSetting).build(); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousState = clusterState; + clusterState = ClusterState.builder(clusterState) + .metadata(Metadata.builder().put(indexMetadata, false).putCustom(SearchPipelineMetadata.TYPE, metadata)) + .build(); + + ClusterChangedEvent cce = new ClusterChangedEvent("", clusterState, previousState); + service.applyClusterState(cce); + + SearchRequest searchRequest = new SearchRequest("my_index").source(SearchSourceBuilder.searchSource().size(5)); + PipelinedRequest pipelinedRequest = service.resolvePipeline(searchRequest); + assertEquals("p1", pipelinedRequest.getPipeline().getId()); + assertEquals(10, pipelinedRequest.transformedRequest().source().size()); + + // Bypass the default pipeline + searchRequest.pipeline("_none"); + pipelinedRequest = service.resolvePipeline(searchRequest); + assertEquals("_none", pipelinedRequest.getPipeline().getId()); + assertEquals(5, pipelinedRequest.transformedRequest().source().size()); + } + private static abstract class FakeProcessor implements Processor { private final String type; private final String tag;