Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add metric for query restarts #9045

Merged
merged 7 commits into from
May 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public QueryEventListener getQueryEventListener() {
final String metricsPrefix
= metricGroupPrefix.equals(KsqlEngineMetrics.DEFAULT_METRIC_GROUP_PREFIX)
? "" : metricGroupPrefix;
return new QueryStateMetricsReportingListener(metrics, metricsPrefix);
return new QueryStateMetricsReportingListener(metrics, metricsPrefix, newCustomMetricsTags);
}

private void recordMessageConsumptionByQueryStats(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,22 @@
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.QueryMetadata;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.streams.KafkaStreams.State;

public class QueryStateMetricsReportingListener implements QueryEventListener {
/** Name under which the per-query restart counter metric is registered. */
public static final String QUERY_RESTART_METRIC_NAME = "query-restart-total";
/** Human-readable description attached to the query restart metric. */
public static final String QUERY_RESTART_METRIC_DESCRIPTION =
"The total number of times that a query thread has failed and then been restarted.";

public static final Ticker CURRENT_TIME_MILLIS_TICKER = new Ticker() {
@Override
Expand All @@ -42,12 +48,18 @@ public long read() {

private final Metrics metrics;
private final String metricsPrefix;
private final Map<String, String> metricsTags;
private final ConcurrentMap<QueryId, PerQueryListener> perQuery = new ConcurrentHashMap<>();

QueryStateMetricsReportingListener(final Metrics metrics, final String metricsPrefix) {
QueryStateMetricsReportingListener(
final Metrics metrics,
final String metricsPrefix,
final Map<String, String> metricsTags
) {
this.metrics = Objects.requireNonNull(metrics, "metrics");
this.metricsPrefix
= Objects.requireNonNull(metricsPrefix, "metricGroupPrefix");
this.metricsTags = Objects.requireNonNull(metricsTags);
}

@Override
Expand All @@ -60,7 +72,12 @@ public void onCreate(
}
perQuery.put(
queryMetadata.getQueryId(),
new PerQueryListener(metrics, metricsPrefix, queryMetadata.getQueryId().toString())
new PerQueryListener(
metrics,
metricsPrefix,
queryMetadata.getQueryId().toString(),
metricsTags
)
);
}

Expand Down Expand Up @@ -96,6 +113,8 @@ private static class PerQueryListener {
private final Metrics metrics;
private final MetricName stateMetricName;
private final MetricName errorMetricName;
private final MetricName queryRestartMetricName;
private final CumulativeSum queryRestartSum;
private final Ticker ticker;

private volatile String state = "-";
Expand All @@ -104,16 +123,18 @@ private static class PerQueryListener {
PerQueryListener(
final Metrics metrics,
final String groupPrefix,
final String queryId
final String queryId,
final Map<String, String> metricsTags
) {
this(metrics, groupPrefix, queryId, CURRENT_TIME_MILLIS_TICKER);
this(metrics, groupPrefix, queryId, CURRENT_TIME_MILLIS_TICKER, metricsTags);
}

PerQueryListener(
final Metrics metrics,
final String groupPrefix,
final String queryId,
final Ticker ticker
final Ticker ticker,
final Map<String, String> metricsTags
) {
Objects.requireNonNull(groupPrefix, "groupPrefix");
Objects.requireNonNull(queryId, "queryId");
Expand All @@ -124,20 +145,34 @@ private static class PerQueryListener {

final String tag = "_confluent-ksql-" + groupPrefix + type + queryId;

final Map<String, String> tagsForStateAndError = new HashMap<>(metricsTags);
tagsForStateAndError.put("status", tag);
this.stateMetricName = metrics.metricName(
"query-status",
groupPrefix + "ksql-queries",
"The current status of the given query.",
Collections.singletonMap("status", tag));
tagsForStateAndError
);

errorMetricName = metrics.metricName(
"error-status",
groupPrefix + "ksql-queries",
"The current error status of the given query, if the state is in ERROR state",
Collections.singletonMap("status", tag)
tagsForStateAndError
);

final Map<String, String> restartTags = new HashMap<>(tagsForStateAndError);
restartTags.put("query-id", queryId);
queryRestartMetricName = metrics.metricName(
QUERY_RESTART_METRIC_NAME,
groupPrefix + "ksql-queries",
QUERY_RESTART_METRIC_DESCRIPTION,
restartTags
);
this.queryRestartSum = new CumulativeSum();
this.metrics.addMetric(stateMetricName, (Gauge<String>) (config, now) -> state);
this.metrics.addMetric(errorMetricName, (Gauge<String>) (config, now) -> error);
this.metrics.addMetric(queryRestartMetricName, queryRestartSum);
}

public void onChange(final State newState, final State oldState) {
Expand All @@ -150,11 +185,13 @@ public void onChange(final State newState, final State oldState) {

/**
 * Records the type of the observed error and counts one more query restart.
 *
 * @param observedError the error reported for this query; its type name becomes the value
 *     exposed through the error-status gauge
 */
public void onError(final QueryError observedError) {
  error = observedError.getType().name();
  // The restart counter is registered directly with the metrics registry rather than
  // through a Sensor, so nothing serializes calls to record() on our behalf. Guard the
  // update so that increments are not lost if errors are reported from several threads.
  // NOTE(review): assumes onError can be reached concurrently from more than one stream
  // thread of the same query - confirm against the caller.
  synchronized (queryRestartSum) {
    queryRestartSum.record(new MetricConfig(), 1, System.currentTimeMillis());
  }
}

/**
 * Unregisters every metric this listener added: the state gauge, the error gauge and the
 * restart counter, in the same order they were registered.
 */
public void onDeregister() {
  final MetricName[] registered = {stateMetricName, errorMetricName, queryRestartMetricName};
  for (final MetricName name : registered) {
    metrics.removeMetric(name);
  }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ interface RetryEvent {

long nextRestartTimeMs();

int getNumRetries();
int getNumRetries(String threadName);

void backOff();
void backOff(String threadName);
}

interface Listener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import org.apache.kafka.streams.KafkaStreams;
Expand All @@ -55,7 +56,6 @@
import org.slf4j.LoggerFactory;

public class QueryMetadataImpl implements QueryMetadata {

private static final Logger LOG = LoggerFactory.getLogger(QueryMetadataImpl.class);

private final String statementString;
Expand All @@ -74,6 +74,7 @@ public class QueryMetadataImpl implements QueryMetadata {
private final RetryEvent retryEvent;
private final Listener listener;
private final ProcessingLoggerFactory loggerFactory;

private volatile boolean everStarted = false;
private volatile KafkaStreams kafkaStreams;
// These fields don't need synchronization because they are initialized in initialize() before
Expand Down Expand Up @@ -210,7 +211,7 @@ protected StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse uncaught
e
);
}
retryEvent.backOff();
retryEvent.backOff(Thread.currentThread().getName());
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
}

Expand Down Expand Up @@ -424,7 +425,7 @@ public static class RetryEvent implements QueryMetadata.RetryEvent {
private final Ticker ticker;
private final QueryId queryId;

private int numRetries = 0;
private Map<String, Integer> numRetries = new ConcurrentHashMap<>();
private long waitingTimeMs;
private long expiryTimeMs;
private long retryBackoffMaxMs;
Expand All @@ -449,13 +450,11 @@ public long nextRestartTimeMs() {
return expiryTimeMs;
}

public int getNumRetries() {
return numRetries;
public int getNumRetries(final String threadName) {
return numRetries.getOrDefault(threadName, 0);
}

public void backOff() {
numRetries++;

public void backOff(final String threadName) {
final long now = ticker.read();

this.waitingTimeMs = getWaitingTimeMs();
Expand All @@ -466,7 +465,13 @@ public void backOff() {
Thread.currentThread().interrupt();
}

LOG.info("Restarting query {} (attempt #{})", queryId, numRetries);
final int retries = numRetries.merge(threadName, 1, Integer::sum);

LOG.info(
"Restarting query {} thread {} (attempt #{})",
queryId,
threadName,
retries);

// Math.max() prevents overflow if now is Long.MAX_VALUE (found just in tests)
this.expiryTimeMs = Math.max(now, now + waitingTimeMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.streams.KafkaStreams.State;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -46,12 +47,18 @@
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

@RunWith(MockitoJUnitRunner.class)
public class QueryStateMetricsReportingListenerTest {
private static final MetricName METRIC_NAME_1 =
new MetricName("bob", "g1", "d1", ImmutableMap.of());
private static final MetricName METRIC_NAME_2 =
new MetricName("dylan", "g1", "d1", ImmutableMap.of());
private static final MetricName METRIC_NAME_3 =
new MetricName("steven", "g1", "d1", ImmutableMap.of());
private static final QueryId QUERY_ID = new QueryId("foo");
private static final String TAG = "_confluent-ksql-" + "some-prefix-" + "query_" + QUERY_ID.toString();

Expand All @@ -67,14 +74,17 @@ public class QueryStateMetricsReportingListenerTest {
private ArgumentCaptor<Gauge<String>> gaugeCaptor;
private QueryStateMetricsReportingListener listener;

private final Map<String, String> metricsTags = Collections.singletonMap("tag1", "value1");

@Before
public void setUp() {
when(metrics.metricName(any(), any(), any(), anyMap()))
.thenReturn(METRIC_NAME_1)
.thenReturn(METRIC_NAME_2);
.thenReturn(METRIC_NAME_2)
.thenReturn(METRIC_NAME_3);
when(query.getQueryId()).thenReturn(QUERY_ID);

listener = new QueryStateMetricsReportingListener(metrics, "some-prefix-");
listener = new QueryStateMetricsReportingListener(metrics, "some-prefix-", metricsTags);
}

@Test
Expand All @@ -86,17 +96,24 @@ public void shouldThrowOnNullParams() {
public void shouldAddMetricOnCreation() {
// When:
listener.onCreate(serviceContext, metaStore, query);
final Map<String, String> tags = new HashMap<>(metricsTags);
tags.put("status", TAG);

// Then:
verify(metrics).metricName("query-status", "some-prefix-ksql-queries",
"The current status of the given query.",
ImmutableMap.of("status", TAG));
tags);
verify(metrics).metricName("error-status", "some-prefix-ksql-queries",
"The current error status of the given query, if the state is in ERROR state",
ImmutableMap.of("status", TAG));
tags);
tags.put("query-id", QUERY_ID.toString());
verify(metrics).metricName(QueryStateMetricsReportingListener.QUERY_RESTART_METRIC_NAME, "some-prefix-ksql-queries",
QueryStateMetricsReportingListener.QUERY_RESTART_METRIC_DESCRIPTION,
tags);

verify(metrics).addMetric(eq(METRIC_NAME_1), isA(Gauge.class));
verify(metrics).addMetric(eq(METRIC_NAME_2), isA(Gauge.class));
verify(metrics).addMetric(eq(METRIC_NAME_3), isA(CumulativeSum.class));
}

@Test
Expand All @@ -123,20 +140,26 @@ public void shouldGracefullyHandleErrorCallbackAfterDeregister() {
public void shouldAddMetricWithSuppliedPrefix() {
// Given:
final String groupPrefix = "some-prefix-";
final Map<String, String> tags = new HashMap<>(metricsTags);
tags.put("status", TAG);

clearInvocations(metrics);

// When:
listener = new QueryStateMetricsReportingListener(metrics, groupPrefix);
listener = new QueryStateMetricsReportingListener(metrics, groupPrefix, metricsTags);
listener.onCreate(serviceContext, metaStore, query);

// Then:
verify(metrics).metricName("query-status", groupPrefix + "ksql-queries",
"The current status of the given query.",
ImmutableMap.of("status", TAG));
tags);
verify(metrics).metricName("error-status", groupPrefix + "ksql-queries",
"The current error status of the given query, if the state is in ERROR state",
ImmutableMap.of("status", TAG));
tags);
tags.put("query-id", QUERY_ID.toString());
verify(metrics).metricName(QueryStateMetricsReportingListener.QUERY_RESTART_METRIC_NAME, "some-prefix-ksql-queries",
QueryStateMetricsReportingListener.QUERY_RESTART_METRIC_DESCRIPTION,
tags);
}

@Test
Expand Down Expand Up @@ -178,6 +201,7 @@ public void shouldRemoveMetricOnClose() {
// Then:
verify(metrics).removeMetric(METRIC_NAME_1);
verify(metrics).removeMetric(METRIC_NAME_2);
verify(metrics).removeMetric(METRIC_NAME_3);
}

private String currentGaugeValue(final MetricName name) {
Expand Down
Loading