diff --git a/server/src/internalClusterTest/java/org/opensearch/ingest/IngestClientIT.java b/server/src/internalClusterTest/java/org/opensearch/ingest/IngestClientIT.java index 9481a6116cdbc..6aebad8063846 100644 --- a/server/src/internalClusterTest/java/org/opensearch/ingest/IngestClientIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/ingest/IngestClientIT.java @@ -60,15 +60,18 @@ import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; +import org.hamcrest.MatcherAssert; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.stream.Collectors; import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; import static org.opensearch.test.NodeRoles.nonIngestNode; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; @@ -177,6 +180,7 @@ public void testBulkWithIngestFailures() throws Exception { int numRequests = scaledRandomIntBetween(32, 128); BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.batchSize(numRequests); for (int i = 0; i < numRequests; i++) { IndexRequest indexRequest = new IndexRequest("index").id(Integer.toString(i)).setPipeline("_id"); indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", i % 2 == 0); @@ -209,6 +213,47 @@ public void testBulkWithIngestFailures() throws Exception { assertTrue(deletePipelineResponse.isAcknowledged()); } + public void testBulkWithIngestFailuresBatch() throws Exception { + createIndex("index"); + + BytesReference source = BytesReference.bytes( + jsonBuilder().startObject() + .field("description", "my_pipeline") + .startArray("processors") + .startObject() + .startObject("test") + .endObject() + .endObject() + .endArray() + .endObject() + ); + PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, MediaTypeRegistry.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).get(); + + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.batchSize(2); + bulkRequest.add( + new IndexRequest("index").id("_fail").setPipeline("_id").source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", true) + ); + bulkRequest.add( + new IndexRequest("index").id("_success").setPipeline("_id").source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", false) + ); + + BulkResponse response = client().bulk(bulkRequest).actionGet(); + MatcherAssert.assertThat(response.getItems().length, equalTo(bulkRequest.requests().size())); + + Map results = Arrays.stream(response.getItems()) + .collect(Collectors.toMap(BulkItemResponse::getId, r -> r)); + + MatcherAssert.assertThat(results.keySet(), containsInAnyOrder("_fail", "_success")); + assertNotNull(results.get("_fail").getFailure()); + assertNull(results.get("_success").getFailure()); + + // cleanup + AcknowledgedResponse deletePipelineResponse = client().admin().cluster().prepareDeletePipeline("_id").get(); + assertTrue(deletePipelineResponse.isAcknowledged()); + } + public void testBulkWithUpsert() throws Exception { createIndex("index");