Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KSQL-508: add metrics interceptors instead of replacing #585

Merged
merged 7 commits into from
Jan 9, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyDescription;

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

Expand Down Expand Up @@ -132,12 +135,14 @@ private QueryMetadata buildPlanForBareQuery(final QueuedSchemaKStream schemaKStr

final String applicationId = addTimeSuffix(getBareQueryApplicationId(serviceId, transientQueryPrefix));

KafkaStreams streams = buildStreams(builder, applicationId, ksqlConfig, overriddenStreamsProperties);
StreamsConfig streamsConfig = buildStreamsConfig(applicationId, ksqlConfig, overriddenStreamsProperties);
KafkaStreams streams = buildStreams(builder, streamsConfig);

SchemaKStream sourceSchemaKstream = schemaKStream.getSourceSchemaKStreams().get(0);

return new QueuedQueryMetadata(
statement,
streamsConfig,
streams,
bareOutputNode,
schemaKStream.getExecutionPlan(""),
Expand Down Expand Up @@ -189,12 +194,13 @@ private QueryMetadata buildPlanForStructuredOutputNode(String sqlExpression, fin
final QueryId queryId = sinkDataSource.getPersistentQueryId();
final String applicationId = serviceId + persistanceQueryPrefix + queryId;

KafkaStreams streams = buildStreams(builder, applicationId, ksqlConfig, overriddenStreamsProperties);
StreamsConfig streamsConfig = buildStreamsConfig(applicationId, ksqlConfig, overriddenStreamsProperties);
KafkaStreams streams = buildStreams(builder, streamsConfig);

TopologyDescription topologyDescription = builder.build().describe();

return new PersistentQueryMetadata(statement,
streams, outputNode, schemaKStream
streamsConfig, streams, outputNode, schemaKStream
.getExecutionPlan(""), queryId,
(schemaKStream instanceof SchemaKTable) ? DataSource
.DataSourceType.KTABLE : DataSource.DataSourceType
Expand All @@ -214,12 +220,23 @@ private String addTimeSuffix(String original) {
return String.format("%s_%d", original, System.currentTimeMillis());
}

private KafkaStreams buildStreams(
final StreamsBuilder builder,
final String applicationId,
final KsqlConfig ksqlConfig,
final Map<String, Object> overriddenProperties
) {
private void updateListProperty(Map<String, Object> properties, String key, Object value) {
Object obj = properties.getOrDefault(key, new java.util.LinkedList<String>());
Copy link
Contributor

@satybald satybald Jan 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess there no need to import LL with java.util full path. LinkedList is enough as it already imported.

List valueList;
if (obj instanceof String) {
String asString = (String)obj;
valueList = new LinkedList<>(Arrays.asList(asString));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why Arrays.asList(...) when you have a string?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The setting accepts a comma-separated list, and there was a .split("\s*,\s*") that got dropped somehow when I was moving some code around. Fixing.

} else if (obj instanceof List) {
valueList = (List) obj;
} else {
throw new KsqlException("Expecting list or string for property: " + key);
}
valueList.add(value);
properties.put(key, valueList);
}

private StreamsConfig buildStreamsConfig(final String applicationId, final KsqlConfig ksqlConfig,
final Map<String, Object> overriddenProperties) {
Map<String, Object> newStreamsProperties = ksqlConfig.getKsqlStreamConfigProps();
newStreamsProperties.putAll(overriddenProperties);
newStreamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
Expand All @@ -239,9 +256,15 @@ private KafkaStreams buildStreams(
newStreamsProperties.put(
StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, KsqlTimestampExtractor.class);
}
newStreamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), ConsumerCollector.class.getCanonicalName());
newStreamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG), ProducerCollector.class.getCanonicalName());
return new KafkaStreams(builder.build(), new StreamsConfig(newStreamsProperties));
updateListProperty(newStreamsProperties, StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
ConsumerCollector.class.getCanonicalName());
updateListProperty(newStreamsProperties, StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG),
ProducerCollector.class.getCanonicalName());
return new StreamsConfig(newStreamsProperties);
}

// Constructs the KafkaStreams runtime for the topology assembled in
// {@code builder}, using the already-resolved streams configuration.
// Kept as a tiny separate method so KafkaStreams construction stays in
// one place (and could later be extracted behind a mockable factory).
private KafkaStreams buildStreams(final StreamsBuilder builder, final StreamsConfig config) {
return new KafkaStreams(builder.build(), config);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Objects;

Expand All @@ -34,6 +35,7 @@ public class PersistentQueryMetadata extends QueryMetadata {


public PersistentQueryMetadata(final String statementString,
final StreamsConfig streamsConfig,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does the streamsConfig need to be passed around? Its an internal config state. I can only see it being used in tests - in which case it shouldn't be passed around.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its just for testing. It was a style choice between making the streams config for a query readable or exposing the function that builds it. Now that I think about it a bit more it would be cleaner to just pull constructing the KafkaStreams out into its own interface/impl that I can mock.

final KafkaStreams kafkaStreams,
final OutputNode outputNode,
final String executionPlan,
Expand All @@ -45,7 +47,7 @@ public PersistentQueryMetadata(final String statementString,
final Schema resultSchema,
final KsqlTopic resultTopic,
final String topology) {
super(statementString, kafkaStreams, outputNode, executionPlan, dataSourceType,
super(statementString, streamsConfig, kafkaStreams, outputNode, executionPlan, dataSourceType,
queryApplicationId, kafkaTopicClient, ksqlConfig, topology);
this.id = id;
this.resultSchema = resultSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.confluent.ksql.planner.plan.OutputNode;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -29,6 +30,7 @@ public class QueryMetadata {

private static final Logger log = LoggerFactory.getLogger(QueryMetadata.class);
private final String statementString;
private final StreamsConfig streamsConfig;
private final KafkaStreams kafkaStreams;
private final OutputNode outputNode;
private final String executionPlan;
Expand All @@ -40,6 +42,7 @@ public class QueryMetadata {


public QueryMetadata(final String statementString,
final StreamsConfig streamsConfig,
final KafkaStreams kafkaStreams,
final OutputNode outputNode,
final String executionPlan,
Expand All @@ -49,6 +52,7 @@ public QueryMetadata(final String statementString,
final KsqlConfig ksqlConfig,
String topoplogy) {
this.statementString = statementString;
this.streamsConfig = streamsConfig;
this.kafkaStreams = kafkaStreams;
this.outputNode = outputNode;
this.executionPlan = executionPlan;
Expand All @@ -63,6 +67,10 @@ public String getStatementString() {
return statementString;
}

public StreamsConfig getStreamsConfig() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can only see this in the tests... looks like it shouldnt be here...

return streamsConfig;
}

public KafkaStreams getKafkaStreams() {
return kafkaStreams;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Objects;
import java.util.concurrent.BlockingQueue;
Expand All @@ -32,6 +33,7 @@ public class QueuedQueryMetadata extends QueryMetadata {

public QueuedQueryMetadata(
final String statementString,
final StreamsConfig streamsConfig,
final KafkaStreams kafkaStreams,
final OutputNode outputNode,
final String executionPlan,
Expand All @@ -41,7 +43,7 @@ public QueuedQueryMetadata(
final KafkaTopicClient kafkaTopicClient,
final KsqlConfig ksqlConfig
) {
super(statementString, kafkaStreams, outputNode, executionPlan, dataSourceType,
super(statementString, streamsConfig, kafkaStreams, outputNode, executionPlan, dataSourceType,
queryApplicationId, kafkaTopicClient, ksqlConfig, "not-applicable-for-queued");
this.rowQueue = rowQueue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ private List<QueryMetadata> getQueryMetadata(QueryId queryid, DataSource.DataSou
queryStreams.start();
expectLastCall();
PersistentQueryMetadata persistentQueryMetadata = new PersistentQueryMetadata(queryid.toString(),
null,
queryStreams,
null,
"",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.MetastoreUtil;
import io.confluent.ksql.metrics.ConsumerCollector;
import io.confluent.ksql.metrics.ProducerCollector;
import io.confluent.ksql.planner.plan.KsqlBareOutputNode;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.serde.DataSource;
Expand All @@ -29,14 +31,19 @@
import io.confluent.ksql.util.QueryMetadata;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.*;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -50,6 +57,13 @@ public class PhysicalPlanBuilderTest {

@Before
public void before() {
  // buildPhysicalPlanBuilder constructs its own StreamsBuilder and
  // FunctionRegistry; no locals are needed here (previously two unused
  // locals were created and immediately discarded).
  physicalPlanBuilder = buildPhysicalPlanBuilder(Collections.emptyMap());
  planBuilder = new LogicalPlanBuilder(metaStore);
}

private PhysicalPlanBuilder buildPhysicalPlanBuilder(Map<String, Object> overrideProperties) {
final StreamsBuilder streamsBuilder = new StreamsBuilder();
final FunctionRegistry functionRegistry = new FunctionRegistry();
Map<String, Object> configMap = new HashMap<>();
Expand All @@ -58,18 +72,17 @@ public void before() {
configMap.put("commit.interval.ms", 0);
configMap.put("cache.max.bytes.buffering", 0);
configMap.put("auto.offset.reset", "earliest");
physicalPlanBuilder = new PhysicalPlanBuilder(streamsBuilder,
return new PhysicalPlanBuilder(streamsBuilder,
new KsqlConfig(configMap),
new FakeKafkaTopicClient(),
new MetastoreUtil(),
functionRegistry,
Collections.emptyMap(),
overrideProperties,
false,
metaStore,
new MockSchemaRegistryClient()
new MockSchemaRegistryClient()
);

planBuilder = new LogicalPlanBuilder(metaStore);
}

private QueryMetadata buildPhysicalPlan(final String query) throws Exception {
Expand Down Expand Up @@ -105,4 +118,107 @@ public void shouldCreateExecutionPlan() throws Exception {
Assert.assertEquals(lines[5], "\t\t\t\t\t\t\t\t\t\t > [ SOURCE ] Schema: [TEST1.COL0 : INT64 , TEST1.COL1 : STRING , TEST1.COL2 : STRING , TEST1.COL3 : FLOAT64 , TEST1.COL4 : ARRAY , TEST1.COL5 : MAP].");
}

@Test
public void shouldAddMetricsInterceptors() throws Exception {
final QueryMetadata queryMetadata = buildPhysicalPlan(simpleSelectFilter);
StreamsConfig config = queryMetadata.getStreamsConfig();

Object val = config.originals().get(
StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG));
Assert.assertTrue(val instanceof List);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertThat(val, instanceOf(List.class))

List<String> consumerInterceptors = (List<String>) val;
Assert.assertEquals(1, consumerInterceptors.size());
Assert.assertEquals(ConsumerCollector.class, Class.forName(consumerInterceptors.get(0)));

val = config.originals().get(
StreamsConfig.producerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG));
Assert.assertTrue(val instanceof List);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

List<String> producerInterceptors = (List<String>) val;
Assert.assertEquals(1, producerInterceptors.size());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

next 2 lines can be replaced with assertThat(expectedList, equalTo(actualList))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had something like that originally. Felt like the current check was more robust. We're checking that the name passed to the class loader results in our interceptor class being loaded.

Assert.assertEquals(ProducerCollector.class, Class.forName(producerInterceptors.get(0)));
}

// Minimal no-op consumer interceptor used to verify that KSQL appends its
// metrics interceptor to a user-supplied interceptor list instead of
// replacing it.
public static class DummyConsumerInterceptor implements ConsumerInterceptor {
public ConsumerRecords onConsume(ConsumerRecords consumerRecords) {
// Pass records through unchanged.
return consumerRecords;
}
public void close() { }
public void onCommit(Map map) { }
public void configure(Map<String, ?> map) { }
}

// Minimal no-op producer interceptor used to verify that KSQL appends its
// metrics interceptor to a user-supplied interceptor list instead of
// replacing it.
public static class DummyProducerInterceptor implements ProducerInterceptor {
public void onAcknowledgement(RecordMetadata rm, Exception e) {}
public ProducerRecord onSend(ProducerRecord producerRecords) {
// Pass the record through unchanged.
return producerRecords;
}
public void close() { }
public void configure(Map<String, ?> map) { }
}

@Test
public void shouldAddMetricsInterceptorsToExistingList() throws Exception {
// Initialize override properties with lists for producer/consumer interceptors
Map<String, Object> overrideProperties = new HashMap<>();
List<String> consumerInterceptors = new LinkedList<>();
consumerInterceptors.add(DummyConsumerInterceptor.class.getName());
overrideProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
consumerInterceptors);
List<String> producerInterceptors = new LinkedList<>();
producerInterceptors.add(DummyProducerInterceptor.class.getName());
overrideProperties.put(StreamsConfig.producerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
producerInterceptors);

physicalPlanBuilder = buildPhysicalPlanBuilder(overrideProperties);

final QueryMetadata queryMetadata = buildPhysicalPlan(simpleSelectFilter);

StreamsConfig config = queryMetadata.getStreamsConfig();
Object val = config.originals().get(
StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG));
Assert.assertTrue(val instanceof List);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

consumerInterceptors = (List<String>) val;
Assert.assertEquals(2, consumerInterceptors.size());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

Assert.assertEquals(DummyConsumerInterceptor.class.getName(), consumerInterceptors.get(0));
Assert.assertEquals(ConsumerCollector.class, Class.forName(consumerInterceptors.get(1)));

val = config.originals().get(
StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG));
Assert.assertTrue(val instanceof List);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

producerInterceptors = (List<String>) val;
Assert.assertEquals(2, producerInterceptors.size());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

Assert.assertEquals(DummyProducerInterceptor.class.getName(), producerInterceptors.get(0));
Assert.assertEquals(ProducerCollector.class, Class.forName(producerInterceptors.get(1)));
}

@Test
public void shouldAddMetricsInterceptorsToExistingString() throws Exception {
// Initialize override properties with class name strings for producer/consumer interceptors
Map<String, Object> overrideProperties = new HashMap<>();
overrideProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
DummyConsumerInterceptor.class.getName());
overrideProperties.put(StreamsConfig.producerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
DummyProducerInterceptor.class.getName());
physicalPlanBuilder = buildPhysicalPlanBuilder(overrideProperties);

final QueryMetadata queryMetadata = buildPhysicalPlan(simpleSelectFilter);

StreamsConfig config = queryMetadata.getStreamsConfig();

Object val = config.originals().get(
StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG));
Assert.assertTrue(val instanceof List);
List<String> consumerInterceptors = (List<String>) val;
Assert.assertEquals(2, consumerInterceptors.size());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

Assert.assertEquals(DummyConsumerInterceptor.class.getName(), consumerInterceptors.get(0));
Assert.assertEquals(ConsumerCollector.class, Class.forName(consumerInterceptors.get(1)));

val = config.originals().get(
StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG));
Assert.assertTrue(val instanceof List);
List<String> producerInterceptors = (List<String>) val;
Assert.assertEquals(2, producerInterceptors.size());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

Assert.assertEquals(DummyProducerInterceptor.class.getName(), producerInterceptors.get(0));
Assert.assertEquals(ProducerCollector.class, Class.forName(producerInterceptors.get(1)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public void shouldValidatePersistentQueryResultCorrectly()
KsqlTopic resultTopic = new KsqlTopic("testTopic", "testTopic", new KsqlAvroTopicSerDe());
Schema resultSchema = SerDeUtil.getSchemaFromAvro(ordersAveroSchemaStr);
PersistentQueryMetadata persistentQueryMetadata = new PersistentQueryMetadata("",
null,
null,
null,
"",
Expand Down Expand Up @@ -141,6 +142,7 @@ public void shouldFailForInvalidResultAvroSchema()
());
Schema resultSchema = SerDeUtil.getSchemaFromAvro(ordersAveroSchemaStr);
PersistentQueryMetadata persistentQueryMetadata = new PersistentQueryMetadata("",
null,
null,
null,
"",
Expand Down
Loading