Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize key-range queries in pull queries (#6105) #7993

Merged
merged 1 commit into from
Sep 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public static GenericKey fromList(final List<?> columns) {
return builder(columns.size()).appendAll(columns).build();
}

/**
 * Builds a key from an array of column values.
 *
 * <p>The array is exposed as a list view and handed to {@code fromList}, so the
 * resulting key holds the columns in array order.
 *
 * @param columns the key column values, in order
 * @return the key built from {@code columns}
 */
public static GenericKey fromArray(final Object[] columns) {
  final List<Object> columnView = Arrays.asList(columns);
  return fromList(columnView);
}

private GenericKey(final List<Object> values) {
this.values = Collections.unmodifiableList(values);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,12 @@ public class KsqlConfig extends AbstractConfig {
"Config to enable pull queries on streams";
public static final boolean KSQL_QUERY_STREAM_PULL_QUERY_ENABLED_DEFAULT = true;

// Pull-query range scans: the property name, its description, and its default value.
// The feature is on by default.
public static final String KSQL_QUERY_PULL_RANGE_SCAN_ENABLED
= "ksql.query.pull.range.scan.enabled";
public static final String KSQL_QUERY_PULL_RANGE_SCAN_ENABLED_DOC =
"Config to enable range scans on table for pull queries";
public static final boolean KSQL_QUERY_PULL_RANGE_SCAN_ENABLED_DEFAULT = true;

public static final String KSQL_QUERY_PULL_INTERPRETER_ENABLED
= "ksql.query.pull.interpreter.enabled";
public static final String KSQL_QUERY_PULL_INTERPRETER_ENABLED_DOC =
Expand Down Expand Up @@ -939,6 +945,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_QUERY_STREAM_PULL_QUERY_ENABLED_DOC
)
.define(
KSQL_QUERY_PULL_RANGE_SCAN_ENABLED,
Type.BOOLEAN,
KSQL_QUERY_PULL_RANGE_SCAN_ENABLED_DEFAULT,
Importance.LOW,
KSQL_QUERY_PULL_RANGE_SCAN_ENABLED_DOC
)
.define(
KSQL_QUERY_PULL_INTERPRETER_ENABLED,
Type.BOOLEAN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ PullQueryResult executeTablePullQuery(

plan = buildPullPhysicalPlan(
logicalPlan,
analysis
analysis,
queryPlannerOptions
);
final PullPhysicalPlan physicalPlan = plan;

Expand Down Expand Up @@ -649,13 +650,15 @@ private PushPhysicalPlan buildScalablePushPhysicalPlan(

private PullPhysicalPlan buildPullPhysicalPlan(
final LogicalPlanNode logicalPlan,
final ImmutableAnalysis analysis
final ImmutableAnalysis analysis,
final QueryPlannerOptions queryPlannerOptions
) {

final PullPhysicalPlanBuilder builder = new PullPhysicalPlanBuilder(
engineContext.getProcessingLogContext(),
PullQueryExecutionUtil.findMaterializingQuery(engineContext, analysis),
analysis
analysis,
queryPlannerOptions
);
return builder.buildPullPhysicalPlan(logicalPlan);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.confluent.ksql.execution.streams.materialization.MaterializationException;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.physical.pull.PullPhysicalPlan.PullPhysicalPlanType;
import io.confluent.ksql.query.PullQueryQueue;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.client.RestResponse;
Expand Down Expand Up @@ -111,7 +112,8 @@ public CompletableFuture<Void> handlePullQuery(
.locate(
pullPhysicalPlan.getKeys(),
routingOptions,
routingFilterFactory
routingFilterFactory,
pullPhysicalPlan.getPlanType() == PullPhysicalPlanType.RANGE_SCAN
);

final Map<Integer, List<Host>> emptyPartitions = allLocations.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
import io.confluent.ksql.physical.common.operators.AbstractPhysicalOperator;
import io.confluent.ksql.physical.pull.operators.DataSourceOperator;
import io.confluent.ksql.planner.plan.KeyConstraint;
import io.confluent.ksql.planner.plan.KeyConstraint.ConstraintOperator;
import io.confluent.ksql.planner.plan.LookupConstraint;
import io.confluent.ksql.query.PullQueryQueue;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -123,26 +123,17 @@ public Materialization getMaterialization() {
}

public List<KsqlKey> getKeys() {
if (requiresRequestsToAllPartitions()) {
return Collections.emptyList();
final List<KsqlKey> list = new ArrayList<>();
for (LookupConstraint c : lookupConstraints) {
if (c instanceof KeyConstraint) {
final KeyConstraint kc = (KeyConstraint) c;
patrickstuedi marked this conversation as resolved.
Show resolved Hide resolved
list.add(kc.getKsqlKey());
} else {
// We shouldn't see any NonKeyConstraints here
return Collections.emptyList();
}
}
return lookupConstraints.stream()
.filter(lookupConstraint -> lookupConstraint instanceof KeyConstraint)
.map(KeyConstraint.class::cast)
.filter(keyConstraint -> keyConstraint.getConstraintOperator() == ConstraintOperator.EQUAL)
.map(KeyConstraint::getKsqlKey)
.collect(ImmutableList.toImmutableList());
}

private boolean requiresRequestsToAllPartitions() {
return lookupConstraints.stream()
.anyMatch(lookupConstraint -> {
if (lookupConstraint instanceof KeyConstraint) {
final KeyConstraint keyConstraint = (KeyConstraint) lookupConstraint;
return keyConstraint.getConstraintOperator() != ConstraintOperator.EQUAL;
patrickstuedi marked this conversation as resolved.
Show resolved Hide resolved
}
return true;
});
return ImmutableList.copyOf(list);
patrickstuedi marked this conversation as resolved.
Show resolved Hide resolved
}

public LogicalSchema getOutputSchema() {
Expand Down Expand Up @@ -172,6 +163,7 @@ public QueryId getQueryId() {
/**
 * The kind of read a pull query physical plan performs against the materialized table.
 */
public enum PullPhysicalPlanType {
// Could be one or more keys
KEY_LOOKUP,
// Read driven by a non-equality key constraint (a range of keys).
// NOTE(review): the plan builder appears to choose this only for a single such constraint,
// and to fall back to TABLE_SCAN when range scans are disabled or the source is windowed
// - confirm.
RANGE_SCAN,
// Scan of the table without using key constraints
TABLE_SCAN
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@
import io.confluent.ksql.physical.pull.operators.TableScanOperator;
import io.confluent.ksql.physical.pull.operators.WindowedTableScanOperator;
import io.confluent.ksql.planner.LogicalPlanNode;
import io.confluent.ksql.planner.QueryPlannerOptions;
import io.confluent.ksql.planner.plan.DataSourceNode;
import io.confluent.ksql.planner.plan.KeyConstraint;
import io.confluent.ksql.planner.plan.KeyConstraint.ConstraintOperator;
import io.confluent.ksql.planner.plan.KsqlBareOutputNode;
import io.confluent.ksql.planner.plan.LookupConstraint;
import io.confluent.ksql.planner.plan.NonKeyConstraint;
Expand Down Expand Up @@ -66,6 +69,7 @@ public class PullPhysicalPlanBuilder {
private final PersistentQueryMetadata persistentQueryMetadata;
private final QueryId queryId;
private final Materialization mat;
private final QueryPlannerOptions queryPlannerOptions;

private List<LookupConstraint> lookupConstraints;
private PullPhysicalPlanType pullPhysicalPlanType;
Expand All @@ -75,7 +79,8 @@ public class PullPhysicalPlanBuilder {
public PullPhysicalPlanBuilder(
final ProcessingLogContext processingLogContext,
final PersistentQueryMetadata persistentQueryMetadata,
final ImmutableAnalysis analysis
final ImmutableAnalysis analysis,
final QueryPlannerOptions queryPlannerOptions
) {
this.processingLogContext = Objects.requireNonNull(
processingLogContext, "processingLogContext");
Expand All @@ -86,6 +91,7 @@ public PullPhysicalPlanBuilder(
mat = this.persistentQueryMetadata
.getMaterialization(queryId, contextStacker)
.orElseThrow(() -> notMaterializedException(getSourceName(analysis)));
this.queryPlannerOptions = queryPlannerOptions;
}

/**
Expand Down Expand Up @@ -180,27 +186,53 @@ private SelectOperator translateFilterNode(final QueryFilterNode logicalNode) {
return new SelectOperator(logicalNode, logger);
}

private AbstractPhysicalOperator translateDataSourceNode(
final DataSourceNode logicalNode
) {
boolean isTableScan = false;
private PullPhysicalPlanType getPlanType() {
if (!seenSelectOperator) {
lookupConstraints = Collections.emptyList();
isTableScan = true;
return PullPhysicalPlanType.TABLE_SCAN;
} else if (lookupConstraints.stream().anyMatch(lc -> lc instanceof NonKeyConstraint)) {
isTableScan = true;
lookupConstraints = Collections.emptyList();
return PullPhysicalPlanType.TABLE_SCAN;
} else if (lookupConstraints.stream().allMatch(lc -> ((KeyConstraint) lc).getOperator()
== ConstraintOperator.EQUAL)) {
return PullPhysicalPlanType.KEY_LOOKUP;
} else if (lookupConstraints.size() == 1
&& lookupConstraints.stream().allMatch(lc -> ((KeyConstraint) lc).getOperator()
!= ConstraintOperator.EQUAL)) {
return PullPhysicalPlanType.RANGE_SCAN;
} else {
lookupConstraints = Collections.emptyList();
return PullPhysicalPlanType.TABLE_SCAN;
}
}

private AbstractPhysicalOperator translateDataSourceNode(
final DataSourceNode logicalNode
) {
pullPhysicalPlanType = getPlanType();
if (pullPhysicalPlanType == PullPhysicalPlanType.RANGE_SCAN
&& (!queryPlannerOptions.getRangeScansEnabled() || logicalNode.isWindowed())) {
pullPhysicalPlanType = PullPhysicalPlanType.TABLE_SCAN;
}
if (pullPhysicalPlanType == PullPhysicalPlanType.TABLE_SCAN) {
if (queryPlannerOptions.getTableScansEnabled()) {
lookupConstraints = Collections.emptyList();
} else {
throw new KsqlException("Query requires table scan to be enabled. Table scans can be"
+ " enabled by setting ksql.query.pull.table.scan.enabled=true");
}
}

pullSourceType = logicalNode.isWindowed()
? PullSourceType.WINDOWED : PullSourceType.NON_WINDOWED;
if (isTableScan) {
pullPhysicalPlanType = PullPhysicalPlanType.TABLE_SCAN;
if (pullPhysicalPlanType == PullPhysicalPlanType.TABLE_SCAN) {
if (!logicalNode.isWindowed()) {
return new TableScanOperator(mat, logicalNode);
} else {
return new WindowedTableScanOperator(mat, logicalNode);
}
}
pullPhysicalPlanType = PullPhysicalPlanType.KEY_LOOKUP;

if (!logicalNode.isWindowed()) {
return new KeyedTableLookupOperator(mat, logicalNode);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import io.confluent.ksql.physical.common.operators.AbstractPhysicalOperator;
import io.confluent.ksql.physical.common.operators.UnaryPhysicalOperator;
import io.confluent.ksql.planner.plan.DataSourceNode;
import io.confluent.ksql.planner.plan.KeyConstraint;
import io.confluent.ksql.planner.plan.KeyConstraint.ConstraintOperator;
import io.confluent.ksql.planner.plan.PlanNode;
import java.util.Iterator;
import java.util.List;
Expand All @@ -43,10 +45,10 @@ public class KeyedTableLookupOperator

private ImmutableList<KsqlPartitionLocation> partitionLocations;
private Iterator<Row> resultIterator;
private Iterator<GenericKey> keyIterator;
private Iterator<KsqlKey> keyIterator;
private Iterator<KsqlPartitionLocation> partitionLocationIterator;
private KsqlPartitionLocation nextLocation;
private GenericKey nextKey;
private KsqlKey nextKey;
private long returnedRows = 0;

public KeyedTableLookupOperator(
Expand All @@ -65,13 +67,10 @@ public void open() {
if (!nextLocation.getKeys().isPresent()) {
throw new IllegalStateException("Table lookup queries should be done with keys");
}
keyIterator = nextLocation.getKeys().get().stream().map(KsqlKey::getKey).iterator();
keyIterator = nextLocation.getKeys().get().stream().iterator();
if (keyIterator.hasNext()) {
nextKey = keyIterator.next();
resultIterator = mat.nonWindowed()
.get(nextKey, nextLocation.getPartition())
.map(ImmutableList::of)
.orElse(ImmutableList.of()).iterator();
resultIterator = getMatIterator(nextKey);
}
}
}
Expand All @@ -90,19 +89,49 @@ public Object next() {
if (!nextLocation.getKeys().isPresent()) {
throw new IllegalStateException("Table lookup queries should be done with keys");
}
keyIterator = nextLocation.getKeys().get().stream().map(KsqlKey::getKey).iterator();
keyIterator = nextLocation.getKeys().get().stream().iterator();
}
nextKey = keyIterator.next();
resultIterator = mat.nonWindowed()
.get(nextKey, nextLocation.getPartition())
.map(ImmutableList::of)
.orElse(ImmutableList.of()).iterator();
resultIterator = getMatIterator(nextKey);
}

returnedRows++;
return resultIterator.next();
}

private Iterator<Row> getMatIterator(final KsqlKey ksqlKey) {
if (!(nextKey instanceof KeyConstraint)) {
throw new IllegalStateException(String.format("Keyed lookup queries should be done with "
+ "key constraints: %s", ksqlKey.toString()));
}
final KeyConstraint keyConstraintKey = (KeyConstraint) ksqlKey;
if (keyConstraintKey.getOperator() == ConstraintOperator.EQUAL) {
return mat.nonWindowed()
.get(ksqlKey.getKey(), nextLocation.getPartition())
.map(ImmutableList::of)
.orElse(ImmutableList.of()).iterator();
} else if (keyConstraintKey.getOperator() == ConstraintOperator.GREATER_THAN
patrickstuedi marked this conversation as resolved.
Show resolved Hide resolved
|| keyConstraintKey.getOperator() == ConstraintOperator.GREATER_THAN_OR_EQUAL) {
//Underlying store will always return keys inclusive the endpoints
//and filtering is used to trim start and end of the range in case of ">"
final GenericKey fromKey = keyConstraintKey.getKey();
final GenericKey toKey = null;
patrickstuedi marked this conversation as resolved.
Show resolved Hide resolved
return mat.nonWindowed()
.get(nextLocation.getPartition(), fromKey, toKey);
} else if (keyConstraintKey.getOperator() == ConstraintOperator.LESS_THAN
|| keyConstraintKey.getOperator() == ConstraintOperator.LESS_THAN_OR_EQUAL) {
//Underlying store will always return keys inclusive the endpoints
//and filtering is used to trim start and end of the range in case of "<"
final GenericKey fromKey = null;
final GenericKey toKey = keyConstraintKey.getKey();
return mat.nonWindowed()
.get(nextLocation.getPartition(), fromKey, toKey);
} else {
throw new IllegalStateException(String.format("Invalid comparator type "
+ keyConstraintKey.getOperator()));
}
}

@Override
public void close() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import io.confluent.ksql.physical.common.operators.AbstractPhysicalOperator;
import io.confluent.ksql.physical.common.operators.UnaryPhysicalOperator;
import io.confluent.ksql.planner.plan.DataSourceNode;
import io.confluent.ksql.planner.plan.KeyConstraint.KeyConstraintKey;
import io.confluent.ksql.planner.plan.KeyConstraint;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.planner.plan.QueryFilterNode.WindowBounds;
import java.util.Iterator;
Expand Down Expand Up @@ -112,11 +112,11 @@ public Object next() {
}

private static WindowBounds getWindowBounds(final KsqlKey ksqlKey) {
if (!(ksqlKey instanceof KeyConstraintKey)) {
if (!(ksqlKey instanceof KeyConstraint)) {
throw new IllegalStateException(String.format("Table windowed queries should be done with "
+ "key constraints: %s", ksqlKey.toString()));
}
final KeyConstraintKey keyConstraintKey = (KeyConstraintKey) ksqlKey;
final KeyConstraint keyConstraintKey = (KeyConstraint) ksqlKey;
if (!keyConstraintKey.getWindowBounds().isPresent()) {
throw new IllegalStateException(String.format("Table windowed queries should be done with "
+ "window bounds: %s", ksqlKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public interface QueryPlannerOptions {

boolean getInterpreterEnabled();

/**
 * @return {@code true} if pull queries are allowed to be planned as range scans
 */
boolean getRangeScansEnabled();

/**
* @return a human readable representation of the {@code QueryPlannerOptions},
* used to debug requests
Expand Down
Loading