Skip to content

Commit

Permalink
feat: add FROM_DAYS and update UNIX_DATE function (#7742)
Browse files Browse the repository at this point in the history
* feat: add FROM_DAYS and update UNIX_DATE function

* add to_days docs

* update historic plans
  • Loading branch information
Zara Lim authored Jul 1, 2021
1 parent c63e924 commit 3c68710
Show file tree
Hide file tree
Showing 17 changed files with 1,012 additions and 6 deletions.
3 changes: 1 addition & 2 deletions design-proposals/klip-46-date-and-time-data-type-support.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,7 @@ The following functions will be added/updated:
* `PARSE_DATE(format, date_string)` - converts a date string in the specified format to a DATE
* `PARSE_TIME(format, time_string)` - converts a time string in the specified format to a TIME
* `UNIX_DATE(date)` - returns an INTEGER number of days that have passed between Unix epoch and the specified date
* `UNIX_TIME(time)` - returns an INTEGER number of milliseconds that have passed between 00:00:00.000 and the specified time
* `CONVERT_TZ(time, from_tz, to_tz)` - converts a time from one timezone to another
* `FROM_DAYS(int)` - converts epoch days to a DATE value
* `DATEADD(time unit, integer, date)` - Adds an interval to the date. The time unit must be `DAYS` or `YEARS`. If it is not,
then the function will throw an error
* `DATESUB(time unit, integer, date)` - Subtracts an interval from the date. The time unit must be `DAYS` or `YEARS`. If it is not,
Expand Down
19 changes: 15 additions & 4 deletions docs/developer-guide/ksqldb-reference/scalar-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -1082,11 +1082,14 @@ complex type are not inspected.
Since: 0.6.0

```sql
UNIX_DATE()
UNIX_DATE([date])
```

Gets an integer representing days since epoch. The returned integer
may differ depending on the local time of different ksqlDB Server instances.

If `UNIX_DATE` is called with the date parameter, the function returns the DATE
value as an INTEGER value representing the number of days since `1970-01-01`.

If the `date` parameter is not provided, it returns an integer representing days since `1970-01-01`.
The returned integer may differ depending on the local time of different ksqlDB Server instances.

### `UNIX_TIMESTAMP`

Expand Down Expand Up @@ -1274,6 +1277,14 @@ FROM_UNIXTIME(milliseconds)

Converts a BIGINT millisecond timestamp value into a TIMESTAMP value.

### `FROM_DAYS`

```sql
FROM_DAYS(days)
```

Converts an INT number of days since epoch to a DATE value.

### TIMESTAMPADD

Since: 0.17
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License; you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.datetime;

import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.util.KsqlConstants;
import java.sql.Date;
import java.util.concurrent.TimeUnit;

@UdfDescription(
    name = "from_days",
    category = FunctionCategory.DATE_TIME,
    description = "Converts a number of days since epoch to a DATE value.",
    author = KsqlConstants.CONFLUENT_AUTHOR
)
public class FromDays {

  /**
   * Converts an epoch-day count to a {@code java.sql.Date}.
   *
   * @param days number of days since 1970-01-01 (may be negative for dates before epoch)
   * @return a DATE value whose underlying millisecond timestamp is {@code days * 86_400_000}
   */
  @Udf(description = "Converts a number of days since epoch to a DATE value.")
  public Date fromDays(final int days) {
    // Scale the day count to epoch milliseconds; DAYS.toMillis widens to long,
    // so no int overflow occurs for any int input.
    final long epochMillis = TimeUnit.DAYS.toMillis(days);
    return new Date(epochMillis);
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.util.KsqlConstants;
import java.sql.Date;
import java.time.LocalDate;
import java.util.concurrent.TimeUnit;

@UdfDescription(
name = "unix_date",
Expand All @@ -35,4 +37,13 @@ public int unixDate() {
return ((int) LocalDate.now().toEpochDay());
}

@Udf(description = "Returns the current number of days since "
+ "1970-01-01 00:00:00 UTC/GMT represented by the given date.")
public Integer unixDate(final Date date) {
if (date == null) {
return null;
}
return (int) TimeUnit.MILLISECONDS.toDays(date.getTime());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License; you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.datetime;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

import java.sql.Date;
import org.junit.Before;
import org.junit.Test;

public class FromDaysTest {
  private FromDays udf;

  @Before
  public void setUp() {
    udf = new FromDays();
  }

  // Renamed from shouldConvertToTimestamp: the UDF produces a DATE, not a TIMESTAMP.
  @Test
  public void shouldConvertToDate() {
    // 50 days = 50 * 86_400_000 ms = 4_320_000_000 ms, on either side of epoch.
    assertThat(udf.fromDays(50), is(new Date(4320000000L)));
    assertThat(udf.fromDays(-50), is(new Date(-4320000000L)));
    // Epoch boundary: day 0 maps to 1970-01-01.
    assertThat(udf.fromDays(0), is(new Date(0L)));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@

package io.confluent.ksql.function.udf.datetime;

import java.sql.Date;
import org.junit.Before;
import org.junit.Test;
import java.time.LocalDate;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

public class UnixDateTest {

Expand All @@ -41,4 +45,22 @@ public void shouldGetTheUnixDate() {
assertEquals(now, result);
}

@Test
public void shouldReturnDays() {
// When:
final int result = udf.unixDate(new Date(864000000));

// Then:
assertThat(result, is(10));
}

@Test
public void shouldReturnNull() {
// When:
final Integer result = udf.unixDate(null);

// Then:
assertNull(result);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST (K STRING KEY, D INTEGER) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='DELIMITED');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST",
"schema" : "`K` STRING KEY, `D` INTEGER",
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"orReplace" : false
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TS AS SELECT\n TEST.K K,\n FROM_DAYS(TEST.D) TS\nFROM TEST TEST\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TS",
"schema" : "`K` STRING KEY, `TS` DATE",
"topicName" : "TS",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"orReplace" : false
},
"queryPlan" : {
"sources" : [ "TEST" ],
"sink" : "TS",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "TS"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"sourceSchema" : "`K` STRING KEY, `D` INTEGER"
},
"keyColumnNames" : [ "K" ],
"selectExpressions" : [ "FROM_DAYS(D) AS TS" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"topicName" : "TS"
},
"queryId" : "CSAS_TS_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"metric.reporters" : "",
"ksql.transient.prefix" : "transient_",
"ksql.query.status.running.threshold.seconds" : "300",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.persistence.default.format.key" : "KAFKA",
"ksql.query.persistent.max.bytes.buffering.total" : "-1",
"ksql.queryanonymizer.logs_enabled" : "true",
"ksql.query.error.max.queue.size" : "10",
"ksql.variable.substitution.enable" : "true",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.queryanonymizer.cluster_namespace" : null,
"ksql.query.pull.metrics.enabled" : "true",
"ksql.create.or.replace.enabled" : "true",
"ksql.metrics.extension" : null,
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.cast.strings.preserve.nulls" : "true",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.pull.queries.enable" : "true",
"ksql.lambdas.enabled" : "true",
"ksql.suppress.enabled" : "false",
"ksql.query.push.scalable.enabled" : "false",
"ksql.query.push.scalable.interpreter.enabled" : "true",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.persistence.wrap.single.values" : null,
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.query.retry.backoff.initial.ms" : "15000",
"ksql.query.transient.max.bytes.buffering.total" : "-1",
"ksql.schema.registry.url" : "",
"ksql.properties.overrides.denylist" : "",
"ksql.query.pull.max.concurrent.requests" : "2147483647",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.query.pull.interpreter.enabled" : "true",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.query.pull.table.scan.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.streams.topology.optimization" : "all",
"ksql.query.retry.backoff.max.ms" : "900000",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.metrics.tags.custom" : "",
"ksql.persistence.default.format.value" : null,
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.nested.error.set.null" : "true",
"ksql.udf.collect.metrics" : "false",
"ksql.query.pull.thread.pool.size" : "100",
"ksql.persistent.prefix" : "query_",
"ksql.metastore.backup.location" : "",
"ksql.error.classifier.regex" : "",
"ksql.suppress.buffer.size.bytes" : "-1"
}
}
Loading

0 comments on commit 3c68710

Please sign in to comment.