Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add CREATE SOURCE TABLE syntax and metadata info #7945

Merged
merged 5 commits into from
Aug 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public CreateStreamCommand createStreamCommand(
);
}

// This method is called by CREATE_AS statements
public CreateTableCommand createTableCommand(final KsqlStructuredDataOutputNode outputNode) {
return new CreateTableCommand(
outputNode.getSinkName().get(),
Expand All @@ -142,10 +143,12 @@ public CreateTableCommand createTableCommand(final KsqlStructuredDataOutputNode
outputNode.getKsqlTopic().getKafkaTopicName(),
Formats.from(outputNode.getKsqlTopic()),
outputNode.getKsqlTopic().getKeyFormat().getWindowInfo(),
Optional.of(outputNode.getOrReplace())
Optional.of(outputNode.getOrReplace()),
Optional.of(false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: how about define another isDerived here, rather than using false to indicate is not source?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parameter name is tricky. I initially wanted to use a Enum for future purposes like External, Temp, Source, etc types, but I found that those other types can be mixed with source too, like external source, temp source, or something. So I ended up using a boolean. IsDerived makes sense too. I think any of isDerived or isSource could work.

);
}

// This method is called by simple CREATE statements
public CreateTableCommand createTableCommand(
final CreateTable statement,
final KsqlConfig ksqlConfig
Expand Down Expand Up @@ -188,7 +191,8 @@ public CreateTableCommand createTableCommand(
topicName,
buildFormats(statement.getName(), schema, props, ksqlConfig),
getWindowInfo(props),
Optional.of(statement.isOrReplace())
Optional.of(statement.isOrReplace()),
Optional.of(statement.isSource())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,15 @@ public DdlCommandResult executeCreateTable(final CreateTableCommand createTable)
+ "already exists.",
sourceName, sourceType.toLowerCase()));
}

final KsqlTable<?> ksqlTable = new KsqlTable<>(
sql,
createTable.getSourceName(),
createTable.getSchema(),
createTable.getTimestampColumn(),
withQuery,
getKsqlTopic(createTable)
getKsqlTopic(createTable),
createTable.isSource()
);
metaStore.putSource(ksqlTable, createTable.isOrReplace());
metaStore.addSourceReferences(ksqlTable.getName(), withQuerySources);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,25 @@ public void shouldCreateCommandForCreateTable() {
TableElements.of(
tableElement(Namespace.VALUE, "COL1", new Type(SqlTypes.BIGINT)),
tableElement(Namespace.VALUE, "COL2", new Type(SqlTypes.STRING))),
false, true, withProperties);
false, true, withProperties, false);

// When:
final DdlCommand result = commandFactories
.create(sqlExpression, statement, SessionConfig.of(ksqlConfig, emptyMap()));

// Then:
assertThat(result, is(createTableCommand));
verify(createSourceFactory).createTableCommand(statement, ksqlConfig);
}

@Test
public void shouldCreateCommandForCreateSourceTable() {
// Given:
final CreateTable statement = new CreateTable(SOME_NAME,
TableElements.of(
tableElement(Namespace.VALUE, "COL1", new Type(SqlTypes.BIGINT)),
tableElement(Namespace.VALUE, "COL2", new Type(SqlTypes.STRING))),
false, true, withProperties, true);

// When:
final DdlCommand result = commandFactories
Expand All @@ -216,7 +234,7 @@ public void shouldCreateCommandForCreateTableWithOverriddenProperties() {
TableElements.of(
tableElement(Namespace.VALUE, "COL1", new Type(SqlTypes.BIGINT)),
tableElement(Namespace.VALUE, "COL2", new Type(SqlTypes.STRING))),
false, true, withProperties);
false, true, withProperties, false);

// When:
commandFactories.create(sqlExpression, statement, SessionConfig.of(ksqlConfig, OVERRIDES));
Expand Down Expand Up @@ -354,7 +372,8 @@ public void shouldCreateTableCommandWithSingleValueWrappingFromOverridesNotConfi
);

final DdlStatement statement =
new CreateTable(SOME_NAME, ELEMENTS_WITH_PK, false, true, withProperties);
new CreateTable(SOME_NAME, ELEMENTS_WITH_PK,
false, true, withProperties, false);

// When:
final DdlCommand cmd = commandFactories
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ public void shouldCreateCommandForCreateTable() {
TableElements.of(
tableElement(PRIMARY_KEY, "COL1", new Type(BIGINT)),
tableElement(VALUE, "COL2", new Type(SqlTypes.STRING))),
false, true, withProperties);
false, true, withProperties, false);

// When:
final CreateTableCommand result = createSourceFactory
Expand All @@ -302,6 +302,26 @@ public void shouldCreateCommandForCreateTable() {
// Then:
assertThat(result.getSourceName(), is(SOME_NAME));
assertThat(result.getTopicName(), is(TOPIC_NAME));
assertThat(result.isSource(), is(false));
}

@Test
public void shouldCreateCommandForCreateSourceTable() {
// Given:
final CreateTable ddlStatement = new CreateTable(SOME_NAME,
TableElements.of(
tableElement(PRIMARY_KEY, "COL1", new Type(BIGINT)),
tableElement(VALUE, "COL2", new Type(SqlTypes.STRING))),
false, true, withProperties, true);

// When:
final CreateTableCommand result = createSourceFactory
.createTableCommand(ddlStatement, ksqlConfig);

// Then:
assertThat(result.getSourceName(), is(SOME_NAME));
assertThat(result.getTopicName(), is(TOPIC_NAME));
assertThat(result.isSource(), is(true));
}

@Test
Expand Down Expand Up @@ -379,7 +399,8 @@ public void shouldCreateTableCommandWithSingleValueWrappingFromPropertiesNotConf
givenProperty(CommonCreateConfigs.WRAP_SINGLE_VALUE, new BooleanLiteral("false"));

final CreateTable statement =
new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE, false, true, withProperties);
new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE,
false, true, withProperties, false);

// When:
final CreateTableCommand cmd = createSourceFactory
Expand All @@ -400,7 +421,8 @@ public void shouldCreateTableCommandWithSingleValueWrappingFromConfig() {
));

final CreateTable statement =
new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE, false, true, withProperties);
new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE,
false, true, withProperties, false);

// When:
final CreateTableCommand cmd = createSourceFactory
Expand All @@ -415,7 +437,8 @@ public void shouldCreateTableCommandWithSingleValueWrappingFromConfig() {
public void shouldCreateTableCommandWithSingleValueWrappingFromDefaultConfig() {
// Given:
final CreateTable statement =
new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE, false, true, withProperties);
new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE,
false, true, withProperties, false);

// When:
final CreateTableCommand cmd = createSourceFactory
Expand Down Expand Up @@ -446,7 +469,8 @@ public void shouldThrowOnNoElementsInCreateStream() {
public void shouldThrowOnNoElementsInCreateTable() {
// Given:
final CreateTable statement
= new CreateTable(SOME_NAME, TableElements.of(), false, true, withProperties);
= new CreateTable(SOME_NAME, TableElements.of(),
false, true, withProperties, false);

// When:
final Exception e = assertThrows(
Expand Down Expand Up @@ -475,7 +499,8 @@ public void shouldNotThrowWhenThereAreElementsInCreateStream() {
public void shouldNotThrowWhenThereAreElementsInCreateTable() {
// Given:
final CreateTable statement =
new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE, false, true, withProperties);
new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE,
false, true, withProperties, false);

// When:
createSourceFactory.createTableCommand(statement, ksqlConfig);
Expand Down Expand Up @@ -609,7 +634,8 @@ public void shouldBuildTimestampColumnForTable() {
new StringLiteral(quote(ELEMENT2.getName().text()))
);
final CreateTable statement =
new CreateTable(SOME_NAME, TABLE_ELEMENTS, false, true, withProperties);
new CreateTable(SOME_NAME, TABLE_ELEMENTS,
false, true, withProperties, false);

// When:
final CreateTableCommand cmd = createSourceFactory.createTableCommand(
Expand Down Expand Up @@ -731,7 +757,7 @@ public void shouldCreateValueSerdeToValidateValueFormatCanHandleValueSchema() {
// Given:
givenCommandFactoriesWithMocks();
final CreateTable statement = new CreateTable(SOME_NAME, TABLE_ELEMENTS, false, true,
withProperties);
withProperties, false);

when(valueSerdeFactory.create(
FormatInfo.of(JSON.name()),
Expand Down Expand Up @@ -1010,7 +1036,8 @@ public void shouldThrowIfTableIsMissingPrimaryKey() {
final TableElements noKey = TableElements.of(ELEMENT1);

final CreateTable statement =
new CreateTable(SOME_NAME, noKey, false, true, withProperties);
new CreateTable(SOME_NAME, noKey,
false, true, withProperties, false);

// When:
final Exception e = assertThrows(
Expand Down Expand Up @@ -1060,7 +1087,7 @@ public void shouldNotThrowOnCreateTableIfNotExistsIsSet() {
TableElements.of(
tableElement(PRIMARY_KEY, "COL1", new Type(BIGINT)),
tableElement(VALUE, "COL2", new Type(SqlTypes.STRING))),
false, true, withProperties);
false, true, withProperties, false);

// When:
final CreateTableCommand result = createSourceFactory
Expand All @@ -1077,7 +1104,7 @@ public void shouldThrowIfTableExists() {
TableElements.of(
tableElement(PRIMARY_KEY, "COL1", new Type(BIGINT)),
tableElement(VALUE, "COL2", new Type(SqlTypes.STRING))),
false, false, withProperties);
false, false, withProperties, false);

// When:
final Exception e = assertThrows(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.confluent.ksql.metastore.MutableMetaStore;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import io.confluent.ksql.metastore.model.KsqlStream;
import io.confluent.ksql.metastore.model.KsqlTable;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.schema.ksql.Column;
Expand Down Expand Up @@ -257,12 +258,49 @@ public void shouldAddSinkTable() {
assertThat(metaStore.getSource(TABLE_NAME).isCasTarget(), is(true));
}

@Test
public void shouldAddNormalTableWhenNoTypeIsSpecified() {
// Given:
final CreateTableCommand cmd = buildCreateTable(
SourceName.of("t1"),
false,
null
);

// When:
cmdExec.execute(SQL_TEXT, cmd, true, NO_QUERY_SOURCES);

// Then:
final KsqlTable ksqlTable = (KsqlTable) metaStore.getSource(SourceName.of("t1"));
assertThat(ksqlTable.isSource(), is(false));
}

@Test
public void shouldAddSourceTable() {
// Given:
final CreateTableCommand cmd = buildCreateTable(
SourceName.of("t1"),
false,
true
);

// When:
cmdExec.execute(SQL_TEXT, cmd, true, NO_QUERY_SOURCES);

// Then:
final KsqlTable ksqlTable = (KsqlTable) metaStore.getSource(SourceName.of("t1"));
assertThat(ksqlTable.isSource(), is(true));
}

@Test
public void shouldThrowOnDropTableWhenConstraintExist() {
// Given:
final CreateTableCommand table1 = buildCreateTable(SourceName.of("t1"), false);
final CreateTableCommand table2 = buildCreateTable(SourceName.of("t2"), false);
final CreateTableCommand table3 = buildCreateTable(SourceName.of("t3"), false);
final CreateTableCommand table1 = buildCreateTable(SourceName.of("t1"),
false, false);
final CreateTableCommand table2 = buildCreateTable(SourceName.of("t2"),
false, false);
final CreateTableCommand table3 = buildCreateTable(SourceName.of("t3"),
false, false);
cmdExec.execute(SQL_TEXT, table1, true, Collections.emptySet());
cmdExec.execute(SQL_TEXT, table2, true, Collections.singleton(SourceName.of("t1")));
cmdExec.execute(SQL_TEXT, table3, true, Collections.singleton(SourceName.of("t1")));
Expand Down Expand Up @@ -470,7 +508,7 @@ public void shouldWarnAddDuplicateTableWithoutReplace() {
cmdExec.execute(SQL_TEXT, createTable, false, NO_QUERY_SOURCES);

// When:
givenCreateTable(false);
givenCreateTable();
final DdlCommandResult result =cmdExec.execute(SQL_TEXT, createTable,
false, NO_QUERY_SOURCES);

Expand Down Expand Up @@ -546,21 +584,19 @@ private void givenCreateWindowedTable() {
SerdeFeatures.of()
),
Optional.of(windowInfo),
Optional.of(false),
Optional.of(false)
);
}

private void givenCreateTable() {
createTable = buildCreateTable(TABLE_NAME, false);
}

private void givenCreateTable(final boolean allowReplace) {
createTable = buildCreateTable(TABLE_NAME, allowReplace);
createTable = buildCreateTable(TABLE_NAME, false, false);
}

private CreateTableCommand buildCreateTable(
final SourceName sourceName,
final boolean allowReplace
final boolean allowReplace,
final Boolean isSource
) {
return new CreateTableCommand(
sourceName,
Expand All @@ -574,7 +610,8 @@ private CreateTableCommand buildCreateTable(
SerdeFeatures.of()
),
Optional.empty(),
Optional.of(allowReplace)
Optional.of(allowReplace),
Optional.ofNullable(isSource)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,8 @@ public void shouldRewriteCreateTable() {
TableElements.of(tableElement1, tableElement2),
false,
false,
sourceProperties
sourceProperties,
false
);
when(mockRewriter.apply(tableElement1, context)).thenReturn(rewrittenTableElement1);
when(mockRewriter.apply(tableElement2, context)).thenReturn(rewrittenTableElement2);
Expand All @@ -677,7 +678,8 @@ public void shouldRewriteCreateTable() {
TableElements.of(rewrittenTableElement1, rewrittenTableElement2),
false,
false,
sourceProperties
sourceProperties,
false
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,8 @@ public void shouldBuildSchemaKTableWhenKTableSource() {
"topic2",
KeyFormat.nonWindowed(FormatInfo.of(FormatFactory.KAFKA.name()), SerdeFeatures.of()),
ValueFormat.of(FormatInfo.of(FormatFactory.JSON.name()), SerdeFeatures.of())
)
),
false
);

node = new DataSourceNode(
Expand Down
Loading