ingest: support default pipelines + bulk upserts #36618

Merged

Changes from 4 commits
@@ -23,7 +23,7 @@ teardown:
]
}
- match: { acknowledged: true }

# default pipeline via index
- do:
indices.create:
index: test
@@ -48,7 +48,7 @@ teardown:
id: 1
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }

# default pipeline via alias
- do:
index:
index: test_alias
@@ -63,28 +63,117 @@ teardown:
id: 2
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# default pipeline via upsert
- do:
update:
index: test
type: test
id: 3
body:
script:
source: "ctx._source.ran_script = true"
lang: "painless"
upsert: { "bytes_source_field":"1kb" }
- do:
get:
index: test
type: test
id: 3
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# default pipeline via scripted upsert
- do:
update:
index: test
type: test
id: 4
body:
script:
source: "ctx._source.bytes_source_field = '1kb'"
lang: "painless"
upsert: {}
scripted_upsert: true
- do:
get:
index: test
type: test
id: 4
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# default pipeline via doc_as_upsert
- do:
update:
index: test
type: test
id: 5
body:
doc: { "bytes_source_field":"1kb" }
doc_as_upsert: true
- do:
get:
index: test
type: test
id: 5
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# default pipeline via bulk upsert
# note - bulk scripted upserts execute the pipeline before the script, so any data referenced by the pipeline
# needs to be in the upsert document, not added by the script (see the ordering sketch after this file)
- do:
bulk:
refresh: true
body: |
{"update":{"_id":"6","_index":"test","_type":"test"}}
{"script":{"source":"ctx._source.ran_script = true"},"upsert":{"bytes_source_field":"1kb"}}
{"update":{"_id":"7","_index":"test","_type":"test"}}
{"doc":{"bytes_source_field":"2kb"}, "doc_as_upsert":true}
{"update":{"_id":"8","_index":"test","_type":"test"}}
{"script":{"source":"ctx._source.ran_script = true"},"upsert":{"bytes_source_field":"3kb"}, "scripted_upsert" : true}

- do:
mget:
body:
docs:
- { _index: "test", _type: "_doc", _id: "6" }
- { _index: "test", _type: "_doc", _id: "7" }
- { _index: "test", _type: "_doc", _id: "8" }
- match: { docs.0._index: "test" }
- match: { docs.0._id: "6" }
- match: { docs.0._source.bytes_source_field: "1kb" }
- match: { docs.0._source.bytes_target_field: 1024 }
- is_false: docs.0._source.ran_script
- match: { docs.1._index: "test" }
- match: { docs.1._id: "7" }
- match: { docs.1._source.bytes_source_field: "2kb" }
- match: { docs.1._source.bytes_target_field: 2048 }
- match: { docs.2._index: "test" }
- match: { docs.2._id: "8" }
- match: { docs.2._source.bytes_source_field: "3kb" }
- match: { docs.2._source.bytes_target_field: 3072 }
- match: { docs.2._source.ran_script: true }

# explicit no default pipeline
- do:
index:
index: test
type: test
id: 3
id: 9
pipeline: "_none"
body: {bytes_source_field: "1kb"}

- do:
get:
index: test
type: test
id: 3
id: 9
- match: { _source.bytes_source_field: "1kb" }
- is_false: _source.bytes_target_field

# bad request
- do:
catch: bad_request
index:
index: test
type: test
id: 4
id: 10
pipeline: ""
body: {bytes_source_field: "1kb"}
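
A note on the ordering caveat above: for bulk scripted upserts the default pipeline runs against the upsert document before the script executes, so fields the script adds are never visible to the pipeline. A minimal plain-Java sketch of that ordering (the lambdas below are illustrative stand-ins for the ingest pipeline and the Painless script, not Elasticsearch APIs):

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

public class ScriptedUpsertOrdering {
    public static void main(String[] args) {
        // The upsert document is all the default pipeline will ever see.
        Map<String, Object> source = new HashMap<>();
        source.put("bytes_source_field", "3kb");

        // 1) The default pipeline runs first (a stand-in for the bytes processor).
        Consumer<Map<String, Object>> pipeline = doc -> doc.put("bytes_target_field", 3072);
        pipeline.accept(source);

        // 2) The script runs afterwards; any field it adds is invisible to the pipeline.
        Consumer<Map<String, Object>> script = doc -> doc.put("ran_script", true);
        script.accept(source);

        // Matches the assertions for doc 8 above:
        // {bytes_source_field=3kb, bytes_target_field=3072, ran_script=true}
        System.out.println(source);
    }
}
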
@@ -127,6 +127,24 @@ public TransportBulkAction(ThreadPool threadPool, TransportService transportServ
clusterService.addStateApplier(this.ingestForwarder);
}

/**
* Retrieves the {@link IndexRequest} from the provided {@link DocWriteRequest} for index or upsert actions. Upserts are
* modeled as {@link IndexRequest} inside the {@link UpdateRequest}. Ignores {@link org.elasticsearch.action.delete.DeleteRequest}s.
*
* @param docWriteRequest the request in which to find the {@link IndexRequest}
* @return the found {@link IndexRequest}, or {@code null} if one cannot be found
*/
public static IndexRequest getIndexWriteRequest(DocWriteRequest docWriteRequest) {
IndexRequest indexRequest = null;
if (docWriteRequest instanceof IndexRequest) {
indexRequest = (IndexRequest) docWriteRequest;
} else if (docWriteRequest instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) docWriteRequest;
indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest();
}
return indexRequest;
}

@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
final long startTime = relativeTime();
@@ -207,12 +225,12 @@ private void executeIngestAndBulk(Task task, final BulkRequest bulkRequest, fina
final MetaData metaData = clusterService.state().getMetaData();
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
if (actionRequest instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) actionRequest;
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
if (indexRequest != null) {
String pipeline = indexRequest.getPipeline();
if (pipeline == null) {
IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index());
if (indexMetaData == null) {
IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index());
if (indexMetaData == null && indexRequest.index() != null) {
//check the alias
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index());
if (indexOrAlias != null && indexOrAlias.isAlias()) {
@@ -566,6 +584,7 @@ void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListen
indexRequest -> bulkRequestModifier.markCurrentItemAsDropped());
}


static final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> {

final BulkRequest bulkRequest;
@@ -626,7 +645,7 @@ ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis,
}

void markCurrentItemAsDropped() {
IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(currentSlot);
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot));
failedSlots.set(currentSlot);
itemResponses.add(
new BulkItemResponse(currentSlot, indexRequest.opType(),
@@ -639,7 +658,7 @@ void markCurrentItemAsDropped() {
}

void markCurrentItemAsFailed(Exception e) {
IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(currentSlot);
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot));
// We hit an error during preprocessing of a request, so we:
// 1) Remember the request item slot from the bulk, so that when we're done processing all requests we know what failed
// 2) Add a bulk item failure for this request
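
The resolution order the hunks above implement is roughly: an explicit pipeline on the request (including "_none") wins; otherwise the concrete index's default pipeline applies; otherwise, if the name resolves to an alias, the write index's default pipeline is used. A simplified sketch of that rule, where the maps are hypothetical stand-ins for the cluster metadata and alias lookups rather than Elasticsearch classes:

import java.util.Map;

final class DefaultPipelineResolver {
    private final Map<String, String> defaultPipelineByIndex; // concrete index -> index.default_pipeline
    private final Map<String, String> writeIndexByAlias;      // alias -> its write index

    DefaultPipelineResolver(Map<String, String> defaultPipelineByIndex, Map<String, String> writeIndexByAlias) {
        this.defaultPipelineByIndex = defaultPipelineByIndex;
        this.writeIndexByAlias = writeIndexByAlias;
    }

    // Approximates the lookup in executeIngestAndBulk above; null means no ingest processing.
    String resolve(String indexOrAlias, String requestPipeline) {
        if (requestPipeline != null) {
            return requestPipeline; // an explicit pipeline, including "_none", always wins
        }
        String pipeline = defaultPipelineByIndex.get(indexOrAlias);
        if (pipeline == null) {
            String writeIndex = writeIndexByAlias.get(indexOrAlias);
            if (writeIndex != null) {
                pipeline = defaultPipelineByIndex.get(writeIndex); // alias fallback
            }
        }
        return pipeline;
    }
}

Under these assumptions, with defaultPipelineByIndex = {test: my_pipeline} and writeIndexByAlias = {test_alias: test}, resolve("test_alias", null) returns my_pipeline, which is the behavior the alias test in the YAML exercises.
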
10 changes: 2 additions & 8 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -24,11 +24,11 @@
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
@@ -388,13 +388,7 @@ public void onFailure(Exception e) {
@Override
protected void doRun() {
for (DocWriteRequest<?> actionRequest : actionRequests) {
IndexRequest indexRequest = null;
if (actionRequest instanceof IndexRequest) {
indexRequest = (IndexRequest) actionRequest;
} else if (actionRequest instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) actionRequest;
indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest();
}
IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest);
if (indexRequest == null) {
continue;
}
@@ -28,6 +28,7 @@
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
@@ -408,6 +409,57 @@ public void testUseDefaultPipelineWithAlias() throws Exception {
validateDefaultPipeline(new IndexRequest(WITH_DEFAULT_PIPELINE_ALIAS, "type", "id"));
}

public void testUseDefaultPipelineWithBulkUpsert() throws Exception {
Exception exception = new Exception("fake exception");
BulkRequest bulkRequest = new BulkRequest();
IndexRequest indexRequest1 = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id1").source(Collections.emptyMap());
IndexRequest indexRequest2 = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id2").source(Collections.emptyMap());
IndexRequest indexRequest3 = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id3").source(Collections.emptyMap());
UpdateRequest upsertRequest = new UpdateRequest(WITH_DEFAULT_PIPELINE, "type", "id1").upsert(indexRequest1).script(mockScript("1"));
UpdateRequest docAsUpsertRequest = new UpdateRequest(WITH_DEFAULT_PIPELINE, "type", "id2").doc(indexRequest2).docAsUpsert(true);
// this test only covers the mechanics by which scripted bulk upserts pick up a default pipeline. However, in practice scripted
// bulk upserts with a default pipeline are a bit surprising since the script executes AFTER the pipeline.
UpdateRequest scriptedUpsert = new UpdateRequest(WITH_DEFAULT_PIPELINE, "type", "id3").upsert(indexRequest3).script(mockScript("1"))
.scriptedUpsert(true);
bulkRequest.add(upsertRequest).add(docAsUpsertRequest).add(scriptedUpsert);

AtomicBoolean responseCalled = new AtomicBoolean(false);
AtomicBoolean failureCalled = new AtomicBoolean(false);
assertNull(indexRequest1.getPipeline());
assertNull(indexRequest2.getPipeline());
assertNull(indexRequest3.getPipeline());
action.execute(null, bulkRequest, ActionListener.wrap(
response -> {
BulkItemResponse itemResponse = response.iterator().next();
assertThat(itemResponse.getFailure().getMessage(), containsString("fake exception"));
responseCalled.set(true);
},
e -> {
assertThat(e, sameInstance(exception));
failureCalled.set(true);
}));

// check failure works, and passes through to the listener
assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any());
assertEquals(indexRequest1.getPipeline(), "default_pipeline");
assertEquals(indexRequest2.getPipeline(), "default_pipeline");
assertEquals(indexRequest3.getPipeline(), "default_pipeline");
completionHandler.getValue().accept(exception);
assertTrue(failureCalled.get());

// now check success of the transport bulk action
indexRequest1.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
indexRequest3.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
completionHandler.getValue().accept(null);
assertTrue(action.isExecuted);
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
verifyZeroInteractions(transportService);
}

public void testCreateIndexBeforeRunPipeline() throws Exception {
Exception exception = new Exception("fake exception");
IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id");
@@ -445,6 +497,7 @@ private void validateDefaultPipeline(IndexRequest indexRequest) {
indexRequest.source(Collections.emptyMap());
AtomicBoolean responseCalled = new AtomicBoolean(false);
AtomicBoolean failureCalled = new AtomicBoolean(false);
assertNull(indexRequest.getPipeline());
singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(
response -> {
responseCalled.set(true);
@@ -459,6 +512,7 @@ private void validateDefaultPipeline(IndexRequest indexRequest) {
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any());
assertEquals(indexRequest.getPipeline(), "default_pipeline");
completionHandler.getValue().accept(exception);
assertTrue(failureCalled.get());

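For completeness, a client-side sketch of the behavior these tests cover, written against the 6.x high-level REST client; the index name, ids, and the assumption that "test" was created with index.default_pipeline set all come from the YAML test above:

import java.util.Collections;

import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;

public class BulkUpsertWithDefaultPipeline {
    static BulkResponse bulkUpsert(RestHighLevelClient client) throws Exception {
        BulkRequest bulk = new BulkRequest();
        // Plain upsert with a script (doc 6 above): when the document does not exist,
        // the upsert document is indexed through the default pipeline and the script is skipped.
        bulk.add(new UpdateRequest("test", "test", "6")
                .upsert("{\"bytes_source_field\":\"1kb\"}", XContentType.JSON)
                .script(new Script(ScriptType.INLINE, "painless",
                        "ctx._source.ran_script = true", Collections.emptyMap())));
        // doc_as_upsert (doc 7 above): the doc itself is what the pipeline processes.
        bulk.add(new UpdateRequest("test", "test", "7")
                .doc("{\"bytes_source_field\":\"2kb\"}", XContentType.JSON)
                .docAsUpsert(true));
        // Scripted upsert (doc 8 above): the pipeline runs on the upsert document first,
        // then the script runs on the pipeline's output.
        bulk.add(new UpdateRequest("test", "test", "8")
                .upsert("{\"bytes_source_field\":\"3kb\"}", XContentType.JSON)
                .script(new Script(ScriptType.INLINE, "painless",
                        "ctx._source.ran_script = true", Collections.emptyMap()))
                .scriptedUpsert(true));
        return client.bulk(bulk, RequestOptions.DEFAULT);
    }
}
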
@@ -23,8 +23,10 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.bulk.TransportBulkActionTookTests.Resolver;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@@ -132,4 +134,24 @@ public void testDeleteNonExistingDocExternalGteVersionCreatesIndex() throws Exce
throw new AssertionError(exception);
}));
}

public void testGetIndexWriteRequest() throws Exception {
IndexRequest indexRequest = new IndexRequest("index", "type", "id1").source(Collections.emptyMap());
UpdateRequest upsertRequest = new UpdateRequest("index", "type", "id1").upsert(indexRequest).script(mockScript("1"));
UpdateRequest docAsUpsertRequest = new UpdateRequest("index", "type", "id2").doc(indexRequest).docAsUpsert(true);
UpdateRequest scriptedUpsert = new UpdateRequest("index", "type", "id2").upsert(indexRequest).script(mockScript("1"))
.scriptedUpsert(true);

assertEquals(TransportBulkAction.getIndexWriteRequest(indexRequest), indexRequest);
assertEquals(TransportBulkAction.getIndexWriteRequest(upsertRequest), indexRequest);
assertEquals(TransportBulkAction.getIndexWriteRequest(docAsUpsertRequest), indexRequest);
assertEquals(TransportBulkAction.getIndexWriteRequest(scriptedUpsert), indexRequest);

DeleteRequest deleteRequest = new DeleteRequest("index", "type", "id");
assertNull(TransportBulkAction.getIndexWriteRequest(deleteRequest));

UpdateRequest badUpsertRequest = new UpdateRequest("index", "type", "id1");
assertNull(TransportBulkAction.getIndexWriteRequest(badUpsertRequest));

}
}