Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement EXPLODE(ARRAY) for single table function in SELECT #3589

Merged
merged 44 commits into from
Oct 25, 2019
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
a503028
Apply patch from prototype
Oct 9, 2019
0a726ca
Apply patch from prototype2
Oct 9, 2019
42b1a6d
Remove unnecessary files
Oct 9, 2019
4af2b53
renamed some functions
Oct 9, 2019
39d1880
Some improvements to explode factory
Oct 10, 2019
3cb190d
Added test for ExplodeArrayTableFunction
Oct 10, 2019
324862b
Some refactoring
Oct 11, 2019
5edcff7
Another test
Oct 12, 2019
ccfffee
feat: Some improvements to tests
Oct 15, 2019
1d6a015
chore: resolved merge conflict
Oct 16, 2019
8184307
fix: remove check for optional return value
Oct 16, 2019
c3f5923
chore: updates after review
Oct 18, 2019
f9b9560
refactor: Simplify synchronization of InternalFunctionRegistry
Oct 20, 2019
8981e2d
chore: review comments
Oct 20, 2019
8a3238a
chore: review comments
Oct 20, 2019
22a7c2b
chore: tweak to tests
Oct 21, 2019
267ac6d
refactor: Improved flat mapping setup logic and added some more javadoc
Oct 21, 2019
7d6ac8c
chore: Use consistent generated column name
Oct 21, 2019
285eb01
refactor: PlanNode now encapsulates select expressions as these can c…
Oct 21, 2019
b11fc2e
chore: null param checking
Oct 22, 2019
46107f7
fix: fixed test
Oct 22, 2019
167d9b7
refactor: Make TableFunctionAnalysis into an interface
Oct 22, 2019
01d6bd1
chore: Make BaseAggregateFunction and BaseTableFunction Immutable
Oct 22, 2019
64e0e58
chore: Use KsqlConstants.CONFLUENT_AUTHOR instead of hardcoded Confluent
Oct 22, 2019
1310db6
chore: provide Description and version for ExplodeFunctionFactory
Oct 22, 2019
8195bba
fix: Traverse FlatMapNode in PlanSourceExtractorVisitor
Oct 22, 2019
fd7c9fb
refactor: Move TableFunctionExpressionWriter and AggregateExpressionR…
Oct 22, 2019
214b4ef
chore: removed TODOs
Oct 22, 2019
7c6b9bc
chore: moved creation of Aggregate and table function factories into …
Oct 22, 2019
f1f3d5c
chore: Refactored test
Oct 22, 2019
4dada0d
chore: null check
Oct 22, 2019
dc03181
chore: combined udtf.json test cases into one
Oct 22, 2019
28b3208
fix: Add check and test for more than one table function
Oct 22, 2019
a668aa5
fix: remove unused import
Oct 23, 2019
814a8b9
fix: Use passed through select expressions plus fix some tests
Oct 23, 2019
7c9b6b9
chore: merged with master
Oct 23, 2019
e0cfc64
refactor: Removed TableFunctionAnalyzer and TableFunctionAnalysis - t…
Oct 23, 2019
6331a35
refactor: Use different naming for synthesised intermediate schema co…
Oct 23, 2019
9bcb561
fix: changes after review comments
Oct 23, 2019
6d4547f
fix: Added check for non aggregates with table functions and test
Oct 25, 2019
11ccfa8
chore: Added null check
Oct 25, 2019
e38222b
fix: Add check if udtf with same name already exists
Oct 25, 2019
a208b63
chore: revie updates
Oct 25, 2019
99cbff6
chore: not null check
Oct 25, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
826 changes: 826 additions & 0 deletions diff.txt

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.function.udf.UdfMetadata;
import io.confluent.ksql.util.DecimalUtil;
import io.confluent.ksql.util.KsqlConstants;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
Expand All @@ -38,7 +39,14 @@ public abstract class AggregateFunctionFactory {
.build();

public AggregateFunctionFactory(final String functionName) {
this(new UdfMetadata(functionName, "", "confluent", "", KsqlFunction.INTERNAL_PATH, false));
this(new UdfMetadata(
functionName,
"",
KsqlConstants.CONFLUENT_AUTHOR,
"",
KsqlFunction.INTERNAL_PATH,
false
));
}

public AggregateFunctionFactory(final UdfMetadata metadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ public interface FunctionRegistry {
*/
boolean isAggregate(String functionName);

/**
* Test if the supplied {@code functionName} is a table function.
*
* <p>Note: unknown functions result in {@code false} return value.
*
* @param functionName the name of the function to test
* @return {@code true} if it is a table function, {@code false} otherwise.
*/
boolean isTableFunction(String functionName);
purplefox marked this conversation as resolved.
Show resolved Hide resolved

/**
* Get the factory for a UDF.
*
Expand Down Expand Up @@ -72,6 +82,8 @@ public interface FunctionRegistry {
KsqlAggregateFunction<?, ?, ?> getAggregateFunction(String functionName, Schema argumentType,
AggregateFunctionInitArguments initArgs);

KsqlTableFunction<?, ?> getTableFunction(String functionName, Schema argumentType);
purplefox marked this conversation as resolved.
Show resolved Hide resolved
purplefox marked this conversation as resolved.
Show resolved Hide resolved

/**
* @return all UDF factories.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ public interface FunctionSignature {
* @return whether or not to consider the final argument in
* {@link #getArguments()} as variadic
*/
boolean isVariadic();
default boolean isVariadic() {
return false;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,4 @@ public interface KsqlAggregateFunction<I, A, O> extends FunctionSignature {
Function<A, O> getResultMapper();

String getDescription();

@Override
default boolean isVariadic() {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function;

import io.confluent.ksql.schema.ksql.types.SqlType;
import java.util.List;
import org.apache.kafka.connect.data.Schema;

public interface KsqlTableFunction<I, O> extends FunctionSignature {

Schema getReturnType();
purplefox marked this conversation as resolved.
Show resolved Hide resolved

SqlType returnType();
purplefox marked this conversation as resolved.
Show resolved Hide resolved

List<O> flatMap(List<I> input);
purplefox marked this conversation as resolved.
Show resolved Hide resolved

String getDescription();
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,6 @@ public interface MutableFunctionRegistry extends FunctionRegistry {
* @throws KsqlException if a function, (of any type), with the same name exists.
*/
void addAggregateFunctionFactory(AggregateFunctionFactory aggregateFunctionFactory);

void addTableFunctionFactory(TableFunctionFactory tableFunctionFactory);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function;

import io.confluent.ksql.function.udf.UdfMetadata;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.connect.data.Schema;


public abstract class TableFunctionFactory {
purplefox marked this conversation as resolved.
Show resolved Hide resolved
agavra marked this conversation as resolved.
Show resolved Hide resolved

private final UdfMetadata metadata;

public TableFunctionFactory(final UdfMetadata metadata) {
this.metadata = Objects.requireNonNull(metadata, "metadata can't be null");
}

public abstract KsqlTableFunction<?, ?> createTableFunction(List<Schema> argTypeList);

protected abstract List<List<Schema>> supportedArgs();

public String getName() {
return metadata.getName();
}

public String getDescription() {
return metadata.getDescription();
}

public String getPath() {
return metadata.getPath();
}

public String getAuthor() {
return metadata.getAuthor();
}

public String getVersion() {
return metadata.getVersion();
}

public boolean isInternal() {
return metadata.isInternal();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@
public final class ColumnName extends Name<ColumnName> {

private static final String AGGREGATE_COLUMN_PREFIX = "KSQL_AGG_VARIABLE_";
private static final String GENERATED_COLUMN_PREFIX = "KSQL_COL_";

public static ColumnName aggregate(final int idx) {
public static ColumnName aggregateColumn(final int idx) {
return of(AGGREGATE_COLUMN_PREFIX + idx);
}

public static ColumnName generatedColumnName(final int idx) {
return ColumnName.of(GENERATED_COLUMN_PREFIX + idx);
agavra marked this conversation as resolved.
Show resolved Hide resolved
}

public static ColumnName of(final String name) {
return new ColumnName(name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.util;
package io.confluent.ksql.analyzer;

import io.confluent.ksql.engine.rewrite.ExpressionTreeRewriter;
import io.confluent.ksql.engine.rewrite.ExpressionTreeRewriter.Context;
Expand Down Expand Up @@ -45,7 +45,7 @@ public Optional<Expression> visitFunctionCall(
final ExpressionTreeRewriter.Context<Void> context) {
final String functionName = node.getName().name();
if (functionRegistry.isAggregate(functionName)) {
final ColumnName aggVarName = ColumnName.aggregate(aggVariableIndex);
final ColumnName aggVarName = ColumnName.aggregateColumn(aggVariableIndex);
aggVariableIndex++;
return Optional.of(
new ColumnReferenceExp(node.getLocation(), ColumnRef.withoutSource(aggVarName)));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License; you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.analyzer;

import io.confluent.ksql.execution.expression.tree.FunctionCall;
import io.confluent.ksql.execution.plan.SelectExpression;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class MutableTableFunctionAnalysis implements TableFunctionAnalysis {

private final List<FunctionCall> tableFunctions = new ArrayList<>();
private final List<SelectExpression> selectExpressions = new ArrayList<>();

@Override
public List<FunctionCall> getTableFunctions() {
return Collections.unmodifiableList(tableFunctions);
}

@Override
public List<SelectExpression> getFinalSelectExpressions() {
return selectExpressions;
}

void addTableFunction(final FunctionCall functionCall) {
tableFunctions.add(Objects.requireNonNull(functionCall));
}

void addFinalSelectExpression(final SelectExpression selectExpression) {
this.selectExpressions.add(Objects.requireNonNull(selectExpression));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,16 @@
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Sink;
import io.confluent.ksql.serde.SerdeOption;
import io.confluent.ksql.util.AggregateExpressionRewriter;
import io.confluent.ksql.util.KsqlException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
purplefox marked this conversation as resolved.
Show resolved Hide resolved
public class QueryAnalyzer {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling

private final Analyzer analyzer;
private final MetaStore metaStore;
Expand Down Expand Up @@ -130,6 +131,27 @@ public AggregateAnalysis analyzeAggregate(final Query query, final Analysis anal
return aggregateAnalysis;
}

public TableFunctionAnalysis analyzeTableFunctions(final Analysis analysis) {
purplefox marked this conversation as resolved.
Show resolved Hide resolved
final MutableTableFunctionAnalysis tableFunctionAnalysis = new MutableTableFunctionAnalysis();
final TableFunctionAnalyzer tableFunctionAnalyzer =
new TableFunctionAnalyzer(tableFunctionAnalysis, metaStore);
final TableFunctionExpressionRewriter tableFunctionExpressionRewriter =
new TableFunctionExpressionRewriter(metaStore);

processSelectExpressionsForTableFunctionAnalysis(
analysis,
tableFunctionAnalysis,
tableFunctionAnalyzer,
tableFunctionExpressionRewriter
);

if (tableFunctionAnalysis.getTableFunctions().size() > 1) {
throw new KsqlException("Only one table function per query currently is supported");
}

return tableFunctionAnalysis;
}

private static void processHavingExpression(
final Expression having,
final MutableAggregateAnalysis aggregateAnalysis,
Expand Down Expand Up @@ -166,6 +188,24 @@ private static void processSelectExpressions(
}
}

private static void processSelectExpressionsForTableFunctionAnalysis(
final Analysis analysis,
final MutableTableFunctionAnalysis tableFunctionAnalysis,
final TableFunctionAnalyzer tableFunctionAnalyzer,
final TableFunctionExpressionRewriter tableFunctionExpressionRewriter
) {
for (final SelectExpression select : analysis.getSelectExpressions()) {
final Expression exp = select.getExpression();
tableFunctionAnalyzer.processSelect(exp);

tableFunctionAnalysis.addFinalSelectExpression(
purplefox marked this conversation as resolved.
Show resolved Hide resolved
SelectExpression.of(
select.getAlias(),
ExpressionTreeRewriter.rewriteWith(
tableFunctionExpressionRewriter::process, exp)));
}
}

private static void enforceAggregateRules(
final Query query,
final Analysis analysis,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License; you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.analyzer;

import io.confluent.ksql.execution.expression.tree.FunctionCall;
import io.confluent.ksql.execution.plan.SelectExpression;
import java.util.List;

/**
* Encapsulates data that's been extracted from a query related to table functions.
*/
public interface TableFunctionAnalysis {

List<FunctionCall> getTableFunctions();

List<SelectExpression> getFinalSelectExpressions();
}
Loading