diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java index fa8e81438bd0..8d60e8f70aa9 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java @@ -627,9 +627,7 @@ private SharedKafkaStreamsRuntime getKafkaStreamsInstance( metricCollectors, config.getConfig(true), processingLogContext - ), - metricCollectors.getMetrics(), - ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS) + ) ); } else { stream = new SandboxedSharedKafkaStreamsRuntimeImpl( diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.java index b7c40a765426..e18bce448d82 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.java @@ -30,7 +30,6 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import org.apache.kafka.common.KafkaFuture; -import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.StateListener; import org.apache.kafka.streams.StreamsConfig; @@ -55,9 +54,7 @@ public SharedKafkaStreamsRuntimeImpl(final KafkaStreamsBuilder kafkaStreamsBuild final QueryErrorClassifier errorClassifier, final int maxQueryErrorsQueueSize, final long shutdownTimeoutConfig, - final Map streamsProperties, - final Metrics metrics, - final Map metricsTags) { + final Map streamsProperties) { super( kafkaStreamsBuilder, streamsProperties diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java index 3fc0e610ec4e..3f533f783f5b 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/PersistentQueryMetadataTest.java @@ -18,6 +18,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -47,6 +48,7 @@ import java.util.Optional; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.Topology; @@ -101,6 +103,8 @@ public class PersistentQueryMetadataTest { private Metrics metrics; @Mock private MeteredProcessingLoggerFactory processingLoggerFactory; + @Mock + private Sensor sensor; private PersistentQueryMetadata query; @@ -111,6 +115,7 @@ public void setUp() { when(materializationProviderBuilder.apply(kafkaStreams, topology)) .thenReturn(Optional.of(materializationProvider)); when(kafkaStreams.state()).thenReturn(State.NOT_RUNNING); + when(metrics.sensor(anyString())).thenReturn(sensor); query = new PersistentQueryMetadataImpl( KsqlConstants.PersistentQueryType.CREATE_AS, diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataImplTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataImplTest.java index d118971f4486..bdba5e082418 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataImplTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/SandboxedPersistentQueryMetadataImplTest.java @@ -16,6 +16,7 @@ package io.confluent.ksql.util; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -40,6 +41,7 @@ import java.util.function.Consumer; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.Topology; import org.junit.Before; @@ -93,6 +95,8 @@ public class SandboxedPersistentQueryMetadataImplTest { private MeteredProcessingLoggerFactory processingLoggerFactory; @Mock private Metrics metrics; + @Mock + private Sensor sensor; private SandboxedPersistentQueryMetadataImpl sandbox; @@ -102,6 +106,7 @@ public void setUp() { when(physicalSchema.logicalSchema()).thenReturn(mock(LogicalSchema.class)); when(materializationProviderBuilder.apply(kafkaStreams, topology)) .thenReturn(Optional.of(materializationProvider)); + when(metrics.sensor(anyString())).thenReturn(sensor); final PersistentQueryMetadataImpl query = new PersistentQueryMetadataImpl( KsqlConstants.PersistentQueryType.CREATE_AS, diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImplTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImplTest.java index fcef88305103..27e83cd45157 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImplTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImplTest.java @@ -47,6 +47,7 @@ import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -72,7 +73,9 @@ public class SharedKafkaStreamsRuntimeImplTest { @Mock private KafkaFuture future; @Mock - private Sensor sensor; + private Sensor restartErrorSensorQuery1; + @Mock + private Sensor restartErrorSensorQuery2; private final QueryId queryId = new QueryId("query-1"); private final QueryId queryId2= new QueryId("query-2"); @@ -89,11 +92,9 @@ public class SharedKafkaStreamsRuntimeImplTest { new StreamsException("query down!", new TaskId(0, 0, "not-a-real-query")); private SharedKafkaStreamsRuntimeImpl sharedKafkaStreamsRuntimeImpl; - private MetricCollectors metricCollectors; @Before public void setUp() throws Exception { - metricCollectors = new MetricCollectors(); when(kafkaStreamsBuilder.buildNamedTopologyWrapper(any())).thenReturn(kafkaStreamsNamedTopologyWrapper).thenReturn(kafkaStreamsNamedTopologyWrapper2); streamProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "runtime"); streamProps.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "old"); @@ -102,9 +103,7 @@ public void setUp() throws Exception { queryErrorClassifier, 5, 300_000L, - streamProps, - metricCollectors.getMetrics(), - metricsTags + streamProps ); when(kafkaStreamsNamedTopologyWrapper.getTopologyByName(any())).thenReturn(Optional.empty()); @@ -114,6 +113,8 @@ public void setUp() throws Exception { when(binPackedPersistentQueryMetadata.getTopologyCopy(any())).thenReturn(namedTopology); when(binPackedPersistentQueryMetadata.getQueryId()).thenReturn(queryId); when(binPackedPersistentQueryMetadata2.getQueryId()).thenReturn(queryId2); + when(binPackedPersistentQueryMetadata.getRestartMetricsSensor()).thenReturn(Optional.of(restartErrorSensorQuery1)); + when(binPackedPersistentQueryMetadata2.getRestartMetricsSensor()).thenReturn(Optional.of(restartErrorSensorQuery2)); sharedKafkaStreamsRuntimeImpl.register( binPackedPersistentQueryMetadata @@ -277,8 +278,8 @@ public void shouldRecordMetricForQuery1WhenError() { sharedKafkaStreamsRuntimeImpl.uncaughtHandler(query1Exception); //Then: - assertThat(getMetricValue(queryId.toString(), metricsTags), is(1.0)); - assertThat(getMetricValue(queryId2.toString(), metricsTags), is(0.0)); + verify(restartErrorSensorQuery1, times(1)).record(); + verify(restartErrorSensorQuery2, never()).record(); } @Test @@ -293,25 +294,11 @@ public void shouldRecordMetricForAllQueriesWhenErrorWithNoTask() { sharedKafkaStreamsRuntimeImpl.start(queryId); sharedKafkaStreamsRuntimeImpl.start(queryId2); + sharedKafkaStreamsRuntimeImpl.uncaughtHandler(runtimeExceptionWithNoTask); sharedKafkaStreamsRuntimeImpl.uncaughtHandler(runtimeExceptionWithNoTask); //Then: - assertThat(getMetricValue(queryId.toString(), metricsTags), is(1.0)); - assertThat(getMetricValue(queryId2.toString(), metricsTags), is(1.0)); - } - - private double getMetricValue(final String queryId, final Map metricsTags) { - final Map customMetricsTags = new HashMap<>(metricsTags); - customMetricsTags.put("query-id", queryId); - final Metrics metrics = metricCollectors.getMetrics(); - return Double.parseDouble( - metrics.metric( - metrics.metricName( - QueryMetadataImpl.QUERY_RESTART_METRIC_NAME, - QueryMetadataImpl.QUERY_RESTART_METRIC_GROUP_NAME, - QueryMetadataImpl.QUERY_RESTART_METRIC_DESCRIPTION, - customMetricsTags) - ).metricValue().toString() - ); + verify(restartErrorSensorQuery1, times(2)).record(); + verify(restartErrorSensorQuery2, times(2)).record(); } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java index 0a961ec0a1cc..b50a6ec6b2f4 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java @@ -37,6 +37,7 @@ import java.util.function.Consumer; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.Topology; @@ -81,6 +82,8 @@ public class TransientQueryMetadataTest { private MeteredProcessingLoggerFactory loggerFactory; @Mock private Metrics metrics; + @Mock + private Sensor sensor; private TransientQueryMetadata query; @@ -89,6 +92,7 @@ public void setUp() { when(kafkaStreamsBuilder.build(any(), any())).thenReturn(kafkaStreams); when(kafkaStreams.state()).thenReturn(State.NOT_RUNNING); when(sourceNames.toArray()).thenReturn(new SourceName[0]); + when(metrics.sensor(any())).thenReturn(sensor); query = new TransientQueryMetadata( SQL,