feat: add KSQL processing log message on uncaught streams exceptions
spena committed Sep 18, 2020
1 parent 5974cfd commit 9219c6d
Showing 10 changed files with 247 additions and 8 deletions.
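
Taken together, the diff adds a fifth processing-log message type and wires it into the uncaught-exception path of persistent queries. A minimal sketch of what the new API produces, using only classes from this commit (the wrapper class and the exception are invented for illustration):

    import io.confluent.ksql.logging.processing.ProcessingLogConfig;
    import io.confluent.ksql.util.KafkaStreamsThreadError;
    import java.util.Collections;
    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.data.Struct;

    public class ThreadErrorSketch {
      public static void main(final String[] args) {
        final ProcessingLogConfig config = new ProcessingLogConfig(Collections.emptyMap());

        // What PersistentQueryMetadata#uncaughtHandler (below) builds and hands
        // to the ProcessingLogger when a streams thread dies:
        final SchemaAndValue record = KafkaStreamsThreadError.of(
            "Unhandled exception caught in streams thread",  // errorMessage
            Thread.currentThread(),                          // thread whose name is logged
            new RuntimeException("boom"))                    // source of the cause chain
            .get(config);

        // The value is a Struct with type = 4 (KAFKA_STREAMS_THREAD_ERROR) and a
        // nested kafkaStreamsThreadError struct: errorMessage, name, cause.
        System.out.println((Struct) record.value());
      }
    }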
----------------------------------------
ProcessingLogMessageSchema.java
@@ -76,11 +76,23 @@ public final class ProcessingLogMessageSchema {
.optional()
.build();

public static final String KAFKA_STREAMS_THREAD_ERROR_FIELD_MESSAGE = "errorMessage";
public static final String KAFKA_STREAMS_THREAD_ERROR_FIELD_NAME = "name";
public static final String KAFKA_STREAMS_THREAD_ERROR_FIELD_CAUSE = "cause";

private static final Schema KAFKA_STREAMS_THREAD_ERROR_SCHEMA = SchemaBuilder.struct()
.name(NAMESPACE + "KafkaStreamsThreadError")
.field(KAFKA_STREAMS_THREAD_ERROR_FIELD_MESSAGE, Schema.OPTIONAL_STRING_SCHEMA)
.field(KAFKA_STREAMS_THREAD_ERROR_FIELD_NAME, Schema.OPTIONAL_STRING_SCHEMA)
.field(KAFKA_STREAMS_THREAD_ERROR_FIELD_CAUSE, CAUSE_SCHEMA)
.build();

public enum MessageType {
DESERIALIZATION_ERROR(0, DESERIALIZATION_ERROR_SCHEMA),
RECORD_PROCESSING_ERROR(1, RECORD_PROCESSING_ERROR_SCHEMA),
PRODUCTION_ERROR(2, PRODUCTION_ERROR_SCHEMA),
SERIALIZATION_ERROR(3, SERIALIZATION_ERROR_SCHEMA);
SERIALIZATION_ERROR(3, SERIALIZATION_ERROR_SCHEMA),
KAFKA_STREAMS_THREAD_ERROR(4, KAFKA_STREAMS_THREAD_ERROR_SCHEMA);

private final int typeId;
private final Schema schema;
@@ -104,6 +116,7 @@ public Schema getSchema() {
public static final String RECORD_PROCESSING_ERROR = "recordProcessingError";
public static final String PRODUCTION_ERROR = "productionError";
public static final String SERIALIZATION_ERROR = "serializationError";
public static final String KAFKA_STREAMS_THREAD_ERROR = "kafkaStreamsThreadError";

public static final Schema PROCESSING_LOG_SCHEMA = SchemaBuilder.struct()
.name(NAMESPACE + "ProcessingLogRecord")
@@ -112,6 +125,7 @@ public Schema getSchema() {
.field(RECORD_PROCESSING_ERROR, RECORD_PROCESSING_ERROR_SCHEMA)
.field(PRODUCTION_ERROR, PRODUCTION_ERROR_SCHEMA)
.field(SERIALIZATION_ERROR, SERIALIZATION_ERROR_SCHEMA)
.field(KAFKA_STREAMS_THREAD_ERROR, KAFKA_STREAMS_THREAD_ERROR_SCHEMA)
.optional()
.build();

----------------------------------------
@@ -229,7 +229,9 @@ public PersistentQueryMetadata buildPersistentQuery(
ksqlConfig.getLong(KSQL_SHUTDOWN_TIMEOUT_MS_CONFIG),
classifier,
physicalPlan,
ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_ERROR_MAX_QUEUE_SIZE)
ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_ERROR_MAX_QUEUE_SIZE),
(ProcessingLogger) streamsProperties.get(
ProductionExceptionHandlerUtil.KSQL_PRODUCTION_ERROR_LOGGER)
);
}

----------------------------------------
KafkaStreamsThreadError.java
@@ -0,0 +1,96 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.util;

import static io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.MessageType;
import static java.util.Objects.requireNonNull;

import io.confluent.ksql.logging.processing.ProcessingLogConfig;
import io.confluent.ksql.logging.processing.ProcessingLogMessageSchema;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import java.util.Objects;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;

public final class KafkaStreamsThreadError implements ProcessingLogger.ErrorMessage {
public static KafkaStreamsThreadError of(
final String errorMsg,
final Thread thread,
final Throwable exception
) {
return new KafkaStreamsThreadError(errorMsg, thread, exception);
}

private final String errorMsg;
private final Thread thread;
private final Throwable exception;

private KafkaStreamsThreadError(
final String errorMsg,
final Thread thread,
final Throwable exception
) {
this.errorMsg = requireNonNull(errorMsg, "errorMsg");
this.thread = requireNonNull(thread, "thread");
this.exception = requireNonNull(exception, "exception");
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final KafkaStreamsThreadError that = (KafkaStreamsThreadError) o;
return Objects.equals(errorMsg, that.errorMsg)
&& Objects.equals(thread.getName(), that.thread.getName())
&& Objects.equals(exception.getClass(), that.exception.getClass())
&& Objects.equals(exception.toString(), that.exception.toString());
}

@Override
public int hashCode() {
return Objects.hash(
errorMsg, thread.getName(), exception.getClass(), exception.toString());
}

@Override
public SchemaAndValue get(final ProcessingLogConfig config) {
final Struct struct = new Struct(ProcessingLogMessageSchema.PROCESSING_LOG_SCHEMA)
.put(ProcessingLogMessageSchema.TYPE,
MessageType.KAFKA_STREAMS_THREAD_ERROR.getTypeId())
.put(ProcessingLogMessageSchema.KAFKA_STREAMS_THREAD_ERROR,
streamsThreadError());

return new SchemaAndValue(ProcessingLogMessageSchema.PROCESSING_LOG_SCHEMA, struct);
}

private Struct streamsThreadError() {
final Struct threadError = new Struct(MessageType.KAFKA_STREAMS_THREAD_ERROR.getSchema())
.put(
ProcessingLogMessageSchema.KAFKA_STREAMS_THREAD_ERROR_FIELD_MESSAGE,
errorMsg)
.put(
ProcessingLogMessageSchema.KAFKA_STREAMS_THREAD_ERROR_FIELD_NAME,
thread.getName())
.put(
ProcessingLogMessageSchema.KAFKA_STREAMS_THREAD_ERROR_FIELD_CAUSE,
ErrorMessageUtil.getErrorMessages(exception));

return threadError;
}
}
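
One detail worth noting: the cause field is filled via ErrorMessageUtil.getErrorMessages, which flattens the exception and its cause chain into a list of message strings — the shape the schema's ARRAY<VARCHAR> cause column expects. A small illustration, with invented exceptions and assuming that chain-walking behavior:

    import io.confluent.ksql.util.ErrorMessageUtil;
    import java.util.List;

    public class CauseChainSketch {
      public static void main(final String[] args) {
        // Invented exceptions, purely for illustration:
        final Throwable error = new RuntimeException(
            "outer failure", new IllegalStateException("root cause"));

        // Flattens the throwable and its causes into message strings:
        final List<String> cause = ErrorMessageUtil.getErrorMessages(error);
        System.out.println(cause);  // e.g. [outer failure, root cause]
      }
    }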
----------------------------------------
PersistentQueryMetadata.java
@@ -23,6 +23,7 @@
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.streams.materialization.Materialization;
import io.confluent.ksql.execution.streams.materialization.MaterializationProvider;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import io.confluent.ksql.name.SourceName;
@@ -51,6 +52,7 @@ public class PersistentQueryMetadata extends QueryMetadata {
materializationProviderBuilder;

private Optional<MaterializationProvider> materializationProvider;
private ProcessingLogger processingLogger;

// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
public PersistentQueryMetadata(
@@ -72,7 +74,8 @@ public PersistentQueryMetadata(
final long closeTimeout,
final QueryErrorClassifier errorClassifier,
final ExecutionStep<?> physicalPlan,
final int maxQueryErrorsQueueSize
final int maxQueryErrorsQueueSize,
final ProcessingLogger processingLogger
) {
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
super(
@@ -98,9 +101,12 @@ public PersistentQueryMetadata(
this.physicalPlan = requireNonNull(physicalPlan, "physicalPlan");
this.materializationProviderBuilder =
requireNonNull(materializationProviderBuilder, "materializationProviderBuilder");
this.processingLogger = requireNonNull(processingLogger, "processingLogger");

this.materializationProvider = materializationProviderBuilder
.flatMap(builder -> builder.apply(getKafkaStreams()));

setUncaughtExceptionHandler(this::uncaughtHandler);
}

protected PersistentQueryMetadata(
@@ -114,6 +120,15 @@ protected PersistentQueryMetadata(
this.materializationProvider = other.materializationProvider;
this.physicalPlan = other.physicalPlan;
this.materializationProviderBuilder = other.materializationProviderBuilder;
this.processingLogger = other.processingLogger;
}

@Override
protected void uncaughtHandler(final Thread thread, final Throwable error) {
super.uncaughtHandler(thread, error);

processingLogger.error(KafkaStreamsThreadError.of(
"Unhandled exception caught in streams thread", thread, error));
}

public DataSourceType getDataSourceType() {
----------------------------------------
QueryMetadata.java
@@ -154,7 +154,7 @@ public void onStop(final Consumer<Boolean> onStop) {
this.onStop = onStop;
}

private void uncaughtHandler(final Thread t, final Throwable e) {
protected void uncaughtHandler(final Thread t, final Throwable e) {
LOG.error("Unhandled exception caught in streams thread {}.", t.getName(), e);
final QueryError queryError =
new QueryError(
----------------------------------------
KafkaStreamsThreadErrorTest.java
@@ -0,0 +1,76 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.util;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;

import io.confluent.ksql.logging.processing.ProcessingLogConfig;
import io.confluent.ksql.logging.processing.ProcessingLogMessageSchema;
import io.confluent.ksql.logging.processing.ProcessingLogger.ErrorMessage;
import java.util.Collections;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class KafkaStreamsThreadErrorTest {
private final ProcessingLogConfig config = new ProcessingLogConfig(Collections.emptyMap());

private Thread thread = new Thread("thread-1");
private Throwable error = new Exception("cause0");

@Test
public void shouldBuildKafkaStreamsThreadErrorCorrectly() {
// Given:
final ErrorMessage errorMessage = KafkaStreamsThreadError.of("errorMsg", thread, error);

// When:
final SchemaAndValue msgAndSchema = errorMessage.get(config);

// Then:
assertThat(msgAndSchema.schema(), equalTo(ProcessingLogMessageSchema.PROCESSING_LOG_SCHEMA));
final Struct msg = (Struct) msgAndSchema.value();
assertThat(
msg.get(ProcessingLogMessageSchema.TYPE),
equalTo(ProcessingLogMessageSchema.MessageType.KAFKA_STREAMS_THREAD_ERROR.getTypeId())
);
assertThat(
msg.get(ProcessingLogMessageSchema.KAFKA_STREAMS_THREAD_ERROR),
notNullValue()
);
final Struct kafkaStreamsThreadError =
msg.getStruct(ProcessingLogMessageSchema.KAFKA_STREAMS_THREAD_ERROR);
assertThat(
kafkaStreamsThreadError.get(
ProcessingLogMessageSchema.KAFKA_STREAMS_THREAD_ERROR_FIELD_MESSAGE),
equalTo("errorMsg")
);
assertThat(
kafkaStreamsThreadError.get(
ProcessingLogMessageSchema.KAFKA_STREAMS_THREAD_ERROR_FIELD_NAME),
equalTo("thread-1")
);
assertThat(
kafkaStreamsThreadError.get(
ProcessingLogMessageSchema.KAFKA_STREAMS_THREAD_ERROR_FIELD_CAUSE),
equalTo(ErrorMessageUtil.getErrorMessages(error))
);
}
}
----------------------------------------
PersistentQueryMetadataTest.java
@@ -22,13 +22,16 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.streams.materialization.MaterializationProvider;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.query.KafkaStreamsBuilder;
import io.confluent.ksql.query.MaterializationProviderBuilderFactory;
import io.confluent.ksql.query.QueryError;
import io.confluent.ksql.query.QueryErrorClassifier;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.LogicalSchema;
Expand All @@ -42,6 +45,7 @@
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@@ -81,6 +85,8 @@ public class PersistentQueryMetadataTest {
private QueryErrorClassifier queryErrorClassifier;
@Mock
private ExecutionStep<?> physicalPlan;
@Mock
private ProcessingLogger processingLogger;

private PersistentQueryMetadata query;

@@ -109,7 +115,8 @@ public void setUp() {
CLOSE_TIMEOUT,
queryErrorClassifier,
physicalPlan,
10
10,
processingLogger
);
}

@@ -158,4 +165,24 @@ public void shouldNotRestartIfQueryIsClosed() {
// Then:
assertThat(e.getMessage(), containsString("is already closed, cannot restart."));
}

@Test
public void shouldCallProcessingLoggerOnError() {
// Given:
final Thread thread = mock(Thread.class);
final Throwable error = mock(Throwable.class);
final ArgumentCaptor<ProcessingLogger.ErrorMessage> errorMessageCaptor =
ArgumentCaptor.forClass(ProcessingLogger.ErrorMessage.class);
when(queryErrorClassifier.classify(error)).thenReturn(QueryError.Type.SYSTEM);

// When:
query.uncaughtHandler(thread, error);

// Then:
verify(processingLogger).error(errorMessageCaptor.capture());
assertThat(
KafkaStreamsThreadError.of(
"Unhandled exception caught in streams thread", thread, error),
is(errorMessageCaptor.getValue()));
}
}
----------------------------------------
SandboxedPersistentQueryMetadataTest.java
@@ -17,6 +17,7 @@

import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.streams.materialization.MaterializationProvider;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.query.KafkaStreamsBuilder;
import io.confluent.ksql.query.MaterializationProviderBuilderFactory;
@@ -80,6 +81,8 @@ public class SandboxedPersistentQueryMetadataTest {
private QueryErrorClassifier queryErrorClassifier;
@Mock
private ExecutionStep<?> physicalPlan;
@Mock
private ProcessingLogger processingLogger;

private PersistentQueryMetadata query;
private SandboxedPersistentQueryMetadata sandbox;
@@ -109,7 +112,8 @@ public void setUp() {
CLOSE_TIMEOUT,
queryErrorClassifier,
physicalPlan,
10
10,
processingLogger
);

sandbox = SandboxedPersistentQueryMetadata.of(query, closeCallback);
----------------------------------------
@@ -117,7 +117,8 @@ public void shouldBuildCorrectStreamCreateDDL() {
+ "deserializationError STRUCT<target VARCHAR, errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>, "
+ "recordProcessingError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>>, "
+ "productionError STRUCT<errorMessage VARCHAR>, "
+ "serializationError STRUCT<target VARCHAR, errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>"
+ "serializationError STRUCT<target VARCHAR, errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>, "
+ "kafkaStreamsThreadError STRUCT<errorMessage VARCHAR, name VARCHAR, cause ARRAY<VARCHAR>>"
+ ">"
+ ") WITH(KAFKA_TOPIC='processing_log_topic', VALUE_FORMAT='JSON');"));
}
----------------------------------------
(1 additional changed file not shown)
