Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: preserve the rest of a struct when one field has a processing error #7373

Merged
merged 5 commits into from
Apr 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,13 @@ public class KsqlConfig extends AbstractConfig {
"When casting a SQLType to string, if false, use String.valueof(), else if true use"
+ "Objects.toString()";

public static final String KSQL_NESTED_ERROR_HANDLING_CONFIG =
"ksql.nested.error.set.null";
public static final String KSQL_NESTED_ERROR_HANDLING_CONFIG_DOC =
"If there is a processing error in an element of a map, struct or array, if true set only the"
+ " failing element to null and preserve the rest of the value, else if false, set the"
+ " the entire value to null.";

public static final String KSQL_SHUTDOWN_TIMEOUT_MS_CONFIG =
"ksql.streams.shutdown.timeout.ms";
public static final Long KSQL_SHUTDOWN_TIMEOUT_MS_DEFAULT = 300_000L;
Expand Down Expand Up @@ -409,16 +416,27 @@ private enum ConfigGeneration {
CURRENT
}

public static final Collection<CompatibilityBreakingConfigDef> COMPATIBLY_BREAKING_CONFIG_DEFS
= ImmutableList.of(new CompatibilityBreakingConfigDef(
KSQL_STRING_CASE_CONFIG_TOGGLE,
Type.BOOLEAN,
false,
true,
Importance.LOW,
Optional.empty(),
KSQL_STRING_CASE_CONFIG_TOGGLE_DOC
));
public static final Collection<CompatibilityBreakingConfigDef> COMPATIBLY_BREAKING_CONFIG_DEFS =
ImmutableList.of(
new CompatibilityBreakingConfigDef(
KSQL_STRING_CASE_CONFIG_TOGGLE,
Type.BOOLEAN,
false,
true,
Importance.LOW,
Optional.empty(),
KSQL_STRING_CASE_CONFIG_TOGGLE_DOC
),
new CompatibilityBreakingConfigDef(
KSQL_NESTED_ERROR_HANDLING_CONFIG,
Type.BOOLEAN,
false,
true,
Importance.LOW,
Optional.empty(),
KSQL_NESTED_ERROR_HANDLING_CONFIG_DOC
)
);

public static class CompatibilityBreakingConfigDef {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.expression.tree.CreateArrayExpression;
import io.confluent.ksql.execution.expression.tree.CreateMapExpression;
import io.confluent.ksql.execution.expression.tree.CreateStructExpression;
Expand All @@ -38,6 +39,7 @@
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.function.KsqlScalarFunction;
import io.confluent.ksql.function.UdfFactory;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
Expand All @@ -52,6 +54,7 @@
import java.util.Map.Entry;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.kafka.connect.data.Schema;
import org.codehaus.commons.compiler.CompileException;
import org.codehaus.commons.compiler.CompilerFactoryFactory;
Expand Down Expand Up @@ -182,7 +185,10 @@ public static IExpressionEvaluator cook(
.newExpressionEvaluator();

ee.setDefaultImports(SqlToJavaVisitor.JAVA_IMPORTS.toArray(new String[0]));
ee.setParameters(argNames, argTypes);
ee.setParameters(
ArrayUtils.addAll(argNames, "defaultValue", "logger", "row"),
ArrayUtils.addAll(argTypes, Object.class, ProcessingLogger.class, GenericRow.class)
);
ee.setExpressionType(expressionType);
ee.cook(javaCode);
return ee;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.commons.lang3.ArrayUtils;
import org.codehaus.commons.compiler.IExpressionEvaluator;

@Immutable
Expand Down Expand Up @@ -84,7 +85,8 @@ public Object evaluate(
final Supplier<String> errorMsg
) {
try {
return expressionEvaluator.evaluate(getParameters(row));
return expressionEvaluator.evaluate(
ArrayUtils.addAll(getParameters(row), defaultValue, logger, row));
} catch (final Exception e) {
final Throwable cause = e instanceof InvocationTargetException
? e.getCause()
Expand All @@ -95,7 +97,6 @@ public Object evaluate(
}
}


private Object[] getParameters(final GenericRow row) {
final Object[] parameters = threadLocalParameters.get();
spec.resolve(row, parameters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ public class SqlToJavaVisitor {
"io.confluent.ksql.execution.codegen.helpers.ArrayAccess",
"io.confluent.ksql.execution.codegen.helpers.SearchedCaseFunction",
"io.confluent.ksql.execution.codegen.helpers.SearchedCaseFunction.LazyWhenClause",
"io.confluent.ksql.logging.processing.RecordProcessingError",
"java.lang.reflect.InvocationTargetException",
"java.util.concurrent.TimeUnit",
"java.sql.Timestamp",
"java.util.Arrays",
Expand Down Expand Up @@ -1033,7 +1035,7 @@ public Pair<String, SqlType> visitCreateArrayExpression(

for (Expression value : expressions) {
array.append(".add(");
array.append(process(value, context).getLeft());
array.append(evaluateOrReturnNull(process(value, context).getLeft(), "array item"));
array.append(")");
}
return new Pair<>(
Expand Down Expand Up @@ -1064,8 +1066,8 @@ public Pair<String, SqlType> visitCreateMapExpression(
final String entries = Streams.zip(
keys.stream(),
values.stream(),
(k, v) -> ".put(" + process(k, context).getLeft() + ", " + process(v, context).getLeft()
+ ")"
(k, v) -> ".put(" + evaluateOrReturnNull(process(k, context).getLeft(), "map key")
+ ", " + evaluateOrReturnNull(process(v, context).getLeft(), "map value") + ")"
).collect(Collectors.joining());

return new Pair<>(
Expand All @@ -1086,7 +1088,8 @@ public Pair<String, SqlType> visitStructExpression(
.append(field.getName())
.append('"')
.append(",")
.append(process(field.getValue(), context).getLeft())
.append(evaluateOrReturnNull(
process(field.getValue(), context).getLeft(), "struct field"))
.append(")");
}
return new Pair<>(
Expand All @@ -1095,6 +1098,25 @@ public Pair<String, SqlType> visitStructExpression(
);
}

private String evaluateOrReturnNull(final String s, final String type) {
if (ksqlConfig.getBoolean(KsqlConfig.KSQL_NESTED_ERROR_HANDLING_CONFIG)) {
return " (new " + Supplier.class.getSimpleName() + "<Object>() {"
+ "@Override public Object get() {"
+ " try {"
+ " return " + s + ";"
+ " } catch (Exception e) {"
+ " logger.error(RecordProcessingError.recordProcessingError("
+ " \"Error processing " + type + "\","
+ " e instanceof InvocationTargetException? e.getCause() : e,"
+ " row));"
+ " return defaultValue;"
+ " }"
+ "}}).get()";
} else {
return s;
}
}

@Override
public Pair<String, SqlType> visitBetweenPredicate(
final BetweenPredicate node,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
import static java.util.Objects.requireNonNull;

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.List;
import org.apache.commons.lang3.ArrayUtils;
import org.codehaus.commons.compiler.IExpressionEvaluator;

public final class CodeGenTestUtil {
Expand Down Expand Up @@ -136,7 +139,7 @@ public Object rawEvaluate(final Object arg) throws Exception {

public Object rawEvaluate(final List<?> args) throws Exception {
try {
return ee.evaluate(args == null ? new Object[]{null} : args.toArray());
return ee.evaluate(ArrayUtils.addAll(args == null ? new Object[]{null} : args.toArray(), null, null, null));
} catch (final InvocationTargetException e) {
throw e.getTargetException() instanceof Exception
? (Exception) e.getTargetException()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void shouldEvaluateExpressionWithValueColumnSpecs() throws Exception {

// Then:
assertThat(result, equalTo(RETURN_VALUE));
verify(expressionEvaluator).evaluate(new Object[]{123, 456});
verify(expressionEvaluator).evaluate(new Object[]{123, 456, DEFAULT_VAL, processingLogger, genericRow(123, 456)});
}

@Test
Expand Down Expand Up @@ -115,7 +115,7 @@ public void shouldEvaluateExpressionWithUdfsSpecs() throws Exception {

// Then:
assertThat(result, equalTo(RETURN_VALUE));
verify(expressionEvaluator).evaluate(new Object[]{udf, 123});
verify(expressionEvaluator).evaluate(new Object[]{udf, 123, DEFAULT_VAL, processingLogger, genericRow(123)});
}

@Test
Expand All @@ -135,7 +135,7 @@ public void shouldPerformThreadSafeParameterEvaluation() throws Exception {
final CountDownLatch threadLatch = new CountDownLatch(1);
final CountDownLatch mainLatch = new CountDownLatch(1);

when(expressionEvaluator.evaluate(new Object[]{123, 456}))
when(expressionEvaluator.evaluate(new Object[]{123, 456, DEFAULT_VAL, processingLogger, genericRow(123, 456)}))
.thenAnswer(
invocation -> {
threadLatch.countDown();
Expand Down Expand Up @@ -169,9 +169,9 @@ public void shouldPerformThreadSafeParameterEvaluation() throws Exception {
// Then:
thread.join();
verify(expressionEvaluator, times(1))
.evaluate(new Object[]{123, 456});
.evaluate(new Object[]{123, 456, DEFAULT_VAL, processingLogger, genericRow(123, 456)});
verify(expressionEvaluator, times(1))
.evaluate(new Object[]{100, 200});
.evaluate(new Object[]{100, 200, DEFAULT_VAL, processingLogger, genericRow(100, 200)});
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ public void shouldProcessCreateArrayExpressionCorrectly() {
// Then:
assertThat(
java,
equalTo("((List)new ArrayBuilder(2).add(((Double) ((java.util.Map)COL5).get(\"key1\"))).add(1E0).build())"));
equalTo("((List)new ArrayBuilder(2)"
+ ".add( (new Supplier<Object>() {@Override public Object get() { try { return ((Double) ((java.util.Map)COL5).get(\"key1\")); } catch (Exception e) { " + onException("array item") + " }}}).get())"
+ ".add( (new Supplier<Object>() {@Override public Object get() { try { return 1E0; } catch (Exception e) { " + onException("array item") + " }}}).get()).build())"));
}

@Test
Expand All @@ -194,7 +196,9 @@ public void shouldProcessCreateMapExpressionCorrectly() {
String java = sqlToJavaVisitor.process(expression);

// Then:
assertThat(java, equalTo("((Map)new MapBuilder(2).put(\"foo\", ((Double) ((java.util.Map)COL5).get(\"key1\"))).put(\"bar\", 1E0).build())"));
assertThat(java, equalTo("((Map)new MapBuilder(2)"
+ ".put( (new Supplier<Object>() {@Override public Object get() { try { return \"foo\"; } catch (Exception e) { " + onException("map key") + " }}}).get(), (new Supplier<Object>() {@Override public Object get() { try { return ((Double) ((java.util.Map)COL5).get(\"key1\")); } catch (Exception e) { " + onException("map value") + " }}}).get())"
+ ".put( (new Supplier<Object>() {@Override public Object get() { try { return \"bar\"; } catch (Exception e) { " + onException("map key") + " }}}).get(), (new Supplier<Object>() {@Override public Object get() { try { return 1E0; } catch (Exception e) { " + onException("map value") + " }}}).get()).build())"));
}

@Test
Expand All @@ -213,7 +217,9 @@ public void shouldProcessStructExpressionCorrectly() {
// Then:
assertThat(
javaExpression,
equalTo("((Struct)new Struct(schema0).put(\"col1\",\"foo\").put(\"col2\",((Double) ((java.util.Map)COL5).get(\"key1\"))))"));
equalTo("((Struct)new Struct(schema0)"
+ ".put(\"col1\", (new Supplier<Object>() {@Override public Object get() { try { return \"foo\"; } catch (Exception e) { " + onException("struct field") + " }}}).get())"
+ ".put(\"col2\", (new Supplier<Object>() {@Override public Object get() { try { return ((Double) ((java.util.Map)COL5).get(\"key1\")); } catch (Exception e) { " + onException("struct field") + " }}}).get()))"));
}

@Test
Expand Down Expand Up @@ -1165,4 +1171,9 @@ private void givenUdf(
final UdfMetadata metadata = mock(UdfMetadata.class);
when(factory.getMetadata()).thenReturn(metadata);
}

private String onException(final String type) {
return String.format("logger.error(RecordProcessingError.recordProcessingError( \"Error processing %s\", "
+ "e instanceof InvocationTargetException? e.getCause() : e, row)); return defaultValue;", type);
}
}
Loading