diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ColumnType.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ColumnType.java index 115905a01e5c..f909cef6f1d1 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ColumnType.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ColumnType.java @@ -20,7 +20,8 @@ */ public interface ColumnType { - enum Type { STRING, INTEGER, BIGINT, DOUBLE, BOOLEAN, DECIMAL, BYTES, ARRAY, MAP, STRUCT } + enum Type { STRING, INTEGER, BIGINT, DOUBLE, BOOLEAN, DECIMAL, BYTES, ARRAY, MAP, STRUCT, + TIMESTAMP, DATE, TIME } /** * Returns the type. diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java index 13bb50bd83e2..d4bd806f411c 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java @@ -1816,6 +1816,9 @@ private static void verifyRowWithIndex(final Row row, final int index) { assertThat(values.getKsqlObject(8), is(row.getKsqlObject("f_map"))); assertThat(values.getKsqlObject(9), is(row.getKsqlObject("f_struct"))); assertThat(values.getValue(10), is(nullValue())); + assertThat(values.getValue(11), is(row.getString("f_timestamp"))); + assertThat(values.getValue(12), is(row.getString("f_date"))); + assertThat(values.getValue(13), is(row.getString("f_time"))); assertThat(values.toJsonString(), is((new JsonArray(values.getList())).toString())); assertThat(values.toString(), is(values.toJsonString())); diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/RowImplTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/RowImplTest.java index 5696bfa72578..dbbc9f49f595 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/RowImplTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/RowImplTest.java @@ -41,10 +41,10 @@ public class RowImplTest { private static final List COLUMN_NAMES = ImmutableList.of("f_str", "f_int", "f_long", "f_double", "f_bool", "f_decimal", "f_bytes", - "f_array", "f_map", "f_struct", "f_null"); + "f_array", "f_map", "f_struct", "f_null", "f_timestamp", "f_date", "f_time"); private static final List COLUMN_TYPES = RowUtil.columnTypesFromStrings( ImmutableList.of("STRING", "INTEGER", "BIGINT", "DOUBLE", "BOOLEAN", "DECIMAL", "BYTES", - "ARRAY", "MAP", "STRUCT", "INTEGER")); + "ARRAY", "MAP", "STRUCT", "INTEGER", "TIMESTAMP", "DATE", "TIME")); private static final Map COLUMN_NAME_TO_INDEX = RowUtil.valueToIndexMap(COLUMN_NAMES); private static final JsonArray VALUES = new JsonArray() .add("foo") @@ -57,7 +57,10 @@ public class RowImplTest { .add(new JsonArray("[\"e1\",\"e2\"]")) .add(new JsonObject("{\"k1\":\"v1\",\"k2\":\"v2\"}")) .add(new JsonObject("{\"f1\":\"baz\",\"f2\":12}")) - .addNull(); + .addNull() + .add("2020-01-01T04:40:34.789") // server endpoint returns timestamp/date/time as strings + .add("2020-01-01") + .add("04:40:34.789"); private RowImpl row; @@ -83,6 +86,9 @@ public void shouldOneIndexColumnNames() { assertThat(row.getValue(9), is(new JsonObject("{\"k1\":\"v1\",\"k2\":\"v2\"}"))); assertThat(row.getValue(10), is(new JsonObject("{\"f1\":\"baz\",\"f2\":12}"))); assertThat(row.getValue(11), is(nullValue())); + assertThat(row.getValue(12), is("2020-01-01T04:40:34.789")); + assertThat(row.getValue(13), is("2020-01-01")); + assertThat(row.getValue(14), is("04:40:34.789")); } @Test @@ -175,6 +181,9 @@ public void shouldGetAsObject() { assertThat(obj.getKsqlArray("f_array"), is(new KsqlArray(ImmutableList.of("e1", "e2")))); assertThat(obj.getKsqlObject("f_map"), is(new KsqlObject(ImmutableMap.of("k1", "v1", "k2", "v2")))); assertThat(obj.getKsqlObject("f_struct"), is(new KsqlObject(ImmutableMap.of("f1", "baz", "f2", 12)))); + assertThat(obj.getString("f_timestamp"), is("2020-01-01T04:40:34.789")); + assertThat(obj.getString("f_date"), is("2020-01-01")); + assertThat(obj.getString("f_time"), is("04:40:34.789")); } @Test diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java index 020a98028566..694e5ede1670 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java @@ -79,6 +79,7 @@ import io.confluent.ksql.rest.server.TestKsqlRestApp; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.PhysicalSchema; +import io.confluent.ksql.schema.ksql.SqlTimeTypes; import io.confluent.ksql.schema.ksql.types.SqlTypes; import io.confluent.ksql.serde.Format; import io.confluent.ksql.serde.FormatFactory; @@ -97,6 +98,9 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; @@ -136,10 +140,11 @@ public class ClientIntegrationTest { private static final String TEST_STREAM = TEST_DATA_PROVIDER.sourceName(); private static final int TEST_NUM_ROWS = TEST_DATA_PROVIDER.data().size(); private static final List TEST_COLUMN_NAMES = - ImmutableList.of("K", "STR", "LONG", "DEC", "BYTES_", "ARRAY", "MAP", "STRUCT", "COMPLEX"); + ImmutableList.of("K", "STR", "LONG", "DEC", "BYTES_", "ARRAY", "MAP", "STRUCT", "COMPLEX", + "TIMESTAMP", "DATE", "TIME"); private static final List TEST_COLUMN_TYPES = RowUtil.columnTypesFromStrings(ImmutableList.of("STRUCT", "STRING", "BIGINT", "DECIMAL", - "BYTES", "ARRAY", "MAP", "STRUCT", "STRUCT")); + "BYTES", "ARRAY", "MAP", "STRUCT", "STRUCT", "TIMESTAMP", "DATE", "TIME")); private static final List TEST_EXPECTED_ROWS = convertToClientRows(TEST_DATA_PROVIDER.data()); @@ -623,7 +628,10 @@ public void shouldInsertInto() throws Exception { .put("ARRAY", new KsqlArray().add("v1").add("v2")) .put("MAP", new KsqlObject().put("some_key", "a_value").put("another_key", "")) .put("STRUCT", new KsqlObject().put("f1", 12)) // Nested field names are case-insensitive - .put("COMPLEX", COMPLEX_FIELD_VALUE); + .put("COMPLEX", COMPLEX_FIELD_VALUE) + .put("TIMESTAMP", "1970-01-01T00:00:00.001") + .put("DATE", "1970-01-01") + .put("TIME", "00:00:01"); // When client.insertInto(EMPTY_TEST_STREAM.toLowerCase(), insertRow).get(); // Stream name is case-insensitive @@ -643,6 +651,9 @@ public void shouldInsertInto() throws Exception { assertThat(rows.get(0).getKsqlObject("MAP"), is(new KsqlObject().put("some_key", "a_value").put("another_key", ""))); assertThat(rows.get(0).getKsqlObject("STRUCT"), is(new KsqlObject().put("F1", 12))); assertThat(rows.get(0).getKsqlObject("COMPLEX"), is(EXPECTED_COMPLEX_FIELD_VALUE)); + assertThat(rows.get(0).getString("TIMESTAMP"), is("1970-01-01T00:00:00.001")); + assertThat(rows.get(0).getString("DATE"), is("1970-01-01")); + assertThat(rows.get(0).getString("TIME"), is("00:00:01")); } @Test @@ -680,7 +691,10 @@ public void shouldStreamQueryWithProperties() throws Exception { .put("ARRAY", new KsqlArray().add("v1_shouldStreamQueryWithProperties").add("v2_shouldStreamQueryWithProperties")) .put("MAP", new KsqlObject().put("test_name", "shouldStreamQueryWithProperties")) .put("STRUCT", new KsqlObject().put("F1", 4)) - .put("COMPLEX", COMPLEX_FIELD_VALUE); + .put("COMPLEX", COMPLEX_FIELD_VALUE) + .put("TIMESTAMP", "1970-01-01T00:00:00.001") + .put("DATE", "1970-01-01") + .put("TIME", "00:00:00"); // When final StreamedQueryResult queryResult = client.streamQuery(sql, properties).get(); @@ -705,6 +719,9 @@ public void shouldStreamQueryWithProperties() throws Exception { assertThat(row.getKsqlObject("MAP"), is(new KsqlObject().put("test_name", "shouldStreamQueryWithProperties"))); assertThat(row.getKsqlObject("STRUCT"), is(new KsqlObject().put("F1", 4))); assertThat(row.getKsqlObject("COMPLEX"), is(EXPECTED_COMPLEX_FIELD_VALUE)); + assertThat(row.getString("TIMESTAMP"), is("1970-01-01T00:00:00.001")); + assertThat(row.getString("DATE"), is("1970-01-01")); + assertThat(row.getString("TIME"), is("00:00")); } @Test @@ -723,7 +740,10 @@ public void shouldExecuteQueryWithProperties() { .put("ARRAY", new KsqlArray().add("v1_shouldExecuteQueryWithProperties").add("v2_shouldExecuteQueryWithProperties")) .put("MAP", new KsqlObject().put("test_name", "shouldExecuteQueryWithProperties")) .put("STRUCT", new KsqlObject().put("F1", 4)) - .put("COMPLEX", COMPLEX_FIELD_VALUE); + .put("COMPLEX", COMPLEX_FIELD_VALUE) + .put("TIMESTAMP", "1970-01-01T00:00:00.001") + .put("DATE", "1970-01-01") + .put("TIME", "00:00:00"); // When final BatchedQueryResult queryResult = client.executeQuery(sql, properties); @@ -763,6 +783,9 @@ public void shouldExecuteQueryWithProperties() { assertThat(row.getKsqlObject("MAP"), is(new KsqlObject().put("test_name", "shouldExecuteQueryWithProperties"))); assertThat(row.getKsqlObject("STRUCT"), is(new KsqlObject().put("F1", 4))); assertThat(row.getKsqlObject("COMPLEX"), is(EXPECTED_COMPLEX_FIELD_VALUE)); + assertThat(row.getString("TIMESTAMP"), is("1970-01-01T00:00:00.001")); + assertThat(row.getString("DATE"), is("1970-01-01")); + assertThat(row.getString("TIME"), is("00:00")); } @Test @@ -787,7 +810,10 @@ public void shouldStreamInserts() throws Exception { .put("BYTES_", new byte[]{0, 1, 2}) .put("ARRAY", new KsqlArray().add("v_" + i)) .put("MAP", new KsqlObject().put("k_" + i, "v_" + i)) - .put("COMPLEX", COMPLEX_FIELD_VALUE)); + .put("COMPLEX", COMPLEX_FIELD_VALUE) + .put("TIMESTAMP", "1970-01-01T00:00:00.001") + .put("DATE", "1970-01-01") + .put("TIME", "00:00")); } // Then @@ -816,6 +842,9 @@ public void shouldStreamInserts() throws Exception { assertThat(rows.get(i).getKsqlArray("ARRAY"), is(new KsqlArray().add("v_" + i))); assertThat(rows.get(i).getKsqlObject("MAP"), is(new KsqlObject().put("k_" + i, "v_" + i))); assertThat(rows.get(i).getKsqlObject("COMPLEX"), is(EXPECTED_COMPLEX_FIELD_VALUE)); + assertThat(rows.get(i).getString("TIMESTAMP"), is("1970-01-01T00:00:00.001")); + assertThat(rows.get(i).getString("DATE"), is("1970-01-01")); + assertThat(rows.get(i).getString("TIME"), is("00:00")); } // When: end connection @@ -1067,7 +1096,7 @@ public void shouldDescribeSource() throws Exception { + "COMPLEX STRUCT<`DECIMAL` DECIMAL(2, 1), STRUCT STRUCT, " + "ARRAY_ARRAY ARRAY>, ARRAY_STRUCT ARRAY>, " + "ARRAY_MAP ARRAY>, MAP_ARRAY MAP>, " - + "MAP_MAP MAP>, MAP_STRUCT MAP>>) " + + "MAP_MAP MAP>, MAP_STRUCT MAP>>, TIMESTAMP TIMESTAMP, DATE DATE, TIME TIME) " + "WITH (KAFKA_TOPIC='STRUCTURED_TYPES_TOPIC', KEY_FORMAT='JSON', VALUE_FORMAT='JSON');")); } @@ -1430,6 +1459,12 @@ private static void addObjectToKsqlArray(final KsqlArray array, final Object val // Can't use expectedRow.add((BigDecimal) value) directly since client serializes BigDecimal as string, // whereas this method builds up the expected result (unrelated to serialization) array.addAll(new KsqlArray(Collections.singletonList(value))); + } else if (value instanceof Timestamp) { + array.add(SqlTimeTypes.formatTimestamp((Timestamp) value)); + } else if (value instanceof Date) { + array.add(SqlTimeTypes.formatDate((Date) value)); + } else if (value instanceof Time) { + array.add(SqlTimeTypes.formatTime((Time) value)); } else { array.add(value); } diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/util/RowUtilTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/util/RowUtilTest.java index 52eabb1e4b2d..96db8a84baeb 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/util/RowUtilTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/util/RowUtilTest.java @@ -38,7 +38,10 @@ public void shouldGetColumnTypesFromStrings() { "ARRAY", "MAP", "DECIMAL(4, 2)", - "STRUCT<`F1` STRING, `F2` INTEGER>" + "STRUCT<`F1` STRING, `F2` INTEGER>", + "TIMESTAMP", + "DATE", + "TIME" ); // When @@ -58,7 +61,10 @@ public void shouldGetColumnTypesFromStrings() { "ARRAY", "MAP", "DECIMAL", - "STRUCT" + "STRUCT", + "TIMESTAMP", + "DATE", + "TIME" )); } } \ No newline at end of file diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/StructuredTypesDataProvider.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/StructuredTypesDataProvider.java index 39559fa536f9..14dd69879726 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/StructuredTypesDataProvider.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/StructuredTypesDataProvider.java @@ -32,7 +32,9 @@ import io.confluent.ksql.serde.connect.ConnectSchemas; import java.math.BigDecimal; import java.math.RoundingMode; -import java.nio.ByteBuffer; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -74,6 +76,9 @@ public class StructuredTypesDataProvider extends TestDataProvider { )) .build() ) + .valueColumn(ColumnName.of("TIMESTAMP"), SqlTypes.TIMESTAMP) + .valueColumn(ColumnName.of("DATE"), SqlTypes.DATE) + .valueColumn(ColumnName.of("TIME"), SqlTypes.TIME) .build(); private static final PhysicalSchema PHYSICAL_SCHEMA = PhysicalSchema @@ -89,18 +94,24 @@ public class StructuredTypesDataProvider extends TestDataProvider { private static final Multimap ROWS = ImmutableListMultimap .builder() .put(genericKey(generateStructKey("a")), genericRow("FOO", 1L, new BigDecimal("1.11"), new byte[]{1}, - Collections.singletonList("a"), Collections.singletonMap("k1", "v1"), generateSimpleStructValue(2), generateComplexStructValue(0))) + Collections.singletonList("a"), Collections.singletonMap("k1", "v1"), generateSimpleStructValue(2), generateComplexStructValue(0), + new Timestamp(1), new Date(86400000), new Time(0))) .put(genericKey(generateStructKey("b")), genericRow("BAR", 2L, new BigDecimal("2.22"), new byte[]{2}, - Collections.emptyList(), Collections.emptyMap(), generateSimpleStructValue(3), generateComplexStructValue(1))) + Collections.emptyList(), Collections.emptyMap(), generateSimpleStructValue(3), generateComplexStructValue(1), + new Timestamp(2), new Date(86400000 * 2), new Time(1))) .put(genericKey(generateStructKey("c")), genericRow("BAZ", 3L, new BigDecimal("30.33"), new byte[]{3}, - Collections.singletonList("b"), Collections.emptyMap(), generateSimpleStructValue(null), generateComplexStructValue(2))) + Collections.singletonList("b"), Collections.emptyMap(), generateSimpleStructValue(null), generateComplexStructValue(2), + new Timestamp(3), new Date(86400000 * 3), new Time(2))) .put(genericKey(generateStructKey("d")), genericRow("BUZZ", 4L, new BigDecimal("40.44"), new byte[]{4}, - ImmutableList.of("c", "d"), Collections.emptyMap(), generateSimpleStructValue(88), generateComplexStructValue(3))) + ImmutableList.of("c", "d"), Collections.emptyMap(), generateSimpleStructValue(88), generateComplexStructValue(3), + new Timestamp(4), new Date(86400000 * 4), new Time(3))) // Additional entries for repeated keys .put(genericKey(generateStructKey("c")), genericRow("BAZ", 5L, new BigDecimal("12.0"), new byte[]{15}, - ImmutableList.of("e"), ImmutableMap.of("k1", "v1", "k2", "v2"), generateSimpleStructValue(0), generateComplexStructValue(4))) + ImmutableList.of("e"), ImmutableMap.of("k1", "v1", "k2", "v2"), generateSimpleStructValue(0), generateComplexStructValue(4), + new Timestamp(11), new Date(86400000 * 11), new Time(11))) .put(genericKey(generateStructKey("d")), genericRow("BUZZ", 6L, new BigDecimal("10.1"), new byte[]{6}, - ImmutableList.of("f", "g"), Collections.emptyMap(), generateSimpleStructValue(null), generateComplexStructValue(5))) + ImmutableList.of("f", "g"), Collections.emptyMap(), generateSimpleStructValue(null), generateComplexStructValue(5), + new Timestamp(12), new Date(86400000 * 12), new Time(12))) .build(); public StructuredTypesDataProvider() { diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/BaseApiTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/BaseApiTest.java index 865162199910..d85ef6dddcd1 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/BaseApiTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/BaseApiTest.java @@ -66,10 +66,12 @@ public class BaseApiTest { protected static final JsonArray DEFAULT_COLUMN_NAMES = new JsonArray().add("f_str").add("f_int") .add("f_bool").add("f_long").add("f_double").add("f_decimal").add("f_bytes") - .add("f_array").add("f_map").add("f_struct").add("f_null"); + .add("f_array").add("f_map").add("f_struct").add("f_null").add("f_timestamp") + .add("f_date").add("f_time"); protected static final JsonArray DEFAULT_COLUMN_TYPES = new JsonArray().add("STRING").add("INTEGER") .add("BOOLEAN").add("BIGINT").add("DOUBLE").add("DECIMAL(4, 2)").add("BYTES") - .add("ARRAY").add("MAP").add("STRUCT<`F1` STRING, `F2` INTEGER>").add("INTEGER"); + .add("ARRAY").add("MAP").add("STRUCT<`F1` STRING, `F2` INTEGER>").add("INTEGER") + .add("TIMESTAMP").add("DATE").add("TIME"); protected static final Schema F_STRUCT_SCHEMA = SchemaBuilder.struct() .field("F1", Schema.OPTIONAL_STRING_SCHEMA) .field("F2", Schema.OPTIONAL_INT32_SCHEMA) @@ -267,7 +269,10 @@ private static GenericRow rowWithIndex(final int index) { ImmutableList.of("s" + index, "t" + index), ImmutableMap.of("k" + index, "v" + index), structField, - null + null, + "2020-01-01T04:40:34.789", + "2020-01-01", + "04:40:34.789" ); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/ApiIntegrationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/ApiIntegrationTest.java index 2bbd8f3ddf1a..76795bf25f83 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/ApiIntegrationTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/ApiIntegrationTest.java @@ -165,7 +165,7 @@ public void shouldExecutePushQueryWithLimit() { assertThat(response.rows, hasSize(2)); assertThat(response.responseObject.getJsonArray("columnNames"), is( new JsonArray().add("K").add("STR").add("LONG").add("DEC").add("BYTES_").add("ARRAY") - .add("MAP").add("STRUCT").add("COMPLEX"))); + .add("MAP").add("STRUCT").add("COMPLEX").add("TIMESTAMP").add("DATE").add("TIME"))); assertThat(response.responseObject.getJsonArray("columnTypes"), is( new JsonArray().add("STRUCT<`F1` ARRAY>").add("STRING").add("BIGINT") .add("DECIMAL(4, 2)").add("BYTES").add("ARRAY").add("MAP") @@ -173,7 +173,8 @@ public void shouldExecutePushQueryWithLimit() { .add("STRUCT<`DECIMAL` DECIMAL(2, 1), `STRUCT` STRUCT<`F1` STRING, `F2` INTEGER>, " + "`ARRAY_ARRAY` ARRAY>, `ARRAY_STRUCT` ARRAY>, " + "`ARRAY_MAP` ARRAY>, `MAP_ARRAY` MAP>, " - + "`MAP_MAP` MAP>, `MAP_STRUCT` MAP>>"))); + + "`MAP_MAP` MAP>, `MAP_STRUCT` MAP>>") + .add("TIMESTAMP").add("DATE").add("TIME"))); assertThat(response.responseObject.getString("queryId"), is(notNullValue())); }