Async Processors and Final Pipelines are Broken (elastic#69818)
There was an obvious race here where an async processor and the final pipeline would run concurrently (or the final pipeline would run multiple times from the while loop).

Relates elastic#52339 (fixes one failure scenario there, but since the failure also occurred in 7.10.x, not all of them).
original-brownbear committed Mar 3, 2021
1 parent af97090 commit e2c3232
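
To make the failure mode described in the commit message concrete, here is a minimal, self-contained sketch; it is hypothetical illustration code, not taken from the Elasticsearch sources. A `while` loop pulls pipeline ids from an iterator, but the completion callback of an asynchronous processor continues from the same iterator, so the remaining (final) pipeline can be picked up by both the loop and the callback:

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PipelineLoopRaceSketch {

    // Simulates a processor that may complete on another thread.
    static void runPipelineAsync(ExecutorService executor, String pipelineId, Runnable onDone) {
        executor.execute(() -> {
            System.out.println("running " + pipelineId + " on " + Thread.currentThread().getName());
            onDone.run();
        });
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Iterator<String> pipelines = List.of("default-pipeline", "final-pipeline").iterator();

        // BROKEN ON PURPOSE: the loop below and the completion callback both advance
        // the same iterator. If the first pipeline completes asynchronously, the
        // callback and the next loop iteration race for "final-pipeline": it can run
        // twice, run concurrently with the first pipeline, or fail on the shared iterator.
        while (pipelines.hasNext()) {
            String pipelineId = pipelines.next();
            runPipelineAsync(executor, pipelineId, () -> {
                if (pipelines.hasNext()) {
                    runPipelineAsync(executor, pipelines.next(), () -> {});
                }
            });
        }

        executor.shutdown();
    }
}
```

Whether the duplicate or concurrent execution actually happens depends on thread timing, which is why the bug only shows up intermittently.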
Showing 2 changed files with 57 additions and 54 deletions.
@@ -48,6 +48,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.BiConsumer;
 import java.util.function.Supplier;

 import static org.hamcrest.Matchers.containsString;
@@ -359,9 +360,17 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters
             new AbstractProcessor(tag, description) {

                 @Override
-                public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
-                    ingestDocument.setFieldValue("default", true);
-                    return ingestDocument;
+                public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
+                    // randomize over sync and async execution
+                    randomFrom(parameters.genericExecutor, Runnable::run).accept(() -> {
+                        ingestDocument.setFieldValue("default", true);
+                        handler.accept(ingestDocument, null);
+                    });
+                }
+
+                @Override
+                public IngestDocument execute(IngestDocument ingestDocument) {
+                    throw new AssertionError("should not be called");
                 }

                 @Override
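
In the test fixture above, the processor that sets the `default` field now implements the asynchronous `execute(IngestDocument, BiConsumer)` variant and randomly completes either inline or on the generic executor, so both the synchronous and the asynchronous completion paths are exercised. Below is a minimal sketch of that randomization trick using only JDK types; the names are hypothetical and it is not the test's own helper code:

```java
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class RandomSyncAsyncSketch {

    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newSingleThreadExecutor();

        // One of these is chosen at random: run the work inline on the calling thread,
        // or hand it to another thread. This mirrors the idea behind
        // randomFrom(parameters.genericExecutor, Runnable::run) in the diff above.
        List<Consumer<Runnable>> executionModes = List.of(Runnable::run, threadPool::execute);
        Consumer<Runnable> mode = executionModes.get(new Random().nextInt(executionModes.size()));

        mode.accept(() -> System.out.println("completed on " + Thread.currentThread().getName()));

        threadPool.shutdown();
    }
}
```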
96 changes: 45 additions & 51 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -497,67 +497,61 @@ private void executePipelines(
         final BiConsumer<Thread, Exception> onCompletion,
         final Thread originalThread
     ) {
-        while (it.hasNext()) {
-            final String pipelineId = it.next();
-            try {
-                PipelineHolder holder = pipelines.get(pipelineId);
-                if (holder == null) {
-                    throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
-                }
-                Pipeline pipeline = holder.pipeline;
-                String originalIndex = indexRequest.indices()[0];
-                innerExecute(slot, indexRequest, pipeline, onDropped, e -> {
-                    if (e != null) {
-                        logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}]",
-                            pipelineId, indexRequest.index(), indexRequest.id()), e);
-                        onFailure.accept(slot, e);
-                    }
-
-                    Iterator<String> newIt = it;
-                    boolean newHasFinalPipeline = hasFinalPipeline;
-                    String newIndex = indexRequest.indices()[0];
-
-                    if (Objects.equals(originalIndex, newIndex) == false) {
-                        if (hasFinalPipeline && it.hasNext() == false) {
-                            totalMetrics.ingestFailed();
-                            onFailure.accept(slot, new IllegalStateException("final pipeline [" + pipelineId +
-                                "] can't change the target index"));
-                        } else {
-
-                            //Drain old it so it's not looped over
-                            it.forEachRemaining($ -> {
-                            });
-                            indexRequest.isPipelineResolved(false);
-                            resolvePipelines(null, indexRequest, state.metadata());
-                            if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false) {
-                                newIt = Collections.singleton(indexRequest.getFinalPipeline()).iterator();
-                                newHasFinalPipeline = true;
-                            } else {
-                                newIt = Collections.emptyIterator();
-                            }
-                        }
-                    }
-
-                    if (newIt.hasNext()) {
-                        executePipelines(slot, newIt, newHasFinalPipeline, indexRequest, onDropped, onFailure, counter, onCompletion,
-                            originalThread);
-                    } else {
-                        if (counter.decrementAndGet() == 0) {
-                            onCompletion.accept(originalThread, null);
-                        }
-                        assert counter.get() >= 0;
-                    }
-                });
-            } catch (Exception e) {
-                logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}]",
-                    pipelineId, indexRequest.index(), indexRequest.id()), e);
-                onFailure.accept(slot, e);
-                if (counter.decrementAndGet() == 0) {
-                    onCompletion.accept(originalThread, null);
-                }
-                assert counter.get() >= 0;
-                break;
-            }
-        }
+        assert it.hasNext();
+        final String pipelineId = it.next();
+        try {
+            PipelineHolder holder = pipelines.get(pipelineId);
+            if (holder == null) {
+                throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
+            }
+            Pipeline pipeline = holder.pipeline;
+            String originalIndex = indexRequest.indices()[0];
+            innerExecute(slot, indexRequest, pipeline, onDropped, e -> {
+                if (e != null) {
+                    logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}]",
+                        pipelineId, indexRequest.index(), indexRequest.id()), e);
+                    onFailure.accept(slot, e);
+                }
+
+                Iterator<String> newIt = it;
+                boolean newHasFinalPipeline = hasFinalPipeline;
+                String newIndex = indexRequest.indices()[0];
+
+                if (Objects.equals(originalIndex, newIndex) == false) {
+                    if (hasFinalPipeline && it.hasNext() == false) {
+                        totalMetrics.ingestFailed();
+                        onFailure.accept(slot, new IllegalStateException("final pipeline [" + pipelineId +
+                            "] can't change the target index"));
+                    } else {
+                        indexRequest.isPipelineResolved(false);
+                        resolvePipelines(null, indexRequest, state.metadata());
+                        if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false) {
+                            newIt = Collections.singleton(indexRequest.getFinalPipeline()).iterator();
+                            newHasFinalPipeline = true;
+                        } else {
+                            newIt = Collections.emptyIterator();
+                        }
+                    }
+                }
+
+                if (newIt.hasNext()) {
+                    executePipelines(slot, newIt, newHasFinalPipeline, indexRequest, onDropped, onFailure, counter, onCompletion,
+                        originalThread);
+                } else {
+                    if (counter.decrementAndGet() == 0) {
+                        onCompletion.accept(originalThread, null);
+                    }
+                    assert counter.get() >= 0;
+                }
+            });
+        } catch (Exception e) {
+            logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}]",
+                pipelineId, indexRequest.index(), indexRequest.id()), e);
+            onFailure.accept(slot, e);
+            if (counter.decrementAndGet() == 0) {
+                onCompletion.accept(originalThread, null);
+            }
+            assert counter.get() >= 0;
+        }
     }

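The replacement above drops the `while` loop entirely: each `executePipelines` call handles exactly one pipeline, the continuation happens only from the completion callback of `innerExecute`, and the `AtomicInteger` counter decides when the overall completion handler fires. For readers who want the pattern in isolation, here is a minimal, self-contained sketch of that recursion-plus-counter structure; it is hypothetical code, not the Elasticsearch implementation:

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class RecursivePipelineSketch {

    private final ExecutorService executor = Executors.newFixedThreadPool(2);

    // Simulates a pipeline whose processors may complete on another thread.
    private void runPipelineAsync(String pipelineId, Runnable onDone) {
        executor.execute(() -> {
            System.out.println("ran " + pipelineId + " on " + Thread.currentThread().getName());
            onDone.run();
        });
    }

    // One pipeline per call; the continuation is driven solely by the completion
    // callback, so there is no loop left on the calling thread to race against.
    void executeNext(Iterator<String> it, AtomicInteger counter, Runnable onCompletion) {
        assert it.hasNext();
        String pipelineId = it.next();
        runPipelineAsync(pipelineId, () -> {
            if (it.hasNext()) {
                // continue with the next pipeline only from the completion callback
                executeNext(it, counter, onCompletion);
            } else if (counter.decrementAndGet() == 0) {
                // this document is done; the last finished document fires completion
                onCompletion.run();
            }
        });
    }

    public static void main(String[] args) {
        RecursivePipelineSketch sketch = new RecursivePipelineSketch();
        AtomicInteger counter = new AtomicInteger(1); // one document in flight
        sketch.executeNext(
            List.of("default-pipeline", "final-pipeline").iterator(),
            counter,
            () -> {
                System.out.println("all pipelines done");
                sketch.executor.shutdown();
            }
        );
    }
}
```

Because nothing outside the callback touches the iterator, the final pipeline can only start after the preceding pipeline has completed, regardless of which thread the processor completed on.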
