Skip to content

Commit

Permalink
chore: remove ROWKEY from datagen (#4934)
Browse files Browse the repository at this point in the history
This commit removes any use of `ROWKEY` from the DataGen app. This is being done as part of the work to allow any key column names, (#3536).

This is a compatible change as the key column name is not persisted in the data sent to Kafka. Hence the output doesn't change.

Co-authored-by: Andy Coates <[email protected]>
  • Loading branch information
big-andy-coates and big-andy-coates authored Apr 6, 2020
1 parent 5be677d commit 3628e23
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ public final class LogicalSchema {
private static final Column IMPLICIT_TIME_COLUMN = Column
.of(SchemaUtil.ROWTIME_NAME, SqlTypes.BIGINT, Column.Namespace.META, 0);

private static final Column IMPLICIT_KEY_COLUMN = Column
.of(SchemaUtil.ROWKEY_NAME, SqlTypes.STRING, Column.Namespace.KEY, 0);

private final ImmutableList<Column> columns;

public static Builder builder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.confluent.ksql.schema.ksql.SchemaConverters;
import io.confluent.ksql.schema.ksql.SchemaConverters.ConnectToSqlTypeConverter;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.SchemaUtil;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
Expand All @@ -43,21 +42,21 @@

public class RowGenerator {

private static final ColumnName KEY_COL_NAME = ColumnName.of("Key");

private final Set<String> allTokens = new HashSet<>();
private final Map<String, Integer> sessionMap = new HashMap<>();
private final Set<Integer> allocatedIds = new HashSet<>();
private final Generator generator;
private final AvroData avroData;
private final SessionManager sessionManager = new SessionManager();
private final String keyFieldName;
private final ConnectSchema keySchema;
private final ConnectSchema valueSchema;
private final int keyFieldIndex;

public RowGenerator(final Generator generator, final String keyFieldName) {
this.generator = Objects.requireNonNull(generator, "generator");
this.avroData = new AvroData(1);
this.keyFieldName = Objects.requireNonNull(keyFieldName, "keyFieldName");
final LogicalSchema ksqlSchema = buildLogicalSchema(generator, avroData, keyFieldName);
this.keySchema = ksqlSchema.keyConnectSchema();
this.valueSchema = ksqlSchema.valueConnectSchema();
Expand Down Expand Up @@ -142,7 +141,7 @@ public Pair<Struct, GenericRow> generateRow() {
}

final Struct keyStruct = new Struct(keySchema);
keyStruct.put(SchemaUtil.ROWKEY_NAME.text(), row.get(keyFieldIndex));
keyStruct.put(KEY_COL_NAME.text(), row.get(keyFieldIndex));

return Pair.of(keyStruct, row);
}
Expand Down Expand Up @@ -234,8 +233,8 @@ private int mapSessionValueToSibling(final String sessionisationValue, final Sch
if (!sessionMap.containsKey(sessionisationValue)) {

final LinkedHashMap<?, ?> properties =
(LinkedHashMap) field.schema().getObjectProps().get("arg.properties");
final Integer max = (Integer) ((LinkedHashMap) properties.get("range")).get("max");
(LinkedHashMap<?, ?>) field.schema().getObjectProps().get("arg.properties");
final Integer max = (Integer) ((LinkedHashMap<?, ?>) properties.get("range")).get("max");

int vvalue = Math.abs(sessionisationValue.hashCode() % max);

Expand Down Expand Up @@ -283,7 +282,7 @@ private static LogicalSchema buildLogicalSchema(
final ConnectToSqlTypeConverter converter = SchemaConverters.connectToSqlConverter();

schemaBuilder
.keyColumn(SchemaUtil.ROWKEY_NAME, converter.toSqlType(keyField.schema()));
.keyColumn(KEY_COL_NAME, converter.toSqlType(keyField.schema()));

connectSchema.fields()
.forEach(f -> schemaBuilder.valueColumn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void shouldGenerateCorrectRow() throws IOException {

final Struct key = rowPair.getLeft();
assertThat(key, is(notNullValue()));
assertThat(key.get("ROWKEY"), is(instanceOf(Integer.class)));
assertThat(key.get("Key"), is(instanceOf(Integer.class)));

assertThat(rowPair.getRight().values(), hasSize(5));
assertThat(rowPair.getRight().get(4), instanceOf(Struct.class));
Expand All @@ -67,8 +67,8 @@ public void shouldGenerateCorrectKey() throws IOException {
final Struct key = rowPair.getLeft();
final GenericRow value = rowPair.getRight();
assertThat(key, is(notNullValue()));
assertThat(key.get("ROWKEY"), is(instanceOf(Long.class)));
assertThat(key.get("Key"), is(instanceOf(Long.class)));

assertThat("must match copy of key in value", key.get("ROWKEY"), is(value.get(0)));
assertThat("must match copy of key in value", key.get("Key"), is(value.get(0)));
}
}

0 comments on commit 3628e23

Please sign in to comment.