Skip to content

Commit

Permalink
feat: create and write a query plan for source tables (#8043)
Browse files Browse the repository at this point in the history
  • Loading branch information
spena authored Aug 24, 2021
1 parent a322310 commit 381b7bf
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 32 deletions.
151 changes: 138 additions & 13 deletions ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,41 @@

import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.KsqlExecutionContext.ExecuteResult;
import io.confluent.ksql.analyzer.ImmutableAnalysis;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.execution.ddl.commands.CreateTableCommand;
import io.confluent.ksql.execution.ddl.commands.DdlCommand;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.expression.tree.UnqualifiedColumnReferenceExp;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.plan.Formats;
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.function.InternalFunctionRegistry;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.MetaStoreImpl;
import io.confluent.ksql.metastore.MutableMetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.KsqlTable;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.OutputRefinement;
import io.confluent.ksql.parser.tree.AliasedRelation;
import io.confluent.ksql.parser.tree.CreateAsSelect;
import io.confluent.ksql.parser.tree.CreateStreamAsSelect;
import io.confluent.ksql.parser.tree.CreateTable;
import io.confluent.ksql.parser.tree.CreateTableAsSelect;
import io.confluent.ksql.parser.tree.ExecutableDdlStatement;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.QueryContainer;
import io.confluent.ksql.parser.tree.Relation;
import io.confluent.ksql.parser.tree.Select;
import io.confluent.ksql.parser.tree.SingleColumn;
import io.confluent.ksql.parser.tree.Sink;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.Table;
import io.confluent.ksql.parser.tree.TableElement;
import io.confluent.ksql.physical.PhysicalPlan;
import io.confluent.ksql.physical.pull.HARouting;
import io.confluent.ksql.physical.pull.PullPhysicalPlan;
Expand All @@ -62,6 +80,9 @@
import io.confluent.ksql.query.QueryRegistry;
import io.confluent.ksql.query.TransientQueryQueue;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.serde.RefinementInfo;
import io.confluent.ksql.serde.ValueFormat;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
Expand All @@ -79,6 +100,7 @@
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
Expand Down Expand Up @@ -321,7 +343,7 @@ TransientQueryMetadata executeTransientQuery(
final boolean excludeTombstones
) {
final ExecutorPlans plans = planQuery(statement, statement.getStatement(),
Optional.empty(), Optional.empty());
Optional.empty(), Optional.empty(), engineContext.getMetaStore());
final KsqlBareOutputNode outputNode = (KsqlBareOutputNode) plans.logicalPlan.getNode().get();
engineContext.createQueryValidator().validateQuery(
config,
Expand All @@ -347,28 +369,128 @@ TransientQueryMetadata executeTransientQuery(
);
}

@SuppressFBWarnings(value = "NP_NULL_PARAM_DEREF_NONVIRTUAL")
private KsqlPlan sourceTablePlan(
final ConfiguredStatement<?> statement) {
final CreateTable createTable = (CreateTable) statement.getStatement();
final CreateTableCommand ddlCommand = (CreateTableCommand) engineContext.createDdlCommand(
statement.getStatementText(),
(ExecutableDdlStatement) statement.getStatement(),
config
);

final Relation from = new AliasedRelation(
new Table(createTable.getName()), createTable.getName());

// Only VALUE columns must be selected from the source table. When running a pull query, the
// keys are added if selecting all columns.
final Select select = new Select(
createTable.getElements().stream()
.filter(column -> column.getNamespace() == TableElement.Namespace.VALUE)
.map(column -> new SingleColumn(
new UnqualifiedColumnReferenceExp(column.getName()),
Optional.of(column.getName())))
.collect(Collectors.toList()));

// Source table need to keep emitting changes so every new record is materialized for
// pull query availability.
final RefinementInfo refinementInfo = RefinementInfo.of(OutputRefinement.CHANGES);

// This is a plan for a `select * from <source-table> emit changes` statement,
// without a sink topic to write the results. The query is just made to materialize the
// source table.
final Query query = new Query(
Optional.empty(),
select,
from,
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.of(refinementInfo),
false,
OptionalInt.empty());

// The source table does not exist in the current metastore, so a temporary metastore that
// contains only the source table is created here. This metastore is used later to create
// ExecutorsPlan.
final MutableMetaStore tempMetastore = new MetaStoreImpl(new InternalFunctionRegistry());
final Formats formats = ddlCommand.getFormats();
tempMetastore.putSource(new KsqlTable<>(
statement.getStatementText(),
createTable.getName(),
ddlCommand.getSchema(),
Optional.empty(),
false,
new KsqlTopic(
ddlCommand.getTopicName(),
KeyFormat.of(formats.getKeyFormat(), formats.getKeyFeatures(), Optional.empty()),
ValueFormat.of(formats.getValueFormat(), formats.getValueFeatures())
),
true
), false);

final ExecutorPlans plans = planQuery(
statement,
query,
Optional.empty(),
Optional.empty(),
tempMetastore
);

final KsqlBareOutputNode outputNode =
(KsqlBareOutputNode) plans.logicalPlan.getNode().get();

final QueryPlan queryPlan = new QueryPlan(
getSourceNames(outputNode),
null,
plans.physicalPlan.getPhysicalPlan(),
plans.physicalPlan.getQueryId()
);

engineContext.createQueryValidator().validateQuery(
config,
plans.physicalPlan,
engineContext.getQueryRegistry().getAllLiveQueries()
);

return KsqlPlan.queryPlanCurrent(
statement.getStatementText(),
Optional.of(ddlCommand),
queryPlan);
}

// Known to be non-empty
@SuppressWarnings("OptionalGetWithoutIsPresent")
KsqlPlan plan(final ConfiguredStatement<?> statement) {
try {
throwOnNonExecutableStatement(statement);

if (statement.getStatement() instanceof ExecutableDdlStatement) {
final DdlCommand ddlCommand = engineContext.createDdlCommand(
statement.getStatementText(),
(ExecutableDdlStatement) statement.getStatement(),
config
);

return KsqlPlan.ddlPlanCurrent(statement.getStatementText(), ddlCommand);
if (statement.getStatement() instanceof CreateTable
&& ((CreateTable) statement.getStatement()).isSource()) {
return sourceTablePlan(statement);
} else {
final DdlCommand ddlCommand = engineContext.createDdlCommand(
statement.getStatementText(),
(ExecutableDdlStatement) statement.getStatement(),
config
);

return KsqlPlan.ddlPlanCurrent(
statement.getStatementText(),
ddlCommand);
}
}

final QueryContainer queryContainer = (QueryContainer) statement.getStatement();
final ExecutorPlans plans = planQuery(
statement,
queryContainer.getQuery(),
Optional.of(queryContainer.getSink()),
queryContainer.getQueryId()
queryContainer.getQueryId(),
engineContext.getMetaStore()
);

final KsqlStructuredDataOutputNode outputNode =
Expand Down Expand Up @@ -410,13 +532,14 @@ private ExecutorPlans planQuery(
final ConfiguredStatement<?> statement,
final Query query,
final Optional<Sink> sink,
final Optional<String> withQueryId) {
final Optional<String> withQueryId,
final MetaStore metaStore) {
final QueryEngine queryEngine = engineContext.createQueryEngine(serviceContext);
final KsqlConfig ksqlConfig = config.getConfig(true);
final OutputNode outputNode = QueryEngine.buildQueryLogicalPlan(
query,
sink,
engineContext.getMetaStore(),
metaStore,
ksqlConfig
);
final LogicalPlanNode logicalPlan = new LogicalPlanNode(
Expand All @@ -428,7 +551,9 @@ private ExecutorPlans planQuery(
engineContext.idGenerator(),
outputNode,
ksqlConfig.getBoolean(KsqlConfig.KSQL_CREATE_OR_REPLACE_ENABLED),
withQueryId
withQueryId,
statement.getStatement() instanceof CreateTable
&& ((CreateTable) statement.getStatement()).isSource()
);

if (withQueryId.isPresent()
Expand All @@ -439,7 +564,7 @@ private ExecutorPlans planQuery(
final PhysicalPlan physicalPlan = queryEngine.buildPhysicalPlan(
logicalPlan,
config,
engineContext.getMetaStore(),
metaStore,
queryId
);
return new ExecutorPlans(logicalPlan, physicalPlan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.MutableMetaStore;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Sink;
import io.confluent.ksql.physical.PhysicalPlan;
Expand Down Expand Up @@ -72,7 +71,7 @@ static OutputNode buildQueryLogicalPlan(
PhysicalPlan buildPhysicalPlan(
final LogicalPlanNode logicalPlanNode,
final SessionConfig config,
final MutableMetaStore metaStore,
final MetaStore metaStore,
final QueryId queryId
) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public final class QueryIdUtil {
public enum ReservedQueryIdsPrefixes {
INSERT("INSERTQUERY_"),
CTAS("CTAS_"),
CSAS("CSAS_");
CSAS("CSAS_"),
CST("CST_");

private final String prefix;
ReservedQueryIdsPrefixes(final String prefix) {
Expand Down Expand Up @@ -92,17 +93,24 @@ static QueryId buildId(
final QueryIdGenerator idGenerator,
final OutputNode outputNode,
final boolean createOrReplaceEnabled,
final Optional<String> withQueryId) {
final Optional<String> withQueryId,
final boolean isSourceTable) {
if (withQueryId.isPresent()) {
final String queryId = withQueryId.get().toUpperCase();
validateWithQueryId(queryId);
return new QueryId(queryId);
}

if (!outputNode.getSinkName().isPresent()) {
final String prefix =
"transient_" + outputNode.getSource().getLeftmostSourceNode().getAlias().text() + "_";
return new QueryId(prefix + Math.abs(ThreadLocalRandom.current().nextLong()));
if (isSourceTable) {
final String suffix = outputNode.getId().toString().toUpperCase()
+ "_" + idGenerator.getNext().toUpperCase();
return new QueryId(ReservedQueryIdsPrefixes.CST + suffix);
} else {
final String prefix =
"transient_" + outputNode.getSource().getLeftmostSourceNode().getAlias().text() + "_";
return new QueryId(prefix + Math.abs(ThreadLocalRandom.current().nextLong()));
}
}

final KsqlStructuredDataOutputNode structured = (KsqlStructuredDataOutputNode) outputNode;
Expand Down
Loading

0 comments on commit 381b7bf

Please sign in to comment.