Skip to content

Commit

Permalink
feat: terminate persistent query on DROP command
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Oct 23, 2020
1 parent dea737f commit f6779fa
Show file tree
Hide file tree
Showing 5 changed files with 340 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.ddl.commands.CommandFactories;
import io.confluent.ksql.ddl.commands.DdlCommandExec;
import io.confluent.ksql.engine.rewrite.AstSanitizer;
import io.confluent.ksql.execution.ddl.commands.DdlCommand;
import io.confluent.ksql.execution.ddl.commands.DdlCommandResult;
import io.confluent.ksql.execution.ddl.commands.DropSourceCommand;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.metastore.MutableMetaStore;
import io.confluent.ksql.metrics.StreamsErrorCollector;
Expand All @@ -39,17 +41,21 @@
import io.confluent.ksql.query.id.QueryIdGenerator;
import io.confluent.ksql.services.SandboxedServiceContext;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlReferentialIntegrityException;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.SandboxedPersistentQueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.kafka.streams.KafkaStreams.State;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -73,6 +79,8 @@ final class EngineContext {
private final Map<QueryId, PersistentQueryMetadata> persistentQueries;
private final Set<QueryMetadata> allLiveQueries = ConcurrentHashMap.newKeySet();
private final QueryCleanupService cleanupService;
private final Map<SourceName, QueryId> createAsQueries = new ConcurrentHashMap();
private final Map<SourceName, Set<QueryId>> otherQueries = new ConcurrentHashMap();

static EngineContext create(
final ServiceContext serviceContext,
Expand Down Expand Up @@ -124,6 +132,9 @@ EngineContext createSandbox(final ServiceContext serviceContext) {
query.getQueryId(),
SandboxedPersistentQueryMetadata.of(query, sandBox::closeQuery)));

sandBox.createAsQueries.putAll(createAsQueries);
sandBox.otherQueries.putAll(otherQueries);

return sandBox;
}

Expand All @@ -136,7 +147,14 @@ Map<QueryId, PersistentQueryMetadata> getPersistentQueries() {
}

Set<String> getQueriesWithSink(final SourceName sourceName) {
return metaStore.getQueriesWithSink(sourceName);
final ImmutableSet.Builder<String> queries = ImmutableSet.builder();

if (createAsQueries.containsKey(sourceName)) {
queries.add(createAsQueries.get(sourceName).toString());
}

queries.addAll(getOtherQueriesWithSink(sourceName));
return queries.build();
}

MutableMetaStore getMetaStore() {
Expand Down Expand Up @@ -216,15 +234,87 @@ String executeDdl(
final boolean withQuery,
final Set<SourceName> withQuerySources
) {
if (command instanceof DropSourceCommand) {
throwIfOtherQueriesExist(((DropSourceCommand) command).getSourceName());
}

final DdlCommandResult result =
ddlCommandExec.execute(sqlExpression, command, withQuery, withQuerySources);
if (!result.isSuccess()) {
throw new KsqlStatementException(result.getMessage(), sqlExpression);
}

if (command instanceof DropSourceCommand) {
terminateCreateAsQuery(((DropSourceCommand) command).getSourceName());
}

return result.getMessage();
}

void registerQuery(final QueryMetadata query) {
private void terminateCreateAsQuery(final SourceName sourceName) {
createAsQueries.computeIfPresent(sourceName, (ignore , queryId) -> {
final PersistentQueryMetadata query = persistentQueries.get(queryId);
if (query != null) {
query.close();
}

return null;
});
}

private Set<String> getOtherQueriesWithSink(final SourceName sourceName) {
final ImmutableSet.Builder<String> queries = ImmutableSet.builder();

if (otherQueries.containsKey(sourceName)) {
otherQueries.get(sourceName).forEach(queryId -> {
final PersistentQueryMetadata query = persistentQueries.get(queryId);
if (query != null && query.getSinkName().equals(sourceName)) {
queries.add(queryId.toString());
}
});
}

return queries.build();
}

private Set<String> getOtherQueriesWithSource(final SourceName sourceName) {
final ImmutableSet.Builder<String> queries = ImmutableSet.builder();

if (otherQueries.containsKey(sourceName)) {
otherQueries.get(sourceName).forEach(queryId -> {
final PersistentQueryMetadata query = persistentQueries.get(queryId);
if (query != null && query.getSourceNames().contains(sourceName)) {
queries.add(queryId.toString());
}
});
}

return queries.build();
}

private void throwIfOtherQueriesExist(final SourceName sourceName) {
final Set<String> sinkQueries = getOtherQueriesWithSink(sourceName);
final Set<String> sourceQueries = getOtherQueriesWithSource(sourceName);

if (!sinkQueries.isEmpty() || !sourceQueries.isEmpty()) {
throw new KsqlReferentialIntegrityException(String.format(
"Cannot drop %s.%n"
+ "The following queries read from this source: [%s].%n"
+ "The following queries write into this source: [%s].%n"
+ "You need to terminate them before dropping %s.",
sourceName.text(),
sourceQueries.stream()
.sorted()
.collect(Collectors.joining(", ")),
sinkQueries.stream()
.sorted()
.collect(Collectors.joining(", ")),
sourceName.text()
));
}
}

void registerQuery(final QueryMetadata query, final boolean createAsQuery) {
allLiveQueries.add(query);
if (query instanceof PersistentQueryMetadata) {
final PersistentQueryMetadata persistentQuery = (PersistentQueryMetadata) query;
Expand All @@ -245,10 +335,17 @@ void registerQuery(final QueryMetadata query) {
}

persistentQueries.put(queryId, persistentQuery);
metaStore.updateForPersistentQuery(
queryId.toString(),
persistentQuery.getSourceNames(),
ImmutableSet.of(persistentQuery.getSinkName()));
if (createAsQuery) {
createAsQueries.put(persistentQuery.getSinkName(), queryId);
} else {
final Iterable<SourceName> allSourceNames = Iterables.concat(
Collections.singleton(persistentQuery.getSinkName()),
persistentQuery.getSourceNames()
);

allSourceNames.forEach(sourceName ->
otherQueries.computeIfAbsent(sourceName, x -> new HashSet<>()).add(queryId));
}
}
}

Expand All @@ -260,8 +357,25 @@ private void closeQuery(final QueryMetadata query) {

private boolean unregisterQuery(final QueryMetadata query) {
if (query instanceof PersistentQueryMetadata) {
persistentQueries.remove(query.getQueryId());
metaStore.removePersistentQuery(query.getQueryId().toString());
final PersistentQueryMetadata persistentQuery = (PersistentQueryMetadata) query;
final QueryId queryId = persistentQuery.getQueryId();
persistentQueries.remove(queryId);

final Iterable<SourceName> allSourceNames = Iterables.concat(
Collections.singleton(persistentQuery.getSinkName()),
persistentQuery.getSourceNames()
);

// If query is a INSERT query, then this line should not cause any effect
createAsQueries.remove(persistentQuery.getSinkName());

// If query is a C*AS query, then these lines should not cause any effect
allSourceNames.forEach(sourceName ->
otherQueries.computeIfPresent(sourceName, (s, queries) -> {
queries.remove(queryId);
return (queries.isEmpty()) ? null : queries;
})
);
}

if (!query.getState().equals(State.NOT_RUNNING)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,11 @@ ExecuteResult execute(final KsqlPlan plan) {
final QueryPlan queryPlan = plan.getQueryPlan().get();
plan.getDdlCommand().map(ddl ->
executeDdl(ddl, plan.getStatementText(), true, queryPlan.getSources()));
return ExecuteResult.of(executePersistentQuery(queryPlan, plan.getStatementText()));
return ExecuteResult.of(executePersistentQuery(
queryPlan,
plan.getStatementText(),
plan.getDdlCommand().isPresent())
);
}

@SuppressWarnings("OptionalGetWithoutIsPresent") // Known to be non-empty
Expand Down Expand Up @@ -379,7 +383,8 @@ private String executeDdl(

private PersistentQueryMetadata executePersistentQuery(
final QueryPlan queryPlan,
final String statementText
final String statementText,
final boolean createAsQuery
) {
final QueryExecutor executor = engineContext.createQueryExecutor(
config,
Expand All @@ -395,7 +400,7 @@ private PersistentQueryMetadata executePersistentQuery(
buildPlanSummary(queryPlan.getQueryId(), queryPlan.getPhysicalPlan())
);

engineContext.registerQuery(queryMetadata);
engineContext.registerQuery(queryMetadata, createAsQuery);
return queryMetadata;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public TransientQueryMetadata executeQuery(
.executeQuery(statement);

registerQuery(query);
primaryContext.registerQuery(query);
primaryContext.registerQuery(query, false);
return query;
} catch (final KsqlStatementException e) {
throw e;
Expand Down
Loading

0 comments on commit f6779fa

Please sign in to comment.