-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KSQL-508: add metrics interceptors instead of replacing #585
Conversation
Have the physical plan builder add metrics interceptors instead of replacing the existing ones. It looks at the current setting, and builds a list of interceptor classes/names out of it. Finally, the builder appends the metrics collector interceptor to the list. To test these changes I added an accessor for the underlying stream config to QueryMetadata. The tests pull the configured interceptors from the stream config and verify that they include both the client and the metrics interceptors.
List valueList; | ||
if (obj instanceof String) { | ||
String asString = (String)obj; | ||
valueList = new LinkedList<>(Arrays.asList(asString)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why Arrays.asList(...)
when you have a string?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The setting accepts a comma-separated list, and there was a .split("\s*,\s*") that got dropped somehow when I was moving some code around. Fixing.
|
||
Object val = config.originals().get( | ||
StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG)); | ||
Assert.assertTrue(val instanceof List); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assertThat(val, instanceOf(List.class))
|
||
val = config.originals().get( | ||
StreamsConfig.producerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG)); | ||
Assert.assertTrue(val instanceof List); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
StreamsConfig.producerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG)); | ||
Assert.assertTrue(val instanceof List); | ||
List<String> producerInterceptors = (List<String>) val; | ||
Assert.assertEquals(1, producerInterceptors.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
next 2 lines can be replaced with assertThat(expectedList, equalTo(actualList))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had something like that originally. Felt like the current check was more robust. We're checking that the name passed to the class loader results in our interceptor class being loaded.
StreamsConfig config = queryMetadata.getStreamsConfig(); | ||
Object val = config.originals().get( | ||
StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG)); | ||
Assert.assertTrue(val instanceof List); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG)); | ||
Assert.assertTrue(val instanceof List); | ||
consumerInterceptors = (List<String>) val; | ||
Assert.assertEquals(2, consumerInterceptors.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
|
||
val = config.originals().get( | ||
StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG)); | ||
Assert.assertTrue(val instanceof List); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG)); | ||
Assert.assertTrue(val instanceof List); | ||
producerInterceptors = (List<String>) val; | ||
Assert.assertEquals(2, producerInterceptors.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG)); | ||
Assert.assertTrue(val instanceof List); | ||
List<String> consumerInterceptors = (List<String>) val; | ||
Assert.assertEquals(2, consumerInterceptors.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG)); | ||
Assert.assertTrue(val instanceof List); | ||
List<String> producerInterceptors = (List<String>) val; | ||
Assert.assertEquals(2, producerInterceptors.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
only 1 question really as you will see below. The change is trivial however why does it get passed around to the Meta objects? Its its for testing then it should be contained and tested in the owning class and not passed through APIs. This would reduce the amount of code changes required. (Unless I'm missing something!)
@@ -34,6 +35,7 @@ | |||
|
|||
|
|||
public PersistentQueryMetadata(final String statementString, | |||
final StreamsConfig streamsConfig, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does the streamsConfig need to be passed around? Its an internal config state. I can only see it being used in tests - in which case it shouldn't be passed around.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its just for testing. It was a style choice between making the streams config for a query readable or exposing the function that builds it. Now that I think about it a bit more it would be cleaner to just pull constructing the KafkaStreams out into its own interface/impl that I can mock.
@@ -63,6 +67,10 @@ public String getStatementString() { | |||
return statementString; | |||
} | |||
|
|||
public StreamsConfig getStreamsConfig() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can only see this in the tests... looks like it shouldnt be here...
- Dont pass the stream configs around just for the sake of testing. Instead, factor out the last step of KafkaStreams construction into a factory interface that we can fake out in the test to intercept the args, which includes the streams config - Split the client interceptor config before turning it into a list
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
import org.apache.kafka.streams.StreamsBuilder; | ||
import org.apache.kafka.streams.StreamsConfig; | ||
|
||
public interface KafkaStreamsBuilder { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is much better - it would be nice if you could create a fixture or mock that has internal state which is used validate its use; this would be a pita and result in a test thats a bit too functional
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went down that path (using EasyMock) originally but there is so much boilerplate. And its not really adding any more validation than what's being done here. Both approaches validate that KafkaStreamsBuilder is used once and check that the config is consistent with what is expected. I think the only additional thing the mock would do is validate how the mock is used with respect to other mocks, which the test doesn't use anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, assuming the build passes
retest this please |
retest this please |
@@ -214,6 +234,23 @@ private String addTimeSuffix(String original) { | |||
return String.format("%s_%d", original, System.currentTimeMillis()); | |||
} | |||
|
|||
private void updateListProperty(Map<String, Object> properties, String key, Object value) { | |||
Object obj = properties.getOrDefault(key, new java.util.LinkedList<String>()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess there no need to import LL with java.util full path. LinkedList
is enough as it already imported.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
Have the physical plan builder add metrics interceptors instead of
replacing the existing ones. It looks at the current setting, and
builds a list of interceptor classes/names out of it. Finally, the
builder appends the metrics collector interceptor to the list.
To test these changes I added an accessor for the underlying stream
config to QueryMetadata. The tests pull the configured interceptors
from the stream config and verify that they include both the client
and the metrics interceptors.