Skip to content

Commit

Permalink
fix: bug preventing decimals in (-1, 1) from being inserted / queried
Browse files Browse the repository at this point in the history
  • Loading branch information
maheshba committed Feb 5, 2022
1 parent 6b65cd4 commit ad50e80
Show file tree
Hide file tree
Showing 7 changed files with 367 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,24 @@ public static SqlType fromValue(final BigDecimal value) {
? value.setScale(0, BigDecimal.ROUND_UNNECESSARY)
: value;

/* When a BigDecimal has value 0, the built-in method precision() always returns 1. To account
for this edge case, we just take the scale and add one and use that for the precision instead.
*/
if (decimal.compareTo(BigDecimal.ZERO) == 0) {
/* We can't use BigDecimal.precision() directly for all cases, since it defines
* precision differently from SQL Decimal.
* In particular, if the decimal is between -0.1 and 0.1, BigDecimal precision can be
* lower than scale, which is disallowed in SQL Decimal. For example, 0.005 in
* BigDecimal has a precision,scale of 1,3; whereas we expect 4,3.
* If the decimal is in (-1,1) but outside (-0.1,0.1), the code doesn't throw, but
* gives lower precision than expected (e.g., 0.8 has precision 1 instead of 2).
* To account for this edge case, we just take the scale and add one and use that
* for the precision instead. This works since BigDecimal defines scale as the
* number of digits to the right of the period; which is one lower than the precision for
* anything in the range (-1, 1).
* This covers the case where BigDecimal has a value of 0.
* Note: This solution differs from the SQL definition in that it returns (4, 3) for
* both "0.005" and ".005", whereas SQL expects (3, 3) for the latter. This is unavoidable
* if we use BigDecimal as an intermediate representation, since the two strings are parsed
* identically by it to have precision 1.
*/
if (decimal.compareTo(BigDecimal.ONE) < 0 && decimal.compareTo(BigDecimal.ONE.negate()) > 0) {
return SqlTypes.decimal(decimal.scale() + 1, decimal.scale());
}
return SqlTypes.decimal(decimal.precision(), Math.max(decimal.scale(), 0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,12 @@ public void shouldGetSchemaFromDecimal2_2() {
// When:
final SqlType schema = DecimalUtil.fromValue(new BigDecimal(".12"));

// Note: this behavior is different from the SQL specification, where
// we expect precision = 2, scale = 2. This difference is because we use
// BigDecimal in our implementation, which treats precision differently.

// Then:
assertThat(schema, is(SqlTypes.decimal(2, 2)));
assertThat(schema, is(SqlTypes.decimal(3, 2)));
}

@Test
Expand Down Expand Up @@ -386,6 +390,15 @@ public void shouldGetSchemaFromDecimal10_5() {
assertThat(schema, is(SqlTypes.decimal(10, 5)));
}

@Test
public void shouldGetSchemaFromDecimal4_3() {
// When:
final SqlType schema = DecimalUtil.fromValue(new BigDecimal("0.005"));

// Then:
assertThat(schema, is(SqlTypes.decimal(4, 3)));
}

@Test
public void shouldFailIfBuilderWithZeroPrecision() {
// When:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST (ID STRING KEY, DEC DECIMAL(4, 2)) WITH (KAFKA_TOPIC='test', KEY_FORMAT='KAFKA', VALUE_FORMAT='DELIMITED');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST",
"schema" : "`ID` STRING KEY, `DEC` DECIMAL(4, 2)",
"topicName" : "test",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"orReplace" : false,
"isSource" : false
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST2 AS SELECT *\nFROM TEST TEST\nWHERE ((TEST.DEC < 0.08) AND (TEST.DEC > -0.08))\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST2",
"schema" : "`ID` STRING KEY, `DEC` DECIMAL(4, 2)",
"topicName" : "TEST2",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"orReplace" : false,
"isSource" : false
},
"queryPlan" : {
"sources" : [ "TEST" ],
"sink" : "TEST2",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "TEST2"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamFilterV1",
"properties" : {
"queryContext" : "WhereFilter"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"sourceSchema" : "`ID` STRING KEY, `DEC` DECIMAL(4, 2)",
"pseudoColumnVersion" : 1
},
"filterExpression" : "((DEC < 0.08) AND (DEC > -0.08))"
},
"keyColumnNames" : [ "ID" ],
"selectExpressions" : [ "DEC AS DEC" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"topicName" : "TEST2"
},
"queryId" : "CSAS_TEST2_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"metric.reporters" : "",
"ksql.query.status.running.threshold.seconds" : "300",
"ksql.connect.basic.auth.credentials.reload" : "false",
"ksql.output.topic.name.prefix" : "",
"ksql.query.pull.stream.enabled" : "true",
"ksql.query.push.v2.interpreter.enabled" : "true",
"ksql.queryanonymizer.logs_enabled" : "true",
"ksql.variable.substitution.enable" : "true",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.query.pull.metrics.enabled" : "true",
"ksql.metrics.extension" : null,
"ksql.query.push.v2.alos.enabled" : "true",
"ksql.query.push.v2.max.hourly.bandwidth.megabytes" : "2147483647",
"ksql.query.pull.range.scan.enabled" : "true",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.lambdas.enabled" : "true",
"ksql.source.table.materialization.enabled" : "true",
"ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.persistence.wrap.single.values" : null,
"ksql.query.transient.max.bytes.buffering.total" : "-1",
"ksql.connect.basic.auth.credentials.source" : "NONE",
"ksql.schema.registry.url" : "schema_registry.url:0",
"ksql.properties.overrides.denylist" : "",
"ksql.service.id" : "some.ksql.service.id",
"ksql.query.push.v2.max.catchup.consumers" : "5",
"ksql.query.push.v2.enabled" : "false",
"ksql.rowpartition.rowoffset.enabled" : "true",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.query.pull.table.scan.enabled" : "true",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.streams.topology.optimization" : "all",
"ksql.endpoint.migrate.query" : "true",
"ksql.query.push.v2.registry.installed" : "false",
"ksql.streams.num.stream.threads" : "4",
"ksql.metrics.tags.custom" : "",
"ksql.query.push.v2.catchup.consumer.msg.window" : "50",
"ksql.runtime.feature.shared.enabled" : "false",
"ksql.udf.collect.metrics" : "false",
"ksql.connect.request.headers.plugin" : null,
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.headers.columns.enabled" : "true",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.connect.request.timeout.ms" : "5000",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.persistence.default.format.key" : "KAFKA",
"ksql.query.persistent.max.bytes.buffering.total" : "-1",
"ksql.query.error.max.queue.size" : "10",
"ksql.query.cleanup.shutdown.timeout.ms" : "30000",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.queryanonymizer.cluster_namespace" : null,
"ksql.create.or.replace.enabled" : "true",
"ksql.shared.runtimes.count" : "8",
"ksql.cast.strings.preserve.nulls" : "true",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.pull.queries.enable" : "true",
"ksql.query.pull.consistency.token.enabled" : "false",
"ksql.suppress.enabled" : "false",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.connect.basic.auth.credentials.file" : "",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.query.retry.backoff.initial.ms" : "15000",
"ksql.query.pull.max.concurrent.requests" : "2147483647",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.connect.url" : "http://localhost:8083",
"ksql.query.push.v2.new.latest.delay.ms" : "5000",
"ksql.query.push.v2.latest.reset.age.ms" : "30000",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.query.pull.interpreter.enabled" : "true",
"ksql.query.pull.limit.clause.enabled" : "true",
"ksql.connect.error.handler" : null,
"ksql.query.pull.router.thread.pool.size" : "50",
"ksql.query.push.v2.continuation.tokens.enabled" : "false",
"ksql.query.retry.backoff.max.ms" : "900000",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.persistence.default.format.value" : null,
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.nested.error.set.null" : "true",
"ksql.query.pull.thread.pool.size" : "50",
"ksql.persistent.prefix" : "query_",
"ksql.metastore.backup.location" : "",
"ksql.error.classifier.regex" : "",
"ksql.suppress.buffer.size.bytes" : "-1"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
{
"version" : "7.2.0",
"timestamp" : 1644051383155,
"path" : "query-validation-tests/decimal.json",
"schemas" : {
"CSAS_TEST2_0.TEST2" : {
"schema" : "`ID` STRING KEY, `DEC` DECIMAL(4, 2)",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"CSAS_TEST2_0.KsqlTopic.Source" : {
"schema" : "`ID` STRING KEY, `DEC` DECIMAL(4, 2)",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
}
},
"testCase" : {
"name" : "decimal between -1 and 1",
"inputs" : [ {
"topic" : "test",
"key" : null,
"value" : "0.05"
}, {
"topic" : "test",
"key" : null,
"value" : "0.55"
}, {
"topic" : "test",
"key" : null,
"value" : "-0.5"
} ],
"outputs" : [ {
"topic" : "TEST2",
"key" : null,
"value" : "0.05"
} ],
"topics" : [ {
"name" : "TEST2",
"replicas" : 1,
"numPartitions" : 4
}, {
"name" : "test",
"replicas" : 1,
"numPartitions" : 4
} ],
"statements" : [ "CREATE STREAM TEST (ID STRING KEY, dec DECIMAL(4,2)) WITH (kafka_topic='test', value_format='DELIMITED');", "CREATE STREAM TEST2 AS SELECT * FROM TEST WHERE dec < 0.08 AND dec > -0.08;" ],
"post" : {
"sources" : [ {
"name" : "TEST",
"type" : "STREAM",
"schema" : "`ID` STRING KEY, `DEC` DECIMAL(4, 2)",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : "DELIMITED",
"keyFeatures" : [ ],
"valueFeatures" : [ ],
"isSource" : false
}, {
"name" : "TEST2",
"type" : "STREAM",
"schema" : "`ID` STRING KEY, `DEC` DECIMAL(4, 2)",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : "DELIMITED",
"keyFeatures" : [ ],
"valueFeatures" : [ ],
"isSource" : false
} ],
"topics" : {
"topics" : [ {
"name" : "test",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
},
"partitions" : 4
}, {
"name" : "TEST2",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
},
"partitions" : 4
} ]
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [test])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> WhereFilter
<-- KSTREAM-SOURCE-0000000000
Processor: WhereFilter (stores: [])
--> Project
<-- KSTREAM-TRANSFORMVALUES-0000000001
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000004
<-- WhereFilter
Sink: KSTREAM-SINK-0000000004 (topic: TEST2)
<-- Project

Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,21 @@
{"topic": "TEST2", "value": {"RESULT": false}},
{"topic": "TEST2", "value": {"RESULT": false}}
]
},
{
"name": "decimal between -1 and 1",
"statements": [
"CREATE STREAM TEST (ID STRING KEY, dec DECIMAL(4,2)) WITH (kafka_topic='test', value_format='DELIMITED');",
"CREATE STREAM TEST2 AS SELECT * FROM TEST WHERE dec < 0.08 AND dec > -0.08;"
],
"inputs": [
{"topic": "test", "value": "0.05"},
{"topic": "test", "value": "0.55"},
{"topic": "test", "value": "-0.5"}
],
"outputs": [
{"topic": "TEST2", "value": "0.05"}
]
}
]
}
Loading

0 comments on commit ad50e80

Please sign in to comment.