diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index e4c7daad1..a948899ff 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -27,9 +27,11 @@ import org.opensearch.script.ScriptType import org.opensearch.search.aggregations.AggregationBuilders import org.opensearch.search.aggregations.AggregatorFactories import org.opensearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder +import org.opensearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder import java.lang.Integer.min import java.time.Instant import java.time.temporal.ChronoUnit +import kotlin.test.assertFailsWith class TransformRunnerIT : TransformRestTestCase() { @@ -688,6 +690,42 @@ class TransformRunnerIT : TransformRestTestCase() { assertTrue("Expected failure message to be present", !metadata.failureReason.isNullOrBlank()) } + fun `test transform with invalid pipeline aggregation triggering search failure`() { + assertFailsWith(IllegalArgumentException::class, "Bucket-script aggregation must fail!") { + validateSourceIndex("transform-source-index") + + val aggregatorFactories = AggregatorFactories.builder() + aggregatorFactories.addPipelineAggregator( + BucketScriptPipelineAggregationBuilder( + "test_pipeline_aggregation", + Script("1") + ) + ) + + val transform = Transform( + id = "id_17", + schemaVersion = 1L, + enabled = true, + enabledAt = Instant.now(), + updatedAt = Instant.now(), + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + description = "test transform", + metadataId = null, + sourceIndex = "transform-source-index", + targetIndex = "transform-target-index", + roles = emptyList(), + pageSize = 1, + groups = listOf( + Terms(sourceField = "store_and_fwd_flag", targetField = "flag"), + Histogram(sourceField = "passenger_count", targetField = "count", interval = 2.0), + DateHistogram(sourceField = "tpep_pickup_datetime", targetField = "date", fixedInterval = "1d") + ), + aggregations = aggregatorFactories + ).let { createTransform(it, it.id) } + updateTransformStartTime(transform) + } + } + fun `test transform with data stream`() { // Create a data stream. val dataStreamName = "transform-data-stream"