Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async Processors and Final Pipelines are Broken #69818

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -357,9 +358,17 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
new AbstractProcessor(tag, description) {

@Override
public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
ingestDocument.setFieldValue("default", true);
return ingestDocument;
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
// randomize over sync and async execution
randomFrom(parameters.genericExecutor, Runnable::run).accept(() -> {
ingestDocument.setFieldValue("default", true);
handler.accept(ingestDocument, null);
});
}

@Override
public IngestDocument execute(IngestDocument ingestDocument) {
throw new AssertionError("should not be called");
}

@Override
Expand Down
96 changes: 45 additions & 51 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -496,67 +496,61 @@ private void executePipelines(
final BiConsumer<Thread, Exception> onCompletion,
final Thread originalThread
) {
while (it.hasNext()) {
final String pipelineId = it.next();
try {
PipelineHolder holder = pipelines.get(pipelineId);
if (holder == null) {
throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
assert it.hasNext();
final String pipelineId = it.next();
try {
PipelineHolder holder = pipelines.get(pipelineId);
if (holder == null) {
throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
}
Pipeline pipeline = holder.pipeline;
String originalIndex = indexRequest.indices()[0];
innerExecute(slot, indexRequest, pipeline, onDropped, e -> {
if (e != null) {
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}]",
pipelineId, indexRequest.index(), indexRequest.id()), e);
onFailure.accept(slot, e);
}
Pipeline pipeline = holder.pipeline;
String originalIndex = indexRequest.indices()[0];
innerExecute(slot, indexRequest, pipeline, onDropped, e -> {
if (e != null) {
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}]",
pipelineId, indexRequest.index(), indexRequest.id()), e);
onFailure.accept(slot, e);
}

Iterator<String> newIt = it;
boolean newHasFinalPipeline = hasFinalPipeline;
String newIndex = indexRequest.indices()[0];
Iterator<String> newIt = it;
boolean newHasFinalPipeline = hasFinalPipeline;
String newIndex = indexRequest.indices()[0];

if (Objects.equals(originalIndex, newIndex) == false) {
if (hasFinalPipeline && it.hasNext() == false) {
totalMetrics.ingestFailed();
onFailure.accept(slot, new IllegalStateException("final pipeline [" + pipelineId +
"] can't change the target index"));
if (Objects.equals(originalIndex, newIndex) == false) {
if (hasFinalPipeline && it.hasNext() == false) {
totalMetrics.ingestFailed();
onFailure.accept(slot, new IllegalStateException("final pipeline [" + pipelineId +
"] can't change the target index"));
} else {
indexRequest.isPipelineResolved(false);
resolvePipelines(null, indexRequest, state.metadata());
if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false) {
newIt = Collections.singleton(indexRequest.getFinalPipeline()).iterator();
newHasFinalPipeline = true;
} else {

//Drain old it so it's not looped over
it.forEachRemaining($ -> {
});
indexRequest.isPipelineResolved(false);
resolvePipelines(null, indexRequest, state.metadata());
if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false) {
newIt = Collections.singleton(indexRequest.getFinalPipeline()).iterator();
newHasFinalPipeline = true;
} else {
newIt = Collections.emptyIterator();
}
newIt = Collections.emptyIterator();
}
}
}

if (newIt.hasNext()) {
executePipelines(slot, newIt, newHasFinalPipeline, indexRequest, onDropped, onFailure, counter, onCompletion,
originalThread);
} else {
if (counter.decrementAndGet() == 0) {
onCompletion.accept(originalThread, null);
}
assert counter.get() >= 0;
if (newIt.hasNext()) {
executePipelines(slot, newIt, newHasFinalPipeline, indexRequest, onDropped, onFailure, counter, onCompletion,
originalThread);
} else {
if (counter.decrementAndGet() == 0) {
onCompletion.accept(originalThread, null);
}
});
} catch (Exception e) {
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}]",
pipelineId, indexRequest.index(), indexRequest.id()), e);
onFailure.accept(slot, e);
if (counter.decrementAndGet() == 0) {
onCompletion.accept(originalThread, null);
assert counter.get() >= 0;
}
assert counter.get() >= 0;
break;
});
} catch (Exception e) {
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}]",
pipelineId, indexRequest.index(), indexRequest.id()), e);
onFailure.accept(slot, e);
if (counter.decrementAndGet() == 0) {
onCompletion.accept(originalThread, null);
}
assert counter.get() >= 0;
}
}

Expand Down