Skip to content

Commit

Permalink
Replace Type.SOURCE in CreateTable with a Boolean parameter instead
Browse files Browse the repository at this point in the history
  • Loading branch information
ConfluentJenkins committed Aug 10, 2021
1 parent 76789ec commit 99871d7
Show file tree
Hide file tree
Showing 27 changed files with 111 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public CreateTableCommand createTableCommand(final KsqlStructuredDataOutputNode
Formats.from(outputNode.getKsqlTopic()),
outputNode.getKsqlTopic().getKeyFormat().getWindowInfo(),
Optional.of(outputNode.getOrReplace()),
Optional.of(CreateTable.Type.NORMAL.name())
Optional.of(false)
);
}

Expand Down Expand Up @@ -192,7 +192,7 @@ public CreateTableCommand createTableCommand(
buildFormats(statement.getName(), schema, props, ksqlConfig),
getWindowInfo(props),
Optional.of(statement.isOrReplace()),
Optional.of(statement.getType().name())
Optional.of(statement.isSource())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

package io.confluent.ksql.ddl.commands;

import com.google.common.base.Enums;
import io.confluent.ksql.execution.ddl.commands.AlterSourceCommand;
import io.confluent.ksql.execution.ddl.commands.CreateSourceCommand;
import io.confluent.ksql.execution.ddl.commands.CreateStreamCommand;
Expand All @@ -33,7 +32,6 @@
import io.confluent.ksql.metastore.model.KsqlStream;
import io.confluent.ksql.metastore.model.KsqlTable;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.tree.CreateTable;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.serde.KeyFormat;
Expand Down Expand Up @@ -121,19 +119,14 @@ public DdlCommandResult executeCreateTable(final CreateTableCommand createTable)
sourceName, sourceType.toLowerCase()));
}

final CreateTable.Type tableType = createTable.getType()
.map(typeStr -> Enums.getIfPresent(CreateTable.Type.class, typeStr).orNull())
.orElse(CreateTable.Type.NORMAL);

final KsqlTable<?> ksqlTable = new KsqlTable<>(
sql,
createTable.getSourceName(),
createTable.getSchema(),
createTable.getTimestampColumn(),
withQuery,
getKsqlTopic(createTable),
tableType == CreateTable.Type.SOURCE,
tableType == CreateTable.Type.SOURCE
createTable.isSource()
);
metaStore.putSource(ksqlTable, createTable.isOrReplace());
metaStore.addSourceReferences(ksqlTable.getName(), withQuerySources);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public void shouldCreateCommandForCreateTable() {
TableElements.of(
tableElement(Namespace.VALUE, "COL1", new Type(SqlTypes.BIGINT)),
tableElement(Namespace.VALUE, "COL2", new Type(SqlTypes.STRING))),
false, true, withProperties, CreateTable.Type.NORMAL);
false, true, withProperties, false);

// When:
final DdlCommand result = commandFactories
Expand All @@ -216,7 +216,7 @@ public void shouldCreateCommandForCreateSourceTable() {
TableElements.of(
tableElement(Namespace.VALUE, "COL1", new Type(SqlTypes.BIGINT)),
tableElement(Namespace.VALUE, "COL2", new Type(SqlTypes.STRING))),
false, true, withProperties, CreateTable.Type.SOURCE);
false, true, withProperties, true);

// When:
final DdlCommand result = commandFactories
Expand All @@ -234,7 +234,7 @@ public void shouldCreateCommandForCreateTableWithOverriddenProperties() {
TableElements.of(
tableElement(Namespace.VALUE, "COL1", new Type(SqlTypes.BIGINT)),
tableElement(Namespace.VALUE, "COL2", new Type(SqlTypes.STRING))),
false, true, withProperties, CreateTable.Type.NORMAL);
false, true, withProperties, false);

// When:
commandFactories.create(sqlExpression, statement, SessionConfig.of(ksqlConfig, OVERRIDES));
Expand Down Expand Up @@ -373,7 +373,7 @@ public void shouldCreateTableCommandWithSingleValueWrappingFromOverridesNotConfi

final DdlStatement statement =
new CreateTable(SOME_NAME, ELEMENTS_WITH_PK,
false, true, withProperties, CreateTable.Type.NORMAL);
false, true, withProperties, false);

// When:
final DdlCommand cmd = commandFactories
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ public void shouldCreateCommandForCreateTable() {
TableElements.of(
tableElement(PRIMARY_KEY, "COL1", new Type(BIGINT)),
tableElement(VALUE, "COL2", new Type(SqlTypes.STRING))),
false, true, withProperties, CreateTable.Type.NORMAL);
false, true, withProperties, false);

// When:
final CreateTableCommand result = createSourceFactory
Expand All @@ -302,6 +302,26 @@ public void shouldCreateCommandForCreateTable() {
// Then:
assertThat(result.getSourceName(), is(SOME_NAME));
assertThat(result.getTopicName(), is(TOPIC_NAME));
assertThat(result.isSource(), is(false));
}

@Test
public void shouldCreateCommandForCreateSourceTable() {
// Given:
final CreateTable ddlStatement = new CreateTable(SOME_NAME,
TableElements.of(
tableElement(PRIMARY_KEY, "COL1", new Type(BIGINT)),
tableElement(VALUE, "COL2", new Type(SqlTypes.STRING))),
false, true, withProperties, true);

// When:
final CreateTableCommand result = createSourceFactory
.createTableCommand(ddlStatement, ksqlConfig);

// Then:
assertThat(result.getSourceName(), is(SOME_NAME));
assertThat(result.getTopicName(), is(TOPIC_NAME));
assertThat(result.isSource(), is(true));
}

@Test
Expand Down Expand Up @@ -380,7 +400,7 @@ public void shouldCreateTableCommandWithSingleValueWrappingFromPropertiesNotConf

final CreateTable statement =
new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE,
false, true, withProperties, CreateTable.Type.NORMAL);
false, true, withProperties, false);

// When:
final CreateTableCommand cmd = createSourceFactory
Expand All @@ -402,7 +422,7 @@ public void shouldCreateTableCommandWithSingleValueWrappingFromConfig() {

final CreateTable statement =
new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE,
false, true, withProperties, CreateTable.Type.NORMAL);
false, true, withProperties, false);

// When:
final CreateTableCommand cmd = createSourceFactory
Expand All @@ -418,7 +438,7 @@ public void shouldCreateTableCommandWithSingleValueWrappingFromDefaultConfig() {
// Given:
final CreateTable statement =
new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE,
false, true, withProperties, CreateTable.Type.NORMAL);
false, true, withProperties, false);

// When:
final CreateTableCommand cmd = createSourceFactory
Expand Down Expand Up @@ -450,7 +470,7 @@ public void shouldThrowOnNoElementsInCreateTable() {
// Given:
final CreateTable statement
= new CreateTable(SOME_NAME, TableElements.of(),
false, true, withProperties, CreateTable.Type.NORMAL);
false, true, withProperties, false);

// When:
final Exception e = assertThrows(
Expand Down Expand Up @@ -480,7 +500,7 @@ public void shouldNotThrowWhenThereAreElementsInCreateTable() {
// Given:
final CreateTable statement =
new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE,
false, true, withProperties, CreateTable.Type.NORMAL);
false, true, withProperties, false);

// When:
createSourceFactory.createTableCommand(statement, ksqlConfig);
Expand Down Expand Up @@ -615,7 +635,7 @@ public void shouldBuildTimestampColumnForTable() {
);
final CreateTable statement =
new CreateTable(SOME_NAME, TABLE_ELEMENTS,
false, true, withProperties, CreateTable.Type.NORMAL);
false, true, withProperties, false);

// When:
final CreateTableCommand cmd = createSourceFactory.createTableCommand(
Expand Down Expand Up @@ -737,7 +757,7 @@ public void shouldCreateValueSerdeToValidateValueFormatCanHandleValueSchema() {
// Given:
givenCommandFactoriesWithMocks();
final CreateTable statement = new CreateTable(SOME_NAME, TABLE_ELEMENTS, false, true,
withProperties, CreateTable.Type.NORMAL);
withProperties, false);

when(valueSerdeFactory.create(
FormatInfo.of(JSON.name()),
Expand Down Expand Up @@ -1017,7 +1037,7 @@ public void shouldThrowIfTableIsMissingPrimaryKey() {

final CreateTable statement =
new CreateTable(SOME_NAME, noKey,
false, true, withProperties, CreateTable.Type.NORMAL);
false, true, withProperties, false);

// When:
final Exception e = assertThrows(
Expand Down Expand Up @@ -1067,7 +1087,7 @@ public void shouldNotThrowOnCreateTableIfNotExistsIsSet() {
TableElements.of(
tableElement(PRIMARY_KEY, "COL1", new Type(BIGINT)),
tableElement(VALUE, "COL2", new Type(SqlTypes.STRING))),
false, true, withProperties, CreateTable.Type.NORMAL);
false, true, withProperties, false);

// When:
final CreateTableCommand result = createSourceFactory
Expand All @@ -1084,7 +1104,7 @@ public void shouldThrowIfTableExists() {
TableElements.of(
tableElement(PRIMARY_KEY, "COL1", new Type(BIGINT)),
tableElement(VALUE, "COL2", new Type(SqlTypes.STRING))),
false, false, withProperties, CreateTable.Type.NORMAL);
false, false, withProperties, false);

// When:
final Exception e = assertThrows(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.confluent.ksql.metastore.model.KsqlTable;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.tree.CreateTable;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlType;
Expand Down Expand Up @@ -273,8 +272,7 @@ public void shouldAddNormalTableWhenNoTypeIsSpecified() {

// Then:
final KsqlTable ksqlTable = (KsqlTable) metaStore.getSource(SourceName.of("t1"));
assertThat(ksqlTable.isReadOnly(), is(false));
assertThat(ksqlTable.isMaterialized(), is(false));
assertThat(ksqlTable.isSource(), is(false));
}

@Test
Expand All @@ -283,27 +281,26 @@ public void shouldAddSourceTable() {
final CreateTableCommand cmd = buildCreateTable(
SourceName.of("t1"),
false,
CreateTable.Type.SOURCE.name()
true
);

// When:
cmdExec.execute(SQL_TEXT, cmd, true, NO_QUERY_SOURCES);

// Then:
final KsqlTable ksqlTable = (KsqlTable) metaStore.getSource(SourceName.of("t1"));
assertThat(ksqlTable.isReadOnly(), is(true));
assertThat(ksqlTable.isMaterialized(), is(true));
assertThat(ksqlTable.isSource(), is(true));
}

@Test
public void shouldThrowOnDropTableWhenConstraintExist() {
// Given:
final CreateTableCommand table1 = buildCreateTable(SourceName.of("t1"),
false, CreateTable.Type.NORMAL.name());
false, false);
final CreateTableCommand table2 = buildCreateTable(SourceName.of("t2"),
false, CreateTable.Type.NORMAL.name());
false, false);
final CreateTableCommand table3 = buildCreateTable(SourceName.of("t3"),
false, CreateTable.Type.NORMAL.name());
false, false);
cmdExec.execute(SQL_TEXT, table1, true, Collections.emptySet());
cmdExec.execute(SQL_TEXT, table2, true, Collections.singleton(SourceName.of("t1")));
cmdExec.execute(SQL_TEXT, table3, true, Collections.singleton(SourceName.of("t1")));
Expand Down Expand Up @@ -588,18 +585,18 @@ private void givenCreateWindowedTable() {
),
Optional.of(windowInfo),
Optional.of(false),
Optional.of(CreateTable.Type.NORMAL.name())
Optional.of(false)
);
}

private void givenCreateTable() {
createTable = buildCreateTable(TABLE_NAME, false, CreateTable.Type.NORMAL.name());
createTable = buildCreateTable(TABLE_NAME, false, false);
}

private CreateTableCommand buildCreateTable(
final SourceName sourceName,
final boolean allowReplace,
final String type
final Boolean isSource
) {
return new CreateTableCommand(
sourceName,
Expand All @@ -614,7 +611,7 @@ private CreateTableCommand buildCreateTable(
),
Optional.empty(),
Optional.of(allowReplace),
Optional.ofNullable(type)
Optional.ofNullable(isSource)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ public void shouldRewriteCreateTable() {
false,
false,
sourceProperties,
CreateTable.Type.NORMAL
false
);
when(mockRewriter.apply(tableElement1, context)).thenReturn(rewrittenTableElement1);
when(mockRewriter.apply(tableElement2, context)).thenReturn(rewrittenTableElement2);
Expand All @@ -679,7 +679,7 @@ public void shouldRewriteCreateTable() {
false,
false,
sourceProperties,
CreateTable.Type.NORMAL
false
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,6 @@ public void shouldBuildSchemaKTableWhenKTableSource() {
KeyFormat.nonWindowed(FormatInfo.of(FormatFactory.KAFKA.name()), SerdeFeatures.of()),
ValueFormat.of(FormatInfo.of(FormatFactory.JSON.name()), SerdeFeatures.of())
),
false,
false
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
@JsonIgnoreProperties({"keyField"}) // Removed at version 0.10
@Immutable
public class CreateTableCommand extends CreateSourceCommand {
private final Optional<String> type;
private final Optional<Boolean> isSource;

public CreateTableCommand(
@JsonProperty(value = "sourceName", required = true) final SourceName sourceName,
Expand All @@ -38,7 +38,7 @@ public CreateTableCommand(
@JsonProperty(value = "formats", required = true) final Formats formats,
@JsonProperty(value = "windowInfo") final Optional<WindowInfo> windowInfo,
@JsonProperty(value = "orReplace", defaultValue = "false") final Optional<Boolean> orReplace,
@JsonProperty(value = "type", defaultValue = "NORMAL") final Optional<String> type
@JsonProperty(value = "isSource", defaultValue = "false") final Optional<Boolean> isSource
) {
super(
sourceName,
Expand All @@ -54,11 +54,14 @@ public CreateTableCommand(
throw new UnsupportedOperationException("Tables require key columns");
}

this.type = type;
this.isSource = isSource;
}

public Optional<String> getType() {
return type;
// This can be in CreateSourceCommand, but it fails deserializing the JSON property when
// loading a CreateStreamCommand because it is not supported there yet. We should move this
// source variable and method to the CreateSourceCommand after supporting source streams.
public Boolean isSource() {
return isSource.orElse(false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,5 +107,5 @@ public String getKsqlType() {
/**
* @return returns true if this source is read-only
*/
boolean isReadOnly();
boolean isSource();
}
Loading

0 comments on commit 99871d7

Please sign in to comment.