Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: allow structs in schema provider return types #5287

Merged
merged 2 commits into from
May 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@

public final class StructType extends ObjectType {

/**
* An empty struct accepts any struct as an instantiation.
*/
public static final StructType ANY_STRUCT = StructType.builder().build();

private final ImmutableMap<String, ParamType> schema;

private StructType(final Map<String, ParamType> schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,20 @@ public void shouldLoadFunctionWithSchemaProvider() {
assertThat(function.getReturnType(args), equalTo(decimal));
}

@Test
public void shouldLoadFunctionWithStructSchemaProvider() {
  // Given:
  // Factory for the struct-returning test UDF. (Renamed from `returnDecimal`,
  // a misleading copy-paste from the decimal schema-provider test above.)
  final UdfFactory structFactory = FUNC_REG.getUdfFactory(FunctionName.of("KsqlStructUdf"));

  // When:
  final List<SqlType> args = ImmutableList.of();
  final KsqlScalarFunction function = structFactory.getFunction(args);

  // Then:
  // The schema provider declares STRUCT<VAL STRING>; loading must honor it.
  assertThat(function.getReturnType(args), equalTo(KsqlStructUdf.RETURN));
}


@Test
public void shouldLoadFunctionWithNestedDecimalSchema() {
// Given:
Expand All @@ -270,7 +284,6 @@ public void shouldLoadFunctionWithNestedDecimalSchema() {
equalTo(SqlStruct.builder().field("VAL", SqlDecimal.of(64, 2)).build()));
}


@Test
public void shouldThrowOnReturnTypeMismatch() {
// Given:
Expand Down Expand Up @@ -1391,6 +1404,25 @@ public Struct getDecimalStruct() {
}
}

@SuppressWarnings({"unused", "MethodMayBeStatic"}) // Invoked via reflection in test.
@UdfDescription(
    name = "KsqlStructUdf",
    description = "A test-only UDF for testing struct return types")
public static class KsqlStructUdf {

  // The struct type the schema provider reports; asserted against by the
  // loader test. Private is fine: the test lives in the same enclosing class.
  private static final SqlStruct RETURN =
      SqlStruct.builder().field("VAL", SqlTypes.STRING).build();

  /** Schema provider: always reports {@code STRUCT<VAL STRING>}, ignoring params. */
  @UdfSchemaProvider
  public SqlType provide(final List<SqlType> params) {
    return RETURN;
  }

  /**
   * The UDF body. Renamed from {@code getDecimalStruct} (copy-paste from the
   * decimal test) — the loader dispatches via the {@code @Udf} annotation, so
   * the method name is free to describe what it returns. The value itself is
   * irrelevant to the loading test, hence {@code null}.
   */
  @Udf(schemaProvider = "provide")
  public Struct getStruct() {
    return null;
  }
}

@SuppressWarnings({"unused", "MethodMayBeStatic"}) // Invoked via reflection in test.
@UdfDescription(
name = "ReturnIncompatible",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,20 @@

package io.confluent.ksql.function.udf;

import io.confluent.ksql.schema.ksql.types.SqlStruct;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import java.util.List;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

@UdfDescription(name="test_udf", description = "test")
@SuppressWarnings("unused")
public class TestUdf {

private static final SqlStruct RETURN =
SqlStruct.builder().field("A", SqlTypes.STRING).build();

@Udf(description = "returns the method name")
public String doStuffIntString(final int arg1, final String arg2) {
return "doStuffIntString";
Expand All @@ -47,4 +55,16 @@ public String doStuffStruct(
) {
return struct.getString("A");
}

@Udf(description = "returns the value of 'STRUCT<A VARCHAR>'", schemaProvider = "structProvider")
public Struct returnStructStuff() {
  // Build a value matching the STRUCT<A VARCHAR> schema that structProvider
  // advertises, with field A populated.
  final Struct struct = new Struct(
      SchemaBuilder.struct()
          .field("A", SchemaBuilder.OPTIONAL_STRING_SCHEMA)
          .optional()
          .build());
  struct.put("A", "foo");
  return struct;
}

/**
 * Schema provider for {@code returnStructStuff}: the return type is always the
 * fixed {@code STRUCT<A VARCHAR>} schema, independent of the argument types.
 */
@UdfSchemaProvider
public SqlType structProvider(final List<SqlType> ignoredParams) {
  return RETURN;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.confluent.ksql.function.types.ParamType;
import io.confluent.ksql.function.types.ParamTypes;
import io.confluent.ksql.function.types.StringType;
import io.confluent.ksql.function.types.StructType;
import io.confluent.ksql.name.FunctionName;
import io.confluent.ksql.util.KsqlException;
import java.lang.reflect.GenericArrayType;
Expand All @@ -32,6 +33,7 @@
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Struct;

public final class UdfUtil {

Expand Down Expand Up @@ -104,7 +106,9 @@ public static ParamType getSchemaFromType(final Type type) {
return schema;
}

// CHECKSTYLE_RULES.OFF: CyclomaticComplexity
private static ParamType handleParameterizedType(final Type type) {
// CHECKSTYLE_RULES.ON: CyclomaticComplexity
if (type instanceof ParameterizedType) {
final ParameterizedType parameterizedType = (ParameterizedType) type;
if (parameterizedType.getRawType() == Map.class) {
Expand Down Expand Up @@ -133,6 +137,11 @@ private static ParamType handleParameterizedType(final Type type) {
return ArrayType.of(
GenericType.of(
((GenericArrayType) type).getGenericComponentType().getTypeName()));
} else if (type instanceof Class<?> && Struct.class.isAssignableFrom((Class<?>) type)) {
// we don't have enough information here to return a more specific type of struct,
// but there are other parts of the code that enforce having a schema provider or
// schema annotation if a struct is being used
return StructType.ANY_STRUCT;
}

throw new KsqlException("Type inference is not supported for: " + type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import io.confluent.ksql.function.types.MapType;
import io.confluent.ksql.function.types.ParamType;
import io.confluent.ksql.function.types.ParamTypes;
import io.confluent.ksql.function.types.StructType;
import io.confluent.ksql.name.FunctionName;
import io.confluent.ksql.util.KsqlException;
import java.lang.reflect.Type;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Struct;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Test;
Expand Down Expand Up @@ -173,6 +175,14 @@ public void shouldGetStringSchemaFromStringClass() {
);
}

@Test
public void shouldGetStructSchemaFromStructClass() {
  // Renamed from shouldGetStringSchemaFromStructClass (copy-paste from the
  // String test above): this verifies the Struct mapping, not String.
  // A raw Connect Struct class carries no field info, so it maps to the
  // catch-all ANY_STRUCT param type.
  assertThat(
      UdfUtil.getSchemaFromType(Struct.class),
      equalTo(StructType.ANY_STRUCT)
  );
}

@Test(expected = KsqlException.class)
public void shouldThrowExceptionIfClassDoesntMapToSchema() {
UdfUtil.getSchemaFromType(System.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST (VAL STRING) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST",
"schema" : "`ROWKEY` STRING KEY, `VAL` STRING",
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
}
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT TEST_UDF() VALUE\nFROM TEST TEST\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ROWKEY` STRING KEY, `VALUE` STRUCT<`A` STRING>",
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
}
},
"queryPlan" : {
"sources" : [ "TEST" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"sourceSchema" : "`ROWKEY` STRING KEY, `VAL` STRING"
},
"selectExpressions" : [ "TEST_UDF() AS VALUE" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.query.pull.metrics.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.any.key.name.enabled" : "false",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
{
"version" : "6.0.0",
"timestamp" : 1588882528086,
"path" : "query-validation-tests/struct-udfs.json",
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT<VAL VARCHAR> NOT NULL",
"CSAS_OUTPUT_0.OUTPUT" : "STRUCT<VALUE STRUCT<A VARCHAR>> NOT NULL"
},
"testCase" : {
"name" : "Output struct",
"inputs" : [ {
"topic" : "test_topic",
"key" : "1",
"value" : {
"val" : "foo"
},
"timestamp" : 0
} ],
"outputs" : [ {
"topic" : "OUTPUT",
"key" : "1",
"value" : {
"VALUE" : {
"A" : "foo"
}
},
"timestamp" : 0
} ],
"topics" : [ {
"name" : "OUTPUT",
"replicas" : 1,
"numPartitions" : 4
}, {
"name" : "test_topic",
"replicas" : 1,
"numPartitions" : 4
} ],
"statements" : [ "CREATE STREAM test (val VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT TEST_UDF() AS VALUE FROM test;" ],
"post" : {
"topics" : {
"topics" : [ {
"name" : "OUTPUT",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
},
"partitions" : 4
}, {
"name" : "test_topic",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
},
"partitions" : 4
} ]
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT)
<-- Project

Loading