Skip to content

Commit

Permalink
feat: Add support for IN clause to pull queries (#6409)
Browse files Browse the repository at this point in the history
* Add support for IN clause to pull queries
  • Loading branch information
AlanConfluent authored Oct 22, 2020
1 parent a6fc67c commit d5fc365
Show file tree
Hide file tree
Showing 30 changed files with 1,524 additions and 304 deletions.
13 changes: 13 additions & 0 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,12 @@ public class KsqlConfig extends AbstractConfig {
public static final String KSQL_QUERY_PULL_MAX_QPS_DOC = "The maximum qps allowed for pull "
+ "queries. Once the limit is hit, queries will fail immediately";

public static final String KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG
= "ksql.query.pull.thread.pool.size";
public static final Integer KSQL_QUERY_PULL_THREAD_POOL_SIZE_DEFAULT = 100;
public static final String KSQL_QUERY_PULL_THREAD_POOL_SIZE_DOC =
"Size of thread pool used for sending/executing pull queries";

public static final String KSQL_STRING_CASE_CONFIG_TOGGLE = "ksql.cast.strings.preserve.nulls";
public static final String KSQL_STRING_CASE_CONFIG_TOGGLE_DOC =
"When casting a SQLType to string, if false, use String.valueof(), else if true use"
Expand Down Expand Up @@ -746,6 +752,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_QUERY_PULL_MAX_QPS_DOC
)
.define(
KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG,
Type.INT,
KSQL_QUERY_PULL_THREAD_POOL_SIZE_DEFAULT,
Importance.LOW,
KSQL_QUERY_PULL_THREAD_POOL_SIZE_DOC
)
.define(
KSQL_ERROR_CLASSIFIER_REGEX_PREFIX,
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ public class KsqlRequestConfig extends AbstractConfig {
private static final String KSQL_DEBUG_REQUEST_DOC =
"Indicates whether a KsqlRequest should contain debugging information.";

public static final String KSQL_REQUEST_QUERY_PULL_PARTITIONS =
"request.ksql.query.pull.partition";
public static final String KSQL_REQUEST_QUERY_PULL_PARTITIONS_DEFAULT = "";
private static final String KSQL_REQUEST_QUERY_PULL_PARTITIONS_DOC =
"Indicates which partitions to limit pull queries to.";

private static ConfigDef buildConfigDef() {
final ConfigDef configDef = new ConfigDef()
.define(
Expand All @@ -64,6 +70,12 @@ private static ConfigDef buildConfigDef() {
KSQL_DEBUG_REQUEST_DEFAULT,
ConfigDef.Importance.LOW,
KSQL_DEBUG_REQUEST_DOC
).define(
KSQL_REQUEST_QUERY_PULL_PARTITIONS,
Type.LIST,
KSQL_REQUEST_QUERY_PULL_PARTITIONS_DEFAULT,
ConfigDef.Importance.LOW,
KSQL_REQUEST_QUERY_PULL_PARTITIONS_DOC
);
return configDef;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public class KsMaterializationFunctionalTest {
private static final Format VALUE_FORMAT = JSON;
private static final UserDataProvider USER_DATA_PROVIDER = new UserDataProvider();
private static final PageViewDataProvider PAGE_VIEW_DATA_PROVIDER = new PageViewDataProvider();
private static final int PARTITION = 0;

private static final Duration WINDOW_SIZE = Duration.ofSeconds(5);
private static final Duration WINDOW_SEGMENT_DURATION = Duration.ofSeconds(60);
Expand Down Expand Up @@ -253,14 +254,14 @@ public void shouldQueryMaterializedTableForAggregatedTable() {
rows.forEach((rowKey, value) -> {
final Struct key = asKeyStruct(rowKey, query.getPhysicalSchema());

final Optional<Row> row = withRetry(() -> table.get(key));
final Optional<Row> row = withRetry(() -> table.get(key, PARTITION));
assertThat(row.map(Row::schema), is(Optional.of(schema)));
assertThat(row.map(Row::key), is(Optional.of(key)));
assertThat(row.map(Row::value), is(Optional.of(value)));
});

final Struct key = asKeyStruct("Won't find me", query.getPhysicalSchema());
assertThat("unknown key", withRetry(() -> table.get(key)), is(Optional.empty()));
assertThat("unknown key", withRetry(() -> table.get(key, PARTITION)), is(Optional.empty()));
}

@Test
Expand All @@ -287,14 +288,14 @@ public void shouldQueryMaterializedTableForAggregatedStream() {
rows.forEach((rowKey, value) -> {
final Struct key = asKeyStruct(rowKey, query.getPhysicalSchema());

final Optional<Row> row = withRetry(() -> table.get(key));
final Optional<Row> row = withRetry(() -> table.get(key, PARTITION));
assertThat(row.map(Row::schema), is(Optional.of(schema)));
assertThat(row.map(Row::key), is(Optional.of(key)));
assertThat(row.map(Row::value), is(Optional.of(value)));
});

final Struct key = asKeyStruct("Won't find me", query.getPhysicalSchema());
assertThat("unknown key", withRetry(() -> table.get(key)), is(Optional.empty()));
assertThat("unknown key", withRetry(() -> table.get(key, PARTITION)), is(Optional.empty()));
}

@Test
Expand Down Expand Up @@ -325,7 +326,7 @@ public void shouldQueryMaterializedTableForTumblingWindowed() {
final Struct key = asKeyStruct(k.key(), query.getPhysicalSchema());

final List<WindowedRow> resultAtWindowStart =
withRetry(() -> table.get(key, Range.singleton(w.start()), Range.all()));
withRetry(() -> table.get(key, PARTITION, Range.singleton(w.start()), Range.all()));

assertThat("at exact window start", resultAtWindowStart, hasSize(1));
assertThat(resultAtWindowStart.get(0).schema(), is(schema));
Expand All @@ -334,16 +335,18 @@ public void shouldQueryMaterializedTableForTumblingWindowed() {
assertThat(resultAtWindowStart.get(0).value(), is(v));

final List<WindowedRow> resultAtWindowEnd =
withRetry(() -> table.get(key, Range.all(), Range.singleton(w.end())));
withRetry(() -> table.get(key, PARTITION, Range.all(), Range.singleton(w.end())));
assertThat("at exact window end", resultAtWindowEnd, hasSize(1));

final List<WindowedRow> resultFromRange = withRetry(() -> withRetry(() -> table
.get(key, Range.closed(w.start().minusMillis(1), w.start().plusMillis(1)), Range.all())));
.get(key, PARTITION, Range.closed(w.start().minusMillis(1), w.start().plusMillis(1)),
Range.all())));

assertThat("range including window start", resultFromRange, is(resultAtWindowStart));

final List<WindowedRow> resultPast = withRetry(() -> table
.get(key, Range.closed(w.start().plusMillis(1), w.start().plusMillis(1)), Range.all()));
.get(key, PARTITION, Range.closed(w.start().plusMillis(1), w.start().plusMillis(1)),
Range.all()));
assertThat("past start", resultPast, is(empty())
);
});
Expand Down Expand Up @@ -378,7 +381,7 @@ public void shouldQueryMaterializedTableForHoppingWindowed() {
final Struct key = asKeyStruct(k.key(), query.getPhysicalSchema());

final List<WindowedRow> resultAtWindowStart =
withRetry(() -> table.get(key, Range.singleton(w.start()), Range.all()));
withRetry(() -> table.get(key, PARTITION, Range.singleton(w.start()), Range.all()));

assertThat("at exact window start", resultAtWindowStart, hasSize(1));
assertThat(resultAtWindowStart.get(0).schema(), is(schema));
Expand All @@ -387,16 +390,18 @@ public void shouldQueryMaterializedTableForHoppingWindowed() {
assertThat(resultAtWindowStart.get(0).value(), is(v));

final List<WindowedRow> resultAtWindowEnd =
withRetry(() -> table.get(key, Range.all(), Range.singleton(w.end())));
withRetry(() -> table.get(key, PARTITION, Range.all(), Range.singleton(w.end())));
assertThat("at exact window end", resultAtWindowEnd, hasSize(1));

final List<WindowedRow> resultFromRange = withRetry(() -> table
.get(key, Range.closed(w.start().minusMillis(1), w.start().plusMillis(1)), Range.all()));
.get(key, PARTITION, Range.closed(w.start().minusMillis(1), w.start().plusMillis(1)),
Range.all()));

assertThat("range including window start", resultFromRange, is(resultAtWindowStart));

final List<WindowedRow> resultPast = withRetry(() -> table
.get(key, Range.closed(w.start().plusMillis(1), w.start().plusMillis(1)), Range.all()));
.get(key, PARTITION, Range.closed(w.start().plusMillis(1), w.start().plusMillis(1)),
Range.all()));

assertThat("past start", resultPast, is(empty()));
});
Expand Down Expand Up @@ -430,7 +435,7 @@ public void shouldQueryMaterializedTableForSessionWindowed() {
final Struct key = asKeyStruct(k.key(), query.getPhysicalSchema());

final List<WindowedRow> resultAtWindowStart =
withRetry(() -> table.get(key, Range.singleton(w.start()), Range.all()));
withRetry(() -> table.get(key, PARTITION, Range.singleton(w.start()), Range.all()));

assertThat("at exact window start", resultAtWindowStart, hasSize(1));
assertThat(resultAtWindowStart.get(0).schema(), is(schema));
Expand All @@ -439,15 +444,17 @@ public void shouldQueryMaterializedTableForSessionWindowed() {
assertThat(resultAtWindowStart.get(0).value(), is(v));

final List<WindowedRow> resultAtWindowEnd =
withRetry(() -> table.get(key, Range.all(), Range.singleton(w.end())));
withRetry(() -> table.get(key, PARTITION, Range.all(), Range.singleton(w.end())));
assertThat("at exact window end", resultAtWindowEnd, hasSize(1));

final List<WindowedRow> resultFromRange = withRetry(() -> table
.get(key, Range.closed(w.start().minusMillis(1), w.start().plusMillis(1)), Range.all()));
.get(key, PARTITION, Range.closed(w.start().minusMillis(1), w.start().plusMillis(1)),
Range.all()));
assertThat("range including window start", resultFromRange, is(resultAtWindowStart));

final List<WindowedRow> resultPast = withRetry(() -> table
.get(key, Range.closed(w.start().plusMillis(1), w.start().plusMillis(1)), Range.all()));
.get(key, PARTITION, Range.closed(w.start().plusMillis(1), w.start().plusMillis(1)),
Range.all()));
assertThat("past start", resultPast, is(empty()));
});
}
Expand Down Expand Up @@ -577,7 +584,7 @@ public void shouldQueryMaterializedTableWithKeyFieldsInProjection() {
rows.forEach((rowKey, value) -> {
final Struct key = asKeyStruct(rowKey, query.getPhysicalSchema());

final Optional<Row> row = withRetry(() -> table.get(key));
final Optional<Row> row = withRetry(() -> table.get(key, PARTITION));
assertThat(row.map(Row::schema), is(Optional.of(schema)));
assertThat(row.map(Row::key), is(Optional.of(key)));
assertThat(row.map(Row::value), is(Optional.of(value)));
Expand Down Expand Up @@ -611,7 +618,7 @@ public void shouldQueryMaterializedTableWithMultipleAggregationColumns() {
rows.forEach((rowKey, value) -> {
final Struct key = asKeyStruct(rowKey, query.getPhysicalSchema());

final Optional<Row> row = withRetry(() -> table.get(key));
final Optional<Row> row = withRetry(() -> table.get(key, PARTITION));
assertThat(row.map(Row::schema), is(Optional.of(schema)));
assertThat(row.map(Row::key), is(Optional.of(key)));
assertThat(row.map(Row::value), is(Optional.of(value)));
Expand Down Expand Up @@ -649,7 +656,7 @@ public void shouldHandleHavingClause() {
// Rows passing the HAVING clause:
final Struct key = asKeyStruct(rowKey, query.getPhysicalSchema());

final Optional<Row> row = withRetry(() -> table.get(key));
final Optional<Row> row = withRetry(() -> table.get(key, PARTITION));
assertThat(row.map(Row::schema), is(Optional.of(schema)));
assertThat(row.map(Row::key), is(Optional.of(key)));
assertThat(row.map(Row::value), is(Optional.of(value)));
Expand All @@ -659,7 +666,7 @@ public void shouldHandleHavingClause() {
.filter(e -> !rows.containsKey(e.getKey().getString("USERID")))
.forEach(e -> {
// Rows filtered by the HAVING clause:
final Optional<Row> row = withRetry(() -> table.get(e.getKey()));
final Optional<Row> row = withRetry(() -> table.get(e.getKey(), PARTITION));
assertThat(row, is(Optional.empty()));
});
}
Expand All @@ -673,7 +680,7 @@ private static void verifyRetainedWindows(
rows.forEach(record -> {
final Struct key = asKeyStruct(record.key().key(), query.getPhysicalSchema());
final List<WindowedRow> resultAtWindowStart =
withRetry(() -> table.get(key, Range.all(), Range.all()));
withRetry(() -> table.get(key, PARTITION, Range.all(), Range.all()));

assertThat("Should have fewer windows retained",
resultAtWindowStart,
Expand Down
Loading

0 comments on commit d5fc365

Please sign in to comment.