diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/TableRowsEntityFactory.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/TableRowsEntityFactory.java index 46aa68669ef4..e8cd0beb1c7f 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/TableRowsEntityFactory.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/TableRowsEntityFactory.java @@ -16,7 +16,6 @@ package io.confluent.ksql.rest.entity; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableList.Builder; import io.confluent.ksql.execution.streams.materialization.TableRow; import io.confluent.ksql.model.WindowType; import io.confluent.ksql.name.ColumnName; @@ -24,6 +23,7 @@ import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; import java.time.Instant; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -79,18 +79,18 @@ private static LogicalSchema addWindowFieldsIntoSchema( } private static List createRow(final TableRow row) { - final Builder builder = ImmutableList.builder(); + final List rowList = new ArrayList<>(); - keyFields(row.key()).forEach(builder::add); + keyFields(row.key()).forEach(rowList::add); row.window().ifPresent(window -> { - builder.add(window.start().toEpochMilli()); - window.end().map(Instant::toEpochMilli).ifPresent(builder::add); + rowList.add(window.start().toEpochMilli()); + window.end().map(Instant::toEpochMilli).ifPresent(rowList::add); }); - builder.addAll(row.value().getColumns()); + rowList.addAll(row.value().getColumns()); - return builder.build(); + return rowList; } private static Stream keyFields(final Struct key) { diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/TableRowsEntityFactoryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/TableRowsEntityFactoryTest.java index 7bb38e8ebad2..764b4ec06de0 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/TableRowsEntityFactoryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/TableRowsEntityFactoryTest.java @@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.sameInstance; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableList.Builder; import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.streams.materialization.Row; import io.confluent.ksql.execution.streams.materialization.TableRow; @@ -33,6 +34,7 @@ import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; import java.time.Instant; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import org.junit.Test; @@ -50,6 +52,14 @@ public class TableRowsEntityFactoryTest { .valueColumn(ColumnName.of("v1"), SqlTypes.BOOLEAN) .build(); + private static final LogicalSchema SCHEMA_NULL = LogicalSchema.builder() + .keyColumn(ColumnName.of("k0"), SqlTypes.STRING) + .valueColumn(ColumnName.of("v0"), SqlTypes.STRING) + .valueColumn(ColumnName.of("v1"), SqlTypes.INTEGER) + .valueColumn(ColumnName.of("v2"), SqlTypes.DOUBLE) + .valueColumn(ColumnName.of("v3"), SqlTypes.BOOLEAN) + .build(); + @Test public void shouldAddNonWindowedRowToValues() { // Given: @@ -100,6 +110,27 @@ public void shouldAddWindowedRowToValues() { assertThat(output.get(1), contains("y", now.toEpochMilli(), now.toEpochMilli(), false)); } + @Test + public void shouldSupportNullColumns() { + // Given: + final List newColumns = new ArrayList<>(); + newColumns.add(null); + newColumns.add(null); + newColumns.add(null); + newColumns.add(null); + GenericRow row = new GenericRow(newColumns); + + final Builder builder = ImmutableList.builder(); + builder.add(Row.of(SCHEMA_NULL, StructKeyUtil.asStructKey("k"), row)); + + // When: + final List> output = TableRowsEntityFactory.createRows(builder.build()); + + // Then: + assertThat(output, hasSize(1)); + assertThat(output.get(0), contains("k", null, null, null, null)); + } + @Test public void shouldReturnSameSchemaIfNotWindowed() { // When: