Skip to content

Commit

Permalink
Add date data type support for Kudu
Browse files Browse the repository at this point in the history
Co-authored-by: Pascal Gasp <[email protected]>
  • Loading branch information
pgasp authored and Praveen2112 committed Jun 27, 2024
1 parent 2ea1838 commit 8947d40
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 34 deletions.
4 changes: 3 additions & 1 deletion docs/src/main/sphinx/connector/kudu.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ this table:
- `VARCHAR`
* - `BINARY`
- `VARBINARY`
* - `DATE`
- `DATE`
* - `UNIXTIME_MICROS`
- `TIMESTAMP(3)`
:::
Expand Down Expand Up @@ -266,7 +268,7 @@ this table:
- `BINARY`
-
* - `DATE`
- `STRING`
- `DATE`
-
* - `TIMESTAMP(3)`
- `UNIXTIME_MICROS`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.kudu.util.DateUtil.epochDaysToSqlDate;

public class KuduPageSink
implements ConnectorPageSink, ConnectorMergeSink
Expand Down Expand Up @@ -152,6 +153,9 @@ private void appendColumn(PartialRow row, Page page, int position, int channel,
if (block.isNull(position)) {
row.setNull(destChannel);
}
else if (DATE.equals(type)) {
row.addDate(destChannel, epochDaysToSqlDate(INTEGER.getInt(block, position)));
}
else if (TIMESTAMP_MILLIS.equals(type)) {
row.addLong(destChannel, truncateEpochMicrosToMillis(TIMESTAMP_MILLIS.getLong(block, position)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public static org.apache.kudu.Type toKuduClientType(Type type)
return org.apache.kudu.Type.BINARY;
}
if (type == DateType.DATE) {
return org.apache.kudu.Type.STRING;
return org.apache.kudu.Type.DATE;
}
if (type.equals(TIMESTAMP_MILLIS)) {
return org.apache.kudu.Type.UNIXTIME_MICROS;
Expand Down Expand Up @@ -116,15 +116,16 @@ private static Type fromKuduClientType(org.apache.kudu.Type ktype, ColumnTypeAtt
return DoubleType.DOUBLE;
case DECIMAL:
return DecimalType.createDecimalType(attributes.getPrecision(), attributes.getScale());
// TODO: add support for varchar and date types: https://github.com/trinodb/trino/issues/11009
case STRING:
return VarcharType.VARCHAR;
case BINARY:
return VarbinaryType.VARBINARY;
case DATE:
return DateType.DATE;
case UNIXTIME_MICROS:
return TIMESTAMP_MILLIS;
// TODO: add support for varchar types: https://github.com/trinodb/trino/issues/11009
case VARCHAR:
case DATE:
break;
}
throw new IllegalStateException("Kudu type not implemented for " + ktype);
Expand Down Expand Up @@ -166,6 +167,9 @@ public static Object getJavaValue(Type type, Object nativeValue)
if (type instanceof VarbinaryType) {
return ((Slice) nativeValue).toByteBuffer();
}
if (type.equals(DateType.DATE)) {
return nativeValue;
}
if (type.equals(TIMESTAMP_MILLIS)) {
// Kudu's native format is in microseconds
return nativeValue;
Expand Down Expand Up @@ -204,6 +208,9 @@ public static long getLong(Type type, RowResult row, int field)
}
throw new IllegalStateException("getLong not supported for long decimal: " + type);
}
if (type.equals(DateType.DATE)) {
return row.getInt(field);
}
if (type.equals(TIMESTAMP_MILLIS)) {
return truncateEpochMicrosToMillis(row.getLong(field));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ protected MaterializedResult getDescribeOrdersResult()
.row("custkey", "bigint", extra, "")
.row("orderstatus", "varchar", extra, "")
.row("totalprice", "double", extra, "")
.row("orderdate", "varchar", extra, "")
.row("orderdate", "date", extra, "")
.row("orderpriority", "varchar", extra, "")
.row("clerk", "varchar", extra, "")
.row("shippriority", "integer", extra, "")
Expand All @@ -198,7 +198,7 @@ public void testShowCreateTable()
" custkey bigint COMMENT '' WITH (nullable = true),\n" +
" orderstatus varchar COMMENT '' WITH (nullable = true),\n" +
" totalprice double COMMENT '' WITH (nullable = true),\n" +
" orderdate varchar COMMENT '' WITH (nullable = true),\n" +
" orderdate date COMMENT '' WITH (nullable = true),\n" +
" orderpriority varchar COMMENT '' WITH (nullable = true),\n" +
" clerk varchar COMMENT '' WITH (nullable = true),\n" +
" shippriority integer COMMENT '' WITH (nullable = true),\n" +
Expand Down Expand Up @@ -601,8 +601,6 @@ public void testInsertHighestUnicodeCharacter()
public void testInsertNegativeDate()
{
// TODO Remove this overriding test once kudu connector can create tables with default partitions
// TODO Update this test once kudu connector supports DATE type: https://github.com/trinodb/trino/issues/11009
// DATE type is not supported by Kudu connector
try (TestTable table = new TestTable(getQueryRunner()::execute, "insert_date",
"(dt DATE WITH (primary_key=true)) " +
"WITH (partition_by_hash_columns = ARRAY['dt'], partition_by_hash_buckets = 2)")) {
Expand All @@ -613,7 +611,7 @@ public void testInsertNegativeDate()
@Override
protected String errorMessageForInsertNegativeDate(String date)
{
return "Insert query has mismatched column types: Table: \\[varchar\\], Query: \\[date\\]";
return "Date value <-719893>} is out of range '0001-01-01':'9999-12-31'";
}

@Test
Expand Down Expand Up @@ -731,37 +729,24 @@ public void testWrittenStats()

@Test
@Override
public void testCreateTableAsSelectNegativeDate()
public void testVarcharCastToDateInPredicate()
{
// Map date column type to varchar
String tableName = "negative_date_" + randomNameSuffix();
assertThatThrownBy(super::testVarcharCastToDateInPredicate)
.hasStackTraceContaining("Table partitioning must be specified using setRangePartitionColumns or addHashPartitions");

try {
assertUpdate(format("CREATE TABLE %s AS SELECT DATE '-0001-01-01' AS dt", tableName), 1);
assertQuery("SELECT * FROM " + tableName, "VALUES '-0001-01-01'");
assertQuery(format("SELECT * FROM %s WHERE dt = '-0001-01-01'", tableName), "VALUES '-0001-01-01'");
}
finally {
assertUpdate("DROP TABLE IF EXISTS " + tableName);
}
abort("TODO: implement the test for Kudu");
}

@Test
@Override
@SuppressWarnings("deprecation")
public void testDateYearOfEraPredicate()
{
assertThatThrownBy(super::testDateYearOfEraPredicate)
.hasStackTraceContaining("Cannot apply operator: varchar = date");
}

@Test
@Override
public void testVarcharCastToDateInPredicate()
{
assertThatThrownBy(super::testVarcharCastToDateInPredicate)
.hasStackTraceContaining("Table partitioning must be specified using setRangePartitionColumns or addHashPartitions");

abort("TODO: implement the test for Kudu");
// Override because the connector throws an exception instead of an empty result when the value is out of supported range
assertQuery("SELECT orderdate FROM orders WHERE orderdate = DATE '1997-09-14'", "VALUES DATE '1997-09-14'");
// TODO Replace failure with a TrinoException
assertThat(query("SELECT * FROM orders WHERE orderdate = DATE '-1996-09-14'"))
.nonTrinoExceptionFailure().hasMessageContaining("integer value out of range for Type: date column: -1448295");
}

@Test
Expand Down Expand Up @@ -1011,13 +996,17 @@ protected Optional<DataMappingTestSetup> filterDataMappingSmokeTestData(DataMapp
return Optional.of(dataMappingTestSetup.asUnsupported());
}

if (typeName.equals("date") // date gets stored as varchar
|| typeName.equals("varbinary") // TODO (https://github.com/trinodb/trino/issues/3416)
if (typeName.equals("varbinary") // TODO (https://github.com/trinodb/trino/issues/3416)
|| (typeName.startsWith("char") && dataMappingTestSetup.getSampleValueLiteral().contains(" "))) { // TODO: https://github.com/trinodb/trino/issues/3597
// TODO this should either work or fail cleanly
return Optional.empty();
}

if (typeName.equals("date") && dataMappingTestSetup.getSampleValueLiteral().equals("DATE '1582-10-05'")) {
// Kudu connector returns +10 days during julian->gregorian switch. The test case exists in TestKuduTypeMapping.testDate().
return Optional.empty();
}

return Optional.of(dataMappingTestSetup);
}

Expand Down Expand Up @@ -1065,6 +1054,12 @@ protected void verifyColumnNameLengthFailurePermissible(Throwable e)
assertThat(e).hasMessageContaining("invalid column name: identifier");
}

@Override
protected String errorMessageForCreateTableAsSelectNegativeDate(String date)
{
return ".*Date value <-719893>} is out of range '0001-01-01':'9999-12-31'.*";
}

private void assertTableProperty(String tableProperties, String key, String regexValue)
{
assertThat(Pattern.compile(key + "\\s*=\\s*" + regexValue + ",?\\s+").matcher(tableProperties).find())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.function.Function;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.DateType.DATE;
import static io.trino.spi.type.DecimalType.createDecimalType;
import static io.trino.spi.type.DoubleType.DOUBLE;
import static io.trino.spi.type.IntegerType.INTEGER;
Expand All @@ -42,6 +44,7 @@
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.String.format;
import static java.time.ZoneOffset.UTC;

final class TestKuduTypeMapping
Expand Down Expand Up @@ -244,6 +247,48 @@ void testVarbinary()
.execute(getQueryRunner(), trinoCreateAndInsert("test_varbinary"));
}

@Test
void testDate()
{
testDate(UTC);
testDate(jvmZone);
// using two non-JVM zones
testDate(vilnius);
testDate(kathmandu);
testDate(TestingSession.DEFAULT_TIME_ZONE_KEY.getZoneId());
}

private void testDate(ZoneId sessionZone)
{
Session session = Session.builder(getSession())
.setTimeZoneKey(TimeZoneKey.getTimeZoneKey(sessionZone.getId()))
.build();

dateTest(inputLiteral -> format("DATE %s", inputLiteral))
.execute(getQueryRunner(), session, trinoCreateAsSelect(session, "test_date"))
.execute(getQueryRunner(), session, trinoCreateAsSelect("test_date"))
.execute(getQueryRunner(), session, trinoCreateAndInsert(session, "test_date"))
.execute(getQueryRunner(), session, trinoCreateAndInsert("test_date"));
}

private static SqlDataTypeTest dateTest(Function<String, String> inputLiteralFactory)
{
return SqlDataTypeTest.create()
.addRoundTrip("date", "NULL", DATE, "CAST(NULL AS DATE)")
.addRoundTrip("date", inputLiteralFactory.apply("'0001-01-01'"), DATE, "DATE '0001-01-01'") // mon value in Kudu
.addRoundTrip("date", inputLiteralFactory.apply("'1582-10-04'"), DATE, "DATE '1582-10-04'") // before julian->gregorian switch
.addRoundTrip("date", inputLiteralFactory.apply("'1582-10-05'"), DATE, "DATE '1582-10-15'") // begin julian->gregorian switch
.addRoundTrip("date", inputLiteralFactory.apply("'1582-10-14'"), DATE, "DATE '1582-10-24'") // end julian->gregorian switch
.addRoundTrip("date", inputLiteralFactory.apply("'1952-04-03'"), DATE, "DATE '1952-04-03'") // before epoch
.addRoundTrip("date", inputLiteralFactory.apply("'1970-01-01'"), DATE, "DATE '1970-01-01'")
.addRoundTrip("date", inputLiteralFactory.apply("'1970-02-03'"), DATE, "DATE '1970-02-03'")
.addRoundTrip("date", inputLiteralFactory.apply("'1983-04-01'"), DATE, "DATE '1983-04-01'")
.addRoundTrip("date", inputLiteralFactory.apply("'1983-10-01'"), DATE, "DATE '1983-10-01'")
.addRoundTrip("date", inputLiteralFactory.apply("'2017-07-01'"), DATE, "DATE '2017-07-01'") // summer on northern hemisphere (possible DST)
.addRoundTrip("date", inputLiteralFactory.apply("'2017-01-01'"), DATE, "DATE '2017-01-01'") // winter on northern hemisphere (possible DST on southern hemisphere)
.addRoundTrip("date", inputLiteralFactory.apply("'9999-12-31'"), DATE, "DATE '9999-12-31'"); // max value in Kudu
}

@Test
void testTimestamp()
{
Expand Down

0 comments on commit 8947d40

Please sign in to comment.