diff --git a/docs/developer-guide/ksqldb-reference/scalar-functions.md b/docs/developer-guide/ksqldb-reference/scalar-functions.md index f326bfd435a1..838520b12bca 100644 --- a/docs/developer-guide/ksqldb-reference/scalar-functions.md +++ b/docs/developer-guide/ksqldb-reference/scalar-functions.md @@ -1318,6 +1318,28 @@ TIMESUB(unit, interval, COL0) Subtracts an interval from a time. Intervals are defined by an integer value and a supported [time unit](../../reference/sql/time.md#Time units). +### DATEADD + +Since: 0.20 + +```sql +DATEADD(unit, interval, COL0) +``` + +Adds an interval to a date. Intervals are defined by an integer value and a supported +[time unit](../../reference/sql/time.md#Time units). + +### DATESUB + +Since: 0.20 + +```sql +DATESUB(unit, interval, COL0) +``` + +Subtracts an interval from a date. Intervals are defined by an integer value and a supported +[time unit](../../reference/sql/time.md#Time units). + ## URLs !!! note diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/DateAdd.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/DateAdd.java new file mode 100644 index 000000000000..af587f3da5a3 --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/DateAdd.java @@ -0,0 +1,47 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.datetime; + +import io.confluent.ksql.function.FunctionCategory; +import io.confluent.ksql.function.udf.Udf; +import io.confluent.ksql.function.udf.UdfDescription; +import io.confluent.ksql.function.udf.UdfParameter; +import io.confluent.ksql.util.KsqlConstants; +import java.sql.Date; +import java.util.concurrent.TimeUnit; + +@UdfDescription( + name = "dateadd", + category = FunctionCategory.DATE_TIME, + author = KsqlConstants.CONFLUENT_AUTHOR, + description = "Adds a duration to a DATE value." +) +public class DateAdd { + + @Udf(description = "Adds a duration to a date") + public Date dateAdd( + @UdfParameter(description = "A unit of time, for example DAY") final TimeUnit unit, + @UdfParameter(description = "An integer number of intervals to add") final Integer interval, + @UdfParameter(description = "A DATE value.") final Date date + ) { + if (unit == null || interval == null || date == null) { + return null; + } + final long epochDayResult = + TimeUnit.MILLISECONDS.toDays(date.getTime() + unit.toMillis(interval)); + return new Date(TimeUnit.DAYS.toMillis(epochDayResult)); + } +} diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/DateSub.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/DateSub.java new file mode 100644 index 000000000000..623184d2f3bb --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/DateSub.java @@ -0,0 +1,48 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.datetime; + +import io.confluent.ksql.function.FunctionCategory; +import io.confluent.ksql.function.udf.Udf; +import io.confluent.ksql.function.udf.UdfDescription; +import io.confluent.ksql.function.udf.UdfParameter; +import io.confluent.ksql.util.KsqlConstants; +import java.sql.Date; +import java.util.concurrent.TimeUnit; + +@UdfDescription( + name = "datesub", + category = FunctionCategory.DATE_TIME, + author = KsqlConstants.CONFLUENT_AUTHOR, + description = "Subtracts a duration from a DATE value." +) +public class DateSub { + + @Udf(description = "Subtracts a duration from a date") + public Date dateSub( + @UdfParameter(description = "A unit of time, for example DAY") final TimeUnit unit, + @UdfParameter( + description = "An integer number of intervals to subtract") final Integer interval, + @UdfParameter(description = "A DATE value.") final Date date + ) { + if (unit == null || interval == null || date == null) { + return null; + } + final long epochDayResult = + TimeUnit.MILLISECONDS.toDays(date.getTime() - unit.toMillis(interval)); + return new Date(TimeUnit.DAYS.toMillis(epochDayResult)); + } +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/DateAddTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/DateAddTest.java new file mode 100644 index 000000000000..f93d9598efc0 --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/DateAddTest.java @@ -0,0 +1,48 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.datetime; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertNull; + +import java.sql.Date; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Test; + +public class DateAddTest { + private DateAdd udf; + + @Before + public void setUp() { + udf = new DateAdd(); + } + + @Test + public void shouldAddToDate() { + assertThat(udf.dateAdd(TimeUnit.DAYS, 9, new Date(86400000)), is(new Date(864000000))); + assertThat(udf.dateAdd(TimeUnit.DAYS, -1, new Date(86400000)), is(new Date(0))); + assertThat(udf.dateAdd(TimeUnit.SECONDS, 5, new Date(86400000)), is(new Date(86400000))); + } + + @Test + public void handleNulls() { + assertNull(udf.dateAdd(TimeUnit.DAYS, -300, null)); + assertNull(udf.dateAdd(null, 54, new Date(864000000))); + assertNull(udf.dateAdd(TimeUnit.DAYS, null, new Date(864000000))); + } +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/DateSubTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/DateSubTest.java new file mode 100644 index 000000000000..d60dc8d3c1dc --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/DateSubTest.java @@ -0,0 +1,48 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.datetime; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertNull; + +import java.sql.Date; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Test; + +public class DateSubTest { + private DateSub udf; + + @Before + public void setUp() { + udf = new DateSub(); + } + + @Test + public void shouldAddToDate() { + assertThat(udf.dateSub(TimeUnit.DAYS, 11, new Date(86400000)), is(new Date(-864000000))); + assertThat(udf.dateSub(TimeUnit.DAYS, -1, new Date(86400000)), is(new Date(172800000))); + assertThat(udf.dateSub(TimeUnit.SECONDS, 5, new Date(86400000)), is(new Date(0))); + } + + @Test + public void handleNulls() { + assertNull(udf.dateSub(TimeUnit.DAYS, -300, null)); + assertNull(udf.dateSub(null, 54, new Date(864000000))); + assertNull(udf.dateSub(TimeUnit.DAYS, null, new Date(864000000))); + } +} diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_adds/7.0.0_1624983633824/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_adds/7.0.0_1624983633824/plan.json new file mode 100644 index 000000000000..6e0252e2d75d --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_adds/7.0.0_1624983633824/plan.json @@ -0,0 +1,153 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID STRING KEY, DATE DATE) WITH (KAFKA_TOPIC='test', KEY_FORMAT='KAFKA', VALUE_FORMAT='DELIMITED');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` STRING KEY, `DATE` DATE", + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST2 AS SELECT\n TEST.ID ID,\n DATEADD(DAYS, 10, TEST.DATE) KSQL_COL_0\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST2", + "schema" : "`ID` STRING KEY, `KSQL_COL_0` DATE", + "topicName" : "TEST2", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "TEST2", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "TEST2" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "sourceSchema" : "`ID` STRING KEY, `DATE` DATE" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "DATEADD(DAYS, 10, DATE) AS KSQL_COL_0" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "topicName" : "TEST2" + }, + "queryId" : "CSAS_TEST2_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_adds/7.0.0_1624983633824/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_adds/7.0.0_1624983633824/spec.json new file mode 100644 index 000000000000..3ab4d8edd139 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_adds/7.0.0_1624983633824/spec.json @@ -0,0 +1,92 @@ +{ + "version" : "7.0.0", + "timestamp" : 1624983633824, + "path" : "query-validation-tests/dateadd.json", + "schemas" : { + "CSAS_TEST2_0.TEST2" : { + "schema" : "`ID` STRING KEY, `KSQL_COL_0` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "CSAS_TEST2_0.KsqlTopic.Source" : { + "schema" : "`ID` STRING KEY, `DATE` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + } + }, + "testCase" : { + "name" : "adds", + "inputs" : [ { + "topic" : "test", + "key" : null, + "value" : "10" + } ], + "outputs" : [ { + "topic" : "TEST2", + "key" : null, + "value" : "20" + } ], + "topics" : [ { + "name" : "TEST2", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (ID STRING KEY, date DATE) WITH (kafka_topic='test', value_format='DELIMITED');", "CREATE STREAM TEST2 AS SELECT id, dateadd(DAYS, 10, date) FROM TEST;" ], + "post" : { + "sources" : [ { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` STRING KEY, `DATE` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST2", + "type" : "STREAM", + "schema" : "`ID` STRING KEY, `KSQL_COL_0` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + }, { + "name" : "TEST2", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_adds/7.0.0_1624983633824/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_adds/7.0.0_1624983633824/topology new file mode 100644 index 000000000000..e080cd675668 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_adds/7.0.0_1624983633824/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: TEST2) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_adds_negative_intervals/7.0.0_1624983633854/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_adds_negative_intervals/7.0.0_1624983633854/plan.json new file mode 100644 index 000000000000..e1bf7a5d5066 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_adds_negative_intervals/7.0.0_1624983633854/plan.json @@ -0,0 +1,153 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID INTEGER KEY, DATE DATE) WITH (KAFKA_TOPIC='test', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` INTEGER KEY, `DATE` DATE", + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST2 AS SELECT\n TEST.ID ID,\n DATEADD(DAYS, -5, TEST.DATE) VALUE\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST2", + "schema" : "`ID` INTEGER KEY, `VALUE` DATE", + "topicName" : "TEST2", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "TEST2", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "TEST2" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "sourceSchema" : "`ID` INTEGER KEY, `DATE` DATE" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "DATEADD(DAYS, -5, DATE) AS VALUE" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "topicName" : "TEST2" + }, + "queryId" : "CSAS_TEST2_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_adds_negative_intervals/7.0.0_1624983633854/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_adds_negative_intervals/7.0.0_1624983633854/spec.json new file mode 100644 index 000000000000..d5ffd1fb2097 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_adds_negative_intervals/7.0.0_1624983633854/spec.json @@ -0,0 +1,166 @@ +{ + "version" : "7.0.0", + "timestamp" : 1624983633854, + "path" : "query-validation-tests/dateadd.json", + "schemas" : { + "CSAS_TEST2_0.TEST2" : { + "schema" : "`ID` INTEGER KEY, `VALUE` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CSAS_TEST2_0.KsqlTopic.Source" : { + "schema" : "`ID` INTEGER KEY, `DATE` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + } + }, + "testCase" : { + "name" : "adds negative intervals", + "inputs" : [ { + "topic" : "test", + "key" : 0, + "value" : { + "date" : 5 + } + }, { + "topic" : "test", + "key" : 0, + "value" : { + "date" : 10 + } + }, { + "topic" : "test", + "key" : 0, + "value" : { + "date" : 2 + } + } ], + "outputs" : [ { + "topic" : "TEST2", + "key" : 0, + "value" : { + "VALUE" : 0 + } + }, { + "topic" : "TEST2", + "key" : 0, + "value" : { + "VALUE" : 5 + } + }, { + "topic" : "TEST2", + "key" : 0, + "value" : { + "VALUE" : -3 + } + } ], + "topics" : [ { + "name" : "TEST2", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test", + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "DATE", + "type" : [ "null", { + "type" : "int", + "connect.version" : 1, + "connect.name" : "org.apache.kafka.connect.data.Date", + "logicalType" : "date" + } ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + }, + "valueFormat" : "AVRO", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (ID INT KEY, date DATE) WITH (kafka_topic='test', value_format='AVRO');", "CREATE STREAM TEST2 AS SELECT id, dateadd(DAYS, -5, date) AS VALUE FROM TEST;" ], + "post" : { + "sources" : [ { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` INTEGER KEY, `DATE` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST2", + "type" : "STREAM", + "schema" : "`ID` INTEGER KEY, `VALUE` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "DATE", + "type" : [ "null", { + "type" : "int", + "connect.version" : 1, + "connect.name" : "org.apache.kafka.connect.data.Date", + "logicalType" : "date" + } ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + } + }, { + "name" : "TEST2", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "VALUE", + "type" : [ "null", { + "type" : "int", + "logicalType" : "date" + } ], + "default" : null + } ] + } + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_adds_negative_intervals/7.0.0_1624983633854/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_adds_negative_intervals/7.0.0_1624983633854/topology new file mode 100644 index 000000000000..e080cd675668 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_adds_negative_intervals/7.0.0_1624983633854/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: TEST2) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_handles_null_values/7.0.0_1624983633877/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_handles_null_values/7.0.0_1624983633877/plan.json new file mode 100644 index 000000000000..0c4025ef83e7 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_handles_null_values/7.0.0_1624983633877/plan.json @@ -0,0 +1,153 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID INTEGER KEY, DATE DATE, NUM INTEGER) WITH (KAFKA_TOPIC='test', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` INTEGER KEY, `DATE` DATE, `NUM` INTEGER", + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST2 AS SELECT\n TEST.ID ID,\n DATEADD(MILLISECONDS, TEST.NUM, TEST.DATE) VALUE\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST2", + "schema" : "`ID` INTEGER KEY, `VALUE` DATE", + "topicName" : "TEST2", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "TEST2", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "TEST2" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "sourceSchema" : "`ID` INTEGER KEY, `DATE` DATE, `NUM` INTEGER" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "DATEADD(MILLISECONDS, NUM, DATE) AS VALUE" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "topicName" : "TEST2" + }, + "queryId" : "CSAS_TEST2_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_handles_null_values/7.0.0_1624983633877/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_handles_null_values/7.0.0_1624983633877/spec.json new file mode 100644 index 000000000000..c03f113d6a0d --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_handles_null_values/7.0.0_1624983633877/spec.json @@ -0,0 +1,177 @@ +{ + "version" : "7.0.0", + "timestamp" : 1624983633877, + "path" : "query-validation-tests/dateadd.json", + "schemas" : { + "CSAS_TEST2_0.TEST2" : { + "schema" : "`ID` INTEGER KEY, `VALUE` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CSAS_TEST2_0.KsqlTopic.Source" : { + "schema" : "`ID` INTEGER KEY, `DATE` DATE, `NUM` INTEGER", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + } + }, + "testCase" : { + "name" : "handles null values", + "inputs" : [ { + "topic" : "test", + "key" : 0, + "value" : { + "date" : null, + "num" : 5 + } + }, { + "topic" : "test", + "key" : 0, + "value" : { + "date" : 5, + "num" : null + } + }, { + "topic" : "test", + "key" : 0, + "value" : { + "date" : null, + "num" : null + } + } ], + "outputs" : [ { + "topic" : "TEST2", + "key" : 0, + "value" : { + "VALUE" : null + } + }, { + "topic" : "TEST2", + "key" : 0, + "value" : { + "VALUE" : null + } + }, { + "topic" : "TEST2", + "key" : 0, + "value" : { + "VALUE" : null + } + } ], + "topics" : [ { + "name" : "TEST2", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test", + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "DATE", + "type" : [ "null", { + "type" : "int", + "connect.version" : 1, + "connect.name" : "org.apache.kafka.connect.data.Date", + "logicalType" : "date" + } ], + "default" : null + }, { + "name" : "NUM", + "type" : [ "null", "int" ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + }, + "valueFormat" : "AVRO", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (ID INT KEY, date DATE, num INTEGER) WITH (kafka_topic='test', value_format='AVRO');", "CREATE STREAM TEST2 AS SELECT id, dateadd(MILLISECONDS, num, date) AS VALUE FROM TEST;" ], + "post" : { + "sources" : [ { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` INTEGER KEY, `DATE` DATE, `NUM` INTEGER", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST2", + "type" : "STREAM", + "schema" : "`ID` INTEGER KEY, `VALUE` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "DATE", + "type" : [ "null", { + "type" : "int", + "connect.version" : 1, + "connect.name" : "org.apache.kafka.connect.data.Date", + "logicalType" : "date" + } ], + "default" : null + }, { + "name" : "NUM", + "type" : [ "null", "int" ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + } + }, { + "name" : "TEST2", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "VALUE", + "type" : [ "null", { + "type" : "int", + "logicalType" : "date" + } ], + "default" : null + } ] + } + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_handles_null_values/7.0.0_1624983633877/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_handles_null_values/7.0.0_1624983633877/topology new file mode 100644 index 000000000000..e080cd675668 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/dateadd_-_handles_null_values/7.0.0_1624983633877/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: TEST2) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_handles_null_values/7.0.0_1624983633967/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_handles_null_values/7.0.0_1624983633967/plan.json new file mode 100644 index 000000000000..518e6361ef32 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_handles_null_values/7.0.0_1624983633967/plan.json @@ -0,0 +1,153 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID INTEGER KEY, DATE DATE, NUM INTEGER) WITH (KAFKA_TOPIC='test', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` INTEGER KEY, `DATE` DATE, `NUM` INTEGER", + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST2 AS SELECT\n TEST.ID ID,\n DATESUB(MILLISECONDS, TEST.NUM, TEST.DATE) VALUE\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST2", + "schema" : "`ID` INTEGER KEY, `VALUE` DATE", + "topicName" : "TEST2", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "TEST2", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "TEST2" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "sourceSchema" : "`ID` INTEGER KEY, `DATE` DATE, `NUM` INTEGER" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "DATESUB(MILLISECONDS, NUM, DATE) AS VALUE" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "topicName" : "TEST2" + }, + "queryId" : "CSAS_TEST2_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_handles_null_values/7.0.0_1624983633967/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_handles_null_values/7.0.0_1624983633967/spec.json new file mode 100644 index 000000000000..4dc3b2c47b9a --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_handles_null_values/7.0.0_1624983633967/spec.json @@ -0,0 +1,177 @@ +{ + "version" : "7.0.0", + "timestamp" : 1624983633967, + "path" : "query-validation-tests/datesub.json", + "schemas" : { + "CSAS_TEST2_0.TEST2" : { + "schema" : "`ID` INTEGER KEY, `VALUE` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CSAS_TEST2_0.KsqlTopic.Source" : { + "schema" : "`ID` INTEGER KEY, `DATE` DATE, `NUM` INTEGER", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + } + }, + "testCase" : { + "name" : "handles null values", + "inputs" : [ { + "topic" : "test", + "key" : 0, + "value" : { + "date" : null, + "num" : 5 + } + }, { + "topic" : "test", + "key" : 0, + "value" : { + "date" : 5, + "num" : null + } + }, { + "topic" : "test", + "key" : 0, + "value" : { + "date" : null, + "num" : null + } + } ], + "outputs" : [ { + "topic" : "TEST2", + "key" : 0, + "value" : { + "VALUE" : null + } + }, { + "topic" : "TEST2", + "key" : 0, + "value" : { + "VALUE" : null + } + }, { + "topic" : "TEST2", + "key" : 0, + "value" : { + "VALUE" : null + } + } ], + "topics" : [ { + "name" : "TEST2", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test", + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "DATE", + "type" : [ "null", { + "type" : "int", + "connect.version" : 1, + "connect.name" : "org.apache.kafka.connect.data.Date", + "logicalType" : "date" + } ], + "default" : null + }, { + "name" : "NUM", + "type" : [ "null", "int" ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + }, + "valueFormat" : "AVRO", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (ID INT KEY, date DATE, num INTEGER) WITH (kafka_topic='test', value_format='AVRO');", "CREATE STREAM TEST2 AS SELECT id, datesub(MILLISECONDS, num, date) AS VALUE FROM TEST;" ], + "post" : { + "sources" : [ { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` INTEGER KEY, `DATE` DATE, `NUM` INTEGER", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST2", + "type" : "STREAM", + "schema" : "`ID` INTEGER KEY, `VALUE` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "DATE", + "type" : [ "null", { + "type" : "int", + "connect.version" : 1, + "connect.name" : "org.apache.kafka.connect.data.Date", + "logicalType" : "date" + } ], + "default" : null + }, { + "name" : "NUM", + "type" : [ "null", "int" ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + } + }, { + "name" : "TEST2", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "VALUE", + "type" : [ "null", { + "type" : "int", + "logicalType" : "date" + } ], + "default" : null + } ] + } + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_handles_null_values/7.0.0_1624983633967/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_handles_null_values/7.0.0_1624983633967/topology new file mode 100644 index 000000000000..e080cd675668 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_handles_null_values/7.0.0_1624983633967/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: TEST2) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_subtracts/7.0.0_1624983633922/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_subtracts/7.0.0_1624983633922/plan.json new file mode 100644 index 000000000000..92bc0544b591 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_subtracts/7.0.0_1624983633922/plan.json @@ -0,0 +1,153 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID STRING KEY, DATE DATE) WITH (KAFKA_TOPIC='test', KEY_FORMAT='KAFKA', VALUE_FORMAT='DELIMITED');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` STRING KEY, `DATE` DATE", + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST2 AS SELECT\n TEST.ID ID,\n DATESUB(DAYS, 10, TEST.DATE) KSQL_COL_0\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST2", + "schema" : "`ID` STRING KEY, `KSQL_COL_0` DATE", + "topicName" : "TEST2", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "TEST2", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "TEST2" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "sourceSchema" : "`ID` STRING KEY, `DATE` DATE" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "DATESUB(DAYS, 10, DATE) AS KSQL_COL_0" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "topicName" : "TEST2" + }, + "queryId" : "CSAS_TEST2_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_subtracts/7.0.0_1624983633922/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_subtracts/7.0.0_1624983633922/spec.json new file mode 100644 index 000000000000..c32ca0f76da8 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_subtracts/7.0.0_1624983633922/spec.json @@ -0,0 +1,100 @@ +{ + "version" : "7.0.0", + "timestamp" : 1624983633922, + "path" : "query-validation-tests/datesub.json", + "schemas" : { + "CSAS_TEST2_0.TEST2" : { + "schema" : "`ID` STRING KEY, `KSQL_COL_0` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "CSAS_TEST2_0.KsqlTopic.Source" : { + "schema" : "`ID` STRING KEY, `DATE` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + } + }, + "testCase" : { + "name" : "subtracts", + "inputs" : [ { + "topic" : "test", + "key" : null, + "value" : "40" + }, { + "topic" : "test", + "key" : null, + "value" : "5" + } ], + "outputs" : [ { + "topic" : "TEST2", + "key" : null, + "value" : "30" + }, { + "topic" : "TEST2", + "key" : null, + "value" : "\"-5\"" + } ], + "topics" : [ { + "name" : "TEST2", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (ID STRING KEY, date DATE) WITH (kafka_topic='test', value_format='DELIMITED');", "CREATE STREAM TEST2 AS SELECT id, datesub(DAYS, 10, date) FROM TEST;" ], + "post" : { + "sources" : [ { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` STRING KEY, `DATE` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST2", + "type" : "STREAM", + "schema" : "`ID` STRING KEY, `KSQL_COL_0` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + }, { + "name" : "TEST2", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_subtracts/7.0.0_1624983633922/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_subtracts/7.0.0_1624983633922/topology new file mode 100644 index 000000000000..e080cd675668 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_subtracts/7.0.0_1624983633922/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: TEST2) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_subtracts_negative_intervals/7.0.0_1624983633943/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_subtracts_negative_intervals/7.0.0_1624983633943/plan.json new file mode 100644 index 000000000000..5fff35871eeb --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_subtracts_negative_intervals/7.0.0_1624983633943/plan.json @@ -0,0 +1,153 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID INTEGER KEY, DATE DATE) WITH (KAFKA_TOPIC='test', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` INTEGER KEY, `DATE` DATE", + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST2 AS SELECT\n TEST.ID ID,\n DATESUB(DAYS, -5, TEST.DATE) VALUE\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST2", + "schema" : "`ID` INTEGER KEY, `VALUE` DATE", + "topicName" : "TEST2", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "TEST2", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "TEST2" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "sourceSchema" : "`ID` INTEGER KEY, `DATE` DATE" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "DATESUB(DAYS, -5, DATE) AS VALUE" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "topicName" : "TEST2" + }, + "queryId" : "CSAS_TEST2_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_subtracts_negative_intervals/7.0.0_1624983633943/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_subtracts_negative_intervals/7.0.0_1624983633943/spec.json new file mode 100644 index 000000000000..6c5f548f2264 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_subtracts_negative_intervals/7.0.0_1624983633943/spec.json @@ -0,0 +1,154 @@ +{ + "version" : "7.0.0", + "timestamp" : 1624983633943, + "path" : "query-validation-tests/datesub.json", + "schemas" : { + "CSAS_TEST2_0.TEST2" : { + "schema" : "`ID` INTEGER KEY, `VALUE` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CSAS_TEST2_0.KsqlTopic.Source" : { + "schema" : "`ID` INTEGER KEY, `DATE` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + } + }, + "testCase" : { + "name" : "subtracts negative intervals", + "inputs" : [ { + "topic" : "test", + "key" : 0, + "value" : { + "date" : 5 + } + }, { + "topic" : "test", + "key" : 0, + "value" : { + "date" : -5 + } + } ], + "outputs" : [ { + "topic" : "TEST2", + "key" : 0, + "value" : { + "VALUE" : 10 + } + }, { + "topic" : "TEST2", + "key" : 0, + "value" : { + "VALUE" : 0 + } + } ], + "topics" : [ { + "name" : "TEST2", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test", + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "DATE", + "type" : [ "null", { + "type" : "int", + "connect.version" : 1, + "connect.name" : "org.apache.kafka.connect.data.Date", + "logicalType" : "date" + } ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + }, + "valueFormat" : "AVRO", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (ID INT KEY, date DATE) WITH (kafka_topic='test', value_format='AVRO');", "CREATE STREAM TEST2 AS SELECT id, datesub(DAYS, -5, date) AS VALUE FROM TEST;" ], + "post" : { + "sources" : [ { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` INTEGER KEY, `DATE` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST2", + "type" : "STREAM", + "schema" : "`ID` INTEGER KEY, `VALUE` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "DATE", + "type" : [ "null", { + "type" : "int", + "connect.version" : 1, + "connect.name" : "org.apache.kafka.connect.data.Date", + "logicalType" : "date" + } ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + } + }, { + "name" : "TEST2", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "VALUE", + "type" : [ "null", { + "type" : "int", + "logicalType" : "date" + } ], + "default" : null + } ] + } + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_subtracts_negative_intervals/7.0.0_1624983633943/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_subtracts_negative_intervals/7.0.0_1624983633943/topology new file mode 100644 index 000000000000..e080cd675668 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/datesub_-_subtracts_negative_intervals/7.0.0_1624983633943/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: TEST2) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/dateadd.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/dateadd.json new file mode 100644 index 000000000000..e769e5a01e83 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/dateadd.json @@ -0,0 +1,63 @@ +{ + "comments": ["tests for dateadd"], + "tests": [ + { + "name": "adds", + "statements": [ + "CREATE STREAM TEST (ID STRING KEY, date DATE) WITH (kafka_topic='test', value_format='DELIMITED');", + "CREATE STREAM TEST2 AS SELECT id, dateadd(DAYS, 10, date) FROM TEST;" + ], + "inputs": [ + {"topic": "test", "value": "10"} + ], + "outputs": [ + {"topic": "TEST2", "value": "20"} + ] + }, + { + "name": "throws on incorrect type", + "statements": [ + "CREATE STREAM TEST (ID STRING KEY, date DATE) WITH (kafka_topic='test', value_format='DELIMITED');", + "CREATE STREAM TEST2 AS SELECT id, dateadd(date, 5, date) FROM TEST;" + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Function 'dateadd' does not accept parameters (DATE, INTEGER, DATE)." + } + }, + { + "name": "adds negative intervals", + "statements": [ + "CREATE STREAM TEST (ID INT KEY, date DATE) WITH (kafka_topic='test', value_format='AVRO');", + "CREATE STREAM TEST2 AS SELECT id, dateadd(DAYS, -5, date) AS VALUE FROM TEST;" + ], + "inputs": [ + {"topic": "test", "key": 0, "value": {"date": 5}}, + {"topic": "test", "key": 0, "value": {"date": 10}}, + {"topic": "test", "key": 0, "value": {"date": 2}} + ], + "outputs": [ + {"topic": "TEST2", "key": 0, "value": {"VALUE": 0}}, + {"topic": "TEST2", "key": 0, "value": {"VALUE": 5}}, + {"topic": "TEST2", "key": 0, "value": {"VALUE": -3}} + ] + }, + { + "name": "handles null values", + "statements": [ + "CREATE STREAM TEST (ID INT KEY, date DATE, num INTEGER) WITH (kafka_topic='test', value_format='AVRO');", + "CREATE STREAM TEST2 AS SELECT id, dateadd(MILLISECONDS, num, date) AS VALUE FROM TEST;" + ], + "inputs": [ + {"topic": "test", "key": 0, "value": {"date": null, "num": 5}}, + {"topic": "test", "key": 0, "value": {"date": 5, "num": null}}, + {"topic": "test", "key": 0, "value": {"date": null, "num": null}} + ], + "outputs": [ + {"topic": "TEST2", "key": 0, "value": {"VALUE": null}}, + {"topic": "TEST2", "key": 0, "value": {"VALUE": null}}, + {"topic": "TEST2", "key": 0, "value": {"VALUE": null}} + ] + } + ] +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/datesub.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/datesub.json new file mode 100644 index 000000000000..5d750831718a --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/datesub.json @@ -0,0 +1,63 @@ +{ + "comments": ["tests for datesub"], + "tests": [ + { + "name": "subtracts", + "statements": [ + "CREATE STREAM TEST (ID STRING KEY, date DATE) WITH (kafka_topic='test', value_format='DELIMITED');", + "CREATE STREAM TEST2 AS SELECT id, datesub(DAYS, 10, date) FROM TEST;" + ], + "inputs": [ + {"topic": "test", "value": "40"}, + {"topic": "test", "value": "5"} + ], + "outputs": [ + {"topic": "TEST2", "value": "30"}, + {"topic": "TEST2", "value": "\"-5\""} + ] + }, + { + "name": "throws on incorrect type", + "statements": [ + "CREATE STREAM TEST (ID STRING KEY, date DATE) WITH (kafka_topic='test', value_format='DELIMITED');", + "CREATE STREAM TEST2 AS SELECT id, datesub(date, 5, date) FROM TEST;" + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Function 'datesub' does not accept parameters (DATE, INTEGER, DATE)." + } + }, + { + "name": "subtracts negative intervals", + "statements": [ + "CREATE STREAM TEST (ID INT KEY, date DATE) WITH (kafka_topic='test', value_format='AVRO');", + "CREATE STREAM TEST2 AS SELECT id, datesub(DAYS, -5, date) AS VALUE FROM TEST;" + ], + "inputs": [ + {"topic": "test", "key": 0, "value": {"date": 5}}, + {"topic": "test", "key": 0, "value": {"date": -5}} + ], + "outputs": [ + {"topic": "TEST2", "key": 0, "value": {"VALUE": 10}}, + {"topic": "TEST2", "key": 0, "value": {"VALUE": 0}} + ] + }, + { + "name": "handles null values", + "statements": [ + "CREATE STREAM TEST (ID INT KEY, date DATE, num INTEGER) WITH (kafka_topic='test', value_format='AVRO');", + "CREATE STREAM TEST2 AS SELECT id, datesub(MILLISECONDS, num, date) AS VALUE FROM TEST;" + ], + "inputs": [ + {"topic": "test", "key": 0, "value": {"date": null, "num": 5}}, + {"topic": "test", "key": 0, "value": {"date": 5, "num": null}}, + {"topic": "test", "key": 0, "value": {"date": null, "num": null}} + ], + "outputs": [ + {"topic": "TEST2", "key": 0, "value": {"VALUE": null}}, + {"topic": "TEST2", "key": 0, "value": {"VALUE": null}}, + {"topic": "TEST2", "key": 0, "value": {"VALUE": null}} + ] + } + ] +} \ No newline at end of file