diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java index 1d4eac929b42..e66395324cb9 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java @@ -23,16 +23,12 @@ import com.google.common.collect.ImmutableList; import io.confluent.ksql.engine.rewrite.StatementRewriteForRowtime; import io.confluent.ksql.execution.builder.KsqlQueryBuilder; -import io.confluent.ksql.execution.codegen.CodeGenRunner; -import io.confluent.ksql.execution.codegen.ExpressionMetadata; import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.execution.context.QueryLoggerUtil; import io.confluent.ksql.execution.ddl.commands.KsqlTopic; import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; import io.confluent.ksql.execution.expression.tree.Expression; import io.confluent.ksql.execution.expression.tree.FunctionCall; -import io.confluent.ksql.execution.function.UdtfUtil; -import io.confluent.ksql.execution.function.udtf.TableFunctionApplier; import io.confluent.ksql.execution.plan.AbstractStreamSource; import io.confluent.ksql.execution.plan.ExecutionStep; import io.confluent.ksql.execution.plan.ExecutionStepProperties; @@ -55,7 +51,6 @@ import io.confluent.ksql.execution.plan.WindowedStreamSource; import io.confluent.ksql.execution.streams.ExecutionStepFactory; import io.confluent.ksql.function.FunctionRegistry; -import io.confluent.ksql.function.KsqlTableFunction; import io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.metastore.model.KeyField; import io.confluent.ksql.metastore.model.KeyField.LegacyField; @@ -72,7 +67,6 @@ import io.confluent.ksql.util.IdentifierUtil; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.SchemaUtil; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -691,33 +685,13 @@ public SchemaKStream flatMap( final List tableFunctions, final QueryContext.Stacker contextStacker ) { - final List tableFunctionAppliers = new ArrayList<>(tableFunctions.size()); - final CodeGenRunner codeGenRunner = - new CodeGenRunner(getSchema(), ksqlConfig, functionRegistry); - for (FunctionCall functionCall: tableFunctions) { - final List expressionMetadataList = new ArrayList<>( - functionCall.getArguments().size()); - for (Expression expression : functionCall.getArguments()) { - final ExpressionMetadata expressionMetadata = - codeGenRunner.buildCodeGenFromParseTree(expression, "Table function"); - expressionMetadataList.add(expressionMetadata); - } - final KsqlTableFunction tableFunction = UdtfUtil.resolveTableFunction( - functionRegistry, - functionCall, - getSchema() - ); - final TableFunctionApplier tableFunctionApplier = - new TableFunctionApplier(tableFunction, expressionMetadataList); - tableFunctionAppliers.add(tableFunctionApplier); - } final StreamFlatMap step = ExecutionStepFactory.streamFlatMap( contextStacker, sourceStep, outputSchema, - tableFunctionAppliers + tableFunctions ); - return new SchemaKStream( + return new SchemaKStream<>( step, keyFormat, keyField, diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamFlatMap.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamFlatMap.java index f33a335f96dd..576f24f14a66 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamFlatMap.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamFlatMap.java @@ -15,7 +15,7 @@ package io.confluent.ksql.execution.plan; import com.google.errorprone.annotations.Immutable; -import io.confluent.ksql.execution.function.udtf.TableFunctionApplier; +import io.confluent.ksql.execution.expression.tree.FunctionCall; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -25,16 +25,16 @@ public class StreamFlatMap implements ExecutionStep> { private final ExecutionStepProperties properties; private final ExecutionStep> source; - private final List tableFunctionAppliers; + private final List tableFunctions; public StreamFlatMap( final ExecutionStepProperties properties, final ExecutionStep> source, - final List tableFunctionAppliers + final List tableFunctionAppliers ) { this.properties = Objects.requireNonNull(properties, "properties"); this.source = Objects.requireNonNull(source, "source"); - this.tableFunctionAppliers = Objects.requireNonNull(tableFunctionAppliers); + this.tableFunctions = Objects.requireNonNull(tableFunctionAppliers); } @Override @@ -52,8 +52,8 @@ public KStreamHolder build(final PlanBuilder builder) { return builder.visitFlatMap(this); } - public List getTableFunctionAppliers() { - return tableFunctionAppliers; + public List getTableFunctions() { + return tableFunctions; } public ExecutionStep> getSource() { diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java index fe26cfd8576c..09f84f0e6d08 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java @@ -19,7 +19,6 @@ import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.execution.expression.tree.Expression; import io.confluent.ksql.execution.expression.tree.FunctionCall; -import io.confluent.ksql.execution.function.udtf.TableFunctionApplier; import io.confluent.ksql.execution.plan.DefaultExecutionStepProperties; import io.confluent.ksql.execution.plan.ExecutionStep; import io.confluent.ksql.execution.plan.Formats; @@ -147,13 +146,13 @@ public static StreamFlatMap streamFlatMap( final QueryContext.Stacker stacker, final ExecutionStep> source, final LogicalSchema resultSchema, - final List tableFunctionAppliers + final List tableFunctions ) { final QueryContext queryContext = stacker.getQueryContext(); return new StreamFlatMap<>( new DefaultExecutionStepProperties(resultSchema, queryContext), source, - tableFunctionAppliers + tableFunctions ); } diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java index 5456f0d60284..4d711edefb69 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java @@ -130,7 +130,7 @@ public KStreamHolder visitStreamMapValues( @Override public KStreamHolder visitFlatMap(final StreamFlatMap streamFlatMap) { final KStreamHolder source = streamFlatMap.getSource().build(this); - return StreamFlatMapBuilder.build(source, streamFlatMap); + return StreamFlatMapBuilder.build(source, streamFlatMap, queryBuilder); } @Override diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamFlatMapBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamFlatMapBuilder.java index a64d5f7c486e..92cfb0953c74 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamFlatMapBuilder.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamFlatMapBuilder.java @@ -15,9 +15,20 @@ package io.confluent.ksql.execution.streams; +import io.confluent.ksql.execution.builder.KsqlQueryBuilder; +import io.confluent.ksql.execution.codegen.CodeGenRunner; +import io.confluent.ksql.execution.codegen.ExpressionMetadata; +import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.execution.expression.tree.FunctionCall; +import io.confluent.ksql.execution.function.UdtfUtil; import io.confluent.ksql.execution.function.udtf.KudtfFlatMapper; +import io.confluent.ksql.execution.function.udtf.TableFunctionApplier; import io.confluent.ksql.execution.plan.KStreamHolder; import io.confluent.ksql.execution.plan.StreamFlatMap; +import io.confluent.ksql.function.KsqlTableFunction; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import java.util.ArrayList; +import java.util.List; public final class StreamFlatMapBuilder { @@ -26,9 +37,32 @@ private StreamFlatMapBuilder() { public static KStreamHolder build( final KStreamHolder stream, - final StreamFlatMap step) { + final StreamFlatMap step, + final KsqlQueryBuilder queryBuilder) { + final List tableFunctions = step.getTableFunctions(); + final LogicalSchema schema = step.getSource().getSchema(); + final List tableFunctionAppliers = new ArrayList<>(tableFunctions.size()); + final CodeGenRunner codeGenRunner = + new CodeGenRunner(schema, queryBuilder.getKsqlConfig(), queryBuilder.getFunctionRegistry()); + for (FunctionCall functionCall: tableFunctions) { + final List expressionMetadataList = new ArrayList<>( + functionCall.getArguments().size()); + for (Expression expression : functionCall.getArguments()) { + final ExpressionMetadata expressionMetadata = + codeGenRunner.buildCodeGenFromParseTree(expression, "Table function"); + expressionMetadataList.add(expressionMetadata); + } + final KsqlTableFunction tableFunction = UdtfUtil.resolveTableFunction( + queryBuilder.getFunctionRegistry(), + functionCall, + schema + ); + final TableFunctionApplier tableFunctionApplier = + new TableFunctionApplier(tableFunction, expressionMetadataList); + tableFunctionAppliers.add(tableFunctionApplier); + } return stream.withStream(stream.getStream().flatMapValues( - new KudtfFlatMapper(step.getTableFunctionAppliers()))); + new KudtfFlatMapper(tableFunctionAppliers)) + ); } - } \ No newline at end of file