Skip to content

Commit

Permalink
Allow dropping documents with auto-generated ID (#46773)
Browse files Browse the repository at this point in the history
When using auto-generated IDs + the ingest drop processor (which looks to be used by filebeat
as well) + coordinating nodes that do not have the ingest processor functionality, this can lead
to a NullPointerException.

The issue is that markCurrentItemAsDropped() is creating an UpdateResponse with no id when
the request contains auto-generated IDs. The response serialization is lenient for our
REST/XContent format (i.e. we will send "id" : null) but the internal transport format (used for
communication between nodes) assumes for this field to be non-null, which means that it can't
be serialized between nodes. Bulk requests with ingest functionality are processed on the
coordinating node if the node has the ingest capability, and only otherwise sent to a different
node. This means that, in order to reproduce this, one needs two nodes, with the coordinating
node not having the ingest functionality.

Closes #46678
  • Loading branch information
ywelsch authored Sep 19, 2019
1 parent 8b764a5 commit 08e3ceb
Show file tree
Hide file tree
Showing 17 changed files with 92 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,7 @@ void doExecute(ActionType<Response> action, Request request, ActionListener<Resp
new IndexResponse(
shardId,
index.type(),
index.id(),
index.id() == null ? "dummy_id" : index.id(),
randomInt(20),
randomIntBetween(1, 16),
randomIntBetween(0, Integer.MAX_VALUE),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Locale;
import java.util.Objects;

import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
Expand Down Expand Up @@ -121,13 +122,13 @@ public void writeTo(StreamOutput out) throws IOException {
protected final Result result;

public DocWriteResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, Result result) {
this.shardId = shardId;
this.type = type;
this.id = id;
this.shardId = Objects.requireNonNull(shardId);
this.type = Objects.requireNonNull(type);
this.id = Objects.requireNonNull(id);
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.version = version;
this.result = result;
this.result = Objects.requireNonNull(result);
}

// needed for deserialization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
private final IngestActionForwarder ingestForwarder;
private final NodeClient client;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private static final String DROPPED_ITEM_WITH_AUTO_GENERATED_ID = "auto-generated";

@Inject
public TransportBulkAction(ThreadPool threadPool, TransportService transportService,
Expand Down Expand Up @@ -672,11 +673,12 @@ ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis,
void markCurrentItemAsDropped() {
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot));
failedSlots.set(currentSlot);
final String id = indexRequest.id() == null ? DROPPED_ITEM_WITH_AUTO_GENERATED_ID : indexRequest.id();
itemResponses.add(
new BulkItemResponse(currentSlot, indexRequest.opType(),
new UpdateResponse(
new ShardId(indexRequest.index(), IndexMetaData.INDEX_UUID_NA_VALUE, 0),
indexRequest.type(), indexRequest.id(), SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
indexRequest.type(), id, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
indexRequest.version(), DocWriteResponse.Result.NOOP
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpClient;
import org.junit.After;
Expand Down Expand Up @@ -226,7 +227,8 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
}

private BulkItemResponse successfulResponse() {
return new BulkItemResponse(1, OpType.DELETE, new DeleteResponse(null, null, null, 0, 0, 0, false));
return new BulkItemResponse(1, OpType.DELETE, new DeleteResponse(
new ShardId("test", "test", 0), "_doc", "test", 0, 0, 0, false));
}

private BulkItemResponse failedResponse() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception {
Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation);

IndexShard shard = mock(IndexShard.class);
when(shard.shardId()).thenReturn(shardId);
when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
.thenReturn(mappingUpdate);

Expand Down Expand Up @@ -583,6 +584,7 @@ public void testUpdateRequestWithSuccess() throws Exception {
when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
.thenReturn(indexResult);
when(shard.indexSettings()).thenReturn(indexSettings);
when(shard.shardId()).thenReturn(shardId);

UpdateHelper updateHelper = mock(UpdateHelper.class);
when(updateHelper.prepare(any(), eq(shard), any())).thenReturn(
Expand Down Expand Up @@ -629,6 +631,7 @@ public void testUpdateWithDelete() throws Exception {
IndexShard shard = mock(IndexShard.class);
when(shard.applyDeleteOperationOnPrimary(anyLong(), any(), any(), any(), anyLong(), anyLong())).thenReturn(deleteResult);
when(shard.indexSettings()).thenReturn(indexSettings);
when(shard.shardId()).thenReturn(shardId);

UpdateHelper updateHelper = mock(UpdateHelper.class);
when(updateHelper.prepare(any(), eq(shard), any())).thenReturn(
Expand Down Expand Up @@ -783,6 +786,7 @@ public void testRetries() throws Exception {
}
});
when(shard.indexSettings()).thenReturn(indexSettings);
when(shard.shardId()).thenReturn(shardId);

UpdateHelper updateHelper = mock(UpdateHelper.class);
when(updateHelper.prepare(any(), eq(shard), any())).thenReturn(
Expand Down Expand Up @@ -814,7 +818,7 @@ public void testRetries() throws Exception {
private void randomlySetIgnoredPrimaryResponse(BulkItemRequest primaryRequest) {
if (randomBoolean()) {
// add a response to the request and thereby check that it is ignored for the primary.
primaryRequest.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, new IndexResponse(null, "_doc",
primaryRequest.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, new IndexResponse(shardId, "_doc",
"ignore-primary-response-on-primary", 42, 42, 42, false)));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ public void testExecuteItem() throws Exception {

public void testExecuteVerboseItemExceptionWithoutOnFailure() throws Exception {
TestProcessor processor1 = new TestProcessor("processor_0", "mock", ingestDocument -> {});
TestProcessor processor2 = new TestProcessor("processor_1", "mock",
ingestDocument -> { throw new RuntimeException("processor failed"); });
TestProcessor processor2 = new TestProcessor("processor_1", "mock", new RuntimeException("processor failed"));
TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2, processor3));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
Expand All @@ -126,8 +125,7 @@ public void testExecuteVerboseItemExceptionWithoutOnFailure() throws Exception {
}

public void testExecuteVerboseItemWithOnFailure() throws Exception {
TestProcessor processor1 = new TestProcessor("processor_0", "mock",
ingestDocument -> { throw new RuntimeException("processor failed"); });
TestProcessor processor1 = new TestProcessor("processor_0", "mock", new RuntimeException("processor failed"));
TestProcessor processor2 = new TestProcessor("processor_1", "mock", ingestDocument -> {});
TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", version,
Expand Down Expand Up @@ -165,7 +163,7 @@ public void testExecuteVerboseItemWithOnFailure() throws Exception {

public void testExecuteVerboseItemExceptionWithIgnoreFailure() throws Exception {
RuntimeException exception = new RuntimeException("processor failed");
TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { throw exception; });
TestProcessor testProcessor = new TestProcessor("processor_0", "mock", exception);
CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList());
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

import static org.hamcrest.CoreMatchers.equalTo;
Expand Down Expand Up @@ -74,7 +75,7 @@ public void testSingleProcessor() throws Exception {
}

public void testSingleProcessorWithException() throws Exception {
TestProcessor processor = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");});
TestProcessor processor = new TestProcessor(new RuntimeException("error"));
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor compoundProcessor = new CompoundProcessor(relativeTimeProvider, processor);
Expand All @@ -93,7 +94,7 @@ public void testSingleProcessorWithException() throws Exception {
}

public void testIgnoreFailure() throws Exception {
TestProcessor processor1 = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");});
TestProcessor processor1 = new TestProcessor(new RuntimeException("error"));
TestProcessor processor2 = new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue("field", "value");});
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
Expand All @@ -108,7 +109,7 @@ public void testIgnoreFailure() throws Exception {
}

public void testSingleProcessorWithOnFailureProcessor() throws Exception {
TestProcessor processor1 = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor processor1 = new TestProcessor("id", "first", new RuntimeException("error"));
TestProcessor processor2 = new TestProcessor(ingestDocument -> {
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.size(), equalTo(3));
Expand All @@ -130,7 +131,7 @@ public void testSingleProcessorWithOnFailureProcessor() throws Exception {
}

public void testSingleProcessorWithOnFailureDropProcessor() throws Exception {
TestProcessor processor1 = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor processor1 = new TestProcessor("id", "first", new RuntimeException("error"));
Processor processor2 = new Processor() {
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
Expand Down Expand Up @@ -159,8 +160,8 @@ public String getTag() {
}

public void testSingleProcessorWithNestedFailures() throws Exception {
TestProcessor processor = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor processorToFail = new TestProcessor("id2", "second", ingestDocument -> {
TestProcessor processor = new TestProcessor("id", "first", new RuntimeException("error"));
TestProcessor processorToFail = new TestProcessor("id2", "second", (Consumer<IngestDocument>) ingestDocument -> {
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.size(), equalTo(3));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
Expand Down Expand Up @@ -189,7 +190,7 @@ public void testSingleProcessorWithNestedFailures() throws Exception {
}

public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exception {
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor firstProcessor = new TestProcessor("id1", "first", new RuntimeException("error"));
TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.entrySet(), hasSize(3));
Expand All @@ -212,9 +213,9 @@ public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exceptio
}

public void testCompoundProcessorExceptionFail() throws Exception {
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor firstProcessor = new TestProcessor("id1", "first", new RuntimeException("error"));
TestProcessor failProcessor =
new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");});
new TestProcessor("tag_fail", "fail", new RuntimeException("custom error message"));
TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.entrySet(), hasSize(3));
Expand All @@ -238,9 +239,9 @@ public void testCompoundProcessorExceptionFail() throws Exception {
}

public void testCompoundProcessorExceptionFailInOnFailure() throws Exception {
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor firstProcessor = new TestProcessor("id1", "first", new RuntimeException("error"));
TestProcessor failProcessor =
new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");});
new TestProcessor("tag_fail", "fail", new RuntimeException("custom error message"));
TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.entrySet(), hasSize(3));
Expand All @@ -264,8 +265,8 @@ public void testCompoundProcessorExceptionFailInOnFailure() throws Exception {
}

public void testBreakOnFailure() throws Exception {
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error1");});
TestProcessor secondProcessor = new TestProcessor("id2", "second", ingestDocument -> {throw new RuntimeException("error2");});
TestProcessor firstProcessor = new TestProcessor("id1", "first", new RuntimeException("error1"));
TestProcessor secondProcessor = new TestProcessor("id2", "second", new RuntimeException("error2"));
TestProcessor onFailureProcessor = new TestProcessor("id2", "on_failure", ingestDocument -> {});
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
Expand Down
21 changes: 21 additions & 0 deletions server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -272,4 +272,25 @@ public void testPutWithPipelineFactoryError() throws Exception {
GetPipelineResponse response = client().admin().cluster().prepareGetPipeline("_id2").get();
assertFalse(response.isFound());
}

public void testWithDedicatedMaster() throws Exception {
String masterOnlyNode = internalCluster().startMasterOnlyNode();
BytesReference source = BytesReference.bytes(jsonBuilder().startObject()
.field("description", "my_pipeline")
.startArray("processors")
.startObject()
.startObject("test")
.endObject()
.endObject()
.endArray()
.endObject());
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).get();

BulkItemResponse item = client(masterOnlyNode).prepareBulk().add(
client().prepareIndex("test", "type").setSource("field", "value2", "drop", true).setPipeline("_id")).get()
.getItems()[0];
assertFalse(item.isFailed());
assertEquals("auto-generated", item.getResponse().getId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void testActualCompoundProcessorWithoutOnFailure() throws Exception {

public void testActualCompoundProcessorWithOnFailure() throws Exception {
RuntimeException exception = new RuntimeException("fail");
TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; });
TestProcessor failProcessor = new TestProcessor("fail", "test", exception);
TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {});
CompoundProcessor actualProcessor = new CompoundProcessor(false,
Arrays.asList(new CompoundProcessor(false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
if (doc.hasField("fail") && doc.getFieldValue("fail", Boolean.class)) {
throw new IllegalArgumentException("test processor failed");
}
if (doc.hasField("drop") && doc.getFieldValue("drop", Boolean.class)) {
return null;
}
return doc;
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

/**
* Processor used for testing, keeps track of how many times it is invoked and
Expand All @@ -31,24 +32,38 @@ public class TestProcessor implements Processor {

private final String type;
private final String tag;
private final Consumer<IngestDocument> ingestDocumentConsumer;
private final Function<IngestDocument, IngestDocument> ingestDocumentMapper;
private final AtomicInteger invokedCounter = new AtomicInteger();

public TestProcessor(Consumer<IngestDocument> ingestDocumentConsumer) {
this(null, "test-processor", ingestDocumentConsumer);
}

public TestProcessor(RuntimeException e) {
this(null, "test-processor", e);
}

public TestProcessor(String tag, String type, RuntimeException e) {
this(tag, type, (Consumer<IngestDocument>) i -> { throw e; });
}

public TestProcessor(String tag, String type, Consumer<IngestDocument> ingestDocumentConsumer) {
this.ingestDocumentConsumer = ingestDocumentConsumer;
this(tag, type, id -> {
ingestDocumentConsumer.accept(id);
return id;
});
}

public TestProcessor(String tag, String type, Function<IngestDocument, IngestDocument> ingestDocumentMapper) {
this.ingestDocumentMapper = ingestDocumentMapper;
this.type = type;
this.tag = tag;
}

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
invokedCounter.incrementAndGet();
ingestDocumentConsumer.accept(ingestDocument);
return ingestDocument;
return ingestDocumentMapper.apply(ingestDocument);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2002,6 +2002,7 @@ public String startMasterOnlyNode(Settings settings) {
.put(settings)
.put(Node.NODE_MASTER_SETTING.getKey(), true)
.put(Node.NODE_DATA_SETTING.getKey(), false)
.put(Node.NODE_INGEST_SETTING.getKey(), false)
.build();
return startNode(settings1);
}
Expand Down
Loading

0 comments on commit 08e3ceb

Please sign in to comment.