Skip to content

Commit

Permalink
fix: Don't throw NPE on null columns (#3647)
Browse files Browse the repository at this point in the history
* Fix bug #3617
  • Loading branch information
vpapavas authored Nov 3, 2019
1 parent 7c6076c commit 6969768
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
package io.confluent.ksql.rest.entity;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
import io.confluent.ksql.execution.streams.materialization.TableRow;
import io.confluent.ksql.model.WindowType;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -79,18 +79,18 @@ private static LogicalSchema addWindowFieldsIntoSchema(
}

private static List<?> createRow(final TableRow row) {
final Builder<Object> builder = ImmutableList.builder();
final List<Object> rowList = new ArrayList<>();

keyFields(row.key()).forEach(builder::add);
keyFields(row.key()).forEach(rowList::add);

row.window().ifPresent(window -> {
builder.add(window.start().toEpochMilli());
window.end().map(Instant::toEpochMilli).ifPresent(builder::add);
rowList.add(window.start().toEpochMilli());
window.end().map(Instant::toEpochMilli).ifPresent(rowList::add);
});

builder.addAll(row.value().getColumns());
rowList.addAll(row.value().getColumns());

return builder.build();
return rowList;
}

private static Stream<?> keyFields(final Struct key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.hamcrest.Matchers.sameInstance;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.streams.materialization.Row;
import io.confluent.ksql.execution.streams.materialization.TableRow;
Expand All @@ -33,6 +34,7 @@
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.junit.Test;
Expand All @@ -50,6 +52,14 @@ public class TableRowsEntityFactoryTest {
.valueColumn(ColumnName.of("v1"), SqlTypes.BOOLEAN)
.build();

private static final LogicalSchema SCHEMA_NULL = LogicalSchema.builder()
.keyColumn(ColumnName.of("k0"), SqlTypes.STRING)
.valueColumn(ColumnName.of("v0"), SqlTypes.STRING)
.valueColumn(ColumnName.of("v1"), SqlTypes.INTEGER)
.valueColumn(ColumnName.of("v2"), SqlTypes.DOUBLE)
.valueColumn(ColumnName.of("v3"), SqlTypes.BOOLEAN)
.build();

@Test
public void shouldAddNonWindowedRowToValues() {
// Given:
Expand Down Expand Up @@ -100,6 +110,27 @@ public void shouldAddWindowedRowToValues() {
assertThat(output.get(1), contains("y", now.toEpochMilli(), now.toEpochMilli(), false));
}

@Test
public void shouldSupportNullColumns() {
// Given:
final List<Object> newColumns = new ArrayList<>();
newColumns.add(null);
newColumns.add(null);
newColumns.add(null);
newColumns.add(null);
GenericRow row = new GenericRow(newColumns);

final Builder<Row> builder = ImmutableList.builder();
builder.add(Row.of(SCHEMA_NULL, StructKeyUtil.asStructKey("k"), row));

// When:
final List<List<?>> output = TableRowsEntityFactory.createRows(builder.build());

// Then:
assertThat(output, hasSize(1));
assertThat(output.get(0), contains("k", null, null, null, null));
}

@Test
public void shouldReturnSameSchemaIfNotWindowed() {
// When:
Expand Down

0 comments on commit 6969768

Please sign in to comment.