Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove time/date component when casting timestamp to date/time #7724

Merged
merged 2 commits into from
Jun 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,14 @@ public static Date parseDate(final String str) {
public static String formatDate(final Date date) {
return LocalDate.ofEpochDay(TimeUnit.MILLISECONDS.toDays(date.getTime())).toString();
}

public static Date timestampToDate(final Timestamp timestamp) {
final long epochDay = timestamp.toInstant().atZone(ZoneId.of("Z")).toLocalDate().toEpochDay();
return new Date(TimeUnit.DAYS.toMillis(epochDay));
}

public static Time timestampToTime(final Timestamp timestamp) {
final long nanoOfDay = timestamp.toInstant().atZone(ZoneId.of("Z")).toLocalTime().toNanoOfDay();
return new Time(TimeUnit.NANOSECONDS.toMillis(nanoOfDay));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ public final class CastEvaluator {
.put(key(STRUCT, STRING), CastEvaluator::castToString)
// TIME:
.put(key(TIMESTAMP, STRING), nonNullSafeCode("SqlTimeTypes.formatTimestamp(%s)"))
.put(key(TIMESTAMP, TIME), nonNullSafeCode("new Time(%s.getTime())"))
.put(key(TIMESTAMP, DATE), nonNullSafeCode("new Date(%s.getTime())"))
.put(key(TIMESTAMP, TIME), nonNullSafeCode("SqlTimeTypes.timestampToTime(%s)"))
.put(key(TIMESTAMP, DATE), nonNullSafeCode("SqlTimeTypes.timestampToDate(%s)"))
.put(key(DATE, TIMESTAMP), nonNullSafeCode("new Timestamp(%s.getTime())"))
.put(key(TIME, STRING), nonNullSafeCode("SqlTimeTypes.formatTime(%s)"))
.put(key(DATE, STRING), nonNullSafeCode("SqlTimeTypes.formatDate(%s)"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,15 @@ public void shouldNotCastIncorrectlyFormattedString() {
// Then:
assertThat(exception.getMessage(), containsString("Required format is: \"yyyy-MM-dd\""));
}

@Test
public void shouldCastDateToTimestamp() throws Exception {
// Given:
final Evaluator evaluator = cookCode(SqlTypes.DATE, SqlTypes.TIMESTAMP, config);

// Then:
assertThat(evaluator.rawEvaluate(new Date(864000000)), is(new Timestamp(864000000)));
}
}

@RunWith(MockitoJUnitRunner.class)
Expand All @@ -284,6 +293,24 @@ public void shouldNotCastIncorrectlyFormattedString() {
// Then:
assertThat(exception.getMessage(), containsString("Required format is: \"yyyy-MM-dd'T'HH:mm:ss.SSS\", with an optional numeric"));
}

@Test
public void shouldCastTimestampToDate() throws Exception {
// Given:
final Evaluator evaluator = cookCode(SqlTypes.TIMESTAMP, SqlTypes.DATE, config);

// Then:
assertThat(evaluator.rawEvaluate(new Timestamp(864033000)), is(new Date(864000000)));
}

@Test
public void shouldCastTimestampToTime() throws Exception {
// Given:
final Evaluator evaluator = cookCode(SqlTypes.TIMESTAMP, SqlTypes.TIME, config);

// Then:
assertThat(evaluator.rawEvaluate(new Timestamp(864033000)), is(new Time(33000)));
}
}

// Run CombinationTest again, for arrays with different element types:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST (ID STRING KEY, DATE DATE) WITH (KAFKA_TOPIC='test', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST",
"schema" : "`ID` STRING KEY, `DATE` DATE",
"topicName" : "test",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
},
"orReplace" : false
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST2 AS SELECT\n TEST.ID ID,\n CAST(TEST.DATE AS STRING) RESULT\nFROM TEST TEST\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST2",
"schema" : "`ID` STRING KEY, `RESULT` STRING",
"topicName" : "TEST2",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
},
"orReplace" : false
},
"queryPlan" : {
"sources" : [ "TEST" ],
"sink" : "TEST2",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "TEST2"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
},
"sourceSchema" : "`ID` STRING KEY, `DATE` DATE"
},
"keyColumnNames" : [ "ID" ],
"selectExpressions" : [ "CAST(DATE AS STRING) AS RESULT" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
},
"topicName" : "TEST2"
},
"queryId" : "CSAS_TEST2_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"metric.reporters" : "",
"ksql.transient.prefix" : "transient_",
"ksql.query.status.running.threshold.seconds" : "300",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.persistence.default.format.key" : "KAFKA",
"ksql.query.persistent.max.bytes.buffering.total" : "-1",
"ksql.queryanonymizer.logs_enabled" : "true",
"ksql.query.error.max.queue.size" : "10",
"ksql.variable.substitution.enable" : "true",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.queryanonymizer.cluster_namespace" : null,
"ksql.query.pull.metrics.enabled" : "true",
"ksql.create.or.replace.enabled" : "true",
"ksql.metrics.extension" : null,
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.cast.strings.preserve.nulls" : "true",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.pull.queries.enable" : "true",
"ksql.lambdas.enabled" : "true",
"ksql.suppress.enabled" : "false",
"ksql.query.push.scalable.enabled" : "false",
"ksql.query.push.scalable.interpreter.enabled" : "true",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.persistence.wrap.single.values" : null,
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.query.retry.backoff.initial.ms" : "15000",
"ksql.query.transient.max.bytes.buffering.total" : "-1",
"ksql.schema.registry.url" : "",
"ksql.properties.overrides.denylist" : "",
"ksql.query.pull.max.concurrent.requests" : "2147483647",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.query.pull.interpreter.enabled" : "true",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.query.pull.table.scan.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.streams.topology.optimization" : "all",
"ksql.query.retry.backoff.max.ms" : "900000",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.metrics.tags.custom" : "",
"ksql.persistence.default.format.value" : null,
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.nested.error.set.null" : "true",
"ksql.udf.collect.metrics" : "false",
"ksql.query.pull.thread.pool.size" : "100",
"ksql.persistent.prefix" : "query_",
"ksql.metastore.backup.location" : "",
"ksql.error.classifier.regex" : "",
"ksql.suppress.buffer.size.bytes" : "-1"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
{
"version" : "7.0.0",
"timestamp" : 1624662245517,
"path" : "query-validation-tests/date.json",
"schemas" : {
"CSAS_TEST2_0.TEST2" : {
"schema" : "`ID` STRING KEY, `RESULT` STRING",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
},
"CSAS_TEST2_0.KsqlTopic.Source" : {
"schema" : "`ID` STRING KEY, `DATE` DATE",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
}
},
"testCase" : {
"name" : "casting - date to string",
"inputs" : [ {
"topic" : "test",
"key" : null,
"value" : {
"date" : 10
}
}, {
"topic" : "test",
"key" : null,
"value" : {
"date" : -1
}
}, {
"topic" : "test",
"key" : null,
"value" : {
"date" : null
}
} ],
"outputs" : [ {
"topic" : "TEST2",
"key" : null,
"value" : {
"RESULT" : "1970-01-11"
}
}, {
"topic" : "TEST2",
"key" : null,
"value" : {
"RESULT" : "1969-12-31"
}
}, {
"topic" : "TEST2",
"key" : null,
"value" : {
"RESULT" : null
}
} ],
"topics" : [ {
"name" : "TEST2",
"replicas" : 1,
"numPartitions" : 4
}, {
"name" : "test",
"valueSchema" : {
"type" : "record",
"name" : "KsqlDataSourceSchema",
"namespace" : "io.confluent.ksql.avro_schemas",
"fields" : [ {
"name" : "DATE",
"type" : [ "null", {
"type" : "int",
"connect.version" : 1,
"connect.name" : "org.apache.kafka.connect.data.Date",
"logicalType" : "date"
} ],
"default" : null
} ],
"connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema"
},
"valueFormat" : "AVRO",
"replicas" : 1,
"numPartitions" : 4
} ],
"statements" : [ "CREATE STREAM TEST (ID STRING KEY, date DATE) WITH (kafka_topic='test', value_format='AVRO');", "CREATE STREAM TEST2 AS SELECT ID, CAST(date AS STRING) AS RESULT FROM TEST;" ],
"post" : {
"sources" : [ {
"name" : "TEST",
"type" : "STREAM",
"schema" : "`ID` STRING KEY, `DATE` DATE",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : "AVRO",
"keyFeatures" : [ ],
"valueFeatures" : [ ]
}, {
"name" : "TEST2",
"type" : "STREAM",
"schema" : "`ID` STRING KEY, `RESULT` STRING",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : "AVRO",
"keyFeatures" : [ ],
"valueFeatures" : [ ]
} ],
"topics" : {
"topics" : [ {
"name" : "test",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
},
"partitions" : 4,
"valueSchema" : {
"type" : "record",
"name" : "KsqlDataSourceSchema",
"namespace" : "io.confluent.ksql.avro_schemas",
"fields" : [ {
"name" : "DATE",
"type" : [ "null", {
"type" : "int",
"connect.version" : 1,
"connect.name" : "org.apache.kafka.connect.data.Date",
"logicalType" : "date"
} ],
"default" : null
} ],
"connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema"
}
}, {
"name" : "TEST2",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
},
"partitions" : 4,
"valueSchema" : {
"type" : "record",
"name" : "KsqlDataSourceSchema",
"namespace" : "io.confluent.ksql.avro_schemas",
"fields" : [ {
"name" : "RESULT",
"type" : [ "null", "string" ],
"default" : null
} ]
}
} ]
}
}
}
}
Loading