Skip to content

Commit

Permalink
Fix ingest simulate response document order if processor executes async (elastic#50244)
Browse files Browse the repository at this point in the history

If a processor executes asynchronously and the ingest simulate api simulates with
multiple documents then the order of the documents in the response may not match
the order of the documents in the request.

Alexander Reelsen discovered this issue with the enrich processor with the following reproduction:

```
PUT cities/_doc/munich
{"zip":"80331","city":"Munich"}

PUT cities/_doc/berlin
{"zip":"10965","city":"Berlin"}

PUT /_enrich/policy/zip-policy
{
  "match": {
    "indices": "cities",
    "match_field": "zip",
    "enrich_fields": [ "city" ]
  }
}

POST /_enrich/policy/zip-policy/_execute

GET _cat/indices/.enrich-*

POST /_ingest/pipeline/_simulate
{
  "pipeline": {
  "processors" : [
    {
      "enrich" : {
        "policy_name": "zip-policy",
        "field" : "zip",
        "target_field": "city",
        "max_matches": "1"
      }
    }
  ]
  },
  "docs": [
    { "_id": "first", "_source" : { "zip" : "80331" } } ,
    { "_id": "second", "_source" : { "zip" : "50667" } }
  ]
}
```
  • Loading branch information
martijnvg authored and SivagurunathanV committed Jan 21, 2020
1 parent 7f6644e commit f2045ab
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,21 @@ void executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean v
/**
 * Simulates the given pipeline against all documents in the request and replies with one
 * result per input document, in the same order as the request.
 *
 * Because a processor may complete asynchronously on another thread, results can arrive out
 * of order. The response list is therefore pre-sized to the document count and each result is
 * written to its originating slot via {@code set(index, ...)} rather than appended, so the
 * response order always matches the request order.
 *
 * @param request  parsed simulate request (pipeline + documents + verbose flag)
 * @param listener notified with the aggregated response once every document has completed
 */
public void execute(SimulatePipelineRequest.Parsed request, ActionListener<SimulatePipelineResponse> listener) {
threadPool.executor(THREAD_POOL_NAME).execute(ActionRunnable.wrap(listener, l -> {
final AtomicInteger counter = new AtomicInteger();
// Pre-filled with null slots; CopyOnWriteArrayList gives safe concurrent set() from
// whichever threads the per-document callbacks run on.
final List<SimulateDocumentResult> responses =
new CopyOnWriteArrayList<>(new SimulateDocumentBaseResult[request.getDocuments().size()]);
int iter = 0;
for (IngestDocument ingestDocument : request.getDocuments()) {
final int index = iter; // capture the slot for this document's async callback
executeDocument(request.getPipeline(), ingestDocument, request.isVerbose(), (response, e) -> {
if (response != null) {
responses.set(index, response);
}
// Last callback to finish (regardless of order) sends the aggregated response.
if (counter.incrementAndGet() == request.getDocuments().size()) {
l.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(),
request.isVerbose(), responses));
}
});
iter++;
}
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package org.elasticsearch.action.ingest;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.DropProcessor;
import org.elasticsearch.ingest.IngestDocument;
Expand All @@ -29,17 +32,24 @@
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
Expand Down Expand Up @@ -330,4 +340,56 @@ public void testDropDocumentVerboseExtraProcessor() throws Exception {
assertThat(verboseResult.getProcessorResults().get(1).getFailure(), nullValue());
}

public void testAsyncSimulation() throws Exception {
// Simulate a pipeline whose processor completes on a different thread and verify that
// the results in the response keep the same order as the documents in the request.
final int numDocs = randomIntBetween(1, 64);
final List<IngestDocument> documents = new ArrayList<>(numDocs);
for (int i = 0; i < numDocs; i++) {
documents.add(new IngestDocument("_index", Integer.toString(i), null, 0L, VersionType.INTERNAL, new HashMap<>()));
}
// Processor that hands its completion callback off to the generic thread pool,
// so documents finish asynchronously and potentially out of order.
final Processor asyncProcessor = new AbstractProcessor(null) {

@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
ingestDocument.setFieldValue("processed", true);
handler.accept(ingestDocument, null);
});
}

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
throw new UnsupportedOperationException();
}

@Override
public String getType() {
return "none-of-your-business";
}
};
final Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(asyncProcessor));
final SimulatePipelineRequest.Parsed request = new SimulatePipelineRequest.Parsed(pipeline, documents, false);

final AtomicReference<SimulatePipelineResponse> responseRef = new AtomicReference<>();
final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
executionService.execute(request, ActionListener.wrap(r -> {
responseRef.set(r);
latch.countDown();
}, e -> {
exceptionRef.set(e);
latch.countDown();
}));
latch.await(1, TimeUnit.MINUTES);

assertThat(exceptionRef.get(), nullValue());
final SimulatePipelineResponse response = responseRef.get();
assertThat(response, notNullValue());
assertThat(response.getResults().size(), equalTo(numDocs));

// Each result slot must hold the document that was submitted at that position.
for (int i = 0; i < numDocs; i++) {
final SimulateDocumentBaseResult result = (SimulateDocumentBaseResult) response.getResults().get(i);
assertThat(result.getIngestDocument().getMetadata().get(IngestDocument.MetaData.ID), equalTo(Integer.toString(i)));
assertThat(result.getIngestDocument().getSourceAndMetadata().get("processed"), is(true));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ public TransportAction(TransportService transportService, ActionFilters actionFi

@Override
protected void doExecute(Task task, SearchRequest request, ActionListener<SearchResponse> listener) {
// Write tp is expected when executing enrich processor from index / bulk api
// Management tp is expected when executing enrich processor from ingest simulate api
assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE)
|| Thread.currentThread().getName().contains(ThreadPool.Names.MANAGEMENT);
coordinator.schedule(request, listener);
}
}
Expand Down

0 comments on commit f2045ab

Please sign in to comment.