diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/internal/KsqlEngineMetrics.java b/ksqldb-engine/src/main/java/io/confluent/ksql/internal/KsqlEngineMetrics.java index e2e94318a947..074e2a0865b4 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/internal/KsqlEngineMetrics.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/internal/KsqlEngineMetrics.java @@ -158,7 +158,7 @@ public QueryEventListener getQueryEventListener() { final String metricsPrefix = metricGroupPrefix.equals(KsqlEngineMetrics.DEFAULT_METRIC_GROUP_PREFIX) ? "" : metricGroupPrefix; - return new QueryStateMetricsReportingListener(metrics, metricsPrefix); + return new QueryStateMetricsReportingListener(metrics, metricsPrefix, newCustomMetricsTags); } private void recordMessageConsumptionByQueryStats( diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/internal/QueryStateMetricsReportingListener.java b/ksqldb-engine/src/main/java/io/confluent/ksql/internal/QueryStateMetricsReportingListener.java index 3c95a5af98c6..80bd2a50c1e2 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/internal/QueryStateMetricsReportingListener.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/internal/QueryStateMetricsReportingListener.java @@ -22,16 +22,22 @@ import io.confluent.ksql.query.QueryId; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.util.QueryMetadata; -import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Gauge; +import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.stats.CumulativeSum; import org.apache.kafka.streams.KafkaStreams.State; public class QueryStateMetricsReportingListener implements QueryEventListener { + public static final String QUERY_RESTART_METRIC_NAME = "query-restart-total"; + public static final String QUERY_RESTART_METRIC_DESCRIPTION = + "The total number of times that a query thread has failed and then been restarted."; public static final Ticker CURRENT_TIME_MILLIS_TICKER = new Ticker() { @Override @@ -42,12 +48,18 @@ public long read() { private final Metrics metrics; private final String metricsPrefix; + private final Map metricsTags; private final ConcurrentMap perQuery = new ConcurrentHashMap<>(); - QueryStateMetricsReportingListener(final Metrics metrics, final String metricsPrefix) { + QueryStateMetricsReportingListener( + final Metrics metrics, + final String metricsPrefix, + final Map metricsTags + ) { this.metrics = Objects.requireNonNull(metrics, "metrics"); this.metricsPrefix = Objects.requireNonNull(metricsPrefix, "metricGroupPrefix"); + this.metricsTags = Objects.requireNonNull(metricsTags); } @Override @@ -60,7 +72,12 @@ public void onCreate( } perQuery.put( queryMetadata.getQueryId(), - new PerQueryListener(metrics, metricsPrefix, queryMetadata.getQueryId().toString()) + new PerQueryListener( + metrics, + metricsPrefix, + queryMetadata.getQueryId().toString(), + metricsTags + ) ); } @@ -96,6 +113,8 @@ private static class PerQueryListener { private final Metrics metrics; private final MetricName stateMetricName; private final MetricName errorMetricName; + private final MetricName queryRestartMetricName; + private final CumulativeSum queryRestartSum; private final Ticker ticker; private volatile String state = "-"; @@ -104,16 +123,18 @@ private static class PerQueryListener { PerQueryListener( final Metrics metrics, final String groupPrefix, - final String queryId + final String queryId, + final Map metricsTags ) { - this(metrics, groupPrefix, queryId, CURRENT_TIME_MILLIS_TICKER); + this(metrics, groupPrefix, queryId, CURRENT_TIME_MILLIS_TICKER, metricsTags); } PerQueryListener( final Metrics metrics, final String groupPrefix, final String queryId, - final Ticker ticker + final Ticker ticker, + final Map metricsTags ) { Objects.requireNonNull(groupPrefix, "groupPrefix"); Objects.requireNonNull(queryId, "queryId"); @@ -124,20 +145,34 @@ private static class PerQueryListener { final String tag = "_confluent-ksql-" + groupPrefix + type + queryId; + final Map tagsForStateAndError = new HashMap<>(metricsTags); + tagsForStateAndError.put("status", tag); this.stateMetricName = metrics.metricName( "query-status", groupPrefix + "ksql-queries", "The current status of the given query.", - Collections.singletonMap("status", tag)); + tagsForStateAndError + ); errorMetricName = metrics.metricName( "error-status", groupPrefix + "ksql-queries", "The current error status of the given query, if the state is in ERROR state", - Collections.singletonMap("status", tag) + tagsForStateAndError + ); + + final Map restartTags = new HashMap<>(tagsForStateAndError); + restartTags.put("query-id", queryId); + queryRestartMetricName = metrics.metricName( + QUERY_RESTART_METRIC_NAME, + groupPrefix + "ksql-queries", + QUERY_RESTART_METRIC_DESCRIPTION, + restartTags ); + this.queryRestartSum = new CumulativeSum(); this.metrics.addMetric(stateMetricName, (Gauge) (config, now) -> state); this.metrics.addMetric(errorMetricName, (Gauge) (config, now) -> error); + this.metrics.addMetric(queryRestartMetricName, queryRestartSum); } public void onChange(final State newState, final State oldState) { @@ -150,11 +185,13 @@ public void onChange(final State newState, final State oldState) { public void onError(final QueryError observedError) { error = observedError.getType().name(); + queryRestartSum.record(new MetricConfig(), 1, System.currentTimeMillis()); } public void onDeregister() { metrics.removeMetric(stateMetricName); metrics.removeMetric(errorMetricName); + metrics.removeMetric(queryRestartMetricName); } } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java index 8d60e8f70aa9..819adc1bbf35 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java @@ -227,9 +227,7 @@ TransientQueryMetadata buildTransientQuery( ksqlConfig.getLong(KsqlConfig.KSQL_QUERY_RETRY_BACKOFF_INITIAL_MS), ksqlConfig.getLong(KsqlConfig.KSQL_QUERY_RETRY_BACKOFF_MAX_MS), listener, - processingLogContext.getLoggerFactory(), - metricCollectors.getMetrics(), - ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS) + processingLogContext.getLoggerFactory() ); } @@ -377,9 +375,7 @@ PersistentQueryMetadata buildPersistentQueryInDedicatedRuntime( ksqlConfig.getLong(KsqlConfig.KSQL_QUERY_RETRY_BACKOFF_MAX_MS), listener, scalablePushRegistry, - processingLogContext.getLoggerFactory(), - metricCollectors.getMetrics(), - ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS) + processingLogContext.getLoggerFactory() ); } @@ -493,9 +489,7 @@ PersistentQueryMetadata buildPersistentQueryInSharedRuntime( physicalPlan ), keyFormat, - processingLogContext.getLoggerFactory(), - metricCollectors.getMetrics(), - ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS) + processingLogContext.getLoggerFactory() ); if (real) { return binPackedPersistentQueryMetadata; diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImpl.java index 2c29404e72e7..671e9871f583 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImpl.java @@ -49,7 +49,6 @@ import java.util.Set; import java.util.function.Function; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.LagInfo; @@ -83,8 +82,6 @@ public class BinPackedPersistentQueryMetadataImpl implements PersistentQueryMeta private final Listener listener; private final Function namedTopologyBuilder; private final TimeBoundedQueue queryErrors; - private final Optional metrics; - private final Optional restartSensor; private final MaterializationProviderBuilderFactory materializationProviderBuilderFactory; @@ -117,9 +114,7 @@ public BinPackedPersistentQueryMetadataImpl( final Optional scalablePushRegistry, final Function namedTopologyBuilder, final KeyFormat keyFormat, - final ProcessingLoggerFactory loggerFactory, - final Metrics metrics, - final Map metricsTags + final ProcessingLoggerFactory loggerFactory ) { // CHECKSTYLE_RULES.ON: ParameterNumberCheck this.persistentQueryType = Objects.requireNonNull(persistentQueryType, "persistentQueryType"); @@ -146,9 +141,6 @@ public BinPackedPersistentQueryMetadataImpl( this.namedTopologyBuilder = requireNonNull(namedTopologyBuilder, "namedTopologyBuilder"); this.queryErrors = sharedKafkaStreamsRuntime.getNewQueryErrorQueue(); this.scalablePushRegistry = requireNonNull(scalablePushRegistry, "scalablePushRegistry"); - this.metrics = Optional.of(metrics); - this.restartSensor = Optional.of( - QueryMetricsUtil.createQueryRestartMetricSensor(queryId.toString(), metricsTags, metrics)); this.keyFormat = requireNonNull(keyFormat, "keyFormat"); this.loggerFactory = requireNonNull(loggerFactory, "loggerFactory"); } @@ -179,8 +171,6 @@ public BinPackedPersistentQueryMetadataImpl( this.queryErrors = sharedKafkaStreamsRuntime.getNewQueryErrorQueue(); this.scalablePushRegistry = original.scalablePushRegistry; this.namedTopologyBuilder = original.namedTopologyBuilder; - this.metrics = Optional.empty(); - this.restartSensor = Optional.empty(); this.keyFormat = original.keyFormat; this.loggerFactory = original.loggerFactory; } @@ -419,9 +409,6 @@ public void close() { sharedKafkaStreamsRuntime.stop(queryId, false); scalablePushRegistry.ifPresent(ScalablePushRegistry::close); listener.onClose(this); - if (metrics.isPresent() && restartSensor.isPresent()) { - metrics.get().removeSensor(restartSensor.get().name()); - } } @Override @@ -442,9 +429,4 @@ public void register() { Listener getListener() { return listener; } - - @Override - public Optional getRestartMetricsSensor() { - return restartSensor; - } } \ No newline at end of file diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadataImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadataImpl.java index 694154ba13d0..262f8485c304 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadataImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadataImpl.java @@ -41,7 +41,6 @@ import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; @@ -89,9 +88,7 @@ public PersistentQueryMetadataImpl( final long retryBackoffMaxMs, final QueryMetadata.Listener listener, final Optional scalablePushRegistry, - final ProcessingLoggerFactory loggerFactory, - final Metrics metrics, - final Map metricsTags + final ProcessingLoggerFactory loggerFactory ) { // CHECKSTYLE_RULES.ON: ParameterNumberCheck super( @@ -111,9 +108,7 @@ public PersistentQueryMetadataImpl( retryBackoffInitialMs, retryBackoffMaxMs, new QueryListenerWrapper(listener, scalablePushRegistry), - loggerFactory, - metrics, - metricsTags + loggerFactory ); this.sinkDataSource = requireNonNull(sinkDataSource, "sinkDataSource"); this.schemas = requireNonNull(schemas, "schemas"); diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java index 5c654b24d5e7..4f6e66ff29a1 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java @@ -23,9 +23,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; -import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.LagInfo; import org.apache.kafka.streams.StreamsMetadata; @@ -75,8 +73,6 @@ public interface QueryMetadata { KafkaStreams getKafkaStreams(); - Optional getRestartMetricsSensor(); - void close(); void start(); diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadataImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadataImpl.java index b37de4798332..26e9b64c2ba8 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadataImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetadataImpl.java @@ -40,14 +40,11 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Collectors; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.LagInfo; @@ -59,11 +56,6 @@ import org.slf4j.LoggerFactory; public class QueryMetadataImpl implements QueryMetadata { - public static final String QUERY_RESTART_METRIC_NAME = "query-restart-total"; - public static final String QUERY_RESTART_METRIC_GROUP_NAME = "query-restart-metrics"; - public static final String QUERY_RESTART_METRIC_DESCRIPTION = - "The total number of times that a query thread has failed and then been restarted."; - private static final Logger LOG = LoggerFactory.getLogger(QueryMetadataImpl.class); private final String statementString; @@ -82,8 +74,6 @@ public class QueryMetadataImpl implements QueryMetadata { private final RetryEvent retryEvent; private final Listener listener; private final ProcessingLoggerFactory loggerFactory; - private final Optional metrics; - private final Optional restartSensor; private volatile boolean everStarted = false; private volatile KafkaStreams kafkaStreams; @@ -118,9 +108,7 @@ public long read() { final long baseWaitingTimeMs, final long retryBackoffMaxMs, final Listener listener, - final ProcessingLoggerFactory loggerFactory, - final Metrics metrics, - final Map metricsTags + final ProcessingLoggerFactory loggerFactory ) { // CHECKSTYLE_RULES.ON: ParameterNumberCheck this.statementString = Objects.requireNonNull(statementString, "statementString"); @@ -143,15 +131,11 @@ public long read() { this.queryId = Objects.requireNonNull(queryId, "queryId"); this.errorClassifier = Objects.requireNonNull(errorClassifier, "errorClassifier"); this.queryErrors = new TimeBoundedQueue(Duration.ofHours(1), maxQueryErrorsQueueSize); - this.metrics = Optional.of(metrics); - this.restartSensor = Optional.of( - QueryMetricsUtil.createQueryRestartMetricSensor(queryId.toString(), metricsTags, metrics)); this.retryEvent = new RetryEvent( queryId, baseWaitingTimeMs, retryBackoffMaxMs, - CURRENT_TIME_MILLIS_TICKER, - restartSensor + CURRENT_TIME_MILLIS_TICKER ); this.loggerFactory = Objects.requireNonNull(loggerFactory, "loggerFactory"); } @@ -174,8 +158,6 @@ public long read() { this.errorClassifier = other.errorClassifier; this.everStarted = other.everStarted; this.queryErrors = new TimeBoundedQueue(Duration.ZERO, 0); - this.metrics = Optional.empty(); - this.restartSensor = Optional.empty(); this.retryEvent = new RetryEvent( other.getQueryId(), 0, @@ -185,8 +167,7 @@ public long read() { public long read() { return 0; } - }, - this.restartSensor + } ); this.listener = Objects.requireNonNull(listener, "stopListeners"); @@ -277,11 +258,6 @@ public Topology getTopology() { return topology; } - @Override - public Optional getRestartMetricsSensor() { - return restartSensor; - } - public Map> getAllLocalStorePartitionLags() { try { return kafkaStreams.allLocalStorePartitionLags(); @@ -386,9 +362,6 @@ public void close() { loggerFactory.getLoggersWithPrefix(queryId.toString()).forEach(ProcessingLogger::close); doClose(true); listener.onClose(this); - if (metrics.isPresent() && restartSensor.isPresent()) { - metrics.get().removeSensor(restartSensor.get().name()); - } } void doClose(final boolean cleanUp) { @@ -456,14 +429,12 @@ public static class RetryEvent implements QueryMetadata.RetryEvent { private long waitingTimeMs; private long expiryTimeMs; private long retryBackoffMaxMs; - private Optional sensor; RetryEvent( final QueryId queryId, final long baseWaitingTimeMs, final long retryBackoffMaxMs, - final Ticker ticker, - final Optional sensor + final Ticker ticker ) { this.ticker = ticker; this.queryId = queryId; @@ -473,7 +444,6 @@ public static class RetryEvent implements QueryMetadata.RetryEvent { this.waitingTimeMs = baseWaitingTimeMs; this.retryBackoffMaxMs = retryBackoffMaxMs; this.expiryTimeMs = now + baseWaitingTimeMs; - this.sensor = sensor; } public long nextRestartTimeMs() { @@ -495,7 +465,6 @@ public void backOff(final String threadName) { Thread.currentThread().interrupt(); } - sensor.ifPresent(Sensor::record); final int retries = numRetries.merge(threadName, 1, Integer::sum); LOG.info( diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetricsUtil.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetricsUtil.java deleted file mode 100644 index 3130f004ef0b..000000000000 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/QueryMetricsUtil.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2022 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.util; - -import java.util.Map; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.CumulativeSum; - -public final class QueryMetricsUtil { - - private QueryMetricsUtil() {} - // returns a metrics sensor that tracks the number of times a query was restarted when hitting - // an uncaught exception - - public static Sensor createQueryRestartMetricSensor( - final String queryId, - final Map metricsTags, - final Metrics metrics - ) { - final Map customMetricsTagsForQuery = - MetricsTagsUtil.getMetricsTagsWithQueryId(queryId, metricsTags); - final MetricName restartMetricName = metrics.metricName( - QueryMetadataImpl.QUERY_RESTART_METRIC_NAME, - QueryMetadataImpl.QUERY_RESTART_METRIC_GROUP_NAME, - QueryMetadataImpl.QUERY_RESTART_METRIC_DESCRIPTION, - customMetricsTagsForQuery - ); - final Sensor sensor = metrics.sensor( - QueryMetadataImpl.QUERY_RESTART_METRIC_GROUP_NAME + "-" + queryId); - sensor.add(restartMetricName, new CumulativeSum()); - return sensor; - } -} diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.java index e18bce448d82..aaf440b2262f 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.java @@ -125,9 +125,6 @@ public StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse uncaughtHan if (queryInError != null) { queryInError.setQueryError(queryError); - if (queryInError.getRestartMetricsSensor().isPresent()) { - queryInError.getRestartMetricsSensor().get().record(); - } log.error(String.format( "Unhandled query exception caught in streams thread %s for query %s. (%s)", Thread.currentThread().getName(), @@ -138,9 +135,6 @@ public StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse uncaughtHan } else { for (BinPackedPersistentQueryMetadataImpl query : collocatedQueries.values()) { query.setQueryError(queryError); - if (query.getRestartMetricsSensor().isPresent()) { - query.getRestartMetricsSensor().get().record(); - } } log.error(String.format( diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java index b6bcb68e982b..e1a44205a0dd 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java @@ -61,9 +61,7 @@ public TransientQueryMetadata( final long retryBackoffInitialMs, final long retryBackoffMaxMs, final Listener listener, - final ProcessingLoggerFactory loggerFactory, - final Metrics metrics, - final Map metricsTags + final ProcessingLoggerFactory loggerFactory ) { // CHECKSTYLE_RULES.ON: ParameterNumberCheck super( @@ -83,9 +81,7 @@ public TransientQueryMetadata( retryBackoffInitialMs, retryBackoffMaxMs, listener, - loggerFactory, - metrics, - metricsTags + loggerFactory ); this.rowQueue = Objects.requireNonNull(rowQueue, "rowQueue"); this.resultType = Objects.requireNonNull(resultType, "resultType"); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/internal/QueryStateMetricsReportingListenerTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/internal/QueryStateMetricsReportingListenerTest.java index ae68efdb5043..e7e5c65c2eca 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/internal/QueryStateMetricsReportingListenerTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/internal/QueryStateMetricsReportingListenerTest.java @@ -37,6 +37,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Gauge; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.stats.CumulativeSum; import org.apache.kafka.streams.KafkaStreams.State; import org.junit.Before; import org.junit.Test; @@ -46,12 +47,18 @@ import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + @RunWith(MockitoJUnitRunner.class) public class QueryStateMetricsReportingListenerTest { private static final MetricName METRIC_NAME_1 = new MetricName("bob", "g1", "d1", ImmutableMap.of()); private static final MetricName METRIC_NAME_2 = new MetricName("dylan", "g1", "d1", ImmutableMap.of()); + private static final MetricName METRIC_NAME_3 = + new MetricName("steven", "g1", "d1", ImmutableMap.of()); private static final QueryId QUERY_ID = new QueryId("foo"); private static final String TAG = "_confluent-ksql-" + "some-prefix-" + "query_" + QUERY_ID.toString(); @@ -67,14 +74,17 @@ public class QueryStateMetricsReportingListenerTest { private ArgumentCaptor> gaugeCaptor; private QueryStateMetricsReportingListener listener; + private final Map metricsTags = Collections.singletonMap("tag1", "value1"); + @Before public void setUp() { when(metrics.metricName(any(), any(), any(), anyMap())) .thenReturn(METRIC_NAME_1) - .thenReturn(METRIC_NAME_2); + .thenReturn(METRIC_NAME_2) + .thenReturn(METRIC_NAME_3); when(query.getQueryId()).thenReturn(QUERY_ID); - listener = new QueryStateMetricsReportingListener(metrics, "some-prefix-"); + listener = new QueryStateMetricsReportingListener(metrics, "some-prefix-", metricsTags); } @Test @@ -86,17 +96,24 @@ public void shouldThrowOnNullParams() { public void shouldAddMetricOnCreation() { // When: listener.onCreate(serviceContext, metaStore, query); + final Map tags = new HashMap<>(metricsTags); + tags.put("status", TAG); // Then: verify(metrics).metricName("query-status", "some-prefix-ksql-queries", "The current status of the given query.", - ImmutableMap.of("status", TAG)); + tags); verify(metrics).metricName("error-status", "some-prefix-ksql-queries", "The current error status of the given query, if the state is in ERROR state", - ImmutableMap.of("status", TAG)); + tags); + tags.put("query-id", QUERY_ID.toString()); + verify(metrics).metricName(QueryStateMetricsReportingListener.QUERY_RESTART_METRIC_NAME, "some-prefix-ksql-queries", + QueryStateMetricsReportingListener.QUERY_RESTART_METRIC_DESCRIPTION, + tags); verify(metrics).addMetric(eq(METRIC_NAME_1), isA(Gauge.class)); verify(metrics).addMetric(eq(METRIC_NAME_2), isA(Gauge.class)); + verify(metrics).addMetric(eq(METRIC_NAME_3), isA(CumulativeSum.class)); } @Test @@ -123,20 +140,26 @@ public void shouldGracefullyHandleErrorCallbackAfterDeregister() { public void shouldAddMetricWithSuppliedPrefix() { // Given: final String groupPrefix = "some-prefix-"; + final Map tags = new HashMap<>(metricsTags); + tags.put("status", TAG); clearInvocations(metrics); // When: - listener = new QueryStateMetricsReportingListener(metrics, groupPrefix); + listener = new QueryStateMetricsReportingListener(metrics, groupPrefix, metricsTags); listener.onCreate(serviceContext, metaStore, query); // Then: verify(metrics).metricName("query-status", groupPrefix + "ksql-queries", "The current status of the given query.", - ImmutableMap.of("status", TAG)); + tags); verify(metrics).metricName("error-status", groupPrefix + "ksql-queries", "The current error status of the given query, if the state is in ERROR state", - ImmutableMap.of("status", TAG)); + tags); + tags.put("query-id", QUERY_ID.toString()); + verify(metrics).metricName(QueryStateMetricsReportingListener.QUERY_RESTART_METRIC_NAME, "some-prefix-ksql-queries", + QueryStateMetricsReportingListener.QUERY_RESTART_METRIC_DESCRIPTION, + tags); } @Test @@ -178,6 +201,7 @@ public void shouldRemoveMetricOnClose() { // Then: verify(metrics).removeMetric(METRIC_NAME_1); verify(metrics).removeMetric(METRIC_NAME_2); + verify(metrics).removeMetric(METRIC_NAME_3); } private String currentGaugeValue(final MetricName name) { diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImplTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImplTest.java index a51dfb136713..8147c14312fa 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImplTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/BinPackedPersistentQueryMetadataImplTest.java @@ -104,17 +104,12 @@ public class BinPackedPersistentQueryMetadataImplTest { private MaterializationProviderBuilderFactory.MaterializationProviderBuilder materializationProviderBuilder; @Mock private MeteredProcessingLoggerFactory loggerFactory; - @Mock - private Metrics metrics; - @Mock - private Sensor sensor; private Map metricsTags = Collections.singletonMap("tag1", "value1"); private PersistentQueryMetadata query; @Before public void setUp() { - when(metrics.sensor(anyString())).thenReturn(sensor); query = new BinPackedPersistentQueryMetadataImpl( KsqlConstants.PersistentQueryType.CREATE_AS, SQL, @@ -136,9 +131,7 @@ public void setUp() { scalablePushRegistry, (runtime) -> topology, keyFormat, - loggerFactory, - metrics, - metricsTags); + loggerFactory); query.initialize(); when(materializationProviderBuilderFactory.materializationProviderBuilder( @@ -190,9 +183,8 @@ public void shouldCallKafkaStreamsCloseOnStop() { } @Test - public void shouldRemoveMetricWhenClose() { + public void shouldCloseProcessingLoggers() { // Given: - final Sensor restartSensor = query.getRestartMetricsSensor().orElseGet(() -> mock(Sensor.class)); final ProcessingLogger processingLogger1 = mock(ProcessingLogger.class); final ProcessingLogger processingLogger2 = mock(ProcessingLogger.class); when(loggerFactory.getLoggersWithPrefix(QUERY_ID.toString())).thenReturn(Arrays.asList(processingLogger1, processingLogger2)); @@ -201,7 +193,6 @@ public void shouldRemoveMetricWhenClose() { query.close(); // Then: - verify(metrics).removeSensor(restartSensor.name()); verify(processingLogger1).close(); verify(processingLogger2).close(); } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java index 3f533f783f5b..4c8f099fa9eb 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java @@ -141,9 +141,7 @@ public void setUp() { 0L, listener, Optional.of(scalablePushRegistry), - processingLoggerFactory, - metrics, - Collections.emptyMap() + processingLoggerFactory ); query.initialize(); @@ -176,9 +174,7 @@ public void shouldReturnInsertQueryType() { 0L, listener, Optional.empty(), - processingLoggerFactory, - metrics, - Collections.emptyMap() + processingLoggerFactory ); // When/Then @@ -212,9 +208,7 @@ public void shouldReturnCreateAsQueryType() { 0L, listener, Optional.empty(), - processingLoggerFactory, - metrics, - Collections.emptyMap() + processingLoggerFactory ); // When/Then diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/QueryMetadataTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/QueryMetadataTest.java index 8512da612893..ca1247b82fd6 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/QueryMetadataTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/QueryMetadataTest.java @@ -100,16 +100,11 @@ public class QueryMetadataTest { private Ticker ticker; @Mock private MeteredProcessingLoggerFactory loggerFactory; - @Mock - private Sensor mockSensor; private QueryMetadataImpl query; - private MetricCollectors metricCollectors; - private final Map metricsTags = Collections.singletonMap("cluster-id", "cluster-1"); @Before public void setup() { - metricCollectors = new MetricCollectors(); when(kafkaStreamsBuilder.build(topoplogy, Collections.emptyMap())).thenReturn(kafkaStreams); when(classifier.classify(any())).thenReturn(Type.UNKNOWN); when(kafkaStreams.state()).thenReturn(State.NOT_RUNNING); @@ -131,9 +126,7 @@ public void setup() { 0L, 0L, listener, - loggerFactory, - metricCollectors.getMetrics(), - metricsTags + loggerFactory ){ }; query.initialize(); @@ -282,8 +275,7 @@ public void shouldRetryEventStartWithInitialValues() { QUERY_ID, RETRY_BACKOFF_INITIAL_MS, RETRY_BACKOFF_MAX_MS, - ticker, - Optional.of(mockSensor) + ticker ); // Then: @@ -302,8 +294,7 @@ public void shouldRetryEventRestartAndIncrementBackoffTime() { QUERY_ID, RETRY_BACKOFF_INITIAL_MS, RETRY_BACKOFF_MAX_MS, - ticker, - Optional.of(mockSensor) + ticker ); retryEvent.backOff("thread-name"); @@ -328,8 +319,7 @@ public void shouldRetryEventRestartAndNotExceedBackoffMaxTime() { QUERY_ID, RETRY_BACKOFF_INITIAL_MS, RETRY_BACKOFF_MAX_MS, - ticker, - Optional.of(mockSensor) + ticker ); retryEvent.backOff("thread-name"); retryEvent.backOff("thread-name"); @@ -367,56 +357,4 @@ public void shouldCloseProcessingLoggers() { verify(processingLogger1).close(); verify(processingLogger2).close(); } - - @Test - public void shouldRecordMetricForEachStreamsThreadRestarted() { - // Given: - final long now = 20; - when(ticker.read()).thenReturn(now); - - // When: - final QueryMetadataImpl.RetryEvent retryEvent = new QueryMetadataImpl.RetryEvent( - QUERY_ID, - RETRY_BACKOFF_INITIAL_MS, - RETRY_BACKOFF_MAX_MS, - ticker, - query.getRestartMetricsSensor() - ); - retryEvent.backOff("thread-name"); - retryEvent.backOff("thread-name"); - retryEvent.backOff("thread-name-2"); - - // Then: - assertThat(getMetricValue(QUERY_ID.toString(), metricsTags), is(3.0)); - } - - @Test - public void shouldIncrementMetricWhenUncaughtExceptionAndRestart() { - // Given: - when(classifier.classify(any())).thenThrow(new RuntimeException("bar")); - - // When: - query.uncaughtHandler(new RuntimeException("foo")); - query.uncaughtHandler(new RuntimeException("bar")); - query.uncaughtHandler(new RuntimeException("too")); - - - // Then: - assertThat(getMetricValue(QUERY_ID.toString(), metricsTags), is(3.0)); - } - - private double getMetricValue(final String queryId, final Map metricsTags) { - final Map customMetricsTags = new HashMap<>(metricsTags); - customMetricsTags.put("query-id", queryId); - final Metrics metrics = metricCollectors.getMetrics(); - return Double.parseDouble( - metrics.metric( - metrics.metricName( - QueryMetadataImpl.QUERY_RESTART_METRIC_NAME, - QueryMetadataImpl.QUERY_RESTART_METRIC_GROUP_NAME, - QueryMetadataImpl.QUERY_RESTART_METRIC_DESCRIPTION, - customMetricsTags) - ).metricValue().toString() - ); - } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataImplTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataImplTest.java index bdba5e082418..e98671462fac 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataImplTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataImplTest.java @@ -132,9 +132,7 @@ public void setUp() { 0L, listener, Optional.empty(), - processingLoggerFactory, - metrics, - Collections.emptyMap() + processingLoggerFactory ); query.initialize(); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImplTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImplTest.java index 27e83cd45157..14e0865681e2 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImplTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImplTest.java @@ -72,10 +72,6 @@ public class SharedKafkaStreamsRuntimeImplTest { private AddNamedTopologyResult addNamedTopologyResult; @Mock private KafkaFuture future; - @Mock - private Sensor restartErrorSensorQuery1; - @Mock - private Sensor restartErrorSensorQuery2; private final QueryId queryId = new QueryId("query-1"); private final QueryId queryId2= new QueryId("query-2"); @@ -113,8 +109,6 @@ public void setUp() throws Exception { when(binPackedPersistentQueryMetadata.getTopologyCopy(any())).thenReturn(namedTopology); when(binPackedPersistentQueryMetadata.getQueryId()).thenReturn(queryId); when(binPackedPersistentQueryMetadata2.getQueryId()).thenReturn(queryId2); - when(binPackedPersistentQueryMetadata.getRestartMetricsSensor()).thenReturn(Optional.of(restartErrorSensorQuery1)); - when(binPackedPersistentQueryMetadata2.getRestartMetricsSensor()).thenReturn(Optional.of(restartErrorSensorQuery2)); sharedKafkaStreamsRuntimeImpl.register( binPackedPersistentQueryMetadata @@ -263,42 +257,4 @@ public void shouldNotStartOrAddedToStreamsIfOnlyRegistered() { verify(kafkaStreamsNamedTopologyWrapper, never()) .addNamedTopology(binPackedPersistentQueryMetadata2.getTopologyCopy(sharedKafkaStreamsRuntimeImpl)); } - - @Test - public void shouldRecordMetricForQuery1WhenError() { - //Given: - when(queryErrorClassifier.classify(query1Exception)).thenReturn(Type.USER); - sharedKafkaStreamsRuntimeImpl.register( - binPackedPersistentQueryMetadata2 - ); - - //When: - sharedKafkaStreamsRuntimeImpl.start(queryId); - sharedKafkaStreamsRuntimeImpl.start(queryId2); - sharedKafkaStreamsRuntimeImpl.uncaughtHandler(query1Exception); - - //Then: - verify(restartErrorSensorQuery1, times(1)).record(); - verify(restartErrorSensorQuery2, never()).record(); - } - - @Test - public void shouldRecordMetricForAllQueriesWhenErrorWithNoTask() { - when(queryErrorClassifier.classify(runtimeExceptionWithNoTask)).thenReturn(Type.USER); - - sharedKafkaStreamsRuntimeImpl.register( - binPackedPersistentQueryMetadata2 - ); - - //When: - sharedKafkaStreamsRuntimeImpl.start(queryId); - sharedKafkaStreamsRuntimeImpl.start(queryId2); - - sharedKafkaStreamsRuntimeImpl.uncaughtHandler(runtimeExceptionWithNoTask); - sharedKafkaStreamsRuntimeImpl.uncaughtHandler(runtimeExceptionWithNoTask); - - //Then: - verify(restartErrorSensorQuery1, times(2)).record(); - verify(restartErrorSensorQuery2, times(2)).record(); - } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java index b50a6ec6b2f4..82ce1de65574 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java @@ -80,10 +80,6 @@ public class TransientQueryMetadataTest { private Listener listener; @Mock private MeteredProcessingLoggerFactory loggerFactory; - @Mock - private Metrics metrics; - @Mock - private Sensor sensor; private TransientQueryMetadata query; @@ -92,7 +88,6 @@ public void setUp() { when(kafkaStreamsBuilder.build(any(), any())).thenReturn(kafkaStreams); when(kafkaStreams.state()).thenReturn(State.NOT_RUNNING); when(sourceNames.toArray()).thenReturn(new SourceName[0]); - when(metrics.sensor(any())).thenReturn(sensor); query = new TransientQueryMetadata( SQL, @@ -112,9 +107,7 @@ public void setUp() { 0L, 0L, listener, - loggerFactory, - metrics, - Collections.emptyMap() + loggerFactory ); query.initialize(); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java index 436db38df3cf..55e13493e5e7 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java @@ -118,8 +118,6 @@ public class QueryDescriptionFactoryTest { private MeteredProcessingLoggerFactory processingLoggerFactory; @Mock private Metrics metrics; - @Mock - private Sensor sensor; private QueryMetadata transientQuery; private PersistentQueryMetadata persistentQuery; @@ -128,7 +126,6 @@ public class QueryDescriptionFactoryTest { @Before public void setUp() { - when(metrics.sensor(any())).thenReturn(sensor); when(topology.describe()).thenReturn(topologyDescription); when(kafkaStreamsBuilder.build(any(), any())).thenReturn(queryStreams); when(queryStreams.metadataForLocalThreads()).thenReturn(Collections.emptySet()); @@ -188,9 +185,7 @@ public void setUp() { 0L, listener, Optional.empty(), - processingLoggerFactory, - metrics, - Collections.emptyMap() + processingLoggerFactory ); persistentQuery.initialize(); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/QueryRestartMetricFunctionalTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/QueryRestartMetricFunctionalTest.java index f2f1f58d7dfa..b073b9a39477 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/QueryRestartMetricFunctionalTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/QueryRestartMetricFunctionalTest.java @@ -20,23 +20,20 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.nullValue; import com.google.common.collect.ImmutableMap; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.integration.IntegrationTestHarness; -import io.confluent.ksql.rest.entity.KsqlEntity; -import io.confluent.ksql.rest.entity.Queries; -import io.confluent.ksql.rest.entity.RunningQuery; +import io.confluent.ksql.internal.QueryStateMetricsReportingListener; import io.confluent.ksql.rest.server.TestKsqlRestApp; import io.confluent.ksql.util.KsqlConfig; -import java.util.Collections; + import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import io.confluent.ksql.util.QueryMetadataImpl; -import io.confluent.ksql.util.QueryMetricsUtil; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.KafkaMetric; @@ -45,10 +42,8 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Ignore; -import org.junit.Rule; import org.junit.Test; import org.junit.rules.RuleChain; -import org.junit.rules.Timeout; @Ignore public class QueryRestartMetricFunctionalTest { @@ -117,22 +112,30 @@ public void shouldVerifyMetricsOnNonSharedRuntimeServer() { final List listOfQueryId = RestIntegrationTestUtil.getQueryIds(REST_APP_NO_SHARED_RUNTIME); assertThat(listOfQueryId.size(), equalTo(1)); metricsTagsForQuery.put("query-id", listOfQueryId.get(0)); + metricsTagsForQuery.put("ksql_service_id", KsqlConfig.KSQL_SERVICE_ID_DEFAULT); // When: TEST_HARNESS.produceRecord(TEST_TOPIC_NAME, null, "5,900.1"); TEST_HARNESS.produceRecord(TEST_TOPIC_NAME, null, "5,900.1"); TEST_HARNESS.produceRecord(TEST_TOPIC_NAME, null, "5,900.1"); - System.out.println("records done"); metricsNoSharedRuntime = ((KsqlEngine)REST_APP_NO_SHARED_RUNTIME.getEngine()).getEngineMetrics().getMetrics(); - final KafkaMetric restartMetric1 = getKafkaMetric(metricsNoSharedRuntime, metricsTagsForQuery); + final KafkaMetric restartMetric = getKafkaMetric(metricsNoSharedRuntime, metricsTagsForQuery, "app-0-"); // Then: - assertThatEventually(() -> (Double) restartMetric1.metricValue(), greaterThan(8.0)); + assertThatEventually(() -> (Double) restartMetric.metricValue(), greaterThan(8.0)); + + // should clean up metrics when queries are terminated + for (final String queryId:listOfQueryId) { + RestIntegrationTestUtil.makeKsqlRequest(REST_APP_NO_SHARED_RUNTIME, "terminate " + queryId + ";"); + } + + final KafkaMetric restartMetricAfterTerminate = getKafkaMetric(metricsNoSharedRuntime, metricsTagsForQuery, "app-0-"); + assertThat(restartMetricAfterTerminate, nullValue()); } @Test - public void shouldVerifyMetricsOnSharedRuntimeServer() throws InterruptedException { + public void shouldVerifyMetricsOnSharedRuntimeServer() { // Given: final Map metricsTagsForQuery1 = new HashMap<>(METRICS_TAGS); final Map metricsTagsForQuery2 = new HashMap<>(METRICS_TAGS); @@ -141,8 +144,10 @@ public void shouldVerifyMetricsOnSharedRuntimeServer() throws InterruptedExcepti for (final String queryId:listOfQueryId) { if (queryId.toLowerCase().contains("s2")) { metricsTagsForQuery1.put("query-id", queryId); + metricsTagsForQuery1.put("ksql_service_id", "another-id"); } else if (queryId.toLowerCase().contains("s3")) { metricsTagsForQuery2.put("query-id", queryId); + metricsTagsForQuery2.put("ksql_service_id", "another-id"); } } @@ -150,19 +155,30 @@ public void shouldVerifyMetricsOnSharedRuntimeServer() throws InterruptedExcepti TEST_HARNESS.produceRecord(TEST_TOPIC_NAME2, null, "5,900.1"); TEST_HARNESS.produceRecord(TEST_TOPIC_NAME2, null, "5,900.1"); metricsSharedRuntime = ((KsqlEngine)REST_APP_SHARED_RUNTIME.getEngine()).getEngineMetrics().getMetrics(); - final KafkaMetric restartMetric1 = getKafkaMetric(metricsSharedRuntime, metricsTagsForQuery1); - final KafkaMetric restartMetric2 = getKafkaMetric(metricsSharedRuntime, metricsTagsForQuery2); + final KafkaMetric restartMetric1 = getKafkaMetric(metricsSharedRuntime, metricsTagsForQuery1, "app-1-"); + final KafkaMetric restartMetric2 = getKafkaMetric(metricsSharedRuntime, metricsTagsForQuery2, "app-1-"); // Then: assertThatEventually(() -> (Double) restartMetric1.metricValue(), greaterThanOrEqualTo(1.0)); assertThatEventually(() -> (Double) restartMetric2.metricValue(), greaterThanOrEqualTo(1.0)); + + + // should clean up metrics when queries are terminated + for (final String queryId:listOfQueryId) { + RestIntegrationTestUtil.makeKsqlRequest(REST_APP_SHARED_RUNTIME, "terminate " + queryId + ";"); + } + + final KafkaMetric restartMetricAfterTerminate1 = getKafkaMetric(metricsSharedRuntime, metricsTagsForQuery1, "app-1-"); + final KafkaMetric restartMetricAfterTerminate2 = getKafkaMetric(metricsSharedRuntime, metricsTagsForQuery2, "app-1-"); + assertThat(restartMetricAfterTerminate1, nullValue()); + assertThat(restartMetricAfterTerminate2, nullValue()); } - private KafkaMetric getKafkaMetric(final Metrics metrics, final Map metricsTags) { + private KafkaMetric getKafkaMetric(final Metrics metrics, final Map metricsTags, final String metricPrefix) { return metrics.metric(new MetricName( - QueryMetadataImpl.QUERY_RESTART_METRIC_NAME, - QueryMetadataImpl.QUERY_RESTART_METRIC_GROUP_NAME, - QueryMetadataImpl.QUERY_RESTART_METRIC_DESCRIPTION, + QueryStateMetricsReportingListener.QUERY_RESTART_METRIC_NAME, + metricPrefix + "ksql-queries", + QueryStateMetricsReportingListener.QUERY_RESTART_METRIC_DESCRIPTION, metricsTags )); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index 8fcc58b7c310..a9d7bdee7ab8 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -211,8 +211,6 @@ public class StreamedQueryResourceTest { private MeteredProcessingLoggerFactory loggerFactory; @Mock private Metrics metrics; - @Mock - private Sensor sensor; private StreamedQueryResource testResource; private PreparedStatement invalid; @@ -223,7 +221,6 @@ public class StreamedQueryResourceTest { @Before public void setup() { when(serviceContext.getTopicClient()).thenReturn(mockKafkaTopicClient); - when(metrics.sensor(any())).thenReturn(sensor); query = PreparedStatement.of(PUSH_QUERY_STRING, mock(Query.class)); invalid = PreparedStatement.of("sql", mock(Statement.class)); when(mockStatementParser.parseSingleStatement(PUSH_QUERY_STRING)).thenReturn(invalid);