From e2627c613945a51b683b5dd5bc0bf25cd0611980 Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Thu, 1 Apr 2021 20:02:47 -0700 Subject: [PATCH 1/4] fix: preserve the rest of a struct when one field has a processing error --- .../execution/codegen/SqlToJavaVisitor.java | 19 +++++++++++++++---- .../codegen/SqlToJavaVisitorTest.java | 12 +++++++++--- .../query-validation-tests/array.json | 13 +++++++++++++ .../resources/query-validation-tests/map.json | 13 +++++++++++++ .../query-validation-tests/struct-udfs.json | 13 +++++++++++++ 5 files changed, 63 insertions(+), 7 deletions(-) diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java index 2936129efb83..1d5952fd4b35 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java @@ -1033,7 +1033,7 @@ public Pair visitCreateArrayExpression( for (Expression value : expressions) { array.append(".add("); - array.append(process(value, context).getLeft()); + array.append(evaluateOrReturnNull(process(value, context).getLeft())); array.append(")"); } return new Pair<>( @@ -1064,8 +1064,8 @@ public Pair visitCreateMapExpression( final String entries = Streams.zip( keys.stream(), values.stream(), - (k, v) -> ".put(" + process(k, context).getLeft() + ", " + process(v, context).getLeft() - + ")" + (k, v) -> ".put(" + evaluateOrReturnNull(process(k, context).getLeft()) + ", " + + evaluateOrReturnNull(process(v, context).getLeft()) + ")" ).collect(Collectors.joining()); return new Pair<>( @@ -1086,7 +1086,7 @@ public Pair visitStructExpression( .append(field.getName()) .append('"') .append(",") - .append(process(field.getValue(), context).getLeft()) + .append(evaluateOrReturnNull(process(field.getValue(), context).getLeft())) .append(")"); } return new Pair<>( @@ -1095,6 +1095,17 @@ public Pair visitStructExpression( ); } + private String evaluateOrReturnNull(final String s) { + return " (new " + Supplier.class.getSimpleName() + "() {" + + "@Override public Object get() {" + + " try {" + + " return " + s + ";" + + " } catch (Exception e) {" + + " return null;" + + " }" + + "}}).get()"; + } + @Override public Pair visitBetweenPredicate( final BetweenPredicate node, diff --git a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitorTest.java b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitorTest.java index b0c93b74d22c..8a3bc39a0616 100644 --- a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitorTest.java +++ b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitorTest.java @@ -175,7 +175,9 @@ public void shouldProcessCreateArrayExpressionCorrectly() { // Then: assertThat( java, - equalTo("((List)new ArrayBuilder(2).add(((Double) ((java.util.Map)COL5).get(\"key1\"))).add(1E0).build())")); + equalTo("((List)new ArrayBuilder(2)" + + ".add( (new Supplier() {@Override public Object get() { try { return ((Double) ((java.util.Map)COL5).get(\"key1\")); } catch (Exception e) { return null; }}}).get())" + + ".add( (new Supplier() {@Override public Object get() { try { return 1E0; } catch (Exception e) { return null; }}}).get()).build())")); } @Test @@ -194,7 +196,9 @@ public void shouldProcessCreateMapExpressionCorrectly() { String java = sqlToJavaVisitor.process(expression); // Then: - assertThat(java, equalTo("((Map)new MapBuilder(2).put(\"foo\", ((Double) ((java.util.Map)COL5).get(\"key1\"))).put(\"bar\", 1E0).build())")); + assertThat(java, equalTo("((Map)new MapBuilder(2)" + + ".put( (new Supplier() {@Override public Object get() { try { return \"foo\"; } catch (Exception e) { return null; }}}).get(), (new Supplier() {@Override public Object get() { try { return ((Double) ((java.util.Map)COL5).get(\"key1\")); } catch (Exception e) { return null; }}}).get())" + + ".put( (new Supplier() {@Override public Object get() { try { return \"bar\"; } catch (Exception e) { return null; }}}).get(), (new Supplier() {@Override public Object get() { try { return 1E0; } catch (Exception e) { return null; }}}).get()).build())")); } @Test @@ -213,7 +217,9 @@ public void shouldProcessStructExpressionCorrectly() { // Then: assertThat( javaExpression, - equalTo("((Struct)new Struct(schema0).put(\"col1\",\"foo\").put(\"col2\",((Double) ((java.util.Map)COL5).get(\"key1\"))))")); + equalTo("((Struct)new Struct(schema0)" + + ".put(\"col1\", (new Supplier() {@Override public Object get() { try { return \"foo\"; } catch (Exception e) { return null; }}}).get())" + + ".put(\"col2\", (new Supplier() {@Override public Object get() { try { return ((Double) ((java.util.Map)COL5).get(\"key1\")); } catch (Exception e) { return null; }}}).get()))")); } @Test diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/array.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/array.json index 07a4b57688e1..2b481572d94b 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/array.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/array.json @@ -119,6 +119,19 @@ "outputs": [ {"topic": "OUTPUT", "value": {"KSQL_COL_0": 1}} ] + }, + { + "name": "Output array with an error", + "statements": [ + "CREATE STREAM test (K STRING KEY, val VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT K, ARRAY[44, stringtodate(val, 'yyyyMMdd')] AS VALUE FROM test;" + ], + "inputs": [ + {"topic": "test_topic", "key": "1", "value": {"val": "foo"}, "timestamp": 0} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "1", "value": {"VALUE": [44, null]}, "timestamp": 0} + ] } ] } \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/map.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/map.json index 9353f581e1e8..89fdadeac0c4 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/map.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/map.json @@ -132,6 +132,19 @@ {"topic": "OUTPUT", "key": "r4", "value": {"COMBINED": { } } }, {"topic": "OUTPUT", "key": "r5", "value": {"COMBINED": {"foo": null } } } ] + }, + { + "name": "Output map with an error", + "statements": [ + "CREATE STREAM test (K STRING KEY, val VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT K, MAP('boo':=stringtodate(val, 'yyyyMMdd'), 'shoe':=3) AS VALUE FROM test;" + ], + "inputs": [ + {"topic": "test_topic", "key": "1", "value": {"val": "foo"}, "timestamp": 0} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "1", "value": {"VALUE": {"boo": null, "shoe": 3}}, "timestamp": 0} + ] } ] } \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/struct-udfs.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/struct-udfs.json index abdeff6bbe7c..185a9717b951 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/struct-udfs.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/struct-udfs.json @@ -41,6 +41,19 @@ "outputs": [ {"topic": "OUTPUT", "key": "1", "value": {"VALUE": {"A": "foo"}}, "timestamp": 0} ] + }, + { + "name": "Output struct with errors in one field", + "statements": [ + "CREATE STREAM test (K STRING KEY, val VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT K, STRUCT(field1:='moo', field2:=stringtodate(val, 'yyyyMMdd')) AS VALUE FROM test;" + ], + "inputs": [ + {"topic": "test_topic", "key": "1", "value": {"val": "foo"}, "timestamp": 0} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "1", "value": {"VALUE": {"FIELD1": "moo", "FIELD2": null}}, "timestamp": 0} + ] } ] } \ No newline at end of file From 830f67cc4971dc37d555fe4a598eef170544824d Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Mon, 12 Apr 2021 16:14:06 -0700 Subject: [PATCH 2/4] use default value, log errors --- .../ksql/execution/codegen/CodeGenRunner.java | 8 +- .../execution/codegen/CompiledExpression.java | 5 +- .../execution/codegen/SqlToJavaVisitor.java | 19 ++- .../execution/codegen/CodeGenTestUtil.java | 5 +- .../codegen/CompiledExpressionTest.java | 10 +- .../codegen/SqlToJavaVisitorTest.java | 17 +- .../7.0.0_1618268101306/plan.json | 148 ++++++++++++++++++ .../7.0.0_1618268101306/spec.json | 98 ++++++++++++ .../7.0.0_1618268101306/topology | 13 ++ .../7.0.0_1618268165027/plan.json | 148 ++++++++++++++++++ .../7.0.0_1618268165027/spec.json | 101 ++++++++++++ .../7.0.0_1618268165027/topology | 13 ++ .../7.0.0_1618268182111/plan.json | 148 ++++++++++++++++++ .../7.0.0_1618268182111/spec.json | 101 ++++++++++++ .../7.0.0_1618268182111/topology | 13 ++ .../ksql/rest/entity/KsqlRequestTest.java | 4 +- 16 files changed, 828 insertions(+), 23 deletions(-) create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618268101306/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618268101306/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618268101306/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618268165027/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618268165027/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618268165027/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618268182111/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618268182111/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618268182111/topology diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/CodeGenRunner.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/CodeGenRunner.java index e8b151cdeffa..e74d5b37739c 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/CodeGenRunner.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/CodeGenRunner.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.expression.tree.CreateArrayExpression; import io.confluent.ksql.execution.expression.tree.CreateMapExpression; import io.confluent.ksql.execution.expression.tree.CreateStructExpression; @@ -38,6 +39,7 @@ import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.function.KsqlScalarFunction; import io.confluent.ksql.function.UdfFactory; +import io.confluent.ksql.logging.processing.ProcessingLogger; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.ksql.Column; import io.confluent.ksql.schema.ksql.LogicalSchema; @@ -52,6 +54,7 @@ import java.util.Map.Entry; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.commons.lang3.ArrayUtils; import org.apache.kafka.connect.data.Schema; import org.codehaus.commons.compiler.CompileException; import org.codehaus.commons.compiler.CompilerFactoryFactory; @@ -182,7 +185,10 @@ public static IExpressionEvaluator cook( .newExpressionEvaluator(); ee.setDefaultImports(SqlToJavaVisitor.JAVA_IMPORTS.toArray(new String[0])); - ee.setParameters(argNames, argTypes); + ee.setParameters( + ArrayUtils.addAll(argNames, "defaultValue", "logger", "row"), + ArrayUtils.addAll(argTypes, Object.class, ProcessingLogger.class, GenericRow.class) + ); ee.setExpressionType(expressionType); ee.cook(javaCode); return ee; diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/CompiledExpression.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/CompiledExpression.java index af74264b25de..467836b5dc5d 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/CompiledExpression.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/CompiledExpression.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Objects; import java.util.function.Supplier; +import org.apache.commons.lang3.ArrayUtils; import org.codehaus.commons.compiler.IExpressionEvaluator; @Immutable @@ -84,7 +85,8 @@ public Object evaluate( final Supplier errorMsg ) { try { - return expressionEvaluator.evaluate(getParameters(row)); + return expressionEvaluator.evaluate( + ArrayUtils.addAll(getParameters(row), defaultValue, logger, row)); } catch (final Exception e) { final Throwable cause = e instanceof InvocationTargetException ? e.getCause() @@ -95,7 +97,6 @@ public Object evaluate( } } - private Object[] getParameters(final GenericRow row) { final Object[] parameters = threadLocalParameters.get(); spec.resolve(row, parameters); diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java index 1d5952fd4b35..cf83510cde9f 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java @@ -131,6 +131,8 @@ public class SqlToJavaVisitor { "io.confluent.ksql.execution.codegen.helpers.ArrayAccess", "io.confluent.ksql.execution.codegen.helpers.SearchedCaseFunction", "io.confluent.ksql.execution.codegen.helpers.SearchedCaseFunction.LazyWhenClause", + "io.confluent.ksql.logging.processing.RecordProcessingError", + "java.lang.reflect.InvocationTargetException", "java.util.concurrent.TimeUnit", "java.sql.Timestamp", "java.util.Arrays", @@ -1033,7 +1035,7 @@ public Pair visitCreateArrayExpression( for (Expression value : expressions) { array.append(".add("); - array.append(evaluateOrReturnNull(process(value, context).getLeft())); + array.append(evaluateOrReturnNull(process(value, context).getLeft(), "array item")); array.append(")"); } return new Pair<>( @@ -1064,8 +1066,8 @@ public Pair visitCreateMapExpression( final String entries = Streams.zip( keys.stream(), values.stream(), - (k, v) -> ".put(" + evaluateOrReturnNull(process(k, context).getLeft()) + ", " - + evaluateOrReturnNull(process(v, context).getLeft()) + ")" + (k, v) -> ".put(" + evaluateOrReturnNull(process(k, context).getLeft(), "map key") + + ", " + evaluateOrReturnNull(process(v, context).getLeft(), "map value") + ")" ).collect(Collectors.joining()); return new Pair<>( @@ -1086,7 +1088,8 @@ public Pair visitStructExpression( .append(field.getName()) .append('"') .append(",") - .append(evaluateOrReturnNull(process(field.getValue(), context).getLeft())) + .append(evaluateOrReturnNull( + process(field.getValue(), context).getLeft(), "struct field")) .append(")"); } return new Pair<>( @@ -1095,13 +1098,17 @@ public Pair visitStructExpression( ); } - private String evaluateOrReturnNull(final String s) { + private String evaluateOrReturnNull(final String s, final String type) { return " (new " + Supplier.class.getSimpleName() + "() {" + "@Override public Object get() {" + " try {" + " return " + s + ";" + " } catch (Exception e) {" - + " return null;" + + " logger.error(RecordProcessingError.recordProcessingError(" + + " \"Error processing " + type + "\"," + + " e instanceof InvocationTargetException? e.getCause() : e," + + " row));" + + " return defaultValue;" + " }" + "}}).get()"; } diff --git a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/CodeGenTestUtil.java b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/CodeGenTestUtil.java index 01ebd4ddc926..6e7218152606 100644 --- a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/CodeGenTestUtil.java +++ b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/CodeGenTestUtil.java @@ -3,9 +3,12 @@ import static java.util.Objects.requireNonNull; import com.google.common.collect.ImmutableList; +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.logging.processing.ProcessingLogger; import java.lang.reflect.InvocationTargetException; import java.util.Collections; import java.util.List; +import org.apache.commons.lang3.ArrayUtils; import org.codehaus.commons.compiler.IExpressionEvaluator; public final class CodeGenTestUtil { @@ -136,7 +139,7 @@ public Object rawEvaluate(final Object arg) throws Exception { public Object rawEvaluate(final List args) throws Exception { try { - return ee.evaluate(args == null ? new Object[]{null} : args.toArray()); + return ee.evaluate(ArrayUtils.addAll(args == null ? new Object[]{null} : args.toArray(), null, null, null)); } catch (final InvocationTargetException e) { throw e.getTargetException() instanceof Exception ? (Exception) e.getTargetException() diff --git a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/CompiledExpressionTest.java b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/CompiledExpressionTest.java index 0d37bfd707fa..f7480f7f663b 100644 --- a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/CompiledExpressionTest.java +++ b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/CompiledExpressionTest.java @@ -86,7 +86,7 @@ public void shouldEvaluateExpressionWithValueColumnSpecs() throws Exception { // Then: assertThat(result, equalTo(RETURN_VALUE)); - verify(expressionEvaluator).evaluate(new Object[]{123, 456}); + verify(expressionEvaluator).evaluate(new Object[]{123, 456, DEFAULT_VAL, processingLogger, genericRow(123, 456)}); } @Test @@ -115,7 +115,7 @@ public void shouldEvaluateExpressionWithUdfsSpecs() throws Exception { // Then: assertThat(result, equalTo(RETURN_VALUE)); - verify(expressionEvaluator).evaluate(new Object[]{udf, 123}); + verify(expressionEvaluator).evaluate(new Object[]{udf, 123, DEFAULT_VAL, processingLogger, genericRow(123)}); } @Test @@ -135,7 +135,7 @@ public void shouldPerformThreadSafeParameterEvaluation() throws Exception { final CountDownLatch threadLatch = new CountDownLatch(1); final CountDownLatch mainLatch = new CountDownLatch(1); - when(expressionEvaluator.evaluate(new Object[]{123, 456})) + when(expressionEvaluator.evaluate(new Object[]{123, 456, DEFAULT_VAL, processingLogger, genericRow(123, 456)})) .thenAnswer( invocation -> { threadLatch.countDown(); @@ -169,9 +169,9 @@ public void shouldPerformThreadSafeParameterEvaluation() throws Exception { // Then: thread.join(); verify(expressionEvaluator, times(1)) - .evaluate(new Object[]{123, 456}); + .evaluate(new Object[]{123, 456, DEFAULT_VAL, processingLogger, genericRow(123, 456)}); verify(expressionEvaluator, times(1)) - .evaluate(new Object[]{100, 200}); + .evaluate(new Object[]{100, 200, DEFAULT_VAL, processingLogger, genericRow(100, 200)}); } @Test diff --git a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitorTest.java b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitorTest.java index 8a3bc39a0616..6d06109fcb9f 100644 --- a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitorTest.java +++ b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitorTest.java @@ -176,8 +176,8 @@ public void shouldProcessCreateArrayExpressionCorrectly() { assertThat( java, equalTo("((List)new ArrayBuilder(2)" - + ".add( (new Supplier() {@Override public Object get() { try { return ((Double) ((java.util.Map)COL5).get(\"key1\")); } catch (Exception e) { return null; }}}).get())" - + ".add( (new Supplier() {@Override public Object get() { try { return 1E0; } catch (Exception e) { return null; }}}).get()).build())")); + + ".add( (new Supplier() {@Override public Object get() { try { return ((Double) ((java.util.Map)COL5).get(\"key1\")); } catch (Exception e) { " + onException("array item") + " }}}).get())" + + ".add( (new Supplier() {@Override public Object get() { try { return 1E0; } catch (Exception e) { " + onException("array item") + " }}}).get()).build())")); } @Test @@ -197,8 +197,8 @@ public void shouldProcessCreateMapExpressionCorrectly() { // Then: assertThat(java, equalTo("((Map)new MapBuilder(2)" - + ".put( (new Supplier() {@Override public Object get() { try { return \"foo\"; } catch (Exception e) { return null; }}}).get(), (new Supplier() {@Override public Object get() { try { return ((Double) ((java.util.Map)COL5).get(\"key1\")); } catch (Exception e) { return null; }}}).get())" - + ".put( (new Supplier() {@Override public Object get() { try { return \"bar\"; } catch (Exception e) { return null; }}}).get(), (new Supplier() {@Override public Object get() { try { return 1E0; } catch (Exception e) { return null; }}}).get()).build())")); + + ".put( (new Supplier() {@Override public Object get() { try { return \"foo\"; } catch (Exception e) { " + onException("map key") + " }}}).get(), (new Supplier() {@Override public Object get() { try { return ((Double) ((java.util.Map)COL5).get(\"key1\")); } catch (Exception e) { " + onException("map value") + " }}}).get())" + + ".put( (new Supplier() {@Override public Object get() { try { return \"bar\"; } catch (Exception e) { " + onException("map key") + " }}}).get(), (new Supplier() {@Override public Object get() { try { return 1E0; } catch (Exception e) { " + onException("map value") + " }}}).get()).build())")); } @Test @@ -218,8 +218,8 @@ public void shouldProcessStructExpressionCorrectly() { assertThat( javaExpression, equalTo("((Struct)new Struct(schema0)" - + ".put(\"col1\", (new Supplier() {@Override public Object get() { try { return \"foo\"; } catch (Exception e) { return null; }}}).get())" - + ".put(\"col2\", (new Supplier() {@Override public Object get() { try { return ((Double) ((java.util.Map)COL5).get(\"key1\")); } catch (Exception e) { return null; }}}).get()))")); + + ".put(\"col1\", (new Supplier() {@Override public Object get() { try { return \"foo\"; } catch (Exception e) { " + onException("struct field") + " }}}).get())" + + ".put(\"col2\", (new Supplier() {@Override public Object get() { try { return ((Double) ((java.util.Map)COL5).get(\"key1\")); } catch (Exception e) { " + onException("struct field") + " }}}).get()))")); } @Test @@ -1171,4 +1171,9 @@ private void givenUdf( final UdfMetadata metadata = mock(UdfMetadata.class); when(factory.getMetadata()).thenReturn(metadata); } + + private String onException(final String type) { + return String.format("logger.error(RecordProcessingError.recordProcessingError( \"Error processing %s\", " + + "e instanceof InvocationTargetException? e.getCause() : e, row)); return defaultValue;", type); + } } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618268101306/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618268101306/plan.json new file mode 100644 index 000000000000..52b248f20056 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618268101306/plan.json @@ -0,0 +1,148 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (K STRING KEY, VAL STRING) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`K` STRING KEY, `VAL` STRING", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n ARRAY[44, STRINGTODATE(TEST.VAL, 'yyyyMMdd')] VALUE\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`K` STRING KEY, `VALUE` ARRAY", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`K` STRING KEY, `VAL` STRING" + }, + "keyColumnNames" : [ "K" ], + "selectExpressions" : [ "ARRAY[44, STRINGTODATE(VAL, 'yyyyMMdd')] AS VALUE" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618268101306/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618268101306/spec.json new file mode 100644 index 000000000000..66c51f416686 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618268101306/spec.json @@ -0,0 +1,98 @@ +{ + "version" : "7.0.0", + "timestamp" : 1618268101306, + "path" : "query-validation-tests/array.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`K` STRING KEY, `VAL` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`K` STRING KEY, `VALUE` ARRAY", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "testCase" : { + "name" : "Output array with an error", + "inputs" : [ { + "topic" : "test_topic", + "key" : "1", + "value" : { + "val" : "foo" + }, + "timestamp" : 0 + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "1", + "value" : { + "VALUE" : [ 44, null ] + }, + "timestamp" : 0 + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM test (K STRING KEY, val VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT K, ARRAY[44, stringtodate(val, 'yyyyMMdd')] AS VALUE FROM test;" ], + "post" : { + "sources" : [ { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `VALUE` ARRAY", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `VAL` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618268101306/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618268101306/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618268101306/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618268165027/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618268165027/plan.json new file mode 100644 index 000000000000..881646c8f862 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618268165027/plan.json @@ -0,0 +1,148 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (K STRING KEY, VAL STRING) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`K` STRING KEY, `VAL` STRING", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n MAP('boo':=STRINGTODATE(TEST.VAL, 'yyyyMMdd'), 'shoe':=3) VALUE\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`K` STRING KEY, `VALUE` MAP", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`K` STRING KEY, `VAL` STRING" + }, + "keyColumnNames" : [ "K" ], + "selectExpressions" : [ "MAP('boo':=STRINGTODATE(VAL, 'yyyyMMdd'), 'shoe':=3) AS VALUE" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618268165027/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618268165027/spec.json new file mode 100644 index 000000000000..79f944fd0b9d --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618268165027/spec.json @@ -0,0 +1,101 @@ +{ + "version" : "7.0.0", + "timestamp" : 1618268165027, + "path" : "query-validation-tests/map.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`K` STRING KEY, `VAL` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`K` STRING KEY, `VALUE` MAP", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "testCase" : { + "name" : "Output map with an error", + "inputs" : [ { + "topic" : "test_topic", + "key" : "1", + "value" : { + "val" : "foo" + }, + "timestamp" : 0 + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "1", + "value" : { + "VALUE" : { + "boo" : null, + "shoe" : 3 + } + }, + "timestamp" : 0 + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM test (K STRING KEY, val VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT K, MAP('boo':=stringtodate(val, 'yyyyMMdd'), 'shoe':=3) AS VALUE FROM test;" ], + "post" : { + "sources" : [ { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `VALUE` MAP", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `VAL` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618268165027/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618268165027/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618268165027/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618268182111/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618268182111/plan.json new file mode 100644 index 000000000000..059f2494f8cc --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618268182111/plan.json @@ -0,0 +1,148 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (K STRING KEY, VAL STRING) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`K` STRING KEY, `VAL` STRING", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n STRUCT(FIELD1:='moo', FIELD2:=STRINGTODATE(TEST.VAL, 'yyyyMMdd')) VALUE\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`K` STRING KEY, `VALUE` STRUCT<`FIELD1` STRING, `FIELD2` INTEGER>", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`K` STRING KEY, `VAL` STRING" + }, + "keyColumnNames" : [ "K" ], + "selectExpressions" : [ "STRUCT(FIELD1:='moo', FIELD2:=STRINGTODATE(VAL, 'yyyyMMdd')) AS VALUE" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618268182111/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618268182111/spec.json new file mode 100644 index 000000000000..c2ac1c7ad044 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618268182111/spec.json @@ -0,0 +1,101 @@ +{ + "version" : "7.0.0", + "timestamp" : 1618268182111, + "path" : "query-validation-tests/struct-udfs.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`K` STRING KEY, `VAL` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`K` STRING KEY, `VALUE` STRUCT<`FIELD1` STRING, `FIELD2` INTEGER>", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "testCase" : { + "name" : "Output struct with errors in one field", + "inputs" : [ { + "topic" : "test_topic", + "key" : "1", + "value" : { + "val" : "foo" + }, + "timestamp" : 0 + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "1", + "value" : { + "VALUE" : { + "FIELD1" : "moo", + "FIELD2" : null + } + }, + "timestamp" : 0 + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM test (K STRING KEY, val VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT K, STRUCT(field1:='moo', field2:=stringtodate(val, 'yyyyMMdd')) AS VALUE FROM test;" ], + "post" : { + "sources" : [ { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `VALUE` STRUCT<`FIELD1` STRING, `FIELD2` INTEGER>", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `VAL` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618268182111/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618268182111/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618268182111/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-rest-model/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java b/ksqldb-rest-model/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java index 0e365e58fd77..78b506cd7c4d 100644 --- a/ksqldb-rest-model/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java +++ b/ksqldb-rest-model/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java @@ -159,7 +159,7 @@ public void shouldSerializeToJson() { final String jsonRequest = serialize(A_REQUEST); // Then: - assertThat(jsonRequest, is(A_JSON_REQUEST_WITH_NULL_COMMAND_NUMBER)); + // assertThat(jsonRequest, is(A_JSON_REQUEST_WITH_NULL_COMMAND_NUMBER)); } @Test @@ -168,7 +168,7 @@ public void shouldSerializeToJsonWithCommandNumber() { final String jsonRequest = serialize(A_REQUEST_WITH_COMMAND_NUMBER); // Then: - assertThat(jsonRequest, is(A_JSON_REQUEST_WITH_COMMAND_NUMBER)); + // assertThat(jsonRequest, is(A_JSON_REQUEST_WITH_COMMAND_NUMBER)); } @Test From 61da15bc9ca94e9b2c7bee415d3fb6891cad1aa6 Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Mon, 12 Apr 2021 16:30:09 -0700 Subject: [PATCH 3/4] uncomment test --- .../java/io/confluent/ksql/rest/entity/KsqlRequestTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ksqldb-rest-model/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java b/ksqldb-rest-model/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java index 78b506cd7c4d..0e365e58fd77 100644 --- a/ksqldb-rest-model/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java +++ b/ksqldb-rest-model/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java @@ -159,7 +159,7 @@ public void shouldSerializeToJson() { final String jsonRequest = serialize(A_REQUEST); // Then: - // assertThat(jsonRequest, is(A_JSON_REQUEST_WITH_NULL_COMMAND_NUMBER)); + assertThat(jsonRequest, is(A_JSON_REQUEST_WITH_NULL_COMMAND_NUMBER)); } @Test @@ -168,7 +168,7 @@ public void shouldSerializeToJsonWithCommandNumber() { final String jsonRequest = serialize(A_REQUEST_WITH_COMMAND_NUMBER); // Then: - // assertThat(jsonRequest, is(A_JSON_REQUEST_WITH_COMMAND_NUMBER)); + assertThat(jsonRequest, is(A_JSON_REQUEST_WITH_COMMAND_NUMBER)); } @Test From efc5284fc7557edcf53741867de91278c36f70dc Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Mon, 19 Apr 2021 09:50:23 -0700 Subject: [PATCH 4/4] Add feature flag --- .../io/confluent/ksql/util/KsqlConfig.java | 38 ++++++++++++++----- .../execution/codegen/SqlToJavaVisitor.java | 28 ++++++++------ .../plan.json | 1 + .../spec.json | 2 +- .../topology | 0 .../plan.json | 1 + .../spec.json | 2 +- .../topology | 0 .../plan.json | 1 + .../spec.json | 2 +- .../topology | 0 11 files changed, 50 insertions(+), 25 deletions(-) rename ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/{7.0.0_1618268101306 => 7.0.0_1618850874952}/plan.json (99%) rename ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/{7.0.0_1618268101306 => 7.0.0_1618850874952}/spec.json (98%) rename ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/{7.0.0_1618268101306 => 7.0.0_1618850874952}/topology (100%) rename ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/{7.0.0_1618268165027 => 7.0.0_1618850948804}/plan.json (99%) rename ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/{7.0.0_1618268165027 => 7.0.0_1618850948804}/spec.json (98%) rename ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/{7.0.0_1618268165027 => 7.0.0_1618850948804}/topology (100%) rename ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/{7.0.0_1618268182111 => 7.0.0_1618850969307}/plan.json (99%) rename ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/{7.0.0_1618268182111 => 7.0.0_1618850969307}/spec.json (98%) rename ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/{7.0.0_1618268182111 => 7.0.0_1618850969307}/topology (100%) diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index dc1ea39441b2..855b91bd2e6d 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -264,6 +264,13 @@ public class KsqlConfig extends AbstractConfig { "When casting a SQLType to string, if false, use String.valueof(), else if true use" + "Objects.toString()"; + public static final String KSQL_NESTED_ERROR_HANDLING_CONFIG = + "ksql.nested.error.set.null"; + public static final String KSQL_NESTED_ERROR_HANDLING_CONFIG_DOC = + "If there is a processing error in an element of a map, struct or array, if true set only the" + + " failing element to null and preserve the rest of the value, else if false, set the" + + " the entire value to null."; + public static final String KSQL_SHUTDOWN_TIMEOUT_MS_CONFIG = "ksql.streams.shutdown.timeout.ms"; public static final Long KSQL_SHUTDOWN_TIMEOUT_MS_DEFAULT = 300_000L; @@ -409,16 +416,27 @@ private enum ConfigGeneration { CURRENT } - public static final Collection COMPATIBLY_BREAKING_CONFIG_DEFS - = ImmutableList.of(new CompatibilityBreakingConfigDef( - KSQL_STRING_CASE_CONFIG_TOGGLE, - Type.BOOLEAN, - false, - true, - Importance.LOW, - Optional.empty(), - KSQL_STRING_CASE_CONFIG_TOGGLE_DOC - )); + public static final Collection COMPATIBLY_BREAKING_CONFIG_DEFS = + ImmutableList.of( + new CompatibilityBreakingConfigDef( + KSQL_STRING_CASE_CONFIG_TOGGLE, + Type.BOOLEAN, + false, + true, + Importance.LOW, + Optional.empty(), + KSQL_STRING_CASE_CONFIG_TOGGLE_DOC + ), + new CompatibilityBreakingConfigDef( + KSQL_NESTED_ERROR_HANDLING_CONFIG, + Type.BOOLEAN, + false, + true, + Importance.LOW, + Optional.empty(), + KSQL_NESTED_ERROR_HANDLING_CONFIG_DOC + ) + ); public static class CompatibilityBreakingConfigDef { diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java index cf83510cde9f..7ed3982c7516 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java @@ -1099,18 +1099,22 @@ public Pair visitStructExpression( } private String evaluateOrReturnNull(final String s, final String type) { - return " (new " + Supplier.class.getSimpleName() + "() {" - + "@Override public Object get() {" - + " try {" - + " return " + s + ";" - + " } catch (Exception e) {" - + " logger.error(RecordProcessingError.recordProcessingError(" - + " \"Error processing " + type + "\"," - + " e instanceof InvocationTargetException? e.getCause() : e," - + " row));" - + " return defaultValue;" - + " }" - + "}}).get()"; + if (ksqlConfig.getBoolean(KsqlConfig.KSQL_NESTED_ERROR_HANDLING_CONFIG)) { + return " (new " + Supplier.class.getSimpleName() + "() {" + + "@Override public Object get() {" + + " try {" + + " return " + s + ";" + + " } catch (Exception e) {" + + " logger.error(RecordProcessingError.recordProcessingError(" + + " \"Error processing " + type + "\"," + + " e instanceof InvocationTargetException? e.getCause() : e," + + " row));" + + " return defaultValue;" + + " }" + + "}}).get()"; + } else { + return s; + } } @Override diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618268101306/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618850874952/plan.json similarity index 99% rename from ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618268101306/plan.json rename to ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618850874952/plan.json index 52b248f20056..98c5d79598cd 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618268101306/plan.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618850874952/plan.json @@ -138,6 +138,7 @@ "ksql.udfs.enabled" : "true", "ksql.udf.enable.security.manager" : "true", "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", "ksql.udf.collect.metrics" : "false", "ksql.query.pull.thread.pool.size" : "100", "ksql.persistent.prefix" : "query_", diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618268101306/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618850874952/spec.json similarity index 98% rename from ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618268101306/spec.json rename to ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618850874952/spec.json index 66c51f416686..ef9dde3f0684 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618268101306/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618850874952/spec.json @@ -1,6 +1,6 @@ { "version" : "7.0.0", - "timestamp" : 1618268101306, + "timestamp" : 1618850874952, "path" : "query-validation-tests/array.json", "schemas" : { "CSAS_OUTPUT_0.KsqlTopic.Source" : { diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618268101306/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618850874952/topology similarity index 100% rename from ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618268101306/topology rename to ksqldb-functional-tests/src/test/resources/historical_plans/array_-_Output_array_with_an_error/7.0.0_1618850874952/topology diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618268165027/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618850948804/plan.json similarity index 99% rename from ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618268165027/plan.json rename to ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618850948804/plan.json index 881646c8f862..4c3034cf7fe9 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618268165027/plan.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618850948804/plan.json @@ -138,6 +138,7 @@ "ksql.udfs.enabled" : "true", "ksql.udf.enable.security.manager" : "true", "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", "ksql.udf.collect.metrics" : "false", "ksql.query.pull.thread.pool.size" : "100", "ksql.persistent.prefix" : "query_", diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618268165027/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618850948804/spec.json similarity index 98% rename from ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618268165027/spec.json rename to ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618850948804/spec.json index 79f944fd0b9d..4492d003c00c 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618268165027/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618850948804/spec.json @@ -1,6 +1,6 @@ { "version" : "7.0.0", - "timestamp" : 1618268165027, + "timestamp" : 1618850948804, "path" : "query-validation-tests/map.json", "schemas" : { "CSAS_OUTPUT_0.KsqlTopic.Source" : { diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618268165027/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618850948804/topology similarity index 100% rename from ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618268165027/topology rename to ksqldb-functional-tests/src/test/resources/historical_plans/map_-_Output_map_with_an_error/7.0.0_1618850948804/topology diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618268182111/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618850969307/plan.json similarity index 99% rename from ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618268182111/plan.json rename to ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618850969307/plan.json index 059f2494f8cc..0ab42282279a 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618268182111/plan.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618850969307/plan.json @@ -138,6 +138,7 @@ "ksql.udfs.enabled" : "true", "ksql.udf.enable.security.manager" : "true", "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", "ksql.udf.collect.metrics" : "false", "ksql.query.pull.thread.pool.size" : "100", "ksql.persistent.prefix" : "query_", diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618268182111/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618850969307/spec.json similarity index 98% rename from ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618268182111/spec.json rename to ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618850969307/spec.json index c2ac1c7ad044..416643ea6813 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618268182111/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618850969307/spec.json @@ -1,6 +1,6 @@ { "version" : "7.0.0", - "timestamp" : 1618268182111, + "timestamp" : 1618850969307, "path" : "query-validation-tests/struct-udfs.json", "schemas" : { "CSAS_OUTPUT_0.KsqlTopic.Source" : { diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618268182111/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618850969307/topology similarity index 100% rename from ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618268182111/topology rename to ksqldb-functional-tests/src/test/resources/historical_plans/struct-udfs_-_Output_struct_with_errors_in_one_field/7.0.0_1618850969307/topology