From 743de036e7015041fbeb3a9865b67ac688b58318 Mon Sep 17 00:00:00 2001 From: Steven Zhang Date: Wed, 20 Apr 2022 12:24:18 -0700 Subject: [PATCH 1/4] feat: add metric for query restarts --- .../confluent/ksql/util/MetricsTagsUtil.java | 43 ++++ .../io/confluent/ksql/query/QueryBuilder.java | 12 +- .../BinPackedPersistentQueryMetadataImpl.java | 10 + .../util/PersistentQueryMetadataImpl.java | 10 +- .../io/confluent/ksql/util/QueryMetadata.java | 4 +- .../ksql/util/QueryMetadataImpl.java | 80 +++++-- .../util/SharedKafkaStreamsRuntimeImpl.java | 48 ++++- .../ksql/util/TransientQueryMetadata.java | 10 +- .../util/PersistentQueryMetadataTest.java | 16 +- .../ksql/util/QueryMetadataTest.java | 101 +++++++-- ...dboxedPersistentQueryMetadataImplTest.java | 8 +- .../SharedKafkaStreamsRuntimeImplTest.java | 88 +++++++- .../ksql/util/TransientQueryMetadataTest.java | 10 +- .../entity/QueryDescriptionFactoryTest.java | 20 +- .../QueryRestartMetricFunctionalTest.java | 202 ++++++++++++++++++ .../integration/RestIntegrationTestUtil.java | 23 ++ .../streaming/StreamedQueryResourceTest.java | 7 +- 17 files changed, 646 insertions(+), 46 deletions(-) create mode 100644 ksqldb-common/src/main/java/io/confluent/ksql/util/MetricsTagsUtil.java create mode 100644 ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/QueryRestartMetricFunctionalTest.java diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/MetricsTagsUtil.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/MetricsTagsUtil.java new file mode 100644 index 000000000000..f85470d5ee42 --- /dev/null +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/MetricsTagsUtil.java @@ -0,0 +1,43 @@ +/* + * Copyright 2022 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.util; + +import java.util.HashMap; +import java.util.Map; + +public final class MetricsTagsUtil { + + private MetricsTagsUtil() {} + + public static Map getCustomMetricsTagsForQuery( + final String id, + final KsqlConfig config + ) { + return getCustomMetricsTagsForQuery( + id, + config.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS)); + } + + public static Map getCustomMetricsTagsForQuery( + final String id, + final Map metricsTags + ) { + final Map customMetricsTags = + new HashMap<>(metricsTags); + customMetricsTags.put("query_id", id); + return customMetricsTags; + } +} diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java index 1635f1a9aed3..7335192a1515 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java @@ -224,7 +224,9 @@ TransientQueryMetadata buildTransientQuery( resultType, ksqlConfig.getLong(KsqlConfig.KSQL_QUERY_RETRY_BACKOFF_INITIAL_MS), ksqlConfig.getLong(KsqlConfig.KSQL_QUERY_RETRY_BACKOFF_MAX_MS), - listener + listener, + metricCollectors.getMetrics(), + ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS) ); } @@ -371,7 +373,9 @@ PersistentQueryMetadata buildPersistentQueryInDedicatedRuntime( ksqlConfig.getLong(KsqlConfig.KSQL_QUERY_RETRY_BACKOFF_INITIAL_MS), ksqlConfig.getLong(KsqlConfig.KSQL_QUERY_RETRY_BACKOFF_MAX_MS), listener, - scalablePushRegistry + scalablePushRegistry, + metricCollectors.getMetrics(), + ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS) ); } @@ -618,7 +622,9 @@ private SharedKafkaStreamsRuntime getKafkaStreamsInstance( metricCollectors, config.getConfig(true), processingLogContext - ) + ), + metricCollectors.getMetrics(), + ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS) ); } else { stream = new SandboxedSharedKafkaStreamsRuntimeImpl( diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImpl.java index 55573870945f..88d3fe58ea62 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImpl.java @@ -46,6 +46,9 @@ import java.util.Optional; import java.util.Set; import java.util.function.Function; + +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.LagInfo; @@ -77,6 +80,8 @@ public class BinPackedPersistentQueryMetadataImpl implements PersistentQueryMeta private final Listener listener; private final Function namedTopologyBuilder; private final TimeBoundedQueue queryErrors; + private final Sensor sensor; + private final Metrics metrics; private final Optional materializationProviderBuilder; @@ -385,6 +390,7 @@ public void close() { sharedKafkaStreamsRuntime.stop(queryId, false); scalablePushRegistry.ifPresent(ScalablePushRegistry::close); listener.onClose(this); + metrics.removeSensor(sensor.name()); } @Override @@ -406,4 +412,8 @@ Listener getListener() { return listener; } + Sensor getSensor() { + return sensor; + } + } \ No newline at end of file diff --git 
a/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadataImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadataImpl.java index 9270eb53afd4..ac25b6d51e33 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadataImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadataImpl.java @@ -40,6 +40,8 @@ import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; + +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; @@ -86,7 +88,9 @@ public PersistentQueryMetadataImpl( final long retryBackoffInitialMs, final long retryBackoffMaxMs, final QueryMetadata.Listener listener, - final Optional scalablePushRegistry + final Optional scalablePushRegistry, + final Metrics metrics, + final Map metricsTags ) { // CHECKSTYLE_RULES.ON: ParameterNumberCheck super( @@ -105,7 +109,9 @@ public PersistentQueryMetadataImpl( maxQueryErrorsQueueSize, retryBackoffInitialMs, retryBackoffMaxMs, - new QueryListenerWrapper(listener, scalablePushRegistry) + new QueryListenerWrapper(listener, scalablePushRegistry), + metrics, + metricsTags ); this.sinkDataSource = requireNonNull(sinkDataSource, "sinkDataSource"); this.schemas = requireNonNull(schemas, "schemas"); diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java index 8dd7d4202ee3..b8eee5edd6fb 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java @@ -81,9 +81,9 @@ interface RetryEvent { long nextRestartTimeMs(); - int getNumRetries(); + int getNumRetries(final String threadName); - void backOff(); + void backOff(final String threadName); } interface Listener { diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadataImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadataImpl.java index 7c6cf27a7a4e..e254f9b24712 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadataImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadataImpl.java @@ -36,12 +36,19 @@ import java.time.Duration; import java.util.Collection; import java.util.List; +import java.util.Queue; import java.util.Map; import java.util.Objects; -import java.util.Queue; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Collectors; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.CumulativeSum; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.LagInfo; @@ -53,6 +60,10 @@ import org.slf4j.LoggerFactory; public class QueryMetadataImpl implements QueryMetadata { + public static final String QUERY_RESTART_METRIC_NAME = "query-restart-total"; + public static final String QUERY_RESTART_METRIC_GROUP_NAME = "query-restart-metrics"; + public static final String QUERY_RESTART_METRIC_DESCRIPTION = + "The total number of times that a query thread has failed and then been restarted."; private static final Logger LOG = 
LoggerFactory.getLogger(QueryMetadataImpl.class); @@ -71,6 +82,9 @@ public class QueryMetadataImpl implements QueryMetadata { private final TimeBoundedQueue queryErrors; private final RetryEvent retryEvent; private final Listener listener; + private final Optional metrics; + private final Optional restartSensor; + private volatile boolean everStarted = false; private volatile KafkaStreams kafkaStreams; // These fields don't need synchronization because they are initialized in initialize() before @@ -103,7 +117,9 @@ public long read() { final int maxQueryErrorsQueueSize, final long baseWaitingTimeMs, final long retryBackoffMaxMs, - final Listener listener + final Listener listener, + final Metrics metrics, + final Map metricsTags ) { // CHECKSTYLE_RULES.ON: ParameterNumberCheck this.statementString = Objects.requireNonNull(statementString, "statementString"); @@ -126,11 +142,26 @@ public long read() { this.queryId = Objects.requireNonNull(queryId, "queryId"); this.errorClassifier = Objects.requireNonNull(errorClassifier, "errorClassifier"); this.queryErrors = new TimeBoundedQueue(Duration.ofHours(1), maxQueryErrorsQueueSize); + this.metrics = Optional.of(metrics); + + final Map customMetricsTagsForQuery = + MetricsTagsUtil.getCustomMetricsTagsForQuery(queryId.toString(), metricsTags); + final MetricName restartMetricName = metrics.metricName( + QUERY_RESTART_METRIC_NAME, + QUERY_RESTART_METRIC_GROUP_NAME, + QUERY_RESTART_METRIC_DESCRIPTION, + customMetricsTagsForQuery + ); + this.restartSensor = Optional.of( + metrics.sensor(QUERY_RESTART_METRIC_GROUP_NAME + "-" + queryId)); + restartSensor.get().add(restartMetricName, new CumulativeSum()); + this.retryEvent = new RetryEvent( queryId, baseWaitingTimeMs, retryBackoffMaxMs, - CURRENT_TIME_MILLIS_TICKER + CURRENT_TIME_MILLIS_TICKER, + restartSensor ); } @@ -152,6 +183,8 @@ public long read() { this.errorClassifier = other.errorClassifier; this.everStarted = other.everStarted; this.queryErrors = new TimeBoundedQueue(Duration.ZERO, 0); + this.metrics = Optional.empty(); + this.restartSensor = Optional.empty(); this.retryEvent = new RetryEvent( other.getQueryId(), 0, @@ -161,7 +194,8 @@ public long read() { public long read() { return 0; } - } + }, + this.restartSensor ); this.listener = Objects.requireNonNull(listener, "stopListeners"); @@ -204,7 +238,7 @@ protected StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse uncaught e ); } - retryEvent.backOff(); + retryEvent.backOff(Thread.currentThread().getName()); return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD; } @@ -323,6 +357,11 @@ Listener getListener() { return listener; } + @VisibleForTesting + protected Optional getRestartSensor() { + return restartSensor; + } + private void resetKafkaStreams(final KafkaStreams kafkaStreams) { this.kafkaStreams = kafkaStreams; setUncaughtExceptionHandler(this::uncaughtHandler); @@ -352,6 +391,9 @@ protected boolean closeKafkaStreams() { * schemas, etc...). 
*/ public void close() { + if (metrics.isPresent() && restartSensor.isPresent()) { + metrics.get().removeSensor(restartSensor.get().name()); + } doClose(true); listener.onClose(this); } @@ -417,16 +459,18 @@ public static class RetryEvent implements QueryMetadata.RetryEvent { private final Ticker ticker; private final QueryId queryId; - private int numRetries = 0; + private Map numRetries = new ConcurrentHashMap<>(); private long waitingTimeMs; private long expiryTimeMs; private long retryBackoffMaxMs; + private Optional sensor; RetryEvent( final QueryId queryId, final long baseWaitingTimeMs, final long retryBackoffMaxMs, - final Ticker ticker + final Ticker ticker, + final Optional sensor ) { this.ticker = ticker; this.queryId = queryId; @@ -436,19 +480,18 @@ public static class RetryEvent implements QueryMetadata.RetryEvent { this.waitingTimeMs = baseWaitingTimeMs; this.retryBackoffMaxMs = retryBackoffMaxMs; this.expiryTimeMs = now + baseWaitingTimeMs; + this.sensor = sensor; } public long nextRestartTimeMs() { return expiryTimeMs; } - public int getNumRetries() { - return numRetries; + public int getNumRetries(final String threadName) { + return numRetries.getOrDefault(threadName, 0); } - public void backOff() { - numRetries++; - + public void backOff(final String threadName) { final long now = ticker.read(); this.waitingTimeMs = getWaitingTimeMs(); @@ -459,7 +502,18 @@ public void backOff() { Thread.currentThread().interrupt(); } - LOG.info("Restarting query {} (attempt #{})", queryId, numRetries); + sensor.ifPresent(Sensor::record); + if (numRetries.containsKey(threadName)) { + numRetries.put(threadName, numRetries.get(threadName) + 1); + } else { + numRetries.put(threadName, 1); + } + + LOG.info( + "Restarting query {} thread {} (attempt #{})", + queryId, + threadName, + numRetries.get(threadName)); // Math.max() prevents overflow if now is Long.MAX_VALUE (found just in tests) this.expiryTimeMs = Math.max(now, now + waitingTimeMs); diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.java index 2104f400ce5f..7e0ba30066e9 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.java @@ -15,6 +15,7 @@ package io.confluent.ksql.util; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import io.confluent.ksql.query.KafkaStreamsBuilder; @@ -25,11 +26,16 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.CumulativeSum; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.StateListener; import org.apache.kafka.streams.errors.StreamsException; @@ -48,12 +54,21 @@ public class SharedKafkaStreamsRuntimeImpl extends SharedKafkaStreamsRuntime { private final QueryErrorClassifier errorClassifier; private final int maxQueryErrorsQueueSize; private final List> topolgogiesToAdd; + private final 
Metrics metrics; + private final Map metricsTags; + + // Since we have multiple queries registered to a runtime, we need to keep track of all the + // metric sensors in order to record metrics for each individual query, and to clean up the sensor + // if the query is removed from the runtime. + private final Map queryIdSensorMap; public SharedKafkaStreamsRuntimeImpl(final KafkaStreamsBuilder kafkaStreamsBuilder, final QueryErrorClassifier errorClassifier, final int maxQueryErrorsQueueSize, final long shutdownTimeoutConfig, - final Map streamsProperties) { + final Map streamsProperties, + final Metrics metrics, + final Map metricsTags) { super( kafkaStreamsBuilder, streamsProperties @@ -63,6 +78,9 @@ public SharedKafkaStreamsRuntimeImpl(final KafkaStreamsBuild shutdownTimeout = shutdownTimeoutConfig; setupAndStartKafkaStreams(kafkaStreams); topolgogiesToAdd = new ArrayList<>(); + this.metrics = metrics; + this.metricsTags = metricsTags; + this.queryIdSensorMap = new HashMap<>(); } @Override @@ -71,6 +89,7 @@ public void register( ) { final QueryId queryId = binpackedPersistentQueryMetadata.getQueryId(); collocatedQueries.put(queryId, binpackedPersistentQueryMetadata); + queryIdSensorMap.put(queryId, createQueryRestartMetricSensor(queryId.toString())); log.info("Registered query: {} in {} \n" + "Runtime {} is executing these queries: {}", queryId, @@ -124,6 +143,7 @@ public StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse uncaughtHan if (queryInError != null) { queryInError.setQueryError(queryError); + queryIdSensorMap.get(queryInError.getQueryId()).record(); log.error(String.format( "Unhandled query exception caught in streams thread %s for query %s. (%s)", Thread.currentThread().getName(), @@ -134,6 +154,7 @@ public StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse uncaughtHan } else { for (BinPackedPersistentQueryMetadataImpl query : collocatedQueries.values()) { query.setQueryError(queryError); + queryIdSensorMap.get(query.getQueryId()).record(); } log.error(String.format( "Unhandled runtime exception caught in streams thread %s. 
(%s)", @@ -143,6 +164,7 @@ public StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse uncaughtHan ); } } + return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD; } @@ -216,6 +238,7 @@ public void stop(final QueryId queryId, final boolean isCreateOrReplace) { if (!isCreateOrReplace) { // we don't want to lose it from this runtime collocatedQueries.remove(queryId); + queryIdSensorMap.remove(queryId); } log.info("Query {} was stopped successfully", queryId); } @@ -242,6 +265,7 @@ public void start(final QueryId queryId) { throw new IllegalArgumentException("Cannot start because query " + queryId + " was not " + "registered to runtime " + getApplicationId()); } + log.info("Query {} was started successfully", queryId); } @@ -266,4 +290,26 @@ public void restartStreamsRuntime() { } setupAndStartKafkaStreams(kafkaStreamsNamedTopologyWrapper); } + + @VisibleForTesting + protected Map getQueryIdSensorMap() { + return queryIdSensorMap; + } + + // returns a metrics sensor that tracks the number of times a query was restarted when hitting + // an uncaught exception + private Sensor createQueryRestartMetricSensor(final String queryId) { + final Map customMetricsTagsForQuery = + MetricsTagsUtil.getCustomMetricsTagsForQuery(queryId, metricsTags); + final MetricName restartMetricName = metrics.metricName( + QueryMetadataImpl.QUERY_RESTART_METRIC_NAME, + QueryMetadataImpl.QUERY_RESTART_METRIC_GROUP_NAME, + QueryMetadataImpl.QUERY_RESTART_METRIC_DESCRIPTION, + customMetricsTagsForQuery + ); + final Sensor sensor = + metrics.sensor(QueryMetadataImpl.QUERY_RESTART_METRIC_GROUP_NAME + "-" + queryId); + sensor.add(restartMetricName, new CumulativeSum()); + return sensor; + } } \ No newline at end of file diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java index 7b6dc6e54263..906bdb745e97 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java @@ -29,6 +29,8 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.streams.Topology; /** @@ -58,7 +60,9 @@ public TransientQueryMetadata( final ResultType resultType, final long retryBackoffInitialMs, final long retryBackoffMaxMs, - final Listener listener + final Listener listener, + final Metrics metrics, + final Map metricsTags ) { // CHECKSTYLE_RULES.ON: ParameterNumberCheck super( @@ -77,7 +81,9 @@ public TransientQueryMetadata( maxQueryErrorsQueueSize, retryBackoffInitialMs, retryBackoffMaxMs, - listener + listener, + metrics, + metricsTags ); this.rowQueue = Objects.requireNonNull(rowQueue, "rowQueue"); this.resultType = Objects.requireNonNull(resultType, "resultType"); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java index cdf24f9ad119..947968ae4149 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java @@ -44,6 +44,8 @@ import java.util.Collections; import java.util.Map; import java.util.Optional; + +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.streams.KafkaStreams; import 
org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.Topology; @@ -94,6 +96,8 @@ public class PersistentQueryMetadataTest { private Listener listener; @Mock private ScalablePushRegistry scalablePushRegistry; + @Mock + private Metrics metrics; private PersistentQueryMetadata query; @@ -128,7 +132,9 @@ public void setUp() { 0L, 0L, listener, - Optional.of(scalablePushRegistry) + Optional.of(scalablePushRegistry), + metrics, + Collections.emptyMap() ); query.initialize(); @@ -160,7 +166,9 @@ public void shouldReturnInsertQueryType() { 0L, 0L, listener, - Optional.empty() + Optional.empty(), + metrics, + Collections.emptyMap() ); // When/Then @@ -193,7 +201,9 @@ public void shouldReturnCreateAsQueryType() { 0L, 0L, listener, - Optional.empty() + Optional.empty(), + metrics, + Collections.emptyMap() ); // When/Then diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/QueryMetadataTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/QueryMetadataTest.java index c2a375757dd5..7dc43004d49b 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/QueryMetadataTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/QueryMetadataTest.java @@ -18,6 +18,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.same; @@ -30,6 +31,7 @@ import com.google.common.base.Ticker; import com.google.common.collect.ImmutableSet; import io.confluent.ksql.logging.query.QueryLogger; +import io.confluent.ksql.metrics.MetricCollectors; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.query.KafkaStreamsBuilder; @@ -43,7 +45,13 @@ import io.confluent.ksql.util.KsqlConstants.KsqlQueryType; import java.time.Duration; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; import java.util.Set; + +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.Topology; @@ -87,11 +95,16 @@ public class QueryMetadataTest { private ArgumentCaptor streamsListenerCaptor; @Mock private Ticker ticker; + @Mock + private Sensor mockSensor; private QueryMetadataImpl query; + private MetricCollectors metricCollectors; + private final Map metricsTags = Collections.singletonMap("cluster-id", "cluster-1"); @Before public void setup() { + metricCollectors = new MetricCollectors(); when(kafkaStreamsBuilder.build(topoplogy, Collections.emptyMap())).thenReturn(kafkaStreams); when(classifier.classify(any())).thenReturn(Type.UNKNOWN); when(kafkaStreams.state()).thenReturn(State.NOT_RUNNING); @@ -112,7 +125,9 @@ public void setup() { 10, 0L, 0L, - listener + listener, + metricCollectors.getMetrics(), + metricsTags ){ }; query.initialize(); @@ -261,11 +276,12 @@ public void shouldRetryEventStartWithInitialValues() { QUERY_ID, RETRY_BACKOFF_INITIAL_MS, RETRY_BACKOFF_MAX_MS, - ticker + ticker, + Optional.of(mockSensor) ); // Then: - assertThat(retryEvent.getNumRetries(), is(0)); + assertThat(retryEvent.getNumRetries("thread-name"), is(0)); assertThat(retryEvent.nextRestartTimeMs(), is(now + RETRY_BACKOFF_INITIAL_MS)); } @@ -280,14 
+296,19 @@ public void shouldRetryEventRestartAndIncrementBackoffTime() { QUERY_ID, RETRY_BACKOFF_INITIAL_MS, RETRY_BACKOFF_MAX_MS, - ticker + ticker, + Optional.of(mockSensor) ); - retryEvent.backOff(); + retryEvent.backOff("thread-name"); + retryEvent.backOff("thread-name"); + retryEvent.backOff("thread-name-2"); + final int numBackOff = 3; // Then: - assertThat(retryEvent.getNumRetries(), is(1)); - assertThat(retryEvent.nextRestartTimeMs(), is(now + RETRY_BACKOFF_INITIAL_MS * 2)); + assertThat(retryEvent.getNumRetries("thread-name"), is(2)); + assertThat(retryEvent.getNumRetries("thread-name-2"), is(1)); + assertThat(retryEvent.nextRestartTimeMs(), is(now + (RETRY_BACKOFF_INITIAL_MS * (int)(Math.pow(2, numBackOff))))); } @Test @@ -301,14 +322,19 @@ public void shouldRetryEventRestartAndNotExceedBackoffMaxTime() { QUERY_ID, RETRY_BACKOFF_INITIAL_MS, RETRY_BACKOFF_MAX_MS, - ticker + ticker, + Optional.of(mockSensor) ); - retryEvent.backOff(); - retryEvent.backOff(); + retryEvent.backOff("thread-name"); + retryEvent.backOff("thread-name"); + retryEvent.backOff("thread-name"); + retryEvent.backOff("thread-name"); + retryEvent.backOff("thread-name"); + retryEvent.backOff("thread-name"); // Then: - assertThat(retryEvent.getNumRetries(), is(2)); - assertThat(retryEvent.nextRestartTimeMs(), lessThan(now + RETRY_BACKOFF_MAX_MS)); + assertThat(retryEvent.getNumRetries("thread-name"), is(6)); + assertThat(retryEvent.nextRestartTimeMs(), lessThanOrEqualTo(now + RETRY_BACKOFF_MAX_MS)); } @Test @@ -321,4 +347,55 @@ public void shouldEvictBasedOnTime() { assertThat(queue.toImmutableList().size(), is(0)); } + @Test + public void shouldRecordMetricForEachStreamsThreadRestarted() { + // Given: + final long now = 20; + when(ticker.read()).thenReturn(now); + + // When: + final QueryMetadataImpl.RetryEvent retryEvent = new QueryMetadataImpl.RetryEvent( + QUERY_ID, + RETRY_BACKOFF_INITIAL_MS, + RETRY_BACKOFF_MAX_MS, + ticker, + query.getRestartSensor() + ); + retryEvent.backOff("thread-name"); + retryEvent.backOff("thread-name"); + retryEvent.backOff("thread-name-2"); + + // Then: + assertThat(getMetricValue(QUERY_ID.toString(), metricsTags), is(3.0)); + } + + @Test + public void shouldIncrementMetricWhenUncaughtExceptionAndRestart() { + // Given: + when(classifier.classify(any())).thenThrow(new RuntimeException("bar")); + + // When: + query.uncaughtHandler(new RuntimeException("foo")); + query.uncaughtHandler(new RuntimeException("bar")); + query.uncaughtHandler(new RuntimeException("too")); + + + // Then: + assertThat(getMetricValue(QUERY_ID.toString(), metricsTags), is(3.0)); + } + + private double getMetricValue(final String queryId, final Map metricsTags) { + final Map customMetricsTags = new HashMap<>(metricsTags); + customMetricsTags.put("query_id", queryId); + final Metrics metrics = metricCollectors.getMetrics(); + return Double.parseDouble( + metrics.metric( + metrics.metricName( + QueryMetadataImpl.QUERY_RESTART_METRIC_NAME, + QueryMetadataImpl.QUERY_RESTART_METRIC_GROUP_NAME, + QueryMetadataImpl.QUERY_RESTART_METRIC_DESCRIPTION, + customMetricsTags) + ).metricValue().toString() + ); + } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataImplTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataImplTest.java index 8e163651a2f3..f28481fdb997 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataImplTest.java +++ 
b/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataImplTest.java @@ -37,6 +37,8 @@ import java.util.Map; import java.util.Optional; import java.util.function.Consumer; + +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.Topology; import org.junit.Before; @@ -86,6 +88,8 @@ public class SandboxedPersistentQueryMetadataImplTest { private Listener listener; @Mock private Listener sandboxListener; + @Mock + private Metrics metrics; private SandboxedPersistentQueryMetadataImpl sandbox; @@ -119,7 +123,9 @@ public void setUp() { 0L, 0L, listener, - Optional.empty() + Optional.empty(), + metrics, + Collections.emptyMap() ); query.initialize(); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImplTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImplTest.java index 0f762753e067..0bbaa0701e4f 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImplTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImplTest.java @@ -15,6 +15,7 @@ package io.confluent.ksql.util; +import io.confluent.ksql.metrics.MetricCollectors; import io.confluent.ksql.query.KafkaStreamsBuilder; import io.confluent.ksql.query.QueryError.Type; import io.confluent.ksql.query.QueryErrorClassifier; @@ -23,6 +24,7 @@ import java.util.Collections; import java.util.HashMap; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.TaskId; @@ -39,6 +41,7 @@ import java.util.Optional; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; @@ -71,6 +74,7 @@ public class SharedKafkaStreamsRuntimeImplTest { private final QueryId queryId = new QueryId("query-1"); private final QueryId queryId2= new QueryId("query-2"); private Map streamProps = new HashMap(); + private final Map metricsTags = Collections.singletonMap("cluster-id", "cluster-1"); private final StreamsException query1Exception = new StreamsException("query down!", new TaskId(0, 0, queryId.toString())); @@ -82,9 +86,11 @@ public class SharedKafkaStreamsRuntimeImplTest { new StreamsException("query down!", new TaskId(0, 0, "not-a-real-query")); private SharedKafkaStreamsRuntimeImpl sharedKafkaStreamsRuntimeImpl; + private MetricCollectors metricCollectors; @Before public void setUp() throws Exception { + metricCollectors = new MetricCollectors(); when(kafkaStreamsBuilder.buildNamedTopologyWrapper(any())).thenReturn(kafkaStreamsNamedTopologyWrapper).thenReturn(kafkaStreamsNamedTopologyWrapper2); streamProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "runtime"); sharedKafkaStreamsRuntimeImpl = new SharedKafkaStreamsRuntimeImpl( @@ -92,7 +98,9 @@ public void setUp() throws Exception { queryErrorClassifier, 5, 300_000L, - streamProps + streamProps, + metricCollectors.getMetrics(), + metricsTags ); when(kafkaStreamsNamedTopologyWrapper.getTopologyByName(any())).thenReturn(Optional.empty()); @@ -250,4 +258,82 @@ public void shouldNotStartOrAddedToStreamsIfOnlyRegistered() { verify(kafkaStreamsNamedTopologyWrapper, never()) 
.addNamedTopology(binPackedPersistentQueryMetadata2.getTopologyCopy(sharedKafkaStreamsRuntimeImpl)); } + + @Test + public void shouldRegisterMetricForQueryRestarts() { + //Given: + assertThat(sharedKafkaStreamsRuntimeImpl.getQueryIdSensorMap().entrySet().size(), is(1)); + + //When: + sharedKafkaStreamsRuntimeImpl.register(binPackedPersistentQueryMetadata2); + + //Then: + assertThat(sharedKafkaStreamsRuntimeImpl.getQueryIdSensorMap().entrySet().size(), is(2)); + } + + @Test + public void shouldRemoveSensorWhenStoppingQuery() { + //Given: + sharedKafkaStreamsRuntimeImpl.register(binPackedPersistentQueryMetadata2); + + //When: + sharedKafkaStreamsRuntimeImpl.stop(queryId, false); + sharedKafkaStreamsRuntimeImpl.stop(queryId2, true); + + //Then: + assertThat(sharedKafkaStreamsRuntimeImpl.getQueryIdSensorMap().entrySet().size(), is(1)); + assertThat(sharedKafkaStreamsRuntimeImpl.getQueryIdSensorMap().containsKey(queryId2), is(true)); + } + + @Test + public void shouldRecordMetricForQuery1WhenError() { + //Given: + when(queryErrorClassifier.classify(query1Exception)).thenReturn(Type.USER); + sharedKafkaStreamsRuntimeImpl.register( + binPackedPersistentQueryMetadata2 + ); + + //When: + sharedKafkaStreamsRuntimeImpl.start(queryId); + sharedKafkaStreamsRuntimeImpl.start(queryId2); + sharedKafkaStreamsRuntimeImpl.uncaughtHandler(query1Exception); + + //Then: + assertThat(getMetricValue(queryId.toString(), metricsTags), is(1.0)); + assertThat(getMetricValue(queryId2.toString(), metricsTags), is(0.0)); + } + + @Test + public void shouldRecordMetricForAllQueriesWhenErrorWithNoTask() { + when(queryErrorClassifier.classify(runtimeExceptionWithNoTask)).thenReturn(Type.USER); + + sharedKafkaStreamsRuntimeImpl.register( + binPackedPersistentQueryMetadata2 + ); + + //When: + sharedKafkaStreamsRuntimeImpl.start(queryId); + sharedKafkaStreamsRuntimeImpl.start(queryId2); + + sharedKafkaStreamsRuntimeImpl.uncaughtHandler(runtimeExceptionWithNoTask); + + //Then: + assertThat(getMetricValue(queryId.toString(), metricsTags), is(1.0)); + assertThat(getMetricValue(queryId2.toString(), metricsTags), is(1.0)); + } + + private double getMetricValue(final String queryId, final Map metricsTags) { + final Map customMetricsTags = new HashMap<>(metricsTags); + customMetricsTags.put("query_id", queryId); + final Metrics metrics = metricCollectors.getMetrics(); + return Double.parseDouble( + metrics.metric( + metrics.metricName( + QueryMetadataImpl.QUERY_RESTART_METRIC_NAME, + QueryMetadataImpl.QUERY_RESTART_METRIC_GROUP_NAME, + QueryMetadataImpl.QUERY_RESTART_METRIC_DESCRIPTION, + customMetricsTags) + ).metricValue().toString() + ); + } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java index 76c49e6761d3..28a23e2435e9 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java @@ -29,9 +29,13 @@ import io.confluent.ksql.util.KsqlConstants.KsqlQueryType; import io.confluent.ksql.util.PushQueryMetadata.ResultType; import io.confluent.ksql.util.QueryMetadata.Listener; + +import java.util.Collections; import java.util.Map; import java.util.Set; import java.util.function.Consumer; + +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.Topology; @@ -72,6 
+76,8 @@ public class TransientQueryMetadataTest { private Consumer closeCallback; @Mock private Listener listener; + @Mock + private Metrics metrics; private TransientQueryMetadata query; @@ -98,7 +104,9 @@ public void setUp() { ResultType.STREAM, 0L, 0L, - listener + listener, + metrics, + Collections.emptyMap() ); query.initialize(); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java index 1e4e2d53e0af..24bd9b20469b 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java @@ -56,6 +56,8 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; + +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyDescription; @@ -110,6 +112,8 @@ public class QueryDescriptionFactoryTest { private ProcessingLogger processingLogger; @Mock private QueryMetadata.Listener listener; + @Mock + private Metrics metrics; private QueryMetadata transientQuery; private PersistentQueryMetadata persistentQuery; @@ -144,7 +148,9 @@ public void setUp() { ResultType.STREAM, 0L, 0L, - listener + listener, + metrics, + Collections.emptyMap() ); transientQuery.initialize(); @@ -173,7 +179,9 @@ public void setUp() { 0L, 0L, listener, - Optional.empty() + Optional.empty(), + metrics, + Collections.emptyMap() ); persistentQuery.initialize(); @@ -287,7 +295,9 @@ public void shouldHandleRowTimeInValueSchemaForTransientQuery() { ResultType.STREAM, 0L, 0L, - listener + listener, + metrics, + Collections.emptyMap() ); transientQuery.initialize(); @@ -327,7 +337,9 @@ public void shouldHandleRowKeyInValueSchemaForTransientQuery() { ResultType.STREAM, 0L, 0L, - listener + listener, + metrics, + Collections.emptyMap() ); transientQuery.initialize(); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/QueryRestartMetricFunctionalTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/QueryRestartMetricFunctionalTest.java new file mode 100644 index 000000000000..037e779a7a22 --- /dev/null +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/QueryRestartMetricFunctionalTest.java @@ -0,0 +1,202 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.rest.integration; + +import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + +import com.google.common.collect.ImmutableMap; +import io.confluent.ksql.engine.KsqlEngine; +import io.confluent.ksql.integration.IntegrationTestHarness; +import io.confluent.ksql.rest.entity.KsqlEntity; +import io.confluent.ksql.rest.entity.Queries; +import io.confluent.ksql.rest.entity.RunningQuery; +import io.confluent.ksql.rest.server.TestKsqlRestApp; +import io.confluent.ksql.util.KsqlConfig; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.Metrics; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.RuleChain; +import org.junit.rules.Timeout; + +@Ignore +public class QueryRestartMetricFunctionalTest { + + private static final String TEST_TOPIC_NAME = "test"; + private static final String TEST_TOPIC_NAME2 = "test-topic"; + private static final String TEST_TOPIC_NAME3 = "test-topic-3"; + private static final String TEST_TOPIC_NAME4 = "test-topic-4"; + private static final ImmutableMap METRICS_TAGS = ImmutableMap.of( + "cluster.id", "cluster-1" + ); + private static final String METRICS_TAGS_STRING = "cluster.id:cluster-1"; + + private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build(); + + private static final TestKsqlRestApp REST_APP_NO_SHARED_RUNTIME = TestKsqlRestApp + .builder(TEST_HARNESS::kafkaBootstrapServers) + .withProperty(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS, METRICS_TAGS_STRING) + .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + .withProperty(KsqlConfig.KSQL_SHARED_RUNTIME_ENABLED, false) + .withProperty(KsqlConfig.KSQL_QUERY_RETRY_BACKOFF_INITIAL_MS, 0L) + .withProperty(KsqlConfig.KSQL_QUERY_RETRY_BACKOFF_MAX_MS, 3000L) + .build(); + + private static final TestKsqlRestApp REST_APP_SHARED_RUNTIME = TestKsqlRestApp + .builder(TEST_HARNESS::kafkaBootstrapServers) + .withProperty(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS, METRICS_TAGS_STRING) + .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + .withProperty(KsqlConfig.KSQL_SHARED_RUNTIME_ENABLED, true) + .withProperty(KsqlConfig.KSQL_SERVICE_ID_CONFIG, "another-id") + .build(); + + @ClassRule + public static final RuleChain CHAIN = RuleChain.outerRule(TEST_HARNESS).around(REST_APP_NO_SHARED_RUNTIME).around(REST_APP_SHARED_RUNTIME); + + private Metrics metricsNoSharedRuntime; + private Metrics metricsSharedRuntime; + + @BeforeClass + public static void setUpClass() throws InterruptedException { + TEST_HARNESS.ensureTopics(TEST_TOPIC_NAME); + TEST_HARNESS.ensureTopics(TEST_TOPIC_NAME2); + TEST_HARNESS.ensureTopics(TEST_TOPIC_NAME3); + TEST_HARNESS.ensureTopics(TEST_TOPIC_NAME4); + + RestIntegrationTestUtil.makeKsqlRequest(REST_APP_NO_SHARED_RUNTIME, + "CREATE STREAM test_stream (f BIGINT) with (KAFKA_TOPIC='" + TEST_TOPIC_NAME + "', VALUE_FORMAT='json');" + + "CREATE STREAM test_stream2 (f BIGINT) with (KAFKA_TOPIC='" + TEST_TOPIC_NAME2 + "', VALUE_FORMAT='json');" + ); 
+ RestIntegrationTestUtil.makeKsqlRequest(REST_APP_NO_SHARED_RUNTIME, + "create stream test_addition_5 as select f+5 from test_stream;" + + "create stream test_addition_10 as select f+10 from test_stream;" + + "create stream test_addition_20 as select f+20 from test_stream2;" + ); + + RestIntegrationTestUtil.makeKsqlRequest(REST_APP_SHARED_RUNTIME, + "CREATE STREAM test_stream (f BIGINT) with (KAFKA_TOPIC='" + TEST_TOPIC_NAME3 + "', VALUE_FORMAT='json');" + + "CREATE STREAM test_stream2 (f BIGINT) with (KAFKA_TOPIC='" + TEST_TOPIC_NAME4 + "', VALUE_FORMAT='json');" + ); + } + + @Before + public void setUp() { + metricsNoSharedRuntime = ((KsqlEngine)REST_APP_NO_SHARED_RUNTIME.getEngine()).getEngineMetrics().getMetrics(); + metricsSharedRuntime = ((KsqlEngine)REST_APP_SHARED_RUNTIME.getEngine()).getEngineMetrics().getMetrics(); + } + + @Test + public void shouldVerifyMetricsOnNonSharedRuntimeServer() throws InterruptedException { + // Given: + final Map metricsTagsForQuery1 = new HashMap<>(METRICS_TAGS); + final Map metricsTagsForQuery2 = new HashMap<>(METRICS_TAGS); + final Map metricsTagsForQuery3 = new HashMap<>(METRICS_TAGS); + final List listOfQueryId = RestIntegrationTestUtil.getQueryIds(REST_APP_NO_SHARED_RUNTIME); + assertThat(listOfQueryId.size(), equalTo(3)); + for (final String queryId:listOfQueryId) { + if (queryId.toLowerCase().contains("test_addition_5")) { + metricsTagsForQuery1.put("query_id", queryId); + } else if (queryId.toLowerCase().contains("test_addition_10")) { + metricsTagsForQuery2.put("query_id", queryId); + } else if (queryId.toLowerCase().contains("test_addition_20")) { + metricsTagsForQuery3.put("query_id", queryId); + } + } + + TEST_HARNESS.deleteTopics(Collections.singletonList(TEST_TOPIC_NAME)); + Thread.sleep(10000); + REST_APP_NO_SHARED_RUNTIME.stop(); + + // When: + REST_APP_NO_SHARED_RUNTIME.start(); + Thread.sleep(15000); + + metricsNoSharedRuntime = ((KsqlEngine)REST_APP_NO_SHARED_RUNTIME.getEngine()).getEngineMetrics().getMetrics(); + final KafkaMetric restartMetric1 = getKafkaMetric(metricsNoSharedRuntime, metricsTagsForQuery1); + final KafkaMetric restartMetric2 = getKafkaMetric(metricsNoSharedRuntime, metricsTagsForQuery2); + final KafkaMetric restartMetric3 = getKafkaMetric(metricsNoSharedRuntime, metricsTagsForQuery3); + + // Then: + assertThatEventually(() -> (Double) restartMetric1.metricValue(), greaterThan(12.0)); + assertThatEventually(() -> (Double) restartMetric2.metricValue(), greaterThan(12.0)); + assertThat(restartMetric3.metricValue(), equalTo(0.0)); + } + + @Ignore + @Test + public void shouldVerifyMetricsOnSharedRuntimeServer() throws InterruptedException { + // Given: + final Map metricsTagsForQuery1 = new HashMap<>(METRICS_TAGS); + final Map metricsTagsForQuery2 = new HashMap<>(METRICS_TAGS); + final Map metricsTagsForQuery3 = new HashMap<>(METRICS_TAGS); + final List listOfQueryId = RestIntegrationTestUtil.getQueryIds(REST_APP_SHARED_RUNTIME); + assertThat(listOfQueryId.size(), equalTo(3)); + for (final String queryId:listOfQueryId) { + if (queryId.toLowerCase().contains("test_addition_5")) { + metricsTagsForQuery1.put("query_id", queryId); + } else if (queryId.toLowerCase().contains("test_addition_10")) { + metricsTagsForQuery2.put("query_id", queryId); + } else if (queryId.toLowerCase().contains("test_addition_20")) { + metricsTagsForQuery3.put("query_id", queryId); + } + } + + TEST_HARNESS.deleteTopics(Collections.singletonList(TEST_TOPIC_NAME3)); + Thread.sleep(15000); + REST_APP_SHARED_RUNTIME.stop(); + Thread.sleep(30000); 
+ System.out.println("steven starting server up again"); + + // When: + REST_APP_SHARED_RUNTIME.start(); + Thread.sleep(20000); + + metricsSharedRuntime = ((KsqlEngine)REST_APP_SHARED_RUNTIME.getEngine()).getEngineMetrics().getMetrics(); + final KafkaMetric restartMetric1 = getKafkaMetric(metricsSharedRuntime, metricsTagsForQuery1); + final KafkaMetric restartMetric2 = getKafkaMetric(metricsSharedRuntime, metricsTagsForQuery2); + final KafkaMetric restartMetric3 = getKafkaMetric(metricsSharedRuntime, metricsTagsForQuery3); + + // Then: + assertThatEventually(() -> (Double) restartMetric1.metricValue(), greaterThan(12.0)); + assertThatEventually(() -> (Double) restartMetric2.metricValue(), greaterThan(12.0)); + assertThat(restartMetric3.metricValue(), equalTo(0.0)); + } + + private KafkaMetric getKafkaMetric(final Metrics metrics, final Map metricsTags) { + final MetricName restartMetricName3 = new MetricName( + "query-restart-total", + "query-restart-metrics", + "The total number of times that a query thread has failed and then been restarted.", + metricsTags + ); + return metrics.metric(restartMetricName3); + } +} diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestIntegrationTestUtil.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestIntegrationTestUtil.java index 84a38172411d..dcf35005c584 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestIntegrationTestUtil.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestIntegrationTestUtil.java @@ -37,6 +37,8 @@ import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.rest.entity.KsqlMediaType; import io.confluent.ksql.rest.entity.KsqlRequest; +import io.confluent.ksql.rest.entity.Queries; +import io.confluent.ksql.rest.entity.RunningQuery; import io.confluent.ksql.rest.entity.ServerClusterId; import io.confluent.ksql.rest.entity.ServerInfo; import io.confluent.ksql.rest.entity.ServerMetadata; @@ -677,4 +679,25 @@ private static String createBasicAuthHeader(final BasicCredentials credentials) return "Basic " + Base64.getEncoder().encodeToString( (credentials.username() + ":" + credentials.password()).getBytes(StandardCharsets.UTF_8)); } + + public static List getQueryIds(final TestKsqlRestApp restApp) { + final List results = RestIntegrationTestUtil.makeKsqlRequest( + restApp, + "Show Queries;" + ); + + if (results.size() != 1) { + return Collections.emptyList(); + } + + final KsqlEntity result = results.get(0); + + if (!(result instanceof Queries)) { + return Collections.emptyList(); + } + + final List runningQueries = ((Queries) result) + .getQueries(); + return runningQueries.stream().map(query -> query.getId().toString()).collect(Collectors.toList()); + } } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index 25efce129418..f9ccde6066ac 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -119,6 +119,7 @@ import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.metrics.Metrics; import 
org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.StreamsConfig; @@ -204,6 +205,8 @@ public class StreamedQueryResourceTest { private QueryExecutor queryExecutor; @Mock private QueryMetadataHolder queryMetadataHolder; + @Mock + private Metrics metrics; private StreamedQueryResource testResource; private PreparedStatement invalid; @@ -562,7 +565,9 @@ public void shouldStreamRowsCorrectly() throws Throwable { ResultType.STREAM, 0L, 0L, - listener + listener, + metrics, + Collections.emptyMap() ); transientQueryMetadata.initialize(); From f69e2872aa50670c055c6a7a866aba096fadbe60 Mon Sep 17 00:00:00 2001 From: Steven Zhang Date: Fri, 22 Apr 2022 11:51:34 -0700 Subject: [PATCH 2/4] change up query metric for shared runtime --- .../io/confluent/ksql/query/QueryBuilder.java | 4 +- .../BinPackedPersistentQueryMetadataImpl.java | 24 ++- .../ksql/util/QueryMetadataImpl.java | 20 +-- .../confluent/ksql/util/QueryMetricsUtil.java | 49 ++++++ .../util/SharedKafkaStreamsRuntimeImpl.java | 47 +----- ...PackedPersistentQueryMetadataImplTest.java | 10 +- .../SharedKafkaStreamsRuntimeImplTest.java | 154 +++++++++--------- .../QueryRestartMetricFunctionalTest.java | 1 - 8 files changed, 164 insertions(+), 145 deletions(-) create mode 100644 ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetricsUtil.java diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java index 7335192a1515..53b14a222f10 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java @@ -496,7 +496,9 @@ PersistentQueryMetadata buildPersistentQueryInSharedRuntime( applicationId, queryOverrides, physicalPlan - ) + ), + metricCollectors.getMetrics(), + ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS) ); if (real) { return binPackedPersistentQueryMetadata; diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImpl.java index 88d3fe58ea62..cfabc91b646b 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImpl.java @@ -47,8 +47,10 @@ import java.util.Set; import java.util.function.Function; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.CumulativeSum; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.LagInfo; @@ -80,8 +82,8 @@ public class BinPackedPersistentQueryMetadataImpl implements PersistentQueryMeta private final Listener listener; private final Function namedTopologyBuilder; private final TimeBoundedQueue queryErrors; - private final Sensor sensor; - private final Metrics metrics; + private final Optional metrics; + private final Optional restartSensor; private final Optional materializationProviderBuilder; @@ -113,7 +115,9 @@ public BinPackedPersistentQueryMetadataImpl( final Listener listener, final Map streamsProperties, final Optional scalablePushRegistry, - final Function namedTopologyBuilder) { + final Function namedTopologyBuilder, + final Metrics metrics, + 
final Map metricsTags) { // CHECKSTYLE_RULES.ON: ParameterNumberCheck this.persistentQueryType = Objects.requireNonNull(persistentQueryType, "persistentQueryType"); this.statementString = Objects.requireNonNull(statementString, "statementString"); @@ -143,6 +147,9 @@ public BinPackedPersistentQueryMetadataImpl( topology )); this.scalablePushRegistry = requireNonNull(scalablePushRegistry, "scalablePushRegistry"); + this.metrics = Optional.of(metrics); + this.restartSensor = Optional.of( + QueryMetricsUtil.createQueryRestartMetricSensor(queryId.toString(), metricsTags, metrics)); } // for creating sandbox instances @@ -171,6 +178,8 @@ public BinPackedPersistentQueryMetadataImpl( this.materializationProvider = original.materializationProvider; this.scalablePushRegistry = original.scalablePushRegistry; this.namedTopologyBuilder = original.namedTopologyBuilder; + this.metrics = Optional.empty(); + this.restartSensor = Optional.empty(); } @Override @@ -390,7 +399,9 @@ public void close() { sharedKafkaStreamsRuntime.stop(queryId, false); scalablePushRegistry.ifPresent(ScalablePushRegistry::close); listener.onClose(this); - metrics.removeSensor(sensor.name()); + if (metrics.isPresent() && restartSensor.isPresent()) { + metrics.get().removeSensor(restartSensor.get().name()); + } } @Override @@ -412,8 +423,7 @@ Listener getListener() { return listener; } - Sensor getSensor() { - return sensor; + Optional getSensor() { + return restartSensor; } - } \ No newline at end of file diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadataImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadataImpl.java index e254f9b24712..111d63be9e43 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadataImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadataImpl.java @@ -44,11 +44,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Collectors; - -import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.CumulativeSum; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.LagInfo; @@ -143,19 +140,8 @@ public long read() { this.errorClassifier = Objects.requireNonNull(errorClassifier, "errorClassifier"); this.queryErrors = new TimeBoundedQueue(Duration.ofHours(1), maxQueryErrorsQueueSize); this.metrics = Optional.of(metrics); - - final Map customMetricsTagsForQuery = - MetricsTagsUtil.getCustomMetricsTagsForQuery(queryId.toString(), metricsTags); - final MetricName restartMetricName = metrics.metricName( - QUERY_RESTART_METRIC_NAME, - QUERY_RESTART_METRIC_GROUP_NAME, - QUERY_RESTART_METRIC_DESCRIPTION, - customMetricsTagsForQuery - ); this.restartSensor = Optional.of( - metrics.sensor(QUERY_RESTART_METRIC_GROUP_NAME + "-" + queryId)); - restartSensor.get().add(restartMetricName, new CumulativeSum()); - + QueryMetricsUtil.createQueryRestartMetricSensor(queryId.toString(), metricsTags, metrics)); this.retryEvent = new RetryEvent( queryId, baseWaitingTimeMs, @@ -391,11 +377,11 @@ protected boolean closeKafkaStreams() { * schemas, etc...). 
*/ public void close() { + doClose(true); + listener.onClose(this); if (metrics.isPresent() && restartSensor.isPresent()) { metrics.get().removeSensor(restartSensor.get().name()); } - doClose(true); - listener.onClose(this); } void doClose(final boolean cleanUp) { diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetricsUtil.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetricsUtil.java new file mode 100644 index 000000000000..c7a761e4ab51 --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetricsUtil.java @@ -0,0 +1,49 @@ +/* + * Copyright 2022 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.util; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.CumulativeSum; + +import java.util.Map; + +public final class QueryMetricsUtil { + + private QueryMetricsUtil() {} + // returns a metrics sensor that tracks the number of times a query was restarted when hitting + // an uncaught exception + + public static Sensor createQueryRestartMetricSensor( + final String queryId, + final Map metricsTags, + final Metrics metrics + ) { + final Map customMetricsTagsForQuery = + MetricsTagsUtil.getCustomMetricsTagsForQuery(queryId, metricsTags); + final MetricName restartMetricName = metrics.metricName( + QueryMetadataImpl.QUERY_RESTART_METRIC_NAME, + QueryMetadataImpl.QUERY_RESTART_METRIC_GROUP_NAME, + QueryMetadataImpl.QUERY_RESTART_METRIC_DESCRIPTION, + customMetricsTagsForQuery + ); + final Sensor sensor = metrics.sensor( + QueryMetadataImpl.QUERY_RESTART_METRIC_GROUP_NAME + "-" + queryId); + sensor.add(restartMetricName, new CumulativeSum()); + return sensor; + } +} diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.java index 7e0ba30066e9..438b29cdb562 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.java @@ -15,7 +15,6 @@ package io.confluent.ksql.util; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import io.confluent.ksql.query.KafkaStreamsBuilder; @@ -26,16 +25,12 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import org.apache.kafka.common.KafkaFuture; -import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.CumulativeSum; import org.apache.kafka.streams.KafkaStreams; import 
org.apache.kafka.streams.KafkaStreams.StateListener; import org.apache.kafka.streams.errors.StreamsException; @@ -54,13 +49,6 @@ public class SharedKafkaStreamsRuntimeImpl extends SharedKafkaStreamsRuntime { private final QueryErrorClassifier errorClassifier; private final int maxQueryErrorsQueueSize; private final List> topolgogiesToAdd; - private final Metrics metrics; - private final Map metricsTags; - - // Since we have multiple queries registered to a runtime, we need to keep track of all the - // metrics sensor in order to record metrics for each individual query, and to clean up the sensor - // if the query is d - private final Map queryIdSensorMap; public SharedKafkaStreamsRuntimeImpl(final KafkaStreamsBuilder kafkaStreamsBuilder, final QueryErrorClassifier errorClassifier, @@ -78,9 +66,6 @@ public SharedKafkaStreamsRuntimeImpl(final KafkaStreamsBuilder kafkaStreamsBuild shutdownTimeout = shutdownTimeoutConfig; setupAndStartKafkaStreams(kafkaStreams); topolgogiesToAdd = new ArrayList<>(); - this.metrics = metrics; - this.metricsTags = metricsTags; - this.queryIdSensorMap = new HashMap<>(); } @Override @@ -89,7 +74,6 @@ public void register( ) { final QueryId queryId = binpackedPersistentQueryMetadata.getQueryId(); collocatedQueries.put(queryId, binpackedPersistentQueryMetadata); - queryIdSensorMap.put(queryId, createQueryRestartMetricSensor(queryId.toString())); log.info("Registered query: {} in {} \n" + "Runtime {} is executing these queries: {}", queryId, @@ -143,7 +127,9 @@ public StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse uncaughtHan if (queryInError != null) { queryInError.setQueryError(queryError); - queryIdSensorMap.get(queryInError.getQueryId()).record(); + if (queryInError.getSensor().isPresent()) { + queryInError.getSensor().get().record(); + } log.error(String.format( "Unhandled query exception caught in streams thread %s for query %s. (%s)", Thread.currentThread().getName(), @@ -154,7 +140,9 @@ public StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse uncaughtHan } else { for (BinPackedPersistentQueryMetadataImpl query : collocatedQueries.values()) { query.setQueryError(queryError); - queryIdSensorMap.get(query.getQueryId()).record(); + if (query.getSensor().isPresent()) { + query.getSensor().get().record(); + } } log.error(String.format( "Unhandled runtime exception caught in streams thread %s. 
(%s)", @@ -238,7 +226,6 @@ public void stop(final QueryId queryId, final boolean isCreateOrReplace) { if (!isCreateOrReplace) { // we don't want to lose it from this runtime collocatedQueries.remove(queryId); - queryIdSensorMap.remove(queryId); } log.info("Query {} was stopped successfully", queryId); } @@ -290,26 +277,4 @@ public void restartStreamsRuntime() { } setupAndStartKafkaStreams(kafkaStreamsNamedTopologyWrapper); } - - @VisibleForTesting - protected Map getQueryIdSensorMap() { - return queryIdSensorMap; - } - - // returns a metrics sensor that tracks the number of times a query was restarted when hitting - // an uncaught exception - private Sensor createQueryRestartMetricSensor(final String queryId) { - final Map customMetricsTagsForQuery = - MetricsTagsUtil.getCustomMetricsTagsForQuery(queryId, metricsTags); - final MetricName restartMetricName = metrics.metricName( - QueryMetadataImpl.QUERY_RESTART_METRIC_NAME, - QueryMetadataImpl.QUERY_RESTART_METRIC_GROUP_NAME, - QueryMetadataImpl.QUERY_RESTART_METRIC_DESCRIPTION, - customMetricsTagsForQuery - ); - final Sensor sensor = - metrics.sensor(QueryMetadataImpl.QUERY_RESTART_METRIC_GROUP_NAME + "-" + queryId); - sensor.add(restartMetricName, new CumulativeSum()); - return sensor; - } } \ No newline at end of file diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImplTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImplTest.java index 040c6b947ccc..f9b15aa60b45 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImplTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImplTest.java @@ -29,8 +29,12 @@ import io.confluent.ksql.schema.ksql.PhysicalSchema; import io.confluent.ksql.schema.query.QuerySchemas; import io.confluent.ksql.util.QueryMetadata.Listener; + +import java.util.Collections; import java.util.Map; import java.util.Optional; + +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology; import org.junit.Before; import org.junit.Test; @@ -69,6 +73,8 @@ public class BinPackedPersistentQueryMetadataImplTest { @Mock private Optional scalablePushRegistry; + private Metrics metrics; + private Map metricsTags = Collections.singletonMap("tag1", "value1"); private PersistentQueryMetadata query; @Before @@ -92,7 +98,9 @@ public void setUp() { listener, streamsProperties, scalablePushRegistry, - (runtime) -> topology); + (runtime) -> topology, + metrics, + metricsTags); query.initialize(); } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImplTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImplTest.java index 0bbaa0701e4f..93a41a76b723 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImplTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImplTest.java @@ -259,81 +259,81 @@ public void shouldNotStartOrAddedToStreamsIfOnlyRegistered() { .addNamedTopology(binPackedPersistentQueryMetadata2.getTopologyCopy(sharedKafkaStreamsRuntimeImpl)); } - @Test - public void shouldRegisterMetricForQueryRestarts() { - //Given: - assertThat(sharedKafkaStreamsRuntimeImpl.getQueryIdSensorMap().entrySet().size(), is(1)); - - //When: - sharedKafkaStreamsRuntimeImpl.register(binPackedPersistentQueryMetadata2); - - //Then: - 
assertThat(sharedKafkaStreamsRuntimeImpl.getQueryIdSensorMap().entrySet().size(), is(2)); - } - - @Test - public void shouldRemoveSensorWhenStoppingQuery() { - //Given: - sharedKafkaStreamsRuntimeImpl.register(binPackedPersistentQueryMetadata2); - - //When: - sharedKafkaStreamsRuntimeImpl.stop(queryId, false); - sharedKafkaStreamsRuntimeImpl.stop(queryId2, true); - - //Then: - assertThat(sharedKafkaStreamsRuntimeImpl.getQueryIdSensorMap().entrySet().size(), is(1)); - assertThat(sharedKafkaStreamsRuntimeImpl.getQueryIdSensorMap().containsKey(queryId2), is(true)); - } - - @Test - public void shouldRecordMetricForQuery1WhenError() { - //Given: - when(queryErrorClassifier.classify(query1Exception)).thenReturn(Type.USER); - sharedKafkaStreamsRuntimeImpl.register( - binPackedPersistentQueryMetadata2 - ); - - //When: - sharedKafkaStreamsRuntimeImpl.start(queryId); - sharedKafkaStreamsRuntimeImpl.start(queryId2); - sharedKafkaStreamsRuntimeImpl.uncaughtHandler(query1Exception); - - //Then: - assertThat(getMetricValue(queryId.toString(), metricsTags), is(1.0)); - assertThat(getMetricValue(queryId2.toString(), metricsTags), is(0.0)); - } - - @Test - public void shouldRecordMetricForAllQueriesWhenErrorWithNoTask() { - when(queryErrorClassifier.classify(runtimeExceptionWithNoTask)).thenReturn(Type.USER); - - sharedKafkaStreamsRuntimeImpl.register( - binPackedPersistentQueryMetadata2 - ); - - //When: - sharedKafkaStreamsRuntimeImpl.start(queryId); - sharedKafkaStreamsRuntimeImpl.start(queryId2); - - sharedKafkaStreamsRuntimeImpl.uncaughtHandler(runtimeExceptionWithNoTask); - - //Then: - assertThat(getMetricValue(queryId.toString(), metricsTags), is(1.0)); - assertThat(getMetricValue(queryId2.toString(), metricsTags), is(1.0)); - } - - private double getMetricValue(final String queryId, final Map metricsTags) { - final Map customMetricsTags = new HashMap<>(metricsTags); - customMetricsTags.put("query_id", queryId); - final Metrics metrics = metricCollectors.getMetrics(); - return Double.parseDouble( - metrics.metric( - metrics.metricName( - QueryMetadataImpl.QUERY_RESTART_METRIC_NAME, - QueryMetadataImpl.QUERY_RESTART_METRIC_GROUP_NAME, - QueryMetadataImpl.QUERY_RESTART_METRIC_DESCRIPTION, - customMetricsTags) - ).metricValue().toString() - ); - } +// @Test +// public void shouldRegisterMetricForQueryRestarts() { +// //Given: +// assertThat(sharedKafkaStreamsRuntimeImpl.getQueryIdSensorMap().entrySet().size(), is(1)); +// +// //When: +// sharedKafkaStreamsRuntimeImpl.register(binPackedPersistentQueryMetadata2); +// +// //Then: +// assertThat(sharedKafkaStreamsRuntimeImpl.getQueryIdSensorMap().entrySet().size(), is(2)); +// } +// +// @Test +// public void shouldRemoveSensorWhenStoppingQuery() { +// //Given: +// sharedKafkaStreamsRuntimeImpl.register(binPackedPersistentQueryMetadata2); +// +// //When: +// sharedKafkaStreamsRuntimeImpl.stop(queryId, false); +// sharedKafkaStreamsRuntimeImpl.stop(queryId2, true); +// +// //Then: +// assertThat(sharedKafkaStreamsRuntimeImpl.getQueryIdSensorMap().entrySet().size(), is(1)); +// assertThat(sharedKafkaStreamsRuntimeImpl.getQueryIdSensorMap().containsKey(queryId2), is(true)); +// } +// +// @Test +// public void shouldRecordMetricForQuery1WhenError() { +// //Given: +// when(queryErrorClassifier.classify(query1Exception)).thenReturn(Type.USER); +// sharedKafkaStreamsRuntimeImpl.register( +// binPackedPersistentQueryMetadata2 +// ); +// +// //When: +// sharedKafkaStreamsRuntimeImpl.start(queryId); +// sharedKafkaStreamsRuntimeImpl.start(queryId2); +// 
sharedKafkaStreamsRuntimeImpl.uncaughtHandler(query1Exception); +// +// //Then: +// assertThat(getMetricValue(queryId.toString(), metricsTags), is(1.0)); +// assertThat(getMetricValue(queryId2.toString(), metricsTags), is(0.0)); +// } +// +// @Test +// public void shouldRecordMetricForAllQueriesWhenErrorWithNoTask() { +// when(queryErrorClassifier.classify(runtimeExceptionWithNoTask)).thenReturn(Type.USER); +// +// sharedKafkaStreamsRuntimeImpl.register( +// binPackedPersistentQueryMetadata2 +// ); +// +// //When: +// sharedKafkaStreamsRuntimeImpl.start(queryId); +// sharedKafkaStreamsRuntimeImpl.start(queryId2); +// +// sharedKafkaStreamsRuntimeImpl.uncaughtHandler(runtimeExceptionWithNoTask); +// +// //Then: +// assertThat(getMetricValue(queryId.toString(), metricsTags), is(1.0)); +// assertThat(getMetricValue(queryId2.toString(), metricsTags), is(1.0)); +// } +// +// private double getMetricValue(final String queryId, final Map metricsTags) { +// final Map customMetricsTags = new HashMap<>(metricsTags); +// customMetricsTags.put("query_id", queryId); +// final Metrics metrics = metricCollectors.getMetrics(); +// return Double.parseDouble( +// metrics.metric( +// metrics.metricName( +// QueryMetadataImpl.QUERY_RESTART_METRIC_NAME, +// QueryMetadataImpl.QUERY_RESTART_METRIC_GROUP_NAME, +// QueryMetadataImpl.QUERY_RESTART_METRIC_DESCRIPTION, +// customMetricsTags) +// ).metricValue().toString() +// ); +// } } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/QueryRestartMetricFunctionalTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/QueryRestartMetricFunctionalTest.java index 037e779a7a22..e6233f048f15 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/QueryRestartMetricFunctionalTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/QueryRestartMetricFunctionalTest.java @@ -173,7 +173,6 @@ public void shouldVerifyMetricsOnSharedRuntimeServer() throws InterruptedExcepti Thread.sleep(15000); REST_APP_SHARED_RUNTIME.stop(); Thread.sleep(30000); - System.out.println("steven starting server up again"); // When: REST_APP_SHARED_RUNTIME.start(); From facc272f7dc7666b6d87b751f7399e5f35acd949 Mon Sep 17 00:00:00 2001 From: Steven Zhang Date: Fri, 13 May 2022 12:51:20 -0700 Subject: [PATCH 3/4] fix test --- .../io/confluent/ksql/query/QueryBuilder.java | 4 +- .../util/SharedKafkaStreamsRuntimeImpl.java | 5 +-- .../util/PersistentQueryMetadataTest.java | 5 +++ ...dboxedPersistentQueryMetadataImplTest.java | 5 +++ .../SharedKafkaStreamsRuntimeImplTest.java | 37 ++++++------------- .../ksql/util/TransientQueryMetadataTest.java | 4 ++ .../entity/QueryDescriptionFactoryTest.java | 4 ++ .../streaming/StreamedQueryResourceTest.java | 4 ++ 8 files changed, 36 insertions(+), 32 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java index fa8e81438bd0..8d60e8f70aa9 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java @@ -627,9 +627,7 @@ private SharedKafkaStreamsRuntime getKafkaStreamsInstance( metricCollectors, config.getConfig(true), processingLogContext - ), - metricCollectors.getMetrics(), - ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS) + ) ); } else { stream = new SandboxedSharedKafkaStreamsRuntimeImpl( diff --git 
a/ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.java index b7c40a765426..e18bce448d82 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.java @@ -30,7 +30,6 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import org.apache.kafka.common.KafkaFuture; -import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.StateListener; import org.apache.kafka.streams.StreamsConfig; @@ -55,9 +54,7 @@ public SharedKafkaStreamsRuntimeImpl(final KafkaStreamsBuilder kafkaStreamsBuild final QueryErrorClassifier errorClassifier, final int maxQueryErrorsQueueSize, final long shutdownTimeoutConfig, - final Map streamsProperties, - final Metrics metrics, - final Map metricsTags) { + final Map streamsProperties) { super( kafkaStreamsBuilder, streamsProperties diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java index 3fc0e610ec4e..3f533f783f5b 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java @@ -18,6 +18,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -47,6 +48,7 @@ import java.util.Optional; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.Topology; @@ -101,6 +103,8 @@ public class PersistentQueryMetadataTest { private Metrics metrics; @Mock private MeteredProcessingLoggerFactory processingLoggerFactory; + @Mock + private Sensor sensor; private PersistentQueryMetadata query; @@ -111,6 +115,7 @@ public void setUp() { when(materializationProviderBuilder.apply(kafkaStreams, topology)) .thenReturn(Optional.of(materializationProvider)); when(kafkaStreams.state()).thenReturn(State.NOT_RUNNING); + when(metrics.sensor(anyString())).thenReturn(sensor); query = new PersistentQueryMetadataImpl( KsqlConstants.PersistentQueryType.CREATE_AS, diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataImplTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataImplTest.java index d118971f4486..bdba5e082418 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataImplTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataImplTest.java @@ -16,6 +16,7 @@ package io.confluent.ksql.util; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -40,6 +41,7 @@ import java.util.function.Consumer; import 
org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.Topology; import org.junit.Before; @@ -93,6 +95,8 @@ public class SandboxedPersistentQueryMetadataImplTest { private MeteredProcessingLoggerFactory processingLoggerFactory; @Mock private Metrics metrics; + @Mock + private Sensor sensor; private SandboxedPersistentQueryMetadataImpl sandbox; @@ -102,6 +106,7 @@ public void setUp() { when(physicalSchema.logicalSchema()).thenReturn(mock(LogicalSchema.class)); when(materializationProviderBuilder.apply(kafkaStreams, topology)) .thenReturn(Optional.of(materializationProvider)); + when(metrics.sensor(anyString())).thenReturn(sensor); final PersistentQueryMetadataImpl query = new PersistentQueryMetadataImpl( KsqlConstants.PersistentQueryType.CREATE_AS, diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImplTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImplTest.java index fcef88305103..27e83cd45157 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImplTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImplTest.java @@ -47,6 +47,7 @@ import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -72,7 +73,9 @@ public class SharedKafkaStreamsRuntimeImplTest { @Mock private KafkaFuture future; @Mock - private Sensor sensor; + private Sensor restartErrorSensorQuery1; + @Mock + private Sensor restartErrorSensorQuery2; private final QueryId queryId = new QueryId("query-1"); private final QueryId queryId2= new QueryId("query-2"); @@ -89,11 +92,9 @@ public class SharedKafkaStreamsRuntimeImplTest { new StreamsException("query down!", new TaskId(0, 0, "not-a-real-query")); private SharedKafkaStreamsRuntimeImpl sharedKafkaStreamsRuntimeImpl; - private MetricCollectors metricCollectors; @Before public void setUp() throws Exception { - metricCollectors = new MetricCollectors(); when(kafkaStreamsBuilder.buildNamedTopologyWrapper(any())).thenReturn(kafkaStreamsNamedTopologyWrapper).thenReturn(kafkaStreamsNamedTopologyWrapper2); streamProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "runtime"); streamProps.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "old"); @@ -102,9 +103,7 @@ public void setUp() throws Exception { queryErrorClassifier, 5, 300_000L, - streamProps, - metricCollectors.getMetrics(), - metricsTags + streamProps ); when(kafkaStreamsNamedTopologyWrapper.getTopologyByName(any())).thenReturn(Optional.empty()); @@ -114,6 +113,8 @@ public void setUp() throws Exception { when(binPackedPersistentQueryMetadata.getTopologyCopy(any())).thenReturn(namedTopology); when(binPackedPersistentQueryMetadata.getQueryId()).thenReturn(queryId); when(binPackedPersistentQueryMetadata2.getQueryId()).thenReturn(queryId2); + when(binPackedPersistentQueryMetadata.getRestartMetricsSensor()).thenReturn(Optional.of(restartErrorSensorQuery1)); + when(binPackedPersistentQueryMetadata2.getRestartMetricsSensor()).thenReturn(Optional.of(restartErrorSensorQuery2)); sharedKafkaStreamsRuntimeImpl.register( binPackedPersistentQueryMetadata @@ -277,8 +278,8 @@ public void shouldRecordMetricForQuery1WhenError() { sharedKafkaStreamsRuntimeImpl.uncaughtHandler(query1Exception); 
//Then: - assertThat(getMetricValue(queryId.toString(), metricsTags), is(1.0)); - assertThat(getMetricValue(queryId2.toString(), metricsTags), is(0.0)); + verify(restartErrorSensorQuery1, times(1)).record(); + verify(restartErrorSensorQuery2, never()).record(); } @Test @@ -293,25 +294,11 @@ public void shouldRecordMetricForAllQueriesWhenErrorWithNoTask() { sharedKafkaStreamsRuntimeImpl.start(queryId); sharedKafkaStreamsRuntimeImpl.start(queryId2); + sharedKafkaStreamsRuntimeImpl.uncaughtHandler(runtimeExceptionWithNoTask); sharedKafkaStreamsRuntimeImpl.uncaughtHandler(runtimeExceptionWithNoTask); //Then: - assertThat(getMetricValue(queryId.toString(), metricsTags), is(1.0)); - assertThat(getMetricValue(queryId2.toString(), metricsTags), is(1.0)); - } - - private double getMetricValue(final String queryId, final Map metricsTags) { - final Map customMetricsTags = new HashMap<>(metricsTags); - customMetricsTags.put("query-id", queryId); - final Metrics metrics = metricCollectors.getMetrics(); - return Double.parseDouble( - metrics.metric( - metrics.metricName( - QueryMetadataImpl.QUERY_RESTART_METRIC_NAME, - QueryMetadataImpl.QUERY_RESTART_METRIC_GROUP_NAME, - QueryMetadataImpl.QUERY_RESTART_METRIC_DESCRIPTION, - customMetricsTags) - ).metricValue().toString() - ); + verify(restartErrorSensorQuery1, times(2)).record(); + verify(restartErrorSensorQuery2, times(2)).record(); } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java index 0a961ec0a1cc..b50a6ec6b2f4 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java @@ -37,6 +37,7 @@ import java.util.function.Consumer; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.Topology; @@ -81,6 +82,8 @@ public class TransientQueryMetadataTest { private MeteredProcessingLoggerFactory loggerFactory; @Mock private Metrics metrics; + @Mock + private Sensor sensor; private TransientQueryMetadata query; @@ -89,6 +92,7 @@ public void setUp() { when(kafkaStreamsBuilder.build(any(), any())).thenReturn(kafkaStreams); when(kafkaStreams.state()).thenReturn(State.NOT_RUNNING); when(sourceNames.toArray()).thenReturn(new SourceName[0]); + when(metrics.sensor(any())).thenReturn(sensor); query = new TransientQueryMetadata( SQL, diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java index 7d5af99bb79e..436db38df3cf 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java @@ -59,6 +59,7 @@ import java.util.stream.Collectors; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyDescription; @@ -117,6 +118,8 @@ public class QueryDescriptionFactoryTest { private MeteredProcessingLoggerFactory processingLoggerFactory; @Mock private Metrics metrics; + @Mock + private Sensor sensor; private 
QueryMetadata transientQuery; private PersistentQueryMetadata persistentQuery; @@ -125,6 +128,7 @@ public class QueryDescriptionFactoryTest { @Before public void setUp() { + when(metrics.sensor(any())).thenReturn(sensor); when(topology.describe()).thenReturn(topologyDescription); when(kafkaStreamsBuilder.build(any(), any())).thenReturn(queryStreams); when(queryStreams.metadataForLocalThreads()).thenReturn(Collections.emptySet()); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index f2f0f99df7f3..8fcc58b7c310 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -121,6 +121,7 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.StreamsConfig; @@ -210,6 +211,8 @@ public class StreamedQueryResourceTest { private MeteredProcessingLoggerFactory loggerFactory; @Mock private Metrics metrics; + @Mock + private Sensor sensor; private StreamedQueryResource testResource; private PreparedStatement invalid; @@ -220,6 +223,7 @@ public class StreamedQueryResourceTest { @Before public void setup() { when(serviceContext.getTopicClient()).thenReturn(mockKafkaTopicClient); + when(metrics.sensor(any())).thenReturn(sensor); query = PreparedStatement.of(PUSH_QUERY_STRING, mock(Query.class)); invalid = PreparedStatement.of("sql", mock(Statement.class)); when(mockStatementParser.parseSingleStatement(PUSH_QUERY_STRING)).thenReturn(invalid); From 8a0a310dff847cf6fa03327f416727c9eadeb7ea Mon Sep 17 00:00:00 2001 From: Steven Zhang Date: Tue, 17 May 2022 09:52:32 -0700 Subject: [PATCH 4/4] move metric to query state reporting listener --- .../ksql/internal/KsqlEngineMetrics.java | 2 +- .../QueryStateMetricsReportingListener.java | 53 +++++++++++--- .../io/confluent/ksql/query/QueryBuilder.java | 12 +--- .../BinPackedPersistentQueryMetadataImpl.java | 22 +----- .../util/PersistentQueryMetadataImpl.java | 9 +-- .../io/confluent/ksql/util/QueryMetadata.java | 4 -- .../ksql/util/QueryMetadataImpl.java | 39 ++--------- .../confluent/ksql/util/QueryMetricsUtil.java | 48 ------------- .../util/SharedKafkaStreamsRuntimeImpl.java | 11 +-- .../ksql/util/TransientQueryMetadata.java | 9 +-- ...ueryStateMetricsReportingListenerTest.java | 38 ++++++++-- ...PackedPersistentQueryMetadataImplTest.java | 19 +---- .../util/PersistentQueryMetadataTest.java | 21 +----- .../ksql/util/QueryMetadataTest.java | 70 ++----------------- ...dboxedPersistentQueryMetadataImplTest.java | 13 +--- .../SharedKafkaStreamsRuntimeImplTest.java | 50 ------------- .../ksql/util/TransientQueryMetadataTest.java | 14 +--- .../entity/QueryDescriptionFactoryTest.java | 24 ++----- .../QueryRestartMetricFunctionalTest.java | 52 +++++++++----- .../streaming/StreamedQueryResourceTest.java | 11 +-- 20 files changed, 140 insertions(+), 381 deletions(-) delete mode 100644 ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetricsUtil.java diff --git 
a/ksqldb-engine/src/main/java/io/confluent/ksql/internal/KsqlEngineMetrics.java b/ksqldb-engine/src/main/java/io/confluent/ksql/internal/KsqlEngineMetrics.java index e2e94318a947..074e2a0865b4 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/internal/KsqlEngineMetrics.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/internal/KsqlEngineMetrics.java @@ -158,7 +158,7 @@ public QueryEventListener getQueryEventListener() { final String metricsPrefix = metricGroupPrefix.equals(KsqlEngineMetrics.DEFAULT_METRIC_GROUP_PREFIX) ? "" : metricGroupPrefix; - return new QueryStateMetricsReportingListener(metrics, metricsPrefix); + return new QueryStateMetricsReportingListener(metrics, metricsPrefix, newCustomMetricsTags); } private void recordMessageConsumptionByQueryStats( diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/internal/QueryStateMetricsReportingListener.java b/ksqldb-engine/src/main/java/io/confluent/ksql/internal/QueryStateMetricsReportingListener.java index 3c95a5af98c6..80bd2a50c1e2 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/internal/QueryStateMetricsReportingListener.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/internal/QueryStateMetricsReportingListener.java @@ -22,16 +22,22 @@ import io.confluent.ksql.query.QueryId; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.util.QueryMetadata; -import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Gauge; +import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.stats.CumulativeSum; import org.apache.kafka.streams.KafkaStreams.State; public class QueryStateMetricsReportingListener implements QueryEventListener { + public static final String QUERY_RESTART_METRIC_NAME = "query-restart-total"; + public static final String QUERY_RESTART_METRIC_DESCRIPTION = + "The total number of times that a query thread has failed and then been restarted."; public static final Ticker CURRENT_TIME_MILLIS_TICKER = new Ticker() { @Override @@ -42,12 +48,18 @@ public long read() { private final Metrics metrics; private final String metricsPrefix; + private final Map metricsTags; private final ConcurrentMap perQuery = new ConcurrentHashMap<>(); - QueryStateMetricsReportingListener(final Metrics metrics, final String metricsPrefix) { + QueryStateMetricsReportingListener( + final Metrics metrics, + final String metricsPrefix, + final Map metricsTags + ) { this.metrics = Objects.requireNonNull(metrics, "metrics"); this.metricsPrefix = Objects.requireNonNull(metricsPrefix, "metricGroupPrefix"); + this.metricsTags = Objects.requireNonNull(metricsTags); } @Override @@ -60,7 +72,12 @@ public void onCreate( } perQuery.put( queryMetadata.getQueryId(), - new PerQueryListener(metrics, metricsPrefix, queryMetadata.getQueryId().toString()) + new PerQueryListener( + metrics, + metricsPrefix, + queryMetadata.getQueryId().toString(), + metricsTags + ) ); } @@ -96,6 +113,8 @@ private static class PerQueryListener { private final Metrics metrics; private final MetricName stateMetricName; private final MetricName errorMetricName; + private final MetricName queryRestartMetricName; + private final CumulativeSum queryRestartSum; private final Ticker ticker; private volatile String state = "-"; @@ -104,16 
+123,18 @@ private static class PerQueryListener { PerQueryListener( final Metrics metrics, final String groupPrefix, - final String queryId + final String queryId, + final Map metricsTags ) { - this(metrics, groupPrefix, queryId, CURRENT_TIME_MILLIS_TICKER); + this(metrics, groupPrefix, queryId, CURRENT_TIME_MILLIS_TICKER, metricsTags); } PerQueryListener( final Metrics metrics, final String groupPrefix, final String queryId, - final Ticker ticker + final Ticker ticker, + final Map metricsTags ) { Objects.requireNonNull(groupPrefix, "groupPrefix"); Objects.requireNonNull(queryId, "queryId"); @@ -124,20 +145,34 @@ private static class PerQueryListener { final String tag = "_confluent-ksql-" + groupPrefix + type + queryId; + final Map tagsForStateAndError = new HashMap<>(metricsTags); + tagsForStateAndError.put("status", tag); this.stateMetricName = metrics.metricName( "query-status", groupPrefix + "ksql-queries", "The current status of the given query.", - Collections.singletonMap("status", tag)); + tagsForStateAndError + ); errorMetricName = metrics.metricName( "error-status", groupPrefix + "ksql-queries", "The current error status of the given query, if the state is in ERROR state", - Collections.singletonMap("status", tag) + tagsForStateAndError + ); + + final Map restartTags = new HashMap<>(tagsForStateAndError); + restartTags.put("query-id", queryId); + queryRestartMetricName = metrics.metricName( + QUERY_RESTART_METRIC_NAME, + groupPrefix + "ksql-queries", + QUERY_RESTART_METRIC_DESCRIPTION, + restartTags ); + this.queryRestartSum = new CumulativeSum(); this.metrics.addMetric(stateMetricName, (Gauge) (config, now) -> state); this.metrics.addMetric(errorMetricName, (Gauge) (config, now) -> error); + this.metrics.addMetric(queryRestartMetricName, queryRestartSum); } public void onChange(final State newState, final State oldState) { @@ -150,11 +185,13 @@ public void onChange(final State newState, final State oldState) { public void onError(final QueryError observedError) { error = observedError.getType().name(); + queryRestartSum.record(new MetricConfig(), 1, System.currentTimeMillis()); } public void onDeregister() { metrics.removeMetric(stateMetricName); metrics.removeMetric(errorMetricName); + metrics.removeMetric(queryRestartMetricName); } } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java index 8d60e8f70aa9..819adc1bbf35 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java @@ -227,9 +227,7 @@ TransientQueryMetadata buildTransientQuery( ksqlConfig.getLong(KsqlConfig.KSQL_QUERY_RETRY_BACKOFF_INITIAL_MS), ksqlConfig.getLong(KsqlConfig.KSQL_QUERY_RETRY_BACKOFF_MAX_MS), listener, - processingLogContext.getLoggerFactory(), - metricCollectors.getMetrics(), - ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS) + processingLogContext.getLoggerFactory() ); } @@ -377,9 +375,7 @@ PersistentQueryMetadata buildPersistentQueryInDedicatedRuntime( ksqlConfig.getLong(KsqlConfig.KSQL_QUERY_RETRY_BACKOFF_MAX_MS), listener, scalablePushRegistry, - processingLogContext.getLoggerFactory(), - metricCollectors.getMetrics(), - ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS) + processingLogContext.getLoggerFactory() ); } @@ -493,9 +489,7 @@ PersistentQueryMetadata buildPersistentQueryInSharedRuntime( physicalPlan ), keyFormat, - processingLogContext.getLoggerFactory(), - 
metricCollectors.getMetrics(), - ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS) + processingLogContext.getLoggerFactory() ); if (real) { return binPackedPersistentQueryMetadata; diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImpl.java index 2c29404e72e7..5ce8a3349069 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImpl.java @@ -48,8 +48,6 @@ import java.util.Optional; import java.util.Set; import java.util.function.Function; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.LagInfo; @@ -83,9 +81,6 @@ public class BinPackedPersistentQueryMetadataImpl implements PersistentQueryMeta private final Listener listener; private final Function namedTopologyBuilder; private final TimeBoundedQueue queryErrors; - private final Optional metrics; - private final Optional restartSensor; - private final MaterializationProviderBuilderFactory materializationProviderBuilderFactory; private final Optional scalablePushRegistry; @@ -117,10 +112,7 @@ public BinPackedPersistentQueryMetadataImpl( final Optional scalablePushRegistry, final Function namedTopologyBuilder, final KeyFormat keyFormat, - final ProcessingLoggerFactory loggerFactory, - final Metrics metrics, - final Map metricsTags - ) { + final ProcessingLoggerFactory loggerFactory) { // CHECKSTYLE_RULES.ON: ParameterNumberCheck this.persistentQueryType = Objects.requireNonNull(persistentQueryType, "persistentQueryType"); this.statementString = Objects.requireNonNull(statementString, "statementString"); @@ -146,9 +138,6 @@ public BinPackedPersistentQueryMetadataImpl( this.namedTopologyBuilder = requireNonNull(namedTopologyBuilder, "namedTopologyBuilder"); this.queryErrors = sharedKafkaStreamsRuntime.getNewQueryErrorQueue(); this.scalablePushRegistry = requireNonNull(scalablePushRegistry, "scalablePushRegistry"); - this.metrics = Optional.of(metrics); - this.restartSensor = Optional.of( - QueryMetricsUtil.createQueryRestartMetricSensor(queryId.toString(), metricsTags, metrics)); this.keyFormat = requireNonNull(keyFormat, "keyFormat"); this.loggerFactory = requireNonNull(loggerFactory, "loggerFactory"); } @@ -179,8 +168,6 @@ public BinPackedPersistentQueryMetadataImpl( this.queryErrors = sharedKafkaStreamsRuntime.getNewQueryErrorQueue(); this.scalablePushRegistry = original.scalablePushRegistry; this.namedTopologyBuilder = original.namedTopologyBuilder; - this.metrics = Optional.empty(); - this.restartSensor = Optional.empty(); this.keyFormat = original.keyFormat; this.loggerFactory = original.loggerFactory; } @@ -419,9 +406,6 @@ public void close() { sharedKafkaStreamsRuntime.stop(queryId, false); scalablePushRegistry.ifPresent(ScalablePushRegistry::close); listener.onClose(this); - if (metrics.isPresent() && restartSensor.isPresent()) { - metrics.get().removeSensor(restartSensor.get().name()); - } } @Override @@ -443,8 +427,4 @@ Listener getListener() { return listener; } - @Override - public Optional getRestartMetricsSensor() { - return restartSensor; - } } \ No newline at end of file diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadataImpl.java 
b/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadataImpl.java index 694154ba13d0..262f8485c304 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadataImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadataImpl.java @@ -41,7 +41,6 @@ import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; @@ -89,9 +88,7 @@ public PersistentQueryMetadataImpl( final long retryBackoffMaxMs, final QueryMetadata.Listener listener, final Optional scalablePushRegistry, - final ProcessingLoggerFactory loggerFactory, - final Metrics metrics, - final Map metricsTags + final ProcessingLoggerFactory loggerFactory ) { // CHECKSTYLE_RULES.ON: ParameterNumberCheck super( @@ -111,9 +108,7 @@ public PersistentQueryMetadataImpl( retryBackoffInitialMs, retryBackoffMaxMs, new QueryListenerWrapper(listener, scalablePushRegistry), - loggerFactory, - metrics, - metricsTags + loggerFactory ); this.sinkDataSource = requireNonNull(sinkDataSource, "sinkDataSource"); this.schemas = requireNonNull(schemas, "schemas"); diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java index 5c654b24d5e7..4f6e66ff29a1 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java @@ -23,9 +23,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; -import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.LagInfo; import org.apache.kafka.streams.StreamsMetadata; @@ -75,8 +73,6 @@ public interface QueryMetadata { KafkaStreams getKafkaStreams(); - Optional getRestartMetricsSensor(); - void close(); void start(); diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadataImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadataImpl.java index b37de4798332..26e9b64c2ba8 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadataImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadataImpl.java @@ -40,14 +40,11 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Collectors; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.LagInfo; @@ -59,11 +56,6 @@ import org.slf4j.LoggerFactory; public class QueryMetadataImpl implements QueryMetadata { - public static final String QUERY_RESTART_METRIC_NAME = "query-restart-total"; - public static final String QUERY_RESTART_METRIC_GROUP_NAME = "query-restart-metrics"; - public static final String QUERY_RESTART_METRIC_DESCRIPTION = - "The total number of times that a query thread has failed and then been restarted."; - private static final Logger LOG = LoggerFactory.getLogger(QueryMetadataImpl.class); private final 
String statementString; @@ -82,8 +74,6 @@ public class QueryMetadataImpl implements QueryMetadata { private final RetryEvent retryEvent; private final Listener listener; private final ProcessingLoggerFactory loggerFactory; - private final Optional metrics; - private final Optional restartSensor; private volatile boolean everStarted = false; private volatile KafkaStreams kafkaStreams; @@ -118,9 +108,7 @@ public long read() { final long baseWaitingTimeMs, final long retryBackoffMaxMs, final Listener listener, - final ProcessingLoggerFactory loggerFactory, - final Metrics metrics, - final Map metricsTags + final ProcessingLoggerFactory loggerFactory ) { // CHECKSTYLE_RULES.ON: ParameterNumberCheck this.statementString = Objects.requireNonNull(statementString, "statementString"); @@ -143,15 +131,11 @@ public long read() { this.queryId = Objects.requireNonNull(queryId, "queryId"); this.errorClassifier = Objects.requireNonNull(errorClassifier, "errorClassifier"); this.queryErrors = new TimeBoundedQueue(Duration.ofHours(1), maxQueryErrorsQueueSize); - this.metrics = Optional.of(metrics); - this.restartSensor = Optional.of( - QueryMetricsUtil.createQueryRestartMetricSensor(queryId.toString(), metricsTags, metrics)); this.retryEvent = new RetryEvent( queryId, baseWaitingTimeMs, retryBackoffMaxMs, - CURRENT_TIME_MILLIS_TICKER, - restartSensor + CURRENT_TIME_MILLIS_TICKER ); this.loggerFactory = Objects.requireNonNull(loggerFactory, "loggerFactory"); } @@ -174,8 +158,6 @@ public long read() { this.errorClassifier = other.errorClassifier; this.everStarted = other.everStarted; this.queryErrors = new TimeBoundedQueue(Duration.ZERO, 0); - this.metrics = Optional.empty(); - this.restartSensor = Optional.empty(); this.retryEvent = new RetryEvent( other.getQueryId(), 0, @@ -185,8 +167,7 @@ public long read() { public long read() { return 0; } - }, - this.restartSensor + } ); this.listener = Objects.requireNonNull(listener, "stopListeners"); @@ -277,11 +258,6 @@ public Topology getTopology() { return topology; } - @Override - public Optional getRestartMetricsSensor() { - return restartSensor; - } - public Map> getAllLocalStorePartitionLags() { try { return kafkaStreams.allLocalStorePartitionLags(); @@ -386,9 +362,6 @@ public void close() { loggerFactory.getLoggersWithPrefix(queryId.toString()).forEach(ProcessingLogger::close); doClose(true); listener.onClose(this); - if (metrics.isPresent() && restartSensor.isPresent()) { - metrics.get().removeSensor(restartSensor.get().name()); - } } void doClose(final boolean cleanUp) { @@ -456,14 +429,12 @@ public static class RetryEvent implements QueryMetadata.RetryEvent { private long waitingTimeMs; private long expiryTimeMs; private long retryBackoffMaxMs; - private Optional sensor; RetryEvent( final QueryId queryId, final long baseWaitingTimeMs, final long retryBackoffMaxMs, - final Ticker ticker, - final Optional sensor + final Ticker ticker ) { this.ticker = ticker; this.queryId = queryId; @@ -473,7 +444,6 @@ public static class RetryEvent implements QueryMetadata.RetryEvent { this.waitingTimeMs = baseWaitingTimeMs; this.retryBackoffMaxMs = retryBackoffMaxMs; this.expiryTimeMs = now + baseWaitingTimeMs; - this.sensor = sensor; } public long nextRestartTimeMs() { @@ -495,7 +465,6 @@ public void backOff(final String threadName) { Thread.currentThread().interrupt(); } - sensor.ifPresent(Sensor::record); final int retries = numRetries.merge(threadName, 1, Integer::sum); LOG.info( diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetricsUtil.java 
b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetricsUtil.java deleted file mode 100644 index 3130f004ef0b..000000000000 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetricsUtil.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2022 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.util; - -import java.util.Map; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.CumulativeSum; - -public final class QueryMetricsUtil { - - private QueryMetricsUtil() {} - // returns a metrics sensor that tracks the number of times a query was restarted when hitting - // an uncaught exception - - public static Sensor createQueryRestartMetricSensor( - final String queryId, - final Map metricsTags, - final Metrics metrics - ) { - final Map customMetricsTagsForQuery = - MetricsTagsUtil.getMetricsTagsWithQueryId(queryId, metricsTags); - final MetricName restartMetricName = metrics.metricName( - QueryMetadataImpl.QUERY_RESTART_METRIC_NAME, - QueryMetadataImpl.QUERY_RESTART_METRIC_GROUP_NAME, - QueryMetadataImpl.QUERY_RESTART_METRIC_DESCRIPTION, - customMetricsTagsForQuery - ); - final Sensor sensor = metrics.sensor( - QueryMetadataImpl.QUERY_RESTART_METRIC_GROUP_NAME + "-" + queryId); - sensor.add(restartMetricName, new CumulativeSum()); - return sensor; - } -} diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.java index e18bce448d82..7097a6101114 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.java @@ -54,7 +54,7 @@ public SharedKafkaStreamsRuntimeImpl(final KafkaStreamsBuilder kafkaStreamsBuild final QueryErrorClassifier errorClassifier, final int maxQueryErrorsQueueSize, final long shutdownTimeoutConfig, - final Map streamsProperties) { + final Map streamsProperties) { super( kafkaStreamsBuilder, streamsProperties @@ -125,9 +125,6 @@ public StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse uncaughtHan if (queryInError != null) { queryInError.setQueryError(queryError); - if (queryInError.getRestartMetricsSensor().isPresent()) { - queryInError.getRestartMetricsSensor().get().record(); - } log.error(String.format( "Unhandled query exception caught in streams thread %s for query %s. 
(%s)", Thread.currentThread().getName(), @@ -138,11 +135,7 @@ public StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse uncaughtHan } else { for (BinPackedPersistentQueryMetadataImpl query : collocatedQueries.values()) { query.setQueryError(queryError); - if (query.getRestartMetricsSensor().isPresent()) { - query.getRestartMetricsSensor().get().record(); - } } - log.error(String.format( "Unhandled runtime exception caught in streams thread %s. (%s)", Thread.currentThread().getName(), @@ -151,7 +144,6 @@ public StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse uncaughtHan ); } } - return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD; } @@ -251,7 +243,6 @@ public void start(final QueryId queryId) { throw new IllegalArgumentException("Cannot start because query " + queryId + " was not " + "registered to runtime " + getApplicationId()); } - log.info("Query {} was started successfully", queryId); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java index b6bcb68e982b..dcc3eb6bd0d1 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java @@ -30,7 +30,6 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.streams.Topology; /** @@ -61,9 +60,7 @@ public TransientQueryMetadata( final long retryBackoffInitialMs, final long retryBackoffMaxMs, final Listener listener, - final ProcessingLoggerFactory loggerFactory, - final Metrics metrics, - final Map metricsTags + final ProcessingLoggerFactory loggerFactory ) { // CHECKSTYLE_RULES.ON: ParameterNumberCheck super( @@ -83,9 +80,7 @@ public TransientQueryMetadata( retryBackoffInitialMs, retryBackoffMaxMs, listener, - loggerFactory, - metrics, - metricsTags + loggerFactory ); this.rowQueue = Objects.requireNonNull(rowQueue, "rowQueue"); this.resultType = Objects.requireNonNull(resultType, "resultType"); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/internal/QueryStateMetricsReportingListenerTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/internal/QueryStateMetricsReportingListenerTest.java index ae68efdb5043..e7e5c65c2eca 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/internal/QueryStateMetricsReportingListenerTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/internal/QueryStateMetricsReportingListenerTest.java @@ -37,6 +37,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Gauge; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.stats.CumulativeSum; import org.apache.kafka.streams.KafkaStreams.State; import org.junit.Before; import org.junit.Test; @@ -46,12 +47,18 @@ import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + @RunWith(MockitoJUnitRunner.class) public class QueryStateMetricsReportingListenerTest { private static final MetricName METRIC_NAME_1 = new MetricName("bob", "g1", "d1", ImmutableMap.of()); private static final MetricName METRIC_NAME_2 = new MetricName("dylan", "g1", "d1", ImmutableMap.of()); + private static final MetricName METRIC_NAME_3 = + new MetricName("steven", "g1", "d1", ImmutableMap.of()); private static 
 final QueryId QUERY_ID = new QueryId("foo");
 private static final String TAG = "_confluent-ksql-" + "some-prefix-" + "query_" + QUERY_ID.toString();
@@ -67,14 +74,17 @@ public class QueryStateMetricsReportingListenerTest {
   private ArgumentCaptor> gaugeCaptor;
   private QueryStateMetricsReportingListener listener;
+  private final Map metricsTags = Collections.singletonMap("tag1", "value1");
+
   @Before
   public void setUp() {
     when(metrics.metricName(any(), any(), any(), anyMap()))
         .thenReturn(METRIC_NAME_1)
-        .thenReturn(METRIC_NAME_2);
+        .thenReturn(METRIC_NAME_2)
+        .thenReturn(METRIC_NAME_3);
     when(query.getQueryId()).thenReturn(QUERY_ID);
-    listener = new QueryStateMetricsReportingListener(metrics, "some-prefix-");
+    listener = new QueryStateMetricsReportingListener(metrics, "some-prefix-", metricsTags);
   }
   @Test
@@ -86,17 +96,24 @@ public void shouldThrowOnNullParams() {
   public void shouldAddMetricOnCreation() {
     // When:
     listener.onCreate(serviceContext, metaStore, query);
+    final Map tags = new HashMap<>(metricsTags);
+    tags.put("status", TAG);
     // Then:
     verify(metrics).metricName("query-status", "some-prefix-ksql-queries",
         "The current status of the given query.",
-        ImmutableMap.of("status", TAG));
+        tags);
     verify(metrics).metricName("error-status", "some-prefix-ksql-queries",
         "The current error status of the given query, if the state is in ERROR state",
-        ImmutableMap.of("status", TAG));
+        tags);
+    tags.put("query-id", QUERY_ID.toString());
+    verify(metrics).metricName(QueryStateMetricsReportingListener.QUERY_RESTART_METRIC_NAME, "some-prefix-ksql-queries",
+        QueryStateMetricsReportingListener.QUERY_RESTART_METRIC_DESCRIPTION,
+        tags);
     verify(metrics).addMetric(eq(METRIC_NAME_1), isA(Gauge.class));
     verify(metrics).addMetric(eq(METRIC_NAME_2), isA(Gauge.class));
+    verify(metrics).addMetric(eq(METRIC_NAME_3), isA(CumulativeSum.class));
   }
   @Test
@@ -123,20 +140,26 @@ public void shouldGracefullyHandleErrorCallbackAfterDeregister() {
   public void shouldAddMetricWithSuppliedPrefix() {
     // Given:
     final String groupPrefix = "some-prefix-";
+    final Map tags = new HashMap<>(metricsTags);
+    tags.put("status", TAG);
     clearInvocations(metrics);
     // When:
-    listener = new QueryStateMetricsReportingListener(metrics, groupPrefix);
+    listener = new QueryStateMetricsReportingListener(metrics, groupPrefix, metricsTags);
     listener.onCreate(serviceContext, metaStore, query);
     // Then:
     verify(metrics).metricName("query-status", groupPrefix + "ksql-queries",
         "The current status of the given query.",
-        ImmutableMap.of("status", TAG));
+        tags);
     verify(metrics).metricName("error-status", groupPrefix + "ksql-queries",
         "The current error status of the given query, if the state is in ERROR state",
-        ImmutableMap.of("status", TAG));
+        tags);
+    tags.put("query-id", QUERY_ID.toString());
+    verify(metrics).metricName(QueryStateMetricsReportingListener.QUERY_RESTART_METRIC_NAME, "some-prefix-ksql-queries",
+        QueryStateMetricsReportingListener.QUERY_RESTART_METRIC_DESCRIPTION,
+        tags);
   }
   @Test
@@ -178,6 +201,7 @@ public void shouldRemoveMetricOnClose() {
     // Then:
     verify(metrics).removeMetric(METRIC_NAME_1);
     verify(metrics).removeMetric(METRIC_NAME_2);
+    verify(metrics).removeMetric(METRIC_NAME_3);
   }
   private String currentGaugeValue(final MetricName name) {
diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImplTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImplTest.java
index a51dfb136713..d04f576aefbb 100644
--- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImplTest.java
+++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImplTest.java
@@ -16,7 +16,6 @@
 package io.confluent.ksql.util;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
@@ -41,13 +40,9 @@
 import io.confluent.ksql.util.QueryMetadata.Listener;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
 import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
@@ -104,17 +99,11 @@ public class BinPackedPersistentQueryMetadataImplTest {
   private MaterializationProviderBuilderFactory.MaterializationProviderBuilder materializationProviderBuilder;
   @Mock
   private MeteredProcessingLoggerFactory loggerFactory;
-  @Mock
-  private Metrics metrics;
-  @Mock
-  private Sensor sensor;
-  private Map metricsTags = Collections.singletonMap("tag1", "value1");
   private PersistentQueryMetadata query;
   @Before
   public void setUp() {
-    when(metrics.sensor(anyString())).thenReturn(sensor);
     query = new BinPackedPersistentQueryMetadataImpl(
         KsqlConstants.PersistentQueryType.CREATE_AS,
         SQL,
@@ -136,9 +125,7 @@ public void setUp() {
         scalablePushRegistry,
         (runtime) -> topology,
         keyFormat,
-        loggerFactory,
-        metrics,
-        metricsTags);
+        loggerFactory);
     query.initialize();
     when(materializationProviderBuilderFactory.materializationProviderBuilder(
@@ -190,9 +177,8 @@ public void shouldCallKafkaStreamsCloseOnStop() {
   }
   @Test
-  public void shouldRemoveMetricWhenClose() {
+  public void shouldCloseProcessingLoggers() {
     // Given:
-    final Sensor restartSensor = query.getRestartMetricsSensor().orElseGet(() -> mock(Sensor.class));
     final ProcessingLogger processingLogger1 = mock(ProcessingLogger.class);
     final ProcessingLogger processingLogger2 = mock(ProcessingLogger.class);
     when(loggerFactory.getLoggersWithPrefix(QUERY_ID.toString())).thenReturn(Arrays.asList(processingLogger1, processingLogger2));
@@ -201,7 +187,6 @@ public void shouldRemoveMetricWhenClose() {
     query.close();
     // Then:
-    verify(metrics).removeSensor(restartSensor.name());
     verify(processingLogger1).close();
     verify(processingLogger2).close();
   }
diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java
index 3f533f783f5b..7076197d5abc 100644
--- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java
+++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java
@@ -18,7 +18,6 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -46,9 +45,6 @@
 import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
-
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KafkaStreams.State;
 import org.apache.kafka.streams.Topology;
@@ -100,11 +96,7 @@ public class PersistentQueryMetadataTest {
   @Mock
   private ScalablePushRegistry scalablePushRegistry;
   @Mock
-  private Metrics metrics;
-  @Mock
   private MeteredProcessingLoggerFactory processingLoggerFactory;
-  @Mock
-  private Sensor sensor;
   private PersistentQueryMetadata query;
@@ -115,7 +107,6 @@ public void setUp() {
     when(materializationProviderBuilder.apply(kafkaStreams, topology))
         .thenReturn(Optional.of(materializationProvider));
     when(kafkaStreams.state()).thenReturn(State.NOT_RUNNING);
-    when(metrics.sensor(anyString())).thenReturn(sensor);
     query = new PersistentQueryMetadataImpl(
         KsqlConstants.PersistentQueryType.CREATE_AS,
@@ -141,9 +132,7 @@ public void setUp() {
         0L,
         listener,
         Optional.of(scalablePushRegistry),
-        processingLoggerFactory,
-        metrics,
-        Collections.emptyMap()
+        processingLoggerFactory
     );
     query.initialize();
@@ -176,9 +165,7 @@ public void shouldReturnInsertQueryType() {
         0L,
         listener,
         Optional.empty(),
-        processingLoggerFactory,
-        metrics,
-        Collections.emptyMap()
+        processingLoggerFactory
     );
     // When/Then
@@ -212,9 +199,7 @@ public void shouldReturnCreateAsQueryType() {
         0L,
         listener,
         Optional.empty(),
-        processingLoggerFactory,
-        metrics,
-        Collections.emptyMap()
+        processingLoggerFactory
     );
     // When/Then
diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/QueryMetadataTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/QueryMetadataTest.java
index 8512da612893..ca1247b82fd6 100644
--- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/QueryMetadataTest.java
+++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/QueryMetadataTest.java
@@ -100,16 +100,11 @@ public class QueryMetadataTest {
   private Ticker ticker;
   @Mock
   private MeteredProcessingLoggerFactory loggerFactory;
-  @Mock
-  private Sensor mockSensor;
   private QueryMetadataImpl query;
-  private MetricCollectors metricCollectors;
-  private final Map metricsTags = Collections.singletonMap("cluster-id", "cluster-1");
   @Before
   public void setup() {
-    metricCollectors = new MetricCollectors();
     when(kafkaStreamsBuilder.build(topoplogy, Collections.emptyMap())).thenReturn(kafkaStreams);
     when(classifier.classify(any())).thenReturn(Type.UNKNOWN);
     when(kafkaStreams.state()).thenReturn(State.NOT_RUNNING);
@@ -131,9 +126,7 @@ public void setup() {
         0L,
         0L,
         listener,
-        loggerFactory,
-        metricCollectors.getMetrics(),
-        metricsTags
+        loggerFactory
     ){
     };
     query.initialize();
@@ -282,8 +275,7 @@ public void shouldRetryEventStartWithInitialValues() {
         QUERY_ID,
         RETRY_BACKOFF_INITIAL_MS,
         RETRY_BACKOFF_MAX_MS,
-        ticker,
-        Optional.of(mockSensor)
+        ticker
     );
     // Then:
@@ -302,8 +294,7 @@ public void shouldRetryEventRestartAndIncrementBackoffTime() {
         QUERY_ID,
         RETRY_BACKOFF_INITIAL_MS,
         RETRY_BACKOFF_MAX_MS,
-        ticker,
-        Optional.of(mockSensor)
+        ticker
     );
     retryEvent.backOff("thread-name");
@@ -328,8 +319,7 @@ public void shouldRetryEventRestartAndNotExceedBackoffMaxTime() {
         QUERY_ID,
         RETRY_BACKOFF_INITIAL_MS,
         RETRY_BACKOFF_MAX_MS,
-        ticker,
-        Optional.of(mockSensor)
+        ticker
     );
     retryEvent.backOff("thread-name");
     retryEvent.backOff("thread-name");
@@ -367,56 +357,4 @@ public void shouldCloseProcessingLoggers() {
     verify(processingLogger1).close();
     verify(processingLogger2).close();
   }
-
-  @Test
-  public void shouldRecordMetricForEachStreamsThreadRestarted() {
-    // Given:
-    final long now = 20;
-    when(ticker.read()).thenReturn(now);
-
-    // When:
-    final QueryMetadataImpl.RetryEvent retryEvent = new QueryMetadataImpl.RetryEvent(
-        QUERY_ID,
-        RETRY_BACKOFF_INITIAL_MS,
-        RETRY_BACKOFF_MAX_MS,
-        ticker,
-        query.getRestartMetricsSensor()
-    );
-    retryEvent.backOff("thread-name");
-    retryEvent.backOff("thread-name");
-    retryEvent.backOff("thread-name-2");
-
-    // Then:
-    assertThat(getMetricValue(QUERY_ID.toString(), metricsTags), is(3.0));
-  }
-
-  @Test
-  public void shouldIncrementMetricWhenUncaughtExceptionAndRestart() {
-    // Given:
-    when(classifier.classify(any())).thenThrow(new RuntimeException("bar"));
-
-    // When:
-    query.uncaughtHandler(new RuntimeException("foo"));
-    query.uncaughtHandler(new RuntimeException("bar"));
-    query.uncaughtHandler(new RuntimeException("too"));
-
-
-    // Then:
-    assertThat(getMetricValue(QUERY_ID.toString(), metricsTags), is(3.0));
-  }
-
-  private double getMetricValue(final String queryId, final Map metricsTags) {
-    final Map customMetricsTags = new HashMap<>(metricsTags);
-    customMetricsTags.put("query-id", queryId);
-    final Metrics metrics = metricCollectors.getMetrics();
-    return Double.parseDouble(
-        metrics.metric(
-            metrics.metricName(
-                QueryMetadataImpl.QUERY_RESTART_METRIC_NAME,
-                QueryMetadataImpl.QUERY_RESTART_METRIC_GROUP_NAME,
-                QueryMetadataImpl.QUERY_RESTART_METRIC_DESCRIPTION,
-                customMetricsTags)
-        ).metricValue().toString()
-    );
-  }
 }
diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataImplTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataImplTest.java
index bdba5e082418..aa69edc3fef0 100644
--- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataImplTest.java
+++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataImplTest.java
@@ -16,7 +16,6 @@
 package io.confluent.ksql.util;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -39,9 +38,6 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Consumer;
-
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.Topology;
 import org.junit.Before;
@@ -93,10 +89,6 @@ public class SandboxedPersistentQueryMetadataImplTest {
   private Listener sandboxListener;
   @Mock
   private MeteredProcessingLoggerFactory processingLoggerFactory;
-  @Mock
-  private Metrics metrics;
-  @Mock
-  private Sensor sensor;
   private SandboxedPersistentQueryMetadataImpl sandbox;
@@ -106,7 +98,6 @@ public void setUp() {
     when(physicalSchema.logicalSchema()).thenReturn(mock(LogicalSchema.class));
     when(materializationProviderBuilder.apply(kafkaStreams, topology))
         .thenReturn(Optional.of(materializationProvider));
-    when(metrics.sensor(anyString())).thenReturn(sensor);
     final PersistentQueryMetadataImpl query = new PersistentQueryMetadataImpl(
         KsqlConstants.PersistentQueryType.CREATE_AS,
@@ -132,9 +123,7 @@ public void setUp() {
         0L,
         listener,
         Optional.empty(),
-        processingLoggerFactory,
-        metrics,
-        Collections.emptyMap()
+        processingLoggerFactory
     );
     query.initialize();
diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImplTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImplTest.java
index 27e83cd45157..d0b26eeff695 100644
--- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImplTest.java
+++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImplTest.java
@@ -15,7 +15,6 @@
 package io.confluent.ksql.util;
-import io.confluent.ksql.metrics.MetricCollectors;
 import io.confluent.ksql.query.KafkaStreamsBuilder;
 import io.confluent.ksql.query.QueryError.Type;
 import io.confluent.ksql.query.QueryErrorClassifier;
@@ -24,8 +23,6 @@
 import java.util.Collections;
 import java.util.HashMap;
 import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
@@ -42,12 +39,10 @@
 import java.util.Optional;
 import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -72,15 +67,10 @@ public class SharedKafkaStreamsRuntimeImplTest {
   private AddNamedTopologyResult addNamedTopologyResult;
   @Mock
   private KafkaFuture future;
-  @Mock
-  private Sensor restartErrorSensorQuery1;
-  @Mock
-  private Sensor restartErrorSensorQuery2;
   private final QueryId queryId = new QueryId("query-1");
   private final QueryId queryId2= new QueryId("query-2");
   private Map streamProps = new HashMap();
-  private final Map metricsTags = Collections.singletonMap("cluster-id", "cluster-1");
   private final StreamsException query1Exception = new StreamsException("query down!", new TaskId(0, 0, queryId.toString()));
@@ -113,8 +103,6 @@ public void setUp() throws Exception {
     when(binPackedPersistentQueryMetadata.getTopologyCopy(any())).thenReturn(namedTopology);
     when(binPackedPersistentQueryMetadata.getQueryId()).thenReturn(queryId);
     when(binPackedPersistentQueryMetadata2.getQueryId()).thenReturn(queryId2);
-    when(binPackedPersistentQueryMetadata.getRestartMetricsSensor()).thenReturn(Optional.of(restartErrorSensorQuery1));
-    when(binPackedPersistentQueryMetadata2.getRestartMetricsSensor()).thenReturn(Optional.of(restartErrorSensorQuery2));
     sharedKafkaStreamsRuntimeImpl.register(
         binPackedPersistentQueryMetadata
     );
@@ -263,42 +251,4 @@ public void shouldNotStartOrAddedToStreamsIfOnlyRegistered() {
     verify(kafkaStreamsNamedTopologyWrapper, never())
         .addNamedTopology(binPackedPersistentQueryMetadata2.getTopologyCopy(sharedKafkaStreamsRuntimeImpl));
   }
-
-  @Test
-  public void shouldRecordMetricForQuery1WhenError() {
-    //Given:
-    when(queryErrorClassifier.classify(query1Exception)).thenReturn(Type.USER);
-    sharedKafkaStreamsRuntimeImpl.register(
-        binPackedPersistentQueryMetadata2
-    );
-
-    //When:
-    sharedKafkaStreamsRuntimeImpl.start(queryId);
-    sharedKafkaStreamsRuntimeImpl.start(queryId2);
-    sharedKafkaStreamsRuntimeImpl.uncaughtHandler(query1Exception);
-
-    //Then:
-    verify(restartErrorSensorQuery1, times(1)).record();
-    verify(restartErrorSensorQuery2, never()).record();
-  }
-
-  @Test
-  public void shouldRecordMetricForAllQueriesWhenErrorWithNoTask() {
-    when(queryErrorClassifier.classify(runtimeExceptionWithNoTask)).thenReturn(Type.USER);
-
-    sharedKafkaStreamsRuntimeImpl.register(
-        binPackedPersistentQueryMetadata2
-    );
-
-    //When:
-    sharedKafkaStreamsRuntimeImpl.start(queryId);
-    sharedKafkaStreamsRuntimeImpl.start(queryId2);
-
-    sharedKafkaStreamsRuntimeImpl.uncaughtHandler(runtimeExceptionWithNoTask);
-    sharedKafkaStreamsRuntimeImpl.uncaughtHandler(runtimeExceptionWithNoTask);
-
-    //Then:
-    verify(restartErrorSensorQuery1, times(2)).record();
-    verify(restartErrorSensorQuery2, times(2)).record();
-  }
 }
diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java
index b50a6ec6b2f4..ae77e0e747cc 100644
--- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java
+++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java
@@ -30,14 +30,9 @@
 import io.confluent.ksql.util.KsqlConstants.KsqlQueryType;
 import io.confluent.ksql.util.PushQueryMetadata.ResultType;
 import io.confluent.ksql.util.QueryMetadata.Listener;
-
-import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Consumer;
-
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KafkaStreams.State;
 import org.apache.kafka.streams.Topology;
@@ -80,10 +75,6 @@ public class TransientQueryMetadataTest {
   private Listener listener;
   @Mock
   private MeteredProcessingLoggerFactory loggerFactory;
-  @Mock
-  private Metrics metrics;
-  @Mock
-  private Sensor sensor;
   private TransientQueryMetadata query;
@@ -92,7 +83,6 @@ public void setUp() {
     when(kafkaStreamsBuilder.build(any(), any())).thenReturn(kafkaStreams);
     when(kafkaStreams.state()).thenReturn(State.NOT_RUNNING);
     when(sourceNames.toArray()).thenReturn(new SourceName[0]);
-    when(metrics.sensor(any())).thenReturn(sensor);
     query = new TransientQueryMetadata(
         SQL,
@@ -112,9 +102,7 @@ public void setUp() {
         0L,
         0L,
         listener,
-        loggerFactory,
-        metrics,
-        Collections.emptyMap()
+        loggerFactory
     );
     query.initialize();
   }
diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java
index 436db38df3cf..0c8f0aa0f217 100644
--- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java
+++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java
@@ -57,9 +57,6 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
-
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyDescription;
@@ -116,10 +113,6 @@ public class QueryDescriptionFactoryTest {
   private QueryMetadata.Listener listener;
   @Mock
   private MeteredProcessingLoggerFactory processingLoggerFactory;
-  @Mock
-  private Metrics metrics;
-  @Mock
-  private Sensor sensor;
   private QueryMetadata transientQuery;
   private PersistentQueryMetadata persistentQuery;
@@ -128,7 +121,6 @@ public class QueryDescriptionFactoryTest {
   @Before
   public void setUp() {
-    when(metrics.sensor(any())).thenReturn(sensor);
     when(topology.describe()).thenReturn(topologyDescription);
     when(kafkaStreamsBuilder.build(any(), any())).thenReturn(queryStreams);
     when(queryStreams.metadataForLocalThreads()).thenReturn(Collections.emptySet());
@@ -156,9 +148,7 @@ public void setUp() {
         0L,
         0L,
         listener,
-        processingLoggerFactory,
-        metrics,
-        Collections.emptyMap()
+        processingLoggerFactory
     );
     transientQuery.initialize();
@@ -188,9 +178,7 @@ public void setUp() {
         0L,
         listener,
         Optional.empty(),
-        processingLoggerFactory,
-        metrics,
-        Collections.emptyMap()
+        processingLoggerFactory
     );
     persistentQuery.initialize();
@@ -305,9 +293,7 @@ public void shouldHandleRowTimeInValueSchemaForTransientQuery() {
         0L,
         0L,
         listener,
-        processingLoggerFactory,
-        metrics,
-        Collections.emptyMap()
+        processingLoggerFactory
     );
     transientQuery.initialize();
@@ -348,9 +334,7 @@ public void shouldHandleRowKeyInValueSchemaForTransientQuery() {
         0L,
         0L,
         listener,
-        processingLoggerFactory,
-        metrics,
-        Collections.emptyMap()
+        processingLoggerFactory
     );
     transientQuery.initialize();
diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/QueryRestartMetricFunctionalTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/QueryRestartMetricFunctionalTest.java
index f2f1f58d7dfa..b073b9a39477 100644
--- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/QueryRestartMetricFunctionalTest.java
+++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/QueryRestartMetricFunctionalTest.java
@@ -20,23 +20,20 @@
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.nullValue;
 import com.google.common.collect.ImmutableMap;
 import io.confluent.ksql.engine.KsqlEngine;
 import io.confluent.ksql.integration.IntegrationTestHarness;
-import io.confluent.ksql.rest.entity.KsqlEntity;
-import io.confluent.ksql.rest.entity.Queries;
-import io.confluent.ksql.rest.entity.RunningQuery;
+import io.confluent.ksql.internal.QueryStateMetricsReportingListener;
 import io.confluent.ksql.rest.server.TestKsqlRestApp;
 import io.confluent.ksql.util.KsqlConfig;
-import java.util.Collections;
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 import io.confluent.ksql.util.QueryMetadataImpl;
-import io.confluent.ksql.util.QueryMetricsUtil;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.KafkaMetric;
@@ -45,10 +42,8 @@
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Ignore;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.RuleChain;
-import org.junit.rules.Timeout;
 @Ignore
 public class QueryRestartMetricFunctionalTest {
@@ -117,22 +112,30 @@ public void shouldVerifyMetricsOnNonSharedRuntimeServer() {
     final List listOfQueryId = RestIntegrationTestUtil.getQueryIds(REST_APP_NO_SHARED_RUNTIME);
     assertThat(listOfQueryId.size(), equalTo(1));
     metricsTagsForQuery.put("query-id", listOfQueryId.get(0));
+    metricsTagsForQuery.put("ksql_service_id", KsqlConfig.KSQL_SERVICE_ID_DEFAULT);
     // When:
     TEST_HARNESS.produceRecord(TEST_TOPIC_NAME, null, "5,900.1");
     TEST_HARNESS.produceRecord(TEST_TOPIC_NAME, null, "5,900.1");
     TEST_HARNESS.produceRecord(TEST_TOPIC_NAME, null, "5,900.1");
-    System.out.println("records done");
     metricsNoSharedRuntime = ((KsqlEngine)REST_APP_NO_SHARED_RUNTIME.getEngine()).getEngineMetrics().getMetrics();
-    final KafkaMetric restartMetric1 = getKafkaMetric(metricsNoSharedRuntime, metricsTagsForQuery);
+    final KafkaMetric restartMetric = getKafkaMetric(metricsNoSharedRuntime, metricsTagsForQuery, "app-0-");
     // Then:
-    assertThatEventually(() -> (Double) restartMetric1.metricValue(), greaterThan(8.0));
+    assertThatEventually(() -> (Double) restartMetric.metricValue(), greaterThan(8.0));
+
+    // should clean up metrics when queries are terminated
+    for (final String queryId:listOfQueryId) {
+      RestIntegrationTestUtil.makeKsqlRequest(REST_APP_NO_SHARED_RUNTIME, "terminate " + queryId + ";");
+    }
+
+    final KafkaMetric restartMetricAfterTerminate = getKafkaMetric(metricsNoSharedRuntime, metricsTagsForQuery, "app-0-");
+    assertThat(restartMetricAfterTerminate, nullValue());
   }
   @Test
-  public void shouldVerifyMetricsOnSharedRuntimeServer() throws InterruptedException {
+  public void shouldVerifyMetricsOnSharedRuntimeServer() {
     // Given:
     final Map metricsTagsForQuery1 = new HashMap<>(METRICS_TAGS);
     final Map metricsTagsForQuery2 = new HashMap<>(METRICS_TAGS);
@@ -141,8 +144,10 @@ public void shouldVerifyMetricsOnSharedRuntimeServer() throws InterruptedExcepti
     for (final String queryId:listOfQueryId) {
       if (queryId.toLowerCase().contains("s2")) {
         metricsTagsForQuery1.put("query-id", queryId);
+        metricsTagsForQuery1.put("ksql_service_id", "another-id");
       } else if (queryId.toLowerCase().contains("s3")) {
         metricsTagsForQuery2.put("query-id", queryId);
+        metricsTagsForQuery2.put("ksql_service_id", "another-id");
       }
     }
@@ -150,19 +155,30 @@
     TEST_HARNESS.produceRecord(TEST_TOPIC_NAME2, null, "5,900.1");
     TEST_HARNESS.produceRecord(TEST_TOPIC_NAME2, null, "5,900.1");
     metricsSharedRuntime = ((KsqlEngine)REST_APP_SHARED_RUNTIME.getEngine()).getEngineMetrics().getMetrics();
-    final KafkaMetric restartMetric1 = getKafkaMetric(metricsSharedRuntime, metricsTagsForQuery1);
-    final KafkaMetric restartMetric2 = getKafkaMetric(metricsSharedRuntime, metricsTagsForQuery2);
+    final KafkaMetric restartMetric1 = getKafkaMetric(metricsSharedRuntime, metricsTagsForQuery1, "app-1-");
+    final KafkaMetric restartMetric2 = getKafkaMetric(metricsSharedRuntime, metricsTagsForQuery2, "app-1-");
     // Then:
     assertThatEventually(() -> (Double) restartMetric1.metricValue(), greaterThanOrEqualTo(1.0));
     assertThatEventually(() -> (Double) restartMetric2.metricValue(), greaterThanOrEqualTo(1.0));
+
+
+    // should clean up metrics when queries are terminated
+    for (final String queryId:listOfQueryId) {
+      RestIntegrationTestUtil.makeKsqlRequest(REST_APP_SHARED_RUNTIME, "terminate " + queryId + ";");
+    }
+
+    final KafkaMetric restartMetricAfterTerminate1 = getKafkaMetric(metricsSharedRuntime, metricsTagsForQuery1, "app-1-");
+    final KafkaMetric restartMetricAfterTerminate2 = getKafkaMetric(metricsSharedRuntime, metricsTagsForQuery2, "app-1-");
+    assertThat(restartMetricAfterTerminate1, nullValue());
+    assertThat(restartMetricAfterTerminate2, nullValue());
   }
-  private KafkaMetric getKafkaMetric(final Metrics metrics, final Map metricsTags) {
+  private KafkaMetric getKafkaMetric(final Metrics metrics, final Map metricsTags, final String metricPrefix) {
     return metrics.metric(new MetricName(
-        QueryMetadataImpl.QUERY_RESTART_METRIC_NAME,
-        QueryMetadataImpl.QUERY_RESTART_METRIC_GROUP_NAME,
-        QueryMetadataImpl.QUERY_RESTART_METRIC_DESCRIPTION,
+        QueryStateMetricsReportingListener.QUERY_RESTART_METRIC_NAME,
+        metricPrefix + "ksql-queries",
+        QueryStateMetricsReportingListener.QUERY_RESTART_METRIC_DESCRIPTION,
         metricsTags
     ));
   }
diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java
index 8fcc58b7c310..1d5f82a96aaa 100644
--- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java
+++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java
@@ -120,8 +120,6 @@
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.acl.AclOperation;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KafkaStreams.State;
 import org.apache.kafka.streams.StreamsConfig;
@@ -209,10 +207,6 @@ public class StreamedQueryResourceTest {
   private QueryMetadataHolder queryMetadataHolder;
   @Mock
   private MeteredProcessingLoggerFactory loggerFactory;
-  @Mock
-  private Metrics metrics;
-  @Mock
-  private Sensor sensor;
   private StreamedQueryResource testResource;
   private PreparedStatement invalid;
@@ -223,7 +217,6 @@
   @Before
   public void setup() {
     when(serviceContext.getTopicClient()).thenReturn(mockKafkaTopicClient);
-    when(metrics.sensor(any())).thenReturn(sensor);
     query = PreparedStatement.of(PUSH_QUERY_STRING, mock(Query.class));
     invalid = PreparedStatement.of("sql", mock(Statement.class));
     when(mockStatementParser.parseSingleStatement(PUSH_QUERY_STRING)).thenReturn(invalid);
@@ -573,9 +566,7 @@ public void shouldStreamRowsCorrectly() throws Throwable {
         0L,
         0L,
         listener,
-        loggerFactory,
-        metrics,
-        Collections.emptyMap()
+        loggerFactory
     );
     transientQueryMetadata.initialize();
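
For reference, the lifecycle these tests converge on is the standard Kafka Metrics pattern: a restart counter backed by a CumulativeSum, registered under per-query tags when the query is created, bumped on each restart, and removed again when the query is terminated. The sketch below is illustrative only and is not code from this patch; the class name, metric name, and group are placeholders, and only the plain org.apache.kafka.common.metrics API is assumed.

// Illustrative sketch only -- not part of this patch.
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;

final class QueryRestartCounterSketch {

  private final Metrics metrics;
  private final Sensor sensor;

  QueryRestartCounterSketch(
      final Metrics metrics,
      final String queryId,
      final Map<String, String> customTags
  ) {
    this.metrics = metrics;
    // One metric instance per query: the query id goes into the tag map, not the name.
    final Map<String, String> tags = new HashMap<>(customTags);
    tags.put("query-id", queryId);
    final MetricName name = metrics.metricName(
        "query-restarts",                        // placeholder metric name
        "ksql-queries",                          // placeholder group
        "Number of times the query has been restarted",
        tags);
    this.sensor = metrics.sensor("query-restarts-" + queryId);
    this.sensor.add(name, new CumulativeSum()); // monotonically increasing counter
  }

  void onRestart() {
    sensor.record();                            // e.g. from the uncaught-exception handler
  }

  void close() {
    metrics.removeSensor(sensor.name());        // drop the metric when the query is terminated
  }
}

Keeping the query id in the tag map is what lets the functional test above look the metric up, and assert its absence after TERMINATE, with a plain metrics.metric(new MetricName(...)) call.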