Skip to content

Commit

Permalink
feat: Adds Scalable Push Query physical operators (#7430)
Browse files Browse the repository at this point in the history
* feat: Adds Scalable Push Query physical operators
  • Loading branch information
AlanConfluent authored May 13, 2021
1 parent 7d0fec6 commit 100767d
Show file tree
Hide file tree
Showing 28 changed files with 988 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.physical.pull.operators;
package io.confluent.ksql.physical.common.operators;

import io.confluent.ksql.planner.plan.PlanNode;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.physical.pull.operators;
package io.confluent.ksql.physical.common.operators;

import io.confluent.ksql.GenericKey;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.streams.materialization.TableRow;
import java.util.List;

final class PullPhysicalOperatorUtil {
final class PhysicalOperatorUtil {

private PullPhysicalOperatorUtil() {
private PhysicalOperatorUtil() {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.physical.pull.operators;
package io.confluent.ksql.physical.common.operators;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.GenericRow;
Expand All @@ -26,7 +26,7 @@
import io.confluent.ksql.execution.transform.select.SelectValueMapperFactory.SelectValueMapperFactorySupplier;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.planner.plan.PullProjectNode;
import io.confluent.ksql.planner.plan.QueryProjectNode;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import java.util.ArrayList;
Expand All @@ -38,15 +38,15 @@ public class ProjectOperator extends AbstractPhysicalOperator implements UnaryPh

private final ProcessingLogger logger;
private final SelectValueMapperFactorySupplier selectValueMapperFactorySupplier;
private final PullProjectNode logicalNode;
private final QueryProjectNode logicalNode;

private AbstractPhysicalOperator child;
private TableRow row;
private KsqlTransformer<Object, GenericRow> transformer;

public ProjectOperator(
final ProcessingLogger logger,
final PullProjectNode logicalNode
final QueryProjectNode logicalNode
) {
this(
logger,
Expand All @@ -58,7 +58,7 @@ public ProjectOperator(
@VisibleForTesting
ProjectOperator(
final ProcessingLogger logger,
final PullProjectNode logicalNode,
final QueryProjectNode logicalNode,
final SelectValueMapperFactorySupplier selectValueMapperFactorySupplier
) {
this.logger = Objects.requireNonNull(logger, "logger");
Expand Down Expand Up @@ -89,7 +89,7 @@ public Object next() {
return null;
}

final GenericRow intermediate = PullPhysicalOperatorUtil.getIntermediateRow(
final GenericRow intermediate = PhysicalOperatorUtil.getIntermediateRow(
row, logicalNode.getAddAdditionalColumnsToIntermediateSchema());

if (logicalNode.getIsSelectStar()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.physical.pull.operators;
package io.confluent.ksql.physical.common.operators;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.GenericRow;
Expand All @@ -26,28 +26,28 @@
import io.confluent.ksql.execution.transform.sqlpredicate.SqlPredicate;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.planner.plan.PullFilterNode;
import io.confluent.ksql.planner.plan.QueryFilterNode;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

public class SelectOperator extends AbstractPhysicalOperator implements UnaryPhysicalOperator {

private final PullFilterNode logicalNode;
private final QueryFilterNode logicalNode;
private final ProcessingLogger logger;
private final SqlPredicate predicate;

private AbstractPhysicalOperator child;
private KsqlTransformer<Object, Optional<GenericRow>> transformer;
private TableRow row;

public SelectOperator(final PullFilterNode logicalNode, final ProcessingLogger logger) {
public SelectOperator(final QueryFilterNode logicalNode, final ProcessingLogger logger) {
this(logicalNode, logger, SqlPredicate::new);
}

@VisibleForTesting
SelectOperator(
final PullFilterNode logicalNode,
final QueryFilterNode logicalNode,
final ProcessingLogger logger,
final SqlPredicateFactory predicateFactory
) {
Expand Down Expand Up @@ -80,7 +80,7 @@ public Object next() {
}

private Optional<TableRow> transformRow(final TableRow tableRow) {
final GenericRow intermediate = PullPhysicalOperatorUtil.getIntermediateRow(
final GenericRow intermediate = PhysicalOperatorUtil.getIntermediateRow(
tableRow, logicalNode.getAddAdditionalColumnsToIntermediateSchema());
return transformer.transform(
tableRow.key(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.physical.pull.operators;
package io.confluent.ksql.physical.common.operators;

/**
* Represents a physical operator of the physical plan that has a single child.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.confluent.ksql.execution.streams.materialization.Locator.KsqlKey;
import io.confluent.ksql.execution.streams.materialization.Locator.KsqlPartitionLocation;
import io.confluent.ksql.execution.streams.materialization.Materialization;
import io.confluent.ksql.physical.pull.operators.AbstractPhysicalOperator;
import io.confluent.ksql.physical.common.operators.AbstractPhysicalOperator;
import io.confluent.ksql.physical.pull.operators.DataSourceOperator;
import io.confluent.ksql.planner.plan.KeyConstraint;
import io.confluent.ksql.planner.plan.KeyConstraint.ConstraintOperator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.physical.common.operators.AbstractPhysicalOperator;
import io.confluent.ksql.physical.common.operators.ProjectOperator;
import io.confluent.ksql.physical.common.operators.SelectOperator;
import io.confluent.ksql.physical.pull.PullPhysicalPlan.PullPhysicalPlanType;
import io.confluent.ksql.physical.pull.PullPhysicalPlan.PullSourceType;
import io.confluent.ksql.physical.pull.operators.AbstractPhysicalOperator;
import io.confluent.ksql.physical.pull.operators.DataSourceOperator;
import io.confluent.ksql.physical.pull.operators.KeyedTableLookupOperator;
import io.confluent.ksql.physical.pull.operators.KeyedWindowedTableLookupOperator;
import io.confluent.ksql.physical.pull.operators.ProjectOperator;
import io.confluent.ksql.physical.pull.operators.SelectOperator;
import io.confluent.ksql.physical.pull.operators.TableScanOperator;
import io.confluent.ksql.physical.pull.operators.WindowedTableScanOperator;
import io.confluent.ksql.planner.LogicalPlanNode;
Expand All @@ -42,8 +42,8 @@
import io.confluent.ksql.planner.plan.NonKeyConstraint;
import io.confluent.ksql.planner.plan.OutputNode;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.planner.plan.PullFilterNode;
import io.confluent.ksql.planner.plan.PullProjectNode;
import io.confluent.ksql.planner.plan.QueryFilterNode;
import io.confluent.ksql.planner.plan.QueryProjectNode;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.PersistentQueryMetadata;
Expand Down Expand Up @@ -109,10 +109,10 @@ public PullPhysicalPlan buildPullPhysicalPlan(final LogicalPlanNode logicalPlanN
AbstractPhysicalOperator rootPhysicalOp = null;
while (true) {
AbstractPhysicalOperator currentPhysicalOp = null;
if (currentLogicalNode instanceof PullProjectNode) {
currentPhysicalOp = translateProjectNode((PullProjectNode)currentLogicalNode);
} else if (currentLogicalNode instanceof PullFilterNode) {
currentPhysicalOp = translateFilterNode((PullFilterNode) currentLogicalNode);
if (currentLogicalNode instanceof QueryProjectNode) {
currentPhysicalOp = translateProjectNode((QueryProjectNode)currentLogicalNode);
} else if (currentLogicalNode instanceof QueryFilterNode) {
currentPhysicalOp = translateFilterNode((QueryFilterNode) currentLogicalNode);
seenSelectOperator = true;
} else if (currentLogicalNode instanceof DataSourceNode) {
currentPhysicalOp = translateDataSourceNode(
Expand Down Expand Up @@ -154,7 +154,7 @@ public PullPhysicalPlan buildPullPhysicalPlan(final LogicalPlanNode logicalPlanN
dataSourceOperator);
}

private ProjectOperator translateProjectNode(final PullProjectNode logicalNode) {
private ProjectOperator translateProjectNode(final QueryProjectNode logicalNode) {
final ProcessingLogger logger = processingLogContext
.getLoggerFactory()
.getLogger(
Expand All @@ -168,7 +168,7 @@ private ProjectOperator translateProjectNode(final PullProjectNode logicalNode)
);
}

private SelectOperator translateFilterNode(final PullFilterNode logicalNode) {
private SelectOperator translateFilterNode(final QueryFilterNode logicalNode) {
lookupConstraints = logicalNode.getLookupConstraints();

final ProcessingLogger logger = processingLogContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.confluent.ksql.execution.streams.materialization.Locator.KsqlPartitionLocation;
import io.confluent.ksql.execution.streams.materialization.Materialization;
import io.confluent.ksql.execution.streams.materialization.Row;
import io.confluent.ksql.physical.common.operators.AbstractPhysicalOperator;
import io.confluent.ksql.physical.common.operators.UnaryPhysicalOperator;
import io.confluent.ksql.planner.plan.DataSourceNode;
import io.confluent.ksql.planner.plan.PlanNode;
import java.util.Iterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
import io.confluent.ksql.execution.streams.materialization.Locator.KsqlPartitionLocation;
import io.confluent.ksql.execution.streams.materialization.Materialization;
import io.confluent.ksql.execution.streams.materialization.WindowedRow;
import io.confluent.ksql.physical.common.operators.AbstractPhysicalOperator;
import io.confluent.ksql.physical.common.operators.UnaryPhysicalOperator;
import io.confluent.ksql.planner.plan.DataSourceNode;
import io.confluent.ksql.planner.plan.KeyConstraint.KeyConstraintKey;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.planner.plan.PullFilterNode.WindowBounds;
import io.confluent.ksql.planner.plan.QueryFilterNode.WindowBounds;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import io.confluent.ksql.execution.streams.materialization.Locator.KsqlPartitionLocation;
import io.confluent.ksql.execution.streams.materialization.Materialization;
import io.confluent.ksql.execution.streams.materialization.Row;
import io.confluent.ksql.physical.common.operators.AbstractPhysicalOperator;
import io.confluent.ksql.physical.common.operators.UnaryPhysicalOperator;
import io.confluent.ksql.planner.plan.DataSourceNode;
import io.confluent.ksql.planner.plan.PlanNode;
import java.util.Iterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import io.confluent.ksql.execution.streams.materialization.Locator.KsqlPartitionLocation;
import io.confluent.ksql.execution.streams.materialization.Materialization;
import io.confluent.ksql.execution.streams.materialization.WindowedRow;
import io.confluent.ksql.physical.common.operators.AbstractPhysicalOperator;
import io.confluent.ksql.physical.common.operators.UnaryPhysicalOperator;
import io.confluent.ksql.planner.plan.DataSourceNode;
import io.confluent.ksql.planner.plan.PlanNode;
import java.util.Iterator;
Expand Down
Loading

0 comments on commit 100767d

Please sign in to comment.