diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/math/Round.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/math/Round.java index 0fc8681075a1..2973abc37489 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/math/Round.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/math/Round.java @@ -62,6 +62,7 @@ in order to provide compatibility with the previous ROUND() implementation which we need to use different rounding modes on BigDecimal depending on whether the value is +ve or -ve to get consistent behaviour. */ +@SuppressWarnings("MethodMayBeStatic") @UdfDescription( name = "Round", category = FunctionCategory.MATHEMATICAL, @@ -100,7 +101,10 @@ public Double round(@UdfParameter final Double val, @UdfParameter final Integer @Udf(schemaProvider = "provideDecimalSchema") public BigDecimal round(@UdfParameter final BigDecimal val) { - return round(val, 0); + if (val == null) { + return null; + } + return roundBigDecimal(val, 0); } @Udf(schemaProvider = "provideDecimalSchemaWithDecimalPlaces") @@ -108,11 +112,17 @@ public BigDecimal round( @UdfParameter final BigDecimal val, @UdfParameter final Integer decimalPlaces ) { - return val == null ? null : roundBigDecimal(val, decimalPlaces); + if (val == null) { + return null; + } + return roundBigDecimal(val, decimalPlaces) + // Must maintain source scale for now. See https://github.com/confluentinc/ksql/issues/6235. + .setScale(val.scale(), RoundingMode.UNNECESSARY); } + @SuppressWarnings("unused") // Invoked via reflection @UdfSchemaProvider - public SqlType provideDecimalSchemaWithDecimalPlaces(final List params) { + public static SqlType provideDecimalSchemaWithDecimalPlaces(final List params) { final SqlType s0 = params.get(0); if (s0.baseType() != SqlBaseType.DECIMAL) { throw new KsqlException("The schema provider method for round expects a BigDecimal parameter" @@ -123,11 +133,15 @@ public SqlType provideDecimalSchemaWithDecimalPlaces(final List params) throw new KsqlException("The schema provider method for round expects an Integer parameter" + "type as second parameter."); } + + // While the user requested a certain number of decimal places, this can't be used to change + // the scale of the return type. See https://github.com/confluentinc/ksql/issues/6235. return s0; } + @SuppressWarnings("unused") // Invoked via reflection @UdfSchemaProvider - public SqlType provideDecimalSchema(final List params) { + public static SqlType provideDecimalSchema(final List params) { final SqlType s0 = params.get(0); if (s0.baseType() != SqlBaseType.DECIMAL) { throw new KsqlException("The schema provider method for round expects a BigDecimal parameter" @@ -137,7 +151,10 @@ public SqlType provideDecimalSchema(final List params) { return SqlDecimal.of(param.getPrecision() - param.getScale(), 0); } - private BigDecimal roundBigDecimal(final BigDecimal val, final int decimalPlaces) { + private static BigDecimal roundBigDecimal( + final BigDecimal val, + final int decimalPlaces + ) { final RoundingMode roundingMode = val.compareTo(BigDecimal.ZERO) > 0 ? 
RoundingMode.HALF_UP : RoundingMode.HALF_DOWN; return val.setScale(decimalPlaces, roundingMode); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/math/RoundTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/math/RoundTest.java index c91b2e5463c3..a484c7fec874 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/math/RoundTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/math/RoundTest.java @@ -152,67 +152,66 @@ public void shouldRoundDoubleWithDecimalPlacesNegative() { @Test public void shouldRoundBigDecimalWithDecimalPlacesPositive() { assertThat(udf.round(new BigDecimal("0"), 0), is(new BigDecimal("0"))); - assertThat(udf.round(new BigDecimal("1.0"), 0), is(new BigDecimal("1"))); - assertThat(udf.round(new BigDecimal("1.1"), 0), is(new BigDecimal("1"))); - assertThat(udf.round(new BigDecimal("1.5"), 0), is(new BigDecimal("2"))); - assertThat(udf.round(new BigDecimal("1.75"), 0), is(new BigDecimal("2"))); - assertThat(udf.round(new BigDecimal("100.1"), 0),is(new BigDecimal("100"))); - assertThat(udf.round(new BigDecimal("100.5"), 0), is(new BigDecimal("101"))); - assertThat(udf.round(new BigDecimal("100.75"), 0), is(new BigDecimal("101"))); - assertThat(udf.round(new BigDecimal("100.10"), 1), is(new BigDecimal("100.1"))); - assertThat(udf.round(new BigDecimal("100.11"), 1), is(new BigDecimal("100.1"))); - assertThat(udf.round(new BigDecimal("100.15"), 1), is(new BigDecimal("100.2"))); - assertThat(udf.round(new BigDecimal("100.17"), 1), is(new BigDecimal("100.2"))); - assertThat(udf.round(new BigDecimal("100.110"), 2), is(new BigDecimal("100.11"))); - assertThat(udf.round(new BigDecimal("100.111"), 2), is(new BigDecimal("100.11"))); - assertThat(udf.round(new BigDecimal("100.115"), 2), is(new BigDecimal("100.12"))); - assertThat(udf.round(new BigDecimal("100.117"), 2), is(new BigDecimal("100.12"))); - assertThat(udf.round(new BigDecimal("100.1110"), 3), is(new BigDecimal("100.111"))); - assertThat(udf.round(new BigDecimal("100.1111"), 3), is(new BigDecimal("100.111"))); - assertThat(udf.round(new BigDecimal("100.1115"), 3), is(new BigDecimal("100.112"))); - assertThat(udf.round(new BigDecimal("100.1117"), 3), is(new BigDecimal("100.112"))); - assertThat(udf.round(new BigDecimal("12345.67"), -1), is(new BigDecimal("1.235E4"))); - assertThat(udf.round(new BigDecimal("12345.67"), -2), is(new BigDecimal("1.23E4"))); - assertThat(udf.round(new BigDecimal("12345.67"), -3), is(new BigDecimal("1.2E4"))); - assertThat(udf.round(new BigDecimal("12345.67"), -4), is(new BigDecimal("1E4"))); - assertThat(udf.round(new BigDecimal("12345.67"), -5), is(new BigDecimal("0E5"))); + assertThat(udf.round(new BigDecimal("1.0"), 0), is(new BigDecimal("1.0"))); + assertThat(udf.round(new BigDecimal("1.1"), 0), is(new BigDecimal("1.0"))); + assertThat(udf.round(new BigDecimal("1.5"), 0), is(new BigDecimal("2.0"))); + assertThat(udf.round(new BigDecimal("1.75"), 0), is(new BigDecimal("2.00"))); + assertThat(udf.round(new BigDecimal("100.1"), 0),is(new BigDecimal("100.0"))); + assertThat(udf.round(new BigDecimal("100.5"), 0), is(new BigDecimal("101.0"))); + assertThat(udf.round(new BigDecimal("100.75"), 0), is(new BigDecimal("101.00"))); + assertThat(udf.round(new BigDecimal("100.10"), 1), is(new BigDecimal("100.10"))); + assertThat(udf.round(new BigDecimal("100.11"), 1), is(new BigDecimal("100.10"))); + assertThat(udf.round(new BigDecimal("100.15"), 1), is(new BigDecimal("100.20"))); + assertThat(udf.round(new 
BigDecimal("100.17"), 1), is(new BigDecimal("100.20"))); + assertThat(udf.round(new BigDecimal("100.110"), 2), is(new BigDecimal("100.110"))); + assertThat(udf.round(new BigDecimal("100.111"), 2), is(new BigDecimal("100.110"))); + assertThat(udf.round(new BigDecimal("100.115"), 2), is(new BigDecimal("100.120"))); + assertThat(udf.round(new BigDecimal("100.117"), 2), is(new BigDecimal("100.120"))); + assertThat(udf.round(new BigDecimal("100.1110"), 3), is(new BigDecimal("100.1110"))); + assertThat(udf.round(new BigDecimal("100.1111"), 3), is(new BigDecimal("100.1110"))); + assertThat(udf.round(new BigDecimal("100.1115"), 3), is(new BigDecimal("100.1120"))); + assertThat(udf.round(new BigDecimal("100.1117"), 3), is(new BigDecimal("100.1120"))); + assertThat(udf.round(new BigDecimal("12345.67"), -1), is(new BigDecimal("12350.00"))); + assertThat(udf.round(new BigDecimal("12345.67"), -2), is(new BigDecimal("12300.00"))); + assertThat(udf.round(new BigDecimal("12345.67"), -3), is(new BigDecimal("12000.00"))); + assertThat(udf.round(new BigDecimal("12345.67"), -4), is(new BigDecimal("10000.00"))); + assertThat(udf.round(new BigDecimal("12345.67"), -5), is(new BigDecimal("0.00"))); } @Test public void shouldRoundBigDecimalWithDecimalPlacesNegative() { - assertThat(udf.round(new BigDecimal("-1.0"), 0), is(new BigDecimal("-1"))); - assertThat(udf.round(new BigDecimal("-1.1"), 0), is(new BigDecimal("-1"))); - assertThat(udf.round(new BigDecimal("-1.5"), 0), is(new BigDecimal("-1"))); - assertThat(udf.round(new BigDecimal("-1.75"), 0), is(new BigDecimal("-2"))); - assertThat(udf.round(new BigDecimal("-100.1"), 0), is(new BigDecimal("-100"))); - assertThat(udf.round(new BigDecimal("-100.5"), 0), is(new BigDecimal("-100"))); - assertThat(udf.round(new BigDecimal("-100.75"), 0), is(new BigDecimal("-101"))); - assertThat(udf.round(new BigDecimal("-100.10"), 1), is(new BigDecimal("-100.1"))); - assertThat(udf.round(new BigDecimal("-100.11"), 1), is(new BigDecimal("-100.1"))); - assertThat(udf.round(new BigDecimal("-100.15"), 1), is(new BigDecimal("-100.1"))); - assertThat(udf.round(new BigDecimal("-100.17"), 1), is(new BigDecimal("-100.2"))); - assertThat(udf.round(new BigDecimal("-100.110"), 2), is(new BigDecimal("-100.11"))); - assertThat(udf.round(new BigDecimal("-100.111"), 2), is(new BigDecimal("-100.11"))); - assertThat(udf.round(new BigDecimal("-100.115"), 2), is(new BigDecimal("-100.11"))); - assertThat(udf.round(new BigDecimal("-100.117"), 2), is(new BigDecimal("-100.12"))); - assertThat(udf.round(new BigDecimal("-100.1110"), 3), is(new BigDecimal("-100.111"))); - assertThat(udf.round(new BigDecimal("-100.1111"), 3), is(new BigDecimal("-100.111"))); - assertThat(udf.round(new BigDecimal("-100.1115"), 3), is(new BigDecimal("-100.111"))); - assertThat(udf.round(new BigDecimal("-100.1117"), 3), is(new BigDecimal("-100.112"))); - assertThat(udf.round(new BigDecimal("-12345.67"), -1), is(new BigDecimal("-1.235E4"))); - assertThat(udf.round(new BigDecimal("-12345.67"), -2), is(new BigDecimal("-1.23E4"))); - assertThat(udf.round(new BigDecimal("-12345.67"), -3), is(new BigDecimal("-1.2E4"))); - assertThat(udf.round(new BigDecimal("-12345.67"), -4), is(new BigDecimal("-1E4"))); - assertThat(udf.round(new BigDecimal("-12345.67"), -5), is(new BigDecimal("-0E5"))); + assertThat(udf.round(new BigDecimal("-1.0"), 0), is(new BigDecimal("-1.0"))); + assertThat(udf.round(new BigDecimal("-1.1"), 0), is(new BigDecimal("-1.0"))); + assertThat(udf.round(new BigDecimal("-1.5"), 0), is(new BigDecimal("-1.0"))); + 
assertThat(udf.round(new BigDecimal("-1.75"), 0), is(new BigDecimal("-2.00"))); + assertThat(udf.round(new BigDecimal("-100.1"), 0), is(new BigDecimal("-100.0"))); + assertThat(udf.round(new BigDecimal("-100.5"), 0), is(new BigDecimal("-100.0"))); + assertThat(udf.round(new BigDecimal("-100.75"), 0), is(new BigDecimal("-101.00"))); + assertThat(udf.round(new BigDecimal("-100.10"), 1), is(new BigDecimal("-100.10"))); + assertThat(udf.round(new BigDecimal("-100.11"), 1), is(new BigDecimal("-100.10"))); + assertThat(udf.round(new BigDecimal("-100.15"), 1), is(new BigDecimal("-100.10"))); + assertThat(udf.round(new BigDecimal("-100.17"), 1), is(new BigDecimal("-100.20"))); + assertThat(udf.round(new BigDecimal("-100.110"), 2), is(new BigDecimal("-100.110"))); + assertThat(udf.round(new BigDecimal("-100.111"), 2), is(new BigDecimal("-100.110"))); + assertThat(udf.round(new BigDecimal("-100.115"), 2), is(new BigDecimal("-100.110"))); + assertThat(udf.round(new BigDecimal("-100.117"), 2), is(new BigDecimal("-100.120"))); + assertThat(udf.round(new BigDecimal("-100.1110"), 3), is(new BigDecimal("-100.1110"))); + assertThat(udf.round(new BigDecimal("-100.1111"), 3), is(new BigDecimal("-100.1110"))); + assertThat(udf.round(new BigDecimal("-100.1115"), 3), is(new BigDecimal("-100.1110"))); + assertThat(udf.round(new BigDecimal("-100.1117"), 3), is(new BigDecimal("-100.1120"))); + assertThat(udf.round(new BigDecimal("-12345.67"), -2), is(new BigDecimal("-12300.00"))); + assertThat(udf.round(new BigDecimal("-12345.67"), -3), is(new BigDecimal("-12000.00"))); + assertThat(udf.round(new BigDecimal("-12345.67"), -4), is(new BigDecimal("-10000.00"))); + assertThat(udf.round(new BigDecimal("-12345.67"), -5), is(new BigDecimal("0.00"))); } @Test - public void shouldHandleDoubleLiteralsEndingWith5ThatCannotBeRepresentedExactylyAsDoubles() { - assertThat(udf.round(new BigDecimal("265.335"), 2), is(new BigDecimal("265.34"))); - assertThat(udf.round(new BigDecimal("-265.335"), 2), is(new BigDecimal("-265.33"))); + public void shouldHandleDoubleLiteralsEndingWith5ThatCannotBeRepresentedExactlyAsDoubles() { + assertThat(udf.round(new BigDecimal("265.335"), 2), is(new BigDecimal("265.340"))); + assertThat(udf.round(new BigDecimal("-265.335"), 2), is(new BigDecimal("-265.330"))); - assertThat(udf.round(new BigDecimal("265.365"), 2), is(new BigDecimal("265.37"))); - assertThat(udf.round(new BigDecimal("-265.365"), 2), is(new BigDecimal("-265.36"))); + assertThat(udf.round(new BigDecimal("265.365"), 2), is(new BigDecimal("265.370"))); + assertThat(udf.round(new BigDecimal("-265.365"), 2), is(new BigDecimal("-265.360"))); } @Test diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_AVRO/6.1.0_1600280487284/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_AVRO/6.1.0_1600280487284/plan.json new file mode 100644 index 000000000000..65d173f5995f --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_AVRO/6.1.0_1600280487284/plan.json @@ -0,0 +1,140 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID STRING KEY, V DECIMAL(33, 16)) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='AVRO');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` STRING KEY, `V` DECIMAL(33, 16)", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, 
+ "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.ID ID,\n ROUND(TEST.V) R0,\n ROUND(TEST.V, 0) R00\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` STRING KEY, `R0` DECIMAL(17, 0), `R00` DECIMAL(33, 16)", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "sourceSchema" : "`ID` STRING KEY, `V` DECIMAL(33, 16)" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "ROUND(V) AS R0", "ROUND(V, 0) AS R00" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.streams.max.task.idle.ms" : "0", + "ksql.query.error.max.queue.size" : "10", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + 
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.enable.metastore.backup" : "false", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_AVRO/6.1.0_1600280487284/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_AVRO/6.1.0_1600280487284/spec.json new file mode 100644 index 000000000000..867474d41652 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_AVRO/6.1.0_1600280487284/spec.json @@ -0,0 +1,110 @@ +{ + "version" : "6.1.0", + "timestamp" : 1600280487284, + "path" : "query-validation-tests/math.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`ID` STRING KEY, `V` DECIMAL(33, 16)", + "serdeOptions" : [ ] + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`ID` STRING KEY, `R0` DECIMAL(17, 0), `R00` DECIMAL(33, 16)", + "serdeOptions" : [ ] + } + }, + "testCase" : { + "name" : "round with large DECIMAL values - AVRO", + "inputs" : [ { + "topic" : "test_topic", + "key" : null, + "value" : { + "v" : 12345678987654321.2345678987654321 + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "R0" : 12345678987654321, + "R00" : 12345678987654321.0000000000000000 + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "schema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "V", + "type" : [ "null", { + "type" : "bytes", + "scale" : 16, + "precision" : 33, + "connect.version" : 1, + "connect.parameters" : { + "scale" : "16", + "connect.decimal.precision" : "33" + }, + "connect.name" : "org.apache.kafka.connect.data.Decimal", + "logicalType" : "decimal" + } ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + }, + "format" : "AVRO", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM test (ID STRING KEY, v DECIMAL(33, 16)) WITH (kafka_topic='test_topic', value_format='AVRO');", "CREATE STREAM OUTPUT AS SELECT ID, ROUND(v) as R0, ROUND(v, 0) as R00 FROM test;" ], + "post" : { + "sources" : [ { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`ID` STRING KEY, `R0` DECIMAL(17, 0), `R00` DECIMAL(33, 16)", + "keyFormat" : { + "format" : "KAFKA" + }, + "serdeOptions" : [ ] + }, { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` STRING KEY, `V` DECIMAL(33, 16)", + "keyFormat" : { + "format" : "KAFKA" + }, + "serdeOptions" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + 
}, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_AVRO/6.1.0_1600280487284/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_AVRO/6.1.0_1600280487284/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_AVRO/6.1.0_1600280487284/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_DELIMITED/6.1.0_1600280487241/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_DELIMITED/6.1.0_1600280487241/plan.json new file mode 100644 index 000000000000..7a98af0461f4 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_DELIMITED/6.1.0_1600280487241/plan.json @@ -0,0 +1,140 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID STRING KEY, V DECIMAL(33, 16)) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='DELIMITED');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` STRING KEY, `V` DECIMAL(33, 16)", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.ID ID,\n ROUND(TEST.V) R0,\n ROUND(TEST.V, 0) R00,\n ROUND(TEST.V, 1) R1,\n ROUND(TEST.V, 2) R2,\n ROUND(TEST.V, 10) R10,\n ROUND(TEST.V, -1) 1R,\n ROUND(TEST.V, -2) 2R\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` STRING KEY, `R0` DECIMAL(17, 0), `R00` DECIMAL(33, 16), `R1` DECIMAL(33, 16), `R2` DECIMAL(33, 16), `R10` DECIMAL(33, 16), `1R` DECIMAL(33, 16), `2R` DECIMAL(33, 16)", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "sourceSchema" : "`ID` STRING KEY, `V` DECIMAL(33, 16)" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" 
: [ "ROUND(V) AS R0", "ROUND(V, 0) AS R00", "ROUND(V, 1) AS R1", "ROUND(V, 2) AS R2", "ROUND(V, 10) AS R10", "ROUND(V, -1) AS 1R", "ROUND(V, -2) AS 2R" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.streams.max.task.idle.ms" : "0", + "ksql.query.error.max.queue.size" : "10", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.enable.metastore.backup" : "false", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_DELIMITED/6.1.0_1600280487241/spec.json 
b/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_DELIMITED/6.1.0_1600280487241/spec.json new file mode 100644 index 000000000000..9512ebb3b68d --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_DELIMITED/6.1.0_1600280487241/spec.json @@ -0,0 +1,82 @@ +{ + "version" : "6.1.0", + "timestamp" : 1600280487241, + "path" : "query-validation-tests/math.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`ID` STRING KEY, `V` DECIMAL(33, 16)", + "serdeOptions" : [ ] + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`ID` STRING KEY, `R0` DECIMAL(17, 0), `R00` DECIMAL(33, 16), `R1` DECIMAL(33, 16), `R2` DECIMAL(33, 16), `R10` DECIMAL(33, 16), `1R` DECIMAL(33, 16), `2R` DECIMAL(33, 16)", + "serdeOptions" : [ ] + } + }, + "testCase" : { + "name" : "round with large DECIMAL values - DELIMITED", + "inputs" : [ { + "topic" : "test_topic", + "key" : null, + "value" : "12345678987654321.2345678987654321" + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : null, + "value" : "\"12,345,678,987,654,321\",\"12,345,678,987,654,321.0000000000000000\",\"12,345,678,987,654,321.2000000000000000\",\"12,345,678,987,654,321.2300000000000000\",\"12,345,678,987,654,321.2345678988000000\",\"12,345,678,987,654,320.0000000000000000\",\"12,345,678,987,654,300.0000000000000000\"" + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM test (ID STRING KEY, v DECIMAL(33, 16)) WITH (kafka_topic='test_topic', value_format='DELIMITED');", "CREATE STREAM OUTPUT AS SELECT ID, ROUND(v) as R0, ROUND(v, 0) as R00, ROUND(v, 1) as R1, ROUND(v, 2) as R2, ROUND(v, 10) as R10, ROUND(v , -1) as 1R , ROUND(v , -2) as 2R FROM test;" ], + "post" : { + "sources" : [ { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`ID` STRING KEY, `R0` DECIMAL(17, 0), `R00` DECIMAL(33, 16), `R1` DECIMAL(33, 16), `R2` DECIMAL(33, 16), `R10` DECIMAL(33, 16), `1R` DECIMAL(33, 16), `2R` DECIMAL(33, 16)", + "keyFormat" : { + "format" : "KAFKA" + }, + "serdeOptions" : [ ] + }, { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` STRING KEY, `V` DECIMAL(33, 16)", + "keyFormat" : { + "format" : "KAFKA" + }, + "serdeOptions" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_DELIMITED/6.1.0_1600280487241/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_DELIMITED/6.1.0_1600280487241/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_DELIMITED/6.1.0_1600280487241/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- 
KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_JSON/6.1.0_1600280487303/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_JSON/6.1.0_1600280487303/plan.json new file mode 100644 index 000000000000..3ad84a2318aa --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_JSON/6.1.0_1600280487303/plan.json @@ -0,0 +1,140 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID STRING KEY, V DECIMAL(33, 16)) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` STRING KEY, `V` DECIMAL(33, 16)", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.ID ID,\n ROUND(TEST.V) R0,\n ROUND(TEST.V, 0) R00\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` STRING KEY, `R0` DECIMAL(17, 0), `R00` DECIMAL(33, 16)", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`ID` STRING KEY, `V` DECIMAL(33, 16)" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "ROUND(V) AS R0", "ROUND(V, 0) AS R00" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.streams.max.task.idle.ms" : "0", + "ksql.query.error.max.queue.size" : "10", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + 
"ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.enable.metastore.backup" : "false", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_JSON/6.1.0_1600280487303/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_JSON/6.1.0_1600280487303/spec.json new file mode 100644 index 000000000000..67726d967a5d --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_JSON/6.1.0_1600280487303/spec.json @@ -0,0 +1,87 @@ +{ + "version" : "6.1.0", + "timestamp" : 1600280487303, + "path" : "query-validation-tests/math.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`ID` STRING KEY, `V` DECIMAL(33, 16)", + "serdeOptions" : [ ] + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`ID` STRING KEY, `R0` DECIMAL(17, 0), `R00` DECIMAL(33, 16)", + "serdeOptions" : [ ] + } + }, + "testCase" : { + "name" : "round with large DECIMAL values - JSON", + "inputs" : [ { + "topic" : "test_topic", + "key" : null, + "value" : { + "v" : 12345678987654321.2345678987654321 + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "R0" : 12345678987654321, + "R00" : 12345678987654321.0000000000000000 + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM test (ID STRING KEY, v DECIMAL(33, 16)) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT ID, ROUND(v) as R0, ROUND(v, 0) as R00 FROM 
test;" ], + "post" : { + "sources" : [ { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`ID` STRING KEY, `R0` DECIMAL(17, 0), `R00` DECIMAL(33, 16)", + "keyFormat" : { + "format" : "KAFKA" + }, + "serdeOptions" : [ ] + }, { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` STRING KEY, `V` DECIMAL(33, 16)", + "keyFormat" : { + "format" : "KAFKA" + }, + "serdeOptions" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_JSON/6.1.0_1600280487303/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_JSON/6.1.0_1600280487303/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/math_-_round_with_large_DECIMAL_values_-_JSON/6.1.0_1600280487303/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/math.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/math.json index 46bd68de1809..2d44cf6a4ac3 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/math.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/math.json @@ -210,7 +210,7 @@ ] }, { - "name": "round with large DECIMAL values", + "name": "round with large DECIMAL values - DELIMITED", "statements": [ "CREATE STREAM test (ID STRING KEY, v DECIMAL(33, 16)) WITH (kafka_topic='test_topic', value_format='DELIMITED');", "CREATE STREAM OUTPUT AS SELECT ID, ROUND(v) as R0, ROUND(v, 0) as R00, ROUND(v, 1) as R1, ROUND(v, 2) as R2, ROUND(v, 10) as R10, ROUND(v , -1) as 1R , ROUND(v , -2) as 2R FROM test;" @@ -233,6 +233,29 @@ } ] } + }, + { + "name": "round with large DECIMAL values", + "format": ["AVRO", "JSON"], + "statements": [ + "CREATE STREAM test (ID STRING KEY, v DECIMAL(33, 16)) WITH (kafka_topic='test_topic', value_format='{FORMAT}');", + "CREATE STREAM OUTPUT AS SELECT ID, ROUND(v) as R0, ROUND(v, 0) as R00 FROM test;" + ], + "inputs": [ + {"topic": "test_topic", "value": {"v": 12345678987654321.2345678987654321}} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"R0": 12345678987654321, "R00": 12345678987654321.0000000000000000}} + ], + "post": { + "sources": [ + { + "name": "OUTPUT", + "type": "stream", + "schema": "ID STRING KEY, R0 DECIMAL(17,0), R00 DECIMAL(33,16)" + } + ] + } } ] } \ No newline at end of file