Skip to content

Commit

Permalink
feat: Move code from physical to logical plan for Projection (#6684)
Browse files Browse the repository at this point in the history
* compiling expressions in logical plan

logical refactoring

filter and project

refactoring

move project operator code into logical plan

still in progress, fixed some tests and made refactor smaller

fixing

* added tests for pull queries

* rebase

* remove comments

* add separate logical nodes for pull

* fix comments

* fix test

* remove unnecessary code

* Address Alan's comments

* add back final

* remove unused field
  • Loading branch information
vpapavas authored Dec 17, 2020
1 parent 96645b5 commit cb0e6f2
Show file tree
Hide file tree
Showing 16 changed files with 672 additions and 312 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ private LogicalPlanNode buildAndValidateLogicalPlan(
final KsqlConfig config
) {
final OutputNode outputNode = new LogicalPlanner(config, analysis, engineContext.getMetaStore())
.buildPlan();
.buildPullLogicalPlan();
return new LogicalPlanNode(
statement.getStatementText(),
Optional.of(outputNode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ static OutputNode buildQueryLogicalPlan(

final Analysis analysis = queryAnalyzer.analyze(query, sink);

return new LogicalPlanner(config, analysis, metaStore).buildPlan();
return new LogicalPlanner(config, analysis, metaStore).buildPersistentLogicalPlan();
}

PhysicalPlan buildPhysicalPlan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,13 @@
import io.confluent.ksql.execution.context.QueryContext.Stacker;
import io.confluent.ksql.execution.context.QueryLoggerUtil;
import io.confluent.ksql.execution.context.QueryLoggerUtil.QueryType;
import io.confluent.ksql.execution.plan.SelectExpression;
import io.confluent.ksql.execution.streams.materialization.Materialization;
import io.confluent.ksql.execution.util.ExpressionTypeManager;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.model.WindowType;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.tree.AllColumns;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Select;
import io.confluent.ksql.parser.tree.SingleColumn;
import io.confluent.ksql.physical.pull.operators.AbstractPhysicalOperator;
import io.confluent.ksql.physical.pull.operators.DataSourceOperator;
import io.confluent.ksql.physical.pull.operators.KeyedTableLookupOperator;
Expand All @@ -48,21 +42,14 @@
import io.confluent.ksql.planner.plan.KsqlBareOutputNode;
import io.confluent.ksql.planner.plan.OutputNode;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.planner.plan.ProjectNode;
import io.confluent.ksql.planner.plan.PullProjectNode;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.LogicalSchema.Builder;
import io.confluent.ksql.schema.ksql.SystemColumns;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* Traverses the logical plan top-down and creates a physical plan for pull queries.
Expand Down Expand Up @@ -137,17 +124,18 @@ public PullPhysicalPlan buildPullPhysicalPlan(final LogicalPlanNode logicalPlanN
while (true) {

AbstractPhysicalOperator currentPhysicalOp = null;
if (currentLogicalNode instanceof ProjectNode) {
currentPhysicalOp = translateProjectNode((ProjectNode)currentLogicalNode);
if (currentLogicalNode instanceof PullProjectNode) {
currentPhysicalOp = translateProjectNode((PullProjectNode)currentLogicalNode);
} else if (currentLogicalNode instanceof FilterNode) {
currentPhysicalOp = translateFilterNode((FilterNode)currentLogicalNode);
} else if (currentLogicalNode instanceof DataSourceNode) {
currentPhysicalOp = translateDataSourceNode(
(DataSourceNode) currentLogicalNode);
dataSourceOperator = (DataSourceOperator)currentPhysicalOp;
} else {
throw new KsqlException("Error in translating logical to physical plan for pull queries: "
+ "unrecognized logical node.");
throw new KsqlException(String.format(
"Error in translating logical to physical plan for pull queries: unrecognized logical"
+ " node %s.", currentLogicalNode));
}

if (prevPhysicalOp == null) {
Expand All @@ -171,50 +159,25 @@ public PullPhysicalPlan buildPullPhysicalPlan(final LogicalPlanNode logicalPlanN
}
return new PullPhysicalPlan(
rootPhysicalOp,
((ProjectOperator)rootPhysicalOp).getOutputSchema(),
(rootPhysicalOp).getLogicalNode().getSchema(),
queryId,
keys,
mat,
dataSourceOperator);
}

private ProjectOperator translateProjectNode(final ProjectNode logicalNode) {
final LogicalSchema outputSchema;
boolean isStar = false;
if (isSelectStar(statement.getStatement().getSelect())) {
isStar = true;
outputSchema = buildSelectStarSchema(mat.schema(), mat.windowType().isPresent());
} else {
final List<SelectExpression> projection = analysis.getSelectItems().stream()
.map(SingleColumn.class::cast)
.map(si -> SelectExpression
.of(si.getAlias().orElseThrow(IllegalStateException::new), si.getExpression()))
.collect(Collectors.toList());

outputSchema = selectOutputSchema(projection, mat.windowType());
}
private ProjectOperator translateProjectNode(final PullProjectNode logicalNode) {
final ProcessingLogger logger = processingLogContext
.getLoggerFactory()
.getLogger(
QueryLoggerUtil.queryLoggerName(
QueryType.PULL_QUERY, contextStacker.push("PROJECT").getQueryContext())
);

final boolean noSystemColumns = analysis.getSelectColumnNames().stream()
.noneMatch(SystemColumns::isSystemColumn);
final boolean noKeyColumns = analysis.getSelectColumnNames().stream()
.noneMatch(mat.schema()::isKeyColumn);

return new ProjectOperator(
config,
metaStore,
logger,
mat,
logicalNode,
outputSchema,
isStar,
noSystemColumns,
noKeyColumns);
logicalNode
);
}

private SelectOperator translateFilterNode(final FilterNode logicalNode) {
Expand Down Expand Up @@ -247,67 +210,10 @@ private AbstractPhysicalOperator translateDataSourceNode(
}
}

private LogicalSchema selectOutputSchema(
final List<SelectExpression> selectExpressions,
final Optional<WindowType> windowType
) {
final Builder schemaBuilder = LogicalSchema.builder();

// Copy meta & key columns into the value schema as SelectValueMapper expects it:
final LogicalSchema schema = mat.schema()
.withPseudoAndKeyColsInValue(windowType.isPresent());

final ExpressionTypeManager expressionTypeManager =
new ExpressionTypeManager(schema, metaStore);

for (final SelectExpression select : selectExpressions) {
final SqlType type = expressionTypeManager.getExpressionSqlType(select.getExpression());

if (mat.schema().isKeyColumn(select.getAlias())
|| select.getAlias().equals(SystemColumns.WINDOWSTART_NAME)
|| select.getAlias().equals(SystemColumns.WINDOWEND_NAME)
) {
schemaBuilder.keyColumn(select.getAlias(), type);
} else {
schemaBuilder.valueColumn(select.getAlias(), type);
}
}
return schemaBuilder.build();
}

private static boolean isSelectStar(final Select select) {
final boolean someStars = select.getSelectItems().stream()
.anyMatch(s -> s instanceof AllColumns);

if (someStars && select.getSelectItems().size() != 1) {
throw new KsqlException("Pull queries only support wildcards in the projects "
+ "if they are the only expression");
}

return someStars;
}

/**
 * Creates a {@link QueryId} whose text is {@code "query_"} followed by the value of
 * {@link System#currentTimeMillis()} at the moment of the call.
 *
 * <p>NOTE(review): the id is derived only from the millisecond clock, so two calls made in
 * the same millisecond yield the same id text - confirm callers tolerate this.
 *
 * @return a new query id of the form {@code query_<epoch millis>}
 */
private QueryId uniqueQueryId() {
return new QueryId("query_" + System.currentTimeMillis());
}

private LogicalSchema buildSelectStarSchema(
final LogicalSchema schema,
final boolean windowed
) {
final Builder builder = LogicalSchema.builder()
.keyColumns(schema.key());

if (windowed) {
builder.keyColumn(SystemColumns.WINDOWSTART_NAME, SqlTypes.BIGINT);
builder.keyColumn(SystemColumns.WINDOWEND_NAME, SqlTypes.BIGINT);
}

return builder
.valueColumns(schema.value())
.build();
}

private KsqlException notMaterializedException(final SourceName sourceTable) {
return new KsqlException(
"Can't pull from " + sourceTable + " as it's not a materialized table."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.physical.pull.operators;

import io.confluent.ksql.planner.plan.PlanNode;
import java.util.List;

/**
Expand All @@ -29,6 +30,8 @@ public abstract class AbstractPhysicalOperator {

public abstract void close();

public abstract PlanNode getLogicalNode();

public abstract void addChild(AbstractPhysicalOperator child);

public abstract AbstractPhysicalOperator getChild(int index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.confluent.ksql.execution.streams.materialization.Materialization;
import io.confluent.ksql.execution.streams.materialization.Row;
import io.confluent.ksql.planner.plan.DataSourceNode;
import io.confluent.ksql.planner.plan.PlanNode;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
Expand All @@ -34,7 +35,7 @@ public class KeyedTableLookupOperator
private static final Logger LOG = LoggerFactory.getLogger(KeyedTableLookupOperator.class);

private final Materialization mat;
private final DataSourceNode logicalOperator;
private final DataSourceNode logicalNode;

private List<KsqlPartitionLocation> partitionLocations;
private Iterator<Row> resultIterator;
Expand All @@ -48,7 +49,7 @@ public KeyedTableLookupOperator(
final DataSourceNode logicalNode
) {
this.mat = Objects.requireNonNull(mat, "mat");
this.logicalOperator = Objects.requireNonNull(logicalNode, "logicalNode");
this.logicalNode = Objects.requireNonNull(logicalNode, "logicalNode");
}

@Override
Expand Down Expand Up @@ -101,6 +102,11 @@ public void close() {

}

/**
 * Returns the logical {@link DataSourceNode} that was supplied to this operator's
 * constructor, exposed through the more general {@link PlanNode} type.
 *
 * @return the logical plan node backing this lookup operator; never {@code null} because
 *     the constructor rejects a null node
 */
@Override
public PlanNode getLogicalNode() {
return logicalNode;
}

@Override
public void addChild(final AbstractPhysicalOperator child) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.confluent.ksql.execution.streams.materialization.WindowedRow;
import io.confluent.ksql.physical.pull.operators.WhereInfo.WindowBounds;
import io.confluent.ksql.planner.plan.DataSourceNode;
import io.confluent.ksql.planner.plan.PlanNode;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
Expand All @@ -35,7 +36,7 @@ public class KeyedWindowedTableLookupOperator
KeyedWindowedTableLookupOperator.class);

private final Materialization mat;
private final DataSourceNode logicalOperator;
private final DataSourceNode logicalNode;
private final WindowBounds windowBounds;

private List<KsqlPartitionLocation> partitionLocations;
Expand All @@ -51,7 +52,7 @@ public KeyedWindowedTableLookupOperator(
final DataSourceNode logicalNode,
final WindowBounds windowBounds
) {
this.logicalOperator = Objects.requireNonNull(logicalNode, "logicalNode");
this.logicalNode = Objects.requireNonNull(logicalNode, "logicalNode");
this.mat = Objects.requireNonNull(mat, "mat");
this.windowBounds = Objects.requireNonNull(windowBounds, "windowBounds");
}
Expand Down Expand Up @@ -110,6 +111,11 @@ public void close() {

}

/**
 * Returns the logical {@link DataSourceNode} that was supplied to this windowed lookup
 * operator's constructor, exposed through the more general {@link PlanNode} type.
 *
 * @return the logical plan node backing this operator; never {@code null} because the
 *     constructor rejects a null node
 */
@Override
public PlanNode getLogicalNode() {
return logicalNode;
}

@Override
public void addChild(final AbstractPhysicalOperator child) {
throw new UnsupportedOperationException();
Expand Down
Loading

0 comments on commit cb0e6f2

Please sign in to comment.