-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: stream pull query internal overrides shouldn't clash with query configs #8166
Conversation
final KsqlRequest request = | ||
new KsqlRequest( | ||
"SELECT USERID, PAGEID, VIEWTIME from " + PAGE_VIEW_STREAM + ";", | ||
ImmutableMap.of("auto.offset.reset", "lastest"), // should get ignored |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @vpapavas , I did also verify I got the same exception with exactly the config you reported (earliest
). I figured I could make this test run double-duty as a regression test for the problem you reported and as a test that we really override the property, since setting the config to latest should not return the rows below. So we are also verifying that the override in the code works properly.
final Map<String, Object> overrides = | ||
new HashMap<>(statementOrig.getSessionConfig().getOverrides()); | ||
// Starting from earliest is semantically necessary. | ||
overrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); | ||
// Using a single thread keeps these queries as lightweight as possible, since we are | ||
// not counting them against the transient query limit. | ||
overrides.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); | ||
// There's no point in EOS, since this query only produces side effects. | ||
overrides.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE); | ||
|
||
final ConfiguredStatement<Query> statement = statementOrig.withConfigOverrides(overrides); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The bug was that I assumed the ImmutableMap builder would let me put in the desired overrides after the putAll
(like a regular map) and then become immutable only after you call build
, but it turns out it actually checks that you put each key only once.
I just went ahead and made a copy of the input map (so it will be mutable) and then did the overrides.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks @vvcephei!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @vvcephei! LGTM!
Previously, if you
SET 'auto.offset.reset'='earliest'
or'latest'
, the query would failwith an exception. This patch fixes that and simply overrides the configs as desired.
Reviewer checklist