Skip to content

Commit

Permalink
fix: reintroduce FetchFieldFromStruct as a public UDF (#4185)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra authored Dec 23, 2019
1 parent 9b89a21 commit a50a665
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,10 @@ public Pair<String, Schema> visitFunctionCall(
private Schema getFunctionReturnSchema(
final FunctionCall node
) {
if (node.getName().equals(FetchFieldFromStruct.FUNCTION_NAME)) {
return expressionTypeManager.getExpressionSchema(node);
}

final UdfFactory udfFactory = functionRegistry.getUdfFactory(node.getName().name());
final List<Schema> argumentSchemas = node.getArguments().stream()
.map(expressionTypeManager::getExpressionSchema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import io.confluent.ksql.execution.expression.tree.Type;
import io.confluent.ksql.execution.expression.tree.WhenClause;
import io.confluent.ksql.execution.function.UdafUtil;
import io.confluent.ksql.execution.function.udf.structfieldextractor.FetchFieldFromStruct;
import io.confluent.ksql.function.AggregateFunctionInitArguments;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.function.KsqlAggregateFunction;
Expand Down Expand Up @@ -407,6 +408,20 @@ public Void visitFunctionCall(
return null;
}

if (node.getName().equals(FetchFieldFromStruct.FUNCTION_NAME)) {
process(node.getArguments().get(0), expressionTypeContext);
final Schema firstArgSchema = expressionTypeContext.getSchema();
final String fieldName = ((StringLiteral) node.getArguments().get(1)).getValue();
if (firstArgSchema.field(fieldName) == null) {
throw new KsqlException(String.format("Could not find field %s in %s.",
fieldName,
node.getArguments().get(0).toString()));
}
final Schema returnSchema = firstArgSchema.field(fieldName).schema();
expressionTypeContext.setSchema(returnSchema);
return null;
}

final UdfFactory udfFactory = functionRegistry.getUdfFactory(node.getName().name());
final UdfMetadata metadata = udfFactory.getMetadata();
if (metadata.isInternal()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,19 +297,18 @@ public void shouldHandleStructFieldDereference() {
}

@Test
public void shouldThrowOnFetchFieldFromStructFunctionCall() {
public void shouldHandleFetchFieldFromStructFunctionCall() {
// Given:
final Expression expression = new FunctionCall(
FetchFieldFromStruct.FUNCTION_NAME,
ImmutableList.of(ADDRESS, new StringLiteral("NUMBER"))
);

// Then:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Can't find any functions with the name 'FETCH_FIELD_FROM_STRUCT'");

// When:
expressionTypeManager.getExpressionSqlType(expression);
final SqlType sqlType = expressionTypeManager.getExpressionSqlType(expression);

// Then:
assertThat(sqlType, is(SqlTypes.BIGINT));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"comments": [
"External usage of FETCH_FIELD_FROM_STRUCT"
],
"tests": [
{
"name": "Fetch Field",
"statements": [
"CREATE STREAM TEST (s STRUCT<val VARCHAR>) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT as SELECT FETCH_FIELD_FROM_STRUCT(s, 'VAL') AS value FROM test;"
],
"inputs": [
{"topic": "test_topic", "key": "foo", "value": {"s": {"val": "foo"}}}
],
"outputs": [
{"topic": "OUTPUT", "key": "foo", "value": {"VALUE": "foo"}}
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -1821,17 +1821,6 @@
"key": 0
}
]
},
{
"name": "direct use of FETCH_FIELD_FROM_STRUCT",
"statements": [
"CREATE STREAM input (s STRUCT<f0 BIGINT>) WITH (kafka_topic='input_topic', value_format='JSON');",
"CREATE STREAM output AS SELECT FETCH_FIELD_FROM_STRUCT(s, 'f0') FROM input;"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Can't find any functions with the name 'FETCH_FIELD_FROM_STRUCT'"
}
}
]
}

0 comments on commit a50a665

Please sign in to comment.