Skip to content

Commit

Permalink
fix: classify KsqlSerializationException as USER error based on topic(K…
Browse files Browse the repository at this point in the history
…SE-1045)

- Related JIRAs: KSE-1045, KSE-1053, KSE-1113
- Classify KsqlSerializationException as USER errors by looking at the topic
  parameter and verifying if the topic parameter doesn't start with
  _confluent-ksql prefix
  • Loading branch information
bvarghese1 committed Jul 21, 2022
1 parent 033c892 commit 56dddbb
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.query;

import io.confluent.ksql.query.QueryError.Type;
import io.confluent.ksql.serde.KsqlSerializationException;
import io.confluent.ksql.util.ReservedInternalTopics;
import java.util.Objects;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.streams.errors.StreamsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@code SerializationClassifier} classifies serialization exceptions as user error
* while writing to a user topic due to schema mismatch
*/
public class KsqlSerializationClassifier implements QueryErrorClassifier {

private static final Logger LOG = LoggerFactory.getLogger(KsqlSerializationClassifier.class);

private final String queryId;

public KsqlSerializationClassifier(final String queryId) {
this.queryId = Objects.requireNonNull(queryId, "queryId");
}

private boolean hasInternalTopicPrefix(final Throwable e) {
final int index = ExceptionUtils.indexOfThrowable(e, KsqlSerializationException.class);
final KsqlSerializationException kse =
(KsqlSerializationException) ExceptionUtils.getThrowableList(e).get(index);

return kse.getTopic().startsWith(ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX);
}

@Override
public Type classify(final Throwable e) {
Type type = Type.UNKNOWN;

if (e instanceof KsqlSerializationException
|| (e instanceof StreamsException
&& (ExceptionUtils.indexOfThrowable(e, KsqlSerializationException.class) != -1))) {
if (!hasInternalTopicPrefix(e)) {
type = Type.USER;
LOG.info(
"Classified error as USER error based on schema mismatch. Query ID: {} Exception: {}",
queryId,
e);
}
}

return type;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,8 @@ private QueryErrorClassifier getConfiguredQueryErrorClassifier(
.and(new AuthorizationClassifier(applicationId))
.and(new KsqlFunctionClassifier(applicationId))
.and(new MissingSubjectClassifier(applicationId))
.and(new SchemaAuthorizationClassifier(applicationId));
.and(new SchemaAuthorizationClassifier(applicationId))
.and(new KsqlSerializationClassifier(applicationId));
return buildConfiguredClassifiers(ksqlConfig, applicationId)
.map(userErrorClassifiers::and)
.orElse(userErrorClassifiers);
Expand Down

0 comments on commit 56dddbb

Please sign in to comment.