Skip to content

Commit

Permalink
[7.x] Map iteration support for ForEach processor (elastic#64062) (el…
Browse files Browse the repository at this point in the history
  • Loading branch information
danhermann authored Aug 17, 2021
1 parent 0055c15 commit 4d0dee5
Show file tree
Hide file tree
Showing 2 changed files with 263 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.ingest.common;

import org.elasticsearch.common.Strings;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
Expand All @@ -16,6 +17,7 @@
import org.elasticsearch.script.ScriptService;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -28,12 +30,10 @@
import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty;

/**
* A processor that for each value in a list executes a one or more processors.
* Processor that executes another processor for each value in a list or map field.
*
* This can be useful in cases to do string operations on json array of strings,
* or remove a field from objects inside a json array.
*
* Note that this processor is experimental.
* This can be useful for performing string operations on arrays of strings,
* removing or modifying a field in objects inside arrays or maps, etc.
*/
public final class ForEachProcessor extends AbstractProcessor implements WrappingProcessor {

Expand All @@ -56,20 +56,61 @@ boolean isIgnoreMissing() {

@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
List<?> values = ingestDocument.getFieldValue(field, List.class, ignoreMissing);
if (values == null) {
Object o = ingestDocument.getFieldValue(field, Object.class, ignoreMissing);
if (o == null) {
if (ignoreMissing) {
handler.accept(ingestDocument, null);
} else {
handler.accept(null, new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements."));
}
} else if (o instanceof Map) {
Map<?, ?> map = (Map<?, ?>) o;
List<?> keys = new ArrayList<>(map.keySet());
innerExecuteMap(0, new HashMap<Object, Object>(map), keys, new HashMap<>(map.size()), ingestDocument, handler);
} else if (o instanceof List) {
List<?> list = (List<?>) o;
innerExecuteList(0, new ArrayList<>(list), new ArrayList<>(list.size()), ingestDocument, handler);
} else {
innerExecute(0, new ArrayList<>(values), new ArrayList<>(values.size()), ingestDocument, handler);
throw new IllegalArgumentException("field [" + field + "] of type [" + o.getClass().getName() + "] cannot be cast to a " +
"list or map");
}
}

void innerExecuteMap(int keyIndex, Map<?, ?> map, List<?> keys, Map<Object, Object> newValues, IngestDocument document,
BiConsumer<IngestDocument, Exception> handler) {
for (; keyIndex < keys.size(); keyIndex++) {
AtomicBoolean shouldContinueHere = new AtomicBoolean();
String key = (String) keys.get(keyIndex);
Object previousKey = document.getIngestMetadata().put("_key", key);
Object value = map.get(key);
Object previousValue = document.getIngestMetadata().put("_value", value);
int nextIndex = keyIndex + 1;
processor.execute(document, (result, e) -> {
String newKey = (String) document.getIngestMetadata().get("_key");
if (Strings.hasText(newKey)) {
newValues.put(newKey, document.getIngestMetadata().put("_value", previousValue));
}
document.getIngestMetadata().put("_key", previousKey);
if (e != null || result == null) {
handler.accept(result, e);
} else if (shouldContinueHere.getAndSet(true)) {
innerExecuteMap(nextIndex, map, keys, newValues, document, handler);
}
});

if (shouldContinueHere.getAndSet(true) == false) {
return;
}
}

if (keyIndex == keys.size()) {
document.setFieldValue(field, new HashMap<>(newValues));
handler.accept(document, null);
}
}

void innerExecute(int index, List<?> values, List<Object> newValues, IngestDocument document,
BiConsumer<IngestDocument, Exception> handler) {
void innerExecuteList(int index, List<?> values, List<Object> newValues, IngestDocument document,
BiConsumer<IngestDocument, Exception> handler) {
for (; index < values.size(); index++) {
AtomicBoolean shouldContinueHere = new AtomicBoolean();
Object value = values.get(index);
Expand All @@ -80,7 +121,7 @@ void innerExecute(int index, List<?> values, List<Object> newValues, IngestDocum
if (e != null || result == null) {
handler.accept(result, e);
} else if (shouldContinueHere.getAndSet(true)) {
innerExecute(nextIndex, values, newValues, document, handler);
innerExecuteList(nextIndex, values, newValues, document, handler);
}
});

Expand Down
Loading

0 comments on commit 4d0dee5

Please sign in to comment.