From 4437a4470dd6454e7db6d45112f50f095988a098 Mon Sep 17 00:00:00 2001 From: Jim Hughes Date: Tue, 15 Mar 2022 11:53:39 -0400 Subject: [PATCH] feat: Apply the ExtensionSecurityManager to UDAFs Addresses https://github.com/confluentinc/ksql/issues/8662 --- .../ksql/function/FunctionLoaderUtils.java | 4 + .../ksql/function/UdafFactoryInvoker.java | 5 +- .../io/confluent/ksql/function/UdfLoader.java | 3 + .../security/ExtensionSecurityManager.java | 23 +- .../ksql/function/UdfLoaderTest.java | 134 +++++++++- .../ksql/function/UdtfLoaderTest.java | 57 ++++ .../ksql/function/udaf/BadTestUdaf.java | 245 ++++++++++++++++++ .../ksql/function/udf/BadTestUdf.java | 42 +++ .../ksql/function/udf/BadTestUdtf.java | 154 +++++++++++ .../confluent/ksql/function/udf/TestUdf.java | 18 ++ 10 files changed, 682 insertions(+), 3 deletions(-) create mode 100644 ksqldb-engine/src/test/java/io/confluent/ksql/function/udaf/BadTestUdaf.java create mode 100644 ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/BadTestUdf.java create mode 100644 ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/BadTestUdtf.java diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/FunctionLoaderUtils.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/FunctionLoaderUtils.java index 8cac1a9eb61d..a5bce7ff7108 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/FunctionLoaderUtils.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/FunctionLoaderUtils.java @@ -29,6 +29,7 @@ import io.confluent.ksql.schema.ksql.SqlTypeParser; import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.schema.ksql.types.SqlTypes; +import io.confluent.ksql.security.ExtensionSecurityManager; import io.confluent.ksql.util.KsqlException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -262,6 +263,7 @@ private static SqlType invokeSchemaProviderMethod( final String functionName ) { try { + ExtensionSecurityManager.INSTANCE.pushInUdf(); return (SqlType) m.invoke(instance, args); } catch (IllegalAccessException | InvocationTargetException e) { @@ -269,6 +271,8 @@ private static SqlType invokeSchemaProviderMethod( + "method %s for UDF %s. ", m.getName(), functionName ), e); + } finally { + ExtensionSecurityManager.INSTANCE.popOutUdf(); } } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafFactoryInvoker.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafFactoryInvoker.java index 741a27d71947..c6d77739accf 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafFactoryInvoker.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafFactoryInvoker.java @@ -24,6 +24,7 @@ import io.confluent.ksql.schema.ksql.SqlArgument; import io.confluent.ksql.schema.ksql.SqlTypeParser; import io.confluent.ksql.schema.ksql.types.SqlType; +import io.confluent.ksql.security.ExtensionSecurityManager; import io.confluent.ksql.util.KsqlException; import java.lang.reflect.Method; import java.lang.reflect.Modifier; @@ -87,12 +88,14 @@ KsqlAggregateFunction createFunction(final AggregateFunctionInitArguments initAr final List argTypeList) { final Object[] factoryArgs = initArgs.args().toArray(); try { + ExtensionSecurityManager.INSTANCE.pushInUdf(); final Udaf udaf = (Udaf)method.invoke(null, factoryArgs); - udaf.initializeTypeArguments(argTypeList); + if (udaf instanceof Configurable) { ((Configurable) udaf).configure(initArgs.config()); } + ExtensionSecurityManager.INSTANCE.popOutUdf(); final SqlType aggregateSqlType = (SqlType) udaf.getAggregateSqlType() .orElseGet(() -> SchemaConverters.functionToSqlConverter().toSqlType(aggregateArgType)); diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java index 440cbcd1d131..ec4cfb644c0d 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java @@ -24,6 +24,7 @@ import io.confluent.ksql.function.udf.UdfMetadata; import io.confluent.ksql.name.FunctionName; import io.confluent.ksql.schema.ksql.SqlTypeParser; +import io.confluent.ksql.security.ExtensionSecurityManager; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import java.lang.reflect.Method; @@ -186,8 +187,10 @@ private Function getUdfFactory( final Object actualUdf = FunctionLoaderUtils.instantiateFunctionInstance( method.getDeclaringClass(), udfDescriptionAnnotation.name()); if (actualUdf instanceof Configurable) { + ExtensionSecurityManager.INSTANCE.pushInUdf(); ((Configurable) actualUdf) .configure(ksqlConfig.getKsqlFunctionsConfigProps(functionName)); + ExtensionSecurityManager.INSTANCE.popOutUdf(); } final PluggableUdf theUdf = new PluggableUdf(invoker, actualUdf); return metrics.map(m -> new UdfMetricProducer( diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/security/ExtensionSecurityManager.java b/ksqldb-engine/src/main/java/io/confluent/ksql/security/ExtensionSecurityManager.java index 49e87706d869..71ea6d396096 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/security/ExtensionSecurityManager.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/security/ExtensionSecurityManager.java @@ -15,7 +15,10 @@ package io.confluent.ksql.security; +import io.confluent.ksql.function.FunctionLoaderUtils; +import io.confluent.ksql.function.UdfLoader; import io.confluent.ksql.function.udf.PluggableUdf; +import java.lang.reflect.ReflectPermission; import java.security.AllPermission; import java.security.CodeSource; import java.security.Permission; @@ -93,6 +96,19 @@ public void checkExec(final String cmd) { super.checkExec(cmd); } + @Override + public void checkPermission(final Permission perm) { + System.out.println("Checking permission " + perm); + if (inUdfExecution()) { + if (perm instanceof ReflectPermission) { + throw new SecurityException("A UDF attempted to use reflection."); + } + if (perm instanceof RuntimePermission) { + throw new SecurityException("A UDF attempted to make a system call."); + } + } + super.checkPermission(perm); + } private boolean inUdfExecution() { final Stack executing = UDF_IS_EXECUTING.get(); @@ -105,6 +121,11 @@ private boolean inUdfExecution() { * @return true if caller is allowed */ private boolean validateCaller() { - return getClassContext()[2].equals(PluggableUdf.class); + final Class caller = getClassContext()[2]; + System.out.println("Caller is " + caller); + return caller.equals(PluggableUdf.class) + || caller.equals(FunctionLoaderUtils.class) + || caller.equals(UdfLoader.class) + || caller.getName().equals("io.confluent.ksql.function.UdafFactoryInvoker"); } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java index f491ff495069..590510883312 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java @@ -51,7 +51,6 @@ import io.confluent.ksql.function.udf.UdfParameter; import io.confluent.ksql.function.udf.UdfSchemaProvider; import io.confluent.ksql.metastore.TypeRegistry; -import io.confluent.ksql.metrics.MetricCollectors; import io.confluent.ksql.name.FunctionName; import io.confluent.ksql.schema.ksql.SqlArgument; import io.confluent.ksql.schema.ksql.SqlTypeParser; @@ -62,6 +61,7 @@ import io.confluent.ksql.schema.ksql.types.SqlStruct; import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.schema.ksql.types.SqlTypes; +import io.confluent.ksql.security.ExtensionSecurityManager; import io.confluent.ksql.test.util.KsqlTestFolder; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; @@ -138,6 +138,44 @@ public void shouldLoadFunctionsInKsqlEngine() { assertThat(substring2.evaluate("foo", 2, 1), equalTo("o")); } + @Test + public void shouldLoadBadFunctionButNotLetItExit() { + final List argList = Arrays.asList(SqlArgument.of(SqlTypes.STRING)); + // We do need to set up the ExtensionSecurityManager for our test. + // This is controlled by a feature flag and in this test, we just directly enable it. + SecurityManager manager = System.getSecurityManager(); + System.setSecurityManager(ExtensionSecurityManager.INSTANCE); + + final UdfFactory function = FUNC_REG.getUdfFactory(FunctionName.of("test_udf")); + assertThat(function, not(nullValue())); + + KsqlScalarFunction ksqlScalarFunction = function.getFunction(argList); + final Kudf badFunction = ksqlScalarFunction.newInstance(ksqlConfig); + + final Exception e0 = assertThrows( + java.lang.SecurityException.class, + () -> FUNC_REG.getUdfFactory(FunctionName.of("bad_test_udf")) + .getFunction(argList).newInstance(ksqlConfig) + ); + assertThat(e0.getMessage(), containsString("A UDF attempted to call System.exit")); + + final Exception e1 = assertThrows( + KsqlException.class, + () -> ksqlScalarFunction.getReturnType(argList) + ); + assertThat(e1.getMessage(), containsString( + "Cannot invoke the schema provider method exit for UDF test_udf.")); + + final Exception e2 = assertThrows( + KsqlFunctionException.class, + () -> badFunction.evaluate("foo") + ); + assertThat(e2.getMessage(), containsString( + "Failed to invoke function public org.apache.kafka.connect.data.Struct " + + "io.confluent.ksql.function.udf.TestUdf.returnList(java.lang.String)")); + System.setSecurityManager(manager); + } + @SuppressWarnings("unchecked") @Test public void shouldLoadUdafs() { @@ -180,6 +218,100 @@ public void shouldLoadStructUdafs() { equalTo(new Struct(schema).put("A", 1).put("B", 2))); } + @Test + public void shouldNotLetBadUdafsExit() { + // We do need to set up the ExtensionSecurityManager for our test. + // This is controlled by a feature flag and in this test, we just directly enable it. + SecurityManager manager = System.getSecurityManager(); + System.setSecurityManager(ExtensionSecurityManager.INSTANCE); + + // This will exit via create + final Exception e1 = assertThrows( + KsqlException.class, + () -> + ((KsqlAggregateFunction) FUNC_REG + .getAggregateFunction(FunctionName.of("bad_test_udaf"), SqlTypes.array(SqlTypes.INTEGER), + AggregateFunctionInitArguments.EMPTY_ARGS)).aggregate("foo", 2L) + ); + assertThat(e1.getMessage(), containsString("Failed to invoke UDAF factory method")); + + // This will exit via configure + final Exception e2 = assertThrows( + KsqlException.class, + () -> + ((Configurable)FUNC_REG + .getAggregateFunction(FunctionName.of("bad_test_udaf"), SqlTypes.INTEGER, + AggregateFunctionInitArguments.EMPTY_ARGS)).configure(Collections.EMPTY_MAP) + ); + assertThat(e2.getMessage(), containsString("Failed to invoke UDAF factory method")); + + + // This will exit via initialize + final Exception e3 = assertThrows( + SecurityException.class, + () -> + FUNC_REG + .getAggregateFunction(FunctionName.of("bad_test_udaf"), SqlTypes.DOUBLE, + AggregateFunctionInitArguments.EMPTY_ARGS).getInitialValueSupplier().get() + ); + assertThat(e3.getMessage(), containsString("A UDF attempted to call System.exit")); + + // This will exit via map + final Exception e4 = assertThrows( + SecurityException.class, + () -> + ((KsqlAggregateFunction) FUNC_REG + .getAggregateFunction(FunctionName.of("bad_test_udaf"), SqlTypes.BOOLEAN, + AggregateFunctionInitArguments.EMPTY_ARGS)).getResultMapper().apply(true) + ); + assertThat(e4.getMessage(), containsString("A UDF attempted to call System.exit")); + + + // This will exit via merge + final Schema schema = SchemaBuilder.struct() + .field("A", Schema.OPTIONAL_INT32_SCHEMA) + .field("B", Schema.OPTIONAL_INT32_SCHEMA) + .optional() + .build(); + final SqlStruct sqlSchema = SqlTypes.struct() + .field("A", SqlTypes.INTEGER) + .field("B", SqlTypes.INTEGER) + .build(); + final Struct input = new Struct(schema).put("A", 0).put("B", 0); + final Exception e5 = assertThrows( + SecurityException.class, + () -> + ((KsqlAggregateFunction) FUNC_REG.getAggregateFunction(FunctionName.of("bad_test_udaf"), + sqlSchema, + AggregateFunctionInitArguments.EMPTY_ARGS)).getMerger().apply(null, input, input) + ); + assertThat(e5.getMessage(), containsString("A UDF attempted to call System.exit")); + + + // This will exit via aggregate + final Exception e6 = assertThrows( + SecurityException.class, + () -> + ((KsqlAggregateFunction) FUNC_REG + .getAggregateFunction(FunctionName.of("bad_test_udaf"), SqlTypes.STRING, + AggregateFunctionInitArguments.EMPTY_ARGS)).aggregate("foo", 2L) + ); + assertThat(e6.getMessage(), containsString("A UDF attempted to call System.exit")); + + // This will exit via undo. + final Exception e7 = assertThrows( + SecurityException.class, + () -> + ((TableAggregationFunction) FUNC_REG + .getAggregateFunction(FunctionName.of("bad_test_udaf"), SqlTypes.BIGINT, + AggregateFunctionInitArguments.EMPTY_ARGS)).undo(1L, 1L) + ); + assertThat(e7.getMessage(), containsString("A UDF attempted to call System.exit")); + + System.setSecurityManager(manager); + } + + @Test public void shouldLoadDecimalUdfs() { // Given: diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/UdtfLoaderTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/UdtfLoaderTest.java index 3316a3eea90b..2b2223b8d8a3 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/function/UdtfLoaderTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/UdtfLoaderTest.java @@ -32,10 +32,12 @@ import io.confluent.ksql.schema.ksql.SqlTypeParser; import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.schema.ksql.types.SqlTypes; +import io.confluent.ksql.security.ExtensionSecurityManager; import io.confluent.ksql.util.KsqlException; import java.io.File; import java.math.BigDecimal; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -227,6 +229,61 @@ public void shouldLoadVarArgsMethod() { assertThat(function.getReturnType(args), equalTo(STRUCT_SCHEMA)); } + @Test + public void shouldNotBadUdtfsExit() { + // We do need to set up the ExtensionSecurityManager for our test. + // This is controlled by a feature flag and in this test, we just directly enable it. + SecurityManager manager = System.getSecurityManager(); + System.setSecurityManager(ExtensionSecurityManager.INSTANCE); + + final Exception e1 = assertThrows( + KsqlException.class, + () -> + FUNC_REG.getTableFunction( + FunctionName.of("bad_test_udtf"), + Collections.singletonList(SqlArgument.of(SqlTypes.decimal(2,0)))) + .getReturnType(ImmutableList.of(SqlArgument.of(SqlTypes.DOUBLE))) + ); + assertThat(e1.getMessage(), containsString( + "Cannot invoke the schema provider method provideSchema for UDF bad_test_udtf.")); + + final Exception e2 = assertThrows( + KsqlFunctionException.class, + () -> + FUNC_REG.getTableFunction( + FunctionName.of("bad_test_udtf"), + Collections.singletonList(SqlArgument.of(SqlTypes.STRING))).apply("foo") + ); + assertThat(e2.getMessage(), containsString( + "Failed to invoke function public java.util.List " + + "io.confluent.ksql.function.udf.BadTestUdtf.listStringReturn(java.lang.String)")); + + // Stop reflection + final Exception e3 = assertThrows( + KsqlFunctionException.class, + () -> + FUNC_REG.getTableFunction( + FunctionName.of("bad_test_udtf"), + Collections.singletonList(SqlArgument.of(SqlTypes.BOOLEAN))).apply(true) + ); + assertThat(e3.getMessage(), containsString( + "Failed to invoke function public java.util.List " + + "io.confluent.ksql.function.udf.BadTestUdtf.listBooleanReturn(boolean)")); + + final Exception e4 = assertThrows( + KsqlFunctionException.class, + () -> + FUNC_REG.getTableFunction( + FunctionName.of("bad_test_udtf"), + Collections.singletonList(SqlArgument.of(SqlTypes.DOUBLE))).apply(1.234) + ); + assertThat(e4.getMessage(), containsString( + "Failed to invoke function public java.util.List " + + "io.confluent.ksql.function.udf.BadTestUdtf.listDoubleReturn(double)")); + + System.setSecurityManager(manager); + } + @Test public void shouldNotLoadUdtfWithWrongReturnValue() { // Given: diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udaf/BadTestUdaf.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udaf/BadTestUdaf.java new file mode 100644 index 000000000000..7e85c47607c9 --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udaf/BadTestUdaf.java @@ -0,0 +1,245 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udaf; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.confluent.ksql.util.KsqlConstants; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.Configurable; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; + +@UdafDescription( + name = "bad_test_udaf", + description = "bad_test_udaf", + author = KsqlConstants.CONFLUENT_AUTHOR +) +public final class BadTestUdaf { + + private BadTestUdaf() { + } + + @SuppressFBWarnings("DM_EXIT") + private static void runBadCode() { + System.exit(-1); + } + + @UdafFactory(description = "sums longs with bad 'undo' method") + public static TableUdaf createSumLong() { + return new TableUdaf() { + @Override + public Long undo(final Long valueToUndo, final Long aggregateValue) { + runBadCode(); + return aggregateValue - valueToUndo; + } + + @Override + public Long initialize() { + return 0L; + } + + @Override + public Long aggregate(final Long value, final Long aggregate) { + return aggregate + value; + } + + @Override + public Long merge(final Long aggOne, final Long aggTwo) { + return aggOne + aggTwo; + } + + @Override + public Long map(final Long agg) { + return agg; + } + }; + } + + @UdafFactory(description = "sums int with a bad factory call") + public static TableUdaf, Long, Long> createFactoryExiting() { + runBadCode(); + return null; + } + + @UdafFactory(description = "sums int") + public static TableUdaf createSumInt() { + return new SumIntUdaf(); + } + + @UdafFactory(description = "sums double with a bad initialize") + public static Udaf createSumDouble() { + return new Udaf() { + @Override + public Double initialize() { + runBadCode(); + return 0.0; + } + + @Override + public Double aggregate(final Double val, final Double aggregate) { + return aggregate + val; + } + + @Override + public Double merge(final Double aggOne, final Double aggTwo) { + return aggOne + aggTwo; + } + + @Override + public Double map(final Double agg) { + return agg; + } + }; + } + + @UdafFactory(description = "sums the length of strings with a bad aggregate") + public static Udaf createSumLengthString() { + return new Udaf() { + @Override + public Long initialize() { + return (long) "initial".length(); + } + + @Override + public Long aggregate(final String s, final Long aggregate) { + runBadCode(); + return aggregate + s.length(); + } + + @Override + public Long merge(final Long aggOne, final Long aggTwo) { + return aggOne + aggTwo; + } + + @Override + public Long map(final Long agg) { + return agg; + } + }; + } + + @UdafFactory( + description = "returns a struct with {SUM(in->A), SUM(in->B)} with a bad merger", + paramSchema = "STRUCT", + aggregateSchema = "STRUCT", + returnSchema = "STRUCT") + public static Udaf createStructUdaf() { + return new Udaf() { + + @Override + public Struct initialize() { + return new Struct(SchemaBuilder.struct() + .field("A", Schema.OPTIONAL_INT32_SCHEMA) + .field("B", Schema.OPTIONAL_INT32_SCHEMA) + .optional() + .build()) + .put("A", 0) + .put("B", 0); + } + + @Override + public Struct aggregate(final Struct current, final Struct aggregate) { + aggregate.put("A", current.getInt32("A") + aggregate.getInt32("A")); + aggregate.put("B", current.getInt32("B") + aggregate.getInt32("B")); + return aggregate; + } + + @Override + public Struct merge(final Struct aggOne, final Struct aggTwo) { + runBadCode(); + return aggregate(aggOne, aggTwo); + } + + @Override + public Struct map(final Struct agg) { + return agg; + } + }; + } + + // With a bad map method + static class SumIntUdaf implements TableUdaf, Configurable { + + public static final String INIT_CONFIG = "ksql.functions.test_udaf.init"; + private long init = 0L; + + @Override + public Long undo(final Integer valueToUndo, final Long aggregateValue) { + return aggregateValue - valueToUndo; + } + + @Override + public Long initialize() { + return init; + } + + @Override + public Long aggregate(final Integer current, final Long aggregate) { + return current + aggregate; + } + + @Override + public Long merge(final Long aggOne, final Long aggTwo) { + return aggOne + aggTwo; + } + + @Override + public Long map(final Long agg) { + runBadCode(); + return agg; + } + + @Override + public void configure(final Map map) { + runBadCode(); + final Object init = map.get(INIT_CONFIG); + this.init = (init == null) ? this.init : (long) init; + } + } + + @UdafFactory( + description = "bad map", + paramSchema = "BOOLEAN", + aggregateSchema = "BOOLEAN", + returnSchema = "BOOLEAN") + public static Udaf createBadMapUdaf() { + return new Udaf() { + + @Override + public Boolean initialize() { + return Boolean.FALSE; + } + + @Override + public Boolean aggregate(Boolean current, Boolean aggregate) { + return Boolean.FALSE; + } + + @Override + public Boolean merge(Boolean aggOne, Boolean aggTwo) { + return Boolean.FALSE; + } + + @Override + public Boolean map(Boolean agg) { + runBadCode(); + return Boolean.FALSE; + } + }; + } +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/BadTestUdf.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/BadTestUdf.java new file mode 100644 index 000000000000..193db9881959 --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/BadTestUdf.java @@ -0,0 +1,42 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.confluent.ksql.schema.ksql.types.SqlStruct; +import io.confluent.ksql.schema.ksql.types.SqlTypes; +import java.util.Map; +import org.apache.kafka.common.Configurable; +import org.apache.kafka.connect.data.Struct; + +@UdfDescription(name="bad_test_udf", description = "test") +@SuppressWarnings("unused") +public class BadTestUdf implements Configurable { + + @SuppressFBWarnings("DM_EXIT") + @Override + public void configure(Map map) { + System.exit(-5); + } + + private static final SqlStruct RETURN = + SqlStruct.builder().field("A", SqlTypes.STRING).build(); + + @Udf(description = "Sample Bad") + public Struct returnList(final String string) { + return null; + } +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/BadTestUdtf.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/BadTestUdtf.java new file mode 100644 index 000000000000..21ffcf6119fb --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/BadTestUdtf.java @@ -0,0 +1,154 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf; + +import com.google.common.collect.ImmutableList; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.confluent.ksql.function.udtf.Udtf; +import io.confluent.ksql.function.udtf.UdtfDescription; +import io.confluent.ksql.schema.ksql.types.SqlDecimal; +import io.confluent.ksql.schema.ksql.types.SqlType; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.math.BigDecimal; +import java.util.List; +import java.util.Map; +import org.apache.kafka.connect.data.Struct; + +@UdtfDescription(name = "bad_test_udtf", description = "test") +@SuppressWarnings("unused") +public class BadTestUdtf { + + @SuppressFBWarnings("DM_EXIT") + private static void runBadCode() { + System.exit(-1); + } + + @Udtf + public List standardParams( + final int i, final long l, final double d, final boolean b, final String s, + final BigDecimal bd, @UdfParameter(schema = "STRUCT") final Struct struct + ) { + return ImmutableList.of(String.valueOf(i), String.valueOf(l), String.valueOf(d), + String.valueOf(b), s, bd.toString(), struct.toString() + ); + } + + @Udtf + public List parameterizedListParams( + final List i, final List l, final List d, final List b, final List s, + final List bd, @UdfParameter(schema = "ARRAY>") final List struct + ) { + return ImmutableList + .of(String.valueOf(i.get(0)), String.valueOf(l.get(0)), String.valueOf(d.get(0)), + String.valueOf(b.get(0)), s.get(0), bd.get(0).toString(), struct.get(0).toString() + ); + } + + @Udtf + public List parameterizedMapParams( + final Map i, + final Map l, + final Map d, + final Map b, + final Map s, + final Map bd, + @UdfParameter(schema = "MAP>") final Map struct + ) { + return ImmutableList + .of( + String.valueOf(i.values().iterator().next()), + String.valueOf(l.values().iterator().next()), + String.valueOf(d.values().iterator().next()), + String.valueOf(b.values().iterator().next()), + s.values().iterator().next(), + bd.values().iterator().next().toString(), + struct.values().iterator().next().toString() + ); + } + + @Udtf + public List parameterizedMapParams2( + final Map i, + final Map l, + final Map d, + final Map b, + final Map s, + final Map bd, + @UdfParameter(schema = "MAP>") final Map struct + ) { + return ImmutableList + .of( + String.valueOf(i.values().iterator().next()), + String.valueOf(l.values().iterator().next()), + String.valueOf(d.values().iterator().next()), + String.valueOf(b.values().iterator().next()), + s.values().iterator().next(), + bd.values().iterator().next().toString(), + struct.values().iterator().next().toString() + ); + } + + @Udtf + public List listIntegerReturn(final int i) { + return ImmutableList.of(i); + } + + @Udtf + public List listLongReturn(final long l) { + return ImmutableList.of(l); + } + + @Udtf + public List listDoubleReturn(final double d) { + System.setSecurityManager(new SecurityManager()); + runBadCode(); + return ImmutableList.of(d); + } + + @Udtf + public List listBooleanReturn(final boolean b) + throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException { + Class shutdown = Class.forName("java.lang.Shutdown"); + Method method = shutdown.getDeclaredMethod("exit", int.class); + method.setAccessible(true); + method.invoke(shutdown, -10); + return ImmutableList.of(b); + } + + @Udtf + public List listStringReturn(final String s) { + runBadCode(); + return ImmutableList.of(s); + } + + @Udtf(schemaProvider = "provideSchema") + public List listBigDecimalReturnWithSchemaProvider(final BigDecimal bd) { + return ImmutableList.of(bd); + } + + @Udtf(schema = "STRUCT") + public List listStructReturn(@UdfParameter(schema = "STRUCT") final Struct struct) { + return ImmutableList.of(struct); + } + + @UdfSchemaProvider + public SqlType provideSchema(final List params) { + runBadCode(); + return SqlDecimal.of(30, 10); + } + +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/TestUdf.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/TestUdf.java index 2040efd67f34..14788e4f95e5 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/TestUdf.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/TestUdf.java @@ -15,6 +15,7 @@ package io.confluent.ksql.function.udf; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.confluent.ksql.schema.ksql.types.SqlStruct; import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.schema.ksql.types.SqlTypes; @@ -67,4 +68,21 @@ public Struct returnStructStuff() { public SqlType structProvider(final List params) { return RETURN; } + + @SuppressFBWarnings("DM_EXIT") + @Udf(description = "Sample Bad", schemaProvider = "exit") + public Struct returnList(String string) { + System.out.println("In returnList"); + System.exit(-1); + return null; + } + + @SuppressFBWarnings("DM_EXIT") + @UdfSchemaProvider + public SqlType exit(final List params) { + System.out.println("In schemaProvider"); + System.exit(-3); + return RETURN; + } + }