From ba833f7849f101f5faacba490faad30a551795c7 Mon Sep 17 00:00:00 2001 From: Andy Coates <8012398+big-andy-coates@users.noreply.github.com> Date: Mon, 6 Jul 2020 16:17:24 +0100 Subject: [PATCH] fix: ksqlDB should not truncate decimals (#5763) fixes: https://github.com/confluentinc/ksql/issues/4710 Fixes the `JSON` / `JSON_SR` formats to not remove trailing zeros from decimal values, e.g. `14.7600` should be processed and seriailized as `14.6700` and _not_ `14.67`. Co-authored-by: Andy Coates (cherry picked from commit 2b3ca33c1cee099ebc28f407c307e89a0a957b73) --- .../confluent/ksql/test/model/RecordNode.java | 7 +- .../ksql/test/serde/ConnectSerdeSupplier.java | 18 +- .../io/confluent/ksql/test/tools/Record.java | 2 +- .../ksql/test/tools/TestExecutor.java | 4 +- .../ksql/test/tools/TopicInfoCache.java | 55 +++- .../ksql/test/model/RecordNodeTest.java | 31 +++ .../ksql/test/rest/RestTestExecutor.java | 2 +- .../ksql/test/tools/TestJsonMapperTest.java | 23 ++ .../6.1.0_1593789506560/plan.json | 126 +++++++++ .../6.1.0_1593789506560/spec.json | 107 ++++++++ .../6.1.0_1593789506560/topology | 13 + .../6.1.0_1593789506587/plan.json | 126 +++++++++ .../6.1.0_1593789506587/spec.json | 76 ++++++ .../6.1.0_1593789506587/topology | 13 + .../6.1.0_1593789506613/plan.json | 126 +++++++++ .../6.1.0_1593789506613/spec.json | 105 ++++++++ .../6.1.0_1593789506613/topology | 13 + .../6.0.0_1583419431528/spec.json | 2 +- .../6.0.0_1588893908853/spec.json | 2 +- .../6.0.0_1589910855902/spec.json | 2 +- .../6.1.0_1593789506599/plan.json | 126 +++++++++ .../6.1.0_1593789506599/spec.json | 84 ++++++ .../6.1.0_1593789506599/topology | 13 + .../query-validation-tests/decimal.json | 253 ++++++++++++------ 24 files changed, 1222 insertions(+), 107 deletions(-) create mode 100644 ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/model/RecordNodeTest.java create mode 100644 ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/tools/TestJsonMapperTest.java create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_AVRO_should_not_trim_trailing_zeros/6.1.0_1593789506560/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_AVRO_should_not_trim_trailing_zeros/6.1.0_1593789506560/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_AVRO_should_not_trim_trailing_zeros/6.1.0_1593789506560/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_DELIMITED_should_not_trim_trailing_zeros/6.1.0_1593789506587/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_DELIMITED_should_not_trim_trailing_zeros/6.1.0_1593789506587/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_DELIMITED_should_not_trim_trailing_zeros/6.1.0_1593789506587/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_SR_should_not_trim_trailing_zeros/6.1.0_1593789506613/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_SR_should_not_trim_trailing_zeros/6.1.0_1593789506613/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_SR_should_not_trim_trailing_zeros/6.1.0_1593789506613/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_should_not_trim_trailing_zeros/6.1.0_1593789506599/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_should_not_trim_trailing_zeros/6.1.0_1593789506599/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_should_not_trim_trailing_zeros/6.1.0_1593789506599/topology diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/model/RecordNode.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/model/RecordNode.java index 88dc3a3c008b..6b3ff49739c7 100644 --- a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/model/RecordNode.java +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/model/RecordNode.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JsonDeserializer; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonSerializer; @@ -30,6 +31,7 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.NullNode; import com.fasterxml.jackson.databind.node.TextNode; +import com.google.common.annotations.VisibleForTesting; import io.confluent.ksql.test.tools.Record; import io.confluent.ksql.test.tools.exceptions.InvalidFieldException; import io.confluent.ksql.test.tools.exceptions.MissingFieldException; @@ -42,6 +44,8 @@ public final class RecordNode { private static final ObjectMapper objectMapper = new ObjectMapper() + .enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS) + .enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN) .setNodeFactory(JsonNodeFactory.withExactBigDecimals(true)); private final String topicName; @@ -50,7 +54,8 @@ public final class RecordNode { private final Optional timestamp; private final Optional window; - private RecordNode( + @VisibleForTesting + RecordNode( final String topicName, final Optional key, final JsonNode value, diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/serde/ConnectSerdeSupplier.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/serde/ConnectSerdeSupplier.java index 88661c5c7c31..753aac4fe444 100644 --- a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/serde/ConnectSerdeSupplier.java +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/serde/ConnectSerdeSupplier.java @@ -19,6 +19,7 @@ import io.confluent.kafka.schemaregistry.ParsedSchema; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; +import io.confluent.ksql.test.TestFrameworkException; import io.confluent.ksql.util.DecimalUtil; import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.KsqlException; @@ -165,10 +166,19 @@ private Object specToConnect(final Object spec, final Schema schema) { return struct; case BYTES: if (DecimalUtil.isDecimal(schema)) { - return DecimalUtil.cast( - (String) spec, - DecimalUtil.precision(schema), - DecimalUtil.scale(schema)); + if (spec instanceof BigDecimal) { + return DecimalUtil.ensureFit((BigDecimal) spec, schema); + } + + if (spec instanceof String) { + // Supported for legacy reasons... + return DecimalUtil.cast( + (String) spec, + DecimalUtil.precision(schema), + DecimalUtil.scale(schema)); + } + + throw new TestFrameworkException("DECIMAL type requires JSON number in test data"); } throw new RuntimeException("Unexpected BYTES type " + schema.name()); default: diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/Record.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/Record.java index 44fa8dee37a8..9b848af8a405 100644 --- a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/Record.java +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/Record.java @@ -98,7 +98,7 @@ public Optional getJsonValue() { return jsonValue; } - public Record withKey(final Object key) { + public Record withKeyValue(final Object key, final Object value) { return new Record( topicName, key, diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java index 7f8a79bcf5d6..7dbc5aa7b6fc 100644 --- a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java @@ -250,7 +250,7 @@ private void validateTopicData( int i = 0; while (actualIt.hasNext() && expectedIt.hasNext()) { - final Record expectedRecord = topicInfo.coerceRecordKey(expectedIt.next(), i); + final Record expectedRecord = topicInfo.coerceRecord(expectedIt.next(), i); final ProducerRecord actualProducerRecord = actualIt.next(); validateCreatedMessage( @@ -312,7 +312,7 @@ private void pipeRecordsFromProvidedInput( final TopicInfo topicInfo = topicInfoCache.get(record.getTopicName()); - final Record coerced = topicInfo.coerceRecordKey(record, inputRecordIndex); + final Record coerced = topicInfo.coerceRecord(record, inputRecordIndex); processSingleRecord( coerced.asProducerRecord(), diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/TopicInfoCache.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/TopicInfoCache.java index 7bd824af90c5..767ae7427c03 100644 --- a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/TopicInfoCache.java +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/TopicInfoCache.java @@ -34,6 +34,7 @@ import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.serde.KeyFormat; import io.confluent.ksql.serde.ValueFormat; +import io.confluent.ksql.serde.kafka.KafkaFormat; import io.confluent.ksql.test.TestFrameworkException; import io.confluent.ksql.test.serde.SerdeSupplier; import io.confluent.ksql.test.utils.SerdeUtil; @@ -41,7 +42,6 @@ import java.util.List; import java.util.OptionalLong; import java.util.Set; -import java.util.function.Function; import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.kafka.common.serialization.Deserializer; @@ -239,7 +239,7 @@ public Deserializer getValueDeserializer() { } /** - * Coerce the key value to the correct type. + * Coerce the key & value to the correct type. * *

The type of the key loaded from the JSON test case file may not be the exact match on * type, e.g. JSON will load a small number as an integer, but the key type of the source might @@ -249,13 +249,14 @@ public Deserializer getValueDeserializer() { * @param msgIndex the index of the message, displayed in the error message * @return a new Record with the correct key type. */ - public Record coerceRecordKey( + public Record coerceRecord( final Record record, final int msgIndex ) { try { - final Object coerced = keyCoercer().apply(record.rawKey()); - return record.withKey(coerced); + final Object coercedKey = coerceKey(record.rawKey()); + final Object coercedValue = coerceValue(record.value()); + return record.withKeyValue(coercedKey, coercedValue); } catch (final Exception e) { throw new AssertionError( "Topic '" + record.getTopicName() + "', message " + msgIndex @@ -265,11 +266,11 @@ public Record coerceRecordKey( } } - private Function keyCoercer() { + private Object coerceKey(final Object key) { if (schema.key().isEmpty()) { - // No key column - pass the key in as a string to allow tests to pass in data that should - // be ignored: - return key -> key == null ? null : String.valueOf(key); + // No key column + // - pass the key in as a string to allow tests to pass in data that should be ignored: + return key == null ? null : String.valueOf(key); } final SqlType keyType = schema @@ -277,9 +278,9 @@ private Function keyCoercer() { .get(0) .type(); - return key -> DefaultSqlValueCoercer.INSTANCE + return DefaultSqlValueCoercer.INSTANCE .coerce(key, keyType) - .orElseThrow(() -> new AssertionError("Invalid key value for topic " + topicName + "." + .orElseThrow(() -> new AssertionError("Invalid key for topic " + topicName + "." + System.lineSeparator() + "Expected KeyType: " + keyType + System.lineSeparator() @@ -291,5 +292,37 @@ private Function keyCoercer() { )) .orElse(null); } + + private Object coerceValue(final Object value) { + // Only KAFKA format needs any value coercion at the moment: + if (!(valueFormat.getFormat() instanceof KafkaFormat)) { + return value; + } + + if (schema.value().size() != 1) { + // Wrong column count: + // - pass the value as-is for negative testing: + return value == null ? null : String.valueOf(value); + } + + final SqlType valueType = schema + .value() + .get(0) + .type(); + + return DefaultSqlValueCoercer.INSTANCE + .coerce(value, valueType) + .orElseThrow(() -> new AssertionError("Invalid value for topic " + topicName + "." + + System.lineSeparator() + + "Expected ValueType: " + valueType + + System.lineSeparator() + + "Actual ValueType: " + SchemaConverters.javaToSqlConverter() + .toSqlType(value.getClass()) + + ", value: " + value + "." + + System.lineSeparator() + + "This is likely caused by the value type in the test-case not matching the schema." + )) + .orElse(null); + } } } diff --git a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/model/RecordNodeTest.java b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/model/RecordNodeTest.java new file mode 100644 index 000000000000..b4b5d892c001 --- /dev/null +++ b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/model/RecordNodeTest.java @@ -0,0 +1,31 @@ +package io.confluent.ksql.test.model; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import com.fasterxml.jackson.databind.node.DecimalNode; +import io.confluent.ksql.test.tools.Record; +import java.math.BigDecimal; +import java.util.Optional; +import org.junit.Test; + +public class RecordNodeTest { + + @Test + public void shouldUseExactDecimals() { + // Given: + final RecordNode node = new RecordNode( + "topic", + Optional.empty(), + new DecimalNode(new BigDecimal("10.000")), + Optional.empty(), + Optional.empty() + ); + + // When: + final Record result = node.build(); + + // Then: + assertThat(result.value(), is(new BigDecimal("10.000"))); + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestTestExecutor.java b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestTestExecutor.java index e3f18e7b7ed0..a3140e875880 100644 --- a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestTestExecutor.java +++ b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestTestExecutor.java @@ -208,7 +208,7 @@ private void produceInputs(final Map> inputs) { for (int idx = 0; idx < records.size(); idx++) { final Record record = records.get(idx); - final Record coerced = topicInfo.coerceRecordKey(record, idx); + final Record coerced = topicInfo.coerceRecord(record, idx); producer.send(new ProducerRecord<>( topicName, diff --git a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/tools/TestJsonMapperTest.java b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/tools/TestJsonMapperTest.java new file mode 100644 index 000000000000..e6ba633ee995 --- /dev/null +++ b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/tools/TestJsonMapperTest.java @@ -0,0 +1,23 @@ +package io.confluent.ksql.test.tools; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasEntry; + +import java.math.BigDecimal; +import java.util.Map; +import org.junit.Test; + +public class TestJsonMapperTest { + + @Test + public void shouldLoadExactDecimals() throws Exception { + // Given: + final String json = "{\"DEC\": 1.0000}"; + + // When: + final Map result = TestJsonMapper.INSTANCE.get().readValue(json, Map.class); + + // Then: + assertThat(result, hasEntry("DEC", new BigDecimal("1.0000"))); + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_AVRO_should_not_trim_trailing_zeros/6.1.0_1593789506560/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_AVRO_should_not_trim_trailing_zeros/6.1.0_1593789506560/plan.json new file mode 100644 index 000000000000..596f51eba8a2 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_AVRO_should_not_trim_trailing_zeros/6.1.0_1593789506560/plan.json @@ -0,0 +1,126 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (ID STRING KEY, DEC DECIMAL(6, 4)) WITH (KAFKA_TOPIC='test', VALUE_FORMAT='Avro');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ID` STRING KEY, `DEC` DECIMAL(6, 4)", + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` STRING KEY, `DEC` DECIMAL(6, 4)", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + } + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "sourceSchema" : "`ID` STRING KEY, `DEC` DECIMAL(6, 4)" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "DEC AS DEC" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.error.classifier.regex" : "" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_AVRO_should_not_trim_trailing_zeros/6.1.0_1593789506560/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_AVRO_should_not_trim_trailing_zeros/6.1.0_1593789506560/spec.json new file mode 100644 index 000000000000..1642c6c6b4f1 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_AVRO_should_not_trim_trailing_zeros/6.1.0_1593789506560/spec.json @@ -0,0 +1,107 @@ +{ + "version" : "6.1.0", + "timestamp" : 1593789506560, + "path" : "query-validation-tests/decimal.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "testCase" : { + "name" : "AVRO should not trim trailing zeros", + "inputs" : [ { + "topic" : "test", + "key" : null, + "value" : { + "DEC" : 10.0000 + } + }, { + "topic" : "test", + "key" : null, + "value" : { + "DEC" : 1.0000 + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "DEC" : 10.0000 + } + }, { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "DEC" : 1.0000 + } + } ], + "topics" : [ { + "name" : "test", + "schema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "DEC", + "type" : [ "null", { + "type" : "bytes", + "scale" : 4, + "precision" : 6, + "connect.version" : 1, + "connect.parameters" : { + "scale" : "4", + "connect.decimal.precision" : "6" + }, + "connect.name" : "org.apache.kafka.connect.data.Decimal", + "logicalType" : "decimal" + } ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + }, + "format" : "AVRO", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (ID STRING KEY, dec DECIMAL(6,4)) WITH (kafka_topic='test', value_format='Avro');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "stream", + "schema" : "ID STRING KEY, DEC DECIMAL(6,4)" + }, { + "name" : "OUTPUT", + "type" : "stream", + "schema" : "ID STRING KEY, DEC DECIMAL(6,4)" + } ], + "topics" : { + "topics" : [ { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4 + }, { + "name" : "test", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_AVRO_should_not_trim_trailing_zeros/6.1.0_1593789506560/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_AVRO_should_not_trim_trailing_zeros/6.1.0_1593789506560/topology new file mode 100644 index 000000000000..9396b997ae94 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_AVRO_should_not_trim_trailing_zeros/6.1.0_1593789506560/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_DELIMITED_should_not_trim_trailing_zeros/6.1.0_1593789506587/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_DELIMITED_should_not_trim_trailing_zeros/6.1.0_1593789506587/plan.json new file mode 100644 index 000000000000..8d08d37c41fb --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_DELIMITED_should_not_trim_trailing_zeros/6.1.0_1593789506587/plan.json @@ -0,0 +1,126 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (ID STRING KEY, DEC DECIMAL(6, 4)) WITH (KAFKA_TOPIC='test', VALUE_FORMAT='Delimited');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ID` STRING KEY, `DEC` DECIMAL(6, 4)", + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` STRING KEY, `DEC` DECIMAL(6, 4)", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + } + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "sourceSchema" : "`ID` STRING KEY, `DEC` DECIMAL(6, 4)" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "DEC AS DEC" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.error.classifier.regex" : "" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_DELIMITED_should_not_trim_trailing_zeros/6.1.0_1593789506587/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_DELIMITED_should_not_trim_trailing_zeros/6.1.0_1593789506587/spec.json new file mode 100644 index 000000000000..a3736a52daa2 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_DELIMITED_should_not_trim_trailing_zeros/6.1.0_1593789506587/spec.json @@ -0,0 +1,76 @@ +{ + "version" : "6.1.0", + "timestamp" : 1593789506587, + "path" : "query-validation-tests/decimal.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "testCase" : { + "name" : "DELIMITED should not trim trailing zeros", + "inputs" : [ { + "topic" : "test", + "key" : null, + "value" : "10.0000" + }, { + "topic" : "test", + "key" : null, + "value" : "1.0000" + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : null, + "value" : "10.0000" + }, { + "topic" : "OUTPUT", + "key" : null, + "value" : "1.0000" + } ], + "topics" : [ { + "name" : "test", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (ID STRING KEY, dec DECIMAL(6,4)) WITH (kafka_topic='test', value_format='Delimited');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "stream", + "schema" : "ID STRING KEY, DEC DECIMAL(6,4)" + }, { + "name" : "OUTPUT", + "type" : "stream", + "schema" : "ID STRING KEY, DEC DECIMAL(6,4)" + } ], + "topics" : { + "topics" : [ { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + }, { + "name" : "test", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_DELIMITED_should_not_trim_trailing_zeros/6.1.0_1593789506587/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_DELIMITED_should_not_trim_trailing_zeros/6.1.0_1593789506587/topology new file mode 100644 index 000000000000..9396b997ae94 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_DELIMITED_should_not_trim_trailing_zeros/6.1.0_1593789506587/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_SR_should_not_trim_trailing_zeros/6.1.0_1593789506613/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_SR_should_not_trim_trailing_zeros/6.1.0_1593789506613/plan.json new file mode 100644 index 000000000000..61fae3acc656 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_SR_should_not_trim_trailing_zeros/6.1.0_1593789506613/plan.json @@ -0,0 +1,126 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (ID STRING KEY, DEC DECIMAL(6, 4)) WITH (KAFKA_TOPIC='test', VALUE_FORMAT='JSON_SR');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ID` STRING KEY, `DEC` DECIMAL(6, 4)", + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON_SR" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` STRING KEY, `DEC` DECIMAL(6, 4)", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON_SR" + } + } + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON_SR" + } + }, + "sourceSchema" : "`ID` STRING KEY, `DEC` DECIMAL(6, 4)" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "DEC AS DEC" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON_SR" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.error.classifier.regex" : "" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_SR_should_not_trim_trailing_zeros/6.1.0_1593789506613/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_SR_should_not_trim_trailing_zeros/6.1.0_1593789506613/spec.json new file mode 100644 index 000000000000..5ce9a970f367 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_SR_should_not_trim_trailing_zeros/6.1.0_1593789506613/spec.json @@ -0,0 +1,105 @@ +{ + "version" : "6.1.0", + "timestamp" : 1593789506613, + "path" : "query-validation-tests/decimal.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "testCase" : { + "name" : "JSON_SR should not trim trailing zeros", + "inputs" : [ { + "topic" : "test", + "key" : null, + "value" : { + "DEC" : 10.0 + } + }, { + "topic" : "test", + "key" : null, + "value" : { + "DEC" : 1.0000 + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "DEC" : 10.0 + } + }, { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "DEC" : 1.0000 + } + } ], + "topics" : [ { + "name" : "test", + "schema" : { + "type" : "object", + "properties" : { + "DEC" : { + "connect.index" : 0, + "oneOf" : [ { + "type" : "null" + }, { + "type" : "number", + "title" : "org.apache.kafka.connect.data.Decimal", + "connect.version" : 1, + "connect.type" : "bytes", + "connect.parameters" : { + "scale" : "4", + "connect.decimal.precision" : "6" + } + } ] + } + } + }, + "format" : "JSON", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (ID STRING KEY, dec DECIMAL(6,4)) WITH (kafka_topic='test', value_format='JSON_SR');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "stream", + "schema" : "ID STRING KEY, DEC DECIMAL(6,4)" + }, { + "name" : "OUTPUT", + "type" : "stream", + "schema" : "ID STRING KEY, DEC DECIMAL(6,4)" + } ], + "topics" : { + "topics" : [ { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON_SR" + }, + "partitions" : 4 + }, { + "name" : "test", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON_SR" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_SR_should_not_trim_trailing_zeros/6.1.0_1593789506613/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_SR_should_not_trim_trailing_zeros/6.1.0_1593789506613/topology new file mode 100644 index 000000000000..9396b997ae94 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_SR_should_not_trim_trailing_zeros/6.1.0_1593789506613/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_scale_in_data_less_than_scale_in_type/6.0.0_1583419431528/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_scale_in_data_less_than_scale_in_type/6.0.0_1583419431528/spec.json index 0cc208329a2c..bcbec7692ddc 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_scale_in_data_less_than_scale_in_type/6.0.0_1583419431528/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_scale_in_data_less_than_scale_in_type/6.0.0_1583419431528/spec.json @@ -79,7 +79,7 @@ "topic" : "OUTPUT", "key" : "", "value" : { - "DEC" : 0.00010 + "DEC" : 0.0001 } } ], "topics" : [ { diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_scale_in_data_less_than_scale_in_type/6.0.0_1588893908853/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_scale_in_data_less_than_scale_in_type/6.0.0_1588893908853/spec.json index 8516370ca2a8..190cc363cc13 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_scale_in_data_less_than_scale_in_type/6.0.0_1588893908853/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_scale_in_data_less_than_scale_in_type/6.0.0_1588893908853/spec.json @@ -79,7 +79,7 @@ "topic" : "OUTPUT", "key" : "", "value" : { - "DEC" : 0.00010 + "DEC" : 0.0001 } } ], "topics" : [ { diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_scale_in_data_less_than_scale_in_type/6.0.0_1589910855902/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_scale_in_data_less_than_scale_in_type/6.0.0_1589910855902/spec.json index 86c63767d805..597190da6bf0 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_scale_in_data_less_than_scale_in_type/6.0.0_1589910855902/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_scale_in_data_less_than_scale_in_type/6.0.0_1589910855902/spec.json @@ -79,7 +79,7 @@ "topic" : "OUTPUT", "key" : "", "value" : { - "DEC" : 0.00010 + "DEC" : 0.0001 } } ], "topics" : [ { diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_should_not_trim_trailing_zeros/6.1.0_1593789506599/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_should_not_trim_trailing_zeros/6.1.0_1593789506599/plan.json new file mode 100644 index 000000000000..a1e0f64bef88 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_should_not_trim_trailing_zeros/6.1.0_1593789506599/plan.json @@ -0,0 +1,126 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (ID STRING KEY, DEC DECIMAL(6, 4)) WITH (KAFKA_TOPIC='test', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ID` STRING KEY, `DEC` DECIMAL(6, 4)", + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` STRING KEY, `DEC` DECIMAL(6, 4)", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`ID` STRING KEY, `DEC` DECIMAL(6, 4)" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "DEC AS DEC" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.error.classifier.regex" : "" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_should_not_trim_trailing_zeros/6.1.0_1593789506599/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_should_not_trim_trailing_zeros/6.1.0_1593789506599/spec.json new file mode 100644 index 000000000000..3e3b3e42bcac --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_should_not_trim_trailing_zeros/6.1.0_1593789506599/spec.json @@ -0,0 +1,84 @@ +{ + "version" : "6.1.0", + "timestamp" : 1593789506599, + "path" : "query-validation-tests/decimal.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "testCase" : { + "name" : "JSON should not trim trailing zeros", + "inputs" : [ { + "topic" : "test", + "key" : null, + "value" : { + "DEC" : 10.0 + } + }, { + "topic" : "test", + "key" : null, + "value" : { + "DEC" : 1.0000 + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "DEC" : 10.0 + } + }, { + "topic" : "OUTPUT", + "key" : null, + "value" : { + "DEC" : 1.0000 + } + } ], + "topics" : [ { + "name" : "test", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (ID STRING KEY, dec DECIMAL(6,4)) WITH (kafka_topic='test', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "stream", + "schema" : "ID STRING KEY, DEC DECIMAL(6,4)" + }, { + "name" : "OUTPUT", + "type" : "stream", + "schema" : "ID STRING KEY, DEC DECIMAL(6,4)" + } ], + "topics" : { + "topics" : [ { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "test", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_should_not_trim_trailing_zeros/6.1.0_1593789506599/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_should_not_trim_trailing_zeros/6.1.0_1593789506599/topology new file mode 100644 index 000000000000..9396b997ae94 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/decimal_-_JSON_should_not_trim_trailing_zeros/6.1.0_1593789506599/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/decimal.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/decimal.json index 43cf6e4d3fa8..ea88ab0e209b 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/decimal.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/decimal.json @@ -21,10 +21,10 @@ "CREATE STREAM TEST2 AS SELECT * FROM TEST;" ], "inputs": [ - {"topic": "test", "value": {"DEC": "10.1234512345123451234"}} + {"topic": "test", "value": {"DEC": 10.1234512345123451234}} ], "outputs": [ - {"topic": "TEST2", "value": {"DEC": "10.1234512345123451234"}} + {"topic": "TEST2", "value": {"DEC": 10.1234512345123451234}} ] }, { @@ -38,10 +38,10 @@ "CREATE STREAM TEST2 AS SELECT * FROM TEST;" ], "inputs": [ - {"topic": "test", "value": {"DEC": "10.1234512345123451234"}} + {"topic": "test", "value": {"DEC": 10.1234512345123451234}} ], "outputs": [ - {"topic": "TEST2", "value": {"DEC": "10.1234512345123451234"}} + {"topic": "TEST2", "value": {"DEC": 10.1234512345123451234}} ] }, { @@ -64,7 +64,50 @@ {"topic": "OUTPUT", "value": {"DEC": 0.1}}, {"topic": "OUTPUT", "value": {"DEC": 0.01}}, {"topic": "OUTPUT", "value": {"DEC": 0.001}}, - {"topic": "OUTPUT", "value": {"DEC": 0.00010}} + {"topic": "OUTPUT", "value": {"DEC": 0.0001}} + ], + "post": { + "sources": [ + {"name": "INPUT", "type": "stream", "schema": "ID STRING KEY, DEC DECIMAL(6,4)"}, + {"name": "OUTPUT", "type": "stream", "schema": "ID STRING KEY, DEC DECIMAL(6,4)"} + ] + } + }, + { + "name": "AVRO should not trim trailing zeros", + "comment": "Avro decimals ALWAYS have a scale that matches the schema.", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, dec DECIMAL(6,4)) WITH (kafka_topic='test', value_format='Avro');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "test", "value": {"DEC": 10.0000}}, + {"topic": "test", "value": {"DEC": 1.0000}} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"DEC": 10.0000}}, + {"topic": "OUTPUT", "value": {"DEC": 1.0000}} + ], + "post": { + "sources": [ + {"name": "INPUT", "type": "stream", "schema": "ID STRING KEY, DEC DECIMAL(6,4)"}, + {"name": "OUTPUT", "type": "stream", "schema": "ID STRING KEY, DEC DECIMAL(6,4)"} + ] + } + }, + { + "name": "DELIMITED should not trim trailing zeros", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, dec DECIMAL(6,4)) WITH (kafka_topic='test', value_format='Delimited');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "test", "value": "10.0000"}, + {"topic": "test", "value": "1.0000"} + ], + "outputs": [ + {"topic": "OUTPUT", "value": "10.0000"}, + {"topic": "OUTPUT", "value": "1.0000"} ], "post": { "sources": [ @@ -74,9 +117,7 @@ } }, { - "enabled": false, "name": "JSON should not trim trailing zeros", - "comment": "Disabled until https://github.com/confluentinc/ksql/issues/4710 is done", "statements": [ "CREATE STREAM INPUT (ID STRING KEY, dec DECIMAL(6,4)) WITH (kafka_topic='test', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" @@ -91,8 +132,52 @@ ], "post": { "sources": [ - {"name": "INPUT", "type": "stream", "schema": "ROWKEY STRING KEY, DEC DECIMAL(6,4)"}, - {"name": "OUTPUT", "type": "stream", "schema": "ROWKEY STRING KEY, DEC DECIMAL(6,4)"} + {"name": "INPUT", "type": "stream", "schema": "ID STRING KEY, DEC DECIMAL(6,4)"}, + {"name": "OUTPUT", "type": "stream", "schema": "ID STRING KEY, DEC DECIMAL(6,4)"} + ] + } + }, + { + "name": "JSON_SR should not trim trailing zeros", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, dec DECIMAL(6,4)) WITH (kafka_topic='test', value_format='JSON_SR');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "test", "value": {"DEC": 10.0}}, + {"topic": "test", "value": {"DEC": 1.0000}} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"DEC": 10.0}}, + {"topic": "OUTPUT", "value": {"DEC": 1.0000}} + ], + "post": { + "sources": [ + {"name": "INPUT", "type": "stream", "schema": "ID STRING KEY, DEC DECIMAL(6,4)"}, + {"name": "OUTPUT", "type": "stream", "schema": "ID STRING KEY, DEC DECIMAL(6,4)"} + ] + } + }, + { + "enabled": false, + "comment": "Enable once https://github.com/confluentinc/ksql/issues/5762 fixed", + "name": "PROTOBUF should not trim trailing zeros", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, dec DECIMAL(6,4)) WITH (kafka_topic='test', value_format='PROTOBUF');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "test", "value": {"DEC": 10.0}}, + {"topic": "test", "value": {"DEC": 1.0000}} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"DEC": 10.0}}, + {"topic": "OUTPUT", "value": {"DEC": 1.0000}} + ], + "post": { + "sources": [ + {"name": "INPUT", "type": "stream", "schema": "ID STRING KEY, DEC DECIMAL(6,4)"}, + {"name": "OUTPUT", "type": "stream", "schema": "ID STRING KEY, DEC DECIMAL(6,4)"} ] } }, @@ -103,10 +188,10 @@ "CREATE STREAM TEST2 AS SELECT ID, -dec AS negated FROM TEST;" ], "inputs": [ - {"topic": "test", "value": {"DEC": "10.12345"}} + {"topic": "test", "value": {"DEC": 10.12345}} ], "outputs": [ - {"topic": "TEST2", "value": {"NEGATED": "-10.12345"}} + {"topic": "TEST2", "value": {"NEGATED": -10.12345}} ] }, { @@ -116,9 +201,9 @@ "CREATE STREAM TEST2 AS SELECT ID, (a + b) AS RESULT FROM TEST;" ], "inputs": [ - {"topic": "test", "value": {"A": "10.01", "B": "5.10"}}, - {"topic": "test", "value": {"A": "10.01", "B": "-5.00"}}, - {"topic": "test", "value": {"A": "10.01", "B": "0.00"}} + {"topic": "test", "value": {"A": 10.01, "B": 5.10}}, + {"topic": "test", "value": {"A": 10.01, "B": -5.00}}, + {"topic": "test", "value": {"A": 10.01, "B": 0.00}} ], "outputs": [ {"topic": "TEST2", "value": {"RESULT": "15.11"}}, @@ -133,9 +218,9 @@ "CREATE STREAM TEST2 AS SELECT ID, (a + b) AS RESULT FROM TEST;" ], "inputs": [ - {"topic": "test", "value": {"A": "10.01", "B": 5.1}}, - {"topic": "test", "value": {"A": "10.01", "B": -5.0}}, - {"topic": "test", "value": {"A": "10.01", "B": 0.0}} + {"topic": "test", "value": {"A": 10.01, "B": 5.1}}, + {"topic": "test", "value": {"A": 10.01, "B": -5.0}}, + {"topic": "test", "value": {"A": 10.01, "B": 0.0}} ], "outputs": [ {"topic": "TEST2", "value": {"RESULT": 15.11}}, @@ -150,14 +235,14 @@ "CREATE STREAM TEST2 AS SELECT ID, (a + b) AS RESULT FROM TEST;" ], "inputs": [ - {"topic": "test", "value": {"A": "10.01", "B": 5}}, - {"topic": "test", "value": {"A": "10.01", "B": -5}}, - {"topic": "test", "value": {"A": "10.01", "B": 0}} + {"topic": "test", "value": {"A": 10.01, "B": 5}}, + {"topic": "test", "value": {"A": 10.01, "B": -5}}, + {"topic": "test", "value": {"A": 10.01, "B": 0}} ], "outputs": [ - {"topic": "TEST2", "value": {"RESULT": "15.01"}}, - {"topic": "TEST2", "value": {"RESULT": "5.01"}}, - {"topic": "TEST2", "value": {"RESULT": "10.01"}} + {"topic": "TEST2", "value": {"RESULT": 15.01}}, + {"topic": "TEST2", "value": {"RESULT": 5.01}}, + {"topic": "TEST2", "value": {"RESULT": 10.01}} ] }, { @@ -167,10 +252,10 @@ "CREATE STREAM TEST2 AS SELECT ID, (a + a + b) AS RESULT FROM TEST;" ], "inputs": [ - {"topic": "test", "value": {"A": "10.01", "B": "5.10"}} + {"topic": "test", "value": {"A": 10.01, "B": 5.10}} ], "outputs": [ - {"topic": "TEST2", "value": {"RESULT": "25.12"}} + {"topic": "TEST2", "value": {"RESULT": 25.12}} ] }, { @@ -180,14 +265,14 @@ "CREATE STREAM TEST2 AS SELECT ID, (a - b) AS RESULT FROM TEST;" ], "inputs": [ - {"topic": "test", "value": {"A": "10.10", "B": "5.10"}}, - {"topic": "test", "value": {"A": "10.10", "B": "-5.00"}}, - {"topic": "test", "value": {"A": "10.10", "B": "0.00"}} + {"topic": "test", "value": {"A": 10.10, "B": 5.10}}, + {"topic": "test", "value": {"A": 10.10, "B": -5.00}}, + {"topic": "test", "value": {"A": 10.10, "B": 0.00}} ], "outputs": [ - {"topic": "TEST2", "value": {"RESULT": "5.00"}}, - {"topic": "TEST2", "value": {"RESULT": "15.10"}}, - {"topic": "TEST2", "value": {"RESULT": "10.10"}} + {"topic": "TEST2", "value": {"RESULT": 5.00}}, + {"topic": "TEST2", "value": {"RESULT": 15.10}}, + {"topic": "TEST2", "value": {"RESULT": 10.10}} ] }, { @@ -197,14 +282,14 @@ "CREATE STREAM TEST2 AS SELECT ID, (a * b) AS RESULT FROM TEST;" ], "inputs": [ - {"topic": "test", "value": {"A": "10.10", "B": "02.00"}}, - {"topic": "test", "value": {"A": "10.10", "B": "-02.00"}}, - {"topic": "test", "value": {"A": "10.10", "B": "00.00"}} + {"topic": "test", "value": {"A": 10.10, "B": 2.00}}, + {"topic": "test", "value": {"A": 10.10, "B": -2.00}}, + {"topic": "test", "value": {"A": 10.10, "B": 0.00}} ], "outputs": [ - {"topic": "TEST2", "value": {"RESULT": "20.2000"}}, - {"topic": "TEST2", "value": {"RESULT": "-20.2000"}}, - {"topic": "TEST2", "value": {"RESULT": "0.0000"}} + {"topic": "TEST2", "value": {"RESULT": 20.2000}}, + {"topic": "TEST2", "value": {"RESULT": -20.2000}}, + {"topic": "TEST2", "value": {"RESULT": 0.0000}} ] }, { @@ -217,13 +302,13 @@ "The last record causes division by zero, the error is logged and a null value is output" ], "inputs": [ - {"topic": "test", "value": {"A": "10.10", "B": "02.00"}}, - {"topic": "test", "value": {"A": "10.10", "B": "-02.00"}}, - {"topic": "test", "value": {"A": "10.10", "B": "00.00"}} + {"topic": "test", "value": {"A": 10.10, "B": 2.00}}, + {"topic": "test", "value": {"A": 10.10, "B": -2.00}}, + {"topic": "test", "value": {"A": 10.10, "B": 0.00}} ], "outputs": [ - {"topic": "TEST2", "value": {"RESULT": "5.0500000"}}, - {"topic": "TEST2", "value": {"RESULT": "-5.0500000"}}, + {"topic": "TEST2", "value": {"RESULT": 5.0500000}}, + {"topic": "TEST2", "value": {"RESULT": -5.0500000}}, {"topic": "TEST2", "value": {"RESULT": null}} ] }, @@ -237,13 +322,13 @@ "The last record causes modulo by zero, the error is logged and a null value is output" ], "inputs": [ - {"topic": "test", "value": {"A": "10.10", "B": "02.00"}}, - {"topic": "test", "value": {"A": "10.10", "B": "-02.00"}}, - {"topic": "test", "value": {"A": "10.10", "B": "00.00"}} + {"topic": "test", "value": {"A": 10.10, "B": 2.00}}, + {"topic": "test", "value": {"A": 10.10, "B": -2.00}}, + {"topic": "test", "value": {"A": 10.10, "B": 0.00}} ], "outputs": [ - {"topic": "TEST2", "value": {"RESULT": "0.10"}}, - {"topic": "TEST2", "value": {"RESULT": "0.10"}}, + {"topic": "TEST2", "value": {"RESULT": 0.10}}, + {"topic": "TEST2", "value": {"RESULT": 0.10}}, {"topic": "TEST2", "value": {"RESULT": null}} ] }, @@ -254,10 +339,10 @@ "CREATE STREAM TEST2 AS SELECT ID, (a = b) AS RESULT FROM TEST;" ], "inputs": [ - {"topic": "test", "value": {"A": "10.01", "B": "10.01"}}, - {"topic": "test", "value": {"A": "10.01", "B": "12.01"}}, - {"topic": "test", "value": {"A": null, "B": "10.01"}}, - {"topic": "test", "value": {"A": null, "B": null}} + {"topic": "test", "value": {"A": 10.01, "B": 10.01}}, + {"topic": "test", "value": {"A": 10.01, "B": 12.01}}, + {"topic": "test", "value": {"A": null, "B": 10.01}}, + {"topic": "test", "value": {"A": null, "B": null}} ], "outputs": [ {"topic": "TEST2", "value": {"RESULT": true}}, @@ -273,10 +358,10 @@ "CREATE STREAM TEST2 AS SELECT ID, (a <> b) AS RESULT FROM TEST;" ], "inputs": [ - {"topic": "test", "value": {"A": "10.01", "B": "10.01"}}, - {"topic": "test", "value": {"A": "10.01", "B": "12.01"}}, - {"topic": "test", "value": {"A": null, "B": "10.01"}}, - {"topic": "test", "value": {"A": null, "B": null}} + {"topic": "test", "value": {"A": 10.01, "B": 10.01}}, + {"topic": "test", "value": {"A": 10.01, "B": 12.01}}, + {"topic": "test", "value": {"A": null, "B": 10.01}}, + {"topic": "test", "value": {"A": null, "B": null}} ], "outputs": [ {"topic": "TEST2", "value": {"RESULT": false}}, @@ -292,10 +377,10 @@ "CREATE STREAM TEST2 AS SELECT ID, (a IS DISTINCT FROM b) AS RESULT FROM TEST;" ], "inputs": [ - {"topic": "test", "value": {"A": "10.01", "B": "10.01"}}, - {"topic": "test", "value": {"A": "10.01", "B": "12.01"}}, - {"topic": "test", "value": {"A": null, "B": "10.01"}}, - {"topic": "test", "value": {"A": null, "B": null}} + {"topic": "test", "value": {"A": 10.01, "B": 10.01}}, + {"topic": "test", "value": {"A": 10.01, "B": 12.01}}, + {"topic": "test", "value": {"A": null, "B": 10.01}}, + {"topic": "test", "value": {"A": null, "B": null}} ], "outputs": [ {"topic": "TEST2", "value": {"RESULT": false}}, @@ -311,10 +396,10 @@ "CREATE STREAM TEST2 AS SELECT ID, (a < b) AS RESULT FROM TEST;" ], "inputs": [ - {"topic": "test", "value": {"A": "10.01", "B": "10.01"}}, - {"topic": "test", "value": {"A": "10.01", "B": "12.01"}}, - {"topic": "test", "value": {"A": null, "B": "10.01"}}, - {"topic": "test", "value": {"A": null, "B": null}} + {"topic": "test", "value": {"A": 10.01, "B": 10.01}}, + {"topic": "test", "value": {"A": 10.01, "B": 12.01}}, + {"topic": "test", "value": {"A": null, "B": 10.01}}, + {"topic": "test", "value": {"A": null, "B": null}} ], "outputs": [ {"topic": "TEST2", "value": {"RESULT": false}}, @@ -330,10 +415,10 @@ "CREATE STREAM TEST2 AS SELECT ID, (a < b) AS RESULT FROM TEST;" ], "inputs": [ - {"topic": "test", "value": {"A": "10.01", "B": "10.010"}}, - {"topic": "test", "value": {"A": "10.01", "B": "10.012"}}, - {"topic": "test", "value": {"A": null, "B": "10.010"}}, - {"topic": "test", "value": {"A": null, "B": null}} + {"topic": "test", "value": {"A": 10.01, "B": 10.010}}, + {"topic": "test", "value": {"A": 10.01, "B": 10.012}}, + {"topic": "test", "value": {"A": null, "B": 10.010}}, + {"topic": "test", "value": {"A": null, "B": null}} ], "outputs": [ {"topic": "TEST2", "value": {"RESULT": false}}, @@ -349,11 +434,11 @@ "CREATE STREAM TEST2 AS SELECT ID, (a <= b) AS RESULT FROM TEST;" ], "inputs": [ - {"topic": "test", "value": {"A": "10.01", "B": "03.01"}}, - {"topic": "test", "value": {"A": "10.01", "B": "10.01"}}, - {"topic": "test", "value": {"A": "10.01", "B": "12.01"}}, - {"topic": "test", "value": {"A": null, "B": "10.01"}}, - {"topic": "test", "value": {"A": null, "B": null}} + {"topic": "test", "value": {"A": 10.01, "B": 3.01}}, + {"topic": "test", "value": {"A": 10.01, "B": 10.01}}, + {"topic": "test", "value": {"A": 10.01, "B": 12.01}}, + {"topic": "test", "value": {"A": null, "B": 10.01}}, + {"topic": "test", "value": {"A": null, "B": null}} ], "outputs": [ {"topic": "TEST2", "value": {"RESULT": false}}, @@ -370,10 +455,10 @@ "CREATE STREAM TEST2 AS SELECT ID, (a >= b) AS RESULT FROM TEST;" ], "inputs": [ - {"topic": "test", "value": {"A": "10.01", "B": "03.01"}}, - {"topic": "test", "value": {"A": "10.01", "B": "10.01"}}, - {"topic": "test", "value": {"A": "10.01", "B": "12.01"}}, - {"topic": "test", "value": {"A": null, "B": "10.01"}}, + {"topic": "test", "value": {"A": 10.01, "B": 3.01}}, + {"topic": "test", "value": {"A": 10.01, "B": 10.01}}, + {"topic": "test", "value": {"A": 10.01, "B": 12.01}}, + {"topic": "test", "value": {"A": null, "B": 10.01}}, {"topic": "test", "value": {"A": null, "B": null}} ], "outputs": [ @@ -391,10 +476,10 @@ "CREATE STREAM TEST2 AS SELECT ID, (a > b) AS RESULT FROM TEST;" ], "inputs": [ - {"topic": "test", "value": {"A": "10.01", "B": "03.01"}}, - {"topic": "test", "value": {"A": "10.01", "B": "10.01"}}, - {"topic": "test", "value": {"A": "10.01", "B": "12.01"}}, - {"topic": "test", "value": {"A": null, "B": "10.01"}}, + {"topic": "test", "value": {"A": 10.01, "B": 3.01}}, + {"topic": "test", "value": {"A": 10.01, "B": 10.01}}, + {"topic": "test", "value": {"A": 10.01, "B": 12.01}}, + {"topic": "test", "value": {"A": null, "B": 10.01}}, {"topic": "test", "value": {"A": null, "B": null}} ], "outputs": [ @@ -412,10 +497,10 @@ "CREATE STREAM TEST2 AS SELECT ID, (a < b) AS RESULT FROM TEST;" ], "inputs": [ - {"topic": "test", "value": {"A": "10.01", "B": 1}}, - {"topic": "test", "value": {"A": "10.01", "B": 12}}, - {"topic": "test", "value": {"A": null, "B": 12}}, - {"topic": "test", "value": {"A": null, "B": null}} + {"topic": "test", "value": {"A": 10.01, "B": 1}}, + {"topic": "test", "value": {"A": 10.01, "B": 12}}, + {"topic": "test", "value": {"A": null, "B": 12}}, + {"topic": "test", "value": {"A": null, "B": null}} ], "outputs": [ {"topic": "TEST2", "value": {"RESULT": false}},