Skip to content

Commit

Permalink
[Rest Api Compatibility] Type metadata for docs used in simulate requ…
Browse files Browse the repository at this point in the history
…est (#74222)

This commit allows to provide _type field on document ingested in
simulate pipeline requests.

relates main meta issue #51816
relates types removal issue #54160
  • Loading branch information
pgomulka authored Jun 21, 2021
1 parent 98a6ef9 commit 073f080
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
Expand All @@ -33,17 +35,24 @@
import java.util.Objects;

public class SimulatePipelineRequest extends ActionRequest implements ToXContentObject {
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(SimulatePipelineRequest.class);
private String id;
private boolean verbose;
private BytesReference source;
private XContentType xContentType;
private RestApiVersion restApiVersion;

/**
* Creates a new request with the given source and its content type
*/
public SimulatePipelineRequest(BytesReference source, XContentType xContentType) {
this(source, xContentType, RestApiVersion.current());
}

public SimulatePipelineRequest(BytesReference source, XContentType xContentType, RestApiVersion restApiVersion) {
this.source = Objects.requireNonNull(source);
this.xContentType = Objects.requireNonNull(xContentType);
this.restApiVersion = restApiVersion;
}

SimulatePipelineRequest() {
Expand Down Expand Up @@ -133,28 +142,30 @@ public boolean isVerbose() {

static final String SIMULATED_PIPELINE_ID = "_simulate_pipeline";

static Parsed parseWithPipelineId(String pipelineId, Map<String, Object> config, boolean verbose, IngestService ingestService) {
static Parsed parseWithPipelineId(String pipelineId, Map<String, Object> config, boolean verbose, IngestService ingestService,
RestApiVersion restApiVersion) {
if (pipelineId == null) {
throw new IllegalArgumentException("param [pipeline] is null");
}
Pipeline pipeline = ingestService.getPipeline(pipelineId);
if (pipeline == null) {
throw new IllegalArgumentException("pipeline [" + pipelineId + "] does not exist");
}
List<IngestDocument> ingestDocumentList = parseDocs(config);
List<IngestDocument> ingestDocumentList = parseDocs(config, restApiVersion);
return new Parsed(pipeline, ingestDocumentList, verbose);
}

static Parsed parse(Map<String, Object> config, boolean verbose, IngestService ingestService) throws Exception {
static Parsed parse(Map<String, Object> config, boolean verbose, IngestService ingestService, RestApiVersion restApiVersion)
throws Exception {
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE);
Pipeline pipeline = Pipeline.create(
SIMULATED_PIPELINE_ID, pipelineConfig, ingestService.getProcessorFactories(), ingestService.getScriptService()
);
List<IngestDocument> ingestDocumentList = parseDocs(config);
List<IngestDocument> ingestDocumentList = parseDocs(config, restApiVersion);
return new Parsed(pipeline, ingestDocumentList, verbose);
}

private static List<IngestDocument> parseDocs(Map<String, Object> config) {
private static List<IngestDocument> parseDocs(Map<String, Object> config, RestApiVersion restApiVersion) {
List<Map<String, Object>> docs =
ConfigurationUtils.readList(null, null, config, Fields.DOCS);
if (docs.isEmpty()) {
Expand All @@ -174,6 +185,10 @@ private static List<IngestDocument> parseDocs(Map<String, Object> config) {
dataMap, Metadata.ID.getFieldName(), "_id");
String routing = ConfigurationUtils.readOptionalStringOrIntProperty(null, null,
dataMap, Metadata.ROUTING.getFieldName());
if (restApiVersion == RestApiVersion.V_7 && dataMap.containsKey(Metadata.TYPE.getFieldName())) {
deprecationLogger.compatibleApiWarning("simulate_pipeline_with_types",
"[types removal] specifying _type in pipeline simulation requests is deprecated");
}
Long version = null;
if (dataMap.containsKey(Metadata.VERSION.getFieldName())) {
String versionValue = ConfigurationUtils.readOptionalStringOrLongProperty(null, null,
Expand Down Expand Up @@ -224,4 +239,8 @@ private static List<IngestDocument> parseDocs(Map<String, Object> config) {
}
return ingestDocumentList;
}

public RestApiVersion getRestApiVersion() {
return restApiVersion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ protected void doExecute(Task task, SimulatePipelineRequest request, ActionListe
final SimulatePipelineRequest.Parsed simulateRequest;
try {
if (request.getId() != null) {
simulateRequest = SimulatePipelineRequest.parseWithPipelineId(request.getId(), source, request.isVerbose(), ingestService);
simulateRequest = SimulatePipelineRequest.parseWithPipelineId(request.getId(), source, request.isVerbose(), ingestService,
request.getRestApiVersion());
} else {
simulateRequest = SimulatePipelineRequest.parse(source, request.isVerbose(), ingestService);
simulateRequest = SimulatePipelineRequest.parse(source, request.isVerbose(), ingestService, request.getRestApiVersion());
}
} catch (Exception e) {
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.IngestDocument.Metadata;

Expand Down Expand Up @@ -118,6 +120,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(metadata.getKey().getFieldName(), metadata.getValue().toString());
}
}
if(builder.getRestApiVersion() == RestApiVersion.V_7) {
builder.field(MapperService.TYPE_FIELD_NAME, MapperService.SINGLE_MAPPING_NAME);
}
Map<String, Object> source = IngestDocument.deepCopyMap(ingestDocument.getSourceAndMetadata());
metadataMap.keySet().forEach(mD -> source.remove(mD.getFieldName()));
builder.field(SOURCE_FIELD, source);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,7 @@ public String toString() {

public enum Metadata {
INDEX(IndexFieldMapper.NAME),
TYPE("_type"),
ID(IdFieldMapper.NAME),
ROUTING(RoutingFieldMapper.NAME),
VERSION(VersionFieldMapper.NAME),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.action.ingest;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.IngestDocument;
Expand All @@ -31,12 +32,13 @@
import static org.elasticsearch.action.ingest.SimulatePipelineRequest.Fields;
import static org.elasticsearch.action.ingest.SimulatePipelineRequest.SIMULATED_PIPELINE_ID;
import static org.elasticsearch.ingest.IngestDocument.Metadata.ID;
import static org.elasticsearch.ingest.IngestDocument.Metadata.IF_PRIMARY_TERM;
import static org.elasticsearch.ingest.IngestDocument.Metadata.IF_SEQ_NO;
import static org.elasticsearch.ingest.IngestDocument.Metadata.INDEX;
import static org.elasticsearch.ingest.IngestDocument.Metadata.ROUTING;
import static org.elasticsearch.ingest.IngestDocument.Metadata.TYPE;
import static org.elasticsearch.ingest.IngestDocument.Metadata.VERSION;
import static org.elasticsearch.ingest.IngestDocument.Metadata.VERSION_TYPE;
import static org.elasticsearch.ingest.IngestDocument.Metadata.IF_SEQ_NO;
import static org.elasticsearch.ingest.IngestDocument.Metadata.IF_PRIMARY_TERM;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
Expand All @@ -49,7 +51,8 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {

@Before
public void init() throws IOException {
TestProcessor processor = new TestProcessor(ingestDocument -> {});
TestProcessor processor = new TestProcessor(ingestDocument -> {
});
CompoundProcessor pipelineCompoundProcessor = new CompoundProcessor(processor);
Pipeline pipeline = new Pipeline(SIMULATED_PIPELINE_ID, null, null, pipelineCompoundProcessor);
Map<String, Processor.Factory> registry =
Expand Down Expand Up @@ -84,7 +87,8 @@ public void testParseUsingPipelineStore() throws Exception {
}

SimulatePipelineRequest.Parsed actualRequest =
SimulatePipelineRequest.parseWithPipelineId(SIMULATED_PIPELINE_ID, requestContent, false, ingestService);
SimulatePipelineRequest.parseWithPipelineId(SIMULATED_PIPELINE_ID, requestContent, false, ingestService,
RestApiVersion.current());
assertThat(actualRequest.isVerbose(), equalTo(false));
assertThat(actualRequest.getDocuments().size(), equalTo(numDocs));
Iterator<Map<String, Object>> expectedDocsIterator = expectedDocs.iterator();
Expand Down Expand Up @@ -112,7 +116,7 @@ public void testParseWithProvidedPipeline() throws Exception {
Map<String, Object> doc = new HashMap<>();
Map<String, Object> expectedDoc = new HashMap<>();
List<IngestDocument.Metadata> fields = Arrays.asList(INDEX, ID, ROUTING, VERSION, VERSION_TYPE, IF_SEQ_NO, IF_PRIMARY_TERM);
for(IngestDocument.Metadata field : fields) {
for (IngestDocument.Metadata field : fields) {
if (field == VERSION) {
Object value = randomBoolean() ? randomLong() : randomInt();
doc.put(field.getFieldName(), randomBoolean() ? value : value.toString());
Expand Down Expand Up @@ -177,7 +181,8 @@ public void testParseWithProvidedPipeline() throws Exception {

requestContent.put(Fields.PIPELINE, pipelineConfig);

SimulatePipelineRequest.Parsed actualRequest = SimulatePipelineRequest.parse(requestContent, false, ingestService);
SimulatePipelineRequest.Parsed actualRequest = SimulatePipelineRequest.parse(requestContent, false, ingestService,
RestApiVersion.current());
assertThat(actualRequest.isVerbose(), equalTo(false));
assertThat(actualRequest.getDocuments().size(), equalTo(numDocs));
Iterator<Map<String, Object>> expectedDocsIterator = expectedDocs.iterator();
Expand All @@ -204,7 +209,7 @@ public void testNullPipelineId() {
List<Map<String, Object>> docs = new ArrayList<>();
requestContent.put(Fields.DOCS, docs);
Exception e = expectThrows(IllegalArgumentException.class,
() -> SimulatePipelineRequest.parseWithPipelineId(null, requestContent, false, ingestService));
() -> SimulatePipelineRequest.parseWithPipelineId(null, requestContent, false, ingestService, RestApiVersion.current()));
assertThat(e.getMessage(), equalTo("param [pipeline] is null"));
}

Expand All @@ -214,7 +219,7 @@ public void testNonExistentPipelineId() {
List<Map<String, Object>> docs = new ArrayList<>();
requestContent.put(Fields.DOCS, docs);
Exception e = expectThrows(IllegalArgumentException.class,
() -> SimulatePipelineRequest.parseWithPipelineId(pipelineId, requestContent, false, ingestService));
() -> SimulatePipelineRequest.parseWithPipelineId(pipelineId, requestContent, false, ingestService, RestApiVersion.current()));
assertThat(e.getMessage(), equalTo("pipeline [" + pipelineId + "] does not exist"));
}

Expand All @@ -227,7 +232,7 @@ public void testNotValidDocs() {
requestContent.put(Fields.DOCS, docs);
requestContent.put(Fields.PIPELINE, pipelineConfig);
Exception e1 = expectThrows(IllegalArgumentException.class,
() -> SimulatePipelineRequest.parse(requestContent, false, ingestService));
() -> SimulatePipelineRequest.parse(requestContent, false, ingestService, RestApiVersion.current()));
assertThat(e1.getMessage(), equalTo("must specify at least one document in [docs]"));

List<String> stringList = new ArrayList<>();
Expand All @@ -236,14 +241,107 @@ public void testNotValidDocs() {
requestContent.put(Fields.DOCS, stringList);
requestContent.put(Fields.PIPELINE, pipelineConfig);
Exception e2 = expectThrows(IllegalArgumentException.class,
() -> SimulatePipelineRequest.parse(requestContent, false, ingestService));
() -> SimulatePipelineRequest.parse(requestContent, false, ingestService, RestApiVersion.current()));
assertThat(e2.getMessage(), equalTo("malformed [docs] section, should include an inner object"));

docs.add(new HashMap<>());
requestContent.put(Fields.DOCS, docs);
requestContent.put(Fields.PIPELINE, pipelineConfig);
Exception e3 = expectThrows(ElasticsearchParseException.class,
() -> SimulatePipelineRequest.parse(requestContent, false, ingestService));
() -> SimulatePipelineRequest.parse(requestContent, false, ingestService, RestApiVersion.current()));
assertThat(e3.getMessage(), containsString("required property is missing"));
}

public void testIngestPipelineWithDocumentsWithType() throws Exception {
int numDocs = randomIntBetween(1, 10);

Map<String, Object> requestContent = new HashMap<>();
List<Map<String, Object>> docs = new ArrayList<>();
List<Map<String, Object>> expectedDocs = new ArrayList<>();
requestContent.put(Fields.DOCS, docs);
for (int i = 0; i < numDocs; i++) {
Map<String, Object> doc = new HashMap<>();
Map<String, Object> expectedDoc = new HashMap<>();
List<IngestDocument.Metadata> fields = Arrays.asList(INDEX, TYPE, ID, ROUTING, VERSION, VERSION_TYPE);
for (IngestDocument.Metadata field : fields) {
if (field == VERSION) {
Long value = randomLong();
doc.put(field.getFieldName(), value);
expectedDoc.put(field.getFieldName(), value);
} else if (field == VERSION_TYPE) {
String value = VersionType.toString(
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE)
);
doc.put(field.getFieldName(), value);
expectedDoc.put(field.getFieldName(), value);
} else if (field == TYPE) {
String value = randomAlphaOfLengthBetween(1, 10);
doc.put(field.getFieldName(), value);
expectedDoc.put(field.getFieldName(), value);
} else {
if (randomBoolean()) {
String value = randomAlphaOfLengthBetween(1, 10);
doc.put(field.getFieldName(), value);
expectedDoc.put(field.getFieldName(), value);
} else {
Integer value = randomIntBetween(1, 1000000);
doc.put(field.getFieldName(), value);
expectedDoc.put(field.getFieldName(), String.valueOf(value));
}
}
}
String fieldName = randomAlphaOfLengthBetween(1, 10);
String fieldValue = randomAlphaOfLengthBetween(1, 10);
doc.put(Fields.SOURCE, Collections.singletonMap(fieldName, fieldValue));
docs.add(doc);
expectedDoc.put(Fields.SOURCE, Collections.singletonMap(fieldName, fieldValue));
expectedDocs.add(expectedDoc);
}
Map<String, Object> pipelineConfig = new HashMap<>();
List<Map<String, Object>> processors = new ArrayList<>();
int numProcessors = randomIntBetween(1, 10);
for (int i = 0; i < numProcessors; i++) {
Map<String, Object> processorConfig = new HashMap<>();
List<Map<String, Object>> onFailureProcessors = new ArrayList<>();
int numOnFailureProcessors = randomIntBetween(0, 1);
for (int j = 0; j < numOnFailureProcessors; j++) {
onFailureProcessors.add(Collections.singletonMap("mock_processor", Collections.emptyMap()));
}
if (numOnFailureProcessors > 0) {
processorConfig.put("on_failure", onFailureProcessors);
}
processors.add(Collections.singletonMap("mock_processor", processorConfig));
}
pipelineConfig.put("processors", processors);
List<Map<String, Object>> onFailureProcessors = new ArrayList<>();
int numOnFailureProcessors = randomIntBetween(0, 1);
for (int i = 0; i < numOnFailureProcessors; i++) {
onFailureProcessors.add(Collections.singletonMap("mock_processor", Collections.emptyMap()));
}
if (numOnFailureProcessors > 0) {
pipelineConfig.put("on_failure", onFailureProcessors);
}
requestContent.put(Fields.PIPELINE, pipelineConfig);
SimulatePipelineRequest.Parsed actualRequest = SimulatePipelineRequest.parse(requestContent, false, ingestService,
RestApiVersion.V_7);
assertThat(actualRequest.isVerbose(), equalTo(false));
assertThat(actualRequest.getDocuments().size(), equalTo(numDocs));
Iterator<Map<String, Object>> expectedDocsIterator = expectedDocs.iterator();
for (IngestDocument ingestDocument : actualRequest.getDocuments()) {
Map<String, Object> expectedDocument = expectedDocsIterator.next();
Map<IngestDocument.Metadata, Object> metadataMap = ingestDocument.extractMetadata();
assertThat(metadataMap.get(INDEX), equalTo(expectedDocument.get(INDEX.getFieldName())));
assertThat(metadataMap.get(ID), equalTo(expectedDocument.get(ID.getFieldName())));
assertThat(metadataMap.get(ROUTING), equalTo(expectedDocument.get(ROUTING.getFieldName())));
assertThat(metadataMap.get(VERSION), equalTo(expectedDocument.get(VERSION.getFieldName())));
assertThat(metadataMap.get(VERSION_TYPE), equalTo(expectedDocument.get(VERSION_TYPE.getFieldName())));
assertThat(ingestDocument.getSourceAndMetadata(), equalTo(expectedDocument.get(Fields.SOURCE)));
}
assertThat(actualRequest.getPipeline().getId(), equalTo(SIMULATED_PIPELINE_ID));
assertThat(actualRequest.getPipeline().getDescription(), nullValue());
assertThat(actualRequest.getPipeline().getProcessors().size(), equalTo(numProcessors));

assertWarnings("[types removal] specifying _type in pipeline simulation requests is deprecated");

}
}

0 comments on commit 073f080

Please sign in to comment.