Skip to content

Commit

Permalink
feat: Rewrites pull query WHERE clause to be in DNF and allow more ex…
Browse files Browse the repository at this point in the history
…pressions (#6874)

* feat: More generic where clause and key extraction
  • Loading branch information
AlanConfluent authored Jan 28, 2021
1 parent fcac459 commit b8e0c99
Show file tree
Hide file tree
Showing 20 changed files with 1,404 additions and 282 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@

package io.confluent.ksql.physical.pull;

import io.confluent.ksql.GenericKey;
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.execution.streams.materialization.Locator.KsqlKey;
import io.confluent.ksql.execution.streams.materialization.Locator.KsqlPartitionLocation;
import io.confluent.ksql.execution.streams.materialization.Materialization;
import io.confluent.ksql.physical.pull.operators.AbstractPhysicalOperator;
import io.confluent.ksql.physical.pull.operators.DataSourceOperator;
import io.confluent.ksql.planner.plan.KeyConstraint;
import io.confluent.ksql.planner.plan.KeyConstraint.ConstraintOperator;
import io.confluent.ksql.planner.plan.LookupConstraint;
import io.confluent.ksql.query.PullQueryQueue;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.BiFunction;
Expand All @@ -41,22 +46,22 @@ public class PullPhysicalPlan {
private final AbstractPhysicalOperator root;
private final LogicalSchema schema;
private final QueryId queryId;
private final List<GenericKey> keys;
private final List<LookupConstraint> lookupConstraints;
private final Materialization mat;
private final DataSourceOperator dataSourceOperator;

public PullPhysicalPlan(
final AbstractPhysicalOperator root,
final LogicalSchema schema,
final QueryId queryId,
final List<GenericKey> keys,
final List<LookupConstraint> lookupConstraints,
final Materialization mat,
final DataSourceOperator dataSourceOperator
) {
this.root = Objects.requireNonNull(root, "root");
this.schema = Objects.requireNonNull(schema, "schema");
this.queryId = Objects.requireNonNull(queryId, "queryId");
this.keys = Objects.requireNonNull(keys, "keys");
this.lookupConstraints = Objects.requireNonNull(lookupConstraints, "lookupConstraints");
this.mat = Objects.requireNonNull(mat, "mat");
this.dataSourceOperator = Objects.requireNonNull(
dataSourceOperator, "dataSourceOperator");
Expand Down Expand Up @@ -108,8 +113,27 @@ public Materialization getMaterialization() {
return mat;
}

public List<GenericKey> getKeys() {
return keys;
/**
 * Collects the keys that this plan looks up directly.
 *
 * <p>If {@code requiresRequestsToAllPartitions()} reports {@code true}, no keys are returned.
 * Otherwise the result holds the {@link KsqlKey} of every {@link KeyConstraint} whose operator
 * is {@link ConstraintOperator#EQUAL}, in the order the constraints appear in
 * {@code lookupConstraints}.
 *
 * @return an immutable, possibly empty, list of keys
 */
public List<KsqlKey> getKeys() {
  if (requiresRequestsToAllPartitions()) {
    return Collections.emptyList();
  }

  final ImmutableList.Builder<KsqlKey> equalityKeys = ImmutableList.builder();
  for (final LookupConstraint constraint : lookupConstraints) {
    if (!(constraint instanceof KeyConstraint)) {
      continue;
    }
    final KeyConstraint keyConstraint = (KeyConstraint) constraint;
    if (keyConstraint.getConstraintOperator() == ConstraintOperator.EQUAL) {
      equalityKeys.add(keyConstraint.getKsqlKey());
    }
  }
  return equalityKeys.build();
}

/**
 * Checks whether any lookup constraint is something other than an equality constraint on a key.
 *
 * @return {@code true} as soon as one constraint is either not a {@link KeyConstraint} or is a
 *     {@link KeyConstraint} whose operator is not {@link ConstraintOperator#EQUAL};
 *     {@code false} if every constraint is a key equality (including when there are none)
 */
private boolean requiresRequestsToAllPartitions() {
  for (final LookupConstraint constraint : lookupConstraints) {
    final boolean isKeyEquality = constraint instanceof KeyConstraint
        && ((KeyConstraint) constraint).getConstraintOperator() == ConstraintOperator.EQUAL;
    if (!isKeyEquality) {
      return true;
    }
  }
  return false;
}

public LogicalSchema getOutputSchema() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

package io.confluent.ksql.physical.pull;

import io.confluent.ksql.GenericKey;
import io.confluent.ksql.analyzer.ImmutableAnalysis;
import io.confluent.ksql.analyzer.PullQueryValidator;
import io.confluent.ksql.execution.context.QueryContext.Stacker;
Expand All @@ -37,18 +36,17 @@
import io.confluent.ksql.planner.LogicalPlanNode;
import io.confluent.ksql.planner.plan.DataSourceNode;
import io.confluent.ksql.planner.plan.KsqlBareOutputNode;
import io.confluent.ksql.planner.plan.LookupConstraint;
import io.confluent.ksql.planner.plan.OutputNode;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.planner.plan.PullFilterNode;
import io.confluent.ksql.planner.plan.PullFilterNode.WindowBounds;
import io.confluent.ksql.planner.plan.PullProjectNode;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/**
* Traverses the logical plan top-down and creates a physical plan for pull queries.
Expand All @@ -66,8 +64,7 @@ public class PullPhysicalPlanBuilder {
private final QueryId queryId;
private final Materialization mat;

private List<GenericKey> keys;
private Optional<WindowBounds> windowBounds;
private List<LookupConstraint> lookupConstraints;
private boolean seenSelectOperator = false;

public PullPhysicalPlanBuilder(
Expand Down Expand Up @@ -145,7 +142,7 @@ public PullPhysicalPlan buildPullPhysicalPlan(final LogicalPlanNode logicalPlanN
rootPhysicalOp,
(rootPhysicalOp).getLogicalNode().getSchema(),
queryId,
keys,
lookupConstraints,
mat,
dataSourceOperator);
}
Expand All @@ -165,8 +162,7 @@ private ProjectOperator translateProjectNode(final PullProjectNode logicalNode)
}

private SelectOperator translateFilterNode(final PullFilterNode logicalNode) {
keys = logicalNode.getKeyValues();
windowBounds = logicalNode.getWindowBounds();
lookupConstraints = logicalNode.getLookupConstraints();

final ProcessingLogger logger = processingLogContext
.getLoggerFactory()
Expand All @@ -181,7 +177,7 @@ private AbstractPhysicalOperator translateDataSourceNode(
final DataSourceNode logicalNode
) {
if (!seenSelectOperator) {
keys = Collections.emptyList();
lookupConstraints = Collections.emptyList();
if (!logicalNode.isWindowed()) {
return new TableScanOperator(mat, logicalNode);
} else {
Expand All @@ -191,7 +187,7 @@ private AbstractPhysicalOperator translateDataSourceNode(
if (!logicalNode.isWindowed()) {
return new KeyedTableLookupOperator(mat, logicalNode);
} else {
return new KeyedWindowedTableLookupOperator(mat, logicalNode, windowBounds.get());
return new KeyedWindowedTableLookupOperator(mat, logicalNode);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.GenericKey;
import io.confluent.ksql.execution.streams.materialization.Locator.KsqlKey;
import io.confluent.ksql.execution.streams.materialization.Locator.KsqlPartitionLocation;
import io.confluent.ksql.execution.streams.materialization.Materialization;
import io.confluent.ksql.execution.streams.materialization.Row;
Expand Down Expand Up @@ -60,7 +61,7 @@ public void open() {
if (!nextLocation.getKeys().isPresent()) {
throw new IllegalStateException("Table lookup queries should be done with keys");
}
keyIterator = nextLocation.getKeys().get().iterator();
keyIterator = nextLocation.getKeys().get().stream().map(KsqlKey::getKey).iterator();
if (keyIterator.hasNext()) {
nextKey = keyIterator.next();
resultIterator = mat.nonWindowed()
Expand All @@ -85,7 +86,7 @@ public Object next() {
if (!nextLocation.getKeys().isPresent()) {
throw new IllegalStateException("Table lookup queries should be done with keys");
}
keyIterator = nextLocation.getKeys().get().iterator();
keyIterator = nextLocation.getKeys().get().stream().map(KsqlKey::getKey).iterator();
}
nextKey = keyIterator.next();
resultIterator = mat.nonWindowed()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@

package io.confluent.ksql.physical.pull.operators;

import io.confluent.ksql.GenericKey;
import io.confluent.ksql.execution.streams.materialization.Locator.KsqlKey;
import io.confluent.ksql.execution.streams.materialization.Locator.KsqlPartitionLocation;
import io.confluent.ksql.execution.streams.materialization.Materialization;
import io.confluent.ksql.execution.streams.materialization.WindowedRow;
import io.confluent.ksql.planner.plan.DataSourceNode;
import io.confluent.ksql.planner.plan.KeyConstraint.KeyConstraintKey;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.planner.plan.PullFilterNode.WindowBounds;
import java.util.Iterator;
Expand All @@ -37,24 +38,21 @@ public class KeyedWindowedTableLookupOperator

private final Materialization mat;
private final DataSourceNode logicalNode;
private final WindowBounds windowBounds;

private List<KsqlPartitionLocation> partitionLocations;
private Iterator<WindowedRow> resultIterator;
private Iterator<GenericKey> keyIterator;
private Iterator<KsqlKey> keyIterator;
private Iterator<KsqlPartitionLocation> partitionLocationIterator;
private KsqlPartitionLocation nextLocation;
private GenericKey nextKey;
private KsqlKey nextKey;


public KeyedWindowedTableLookupOperator(
final Materialization mat,
final DataSourceNode logicalNode,
final WindowBounds windowBounds
final DataSourceNode logicalNode
) {
this.logicalNode = Objects.requireNonNull(logicalNode, "logicalNode");
this.mat = Objects.requireNonNull(mat, "mat");
this.windowBounds = Objects.requireNonNull(windowBounds, "windowBounds");
}

@Override
Expand All @@ -65,11 +63,12 @@ public void open() {
if (!nextLocation.getKeys().isPresent()) {
throw new IllegalStateException("Table windowed queries should be done with keys");
}
keyIterator = nextLocation.getKeys().get().iterator();
keyIterator = nextLocation.getKeys().get().stream().iterator();
if (keyIterator.hasNext()) {
nextKey = keyIterator.next();
final WindowBounds windowBounds = getWindowBounds(nextKey);
resultIterator = mat.windowed().get(
nextKey,
nextKey.getKey(),
nextLocation.getPartition(),
windowBounds.getMergedStart(),
windowBounds.getMergedEnd())
Expand All @@ -95,8 +94,9 @@ public Object next() {
keyIterator = nextLocation.getKeys().get().iterator();
}
nextKey = keyIterator.next();
final WindowBounds windowBounds = getWindowBounds(nextKey);
resultIterator = mat.windowed().get(
nextKey,
nextKey.getKey(),
nextLocation.getPartition(),
windowBounds.getMergedStart(),
windowBounds.getMergedEnd())
Expand All @@ -106,6 +106,19 @@ public Object next() {

}

/**
 * Extracts the window bounds carried by a lookup key.
 *
 * @param ksqlKey the key being looked up; must be a {@link KeyConstraintKey}
 * @return the window bounds attached to the key
 * @throws IllegalStateException if the key is not a {@link KeyConstraintKey}, or if it carries
 *     no window bounds
 */
private static WindowBounds getWindowBounds(final KsqlKey ksqlKey) {
  if (ksqlKey instanceof KeyConstraintKey) {
    // The failure message is built lazily, only when the bounds are actually absent.
    return ((KeyConstraintKey) ksqlKey).getWindowBounds()
        .orElseThrow(() -> new IllegalStateException(
            String.format("Table windowed queries should be done with "
                + "window bounds: %s", ksqlKey.toString())));
  }
  throw new IllegalStateException(String.format("Table windowed queries should be done with "
      + "key constraints: %s", ksqlKey.toString()));
}

@Override
public void close() {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.planner.plan;

import io.confluent.ksql.GenericKey;
import io.confluent.ksql.execution.streams.materialization.Locator.KsqlKey;
import io.confluent.ksql.planner.plan.PullFilterNode.WindowBounds;
import java.util.Objects;
import java.util.Optional;

/**
 * An instance of this class represents what we know about the use of keys in a given disjunct
 * from an expression. The key's value, the operator associated with it, and the window bounds
 * are available through the accessor methods. These are used as hints for the physical planning
 * layer about how to fetch the corresponding rows.
 */
public class KeyConstraint implements LookupConstraint {

  private final ConstraintOperator operator;
  private final GenericKey key;
  private final Optional<WindowBounds> windowBounds;

  /**
   * Creates a constraint on a key.
   *
   * @param operator the comparison applied to the key
   * @param key the key value the operator compares against
   * @param windowBounds the window bounds, or {@link Optional#empty()} when there are none
   * @throws NullPointerException if any argument is {@code null}
   */
  public KeyConstraint(
      final ConstraintOperator operator,
      final GenericKey key,
      final Optional<WindowBounds> windowBounds
  ) {
    // Fail fast here rather than with an anonymous NPE later, e.g. in toString() or when a
    // consumer calls getWindowBounds().isPresent().
    this.operator = Objects.requireNonNull(operator, "operator");
    this.key = Objects.requireNonNull(key, "key");
    this.windowBounds = Objects.requireNonNull(windowBounds, "windowBounds");
  }

  /**
   * Creates a constraint using {@link ConstraintOperator#EQUAL}.
   *
   * @param key the key value
   * @param windowBounds the window bounds, or {@link Optional#empty()} when there are none
   * @return the new equality constraint
   * @throws NullPointerException if any argument is {@code null}
   */
  public static KeyConstraint equal(
      final GenericKey key,
      final Optional<WindowBounds> windowBounds
  ) {
    return new KeyConstraint(ConstraintOperator.EQUAL, key, windowBounds);
  }

  /**
   * Returns the key value.
   *
   * @return the key value
   */
  public GenericKey getKey() {
    return key;
  }

  /**
   * Returns the constraint operator associated with the key value.
   *
   * @return the constraint operator
   */
  public ConstraintOperator getConstraintOperator() {
    return operator;
  }

  /**
   * Returns the window bounds, if the query is for a windowed table.
   *
   * @return the window bounds, or empty if there are none
   */
  public Optional<WindowBounds> getWindowBounds() {
    return windowBounds;
  }

  /**
   * Returns a new {@link KeyConstraintKey} wrapping this constraint's key and window bounds.
   *
   * @return the key, together with its window bounds, in {@link KsqlKey} form
   */
  public KeyConstraintKey getKsqlKey() {
    return new KeyConstraintKey(key, windowBounds);
  }

  /**
   * Returns whether the operator represents a range of keys.
   *
   * @return {@code true} for every operator other than {@link ConstraintOperator#EQUAL}
   */
  public boolean isRangeOperator() {
    return operator != ConstraintOperator.EQUAL;
  }

  /**
   * The comparison operators a key constraint can carry.
   */
  public enum ConstraintOperator {
    EQUAL,
    LESS_THAN,
    LESS_THAN_OR_EQUAL,
    GREATER_THAN,
    GREATER_THAN_OR_EQUAL
  }

  /**
   * A {@link KsqlKey} that carries optional window bounds alongside the key itself. Equality and
   * hash code take both the key and the window bounds into account.
   */
  public static class KeyConstraintKey implements KsqlKey {

    private final GenericKey key;
    private final Optional<WindowBounds> windowBounds;

    /**
     * Creates a key with its associated window bounds.
     *
     * @param key the key value
     * @param windowBounds the window bounds, or {@link Optional#empty()} when there are none
     * @throws NullPointerException if any argument is {@code null}
     */
    public KeyConstraintKey(final GenericKey key, final Optional<WindowBounds> windowBounds) {
      this.key = Objects.requireNonNull(key, "key");
      this.windowBounds = Objects.requireNonNull(windowBounds, "windowBounds");
    }

    @Override
    public GenericKey getKey() {
      return key;
    }

    /**
     * Returns the window bounds attached to this key.
     *
     * @return the window bounds, or empty if there are none
     */
    public Optional<WindowBounds> getWindowBounds() {
      return windowBounds;
    }

    @Override
    public int hashCode() {
      return Objects.hash(key, windowBounds);
    }

    @Override
    public boolean equals(final Object o) {
      if (this == o) {
        return true;
      }

      if (o == null || getClass() != o.getClass()) {
        return false;
      }

      final KeyConstraintKey that = (KeyConstraintKey) o;
      return Objects.equals(this.key, that.key)
          && Objects.equals(this.windowBounds, that.windowBounds);
    }

    @Override
    public String toString() {
      return key.toString() + "-" + windowBounds.toString();
    }
  }
}
Loading

0 comments on commit b8e0c99

Please sign in to comment.