Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: /inserts-stream endpoint now supports nested types #5621

Merged
merged 16 commits into from
Jun 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,15 @@
import java.math.BigDecimal;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import kafka.zookeeper.ZooKeeperClientException;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.After;
import org.junit.AfterClass;
Expand All @@ -91,9 +93,9 @@ public class ClientIntegrationTest {
private static final String TEST_STREAM = TEST_DATA_PROVIDER.kstreamName();
private static final int TEST_NUM_ROWS = TEST_DATA_PROVIDER.data().size();
private static final List<String> TEST_COLUMN_NAMES =
ImmutableList.of("STR", "LONG", "DEC", "ARRAY", "MAP");
ImmutableList.of("STR", "LONG", "DEC", "ARRAY", "MAP", "STRUCT", "COMPLEX");
private static final List<ColumnType> TEST_COLUMN_TYPES =
RowUtil.columnTypesFromStrings(ImmutableList.of("STRING", "BIGINT", "DECIMAL", "ARRAY", "MAP"));
RowUtil.columnTypesFromStrings(ImmutableList.of("STRING", "BIGINT", "DECIMAL", "ARRAY", "MAP", "STRUCT", "STRUCT"));
private static final List<KsqlArray> TEST_EXPECTED_ROWS = convertToClientRows(
TEST_DATA_PROVIDER.data());

Expand Down Expand Up @@ -123,6 +125,18 @@ public class ClientIntegrationTest {
RowUtil.columnTypesFromStrings(ImmutableList.of("STRING", "BIGINT"));
private static final KsqlArray PULL_QUERY_EXPECTED_ROW = new KsqlArray(ImmutableList.of("FOO", 1));

private static final KsqlObject COMPLEX_FIELD_VALUE = new KsqlObject()
.put("DECIMAL", new BigDecimal("1.1"))
.put("STRUCT", new KsqlObject().put("F1", "foo").put("F2", 3))
.put("ARRAY_ARRAY", new KsqlArray().add(new KsqlArray().add("bar")))
.put("ARRAY_STRUCT", new KsqlArray().add(new KsqlObject().put("F1", "x")))
.put("ARRAY_MAP", new KsqlArray().add(new KsqlObject().put("k", 10)))
.put("MAP_ARRAY", new KsqlObject().put("k", new KsqlArray().add("e1").add("e2")))
.put("MAP_MAP", new KsqlObject().put("k1", new KsqlObject().put("k2", 5)))
.put("MAP_STRUCT", new KsqlObject().put("k", new KsqlObject().put("F1", "baz")));
private static final KsqlObject EXPECTED_COMPLEX_FIELD_VALUE = COMPLEX_FIELD_VALUE.copy()
.put("DECIMAL", 1.1d); // Expect raw decimal value, whereas put(BigDecimal) serializes as string to avoid loss of precision

private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build();

private static final TestKsqlRestApp REST_APP = TestKsqlRestApp
Expand Down Expand Up @@ -435,10 +449,12 @@ public void shouldInsertInto() throws Exception {
// Given
final KsqlObject insertRow = new KsqlObject()
.put("str", "HELLO") // Column names are case-insensitive
.put("`LONG`", 100L) // Quotes may be used to preserve case-sensitivity
.put("DEC", new BigDecimal("13.31"))
.put("`LONG`", 100L) // Backticks may be used to preserve case-sensitivity
.put("\"DEC\"", new BigDecimal("13.31")) // Double quotes may also be used to preserve case-sensitivity
.put("ARRAY", new KsqlArray().add("v1").add("v2"))
.put("MAP", new KsqlObject().put("some_key", "a_value").put("another_key", ""));
.put("MAP", new KsqlObject().put("some_key", "a_value").put("another_key", ""))
.put("STRUCT", new KsqlObject().put("f1", 12)) // Nested field names are case-insensitive
.put("COMPLEX", COMPLEX_FIELD_VALUE);

// When
client.insertInto(EMPTY_TEST_STREAM.toLowerCase(), insertRow).get(); // Stream name is case-insensitive
Expand All @@ -454,6 +470,8 @@ public void shouldInsertInto() throws Exception {
assertThat(rows.get(0).getDecimal("DEC"), is(new BigDecimal("13.31")));
assertThat(rows.get(0).getKsqlArray("ARRAY"), is(new KsqlArray().add("v1").add("v2")));
assertThat(rows.get(0).getKsqlObject("MAP"), is(new KsqlObject().put("some_key", "a_value").put("another_key", "")));
assertThat(rows.get(0).getKsqlObject("STRUCT"), is(new KsqlObject().put("F1", 12)));
assertThat(rows.get(0).getKsqlObject("COMPLEX"), is(EXPECTED_COMPLEX_FIELD_VALUE));
}

@Test
Expand Down Expand Up @@ -487,7 +505,9 @@ public void shouldStreamQueryWithProperties() throws Exception {
.put("LONG", 2000L)
.put("DEC", new BigDecimal("12.34"))
.put("ARRAY", new KsqlArray().add("v1_shouldStreamQueryWithProperties").add("v2_shouldStreamQueryWithProperties"))
.put("MAP", new KsqlObject().put("test_name", "shouldStreamQueryWithProperties"));
.put("MAP", new KsqlObject().put("test_name", "shouldStreamQueryWithProperties"))
.put("STRUCT", new KsqlObject().put("F1", 4))
.put("COMPLEX", COMPLEX_FIELD_VALUE);

// When
final StreamedQueryResult queryResult = client.streamQuery(sql, properties).get();
Expand All @@ -508,6 +528,8 @@ public void shouldStreamQueryWithProperties() throws Exception {
assertThat(row.getDecimal("DEC"), is(new BigDecimal("12.34")));
assertThat(row.getKsqlArray("ARRAY"), is(new KsqlArray().add("v1_shouldStreamQueryWithProperties").add("v2_shouldStreamQueryWithProperties")));
assertThat(row.getKsqlObject("MAP"), is(new KsqlObject().put("test_name", "shouldStreamQueryWithProperties")));
assertThat(row.getKsqlObject("STRUCT"), is(new KsqlObject().put("F1", 4)));
assertThat(row.getKsqlObject("COMPLEX"), is(EXPECTED_COMPLEX_FIELD_VALUE));
}

@Test
Expand All @@ -522,7 +544,9 @@ public void shouldExecuteQueryWithProperties() {
.put("LONG", 2000L)
.put("DEC", new BigDecimal("12.34"))
.put("ARRAY", new KsqlArray().add("v1_shouldExecuteQueryWithProperties").add("v2_shouldExecuteQueryWithProperties"))
.put("MAP", new KsqlObject().put("test_name", "shouldExecuteQueryWithProperties"));
.put("MAP", new KsqlObject().put("test_name", "shouldExecuteQueryWithProperties"))
.put("STRUCT", new KsqlObject().put("F1", 4))
.put("COMPLEX", COMPLEX_FIELD_VALUE);

// When
final BatchedQueryResult queryResult = client.executeQuery(sql, properties);
Expand Down Expand Up @@ -558,6 +582,8 @@ public void shouldExecuteQueryWithProperties() {
assertThat(row.getDecimal("DEC"), is(new BigDecimal("12.34")));
assertThat(row.getKsqlArray("ARRAY"), is(new KsqlArray().add("v1_shouldExecuteQueryWithProperties").add("v2_shouldExecuteQueryWithProperties")));
assertThat(row.getKsqlObject("MAP"), is(new KsqlObject().put("test_name", "shouldExecuteQueryWithProperties")));
assertThat(row.getKsqlObject("STRUCT"), is(new KsqlObject().put("F1", 4)));
assertThat(row.getKsqlObject("COMPLEX"), is(EXPECTED_COMPLEX_FIELD_VALUE));
}

private Client createClient() {
Expand Down Expand Up @@ -617,13 +643,17 @@ private static void verifyStreamRowWithIndex(final Row row, final int index) {
assertThat(row.getDecimal("DEC"), is(expectedRow.getDecimal(2)));
assertThat(row.getKsqlArray("ARRAY"), is(expectedRow.getKsqlArray(3)));
assertThat(row.getKsqlObject("MAP"), is(expectedRow.getKsqlObject(4)));
assertThat(row.getKsqlObject("STRUCT"), is(expectedRow.getKsqlObject(5)));
assertThat(row.getKsqlObject("COMPLEX"), is(expectedRow.getKsqlObject(6)));

// verify index-based getters are 1-indexed
assertThat(row.getString(1), is(row.getString("STR")));
assertThat(row.getLong(2), is(row.getLong("LONG")));
assertThat(row.getDecimal(3), is(row.getDecimal("DEC")));
assertThat(row.getKsqlArray(4), is(row.getKsqlArray("ARRAY")));
assertThat(row.getKsqlObject(5), is(row.getKsqlObject("MAP")));
assertThat(row.getKsqlObject(6), is(row.getKsqlObject("STRUCT")));
assertThat(row.getKsqlObject(7), is(row.getKsqlObject("COMPLEX")));

// verify isNull() evaluation
assertThat(row.isNull("STR"), is(false));
Expand All @@ -640,6 +670,8 @@ private static void verifyStreamRowWithIndex(final Row row, final int index) {
assertThat(values.getDecimal(2), is(row.getDecimal("DEC")));
assertThat(values.getKsqlArray(3), is(row.getKsqlArray("ARRAY")));
assertThat(values.getKsqlObject(4), is(row.getKsqlObject("MAP")));
assertThat(values.getKsqlObject(5), is(row.getKsqlObject("STRUCT")));
assertThat(values.getKsqlObject(6), is(row.getKsqlObject("COMPLEX")));
assertThat(values.toJsonString(), is((new JsonArray(values.getList())).toString()));
assertThat(values.toString(), is(values.toJsonString()));

Expand All @@ -653,6 +685,8 @@ private static void verifyStreamRowWithIndex(final Row row, final int index) {
assertThat(obj.getDecimal("DEC"), is(row.getDecimal("DEC")));
assertThat(obj.getKsqlArray("ARRAY"), is(row.getKsqlArray("ARRAY")));
assertThat(obj.getKsqlObject("MAP"), is(row.getKsqlObject("MAP")));
assertThat(obj.getKsqlObject("STRUCT"), is(row.getKsqlObject("STRUCT")));
assertThat(obj.getKsqlObject("COMPLEX"), is(row.getKsqlObject("COMPLEX")));
assertThat(obj.containsKey("DEC"), is(true));
assertThat(obj.containsKey("notafield"), is(false));
assertThat(obj.toJsonString(), is((new JsonObject(obj.getMap())).toString()));
Expand Down Expand Up @@ -720,8 +754,18 @@ private static List<KsqlArray> convertToClientRows(final Multimap<String, Generi
final List<KsqlArray> expectedRows = new ArrayList<>();
for (final Map.Entry<String, GenericRow> entry : data.entries()) {
final KsqlArray expectedRow = new KsqlArray()
.add(entry.getKey())
.addAll(new KsqlArray(entry.getValue().values()));
.add(entry.getKey());
for (final Object value : entry.getValue().values()) {
if (value instanceof Struct) {
expectedRow.add(StructuredTypesDataProvider.structToMap((Struct) value));
} else if (value instanceof BigDecimal) {
// Can't use expectedRow.add((BigDecimal) value) directly since client serializes BigDecimal as string,
// whereas this method builds up the expected result (unrelated to serialization)
expectedRow.addAll(new KsqlArray(Collections.singletonList(value)));
} else {
expectedRow.add(value);
}
}
expectedRows.add(expectedRow);
}
return expectedRows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,14 @@
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.serde.SerdeOption;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;

public class StructuredTypesDataProvider extends TestDataProvider<String> {

Expand All @@ -37,23 +44,114 @@ public class StructuredTypesDataProvider extends TestDataProvider<String> {
.valueColumn(ColumnName.of("DEC"), SqlTypes.decimal(4, 2))
.valueColumn(ColumnName.of("ARRAY"), SqlTypes.array(SqlTypes.STRING))
.valueColumn(ColumnName.of("MAP"), SqlTypes.map(SqlTypes.STRING))
.valueColumn(ColumnName.of("STRUCT"), SqlTypes.struct().field("F1", SqlTypes.INTEGER).build())
.valueColumn(ColumnName.of("COMPLEX"), SqlTypes.struct()
.field("DECIMAL", SqlTypes.decimal(2, 1))
.field("STRUCT", SqlTypes.struct()
.field("F1", SqlTypes.STRING)
.field("F2", SqlTypes.INTEGER)
.build())
.field("ARRAY_ARRAY", SqlTypes.array(SqlTypes.array(SqlTypes.STRING)))
.field("ARRAY_STRUCT", SqlTypes.array(SqlTypes.struct().field("F1", SqlTypes.STRING).build()))
.field("ARRAY_MAP", SqlTypes.array(SqlTypes.map(SqlTypes.INTEGER)))
.field("MAP_ARRAY", SqlTypes.map(SqlTypes.array(SqlTypes.STRING)))
.field("MAP_MAP", SqlTypes.map(SqlTypes.map(SqlTypes.INTEGER)))
.field("MAP_STRUCT", SqlTypes.map(SqlTypes.struct().field("F1", SqlTypes.STRING).build()))
.build()
)
.build();

private static final PhysicalSchema PHYSICAL_SCHEMA = PhysicalSchema
.from(LOGICAL_SCHEMA, SerdeOption.none());

private static final Schema STRUCT_FIELD_SCHEMA = LOGICAL_SCHEMA.valueConnectSchema().field("STRUCT").schema();
private static final Schema COMPLEX_FIELD_SCHEMA = LOGICAL_SCHEMA.valueConnectSchema().field("COMPLEX").schema();

private static final Multimap<String, GenericRow> ROWS = ImmutableListMultimap
.<String, GenericRow>builder()
.put("FOO", genericRow(1L, new BigDecimal("1.11"), Collections.singletonList("a"), Collections.singletonMap("k1", "v1")))
.put("BAR", genericRow(2L, new BigDecimal("2.22"), Collections.emptyList(), Collections.emptyMap()))
.put("BAZ", genericRow(3L, new BigDecimal("30.33"), Collections.singletonList("b"), Collections.emptyMap()))
.put("BUZZ", genericRow(4L, new BigDecimal("40.44"), ImmutableList.of("c", "d"), Collections.emptyMap()))
.put("FOO", genericRow(1L, new BigDecimal("1.11"), Collections.singletonList("a"), Collections.singletonMap("k1", "v1"), generateStruct(2), generateComplexStruct(0)))
.put("BAR", genericRow(2L, new BigDecimal("2.22"), Collections.emptyList(), Collections.emptyMap(), generateStruct(3), generateComplexStruct(1)))
.put("BAZ", genericRow(3L, new BigDecimal("30.33"), Collections.singletonList("b"), Collections.emptyMap(), generateStruct(null), generateComplexStruct(2)))
.put("BUZZ", genericRow(4L, new BigDecimal("40.44"), ImmutableList.of("c", "d"), Collections.emptyMap(), generateStruct(88), generateComplexStruct(3)))
// Additional entries for repeated keys
.put("BAZ", genericRow(5L, new BigDecimal("12"), ImmutableList.of("e"), ImmutableMap.of("k1", "v1", "k2", "v2")))
.put("BUZZ", genericRow(6L, new BigDecimal("10.1"), ImmutableList.of("f", "g"), Collections.emptyMap()))
.put("BAZ", genericRow(5L, new BigDecimal("12"), ImmutableList.of("e"), ImmutableMap.of("k1", "v1", "k2", "v2"), generateStruct(0), generateComplexStruct(4)))
.put("BUZZ", genericRow(6L, new BigDecimal("10.1"), ImmutableList.of("f", "g"), Collections.emptyMap(), generateStruct(null), generateComplexStruct(5)))
.build();

public StructuredTypesDataProvider() {
super("STRUCTURED_TYPES", PHYSICAL_SCHEMA, ROWS);
}

@SuppressWarnings("unchecked")
public static Map<String, Object> structToMap(final Struct struct) {
return (Map<String, Object>) structToMapHelper(struct);
}

private static Object structToMapHelper(final Object value) {
if (value instanceof Struct) {
final Struct struct = (Struct) value;

final Map<String, Object> result = new HashMap<>();
for (final Field field : struct.schema().fields()) {
result.put(field.name(), structToMapHelper(struct.get(field)));
}

return result;
} else if (value instanceof List) {
final List<?> list = (List<?>) value;

final List<Object> result = new ArrayList<>();
for (final Object o : list) {
result.add(structToMapHelper(o));
}

return result;
} else if (value instanceof Map) {
final Map<?, ?> map = (Map<?, ?>) value;

final Map<String, Object> result = new HashMap<>();
for (final Map.Entry<?, ?> entry : map.entrySet()) {
result.put(entry.getKey().toString(), structToMapHelper(entry.getValue()));
}

return result;
} else {
return value;
}
}

private static Struct generateStruct(final Integer value) {
final Struct struct = new Struct(STRUCT_FIELD_SCHEMA);
struct.put("F1", value);
return struct;
}

private static Struct generateComplexStruct(final int i) {
final Struct complexStruct = new Struct(COMPLEX_FIELD_SCHEMA);

complexStruct.put("DECIMAL", new BigDecimal(i));

final Struct struct = new Struct(COMPLEX_FIELD_SCHEMA.field("STRUCT").schema());
struct.put("F1", "v" + i);
struct.put("F2", i);
complexStruct.put("STRUCT", struct);

complexStruct.put("ARRAY_ARRAY", ImmutableList.of(ImmutableList.of("foo")));

final Struct arrayStruct = new Struct(COMPLEX_FIELD_SCHEMA.field("ARRAY_STRUCT").schema().valueSchema());
arrayStruct.put("F1", "v" + i);
complexStruct.put("ARRAY_STRUCT", ImmutableList.of(arrayStruct));

complexStruct.put("ARRAY_MAP", ImmutableList.of(ImmutableMap.of("k1", i)));

complexStruct.put("MAP_ARRAY", ImmutableMap.of("k", ImmutableList.of("v" + i)));

complexStruct.put("MAP_MAP", ImmutableMap.of("k", ImmutableMap.of("k", i)));

final Struct mapStruct = new Struct(COMPLEX_FIELD_SCHEMA.field("MAP_STRUCT").schema().valueSchema());
mapStruct.put("F1", "v" + i);
complexStruct.put("MAP_STRUCT", ImmutableMap.of("k", mapStruct));

return complexStruct;
}
}
Loading