diff --git a/docs/developer-guide/ksqldb-reference/scalar-functions.md b/docs/developer-guide/ksqldb-reference/scalar-functions.md index 0e08545770f0..9c412e778fb4 100644 --- a/docs/developer-guide/ksqldb-reference/scalar-functions.md +++ b/docs/developer-guide/ksqldb-reference/scalar-functions.md @@ -1271,6 +1271,28 @@ TIMESTAMPSUB(unit, interval, COL0) Subtracts an interval from a timestamp. Intervals are defined by an integer value and a supported [time unit](../../reference/sql/time.md#Time units). +### TIMEADD + +Since: 0.20 + +```sql +TIMEADD(unit, interval, COL0) +``` + +Adds an interval to a time. Intervals are defined by an integer value and a supported +[time unit](../../reference/sql/time.md#Time units). + +### TIMESUB + +Since: 0.20 + +```sql +TIMESUB(unit, interval, COL0) +``` + +Subtracts an interval from a time. Intervals are defined by an integer value and a supported +[time unit](../../reference/sql/time.md#Time units). + ## URLs !!! note diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/TimeAdd.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/TimeAdd.java new file mode 100644 index 000000000000..457029866f95 --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/TimeAdd.java @@ -0,0 +1,50 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.datetime; + +import io.confluent.ksql.function.FunctionCategory; +import io.confluent.ksql.function.udf.Udf; +import io.confluent.ksql.function.udf.UdfDescription; +import io.confluent.ksql.function.udf.UdfParameter; +import io.confluent.ksql.util.KsqlConstants; +import java.sql.Time; +import java.time.LocalTime; +import java.time.temporal.ChronoUnit; +import java.util.concurrent.TimeUnit; + +@UdfDescription( + name = "timeadd", + category = FunctionCategory.DATE_TIME, + author = KsqlConstants.CONFLUENT_AUTHOR, + description = "Adds a duration to a TIME value." +) +public class TimeAdd { + + @Udf(description = "Adds a duration to a time") + public Time timeAdd( + @UdfParameter(description = "A unit of time, for example SECOND or HOUR") final TimeUnit unit, + @UdfParameter(description = "An integer number of intervals to add") final Integer interval, + @UdfParameter(description = "A TIME value.") final Time time + ) { + if (unit == null || interval == null || time == null) { + return null; + } + final long nanoResult = LocalTime.ofNanoOfDay(time.getTime() * 1000_000) + .plus(unit.toNanos(interval), ChronoUnit.NANOS) + .toNanoOfDay(); + return new Time(TimeUnit.NANOSECONDS.toMillis(nanoResult)); + } +} \ No newline at end of file diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/TimeSub.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/TimeSub.java new file mode 100644 index 000000000000..ee332e958c77 --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/TimeSub.java @@ -0,0 +1,51 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.datetime; + +import io.confluent.ksql.function.FunctionCategory; +import io.confluent.ksql.function.udf.Udf; +import io.confluent.ksql.function.udf.UdfDescription; +import io.confluent.ksql.function.udf.UdfParameter; +import io.confluent.ksql.util.KsqlConstants; +import java.sql.Time; +import java.time.LocalTime; +import java.time.temporal.ChronoUnit; +import java.util.concurrent.TimeUnit; + +@UdfDescription( + name = "timesub", + category = FunctionCategory.DATE_TIME, + author = KsqlConstants.CONFLUENT_AUTHOR, + description = "Subtracts a duration from a TIME value." +) +public class TimeSub { + + @Udf(description = "Subtracts a duration from a time") + public Time timeSub( + @UdfParameter(description = "A unit of time, for example SECOND or HOUR") final TimeUnit unit, + @UdfParameter( + description = "An integer number of intervals to subtract") final Integer interval, + @UdfParameter(description = "A TIME value.") final Time time + ) { + if (unit == null || interval == null || time == null) { + return null; + } + final long nanoResult = LocalTime.ofNanoOfDay(time.getTime() * 1000_000) + .minus(unit.toNanos(interval), ChronoUnit.NANOS) + .toNanoOfDay(); + return new Time(TimeUnit.NANOSECONDS.toMillis(nanoResult)); + } +} \ No newline at end of file diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/TimeAddTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/TimeAddTest.java new file mode 100644 index 000000000000..9c4dccd378fe --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/TimeAddTest.java @@ -0,0 +1,49 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.datetime; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertNull; + +import java.sql.Time; +import java.sql.Timestamp; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Test; + +public class TimeAddTest { + private TimeAdd udf; + + @Before + public void setUp() { + udf = new TimeAdd(); + } + + @Test + public void shouldAddToTime() { + // When: + assertThat(udf.timeAdd(TimeUnit.MILLISECONDS, 50, new Time(1000)).getTime(), is(1050L)); + assertThat(udf.timeAdd(TimeUnit.DAYS, 2, new Time(1000)).getTime(), is(1000L)); + assertThat(udf.timeAdd(TimeUnit.DAYS, -2, new Time(1000)).getTime(), is(1000L)); + assertThat(udf.timeAdd(TimeUnit.MINUTES, -1, new Time(60000)).getTime(), is(0L)); + } + + @Test + public void handleNullTime() { + assertNull(udf.timeAdd(TimeUnit.MILLISECONDS, -300, null)); + } +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/TimeSubTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/TimeSubTest.java new file mode 100644 index 000000000000..d4b7c64a0233 --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/TimeSubTest.java @@ -0,0 +1,49 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.datetime; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertNull; + +import java.sql.Time; +import java.sql.Timestamp; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Test; + +public class TimeSubTest { + private TimeSub udf; + + @Before + public void setUp() { + udf = new TimeSub(); + } + + @Test + public void shouldAddToTime() { + // When: + assertThat(udf.timeSub(TimeUnit.MILLISECONDS, 50, new Time(1000)).getTime(), is(950L)); + assertThat(udf.timeSub(TimeUnit.DAYS, 2, new Time(1000)).getTime(), is(1000L)); + assertThat(udf.timeSub(TimeUnit.DAYS, -2, new Time(1000)).getTime(), is(1000L)); + assertThat(udf.timeSub(TimeUnit.MINUTES, -1, new Time(60000)).getTime(), is(120000L)); + } + + @Test + public void handleNullTime() { + assertNull(udf.timeSub(TimeUnit.MILLISECONDS, -300, null)); + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_adds/7.0.0_1624587222167/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_adds/7.0.0_1624587222167/plan.json new file mode 100644 index 000000000000..cac1c503c759 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_adds/7.0.0_1624587222167/plan.json @@ -0,0 +1,153 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID STRING KEY, TIME TIME) WITH (KAFKA_TOPIC='test', KEY_FORMAT='KAFKA', VALUE_FORMAT='DELIMITED');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` STRING KEY, `TIME` TIME", + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST2 AS SELECT\n TEST.ID ID,\n TIMEADD(MILLISECONDS, 10, TEST.TIME) KSQL_COL_0\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST2", + "schema" : "`ID` STRING KEY, `KSQL_COL_0` TIME", + "topicName" : "TEST2", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "TEST2", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "TEST2" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "sourceSchema" : "`ID` STRING KEY, `TIME` TIME" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "TIMEADD(MILLISECONDS, 10, TIME) AS KSQL_COL_0" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "topicName" : "TEST2" + }, + "queryId" : "CSAS_TEST2_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_adds/7.0.0_1624587222167/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_adds/7.0.0_1624587222167/spec.json new file mode 100644 index 000000000000..d399b8f46354 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_adds/7.0.0_1624587222167/spec.json @@ -0,0 +1,100 @@ +{ + "version" : "7.0.0", + "timestamp" : 1624587222167, + "path" : "query-validation-tests/timeadd.json", + "schemas" : { + "CSAS_TEST2_0.TEST2" : { + "schema" : "`ID` STRING KEY, `KSQL_COL_0` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "CSAS_TEST2_0.KsqlTopic.Source" : { + "schema" : "`ID` STRING KEY, `TIME` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + } + }, + "testCase" : { + "name" : "adds", + "inputs" : [ { + "topic" : "test", + "key" : null, + "value" : "10" + }, { + "topic" : "test", + "key" : null, + "value" : "86399995" + } ], + "outputs" : [ { + "topic" : "TEST2", + "key" : null, + "value" : "20" + }, { + "topic" : "TEST2", + "key" : null, + "value" : "5" + } ], + "topics" : [ { + "name" : "TEST2", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (ID STRING KEY, time TIME) WITH (kafka_topic='test', value_format='DELIMITED');", "CREATE STREAM TEST2 AS SELECT id, timeadd(MILLISECONDS, 10, time) FROM TEST;" ], + "post" : { + "sources" : [ { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` STRING KEY, `TIME` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST2", + "type" : "STREAM", + "schema" : "`ID` STRING KEY, `KSQL_COL_0` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + }, { + "name" : "TEST2", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_adds/7.0.0_1624587222167/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_adds/7.0.0_1624587222167/topology new file mode 100644 index 000000000000..e080cd675668 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_adds/7.0.0_1624587222167/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: TEST2) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_adds_negative_intervals/7.0.0_1624587222218/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_adds_negative_intervals/7.0.0_1624587222218/plan.json new file mode 100644 index 000000000000..fa20fe95b0f8 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_adds_negative_intervals/7.0.0_1624587222218/plan.json @@ -0,0 +1,153 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID INTEGER KEY, TIME TIME) WITH (KAFKA_TOPIC='test', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` INTEGER KEY, `TIME` TIME", + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST2 AS SELECT\n TEST.ID ID,\n TIMEADD(MILLISECONDS, -5, TEST.TIME) VALUE\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST2", + "schema" : "`ID` INTEGER KEY, `VALUE` TIME", + "topicName" : "TEST2", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "TEST2", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "TEST2" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "sourceSchema" : "`ID` INTEGER KEY, `TIME` TIME" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "TIMEADD(MILLISECONDS, -5, TIME) AS VALUE" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "topicName" : "TEST2" + }, + "queryId" : "CSAS_TEST2_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_adds_negative_intervals/7.0.0_1624587222218/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_adds_negative_intervals/7.0.0_1624587222218/spec.json new file mode 100644 index 000000000000..c1ca37283403 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_adds_negative_intervals/7.0.0_1624587222218/spec.json @@ -0,0 +1,166 @@ +{ + "version" : "7.0.0", + "timestamp" : 1624587222218, + "path" : "query-validation-tests/timeadd.json", + "schemas" : { + "CSAS_TEST2_0.TEST2" : { + "schema" : "`ID` INTEGER KEY, `VALUE` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CSAS_TEST2_0.KsqlTopic.Source" : { + "schema" : "`ID` INTEGER KEY, `TIME` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + } + }, + "testCase" : { + "name" : "adds negative intervals", + "inputs" : [ { + "topic" : "test", + "key" : 0, + "value" : { + "time" : 5 + } + }, { + "topic" : "test", + "key" : 0, + "value" : { + "time" : 10 + } + }, { + "topic" : "test", + "key" : 0, + "value" : { + "time" : 2 + } + } ], + "outputs" : [ { + "topic" : "TEST2", + "key" : 0, + "value" : { + "VALUE" : 0 + } + }, { + "topic" : "TEST2", + "key" : 0, + "value" : { + "VALUE" : 5 + } + }, { + "topic" : "TEST2", + "key" : 0, + "value" : { + "VALUE" : 86399997 + } + } ], + "topics" : [ { + "name" : "TEST2", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test", + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "TIME", + "type" : [ "null", { + "type" : "int", + "connect.version" : 1, + "connect.name" : "org.apache.kafka.connect.data.Time", + "logicalType" : "time-millis" + } ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + }, + "valueFormat" : "AVRO", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (ID INT KEY, time TIME) WITH (kafka_topic='test', value_format='AVRO');", "CREATE STREAM TEST2 AS SELECT id, timeadd(MILLISECONDS, -5, time) AS VALUE FROM TEST;" ], + "post" : { + "sources" : [ { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` INTEGER KEY, `TIME` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST2", + "type" : "STREAM", + "schema" : "`ID` INTEGER KEY, `VALUE` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "TIME", + "type" : [ "null", { + "type" : "int", + "connect.version" : 1, + "connect.name" : "org.apache.kafka.connect.data.Time", + "logicalType" : "time-millis" + } ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + } + }, { + "name" : "TEST2", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "VALUE", + "type" : [ "null", { + "type" : "int", + "logicalType" : "time-millis" + } ], + "default" : null + } ] + } + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_adds_negative_intervals/7.0.0_1624587222218/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_adds_negative_intervals/7.0.0_1624587222218/topology new file mode 100644 index 000000000000..e080cd675668 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_adds_negative_intervals/7.0.0_1624587222218/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: TEST2) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_handles_null_values/7.0.0_1624587222266/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_handles_null_values/7.0.0_1624587222266/plan.json new file mode 100644 index 000000000000..3c805c7918ce --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_handles_null_values/7.0.0_1624587222266/plan.json @@ -0,0 +1,153 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID INTEGER KEY, TIME TIME, NUM INTEGER) WITH (KAFKA_TOPIC='test', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` INTEGER KEY, `TIME` TIME, `NUM` INTEGER", + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST2 AS SELECT\n TEST.ID ID,\n TIMEADD(MILLISECONDS, TEST.NUM, TEST.TIME) VALUE\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST2", + "schema" : "`ID` INTEGER KEY, `VALUE` TIME", + "topicName" : "TEST2", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "TEST2", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "TEST2" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "sourceSchema" : "`ID` INTEGER KEY, `TIME` TIME, `NUM` INTEGER" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "TIMEADD(MILLISECONDS, NUM, TIME) AS VALUE" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "topicName" : "TEST2" + }, + "queryId" : "CSAS_TEST2_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_handles_null_values/7.0.0_1624587222266/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_handles_null_values/7.0.0_1624587222266/spec.json new file mode 100644 index 000000000000..dd4821d37348 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_handles_null_values/7.0.0_1624587222266/spec.json @@ -0,0 +1,177 @@ +{ + "version" : "7.0.0", + "timestamp" : 1624587222266, + "path" : "query-validation-tests/timeadd.json", + "schemas" : { + "CSAS_TEST2_0.TEST2" : { + "schema" : "`ID` INTEGER KEY, `VALUE` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CSAS_TEST2_0.KsqlTopic.Source" : { + "schema" : "`ID` INTEGER KEY, `TIME` TIME, `NUM` INTEGER", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + } + }, + "testCase" : { + "name" : "handles null values", + "inputs" : [ { + "topic" : "test", + "key" : 0, + "value" : { + "time" : null, + "num" : 5 + } + }, { + "topic" : "test", + "key" : 0, + "value" : { + "time" : 5, + "num" : null + } + }, { + "topic" : "test", + "key" : 0, + "value" : { + "time" : null, + "num" : null + } + } ], + "outputs" : [ { + "topic" : "TEST2", + "key" : 0, + "value" : { + "VALUE" : null + } + }, { + "topic" : "TEST2", + "key" : 0, + "value" : { + "VALUE" : null + } + }, { + "topic" : "TEST2", + "key" : 0, + "value" : { + "VALUE" : null + } + } ], + "topics" : [ { + "name" : "TEST2", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test", + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "TIME", + "type" : [ "null", { + "type" : "int", + "connect.version" : 1, + "connect.name" : "org.apache.kafka.connect.data.Time", + "logicalType" : "time-millis" + } ], + "default" : null + }, { + "name" : "NUM", + "type" : [ "null", "int" ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + }, + "valueFormat" : "AVRO", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (ID INT KEY, time TIME, num INTEGER) WITH (kafka_topic='test', value_format='AVRO');", "CREATE STREAM TEST2 AS SELECT id, timeadd(MILLISECONDS, num, time) AS VALUE FROM TEST;" ], + "post" : { + "sources" : [ { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` INTEGER KEY, `TIME` TIME, `NUM` INTEGER", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST2", + "type" : "STREAM", + "schema" : "`ID` INTEGER KEY, `VALUE` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "TIME", + "type" : [ "null", { + "type" : "int", + "connect.version" : 1, + "connect.name" : "org.apache.kafka.connect.data.Time", + "logicalType" : "time-millis" + } ], + "default" : null + }, { + "name" : "NUM", + "type" : [ "null", "int" ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + } + }, { + "name" : "TEST2", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "VALUE", + "type" : [ "null", { + "type" : "int", + "logicalType" : "time-millis" + } ], + "default" : null + } ] + } + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_handles_null_values/7.0.0_1624587222266/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_handles_null_values/7.0.0_1624587222266/topology new file mode 100644 index 000000000000..e080cd675668 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/timeadd_-_handles_null_values/7.0.0_1624587222266/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: TEST2) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_handles_null_values/7.0.0_1624587224628/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_handles_null_values/7.0.0_1624587224628/plan.json new file mode 100644 index 000000000000..8218f5a214c1 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_handles_null_values/7.0.0_1624587224628/plan.json @@ -0,0 +1,153 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID INTEGER KEY, TIME TIME, NUM INTEGER) WITH (KAFKA_TOPIC='test', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` INTEGER KEY, `TIME` TIME, `NUM` INTEGER", + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST2 AS SELECT\n TEST.ID ID,\n TIMESUB(MILLISECONDS, TEST.NUM, TEST.TIME) VALUE\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST2", + "schema" : "`ID` INTEGER KEY, `VALUE` TIME", + "topicName" : "TEST2", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "TEST2", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "TEST2" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "sourceSchema" : "`ID` INTEGER KEY, `TIME` TIME, `NUM` INTEGER" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "TIMESUB(MILLISECONDS, NUM, TIME) AS VALUE" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "topicName" : "TEST2" + }, + "queryId" : "CSAS_TEST2_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_handles_null_values/7.0.0_1624587224628/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_handles_null_values/7.0.0_1624587224628/spec.json new file mode 100644 index 000000000000..4ae32e5b6544 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_handles_null_values/7.0.0_1624587224628/spec.json @@ -0,0 +1,177 @@ +{ + "version" : "7.0.0", + "timestamp" : 1624587224628, + "path" : "query-validation-tests/timesub.json", + "schemas" : { + "CSAS_TEST2_0.TEST2" : { + "schema" : "`ID` INTEGER KEY, `VALUE` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CSAS_TEST2_0.KsqlTopic.Source" : { + "schema" : "`ID` INTEGER KEY, `TIME` TIME, `NUM` INTEGER", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + } + }, + "testCase" : { + "name" : "handles null values", + "inputs" : [ { + "topic" : "test", + "key" : 0, + "value" : { + "time" : null, + "num" : 5 + } + }, { + "topic" : "test", + "key" : 0, + "value" : { + "time" : 5, + "num" : null + } + }, { + "topic" : "test", + "key" : 0, + "value" : { + "time" : null, + "num" : null + } + } ], + "outputs" : [ { + "topic" : "TEST2", + "key" : 0, + "value" : { + "VALUE" : null + } + }, { + "topic" : "TEST2", + "key" : 0, + "value" : { + "VALUE" : null + } + }, { + "topic" : "TEST2", + "key" : 0, + "value" : { + "VALUE" : null + } + } ], + "topics" : [ { + "name" : "TEST2", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test", + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "TIME", + "type" : [ "null", { + "type" : "int", + "connect.version" : 1, + "connect.name" : "org.apache.kafka.connect.data.Time", + "logicalType" : "time-millis" + } ], + "default" : null + }, { + "name" : "NUM", + "type" : [ "null", "int" ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + }, + "valueFormat" : "AVRO", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (ID INT KEY, time TIME, num INTEGER) WITH (kafka_topic='test', value_format='AVRO');", "CREATE STREAM TEST2 AS SELECT id, timesub(MILLISECONDS, num, time) AS VALUE FROM TEST;" ], + "post" : { + "sources" : [ { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` INTEGER KEY, `TIME` TIME, `NUM` INTEGER", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST2", + "type" : "STREAM", + "schema" : "`ID` INTEGER KEY, `VALUE` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "TIME", + "type" : [ "null", { + "type" : "int", + "connect.version" : 1, + "connect.name" : "org.apache.kafka.connect.data.Time", + "logicalType" : "time-millis" + } ], + "default" : null + }, { + "name" : "NUM", + "type" : [ "null", "int" ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + } + }, { + "name" : "TEST2", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "VALUE", + "type" : [ "null", { + "type" : "int", + "logicalType" : "time-millis" + } ], + "default" : null + } ] + } + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_handles_null_values/7.0.0_1624587224628/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_handles_null_values/7.0.0_1624587224628/topology new file mode 100644 index 000000000000..e080cd675668 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_handles_null_values/7.0.0_1624587224628/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: TEST2) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_subtracts/7.0.0_1624587224517/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_subtracts/7.0.0_1624587224517/plan.json new file mode 100644 index 000000000000..21ad6338c2f7 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_subtracts/7.0.0_1624587224517/plan.json @@ -0,0 +1,153 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID STRING KEY, TIME TIME) WITH (KAFKA_TOPIC='test', KEY_FORMAT='KAFKA', VALUE_FORMAT='DELIMITED');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` STRING KEY, `TIME` TIME", + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST2 AS SELECT\n TEST.ID ID,\n TIMESUB(MILLISECONDS, 10, TEST.TIME) KSQL_COL_0\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST2", + "schema" : "`ID` STRING KEY, `KSQL_COL_0` TIME", + "topicName" : "TEST2", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "TEST2", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "TEST2" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "sourceSchema" : "`ID` STRING KEY, `TIME` TIME" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "TIMESUB(MILLISECONDS, 10, TIME) AS KSQL_COL_0" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "topicName" : "TEST2" + }, + "queryId" : "CSAS_TEST2_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_subtracts/7.0.0_1624587224517/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_subtracts/7.0.0_1624587224517/spec.json new file mode 100644 index 000000000000..bacb9ae1e7d4 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_subtracts/7.0.0_1624587224517/spec.json @@ -0,0 +1,100 @@ +{ + "version" : "7.0.0", + "timestamp" : 1624587224517, + "path" : "query-validation-tests/timesub.json", + "schemas" : { + "CSAS_TEST2_0.TEST2" : { + "schema" : "`ID` STRING KEY, `KSQL_COL_0` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "CSAS_TEST2_0.KsqlTopic.Source" : { + "schema" : "`ID` STRING KEY, `TIME` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + } + }, + "testCase" : { + "name" : "subtracts", + "inputs" : [ { + "topic" : "test", + "key" : null, + "value" : "20" + }, { + "topic" : "test", + "key" : null, + "value" : "5" + } ], + "outputs" : [ { + "topic" : "TEST2", + "key" : null, + "value" : "10" + }, { + "topic" : "TEST2", + "key" : null, + "value" : "86399995" + } ], + "topics" : [ { + "name" : "TEST2", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (ID STRING KEY, time TIME) WITH (kafka_topic='test', value_format='DELIMITED');", "CREATE STREAM TEST2 AS SELECT id, timesub(MILLISECONDS, 10, time) FROM TEST;" ], + "post" : { + "sources" : [ { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` STRING KEY, `TIME` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST2", + "type" : "STREAM", + "schema" : "`ID` STRING KEY, `KSQL_COL_0` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + }, { + "name" : "TEST2", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_subtracts/7.0.0_1624587224517/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_subtracts/7.0.0_1624587224517/topology new file mode 100644 index 000000000000..e080cd675668 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_subtracts/7.0.0_1624587224517/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: TEST2) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_subtracts_negative_intervals/7.0.0_1624587224581/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_subtracts_negative_intervals/7.0.0_1624587224581/plan.json new file mode 100644 index 000000000000..407c4dec2dd1 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_subtracts_negative_intervals/7.0.0_1624587224581/plan.json @@ -0,0 +1,153 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID INTEGER KEY, TIME TIME) WITH (KAFKA_TOPIC='test', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` INTEGER KEY, `TIME` TIME", + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST2 AS SELECT\n TEST.ID ID,\n TIMESUB(MILLISECONDS, -5, TEST.TIME) VALUE\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST2", + "schema" : "`ID` INTEGER KEY, `VALUE` TIME", + "topicName" : "TEST2", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "TEST2", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "TEST2" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "sourceSchema" : "`ID` INTEGER KEY, `TIME` TIME" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "TIMESUB(MILLISECONDS, -5, TIME) AS VALUE" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "topicName" : "TEST2" + }, + "queryId" : "CSAS_TEST2_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_subtracts_negative_intervals/7.0.0_1624587224581/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_subtracts_negative_intervals/7.0.0_1624587224581/spec.json new file mode 100644 index 000000000000..00a0d63b4f8c --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_subtracts_negative_intervals/7.0.0_1624587224581/spec.json @@ -0,0 +1,166 @@ +{ + "version" : "7.0.0", + "timestamp" : 1624587224581, + "path" : "query-validation-tests/timesub.json", + "schemas" : { + "CSAS_TEST2_0.TEST2" : { + "schema" : "`ID` INTEGER KEY, `VALUE` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CSAS_TEST2_0.KsqlTopic.Source" : { + "schema" : "`ID` INTEGER KEY, `TIME` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + } + }, + "testCase" : { + "name" : "subtracts negative intervals", + "inputs" : [ { + "topic" : "test", + "key" : 0, + "value" : { + "time" : 5 + } + }, { + "topic" : "test", + "key" : 0, + "value" : { + "time" : 0 + } + }, { + "topic" : "test", + "key" : 0, + "value" : { + "time" : 86399997 + } + } ], + "outputs" : [ { + "topic" : "TEST2", + "key" : 0, + "value" : { + "VALUE" : 10 + } + }, { + "topic" : "TEST2", + "key" : 0, + "value" : { + "VALUE" : 5 + } + }, { + "topic" : "TEST2", + "key" : 0, + "value" : { + "VALUE" : 2 + } + } ], + "topics" : [ { + "name" : "TEST2", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test", + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "TIME", + "type" : [ "null", { + "type" : "int", + "connect.version" : 1, + "connect.name" : "org.apache.kafka.connect.data.Time", + "logicalType" : "time-millis" + } ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + }, + "valueFormat" : "AVRO", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (ID INT KEY, time TIME) WITH (kafka_topic='test', value_format='AVRO');", "CREATE STREAM TEST2 AS SELECT id, timesub(MILLISECONDS, -5, time) AS VALUE FROM TEST;" ], + "post" : { + "sources" : [ { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` INTEGER KEY, `TIME` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST2", + "type" : "STREAM", + "schema" : "`ID` INTEGER KEY, `VALUE` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "TIME", + "type" : [ "null", { + "type" : "int", + "connect.version" : 1, + "connect.name" : "org.apache.kafka.connect.data.Time", + "logicalType" : "time-millis" + } ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + } + }, { + "name" : "TEST2", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "VALUE", + "type" : [ "null", { + "type" : "int", + "logicalType" : "time-millis" + } ], + "default" : null + } ] + } + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_subtracts_negative_intervals/7.0.0_1624587224581/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_subtracts_negative_intervals/7.0.0_1624587224581/topology new file mode 100644 index 000000000000..e080cd675668 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/timesub_-_subtracts_negative_intervals/7.0.0_1624587224581/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: TEST2) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/timeadd.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/timeadd.json new file mode 100644 index 000000000000..afd559657d7a --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/timeadd.json @@ -0,0 +1,65 @@ +{ + "comments": ["tests for timeadd"], + "tests": [ + { + "name": "adds", + "statements": [ + "CREATE STREAM TEST (ID STRING KEY, time TIME) WITH (kafka_topic='test', value_format='DELIMITED');", + "CREATE STREAM TEST2 AS SELECT id, timeadd(MILLISECONDS, 10, time) FROM TEST;" + ], + "inputs": [ + {"topic": "test", "value": "10"}, + {"topic": "test", "value": "86399995"} + ], + "outputs": [ + {"topic": "TEST2", "value": "20"}, + {"topic": "TEST2", "value": "5"} + ] + }, + { + "name": "throws on incorrect type", + "statements": [ + "CREATE STREAM TEST (ID STRING KEY, time TIME) WITH (kafka_topic='test', value_format='DELIMITED');", + "CREATE STREAM TEST2 AS SELECT id, timeadd(time, 5, time) FROM TEST;" + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Function 'timeadd' does not accept parameters (TIME, INTEGER, TIME)." + } + }, + { + "name": "adds negative intervals", + "statements": [ + "CREATE STREAM TEST (ID INT KEY, time TIME) WITH (kafka_topic='test', value_format='AVRO');", + "CREATE STREAM TEST2 AS SELECT id, timeadd(MILLISECONDS, -5, time) AS VALUE FROM TEST;" + ], + "inputs": [ + {"topic": "test", "key": 0, "value": {"time": 5}}, + {"topic": "test", "key": 0, "value": {"time": 10}}, + {"topic": "test", "key": 0, "value": {"time": 2}} + ], + "outputs": [ + {"topic": "TEST2", "key": 0, "value": {"VALUE": 0}}, + {"topic": "TEST2", "key": 0, "value": {"VALUE": 5}}, + {"topic": "TEST2", "key": 0, "value": {"VALUE": 86399997}} + ] + }, + { + "name": "handles null values", + "statements": [ + "CREATE STREAM TEST (ID INT KEY, time TIME, num INTEGER) WITH (kafka_topic='test', value_format='AVRO');", + "CREATE STREAM TEST2 AS SELECT id, timeadd(MILLISECONDS, num, time) AS VALUE FROM TEST;" + ], + "inputs": [ + {"topic": "test", "key": 0, "value": {"time": null, "num": 5}}, + {"topic": "test", "key": 0, "value": {"time": 5, "num": null}}, + {"topic": "test", "key": 0, "value": {"time": null, "num": null}} + ], + "outputs": [ + {"topic": "TEST2", "key": 0, "value": {"VALUE": null}}, + {"topic": "TEST2", "key": 0, "value": {"VALUE": null}}, + {"topic": "TEST2", "key": 0, "value": {"VALUE": null}} + ] + } + ] +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/timesub.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/timesub.json new file mode 100644 index 000000000000..cd2ee6b4f2b8 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/timesub.json @@ -0,0 +1,65 @@ +{ + "comments": ["tests for timesub"], + "tests": [ + { + "name": "subtracts", + "statements": [ + "CREATE STREAM TEST (ID STRING KEY, time TIME) WITH (kafka_topic='test', value_format='DELIMITED');", + "CREATE STREAM TEST2 AS SELECT id, timesub(MILLISECONDS, 10, time) FROM TEST;" + ], + "inputs": [ + {"topic": "test", "value": "20"}, + {"topic": "test", "value": "5"} + ], + "outputs": [ + {"topic": "TEST2", "value": "10"}, + {"topic": "TEST2", "value": "86399995"} + ] + }, + { + "name": "throws on incorrect type", + "statements": [ + "CREATE STREAM TEST (ID STRING KEY, time TIME) WITH (kafka_topic='test', value_format='DELIMITED');", + "CREATE STREAM TEST2 AS SELECT id, timesub(time, 5, time) FROM TEST;" + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Function 'timesub' does not accept parameters (TIME, INTEGER, TIME)." + } + }, + { + "name": "subtracts negative intervals", + "statements": [ + "CREATE STREAM TEST (ID INT KEY, time TIME) WITH (kafka_topic='test', value_format='AVRO');", + "CREATE STREAM TEST2 AS SELECT id, timesub(MILLISECONDS, -5, time) AS VALUE FROM TEST;" + ], + "inputs": [ + {"topic": "test", "key": 0, "value": {"time": 5}}, + {"topic": "test", "key": 0, "value": {"time": 0}}, + {"topic": "test", "key": 0, "value": {"time": 86399997}} + ], + "outputs": [ + {"topic": "TEST2", "key": 0, "value": {"VALUE": 10}}, + {"topic": "TEST2", "key": 0, "value": {"VALUE": 5}}, + {"topic": "TEST2", "key": 0, "value": {"VALUE": 2}} + ] + }, + { + "name": "handles null values", + "statements": [ + "CREATE STREAM TEST (ID INT KEY, time TIME, num INTEGER) WITH (kafka_topic='test', value_format='AVRO');", + "CREATE STREAM TEST2 AS SELECT id, timesub(MILLISECONDS, num, time) AS VALUE FROM TEST;" + ], + "inputs": [ + {"topic": "test", "key": 0, "value": {"time": null, "num": 5}}, + {"topic": "test", "key": 0, "value": {"time": 5, "num": null}}, + {"topic": "test", "key": 0, "value": {"time": null, "num": null}} + ], + "outputs": [ + {"topic": "TEST2", "key": 0, "value": {"VALUE": null}}, + {"topic": "TEST2", "key": 0, "value": {"VALUE": null}}, + {"topic": "TEST2", "key": 0, "value": {"VALUE": null}} + ] + } + ] +} \ No newline at end of file