From d7926fcf5ce887c718457ea7d43299ebdd35d9d9 Mon Sep 17 00:00:00 2001 From: Patrick Stuedi Date: Thu, 12 Aug 2021 16:55:41 +0200 Subject: [PATCH] feat: optimize key-range queries in pull queries implementation of KLIP-54 --- .../java/io/confluent/ksql/GenericKey.java | 4 + .../io/confluent/ksql/util/KsqlConfig.java | 13 + .../confluent/ksql/engine/EngineExecutor.java | 9 +- .../ksql/physical/pull/HARouting.java | 4 +- .../ksql/physical/pull/PullPhysicalPlan.java | 32 +- .../pull/PullPhysicalPlanBuilder.java | 52 +- .../operators/KeyedTableLookupOperator.java | 53 +- .../KeyedWindowedTableLookupOperator.java | 6 +- .../ksql/planner/QueryPlannerOptions.java | 2 + .../ksql/planner/plan/KeyConstraint.java | 76 +-- .../ksql/planner/plan/QueryFilterNode.java | 93 ++- .../ksql/physical/pull/HARoutingTest.java | 3 +- .../KeyedTableLookupOperatorTest.java | 63 +- .../KeyedWindowedTableLookupOperatorTest.java | 10 +- .../planner/plan/QueryFilterNodeTest.java | 138 +++- ...eries-against-materialized-aggregates.json | 12 +- .../pull-queries-with-range-scan.json | 635 ++++++++++++++++++ .../PullQueryConfigPlannerOptions.java | 8 + .../PushQueryConfigPlannerOptions.java | 5 + .../integration/PullQueryFunctionalTest.java | 1 + .../materialization/KsqlMaterialization.java | 15 + .../streams/materialization/Locator.java | 3 +- .../materialization/MaterializedTable.java | 10 + .../streams/materialization/ks/KsLocator.java | 17 +- .../ks/KsMaterializedTable.java | 18 + .../materialization/ks/KsLocatorTest.java | 82 ++- 26 files changed, 1172 insertions(+), 192 deletions(-) create mode 100644 ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-with-range-scan.json diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/GenericKey.java b/ksqldb-common/src/main/java/io/confluent/ksql/GenericKey.java index 425168f91b0b..9dfe708def15 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/GenericKey.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/GenericKey.java @@ -48,6 +48,10 @@ public static GenericKey fromList(final List columns) { return builder(columns.size()).appendAll(columns).build(); } + public static GenericKey fromArray(final Object[] columns) { + return fromList(Arrays.asList(columns)); + } + private GenericKey(final List values) { this.values = Collections.unmodifiableList(values); } diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index f4e1f542ec7b..d3f9cf0117e7 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -276,6 +276,12 @@ public class KsqlConfig extends AbstractConfig { "Config to enable pull queries on streams"; public static final boolean KSQL_QUERY_STREAM_PULL_QUERY_ENABLED_DEFAULT = true; + public static final String KSQL_QUERY_PULL_RANGE_SCAN_ENABLED + = "ksql.query.pull.range.scan.enabled"; + public static final String KSQL_QUERY_PULL_RANGE_SCAN_ENABLED_DOC = + "Config to enable range scans on table for pull queries"; + public static final boolean KSQL_QUERY_PULL_RANGE_SCAN_ENABLED_DEFAULT = true; + public static final String KSQL_QUERY_PULL_INTERPRETER_ENABLED = "ksql.query.pull.interpreter.enabled"; public static final String KSQL_QUERY_PULL_INTERPRETER_ENABLED_DOC = @@ -939,6 +945,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { Importance.LOW, KSQL_QUERY_STREAM_PULL_QUERY_ENABLED_DOC ) + .define( + KSQL_QUERY_PULL_RANGE_SCAN_ENABLED, + Type.BOOLEAN, + KSQL_QUERY_PULL_RANGE_SCAN_ENABLED_DEFAULT, + Importance.LOW, + KSQL_QUERY_PULL_RANGE_SCAN_ENABLED_DOC + ) .define( KSQL_QUERY_PULL_INTERPRETER_ENABLED, Type.BOOLEAN, diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java index 6fa5f3394290..5ffae2b2b7c7 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java @@ -243,7 +243,8 @@ PullQueryResult executeTablePullQuery( plan = buildPullPhysicalPlan( logicalPlan, - analysis + analysis, + queryPlannerOptions ); final PullPhysicalPlan physicalPlan = plan; @@ -649,13 +650,15 @@ private PushPhysicalPlan buildScalablePushPhysicalPlan( private PullPhysicalPlan buildPullPhysicalPlan( final LogicalPlanNode logicalPlan, - final ImmutableAnalysis analysis + final ImmutableAnalysis analysis, + final QueryPlannerOptions queryPlannerOptions ) { final PullPhysicalPlanBuilder builder = new PullPhysicalPlanBuilder( engineContext.getProcessingLogContext(), PullQueryExecutionUtil.findMaterializingQuery(engineContext, analysis), - analysis + analysis, + queryPlannerOptions ); return builder.buildPullPhysicalPlan(logicalPlan); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/HARouting.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/HARouting.java index db753ae89a33..b2c8f4903224 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/HARouting.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/HARouting.java @@ -28,6 +28,7 @@ import io.confluent.ksql.execution.streams.materialization.MaterializationException; import io.confluent.ksql.internal.PullQueryExecutorMetrics; import io.confluent.ksql.parser.tree.Query; +import io.confluent.ksql.physical.pull.PullPhysicalPlan.PullPhysicalPlanType; import io.confluent.ksql.query.PullQueryQueue; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.rest.client.RestResponse; @@ -111,7 +112,8 @@ public CompletableFuture handlePullQuery( .locate( pullPhysicalPlan.getKeys(), routingOptions, - routingFilterFactory + routingFilterFactory, + pullPhysicalPlan.getPlanType() == PullPhysicalPlanType.RANGE_SCAN ); final Map> emptyPartitions = allLocations.stream() diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullPhysicalPlan.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullPhysicalPlan.java index 6e88a8af7469..24b6b3bd95d6 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullPhysicalPlan.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullPhysicalPlan.java @@ -23,11 +23,11 @@ import io.confluent.ksql.physical.common.operators.AbstractPhysicalOperator; import io.confluent.ksql.physical.pull.operators.DataSourceOperator; import io.confluent.ksql.planner.plan.KeyConstraint; -import io.confluent.ksql.planner.plan.KeyConstraint.ConstraintOperator; import io.confluent.ksql.planner.plan.LookupConstraint; import io.confluent.ksql.query.PullQueryQueue; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.LogicalSchema; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -123,26 +123,17 @@ public Materialization getMaterialization() { } public List getKeys() { - if (requiresRequestsToAllPartitions()) { - return Collections.emptyList(); + final List list = new ArrayList<>(); + for (LookupConstraint c : lookupConstraints) { + if (c instanceof KeyConstraint) { + final KeyConstraint kc = (KeyConstraint) c; + list.add(kc.getKsqlKey()); + } else { + //we shouldn't see any NonKeyContraints here + return Collections.emptyList(); + } } - return lookupConstraints.stream() - .filter(lookupConstraint -> lookupConstraint instanceof KeyConstraint) - .map(KeyConstraint.class::cast) - .filter(keyConstraint -> keyConstraint.getConstraintOperator() == ConstraintOperator.EQUAL) - .map(KeyConstraint::getKsqlKey) - .collect(ImmutableList.toImmutableList()); - } - - private boolean requiresRequestsToAllPartitions() { - return lookupConstraints.stream() - .anyMatch(lookupConstraint -> { - if (lookupConstraint instanceof KeyConstraint) { - final KeyConstraint keyConstraint = (KeyConstraint) lookupConstraint; - return keyConstraint.getConstraintOperator() != ConstraintOperator.EQUAL; - } - return true; - }); + return ImmutableList.copyOf(list); } public LogicalSchema getOutputSchema() { @@ -172,6 +163,7 @@ public QueryId getQueryId() { public enum PullPhysicalPlanType { // Could be one or more keys KEY_LOOKUP, + RANGE_SCAN, TABLE_SCAN } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullPhysicalPlanBuilder.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullPhysicalPlanBuilder.java index 2370923028bb..6714a9404edd 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullPhysicalPlanBuilder.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullPhysicalPlanBuilder.java @@ -36,7 +36,10 @@ import io.confluent.ksql.physical.pull.operators.TableScanOperator; import io.confluent.ksql.physical.pull.operators.WindowedTableScanOperator; import io.confluent.ksql.planner.LogicalPlanNode; +import io.confluent.ksql.planner.QueryPlannerOptions; import io.confluent.ksql.planner.plan.DataSourceNode; +import io.confluent.ksql.planner.plan.KeyConstraint; +import io.confluent.ksql.planner.plan.KeyConstraint.ConstraintOperator; import io.confluent.ksql.planner.plan.KsqlBareOutputNode; import io.confluent.ksql.planner.plan.LookupConstraint; import io.confluent.ksql.planner.plan.NonKeyConstraint; @@ -66,6 +69,7 @@ public class PullPhysicalPlanBuilder { private final PersistentQueryMetadata persistentQueryMetadata; private final QueryId queryId; private final Materialization mat; + private final QueryPlannerOptions queryPlannerOptions; private List lookupConstraints; private PullPhysicalPlanType pullPhysicalPlanType; @@ -75,7 +79,8 @@ public class PullPhysicalPlanBuilder { public PullPhysicalPlanBuilder( final ProcessingLogContext processingLogContext, final PersistentQueryMetadata persistentQueryMetadata, - final ImmutableAnalysis analysis + final ImmutableAnalysis analysis, + final QueryPlannerOptions queryPlannerOptions ) { this.processingLogContext = Objects.requireNonNull( processingLogContext, "processingLogContext"); @@ -86,6 +91,7 @@ public PullPhysicalPlanBuilder( mat = this.persistentQueryMetadata .getMaterialization(queryId, contextStacker) .orElseThrow(() -> notMaterializedException(getSourceName(analysis))); + this.queryPlannerOptions = queryPlannerOptions; } /** @@ -180,27 +186,53 @@ private SelectOperator translateFilterNode(final QueryFilterNode logicalNode) { return new SelectOperator(logicalNode, logger); } - private AbstractPhysicalOperator translateDataSourceNode( - final DataSourceNode logicalNode - ) { - boolean isTableScan = false; + private PullPhysicalPlanType getPlanType() { if (!seenSelectOperator) { lookupConstraints = Collections.emptyList(); - isTableScan = true; + return PullPhysicalPlanType.TABLE_SCAN; } else if (lookupConstraints.stream().anyMatch(lc -> lc instanceof NonKeyConstraint)) { - isTableScan = true; + lookupConstraints = Collections.emptyList(); + return PullPhysicalPlanType.TABLE_SCAN; + } else if (lookupConstraints.stream().allMatch(lc -> ((KeyConstraint) lc).getOperator() + == ConstraintOperator.EQUAL)) { + return PullPhysicalPlanType.KEY_LOOKUP; + } else if (lookupConstraints.size() == 1 + && lookupConstraints.stream().allMatch(lc -> ((KeyConstraint) lc).getOperator() + != ConstraintOperator.EQUAL)) { + return PullPhysicalPlanType.RANGE_SCAN; + } else { + lookupConstraints = Collections.emptyList(); + return PullPhysicalPlanType.TABLE_SCAN; } + } + + private AbstractPhysicalOperator translateDataSourceNode( + final DataSourceNode logicalNode + ) { + pullPhysicalPlanType = getPlanType(); + if (pullPhysicalPlanType == PullPhysicalPlanType.RANGE_SCAN + && (!queryPlannerOptions.getRangeScansEnabled() || logicalNode.isWindowed())) { + pullPhysicalPlanType = PullPhysicalPlanType.TABLE_SCAN; + } + if (pullPhysicalPlanType == PullPhysicalPlanType.TABLE_SCAN) { + if (queryPlannerOptions.getTableScansEnabled()) { + lookupConstraints = Collections.emptyList(); + } else { + throw new KsqlException("Query requires table scan to be enabled. Table scans can be" + + " enabled by setting ksql.query.pull.table.scan.enabled=true"); + } + } + pullSourceType = logicalNode.isWindowed() ? PullSourceType.WINDOWED : PullSourceType.NON_WINDOWED; - if (isTableScan) { - pullPhysicalPlanType = PullPhysicalPlanType.TABLE_SCAN; + if (pullPhysicalPlanType == PullPhysicalPlanType.TABLE_SCAN) { if (!logicalNode.isWindowed()) { return new TableScanOperator(mat, logicalNode); } else { return new WindowedTableScanOperator(mat, logicalNode); } } - pullPhysicalPlanType = PullPhysicalPlanType.KEY_LOOKUP; + if (!logicalNode.isWindowed()) { return new KeyedTableLookupOperator(mat, logicalNode); } else { diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/operators/KeyedTableLookupOperator.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/operators/KeyedTableLookupOperator.java index cac72ec76c10..6e034b0a6c66 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/operators/KeyedTableLookupOperator.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/operators/KeyedTableLookupOperator.java @@ -25,6 +25,8 @@ import io.confluent.ksql.physical.common.operators.AbstractPhysicalOperator; import io.confluent.ksql.physical.common.operators.UnaryPhysicalOperator; import io.confluent.ksql.planner.plan.DataSourceNode; +import io.confluent.ksql.planner.plan.KeyConstraint; +import io.confluent.ksql.planner.plan.KeyConstraint.ConstraintOperator; import io.confluent.ksql.planner.plan.PlanNode; import java.util.Iterator; import java.util.List; @@ -43,10 +45,10 @@ public class KeyedTableLookupOperator private ImmutableList partitionLocations; private Iterator resultIterator; - private Iterator keyIterator; + private Iterator keyIterator; private Iterator partitionLocationIterator; private KsqlPartitionLocation nextLocation; - private GenericKey nextKey; + private KsqlKey nextKey; private long returnedRows = 0; public KeyedTableLookupOperator( @@ -65,13 +67,10 @@ public void open() { if (!nextLocation.getKeys().isPresent()) { throw new IllegalStateException("Table lookup queries should be done with keys"); } - keyIterator = nextLocation.getKeys().get().stream().map(KsqlKey::getKey).iterator(); + keyIterator = nextLocation.getKeys().get().stream().iterator(); if (keyIterator.hasNext()) { nextKey = keyIterator.next(); - resultIterator = mat.nonWindowed() - .get(nextKey, nextLocation.getPartition()) - .map(ImmutableList::of) - .orElse(ImmutableList.of()).iterator(); + resultIterator = getMatIterator(nextKey); } } } @@ -90,19 +89,49 @@ public Object next() { if (!nextLocation.getKeys().isPresent()) { throw new IllegalStateException("Table lookup queries should be done with keys"); } - keyIterator = nextLocation.getKeys().get().stream().map(KsqlKey::getKey).iterator(); + keyIterator = nextLocation.getKeys().get().stream().iterator(); } nextKey = keyIterator.next(); - resultIterator = mat.nonWindowed() - .get(nextKey, nextLocation.getPartition()) - .map(ImmutableList::of) - .orElse(ImmutableList.of()).iterator(); + resultIterator = getMatIterator(nextKey); } returnedRows++; return resultIterator.next(); } + private Iterator getMatIterator(final KsqlKey ksqlKey) { + if (!(nextKey instanceof KeyConstraint)) { + throw new IllegalStateException(String.format("Keyed lookup queries should be done with " + + "key constraints: %s", ksqlKey.toString())); + } + final KeyConstraint keyConstraintKey = (KeyConstraint) ksqlKey; + if (keyConstraintKey.getOperator() == ConstraintOperator.EQUAL) { + return mat.nonWindowed() + .get(ksqlKey.getKey(), nextLocation.getPartition()) + .map(ImmutableList::of) + .orElse(ImmutableList.of()).iterator(); + } else if (keyConstraintKey.getOperator() == ConstraintOperator.GREATER_THAN + || keyConstraintKey.getOperator() == ConstraintOperator.GREATER_THAN_OR_EQUAL) { + //Underlying store will always return keys inclusive the endpoints + //and filtering is used to trim start and end of the range in case of ">" + final GenericKey fromKey = keyConstraintKey.getKey(); + final GenericKey toKey = null; + return mat.nonWindowed() + .get(nextLocation.getPartition(), fromKey, toKey); + } else if (keyConstraintKey.getOperator() == ConstraintOperator.LESS_THAN + || keyConstraintKey.getOperator() == ConstraintOperator.LESS_THAN_OR_EQUAL) { + //Underlying store will always return keys inclusive the endpoints + //and filtering is used to trim start and end of the range in case of "<" + final GenericKey fromKey = null; + final GenericKey toKey = keyConstraintKey.getKey(); + return mat.nonWindowed() + .get(nextLocation.getPartition(), fromKey, toKey); + } else { + throw new IllegalStateException(String.format("Invalid comparator type " + + keyConstraintKey.getOperator())); + } + } + @Override public void close() { diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/operators/KeyedWindowedTableLookupOperator.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/operators/KeyedWindowedTableLookupOperator.java index 64945ea8c3ee..7ddae8804218 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/operators/KeyedWindowedTableLookupOperator.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/operators/KeyedWindowedTableLookupOperator.java @@ -24,7 +24,7 @@ import io.confluent.ksql.physical.common.operators.AbstractPhysicalOperator; import io.confluent.ksql.physical.common.operators.UnaryPhysicalOperator; import io.confluent.ksql.planner.plan.DataSourceNode; -import io.confluent.ksql.planner.plan.KeyConstraint.KeyConstraintKey; +import io.confluent.ksql.planner.plan.KeyConstraint; import io.confluent.ksql.planner.plan.PlanNode; import io.confluent.ksql.planner.plan.QueryFilterNode.WindowBounds; import java.util.Iterator; @@ -112,11 +112,11 @@ public Object next() { } private static WindowBounds getWindowBounds(final KsqlKey ksqlKey) { - if (!(ksqlKey instanceof KeyConstraintKey)) { + if (!(ksqlKey instanceof KeyConstraint)) { throw new IllegalStateException(String.format("Table windowed queries should be done with " + "key constraints: %s", ksqlKey.toString())); } - final KeyConstraintKey keyConstraintKey = (KeyConstraintKey) ksqlKey; + final KeyConstraint keyConstraintKey = (KeyConstraint) ksqlKey; if (!keyConstraintKey.getWindowBounds().isPresent()) { throw new IllegalStateException(String.format("Table windowed queries should be done with " + "window bounds: %s", ksqlKey)); diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/QueryPlannerOptions.java b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/QueryPlannerOptions.java index f753b95e3f9e..56259fa7c988 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/QueryPlannerOptions.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/QueryPlannerOptions.java @@ -21,6 +21,8 @@ public interface QueryPlannerOptions { boolean getInterpreterEnabled(); + boolean getRangeScansEnabled(); + /** * @return a human readable representation of the {@code QueryPlannerOptions}, * used to debug requests diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/KeyConstraint.java b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/KeyConstraint.java index 7008ae16850e..302bd759cdcc 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/KeyConstraint.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/KeyConstraint.java @@ -27,36 +27,26 @@ * available through the given methods. These are used as hints for the physical planning * layer about how to fetch the corresponding rows. */ -public class KeyConstraint implements LookupConstraint { +public class KeyConstraint implements LookupConstraint, KsqlKey { private final ConstraintOperator operator; private final GenericKey key; private final Optional windowBounds; - public KeyConstraint( - final ConstraintOperator operator, - final GenericKey key, - final Optional windowBounds - ) { + public KeyConstraint(final ConstraintOperator operator, final GenericKey key, + final Optional windowBounds) { this.operator = operator; this.key = key; this.windowBounds = windowBounds; } - public static KeyConstraint equal( - final GenericKey key, - final Optional windowBounds - ) { - return new KeyConstraint(ConstraintOperator.EQUAL, key, windowBounds); - } - // The key value. public GenericKey getKey() { return key; } // The constraint operator associated with the value - public ConstraintOperator getConstraintOperator() { + public ConstraintOperator getOperator() { return operator; } @@ -65,8 +55,8 @@ public Optional getWindowBounds() { return windowBounds; } - public KeyConstraintKey getKsqlKey() { - return new KeyConstraintKey(key, windowBounds); + public KsqlKey getKsqlKey() { + return this; } // If the operator represents a range of keys @@ -82,48 +72,28 @@ public enum ConstraintOperator { GREATER_THAN_OR_EQUAL } - public static class KeyConstraintKey implements KsqlKey { - - private final GenericKey key; - private final Optional windowBounds; - - public KeyConstraintKey(final GenericKey key, final Optional windowBounds) { - this.key = key; - this.windowBounds = windowBounds; - } - - @Override - public GenericKey getKey() { - return key; - } + @Override + public int hashCode() { + return Objects.hash(key, windowBounds); + } - public Optional getWindowBounds() { - return windowBounds; + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; } - @Override - public int hashCode() { - return Objects.hash(key, windowBounds); + if (o == null || getClass() != o.getClass()) { + return false; } - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - - if (o == null || getClass() != o.getClass()) { - return false; - } - - final KeyConstraintKey that = (KeyConstraintKey) o; - return Objects.equals(this.key, that.key) - && Objects.equals(this.windowBounds, that.windowBounds); - } + final KeyConstraint that = (KeyConstraint) o; + return Objects.equals(this.key, that.key) + && Objects.equals(this.windowBounds, that.windowBounds); + } - @Override - public String toString() { - return key.toString() + "-" + windowBounds.toString(); - } + @Override + public String toString() { + return key.toString() + "-" + windowBounds.toString(); } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/QueryFilterNode.java b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/QueryFilterNode.java index fd9e25fe01a5..73cd69b95977 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/QueryFilterNode.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/QueryFilterNode.java @@ -41,11 +41,13 @@ import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.planner.QueryPlannerOptions; +import io.confluent.ksql.planner.plan.KeyConstraint.ConstraintOperator; import io.confluent.ksql.schema.ksql.Column; import io.confluent.ksql.schema.ksql.Column.Namespace; import io.confluent.ksql.schema.ksql.DefaultSqlValueCoercer; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.SystemColumns; +import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.schema.ksql.types.SqlTypes; import io.confluent.ksql.schema.utils.FormatOptions; import io.confluent.ksql.structured.SchemaKStream; @@ -53,8 +55,8 @@ import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.timestamp.PartialStringToTimestampParser; import java.time.Instant; -import java.util.Arrays; import java.util.BitSet; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Objects; @@ -62,6 +64,7 @@ import java.util.Set; import java.util.function.Supplier; import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -240,13 +243,10 @@ private ImmutableList extractLookupConstraints() { optionalWindowBounds = Optional.empty(); } - if (keyValueExtractor.seenKeys.isEmpty()) { - constraintPerDisjunct.add(new NonKeyConstraint()); - } else { - constraintPerDisjunct.add(KeyConstraint.equal( - GenericKey.fromList(Arrays.asList(keyValueExtractor.keyContents)), - optionalWindowBounds)); - } + final LookupConstraint constraint = + keyValueExtractor.getLookupConstraint(optionalWindowBounds); + constraintPerDisjunct.add(constraint); + } return constraintPerDisjunct.build(); } @@ -306,7 +306,6 @@ public Void visitComparisonExpression( final Expression other = getNonColumnRefSide(node); final HasColumnRef hasColumnRef = new HasColumnRef(); hasColumnRef.process(other, null); - if (hasColumnRef.hasColumnRef()) { setTableScanOrElseThrow(() -> invalidWhereClauseException("A comparison must be between a key column and a " @@ -340,16 +339,15 @@ public Void visitComparisonExpression( "Bound on non-existent column " + columnName, isWindowed)); if (col.namespace() == Namespace.KEY) { - if (node.getType() != Type.EQUAL) { + if (!isKeyQuery(node)) { setTableScanOrElseThrow(() -> invalidWhereClauseException("Bound on key columns '" - + getSource().getSchema().key() + "' must currently be '='", isWindowed)); + + getSource().getSchema().key() + "' must currently be '='", isWindowed)); } if (seenKeys.get(col.index())) { setTableScanOrElseThrow(() -> invalidWhereClauseException( - "An equality condition on the key column cannot be combined with other comparisons" - + " such as an IN predicate", - isWindowed)); + "A comparison condition on the key column cannot be combined with other" + + " comparisons such as an IN predicate", isWindowed)); } seenKeys.set(col.index()); isKeyedQuery = true; @@ -359,6 +357,14 @@ public Void visitComparisonExpression( } } + private boolean isKeyQuery(final ComparisonExpression node) { + if (node.getType() == Type.NOT_EQUAL || node.getType() == Type.IS_DISTINCT_FROM + || node.getType() == Type.IS_NOT_DISTINCT_FROM) { + return false; + } + return true; + } + private void setTableScanOrElseThrow(final Supplier exceptionSupplier) { if (queryPlannerOptions.getTableScansEnabled()) { requiresTableScan = true; @@ -428,10 +434,12 @@ public Void visitUnqualifiedColumnReference( private final class KeyValueExtractor extends TraversalExpressionVisitor { private final BitSet seenKeys; private final Object[] keyContents; + private HashMap> operators; KeyValueExtractor() { keyContents = new Object[schema.key().size()]; seenKeys = new BitSet(schema.key().size()); + operators = new HashMap<>(); } @Override @@ -447,10 +455,60 @@ public Void visitComparisonExpression( final Object key = resolveKey(other, col.get(), metaStore, ksqlConfig, node); keyContents[col.get().index()] = key; seenKeys.set(col.get().index()); + operators.put(col.get().index(), new ImmutablePair<>(node.getType(), col.get().type())); } return null; } + public LookupConstraint getLookupConstraint(final Optional windowBounds) { + if (seenKeys.isEmpty()) { + return new NonKeyConstraint(); + } + + if (operators.size() > 1) { + //if disjunct consist of multiple operations + //we set the ground for a keylookup if all ops are of type EQUAL, + //otherwise we return nokeyconsraint which leads to a table scan + if (operators.values().stream().allMatch(op -> op.getKey().equals(Type.EQUAL))) { + return new KeyConstraint(ConstraintOperator.EQUAL, + GenericKey.fromArray(keyContents), windowBounds); + } + return new NonKeyConstraint(); + } + + //single operator disjunct case + final Type operatorType = operators.get(0).getLeft(); + if (operatorType == Type.EQUAL) { + return new KeyConstraint(ConstraintOperator.EQUAL, + GenericKey.fromArray(keyContents), windowBounds); + } else if (isSupportedType(operators.get(0).getRight())) { + if (operatorType == Type.GREATER_THAN) { + return new KeyConstraint(ConstraintOperator.GREATER_THAN, + GenericKey.fromArray(keyContents), windowBounds); + } else if (operatorType == Type.GREATER_THAN_OR_EQUAL) { + return new KeyConstraint(ConstraintOperator.GREATER_THAN_OR_EQUAL, + GenericKey.fromArray(keyContents), windowBounds); + } else if (operatorType == Type.LESS_THAN) { + return new KeyConstraint(ConstraintOperator.LESS_THAN, + GenericKey.fromArray(keyContents), windowBounds); + } else if (operatorType == Type.LESS_THAN_OR_EQUAL) { + return new KeyConstraint(ConstraintOperator.LESS_THAN_OR_EQUAL, + GenericKey.fromArray(keyContents), windowBounds); + } + } + + return new NonKeyConstraint(); + } + + private boolean isSupportedType(final SqlType sqlType) { + if (sqlType == SqlTypes.STRING) { + return true; + } else if (sqlType == SqlTypes.BYTES) { + return true; + } + return false; + } + private Object resolveKey( final Expression exp, final Column keyColumn, @@ -662,14 +720,17 @@ public static KsqlException invalidWhereClauseException( + System.lineSeparator() + "Pull queries require a WHERE clause that:" + System.lineSeparator() - + " - includes a key equality expression, e.g. `SELECT * FROM X WHERE =Y;`." + + " - includes a key equality expression, " + + "e.g. `SELECT * FROM X WHERE = Y;`." + System.lineSeparator() + " - in the case of a multi-column key, is a conjunction of equality expressions " + "that cover all key columns." + System.lineSeparator() + + " - to support range expressions, e.g., SELECT * FROM X WHERE < Y;`, " + + "range scans need to be enabled by setting ksql.query.pull.range.scan.enabled=true" + additional + System.lineSeparator() - + "If more flexible queries are needed, table scans can be enabled by " + + "If more flexible queries are needed, , table scans can be enabled by " + "setting ksql.query.pull.table.scan.enabled=true." ); } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/pull/HARoutingTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/pull/HARoutingTest.java index 7c106bc848c2..481200131d02 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/pull/HARoutingTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/pull/HARoutingTest.java @@ -515,7 +515,8 @@ private void locate(final KsqlPartitionLocation... locations) { when(pullPhysicalPlan.getMaterialization().locator().locate( pullPhysicalPlan.getKeys(), routingOptions, - routingFilterFactory + routingFilterFactory, + false )).thenReturn(locationsList); } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/pull/operators/KeyedTableLookupOperatorTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/pull/operators/KeyedTableLookupOperatorTest.java index f3c1d08be2f5..fe142adcbc39 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/pull/operators/KeyedTableLookupOperatorTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/pull/operators/KeyedTableLookupOperatorTest.java @@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import io.confluent.ksql.GenericKey; -import io.confluent.ksql.execution.streams.materialization.Locator.KsqlKey; import io.confluent.ksql.execution.streams.materialization.Locator.KsqlNode; import io.confluent.ksql.execution.streams.materialization.Locator.KsqlPartitionLocation; import io.confluent.ksql.execution.streams.materialization.Materialization; @@ -31,7 +30,10 @@ import io.confluent.ksql.execution.streams.materialization.Row; import io.confluent.ksql.execution.streams.materialization.ks.KsLocator; import io.confluent.ksql.planner.plan.DataSourceNode; +import io.confluent.ksql.planner.plan.KeyConstraint; +import io.confluent.ksql.planner.plan.KeyConstraint.ConstraintOperator; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Optional; import org.junit.Before; @@ -56,13 +58,13 @@ public class KeyedTableLookupOperatorTest { @Mock private DataSourceNode logicalNode; @Mock - private KsqlKey KEY1; + private KeyConstraint KEY1; @Mock - private KsqlKey KEY2; + private KeyConstraint KEY2; @Mock - private KsqlKey KEY3; + private KeyConstraint KEY3; @Mock - private KsqlKey KEY4; + private KeyConstraint KEY4; @Mock private GenericKey GKEY1; @Mock @@ -84,6 +86,10 @@ public void setUp() { when(KEY2.getKey()).thenReturn(GKEY2); when(KEY3.getKey()).thenReturn(GKEY3); when(KEY4.getKey()).thenReturn(GKEY4); + when(KEY1.getOperator()).thenReturn(ConstraintOperator.EQUAL); + when(KEY2.getOperator()).thenReturn(ConstraintOperator.EQUAL); + when(KEY3.getOperator()).thenReturn(ConstraintOperator.EQUAL); + when(KEY4.getOperator()).thenReturn(ConstraintOperator.EQUAL); } @Test @@ -142,4 +148,51 @@ public void shouldLookupRowsForMultipleKeys() { assertThat(lookupOperator.next(), is(nullValue())); assertThat(lookupOperator.getReturnedRowCount(), is(3L)); } + + @Test + public void shouldLookupRowsForRangeKeySinglePartition() { + //Given: + when(KEY3.getOperator()).thenReturn(ConstraintOperator.LESS_THAN_OR_EQUAL); + final List singleKeyPartitionLocations = new ArrayList<>(); + singleKeyPartitionLocations.add(new KsLocator.PartitionLocation( + Optional.of(ImmutableSet.of(KEY1, KEY3)), 1, ImmutableList.of(node1))); + + final KeyedTableLookupOperator lookupOperator = new KeyedTableLookupOperator(materialization, logicalNode); + when(materialization.nonWindowed()).thenReturn(nonWindowedTable); + when(materialization.nonWindowed().get(1,null, GKEY3)).thenReturn(Arrays.asList(ROW1, ROW3).iterator()); + + lookupOperator.setPartitionLocations(singleKeyPartitionLocations); + lookupOperator.open(); + + //Then: + assertThat(lookupOperator.next(), is(ROW1)); + assertThat(lookupOperator.next(), is(ROW3)); + assertThat(lookupOperator.getReturnedRowCount(), is(2L)); + } + + @Test + public void shouldLookupRowsForRangeKeyMultiplePartitions() { + //Given: + when(KEY2.getOperator()).thenReturn(ConstraintOperator.LESS_THAN_OR_EQUAL); + when(KEY3.getOperator()).thenReturn(ConstraintOperator.LESS_THAN_OR_EQUAL); + final List multipleKeysPartitionLocations = new ArrayList<>(); + multipleKeysPartitionLocations.add(new KsLocator.PartitionLocation( + Optional.of(ImmutableSet.of(KEY1, KEY2)), 1, ImmutableList.of(node1))); + multipleKeysPartitionLocations.add(new KsLocator.PartitionLocation( + Optional.of(ImmutableSet.of(KEY3, KEY4)), 3, ImmutableList.of(node3))); + + final KeyedTableLookupOperator lookupOperator = new KeyedTableLookupOperator(materialization, logicalNode); + when(materialization.nonWindowed()).thenReturn(nonWindowedTable); + when(materialization.nonWindowed().get(1,null, GKEY2)).thenReturn(Arrays.asList(ROW1).iterator()); + when(materialization.nonWindowed().get(3,null, GKEY3)).thenReturn(Arrays.asList(ROW3).iterator()); + + lookupOperator.setPartitionLocations(multipleKeysPartitionLocations); + lookupOperator.open(); + + //Then: + assertThat(lookupOperator.next(), is(ROW1)); + assertThat(lookupOperator.next(), is(ROW3)); + assertThat(lookupOperator.next(), is(nullValue())); + assertThat(lookupOperator.getReturnedRowCount(), is(2L)); + } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/pull/operators/KeyedWindowedTableLookupOperatorTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/pull/operators/KeyedWindowedTableLookupOperatorTest.java index be30d024ad1f..9702f36fde21 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/pull/operators/KeyedWindowedTableLookupOperatorTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/pull/operators/KeyedWindowedTableLookupOperatorTest.java @@ -31,7 +31,7 @@ import io.confluent.ksql.execution.streams.materialization.WindowedRow; import io.confluent.ksql.execution.streams.materialization.ks.KsLocator; import io.confluent.ksql.planner.plan.DataSourceNode; -import io.confluent.ksql.planner.plan.KeyConstraint.KeyConstraintKey; +import io.confluent.ksql.planner.plan.KeyConstraint; import io.confluent.ksql.planner.plan.QueryFilterNode.WindowBounds; import java.time.Instant; import java.util.ArrayList; @@ -58,13 +58,13 @@ public class KeyedWindowedTableLookupOperatorTest { @Mock private DataSourceNode logicalNode; @Mock - private KeyConstraintKey KEY1; + private KeyConstraint KEY1; @Mock - private KeyConstraintKey KEY2; + private KeyConstraint KEY2; @Mock - private KeyConstraintKey KEY3; + private KeyConstraint KEY3; @Mock - private KeyConstraintKey KEY4; + private KeyConstraint KEY4; @Mock private GenericKey GKEY1; @Mock diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/QueryFilterNodeTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/QueryFilterNodeTest.java index 3103cd367268..de70ab8c8313 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/QueryFilterNodeTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/QueryFilterNodeTest.java @@ -17,6 +17,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.isA; import static org.junit.Assert.assertThrows; @@ -29,6 +30,7 @@ import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression; import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression.Sign; import io.confluent.ksql.execution.expression.tree.BooleanLiteral; +import io.confluent.ksql.execution.expression.tree.BytesLiteral; import io.confluent.ksql.execution.expression.tree.ComparisonExpression; import io.confluent.ksql.execution.expression.tree.ComparisonExpression.Type; import io.confluent.ksql.execution.expression.tree.Expression; @@ -38,6 +40,7 @@ import io.confluent.ksql.execution.expression.tree.LogicalBinaryExpression; import io.confluent.ksql.execution.expression.tree.NullLiteral; import io.confluent.ksql.execution.expression.tree.StringLiteral; +import io.confluent.ksql.execution.expression.tree.TimeLiteral; import io.confluent.ksql.execution.expression.tree.UnqualifiedColumnReferenceExp; import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.metastore.model.DataSource.DataSourceType; @@ -49,6 +52,8 @@ import io.confluent.ksql.schema.ksql.types.SqlTypes; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; +import java.nio.ByteBuffer; +import java.sql.Time; import java.time.Instant; import java.util.List; import java.util.Optional; @@ -78,6 +83,12 @@ public class QueryFilterNodeTest { .valueColumn(ColumnName.of("C1"), SqlTypes.INTEGER) .build(); + private static final LogicalSchema STRING_SCHEMA = LogicalSchema.builder() + .keyColumn(K, SqlTypes.STRING) + .valueColumn(COL0, SqlTypes.STRING) + .valueColumn(ColumnName.of("ROWKEY"), SqlTypes.STRING) + .valueColumn(K, SqlTypes.STRING) + .build(); @Mock private PlanNode source; @@ -149,7 +160,6 @@ public void shouldExtractKeyValueFromNullLiteral() { assertThat(e.getMessage(), containsString("Primary key columns can not be NULL: (K = null)")); } - @Test public void shouldExtractKeyValueFromExpressionEquals() { // Given: @@ -913,7 +923,6 @@ public void shouldThrowKeyExpressionThatDoestCoverKey_multipleDisjuncts() { + "(WINDOWSTART = 1)")); } - @Test public void shouldThrowMultiKeyExpressionsThatDontCoverAllKeys() { // Given: @@ -957,7 +966,6 @@ public void shouldExtractConstraintForMultiKeyExpressionsThatDontCoverAllKeys_ta expectTableScan(expression, false); } - @Test public void shouldThrowIfComparisonOnNonKey() { // Given: @@ -1006,9 +1014,8 @@ public void shouldThrowIfNotComparisonExpression() { assertThat(e.getMessage(), containsString("Unsupported expression in WHERE clause: true.")); } - @Test - public void shouldThrowIfNonEqualComparison() { + public void shouldExtractKeyFromNonEqualComparison() { // Given: final Expression expression = new ComparisonExpression( Type.GREATER_THAN, @@ -1016,36 +1023,22 @@ public void shouldThrowIfNonEqualComparison() { new IntegerLiteral(1) ); - // When: - final KsqlException e = assertThrows( - KsqlException.class, - () -> new QueryFilterNode( - NODE_ID, - source, - expression, - metaStore, - ksqlConfig, - false, - plannerOptions - )); - - // Then: - assertThat(e.getMessage(), containsString("Bound on key columns '[`K` INTEGER KEY]' must currently be '='.")); - } - - @SuppressWarnings("unchecked") - @Test - public void shouldExtractConstraintWithLiteralRange_tableScan() { - // Given: - when(plannerOptions.getTableScansEnabled()).thenReturn(true); - final Expression expression = new ComparisonExpression( - Type.GREATER_THAN, - new UnqualifiedColumnReferenceExp(ColumnName.of("K")), - new IntegerLiteral(1) + final QueryFilterNode filterNode = new QueryFilterNode( + NODE_ID, + source, + expression, + metaStore, + ksqlConfig, + false, + plannerOptions ); + // When: + final List keys = filterNode.getLookupConstraints(); + // Then: - expectTableScan(expression, false); + assertThat(keys.size(), is(1)); + assertThat(keys.get(0), is(instanceOf(NonKeyConstraint.class))); } @Test @@ -1081,8 +1074,10 @@ public void shouldThrowOnInAndComparisonExpression() { plannerOptions )); + + // Then: - assertThat(e.getMessage(), containsString("An equality condition on the key column cannot be " + assertThat(e.getMessage(), containsString("A comparison condition on the key column cannot be " + "combined with other comparisons")); } @@ -1177,7 +1172,7 @@ public void shouldThrowOnMultipleKeyExpressions() { )); // Then: - assertThat(e.getMessage(), containsString("An equality condition on the key column cannot be combined with other comparisons")); + assertThat(e.getMessage(), containsString("A comparison condition on the key column cannot be combined with other comparisons")); } @SuppressWarnings("unchecked") @@ -1320,7 +1315,6 @@ public void shouldThrowOnInvalidTimestampType() { + "STRING containing a datetime.")); } - @Test public void shouldThrowOnEqOneWindowBoundAndGtAnother() { // Given: @@ -1534,6 +1528,57 @@ public void shouldThrowOnUsageOfWindowBoundOnNonwindowedTable() { assertThat(e.getMessage(), containsString("Cannot use WINDOWSTART/WINDOWEND on non-windowed source.")); } + @Test + public void shouldRangeScanFromStringRangeComparison() { + // Given: + when(source.getSchema()).thenReturn(STRING_SCHEMA); + final Expression expression = new ComparisonExpression( + Type.GREATER_THAN, + new UnqualifiedColumnReferenceExp(ColumnName.of("K")), + new StringLiteral("1") + ); + + // Then: + expectRangeScan(expression, false); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldTableScanFromIntRangeComparison() { + // Given: + final Expression expression = new ComparisonExpression( + Type.GREATER_THAN, + new UnqualifiedColumnReferenceExp(ColumnName.of("K")), + new IntegerLiteral(1) + ); + + // Then: + expectTableScan(expression, false); + } + + @Test + public void shouldSupportMultiRangeExpressionsUsingTableScan() { + // Given: + when(source.getSchema()).thenReturn(MULTI_KEY_SCHEMA); + final Expression expression1 = new ComparisonExpression( + Type.GREATER_THAN, + new UnqualifiedColumnReferenceExp(ColumnName.of("K1")), + new IntegerLiteral(1) + ); + final Expression expression2 = new ComparisonExpression( + Type.GREATER_THAN, + new UnqualifiedColumnReferenceExp(ColumnName.of("K2")), + new IntegerLiteral(2) + ); + final Expression expression = new LogicalBinaryExpression( + LogicalBinaryExpression.Type.AND, + expression1, + expression2 + ); + + expectTableScan(expression, false); + } + @SuppressWarnings("unchecked") private void expectTableScan(final Expression expression, final boolean windowed) { // Given: @@ -1555,4 +1600,27 @@ private void expectTableScan(final Expression expression, final boolean windowed assertThat(keys.size(), is(1)); assertThat(keys.get(0), isA((Class) NonKeyConstraint.class)); } + + @SuppressWarnings("unchecked") + private void expectRangeScan(final Expression expression, final boolean windowed) { + // Given: + QueryFilterNode filterNode = new QueryFilterNode( + NODE_ID, + source, + expression, + metaStore, + ksqlConfig, + windowed, + plannerOptions + ); + + // When: + final List keys = filterNode.getLookupConstraints(); + + // Then: + assertThat(filterNode.isWindowed(), is(windowed)); + assertThat(keys.size(), is(1)); + assertThat(keys.get(0), isA((Class) KeyConstraint.class)); + assertThat(((KeyConstraint) keys.get(0)).isRangeOperator(), is(true)); + } } \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json b/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json index bd40146fdf00..771b59cdf5f7 100644 --- a/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json +++ b/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json @@ -1043,9 +1043,7 @@ "ksql.query.pull.table.scan.enabled": false }, "expectedError": { - "type": "io.confluent.ksql.rest.entity.KsqlStatementErrorMessage", - "message": "Bound on key columns '[`ID` STRING KEY]' must currently be '='.", - "status": 400 + "message": "A comparison condition on the key column cannot be combined with other comparisons such as an IN predicate" } }, { @@ -2452,9 +2450,7 @@ "ksql.query.pull.table.scan.enabled": false }, "expectedError": { - "type": "io.confluent.ksql.rest.entity.KsqlStatementErrorMessage", - "message": "Bound on key columns '[`ID` INTEGER KEY]' must currently be '='.", - "status": 400 + "message": "A comparison condition on the key column cannot be combined with other comparisons such as an IN predicate" } }, { @@ -2554,9 +2550,7 @@ "ksql.query.pull.table.scan.enabled": false }, "expectedError": { - "type": "io.confluent.ksql.rest.entity.KsqlStatementErrorMessage", - "message": "An equality condition on the key column cannot be combined with other comparisons such as an IN predicate", - "status": 400 + "message": "A comparison condition on the key column cannot be combined with other comparisons such as an IN predicate" } }, { diff --git a/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-with-range-scan.json b/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-with-range-scan.json new file mode 100644 index 000000000000..a85e7c1c4057 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-with-range-scan.json @@ -0,0 +1,635 @@ +{ + "comments": [ + "Tests covering pull queries with range scan" + ], + "tests": [ + { + "name": "range scan query must fail with table scan and range scan disable", + "statements": [ + "CREATE TABLE INPUT (ID STRING PRIMARY KEY, V0 STRING, V1 STRING) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE MATVIEW AS SELECT ID, V0, V1 FROM INPUT;", + "SELECT ID, V0, V1 FROM MATVIEW WHERE ID <= '10';" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": false, + "ksql.query.pull.range.scan.enabled": false + }, + "expectedError": { + "message": "Query requires table scan to be enabled" + } + }, + { + "name": "range scan query must succeed with table scan disabled and range scan enabled", + "statements": [ + "CREATE TABLE INPUT (ID STRING PRIMARY KEY, V0 STRING, V1 STRING) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE MATVIEW AS SELECT ID, V0, V1 FROM INPUT;", + "SELECT ID, V0, V1 FROM MATVIEW WHERE ID < '12';" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": false, + "ksql.query.pull.range.scan.enabled": true + }, + "inputs": [ + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {"v0": "v10a", "v1": "v10b"}}, + {"topic": "test_topic", "timestamp": 12355, "key": "11", "value": {"v0": "v11a", "v1": "v11b"}}, + {"topic": "test_topic", "timestamp": 12365, "key": "12", "value": {"v0": "v12a", "v1": "v12b"}}, + {"topic": "test_topic", "timestamp": 12375, "key": "13", "value": {"v0": "v13a", "v1": "v13b"}}, + {"topic": "test_topic", "timestamp": 12385, "key": "14", "value": {"v0": "v14a", "v1": "v14b"}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `V0` STRING, `V1` STRING"}}, + {"row":{"columns":["11", "v11a", "v11b"]}}, + {"row":{"columns":["10", "v10a", "v10b"]}} + ]} + ] + }, + { + "name": "less-equal", + "statements": [ + "CREATE TABLE INPUT (ID STRING PRIMARY KEY, V0 STRING, V1 STRING) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE MATVIEW AS SELECT ID, V0, V1 FROM INPUT;", + "SELECT ID, V0, V1 FROM MATVIEW WHERE ID <= '12';" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": false, + "ksql.query.pull.range.scan.enabled": true + }, + "inputs": [ + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {"v0": "v10a", "v1": "v10b"}}, + {"topic": "test_topic", "timestamp": 12355, "key": "11", "value": {"v0": "v11a", "v1": "v11b"}}, + {"topic": "test_topic", "timestamp": 12365, "key": "12", "value": {"v0": "v12a", "v1": "v12b"}}, + {"topic": "test_topic", "timestamp": 12375, "key": "13", "value": {"v0": "v13a", "v1": "v13b"}}, + {"topic": "test_topic", "timestamp": 12385, "key": "14", "value": {"v0": "v14a", "v1": "v14b"}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `V0` STRING, `V1` STRING"}}, + {"row":{"columns":["11", "v11a", "v11b"]}}, + {"row":{"columns":["10", "v10a", "v10b"]}}, + {"row":{"columns":["12", "v12a", "v12b"]}} + ]} + ] + }, + { + "name": "greater", + "statements": [ + "CREATE TABLE INPUT (ID STRING PRIMARY KEY, V0 STRING, V1 STRING) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE MATVIEW AS SELECT ID, V0, V1 FROM INPUT;", + "SELECT ID, V0, V1 FROM MATVIEW WHERE ID > '12';" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": false, + "ksql.query.pull.range.scan.enabled": true + }, + "inputs": [ + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {"v0": "v10a", "v1": "v10b"}}, + {"topic": "test_topic", "timestamp": 12355, "key": "11", "value": {"v0": "v11a", "v1": "v11b"}}, + {"topic": "test_topic", "timestamp": 12365, "key": "12", "value": {"v0": "v12a", "v1": "v12b"}}, + {"topic": "test_topic", "timestamp": 12375, "key": "13", "value": {"v0": "v13a", "v1": "v13b"}}, + {"topic": "test_topic", "timestamp": 12385, "key": "14", "value": {"v0": "v14a", "v1": "v14b"}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `V0` STRING, `V1` STRING"}}, + {"row":{"columns":["13", "v13a", "v13b"]}}, + {"row":{"columns":["14", "v14a", "v14b"]}} + ]} + ] + }, + { + "name": "greater-equal", + "statements": [ + "CREATE TABLE INPUT (ID STRING PRIMARY KEY, V0 STRING, V1 STRING) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE MATVIEW AS SELECT ID, V0, V1 FROM INPUT;", + "SELECT ID, V0, V1 FROM MATVIEW WHERE ID >= '12';" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": false, + "ksql.query.pull.range.scan.enabled": true + }, + "inputs": [ + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {"v0": "v10a", "v1": "v10b"}}, + {"topic": "test_topic", "timestamp": 12355, "key": "11", "value": {"v0": "v11a", "v1": "v11b"}}, + {"topic": "test_topic", "timestamp": 12365, "key": "12", "value": {"v0": "v12a", "v1": "v12b"}}, + {"topic": "test_topic", "timestamp": 12375, "key": "13", "value": {"v0": "v13a", "v1": "v13b"}}, + {"topic": "test_topic", "timestamp": 12385, "key": "14", "value": {"v0": "v14a", "v1": "v14b"}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `V0` STRING, `V1` STRING"}}, + {"row":{"columns":["12", "v12a", "v12b"]}}, + {"row":{"columns":["13", "v13a", "v13b"]}}, + {"row":{"columns":["14", "v14a", "v14b"]}} + ]} + ] + }, + { + "name": "range scan on non-strings should fail if table-scan is disabled", + "statements": [ + "CREATE TABLE INPUT (ID INTEGER PRIMARY KEY, V0 STRING, V1 STRING) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE MATVIEW AS SELECT ID, V0, V1 FROM INPUT;", + "SELECT ID, V0, V1 FROM MATVIEW WHERE ID >= 12;" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": false, + "ksql.query.pull.range.scan.enabled": true + }, + "expectedError": { + "message": "Query requires table scan to be enabled" + } + }, + { + "name": "range scan on non-strings should fall back to table-scan if enabled", + "statements": [ + "CREATE TABLE INPUT (ID INTEGER PRIMARY KEY, V0 STRING, V1 STRING) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE MATVIEW AS SELECT ID, V0, V1 FROM INPUT;", + "SELECT ID, V0, V1 FROM MATVIEW WHERE ID >= 12;" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": true, + "ksql.query.pull.range.scan.enabled": true + }, + "inputs": [ + {"topic": "test_topic", "timestamp": 12345, "key": 10, "value": {"v0": "v10a", "v1": "v10b"}}, + {"topic": "test_topic", "timestamp": 12355, "key": 11, "value": {"v0": "v11a", "v1": "v11b"}}, + {"topic": "test_topic", "timestamp": 12365, "key": 12, "value": {"v0": "v12a", "v1": "v12b"}}, + {"topic": "test_topic", "timestamp": 12375, "key": 13, "value": {"v0": "v13a", "v1": "v13b"}}, + {"topic": "test_topic", "timestamp": 12385, "key": 14, "value": {"v0": "v14a", "v1": "v14b"}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` INTEGER KEY, `V0` STRING, `V1` STRING"}}, + {"row":{"columns":[13, "v13a", "v13b"]}}, + {"row":{"columns":[12, "v12a", "v12b"]}}, + {"row":{"columns":[14, "v14a", "v14b"]}} + ]} + ] + }, + { + "name": "range scan query must fail for multiple range expressions and table scan disabled", + "statements": [ + "CREATE TABLE INPUT (ID STRING PRIMARY KEY, V0 STRING, V1 STRING) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE MATVIEW AS SELECT ID, V0, V1 FROM INPUT;", + "SELECT ID, V0, V1 FROM MATVIEW WHERE ID >= '11' AND ID <= '13';" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": false, + "ksql.query.pull.range.scan.enabled": true + }, + "expectedError": { + "message": "A comparison condition on the key column cannot be combined with other comparisons such as an IN predicate" + } + }, + { + "name": "range scan query must fall back to table scan for multiple range expressions if table scan is enabled", + "statements": [ + "CREATE TABLE INPUT (ID STRING PRIMARY KEY, V0 STRING, V1 STRING) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE MATVIEW AS SELECT ID, V0, V1 FROM INPUT;", + "SELECT ID, V0, V1 FROM MATVIEW WHERE ID >= '11' AND ID <= '13';" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": true, + "ksql.query.pull.range.scan.enabled": true + }, + "inputs": [ + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {"v0": "v10a", "v1": "v10b"}}, + {"topic": "test_topic", "timestamp": 12355, "key": "11", "value": {"v0": "v11a", "v1": "v11b"}}, + {"topic": "test_topic", "timestamp": 12365, "key": "12", "value": {"v0": "v12a", "v1": "v12b"}}, + {"topic": "test_topic", "timestamp": 12375, "key": "13", "value": {"v0": "v13a", "v1": "v13b"}}, + {"topic": "test_topic", "timestamp": 12385, "key": "14", "value": {"v0": "v14a", "v1": "v14b"}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `V0` STRING, `V1` STRING"}}, + {"row":{"columns":["11", "v11a", "v11b"]}}, + {"row":{"columns":["12", "v12a", "v12b"]}}, + {"row":{"columns":["13", "v13a", "v13b"]}} + ]} + ] + }, + { + "name": "range scan query on non primary key must fail if table scan is disabled", + "statements": [ + "CREATE TABLE INPUT (ID STRING PRIMARY KEY, V0 STRING, V1 STRING) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE MATVIEW AS SELECT ID, V0, V1 FROM INPUT;", + "SELECT ID, V0, V1 FROM MATVIEW WHERE V0 >= '13a';" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": false, + "ksql.query.pull.range.scan.enabled": true + }, + "expectedError": { + "message": "WHERE clause missing key column for disjunct" + } + }, + { + "name": "range scan query on non primary key must succeed if table scan is enabled", + "statements": [ + "CREATE TABLE INPUT (ID STRING PRIMARY KEY, V0 STRING, V1 STRING) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE MATVIEW AS SELECT ID, V0, V1 FROM INPUT;", + "SELECT ID, V0, V1 FROM MATVIEW WHERE V0 >= 'v13a';" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": true, + "ksql.query.pull.range.scan.enabled": false + }, + "inputs": [ + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {"v0": "v10a", "v1": "v10b"}}, + {"topic": "test_topic", "timestamp": 12355, "key": "11", "value": {"v0": "v11a", "v1": "v11b"}}, + {"topic": "test_topic", "timestamp": 12365, "key": "12", "value": {"v0": "v12a", "v1": "v12b"}}, + {"topic": "test_topic", "timestamp": 12375, "key": "13", "value": {"v0": "v13a", "v1": "v13b"}}, + {"topic": "test_topic", "timestamp": 12385, "key": "14", "value": {"v0": "v14a", "v1": "v14b"}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `V0` STRING, `V1` STRING"}}, + {"row":{"columns":["13", "v13a", "v13b"]}}, + {"row":{"columns":["14", "v14a", "v14b"]}} + ]} + ] + }, + { + "name": "less-equal with a single partition", + "statements": [ + "CREATE TABLE INPUT (ID STRING PRIMARY KEY, V0 STRING, V1 STRING) WITH (kafka_topic='test_topic', value_format='JSON', PARTITIONS=1);", + "CREATE TABLE MATVIEW AS SELECT ID, V0, V1 FROM INPUT;", + "SELECT ID, V0, V1 FROM MATVIEW WHERE ID <= '12';" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": false, + "ksql.query.pull.range.scan.enabled": true + }, + "inputs": [ + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {"v0": "v10a", "v1": "v10b"}}, + {"topic": "test_topic", "timestamp": 12355, "key": "11", "value": {"v0": "v11a", "v1": "v11b"}}, + {"topic": "test_topic", "timestamp": 12365, "key": "12", "value": {"v0": "v12a", "v1": "v12b"}}, + {"topic": "test_topic", "timestamp": 12375, "key": "13", "value": {"v0": "v13a", "v1": "v13b"}}, + {"topic": "test_topic", "timestamp": 12385, "key": "14", "value": {"v0": "v14a", "v1": "v14b"}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `V0` STRING, `V1` STRING"}}, + {"row":{"columns":["10", "v10a", "v10b"]}}, + {"row":{"columns":["11", "v11a", "v11b"]}}, + {"row":{"columns":["12", "v12a", "v12b"]}} + ]} + ] + }, + { + "name": "less-equal with a 3 partitions", + "statements": [ + "CREATE TABLE INPUT (ID STRING PRIMARY KEY, V0 STRING, V1 STRING) WITH (kafka_topic='test_topic', value_format='JSON', PARTITIONS=3);", + "CREATE TABLE MATVIEW AS SELECT ID, V0, V1 FROM INPUT;", + "SELECT ID, V0, V1 FROM MATVIEW WHERE ID <= '12';" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": false, + "ksql.query.pull.range.scan.enabled": true + }, + "inputs": [ + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {"v0": "v10a", "v1": "v10b"}}, + {"topic": "test_topic", "timestamp": 12355, "key": "11", "value": {"v0": "v11a", "v1": "v11b"}}, + {"topic": "test_topic", "timestamp": 12365, "key": "12", "value": {"v0": "v12a", "v1": "v12b"}}, + {"topic": "test_topic", "timestamp": 12375, "key": "13", "value": {"v0": "v13a", "v1": "v13b"}}, + {"topic": "test_topic", "timestamp": 12385, "key": "14", "value": {"v0": "v14a", "v1": "v14b"}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `V0` STRING, `V1` STRING"}}, + {"row":{"columns":["11", "v11a", "v11b"]}}, + {"row":{"columns":["10", "v10a", "v10b"]}}, + {"row":{"columns":["12", "v12a", "v12b"]}} + ]} + ] + }, + { + "name": "less-equal with a 4 partitions", + "statements": [ + "CREATE TABLE INPUT (ID STRING PRIMARY KEY, V0 STRING, V1 STRING) WITH (kafka_topic='test_topic', value_format='JSON', PARTITIONS=4);", + "CREATE TABLE MATVIEW AS SELECT ID, V0, V1 FROM INPUT;", + "SELECT ID, V0, V1 FROM MATVIEW WHERE ID <= '12';" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": false, + "ksql.query.pull.range.scan.enabled": true + }, + "inputs": [ + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {"v0": "v10a", "v1": "v10b"}}, + {"topic": "test_topic", "timestamp": 12355, "key": "11", "value": {"v0": "v11a", "v1": "v11b"}}, + {"topic": "test_topic", "timestamp": 12365, "key": "12", "value": {"v0": "v12a", "v1": "v12b"}}, + {"topic": "test_topic", "timestamp": 12375, "key": "13", "value": {"v0": "v13a", "v1": "v13b"}}, + {"topic": "test_topic", "timestamp": 12385, "key": "14", "value": {"v0": "v14a", "v1": "v14b"}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `V0` STRING, `V1` STRING"}}, + {"row":{"columns":["11", "v11a", "v11b"]}}, + {"row":{"columns":["10", "v10a", "v10b"]}}, + {"row":{"columns":["12", "v12a", "v12b"]}} + ]} + ] + }, + { + "name": "range scan query must fall back to table scan for OR disjunctions of range expressions", + "statements": [ + "CREATE TABLE INPUT (ID STRING PRIMARY KEY, V0 STRING, V1 STRING) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE MATVIEW AS SELECT ID, V0, V1 FROM INPUT;", + "SELECT ID, V0, V1 FROM MATVIEW WHERE ID <= '11' OR ID >= '13';" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": false, + "ksql.query.pull.range.scan.enabled": true + }, + "expectedError": { + "message": "Query requires table scan to be enabled" + } + }, + { + "name": "range scan query must for OR disjunctions must succeed if table scan is enabled", + "statements": [ + "CREATE TABLE INPUT (ID STRING PRIMARY KEY, V0 STRING, V1 STRING) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE MATVIEW AS SELECT ID, V0, V1 FROM INPUT;", + "SELECT ID, V0, V1 FROM MATVIEW WHERE ID <= '11' OR ID >= '13';" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": true, + "ksql.query.pull.range.scan.enabled": true + }, + "inputs": [ + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {"v0": "v10a", "v1": "v10b"}}, + {"topic": "test_topic", "timestamp": 12355, "key": "11", "value": {"v0": "v11a", "v1": "v11b"}}, + {"topic": "test_topic", "timestamp": 12365, "key": "12", "value": {"v0": "v12a", "v1": "v12b"}}, + {"topic": "test_topic", "timestamp": 12375, "key": "13", "value": {"v0": "v13a", "v1": "v13b"}}, + {"topic": "test_topic", "timestamp": 12385, "key": "14", "value": {"v0": "v14a", "v1": "v14b"}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `V0` STRING, `V1` STRING"}}, + {"row":{"columns":["11", "v11a", "v11b"]}}, + {"row":{"columns":["10", "v10a", "v10b"]}}, + {"row":{"columns":["13", "v13a", "v13b"]}}, + {"row":{"columns":["14", "v14a", "v14b"]}} + ]} + ] + }, + { + "name": "multi-column range must succeed with table scan enabled", + "statements": [ + "CREATE STREAM INPUT (ID1 STRING KEY, ID2 INT) WITH (kafka_topic='test_topic', format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ID1, ID2, COUNT(1) AS COUNT FROM INPUT GROUP BY ID1, ID2;", + "SELECT * FROM AGGREGATE WHERE ID1<='11' AND ID2>=10;" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": true, + "ksql.query.pull.range.scan.enabled": true + }, + "inputs": [ + {"topic": "test_topic", "timestamp": 12345, "key": "11", "value": {"id2": 10}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID1` STRING KEY, `ID2` INTEGER KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["11", 10, 1]}} + ]} + ] + }, + { + "name": "multi-column range must fail with table scan disabled", + "statements": [ + "CREATE STREAM INPUT (ID1 STRING KEY, ID2 INT) WITH (kafka_topic='test_topic', format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ID1, ID2, COUNT(1) AS COUNT FROM INPUT GROUP BY ID1, ID2;", + "SELECT * FROM AGGREGATE WHERE ID1<='11' AND ID2>=10;" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": false, + "ksql.query.pull.range.scan.enabled": true + }, + "expectedError": { + "message": "Query requires table scan to be enabled" + } + }, + { + "name": "multi-column equality comparison must succeed with table scan and range scan disabled", + "statements": [ + "CREATE STREAM INPUT (ID1 STRING KEY, ID2 INT) WITH (kafka_topic='test_topic', format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ID1, ID2, COUNT(1) AS COUNT FROM INPUT GROUP BY ID1, ID2;", + "SELECT * FROM AGGREGATE WHERE ID1='11' AND ID2=10;" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": false, + "ksql.query.pull.range.scan.enabled": false + }, + "inputs": [ + {"topic": "test_topic", "timestamp": 12345, "key": "11", "value": {"id2": 10}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID1` STRING KEY, `ID2` INTEGER KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["11", 10, 1]}} + ]} + ] + }, + { + "name": "multi-column range disjunction must succeed with table scan enabled", + "statements": [ + "CREATE STREAM INPUT (ID1 STRING KEY, ID2 INT) WITH (kafka_topic='test_topic', format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ID1, ID2, COUNT(1) AS COUNT FROM INPUT GROUP BY ID1, ID2;", + "SELECT * FROM AGGREGATE WHERE ID1<='11' OR ID2>=10;" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": true, + "ksql.query.pull.range.scan.enabled": true + }, + "inputs": [ + {"topic": "test_topic", "timestamp": 12345, "key": "11", "value": {"id2": 10}}, + {"topic": "test_topic", "timestamp": 12365, "key": "9", "value": {"id2": 12}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID1` STRING KEY, `ID2` INTEGER KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["9", 12, 1]}}, + {"row":{"columns":["11", 10, 1]}} + ]} + ] + }, + { + "name": "multi-column range disjunction must fail with table scan disabled", + "statements": [ + "CREATE STREAM INPUT (ID1 STRING KEY, ID2 INT) WITH (kafka_topic='test_topic', format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ID1, ID2, COUNT(1) AS COUNT FROM INPUT GROUP BY ID1, ID2;", + "SELECT * FROM AGGREGATE WHERE ID1<='11' OR ID2>=10;" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": false, + "ksql.query.pull.range.scan.enabled": true + }, + "expectedError": { + "message": "Multi-column sources must specify every key in the WHERE clause" + } + }, + { + "name": "multi-column range and equality should succeed", + "statements": [ + "CREATE STREAM INPUT (ID1 STRING KEY, ID2 INT) WITH (kafka_topic='test_topic', format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ID1, ID2, COUNT(1) AS COUNT FROM INPUT GROUP BY ID1, ID2;", + "SELECT * FROM AGGREGATE WHERE ID1<='11' AND COUNT = 1;" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": true, + "ksql.query.pull.range.scan.enabled": true + }, + "inputs": [ + {"topic": "test_topic", "timestamp": 12345, "key": "11", "value": {"id2": 10}}, + {"topic": "test_topic", "timestamp": 12365, "key": "09", "value": {"id2": 12}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID1` STRING KEY, `ID2` INTEGER KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["11", 10, 1]}}, + {"row":{"columns":["09", 12, 1]}} + ]} + ] + }, + { + "name": "range scan query on key and non key must succeed with table scan enabled", + "statements": [ + "CREATE TABLE INPUT (ID STRING PRIMARY KEY, V0 STRING, V1 STRING) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE MATVIEW AS SELECT ID, V0, V1 FROM INPUT;", + "SELECT ID, V0, V1 FROM MATVIEW WHERE ID < '12' AND V0 > '10';" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": true, + "ksql.query.pull.range.scan.enabled": true + }, + "inputs": [ + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {"v0": "v10a", "v1": "v10b"}}, + {"topic": "test_topic", "timestamp": 12355, "key": "11", "value": {"v0": "v11a", "v1": "v11b"}}, + {"topic": "test_topic", "timestamp": 12365, "key": "12", "value": {"v0": "v12a", "v1": "v12b"}}, + {"topic": "test_topic", "timestamp": 12375, "key": "13", "value": {"v0": "v13a", "v1": "v13b"}}, + {"topic": "test_topic", "timestamp": 12385, "key": "14", "value": {"v0": "v14a", "v1": "v14b"}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `V0` STRING, `V1` STRING"}}, + {"row":{"columns":["11", "v11a", "v11b"]}}, + {"row":{"columns":["10", "v10a", "v10b"]}} + ]} + ] + }, + { + "name": "range scan query on key and non key must succeed with table scan disabled but range scan enabled", + "statements": [ + "CREATE TABLE INPUT (ID STRING PRIMARY KEY, V0 STRING, V1 STRING) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE MATVIEW AS SELECT ID, V0, V1 FROM INPUT;", + "SELECT ID, V0, V1 FROM MATVIEW WHERE ID < '12' AND V0 > '10';" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": false, + "ksql.query.pull.range.scan.enabled": true + }, + "inputs": [ + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {"v0": "v10a", "v1": "v10b"}}, + {"topic": "test_topic", "timestamp": 12355, "key": "11", "value": {"v0": "v11a", "v1": "v11b"}}, + {"topic": "test_topic", "timestamp": 12365, "key": "12", "value": {"v0": "v12a", "v1": "v12b"}}, + {"topic": "test_topic", "timestamp": 12375, "key": "13", "value": {"v0": "v13a", "v1": "v13b"}}, + {"topic": "test_topic", "timestamp": 12385, "key": "14", "value": {"v0": "v14a", "v1": "v14b"}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `V0` STRING, `V1` STRING"}}, + {"row":{"columns":["11", "v11a", "v11b"]}}, + {"row":{"columns":["10", "v10a", "v10b"]}} + ]} + ] + }, + { + "name": "windowed range query must succeed with table scan enabled", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ID;", + "SELECT * FROM AGGREGATE WHERE ID<= '10';" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": true, + "ksql.query.pull.range.scan.enabled": true + }, + "inputs": [ + {"topic": "test_topic", "timestamp": 12346, "key": "11", "value": {"val": 1}}, + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {"val": 2}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 12000, 13000, 1]}} + ]} + ] + }, + { + "name": "windowed range query must fail with table scan disabled", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ID;", + "SELECT * FROM AGGREGATE WHERE ID<= '10';" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": false, + "ksql.query.pull.range.scan.enabled": true + }, + "expectedError": { + "message": "Query requires table scan to be enabled" + } + }, + { + "name": "multi-column range on single key must fail with table scan disabled", + "statements": [ + "CREATE STREAM INPUT (ID1 STRING KEY, ID2 INT) WITH (kafka_topic='test_topic', format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ID1, ID2, COUNT(1) AS COUNT FROM INPUT GROUP BY ID1, ID2;", + "SELECT * FROM AGGREGATE WHERE ID1<='09';" + ], + "properties": { + "ksql.query.pull.table.scan.enabled": false, + "ksql.query.pull.range.scan.enabled": true + }, + "expectedError": { + "message": "Multi-column sources must specify every key in the WHERE clause." + } + } + ] +} \ No newline at end of file diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryConfigPlannerOptions.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryConfigPlannerOptions.java index 67b6502f2b44..fc9f9bf075e8 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryConfigPlannerOptions.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryConfigPlannerOptions.java @@ -50,4 +50,12 @@ public boolean getInterpreterEnabled() { } return ksqlConfig.getBoolean(KsqlConfig.KSQL_QUERY_PULL_INTERPRETER_ENABLED); } + + @Override + public boolean getRangeScansEnabled() { + if (configOverrides.containsKey(KsqlConfig.KSQL_QUERY_PULL_RANGE_SCAN_ENABLED)) { + return (Boolean) configOverrides.get(KsqlConfig.KSQL_QUERY_PULL_RANGE_SCAN_ENABLED); + } + return ksqlConfig.getBoolean(KsqlConfig.KSQL_QUERY_PULL_RANGE_SCAN_ENABLED); + } } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryConfigPlannerOptions.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryConfigPlannerOptions.java index 05e2978eb555..7d2801a7330b 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryConfigPlannerOptions.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryConfigPlannerOptions.java @@ -45,4 +45,9 @@ public boolean getInterpreterEnabled() { } return ksqlConfig.getBoolean(KsqlConfig.KSQL_QUERY_PUSH_SCALABLE_INTERPRETER_ENABLED); } + + @Override + public boolean getRangeScansEnabled() { + return true; + } } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryFunctionalTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryFunctionalTest.java index d49d92b5f440..8ded5679082e 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryFunctionalTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryFunctionalTest.java @@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; import com.google.common.collect.ImmutableList; import io.confluent.common.utils.IntegrationTest; diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterialization.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterialization.java index c26e7fcc1204..0832976b2aec 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterialization.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterialization.java @@ -160,6 +160,21 @@ public Iterator get(final int partition) { .map(Optional::get) .iterator(); } + + @Override + public Iterator get(final int partition, final GenericKey from, final GenericKey to) { + if (transforms.isEmpty()) { + return table.get(partition, from, to); + } + + return Streams.stream(table.get(partition, from, to)) + .map(row -> filterAndTransform(row.key(), getIntermediateRow(row), row.rowTime()) + .map(v -> row.withValue(v, schema()))) + .filter(Optional::isPresent) + .map(Optional::get) + .iterator(); + } + } final class KsqlMaterializedWindowedTable implements MaterializedWindowedTable { diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/Locator.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/Locator.java index 912441d84945..8a48f0f62423 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/Locator.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/Locator.java @@ -45,7 +45,8 @@ public interface Locator { List locate( List keys, RoutingOptions routingOptions, - RoutingFilterFactory routingFilterFactory + RoutingFilterFactory routingFilterFactory, + boolean isRangeScan ); interface KsqlNode { diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/MaterializedTable.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/MaterializedTable.java index 48786afcfe7b..42dfc6a7800a 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/MaterializedTable.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/MaterializedTable.java @@ -40,4 +40,14 @@ public interface MaterializedTable { * @return the rows. */ Iterator get(int partition); + + /** + * RangeScan the table for rows + * + * @param partition partition to limit the get to + * @param from first key in the range + * @param to last key in range + * @return the rows. + */ + Iterator get(int partition, GenericKey from, GenericKey to); } diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java index c9cd6d95879c..261345cf9d5c 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; import com.google.common.collect.Streams; import com.google.errorprone.annotations.Immutable; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -96,16 +97,22 @@ public final class KsLocator implements Locator { public List locate( final List keys, final RoutingOptions routingOptions, - final RoutingFilterFactory routingFilterFactory + final RoutingFilterFactory routingFilterFactory, + final boolean isRangeScan ) { + if (isRangeScan && keys.isEmpty()) { + throw new IllegalStateException("Query is range scan but found no range keys."); + } final ImmutableList.Builder partitionLocations = ImmutableList.builder(); final Set filterPartitions = routingOptions.getPartitions(); + final Optional> keySet = keys.isEmpty() ? Optional.empty() : + Optional.of(Sets.newHashSet(keys)); // Depending on whether this is a key-based lookup, determine which metadata method to use. // If we don't have keys, find the metadata for all partitions since we'll run the query for // all partitions of the state store rather than a particular one. - final List metadata = keys.isEmpty() - ? getMetadataForAllPartitions(filterPartitions) + final List metadata = keys.isEmpty() || isRangeScan + ? getMetadataForAllPartitions(filterPartitions, keySet) : getMetadataForKeys(keys, filterPartitions); // Go through the metadata and group them by partition. for (PartitionMetadata partitionMetadata : metadata) { @@ -185,7 +192,7 @@ private List getMetadataForKeys( * @return The metadata associated with all partitions */ private List getMetadataForAllPartitions( - final Set filterPartitions) { + final Set filterPartitions, final Optional> keys) { // It's important that we consider only the source topics for the subtopology that contains the // state store. Otherwise, we'll be given the wrong partition -> host mappings. // The underlying state store has a number of partitions that is the MAX of the number of @@ -234,7 +241,7 @@ private List getMetadataForAllPartitions( final Set standbyHosts = standbyHostsByPartition.getOrDefault(partition, Collections.emptySet()); metadataList.add( - new PartitionMetadata(activeHost, standbyHosts, partition, Optional.empty())); + new PartitionMetadata(activeHost, standbyHosts, partition, keys)); } return metadataList; } diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedTable.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedTable.java index e873d6bd18b0..39bbfe324090 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedTable.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedTable.java @@ -72,4 +72,22 @@ public Iterator get(final int partition) { throw new MaterializationException("Failed to scan materialized table", e); } } + + @Override + public Iterator get(final int partition, final GenericKey from, final GenericKey to) { + try { + final ReadOnlyKeyValueStore> store = stateStore + .store(QueryableStoreTypes.timestampedKeyValueStore(), partition); + + final KeyValueIterator> iterator = + store.range(from, to); + return Streams.stream(IteratorUtil.onComplete(iterator, iterator::close)) + .map(keyValue -> Row.of(stateStore.schema(), keyValue.key, keyValue.value.value(), + keyValue.value.timestamp())) + .iterator(); + } catch (final Exception e) { + throw new MaterializationException("Failed to range scan materialized table", e); + } + } + } diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocatorTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocatorTest.java index b15f2f7de995..903ef3e916dc 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocatorTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocatorTest.java @@ -205,7 +205,7 @@ public void shouldThrowIfMetadataNotAvailable() { // When: final Exception e = assertThrows( MaterializationException.class, - () -> locator.locate(ImmutableList.of(KEY), routingOptions, routingFilterFactoryActive) + () -> locator.locate(ImmutableList.of(KEY), routingOptions, routingFilterFactoryActive, false) ); // Then: @@ -213,6 +213,22 @@ public void shouldThrowIfMetadataNotAvailable() { "Materialized data for key [1] is not available yet. Please try again later.")); } + @Test + public void shouldThrowIfRangeScanAndKeysEmpty() { + // Given: + getEmtpyMetadata(); + + // When: + final Exception e = assertThrows( + IllegalStateException.class, + () -> locator.locate(Collections.emptyList(), routingOptions, routingFilterFactoryActive, true) + ); + + // Then: + assertThat(e.getMessage(), containsString( + "Query is range scan but found no range keys.")); + } + @Test public void shouldReturnOwnerIfKnown() { // Given: @@ -220,7 +236,7 @@ public void shouldReturnOwnerIfKnown() { // When: final List result = locator.locate(ImmutableList.of(KEY), routingOptions, - routingFilterFactoryActive); + routingFilterFactoryActive, false); // Then: List nodeList = result.get(0).getNodes(); @@ -244,7 +260,7 @@ public void shouldReturnLocalOwnerIfSameAsSuppliedLocalHost() { // When: final List result = locator.locate(ImmutableList.of(KEY), - routingOptions, routingFilterFactoryActive); + routingOptions, routingFilterFactoryActive, false); // Then: List nodeList = result.get(0).getNodes(); @@ -264,7 +280,7 @@ public void shouldReturnLocalOwnerIfExplicitlyLocalHostOnSamePortAsSuppliedLocal // When: final List result = locator.locate(ImmutableList.of(KEY), routingOptions, - routingFilterFactoryActive); + routingFilterFactoryActive, false); // Then: List nodeList = result.get(0).getNodes(); @@ -284,7 +300,7 @@ public void shouldReturnRemoteOwnerForDifferentHost() { // When: final List result = locator.locate(ImmutableList.of(KEY), routingOptions, - routingFilterFactoryActive); + routingFilterFactoryActive, false); // Then: List nodeList = result.get(0).getNodes(); @@ -304,7 +320,7 @@ public void shouldReturnRemoteOwnerForDifferentPort() { // When: final List result = locator.locate(ImmutableList.of(KEY), routingOptions, - routingFilterFactoryActive); + routingFilterFactoryActive, false); // Then: List nodeList = result.get(0).getNodes(); @@ -325,7 +341,7 @@ public void shouldReturnRemoteOwnerForDifferentPortOnLocalHost() { // When: final List result = locator.locate(ImmutableList.of(KEY), routingOptions, - routingFilterFactoryActive); + routingFilterFactoryActive, false); // Then: List nodeList = result.get(0).getNodes(); @@ -340,7 +356,7 @@ public void shouldReturnActiveWhenRoutingStandbyNotEnabledHeartBeatNotEnabled() // When: final List result = locator.locate(ImmutableList.of(KEY), routingOptions, - routingFilterFactoryActive); + routingFilterFactoryActive, false); // Then: List nodeList = result.get(0).getNodes().stream() @@ -357,7 +373,7 @@ public void shouldReturnActiveAndStandBysWhenRoutingStandbyEnabledHeartBeatNotEn // When: final List result = locator.locate(ImmutableList.of(KEY), routingOptions, - routingFilterFactoryStandby); + routingFilterFactoryStandby, false); // Then: List nodeList = result.get(0).getNodes(); @@ -375,7 +391,7 @@ public void shouldReturnStandBysWhenActiveDown() { // When: final List result = locator.locate(ImmutableList.of(KEY), routingOptions, - routingFilterFactoryStandby); + routingFilterFactoryStandby, false); // Then: List nodeList = result.get(0).getNodes().stream() @@ -396,7 +412,7 @@ public void shouldReturnOneStandByWhenActiveAndOtherStandByDown() { // When: final List result = locator.locate(ImmutableList.of(KEY), routingOptions, - routingFilterFactoryStandby); + routingFilterFactoryStandby, false); // Then: List nodeList = result.get(0).getNodes().stream() @@ -417,7 +433,7 @@ public void shouldGroupKeysByLocation() { // When: final List result = locator.locate( ImmutableList.of(KEY, KEY1, KEY2, KEY3), routingOptions, - routingFilterFactoryStandby); + routingFilterFactoryStandby, false); // Then: assertThat(result.size(), is(3)); @@ -451,7 +467,47 @@ public void shouldFindAllPartitionsWhenNoKeys() { // When: final List result = locator.locate( - ImmutableList.of(), routingOptions, routingFilterFactoryStandby); + ImmutableList.of(), routingOptions, routingFilterFactoryStandby, false); + + // Then: + assertThat(result.size(), is(3)); + int partition = result.get(0).getPartition(); + assertThat(partition, is(0)); + List nodeList = result.get(0).getNodes(); + assertThat(nodeList.size(), is(3)); + assertThat(nodeList.get(0), is(activeNode)); + assertThat(nodeList.get(1), is(standByNode1)); + assertThat(nodeList.get(2), is(standByNode2)); + partition = result.get(1).getPartition(); + assertThat(partition, is(1)); + nodeList = result.get(1).getNodes(); + assertThat(nodeList.size(), is(3)); + assertThat(nodeList.get(0), is(standByNode1)); + assertThat(nodeList.get(1), is(activeNode)); + assertThat(nodeList.get(2), is(standByNode2)); + partition = result.get(2).getPartition(); + assertThat(partition, is(2)); + nodeList = result.get(2).getNodes(); + assertThat(nodeList.size(), is(3)); + assertThat(nodeList.get(0), is(standByNode2)); + assertThat(nodeList.get(1), is(activeNode)); + assertThat(nodeList.get(2), is(standByNode1)); + } + + @Test + public void shouldFindAllPartitionsWithKeysAndRangeScan() { + // Given: + when(topology.describe()).thenReturn(description); + when(description.subtopologies()).thenReturn(ImmutableSet.of(sub1)); + when(sub1.nodes()).thenReturn(ImmutableSet.of(source, processor)); + when(source.topicSet()).thenReturn(ImmutableSet.of(TOPIC_NAME)); + when(processor.stores()).thenReturn(ImmutableSet.of(STORE_NAME)); + when(kafkaStreams.streamsMetadataForStore(any())) + .thenReturn(ImmutableList.of(HOST1_STREAMS_MD1, HOST1_STREAMS_MD2, HOST1_STREAMS_MD3)); + + // When: + final List result = locator.locate( + ImmutableList.of(KEY), routingOptions, routingFilterFactoryStandby, true); // Then: assertThat(result.size(), is(3));