KSQL-508: add metrics interceptors instead of replacing #585

Merged: 7 commits, Jan 9, 2018
Changes from 1 commit
KafkaStreamsBuilder.java (new file)
@@ -0,0 +1,23 @@
/**
* Copyright 2018 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
**/

package io.confluent.ksql.physical;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public interface KafkaStreamsBuilder {

Review comment:

This is much better - it would be nice if you could create a fixture or mock that has internal state which is used to validate its use, though this would be a pain and would result in a test that's a bit too functional.

Contributor Author:

I went down that path (using EasyMock) originally, but there is so much boilerplate, and it's not really adding any more validation than what's being done here. Both approaches validate that KafkaStreamsBuilder is used once and check that the config is consistent with what is expected. I think the only additional thing the mock would do is validate how the mock is used with respect to other mocks, which the test doesn't use anyway.

KafkaStreams buildKafkaStreams(StreamsBuilder builder, StreamsConfig conf);
}
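
For illustration, a minimal sketch of the fixture idea discussed in the thread above. The FakeKafkaStreamsBuilder name and its recording behavior are assumptions for this sketch, not part of the PR:

package io.confluent.ksql.physical;

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

// Hypothetical test fixture: records every StreamsConfig it is handed so a
// test can assert that the builder was invoked exactly once with the expected
// properties, without EasyMock boilerplate.
public class FakeKafkaStreamsBuilder implements KafkaStreamsBuilder {

  public final List<StreamsConfig> capturedConfigs = new ArrayList<>();

  @Override
  public KafkaStreams buildKafkaStreams(StreamsBuilder builder, StreamsConfig conf) {
    capturedConfigs.add(conf);
    return new KafkaStreams(builder.build(), conf);
  }
}

A test could pass this fixture to PhysicalPlanBuilder, then assert that capturedConfigs.size() == 1 and inspect the interceptor entries of the single captured config.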
KafkaStreamsBuilderImpl.java (new file)
@@ -0,0 +1,26 @@
/**
* Copyright 2018 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
**/

package io.confluent.ksql.physical;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class KafkaStreamsBuilderImpl implements KafkaStreamsBuilder {
@Override
public KafkaStreams buildKafkaStreams(StreamsBuilder builder, StreamsConfig conf) {
return new KafkaStreams(builder.build(), conf);
}
}
PhysicalPlanBuilder.java
@@ -67,6 +67,7 @@ public class PhysicalPlanBuilder {
private final MetaStore metaStore;
private final boolean updateMetastore;
private final SchemaRegistryClient schemaRegistryClient;
+private final KafkaStreamsBuilder kafkaStreamsBuilder;

public PhysicalPlanBuilder(final StreamsBuilder builder,
final KsqlConfig ksqlConfig,
@@ -76,7 +77,8 @@ public PhysicalPlanBuilder(final StreamsBuilder builder,
final Map<String, Object> overriddenStreamsProperties,
final boolean updateMetastore,
final MetaStore metaStore,
-final SchemaRegistryClient schemaRegistryClient) {
+final SchemaRegistryClient schemaRegistryClient,
+final KafkaStreamsBuilder kafkaStreamsBuilder) {
this.builder = builder;
this.ksqlConfig = ksqlConfig;
this.kafkaTopicClient = kafkaTopicClient;
@@ -86,8 +88,23 @@ public PhysicalPlanBuilder(final StreamsBuilder builder,
this.metaStore = metaStore;
this.updateMetastore = updateMetastore;
this.schemaRegistryClient = schemaRegistryClient;
+this.kafkaStreamsBuilder = kafkaStreamsBuilder;
}

+public PhysicalPlanBuilder(final StreamsBuilder builder,
+final KsqlConfig ksqlConfig,
+final KafkaTopicClient kafkaTopicClient,
+final MetastoreUtil metastoreUtil,
+final FunctionRegistry functionRegistry,
+final Map<String, Object> overriddenStreamsProperties,
+final boolean updateMetastore,
+final MetaStore metaStore,
+final SchemaRegistryClient schemaRegistryClient) {
+this(builder, ksqlConfig, kafkaTopicClient, metastoreUtil, functionRegistry, overriddenStreamsProperties,
+updateMetastore, metaStore, schemaRegistryClient, new KafkaStreamsBuilderImpl());
+}


public QueryMetadata buildPhysicalPlan(final Pair<String, PlanNode> statementPlanPair) throws Exception {
final SchemaKStream resultStream = statementPlanPair.getRight().buildStream(builder,
ksqlConfig,
@@ -135,14 +152,12 @@ private QueryMetadata buildPlanForBareQuery(final QueuedSchemaKStream schemaKStr

final String applicationId = addTimeSuffix(getBareQueryApplicationId(serviceId, transientQueryPrefix));

-StreamsConfig streamsConfig = buildStreamsConfig(applicationId, ksqlConfig, overriddenStreamsProperties);
-KafkaStreams streams = buildStreams(builder, streamsConfig);
+KafkaStreams streams = buildStreams(builder, applicationId, ksqlConfig, overriddenStreamsProperties);

SchemaKStream sourceSchemaKstream = schemaKStream.getSourceSchemaKStreams().get(0);

return new QueuedQueryMetadata(
statement,
-streamsConfig,
streams,
bareOutputNode,
schemaKStream.getExecutionPlan(""),
@@ -194,13 +209,12 @@ private QueryMetadata buildPlanForStructuredOutputNode(String sqlExpression, fin
final QueryId queryId = sinkDataSource.getPersistentQueryId();
final String applicationId = serviceId + persistanceQueryPrefix + queryId;

-StreamsConfig streamsConfig = buildStreamsConfig(applicationId, ksqlConfig, overriddenStreamsProperties);
-KafkaStreams streams = buildStreams(builder, streamsConfig);
+KafkaStreams streams = buildStreams(builder, applicationId, ksqlConfig, overriddenStreamsProperties);

TopologyDescription topologyDescription = builder.build().describe();

return new PersistentQueryMetadata(statement,
-streamsConfig, streams, outputNode, schemaKStream
+streams, outputNode, schemaKStream
.getExecutionPlan(""), queryId,
(schemaKStream instanceof SchemaKTable) ? DataSource
.DataSourceType.KTABLE : DataSource.DataSourceType
@@ -223,9 +237,11 @@ private String addTimeSuffix(String original) {
private void updateListProperty(Map<String, Object> properties, String key, Object value) {
Object obj = properties.getOrDefault(key, new java.util.LinkedList<String>());
Contributor @satybald, Jan 8, 2018:

I guess there's no need to import LinkedList with the full java.util path; LinkedList is enough, as it's already imported.

List valueList;
+// The property value is either a comma-separated string of class names, or a list of class names
if (obj instanceof String) {
+// If it's a string, just split it on the separator so we don't have to worry about adding a separator
String asString = (String)obj;
-valueList = new LinkedList<>(Arrays.asList(asString));
+valueList = new LinkedList<>(Arrays.asList(asString.split("\\s*,\\s*")));
} else if (obj instanceof List) {
valueList = (List) obj;
} else {
@@ -235,8 +251,12 @@ private void updateListProperty(Map<String, Object> properties, String key, Obje
properties.put(key, valueList);
}

-private StreamsConfig buildStreamsConfig(final String applicationId, final KsqlConfig ksqlConfig,
-final Map<String, Object> overriddenProperties) {
+private KafkaStreams buildStreams(
+final StreamsBuilder builder,
+final String applicationId,
+final KsqlConfig ksqlConfig,
+final Map<String, Object> overriddenProperties
+) {
Map<String, Object> newStreamsProperties = ksqlConfig.getKsqlStreamConfigProps();
newStreamsProperties.putAll(overriddenProperties);
newStreamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
@@ -260,12 +280,7 @@ private StreamsConfig buildStreamsConfig(final String applicationId, final KsqlC
ConsumerCollector.class.getCanonicalName());
updateListProperty(newStreamsProperties, StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG),
ProducerCollector.class.getCanonicalName());
-return new StreamsConfig(newStreamsProperties);
-}

-private KafkaStreams buildStreams(final StreamsBuilder builder, final StreamsConfig config) {
-return new KafkaStreams(builder.build(), config);
+return kafkaStreamsBuilder.buildKafkaStreams(builder, new StreamsConfig(newStreamsProperties));
}

}
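
To illustrate the append-not-replace behavior this PR is named for, here is a self-contained sketch of the merge that updateListProperty performs. The mergeListProperty name, the literal property key, the error handling, and the collector's package are stand-ins for this sketch; the real code builds the key with StreamsConfig.consumerPrefix, as shown above:

import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class InterceptorMergeSketch {

  // Mirrors updateListProperty above: an existing comma-separated string is
  // split into a list before the new class name is appended, so previously
  // configured interceptors are kept rather than overwritten.
  @SuppressWarnings({"rawtypes", "unchecked"})
  static void mergeListProperty(Map<String, Object> props, String key, Object value) {
    Object obj = props.getOrDefault(key, new LinkedList<String>());
    List valueList;
    if (obj instanceof String) {
      valueList = new LinkedList<>(Arrays.asList(((String) obj).split("\\s*,\\s*")));
    } else if (obj instanceof List) {
      valueList = (List) obj;
    } else {
      throw new IllegalArgumentException("Unexpected value type for " + key);
    }
    valueList.add(value);
    props.put(key, valueList);
  }

  public static void main(String[] args) {
    Map<String, Object> props = new HashMap<>();
    // A hypothetical interceptor the user configured before KSQL adds its own.
    props.put("consumer.interceptor.classes", "my.company.MyInterceptor");
    // The collector's canonical class name (package assumed for illustration).
    mergeListProperty(props, "consumer.interceptor.classes",
        "io.confluent.ksql.metrics.ConsumerCollector");
    // Prints: [my.company.MyInterceptor, io.confluent.ksql.metrics.ConsumerCollector]
    System.out.println(props.get("consumer.interceptor.classes"));
  }
}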

PersistentQueryMetadata.java
@@ -23,7 +23,6 @@

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;

import java.util.Objects;

@@ -35,7 +34,6 @@ public class PersistentQueryMetadata extends QueryMetadata {


public PersistentQueryMetadata(final String statementString,
-final StreamsConfig streamsConfig,
final KafkaStreams kafkaStreams,
final OutputNode outputNode,
final String executionPlan,
@@ -47,7 +45,7 @@ public PersistentQueryMetadata(final String statementString,
final Schema resultSchema,
final KsqlTopic resultTopic,
final String topology) {
-super(statementString, streamsConfig, kafkaStreams, outputNode, executionPlan, dataSourceType,
+super(statementString, kafkaStreams, outputNode, executionPlan, dataSourceType,
queryApplicationId, kafkaTopicClient, ksqlConfig, topology);
this.id = id;
this.resultSchema = resultSchema;
QueryMetadata.java
@@ -20,7 +20,6 @@
import io.confluent.ksql.planner.plan.OutputNode;

import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -30,7 +29,6 @@ public class QueryMetadata {

private static final Logger log = LoggerFactory.getLogger(QueryMetadata.class);
private final String statementString;
-private final StreamsConfig streamsConfig;
private final KafkaStreams kafkaStreams;
private final OutputNode outputNode;
private final String executionPlan;
@@ -42,7 +40,6 @@


public QueryMetadata(final String statementString,
-final StreamsConfig streamsConfig,
final KafkaStreams kafkaStreams,
final OutputNode outputNode,
final String executionPlan,
@@ -52,7 +49,6 @@
final KsqlConfig ksqlConfig,
String topoplogy) {
this.statementString = statementString;
-this.streamsConfig = streamsConfig;
this.kafkaStreams = kafkaStreams;
this.outputNode = outputNode;
this.executionPlan = executionPlan;
@@ -67,10 +63,6 @@ public String getStatementString() {
return statementString;
}

-public StreamsConfig getStreamsConfig() {
-return streamsConfig;
-}

public KafkaStreams getKafkaStreams() {
return kafkaStreams;
}
QueuedQueryMetadata.java
@@ -22,7 +22,6 @@

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;

import java.util.Objects;
import java.util.concurrent.BlockingQueue;
@@ -33,7 +32,6 @@ public class QueuedQueryMetadata extends QueryMetadata {

public QueuedQueryMetadata(
final String statementString,
-final StreamsConfig streamsConfig,
final KafkaStreams kafkaStreams,
final OutputNode outputNode,
final String executionPlan,
@@ -43,7 +41,7 @@ public QueuedQueryMetadata(
final KafkaTopicClient kafkaTopicClient,
final KsqlConfig ksqlConfig
) {
-super(statementString, streamsConfig, kafkaStreams, outputNode, executionPlan, dataSourceType,
+super(statementString, kafkaStreams, outputNode, executionPlan, dataSourceType,
queryApplicationId, kafkaTopicClient, ksqlConfig, "not-applicable-for-queued");
this.rowQueue = rowQueue;
}
(test file)
@@ -72,7 +72,6 @@ private List<QueryMetadata> getQueryMetadata(QueryId queryid, DataSource.DataSou
queryStreams.start();
expectLastCall();
PersistentQueryMetadata persistentQueryMetadata = new PersistentQueryMetadata(queryid.toString(),
-null,
queryStreams,
null,
"",