Skip to content

Commit

Permalink
refactor: store function expr in flatmap step (#3707)
Browse files Browse the repository at this point in the history
Minor refactor to have the flat map exec step store the function call
rather than the applier.
  • Loading branch information
rodesai authored Nov 3, 2019
1 parent f05e262 commit 7c6076c
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,12 @@
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.engine.rewrite.StatementRewriteForRowtime;
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import io.confluent.ksql.execution.codegen.CodeGenRunner;
import io.confluent.ksql.execution.codegen.ExpressionMetadata;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.context.QueryLoggerUtil;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.FunctionCall;
import io.confluent.ksql.execution.function.UdtfUtil;
import io.confluent.ksql.execution.function.udtf.TableFunctionApplier;
import io.confluent.ksql.execution.plan.AbstractStreamSource;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.plan.ExecutionStepProperties;
Expand All @@ -55,7 +51,6 @@
import io.confluent.ksql.execution.plan.WindowedStreamSource;
import io.confluent.ksql.execution.streams.ExecutionStepFactory;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.function.KsqlTableFunction;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.metastore.model.KeyField.LegacyField;
Expand All @@ -72,7 +67,6 @@
import io.confluent.ksql.util.IdentifierUtil;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.SchemaUtil;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -691,33 +685,13 @@ public SchemaKStream<K> flatMap(
final List<FunctionCall> tableFunctions,
final QueryContext.Stacker contextStacker
) {
final List<TableFunctionApplier> tableFunctionAppliers = new ArrayList<>(tableFunctions.size());
final CodeGenRunner codeGenRunner =
new CodeGenRunner(getSchema(), ksqlConfig, functionRegistry);
for (FunctionCall functionCall: tableFunctions) {
final List<ExpressionMetadata> expressionMetadataList = new ArrayList<>(
functionCall.getArguments().size());
for (Expression expression : functionCall.getArguments()) {
final ExpressionMetadata expressionMetadata =
codeGenRunner.buildCodeGenFromParseTree(expression, "Table function");
expressionMetadataList.add(expressionMetadata);
}
final KsqlTableFunction tableFunction = UdtfUtil.resolveTableFunction(
functionRegistry,
functionCall,
getSchema()
);
final TableFunctionApplier tableFunctionApplier =
new TableFunctionApplier(tableFunction, expressionMetadataList);
tableFunctionAppliers.add(tableFunctionApplier);
}
final StreamFlatMap<K> step = ExecutionStepFactory.streamFlatMap(
contextStacker,
sourceStep,
outputSchema,
tableFunctionAppliers
tableFunctions
);
return new SchemaKStream<K>(
return new SchemaKStream<>(
step,
keyFormat,
keyField,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package io.confluent.ksql.execution.plan;

import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.execution.function.udtf.TableFunctionApplier;
import io.confluent.ksql.execution.expression.tree.FunctionCall;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
Expand All @@ -25,16 +25,16 @@ public class StreamFlatMap<K> implements ExecutionStep<KStreamHolder<K>> {

private final ExecutionStepProperties properties;
private final ExecutionStep<KStreamHolder<K>> source;
private final List<TableFunctionApplier> tableFunctionAppliers;
private final List<FunctionCall> tableFunctions;

public StreamFlatMap(
final ExecutionStepProperties properties,
final ExecutionStep<KStreamHolder<K>> source,
final List<TableFunctionApplier> tableFunctionAppliers
final List<FunctionCall> tableFunctionAppliers
) {
this.properties = Objects.requireNonNull(properties, "properties");
this.source = Objects.requireNonNull(source, "source");
this.tableFunctionAppliers = Objects.requireNonNull(tableFunctionAppliers);
this.tableFunctions = Objects.requireNonNull(tableFunctionAppliers);
}

@Override
Expand All @@ -52,8 +52,8 @@ public KStreamHolder<K> build(final PlanBuilder builder) {
return builder.visitFlatMap(this);
}

public List<TableFunctionApplier> getTableFunctionAppliers() {
return tableFunctionAppliers;
public List<FunctionCall> getTableFunctions() {
return tableFunctions;
}

public ExecutionStep<KStreamHolder<K>> getSource() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.FunctionCall;
import io.confluent.ksql.execution.function.udtf.TableFunctionApplier;
import io.confluent.ksql.execution.plan.DefaultExecutionStepProperties;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.plan.Formats;
Expand Down Expand Up @@ -147,13 +146,13 @@ public static <K> StreamFlatMap<K> streamFlatMap(
final QueryContext.Stacker stacker,
final ExecutionStep<KStreamHolder<K>> source,
final LogicalSchema resultSchema,
final List<TableFunctionApplier> tableFunctionAppliers
final List<FunctionCall> tableFunctions
) {
final QueryContext queryContext = stacker.getQueryContext();
return new StreamFlatMap<>(
new DefaultExecutionStepProperties(resultSchema, queryContext),
source,
tableFunctionAppliers
tableFunctions
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public <K> KStreamHolder<K> visitStreamMapValues(
@Override
public <K> KStreamHolder<K> visitFlatMap(final StreamFlatMap<K> streamFlatMap) {
final KStreamHolder<K> source = streamFlatMap.getSource().build(this);
return StreamFlatMapBuilder.build(source, streamFlatMap);
return StreamFlatMapBuilder.build(source, streamFlatMap, queryBuilder);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,20 @@

package io.confluent.ksql.execution.streams;

import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import io.confluent.ksql.execution.codegen.CodeGenRunner;
import io.confluent.ksql.execution.codegen.ExpressionMetadata;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.FunctionCall;
import io.confluent.ksql.execution.function.UdtfUtil;
import io.confluent.ksql.execution.function.udtf.KudtfFlatMapper;
import io.confluent.ksql.execution.function.udtf.TableFunctionApplier;
import io.confluent.ksql.execution.plan.KStreamHolder;
import io.confluent.ksql.execution.plan.StreamFlatMap;
import io.confluent.ksql.function.KsqlTableFunction;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import java.util.ArrayList;
import java.util.List;

public final class StreamFlatMapBuilder {

Expand All @@ -26,9 +37,32 @@ private StreamFlatMapBuilder() {

public static <K> KStreamHolder<K> build(
final KStreamHolder<K> stream,
final StreamFlatMap<K> step) {
final StreamFlatMap<K> step,
final KsqlQueryBuilder queryBuilder) {
final List<FunctionCall> tableFunctions = step.getTableFunctions();
final LogicalSchema schema = step.getSource().getSchema();
final List<TableFunctionApplier> tableFunctionAppliers = new ArrayList<>(tableFunctions.size());
final CodeGenRunner codeGenRunner =
new CodeGenRunner(schema, queryBuilder.getKsqlConfig(), queryBuilder.getFunctionRegistry());
for (FunctionCall functionCall: tableFunctions) {
final List<ExpressionMetadata> expressionMetadataList = new ArrayList<>(
functionCall.getArguments().size());
for (Expression expression : functionCall.getArguments()) {
final ExpressionMetadata expressionMetadata =
codeGenRunner.buildCodeGenFromParseTree(expression, "Table function");
expressionMetadataList.add(expressionMetadata);
}
final KsqlTableFunction tableFunction = UdtfUtil.resolveTableFunction(
queryBuilder.getFunctionRegistry(),
functionCall,
schema
);
final TableFunctionApplier tableFunctionApplier =
new TableFunctionApplier(tableFunction, expressionMetadataList);
tableFunctionAppliers.add(tableFunctionApplier);
}
return stream.withStream(stream.getStream().flatMapValues(
new KudtfFlatMapper(step.getTableFunctionAppliers())));
new KudtfFlatMapper(tableFunctionAppliers))
);
}

}

0 comments on commit 7c6076c

Please sign in to comment.