feat: Adds Scalable Push Query physical operators #7430
Conversation
LGTM @AlanConfluent. Just a few questions
  }

  private QueryId uniqueQueryId() {
    return new QueryId("query_" + System.currentTimeMillis());
Can we make the query ID convey some information about the 'topology' of the query and make the ID richer?
+1. We can include some data source context here.
I changed the prefix to SCALABLE_PUSH_QUERY_.
The only queries whose IDs contain anything more are persistent queries, because those IDs are user visible. E.g. normal push queries are here: https://github.com/confluentinc/ksql/blob/master/ksqldb-engine/src/main/java/io/confluent/ksql/engine/QueryIdUtil.java#L103. At least at the moment, for pull and push queries, QueryIds aren't used in any meaningful way. I'm happy to try to pipe in some meaningful info like the source name if we'll use it somewhere.
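To make the suggestion in this thread concrete, here is a minimal sketch of what a "richer" ID could look like if the source name were piped in. The helper class `ScalablePushQueryIds` and its `build` method are hypothetical and not part of ksqlDB; only the `SCALABLE_PUSH_QUERY_` prefix comes from the discussion above. It returns a plain `String` rather than a `QueryId` so that it stands alone.

```java
import java.util.Locale;

/** Hypothetical helper for illustration only; not part of ksqlDB. */
final class ScalablePushQueryIds {
  // Prefix mentioned in the review thread.
  private static final String PREFIX = "SCALABLE_PUSH_QUERY_";

  private ScalablePushQueryIds() {
  }

  /**
   * Builds an ID of the form PREFIX + SOURCE + "_" + timestamp.
   * The source name is upper-cased and any character outside
   * [A-Z0-9_] is replaced with an underscore.
   */
  static String build(final String sourceName, final long timestampMs) {
    final String source = sourceName
        .toUpperCase(Locale.ROOT)
        .replaceAll("[^A-Z0-9_]", "_");
    return PREFIX + source + "_" + timestampMs;
  }
}
```

A caller would pass the data source name and `System.currentTimeMillis()`. Note that a timestamp alone does not guarantee uniqueness if two queries on the same source start within the same millisecond.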
  if (currentLogicalNode instanceof PullProjectNode) {
    currentPhysicalOp = translateProjectNode((PullProjectNode) currentLogicalNode);
  } else if (currentLogicalNode instanceof PullFilterNode) {
    currentPhysicalOp = translateFilterNode((PullFilterNode) currentLogicalNode);
  } else if (currentLogicalNode instanceof DataSourceNode) {
Should we start renaming PullProjectNode and PullFilterNode to TransientProjectNode and TransientFilterNode in the codebase, now that they are being used in different types of queries? This would make the naming more generic.
Called it QueryProjectNode, per offline discussion.
  @Override
  public boolean droppedRows() {
    return processingQueue.hasDroppedRows();
  }
What is our intent exactly when we detect that some rows were dropped?
Our intent is that we'll throw an error and stop the push query. This effectively means that the requester is reading too slowly.
SG!
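The dropped-rows behavior described in this thread can be illustrated with a small sketch. The class `BoundedRowQueue` below is hypothetical and is not the `ProcessingQueue` from the PR; only the method name `hasDroppedRows` is taken from the diff. It assumes rows are plain strings and that the reader fails on its next poll once anything has been dropped.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical bounded queue; a stand-in for illustration, not ksqlDB's ProcessingQueue. */
final class BoundedRowQueue {
  private final Deque<String> rows = new ArrayDeque<>();
  private final int capacity;
  private boolean droppedRows = false;

  BoundedRowQueue(final int capacity) {
    this.capacity = capacity;
  }

  /** Producer side: never blocks. A full queue drops the row and records that fact. */
  synchronized boolean offer(final String row) {
    if (rows.size() >= capacity) {
      droppedRows = true;
      return false;
    }
    rows.addLast(row);
    return true;
  }

  /** Consumer side: fails once any row has been dropped, since the reader fell behind. */
  synchronized String poll() {
    if (droppedRows) {
      throw new IllegalStateException("Rows were dropped; the requester is reading too slowly");
    }
    return rows.pollFirst();
  }

  synchronized boolean hasDroppedRows() {
    return droppedRows;
  }
}
```

The design choice sketched here is that the producer is never slowed down by a slow reader; instead the slow reader's query is terminated with an error.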
  import java.util.Objects;

  /**
   * Traverses the logical plan top-down and creates a physical plan for pull queries.
nit, typo: this should say push queries, here and in a few other comments.
Ah, good catch. Updated.
  }
  prevPhysicalOp = currentPhysicalOp;
  // Exit the loop when a leaf node is reached
  if (currentLogicalNode.getSources().isEmpty()) {
Just curious, is it true that only DataSourceNode falls in this case today?
Yeah. Once you get to the DataSourceNode, that should be it, at least in our current setup.
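For readers following the loop being discussed, the top-down walk can be sketched roughly as below. The `Node` and `Kind` types are hypothetical stand-ins for the logical plan nodes, and the operator names are returned as strings rather than real operator objects. The mapping of a filter node to SelectOperator and of the data source to PeekStreamOperator is an assumption based on the PR description, not on the actual builder code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a top-down plan walk; not the PR's PushPhysicalPlanBuilder. */
final class PlanWalkSketch {
  enum Kind { PROJECT, FILTER, SOURCE }

  /** Minimal stand-in for a logical plan node. */
  static final class Node {
    private final Kind kind;
    private final List<Node> sources;

    Node(final Kind kind, final List<Node> sources) {
      this.kind = kind;
      this.sources = sources;
    }

    List<Node> getSources() {
      return sources;
    }
  }

  private PlanWalkSketch() {
  }

  /** Walks from the root down to the leaf, recording one operator name per node. */
  static List<String> translate(final Node root) {
    final List<String> operators = new ArrayList<>();
    Node current = root;
    while (true) {
      switch (current.kind) {
        case PROJECT:
          operators.add("ProjectOperator");
          break;
        case FILTER:
          operators.add("SelectOperator");
          break;
        case SOURCE:
          operators.add("PeekStreamOperator");
          break;
        default:
          throw new IllegalStateException("Unrecognized node: " + current.kind);
      }
      // Exit the loop when a leaf node is reached.
      if (current.getSources().isEmpty()) {
        break;
      }
      // This sketch only handles linear plans with a single source per node.
      if (current.getSources().size() != 1) {
        throw new IllegalStateException("Expected exactly one source");
      }
      current = current.getSources().get(0);
    }
    return operators;
  }
}
```

In this sketch the leaf check does not depend on the node type, which matches the observation above that the data source node is simply the only node without sources in the current setup.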

  private final DataSourceNode logicalNode;
  private final ScalablePushRegistry scalablePushRegistry;
  private final ProcessingQueue processingQueue;
I did not find these two classes in the PR?
Never mind, just realized this PR was not against master :) Will review the other PR, 7424.
The first PR is now merged, so hopefully this one is easier to deal with.
    return schema;
  }

  public ScalablePushRegistry getScalablePushRegistry() {
This getter, and hence the member field, doesn't seem to be used anywhere in this or the other PR?
It's used in another PR.
Other than the above comments, LGTM.
My only question is around when / how the PushPhysicalPlanBuilder constructor would be called and how we pick which PersistentQueryMetadata parameter to pass in. I will read for the next PR :)
My only question is around when / how the PushPhysicalPlanBuilder constructor would be called and how we pick which PersistentQueryMetadata parameter to pass in. I will read for the next PR
Yes, it's coming next!
421930d to 846e7c0
@AlanConfluent maybe rebase the PR? :)
@guozhangwang I had previously pointed this at a branch containing the previous PR, but have since rebased on master after merging it. Should be good now.
Description
This adds the Scalable Push Query physical operators. It introduces an operator, PeekStreamOperator, which registers a ProcessingQueue with the ScalablePushRegistry. This new operator is combined with existing pull query operators such as ProjectOperator and SelectOperator to create a full query execution. These operators are created by the newly introduced PushPhysicalPlanBuilder, which then creates a PushPhysicalPlan that actually does the execution.

Note that PushPhysicalPlan executes asynchronously on a Vert.x Context. The idea is that passing rows along doesn't require any dedicated threads, so many long-running requests can execute at once without taxing thread pools.

Also, this PR moves the common operators that are now used by both pull and push queries to a common package.
Testing done
Ran unit tests.
Reviewer checklist