Skip to content

Commit

Permalink
feat: terminate persistent query on DROP command
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Sep 2, 2020
1 parent 0b8c70a commit 0d21017
Show file tree
Hide file tree
Showing 10 changed files with 397 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@

package io.confluent.ksql.ddl.commands;

import static java.util.Objects.requireNonNull;

import io.confluent.ksql.engine.EngineContext;
import io.confluent.ksql.execution.ddl.commands.CreateSourceCommand;
import io.confluent.ksql.execution.ddl.commands.CreateStreamCommand;
import io.confluent.ksql.execution.ddl.commands.CreateTableCommand;
Expand All @@ -29,20 +32,23 @@
import io.confluent.ksql.metastore.model.KsqlStream;
import io.confluent.ksql.metastore.model.KsqlTable;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.serde.ValueFormat;
import java.util.Objects;
import io.confluent.ksql.util.QueryMetadata;

/**
* Execute DDL Commands
*/
public class DdlCommandExec {

private final MutableMetaStore metaStore;
private final EngineContext engineContext;

public DdlCommandExec(final MutableMetaStore metaStore) {
this.metaStore = metaStore;
public DdlCommandExec(final MutableMetaStore metaStore, final EngineContext engineContext) {
this.metaStore = requireNonNull(metaStore, "metaStore");
this.engineContext = requireNonNull(engineContext, "engineContext");
}

/**
Expand All @@ -60,7 +66,7 @@ private final class Executor implements io.confluent.ksql.execution.ddl.commands
private final boolean withQuery;

private Executor(final String sql, final boolean withQuery) {
this.sql = Objects.requireNonNull(sql, "sql");
this.sql = requireNonNull(sql, "sql");
this.withQuery = withQuery;
}

Expand Down Expand Up @@ -101,7 +107,10 @@ public DdlCommandResult executeDropSource(final DropSourceCommand dropSource) {
if (dataSource == null) {
return new DdlCommandResult(true, "Source " + sourceName + " does not exist.");
}
metaStore.deleteSource(sourceName);

metaStore.deleteSource(sourceName, queryId ->
engineContext.getPersistentQuery(new QueryId(queryId)).ifPresent(QueryMetadata::close));

return new DdlCommandResult(true,
"Source " + sourceName + " (topic: " + dataSource.getKafkaTopicName() + ") was dropped.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.metastore.MutableMetaStore;
import io.confluent.ksql.metrics.StreamsErrorCollector;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.DefaultKsqlParser;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
Expand Down Expand Up @@ -58,7 +59,7 @@
* Holds the mutable state and services of the engine.
*/
// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
final class EngineContext {
public final class EngineContext {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling

private static final Logger LOG = LoggerFactory.getLogger(EngineContext.class);
Expand Down Expand Up @@ -99,7 +100,7 @@ private EngineContext(
this.metaStore = requireNonNull(metaStore, "metaStore");
this.queryIdGenerator = requireNonNull(queryIdGenerator, "queryIdGenerator");
this.ddlCommandFactory = new CommandFactories(serviceContext, metaStore);
this.ddlCommandExec = new DdlCommandExec(metaStore);
this.ddlCommandExec = new DdlCommandExec(metaStore, this);
this.persistentQueries = new ConcurrentHashMap<>();
this.processingLogContext = requireNonNull(processingLogContext, "processingLogContext");
this.parser = requireNonNull(parser, "parser");
Expand All @@ -121,7 +122,7 @@ EngineContext createSandbox(final ServiceContext serviceContext) {
return sandBox;
}

Optional<PersistentQueryMetadata> getPersistentQuery(final QueryId queryId) {
public Optional<PersistentQueryMetadata> getPersistentQuery(final QueryId queryId) {
return Optional.ofNullable(persistentQueries.get(queryId));
}

Expand Down Expand Up @@ -215,7 +216,7 @@ String executeDdl(
return result.getMessage();
}

void registerQuery(final QueryMetadata query) {
void registerQuery(final Optional<SourceName> sourceStreamOrTable, final QueryMetadata query) {
allLiveQueries.add(query);
if (query instanceof PersistentQueryMetadata) {
final PersistentQueryMetadata persistentQuery = (PersistentQueryMetadata) query;
Expand All @@ -239,7 +240,8 @@ void registerQuery(final QueryMetadata query) {
metaStore.updateForPersistentQuery(
queryId.toString(),
persistentQuery.getSourceNames(),
ImmutableSet.of(persistentQuery.getSinkName()));
ImmutableSet.of(persistentQuery.getSinkName()),
sourceStreamOrTable);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,23 @@ ExecuteResult execute(final KsqlPlan plan) {
}

final QueryPlan queryPlan = plan.getQueryPlan().get();
plan.getDdlCommand().map(ddl -> executeDdl(ddl, plan.getStatementText(), true));
return ExecuteResult.of(executePersistentQuery(queryPlan, plan.getStatementText()));
final Optional<SourceName> sourceStreamOrTable = getSourceStreamOrTable(plan);
plan.getDdlCommand().ifPresent(ddl -> executeDdl(ddl, plan.getStatementText(), true));
return ExecuteResult.of(executePersistentQuery(
sourceStreamOrTable,
queryPlan,
plan.getStatementText()
));
}

private Optional<SourceName> getSourceStreamOrTable(final KsqlPlan plan) {
return plan.getDdlCommand().flatMap(ddlCommand -> {
if (ddlCommand instanceof CreateSourceCommand) {
return Optional.of(((CreateSourceCommand) ddlCommand).getSourceName());
}

return Optional.empty();
});
}

@SuppressWarnings("OptionalGetWithoutIsPresent") // Known to be non-empty
Expand Down Expand Up @@ -364,6 +379,7 @@ private String executeDdl(
}

private PersistentQueryMetadata executePersistentQuery(
final Optional<SourceName> sourceStreamOrTable,
final QueryPlan queryPlan,
final String statementText
) {
Expand All @@ -382,7 +398,7 @@ private PersistentQueryMetadata executePersistentQuery(
buildPlanSummary(queryPlan.getQueryId(), queryPlan.getPhysicalPlan())
);

engineContext.registerQuery(queryMetadata);
engineContext.registerQuery(sourceStreamOrTable, queryMetadata);
return queryMetadata;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public TransientQueryMetadata executeQuery(
statement.getConfigOverrides())
.executeQuery(statement);
registerQuery(query);
primaryContext.registerQuery(query);
primaryContext.registerQuery(Optional.empty(), query);
return query;
} catch (final KsqlStatementException e) {
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.engine.EngineContext;
import io.confluent.ksql.execution.ddl.commands.CreateStreamCommand;
import io.confluent.ksql.execution.ddl.commands.CreateTableCommand;
import io.confluent.ksql.execution.ddl.commands.DdlCommandResult;
Expand All @@ -21,6 +23,12 @@
import io.confluent.ksql.metastore.model.KsqlStream;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.properties.with.CreateSourceAsProperties;
import io.confluent.ksql.parser.tree.AllColumns;
import io.confluent.ksql.parser.tree.CreateStreamAsSelect;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Select;
import io.confluent.ksql.parser.tree.Table;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.serde.FormatFactory;
Expand All @@ -32,6 +40,8 @@
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.MetaStoreFixture;
import java.util.Optional;
import java.util.OptionalInt;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -65,6 +75,7 @@ public class DdlCommandExecTest {
private CreateTableCommand createTable;
private DropSourceCommand dropSource;
private DropTypeCommand dropType;
private CreateStreamAsSelect createStreamAsSelect;

private final MutableMetaStore metaStore
= MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry());
Expand All @@ -75,6 +86,8 @@ public class DdlCommandExecTest {
private KsqlStream source;
@Mock
private WindowInfo windowInfo;
@Mock
private EngineContext engineContext;

private DdlCommandExec cmdExec;

Expand All @@ -87,7 +100,7 @@ public void setup() {
when(source.getDataSourceType()).thenReturn(DataSourceType.KSTREAM);
when(source.getKafkaTopicName()).thenReturn(TOPIC_NAME);

cmdExec = new DdlCommandExec(metaStore);
cmdExec = new DdlCommandExec(metaStore, engineContext);
dropType = new DropTypeCommand("type");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,13 +425,13 @@ public void shouldUpdateReferentialIntegrityTableCorrectly() {
}

@Test
public void shouldFailIfReferentialIntegrityIsViolated() {
public void shouldFailIfOnDropTableWhenOtherQueriesAreReadingFromIt() {
// Given:
KsqlEngineTestUtil.execute(
serviceContext,
ksqlEngine,
"create table bar as select * from test2;"
+ "create table foo as select * from test2;",
"create table foo as select * from test2;"
+ "create table bar as select * from foo;",
KSQL_CONFIG,
Collections.emptyMap()
);
Expand All @@ -451,12 +451,103 @@ public void shouldFailIfReferentialIntegrityIsViolated() {
// Then:
assertThat(e, rawMessage(is(
"Cannot drop FOO.\n"
+ "The following queries read from this source: [].\n"
+ "The following queries write into this source: [CTAS_FOO_1].\n"
+ "The following queries read from this source: [CTAS_BAR_1].\n"
+ "The following queries write into this source: [CTAS_FOO_0].\n"
+ "You need to terminate them before dropping FOO.")));
assertThat(e, statementText(is("drop table foo;")));
}

@Test
public void shouldFailIfOnDropStreamWhenOtherQueriesAreWritingToIt() {
// Given:
KsqlEngineTestUtil.execute(
serviceContext,
ksqlEngine,
"create stream foo as select * from test1;"
+ "insert into foo select * from test1;",
KSQL_CONFIG,
Collections.emptyMap()
);

// When:
final KsqlStatementException e = assertThrows(
KsqlStatementException.class,
() -> KsqlEngineTestUtil.execute(
serviceContext,
ksqlEngine,
"drop stream foo;",
KSQL_CONFIG,
Collections.emptyMap()
)
);

// Then:
assertThat(e, rawMessage(is(
"Cannot drop FOO.\n"
+ "The following queries read from this source: [].\n"
+ "The following queries write into this source: [CSAS_FOO_0, INSERTQUERY_1].\n"
+ "You need to terminate them before dropping FOO.")));
assertThat(e, statementText(is("drop stream foo;")));
}

@Test
public void shouldTerminateQueryOnDropTableForStreamsOnlyOnPersistentQueryReferenced() {
// Given:
KsqlEngineTestUtil.execute(
serviceContext,
ksqlEngine,
"create table foo as select * from test2;"
+ "create table bar as select * from foo;",
KSQL_CONFIG,
Collections.emptyMap()
);

// When:
KsqlEngineTestUtil.execute(
serviceContext,
ksqlEngine,
"drop table bar;",
KSQL_CONFIG,
Collections.emptyMap()
);

// Then:
assertThat(metaStore.getSource(SourceName.of("bar")), nullValue());

// Only CTAS_FOO_0 query must be running
assertThat(ksqlEngine.getPersistentQueries().size(), is(1));
assertThat(ksqlEngine.getPersistentQuery(new QueryId("CTAS_FOO_0")).get(), not(nullValue()));
}

@Test
public void shouldTerminateQueryOnDropStreamForStreamsOnlyOnSinglePersistentQueryReferenced() {
// Given:
KsqlEngineTestUtil.execute(
serviceContext,
ksqlEngine,
"create stream foo as select * from test1;"
+ "create stream bar as select * from foo;",
KSQL_CONFIG,
Collections.emptyMap()
);

// When:
KsqlEngineTestUtil.execute(
serviceContext,
ksqlEngine,
"drop stream bar;",
KSQL_CONFIG,
Collections.emptyMap()
);

// Then:
assertThat(metaStore.getSource(SourceName.of("bar")), nullValue());

// Only CSAS_FOO_0 query must be running
assertThat(ksqlEngine.getPersistentQueries().size(), is(1));
assertThat(ksqlEngine.getPersistentQuery(new QueryId("CSAS_FOO_0")).get(), not(nullValue()));
}

@Test
public void shouldFailDDLStatementIfTopicDoesNotExist() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlException;
import java.util.Optional;
import java.util.function.Consumer;

import org.apache.kafka.clients.admin.TopicDescription;
import org.junit.After;
import org.junit.Before;
Expand All @@ -54,6 +56,7 @@

@RunWith(MockitoJUnitRunner.class)
public class SourceTopicsExtractorTest {
private static final Consumer<String> NOOP_TERMINATE_QUERY = queryId -> { return; };

private static final LogicalSchema SCHEMA = LogicalSchema.builder()
.keyColumn(SystemColumns.ROWKEY_NAME, SqlTypes.STRING)
Expand All @@ -66,8 +69,6 @@ public class SourceTopicsExtractorTest {
@Mock
private ServiceContext serviceContext;
@Mock
private KafkaTopicClient kafkaTopicClient;
@Mock
private TopicDescription TOPIC_1;
@Mock
private TopicDescription TOPIC_2;
Expand Down Expand Up @@ -142,7 +143,7 @@ public void shouldExtractJoinTopicsFromJoinSelect() {
public void shouldFailIfSourceTopicNotInMetastore() {
// Given:
final Statement statement = givenStatement("SELECT * FROM " + STREAM_TOPIC_1 + ";");
metaStore.deleteSource(SourceName.of(STREAM_TOPIC_1.toUpperCase()));
metaStore.deleteSource(SourceName.of(STREAM_TOPIC_1.toUpperCase()), NOOP_TERMINATE_QUERY);

// When:
final Exception e = assertThrows(
Expand Down
Loading

0 comments on commit 0d21017

Please sign in to comment.