From 8b52aa8764d4e90c8024ab0132ecba959757f3d7 Mon Sep 17 00:00:00 2001 From: Tim Fox Date: Fri, 25 Oct 2019 15:00:10 -0700 Subject: [PATCH] feat: Implement EXPLODE(ARRAY) for single table function in SELECT (#3589) This is the first PR in a series to implement table functions. This PR implements the EXPLODE function. --- .../java/io/confluent/ksql/cli/CliTest.java | 2 +- .../function/AggregateFunctionFactory.java | 10 +- .../ksql/function/FunctionRegistry.java | 12 + .../ksql/function/FunctionSignature.java | 4 +- .../ksql/function/KsqlAggregateFunction.java | 5 - .../ksql/function/KsqlTableFunction.java | 31 +++ .../function/MutableFunctionRegistry.java | 2 + .../ksql/function/TableFunctionFactory.java | 59 +++++ .../io/confluent/ksql/name/ColumnName.java | 20 +- .../AggregateExpressionRewriter.java | 4 +- .../io/confluent/ksql/analyzer/Analysis.java | 11 + .../io/confluent/ksql/analyzer/Analyzer.java | 37 +++ .../ksql/analyzer/QueryAnalyzer.java | 5 +- .../io/confluent/ksql/engine/QueryEngine.java | 41 +-- .../ksql/function/BaseTableFunction.java | 74 ++++++ .../function/InternalFunctionRegistry.java | 239 ++++++++++++------ .../io/confluent/ksql/function/UdfLoader.java | 6 +- .../function/udaf/window/WindowEndKudaf.java | 3 +- .../udaf/window/WindowStartKudaf.java | 3 +- .../udtf/array/ExplodeFunctionFactory.java | 61 +++++ .../udtf/array/ExplodeTableFunction.java | 39 +++ .../ksql/planner/LogicalPlanner.java | 39 ++- .../planner/PlanSourceExtractorVisitor.java | 9 +- .../ksql/planner/plan/AggregateNode.java | 7 +- .../ksql/planner/plan/DataSourceNode.java | 15 +- .../ksql/planner/plan/FilterNode.java | 8 + .../ksql/planner/plan/FlatMapNode.java | 196 ++++++++++++++ .../confluent/ksql/planner/plan/JoinNode.java | 9 + .../ksql/planner/plan/OutputNode.java | 7 + .../confluent/ksql/planner/plan/PlanNode.java | 3 + .../ksql/planner/plan/PlanVisitor.java | 4 + .../ksql/planner/plan/ProjectNode.java | 11 +- .../ksql/structured/SchemaKStream.java | 45 +++- .../analyzer/QueryAnalyzerFunctionalTest.java | 22 +- .../InternalFunctionRegistryTest.java | 204 +++++++++------ .../tf/ExplodeArrayTableFunctionTest.java | 77 ++++++ .../SelectValueMapperIntegrationTest.java | 2 +- .../ksql/planner/plan/DataSourceNodeTest.java | 10 +- .../ksql/planner/plan/JoinNodeTest.java | 24 ++ .../ksql/planner/plan/ProjectNodeTest.java | 22 +- .../ksql/structured/SchemaKStreamTest.java | 20 +- .../ksql/structured/SchemaKTableTest.java | 10 +- .../ksql/testutils/AnalysisTestUtil.java | 4 +- .../ksql/execution/function/UdafUtil.java | 1 + .../ksql/execution/function/UdtfUtil.java | 47 ++++ .../function/udtf/KudtfFlatMapper.java | 48 ++++ .../function/udtf/TableFunctionApplier.java | 41 +++ .../ksql/execution/plan/PlanBuilder.java | 2 + .../ksql/execution/plan/StreamFlatMap.java | 63 +++++ .../execution/util/ExpressionTypeManager.java | 13 + .../query-validation-tests/explode.json | 79 ++++++ .../table-functions.json | 102 ++++++++ .../ksql/metastore/MetaStoreImpl.java | 12 + .../confluent/ksql/util/MetaStoreFixture.java | 29 +++ .../io/confluent/ksql/parser/AstBuilder.java | 3 +- .../streams/ExecutionStepFactory.java | 17 ++ .../ksql/execution/streams/KSPlanBuilder.java | 7 + .../streams/StreamFlatMapBuilder.java | 34 +++ 58 files changed, 1662 insertions(+), 252 deletions(-) create mode 100644 ksql-common/src/main/java/io/confluent/ksql/function/KsqlTableFunction.java create mode 100644 ksql-common/src/main/java/io/confluent/ksql/function/TableFunctionFactory.java rename 
ksql-engine/src/main/java/io/confluent/ksql/{util => analyzer}/AggregateExpressionRewriter.java (95%) create mode 100644 ksql-engine/src/main/java/io/confluent/ksql/function/BaseTableFunction.java create mode 100644 ksql-engine/src/main/java/io/confluent/ksql/function/udtf/array/ExplodeFunctionFactory.java create mode 100644 ksql-engine/src/main/java/io/confluent/ksql/function/udtf/array/ExplodeTableFunction.java create mode 100644 ksql-engine/src/main/java/io/confluent/ksql/planner/plan/FlatMapNode.java create mode 100644 ksql-engine/src/test/java/io/confluent/ksql/function/tf/ExplodeArrayTableFunctionTest.java create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/function/UdtfUtil.java create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/function/udtf/KudtfFlatMapper.java create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/function/udtf/TableFunctionApplier.java create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamFlatMap.java create mode 100644 ksql-functional-tests/src/test/resources/query-validation-tests/explode.json create mode 100644 ksql-functional-tests/src/test/resources/query-validation-tests/table-functions.json create mode 100644 ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamFlatMapBuilder.java diff --git a/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java b/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java index f619b1738ffa..4f6283130aa7 100644 --- a/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java +++ b/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java @@ -853,7 +853,7 @@ public void shouldDescribeOverloadedScalarFunction() { public void shouldDescribeAggregateFunction() { final String expectedSummary = "Name : TOPK\n" + - "Author : confluent\n" + + "Author : Confluent\n" + "Type : aggregate\n" + "Jar : internal\n" + "Variations : \n"; diff --git a/ksql-common/src/main/java/io/confluent/ksql/function/AggregateFunctionFactory.java b/ksql-common/src/main/java/io/confluent/ksql/function/AggregateFunctionFactory.java index 5bea37bbdfd1..c6b9bffaab23 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/function/AggregateFunctionFactory.java +++ b/ksql-common/src/main/java/io/confluent/ksql/function/AggregateFunctionFactory.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableList; import io.confluent.ksql.function.udf.UdfMetadata; import io.confluent.ksql.util.DecimalUtil; +import io.confluent.ksql.util.KsqlConstants; import java.util.List; import java.util.Objects; import java.util.function.Consumer; @@ -38,7 +39,14 @@ public abstract class AggregateFunctionFactory { .build(); public AggregateFunctionFactory(final String functionName) { - this(new UdfMetadata(functionName, "", "confluent", "", KsqlFunction.INTERNAL_PATH, false)); + this(new UdfMetadata( + functionName, + "", + KsqlConstants.CONFLUENT_AUTHOR, + "", + KsqlFunction.INTERNAL_PATH, + false + )); } public AggregateFunctionFactory(final UdfMetadata metadata) { diff --git a/ksql-common/src/main/java/io/confluent/ksql/function/FunctionRegistry.java b/ksql-common/src/main/java/io/confluent/ksql/function/FunctionRegistry.java index 09b6126614de..cf987f1f0160 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/function/FunctionRegistry.java +++ b/ksql-common/src/main/java/io/confluent/ksql/function/FunctionRegistry.java @@ -33,6 +33,16 @@ public interface FunctionRegistry { */ boolean isAggregate(String functionName); + /** + * Test if the 
supplied {@code functionName} is a table function. + * + * <p>
Note: unknown functions result in {@code false} return value. + * + * @param functionName the name of the function to test + * @return {@code true} if it is a table function, {@code false} otherwise. + */ + boolean isTableFunction(String functionName); + /** * Get the factory for a UDF. * @@ -72,6 +82,8 @@ public interface FunctionRegistry { KsqlAggregateFunction getAggregateFunction(String functionName, Schema argumentType, AggregateFunctionInitArguments initArgs); + KsqlTableFunction getTableFunction(String functionName, Schema argumentType); + /** * @return all UDF factories. */ diff --git a/ksql-common/src/main/java/io/confluent/ksql/function/FunctionSignature.java b/ksql-common/src/main/java/io/confluent/ksql/function/FunctionSignature.java index 8e9565999416..d24f06775d8b 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/function/FunctionSignature.java +++ b/ksql-common/src/main/java/io/confluent/ksql/function/FunctionSignature.java @@ -38,7 +38,9 @@ public interface FunctionSignature { * @return whether or not to consider the final argument in * {@link #getArguments()} as variadic */ - boolean isVariadic(); + default boolean isVariadic() { + return false; + } } diff --git a/ksql-common/src/main/java/io/confluent/ksql/function/KsqlAggregateFunction.java b/ksql-common/src/main/java/io/confluent/ksql/function/KsqlAggregateFunction.java index 2999e70c0652..fdaf98c55a0f 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/function/KsqlAggregateFunction.java +++ b/ksql-common/src/main/java/io/confluent/ksql/function/KsqlAggregateFunction.java @@ -51,9 +51,4 @@ public interface KsqlAggregateFunction extends FunctionSignature { Function getResultMapper(); String getDescription(); - - @Override - default boolean isVariadic() { - return false; - } } diff --git a/ksql-common/src/main/java/io/confluent/ksql/function/KsqlTableFunction.java b/ksql-common/src/main/java/io/confluent/ksql/function/KsqlTableFunction.java new file mode 100644 index 000000000000..10c8d2c4cdb5 --- /dev/null +++ b/ksql-common/src/main/java/io/confluent/ksql/function/KsqlTableFunction.java @@ -0,0 +1,31 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
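 * (Editor's note, not part of this patch: the KsqlTableFunction interface introduced in this
 * file is the runtime contract for table functions — {@code flatMap} takes one input value and
 * returns a list of output values, one per emitted row. The EXPLODE function added later in
 * this patch implements it by returning its array argument unchanged, so a three-element array
 * produces three rows and a null array produces none.)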
+ */ + +package io.confluent.ksql.function; + +import io.confluent.ksql.schema.ksql.types.SqlType; +import java.util.List; +import org.apache.kafka.connect.data.Schema; + +public interface KsqlTableFunction extends FunctionSignature { + + Schema getReturnType(); + + SqlType returnType(); + + List flatMap(I input); + + String getDescription(); +} diff --git a/ksql-common/src/main/java/io/confluent/ksql/function/MutableFunctionRegistry.java b/ksql-common/src/main/java/io/confluent/ksql/function/MutableFunctionRegistry.java index 3c39d12dfb15..944cc97a0378 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/function/MutableFunctionRegistry.java +++ b/ksql-common/src/main/java/io/confluent/ksql/function/MutableFunctionRegistry.java @@ -51,4 +51,6 @@ public interface MutableFunctionRegistry extends FunctionRegistry { * @throws KsqlException if a function, (of any type), with the same name exists. */ void addAggregateFunctionFactory(AggregateFunctionFactory aggregateFunctionFactory); + + void addTableFunctionFactory(TableFunctionFactory tableFunctionFactory); } diff --git a/ksql-common/src/main/java/io/confluent/ksql/function/TableFunctionFactory.java b/ksql-common/src/main/java/io/confluent/ksql/function/TableFunctionFactory.java new file mode 100644 index 000000000000..825b7bf5b95b --- /dev/null +++ b/ksql-common/src/main/java/io/confluent/ksql/function/TableFunctionFactory.java @@ -0,0 +1,59 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.function; + +import io.confluent.ksql.function.udf.UdfMetadata; +import java.util.List; +import java.util.Objects; +import org.apache.kafka.connect.data.Schema; + + +public abstract class TableFunctionFactory { + + private final UdfMetadata metadata; + + public TableFunctionFactory(final UdfMetadata metadata) { + this.metadata = Objects.requireNonNull(metadata, "metadata can't be null"); + } + + public abstract KsqlTableFunction createTableFunction(List argTypeList); + + protected abstract List> supportedArgs(); + + public String getName() { + return metadata.getName(); + } + + public String getDescription() { + return metadata.getDescription(); + } + + public String getPath() { + return metadata.getPath(); + } + + public String getAuthor() { + return metadata.getAuthor(); + } + + public String getVersion() { + return metadata.getVersion(); + } + + public boolean isInternal() { + return metadata.isInternal(); + } +} diff --git a/ksql-common/src/main/java/io/confluent/ksql/name/ColumnName.java b/ksql-common/src/main/java/io/confluent/ksql/name/ColumnName.java index 264b0fbab4d1..606b9a97f396 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/name/ColumnName.java +++ b/ksql-common/src/main/java/io/confluent/ksql/name/ColumnName.java @@ -24,11 +24,29 @@ public final class ColumnName extends Name { private static final String AGGREGATE_COLUMN_PREFIX = "KSQL_AGG_VARIABLE_"; + private static final String GENERATED_ALIAS_PREFIX = "KSQL_COL_"; + private static final String SYNTHESISED_COLUMN_PREFIX = "KSQL_SYNTH_"; - public static ColumnName aggregate(final int idx) { + public static ColumnName aggregateColumn(final int idx) { return of(AGGREGATE_COLUMN_PREFIX + idx); } + /** + * Where the user hasn't specified an alias for an expression in a SELECT we generate them + * using this method. This value is exposed to the user in the output schema + */ + public static ColumnName generatedColumnAlias(final int idx) { + return ColumnName.of(GENERATED_ALIAS_PREFIX + idx); + } + + /** + * Used to generate a column name in an intermediate schema, e.g. for a column to hold + * values of a table function. These are never exposed to the user + */ + public static ColumnName synthesisedSchemaColumn(final int idx) { + return ColumnName.of(SYNTHESISED_COLUMN_PREFIX + idx); + } + public static ColumnName of(final String name) { return new ColumnName(name); } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/util/AggregateExpressionRewriter.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/AggregateExpressionRewriter.java similarity index 95% rename from ksql-engine/src/main/java/io/confluent/ksql/util/AggregateExpressionRewriter.java rename to ksql-engine/src/main/java/io/confluent/ksql/analyzer/AggregateExpressionRewriter.java index a401e3635215..0a05b26a66ce 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/util/AggregateExpressionRewriter.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/AggregateExpressionRewriter.java @@ -13,7 +13,7 @@ * specific language governing permissions and limitations under the License. 
*/ -package io.confluent.ksql.util; +package io.confluent.ksql.analyzer; import io.confluent.ksql.engine.rewrite.ExpressionTreeRewriter; import io.confluent.ksql.engine.rewrite.ExpressionTreeRewriter.Context; @@ -45,7 +45,7 @@ public Optional visitFunctionCall( final ExpressionTreeRewriter.Context context) { final String functionName = node.getName().name(); if (functionRegistry.isAggregate(functionName)) { - final ColumnName aggVarName = ColumnName.aggregate(aggVariableIndex); + final ColumnName aggVarName = ColumnName.aggregateColumn(aggVariableIndex); aggVariableIndex++; return Optional.of( new ColumnReferenceExp(node.getLocation(), ColumnRef.withoutSource(aggVarName))); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analysis.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analysis.java index dcdfcc962abb..0625c12ec7f5 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analysis.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analysis.java @@ -23,6 +23,7 @@ import io.confluent.ksql.execution.ddl.commands.KsqlTopic; import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.execution.expression.tree.FunctionCall; import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.metastore.model.KsqlStream; @@ -45,6 +46,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.OptionalInt; import java.util.Set; @@ -66,6 +68,7 @@ public class Analysis { private Optional havingExpression = Optional.empty(); private OptionalInt limitClause = OptionalInt.empty(); private CreateSourceAsProperties withProperties = CreateSourceAsProperties.none(); + private final List tableFunctions = new ArrayList<>(); public Analysis(final ResultMaterialization resultMaterialization) { this.resultMaterialization = requireNonNull(resultMaterialization, "resultMaterialization"); @@ -206,6 +209,14 @@ public CreateSourceAsProperties getProperties() { return withProperties; } + void addTableFunction(final FunctionCall functionCall) { + this.tableFunctions.add(Objects.requireNonNull(functionCall)); + } + + public List getTableFunctions() { + return tableFunctions; + } + @Immutable public static final class Into { diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java index 746f205c74f0..be6694642fcc 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java @@ -26,6 +26,7 @@ import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; import io.confluent.ksql.execution.expression.tree.ComparisonExpression; import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.execution.expression.tree.FunctionCall; import io.confluent.ksql.execution.expression.tree.TraversalExpressionVisitor; import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.execution.windows.KsqlWindowExpression; @@ -516,6 +517,7 @@ protected AstNode visitSelect(final Select node, final Void context) { } else if (selectItem instanceof SingleColumn) { final SingleColumn column = (SingleColumn) selectItem; addSelectItem(column.getExpression(), column.getAlias()); + 
visitTableFunctions(column.getExpression()); } else { throw new IllegalArgumentException( "Unsupported SelectItem type: " + selectItem.getClass().getName()); @@ -626,6 +628,41 @@ public Void visitColumnReference( analysis.addSelectItem(exp, columnName); analysis.addSelectColumnRefs(columnRefs); } + + private void visitTableFunctions(final Expression expression) { + final TableFunctionVisitor visitor = new TableFunctionVisitor(); + visitor.process(expression, null); + } + + private final class TableFunctionVisitor extends TraversalExpressionVisitor { + + private Optional tableFunctionName = Optional.empty(); + + @Override + public Void visitFunctionCall(final FunctionCall functionCall, final Void context) { + final String functionName = functionCall.getName().name(); + final boolean isTableFunction = metaStore.isTableFunction(functionName); + + if (isTableFunction) { + if (tableFunctionName.isPresent()) { + throw new KsqlException("Table functions cannot be nested: " + + tableFunctionName.get() + "(" + functionName + "())"); + } + + tableFunctionName = Optional.of(functionName); + + analysis.addTableFunction(functionCall); + } + + super.visitFunctionCall(functionCall, context); + + if (isTableFunction) { + tableFunctionName = Optional.empty(); + } + + return null; + } + } } @FunctionalInterface diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryAnalyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryAnalyzer.java index 62cc1470c45d..8753f4e43389 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryAnalyzer.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryAnalyzer.java @@ -31,7 +31,6 @@ import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.Sink; import io.confluent.ksql.serde.SerdeOption; -import io.confluent.ksql.util.AggregateExpressionRewriter; import io.confluent.ksql.util.KsqlException; import java.util.List; import java.util.Map; @@ -175,6 +174,10 @@ private static void enforceAggregateRules( return; } + if (!analysis.getTableFunctions().isEmpty()) { + throw new KsqlException("Table functions cannot be used with aggregations."); + } + if (aggregateAnalysis.getAggregateFunctions().isEmpty()) { throw new KsqlException( "GROUP BY requires columns using aggregate functions in SELECT clause."); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/QueryEngine.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/QueryEngine.java index 1cb4c63afd7a..449fd6617d38 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/QueryEngine.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/QueryEngine.java @@ -59,10 +59,30 @@ class QueryEngine { this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext"); this.processingLogContext = Objects.requireNonNull( processingLogContext, - "processingLogContext"); + "processingLogContext" + ); this.queryIdGenerator = Objects.requireNonNull(queryIdGenerator, "queryIdGenerator"); } + static OutputNode buildQueryLogicalPlan( + final Query query, + final Optional sink, + final MetaStore metaStore, + final KsqlConfig config + ) { + final String outputPrefix = config.getString(KsqlConfig.KSQL_OUTPUT_TOPIC_NAME_PREFIX_CONFIG); + + final Set defaultSerdeOptions = SerdeOptions.buildDefaults(config); + + final QueryAnalyzer queryAnalyzer = + new QueryAnalyzer(metaStore, outputPrefix, defaultSerdeOptions); + + final Analysis analysis = queryAnalyzer.analyze(query, sink); + final AggregateAnalysisResult aggAnalysis = 
queryAnalyzer.analyzeAggregate(query, analysis); + + return new LogicalPlanner(config, analysis, aggAnalysis, metaStore).buildPlan(); + } + PhysicalPlan buildPhysicalPlan( final LogicalPlanNode logicalPlanNode, final KsqlConfig ksqlConfig, @@ -84,23 +104,4 @@ PhysicalPlan buildPhysicalPlan( return physicalPlanBuilder.buildPhysicalPlan(logicalPlanNode); } - - static OutputNode buildQueryLogicalPlan( - final Query query, - final Optional sink, - final MetaStore metaStore, - final KsqlConfig config - ) { - final String outputPrefix = config.getString(KsqlConfig.KSQL_OUTPUT_TOPIC_NAME_PREFIX_CONFIG); - - final Set defaultSerdeOptions = SerdeOptions.buildDefaults(config); - - final QueryAnalyzer queryAnalyzer = - new QueryAnalyzer(metaStore, outputPrefix, defaultSerdeOptions); - - final Analysis analysis = queryAnalyzer.analyze(query, sink); - final AggregateAnalysisResult aggAnalysis = queryAnalyzer.analyzeAggregate(query, analysis); - - return new LogicalPlanner(config, analysis, aggAnalysis, metaStore).buildPlan(); - } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/BaseTableFunction.java b/ksql-engine/src/main/java/io/confluent/ksql/function/BaseTableFunction.java new file mode 100644 index 000000000000..c4e3194446f7 --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/BaseTableFunction.java @@ -0,0 +1,74 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.function; + +import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.name.FunctionName; +import io.confluent.ksql.schema.ksql.SchemaConverters; +import io.confluent.ksql.schema.ksql.SchemaConverters.ConnectToSqlTypeConverter; +import io.confluent.ksql.schema.ksql.types.SqlType; +import java.util.List; +import java.util.Objects; +import org.apache.kafka.connect.data.Schema; + +@Immutable +public abstract class BaseTableFunction implements KsqlTableFunction { + + private static final ConnectToSqlTypeConverter CONNECT_TO_SQL_CONVERTER + = SchemaConverters.connectToSqlConverter(); + + private final Schema outputSchema; + private final SqlType outputType; + private final List arguments; + + protected final FunctionName functionName; + private final String description; + + public BaseTableFunction( + final FunctionName functionName, + final Schema outputType, + final List arguments, + final String description + ) { + this.outputSchema = Objects.requireNonNull(outputType, "outputType"); + this.outputType = CONNECT_TO_SQL_CONVERTER.toSqlType(outputType); + this.arguments = Objects.requireNonNull(arguments, "arguments"); + this.functionName = Objects.requireNonNull(functionName, "functionName"); + this.description = Objects.requireNonNull(description, "description"); + } + + public FunctionName getFunctionName() { + return functionName; + } + + public Schema getReturnType() { + return outputSchema; + } + + @Override + public SqlType returnType() { + return outputType; + } + + public List getArguments() { + return arguments; + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/InternalFunctionRegistry.java b/ksql-engine/src/main/java/io/confluent/ksql/function/InternalFunctionRegistry.java index 1fedc00cbee3..262764755ca8 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/InternalFunctionRegistry.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/InternalFunctionRegistry.java @@ -34,14 +34,16 @@ import io.confluent.ksql.function.udf.string.LenKudf; import io.confluent.ksql.function.udf.string.TrimKudf; import io.confluent.ksql.function.udf.string.UCaseKudf; +import io.confluent.ksql.function.udtf.array.ExplodeFunctionFactory; import io.confluent.ksql.name.FunctionName; +import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.KsqlException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; import javax.annotation.concurrent.ThreadSafe; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; @@ -49,16 +51,16 @@ @ThreadSafe public class InternalFunctionRegistry implements MutableFunctionRegistry { - private final Object lock = new Object(); - private final Map udfs = new ConcurrentHashMap<>(); - private final Map udafs = new ConcurrentHashMap<>(); + private final Map udfs = new HashMap<>(); + private final Map udafs = new HashMap<>(); + private final Map udtfs = new HashMap<>(); private final FunctionNameValidator functionNameValidator = new FunctionNameValidator(); public InternalFunctionRegistry() { new BuiltInInitializer(this).init(); } - public UdfFactory getUdfFactory(final String functionName) { + public synchronized UdfFactory getUdfFactory(final String functionName) { final UdfFactory udfFactory = 
udfs.get(functionName.toUpperCase()); if (udfFactory == null) { throw new KsqlException("Can't find any functions with the name '" + functionName + "'"); @@ -67,7 +69,7 @@ public UdfFactory getUdfFactory(final String functionName) { } @Override - public void addFunction(final KsqlFunction ksqlFunction) { + public synchronized void addFunction(final KsqlFunction ksqlFunction) { final UdfFactory udfFactory = udfs.get(ksqlFunction.getFunctionName().name().toUpperCase()); if (udfFactory == null) { throw new KsqlException("Unknown function factory: " + ksqlFunction.getFunctionName()); @@ -76,34 +78,39 @@ public void addFunction(final KsqlFunction ksqlFunction) { } @Override - public UdfFactory ensureFunctionFactory(final UdfFactory factory) { + public synchronized UdfFactory ensureFunctionFactory(final UdfFactory factory) { validateFunctionName(factory.getName()); - synchronized (lock) { - final String functionName = factory.getName().toUpperCase(); - if (udafs.containsKey(functionName)) { - throw new KsqlException("UdfFactory already registered as aggregate: " + functionName); - } - - final UdfFactory existing = udfs.putIfAbsent(functionName, factory); - if (existing != null && !existing.matches(factory)) { - throw new KsqlException("UdfFactory not compatible with existing factory." - + " function: " + functionName - + " existing: " + existing - + ", factory: " + factory); - } - - return existing == null ? factory : existing; + final String functionName = factory.getName().toUpperCase(); + if (udafs.containsKey(functionName)) { + throw new KsqlException("UdfFactory already registered as aggregate: " + functionName); } + if (udtfs.containsKey(functionName)) { + throw new KsqlException("UdfFactory already registered as table function: " + functionName); + } + + final UdfFactory existing = udfs.putIfAbsent(functionName, factory); + if (existing != null && !existing.matches(factory)) { + throw new KsqlException("UdfFactory not compatible with existing factory." + + " function: " + functionName + + " existing: " + existing + + ", factory: " + factory); + } + + return existing == null ? 
factory : existing; } @Override - public boolean isAggregate(final String functionName) { + public synchronized boolean isAggregate(final String functionName) { return udafs.containsKey(functionName.toUpperCase()); } + public synchronized boolean isTableFunction(final String functionName) { + return udtfs.containsKey(functionName.toUpperCase()); + } + @Override - public KsqlAggregateFunction getAggregateFunction( + public synchronized KsqlAggregateFunction getAggregateFunction( final String functionName, final Schema argumentType, final AggregateFunctionInitArguments initArgs @@ -112,34 +119,76 @@ public KsqlAggregateFunction getAggregateFunction( if (udafFactory == null) { throw new KsqlException("No aggregate function with name " + functionName + " exists!"); } - return udafFactory.createAggregateFunction(Collections.singletonList(argumentType), - initArgs); + return udafFactory.createAggregateFunction( + Collections.singletonList(argumentType), + initArgs + ); } @Override - public void addAggregateFunctionFactory(final AggregateFunctionFactory aggregateFunctionFactory) { + public synchronized KsqlTableFunction getTableFunction( + final String functionName, + final Schema argumentType + ) { + final TableFunctionFactory udtfFactory = udtfs.get(functionName.toUpperCase()); + if (udtfFactory == null) { + throw new KsqlException("No table function with name " + functionName + " exists!"); + } + + return udtfFactory.createTableFunction(Collections.singletonList(argumentType)); + } + + @Override + public synchronized void addAggregateFunctionFactory( + final AggregateFunctionFactory aggregateFunctionFactory) { final String functionName = aggregateFunctionFactory.getName().toUpperCase(); validateFunctionName(functionName); - synchronized (lock) { - if (udfs.containsKey(functionName)) { - throw new KsqlException( - "Aggregate function already registered as non-aggregate: " + functionName); - } + if (udfs.containsKey(functionName)) { + throw new KsqlException( + "Aggregate function already registered as non-aggregate: " + functionName); + } - if (udafs.putIfAbsent(functionName, aggregateFunctionFactory) != null) { - throw new KsqlException("Aggregate function already registered: " + functionName); - } + if (udtfs.containsKey(functionName)) { + throw new KsqlException( + "Aggregate function already registered as table function: " + functionName); } + + if (udafs.putIfAbsent(functionName, aggregateFunctionFactory) != null) { + throw new KsqlException("Aggregate function already registered: " + functionName); + } + + } + + @Override + public synchronized void addTableFunctionFactory( + final TableFunctionFactory tableFunctionFactory) { + final String functionName = tableFunctionFactory.getName().toUpperCase(); + validateFunctionName(functionName); + + if (udfs.containsKey(functionName)) { + throw new KsqlException( + "Table function already registered as non-aggregate: " + functionName); + } + + if (udafs.containsKey(functionName)) { + throw new KsqlException( + "Table function already registered as aggregate: " + functionName); + } + + if (udtfs.putIfAbsent(functionName, tableFunctionFactory) != null) { + throw new KsqlException("Table function already registered: " + functionName); + } + } @Override - public List listFunctions() { + public synchronized List listFunctions() { return new ArrayList<>(udfs.values()); } @Override - public AggregateFunctionFactory getAggregateFactory(final String functionName) { + public synchronized AggregateFunctionFactory getAggregateFactory(final String 
functionName) { final AggregateFunctionFactory udafFactory = udafs.get(functionName.toUpperCase()); if (udafFactory == null) { throw new KsqlException( @@ -150,7 +199,7 @@ public AggregateFunctionFactory getAggregateFactory(final String functionName) { } @Override - public List listAggregateFunctions() { + public synchronized List listAggregateFunctions() { return new ArrayList<>(udafs.values()); } @@ -174,12 +223,29 @@ private BuiltInInitializer( this.functionRegistry = Objects.requireNonNull(functionRegistry, "functionRegistry"); } + private static UdfFactory builtInUdfFactory( + final KsqlFunction ksqlFunction, + final boolean internal + ) { + final UdfMetadata metadata = new UdfMetadata( + ksqlFunction.getFunctionName().name(), + ksqlFunction.getDescription(), + KsqlConstants.CONFLUENT_AUTHOR, + "", + KsqlFunction.INTERNAL_PATH, + internal + ); + + return new UdfFactory(ksqlFunction.getKudfClass(), metadata); + } + private void init() { addStringFunctions(); addMathFunctions(); addJsonFunctions(); addStructFieldFetcher(); addUdafFunctions(); + addUdtfFunctions(); } private void addStringFunctions() { @@ -187,34 +253,42 @@ private void addStringFunctions() { addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( Schema.OPTIONAL_STRING_SCHEMA, Collections.singletonList(Schema.OPTIONAL_STRING_SCHEMA), - FunctionName.of("LCASE"), LCaseKudf.class)); + FunctionName.of("LCASE"), LCaseKudf.class + )); addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( Schema.OPTIONAL_STRING_SCHEMA, Collections.singletonList(Schema.OPTIONAL_STRING_SCHEMA), - FunctionName.of("UCASE"), UCaseKudf.class)); + FunctionName.of("UCASE"), UCaseKudf.class + )); addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( Schema.OPTIONAL_STRING_SCHEMA, ImmutableList.of(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA), - FunctionName.of(ConcatKudf.NAME), ConcatKudf.class)); + FunctionName.of(ConcatKudf.NAME), ConcatKudf.class + )); addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( Schema.OPTIONAL_STRING_SCHEMA, Collections.singletonList(Schema.OPTIONAL_STRING_SCHEMA), - FunctionName.of("TRIM"), TrimKudf.class)); + FunctionName.of("TRIM"), TrimKudf.class + )); addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( Schema.OPTIONAL_STRING_SCHEMA, - ImmutableList.of(Schema.OPTIONAL_STRING_SCHEMA, - Schema.OPTIONAL_STRING_SCHEMA), - FunctionName.of("IFNULL"), IfNullKudf.class)); + ImmutableList.of( + Schema.OPTIONAL_STRING_SCHEMA, + Schema.OPTIONAL_STRING_SCHEMA + ), + FunctionName.of("IFNULL"), IfNullKudf.class + )); addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( Schema.OPTIONAL_INT32_SCHEMA, Collections.singletonList(Schema.OPTIONAL_STRING_SCHEMA), FunctionName.of("LEN"), - LenKudf.class)); + LenKudf.class + )); } private void addMathFunctions() { @@ -223,13 +297,15 @@ private void addMathFunctions() { Schema.OPTIONAL_FLOAT64_SCHEMA, Collections.singletonList(Schema.OPTIONAL_FLOAT64_SCHEMA), FunctionName.of("CEIL"), - CeilKudf.class)); + CeilKudf.class + )); addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( Schema.OPTIONAL_FLOAT64_SCHEMA, Collections.emptyList(), FunctionName.of("RANDOM"), - RandomKudf.class)); + RandomKudf.class + )); } private void addJsonFunctions() { @@ -238,57 +314,71 @@ private void addJsonFunctions() { Schema.OPTIONAL_STRING_SCHEMA, ImmutableList.of(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA), JsonExtractStringKudf.FUNCTION_NAME, - JsonExtractStringKudf.class)); + JsonExtractStringKudf.class + )); addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( 
Schema.OPTIONAL_BOOLEAN_SCHEMA, ImmutableList.of(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA), FunctionName.of("ARRAYCONTAINS"), - ArrayContainsKudf.class)); + ArrayContainsKudf.class + )); addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( Schema.OPTIONAL_BOOLEAN_SCHEMA, ImmutableList.of( SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build(), - Schema.OPTIONAL_STRING_SCHEMA), + Schema.OPTIONAL_STRING_SCHEMA + ), FunctionName.of("ARRAYCONTAINS"), - ArrayContainsKudf.class)); + ArrayContainsKudf.class + )); addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( Schema.OPTIONAL_BOOLEAN_SCHEMA, ImmutableList.of( SchemaBuilder.array(Schema.OPTIONAL_INT32_SCHEMA).optional().build(), - Schema.OPTIONAL_INT32_SCHEMA), + Schema.OPTIONAL_INT32_SCHEMA + ), FunctionName.of("ARRAYCONTAINS"), - ArrayContainsKudf.class)); + ArrayContainsKudf.class + )); addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( Schema.OPTIONAL_BOOLEAN_SCHEMA, ImmutableList.of( SchemaBuilder.array(Schema.OPTIONAL_INT64_SCHEMA).optional().build(), - Schema.OPTIONAL_INT64_SCHEMA), + Schema.OPTIONAL_INT64_SCHEMA + ), FunctionName.of("ARRAYCONTAINS"), - ArrayContainsKudf.class)); + ArrayContainsKudf.class + )); addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( Schema.OPTIONAL_BOOLEAN_SCHEMA, ImmutableList.of( SchemaBuilder.array(Schema.OPTIONAL_FLOAT64_SCHEMA).optional().build(), - Schema.OPTIONAL_FLOAT64_SCHEMA), + Schema.OPTIONAL_FLOAT64_SCHEMA + ), FunctionName.of("ARRAYCONTAINS"), - ArrayContainsKudf.class)); + ArrayContainsKudf.class + )); } private void addStructFieldFetcher() { - addBuiltInFunction(KsqlFunction.createLegacyBuiltIn( - SchemaBuilder.struct().optional().build(), - ImmutableList.of( + addBuiltInFunction( + KsqlFunction.createLegacyBuiltIn( SchemaBuilder.struct().optional().build(), - Schema.STRING_SCHEMA), - FetchFieldFromStruct.FUNCTION_NAME, - FetchFieldFromStruct.class), - true); + ImmutableList.of( + SchemaBuilder.struct().optional().build(), + Schema.STRING_SCHEMA + ), + FetchFieldFromStruct.FUNCTION_NAME, + FetchFieldFromStruct.class + ), + true + ); } private void addUdafFunctions() { @@ -303,6 +393,10 @@ private void addUdafFunctions() { functionRegistry.addAggregateFunctionFactory(new TopkDistinctAggFunctionFactory()); } + private void addUdtfFunctions() { + functionRegistry.addTableFunctionFactory(new ExplodeFunctionFactory()); + } + private void addBuiltInFunction(final KsqlFunction ksqlFunction) { addBuiltInFunction(ksqlFunction, false); } @@ -312,20 +406,5 @@ private void addBuiltInFunction(final KsqlFunction ksqlFunction, final boolean i .ensureFunctionFactory(builtInUdfFactory(ksqlFunction, internal)) .addFunction(ksqlFunction); } - - private static UdfFactory builtInUdfFactory( - final KsqlFunction ksqlFunction, - final boolean internal - ) { - final UdfMetadata metadata = new UdfMetadata( - ksqlFunction.getFunctionName().name(), - ksqlFunction.getDescription(), - "Confluent", - "", - KsqlFunction.INTERNAL_PATH, - internal); - - return new UdfFactory(ksqlFunction.getKudfClass(), metadata); - } } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java b/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java index 6ffe15aa196d..0d2a57f846f4 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java @@ -127,7 +127,7 @@ public void load() { // Does not handle customer udfs, i.e the loader is the ParentClassLoader and path is 
internal void loadUdfFromClass(final Class... udfClass) { - for (final Class theClass: udfClass) { + for (final Class theClass : udfClass) { //classes must be annotated with @UdfDescription final UdfDescription udfDescription = theClass.getAnnotation(UdfDescription.class); if (udfDescription == null) { @@ -135,7 +135,7 @@ void loadUdfFromClass(final Class... udfClass) { + "be annotated with @UdfDescription.", theClass.getName())); } //method must be public and annotated with @Udf - for (Method m: theClass.getDeclaredMethods()) { + for (Method m : theClass.getDeclaredMethods()) { if (m.isAnnotationPresent(Udf.class) && Modifier.isPublic(m.getModifiers())) { processUdfAnnotation( theClass, @@ -342,7 +342,7 @@ private void addFunction(final Class theClass, ksqlConfig -> { final Object actualUdf = instantiateUdfClass(method, classLevelAnnotation); if (actualUdf instanceof Configurable) { - ((Configurable)actualUdf) + ((Configurable) actualUdf) .configure(ksqlConfig.getKsqlFunctionsConfigProps(functionName)); } final PluggableUdf theUdf = new PluggableUdf(udf, actualUdf); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowEndKudaf.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowEndKudaf.java index 482abe7d8d8f..dd561764b4a5 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowEndKudaf.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowEndKudaf.java @@ -20,6 +20,7 @@ import io.confluent.ksql.function.udaf.UdafDescription; import io.confluent.ksql.function.udaf.UdafFactory; import io.confluent.ksql.function.udaf.placeholder.PlaceholderTableUdaf; +import io.confluent.ksql.util.KsqlConstants; /** * A placeholder KUDAF for extracting window end times. @@ -29,7 +30,7 @@ * @see WindowSelectMapper */ @SuppressWarnings("WeakerAccess") // Invoked via reflection. -@UdafDescription(name = "WindowEnd", author = "Confluent", +@UdafDescription(name = "WindowEnd", author = KsqlConstants.CONFLUENT_AUTHOR, description = "Returns the window end time, in milliseconds, for the given record. " + "If the given record is not part of a window the function will return NULL.") public final class WindowEndKudaf { diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowStartKudaf.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowStartKudaf.java index c8765548373b..5f81a0d048b4 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowStartKudaf.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowStartKudaf.java @@ -20,6 +20,7 @@ import io.confluent.ksql.function.udaf.UdafDescription; import io.confluent.ksql.function.udaf.UdafFactory; import io.confluent.ksql.function.udaf.placeholder.PlaceholderTableUdaf; +import io.confluent.ksql.util.KsqlConstants; /** * A placeholder KUDAF for extracting window start times. @@ -29,7 +30,7 @@ * @see WindowSelectMapper */ @SuppressWarnings("WeakerAccess") // Invoked via reflection. -@UdafDescription(name = "WindowStart", author = "Confluent", +@UdafDescription(name = "WindowStart", author = KsqlConstants.CONFLUENT_AUTHOR, description = "Returns the window start time, in milliseconds, for the given record. 
" + "If the given record is not part of a window the function will return NULL.") public final class WindowStartKudaf { diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udtf/array/ExplodeFunctionFactory.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udtf/array/ExplodeFunctionFactory.java new file mode 100644 index 000000000000..028b7f0a0f1c --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udtf/array/ExplodeFunctionFactory.java @@ -0,0 +1,61 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udtf.array; + +import io.confluent.ksql.function.KsqlTableFunction; +import io.confluent.ksql.function.TableFunctionFactory; +import io.confluent.ksql.function.udf.UdfMetadata; +import io.confluent.ksql.name.FunctionName; +import io.confluent.ksql.util.KsqlConstants; +import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.Version; +import java.util.List; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Schema.Type; + +public class ExplodeFunctionFactory extends TableFunctionFactory { + + private static final FunctionName NAME = FunctionName.of("EXPLODE"); + + public ExplodeFunctionFactory() { + super(new UdfMetadata(NAME.name(), + "Explodes an array into zero or more rows", + KsqlConstants.CONFLUENT_AUTHOR, + Version.getVersion(), + "", + false)); + } + + @SuppressWarnings("unchecked") + @Override + public KsqlTableFunction createTableFunction(final List argTypeList) { + if (argTypeList.size() != 1) { + throw new KsqlException("EXPLODE function should have one arguments."); + } + + final Schema schema = argTypeList.get(0); + if (schema.type() == Type.ARRAY) { + return new ExplodeTableFunction(NAME, schema.valueSchema(), argTypeList, + "Explodes an array"); + } + throw new KsqlException("Unsupported argument type for EXPLODE " + schema); + } + + @Override + public List> supportedArgs() { + return null; + } +} \ No newline at end of file diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udtf/array/ExplodeTableFunction.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udtf/array/ExplodeTableFunction.java new file mode 100644 index 000000000000..05d223b373da --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udtf/array/ExplodeTableFunction.java @@ -0,0 +1,39 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.function.udtf.array; + +import io.confluent.ksql.function.BaseTableFunction; +import io.confluent.ksql.name.FunctionName; +import java.util.Collections; +import java.util.List; +import org.apache.kafka.connect.data.Schema; + +public class ExplodeTableFunction extends BaseTableFunction, T> { + + public ExplodeTableFunction( + final FunctionName functionName, + final Schema outputType, + final List arguments, + final String description) { + super(functionName, outputType, arguments, description); + } + + @Override + public List flatMap(final List input) { + return input == null ? Collections.emptyList() : input; + } + +} diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java index 352356cae1a9..633b7003b3a5 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java @@ -30,6 +30,7 @@ import io.confluent.ksql.planner.plan.AggregateNode; import io.confluent.ksql.planner.plan.DataSourceNode; import io.confluent.ksql.planner.plan.FilterNode; +import io.confluent.ksql.planner.plan.FlatMapNode; import io.confluent.ksql.planner.plan.JoinNode; import io.confluent.ksql.planner.plan.KsqlBareOutputNode; import io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode; @@ -76,6 +77,10 @@ public LogicalPlanner( public OutputNode buildPlan() { PlanNode currentNode = buildSourceNode(); + if (!analysis.getTableFunctions().isEmpty()) { + currentNode = buildFlatMapNode(currentNode); + } + if (analysis.getWhereExpression().isPresent()) { currentNode = buildFilterNode(currentNode, analysis.getWhereExpression().get()); } @@ -177,7 +182,8 @@ private AggregateNode buildAggregateNode(final PlanNode sourcePlanNode) { final Optional keyFieldName = getSelectAliasMatching((expression, alias) -> expression.equals(groupBy) && !SchemaUtil.isFieldName(alias.name(), SchemaUtil.ROWTIME_NAME.name()) - && !SchemaUtil.isFieldName(alias.name(), SchemaUtil.ROWKEY_NAME.name())); + && !SchemaUtil.isFieldName(alias.name(), SchemaUtil.ROWKEY_NAME.name()), + sourcePlanNode); return new AggregateNode( new PlanNodeId("Aggregate"), @@ -204,15 +210,15 @@ private ProjectNode buildProjectNode(final PlanNode sourcePlanNode) { final Optional keyFieldName = getSelectAliasMatching((expression, alias) -> expression instanceof ColumnReferenceExp - && ((ColumnReferenceExp) expression).getReference().equals(sourceKeyFieldName) + && ((ColumnReferenceExp) expression).getReference().equals(sourceKeyFieldName), + sourcePlanNode ); return new ProjectNode( new PlanNodeId("Project"), sourcePlanNode, schema, - keyFieldName.map(ColumnRef::withoutSource), - analysis.getSelectExpressions() + keyFieldName.map(ColumnRef::withoutSource) ); } @@ -223,6 +229,10 @@ private static FilterNode buildFilterNode( return new FilterNode(new PlanNodeId("Filter"), sourcePlanNode, filterExpression); } + private FlatMapNode buildFlatMapNode(final PlanNode sourcePlanNode) { + return new FlatMapNode(new PlanNodeId("FlatMap"), sourcePlanNode, functionRegistry, analysis); + } + private PlanNode buildSourceNode() { final List sources = analysis.getFromDataSources(); @@ -242,17 +252,20 @@ private PlanNode buildSourceNode() { final DataSourceNode leftSourceNode = new DataSourceNode( new PlanNodeId("KafkaTopic_Left"), left.getDataSource(), - left.getAlias() + left.getAlias(), + analysis.getSelectExpressions() ); final DataSourceNode 
rightSourceNode = new DataSourceNode( new PlanNodeId("KafkaTopic_Right"), right.getDataSource(), - right.getAlias() + right.getAlias(), + analysis.getSelectExpressions() ); return new JoinNode( new PlanNodeId("Join"), + analysis.getSelectExpressions(), joinInfo.get().getType(), leftSourceNode, rightSourceNode, @@ -271,15 +284,17 @@ private DataSourceNode buildNonJoinNode(final List sources) { return new DataSourceNode( new PlanNodeId("KsqlTopic"), dataSource.getDataSource(), - dataSource.getAlias() + dataSource.getAlias(), + analysis.getSelectExpressions() ); } private Optional getSelectAliasMatching( - final BiFunction matcher + final BiFunction matcher, + final PlanNode sourcePlanNode ) { - for (int i = 0; i < analysis.getSelectExpressions().size(); i++) { - final SelectExpression select = analysis.getSelectExpressions().get(i); + for (int i = 0; i < sourcePlanNode.getSelectExpressions().size(); i++) { + final SelectExpression select = sourcePlanNode.getSelectExpressions().get(i); if (matcher.apply(select.getExpression(), select.getAlias())) { return Optional.of(select.getAlias()); @@ -303,8 +318,8 @@ private LogicalSchema buildProjectionSchema(final PlanNode sourcePlanNode) { builder.keyColumns(keyColumns); - for (int i = 0; i < analysis.getSelectExpressions().size(); i++) { - final SelectExpression select = analysis.getSelectExpressions().get(i); + for (int i = 0; i < sourcePlanNode.getSelectExpressions().size(); i++) { + final SelectExpression select = sourcePlanNode.getSelectExpressions().get(i); final SqlType expressionType = expressionTypeManager .getExpressionSqlType(select.getExpression()); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/PlanSourceExtractorVisitor.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/PlanSourceExtractorVisitor.java index a5a2c3c500fc..e8252d18b81e 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/PlanSourceExtractorVisitor.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/PlanSourceExtractorVisitor.java @@ -19,6 +19,7 @@ import io.confluent.ksql.planner.plan.AggregateNode; import io.confluent.ksql.planner.plan.DataSourceNode; import io.confluent.ksql.planner.plan.FilterNode; +import io.confluent.ksql.planner.plan.FlatMapNode; import io.confluent.ksql.planner.plan.JoinNode; import io.confluent.ksql.planner.plan.OutputNode; import io.confluent.ksql.planner.plan.PlanNode; @@ -49,7 +50,7 @@ protected R visitFilter(final FilterNode node, final C context) { } protected R visitProject(final ProjectNode node, final C context) { - return process(node.getSources().get(0), context); + return process(node.getSource(), context); } protected R visitDataSourceNode(final DataSourceNode node, final C context) { @@ -73,6 +74,12 @@ protected R visitOutput(final OutputNode node, final C context) { return null; } + @Override + protected R visitFlatMap(final FlatMapNode node, final C context) { + process(node.getSources().get(0), context); + return null; + } + public Set getSourceNames() { return sourceNames; } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java index 428ce3760802..6f5a43b470ee 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java @@ -176,6 +176,11 @@ private List getFinalSelectExpressions() { return finalSelectExpressionList; } + @Override + public List 
getSelectExpressions() { + return Collections.emptyList(); + } + @Override public R accept(final PlanVisitor visitor, final C context) { return visitor.visitAggregate(this, context); @@ -302,7 +307,7 @@ private LogicalSchema buildLogicalSchema( for (int i = 0; i < aggregations.size(); i++) { final KsqlAggregateFunction aggregateFunction = UdafUtil.resolveAggregateFunction(functionRegistry, aggregations.get(i), inputSchema); - final ColumnName colName = ColumnName.aggregate(i); + final ColumnName colName = ColumnName.aggregateColumn(i); final SqlType fieldType = converter.toSqlType( useAggregate ? aggregateFunction.getAggregateType() : aggregateFunction.getReturnType() ); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/DataSourceNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/DataSourceNode.java index 50754440d4f8..c86f9c3b7f8e 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/DataSourceNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/DataSourceNode.java @@ -22,6 +22,7 @@ import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.execution.context.QueryContext.Stacker; import io.confluent.ksql.execution.plan.LogicalSchemaWithMetaAndKeyFields; +import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.execution.plan.StreamSource; import io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.metastore.model.DataSource.DataSourceType; @@ -51,24 +52,29 @@ public class DataSourceNode extends PlanNode { private final LogicalSchemaWithMetaAndKeyFields schema; private final KeyField keyField; private final SchemaKStreamFactory schemaKStreamFactory; + private final List selectExpressions; public DataSourceNode( final PlanNodeId id, final DataSource dataSource, - final SourceName alias + final SourceName alias, + final List selectExpressions ) { - this(id, dataSource, alias, SchemaKStream::forSource); + this(id, dataSource, alias, selectExpressions, SchemaKStream::forSource); } DataSourceNode( final PlanNodeId id, final DataSource dataSource, final SourceName alias, + final List selectExpressions, final SchemaKStreamFactory schemaKStreamFactory ) { super(id, dataSource.getDataSourceType()); this.dataSource = requireNonNull(dataSource, "dataSource"); this.alias = requireNonNull(alias, "alias"); + this.selectExpressions = + ImmutableList.copyOf(requireNonNull(selectExpressions, "selectExpressions")); // DataSourceNode copies implicit and key fields into the value schema // It users a KS valueMapper to add the key fields @@ -121,6 +127,11 @@ public List getSources() { return ImmutableList.of(); } + @Override + public List getSelectExpressions() { + return selectExpressions; + } + @Override public R accept(final PlanVisitor visitor, final C context) { return visitor.visitDataSourceNode(this, context); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/FilterNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/FilterNode.java index 9259b5c14bbc..c3b3524a0e25 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/FilterNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/FilterNode.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableList; import io.confluent.ksql.execution.builder.KsqlQueryBuilder; import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.metastore.model.KeyField; import 
io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.services.KafkaTopicClient; @@ -31,6 +32,7 @@ public class FilterNode extends PlanNode { private final PlanNode source; private final Expression predicate; + private final List selectExpressions; public FilterNode( final PlanNodeId id, @@ -41,6 +43,7 @@ public FilterNode( this.source = Objects.requireNonNull(source, "source"); this.predicate = Objects.requireNonNull(predicate, "predicate"); + this.selectExpressions = ImmutableList.copyOf(source.getSelectExpressions()); } public Expression getPredicate() { @@ -62,6 +65,11 @@ public List getSources() { return ImmutableList.of(source); } + @Override + public List getSelectExpressions() { + return selectExpressions; + } + public PlanNode getSource() { return source; } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/FlatMapNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/FlatMapNode.java new file mode 100644 index 000000000000..a05a5606c2d4 --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/FlatMapNode.java @@ -0,0 +1,196 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.planner.plan; + +import com.google.common.collect.ImmutableList; +import io.confluent.ksql.analyzer.Analysis; +import io.confluent.ksql.engine.rewrite.ExpressionTreeRewriter; +import io.confluent.ksql.engine.rewrite.ExpressionTreeRewriter.Context; +import io.confluent.ksql.execution.builder.KsqlQueryBuilder; +import io.confluent.ksql.execution.context.QueryContext; +import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; +import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.execution.expression.tree.FunctionCall; +import io.confluent.ksql.execution.expression.tree.VisitParentExpressionVisitor; +import io.confluent.ksql.execution.function.UdtfUtil; +import io.confluent.ksql.execution.plan.SelectExpression; +import io.confluent.ksql.function.FunctionRegistry; +import io.confluent.ksql.function.KsqlTableFunction; +import io.confluent.ksql.metastore.model.KeyField; +import io.confluent.ksql.name.ColumnName; +import io.confluent.ksql.schema.ksql.Column; +import io.confluent.ksql.schema.ksql.ColumnRef; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.SchemaConverters; +import io.confluent.ksql.schema.ksql.SchemaConverters.ConnectToSqlTypeConverter; +import io.confluent.ksql.schema.ksql.types.SqlType; +import io.confluent.ksql.services.KafkaTopicClient; +import io.confluent.ksql.structured.SchemaKStream; +import io.confluent.ksql.util.KsqlException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import javax.annotation.concurrent.Immutable; + +/** + * A node in the logical plan which represents a flat map operation - transforming a single row into + * zero or more rows. 
+ */ +@Immutable +public class FlatMapNode extends PlanNode { + + private final PlanNode source; + private final LogicalSchema outputSchema; + private final List finalSelectExpressions; + private final Analysis analysis; + private final FunctionRegistry functionRegistry; + + public FlatMapNode( + final PlanNodeId id, + final PlanNode source, + final FunctionRegistry functionRegistry, + final Analysis analysis + ) { + super(id, source.getNodeOutputType()); + this.source = Objects.requireNonNull(source, "source"); + this.analysis = Objects.requireNonNull(analysis); + this.functionRegistry = functionRegistry; + this.finalSelectExpressions = buildFinalSelectExpressions(); + outputSchema = buildLogicalSchema(source.getSchema()); + if (analysis.getTableFunctions().size() > 1) { + throw new KsqlException("Only one table function per query currently is supported"); + } + } + + @Override + public LogicalSchema getSchema() { + return outputSchema; + } + + @Override + public KeyField getKeyField() { + return source.getKeyField(); + } + + @Override + public List getSources() { + return ImmutableList.of(source); + } + + public PlanNode getSource() { + return source; + } + + @Override + public List getSelectExpressions() { + return finalSelectExpressions; + } + + @Override + public R accept(final PlanVisitor visitor, final C context) { + return visitor.visitFlatMap(this, context); + } + + @Override + protected int getPartitions(final KafkaTopicClient kafkaTopicClient) { + return source.getPartitions(kafkaTopicClient); + } + + @Override + public SchemaKStream buildStream(final KsqlQueryBuilder builder) { + + final QueryContext.Stacker contextStacker = builder.buildNodeContext(getId().toString()); + + return getSource().buildStream(builder).flatMap( + outputSchema, + analysis.getTableFunctions().get(0), + contextStacker + ); + } + + private LogicalSchema buildLogicalSchema(final LogicalSchema inputSchema) { + final LogicalSchema.Builder schemaBuilder = LogicalSchema.builder(); + final List cols = inputSchema.value(); + + // We copy all the original columns to the output schema + schemaBuilder.keyColumns(inputSchema.key()); + for (Column col : cols) { + schemaBuilder.valueColumn(col); + } + + final ConnectToSqlTypeConverter converter = SchemaConverters.connectToSqlConverter(); + + // And add new columns representing the exploded values at the end + for (int i = 0; i < analysis.getTableFunctions().size(); i++) { + final KsqlTableFunction tableFunction = + UdtfUtil.resolveTableFunction(functionRegistry, + analysis.getTableFunctions().get(i), inputSchema + ); + final ColumnName colName = ColumnName.synthesisedSchemaColumn(i); + final SqlType fieldType = converter.toSqlType(tableFunction.getReturnType()); + schemaBuilder.valueColumn(colName, fieldType); + } + + return schemaBuilder.build(); + } + + private List buildFinalSelectExpressions() { + final TableFunctionExpressionRewriter tableFunctionExpressionRewriter = + new TableFunctionExpressionRewriter(); + final List selectExpressions = new ArrayList<>(); + for (final SelectExpression select : analysis.getSelectExpressions()) { + final Expression exp = select.getExpression(); + selectExpressions.add( + SelectExpression.of( + select.getAlias(), + ExpressionTreeRewriter.rewriteWith( + tableFunctionExpressionRewriter::process, exp) + )); + } + return selectExpressions; + } + + private class TableFunctionExpressionRewriter + extends VisitParentExpressionVisitor, Context> { + + private int variableIndex = 0; + + TableFunctionExpressionRewriter() { + 
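+      // Empty default result means expressions are left as-is; only table function calls are rewritten in visitFunctionCall below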
super(Optional.empty()); + } + + @Override + public Optional visitFunctionCall( + final FunctionCall node, + final Context context + ) { + final String functionName = node.getName().name(); + if (functionRegistry.isTableFunction(functionName)) { + final ColumnName varName = ColumnName.synthesisedSchemaColumn(variableIndex); + variableIndex++; + return Optional.of( + new ColumnReferenceExp(node.getLocation(), ColumnRef.of(Optional.empty(), varName))); + } else { + final List arguments = new ArrayList<>(); + for (final Expression argExpression : node.getArguments()) { + arguments.add(context.process(argExpression)); + } + return Optional.of(new FunctionCall(node.getLocation(), node.getName(), arguments)); + } + } + } +} diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java index 6723aac5618a..b873fb0244f5 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java @@ -19,6 +19,7 @@ import io.confluent.ksql.execution.builder.KsqlQueryBuilder; import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.execution.context.QueryContext.Stacker; +import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.metastore.model.DataSource.DataSourceType; import io.confluent.ksql.metastore.model.KeyField; import io.confluent.ksql.metastore.model.KeyField.LegacyField; @@ -59,9 +60,11 @@ public enum JoinType { private final ColumnRef rightJoinFieldName; private final KeyField keyField; private final Optional withinExpression; + private final List selectExpressions; public JoinNode( final PlanNodeId id, + final List selectExpressions, final JoinType joinType, final DataSourceNode left, final DataSourceNode right, @@ -76,6 +79,7 @@ public JoinNode( this.leftJoinFieldName = Objects.requireNonNull(leftJoinFieldName, "leftJoinFieldName"); this.rightJoinFieldName = Objects.requireNonNull(rightJoinFieldName, "rightJoinFieldName"); this.withinExpression = Objects.requireNonNull(withinExpression, "withinExpression"); + this.selectExpressions = Objects.requireNonNull(selectExpressions, "selectExpressions"); final Column leftKeyCol = validateSchemaColumn(leftJoinFieldName, left.getSchema()); validateSchemaColumn(rightJoinFieldName, right.getSchema()); @@ -107,6 +111,11 @@ public R accept(final PlanVisitor visitor, final C context) { return visitor.visitJoin(this, context); } + @Override + public List getSelectExpressions() { + return selectExpressions; + } + public DataSourceNode getLeft() { return left; } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/OutputNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/OutputNode.java index 7089f93b1bc0..effb3fee0570 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/OutputNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/OutputNode.java @@ -18,11 +18,13 @@ import static java.util.Objects.requireNonNull; import com.google.common.collect.ImmutableList; +import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.query.id.QueryIdGenerator; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy; +import java.util.Collections; import java.util.List; import java.util.OptionalInt; 
import javax.annotation.concurrent.Immutable; @@ -76,6 +78,11 @@ protected int getPartitions(final KafkaTopicClient kafkaTopicClient) { return source.getPartitions(kafkaTopicClient); } + @Override + public List getSelectExpressions() { + return Collections.emptyList(); + } + @Override public R accept(final PlanVisitor visitor, final C context) { return visitor.visitOutput(this, context); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/PlanNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/PlanNode.java index 9f83890f7923..7f1469da3525 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/PlanNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/PlanNode.java @@ -18,6 +18,7 @@ import static java.util.Objects.requireNonNull; import io.confluent.ksql.execution.builder.KsqlQueryBuilder; +import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.metastore.model.DataSource.DataSourceType; import io.confluent.ksql.metastore.model.KeyField; import io.confluent.ksql.schema.ksql.LogicalSchema; @@ -51,6 +52,8 @@ public DataSourceType getNodeOutputType() { public abstract KeyField getKeyField(); public abstract List getSources(); + + public abstract List getSelectExpressions(); public R accept(final PlanVisitor visitor, final C context) { return visitor.visitPlan(this, context); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/PlanVisitor.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/PlanVisitor.java index a6c34516486e..97365e776469 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/PlanVisitor.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/PlanVisitor.java @@ -45,4 +45,8 @@ protected R visitOutput(final OutputNode node, final C context) { return visitPlan(node, context); } + protected R visitFlatMap(final FlatMapNode node, final C context) { + return visitPlan(node, context); + } + } \ No newline at end of file diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ProjectNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ProjectNode.java index bcbe0de308ae..7626ffc4730d 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ProjectNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ProjectNode.java @@ -43,15 +43,13 @@ public ProjectNode( final PlanNodeId id, final PlanNode source, final LogicalSchema schema, - final Optional keyFieldName, - final List projectExpressions + final Optional keyFieldName ) { super(id, source.getNodeOutputType()); this.source = requireNonNull(source, "source"); this.schema = requireNonNull(schema, "schema"); - this.projectExpressions = ImmutableList - .copyOf(requireNonNull(projectExpressions, "projectExpressions")); + this.projectExpressions = ImmutableList.copyOf(source.getSelectExpressions()); this.keyField = KeyField.of( requireNonNull(keyFieldName, "keyFieldName"), source.getKeyField().legacy()) @@ -89,7 +87,8 @@ public KeyField getKeyField() { return keyField; } - public List getProjectSelectExpressions() { + @Override + public List getSelectExpressions() { return projectExpressions; } @@ -102,7 +101,7 @@ public R accept(final PlanVisitor visitor, final C context) { public SchemaKStream buildStream(final KsqlQueryBuilder builder) { return getSource().buildStream(builder) .select( - getProjectSelectExpressions(), + getSelectExpressions(), builder.buildNodeContext(getId().toString()), builder ); diff --git 
a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java index 356c1f1a8f96..f6261d038461 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java @@ -28,6 +28,9 @@ import io.confluent.ksql.execution.ddl.commands.KsqlTopic; import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.execution.expression.tree.FunctionCall; +import io.confluent.ksql.execution.function.UdtfUtil; +import io.confluent.ksql.execution.function.udtf.TableFunctionApplier; import io.confluent.ksql.execution.plan.AbstractStreamSource; import io.confluent.ksql.execution.plan.ExecutionStep; import io.confluent.ksql.execution.plan.ExecutionStepProperties; @@ -37,6 +40,7 @@ import io.confluent.ksql.execution.plan.LogicalSchemaWithMetaAndKeyFields; import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.execution.plan.StreamFilter; +import io.confluent.ksql.execution.plan.StreamFlatMap; import io.confluent.ksql.execution.plan.StreamGroupBy; import io.confluent.ksql.execution.plan.StreamGroupByKey; import io.confluent.ksql.execution.plan.StreamMapValues; @@ -49,6 +53,7 @@ import io.confluent.ksql.execution.plan.WindowedStreamSource; import io.confluent.ksql.execution.streams.ExecutionStepFactory; import io.confluent.ksql.function.FunctionRegistry; +import io.confluent.ksql.function.KsqlTableFunction; import io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.metastore.model.KeyField; import io.confluent.ksql.metastore.model.KeyField.LegacyField; @@ -69,6 +74,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.OptionalInt; import java.util.Set; import java.util.stream.Collectors; import org.apache.kafka.connect.data.Struct; @@ -535,8 +541,8 @@ public SchemaKStream selectKey( final LegacyField proposedLegacy = LegacyField.of(proposedKey.ref(), proposedKey.type()); final KeyField resultantKeyField = isRowKey(columnRef) - ? keyField.withLegacy(proposedLegacy) - : KeyField.of(columnRef, proposedLegacy); + ? 
keyField.withLegacy(proposedLegacy) + : KeyField.of(columnRef, proposedLegacy); final boolean namesMatch = existingKey .map(kf -> kf.matches(proposedKey.ref())) @@ -678,6 +684,41 @@ private SchemaKGroupedStream groupByKey( ); } + public SchemaKStream flatMap( + final LogicalSchema outputSchema, + final FunctionCall functionCall, + final QueryContext.Stacker contextStacker + ) { + final ColumnReferenceExp exp = (ColumnReferenceExp)functionCall.getArguments().get(0); + final ColumnName columnName = exp.getReference().name(); + final ColumnRef ref = ColumnRef.withoutSource(columnName); + final OptionalInt indexInInput = getSchema().valueColumnIndex(ref); + if (!indexInInput.isPresent()) { + throw new IllegalArgumentException("Can't find input column " + columnName); + } + final KsqlTableFunction tableFunction = UdtfUtil.resolveTableFunction( + functionRegistry, + functionCall, + getSchema() + ); + final TableFunctionApplier functionHolder = + new TableFunctionApplier(tableFunction, indexInInput.getAsInt()); + final StreamFlatMap step = ExecutionStepFactory.streamFlatMap( + contextStacker, + sourceStep, + outputSchema, + functionHolder + ); + return new SchemaKStream( + step, + keyFormat, + keyField, + sourceSchemaKStreams, + type, + ksqlConfig, + functionRegistry); + } + public ExecutionStep getSourceStep() { return sourceStep; } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerFunctionalTest.java b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerFunctionalTest.java index 9c5ea40c9fbe..31ee0db8357d 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerFunctionalTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerFunctionalTest.java @@ -28,19 +28,18 @@ import io.confluent.ksql.analyzer.Analysis.AliasedDataSource; import io.confluent.ksql.analyzer.Analysis.Into; +import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; import io.confluent.ksql.execution.expression.tree.ComparisonExpression; import io.confluent.ksql.execution.expression.tree.Expression; import io.confluent.ksql.execution.expression.tree.IntegerLiteral; import io.confluent.ksql.execution.plan.SelectExpression; -import io.confluent.ksql.name.ColumnName; -import io.confluent.ksql.name.SourceName; -import io.confluent.ksql.schema.ksql.ColumnRef; -import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; import io.confluent.ksql.function.InternalFunctionRegistry; import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.metastore.model.KsqlStream; import io.confluent.ksql.metastore.model.KsqlTable; +import io.confluent.ksql.name.ColumnName; +import io.confluent.ksql.name.SourceName; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; import io.confluent.ksql.parser.KsqlParserTestUtil; import io.confluent.ksql.parser.tree.CreateStreamAsSelect; @@ -48,6 +47,7 @@ import io.confluent.ksql.parser.tree.InsertInto; import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.Sink; +import io.confluent.ksql.schema.ksql.ColumnRef; import io.confluent.ksql.serde.Format; import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.util.KsqlException; @@ -192,6 +192,20 @@ public void shouldCreateAnalysisForInsertInto() { assertThat(into.getKsqlTopic(), is(test0.getKsqlTopic())); } + @Test + public void shouldAnalyseTableFunctions() { + + // Given: + final Query query = givenQuery("SELECT ID, EXPLODE(ARR1) FROM 
SENSOR_READINGS;"); + + // When: + final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); + + // Then: + assertThat(analysis.getTableFunctions(), hasSize(1)); + assertThat(analysis.getTableFunctions().get(0).getName().name(), equalTo("EXPLODE")); + } + @Test public void shouldAnalyseWindowedAggregate() { // Given: diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/InternalFunctionRegistryTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/InternalFunctionRegistryTest.java index 213711f868d4..b6160f1e11c3 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/function/InternalFunctionRegistryTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/InternalFunctionRegistryTest.java @@ -30,6 +30,7 @@ import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; import io.confluent.ksql.function.udf.Kudf; +import io.confluent.ksql.function.udf.UdfMetadata; import io.confluent.ksql.name.FunctionName; import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.util.KsqlException; @@ -233,85 +234,19 @@ public void shouldKnowIfFunctionIsAggregate() { @Test public void shouldAddAggregateFunction() { - functionRegistry.addAggregateFunctionFactory( - new AggregateFunctionFactory("my_aggregate") { - @Override - public KsqlAggregateFunction createAggregateFunction(final List argTypeList, - final AggregateFunctionInitArguments initArgs) { - return new KsqlAggregateFunction() { - - @Override - public FunctionName getFunctionName() { - return FunctionName.of("my_aggregate"); - } - - @Override - public Supplier getInitialValueSupplier() { - return null; - } - - @Override - public int getArgIndexInValue() { - return 0; - } - - @Override - public Schema getAggregateType() { - return null; - } - - @Override - public SqlType aggregateType() { - return null; - } - - @Override - public Schema getReturnType() { - return null; - } - - @Override - public SqlType returnType() { - return null; - } - - @Override - public Object aggregate(final Object currentValue, final Object aggregateValue) { - return null; - } - - @Override - public Merger getMerger() { - return null; - } - - @Override - public Function getResultMapper() { - return null; - } - - @Override - public List getArguments() { - return argTypeList; - } - - @Override - public String getDescription() { - return null; - } - }; - } - - @Override - public List> supportedArgs() { - return ImmutableList.of(); - } - }); + functionRegistry.addAggregateFunctionFactory(createAggregateFunctionFactory()); assertThat(functionRegistry.getAggregateFunction("my_aggregate", Schema.OPTIONAL_INT32_SCHEMA, AggregateFunctionInitArguments.EMPTY_ARGS), not(nullValue())); } + @Test + public void shouldAddTableFunction() { + functionRegistry.addTableFunctionFactory(createTableFunctionFactory()); + assertThat(functionRegistry.getTableFunction("my_tablefunction", + Schema.OPTIONAL_INT32_SCHEMA), not(nullValue())); + } + @Test public void shouldAddFunctionWithSameNameButDifferentReturnTypes() { // Given: @@ -419,4 +354,125 @@ public void shouldNotAllowModificationViaListFunctions() { private void givenUdfFactoryRegistered() { functionRegistry.ensureFunctionFactory(UdfLoaderUtil.createTestUdfFactory(func)); } + + private static AggregateFunctionFactory createAggregateFunctionFactory() { + return new AggregateFunctionFactory("my_aggregate") { + @Override + public KsqlAggregateFunction createAggregateFunction(final List argTypeList, + final AggregateFunctionInitArguments 
initArgs) { + return new KsqlAggregateFunction() { + + @Override + public FunctionName getFunctionName() { + return FunctionName.of("my_aggregate"); + } + + @Override + public Supplier getInitialValueSupplier() { + return null; + } + + @Override + public int getArgIndexInValue() { + return 0; + } + + @Override + public Schema getAggregateType() { + return null; + } + + @Override + public SqlType aggregateType() { + return null; + } + + @Override + public Schema getReturnType() { + return null; + } + + @Override + public SqlType returnType() { + return null; + } + + @Override + public Object aggregate(final Object currentValue, final Object aggregateValue) { + return null; + } + + @Override + public Merger getMerger() { + return null; + } + + @Override + public Function getResultMapper() { + return null; + } + + @Override + public List getArguments() { + return argTypeList; + } + + @Override + public String getDescription() { + return null; + } + }; + } + + @Override + public List> supportedArgs() { + return ImmutableList.of(); + } + }; + } + + private static TableFunctionFactory createTableFunctionFactory() { + return new TableFunctionFactory(new UdfMetadata("my_tablefunction", + "", "", "", "", false)) { + @Override + public KsqlTableFunction createTableFunction(List argTypeList) { + return new KsqlTableFunction, Object>() { + @Override + public Schema getReturnType() { + return null; + } + + @Override + public SqlType returnType() { + return null; + } + + @Override + public List flatMap(List currentValue) { + return null; + } + + @Override + public String getDescription() { + return null; + } + + @Override + public FunctionName getFunctionName() { + return null; + } + + @Override + public List getArguments() { + return null; + } + }; + } + + @Override + protected List> supportedArgs() { + return null; + } + }; + } } \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/tf/ExplodeArrayTableFunctionTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/tf/ExplodeArrayTableFunctionTest.java new file mode 100644 index 000000000000..e57056d7b18f --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/tf/ExplodeArrayTableFunctionTest.java @@ -0,0 +1,77 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.function.tf; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; + +import io.confluent.ksql.function.KsqlTableFunction; +import io.confluent.ksql.function.udtf.array.ExplodeFunctionFactory; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.junit.Before; +import org.junit.Test; + +public class ExplodeArrayTableFunctionTest { + + private ExplodeFunctionFactory factory; + + @Before + @SuppressWarnings("unchecked") + public void setUp() { + factory = new ExplodeFunctionFactory(); + } + + @Test + @SuppressWarnings("unchecked") + public void shouldCreateTableFunction() { + KsqlTableFunction tf = + (KsqlTableFunction)factory.createTableFunction(intListParamTypes()); + assertThat(tf, is(notNullValue())); + } + + @Test + @SuppressWarnings("unchecked") + public void shouldFlatMapArray() { + List input = Arrays.asList(1, 2, 3, 4, 5, 6); + KsqlTableFunction tf = createTableFunction(); + List output = tf.flatMap(input); + assertThat(input, is(output)); + } + + @Test + @SuppressWarnings("unchecked") + public void shouldFlatMapEmptyArray() { + List input = Collections.emptyList(); + KsqlTableFunction tf = createTableFunction(); + List output = tf.flatMap(input); + assertThat(input, is(output)); + } + + private static List intListParamTypes() { + Schema schema = SchemaBuilder.array(SchemaBuilder.OPTIONAL_INT32_SCHEMA).build(); + return Collections.singletonList(schema); + } + + @SuppressWarnings("unchecked") + private KsqlTableFunction, Integer> createTableFunction() { + return (KsqlTableFunction, Integer>)factory.createTableFunction(intListParamTypes()); + } +} \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/SelectValueMapperIntegrationTest.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/SelectValueMapperIntegrationTest.java index a0897560a3ac..985080fb6482 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/integration/SelectValueMapperIntegrationTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/SelectValueMapperIntegrationTest.java @@ -83,7 +83,7 @@ private SelectValueMapper givenSelectMapperFor(final String query) { final PlanNode planNode = AnalysisTestUtil.buildLogicalPlan(ksqlConfig, query, metaStore); final ProjectNode projectNode = (ProjectNode) planNode.getSources().get(0); final LogicalSchema schema = planNode.getTheSourceNode().getSchema(); - final List selectExpressions = projectNode.getProjectSelectExpressions(); + final List selectExpressions = projectNode.getSelectExpressions(); return SelectValueMapperFactory.create( selectExpressions, diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java index 65bf517c9d98..3cf68032207f 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java @@ -138,7 +138,8 @@ public class DataSourceNodeTest { private final DataSourceNode node = new DataSourceNode( PLAN_NODE_ID, SOME_SOURCE, - SOME_SOURCE.getName() + SOME_SOURCE.getName(), + Collections.emptyList() ); private final QueryId queryId = new QueryId("source-test"); @@ -274,7 +275,8 @@ public void 
shouldBuildSchemaKTableWhenKTableSource() { final DataSourceNode node = new DataSourceNode( PLAN_NODE_ID, table, - table.getName()); + table.getName(), + Collections.emptyList()); final SchemaKStream result = buildStream(node); assertThat(result.getClass(), equalTo(SchemaKTable.class)); @@ -427,7 +429,8 @@ private DataSourceNode nodeWithMockTableSource() { return new DataSourceNode( realNodeId, dataSource, - SourceName.of("t") + SourceName.of("t"), + Collections.emptyList() ); } @@ -438,6 +441,7 @@ private DataSourceNode buildNodeWithMockSource() { PLAN_NODE_ID, dataSource, SourceName.of("name"), + Collections.emptyList(), schemaKStreamFactory ); } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java index c378e9bbc8ce..5f28e7615e75 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java @@ -213,6 +213,7 @@ public void shouldThrowIfLeftKeyFieldNotInLeftSchema() { // When: new JoinNode( nodeId, + Collections.emptyList(), JoinNode.JoinType.LEFT, left, right, @@ -231,6 +232,7 @@ public void shouldThrowIfRightKeyFieldNotInRightSchema() { // When: new JoinNode( nodeId, + Collections.emptyList(), JoinNode.JoinType.LEFT, left, right, @@ -245,6 +247,7 @@ public void shouldReturnLeftJoinKeyAsKeyField() { // When: final JoinNode joinNode = new JoinNode( nodeId, + Collections.emptyList(), JoinType.LEFT, left, right, @@ -333,6 +336,7 @@ public void shouldPerformStreamToStreamLeftJoin() { final JoinNode joinNode = new JoinNode( nodeId, + Collections.emptyList(), JoinNode.JoinType.LEFT, left, right, @@ -364,6 +368,7 @@ public void shouldPerformStreamToStreamInnerJoin() { final JoinNode joinNode = new JoinNode( nodeId, + Collections.emptyList(), JoinNode.JoinType.INNER, left, right, @@ -395,6 +400,7 @@ public void shouldPerformStreamToStreamOuterJoin() { final JoinNode joinNode = new JoinNode( nodeId, + Collections.emptyList(), JoinNode.JoinType.OUTER, left, right, @@ -426,6 +432,7 @@ public void shouldNotPerformStreamStreamJoinWithoutJoinWindow() { final JoinNode joinNode = new JoinNode( nodeId, + Collections.emptyList(), JoinNode.JoinType.INNER, left, right, @@ -451,6 +458,7 @@ public void shouldNotPerformJoinIfInputPartitionsMisMatch() { final JoinNode joinNode = new JoinNode( nodeId, + Collections.emptyList(), JoinNode.JoinType.OUTER, left, right, @@ -480,6 +488,7 @@ public void shouldFailJoinIfTableCriteriaColumnIsNotKey() { final JoinNode joinNode = new JoinNode( nodeId, + Collections.emptyList(), JoinNode.JoinType.LEFT, left, right, @@ -510,6 +519,7 @@ public void shouldFailJoinIfTableHasNoKeyAndJoinFieldIsNotRowKey() { final JoinNode joinNode = new JoinNode( nodeId, + Collections.emptyList(), JoinNode.JoinType.LEFT, left, right, @@ -537,6 +547,7 @@ public void shouldHandleJoinIfTableHasNoKeyAndJoinFieldIsRowKey() { final JoinNode joinNode = new JoinNode( nodeId, + Collections.emptyList(), JoinNode.JoinType.LEFT, left, right, @@ -566,6 +577,7 @@ public void shouldPerformStreamToTableLeftJoin() { final JoinNode joinNode = new JoinNode( nodeId, + Collections.emptyList(), JoinNode.JoinType.LEFT, left, right, @@ -595,6 +607,7 @@ public void shouldPerformStreamToTableInnerJoin() { final JoinNode joinNode = new JoinNode( nodeId, + Collections.emptyList(), JoinNode.JoinType.INNER, left, right, @@ -624,6 +637,7 @@ public void shouldNotAllowStreamToTableOuterJoin() { final JoinNode 
joinNode = new JoinNode( nodeId, + Collections.emptyList(), JoinNode.JoinType.OUTER, left, right, @@ -652,6 +666,7 @@ public void shouldNotPerformStreamToTableJoinIfJoinWindowIsSpecified() { final JoinNode joinNode = new JoinNode( nodeId, + Collections.emptyList(), JoinNode.JoinType.OUTER, left, right, @@ -681,6 +696,7 @@ public void shouldFailTableTableJoinIfLeftCriteriaColumnIsNotKey() { final JoinNode joinNode = new JoinNode( nodeId, + Collections.emptyList(), JoinNode.JoinType.LEFT, left, right, @@ -714,6 +730,7 @@ public void shouldFailTableTableJoinIfRightCriteriaColumnIsNotKey() { final JoinNode joinNode = new JoinNode( nodeId, + Collections.emptyList(), JoinNode.JoinType.LEFT, left, right, @@ -744,6 +761,7 @@ public void shouldPerformTableToTableInnerJoin() { final JoinNode joinNode = new JoinNode( nodeId, + Collections.emptyList(), JoinNode.JoinType.INNER, left, right, @@ -771,6 +789,7 @@ public void shouldPerformTableToTableLeftJoin() { final JoinNode joinNode = new JoinNode( nodeId, + Collections.emptyList(), JoinNode.JoinType.LEFT, left, right, @@ -798,6 +817,7 @@ public void shouldPerformTableToTableOuterJoin() { final JoinNode joinNode = new JoinNode( nodeId, + Collections.emptyList(), JoinNode.JoinType.OUTER, left, right, @@ -827,6 +847,7 @@ public void shouldNotPerformTableToTableJoinIfJoinWindowIsSpecified() { final JoinNode joinNode = new JoinNode( nodeId, + Collections.emptyList(), JoinNode.JoinType.OUTER, left, right, @@ -850,6 +871,7 @@ public void shouldHaveFullyQualifiedJoinSchema() { // When: final JoinNode joinNode = new JoinNode( nodeId, + Collections.emptyList(), JoinNode.JoinType.OUTER, left, right, @@ -880,6 +902,7 @@ public void shouldSelectLeftKeyField() { final JoinNode joinNode = new JoinNode( nodeId, + Collections.emptyList(), JoinNode.JoinType.OUTER, left, right, @@ -907,6 +930,7 @@ public void shouldNotUseSourceSerdeOptionsForInternalTopics() { final JoinNode joinNode = new JoinNode( nodeId, + Collections.emptyList(), JoinNode.JoinType.LEFT, left, right, diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/ProjectNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/ProjectNodeTest.java index f79b3e0b436c..52f560962a60 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/ProjectNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/ProjectNodeTest.java @@ -81,6 +81,7 @@ public void init() { when(source.getKeyField()).thenReturn(SOURCE_KEY_FIELD); when(source.buildStream(any())).thenReturn((SchemaKStream) stream); when(source.getNodeOutputType()).thenReturn(DataSourceType.KSTREAM); + when(source.getSelectExpressions()).thenReturn(ImmutableList.of(SELECT_0, SELECT_1)); when(ksqlStreamBuilder.buildNodeContext(NODE_ID.toString())).thenReturn(stacker); when(stream.select(anyList(), any(), any())).thenReturn((SchemaKStream) stream); @@ -88,31 +89,30 @@ public void init() { NODE_ID, source, SCHEMA, - Optional.of(ColumnRef.withoutSource(ColumnName.of(KEY_FIELD_NAME))), - ImmutableList.of(SELECT_0, SELECT_1)); + Optional.of(ColumnRef.withoutSource(ColumnName.of(KEY_FIELD_NAME)))); } @Test(expected = KsqlException.class) public void shouldThrowIfSchemaSizeDoesntMatchProjection() { + when(source.getSelectExpressions()).thenReturn(ImmutableList.of(SELECT_0)); new ProjectNode( NODE_ID, source, SCHEMA, - Optional.of(ColumnRef.withoutSource(ColumnName.of(KEY_FIELD_NAME))), - ImmutableList.of(SELECT_0)); // <-- not enough expressions + 
Optional.of(ColumnRef.withoutSource(ColumnName.of(KEY_FIELD_NAME)))); // <-- not enough expressions } @Test(expected = IllegalArgumentException.class) public void shouldThrowOnSchemaSelectNameMismatch() { + when(source.getSelectExpressions()).thenReturn(ImmutableList.of( + SelectExpression.of(ColumnName.of("wrongName"), TRUE_EXPRESSION), + SELECT_1 + )); projectNode = new ProjectNode( NODE_ID, source, SCHEMA, - Optional.of(ColumnRef.withoutSource(ColumnName.of(KEY_FIELD_NAME))), - ImmutableList.of( - SelectExpression.of(ColumnName.of("wrongName"), TRUE_EXPRESSION), - SELECT_1 - ) + Optional.of(ColumnRef.withoutSource(ColumnName.of(KEY_FIELD_NAME))) ); } @@ -140,12 +140,12 @@ public void shouldCreateProjectionWithFieldNameExpressionPairs() { @Test(expected = IllegalArgumentException.class) public void shouldThrowIfKeyFieldNameNotInSchema() { + when(source.getSelectExpressions()).thenReturn(ImmutableList.of(SELECT_0, SELECT_1)); new ProjectNode( NODE_ID, source, SCHEMA, - Optional.of(ColumnRef.withoutSource(ColumnName.of("Unknown Key Field"))), - ImmutableList.of(SELECT_0, SELECT_1)); + Optional.of(ColumnRef.withoutSource(ColumnName.of("Unknown Key Field")))); } @Test diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java index 2bd66a9dfc96..b8c0a4a28f11 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java @@ -168,7 +168,7 @@ public void testSelectSchemaKStream() { final ProjectNode projectNode = (ProjectNode) logicalPlan.getSources().get(0); - final List selectExpressions = projectNode.getProjectSelectExpressions(); + final List selectExpressions = projectNode.getSelectExpressions(); // When: final SchemaKStream projectedSchemaKStream = initialSchemaKStream.select( @@ -191,7 +191,7 @@ public void shouldBuildStepForSelect() { final PlanNode logicalPlan = givenInitialKStreamOf( "SELECT col0, col2, col3 FROM test1 WHERE col0 > 100 EMIT CHANGES;"); final ProjectNode projectNode = (ProjectNode) logicalPlan.getSources().get(0); - final List selectExpressions = projectNode.getProjectSelectExpressions(); + final List selectExpressions = projectNode.getSelectExpressions(); // When: final SchemaKStream projectedSchemaKStream = initialSchemaKStream.select( @@ -219,7 +219,7 @@ public void shouldUpdateKeyIfRenamed() { final PlanNode logicalPlan = givenInitialKStreamOf( "SELECT col0 as NEWKEY, col2, col3 FROM test1 EMIT CHANGES;"); final ProjectNode projectNode = (ProjectNode) logicalPlan.getSources().get(0); - final List selectExpressions = projectNode.getProjectSelectExpressions(); + final List selectExpressions = projectNode.getSelectExpressions(); // When: final SchemaKStream result = initialSchemaKStream @@ -238,7 +238,7 @@ public void shouldUpdateKeyIfRenamedViaFullyQualifiedName() { final PlanNode logicalPlan = givenInitialKStreamOf( "SELECT test1.col0 as NEWKEY, col2, col3 FROM test1 EMIT CHANGES;"); final ProjectNode projectNode = (ProjectNode) logicalPlan.getSources().get(0); - final List selectExpressions = projectNode.getProjectSelectExpressions(); + final List selectExpressions = projectNode.getSelectExpressions(); // When: final SchemaKStream result = initialSchemaKStream @@ -257,7 +257,7 @@ public void shouldUpdateKeyIfRenamedAndSourceIsAliased() { final PlanNode logicalPlan = givenInitialKStreamOf( "SELECT t.col0 as NEWKEY, col2, col3 FROM test1 t EMIT 
CHANGES;"); final ProjectNode projectNode = (ProjectNode) logicalPlan.getSources().get(0); - final List selectExpressions = projectNode.getProjectSelectExpressions(); + final List selectExpressions = projectNode.getSelectExpressions(); // When: final SchemaKStream result = initialSchemaKStream @@ -275,7 +275,7 @@ public void shouldPreserveKeyOnSelectStar() { // Given: final PlanNode logicalPlan = givenInitialKStreamOf("SELECT * FROM test1 EMIT CHANGES;"); final ProjectNode projectNode = (ProjectNode) logicalPlan.getSources().get(0); - final List selectExpressions = projectNode.getProjectSelectExpressions(); + final List selectExpressions = projectNode.getSelectExpressions(); // When: final SchemaKStream result = initialSchemaKStream @@ -292,7 +292,7 @@ public void shouldUpdateKeyIfMovedToDifferentIndex() { // Given: final PlanNode logicalPlan = givenInitialKStreamOf("SELECT col2, col0, col3 FROM test1 EMIT CHANGES;"); final ProjectNode projectNode = (ProjectNode) logicalPlan.getSources().get(0); - final List selectExpressions = projectNode.getProjectSelectExpressions(); + final List selectExpressions = projectNode.getSelectExpressions(); // When: final SchemaKStream result = initialSchemaKStream @@ -310,7 +310,7 @@ public void shouldDropKeyIfNotSelected() { // Given: final PlanNode logicalPlan = givenInitialKStreamOf("SELECT col2, col3 FROM test1 EMIT CHANGES;"); final ProjectNode projectNode = (ProjectNode) logicalPlan.getSources().get(0); - final List selectExpressions = projectNode.getProjectSelectExpressions(); + final List selectExpressions = projectNode.getSelectExpressions(); // When: final SchemaKStream result = initialSchemaKStream @@ -325,7 +325,7 @@ public void shouldHandleSourceWithoutKey() { // Given: final PlanNode logicalPlan = givenInitialKStreamOf("SELECT * FROM test4 EMIT CHANGES;"); final ProjectNode projectNode = (ProjectNode) logicalPlan.getSources().get(0); - final List selectExpressions = projectNode.getProjectSelectExpressions(); + final List selectExpressions = projectNode.getSelectExpressions(); // When: final SchemaKStream result = initialSchemaKStream @@ -344,7 +344,7 @@ public void testSelectWithExpression() { // When: final SchemaKStream projectedSchemaKStream = initialSchemaKStream.select( - projectNode.getProjectSelectExpressions(), + projectNode.getSelectExpressions(), childContextStacker, queryBuilder); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java index 1773307d56c9..07da5a061012 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java @@ -283,7 +283,7 @@ public void testSelectSchemaKStream() { // When: final SchemaKTable projectedSchemaKStream = initialSchemaKTable.select( - projectNode.getProjectSelectExpressions(), + projectNode.getSelectExpressions(), childContextStacker, queryBuilder ); @@ -308,7 +308,7 @@ public void shouldBuildStepForSelect() { // When: final SchemaKTable projectedSchemaKStream = initialSchemaKTable.select( - projectNode.getProjectSelectExpressions(), + projectNode.getSelectExpressions(), childContextStacker, queryBuilder ); @@ -320,7 +320,7 @@ public void shouldBuildStepForSelect() { ExecutionStepFactory.tableMapValues( childContextStacker, initialSchemaKTable.getSourceTableStep(), - projectNode.getProjectSelectExpressions(), + projectNode.getSelectExpressions(), queryBuilder ) ) @@ -337,7 +337,7 
@@ public void testSelectWithExpression() { // When: final SchemaKTable projectedSchemaKStream = initialSchemaKTable.select( - projectNode.getProjectSelectExpressions(), + projectNode.getSelectExpressions(), childContextStacker, queryBuilder ); @@ -846,7 +846,7 @@ private List givenInitialKTableOf(final String selectQuery) { ); final ProjectNode projectNode = (ProjectNode) logicalPlan.getSources().get(0); - return projectNode.getProjectSelectExpressions(); + return projectNode.getSelectExpressions(); } private PlanNode buildLogicalPlan(final String query) { diff --git a/ksql-engine/src/test/java/io/confluent/ksql/testutils/AnalysisTestUtil.java b/ksql-engine/src/test/java/io/confluent/ksql/testutils/AnalysisTestUtil.java index 9ff80889d9b3..42869c37f8b5 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/testutils/AnalysisTestUtil.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/testutils/AnalysisTestUtil.java @@ -54,7 +54,7 @@ public static OutputNode buildLogicalPlan( final LogicalPlanner logicalPlanner = new LogicalPlanner( ksqlConfig, analyzer.analysis, - analyzer.aggregateAnalys(), + analyzer.aggregateAnalysis(), metaStore); return logicalPlanner.buildPlan(); @@ -89,7 +89,7 @@ private static Statement parseStatement( return statements.get(0).getStatement(); } - AggregateAnalysisResult aggregateAnalys() { + AggregateAnalysisResult aggregateAnalysis() { return queryAnalyzer.analyzeAggregate(query, analysis); } } diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/function/UdafUtil.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/function/UdafUtil.java index f7828a754701..7bb287365c0c 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/function/UdafUtil.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/function/UdafUtil.java @@ -33,6 +33,7 @@ import org.apache.kafka.connect.data.Schema; public final class UdafUtil { + private UdafUtil() { } diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/function/UdtfUtil.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/function/UdtfUtil.java new file mode 100644 index 000000000000..cc50a91c9b80 --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/function/UdtfUtil.java @@ -0,0 +1,47 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.function; + +import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.execution.expression.tree.FunctionCall; +import io.confluent.ksql.execution.util.ExpressionTypeManager; +import io.confluent.ksql.function.FunctionRegistry; +import io.confluent.ksql.function.KsqlTableFunction; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import java.util.List; +import org.apache.kafka.connect.data.Schema; + +public final class UdtfUtil { + + private UdtfUtil() { + } + + @SuppressWarnings("deprecation") // Need to migrate away from Connect Schema use. 
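+  // Looks up the table function in the registry by the call's name and the Connect schema of its first argument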
+ public static KsqlTableFunction resolveTableFunction( + final FunctionRegistry functionRegistry, + final FunctionCall functionCall, + final LogicalSchema schema + ) { + final ExpressionTypeManager expressionTypeManager = + new ExpressionTypeManager(schema, functionRegistry); + final List functionArgs = functionCall.getArguments(); + final Schema expressionType = expressionTypeManager.getExpressionSchema(functionArgs.get(0)); + return functionRegistry.getTableFunction( + functionCall.getName().name(), + expressionType + ); + } +} diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/function/udtf/KudtfFlatMapper.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/function/udtf/KudtfFlatMapper.java new file mode 100644 index 000000000000..fdfa39cda1ea --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/function/udtf/KudtfFlatMapper.java @@ -0,0 +1,48 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.function.udtf; + +import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.GenericRow; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import org.apache.kafka.streams.kstream.ValueMapper; + +/** + * Implements the actual flat-mapping logic - this is called by Kafka Streams + */ +@Immutable +public class KudtfFlatMapper implements ValueMapper> { + + private final TableFunctionApplier functionHolder; + + public KudtfFlatMapper(final TableFunctionApplier functionHolder) { + this.functionHolder = Objects.requireNonNull(functionHolder); + } + + @Override + public Iterable apply(final GenericRow row) { + final List exploded = functionHolder.apply(row); + final List rows = new ArrayList<>(exploded.size()); + for (Object val : exploded) { + final List arrayList = new ArrayList<>(row.getColumns()); + arrayList.add(val); + rows.add(new GenericRow(arrayList)); + } + return rows; + } +} diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/function/udtf/TableFunctionApplier.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/function/udtf/TableFunctionApplier.java new file mode 100644 index 000000000000..30e78afa981c --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/function/udtf/TableFunctionApplier.java @@ -0,0 +1,41 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.execution.function.udtf; + +import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.function.KsqlTableFunction; +import java.util.List; +import java.util.Objects; + +/** + * Applies a table function on a row to get a list of values + */ +@Immutable +public class TableFunctionApplier { + private final KsqlTableFunction tableFunction; + private final int argColumnIndex; + + public TableFunctionApplier(final KsqlTableFunction tableFunction, final int argColumnIndex) { + this.tableFunction = Objects.requireNonNull(tableFunction); + this.argColumnIndex = argColumnIndex; + } + + @SuppressWarnings("unchecked") + List apply(final GenericRow row) { + final List unexplodedValue = row.getColumnValue(argColumnIndex); + return tableFunction.flatMap(unexplodedValue); + } +} diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/PlanBuilder.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/PlanBuilder.java index bdb5c91d0316..dda8e39f1d9e 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/PlanBuilder.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/PlanBuilder.java @@ -38,6 +38,8 @@ public interface PlanBuilder { KStreamHolder visitStreamMapValues(StreamMapValues streamMapValues); + KStreamHolder visitFlatMap(StreamFlatMap streamFlatMap); + KStreamHolder visitStreamSelectKey(StreamSelectKey streamSelectKey); KStreamHolder visitStreamSink(StreamSink streamSink); diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamFlatMap.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamFlatMap.java new file mode 100644 index 000000000000..94a5a8816e2e --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamFlatMap.java @@ -0,0 +1,63 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.execution.plan; + +import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.execution.function.udtf.TableFunctionApplier; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +@Immutable +public class StreamFlatMap implements ExecutionStep> { + + private final ExecutionStepProperties properties; + private final ExecutionStep> source; + private final TableFunctionApplier functionHolder; + + public StreamFlatMap( + final ExecutionStepProperties properties, + final ExecutionStep> source, + final TableFunctionApplier functionHolder + ) { + this.properties = Objects.requireNonNull(properties, "properties"); + this.source = Objects.requireNonNull(source, "source"); + this.functionHolder = Objects.requireNonNull(functionHolder); + } + + @Override + public ExecutionStepProperties getProperties() { + return properties; + } + + @Override + public List> getSources() { + return Collections.singletonList(source); + } + + @Override + public KStreamHolder build(final PlanBuilder builder) { + return builder.visitFlatMap(this); + } + + public TableFunctionApplier getFunctionHolder() { + return functionHolder; + } + + public ExecutionStep> getSource() { + return source; + } + +} diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java index 65f5b819aaa5..ff621b15fb2c 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java @@ -52,6 +52,7 @@ import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.function.KsqlAggregateFunction; import io.confluent.ksql.function.KsqlFunctionException; +import io.confluent.ksql.function.KsqlTableFunction; import io.confluent.ksql.function.UdfFactory; import io.confluent.ksql.schema.ksql.Column; import io.confluent.ksql.schema.ksql.LogicalSchema; @@ -374,6 +375,18 @@ public Void visitFunctionCall( return null; } + if (functionRegistry.isTableFunction(node.getName().name())) { + final Schema schema = node.getArguments().isEmpty() + ? 
FunctionRegistry.DEFAULT_FUNCTION_ARG_SCHEMA + : getExpressionSchema(node.getArguments().get(0)); + + final KsqlTableFunction tableFunction = functionRegistry + .getTableFunction(node.getName().name(), schema); + + expressionTypeContext.setSchema(tableFunction.getReturnType()); + return null; + } + if (node.getName().equals(FetchFieldFromStruct.FUNCTION_NAME)) { process(node.getArguments().get(0), expressionTypeContext); final Schema firstArgSchema = expressionTypeContext.getSchema(); diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/explode.json b/ksql-functional-tests/src/test/resources/query-validation-tests/explode.json new file mode 100644 index 000000000000..0d3d97847eaf --- /dev/null +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/explode.json @@ -0,0 +1,79 @@ +{ + "comments": [ + "Tests for the EXPLODE table function" + ], + "tests": [ + { + "name": "explode array with values", + "statements": [ + "CREATE STREAM TEST (MY_ARR ARRAY) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT EXPLODE(MY_ARR) VAL FROM TEST;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": {"MY_ARR": [1, 2]}}, + {"topic": "test_topic", "key": 1, "value": {"MY_ARR": [3, 4]}}, + {"topic": "test_topic", "key": 2, "value": {"MY_ARR": [5]}}, + {"topic": "test_topic", "key": 3, "value": {"MY_ARR": []}}, + {"topic": "test_topic", "key": 4, "value": {"MY_ARR": [6]}}, + {"topic": "test_topic", "key": 5, "value": {"MY_ARR": null}}, + {"topic": "test_topic", "key": 6, "value": {"MY_ARR": [7]}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "0", "value": {"VAL": 1}}, + {"topic": "OUTPUT", "key": "0", "value": {"VAL": 2}}, + {"topic": "OUTPUT", "key": "1", "value": {"VAL": 3}}, + {"topic": "OUTPUT", "key": "1", "value": {"VAL": 4}}, + {"topic": "OUTPUT", "key": "2", "value": {"VAL": 5}}, + {"topic": "OUTPUT", "key": "4", "value": {"VAL": 6}}, + {"topic": "OUTPUT", "key": "6", "value": {"VAL": 7}} + ] + }, + { + "name": "udfs with table functions and no aliases, verifies intermediate generated column names don't clash with aliases", + "statements": [ + "CREATE STREAM TEST (MY_ARR ARRAY, F1 BIGINT, F2 BIGINT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT ABS(F1), EXPLODE(MY_ARR), ABS(F2) FROM TEST;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": {"F1": 1, "F2": 2, "MY_ARR": [1, 2]}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "0", "value": {"KSQL_COL_0": 1.0, "KSQL_COL_1": 1, "KSQL_COL_2": 2.0}}, + {"topic": "OUTPUT", "key": "0", "value": {"KSQL_COL_0": 1.0, "KSQL_COL_1": 2, "KSQL_COL_2": 2.0}} + ] + }, + { + "name": "explode shouldn't accept map", + "statements": [ + "CREATE STREAM TEST (MY_MAP MAP) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT EXPLODE(MY_MAP) FROM TEST;" + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Unsupported argument type for EXPLODE Schema{MAP}" + } + }, + { + "name": "shouldn't handle more than one table function", + "statements": [ + "CREATE STREAM TEST (MY_ARR ARRAY) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT EXPLODE(MY_ARR), EXPLODE(MY_ARR) VAL FROM TEST;" + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlException", + "message": "Only one table function per query currently is supported" + } + }, + { + "name": "shouldn't be able to have table functions with 
aggregation", + "statements": [ + "CREATE STREAM TEST (VAL INT, MY_MAP MAP) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE OUTPUT AS SELECT VAL, EXPLODE(MY_MAP), COUNT(*) FROM TEST GROUP BY VAL;" + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Table functions cannot be used with aggregations." + } + } + ] +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/table-functions.json b/ksql-functional-tests/src/test/resources/query-validation-tests/table-functions.json new file mode 100644 index 000000000000..f7346914bc4f --- /dev/null +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/table-functions.json @@ -0,0 +1,102 @@ +{ + "comments": [ + "Tests applicable to table functions in general - we use EXPLODE here but we could use any table table function" + ], + "tests": [ + { + "name": "table function as last select", + "statements": [ + "CREATE STREAM TEST (ID BIGINT, MY_ARR ARRAY) WITH (kafka_topic='test_topic', KEY='ID', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT ID, EXPLODE(MY_ARR) VAL FROM TEST;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "MY_ARR": [1, 2]}}, + {"topic": "test_topic", "key": 1, "value": {"ID": 1, "MY_ARR": [3, 4]}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "0", "value": {"ID": 0, "VAL": 1}}, + {"topic": "OUTPUT", "key": "0", "value": {"ID": 0, "VAL": 2}}, + {"topic": "OUTPUT", "key": "1", "value": {"ID": 1, "VAL": 3}}, + {"topic": "OUTPUT", "key": "1", "value": {"ID": 1, "VAL": 4}} + ] + }, + { + "name": "table function as first select", + "statements": [ + "CREATE STREAM TEST (ID BIGINT, MY_ARR ARRAY) WITH (kafka_topic='test_topic', KEY='ID', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT EXPLODE(MY_ARR) AS VAL, ID FROM TEST;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "MY_ARR": [1, 2]}}, + {"topic": "test_topic", "key": 1, "value": {"ID": 1, "MY_ARR": [3, 4]}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "0", "value": {"VAL": 1, "ID": 0}}, + {"topic": "OUTPUT", "key": "0", "value": {"VAL": 2, "ID": 0}}, + {"topic": "OUTPUT", "key": "1", "value": {"VAL": 3, "ID": 1}}, + {"topic": "OUTPUT", "key": "1", "value": {"VAL": 4, "ID": 1}} + ] + }, + { + "name": "table function with non selected columns", + "statements": [ + "CREATE STREAM TEST (FOO BIGINT, ID BIGINT, MY_ARR ARRAY, BAR BIGINT) WITH (kafka_topic='test_topic', KEY='ID', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT EXPLODE(MY_ARR) AS VAL, ID FROM TEST;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "MY_ARR": [1, 2]}}, + {"topic": "test_topic", "key": 1, "value": {"ID": 1, "MY_ARR": [3, 4]}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "0", "value": {"VAL": 1, "ID": 0}}, + {"topic": "OUTPUT", "key": "0", "value": {"VAL": 2, "ID": 0}}, + {"topic": "OUTPUT", "key": "1", "value": {"VAL": 3, "ID": 1}}, + {"topic": "OUTPUT", "key": "1", "value": {"VAL": 4, "ID": 1}} + ] + }, + { + "name": "table function with no other selected columns", + "statements": [ + "CREATE STREAM TEST (FOO BIGINT, ID BIGINT, MY_ARR ARRAY, BAR BIGINT) WITH (kafka_topic='test_topic', KEY='ID', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT EXPLODE(MY_ARR) AS VAL FROM TEST;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "MY_ARR": [1, 2]}}, + {"topic": "test_topic", "key": 1, "value": {"ID": 1, "MY_ARR": [3, 4]}} + ], + 
"outputs": [ + {"topic": "OUTPUT", "key": "0", "value": {"VAL": 1}}, + {"topic": "OUTPUT", "key": "0", "value": {"VAL": 2}}, + {"topic": "OUTPUT", "key": "1", "value": {"VAL": 3}}, + {"topic": "OUTPUT", "key": "1", "value": {"VAL": 4}} + ] + }, + { + "name": "table function with no alias", + "statements": [ + "CREATE STREAM TEST (FOO BIGINT, ID BIGINT, MY_ARR ARRAY, BAR BIGINT) WITH (kafka_topic='test_topic', KEY='ID', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT EXPLODE(MY_ARR) FROM TEST;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "MY_ARR": [1, 2]}}, + {"topic": "test_topic", "key": 1, "value": {"ID": 1, "MY_ARR": [3, 4]}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "0", "value": {"KSQL_COL_0": 1}}, + {"topic": "OUTPUT", "key": "0", "value": {"KSQL_COL_0": 2}}, + {"topic": "OUTPUT", "key": "1", "value": {"KSQL_COL_0": 3}}, + {"topic": "OUTPUT", "key": "1", "value": {"KSQL_COL_0": 4}} + ] + }, + { + "name": "table function shouldn't be in FROM clause", + "statements": [ + "CREATE STREAM TEST (ID BIGINT, MY_ARR ARRAY) WITH (kafka_topic='test_topic', KEY='ID', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT * FROM EXPLODE(MY_ARR);" + ], + "expectedException": { + "type": "io.confluent.ksql.parser.exception.ParseFailedException" + } + } + ] +} \ No newline at end of file diff --git a/ksql-metastore/src/main/java/io/confluent/ksql/metastore/MetaStoreImpl.java b/ksql-metastore/src/main/java/io/confluent/ksql/metastore/MetaStoreImpl.java index 5a720b6ff76d..b3ff412310ed 100644 --- a/ksql-metastore/src/main/java/io/confluent/ksql/metastore/MetaStoreImpl.java +++ b/ksql-metastore/src/main/java/io/confluent/ksql/metastore/MetaStoreImpl.java @@ -19,6 +19,7 @@ import io.confluent.ksql.function.AggregateFunctionInitArguments; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.function.KsqlAggregateFunction; +import io.confluent.ksql.function.KsqlTableFunction; import io.confluent.ksql.function.UdfFactory; import io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.name.SourceName; @@ -210,6 +211,10 @@ public boolean isAggregate(final String functionName) { return functionRegistry.isAggregate(functionName); } + public boolean isTableFunction(final String functionName) { + return functionRegistry.isTableFunction(functionName); + } + public KsqlAggregateFunction getAggregateFunction( final String functionName, final Schema argumentType, @@ -218,6 +223,13 @@ public boolean isAggregate(final String functionName) { return functionRegistry.getAggregateFunction(functionName, argumentType, initArgs); } + public KsqlTableFunction getTableFunction( + final String functionName, + final Schema argumentType + ) { + return functionRegistry.getTableFunction(functionName, argumentType); + } + @Override public List listFunctions() { return functionRegistry.listFunctions(); diff --git a/ksql-metastore/src/test/java/io/confluent/ksql/util/MetaStoreFixture.java b/ksql-metastore/src/test/java/io/confluent/ksql/util/MetaStoreFixture.java index 869d0b3f0e98..6cbf845089a2 100644 --- a/ksql-metastore/src/test/java/io/confluent/ksql/util/MetaStoreFixture.java +++ b/ksql-metastore/src/test/java/io/confluent/ksql/util/MetaStoreFixture.java @@ -268,6 +268,35 @@ public static MutableMetaStore getNewMetaStore( metaStore.putSource(ksqlStream4); + + + final LogicalSchema sensorReadingsSchema = LogicalSchema.builder() + .valueColumn(ColumnName.of("ID"), SqlTypes.BIGINT) + .valueColumn(ColumnName.of("SENSOR_NAME"), SqlTypes.STRING) + 
.valueColumn(ColumnName.of("ARR1"), SqlTypes.array(SqlTypes.BIGINT)) + .valueColumn(ColumnName.of("ARR2"), SqlTypes.array(SqlTypes.STRING)) + .build(); + + final KsqlTopic ksqlTopicSensorReadings = new KsqlTopic( + "sensor_readings_topic", + keyFormat, + valueFormat, + false + ); + + final KsqlStream ksqlStreamSensorReadings = new KsqlStream<>( + "sqlexpression", + SourceName.of("SENSOR_READINGS"), + sensorReadingsSchema, + SerdeOption.none(), + KeyField.of(ColumnRef.withoutSource(ColumnName.of("ID")), + sensorReadingsSchema.findValueColumn(ColumnRef.withoutSource(ColumnName.of("ID"))).get()), + timestampExtractionPolicy, + ksqlTopicSensorReadings + ); + + metaStore.putSource(ksqlStreamSensorReadings); + return metaStore; } } diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index 9854703cc4e0..e1c1dfde1395 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -595,7 +595,8 @@ public Node visitSelectSingle(final SqlBaseParser.SelectSingleContext context) { dereferenceExpressionString.substring( dereferenceExpressionString.indexOf(KsqlConstants.DOT) + 1))); } else { - alias = ColumnName.of("KSQL_COL_" + selectItemIndex); + + alias = ColumnName.generatedColumnAlias(selectItemIndex); } } selectItemIndex++; diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java index 7250c02f8790..452638160180 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java @@ -19,6 +19,7 @@ import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.execution.expression.tree.Expression; import io.confluent.ksql.execution.expression.tree.FunctionCall; +import io.confluent.ksql.execution.function.udtf.TableFunctionApplier; import io.confluent.ksql.execution.plan.DefaultExecutionStepProperties; import io.confluent.ksql.execution.plan.ExecutionStep; import io.confluent.ksql.execution.plan.Formats; @@ -29,6 +30,7 @@ import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.execution.plan.StreamAggregate; import io.confluent.ksql.execution.plan.StreamFilter; +import io.confluent.ksql.execution.plan.StreamFlatMap; import io.confluent.ksql.execution.plan.StreamGroupBy; import io.confluent.ksql.execution.plan.StreamGroupByKey; import io.confluent.ksql.execution.plan.StreamMapValues; @@ -61,6 +63,7 @@ // CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling public final class ExecutionStepFactory { + // CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling private ExecutionStepFactory() { } @@ -140,6 +143,20 @@ public static StreamSink streamSink( ); } + public static StreamFlatMap streamFlatMap( + final QueryContext.Stacker stacker, + final ExecutionStep> source, + final LogicalSchema resultSchema, + final TableFunctionApplier functionHolder + ) { + final QueryContext queryContext = stacker.getQueryContext(); + return new StreamFlatMap<>( + new DefaultExecutionStepProperties(resultSchema, queryContext), + source, + functionHolder + ); + } + public static StreamFilter streamFilter( final QueryContext.Stacker stacker, final ExecutionStep> source, diff --git 
a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java index 351a224093a1..5456f0d60284 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java @@ -22,6 +22,7 @@ import io.confluent.ksql.execution.plan.PlanBuilder; import io.confluent.ksql.execution.plan.StreamAggregate; import io.confluent.ksql.execution.plan.StreamFilter; +import io.confluent.ksql.execution.plan.StreamFlatMap; import io.confluent.ksql.execution.plan.StreamGroupBy; import io.confluent.ksql.execution.plan.StreamGroupByKey; import io.confluent.ksql.execution.plan.StreamMapValues; @@ -126,6 +127,12 @@ public KStreamHolder visitStreamMapValues( return StreamMapValuesBuilder.build(source, streamMapValues, queryBuilder); } + @Override + public KStreamHolder visitFlatMap(final StreamFlatMap streamFlatMap) { + final KStreamHolder source = streamFlatMap.getSource().build(this); + return StreamFlatMapBuilder.build(source, streamFlatMap); + } + @Override public KStreamHolder visitStreamSelectKey( final StreamSelectKey streamSelectKey) { diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamFlatMapBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamFlatMapBuilder.java new file mode 100644 index 000000000000..e740f353fc8a --- /dev/null +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamFlatMapBuilder.java @@ -0,0 +1,34 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.streams; + +import io.confluent.ksql.execution.function.udtf.KudtfFlatMapper; +import io.confluent.ksql.execution.plan.KStreamHolder; +import io.confluent.ksql.execution.plan.StreamFlatMap; + +public final class StreamFlatMapBuilder { + + private StreamFlatMapBuilder() { + } + + public static KStreamHolder build( + final KStreamHolder stream, + final StreamFlatMap step) { + return stream.withStream(stream.getStream().flatMapValues( + new KudtfFlatMapper(step.getFunctionHolder()))); + } + +} \ No newline at end of file
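
Reviewer note (not part of the patch): the runtime behaviour wired up by StreamFlatMap and StreamFlatMapBuilder is easiest to see in isolation. KudtfFlatMapper (added elsewhere in this PR, not shown above) applies the table function to each input row and emits one output row per returned element, copying the other selected columns into every emitted row; null or empty arrays therefore produce no output rows, which is what the explode.json cases for keys 3 and 5 assert. The sketch below only illustrates that fan-out contract under those assumptions - it is not the actual KudtfFlatMapper code, and the row-as-List representation and the explodeColumn/FlatMapSketch names are hypothetical.

import java.util.ArrayList;
import java.util.List;

public final class FlatMapSketch {

  // One input "row" (here just a list of selected column values) becomes N output
  // rows, where N is the size of the array in the exploded column. All other
  // columns are duplicated into every output row.
  private static List<List<Object>> explodeColumn(
      final List<Object> row,
      final int arrayColumnIndex
  ) {
    final List<List<Object>> result = new ArrayList<>();
    final List<?> exploded = (List<?>) row.get(arrayColumnIndex);
    if (exploded == null) {
      return result; // a null array yields no output rows (explode.json, key 5)
    }
    for (final Object element : exploded) {
      final List<Object> out = new ArrayList<>(row);
      out.set(arrayColumnIndex, element); // replace the array with a single element
      result.add(out);
    }
    return result; // an empty array likewise yields no output rows (explode.json, key 3)
  }

  public static void main(final String[] args) {
    // Mirrors "table function as last select": SELECT ID, EXPLODE(MY_ARR) VAL FROM TEST;
    final List<Object> row = List.of(0L, List.of(1L, 2L));
    // Prints [[0, 1], [0, 2]] - ID is duplicated, one output row per array element.
    System.out.println(explodeColumn(row, 1));
  }
}

In the patch itself the rows are GenericRow values and the element type comes from the table function's return type (resolved in the ExpressionTypeManager hunk above), but the row-multiplying shape is the same.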