Skip to content

Commit

Permalink
feat: implement casting for DATE/TIME (#7708)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zara Lim authored Jun 22, 2021
1 parent 7537d87 commit 18cc030
Show file tree
Hide file tree
Showing 14 changed files with 355 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,6 @@ public static String getSRSubject(final String topicName, final boolean isKey) {
* Default time and date patterns
*/
public static final String TIME_PATTERN = "HH:mm:ss.SSS";
public static final String DATE_TIME_PATTERN = "yyyy-MM-dd'T'" + TIME_PATTERN;
public static final String DATE_PATTERN = "yyyy-MM-dd";
public static final String DATE_TIME_PATTERN = DATE_PATTERN + "'T'" + TIME_PATTERN;
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private static ZoneId parseTimezone(final String timezone) {
}
}

private static String completeDate(final String date) {
public static String completeDate(final String date) {
final String[] parts = date.split("-");
if (parts.length == 1) {
return date + "-01-01";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import io.confluent.ksql.util.DecimalUtil;
import io.confluent.ksql.util.KsqlException;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -300,6 +302,8 @@ private static final class Rules {
// STRING:
.put(key(STRING, STRING), Coercer.PASS_THROUGH)
.put(key(STRING, TIMESTAMP), parser((v, t) -> SqlTimeTypes.parseTimestamp(v)))
.put(key(STRING, TIME), parser((v, t) -> SqlTimeTypes.parseTime(v)))
.put(key(STRING, DATE), parser((v, t) -> SqlTimeTypes.parseDate(v)))
// ARRAY:
.put(key(ARRAY, ARRAY), coercer(
DefaultSqlValueCoercer::canCoerceToArray,
Expand Down Expand Up @@ -341,9 +345,13 @@ private static final class Rules {
.put(key(STRING, DECIMAL), parser((v, t) -> DecimalUtil
.ensureFit(new BigDecimal(v), (SqlDecimal) t)))
.put(key(STRING, DOUBLE), parser((v, t) -> SqlDoubles.parseDouble(v)))
// TIMESTAMP:
// TIME:
.put(key(TIMESTAMP, STRING), coercer((c, v, t)
-> Result.of(SqlTimeTypes.formatTimestamp((Timestamp) v))))
.put(key(TIME, STRING), coercer((c, v, t)
-> Result.of(SqlTimeTypes.formatTime((Time) v))))
.put(key(DATE, STRING), coercer((c, v, t)
-> Result.of(SqlTimeTypes.formatDate((Date) v))))
.build();

private static Coercer parser(final BiFunction<String, SqlType, Object> parserFunction) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.schema.ksql;

import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.timestamp.PartialStringToTimestampParser;
import java.sql.Date;
import java.sql.Time;
Expand All @@ -24,6 +25,7 @@
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -33,16 +35,23 @@ public final class SqlTimeTypes {

private static PartialStringToTimestampParser PARSER = new PartialStringToTimestampParser();

// Help text appended to the error raised by parseTime when a string cannot be parsed.
// It begins with a line separator so it is shown on its own line after the parser's message.
private static final String TIME_HELP_MESSAGE = System.lineSeparator()
+ "Required format is: \"" + KsqlConstants.TIME_PATTERN + "\", "
+ "for example: '01:34:20.123' or '01:34:20'. "
+ "Partials are also supported, for example '01:34'";
// Help text appended to the error raised by parseDate when a string cannot be parsed.
// It begins with a line separator so it is shown on its own line after the parser's message.
private static final String DATE_HELP_MESSAGE = System.lineSeparator()
+ "Required format is: \"" + KsqlConstants.DATE_PATTERN + "\", "
+ "for example '2020-05-26'. "
+ "Partials are also supported, for example '2020-05'";

private SqlTimeTypes() {
}

/**
* Parse a SQL timestamp from a string.
*
* <p>Rejects {@code Infinity} and {@code Nan} as invalid.
*
* @param str the string to parse.
* @return the double value.
* @return the Timestamp value.
*/
public static Timestamp parseTimestamp(final String str) {
return PARSER.parseToTimestamp(str);
Expand All @@ -55,10 +64,36 @@ public static String formatTimestamp(final Timestamp timestamp) {
.format(timestamp.toInstant());
}

/**
 * Parse a SQL time from a string.
 *
 * <p>The text is parsed with {@link LocalTime#parse(CharSequence)} and converted to
 * milliseconds since midnight. Any precision finer than a millisecond is dropped by the
 * integer division.
 *
 * @param str the string to parse.
 * @return the Time value.
 * @throws KsqlException if the string cannot be parsed as a time; the message includes the
 *     parser's own message followed by a description of the required format.
 */
public static Time parseTime(final String str) {
  final LocalTime parsed;
  try {
    parsed = LocalTime.parse(str);
  } catch (final DateTimeParseException cause) {
    final String message = "Failed to parse time '" + str
        + "': " + cause.getMessage()
        + TIME_HELP_MESSAGE;
    throw new KsqlException(message, cause);
  }

  // java.sql.Time is built from a millisecond count; nanos -> millis by integer division.
  final long millisOfDay = parsed.toNanoOfDay() / 1000000;
  return new Time(millisOfDay);
}

/**
 * Format a SQL time as a string.
 *
 * <p>Only whole seconds are kept: the millisecond value is integer-divided by 1000 before
 * formatting, so any millisecond remainder does not appear in the result. The text is
 * produced by {@link LocalTime#toString()}.
 *
 * @param time the time to format.
 * @return the formatted time.
 */
public static String formatTime(final Time time) {
  final long secondOfDay = time.getTime() / 1000;
  final LocalTime wholeSeconds = LocalTime.ofSecondOfDay(secondOfDay);
  return wholeSeconds.toString();
}

/**
 * Parse a SQL date from a string.
 *
 * <p>The text is first passed through
 * {@code PartialStringToTimestampParser.completeDate} and the result is parsed with
 * {@link LocalDate#parse(CharSequence)}. The returned Date holds the number of milliseconds
 * corresponding to the parsed epoch day.
 *
 * @param str the string to parse.
 * @return the Date value.
 * @throws KsqlException if the string cannot be parsed as a date; the message includes the
 *     parser's own message followed by a description of the required format.
 */
public static Date parseDate(final String str) {
  final long epochDay;
  try {
    final String fullDate = PartialStringToTimestampParser.completeDate(str);
    epochDay = LocalDate.parse(fullDate).toEpochDay();
  } catch (final DateTimeParseException cause) {
    final String message = "Failed to parse date '" + str
        + "': " + cause.getMessage()
        + DATE_HELP_MESSAGE;
    throw new KsqlException(message, cause);
  }

  final long epochMillis = TimeUnit.DAYS.toMillis(epochDay);
  return new Date(epochMillis);
}

/**
 * Format a SQL date as a string.
 *
 * <p>The date's millisecond value is converted to an epoch day and rendered with
 * {@link LocalDate#toString()}, i.e. as {@code yyyy-MM-dd}.
 *
 * @param date the date to format.
 * @return the formatted date.
 */
public static String formatDate(final Date date) {
  // Floor division rather than TimeUnit.MILLISECONDS.toDays(): the latter truncates toward
  // zero, which maps a pre-epoch instant that is not an exact multiple of a day onto the
  // following day (e.g. -1 ms would be reported as 1970-01-01 instead of 1969-12-31).
  final long millisPerDay = TimeUnit.DAYS.toMillis(1);
  final long epochDay = Math.floorDiv(date.getTime(), millisPerDay);
  return LocalDate.ofEpochDay(epochDay).toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,17 @@ public static class LaxValueCoercionTest {
.put(TIMESTAMP, new Timestamp(1535792475000L))
.build()
)
.put("09:01:15", ImmutableMap.<SqlType, Object>builder()
.put(STRING, "09:01:15")
.put(TIME, new Time(32475000L))
.build()
)
.put("2018-09-01", ImmutableMap.<SqlType, Object>builder()
.put(STRING, "2018-09-01")
.put(DATE, new Date(1535760000000L))
.put(TIMESTAMP, new Timestamp(1535760000000L))
.build()
)
// TIMESTAMP:
.put(new Timestamp(1535792475000L), ImmutableMap.<SqlType, Object>builder()
.put(TIMESTAMP, new Timestamp(1535792475000L))
Expand All @@ -296,10 +307,12 @@ public static class LaxValueCoercionTest {
)
.put(new Time(1000L), ImmutableMap.<SqlType, Object>builder()
.put(TIME, new Time(1000L))
.put(STRING, laxOnly("00:00:01"))
.build()
)
.put(new Date(636451200000L), ImmutableMap.<SqlType, Object>builder()
.put(DATE, new Date(636451200000L))
.put(STRING, laxOnly("1990-03-03"))
.build()
)
// ARRAY:
Expand All @@ -313,6 +326,8 @@ public static class LaxValueCoercionTest {
.put(array(array(BOOLEAN)), ImmutableList.of())
.put(array(map(STRING, STRING)), ImmutableList.of())
.put(array(SqlTypes.struct().field("a", INTEGER).build()), ImmutableList.of())
.put(array(TIME), ImmutableList.of())
.put(array(DATE), ImmutableList.of())
.put(array(TIMESTAMP), ImmutableList.of())
.build()
)
Expand Down Expand Up @@ -409,6 +424,14 @@ public static class LaxValueCoercionTest {
struct().field("a", TIMESTAMP).build(),
createStruct(struct().field("a", TIMESTAMP).build())
)
.put(
struct().field("abc", TIME).build(),
createStruct(struct().field("abc", TIME).build())
)
.put(
struct().field("def", DATE).build(),
createStruct(struct().field("def", DATE).build())
)
.build()
)
.put(createStruct(SqlTypes.struct()
Expand Down Expand Up @@ -1134,6 +1157,12 @@ static Object instanceFor(final SqlType from, final SqlType to) {
if (to.baseType() == SqlBaseType.TIMESTAMP) {
return "2018-09-01T09:01:15.000";
}
if (to.baseType() == SqlBaseType.TIME) {
return "09:01:15";
}
if (to.baseType() == SqlBaseType.DATE) {
return "2018-09-01";
}
// Intentional fall through
default:
final Object instance = INSTANCES.get(from.baseType());
Expand Down Expand Up @@ -1174,6 +1203,8 @@ private static final class SupportedCoercions {
.build())
.put(SqlBaseType.STRING, ImmutableSet.<SqlBaseType>builder()
.add(SqlBaseType.STRING)
.add(SqlBaseType.TIME)
.add(SqlBaseType.DATE)
.add(SqlBaseType.TIMESTAMP)
.build())
.put(SqlBaseType.ARRAY, ImmutableSet.<SqlBaseType>builder()
Expand Down Expand Up @@ -1229,6 +1260,12 @@ private static final class SupportedCoercions {
.put(SqlBaseType.TIMESTAMP, ImmutableSet.<SqlBaseType>builder()
.add(SqlBaseType.STRING)
.build())
.put(SqlBaseType.TIME, ImmutableSet.<SqlBaseType>builder()
.add(SqlBaseType.STRING)
.build())
.put(SqlBaseType.DATE, ImmutableSet.<SqlBaseType>builder()
.add(SqlBaseType.STRING)
.build())
.build();

private static boolean supported(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.schema.ksql;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThrows;

Expand Down Expand Up @@ -43,12 +44,52 @@ public void shouldFormatTimestamp() {
assertThat(SqlTimeTypes.formatTimestamp(new Timestamp(1552816800000L)), is("2019-03-17T10:00:00.000"));
}

@Test
public void shouldParseTime() {
  // 10:00:00 expressed as milliseconds since midnight.
  final Time tenOClock = new Time(36000000);

  // Full and partial (no seconds) forms parse to the same value.
  assertThat(SqlTimeTypes.parseTime("10:00:00"), is(tenOClock));
  assertThat(SqlTimeTypes.parseTime("10:00"), is(tenOClock));

  // Milliseconds are retained when present.
  assertThat(SqlTimeTypes.parseTime("10:00:00.001"), is(new Time(36000001)));
}

@Test
public void shouldNotParseTime() {
  // Given:
  final String expectedHelp = "Required format is: \"HH:mm:ss.SSS\"";

  // When:
  final KsqlException thrown = assertThrows(
      KsqlException.class,
      () -> SqlTimeTypes.parseTime("foo")
  );

  // Then:
  assertThat(thrown.getMessage(), containsString(expectedHelp));
}

@Test
public void shouldFormatTime() {
  final String oneSecondPastMidnight = "00:00:01";

  assertThat(SqlTimeTypes.formatTime(new Time(1000)), is(oneSecondPastMidnight));
  // 1005 ms formats identically: the extra 5 ms do not appear in the output.
  assertThat(SqlTimeTypes.formatTime(new Time(1005)), is(oneSecondPastMidnight));
}

@Test
public void shouldParseDate() {
  // 1990-01-01 expressed as milliseconds since the epoch.
  final Date firstOfJanuary1990 = new Date(631152000000L);

  // Year-only, year-month and full dates all resolve to the same day.
  assertThat(SqlTimeTypes.parseDate("1990"), is(firstOfJanuary1990));
  assertThat(SqlTimeTypes.parseDate("1990-01"), is(firstOfJanuary1990));
  assertThat(SqlTimeTypes.parseDate("1990-01-01"), is(firstOfJanuary1990));
}

@Test
public void shouldNotParseDate() {
  // Given:
  final String expectedHelp = "Required format is: \"yyyy-MM-dd\"";

  // When:
  final KsqlException thrown = assertThrows(
      KsqlException.class,
      () -> SqlTimeTypes.parseDate("foo")
  );

  // Then:
  assertThat(thrown.getMessage(), containsString(expectedHelp));
}

@Test
public void shouldFormatDate() {
assertThat(SqlTimeTypes.formatDate(new Date(864000000)), is("1970-01-11"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package io.confluent.ksql.engine.generic;

import static io.confluent.ksql.schema.ksql.types.SqlBaseType.STRING;
import static io.confluent.ksql.schema.ksql.types.SqlTypes.DATE;
import static io.confluent.ksql.schema.ksql.types.SqlTypes.TIME;
import static io.confluent.ksql.schema.ksql.types.SqlTypes.TIMESTAMP;

import io.confluent.ksql.GenericRow;
Expand Down Expand Up @@ -113,15 +115,26 @@ protected Object visitExpression(final Expression expression, final Void context
fieldName,
valueSqlType,
value,
(fieldType == TIMESTAMP && valueSqlType == STRING)
? ". Timestamp format must be yyyy-mm-ddThh:mm:ss[.S]" :
""
parseTimeErrorMessage(fieldType, valueSqlType)
);
return new KsqlException(errorMessage);
})
.orElse(null);
}

/**
 * Build the format hint appended to a coercion error message.
 *
 * <p>A hint is only produced when the value being coerced is a STRING and the target field
 * is one of TIMESTAMP, TIME or DATE; in every other case the result is the empty string.
 *
 * @param fieldType the SQL type of the target field.
 * @param valueType the base SQL type of the supplied value.
 * @return the hint text, or an empty string when no hint applies.
 */
private String parseTimeErrorMessage(final SqlType fieldType, final SqlBaseType valueType) {
  // Hints only make sense for string literals that failed to parse.
  if (valueType != STRING) {
    return "";
  }

  if (fieldType == TIMESTAMP) {
    return ". Timestamp format must be yyyy-mm-ddThh:mm:ss[.S]";
  }
  if (fieldType == TIME) {
    return ". Time format must be hh:mm:ss[.S]";
  }
  if (fieldType == DATE) {
    return ". Date format must be yyyy-mm-dd";
  }
  return "";
}

@Override
public Object visitNullLiteral(final NullLiteral node, final Void context) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
Expand Down Expand Up @@ -125,4 +127,66 @@ public void shouldParseTimestamp() {
assertTrue(o instanceof Timestamp);
assertThat(((Timestamp) o).getTime(), is(1610167202000L));
}

@Test
public void shouldThrowIfCannotParseTime() {
  // Given:
  final SqlType timeType = SqlTypes.TIME;
  final Expression unparseable = new StringLiteral("abc");

  // When:
  final KsqlException thrown = assertThrows(
      KsqlException.class,
      () -> new GenericExpressionResolver(
          timeType, FIELD_NAME, registry, config, "insert value", false
      ).resolve(unparseable));

  // Then:
  final String expectedHint = "Time format must be hh:mm:ss[.S]";
  assertThat(thrown.getMessage(), containsString(expectedHint));
}

@Test
public void shouldParseTime() {
  // Given:
  final SqlType timeType = SqlTypes.TIME;
  final Expression literal = new StringLiteral("04:40:02");
  final GenericExpressionResolver resolver = new GenericExpressionResolver(
      timeType, FIELD_NAME, registry, config, "insert value", false);

  // When:
  final Object resolved = resolver.resolve(literal);

  // Then:
  assertTrue(resolved instanceof Time);
  final long millisOfDay = ((Time) resolved).getTime();
  assertThat(millisOfDay, is(16802000L));
}

@Test
public void shouldThrowIfCannotParseDate() {
  // Given:
  final SqlType dateType = SqlTypes.DATE;
  final Expression unparseable = new StringLiteral("abc");

  // When:
  final KsqlException thrown = assertThrows(
      KsqlException.class,
      () -> new GenericExpressionResolver(
          dateType, FIELD_NAME, registry, config, "insert value", false
      ).resolve(unparseable));

  // Then:
  final String expectedHint = "Date format must be yyyy-mm-dd";
  assertThat(thrown.getMessage(), containsString(expectedHint));
}

@Test
public void shouldParseDate() {
  // Given:
  final SqlType dateType = SqlTypes.DATE;
  final Expression literal = new StringLiteral("2021-01-09");
  final GenericExpressionResolver resolver = new GenericExpressionResolver(
      dateType, FIELD_NAME, registry, config, "insert value", false);

  // When:
  final Object resolved = resolver.resolve(literal);

  // Then:
  assertTrue(resolved instanceof Date);
  final long epochMillis = ((Date) resolved).getTime();
  assertThat(epochMillis, is(1610150400000L));
}
}
Loading

0 comments on commit 18cc030

Please sign in to comment.