Skip to content

Commit

Permalink
feat: execute source table query plans
Browse files Browse the repository at this point in the history
  • Loading branch information
ConfluentJenkins committed Aug 27, 2021
1 parent 180832b commit 09487c6
Show file tree
Hide file tree
Showing 19 changed files with 496 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public enum KsqlQueryType {
}

public enum PersistentQueryType {
CREATE_AS, INSERT
CREATE_SOURCE, CREATE_AS, INSERT
}

public enum KsqlQueryStatus {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public final class SourceSchemas {
SourceSchemas(final Map<SourceName, LogicalSchema> sourceSchemas) {
this.sourceSchemas = ImmutableMap.copyOf(requireNonNull(sourceSchemas, "sourceSchemas"));

// This will fail
if (sourceSchemas.isEmpty()) {
throw new IllegalArgumentException("Must supply at least one schema");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,13 @@ public DdlCommandResult executeCreateTable(final CreateTableCommand createTable)
createTable.getIsSource()
);
metaStore.putSource(ksqlTable, createTable.isOrReplace());
metaStore.addSourceReferences(ksqlTable.getName(), withQuerySources);

// Source tables only has a query source reference to itself. We don't need to register
// this source for source tables.
if (!createTable.getIsSource()) {
metaStore.addSourceReferences(ksqlTable.getName(), withQuerySources);
}

return new DdlCommandResult(true, "Table created");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static io.confluent.ksql.metastore.model.DataSource.DataSourceType;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.KsqlExecutionContext.ExecuteResult;
Expand Down Expand Up @@ -80,12 +81,14 @@
import io.confluent.ksql.query.QueryRegistry;
import io.confluent.ksql.query.TransientQueryQueue;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.utils.FormatOptions;
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.serde.RefinementInfo;
import io.confluent.ksql.serde.ValueFormat;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.PersistentQueryMetadata;
Expand Down Expand Up @@ -159,11 +162,20 @@ ExecuteResult execute(final KsqlPlan plan) {
}

final QueryPlan queryPlan = plan.getQueryPlan().get();
final DataSource sinkSource = engineContext.getMetaStore().getSource(queryPlan.getSink());

if (sinkSource != null && sinkSource.isSource()) {
throw new KsqlException(String.format("Cannot insert into read-only %s: %s",
sinkSource.getDataSourceType().getKsqlType().toLowerCase(), sinkSource.getName().text()));
final KsqlConstants.PersistentQueryType persistentQueryType =
plan.getPersistentQueryType().get();

// CREATE_SOURCE do not write to any topic. We check for read-only topics only for queries
// that attempt to write to a sink (i.e. INSERT or CREATE_AS).
if (persistentQueryType != KsqlConstants.PersistentQueryType.CREATE_SOURCE) {
final DataSource sinkSource = engineContext.getMetaStore()
.getSource(queryPlan.getSink().get());

if (sinkSource != null && sinkSource.isSource()) {
throw new KsqlException(String.format("Cannot insert into read-only %s: %s",
sinkSource.getDataSourceType().getKsqlType().toLowerCase(),
sinkSource.getName().text()));
}
}

final Optional<String> ddlResult = plan.getDdlCommand().map(ddl ->
Expand All @@ -177,7 +189,7 @@ ExecuteResult execute(final KsqlPlan plan) {
return ExecuteResult.of(executePersistentQuery(
queryPlan,
plan.getStatementText(),
plan.getDdlCommand().isPresent())
persistentQueryType)
);
}

Expand Down Expand Up @@ -444,7 +456,7 @@ private KsqlPlan sourceTablePlan(

final QueryPlan queryPlan = new QueryPlan(
getSourceNames(outputNode),
null,
Optional.empty(),
plans.physicalPlan.getPhysicalPlan(),
plans.physicalPlan.getQueryId()
);
Expand Down Expand Up @@ -505,7 +517,7 @@ KsqlPlan plan(final ConfiguredStatement<?> statement) {

final QueryPlan queryPlan = new QueryPlan(
getSourceNames(outputNode),
outputNode.getSinkName().get(),
outputNode.getSinkName(),
plans.physicalPlan.getPhysicalPlan(),
plans.physicalPlan.getQueryId()
);
Expand Down Expand Up @@ -738,10 +750,24 @@ private String executeDdl(
}
}

private Set<DataSource> getSources(final QueryPlan queryPlan) {
final ImmutableSet.Builder<DataSource> sources = ImmutableSet.builder();
for (final SourceName name : queryPlan.getSources()) {
final DataSource dataSource = engineContext.getMetaStore().getSource(name);
if (dataSource == null) {
throw new KsqlException("Unknown source: " + name.toString(FormatOptions.noEscape()));
}

sources.add(dataSource);
}

return sources.build();
}

private PersistentQueryMetadata executePersistentQuery(
final QueryPlan queryPlan,
final String statementText,
final boolean createAsQuery
final KsqlConstants.PersistentQueryType persistentQueryType
) {
final QueryRegistry queryRegistry = engineContext.getQueryRegistry();
return queryRegistry.createOrReplacePersistentQuery(
Expand All @@ -751,11 +777,11 @@ private PersistentQueryMetadata executePersistentQuery(
engineContext.getMetaStore(),
statementText,
queryPlan.getQueryId(),
engineContext.getMetaStore().getSource(queryPlan.getSink()),
queryPlan.getSources(),
queryPlan.getSink().map(s -> engineContext.getMetaStore().getSource(s)),
getSources(queryPlan),
queryPlan.getPhysicalPlan(),
buildPlanSummary(queryPlan.getQueryId(), queryPlan.getPhysicalPlan()),
createAsQuery
persistentQueryType
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@

package io.confluent.ksql.engine;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.confluent.ksql.execution.ddl.commands.DdlCommand;
import io.confluent.ksql.util.KsqlConstants;
import java.util.Optional;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
Expand All @@ -35,6 +37,9 @@ public interface KsqlPlan {

KsqlPlan withoutQuery();

@JsonIgnore
Optional<KsqlConstants.PersistentQueryType> getPersistentQueryType();

static KsqlPlan ddlPlanCurrent(final String statementText, final DdlCommand ddlCommand) {
return new KsqlPlanV1(statementText, Optional.of(ddlCommand), Optional.empty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
package io.confluent.ksql.engine;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.confluent.ksql.execution.ddl.commands.CreateTableCommand;
import io.confluent.ksql.execution.ddl.commands.DdlCommand;
import io.confluent.ksql.util.KsqlConstants;
import java.util.Objects;
import java.util.Optional;

Expand Down Expand Up @@ -66,6 +68,27 @@ public KsqlPlan withoutQuery() {
return new KsqlPlanV1(statementText, ddlCommand, Optional.empty());
}

@Override
public Optional<KsqlConstants.PersistentQueryType> getPersistentQueryType() {
if (!queryPlan.isPresent()) {
return Optional.empty();
}

// CREATE_AS and CREATE_SOURCE commands contain a DDL command and a Query plan.
if (ddlCommand.isPresent()) {
if (ddlCommand.get() instanceof CreateTableCommand
&& ((CreateTableCommand) ddlCommand.get()).getIsSource()) {
return Optional.of(KsqlConstants.PersistentQueryType.CREATE_SOURCE);
} else {
return Optional.of(KsqlConstants.PersistentQueryType.CREATE_AS);
}
} else {
// INSERT INTO persistent queries are the only queries types that exist without a
// DDL command linked to the plan.
return Optional.of(KsqlConstants.PersistentQueryType.INSERT);
}
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,33 @@
import io.confluent.ksql.query.QueryId;
import java.util.Comparator;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

public final class QueryPlan {
private final ImmutableSet<SourceName> sources;
private final SourceName sink;
private final Optional<SourceName> sink;
private final ExecutionStep<?> physicalPlan;
private final QueryId queryId;

public QueryPlan(
@JsonProperty(value = "sources", required = true) final Set<SourceName> sources,
@JsonProperty(value = "sink", required = true) final SourceName sink,
@JsonProperty(value = "sink") final Optional<SourceName> sink,
@JsonProperty(value = "physicalPlan", required = true) final ExecutionStep<?> physicalPlan,
@JsonProperty(value = "queryId", required = true) final QueryId queryId
) {
this.sources = ImmutableSortedSet.copyOf(
Comparator.comparing(Name::text),
Objects.requireNonNull(sources, "sources")
);
this.sink = Objects.requireNonNull(sink, "sink");
// Sink can be null when a query plan forms part of a create source table command.
// CST statements need a plan without a sink topic.
this.sink = sink;
this.physicalPlan = Objects.requireNonNull(physicalPlan, "physicalPlan");
this.queryId = Objects.requireNonNull(queryId, "queryId");
}

public SourceName getSink() {
public Optional<SourceName> getSink() {
return sink;
}

Expand Down
Loading

0 comments on commit 09487c6

Please sign in to comment.