-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Allow scalable push queries to handle rebalances #7988
feat: Allow scalable push queries to handle rebalances #7988
Conversation
ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushRouting.java
Outdated
Show resolved
Hide resolved
ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushRouting.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I skimmed the PR, it makes sense to me - though PushQueryRouting is getting a little too large for comfort
serviceContext, pushPhysicalPlan, statement, hosts, outputSchema, | ||
transientQueryQueue, pushConnectionsHandle, false); | ||
// Only check for new nodes if this is the source node | ||
if (!pushRoutingOptions.getIsSkipForwardRequest()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this seems like a violation of demeter's law - it assumes that !getIsSkipForwardRequest
implies that this is the source node, which requires knowledge about how forwarding works. can we rename that method to something more meaningful?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For some reason I copied that name, which I agree is pretty not great. I changed it to getHasBeenForwarded()
which is pretty clear what it means and hopefully shouldn't seem like a violation of encapsulation.
result.close(); | ||
result.updateStatus(RoutingResultStatus.COMPLETE); | ||
}); | ||
LOG.info("Host {} completed request.", node); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we have any identifier for what request it completed? Otherwise I'm not sure how helpful this message will be for debugging
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I added pushPhysicalPlan.getQueryId() to most of the logging calls to make it a bit easier to understand, though we don't log every request because it could result in a lot of logging. They probably happen less often than pull queries, but we want them to be that lightweight.
I still think there's some value having semi exceptional or uncommon things in the logs to at least know something even if we don't know how to tie it to an exact query. This usually should only happen if a node is shut down which doesn't happen often.
private static Function<ScalablePushRegistry, Set<KsqlNode>> createLoadingCache() { | ||
final LoadingCache<ScalablePushRegistry, Set<KsqlNode>> cache = CacheBuilder.newBuilder() | ||
.maximumSize(40) | ||
.expireAfterWrite(2000, TimeUnit.MILLISECONDS) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this meant to be CLUSTER_CHECK_INTERVAL
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it probably should be. We want them to expire at least that often. I'll actually create a constant for this.
public enum RoutingResultStatus { | ||
SUCCESS | ||
IN_PROGRESS, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: javadocs for each of these could help :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, added a bunch of javadoc.
For my understanding, when does something get too large for comfort? When it's logic is trying to encompass too much and is better served by breaking it up more modularly? I just ask because on first glance |
@nateab My barometer is usually when I have to pull up the IDE to effectively review the code, it's getting too large. There isn't a science to this, unfortunately, and everyone has different opinions. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! just left some comments
c508a16
to
69daf4d
Compare
69daf4d
to
183aa20
Compare
Description
Currently, we don't handle rebalances explicitly in SPQ. If a new node joins the cluster or is removed from the cluster, we don't do anything. In this PR, it attempts to start a new request to the node or expects the node to drop out of the cluster and tries to keep the connections ongoing, ensuring now data is missed.
Testing done
Ran unit tests.
Reviewer checklist