diff --git a/design-proposals/klip-46-date-and-time-data-type-support.md b/design-proposals/klip-46-date-and-time-data-type-support.md index 4d1863215b68..2663f4f1cb32 100644 --- a/design-proposals/klip-46-date-and-time-data-type-support.md +++ b/design-proposals/klip-46-date-and-time-data-type-support.md @@ -127,8 +127,7 @@ The following functions will be added/updated: * `PARSE_DATE(format, date_string)` - converts a date string in the specified format to a DATE * `PARSE_TIME(format, time_string)` - converts a time string in the specified format to a TIME * `UNIX_DATE(date)` - returns an INTEGER number of days that have passed between Unix epoch and the specified date -* `UNIX_TIME(time)` - returns an INTEGER number of milliseconds that have passed 00:00:00.000 and the specified time -* `CONVERT_TZ(time, from_tz, to_tz)` - converts a time from one timezone to another +* `FROM_DAYS(int)` - convert epoch days to a DATE value * `DATEADD(time unit, integer, date)` - Adds an interval to the date. The time unit must be `DAYS` or `YEARS`. If it is not, then the function will throw an error * `DATESUB(time unit, integer, date)` - Subtracts an interval from the date. The time unit must be `DAYS` or `YEARS`. If it is not, diff --git a/docs/developer-guide/ksqldb-reference/scalar-functions.md b/docs/developer-guide/ksqldb-reference/scalar-functions.md index 838520b12bca..858c67c60a0a 100644 --- a/docs/developer-guide/ksqldb-reference/scalar-functions.md +++ b/docs/developer-guide/ksqldb-reference/scalar-functions.md @@ -1082,11 +1082,14 @@ complex type are not inspected. Since: 0.6.0 ```sql -UNIX_DATE() +UNIX_DATE([date]) ``` - -Gets an integer representing days since epoch. The returned timestamp -may differ depending on the local time of different ksqlDB Server instances. + +If `UNIX_DATE` is called with the date parameter, the function returns the DATE +value as an INTEGER value representing the number of days since `1970-01-01`. + +If the `date` parameter is not provided, it returns an integer representing days since `1970-01-01`. +The returned integer may differ depending on the local time of different ksqlDB Server instances. ### `UNIX_TIMESTAMP` @@ -1274,6 +1277,14 @@ FROM_UNIXTIME(milliseconds) Converts a BIGINT millisecond timestamp value into a TIMESTAMP value. +### `FROM_DAYS` + +```sql +FROM_DAYS(days) +``` + +Converts an INT number of days since epoch to a DATE value. + ### TIMESTAMPADD Since: 0.17 diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/FromDays.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/FromDays.java new file mode 100644 index 000000000000..df2f5b7988df --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/FromDays.java @@ -0,0 +1,37 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.datetime; + +import io.confluent.ksql.function.FunctionCategory; +import io.confluent.ksql.function.udf.Udf; +import io.confluent.ksql.function.udf.UdfDescription; +import io.confluent.ksql.util.KsqlConstants; +import java.sql.Date; +import java.util.concurrent.TimeUnit; + +@UdfDescription( + name = "from_days", + category = FunctionCategory.DATE_TIME, + description = "Converts a number of days since epoch to a DATE value.", + author = KsqlConstants.CONFLUENT_AUTHOR +) +public class FromDays { + + @Udf(description = "Converts a number of days since epoch to a DATE value.") + public Date fromDays(final int days) { + return new Date(TimeUnit.DAYS.toMillis(days)); + } + +} diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/UnixDate.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/UnixDate.java index 0ad7c21c7264..6505d69856c2 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/UnixDate.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/datetime/UnixDate.java @@ -18,7 +18,9 @@ import io.confluent.ksql.function.udf.Udf; import io.confluent.ksql.function.udf.UdfDescription; import io.confluent.ksql.util.KsqlConstants; +import java.sql.Date; import java.time.LocalDate; +import java.util.concurrent.TimeUnit; @UdfDescription( name = "unix_date", @@ -35,4 +37,13 @@ public int unixDate() { return ((int) LocalDate.now().toEpochDay()); } + @Udf(description = "Returns the current number of days since " + + "1970-01-01 00:00:00 UTC/GMT represented by the given date.") + public Integer unixDate(final Date date) { + if (date == null) { + return null; + } + return (int) TimeUnit.MILLISECONDS.toDays(date.getTime()); + } + } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/FromDaysTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/FromDaysTest.java new file mode 100644 index 000000000000..678ae2ab1043 --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/FromDaysTest.java @@ -0,0 +1,37 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.datetime; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.sql.Date; +import org.junit.Before; +import org.junit.Test; + +public class FromDaysTest { + private FromDays udf; + + @Before + public void setUp() { + udf = new FromDays(); + } + + @Test + public void shouldConvertToTimestamp() { + assertThat(udf.fromDays(50), is(new Date(4320000000L))); + assertThat(udf.fromDays(-50), is(new Date(-4320000000L))); + } +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/UnixDateTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/UnixDateTest.java index b1e5a1956683..47c70ef30052 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/UnixDateTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/datetime/UnixDateTest.java @@ -14,11 +14,15 @@ package io.confluent.ksql.function.udf.datetime; +import java.sql.Date; import org.junit.Before; import org.junit.Test; import java.time.LocalDate; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; public class UnixDateTest { @@ -41,4 +45,22 @@ public void shouldGetTheUnixDate() { assertEquals(now, result); } + @Test + public void shouldReturnDays() { + // When: + final int result = udf.unixDate(new Date(864000000)); + + // Then: + assertThat(result, is(10)); + } + + @Test + public void shouldReturnNull() { + // When: + final Integer result = udf.unixDate(null); + + // Then: + assertNull(result); + } + } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/from-days_-_convert_days_to_date/7.0.0_1624951332511/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/from-days_-_convert_days_to_date/7.0.0_1624951332511/plan.json new file mode 100644 index 000000000000..c130b1f65263 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/from-days_-_convert_days_to_date/7.0.0_1624951332511/plan.json @@ -0,0 +1,153 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (K STRING KEY, D INTEGER) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='DELIMITED');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`K` STRING KEY, `D` INTEGER", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TS AS SELECT\n TEST.K K,\n FROM_DAYS(TEST.D) TS\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TS", + "schema" : "`K` STRING KEY, `TS` DATE", + "topicName" : "TS", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "TS", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "TS" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "sourceSchema" : "`K` STRING KEY, `D` INTEGER" + }, + "keyColumnNames" : [ "K" ], + "selectExpressions" : [ "FROM_DAYS(D) AS TS" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "topicName" : "TS" + }, + "queryId" : "CSAS_TS_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/from-days_-_convert_days_to_date/7.0.0_1624951332511/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/from-days_-_convert_days_to_date/7.0.0_1624951332511/spec.json new file mode 100644 index 000000000000..afa05bb2f63d --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/from-days_-_convert_days_to_date/7.0.0_1624951332511/spec.json @@ -0,0 +1,108 @@ +{ + "version" : "7.0.0", + "timestamp" : 1624951332511, + "path" : "query-validation-tests/from-days.json", + "schemas" : { + "CSAS_TS_0.TS" : { + "schema" : "`K` STRING KEY, `TS` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "CSAS_TS_0.KsqlTopic.Source" : { + "schema" : "`K` STRING KEY, `D` INTEGER", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + } + }, + "testCase" : { + "name" : "convert days to date", + "inputs" : [ { + "topic" : "test_topic", + "key" : "0", + "value" : "10" + }, { + "topic" : "test_topic", + "key" : "0", + "value" : "-10" + }, { + "topic" : "test_topic", + "key" : "0", + "value" : null + } ], + "outputs" : [ { + "topic" : "TS", + "key" : "0", + "value" : "10" + }, { + "topic" : "TS", + "key" : "0", + "value" : "\"-10\"" + }, { + "topic" : "TS", + "key" : "0", + "value" : null + } ], + "topics" : [ { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "TS", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (K STRING KEY, d INT) WITH (kafka_topic='test_topic', value_format='DELIMITED');", "CREATE STREAM TS AS select K, from_days(d) as ts from test;" ], + "post" : { + "sources" : [ { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `D` INTEGER", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TS", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `TS` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + }, { + "name" : "TS", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/from-days_-_convert_days_to_date/7.0.0_1624951332511/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/from-days_-_convert_days_to_date/7.0.0_1624951332511/topology new file mode 100644 index 000000000000..aabd9cade152 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/from-days_-_convert_days_to_date/7.0.0_1624951332511/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: TS) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/unix-date_-_convert_date_to_days/7.0.0_1625112537895/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/unix-date_-_convert_date_to_days/7.0.0_1625112537895/plan.json new file mode 100644 index 000000000000..d66b3433ca14 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/unix-date_-_convert_date_to_days/7.0.0_1625112537895/plan.json @@ -0,0 +1,154 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (K STRING KEY, DATE DATE) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='DELIMITED');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`K` STRING KEY, `DATE` DATE", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TS AS SELECT\n TEST.K K,\n UNIX_DATE(TEST.DATE) TS\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TS", + "schema" : "`K` STRING KEY, `TS` INTEGER", + "topicName" : "TS", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "TS", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "TS" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "sourceSchema" : "`K` STRING KEY, `DATE` DATE" + }, + "keyColumnNames" : [ "K" ], + "selectExpressions" : [ "UNIX_DATE(DATE) AS TS" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "topicName" : "TS" + }, + "queryId" : "CSAS_TS_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/unix-date_-_convert_date_to_days/7.0.0_1625112537895/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/unix-date_-_convert_date_to_days/7.0.0_1625112537895/spec.json new file mode 100644 index 000000000000..c89f6f6b4116 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/unix-date_-_convert_date_to_days/7.0.0_1625112537895/spec.json @@ -0,0 +1,108 @@ +{ + "version" : "7.0.0", + "timestamp" : 1625112537895, + "path" : "query-validation-tests/unix-date.json", + "schemas" : { + "CSAS_TS_0.TS" : { + "schema" : "`K` STRING KEY, `TS` INTEGER", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "CSAS_TS_0.KsqlTopic.Source" : { + "schema" : "`K` STRING KEY, `DATE` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + } + }, + "testCase" : { + "name" : "convert date to days", + "inputs" : [ { + "topic" : "test_topic", + "key" : "0", + "value" : "10" + }, { + "topic" : "test_topic", + "key" : "0", + "value" : "100" + }, { + "topic" : "test_topic", + "key" : "0", + "value" : null + } ], + "outputs" : [ { + "topic" : "TS", + "key" : "0", + "value" : "10" + }, { + "topic" : "TS", + "key" : "0", + "value" : "100" + }, { + "topic" : "TS", + "key" : "0", + "value" : null + } ], + "topics" : [ { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "TS", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (K STRING KEY, date DATE) WITH (kafka_topic='test_topic', value_format='DELIMITED');", "CREATE STREAM TS AS select K, unix_date(date) as ts from test;" ], + "post" : { + "sources" : [ { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `DATE` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TS", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `TS` INTEGER", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + }, { + "name" : "TS", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/unix-date_-_convert_date_to_days/7.0.0_1625112537895/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/unix-date_-_convert_date_to_days/7.0.0_1625112537895/topology new file mode 100644 index 000000000000..aabd9cade152 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/unix-date_-_convert_date_to_days/7.0.0_1625112537895/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: TS) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/unix-date_-_returns_the_current_date/7.0.0_1625112537868/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/unix-date_-_returns_the_current_date/7.0.0_1625112537868/plan.json new file mode 100644 index 000000000000..af72f8716134 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/unix-date_-_returns_the_current_date/7.0.0_1625112537868/plan.json @@ -0,0 +1,154 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (K STRING KEY, NAME STRING) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='DELIMITED');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`K` STRING KEY, `NAME` STRING", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TS AS SELECT\n TEST.K K,\n (UNIX_DATE() > 100) TS\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TS", + "schema" : "`K` STRING KEY, `TS` BOOLEAN", + "topicName" : "TS", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "TS", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "TS" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "sourceSchema" : "`K` STRING KEY, `NAME` STRING" + }, + "keyColumnNames" : [ "K" ], + "selectExpressions" : [ "(UNIX_DATE() > 100) AS TS" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "topicName" : "TS" + }, + "queryId" : "CSAS_TS_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/unix-date_-_returns_the_current_date/7.0.0_1625112537868/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/unix-date_-_returns_the_current_date/7.0.0_1625112537868/spec.json new file mode 100644 index 000000000000..d1cab3efafd2 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/unix-date_-_returns_the_current_date/7.0.0_1625112537868/spec.json @@ -0,0 +1,108 @@ +{ + "version" : "7.0.0", + "timestamp" : 1625112537868, + "path" : "query-validation-tests/unix-date.json", + "schemas" : { + "CSAS_TS_0.TS" : { + "schema" : "`K` STRING KEY, `TS` BOOLEAN", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "CSAS_TS_0.KsqlTopic.Source" : { + "schema" : "`K` STRING KEY, `NAME` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + } + }, + "testCase" : { + "name" : "returns the current date", + "inputs" : [ { + "topic" : "test_topic", + "key" : "0", + "value" : "a" + }, { + "topic" : "test_topic", + "key" : "0", + "value" : "b" + }, { + "topic" : "test_topic", + "key" : "0", + "value" : null + } ], + "outputs" : [ { + "topic" : "TS", + "key" : "0", + "value" : "true" + }, { + "topic" : "TS", + "key" : "0", + "value" : "true" + }, { + "topic" : "TS", + "key" : "0", + "value" : null + } ], + "topics" : [ { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "TS", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (K STRING KEY, name STRING) WITH (kafka_topic='test_topic', value_format='DELIMITED');", "CREATE STREAM TS AS select K, unix_date() > 100 as ts from test;" ], + "post" : { + "sources" : [ { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `NAME` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TS", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `TS` BOOLEAN", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + }, { + "name" : "TS", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/unix-date_-_returns_the_current_date/7.0.0_1625112537868/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/unix-date_-_returns_the_current_date/7.0.0_1625112537868/topology new file mode 100644 index 000000000000..aabd9cade152 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/unix-date_-_returns_the_current_date/7.0.0_1625112537868/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: TS) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/from-days.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/from-days.json new file mode 100644 index 000000000000..893591d66627 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/from-days.json @@ -0,0 +1,24 @@ +{ + "comments": [ + "Tests covering the use of the FROM_DAYS function." + ], + "tests": [ + { + "name": "convert days to date", + "statements": [ + "CREATE STREAM TEST (K STRING KEY, d INT) WITH (kafka_topic='test_topic', value_format='DELIMITED');", + "CREATE STREAM TS AS select K, from_days(d) as ts from test;" + ], + "inputs": [ + {"topic": "test_topic", "key": "0", "value": "10"}, + {"topic": "test_topic", "key": "0", "value": "-10"}, + {"topic": "test_topic", "key": "0", "value": null} + ], + "outputs": [ + {"topic": "TS", "key": "0", "value": "10"}, + {"topic": "TS", "key": "0", "value": "\"-10\""}, + {"topic": "TS", "key": "0", "value": null} + ] + } + ] +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/unix-date.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/unix-date.json new file mode 100644 index 000000000000..f2f1458e3cba --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/unix-date.json @@ -0,0 +1,41 @@ +{ + "comments": [ + "Tests covering the use of the UNIX_DATE function." + ], + "tests": [ + { + "name": "returns the current date", + "statements": [ + "CREATE STREAM TEST (K STRING KEY, name STRING) WITH (kafka_topic='test_topic', value_format='DELIMITED');", + "CREATE STREAM TS AS select K, unix_date() > 100 as ts from test;" + ], + "inputs": [ + {"topic": "test_topic", "key": "0", "value": "a"}, + {"topic": "test_topic", "key": "0", "value": "b"}, + {"topic": "test_topic", "key": "0", "value": null} + ], + "outputs": [ + {"topic": "TS", "key": "0", "value": "true"}, + {"topic": "TS", "key": "0", "value": "true"}, + {"topic": "TS", "key": "0", "value": null} + ] + }, + { + "name": "convert date to days", + "statements": [ + "CREATE STREAM TEST (K STRING KEY, date DATE) WITH (kafka_topic='test_topic', value_format='DELIMITED');", + "CREATE STREAM TS AS select K, unix_date(date) as ts from test;" + ], + "inputs": [ + {"topic": "test_topic", "key": "0", "value": "10"}, + {"topic": "test_topic", "key": "0", "value": "100"}, + {"topic": "test_topic", "key": "0", "value": null} + ], + "outputs": [ + {"topic": "TS", "key": "0", "value": "10"}, + {"topic": "TS", "key": "0", "value": "100"}, + {"topic": "TS", "key": "0", "value": null} + ] + } + ] +} \ No newline at end of file