-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: move selectKey impl to plan builder #3362
feat: move selectKey impl to plan builder #3362
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks @rodesai LGTM
ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderTest.java
Outdated
Show resolved
Hide resolved
ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderTest.java
Outdated
Show resolved
Hide resolved
StreamSelectKeyBuilder.build(kstream, selectKey); | ||
|
||
// Then: | ||
verify(rekeyedKstream).mapValues(mapperCaptor.capture()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consider moving into method to make tests more readable:
private ValueMapperWithKey<Object, GenericRow, GenericRow> getMapper() {
verify(rekeyedKstream).mapValues(mapperCaptor.capture());
final ValueMapperWithKey<Object, GenericRow, GenericRow> mapper = mapperCaptor.getValue();
}
Same for others.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consider moving into method to make tests more readable:
private ValueMapperWithKey<Object, GenericRow, GenericRow> getMapper() {
verify(rekeyedKstream).mapValues(mapperCaptor.capture());
final ValueMapperWithKey<Object, GenericRow, GenericRow> mapper = mapperCaptor.getValue();
}
Same for others.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
double checked this PR after seeing issues with raw types and generics in the other related PRs... looks like this one has some minor generic issues to clean up too.
final QueryContext.Stacker stacker, | ||
final ExecutionStep<KStream<K, GenericRow>> source, | ||
final String fieldName, | ||
final boolean updateRowKey | ||
) { | ||
final QueryContext queryContext = stacker.getQueryContext(); | ||
return new StreamSelectKey<>( | ||
return (StreamSelectKey<KStream<Struct, GenericRow>>) new StreamSelectKey( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use of raw type.
We need to get the generics right here.
Will StreamSelectKey
always build a KStream<Struct, GenericRow>
regardless of whether it's source is KStream<Windowed<Struct>, GenericRow>
or KStream<Struct, GenericRow>
??? I think it does, right?
That being the case, StreamSelectKey
should be defined as:
public class StreamSelectKey implements ExecutionStep<KStream<Struct, GenericRow>> {
private final ExecutionStep<KStream<?, GenericRow>> source;
...
public StreamSelectKey(
...
final ExecutionStep<KStream<?, GenericRow>> source,
...
)
...
}
And this method should be:
public static StreamSelectKey streamSelectKey(
final QueryContext.Stacker stacker,
final ExecutionStep<KStream<?, GenericRow>> source,
final String fieldName,
final boolean updateRowKey
) {
final QueryContext queryContext = stacker.getQueryContext();
return new StreamSelectKey(
new DefaultExecutionStepProperties(
source.getProperties().getSchema(),
queryContext
),
source,
fieldName,
updateRowKey
);
}
private static final String KEY = "ATL.BOI"; | ||
|
||
@Mock | ||
private KStream kstream; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use of raw types - can we please use generic types in tests.
e1ec945
to
79b47d9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @rodesai, this looks much better!
|
||
@Immutable | ||
public class StreamSelectKey<S> implements ExecutionStep<S> { | ||
public class StreamSelectKey<K> implements ExecutionStep<KStream<Struct, GenericRow>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need K
here? Can we not just use ExecutionStep<KStream<?, GenericRow>>
for the source
param / field
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can't pass an ExecutionStep<KStream<K, GenericRow>>
, which is what SchemaKStream
has, to ExecutionStep<KStream<?, GenericRow>>
. We could make the type ExecutionStep<? extends KStream<?, GenericRow>>
, but this felt easier to read.
StreamSelectKeyBuilder.build(kstream, selectKey); | ||
|
||
// Then: | ||
verify(rekeyedKstream).mapValues(mapperCaptor.capture()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consider moving into method to make tests more readable:
private ValueMapperWithKey<Object, GenericRow, GenericRow> getMapper() {
verify(rekeyedKstream).mapValues(mapperCaptor.capture());
final ValueMapperWithKey<Object, GenericRow, GenericRow> mapper = mapperCaptor.getValue();
}
Same for others.
Moves rekeying from SchemaKStream and into an execution plan builder.
79b47d9
to
1f37cdb
Compare
Description
Moves rekeying from SchemaKStream and into an execution plan builder.
Testing done
Added tests for step builder
Reviewer checklist