diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/KsqlSerializationClassifier.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/KsqlSerializationClassifier.java new file mode 100644 index 000000000000..4652903f1259 --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/KsqlSerializationClassifier.java @@ -0,0 +1,68 @@ +/* + * Copyright 2022 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.query; + +import io.confluent.ksql.query.QueryError.Type; +import io.confluent.ksql.serde.KsqlSerializationException; +import io.confluent.ksql.util.ReservedInternalTopics; +import java.util.Objects; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.kafka.streams.errors.StreamsException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@code SerializationClassifier} classifies serialization exceptions as user error + * while writing to a user topic due to schema mismatch + */ +public class KsqlSerializationClassifier implements QueryErrorClassifier { + + private static final Logger LOG = LoggerFactory.getLogger(KsqlSerializationClassifier.class); + + private final String queryId; + + public KsqlSerializationClassifier(final String queryId) { + this.queryId = Objects.requireNonNull(queryId, "queryId"); + } + + private boolean hasInternalTopicPrefix(final Throwable e) { + final int index = ExceptionUtils.indexOfThrowable(e, KsqlSerializationException.class); + final KsqlSerializationException kse = + (KsqlSerializationException) ExceptionUtils.getThrowableList(e).get(index); + + return kse.getTopic().startsWith(ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX); + } + + @Override + public Type classify(final Throwable e) { + Type type = Type.UNKNOWN; + + if (e instanceof KsqlSerializationException + || (e instanceof StreamsException + && (ExceptionUtils.indexOfThrowable(e, KsqlSerializationException.class) != -1))) { + if (!hasInternalTopicPrefix(e)) { + type = Type.USER; + LOG.info( + "Classified error as USER error based on schema mismatch. Query ID: {} Exception: {}", + queryId, + e); + } + } + + return type; + } + +} diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java index 0ee67161d1cf..86fa735981c7 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java @@ -653,7 +653,8 @@ private QueryErrorClassifier getConfiguredQueryErrorClassifier( .and(new AuthorizationClassifier(applicationId)) .and(new KsqlFunctionClassifier(applicationId)) .and(new MissingSubjectClassifier(applicationId)) - .and(new SchemaAuthorizationClassifier(applicationId)); + .and(new SchemaAuthorizationClassifier(applicationId)) + .and(new KsqlSerializationClassifier(applicationId)); return buildConfiguredClassifiers(ksqlConfig, applicationId) .map(userErrorClassifiers::and) .orElse(userErrorClassifiers);