From 9219c6d4caa29c9e695a94bea646c2b8065248ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Pe=C3=B1a?= Date: Thu, 17 Sep 2020 22:36:45 -0500 Subject: [PATCH] feat: add KSQL processing log message on uncaught streams exceptions --- .../ProcessingLogMessageSchema.java | 16 +++- .../confluent/ksql/query/QueryExecutor.java | 4 +- .../ksql/util/KafkaStreamsThreadError.java | 96 +++++++++++++++++++ .../ksql/util/PersistentQueryMetadata.java | 17 +++- .../io/confluent/ksql/util/QueryMetadata.java | 2 +- .../util/KafkaStreamsThreadErrorTest.java | 76 +++++++++++++++ .../util/PersistentQueryMetadataTest.java | 29 +++++- .../SandboxedPersistentQueryMetadataTest.java | 6 +- .../ProcessingLogServerUtilsTest.java | 3 +- .../entity/QueryDescriptionFactoryTest.java | 6 +- 10 files changed, 247 insertions(+), 8 deletions(-) create mode 100644 ksqldb-engine/src/main/java/io/confluent/ksql/util/KafkaStreamsThreadError.java create mode 100644 ksqldb-engine/src/test/java/io/confluent/ksql/util/KafkaStreamsThreadErrorTest.java diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/logging/processing/ProcessingLogMessageSchema.java b/ksqldb-common/src/main/java/io/confluent/ksql/logging/processing/ProcessingLogMessageSchema.java index 1a858ca4c7a8..9fad96a57650 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/logging/processing/ProcessingLogMessageSchema.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/logging/processing/ProcessingLogMessageSchema.java @@ -76,11 +76,23 @@ public final class ProcessingLogMessageSchema { .optional() .build(); + public static final String KAFKA_STREAMS_THREAD_ERROR_FIELD_MESSAGE = "errorMessage"; + public static final String KAFKA_STREAMS_THREAD_ERROR_FIELD_NAME = "name"; + public static final String KAFKA_STREAMS_THREAD_ERROR_FIELD_CAUSE = "cause"; + + private static final Schema KAFKA_STREAMS_THREAD_ERROR_SCHEMA = SchemaBuilder.struct() + .name(NAMESPACE + "KafkaStreamsThreadError") + 
.field(KAFKA_STREAMS_THREAD_ERROR_FIELD_MESSAGE, Schema.OPTIONAL_STRING_SCHEMA) + .field(KAFKA_STREAMS_THREAD_ERROR_FIELD_NAME, Schema.OPTIONAL_STRING_SCHEMA) + .field(KAFKA_STREAMS_THREAD_ERROR_FIELD_CAUSE, CAUSE_SCHEMA) + .build(); + public enum MessageType { DESERIALIZATION_ERROR(0, DESERIALIZATION_ERROR_SCHEMA), RECORD_PROCESSING_ERROR(1, RECORD_PROCESSING_ERROR_SCHEMA), PRODUCTION_ERROR(2, PRODUCTION_ERROR_SCHEMA), - SERIALIZATION_ERROR(3, SERIALIZATION_ERROR_SCHEMA); + SERIALIZATION_ERROR(3, SERIALIZATION_ERROR_SCHEMA), + KAFKA_STREAMS_THREAD_ERROR(4, KAFKA_STREAMS_THREAD_ERROR_SCHEMA); private final int typeId; private final Schema schema; @@ -104,6 +116,7 @@ public Schema getSchema() { public static final String RECORD_PROCESSING_ERROR = "recordProcessingError"; public static final String PRODUCTION_ERROR = "productionError"; public static final String SERIALIZATION_ERROR = "serializationError"; + public static final String KAFKA_STREAMS_THREAD_ERROR = "kafkaStreamsThreadError"; public static final Schema PROCESSING_LOG_SCHEMA = SchemaBuilder.struct() .name(NAMESPACE + "ProcessingLogRecord") @@ -112,6 +125,7 @@ public Schema getSchema() { .field(RECORD_PROCESSING_ERROR, RECORD_PROCESSING_ERROR_SCHEMA) .field(PRODUCTION_ERROR, PRODUCTION_ERROR_SCHEMA) .field(SERIALIZATION_ERROR, SERIALIZATION_ERROR_SCHEMA) + .field(KAFKA_STREAMS_THREAD_ERROR, KAFKA_STREAMS_THREAD_ERROR_SCHEMA) .optional() .build(); diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java index 88ed6c9ca8d5..5d57bbf4f57c 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java @@ -229,7 +229,9 @@ public PersistentQueryMetadata buildPersistentQuery( ksqlConfig.getLong(KSQL_SHUTDOWN_TIMEOUT_MS_CONFIG), classifier, physicalPlan, - ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_ERROR_MAX_QUEUE_SIZE) + 
ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_ERROR_MAX_QUEUE_SIZE), + (ProcessingLogger) streamsProperties.get( + ProductionExceptionHandlerUtil.KSQL_PRODUCTION_ERROR_LOGGER) ); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/KafkaStreamsThreadError.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/KafkaStreamsThreadError.java new file mode 100644 index 000000000000..8083b04d1068 --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/KafkaStreamsThreadError.java @@ -0,0 +1,96 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.util; + +import static io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.MessageType; +import static java.util.Objects.requireNonNull; + +import io.confluent.ksql.logging.processing.ProcessingLogConfig; +import io.confluent.ksql.logging.processing.ProcessingLogMessageSchema; +import io.confluent.ksql.logging.processing.ProcessingLogger; +import java.util.Objects; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.data.Struct; + +public final class KafkaStreamsThreadError implements ProcessingLogger.ErrorMessage { + public static KafkaStreamsThreadError of( + final String errorMsg, + final Thread thread, + final Throwable exception + ) { + return new KafkaStreamsThreadError(errorMsg, thread, exception); + } + + private final String errorMsg; + private final Thread thread; + private final Throwable exception; + + private KafkaStreamsThreadError( + final String errorMsg, + final Thread thread, + final Throwable exception + ) { + this.errorMsg = requireNonNull(errorMsg, "errorMsg"); + this.thread = requireNonNull(thread, "thread"); + this.exception = requireNonNull(exception, "exception"); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final KafkaStreamsThreadError that = (KafkaStreamsThreadError) o; + return Objects.equals(errorMsg, that.errorMsg) + && Objects.equals(thread.getName(), that.thread.getName()) + && Objects.equals(exception.getClass(), that.exception.getClass()) + && Objects.equals(exception.toString(), that.exception.toString()); + } + + @Override + public int hashCode() { + return Objects.hash(errorMsg, thread.getName(), exception.toString()); + } + + @Override + public SchemaAndValue get(final ProcessingLogConfig config) { + final Struct struct = new Struct(ProcessingLogMessageSchema.PROCESSING_LOG_SCHEMA) + .put(ProcessingLogMessageSchema.TYPE, +
MessageType.KAFKA_STREAMS_THREAD_ERROR.getTypeId()) + .put(ProcessingLogMessageSchema.KAFKA_STREAMS_THREAD_ERROR, + streamsThreadError()); + + return new SchemaAndValue(ProcessingLogMessageSchema.PROCESSING_LOG_SCHEMA, struct); + } + + private Struct streamsThreadError() { + final Struct threadError = new Struct(MessageType.KAFKA_STREAMS_THREAD_ERROR.getSchema()) + .put( + ProcessingLogMessageSchema.KAFKA_STREAMS_THREAD_ERROR_FIELD_MESSAGE, + errorMsg) + .put( + ProcessingLogMessageSchema.KAFKA_STREAMS_THREAD_ERROR_FIELD_NAME, + thread.getName()) + .put( + ProcessingLogMessageSchema.KAFKA_STREAMS_THREAD_ERROR_FIELD_CAUSE, + ErrorMessageUtil.getErrorMessages(exception)); + + return threadError; + } +} diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java index dd6ee72a3321..a6be21c1ef90 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java @@ -23,6 +23,7 @@ import io.confluent.ksql.execution.plan.ExecutionStep; import io.confluent.ksql.execution.streams.materialization.Materialization; import io.confluent.ksql.execution.streams.materialization.MaterializationProvider; +import io.confluent.ksql.logging.processing.ProcessingLogger; import io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.metastore.model.DataSource.DataSourceType; import io.confluent.ksql.name.SourceName; @@ -51,6 +52,7 @@ public class PersistentQueryMetadata extends QueryMetadata { materializationProviderBuilder; private Optional materializationProvider; + private ProcessingLogger processingLogger; // CHECKSTYLE_RULES.OFF: ParameterNumberCheck public PersistentQueryMetadata( @@ -72,7 +74,8 @@ public PersistentQueryMetadata( final long closeTimeout, final QueryErrorClassifier errorClassifier, final ExecutionStep physicalPlan, - final int 
maxQueryErrorsQueueSize + final int maxQueryErrorsQueueSize, + final ProcessingLogger processingLogger ) { // CHECKSTYLE_RULES.ON: ParameterNumberCheck super( @@ -98,9 +101,12 @@ public PersistentQueryMetadata( this.physicalPlan = requireNonNull(physicalPlan, "physicalPlan"); this.materializationProviderBuilder = requireNonNull(materializationProviderBuilder, "materializationProviderBuilder"); + this.processingLogger = requireNonNull(processingLogger, "processingLogger"); this.materializationProvider = materializationProviderBuilder .flatMap(builder -> builder.apply(getKafkaStreams())); + + setUncaughtExceptionHandler(this::uncaughtHandler); } protected PersistentQueryMetadata( @@ -114,6 +120,15 @@ protected PersistentQueryMetadata( this.materializationProvider = other.materializationProvider; this.physicalPlan = other.physicalPlan; this.materializationProviderBuilder = other.materializationProviderBuilder; + this.processingLogger = other.processingLogger; + } + + @Override + protected void uncaughtHandler(final Thread thread, final Throwable error) { + super.uncaughtHandler(thread, error); + + processingLogger.error(KafkaStreamsThreadError.of( + "Unhandled exception caught in streams thread", thread, error)); } public DataSourceType getDataSourceType() { diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java index 558da91b27ca..00716d0031da 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java @@ -154,7 +154,7 @@ public void onStop(final Consumer onStop) { this.onStop = onStop; } - private void uncaughtHandler(final Thread t, final Throwable e) { + protected void uncaughtHandler(final Thread t, final Throwable e) { LOG.error("Unhandled exception caught in streams thread {}.", t.getName(), e); final QueryError queryError = new QueryError( diff --git 
a/ksqldb-engine/src/test/java/io/confluent/ksql/util/KafkaStreamsThreadErrorTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/KafkaStreamsThreadErrorTest.java new file mode 100644 index 000000000000..d12ef24f9c52 --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/KafkaStreamsThreadErrorTest.java @@ -0,0 +1,76 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.util; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +import io.confluent.ksql.logging.processing.ProcessingLogConfig; +import io.confluent.ksql.logging.processing.ProcessingLogMessageSchema; +import io.confluent.ksql.logging.processing.ProcessingLogger.ErrorMessage; +import java.util.Collections; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.data.Struct; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class KafkaStreamsThreadErrorTest { + private final ProcessingLogConfig config = new ProcessingLogConfig(Collections.emptyMap()); + + private Thread thread = new Thread("thread-1"); + private Throwable error = new Exception("cause0"); + + @Test + public void shouldBuildKafkaStreamThreadErrorCorrectly() { + // Given: + final ErrorMessage errorMessage = 
KafkaStreamsThreadError.of("errorMsg", thread, error); + + // When: + final SchemaAndValue msgAndSchema = errorMessage.get(config); + + // Then: + assertThat(msgAndSchema.schema(), equalTo(ProcessingLogMessageSchema.PROCESSING_LOG_SCHEMA)); + final Struct msg = (Struct) msgAndSchema.value(); + assertThat( + msg.get(ProcessingLogMessageSchema.TYPE), + equalTo(ProcessingLogMessageSchema.MessageType.KAFKA_STREAMS_THREAD_ERROR.getTypeId()) + ); + assertThat( + msg.get(ProcessingLogMessageSchema.KAFKA_STREAMS_THREAD_ERROR), + notNullValue() + ); + final Struct kafkaStreamsThreadError = + msg.getStruct(ProcessingLogMessageSchema.KAFKA_STREAMS_THREAD_ERROR); + assertThat( + kafkaStreamsThreadError.get( + ProcessingLogMessageSchema.KAFKA_STREAMS_THREAD_ERROR_FIELD_MESSAGE), + equalTo("errorMsg") + ); + assertThat( + kafkaStreamsThreadError.get( + ProcessingLogMessageSchema.KAFKA_STREAMS_THREAD_ERROR_FIELD_NAME), + equalTo("thread-1") + ); + assertThat( + kafkaStreamsThreadError.get( + ProcessingLogMessageSchema.KAFKA_STREAMS_THREAD_ERROR_FIELD_CAUSE), + equalTo(ErrorMessageUtil.getErrorMessages(error)) + ); + } +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java index 3204af7650cc..2dc26ee4aacc 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java @@ -22,13 +22,16 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import io.confluent.ksql.execution.plan.ExecutionStep; import io.confluent.ksql.execution.streams.materialization.MaterializationProvider; +import io.confluent.ksql.logging.processing.ProcessingLogger; import 
io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.query.KafkaStreamsBuilder; import io.confluent.ksql.query.MaterializationProviderBuilderFactory; +import io.confluent.ksql.query.QueryError; import io.confluent.ksql.query.QueryErrorClassifier; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.LogicalSchema; @@ -42,6 +45,7 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; @@ -81,6 +85,8 @@ public class PersistentQueryMetadataTest { private QueryErrorClassifier queryErrorClassifier; @Mock private ExecutionStep physicalPlan; + @Mock + private ProcessingLogger processingLogger; private PersistentQueryMetadata query; @@ -109,7 +115,8 @@ public void setUp() { CLOSE_TIMEOUT, queryErrorClassifier, physicalPlan, - 10 + 10, + processingLogger ); } @@ -158,4 +165,24 @@ public void shouldNotRestartIfQueryIsClosed() { // Then: assertThat(e.getMessage(), containsString("is already closed, cannot restart.")); } + + @Test + public void shouldCallProcessingLoggerOnError() { + // Given: + final Thread thread = mock(Thread.class); + final Throwable error = mock(Throwable.class); + final ArgumentCaptor errorMessageCaptor = + ArgumentCaptor.forClass(ProcessingLogger.ErrorMessage.class); + when(queryErrorClassifier.classify(error)).thenReturn(QueryError.Type.SYSTEM); + + // When: + query.uncaughtHandler(thread, error); + + // Then: + verify(processingLogger).error(errorMessageCaptor.capture()); + assertThat( + KafkaStreamsThreadError.of( + "Unhandled exception caught in streams thread", thread, error), + is(errorMessageCaptor.getValue())); + } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataTest.java index bd389b35389a..89fbcb4ac5ac 
100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataTest.java @@ -17,6 +17,7 @@ import io.confluent.ksql.execution.plan.ExecutionStep; import io.confluent.ksql.execution.streams.materialization.MaterializationProvider; +import io.confluent.ksql.logging.processing.ProcessingLogger; import io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.query.KafkaStreamsBuilder; import io.confluent.ksql.query.MaterializationProviderBuilderFactory; @@ -80,6 +81,8 @@ public class SandboxedPersistentQueryMetadataTest { private QueryErrorClassifier queryErrorClassifier; @Mock private ExecutionStep physicalPlan; + @Mock + private ProcessingLogger processingLogger; private PersistentQueryMetadata query; private SandboxedPersistentQueryMetadata sandbox; @@ -109,7 +112,8 @@ public void setUp() { CLOSE_TIMEOUT, queryErrorClassifier, physicalPlan, - 10 + 10, + processingLogger ); sandbox = SandboxedPersistentQueryMetadata.of(query, closeCallback); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/logging/processing/ProcessingLogServerUtilsTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/logging/processing/ProcessingLogServerUtilsTest.java index 9c76cd3503bc..0677d7bbccd5 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/logging/processing/ProcessingLogServerUtilsTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/logging/processing/ProcessingLogServerUtilsTest.java @@ -117,7 +117,8 @@ public void shouldBuildCorrectStreamCreateDDL() { + "deserializationError STRUCT, `topic` VARCHAR>, " + "recordProcessingError STRUCT>, " + "productionError STRUCT, " - + "serializationError STRUCT, `topic` VARCHAR>" + + "serializationError STRUCT, `topic` VARCHAR>, " + + "kafkaStreamsThreadError STRUCT>" + ">" + ") WITH(KAFKA_TOPIC='processing_log_topic', VALUE_FORMAT='JSON');")); } diff --git 
a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java index 1c811ed104bf..5b3327370ab6 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableSet; import io.confluent.ksql.execution.ddl.commands.KsqlTopic; import io.confluent.ksql.execution.plan.ExecutionStep; +import io.confluent.ksql.logging.processing.ProcessingLogger; import io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.name.SourceName; @@ -106,6 +107,8 @@ public class QueryDescriptionFactoryTest { private ExecutionStep physicalPlan; @Mock private DataSource sinkDataSource; + @Mock + private ProcessingLogger processingLogger; private QueryMetadata transientQuery; private PersistentQueryMetadata persistentQuery; @@ -156,7 +159,8 @@ public void setUp() { closeTimeout, QueryErrorClassifier.DEFAULT_CLASSIFIER, physicalPlan, - 10 + 10, + processingLogger ); persistentQueryDescription = QueryDescriptionFactory.forQueryMetadata(persistentQuery, STATUS_MAP);