diff --git a/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java b/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
index f619b1738ffa..4f6283130aa7 100644
--- a/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
+++ b/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
@@ -853,7 +853,7 @@ public void shouldDescribeOverloadedScalarFunction() {
public void shouldDescribeAggregateFunction() {
final String expectedSummary =
"Name : TOPK\n" +
- "Author : confluent\n" +
+ "Author : Confluent\n" +
"Type : aggregate\n" +
"Jar : internal\n" +
"Variations : \n";
diff --git a/ksql-common/src/main/java/io/confluent/ksql/function/AggregateFunctionFactory.java b/ksql-common/src/main/java/io/confluent/ksql/function/AggregateFunctionFactory.java
index 5bea37bbdfd1..c6b9bffaab23 100644
--- a/ksql-common/src/main/java/io/confluent/ksql/function/AggregateFunctionFactory.java
+++ b/ksql-common/src/main/java/io/confluent/ksql/function/AggregateFunctionFactory.java
@@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.function.udf.UdfMetadata;
import io.confluent.ksql.util.DecimalUtil;
+import io.confluent.ksql.util.KsqlConstants;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
@@ -38,7 +39,14 @@ public abstract class AggregateFunctionFactory {
.build();
public AggregateFunctionFactory(final String functionName) {
- this(new UdfMetadata(functionName, "", "confluent", "", KsqlFunction.INTERNAL_PATH, false));
+ this(new UdfMetadata(
+ functionName,
+ "",
+ KsqlConstants.CONFLUENT_AUTHOR,
+ "",
+ KsqlFunction.INTERNAL_PATH,
+ false
+ ));
}
public AggregateFunctionFactory(final UdfMetadata metadata) {
diff --git a/ksql-common/src/main/java/io/confluent/ksql/function/FunctionRegistry.java b/ksql-common/src/main/java/io/confluent/ksql/function/FunctionRegistry.java
index 09b6126614de..cf987f1f0160 100644
--- a/ksql-common/src/main/java/io/confluent/ksql/function/FunctionRegistry.java
+++ b/ksql-common/src/main/java/io/confluent/ksql/function/FunctionRegistry.java
@@ -33,6 +33,16 @@ public interface FunctionRegistry {
*/
boolean isAggregate(String functionName);
+ /**
+ * Test if the supplied {@code functionName} is a table function.
+ *
+ * <p>Note: unknown functions result in {@code false} return value.
+ *
+ * @param functionName the name of the function to test
+ * @return {@code true} if it is a table function, {@code false} otherwise.
+ */
+ boolean isTableFunction(String functionName);
+
/**
* Get the factory for a UDF.
*
@@ -72,6 +82,8 @@ public interface FunctionRegistry {
KsqlAggregateFunction, ?, ?> getAggregateFunction(String functionName, Schema argumentType,
AggregateFunctionInitArguments initArgs);
+ KsqlTableFunction, ?> getTableFunction(String functionName, Schema argumentType);
+
/**
* @return all UDF factories.
*/
diff --git a/ksql-common/src/main/java/io/confluent/ksql/function/FunctionSignature.java b/ksql-common/src/main/java/io/confluent/ksql/function/FunctionSignature.java
index 8e9565999416..d24f06775d8b 100644
--- a/ksql-common/src/main/java/io/confluent/ksql/function/FunctionSignature.java
+++ b/ksql-common/src/main/java/io/confluent/ksql/function/FunctionSignature.java
@@ -38,7 +38,9 @@ public interface FunctionSignature {
* @return whether or not to consider the final argument in
* {@link #getArguments()} as variadic
*/
- boolean isVariadic();
+ default boolean isVariadic() {
+ return false;
+ }
}
diff --git a/ksql-common/src/main/java/io/confluent/ksql/function/KsqlAggregateFunction.java b/ksql-common/src/main/java/io/confluent/ksql/function/KsqlAggregateFunction.java
index 2999e70c0652..fdaf98c55a0f 100644
--- a/ksql-common/src/main/java/io/confluent/ksql/function/KsqlAggregateFunction.java
+++ b/ksql-common/src/main/java/io/confluent/ksql/function/KsqlAggregateFunction.java
@@ -51,9 +51,4 @@ public interface KsqlAggregateFunction extends FunctionSignature {
Function getResultMapper();
String getDescription();
-
- @Override
- default boolean isVariadic() {
- return false;
- }
}
diff --git a/ksql-common/src/main/java/io/confluent/ksql/function/KsqlTableFunction.java b/ksql-common/src/main/java/io/confluent/ksql/function/KsqlTableFunction.java
new file mode 100644
index 000000000000..10c8d2c4cdb5
--- /dev/null
+++ b/ksql-common/src/main/java/io/confluent/ksql/function/KsqlTableFunction.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.function;
+
+import io.confluent.ksql.schema.ksql.types.SqlType;
+import java.util.List;
+import org.apache.kafka.connect.data.Schema;
+
+/**
+ * A function that turns one input value into a list of output values.
+ *
+ * @param <I> the type of value accepted by {@link #flatMap}
+ * @param <O> the type of the elements in the list returned by {@link #flatMap}
+ */
+public interface KsqlTableFunction<I, O> extends FunctionSignature {
+
+  /**
+   * @return the function's return type, expressed as a Connect {@link Schema}.
+   */
+  Schema getReturnType();
+
+  /**
+   * @return the function's return type, expressed as a {@link SqlType}.
+   */
+  SqlType returnType();
+
+  /**
+   * Apply the function to a single input value.
+   *
+   * @param input the value to expand
+   * @return the list of values produced for {@code input}
+   */
+  List<O> flatMap(I input);
+
+  /**
+   * @return a description of the function.
+   */
+  String getDescription();
+}
diff --git a/ksql-common/src/main/java/io/confluent/ksql/function/MutableFunctionRegistry.java b/ksql-common/src/main/java/io/confluent/ksql/function/MutableFunctionRegistry.java
index 3c39d12dfb15..944cc97a0378 100644
--- a/ksql-common/src/main/java/io/confluent/ksql/function/MutableFunctionRegistry.java
+++ b/ksql-common/src/main/java/io/confluent/ksql/function/MutableFunctionRegistry.java
@@ -51,4 +51,6 @@ public interface MutableFunctionRegistry extends FunctionRegistry {
* @throws KsqlException if a function, (of any type), with the same name exists.
*/
void addAggregateFunctionFactory(AggregateFunctionFactory aggregateFunctionFactory);
+
+ void addTableFunctionFactory(TableFunctionFactory tableFunctionFactory);
}
diff --git a/ksql-common/src/main/java/io/confluent/ksql/function/TableFunctionFactory.java b/ksql-common/src/main/java/io/confluent/ksql/function/TableFunctionFactory.java
new file mode 100644
index 000000000000..825b7bf5b95b
--- /dev/null
+++ b/ksql-common/src/main/java/io/confluent/ksql/function/TableFunctionFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.function;
+
+import io.confluent.ksql.function.udf.UdfMetadata;
+import java.util.List;
+import java.util.Objects;
+import org.apache.kafka.connect.data.Schema;
+
+/**
+ * Base class for factories that create {@link KsqlTableFunction} instances and expose the
+ * {@link UdfMetadata} describing the function.
+ */
+public abstract class TableFunctionFactory {
+
+  private final UdfMetadata metadata;
+
+  /**
+   * @param metadata the metadata describing the function; must not be {@code null}.
+   */
+  public TableFunctionFactory(final UdfMetadata metadata) {
+    this.metadata = Objects.requireNonNull(metadata, "metadata can't be null");
+  }
+
+  /**
+   * Create a table function for the supplied argument types.
+   *
+   * @param argTypeList the schemas of the arguments the function will be invoked with
+   * @return the table function to use for those argument types
+   */
+  public abstract KsqlTableFunction<?, ?> createTableFunction(List<Schema> argTypeList);
+
+  /**
+   * @return the argument lists supported by this factory; presumably one inner list per
+   *     supported signature (to be confirmed against the implementations).
+   */
+  protected abstract List<List<Schema>> supportedArgs();
+
+  /**
+   * @return the function name held by the metadata.
+   */
+  public String getName() {
+    return metadata.getName();
+  }
+
+  /**
+   * @return the function description held by the metadata.
+   */
+  public String getDescription() {
+    return metadata.getDescription();
+  }
+
+  /**
+   * @return the path held by the metadata.
+   */
+  public String getPath() {
+    return metadata.getPath();
+  }
+
+  /**
+   * @return the author held by the metadata.
+   */
+  public String getAuthor() {
+    return metadata.getAuthor();
+  }
+
+  /**
+   * @return the version held by the metadata.
+   */
+  public String getVersion() {
+    return metadata.getVersion();
+  }
+
+  /**
+   * @return the metadata's {@code internal} flag.
+   */
+  public boolean isInternal() {
+    return metadata.isInternal();
+  }
+}
diff --git a/ksql-common/src/main/java/io/confluent/ksql/name/ColumnName.java b/ksql-common/src/main/java/io/confluent/ksql/name/ColumnName.java
index 264b0fbab4d1..606b9a97f396 100644
--- a/ksql-common/src/main/java/io/confluent/ksql/name/ColumnName.java
+++ b/ksql-common/src/main/java/io/confluent/ksql/name/ColumnName.java
@@ -24,11 +24,29 @@
public final class ColumnName extends Name {
private static final String AGGREGATE_COLUMN_PREFIX = "KSQL_AGG_VARIABLE_";
+ private static final String GENERATED_ALIAS_PREFIX = "KSQL_COL_";
+ private static final String SYNTHESISED_COLUMN_PREFIX = "KSQL_SYNTH_";
- public static ColumnName aggregate(final int idx) {
+ public static ColumnName aggregateColumn(final int idx) {
return of(AGGREGATE_COLUMN_PREFIX + idx);
}
+ /**
+ * Where the user hasn't specified an alias for an expression in a SELECT we generate them
+ * using this method. This value is exposed to the user in the output schema
+ */
+ public static ColumnName generatedColumnAlias(final int idx) {
+ return ColumnName.of(GENERATED_ALIAS_PREFIX + idx);
+ }
+
+ /**
+ * Used to generate a column name in an intermediate schema, e.g. for a column to hold
+ * values of a table function. These are never exposed to the user
+ */
+ public static ColumnName synthesisedSchemaColumn(final int idx) {
+ return ColumnName.of(SYNTHESISED_COLUMN_PREFIX + idx);
+ }
+
public static ColumnName of(final String name) {
return new ColumnName(name);
}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/util/AggregateExpressionRewriter.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/AggregateExpressionRewriter.java
similarity index 95%
rename from ksql-engine/src/main/java/io/confluent/ksql/util/AggregateExpressionRewriter.java
rename to ksql-engine/src/main/java/io/confluent/ksql/analyzer/AggregateExpressionRewriter.java
index a401e3635215..0a05b26a66ce 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/util/AggregateExpressionRewriter.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/AggregateExpressionRewriter.java
@@ -13,7 +13,7 @@
* specific language governing permissions and limitations under the License.
*/
-package io.confluent.ksql.util;
+package io.confluent.ksql.analyzer;
import io.confluent.ksql.engine.rewrite.ExpressionTreeRewriter;
import io.confluent.ksql.engine.rewrite.ExpressionTreeRewriter.Context;
@@ -45,7 +45,7 @@ public Optional visitFunctionCall(
final ExpressionTreeRewriter.Context context) {
final String functionName = node.getName().name();
if (functionRegistry.isAggregate(functionName)) {
- final ColumnName aggVarName = ColumnName.aggregate(aggVariableIndex);
+ final ColumnName aggVarName = ColumnName.aggregateColumn(aggVariableIndex);
aggVariableIndex++;
return Optional.of(
new ColumnReferenceExp(node.getLocation(), ColumnRef.withoutSource(aggVarName)));
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analysis.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analysis.java
index dcdfcc962abb..0625c12ec7f5 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analysis.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analysis.java
@@ -23,6 +23,7 @@
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp;
import io.confluent.ksql.execution.expression.tree.Expression;
+import io.confluent.ksql.execution.expression.tree.FunctionCall;
import io.confluent.ksql.execution.plan.SelectExpression;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.KsqlStream;
@@ -45,6 +46,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
@@ -66,6 +68,7 @@ public class Analysis {
private Optional havingExpression = Optional.empty();
private OptionalInt limitClause = OptionalInt.empty();
private CreateSourceAsProperties withProperties = CreateSourceAsProperties.none();
+ private final List tableFunctions = new ArrayList<>();
public Analysis(final ResultMaterialization resultMaterialization) {
this.resultMaterialization = requireNonNull(resultMaterialization, "resultMaterialization");
@@ -206,6 +209,14 @@ public CreateSourceAsProperties getProperties() {
return withProperties;
}
+ void addTableFunction(final FunctionCall functionCall) {
+ this.tableFunctions.add(Objects.requireNonNull(functionCall));
+ }
+
+ public List getTableFunctions() {
+ return tableFunctions;
+ }
+
@Immutable
public static final class Into {
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java
index 746f205c74f0..be6694642fcc 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java
@@ -26,6 +26,7 @@
import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp;
import io.confluent.ksql.execution.expression.tree.ComparisonExpression;
import io.confluent.ksql.execution.expression.tree.Expression;
+import io.confluent.ksql.execution.expression.tree.FunctionCall;
import io.confluent.ksql.execution.expression.tree.TraversalExpressionVisitor;
import io.confluent.ksql.execution.plan.SelectExpression;
import io.confluent.ksql.execution.windows.KsqlWindowExpression;
@@ -516,6 +517,7 @@ protected AstNode visitSelect(final Select node, final Void context) {
} else if (selectItem instanceof SingleColumn) {
final SingleColumn column = (SingleColumn) selectItem;
addSelectItem(column.getExpression(), column.getAlias());
+ visitTableFunctions(column.getExpression());
} else {
throw new IllegalArgumentException(
"Unsupported SelectItem type: " + selectItem.getClass().getName());
@@ -626,6 +628,41 @@ public Void visitColumnReference(
analysis.addSelectItem(exp, columnName);
analysis.addSelectColumnRefs(columnRefs);
}
+
+ private void visitTableFunctions(final Expression expression) {
+ final TableFunctionVisitor visitor = new TableFunctionVisitor();
+ visitor.process(expression, null);
+ }
+
+ private final class TableFunctionVisitor extends TraversalExpressionVisitor {
+
+ private Optional tableFunctionName = Optional.empty();
+
+ @Override
+ public Void visitFunctionCall(final FunctionCall functionCall, final Void context) {
+ final String functionName = functionCall.getName().name();
+ final boolean isTableFunction = metaStore.isTableFunction(functionName);
+
+ if (isTableFunction) {
+ if (tableFunctionName.isPresent()) {
+ throw new KsqlException("Table functions cannot be nested: "
+ + tableFunctionName.get() + "(" + functionName + "())");
+ }
+
+ tableFunctionName = Optional.of(functionName);
+
+ analysis.addTableFunction(functionCall);
+ }
+
+ super.visitFunctionCall(functionCall, context);
+
+ if (isTableFunction) {
+ tableFunctionName = Optional.empty();
+ }
+
+ return null;
+ }
+ }
}
@FunctionalInterface
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryAnalyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryAnalyzer.java
index 62cc1470c45d..8753f4e43389 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryAnalyzer.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryAnalyzer.java
@@ -31,7 +31,6 @@
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Sink;
import io.confluent.ksql.serde.SerdeOption;
-import io.confluent.ksql.util.AggregateExpressionRewriter;
import io.confluent.ksql.util.KsqlException;
import java.util.List;
import java.util.Map;
@@ -175,6 +174,10 @@ private static void enforceAggregateRules(
return;
}
+ if (!analysis.getTableFunctions().isEmpty()) {
+ throw new KsqlException("Table functions cannot be used with aggregations.");
+ }
+
if (aggregateAnalysis.getAggregateFunctions().isEmpty()) {
throw new KsqlException(
"GROUP BY requires columns using aggregate functions in SELECT clause.");
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/QueryEngine.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/QueryEngine.java
index 1cb4c63afd7a..449fd6617d38 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/engine/QueryEngine.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/QueryEngine.java
@@ -59,10 +59,30 @@ class QueryEngine {
this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
this.processingLogContext = Objects.requireNonNull(
processingLogContext,
- "processingLogContext");
+ "processingLogContext"
+ );
this.queryIdGenerator = Objects.requireNonNull(queryIdGenerator, "queryIdGenerator");
}
+ static OutputNode buildQueryLogicalPlan(
+ final Query query,
+ final Optional sink,
+ final MetaStore metaStore,
+ final KsqlConfig config
+ ) {
+ final String outputPrefix = config.getString(KsqlConfig.KSQL_OUTPUT_TOPIC_NAME_PREFIX_CONFIG);
+
+ final Set defaultSerdeOptions = SerdeOptions.buildDefaults(config);
+
+ final QueryAnalyzer queryAnalyzer =
+ new QueryAnalyzer(metaStore, outputPrefix, defaultSerdeOptions);
+
+ final Analysis analysis = queryAnalyzer.analyze(query, sink);
+ final AggregateAnalysisResult aggAnalysis = queryAnalyzer.analyzeAggregate(query, analysis);
+
+ return new LogicalPlanner(config, analysis, aggAnalysis, metaStore).buildPlan();
+ }
+
PhysicalPlan> buildPhysicalPlan(
final LogicalPlanNode logicalPlanNode,
final KsqlConfig ksqlConfig,
@@ -84,23 +104,4 @@ PhysicalPlan> buildPhysicalPlan(
return physicalPlanBuilder.buildPhysicalPlan(logicalPlanNode);
}
-
- static OutputNode buildQueryLogicalPlan(
- final Query query,
- final Optional sink,
- final MetaStore metaStore,
- final KsqlConfig config
- ) {
- final String outputPrefix = config.getString(KsqlConfig.KSQL_OUTPUT_TOPIC_NAME_PREFIX_CONFIG);
-
- final Set defaultSerdeOptions = SerdeOptions.buildDefaults(config);
-
- final QueryAnalyzer queryAnalyzer =
- new QueryAnalyzer(metaStore, outputPrefix, defaultSerdeOptions);
-
- final Analysis analysis = queryAnalyzer.analyze(query, sink);
- final AggregateAnalysisResult aggAnalysis = queryAnalyzer.analyzeAggregate(query, analysis);
-
- return new LogicalPlanner(config, analysis, aggAnalysis, metaStore).buildPlan();
- }
}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/BaseTableFunction.java b/ksql-engine/src/main/java/io/confluent/ksql/function/BaseTableFunction.java
new file mode 100644
index 000000000000..c4e3194446f7
--- /dev/null
+++ b/ksql-engine/src/main/java/io/confluent/ksql/function/BaseTableFunction.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.function;
+
+import com.google.errorprone.annotations.Immutable;
+import io.confluent.ksql.name.FunctionName;
+import io.confluent.ksql.schema.ksql.SchemaConverters;
+import io.confluent.ksql.schema.ksql.SchemaConverters.ConnectToSqlTypeConverter;
+import io.confluent.ksql.schema.ksql.types.SqlType;
+import java.util.List;
+import java.util.Objects;
+import org.apache.kafka.connect.data.Schema;
+
+/**
+ * Skeleton implementation of {@link KsqlTableFunction} that holds the function name, argument
+ * schemas, return type and description; subclasses supply {@code flatMap}.
+ *
+ * @param <I> the input type of the function
+ * @param <O> the element type of the function's output list
+ */
+@Immutable
+public abstract class BaseTableFunction<I, O> implements KsqlTableFunction<I, O> {
+
+  private static final ConnectToSqlTypeConverter CONNECT_TO_SQL_CONVERTER
+      = SchemaConverters.connectToSqlConverter();
+
+  private final Schema outputSchema;
+  private final SqlType outputType;
+  private final List<Schema> arguments;
+
+  protected final FunctionName functionName;
+  private final String description;
+
+  /**
+   * @param functionName the name of the function
+   * @param outputType the Connect schema of the return type; also converted to a SQL type
+   * @param arguments the schemas of the function's arguments
+   * @param description a description of the function
+   * @throws NullPointerException if any argument is {@code null}
+   */
+  public BaseTableFunction(
+      final FunctionName functionName,
+      final Schema outputType,
+      final List<Schema> arguments,
+      final String description
+  ) {
+    // Null-check before converting so a null schema fails with a named NPE.
+    this.outputSchema = Objects.requireNonNull(outputType, "outputType");
+    this.outputType = CONNECT_TO_SQL_CONVERTER.toSqlType(outputType);
+    this.arguments = Objects.requireNonNull(arguments, "arguments");
+    this.functionName = Objects.requireNonNull(functionName, "functionName");
+    this.description = Objects.requireNonNull(description, "description");
+  }
+
+  /**
+   * @return the name of the function.
+   */
+  public FunctionName getFunctionName() {
+    return functionName;
+  }
+
+  @Override
+  public Schema getReturnType() {
+    return outputSchema;
+  }
+
+  @Override
+  public SqlType returnType() {
+    return outputType;
+  }
+
+  @Override
+  public List<Schema> getArguments() {
+    return arguments;
+  }
+
+  @Override
+  public String getDescription() {
+    return description;
+  }
+}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/InternalFunctionRegistry.java b/ksql-engine/src/main/java/io/confluent/ksql/function/InternalFunctionRegistry.java
index 1fedc00cbee3..262764755ca8 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/function/InternalFunctionRegistry.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/function/InternalFunctionRegistry.java
@@ -34,14 +34,16 @@
import io.confluent.ksql.function.udf.string.LenKudf;
import io.confluent.ksql.function.udf.string.TrimKudf;
import io.confluent.ksql.function.udf.string.UCaseKudf;
+import io.confluent.ksql.function.udtf.array.ExplodeFunctionFactory;
import io.confluent.ksql.name.FunctionName;
+import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -49,16 +51,16 @@
@ThreadSafe
public class InternalFunctionRegistry implements MutableFunctionRegistry {
- private final Object lock = new Object();
- private final Map udfs = new ConcurrentHashMap<>();
- private final Map udafs = new ConcurrentHashMap<>();
+ private final Map udfs = new HashMap<>();
+ private final Map udafs = new HashMap<>();
+ private final Map udtfs = new HashMap<>();
private final FunctionNameValidator functionNameValidator = new FunctionNameValidator();
public InternalFunctionRegistry() {
new BuiltInInitializer(this).init();
}
- public UdfFactory getUdfFactory(final String functionName) {
+ public synchronized UdfFactory getUdfFactory(final String functionName) {
final UdfFactory udfFactory = udfs.get(functionName.toUpperCase());
if (udfFactory == null) {
throw new KsqlException("Can't find any functions with the name '" + functionName + "'");
@@ -67,7 +69,7 @@ public UdfFactory getUdfFactory(final String functionName) {
}
@Override
- public void addFunction(final KsqlFunction ksqlFunction) {
+ public synchronized void addFunction(final KsqlFunction ksqlFunction) {
final UdfFactory udfFactory = udfs.get(ksqlFunction.getFunctionName().name().toUpperCase());
if (udfFactory == null) {
throw new KsqlException("Unknown function factory: " + ksqlFunction.getFunctionName());
@@ -76,34 +78,39 @@ public void addFunction(final KsqlFunction ksqlFunction) {
}
@Override
- public UdfFactory ensureFunctionFactory(final UdfFactory factory) {
+ public synchronized UdfFactory ensureFunctionFactory(final UdfFactory factory) {
validateFunctionName(factory.getName());
- synchronized (lock) {
- final String functionName = factory.getName().toUpperCase();
- if (udafs.containsKey(functionName)) {
- throw new KsqlException("UdfFactory already registered as aggregate: " + functionName);
- }
-
- final UdfFactory existing = udfs.putIfAbsent(functionName, factory);
- if (existing != null && !existing.matches(factory)) {
- throw new KsqlException("UdfFactory not compatible with existing factory."
- + " function: " + functionName
- + " existing: " + existing
- + ", factory: " + factory);
- }
-
- return existing == null ? factory : existing;
+ final String functionName = factory.getName().toUpperCase();
+ if (udafs.containsKey(functionName)) {
+ throw new KsqlException("UdfFactory already registered as aggregate: " + functionName);
}
+ if (udtfs.containsKey(functionName)) {
+ throw new KsqlException("UdfFactory already registered as table function: " + functionName);
+ }
+
+ final UdfFactory existing = udfs.putIfAbsent(functionName, factory);
+ if (existing != null && !existing.matches(factory)) {
+ throw new KsqlException("UdfFactory not compatible with existing factory."
+ + " function: " + functionName
+ + " existing: " + existing
+ + ", factory: " + factory);
+ }
+
+ return existing == null ? factory : existing;
}
@Override
- public boolean isAggregate(final String functionName) {
+ public synchronized boolean isAggregate(final String functionName) {
return udafs.containsKey(functionName.toUpperCase());
}
+ public synchronized boolean isTableFunction(final String functionName) {
+ return udtfs.containsKey(functionName.toUpperCase());
+ }
+
@Override
- public KsqlAggregateFunction getAggregateFunction(
+ public synchronized KsqlAggregateFunction getAggregateFunction(
final String functionName,
final Schema argumentType,
final AggregateFunctionInitArguments initArgs
@@ -112,34 +119,76 @@ public KsqlAggregateFunction getAggregateFunction(
if (udafFactory == null) {
throw new KsqlException("No aggregate function with name " + functionName + " exists!");
}
- return udafFactory.createAggregateFunction(Collections.singletonList(argumentType),
- initArgs);
+ return udafFactory.createAggregateFunction(
+ Collections.singletonList(argumentType),
+ initArgs
+ );
}
@Override
- public void addAggregateFunctionFactory(final AggregateFunctionFactory aggregateFunctionFactory) {
+ public synchronized KsqlTableFunction getTableFunction(
+ final String functionName,
+ final Schema argumentType
+ ) {
+ final TableFunctionFactory udtfFactory = udtfs.get(functionName.toUpperCase());
+ if (udtfFactory == null) {
+ throw new KsqlException("No table function with name " + functionName + " exists!");
+ }
+
+ return udtfFactory.createTableFunction(Collections.singletonList(argumentType));
+ }
+
+ @Override
+ public synchronized void addAggregateFunctionFactory(
+ final AggregateFunctionFactory aggregateFunctionFactory) {
final String functionName = aggregateFunctionFactory.getName().toUpperCase();
validateFunctionName(functionName);
- synchronized (lock) {
- if (udfs.containsKey(functionName)) {
- throw new KsqlException(
- "Aggregate function already registered as non-aggregate: " + functionName);
- }
+ if (udfs.containsKey(functionName)) {
+ throw new KsqlException(
+ "Aggregate function already registered as non-aggregate: " + functionName);
+ }
- if (udafs.putIfAbsent(functionName, aggregateFunctionFactory) != null) {
- throw new KsqlException("Aggregate function already registered: " + functionName);
- }
+ if (udtfs.containsKey(functionName)) {
+ throw new KsqlException(
+ "Aggregate function already registered as table function: " + functionName);
}
+
+ if (udafs.putIfAbsent(functionName, aggregateFunctionFactory) != null) {
+ throw new KsqlException("Aggregate function already registered: " + functionName);
+ }
+
+ }
+
+ @Override
+ public synchronized void addTableFunctionFactory(
+ final TableFunctionFactory tableFunctionFactory) {
+ final String functionName = tableFunctionFactory.getName().toUpperCase();
+ validateFunctionName(functionName);
+
+ if (udfs.containsKey(functionName)) {
+ throw new KsqlException(
+ "Table function already registered as non-aggregate: " + functionName);
+ }
+
+ if (udafs.containsKey(functionName)) {
+ throw new KsqlException(
+ "Table function already registered as aggregate: " + functionName);
+ }
+
+ if (udtfs.putIfAbsent(functionName, tableFunctionFactory) != null) {
+ throw new KsqlException("Table function already registered: " + functionName);
+ }
+
}
@Override
- public List listFunctions() {
+ public synchronized List listFunctions() {
return new ArrayList<>(udfs.values());
}
@Override
- public AggregateFunctionFactory getAggregateFactory(final String functionName) {
+ public synchronized AggregateFunctionFactory getAggregateFactory(final String functionName) {
final AggregateFunctionFactory udafFactory = udafs.get(functionName.toUpperCase());
if (udafFactory == null) {
throw new KsqlException(
@@ -150,7 +199,7 @@ public AggregateFunctionFactory getAggregateFactory(final String functionName) {
}
@Override
- public List listAggregateFunctions() {
+ public synchronized List listAggregateFunctions() {
return new ArrayList<>(udafs.values());
}
@@ -174,12 +223,29 @@ private BuiltInInitializer(
this.functionRegistry = Objects.requireNonNull(functionRegistry, "functionRegistry");
}
+ private static UdfFactory builtInUdfFactory(
+ final KsqlFunction ksqlFunction,
+ final boolean internal
+ ) {
+ final UdfMetadata metadata = new UdfMetadata(
+ ksqlFunction.getFunctionName().name(),
+ ksqlFunction.getDescription(),
+ KsqlConstants.CONFLUENT_AUTHOR,
+ "",
+ KsqlFunction.INTERNAL_PATH,
+ internal
+ );
+
+ return new UdfFactory(ksqlFunction.getKudfClass(), metadata);
+ }
+
private void init() {
addStringFunctions();
addMathFunctions();
addJsonFunctions();
addStructFieldFetcher();
addUdafFunctions();
+ addUdtfFunctions();
}
private void addStringFunctions() {
@@ -187,34 +253,42 @@ private void addStringFunctions() {
addBuiltInFunction(KsqlFunction.createLegacyBuiltIn(
Schema.OPTIONAL_STRING_SCHEMA,
Collections.singletonList(Schema.OPTIONAL_STRING_SCHEMA),
- FunctionName.of("LCASE"), LCaseKudf.class));
+ FunctionName.of("LCASE"), LCaseKudf.class
+ ));
addBuiltInFunction(KsqlFunction.createLegacyBuiltIn(
Schema.OPTIONAL_STRING_SCHEMA,
Collections.singletonList(Schema.OPTIONAL_STRING_SCHEMA),
- FunctionName.of("UCASE"), UCaseKudf.class));
+ FunctionName.of("UCASE"), UCaseKudf.class
+ ));
addBuiltInFunction(KsqlFunction.createLegacyBuiltIn(
Schema.OPTIONAL_STRING_SCHEMA,
ImmutableList.of(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA),
- FunctionName.of(ConcatKudf.NAME), ConcatKudf.class));
+ FunctionName.of(ConcatKudf.NAME), ConcatKudf.class
+ ));
addBuiltInFunction(KsqlFunction.createLegacyBuiltIn(
Schema.OPTIONAL_STRING_SCHEMA,
Collections.singletonList(Schema.OPTIONAL_STRING_SCHEMA),
- FunctionName.of("TRIM"), TrimKudf.class));
+ FunctionName.of("TRIM"), TrimKudf.class
+ ));
addBuiltInFunction(KsqlFunction.createLegacyBuiltIn(
Schema.OPTIONAL_STRING_SCHEMA,
- ImmutableList.of(Schema.OPTIONAL_STRING_SCHEMA,
- Schema.OPTIONAL_STRING_SCHEMA),
- FunctionName.of("IFNULL"), IfNullKudf.class));
+ ImmutableList.of(
+ Schema.OPTIONAL_STRING_SCHEMA,
+ Schema.OPTIONAL_STRING_SCHEMA
+ ),
+ FunctionName.of("IFNULL"), IfNullKudf.class
+ ));
addBuiltInFunction(KsqlFunction.createLegacyBuiltIn(
Schema.OPTIONAL_INT32_SCHEMA,
Collections.singletonList(Schema.OPTIONAL_STRING_SCHEMA),
FunctionName.of("LEN"),
- LenKudf.class));
+ LenKudf.class
+ ));
}
private void addMathFunctions() {
@@ -223,13 +297,15 @@ private void addMathFunctions() {
Schema.OPTIONAL_FLOAT64_SCHEMA,
Collections.singletonList(Schema.OPTIONAL_FLOAT64_SCHEMA),
FunctionName.of("CEIL"),
- CeilKudf.class));
+ CeilKudf.class
+ ));
addBuiltInFunction(KsqlFunction.createLegacyBuiltIn(
Schema.OPTIONAL_FLOAT64_SCHEMA,
Collections.emptyList(),
FunctionName.of("RANDOM"),
- RandomKudf.class));
+ RandomKudf.class
+ ));
}
private void addJsonFunctions() {
@@ -238,57 +314,71 @@ private void addJsonFunctions() {
Schema.OPTIONAL_STRING_SCHEMA,
ImmutableList.of(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA),
JsonExtractStringKudf.FUNCTION_NAME,
- JsonExtractStringKudf.class));
+ JsonExtractStringKudf.class
+ ));
addBuiltInFunction(KsqlFunction.createLegacyBuiltIn(
Schema.OPTIONAL_BOOLEAN_SCHEMA,
ImmutableList.of(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA),
FunctionName.of("ARRAYCONTAINS"),
- ArrayContainsKudf.class));
+ ArrayContainsKudf.class
+ ));
addBuiltInFunction(KsqlFunction.createLegacyBuiltIn(
Schema.OPTIONAL_BOOLEAN_SCHEMA,
ImmutableList.of(
SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build(),
- Schema.OPTIONAL_STRING_SCHEMA),
+ Schema.OPTIONAL_STRING_SCHEMA
+ ),
FunctionName.of("ARRAYCONTAINS"),
- ArrayContainsKudf.class));
+ ArrayContainsKudf.class
+ ));
addBuiltInFunction(KsqlFunction.createLegacyBuiltIn(
Schema.OPTIONAL_BOOLEAN_SCHEMA,
ImmutableList.of(
SchemaBuilder.array(Schema.OPTIONAL_INT32_SCHEMA).optional().build(),
- Schema.OPTIONAL_INT32_SCHEMA),
+ Schema.OPTIONAL_INT32_SCHEMA
+ ),
FunctionName.of("ARRAYCONTAINS"),
- ArrayContainsKudf.class));
+ ArrayContainsKudf.class
+ ));
addBuiltInFunction(KsqlFunction.createLegacyBuiltIn(
Schema.OPTIONAL_BOOLEAN_SCHEMA,
ImmutableList.of(
SchemaBuilder.array(Schema.OPTIONAL_INT64_SCHEMA).optional().build(),
- Schema.OPTIONAL_INT64_SCHEMA),
+ Schema.OPTIONAL_INT64_SCHEMA
+ ),
FunctionName.of("ARRAYCONTAINS"),
- ArrayContainsKudf.class));
+ ArrayContainsKudf.class
+ ));
addBuiltInFunction(KsqlFunction.createLegacyBuiltIn(
Schema.OPTIONAL_BOOLEAN_SCHEMA,
ImmutableList.of(
SchemaBuilder.array(Schema.OPTIONAL_FLOAT64_SCHEMA).optional().build(),
- Schema.OPTIONAL_FLOAT64_SCHEMA),
+ Schema.OPTIONAL_FLOAT64_SCHEMA
+ ),
FunctionName.of("ARRAYCONTAINS"),
- ArrayContainsKudf.class));
+ ArrayContainsKudf.class
+ ));
}
private void addStructFieldFetcher() {
- addBuiltInFunction(KsqlFunction.createLegacyBuiltIn(
- SchemaBuilder.struct().optional().build(),
- ImmutableList.of(
+ addBuiltInFunction(
+ KsqlFunction.createLegacyBuiltIn(
SchemaBuilder.struct().optional().build(),
- Schema.STRING_SCHEMA),
- FetchFieldFromStruct.FUNCTION_NAME,
- FetchFieldFromStruct.class),
- true);
+ ImmutableList.of(
+ SchemaBuilder.struct().optional().build(),
+ Schema.STRING_SCHEMA
+ ),
+ FetchFieldFromStruct.FUNCTION_NAME,
+ FetchFieldFromStruct.class
+ ),
+ true
+ );
}
private void addUdafFunctions() {
@@ -303,6 +393,10 @@ private void addUdafFunctions() {
functionRegistry.addAggregateFunctionFactory(new TopkDistinctAggFunctionFactory());
}
+ private void addUdtfFunctions() {
+ functionRegistry.addTableFunctionFactory(new ExplodeFunctionFactory());
+ }
+
private void addBuiltInFunction(final KsqlFunction ksqlFunction) {
addBuiltInFunction(ksqlFunction, false);
}
@@ -312,20 +406,5 @@ private void addBuiltInFunction(final KsqlFunction ksqlFunction, final boolean i
.ensureFunctionFactory(builtInUdfFactory(ksqlFunction, internal))
.addFunction(ksqlFunction);
}
-
- private static UdfFactory builtInUdfFactory(
- final KsqlFunction ksqlFunction,
- final boolean internal
- ) {
- final UdfMetadata metadata = new UdfMetadata(
- ksqlFunction.getFunctionName().name(),
- ksqlFunction.getDescription(),
- "Confluent",
- "",
- KsqlFunction.INTERNAL_PATH,
- internal);
-
- return new UdfFactory(ksqlFunction.getKudfClass(), metadata);
- }
}
}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java b/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java
index 6ffe15aa196d..0d2a57f846f4 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java
@@ -127,7 +127,7 @@ public void load() {
// Does not handle customer udfs, i.e the loader is the ParentClassLoader and path is internal
void loadUdfFromClass(final Class>... udfClass) {
- for (final Class> theClass: udfClass) {
+ for (final Class> theClass : udfClass) {
//classes must be annotated with @UdfDescription
final UdfDescription udfDescription = theClass.getAnnotation(UdfDescription.class);
if (udfDescription == null) {
@@ -135,7 +135,7 @@ void loadUdfFromClass(final Class>... udfClass) {
+ "be annotated with @UdfDescription.", theClass.getName()));
}
//method must be public and annotated with @Udf
- for (Method m: theClass.getDeclaredMethods()) {
+ for (Method m : theClass.getDeclaredMethods()) {
if (m.isAnnotationPresent(Udf.class) && Modifier.isPublic(m.getModifiers())) {
processUdfAnnotation(
theClass,
@@ -342,7 +342,7 @@ private void addFunction(final Class theClass,
ksqlConfig -> {
final Object actualUdf = instantiateUdfClass(method, classLevelAnnotation);
if (actualUdf instanceof Configurable) {
- ((Configurable)actualUdf)
+ ((Configurable) actualUdf)
.configure(ksqlConfig.getKsqlFunctionsConfigProps(functionName));
}
final PluggableUdf theUdf = new PluggableUdf(udf, actualUdf);
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowEndKudaf.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowEndKudaf.java
index 482abe7d8d8f..dd561764b4a5 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowEndKudaf.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowEndKudaf.java
@@ -20,6 +20,7 @@
import io.confluent.ksql.function.udaf.UdafDescription;
import io.confluent.ksql.function.udaf.UdafFactory;
import io.confluent.ksql.function.udaf.placeholder.PlaceholderTableUdaf;
+import io.confluent.ksql.util.KsqlConstants;
/**
* A placeholder KUDAF for extracting window end times.
@@ -29,7 +30,7 @@
* @see WindowSelectMapper
*/
@SuppressWarnings("WeakerAccess") // Invoked via reflection.
-@UdafDescription(name = "WindowEnd", author = "Confluent",
+@UdafDescription(name = "WindowEnd", author = KsqlConstants.CONFLUENT_AUTHOR,
description = "Returns the window end time, in milliseconds, for the given record. "
+ "If the given record is not part of a window the function will return NULL.")
public final class WindowEndKudaf {
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowStartKudaf.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowStartKudaf.java
index c8765548373b..5f81a0d048b4 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowStartKudaf.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowStartKudaf.java
@@ -20,6 +20,7 @@
import io.confluent.ksql.function.udaf.UdafDescription;
import io.confluent.ksql.function.udaf.UdafFactory;
import io.confluent.ksql.function.udaf.placeholder.PlaceholderTableUdaf;
+import io.confluent.ksql.util.KsqlConstants;
/**
* A placeholder KUDAF for extracting window start times.
@@ -29,7 +30,7 @@
* @see WindowSelectMapper
*/
@SuppressWarnings("WeakerAccess") // Invoked via reflection.
-@UdafDescription(name = "WindowStart", author = "Confluent",
+@UdafDescription(name = "WindowStart", author = KsqlConstants.CONFLUENT_AUTHOR,
description = "Returns the window start time, in milliseconds, for the given record. "
+ "If the given record is not part of a window the function will return NULL.")
public final class WindowStartKudaf {
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udtf/array/ExplodeFunctionFactory.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udtf/array/ExplodeFunctionFactory.java
new file mode 100644
index 000000000000..028b7f0a0f1c
--- /dev/null
+++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udtf/array/ExplodeFunctionFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.function.udtf.array;
+
+import io.confluent.ksql.function.KsqlTableFunction;
+import io.confluent.ksql.function.TableFunctionFactory;
+import io.confluent.ksql.function.udf.UdfMetadata;
+import io.confluent.ksql.name.FunctionName;
+import io.confluent.ksql.util.KsqlConstants;
+import io.confluent.ksql.util.KsqlException;
+import io.confluent.ksql.util.Version;
+import java.util.List;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+
+public class ExplodeFunctionFactory extends TableFunctionFactory {
+
+ private static final FunctionName NAME = FunctionName.of("EXPLODE");
+
+ public ExplodeFunctionFactory() {
+ super(new UdfMetadata(NAME.name(),
+ "Explodes an array into zero or more rows",
+ KsqlConstants.CONFLUENT_AUTHOR,
+ Version.getVersion(),
+ "",
+ false));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public KsqlTableFunction, ?> createTableFunction(final List argTypeList) {
+ if (argTypeList.size() != 1) { // EXPLODE is strictly unary
+ throw new KsqlException("EXPLODE function should have one argument.");
+ }
+
+ final Schema schema = argTypeList.get(0);
+ if (schema != null && schema.type() == Type.ARRAY) { // null schema falls through to the error
+ return new ExplodeTableFunction(NAME, schema.valueSchema(), argTypeList,
+ "Explodes an array");
+ }
+ throw new KsqlException("Unsupported argument type for EXPLODE " + schema);
+ }
+
+ @Override
+ public List> supportedArgs() {
+ return java.util.Collections.emptyList(); // empty rather than null, so callers can iterate
+ }
+}
\ No newline at end of file
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udtf/array/ExplodeTableFunction.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udtf/array/ExplodeTableFunction.java
new file mode 100644
index 000000000000..05d223b373da
--- /dev/null
+++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udtf/array/ExplodeTableFunction.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.function.udtf.array;
+
+import io.confluent.ksql.function.BaseTableFunction;
+import io.confluent.ksql.name.FunctionName;
+import java.util.Collections;
+import java.util.List;
+import org.apache.kafka.connect.data.Schema;
+
+public class ExplodeTableFunction extends BaseTableFunction, T> {
+
+ public ExplodeTableFunction(
+ final FunctionName functionName,
+ final Schema outputType,
+ final List arguments,
+ final String description) {
+ super(functionName, outputType, arguments, description);
+ }
+
+ @Override
+ public List flatMap(final List input) {
+ return input == null ? Collections.emptyList() : input;
+ }
+
+}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java
index 352356cae1a9..633b7003b3a5 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java
@@ -30,6 +30,7 @@
import io.confluent.ksql.planner.plan.AggregateNode;
import io.confluent.ksql.planner.plan.DataSourceNode;
import io.confluent.ksql.planner.plan.FilterNode;
+import io.confluent.ksql.planner.plan.FlatMapNode;
import io.confluent.ksql.planner.plan.JoinNode;
import io.confluent.ksql.planner.plan.KsqlBareOutputNode;
import io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode;
@@ -76,6 +77,10 @@ public LogicalPlanner(
public OutputNode buildPlan() {
PlanNode currentNode = buildSourceNode();
+ if (!analysis.getTableFunctions().isEmpty()) {
+ currentNode = buildFlatMapNode(currentNode);
+ }
+
if (analysis.getWhereExpression().isPresent()) {
currentNode = buildFilterNode(currentNode, analysis.getWhereExpression().get());
}
@@ -177,7 +182,8 @@ private AggregateNode buildAggregateNode(final PlanNode sourcePlanNode) {
final Optional keyFieldName = getSelectAliasMatching((expression, alias) ->
expression.equals(groupBy)
&& !SchemaUtil.isFieldName(alias.name(), SchemaUtil.ROWTIME_NAME.name())
- && !SchemaUtil.isFieldName(alias.name(), SchemaUtil.ROWKEY_NAME.name()));
+ && !SchemaUtil.isFieldName(alias.name(), SchemaUtil.ROWKEY_NAME.name()),
+ sourcePlanNode);
return new AggregateNode(
new PlanNodeId("Aggregate"),
@@ -204,15 +210,15 @@ private ProjectNode buildProjectNode(final PlanNode sourcePlanNode) {
final Optional keyFieldName = getSelectAliasMatching((expression, alias) ->
expression instanceof ColumnReferenceExp
- && ((ColumnReferenceExp) expression).getReference().equals(sourceKeyFieldName)
+ && ((ColumnReferenceExp) expression).getReference().equals(sourceKeyFieldName),
+ sourcePlanNode
);
return new ProjectNode(
new PlanNodeId("Project"),
sourcePlanNode,
schema,
- keyFieldName.map(ColumnRef::withoutSource),
- analysis.getSelectExpressions()
+ keyFieldName.map(ColumnRef::withoutSource)
);
}
@@ -223,6 +229,10 @@ private static FilterNode buildFilterNode(
return new FilterNode(new PlanNodeId("Filter"), sourcePlanNode, filterExpression);
}
+ private FlatMapNode buildFlatMapNode(final PlanNode sourcePlanNode) {
+ return new FlatMapNode(new PlanNodeId("FlatMap"), sourcePlanNode, functionRegistry, analysis);
+ }
+
private PlanNode buildSourceNode() {
final List sources = analysis.getFromDataSources();
@@ -242,17 +252,20 @@ private PlanNode buildSourceNode() {
final DataSourceNode leftSourceNode = new DataSourceNode(
new PlanNodeId("KafkaTopic_Left"),
left.getDataSource(),
- left.getAlias()
+ left.getAlias(),
+ analysis.getSelectExpressions()
);
final DataSourceNode rightSourceNode = new DataSourceNode(
new PlanNodeId("KafkaTopic_Right"),
right.getDataSource(),
- right.getAlias()
+ right.getAlias(),
+ analysis.getSelectExpressions()
);
return new JoinNode(
new PlanNodeId("Join"),
+ analysis.getSelectExpressions(),
joinInfo.get().getType(),
leftSourceNode,
rightSourceNode,
@@ -271,15 +284,17 @@ private DataSourceNode buildNonJoinNode(final List sources) {
return new DataSourceNode(
new PlanNodeId("KsqlTopic"),
dataSource.getDataSource(),
- dataSource.getAlias()
+ dataSource.getAlias(),
+ analysis.getSelectExpressions()
);
}
private Optional getSelectAliasMatching(
- final BiFunction matcher
+ final BiFunction matcher,
+ final PlanNode sourcePlanNode
) {
- for (int i = 0; i < analysis.getSelectExpressions().size(); i++) {
- final SelectExpression select = analysis.getSelectExpressions().get(i);
+ for (int i = 0; i < sourcePlanNode.getSelectExpressions().size(); i++) {
+ final SelectExpression select = sourcePlanNode.getSelectExpressions().get(i);
if (matcher.apply(select.getExpression(), select.getAlias())) {
return Optional.of(select.getAlias());
@@ -303,8 +318,8 @@ private LogicalSchema buildProjectionSchema(final PlanNode sourcePlanNode) {
builder.keyColumns(keyColumns);
- for (int i = 0; i < analysis.getSelectExpressions().size(); i++) {
- final SelectExpression select = analysis.getSelectExpressions().get(i);
+ for (int i = 0; i < sourcePlanNode.getSelectExpressions().size(); i++) {
+ final SelectExpression select = sourcePlanNode.getSelectExpressions().get(i);
final SqlType expressionType = expressionTypeManager
.getExpressionSqlType(select.getExpression());
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/PlanSourceExtractorVisitor.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/PlanSourceExtractorVisitor.java
index a5a2c3c500fc..e8252d18b81e 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/planner/PlanSourceExtractorVisitor.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/PlanSourceExtractorVisitor.java
@@ -19,6 +19,7 @@
import io.confluent.ksql.planner.plan.AggregateNode;
import io.confluent.ksql.planner.plan.DataSourceNode;
import io.confluent.ksql.planner.plan.FilterNode;
+import io.confluent.ksql.planner.plan.FlatMapNode;
import io.confluent.ksql.planner.plan.JoinNode;
import io.confluent.ksql.planner.plan.OutputNode;
import io.confluent.ksql.planner.plan.PlanNode;
@@ -49,7 +50,7 @@ protected R visitFilter(final FilterNode node, final C context) {
}
protected R visitProject(final ProjectNode node, final C context) {
- return process(node.getSources().get(0), context);
+ return process(node.getSource(), context);
}
protected R visitDataSourceNode(final DataSourceNode node, final C context) {
@@ -73,6 +74,12 @@ protected R visitOutput(final OutputNode node, final C context) {
return null;
}
+ @Override
+ protected R visitFlatMap(final FlatMapNode node, final C context) {
+ process(node.getSource(), context); // same single-input accessor visitProject uses
+ return null;
+ }
+
public Set getSourceNames() {
return sourceNames;
}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java
index 428ce3760802..6f5a43b470ee 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java
@@ -176,6 +176,11 @@ private List getFinalSelectExpressions() {
return finalSelectExpressionList;
}
+ @Override
+ public List getSelectExpressions() {
+ return Collections.emptyList();
+ }
+
@Override
public R accept(final PlanVisitor visitor, final C context) {
return visitor.visitAggregate(this, context);
@@ -302,7 +307,7 @@ private LogicalSchema buildLogicalSchema(
for (int i = 0; i < aggregations.size(); i++) {
final KsqlAggregateFunction aggregateFunction =
UdafUtil.resolveAggregateFunction(functionRegistry, aggregations.get(i), inputSchema);
- final ColumnName colName = ColumnName.aggregate(i);
+ final ColumnName colName = ColumnName.aggregateColumn(i);
final SqlType fieldType = converter.toSqlType(
useAggregate ? aggregateFunction.getAggregateType() : aggregateFunction.getReturnType()
);
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/DataSourceNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/DataSourceNode.java
index 50754440d4f8..c86f9c3b7f8e 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/DataSourceNode.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/DataSourceNode.java
@@ -22,6 +22,7 @@
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.context.QueryContext.Stacker;
import io.confluent.ksql.execution.plan.LogicalSchemaWithMetaAndKeyFields;
+import io.confluent.ksql.execution.plan.SelectExpression;
import io.confluent.ksql.execution.plan.StreamSource;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
@@ -51,24 +52,29 @@ public class DataSourceNode extends PlanNode {
private final LogicalSchemaWithMetaAndKeyFields schema;
private final KeyField keyField;
private final SchemaKStreamFactory schemaKStreamFactory;
+ private final List selectExpressions;
public DataSourceNode(
final PlanNodeId id,
final DataSource> dataSource,
- final SourceName alias
+ final SourceName alias,
+ final List selectExpressions
) {
- this(id, dataSource, alias, SchemaKStream::forSource);
+ this(id, dataSource, alias, selectExpressions, SchemaKStream::forSource);
}
DataSourceNode(
final PlanNodeId id,
final DataSource> dataSource,
final SourceName alias,
+ final List selectExpressions,
final SchemaKStreamFactory schemaKStreamFactory
) {
super(id, dataSource.getDataSourceType());
this.dataSource = requireNonNull(dataSource, "dataSource");
this.alias = requireNonNull(alias, "alias");
+ this.selectExpressions =
+ ImmutableList.copyOf(requireNonNull(selectExpressions, "selectExpressions"));
// DataSourceNode copies implicit and key fields into the value schema
// It users a KS valueMapper to add the key fields
@@ -121,6 +127,11 @@ public List getSources() {
return ImmutableList.of();
}
+ @Override
+ public List getSelectExpressions() {
+ return selectExpressions;
+ }
+
@Override
public R accept(final PlanVisitor visitor, final C context) {
return visitor.visitDataSourceNode(this, context);
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/FilterNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/FilterNode.java
index 9259b5c14bbc..c3b3524a0e25 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/FilterNode.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/FilterNode.java
@@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import io.confluent.ksql.execution.expression.tree.Expression;
+import io.confluent.ksql.execution.plan.SelectExpression;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.services.KafkaTopicClient;
@@ -31,6 +32,7 @@ public class FilterNode extends PlanNode {
private final PlanNode source;
private final Expression predicate;
+ private final List selectExpressions;
public FilterNode(
final PlanNodeId id,
@@ -41,6 +43,7 @@ public FilterNode(
this.source = Objects.requireNonNull(source, "source");
this.predicate = Objects.requireNonNull(predicate, "predicate");
+ this.selectExpressions = ImmutableList.copyOf(source.getSelectExpressions());
}
public Expression getPredicate() {
@@ -62,6 +65,11 @@ public List getSources() {
return ImmutableList.of(source);
}
+ @Override
+ public List getSelectExpressions() {
+ return selectExpressions;
+ }
+
public PlanNode getSource() {
return source;
}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/FlatMapNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/FlatMapNode.java
new file mode 100644
index 000000000000..a05a5606c2d4
--- /dev/null
+++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/FlatMapNode.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.planner.plan;
+
+import com.google.common.collect.ImmutableList;
+import io.confluent.ksql.analyzer.Analysis;
+import io.confluent.ksql.engine.rewrite.ExpressionTreeRewriter;
+import io.confluent.ksql.engine.rewrite.ExpressionTreeRewriter.Context;
+import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
+import io.confluent.ksql.execution.context.QueryContext;
+import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp;
+import io.confluent.ksql.execution.expression.tree.Expression;
+import io.confluent.ksql.execution.expression.tree.FunctionCall;
+import io.confluent.ksql.execution.expression.tree.VisitParentExpressionVisitor;
+import io.confluent.ksql.execution.function.UdtfUtil;
+import io.confluent.ksql.execution.plan.SelectExpression;
+import io.confluent.ksql.function.FunctionRegistry;
+import io.confluent.ksql.function.KsqlTableFunction;
+import io.confluent.ksql.metastore.model.KeyField;
+import io.confluent.ksql.name.ColumnName;
+import io.confluent.ksql.schema.ksql.Column;
+import io.confluent.ksql.schema.ksql.ColumnRef;
+import io.confluent.ksql.schema.ksql.LogicalSchema;
+import io.confluent.ksql.schema.ksql.SchemaConverters;
+import io.confluent.ksql.schema.ksql.SchemaConverters.ConnectToSqlTypeConverter;
+import io.confluent.ksql.schema.ksql.types.SqlType;
+import io.confluent.ksql.services.KafkaTopicClient;
+import io.confluent.ksql.structured.SchemaKStream;
+import io.confluent.ksql.util.KsqlException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * A node in the logical plan which represents a flat map operation - transforming a single row into
+ * zero or more rows.
+ */
+@Immutable
+public class FlatMapNode extends PlanNode {
+
+ private final PlanNode source;
+ private final LogicalSchema outputSchema;
+ private final List finalSelectExpressions;
+ private final Analysis analysis;
+ private final FunctionRegistry functionRegistry;
+
+ public FlatMapNode(
+ final PlanNodeId id,
+ final PlanNode source,
+ final FunctionRegistry functionRegistry,
+ final Analysis analysis
+ ) {
+ super(id, source.getNodeOutputType());
+ this.source = Objects.requireNonNull(source, "source");
+ this.analysis = Objects.requireNonNull(analysis, "analysis");
+ this.functionRegistry = Objects.requireNonNull(functionRegistry, "functionRegistry");
+ if (analysis.getTableFunctions().size() > 1) { // checked before any function is resolved
+ throw new KsqlException("Only one table function per query currently is supported");
+ }
+ this.finalSelectExpressions = ImmutableList.copyOf(buildFinalSelectExpressions());
+ this.outputSchema = buildLogicalSchema(source.getSchema());
+ }
+
+ @Override
+ public LogicalSchema getSchema() {
+ return outputSchema;
+ }
+
+ @Override
+ public KeyField getKeyField() {
+ return source.getKeyField();
+ }
+
+ @Override
+ public List getSources() {
+ return ImmutableList.of(source);
+ }
+
+ public PlanNode getSource() {
+ return source;
+ }
+
+ @Override
+ public List getSelectExpressions() {
+ return finalSelectExpressions;
+ }
+
+ @Override
+ public R accept(final PlanVisitor visitor, final C context) {
+ return visitor.visitFlatMap(this, context);
+ }
+
+ @Override
+ protected int getPartitions(final KafkaTopicClient kafkaTopicClient) {
+ return source.getPartitions(kafkaTopicClient);
+ }
+
+ @Override
+ public SchemaKStream> buildStream(final KsqlQueryBuilder builder) {
+
+ final QueryContext.Stacker contextStacker = builder.buildNodeContext(getId().toString());
+
+ return getSource().buildStream(builder).flatMap(
+ outputSchema,
+ analysis.getTableFunctions().get(0),
+ contextStacker
+ );
+ }
+
+ private LogicalSchema buildLogicalSchema(final LogicalSchema inputSchema) {
+ final LogicalSchema.Builder schemaBuilder = LogicalSchema.builder();
+ final List cols = inputSchema.value();
+
+ // We copy all the original columns to the output schema
+ schemaBuilder.keyColumns(inputSchema.key());
+ for (Column col : cols) {
+ schemaBuilder.valueColumn(col);
+ }
+
+ final ConnectToSqlTypeConverter converter = SchemaConverters.connectToSqlConverter();
+
+ // And add new columns representing the exploded values at the end
+ for (int i = 0; i < analysis.getTableFunctions().size(); i++) {
+ final KsqlTableFunction tableFunction =
+ UdtfUtil.resolveTableFunction(functionRegistry,
+ analysis.getTableFunctions().get(i), inputSchema
+ );
+ final ColumnName colName = ColumnName.synthesisedSchemaColumn(i);
+ final SqlType fieldType = converter.toSqlType(tableFunction.getReturnType());
+ schemaBuilder.valueColumn(colName, fieldType);
+ }
+
+ return schemaBuilder.build();
+ }
+
+ private List buildFinalSelectExpressions() {
+ final TableFunctionExpressionRewriter tableFunctionExpressionRewriter =
+ new TableFunctionExpressionRewriter();
+ final List selectExpressions = new ArrayList<>();
+ for (final SelectExpression select : analysis.getSelectExpressions()) {
+ final Expression exp = select.getExpression();
+ selectExpressions.add(
+ SelectExpression.of(
+ select.getAlias(),
+ ExpressionTreeRewriter.rewriteWith(
+ tableFunctionExpressionRewriter::process, exp)
+ ));
+ }
+ return selectExpressions;
+ }
+
+ private class TableFunctionExpressionRewriter
+ extends VisitParentExpressionVisitor, Context> {
+
+ private int variableIndex = 0;
+
+ TableFunctionExpressionRewriter() {
+ super(Optional.empty());
+ }
+
+ @Override
+ public Optional visitFunctionCall(
+ final FunctionCall node,
+ final Context context
+ ) {
+ final String functionName = node.getName().name();
+ if (functionRegistry.isTableFunction(functionName)) {
+ final ColumnName varName = ColumnName.synthesisedSchemaColumn(variableIndex);
+ variableIndex++;
+ return Optional.of(
+ new ColumnReferenceExp(node.getLocation(), ColumnRef.of(Optional.empty(), varName)));
+ } else {
+ final List arguments = new ArrayList<>();
+ for (final Expression argExpression : node.getArguments()) {
+ arguments.add(context.process(argExpression));
+ }
+ return Optional.of(new FunctionCall(node.getLocation(), node.getName(), arguments));
+ }
+ }
+ }
+}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java
index 6723aac5618a..b873fb0244f5 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java
@@ -19,6 +19,7 @@
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.context.QueryContext.Stacker;
+import io.confluent.ksql.execution.plan.SelectExpression;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.metastore.model.KeyField.LegacyField;
@@ -59,9 +60,11 @@ public enum JoinType {
private final ColumnRef rightJoinFieldName;
private final KeyField keyField;
private final Optional withinExpression;
+ private final List selectExpressions;
public JoinNode(
final PlanNodeId id,
+ final List selectExpressions,
final JoinType joinType,
final DataSourceNode left,
final DataSourceNode right,
@@ -76,6 +79,7 @@ public JoinNode(
this.leftJoinFieldName = Objects.requireNonNull(leftJoinFieldName, "leftJoinFieldName");
this.rightJoinFieldName = Objects.requireNonNull(rightJoinFieldName, "rightJoinFieldName");
this.withinExpression = Objects.requireNonNull(withinExpression, "withinExpression");
+ this.selectExpressions = Objects.requireNonNull(selectExpressions, "selectExpressions");
final Column leftKeyCol = validateSchemaColumn(leftJoinFieldName, left.getSchema());
validateSchemaColumn(rightJoinFieldName, right.getSchema());
@@ -107,6 +111,11 @@ public R accept(final PlanVisitor visitor, final C context) {
return visitor.visitJoin(this, context);
}
+ @Override
+ public List getSelectExpressions() {
+ return selectExpressions;
+ }
+
public DataSourceNode getLeft() {
return left;
}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/OutputNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/OutputNode.java
index 7089f93b1bc0..effb3fee0570 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/OutputNode.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/OutputNode.java
@@ -18,11 +18,13 @@
import static java.util.Objects.requireNonNull;
import com.google.common.collect.ImmutableList;
+import io.confluent.ksql.execution.plan.SelectExpression;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.query.id.QueryIdGenerator;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy;
+import java.util.Collections;
import java.util.List;
import java.util.OptionalInt;
import javax.annotation.concurrent.Immutable;
@@ -76,6 +78,11 @@ protected int getPartitions(final KafkaTopicClient kafkaTopicClient) {
return source.getPartitions(kafkaTopicClient);
}
+ @Override
+ public List getSelectExpressions() {
+ return Collections.emptyList();
+ }
+
@Override
public R accept(final PlanVisitor visitor, final C context) {
return visitor.visitOutput(this, context);
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/PlanNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/PlanNode.java
index 9f83890f7923..7f1469da3525 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/PlanNode.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/PlanNode.java
@@ -18,6 +18,7 @@
import static java.util.Objects.requireNonNull;
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
+import io.confluent.ksql.execution.plan.SelectExpression;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.schema.ksql.LogicalSchema;
@@ -51,6 +52,8 @@ public DataSourceType getNodeOutputType() {
public abstract KeyField getKeyField();
public abstract List getSources();
+
+ public abstract List getSelectExpressions();
public R accept(final PlanVisitor visitor, final C context) {
return visitor.visitPlan(this, context);
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/PlanVisitor.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/PlanVisitor.java
index a6c34516486e..97365e776469 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/PlanVisitor.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/PlanVisitor.java
@@ -45,4 +45,8 @@ protected R visitOutput(final OutputNode node, final C context) {
return visitPlan(node, context);
}
+ protected R visitFlatMap(final FlatMapNode node, final C context) {
+ return visitPlan(node, context);
+ }
+
}
\ No newline at end of file
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ProjectNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ProjectNode.java
index bcbe0de308ae..7626ffc4730d 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ProjectNode.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ProjectNode.java
@@ -43,15 +43,13 @@ public ProjectNode(
final PlanNodeId id,
final PlanNode source,
final LogicalSchema schema,
- final Optional keyFieldName,
- final List projectExpressions
+ final Optional keyFieldName
) {
super(id, source.getNodeOutputType());
this.source = requireNonNull(source, "source");
this.schema = requireNonNull(schema, "schema");
- this.projectExpressions = ImmutableList
- .copyOf(requireNonNull(projectExpressions, "projectExpressions"));
+ this.projectExpressions = ImmutableList.copyOf(source.getSelectExpressions());
this.keyField = KeyField.of(
requireNonNull(keyFieldName, "keyFieldName"),
source.getKeyField().legacy())
@@ -89,7 +87,8 @@ public KeyField getKeyField() {
return keyField;
}
- public List getProjectSelectExpressions() {
+ @Override
+ public List getSelectExpressions() {
return projectExpressions;
}
@@ -102,7 +101,7 @@ public R accept(final PlanVisitor visitor, final C context) {
public SchemaKStream> buildStream(final KsqlQueryBuilder builder) {
return getSource().buildStream(builder)
.select(
- getProjectSelectExpressions(),
+ getSelectExpressions(),
builder.buildNodeContext(getId().toString()),
builder
);
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java
index 356c1f1a8f96..f6261d038461 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java
@@ -28,6 +28,9 @@
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp;
import io.confluent.ksql.execution.expression.tree.Expression;
+import io.confluent.ksql.execution.expression.tree.FunctionCall;
+import io.confluent.ksql.execution.function.UdtfUtil;
+import io.confluent.ksql.execution.function.udtf.TableFunctionApplier;
import io.confluent.ksql.execution.plan.AbstractStreamSource;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.plan.ExecutionStepProperties;
@@ -37,6 +40,7 @@
import io.confluent.ksql.execution.plan.LogicalSchemaWithMetaAndKeyFields;
import io.confluent.ksql.execution.plan.SelectExpression;
import io.confluent.ksql.execution.plan.StreamFilter;
+import io.confluent.ksql.execution.plan.StreamFlatMap;
import io.confluent.ksql.execution.plan.StreamGroupBy;
import io.confluent.ksql.execution.plan.StreamGroupByKey;
import io.confluent.ksql.execution.plan.StreamMapValues;
@@ -49,6 +53,7 @@
import io.confluent.ksql.execution.plan.WindowedStreamSource;
import io.confluent.ksql.execution.streams.ExecutionStepFactory;
import io.confluent.ksql.function.FunctionRegistry;
+import io.confluent.ksql.function.KsqlTableFunction;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.metastore.model.KeyField.LegacyField;
@@ -69,6 +74,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.OptionalInt;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
@@ -535,8 +541,8 @@ public SchemaKStream selectKey(
final LegacyField proposedLegacy = LegacyField.of(proposedKey.ref(), proposedKey.type());
final KeyField resultantKeyField = isRowKey(columnRef)
- ? keyField.withLegacy(proposedLegacy)
- : KeyField.of(columnRef, proposedLegacy);
+ ? keyField.withLegacy(proposedLegacy)
+ : KeyField.of(columnRef, proposedLegacy);
final boolean namesMatch = existingKey
.map(kf -> kf.matches(proposedKey.ref()))
@@ -678,6 +684,41 @@ private SchemaKGroupedStream groupByKey(
);
}
+ public SchemaKStream flatMap(
+ final LogicalSchema outputSchema,
+ final FunctionCall functionCall,
+ final QueryContext.Stacker contextStacker
+ ) {
+ final ColumnReferenceExp exp = (ColumnReferenceExp)functionCall.getArguments().get(0);
+ final ColumnName columnName = exp.getReference().name();
+ final ColumnRef ref = ColumnRef.withoutSource(columnName);
+ final OptionalInt indexInInput = getSchema().valueColumnIndex(ref);
+ if (!indexInInput.isPresent()) {
+ throw new IllegalArgumentException("Can't find input column " + columnName);
+ }
+ final KsqlTableFunction tableFunction = UdtfUtil.resolveTableFunction(
+ functionRegistry,
+ functionCall,
+ getSchema()
+ );
+ final TableFunctionApplier functionHolder =
+ new TableFunctionApplier(tableFunction, indexInInput.getAsInt());
+ final StreamFlatMap step = ExecutionStepFactory.streamFlatMap(
+ contextStacker,
+ sourceStep,
+ outputSchema,
+ functionHolder
+ );
+ return new SchemaKStream(
+ step,
+ keyFormat,
+ keyField,
+ sourceSchemaKStreams,
+ type,
+ ksqlConfig,
+ functionRegistry);
+ }
+
public ExecutionStep> getSourceStep() {
return sourceStep;
}
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerFunctionalTest.java b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerFunctionalTest.java
index 9c5ea40c9fbe..31ee0db8357d 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerFunctionalTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerFunctionalTest.java
@@ -28,19 +28,18 @@
import io.confluent.ksql.analyzer.Analysis.AliasedDataSource;
import io.confluent.ksql.analyzer.Analysis.Into;
+import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp;
import io.confluent.ksql.execution.expression.tree.ComparisonExpression;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.IntegerLiteral;
import io.confluent.ksql.execution.plan.SelectExpression;
-import io.confluent.ksql.name.ColumnName;
-import io.confluent.ksql.name.SourceName;
-import io.confluent.ksql.schema.ksql.ColumnRef;
-import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp;
import io.confluent.ksql.function.InternalFunctionRegistry;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.KsqlStream;
import io.confluent.ksql.metastore.model.KsqlTable;
+import io.confluent.ksql.name.ColumnName;
+import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.KsqlParserTestUtil;
import io.confluent.ksql.parser.tree.CreateStreamAsSelect;
@@ -48,6 +47,7 @@
import io.confluent.ksql.parser.tree.InsertInto;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Sink;
+import io.confluent.ksql.schema.ksql.ColumnRef;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.SerdeOption;
import io.confluent.ksql.util.KsqlException;
@@ -192,6 +192,20 @@ public void shouldCreateAnalysisForInsertInto() {
assertThat(into.getKsqlTopic(), is(test0.getKsqlTopic()));
}
+ @Test
+ public void shouldAnalyseTableFunctions() {
+
+ // Given:
+ final Query query = givenQuery("SELECT ID, EXPLODE(ARR1) FROM SENSOR_READINGS;");
+
+ // When:
+ final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
+
+ // Then:
+ assertThat(analysis.getTableFunctions(), hasSize(1));
+ assertThat(analysis.getTableFunctions().get(0).getName().name(), equalTo("EXPLODE"));
+ }
+
@Test
public void shouldAnalyseWindowedAggregate() {
// Given:
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/InternalFunctionRegistryTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/InternalFunctionRegistryTest.java
index 213711f868d4..b6160f1e11c3 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/function/InternalFunctionRegistryTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/function/InternalFunctionRegistryTest.java
@@ -30,6 +30,7 @@
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.function.udf.Kudf;
+import io.confluent.ksql.function.udf.UdfMetadata;
import io.confluent.ksql.name.FunctionName;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.util.KsqlException;
@@ -233,85 +234,19 @@ public void shouldKnowIfFunctionIsAggregate() {
@Test
public void shouldAddAggregateFunction() {
- functionRegistry.addAggregateFunctionFactory(
- new AggregateFunctionFactory("my_aggregate") {
- @Override
- public KsqlAggregateFunction createAggregateFunction(final List argTypeList,
- final AggregateFunctionInitArguments initArgs) {
- return new KsqlAggregateFunction() {
-
- @Override
- public FunctionName getFunctionName() {
- return FunctionName.of("my_aggregate");
- }
-
- @Override
- public Supplier getInitialValueSupplier() {
- return null;
- }
-
- @Override
- public int getArgIndexInValue() {
- return 0;
- }
-
- @Override
- public Schema getAggregateType() {
- return null;
- }
-
- @Override
- public SqlType aggregateType() {
- return null;
- }
-
- @Override
- public Schema getReturnType() {
- return null;
- }
-
- @Override
- public SqlType returnType() {
- return null;
- }
-
- @Override
- public Object aggregate(final Object currentValue, final Object aggregateValue) {
- return null;
- }
-
- @Override
- public Merger getMerger() {
- return null;
- }
-
- @Override
- public Function getResultMapper() {
- return null;
- }
-
- @Override
- public List getArguments() {
- return argTypeList;
- }
-
- @Override
- public String getDescription() {
- return null;
- }
- };
- }
-
- @Override
- public List> supportedArgs() {
- return ImmutableList.of();
- }
- });
+ functionRegistry.addAggregateFunctionFactory(createAggregateFunctionFactory());
assertThat(functionRegistry.getAggregateFunction("my_aggregate",
Schema.OPTIONAL_INT32_SCHEMA,
AggregateFunctionInitArguments.EMPTY_ARGS), not(nullValue()));
}
+ @Test
+ public void shouldAddTableFunction() {
+ functionRegistry.addTableFunctionFactory(createTableFunctionFactory());
+ assertThat(functionRegistry.getTableFunction("my_tablefunction",
+ Schema.OPTIONAL_INT32_SCHEMA), not(nullValue()));
+ }
+
@Test
public void shouldAddFunctionWithSameNameButDifferentReturnTypes() {
// Given:
@@ -419,4 +354,125 @@ public void shouldNotAllowModificationViaListFunctions() {
private void givenUdfFactoryRegistered() {
functionRegistry.ensureFunctionFactory(UdfLoaderUtil.createTestUdfFactory(func));
}
+
+ private static AggregateFunctionFactory createAggregateFunctionFactory() {
+ return new AggregateFunctionFactory("my_aggregate") {
+ @Override
+ public KsqlAggregateFunction createAggregateFunction(final List argTypeList,
+ final AggregateFunctionInitArguments initArgs) {
+ return new KsqlAggregateFunction() {
+
+ @Override
+ public FunctionName getFunctionName() {
+ return FunctionName.of("my_aggregate");
+ }
+
+ @Override
+ public Supplier getInitialValueSupplier() {
+ return null;
+ }
+
+ @Override
+ public int getArgIndexInValue() {
+ return 0;
+ }
+
+ @Override
+ public Schema getAggregateType() {
+ return null;
+ }
+
+ @Override
+ public SqlType aggregateType() {
+ return null;
+ }
+
+ @Override
+ public Schema getReturnType() {
+ return null;
+ }
+
+ @Override
+ public SqlType returnType() {
+ return null;
+ }
+
+ @Override
+ public Object aggregate(final Object currentValue, final Object aggregateValue) {
+ return null;
+ }
+
+ @Override
+ public Merger getMerger() {
+ return null;
+ }
+
+ @Override
+ public Function getResultMapper() {
+ return null;
+ }
+
+ @Override
+ public List getArguments() {
+ return argTypeList;
+ }
+
+ @Override
+ public String getDescription() {
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public List> supportedArgs() {
+ return ImmutableList.of();
+ }
+ };
+ }
+
+ private static TableFunctionFactory createTableFunctionFactory() {
+ return new TableFunctionFactory(new UdfMetadata("my_tablefunction",
+ "", "", "", "", false)) {
+ @Override
+ public KsqlTableFunction, ?> createTableFunction(List argTypeList) {
+ return new KsqlTableFunction, Object>() {
+ @Override
+ public Schema getReturnType() {
+ return null;
+ }
+
+ @Override
+ public SqlType returnType() {
+ return null;
+ }
+
+ @Override
+ public List