diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java index 7b14f2ba005a..23f840bf1694 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java @@ -366,6 +366,10 @@ public Pair visitFunctionCall( private Schema getFunctionReturnSchema( final FunctionCall node ) { + if (node.getName().equals(FetchFieldFromStruct.FUNCTION_NAME)) { + return expressionTypeManager.getExpressionSchema(node); + } + final UdfFactory udfFactory = functionRegistry.getUdfFactory(node.getName().name()); final List argumentSchemas = node.getArguments().stream() .map(expressionTypeManager::getExpressionSchema) diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java index 07ee31929203..a07d1582b71e 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java @@ -48,6 +48,7 @@ import io.confluent.ksql.execution.expression.tree.Type; import io.confluent.ksql.execution.expression.tree.WhenClause; import io.confluent.ksql.execution.function.UdafUtil; +import io.confluent.ksql.execution.function.udf.structfieldextractor.FetchFieldFromStruct; import io.confluent.ksql.function.AggregateFunctionInitArguments; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.function.KsqlAggregateFunction; @@ -407,6 +408,20 @@ public Void visitFunctionCall( return null; } + if (node.getName().equals(FetchFieldFromStruct.FUNCTION_NAME)) { + process(node.getArguments().get(0), expressionTypeContext); + final Schema firstArgSchema = expressionTypeContext.getSchema(); + final String fieldName = ((StringLiteral) node.getArguments().get(1)).getValue(); + if (firstArgSchema.field(fieldName) == null) { + throw new KsqlException(String.format("Could not find field %s in %s.", + fieldName, + node.getArguments().get(0).toString())); + } + final Schema returnSchema = firstArgSchema.field(fieldName).schema(); + expressionTypeContext.setSchema(returnSchema); + return null; + } + final UdfFactory udfFactory = functionRegistry.getUdfFactory(node.getName().name()); final UdfMetadata metadata = udfFactory.getMetadata(); if (metadata.isInternal()) { diff --git a/ksql-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java b/ksql-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java index f718a25b467d..ce6fbfe5a015 100644 --- a/ksql-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java +++ b/ksql-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java @@ -297,19 +297,18 @@ public void shouldHandleStructFieldDereference() { } @Test - public void shouldThrowOnFetchFieldFromStructFunctionCall() { + public void shouldHandleFetchFieldFromStructFunctionCall() { // Given: final Expression expression = new FunctionCall( FetchFieldFromStruct.FUNCTION_NAME, ImmutableList.of(ADDRESS, new StringLiteral("NUMBER")) ); - // Then: - expectedException.expect(KsqlException.class); - expectedException.expectMessage("Can't find any functions with the name 'FETCH_FIELD_FROM_STRUCT'"); - // When: - expressionTypeManager.getExpressionSqlType(expression); + final SqlType sqlType = expressionTypeManager.getExpressionSqlType(expression); + + // Then: + assertThat(sqlType, is(SqlTypes.BIGINT)); } @Test diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/fetch-field-from-struct.json b/ksql-functional-tests/src/test/resources/query-validation-tests/fetch-field-from-struct.json new file mode 100644 index 000000000000..52fd58af776f --- /dev/null +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/fetch-field-from-struct.json @@ -0,0 +1,20 @@ +{ + "comments": [ + "External usage of FETCH_FIELD_FROM_STRUCT" + ], + "tests": [ + { + "name": "Fetch Field", + "statements": [ + "CREATE STREAM TEST (s STRUCT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT as SELECT FETCH_FIELD_FROM_STRUCT(s, 'VAL') AS value FROM test;" + ], + "inputs": [ + {"topic": "test_topic", "key": "foo", "value": {"s": {"val": "foo"}}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "foo", "value": {"VALUE": "foo"}} + ] + } + ] +} diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/simple-struct.json b/ksql-functional-tests/src/test/resources/query-validation-tests/simple-struct.json index fc4906db3278..6c0b66a07a1e 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/simple-struct.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/simple-struct.json @@ -1821,17 +1821,6 @@ "key": 0 } ] - }, - { - "name": "direct use of FETCH_FIELD_FROM_STRUCT", - "statements": [ - "CREATE STREAM input (s STRUCT) WITH (kafka_topic='input_topic', value_format='JSON');", - "CREATE STREAM output AS SELECT FETCH_FIELD_FROM_STRUCT(s, 'f0') FROM input;" - ], - "expectedException": { - "type": "io.confluent.ksql.util.KsqlStatementException", - "message": "Can't find any functions with the name 'FETCH_FIELD_FROM_STRUCT'" - } } ] } \ No newline at end of file