move functionality out of PhysicalPlanBuilder into other classes and add tests #405

Merged: 1 commit, Oct 31, 2017
238 changes: 21 additions & 217 deletions ksql-engine/src/main/java/io/confluent/ksql/QueryEngine.java
@@ -29,9 +29,7 @@
import io.confluent.ksql.ddl.commands.DropTopicCommand;
import io.confluent.ksql.ddl.commands.RegisterTopicCommand;
import io.confluent.ksql.metastore.MetastoreUtil;
import io.confluent.ksql.serde.DataSource;
import io.confluent.ksql.metastore.KsqlStream;
import io.confluent.ksql.metastore.KsqlTable;
import io.confluent.ksql.metastore.KsqlTopic;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.StructuredDataSource;
@@ -53,33 +51,21 @@
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.physical.PhysicalPlanBuilder;
import io.confluent.ksql.planner.LogicalPlanner;
import io.confluent.ksql.planner.plan.KsqlBareOutputNode;
import io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode;
import io.confluent.ksql.planner.plan.OutputNode;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.structured.QueuedSchemaKStream;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.structured.SchemaKTable;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.QueuedQueryMetadata;
import io.confluent.ksql.util.timestamp.KsqlTimestampExtractor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

public class QueryEngine {
@@ -182,7 +168,7 @@ private PlanNode buildQueryLogicalPlan(final Query query, final MetaStore tempMe
return logicalPlan;
}

-  public List<QueryMetadata> buildPhysicalPlans(
+  List<QueryMetadata> buildPhysicalPlans(
final boolean addUniqueTimeSuffix,
final List<Pair<String, PlanNode>> logicalPlans,
final List<Pair<String, Statement>> statementList,
@@ -206,170 +192,30 @@ public List<QueryMetadata> buildPhysicalPlans(
return physicalPlans;
}

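(The body of buildPhysicalPlans is collapsed by the diff view. Judging only from the signature and the buildQueryPhysicalPlan delegation below, it presumably walks the logical plans and delegates each statement/plan pair — a sketch, not anything verbatim from this PR:)

    final List<QueryMetadata> physicalPlans = new ArrayList<>();
    for (final Pair<String, PlanNode> statementPlanPair : logicalPlans) {
      // Hypothetical reconstruction: delegate each statement/plan pair.
      buildQueryPhysicalPlan(physicalPlans, addUniqueTimeSuffix, statementPlanPair,
          overriddenStreamsProperties, updateMetastore);
    }
    return physicalPlans;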
-  public void buildQueryPhysicalPlan(final List<QueryMetadata> physicalPlans,
-                                     final boolean addUniqueTimeSuffix,
-                                     final Pair<String, PlanNode> statementPlanPair,
-                                     final Map<String, Object> overriddenStreamsProperties,
-                                     final boolean updateMetastore) throws Exception {
-
-    PlanNode logicalPlan = statementPlanPair.getRight();
-    StreamsBuilder builder = new StreamsBuilder();
-
-    KsqlConfig ksqlConfigClone = ksqlEngine.getKsqlConfig().clone();
-
-    // Build a physical plan, in this case a Kafka Streams DSL
-    PhysicalPlanBuilder physicalPlanBuilder = new PhysicalPlanBuilder(builder, ksqlConfigClone,
-        ksqlEngine.getTopicClient(),
-        new MetastoreUtil(), ksqlEngine.getFunctionRegistry());
-    SchemaKStream schemaKStream = physicalPlanBuilder.buildPhysicalPlan(logicalPlan);
-
-    OutputNode outputNode = physicalPlanBuilder.getPlanSink();
-    boolean isBareQuery = outputNode instanceof KsqlBareOutputNode;
-
-    // Check to make sure the logical and physical plans match up;
-    // important to do this BEFORE actually starting up
-    // the corresponding Kafka Streams job
-    if (isBareQuery && !(schemaKStream instanceof QueuedSchemaKStream)) {
-      throw new Exception(String.format(
-          "Mismatch between logical and physical output; "
-          + "expected a QueuedSchemaKStream based on logical "
-          + "KsqlBareOutputNode, found a %s instead",
-          schemaKStream.getClass().getCanonicalName()
-      ));
-    }
-    String serviceId = ksqlEngine.getKsqlConfig().get(KsqlConfig.KSQL_SERVICE_ID_CONFIG).toString();
-    String persistanceQueryPrefix = ksqlEngine.getKsqlConfig().get(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG).toString();
-    String transientQueryPrefix = ksqlEngine.getKsqlConfig().get(KsqlConfig.KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG).toString();
-
-    if (isBareQuery) {
-
-      physicalPlans.add(buildPlanForBareQuery(addUniqueTimeSuffix, statementPlanPair, overriddenStreamsProperties,
-          builder, ksqlConfigClone, (QueuedSchemaKStream) schemaKStream, (KsqlBareOutputNode) outputNode,
-          serviceId, transientQueryPrefix));
-
-    } else if (outputNode instanceof KsqlStructuredDataOutputNode) {
-
-      physicalPlans.add(buildPlanForStructuredOutputNode(addUniqueTimeSuffix, statementPlanPair,
-          overriddenStreamsProperties, updateMetastore, builder, ksqlConfigClone, schemaKStream,
-          (KsqlStructuredDataOutputNode) outputNode, serviceId, persistanceQueryPrefix));
-
-    } else {
-      throw new KsqlException("Sink data source is not correct.");
-    }
-
-    log.info("Build physical plan for {}.", statementPlanPair.getLeft());
-    log.info(" Execution plan: \n");
-    log.info(schemaKStream.getExecutionPlan(""));
-  }
+  private void buildQueryPhysicalPlan(final List<QueryMetadata> physicalPlans,
+                                      final boolean addUniqueTimeSuffix,
+                                      final Pair<String, PlanNode> statementPlanPair,
+                                      final Map<String, Object> overriddenStreamsProperties,
+                                      final boolean updateMetastore) throws Exception {
+
+    final StreamsBuilder builder = new StreamsBuilder();
+
+    final KsqlConfig ksqlConfigClone = ksqlEngine.getKsqlConfig().clone();
+
+    // Build a physical plan, in this case a Kafka Streams DSL
+    final PhysicalPlanBuilder physicalPlanBuilder = new PhysicalPlanBuilder(builder,
+        ksqlConfigClone,
+        ksqlEngine.getTopicClient(),
+        new MetastoreUtil(),
+        ksqlEngine.getFunctionRegistry(),
+        addUniqueTimeSuffix,
+        overriddenStreamsProperties,
+        updateMetastore,
+        ksqlEngine.getMetaStore(),
+        queryIdCounter.getAndIncrement());
+
+    physicalPlans.add(physicalPlanBuilder.buildPhysicalPlan(statementPlanPair));
+  }

-  /**
-   *
-   * @param addUniqueTimeSuffix
-   * @param statementPlanPair
-   * @param overriddenStreamsProperties
-   * @param builder
-   * @param ksqlConfigClone
-   * @param bareOutputNode
-   * @param serviceId
-   * @param transientQueryPrefix
-   */
-  private QueryMetadata buildPlanForBareQuery(boolean addUniqueTimeSuffix,
-      Pair<String, PlanNode> statementPlanPair, Map<String, Object> overriddenStreamsProperties,
-      StreamsBuilder builder, KsqlConfig ksqlConfigClone, QueuedSchemaKStream schemaKStream,
-      KsqlBareOutputNode bareOutputNode, String serviceId, String transientQueryPrefix) {
-
-    String applicationId = getBareQueryApplicationId(serviceId, transientQueryPrefix);
-    if (addUniqueTimeSuffix) {
-      applicationId = addTimeSuffix(applicationId);
-    }
-
-    KafkaStreams streams = buildStreams(builder, applicationId, ksqlConfigClone, overriddenStreamsProperties);
-
-    SchemaKStream sourceSchemaKstream = schemaKStream.getSourceSchemaKStreams().get(0);
-
-    return new QueuedQueryMetadata(
-        statementPlanPair.getLeft(),
-        streams,
-        bareOutputNode,
-        schemaKStream.getExecutionPlan(""),
-        schemaKStream.getQueue(),
-        (sourceSchemaKstream instanceof SchemaKTable) ?
-            DataSource.DataSourceType.KTABLE : DataSource.DataSourceType.KSTREAM,
-        applicationId,
-        ksqlEngine.getTopicClient(),
-        ksqlConfigClone
-    );
-  }
-
-  /**
-   *
-   * @param addUniqueTimeSuffix
-   * @param statementPlanPair
-   * @param overriddenStreamsProperties
-   * @param updateMetastore
-   * @param builder
-   * @param ksqlConfigClone
-   * @param schemaKStream
-   * @param serviceId
-   * @param persistanceQueryPrefix
-   */
-  private QueryMetadata buildPlanForStructuredOutputNode(boolean addUniqueTimeSuffix,
-      Pair<String, PlanNode> statementPlanPair,
-      Map<String, Object> overriddenStreamsProperties,
-      boolean updateMetastore,
-      StreamsBuilder builder,
-      KsqlConfig ksqlConfigClone,
-      SchemaKStream schemaKStream,
-      KsqlStructuredDataOutputNode outputNode,
-      String serviceId,
-      String persistanceQueryPrefix) {
-
-    long queryId = getNextQueryId();
-
-    String applicationId = serviceId + persistanceQueryPrefix + queryId;
-    if (addUniqueTimeSuffix) {
-      applicationId = addTimeSuffix(applicationId);
-    }
-
-    MetaStore metaStore = ksqlEngine.getMetaStore();
-    if (metaStore.getTopic(outputNode.getKafkaTopicName()) == null) {
-      metaStore.putTopic(outputNode.getKsqlTopic());
-    }
-    StructuredDataSource sinkDataSource;
-    if (schemaKStream instanceof SchemaKTable) {
-      SchemaKTable schemaKTable = (SchemaKTable) schemaKStream;
-      sinkDataSource =
-          new KsqlTable(outputNode.getId().toString(),
-              outputNode.getSchema(),
-              schemaKStream.getKeyField(),
-              outputNode.getTimestampField(),
-              outputNode.getKsqlTopic(),
-              outputNode.getId().toString() +
-                  ksqlEngine.getKsqlConfig().get(KsqlConfig.KSQL_TABLE_STATESTORE_NAME_SUFFIX_CONFIG),
-              schemaKTable.isWindowed());
-    } else {
-      sinkDataSource =
-          new KsqlStream(outputNode.getId().toString(),
-              outputNode.getSchema(),
-              schemaKStream.getKeyField(),
-              outputNode.getTimestampField(),
-              outputNode.getKsqlTopic());
-    }
-
-    if (updateMetastore) {
-      metaStore.putSource(sinkDataSource.cloneWithTimeKeyColumns());
-    }
-    KafkaStreams streams = buildStreams(builder, applicationId, ksqlConfigClone, overriddenStreamsProperties);
-
-    return new PersistentQueryMetadata(statementPlanPair.getLeft(),
-        streams, outputNode, schemaKStream.getExecutionPlan(""), queryId,
-        (schemaKStream instanceof SchemaKTable)
-            ? DataSource.DataSourceType.KTABLE : DataSource.DataSourceType.KSTREAM,
-        applicationId,
-        ksqlEngine.getTopicClient(),
-        ksqlConfigClone);
-  }
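Since the stated goal of the PR is testability, the constructor call added above is now the whole seam a unit test needs. A rough sketch of driving it directly, with the argument order taken from the diff (the collaborator types KafkaTopicClient and FunctionRegistry, their packages, the Mockito usage, and the class/method names here are assumptions, not the PR's actual tests):

    import static org.mockito.Mockito.mock;

    import io.confluent.ksql.function.FunctionRegistry;   // assumed package
    import io.confluent.ksql.metastore.MetaStore;
    import io.confluent.ksql.metastore.MetastoreUtil;
    import io.confluent.ksql.physical.PhysicalPlanBuilder;
    import io.confluent.ksql.planner.plan.PlanNode;
    import io.confluent.ksql.util.KafkaTopicClient;       // assumed package
    import io.confluent.ksql.util.KsqlConfig;
    import io.confluent.ksql.util.Pair;
    import io.confluent.ksql.util.QueryMetadata;
    import java.util.Collections;
    import org.apache.kafka.streams.StreamsBuilder;

    class PhysicalPlanBuilderSketch {
      // Argument order mirrors the constructor call added in the diff above.
      QueryMetadata buildOne(final KsqlConfig config,
                             final Pair<String, PlanNode> statementPlanPair) throws Exception {
        final PhysicalPlanBuilder physicalPlanBuilder = new PhysicalPlanBuilder(
            new StreamsBuilder(),
            config.clone(),
            mock(KafkaTopicClient.class),
            new MetastoreUtil(),
            mock(FunctionRegistry.class),
            false,                    // addUniqueTimeSuffix
            Collections.emptyMap(),   // overriddenStreamsProperties
            false,                    // updateMetastore
            mock(MetaStore.class),
            0L);                      // queryId
        return physicalPlanBuilder.buildPhysicalPlan(statementPlanPair);
      }
    }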


@@ -408,7 +254,7 @@ private DDLCommand generateDDLCommand(
}
}

-  public StructuredDataSource getResultDatasource(final Select select, final String name) {
+  StructuredDataSource getResultDatasource(final Select select, final String name) {

SchemaBuilder dataSource = SchemaBuilder.struct().name(name);
for (SelectItem selectItem : select.getSelectItems()) {
@@ -423,48 +269,6 @@ public StructuredDataSource getResultDatasource(final Select select, final Strin
return new KsqlStream(name, dataSource.schema(), null, null, ksqlTopic);
}

-  private KafkaStreams buildStreams(
-      final StreamsBuilder builder,
-      final String applicationId,
-      final KsqlConfig ksqlConfig,
-      final Map<String, Object> overriddenProperties
-  ) {
-    Map<String, Object> newStreamsProperties = ksqlConfig.getKsqlStreamConfigProps();
-    newStreamsProperties.putAll(overriddenProperties);
-    newStreamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
-    newStreamsProperties.put(
-        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-        ksqlConfig.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
-    newStreamsProperties.put(
-        StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
-        ksqlConfig.get(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
-    newStreamsProperties.put(
-        StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
-        ksqlConfig.get(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG));
-    if (ksqlConfig.get(KsqlConfig.KSQL_TIMESTAMP_COLUMN_INDEX) != null) {
-      newStreamsProperties.put(
-          KsqlConfig.KSQL_TIMESTAMP_COLUMN_INDEX,
-          ksqlConfig.get(KsqlConfig.KSQL_TIMESTAMP_COLUMN_INDEX));
-      newStreamsProperties.put(
-          StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, KsqlTimestampExtractor.class);
-    }
-    return new KafkaStreams(builder.build(), new StreamsConfig(newStreamsProperties));
-  }
-
-  private long getNextQueryId() {
-    return queryIdCounter.getAndIncrement();
-  }
-
-  // TODO: This should probably be changed
-  private String getBareQueryApplicationId(String serviceId, String transientQueryPrefix) {
-    return serviceId + transientQueryPrefix +
-        Math.abs(ThreadLocalRandom.current().nextLong());
-  }
-
-  private String addTimeSuffix(String original) {
-    return String.format("%s_%d", original, System.currentTimeMillis());
-  }

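The removed buildStreams helper above documents the property-precedence this refactor relocates: engine defaults first, then per-query overrides, then the computed application id on top. A standalone sketch of that merge order using only plain Kafka clients/Streams config keys (the class and method names here are hypothetical):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    final class StreamsPropertiesSketch {
      static Map<String, Object> merge(final Map<String, Object> engineDefaults,
                                       final Map<String, Object> perQueryOverrides,
                                       final String applicationId) {
        final Map<String, Object> props = new HashMap<>(engineDefaults);
        props.putAll(perQueryOverrides);  // later puts win: overrides beat defaults
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return props;
      }
    }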
private void enforceAggregateRules(Query query, AggregateAnalysis aggregateAnalysis) {
if (!((QuerySpecification) query.getQueryBody()).getGroupBy().isPresent()) {
return;
Second changed file (file path not captured in this view):
@@ -21,7 +21,7 @@

import io.confluent.ksql.GenericRow;

-class AddTimestampColumnValueTransformerSupplier implements ValueTransformerSupplier<GenericRow, GenericRow> {
+public class AddTimestampColumn implements ValueTransformerSupplier<GenericRow, GenericRow> {
@Override
public ValueTransformer<GenericRow, GenericRow> get() {
return new ValueTransformer<GenericRow, GenericRow>() {
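      // (The transformer body is collapsed in the diff at this point. What follows is
      // a plausible sketch inferred from the class name alone: prepend the record's
      // timestamp to each row. The column position, the GenericRow accessor, and the
      // 2017-era punctuate override are assumptions, not this PR's code. Assumes
      // org.apache.kafka.streams.processor.ProcessorContext is imported above.)
      private ProcessorContext processorContext;

      @Override
      public void init(final ProcessorContext processorContext) {
        this.processorContext = processorContext;
      }

      @Override
      public GenericRow transform(final GenericRow row) {
        // Assumed behavior: add the Kafka record timestamp as the first column.
        row.getColumns().add(0, processorContext.timestamp());
        return row;
      }

      @Override
      public GenericRow punctuate(final long timestamp) {
        return null; // no scheduled emission
      }

      @Override
      public void close() {
      }
    };
  }
}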