diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/200_default_pipeline.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/200_default_pipeline.yml new file mode 100644 index 0000000000000..c20d7698131bf --- /dev/null +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/200_default_pipeline.yml @@ -0,0 +1,73 @@ +--- +teardown: + - do: + ingest.delete_pipeline: + id: "my_pipeline" + ignore: 404 + +--- +"Test index with default pipeline": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "bytes" : { + "field" : "bytes_source_field", + "target_field" : "bytes_target_field" + } + } + ] + } + - match: { acknowledged: true } + + - do: + indices.create: + index: test + body: + settings: + index: + default_pipeline: "my_pipeline" + + - do: + index: + index: test + type: test + id: 1 + body: {bytes_source_field: "1kb"} + + - do: + get: + index: test + type: test + id: 1 + - match: { _source.bytes_source_field: "1kb" } + - match: { _source.bytes_target_field: 1024 } + + - do: + index: + index: test + type: test + id: 2 + pipeline: "_none" + body: {bytes_source_field: "1kb"} + + - do: + get: + index: test + type: test + id: 2 + - match: { _source.bytes_source_field: "1kb" } + - is_false: _source.bytes_target_field + + - do: + catch: bad_request + index: + index: test + type: test + id: 3 + pipeline: "" + body: {bytes_source_field: "1kb"} diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 74e73843a993a..67a5f025bfe03 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -544,22 +544,6 @@ private int findNextMarker(byte marker, int from, BytesReference data, int lengt return -1; } - /** - * @return Whether this bulk request contains index request with an ingest pipeline enabled. - */ - public boolean hasIndexRequestsWithPipelines() { - for (DocWriteRequest actionRequest : requests) { - if (actionRequest instanceof IndexRequest) { - IndexRequest indexRequest = (IndexRequest) actionRequest; - if (Strings.hasText(indexRequest.getPipeline())) { - return true; - } - } - } - - return false; - } - @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 417fb132a2830..bf1974ac54077 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -47,6 +47,7 @@ import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -54,6 +55,7 @@ import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndexClosedException; @@ -129,7 +131,29 @@ protected final void doExecute(final BulkRequest bulkRequest, final ActionListen @Override protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener listener) { - if (bulkRequest.hasIndexRequestsWithPipelines()) { + boolean hasIndexRequestsWithPipelines = false; + ImmutableOpenMap indicesMetaData = clusterService.state().getMetaData().indices(); + for (DocWriteRequest actionRequest : bulkRequest.requests) { + if (actionRequest instanceof IndexRequest) { + IndexRequest indexRequest = (IndexRequest) actionRequest; + String pipeline = indexRequest.getPipeline(); + if (pipeline == null) { + IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index()); + if (indexMetaData == null) { + indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); + } else { + String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings()); + indexRequest.setPipeline(defaultPipeline); + if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) { + hasIndexRequestsWithPipelines = true; + } + } + } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) { + hasIndexRequestsWithPipelines = true; + } + } + } + if (hasIndexRequestsWithPipelines) { if (clusterService.localNode().isIngestNode()) { processBulkIndexIngestRequest(task, bulkRequest, listener); } else { diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 9e7e20ceb17ce..7240b62876c05 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -187,6 +187,10 @@ public ActionRequestValidationException validate() { validationException = addValidationError("an id must be provided if version type or value are set", validationException); } + if (pipeline != null && pipeline.isEmpty()) { + validationException = addValidationError("pipeline cannot be an empty string", validationException); + } + return validationException; } diff --git a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index bf33fdb66a02c..df7c670fb2c9a 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -153,6 +153,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { EngineConfig.INDEX_CODEC_SETTING, EngineConfig.INDEX_OPTIMIZE_AUTO_GENERATED_IDS, IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS, + IndexSettings.DEFAULT_PIPELINE, // validate that built-in similarities don't get redefined Setting.groupSetting("index.similarity.", (s) -> { Map groups = s.getAsGroups(); diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index 5cd68e279d776..fc0e586c996f6 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.mapper.AllFieldMapper; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.ingest.IngestService; import org.elasticsearch.node.Node; import java.util.Collections; @@ -274,6 +275,14 @@ public final class IndexSettings { Property.Final); } + public static final Setting DEFAULT_PIPELINE = + new Setting<>("index.default_pipeline", IngestService.NOOP_PIPELINE_NAME, s -> { + if (s == null || s.isEmpty()) { + throw new IllegalArgumentException("Value for [index.default_pipeline] must be a non-empty string."); + } + return s; + }, Property.Dynamic, Property.IndexScope); + private final Index index; private final Version version; private final Logger logger; @@ -311,6 +320,7 @@ public final class IndexSettings { private volatile int maxShingleDiff; private volatile int maxAnalyzedOffset; private volatile int maxTermsCount; + private volatile String defaultPipeline; /** * The maximum number of refresh listeners allows on this shard. @@ -434,6 +444,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti throw new AssertionError(index.toString() + "multiple types are only allowed on pre 6.x indices but version is: [" + version + "]"); } + defaultPipeline = scopedSettings.get(DEFAULT_PIPELINE); scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio); scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, mergePolicyConfig::setExpungeDeletesAllowed); @@ -470,6 +481,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll); scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields); scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength); + scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline); } private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) { @@ -830,4 +842,12 @@ public IndexSortConfig getIndexSortConfig() { } public IndexScopedSettings getScopedSettings() { return scopedSettings;} + + public String getDefaultPipeline() { + return defaultPipeline; + } + + public void setDefaultPipeline(String defaultPipeline) { + this.defaultPipeline = defaultPipeline; + } } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 46b11f7ac141d..01bc402e43ba5 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -39,6 +39,9 @@ * Holder class for several ingest related services. */ public class IngestService { + + public static final String NOOP_PIPELINE_NAME = "_none"; + private final PipelineStore pipelineStore; private final PipelineExecutionService pipelineExecutionService; diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java b/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java index d2503476d3353..d0ba8399f7178 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java @@ -24,7 +24,6 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterStateApplier; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -73,12 +72,16 @@ protected void doRun() throws Exception { UpdateRequest updateRequest = (UpdateRequest) actionRequest; indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest(); } - if (indexRequest != null && Strings.hasText(indexRequest.getPipeline())) { + if (indexRequest == null) { + continue; + } + String pipeline = indexRequest.getPipeline(); + if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) { try { innerExecute(indexRequest, getPipeline(indexRequest.getPipeline())); //this shouldn't be needed here but we do it for consistency with index api // which requires it to prevent double execution - indexRequest.setPipeline(null); + indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); } catch (Exception e) { itemFailureHandler.accept(indexRequest, e); } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java index 1d6b77fc747c7..1a0e314d88680 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -45,6 +46,7 @@ import static java.util.Collections.emptySet; import static java.util.Collections.singleton; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCase { public void testNonExceptional() { @@ -97,7 +99,11 @@ public void testSomeFail() { private void indicesThatCannotBeCreatedTestCase(Set expected, BulkRequest bulkRequest, Function shouldAutoCreate) { - TransportBulkAction action = new TransportBulkAction(Settings.EMPTY, null, mock(TransportService.class), mock(ClusterService.class), + ClusterService clusterService = mock(ClusterService.class); + ClusterState state = mock(ClusterState.class); + when(state.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA); + when(clusterService.state()).thenReturn(state); + TransportBulkAction action = new TransportBulkAction(Settings.EMPTY, null, mock(TransportService.class), clusterService, null, null, null, mock(ActionFilters.class), null, null) { @Override void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener listener, diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index 32dfbe85d426e..892f2cbb61b15 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.bulk; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.index.IndexAction; @@ -28,6 +29,8 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; @@ -35,6 +38,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.PipelineExecutionService; import org.elasticsearch.tasks.Task; @@ -68,6 +72,11 @@ public class TransportBulkActionIngestTests extends ESTestCase { + /** + * Index for which mock settings contain a default pipeline. + */ + private static final String WITH_DEFAULT_PIPELINE = "index_with_default_pipeline"; + /** Services needed by bulk action */ TransportService transportService; ClusterService clusterService; @@ -153,6 +162,15 @@ public void setupAction() { when(nodes.getIngestNodes()).thenReturn(ingestNodes); ClusterState state = mock(ClusterState.class); when(state.getNodes()).thenReturn(nodes); + when(state.getMetaData()).thenReturn(MetaData.builder().indices(ImmutableOpenMap.builder() + .putAll( + Collections.singletonMap( + WITH_DEFAULT_PIPELINE, + IndexMetaData.builder(WITH_DEFAULT_PIPELINE).settings( + settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline") + .build() + ).numberOfShards(1).numberOfReplicas(1).build())) + .build()).build()); when(clusterService.state()).thenReturn(state); doAnswer(invocation -> { ClusterChangedEvent event = mock(ClusterChangedEvent.class); @@ -227,7 +245,7 @@ public void testIngestLocal() throws Exception { // now check success Iterator req = bulkDocsItr.getValue().iterator(); failureHandler.getValue().accept((IndexRequest)req.next(), exception); // have an exception for our one index request - indexRequest2.setPipeline(null); // this is done by the real pipeline execution service when processing + indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing completionHandler.getValue().accept(null); assertTrue(action.isExecuted); assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one @@ -259,7 +277,7 @@ public void testSingleItemBulkActionIngestLocal() throws Exception { assertTrue(failureCalled.get()); // now check success - indexRequest.setPipeline(null); // this is done by the real pipeline execution service when processing + indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing completionHandler.getValue().accept(null); assertTrue(action.isExecuted); assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one @@ -359,4 +377,35 @@ public void testSingleItemBulkActionIngestForward() throws Exception { } } + public void testUseDefaultPipeline() throws Exception { + Exception exception = new Exception("fake exception"); + IndexRequest indexRequest = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id"); + indexRequest.source(Collections.emptyMap()); + AtomicBoolean responseCalled = new AtomicBoolean(false); + AtomicBoolean failureCalled = new AtomicBoolean(false); + singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap( + response -> { + responseCalled.set(true); + }, + e -> { + assertThat(e, sameInstance(exception)); + failureCalled.set(true); + })); + + // check failure works, and passes through to the listener + assertFalse(action.isExecuted); // haven't executed yet + assertFalse(responseCalled.get()); + assertFalse(failureCalled.get()); + verify(executionService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture()); + completionHandler.getValue().accept(exception); + assertTrue(failureCalled.get()); + + // now check success + indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing + completionHandler.getValue().accept(null); + assertTrue(action.isExecuted); + assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one + verifyZeroInteractions(transportService); + } + } diff --git a/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java b/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java index 6191184ef3f7a..8ffc477c9e533 100644 --- a/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java @@ -204,4 +204,14 @@ public void testToStringSizeLimit() throws UnsupportedEncodingException { assertEquals("index {[index][type][null], source[n/a, actual length: [" + new ByteSizeValue(actualBytes).toString() + "], max length: " + new ByteSizeValue(IndexRequest.MAX_SOURCE_LENGTH_IN_TOSTRING).toString() + "]}", request.toString()); } + + public void testRejectsEmptyStringPipeline() { + IndexRequest request = new IndexRequest("index", "type"); + request.source("{}", XContentType.JSON); + request.setPipeline(""); + ActionRequestValidationException validate = request.validate(); + assertThat(validate, notNullValue()); + assertThat(validate.getMessage(), + containsString("pipeline cannot be an empty string")); + } }