-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Fixes race condition exposing uninitialized query #7627
fix: Fixes race condition exposing uninitialized query #7627
Conversation
@@ -251,6 +251,9 @@ private void registerQuery( | |||
final QueryMetadata query, | |||
final boolean createAsQuery | |||
) { | |||
// First initialize the query. Once it's added to these maps/sets, it's exposed to other | |||
// threads and it's important that it's initialized. | |||
query.initialize(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried this :). We can't initialize it here without some additional changes. In the upgrade case the previous query would still be running, and would have a lock on the state store dirs. This initialize call would throw because it can't acquire the lock. We could refactor to:
- if the query already exists in the map, stop and unregister it
- initialize this query
- register this query
Alternatively we could refactor QueryMetadata to be able to deal with an uninitialized Kafka Streams, and do the initialization transparently when the query is started. I think this is my preference because it's less brittle for the caller - you shouldn't get NPEs by calling methods of a non-null object - if you do it's a bug in the class you're calling.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I definitely agree that this contract to the caller that the QueryMetadata
is invalid until initialize
is called is brittle and the ideal solution would be to make the contract easier to deal with and hopefully not throw NPEs if you violate the contract.
I was looking into QueryMetadataImpl
and it seems to assume that it's initialized once and then is immutable (and therefore threadsafe) because there's no synchronization and fields are largely not updated. The fact that there's a reset call on PersistentQueryMetadataImpl
which resets kafkaStreams is a little worrying though it only seems to be called from tests. Also, fields like closed
and everStarted
are not volatile as they likely should be. (Am I right here? Happy to make that fix since I'm already looking here.)
I could make the changes you suggest, though allowing for the object to be mutable after it's exposed publicly would require adding a lot of synchronization and complicating QueryMetadataImpl
a bit. I'm fine to do that, but wanted to keep things simple to target this race condition, so did your first suggestion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Description
Fixes a race that was caused by exposing a query via the map
persistentQueries
or setallLiveQueries
without having initialized it. This is done in particular inHeartbeatAgent
andRemoteHostExecutor
. For example, when calling:The list of persistent queries isn't guaranteed to be initialized and the calls to the query in
DiscoverRemoteHostsUtil
such asgetState()
andgetAllMetadata()
will fail with NPEs.Testing done
Unit tests.
Reviewer checklist