INGEST: Enable default pipelines (#32286) #32591

Merged
@@ -0,0 +1,73 @@
---
teardown:
  - do:
      ingest.delete_pipeline:
        id: "my_pipeline"
        ignore: 404

---
"Test index with default pipeline":
  - do:
      ingest.put_pipeline:
        id: "my_pipeline"
        body: >
          {
            "description": "_description",
            "processors": [
              {
                "bytes" : {
                  "field" : "bytes_source_field",
                  "target_field" : "bytes_target_field"
                }
              }
            ]
          }
  - match: { acknowledged: true }

  - do:
      indices.create:
        index: test
        body:
          settings:
            index:
              default_pipeline: "my_pipeline"

  - do:
      index:
        index: test
        type: test
        id: 1
        body: {bytes_source_field: "1kb"}

  - do:
      get:
        index: test
        type: test
        id: 1
  - match: { _source.bytes_source_field: "1kb" }
  - match: { _source.bytes_target_field: 1024 }

  # a request-level pipeline of "_none" bypasses the index default for this document
  - do:
      index:
        index: test
        type: test
        id: 2
        pipeline: "_none"
        body: {bytes_source_field: "1kb"}

  - do:
      get:
        index: test
        type: test
        id: 2
  - match: { _source.bytes_source_field: "1kb" }
  - is_false: _source.bytes_target_field

  # an empty pipeline parameter is rejected
  - do:
      catch: bad_request
      index:
        index: test
        type: test
        id: 3
        pipeline: ""
        body: {bytes_source_field: "1kb"}
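Reviewer note: for reference, the first two cases this test exercises look roughly like the following when built through the transport-level IndexRequest API that also appears in the test changes further down. This is a sketch only; the index, type, id, and field values are illustrative and not part of the change.

import java.util.Collections;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.ingest.IngestService;

class DefaultPipelineRequestSketch {
    static void buildRequests() {
        // No pipeline set on the request: the bulk action resolves it from the
        // index.default_pipeline setting of the target index ("my_pipeline" in the test above).
        IndexRequest usesDefault = new IndexRequest("test", "test", "1");
        usesDefault.source(Collections.singletonMap("bytes_source_field", "1kb"));

        // Explicit "_none" (IngestService.NOOP_PIPELINE_NAME) opts this one document out of
        // ingest even though the index declares a default pipeline.
        IndexRequest bypassesDefault = new IndexRequest("test", "test", "2");
        bypassesDefault.source(Collections.singletonMap("bytes_source_field", "1kb"));
        bypassesDefault.setPipeline(IngestService.NOOP_PIPELINE_NAME);
    }
}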
@@ -544,22 +544,6 @@ private int findNextMarker(byte marker, int from, BytesReference data, int lengt
        return -1;
    }

    /**
     * @return Whether this bulk request contains index request with an ingest pipeline enabled.
     */
    public boolean hasIndexRequestsWithPipelines() {
        for (DocWriteRequest actionRequest : requests) {
            if (actionRequest instanceof IndexRequest) {
                IndexRequest indexRequest = (IndexRequest) actionRequest;
                if (Strings.hasText(indexRequest.getPipeline())) {
                    return true;
                }
            }
        }

        return false;
    }

    @Override
    public ActionRequestValidationException validate() {
        ActionRequestValidationException validationException = null;
@@ -47,13 +47,15 @@
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexClosedException;
@@ -129,7 +131,29 @@ protected final void doExecute(final BulkRequest bulkRequest, final ActionListen

    @Override
    protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
        if (bulkRequest.hasIndexRequestsWithPipelines()) {
        boolean hasIndexRequestsWithPipelines = false;
        ImmutableOpenMap<String, IndexMetaData> indicesMetaData = clusterService.state().getMetaData().indices();
        for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
            if (actionRequest instanceof IndexRequest) {
                IndexRequest indexRequest = (IndexRequest) actionRequest;
                String pipeline = indexRequest.getPipeline();
                if (pipeline == null) {
                    IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index());
                    if (indexMetaData == null) {
                        indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
                    } else {
                        String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
                        indexRequest.setPipeline(defaultPipeline);
                        if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
                            hasIndexRequestsWithPipelines = true;
                        }
                    }
                } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
                    hasIndexRequestsWithPipelines = true;
                }
            }
        }
        if (hasIndexRequestsWithPipelines) {
            if (clusterService.localNode().isIngestNode()) {
                processBulkIndexIngestRequest(task, bulkRequest, listener);
            } else {
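Reviewer note: the per-request resolution order in the hunk above can be restated as a small pure function. The sketch below is illustrative only (the class and method names are not part of this change) and assumes the same inputs the bulk action consults: the pipeline already set on the request, if any, and the metadata of the target index.

import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.ingest.IngestService;

final class PipelineResolutionSketch {
    /** Returns the pipeline id the request should carry; "_none" means ingest is skipped. */
    static String resolvePipeline(String requestPipeline, IndexMetaData indexMetaData) {
        if (requestPipeline != null) {
            return requestPipeline;                  // explicit pipeline, or an explicit "_none" opt-out, always wins
        }
        if (indexMetaData == null) {
            return IngestService.NOOP_PIPELINE_NAME; // target index does not exist yet: nothing to resolve
        }
        // fall back to index.default_pipeline, which itself defaults to "_none" when unset
        return IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
    }
}

Ingest is then invoked for the bulk request only if at least one of its index requests resolves to something other than "_none".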
@@ -187,6 +187,10 @@ public ActionRequestValidationException validate() {
            validationException = addValidationError("an id must be provided if version type or value are set", validationException);
        }

        if (pipeline != null && pipeline.isEmpty()) {
            validationException = addValidationError("pipeline cannot be an empty string", validationException);
        }

        return validationException;
    }
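A short sketch of what the new validation rejects; the request values are made up, and the point is only that an empty pipeline string fails validate() up front, while an unset pipeline remains legal because it is resolved against index.default_pipeline later.

import java.util.Collections;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.index.IndexRequest;

class EmptyPipelineValidationSketch {
    static void demo() {
        IndexRequest request = new IndexRequest("test", "test", "3");
        request.source(Collections.singletonMap("bytes_source_field", "1kb"));
        request.setPipeline(""); // empty string, as in the bad_request case of the YAML test

        ActionRequestValidationException e = request.validate();
        // e is non-null and its message includes "pipeline cannot be an empty string"
    }
}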

@@ -153,6 +153,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
        EngineConfig.INDEX_CODEC_SETTING,
        EngineConfig.INDEX_OPTIMIZE_AUTO_GENERATED_IDS,
        IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS,
        IndexSettings.DEFAULT_PIPELINE,
        // validate that built-in similarities don't get redefined
        Setting.groupSetting("index.similarity.", (s) -> {
            Map<String, Settings> groups = s.getAsGroups();
server/src/main/java/org/elasticsearch/index/IndexSettings.java (20 additions, 0 deletions)
@@ -32,6 +32,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.mapper.AllFieldMapper;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.node.Node;

import java.util.Collections;
@@ -274,6 +275,14 @@ public final class IndexSettings {
            Property.Final);
    }

    public static final Setting<String> DEFAULT_PIPELINE =
        new Setting<>("index.default_pipeline", IngestService.NOOP_PIPELINE_NAME, s -> {
            if (s == null || s.isEmpty()) {
                throw new IllegalArgumentException("Value for [index.default_pipeline] must be a non-empty string.");
            }
            return s;
        }, Property.Dynamic, Property.IndexScope);

    private final Index index;
    private final Version version;
    private final Logger logger;
@@ -311,6 +320,7 @@ public final class IndexSettings {
    private volatile int maxShingleDiff;
    private volatile int maxAnalyzedOffset;
    private volatile int maxTermsCount;
    private volatile String defaultPipeline;

    /**
     * The maximum number of refresh listeners allows on this shard.
@@ -434,6 +444,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
            throw new AssertionError(index.toString() + "multiple types are only allowed on pre 6.x indices but version is: ["
                + version + "]");
        }
        defaultPipeline = scopedSettings.get(DEFAULT_PIPELINE);

        scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio);
        scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, mergePolicyConfig::setExpungeDeletesAllowed);
@@ -470,6 +481,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
        scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll);
        scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields);
        scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength);
        scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline);
    }

    private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) {
@@ -830,4 +842,12 @@ public IndexSortConfig getIndexSortConfig() {
    }

    public IndexScopedSettings getScopedSettings() { return scopedSettings;}

    public String getDefaultPipeline() {
        return defaultPipeline;
    }

    public void setDefaultPipeline(String defaultPipeline) {
        this.defaultPipeline = defaultPipeline;
    }
}
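Reviewer note: since the new setting is registered with Property.Dynamic and Property.IndexScope, it can be supplied at index creation time or updated afterwards. A minimal sketch of building such settings in Java follows; the pipeline name is illustrative, and wiring the settings into a create-index or update-settings request is omitted.

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;

final class DefaultPipelineSettingSketch {
    static Settings withDefaultPipeline() {
        // Equivalent to `default_pipeline: "my_pipeline"` under index settings in the YAML test above.
        // An empty string is rejected by the setting's parser; "_none" restores the default (no pipeline).
        return Settings.builder()
            .put(IndexSettings.DEFAULT_PIPELINE.getKey(), "my_pipeline")
            .build();
    }
}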
@@ -39,6 +39,9 @@
 * Holder class for several ingest related services.
 */
public class IngestService {

    public static final String NOOP_PIPELINE_NAME = "_none";

    private final PipelineStore pipelineStore;
    private final PipelineExecutionService pipelineExecutionService;

@@ -24,7 +24,6 @@
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -73,12 +72,16 @@ protected void doRun() throws Exception {
                    UpdateRequest updateRequest = (UpdateRequest) actionRequest;
                    indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest();
                }
                if (indexRequest != null && Strings.hasText(indexRequest.getPipeline())) {
                if (indexRequest == null) {
                    continue;
                }
                String pipeline = indexRequest.getPipeline();
                if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
                    try {
                        innerExecute(indexRequest, getPipeline(indexRequest.getPipeline()));
                        //this shouldn't be needed here but we do it for consistency with index api
                        // which requires it to prevent double execution
                        indexRequest.setPipeline(null);
                        indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
                    } catch (Exception e) {
                        itemFailureHandler.accept(indexRequest, e);
                    }
@@ -26,6 +26,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@@ -45,6 +46,7 @@
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCase {
    public void testNonExceptional() {
@@ -97,7 +99,11 @@ public void testSomeFail() {

    private void indicesThatCannotBeCreatedTestCase(Set<String> expected,
                                                    BulkRequest bulkRequest, Function<String, Boolean> shouldAutoCreate) {
        TransportBulkAction action = new TransportBulkAction(Settings.EMPTY, null, mock(TransportService.class), mock(ClusterService.class),
        ClusterService clusterService = mock(ClusterService.class);
        ClusterState state = mock(ClusterState.class);
        when(state.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA);
        when(clusterService.state()).thenReturn(state);
        TransportBulkAction action = new TransportBulkAction(Settings.EMPTY, null, mock(TransportService.class), clusterService,
            null, null, null, mock(ActionFilters.class), null, null) {
            @Override
            void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener<BulkResponse> listener,
@@ -19,6 +19,7 @@

package org.elasticsearch.action.bulk;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexAction;
@@ -28,13 +29,16 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.PipelineExecutionService;
import org.elasticsearch.tasks.Task;
@@ -68,6 +72,11 @@

public class TransportBulkActionIngestTests extends ESTestCase {

    /**
     * Index for which mock settings contain a default pipeline.
     */
    private static final String WITH_DEFAULT_PIPELINE = "index_with_default_pipeline";

    /** Services needed by bulk action */
    TransportService transportService;
    ClusterService clusterService;
@@ -153,6 +162,15 @@ public void setupAction() {
        when(nodes.getIngestNodes()).thenReturn(ingestNodes);
        ClusterState state = mock(ClusterState.class);
        when(state.getNodes()).thenReturn(nodes);
        when(state.getMetaData()).thenReturn(MetaData.builder().indices(ImmutableOpenMap.<String, IndexMetaData>builder()
            .putAll(
                Collections.singletonMap(
                    WITH_DEFAULT_PIPELINE,
                    IndexMetaData.builder(WITH_DEFAULT_PIPELINE).settings(
                        settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline")
                            .build()
                    ).numberOfShards(1).numberOfReplicas(1).build()))
            .build()).build());
        when(clusterService.state()).thenReturn(state);
        doAnswer(invocation -> {
            ClusterChangedEvent event = mock(ClusterChangedEvent.class);
@@ -227,7 +245,7 @@ public void testIngestLocal() throws Exception {
        // now check success
        Iterator<DocWriteRequest> req = bulkDocsItr.getValue().iterator();
        failureHandler.getValue().accept((IndexRequest)req.next(), exception); // have an exception for our one index request
        indexRequest2.setPipeline(null); // this is done by the real pipeline execution service when processing
        indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
        completionHandler.getValue().accept(null);
        assertTrue(action.isExecuted);
        assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
@@ -259,7 +277,7 @@ public void testSingleItemBulkActionIngestLocal() throws Exception {
        assertTrue(failureCalled.get());

        // now check success
        indexRequest.setPipeline(null); // this is done by the real pipeline execution service when processing
        indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
        completionHandler.getValue().accept(null);
        assertTrue(action.isExecuted);
        assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
@@ -359,4 +377,35 @@ public void testSingleItemBulkActionIngestForward() throws Exception {
        }
    }

    public void testUseDefaultPipeline() throws Exception {
        Exception exception = new Exception("fake exception");
        IndexRequest indexRequest = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id");
        indexRequest.source(Collections.emptyMap());
        AtomicBoolean responseCalled = new AtomicBoolean(false);
        AtomicBoolean failureCalled = new AtomicBoolean(false);
        singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(
            response -> {
                responseCalled.set(true);
            },
            e -> {
                assertThat(e, sameInstance(exception));
                failureCalled.set(true);
            }));

        // check failure works, and passes through to the listener
        assertFalse(action.isExecuted); // haven't executed yet
        assertFalse(responseCalled.get());
        assertFalse(failureCalled.get());
        verify(executionService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture());
        completionHandler.getValue().accept(exception);
        assertTrue(failureCalled.get());

        // now check success
        indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
        completionHandler.getValue().accept(null);
        assertTrue(action.isExecuted);
        assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
        verifyZeroInteractions(transportService);
    }
}