Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Allow scalable push queries to handle rebalances #7988

Merged
merged 8 commits into from
Aug 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ private void doSubscribe(final Subscriber<? super T> subscriber) {
} catch (final Throwable t) {
sendError(new IllegalStateException("Exception encountered in onSubscribe", t));
}
if (isFailed()) {
sendError(new IllegalStateException(
"Cannot subscribe to failed publisher. Failure cause: " + failure));
}
afterSubscribe();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,13 @@ public class KsqlConfig extends AbstractConfig {
+ "functions, aggregations, or joins, but may include projections and filters.";
public static final boolean KSQL_QUERY_PUSH_SCALABLE_ENABLED_DEFAULT = false;

public static final String KSQL_QUERY_PUSH_SCALABLE_NEW_NODE_CONTINUITY
= "ksql.query.push.scalable.new.node.continuity";
public static final String KSQL_QUERY_PUSH_SCALABLE_NEW_NODE_CONTINUITY_DOC =
"Whether new node continuity is enforced for scalable push queries. This means that it's an "
+ "error for an existing query to miss data processed on a newly added node";
public static final boolean KSQL_QUERY_PUSH_SCALABLE_NEW_NODE_CONTINUITY_DEFAULT = false;

public static final String KSQL_QUERY_PUSH_SCALABLE_INTERPRETER_ENABLED
= "ksql.query.push.scalable.interpreter.enabled";
public static final String KSQL_QUERY_PUSH_SCALABLE_INTERPRETER_ENABLED_DOC =
Expand Down Expand Up @@ -901,6 +908,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_QUERY_PUSH_SCALABLE_ENABLED_DOC
)
.define(
KSQL_QUERY_PUSH_SCALABLE_NEW_NODE_CONTINUITY,
Type.BOOLEAN,
KSQL_QUERY_PUSH_SCALABLE_NEW_NODE_CONTINUITY_DEFAULT,
Importance.LOW,
KSQL_QUERY_PUSH_SCALABLE_NEW_NODE_CONTINUITY_DOC
)
.define(
KSQL_QUERY_PUSH_SCALABLE_INTERPRETER_ENABLED,
Type.BOOLEAN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ public class KsqlRequestConfig extends AbstractConfig {
private static final String KSQL_REQUEST_QUERY_PUSH_SKIP_FORWARDING_DOC =
"Controls whether a ksql host forwards a push query request to another host";

public static final String KSQL_REQUEST_QUERY_PUSH_REGISTRY_START =
"request.ksql.query.push.registry.start";
public static final boolean KSQL_REQUEST_QUERY_PUSH_REGISTRY_START_DEFAULT = false;
// Help text for KSQL_REQUEST_QUERY_PUSH_REGISTRY_START. The literals are split at a sentence
// boundary, with the separating space kept inside the first literal, so the concatenated
// text cannot lose a space between words.
private static final String KSQL_REQUEST_QUERY_PUSH_REGISTRY_START_DOC =
"Indicates whether a connecting node expects to be at the start of the registry data. "
+ "After a rebalance, this ensures we don't miss any data.";

private static ConfigDef buildConfigDef() {
final ConfigDef configDef = new ConfigDef()
.define(
Expand Down Expand Up @@ -88,6 +95,12 @@ private static ConfigDef buildConfigDef() {
KSQL_REQUEST_QUERY_PUSH_SKIP_FORWARDING_DEFAULT,
ConfigDef.Importance.LOW,
KSQL_REQUEST_QUERY_PUSH_SKIP_FORWARDING_DOC
).define(
KSQL_REQUEST_QUERY_PUSH_REGISTRY_START,
Type.BOOLEAN,
KSQL_REQUEST_QUERY_PUSH_REGISTRY_START_DEFAULT,
ConfigDef.Importance.LOW,
KSQL_REQUEST_QUERY_PUSH_REGISTRY_START_DOC
);
return configDef;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.confluent.ksql.physical.pull.PullQueryResult;
import io.confluent.ksql.physical.scalablepush.PushPhysicalPlan;
import io.confluent.ksql.physical.scalablepush.PushPhysicalPlanBuilder;
import io.confluent.ksql.physical.scalablepush.PushQueryPreparer;
import io.confluent.ksql.physical.scalablepush.PushQueryQueuePopulator;
import io.confluent.ksql.physical.scalablepush.PushRouting;
import io.confluent.ksql.physical.scalablepush.PushRoutingOptions;
Expand Down Expand Up @@ -272,7 +273,8 @@ ScalablePushQueryMetadata executeScalablePushQuery(
final PushPhysicalPlan physicalPlan = buildScalablePushPhysicalPlan(
logicalPlan,
analysis,
context
context,
pushRoutingOptions
);
final TransientQueryQueue transientQueryQueue
= new TransientQueryQueue(analysis.getLimitClause());
Expand All @@ -285,12 +287,15 @@ ScalablePushQueryMetadata executeScalablePushQuery(
final PushQueryQueuePopulator populator = () ->
pushRouting.handlePushQuery(serviceContext, physicalPlan, statement, pushRoutingOptions,
physicalPlan.getOutputSchema(), transientQueryQueue);
final PushQueryPreparer preparer = () ->
pushRouting.preparePushQuery(physicalPlan, statement, pushRoutingOptions);
final ScalablePushQueryMetadata metadata = new ScalablePushQueryMetadata(
physicalPlan.getOutputSchema(),
physicalPlan.getQueryId(),
transientQueryQueue,
resultType,
populator
populator,
preparer
);

return metadata;
Expand Down Expand Up @@ -452,12 +457,14 @@ private LogicalPlanNode buildAndValidateLogicalPlan(
private PushPhysicalPlan buildScalablePushPhysicalPlan(
final LogicalPlanNode logicalPlan,
final ImmutableAnalysis analysis,
final Context context
final Context context,
final PushRoutingOptions pushRoutingOptions
) {

final PushPhysicalPlanBuilder builder = new PushPhysicalPlanBuilder(
engineContext.getProcessingLogContext(),
ScalablePushQueryExecutionUtil.findQuery(engineContext, analysis)
ScalablePushQueryExecutionUtil.findQuery(engineContext, analysis),
pushRoutingOptions.getExpectingStartOfRegistryData()
);
return builder.buildPushPhysicalPlan(logicalPlan, context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,12 @@ public boolean isClosed() {

private void maybeNext(final Publisher publisher) {
List<?> row;
while (!isErrored(publisher) && (row = (List<?>)next()) != null) {
while (!isErrored(publisher) && (row = (List<?>) next(publisher)) != null) {
publisher.accept(row);
}
if (publisher.isFailed()) {
return;
}
if (!closed) {
if (timer >= 0) {
context.owner().cancelTimer(timer);
Expand Down Expand Up @@ -112,14 +115,23 @@ private boolean isErrored(final Publisher publisher) {

private void open(final Publisher publisher) {
VertxUtils.checkContext(context);
dataSourceOperator.setNewRowCallback(() -> context.runOnContext(v -> maybeNext(publisher)));
root.open();
maybeNext(publisher);
try {
dataSourceOperator.setNewRowCallback(() -> context.runOnContext(v -> maybeNext(publisher)));
root.open();
maybeNext(publisher);
} catch (Throwable t) {
publisher.sendException(t);
}
}

private Object next() {
private Object next(final Publisher publisher) {
VertxUtils.checkContext(context);
return root.next();
try {
return root.next();
} catch (final Throwable t) {
publisher.sendException(t);
return null;
}
}

public void close() {
Expand Down Expand Up @@ -149,6 +161,11 @@ public ScalablePushRegistry getScalablePushRegistry() {
return scalablePushRegistry;
}

/**
 * Returns the {@code context} field of this object.
 *
 * <p>The reference is handed out directly rather than copied; the {@code EI_EXPOSE_REP}
 * suppression acknowledges that static-analysis finding.
 *
 * @return the context held by this object
 */
@SuppressFBWarnings(value = "EI_EXPOSE_REP")
public Context getContext() {
return context;
}

public static class Publisher extends BufferedPublisher<List<?>> {

public Publisher(final Context ctx) {
Expand All @@ -162,5 +179,13 @@ public void reportDroppedRows() {
public void reportHasError() {
sendError(new RuntimeException("Persistent query has error"));
}

/**
 * Reports the given throwable to this publisher by passing it to {@code sendError}.
 *
 * @param e the failure to report
 */
public void sendException(final Throwable e) {
sendError(e);
}

/**
 * Public accessor for the failure state; delegates to the superclass's {@code isFailed()}.
 *
 * @return whatever the superclass reports for {@code isFailed()}
 */
@Override
public boolean isFailed() {
return super.isFailed();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,20 @@ public class PushPhysicalPlanBuilder {

private final ProcessingLogContext processingLogContext;
private final PersistentQueryMetadata persistentQueryMetadata;
private final boolean expectingStartOfRegistryData;
private final Stacker contextStacker;
private final QueryId queryId;

public PushPhysicalPlanBuilder(
final ProcessingLogContext processingLogContext,
final PersistentQueryMetadata persistentQueryMetadata
final PersistentQueryMetadata persistentQueryMetadata,
final boolean expectingStartOfRegistryData
) {
this.processingLogContext = Objects.requireNonNull(
processingLogContext, "processingLogContext");
this.persistentQueryMetadata = Objects.requireNonNull(
persistentQueryMetadata, "persistentQueryMetadata");
this.expectingStartOfRegistryData = expectingStartOfRegistryData;
this.contextStacker = new Stacker();
queryId = uniqueQueryId();
}
Expand Down Expand Up @@ -160,7 +163,8 @@ private AbstractPhysicalOperator translateDataSourceNode(
final ScalablePushRegistry scalablePushRegistry =
persistentQueryMetadata.getScalablePushRegistry()
.orElseThrow(() -> new IllegalStateException("Scalable push registry cannot be found"));
return new PeekStreamOperator(scalablePushRegistry, logicalNode, queryId);
return new PeekStreamOperator(scalablePushRegistry, logicalNode, queryId,
expectingStartOfRegistryData);
}

private QueryId uniqueQueryId() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.physical.scalablepush;

/**
 * Callback that carries out a preparation step for a push query.
 *
 * <p>The interface has a single abstract method, so it can be supplied as a lambda or
 * method reference. {@code @FunctionalInterface} makes the compiler enforce that shape.
 */
@FunctionalInterface
public interface PushQueryPreparer {

/**
 * Runs the preparation step. Takes no arguments and returns nothing; what the step does is
 * defined by the implementation.
 */
void prepare();
}
Loading