Skip to content

Commit

Permalink
chore: fix conflicts for IF NOT EXISTS (#8214)
Browse files Browse the repository at this point in the history
* chore: fix conflicts
  • Loading branch information
wcarlson5 authored Oct 4, 2021
1 parent eb521a4 commit e23b75c
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.tree.CreateStream;
import io.confluent.ksql.parser.tree.CreateStreamAsSelect;
import io.confluent.ksql.parser.tree.CreateTable;
import io.confluent.ksql.parser.tree.CreateTableAsSelect;
import io.confluent.ksql.parser.tree.InsertInto;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.WarningEntity;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
import io.confluent.ksql.security.KsqlSecurityContext;
import io.confluent.ksql.services.ServiceContext;
Expand Down Expand Up @@ -91,6 +97,44 @@ public DistributingExecutor(
Objects.requireNonNull(commandRunnerWarning, "commandRunnerWarning");
}

// CHECKSTYLE_RULES.OFF: CyclomaticComplexity
private Optional<KsqlEntity> checkIfNotExistsResponse(
final KsqlExecutionContext executionContext,
final ConfiguredStatement<?> statement
) {
SourceName sourceName = null;
String type = "";
if (statement.getStatement() instanceof CreateStream
&& ((CreateStream) statement.getStatement()).isNotExists()) {
type = "stream";
sourceName = ((CreateStream) statement.getStatement()).getName();
} else if (statement.getStatement() instanceof CreateTable
&& ((CreateTable) statement.getStatement()).isNotExists()) {
type = "table";
sourceName = ((CreateTable) statement.getStatement()).getName();
} else if (statement.getStatement() instanceof CreateTableAsSelect
&& ((CreateTableAsSelect) statement.getStatement()).isNotExists()) {
type = "table";
sourceName = ((CreateTableAsSelect) statement.getStatement()).getName();
} else if (statement.getStatement() instanceof CreateStreamAsSelect
&& ((CreateStreamAsSelect) statement.getStatement()).isNotExists()) {
type = "stream";
sourceName = ((CreateStreamAsSelect) statement.getStatement()).getName();
}
if (sourceName != null
&& executionContext.getMetaStore().getSource(sourceName) != null) {
return Optional.of(
new WarningEntity(statement.getStatementText(),
String.format("Cannot add %s %s: A %s with the same name already exists.",
type,
sourceName,
type)
));
} else {
return Optional.empty();
}
}

/**
* The transactional protocol for sending a command to the command topic is to
* initTransaction(), beginTransaction(), wait for commandRunner to finish processing all previous
Expand All @@ -101,6 +145,7 @@ public DistributingExecutor(
* If a new transactional producer is initialized while the current transaction is incomplete,
* the old producer will be fenced off and unable to continue with its transaction.
*/
// CHECKSTYLE_RULES.OFF: NPathComplexity
public Optional<KsqlEntity> execute(
final ConfiguredStatement<? extends Statement> statement,
final KsqlExecutionContext executionContext,
Expand All @@ -123,6 +168,15 @@ public Optional<KsqlEntity> execute(
);
}

final Optional<KsqlEntity> response = checkIfNotExistsResponse(
executionContext,
statement
);

if (response.isPresent()) {
return response;
}

checkAuthorization(injected, securityContext, executionContext);

final Producer<CommandId, Command> transactionalProducer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatus.Status;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.WarningEntity;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
import io.confluent.ksql.security.KsqlSecurityContext;
import io.confluent.ksql.services.SandboxedServiceContext;
Expand Down Expand Up @@ -434,4 +436,33 @@ public void shouldAbortOnError_Exception() {
// Then:
verify(queue).abortCommand(IDGEN.getCommandId(CONFIGURED_STATEMENT.getStatement()));
}

@Test
public void shouldNotEnqueueRedundantIfNotExists() {
// Given:
final PreparedStatement<Statement> preparedStatement =
PreparedStatement.of("", new CreateStream(
Optional.empty(),
SourceName.of("TEST"),
TableElements.of(),
false,
true,
CreateSourceProperties.from(ImmutableMap.of(
CommonCreateConfigs.KAFKA_TOPIC_NAME_PROPERTY, new StringLiteral("topic"),
CommonCreateConfigs.VALUE_FORMAT_PROPERTY, new StringLiteral("json")
))
));
final ConfiguredStatement<Statement> configured =
ConfiguredStatement.of(preparedStatement, SessionConfig.of(KSQL_CONFIG, ImmutableMap.of())
);
final DataSource dataSource = mock(DataSource.class);
doReturn(dataSource).when(metaStore).getSource(SourceName.of("TEST"));

// When:
final Optional<KsqlEntity> response = distributor.execute(configured, executionContext, securityContext);

// Then:
assertThat("Should be present", response.isPresent());
assertThat(((WarningEntity) response.get()).getMessage(), containsString(""));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,16 @@ public void shouldRecoverInsertIntosRecreates() {
shouldRecover(commands);
}

@Test
public void shouldRecoverIfNotExists() {
server1.submitCommands(
"CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM IF NOT EXISTS B AS SELECT * FROM A;",
"CREATE STREAM IF NOT EXISTS B AS SELECT * FROM A;"
);
shouldRecover(commands);
}

@Test
public void shouldRecoverTerminates() {
server1.submitCommands(
Expand Down

0 comments on commit e23b75c

Please sign in to comment.