move metric to query state reporting listener
stevenpyzhang committed May 17, 2022
1 parent facc272 · commit 2710ed0
Showing 20 changed files with 135 additions and 318 deletions.
@@ -158,7 +158,7 @@ public QueryEventListener getQueryEventListener() {
final String metricsPrefix
= metricGroupPrefix.equals(KsqlEngineMetrics.DEFAULT_METRIC_GROUP_PREFIX)
? "" : metricGroupPrefix;
return new QueryStateMetricsReportingListener(metrics, metricsPrefix);
return new QueryStateMetricsReportingListener(metrics, metricsPrefix, newCustomMetricsTags);
}

private void recordMessageConsumptionByQueryStats(
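The call above now threads newCustomMetricsTags into the listener; judging by the call sites removed later in this diff, that map is the one KsqlConfig exposes via getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS), i.e. the server's custom metrics tags, typically comma-separated key:value pairs. A minimal standalone sketch of that kind of tag parsing (the CustomMetricsTagsSketch class and its parseTags helper are hypothetical, not the ksqlDB implementation):

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

public final class CustomMetricsTagsSketch {

  // Hypothetical parser; in ksqlDB the equivalent work is done by KsqlConfig.getStringAsMap.
  static Map<String, String> parseTags(final String config) {
    final Map<String, String> tags = new LinkedHashMap<>();
    if (config == null || config.trim().isEmpty()) {
      return tags;
    }
    Arrays.stream(config.split(","))
        .map(entry -> entry.split(":", 2))
        .filter(kv -> kv.length == 2)
        .forEach(kv -> tags.put(kv[0].trim(), kv[1].trim()));
    return tags;
  }

  public static void main(final String[] args) {
    // e.g. a custom tags config value of "cluster_id:c1,region:us-east-1"
    System.out.println(parseTags("cluster_id:c1,region:us-east-1"));
    // prints {cluster_id=c1, region=us-east-1}
  }
}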
@@ -22,16 +22,22 @@
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.QueryMetadata;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.streams.KafkaStreams.State;

public class QueryStateMetricsReportingListener implements QueryEventListener {
public static final String QUERY_RESTART_METRIC_NAME = "query-restart-total";
public static final String QUERY_RESTART_METRIC_DESCRIPTION =
"The total number of times that a query thread has failed and then been restarted.";

public static final Ticker CURRENT_TIME_MILLIS_TICKER = new Ticker() {
@Override
@@ -42,12 +48,18 @@ public long read() {

private final Metrics metrics;
private final String metricsPrefix;
private final Map<String, String> metricsTags;
private final ConcurrentMap<QueryId, PerQueryListener> perQuery = new ConcurrentHashMap<>();

QueryStateMetricsReportingListener(final Metrics metrics, final String metricsPrefix) {
QueryStateMetricsReportingListener(
final Metrics metrics,
final String metricsPrefix,
final Map<String, String> metricsTags
) {
this.metrics = Objects.requireNonNull(metrics, "metrics");
this.metricsPrefix
= Objects.requireNonNull(metricsPrefix, "metricGroupPrefix");
this.metricsTags = Objects.requireNonNull(metricsTags);
}

@Override
@@ -60,7 +72,12 @@ public void onCreate(
}
perQuery.put(
queryMetadata.getQueryId(),
new PerQueryListener(metrics, metricsPrefix, queryMetadata.getQueryId().toString())
new PerQueryListener(
metrics,
metricsPrefix,
queryMetadata.getQueryId().toString(),
metricsTags
)
);
}

@@ -96,6 +113,8 @@ private static class PerQueryListener {
private final Metrics metrics;
private final MetricName stateMetricName;
private final MetricName errorMetricName;
private final MetricName queryRestartMetricName;
private final CumulativeSum queryRestartSum;
private final Ticker ticker;

private volatile String state = "-";
@@ -104,16 +123,18 @@ private static class PerQueryListener {
PerQueryListener(
final Metrics metrics,
final String groupPrefix,
final String queryId
final String queryId,
final Map<String, String> metricsTags
) {
this(metrics, groupPrefix, queryId, CURRENT_TIME_MILLIS_TICKER);
this(metrics, groupPrefix, queryId, CURRENT_TIME_MILLIS_TICKER, metricsTags);
}

PerQueryListener(
final Metrics metrics,
final String groupPrefix,
final String queryId,
final Ticker ticker
final Ticker ticker,
final Map<String, String> metricsTags
) {
Objects.requireNonNull(groupPrefix, "groupPrefix");
Objects.requireNonNull(queryId, "queryId");
@@ -124,20 +145,34 @@ private static class PerQueryListener {

final String tag = "_confluent-ksql-" + groupPrefix + type + queryId;

final Map<String, String> tagsForStateAndError = new HashMap<>(metricsTags);
tagsForStateAndError.put("status", tag);
this.stateMetricName = metrics.metricName(
"query-status",
groupPrefix + "ksql-queries",
"The current status of the given query.",
Collections.singletonMap("status", tag));
tagsForStateAndError
);

errorMetricName = metrics.metricName(
"error-status",
groupPrefix + "ksql-queries",
"The current error status of the given query, if the state is in ERROR state",
Collections.singletonMap("status", tag)
tagsForStateAndError
);

final Map<String, String> restartTags = new HashMap<>(tagsForStateAndError);
restartTags.put("query-id", queryId);
queryRestartMetricName = metrics.metricName(
QUERY_RESTART_METRIC_NAME,
groupPrefix + "ksql-queries",
QUERY_RESTART_METRIC_DESCRIPTION,
restartTags
);
this.queryRestartSum = new CumulativeSum();
this.metrics.addMetric(stateMetricName, (Gauge<String>) (config, now) -> state);
this.metrics.addMetric(errorMetricName, (Gauge<String>) (config, now) -> error);
this.metrics.addMetric(queryRestartMetricName, queryRestartSum);
}

public void onChange(final State newState, final State oldState) {
@@ -150,11 +185,13 @@ public void onChange(final State newState, final State oldState) {

public void onError(final QueryError observedError) {
error = observedError.getType().name();
queryRestartSum.record(new MetricConfig(), 1, System.currentTimeMillis());
}

public void onDeregister() {
metrics.removeMetric(stateMetricName);
metrics.removeMetric(errorMetricName);
metrics.removeMetric(queryRestartMetricName);
}
}
}
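With this change the listener owns the query-restart-total counter: a CumulativeSum is registered per query next to the existing status gauges and bumped by one from onError. A minimal, self-contained sketch of that metric pattern against the plain Kafka Metrics API (the query id tag and class name here are illustrative):

import java.util.Collections;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.stats.CumulativeSum;

public final class QueryRestartMetricSketch {

  public static void main(final String[] args) {
    try (Metrics metrics = new Metrics()) {
      // Register the counter, tagged with a (hypothetical) query id.
      final MetricName restartName = metrics.metricName(
          "query-restart-total",
          "ksql-queries",
          "The total number of times that a query thread has failed and then been restarted.",
          Collections.singletonMap("query-id", "CTAS_EXAMPLE_0"));
      final CumulativeSum restartSum = new CumulativeSum();
      metrics.addMetric(restartName, restartSum);

      // The listener records 1 from onError(), so each observed query error bumps the total.
      restartSum.record(new MetricConfig(), 1, System.currentTimeMillis());
      restartSum.record(new MetricConfig(), 1, System.currentTimeMillis());

      // Anything with access to the Metrics registry can read the value back.
      final KafkaMetric metric = metrics.metric(restartName);
      System.out.println(metric.metricValue()); // 2.0

      // Mirrors onDeregister(): drop the metric when the query is removed.
      metrics.removeMetric(restartName);
    }
  }
}

Registration, recording, and cleanup now follow the query lifecycle callbacks (onCreate, onError, onDeregister) in a single class.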
@@ -227,9 +227,7 @@ TransientQueryMetadata buildTransientQuery(
ksqlConfig.getLong(KsqlConfig.KSQL_QUERY_RETRY_BACKOFF_INITIAL_MS),
ksqlConfig.getLong(KsqlConfig.KSQL_QUERY_RETRY_BACKOFF_MAX_MS),
listener,
processingLogContext.getLoggerFactory(),
metricCollectors.getMetrics(),
ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS)
processingLogContext.getLoggerFactory()
);
}

@@ -377,9 +375,7 @@ PersistentQueryMetadata buildPersistentQueryInDedicatedRuntime(
ksqlConfig.getLong(KsqlConfig.KSQL_QUERY_RETRY_BACKOFF_MAX_MS),
listener,
scalablePushRegistry,
processingLogContext.getLoggerFactory(),
metricCollectors.getMetrics(),
ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS)
processingLogContext.getLoggerFactory()
);

}
@@ -493,9 +489,7 @@ PersistentQueryMetadata buildPersistentQueryInSharedRuntime(
physicalPlan
),
keyFormat,
processingLogContext.getLoggerFactory(),
metricCollectors.getMetrics(),
ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS)
processingLogContext.getLoggerFactory()
);
if (real) {
return binPackedPersistentQueryMetadata;
@@ -49,7 +49,6 @@
import java.util.Set;
import java.util.function.Function;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.LagInfo;
@@ -83,8 +82,6 @@ public class BinPackedPersistentQueryMetadataImpl implements PersistentQueryMeta
private final Listener listener;
private final Function<SharedKafkaStreamsRuntime, NamedTopology> namedTopologyBuilder;
private final TimeBoundedQueue queryErrors;
private final Optional<Metrics> metrics;
private final Optional<Sensor> restartSensor;

private final MaterializationProviderBuilderFactory
materializationProviderBuilderFactory;
@@ -117,9 +114,7 @@ public BinPackedPersistentQueryMetadataImpl(
final Optional<ScalablePushRegistry> scalablePushRegistry,
final Function<SharedKafkaStreamsRuntime, NamedTopology> namedTopologyBuilder,
final KeyFormat keyFormat,
final ProcessingLoggerFactory loggerFactory,
final Metrics metrics,
final Map<String, String> metricsTags
final ProcessingLoggerFactory loggerFactory
) {
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
this.persistentQueryType = Objects.requireNonNull(persistentQueryType, "persistentQueryType");
@@ -146,9 +141,6 @@ public BinPackedPersistentQueryMetadataImpl(
this.namedTopologyBuilder = requireNonNull(namedTopologyBuilder, "namedTopologyBuilder");
this.queryErrors = sharedKafkaStreamsRuntime.getNewQueryErrorQueue();
this.scalablePushRegistry = requireNonNull(scalablePushRegistry, "scalablePushRegistry");
this.metrics = Optional.of(metrics);
this.restartSensor = Optional.of(
QueryMetricsUtil.createQueryRestartMetricSensor(queryId.toString(), metricsTags, metrics));
this.keyFormat = requireNonNull(keyFormat, "keyFormat");
this.loggerFactory = requireNonNull(loggerFactory, "loggerFactory");
}
@@ -179,8 +171,6 @@ public BinPackedPersistentQueryMetadataImpl(
this.queryErrors = sharedKafkaStreamsRuntime.getNewQueryErrorQueue();
this.scalablePushRegistry = original.scalablePushRegistry;
this.namedTopologyBuilder = original.namedTopologyBuilder;
this.metrics = Optional.empty();
this.restartSensor = Optional.empty();
this.keyFormat = original.keyFormat;
this.loggerFactory = original.loggerFactory;
}
@@ -419,9 +409,6 @@ public void close() {
sharedKafkaStreamsRuntime.stop(queryId, false);
scalablePushRegistry.ifPresent(ScalablePushRegistry::close);
listener.onClose(this);
if (metrics.isPresent() && restartSensor.isPresent()) {
metrics.get().removeSensor(restartSensor.get().name());
}
}

@Override
@@ -442,9 +429,4 @@ public void register() {
Listener getListener() {
return listener;
}

@Override
public Optional<Sensor> getRestartMetricsSensor() {
return restartSensor;
}
}
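For comparison, the fields removed from this class wired the restart counter through a per-query Kafka Sensor built by QueryMetricsUtil.createQueryRestartMetricSensor and removed again in close(). Roughly that shape, using the standard Sensor API (the sensor name, tags, and class name here are illustrative, not the removed helper itself):

import java.util.Collections;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;

public final class SensorBasedRestartMetricSketch {

  public static void main(final String[] args) {
    try (Metrics metrics = new Metrics()) {
      final String queryId = "CTAS_EXAMPLE_0";

      // One sensor per query, holding a cumulative restart count.
      final Sensor restartSensor = metrics.sensor("query-restart-" + queryId);
      final MetricName restartName = metrics.metricName(
          "query-restart-total",
          "ksql-queries",
          Collections.singletonMap("query-id", queryId));
      restartSensor.add(restartName, new CumulativeSum());

      restartSensor.record(); // one restart observed

      // Mirrors the removed close() logic: remove the sensor along with the query.
      metrics.removeSensor(restartSensor.name());
    }
  }
}

Centralising the counter in QueryStateMetricsReportingListener removes the duplicated Metrics and metricsTags constructor parameters from the query metadata classes and the QueryBuilder call sites shown elsewhere in this diff.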
@@ -41,7 +41,6 @@
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

@@ -89,9 +88,7 @@ public PersistentQueryMetadataImpl(
final long retryBackoffMaxMs,
final QueryMetadata.Listener listener,
final Optional<ScalablePushRegistry> scalablePushRegistry,
final ProcessingLoggerFactory loggerFactory,
final Metrics metrics,
final Map<String, String> metricsTags
final ProcessingLoggerFactory loggerFactory
) {
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
super(
@@ -111,9 +108,7 @@ public PersistentQueryMetadataImpl(
retryBackoffInitialMs,
retryBackoffMaxMs,
new QueryListenerWrapper(listener, scalablePushRegistry),
loggerFactory,
metrics,
metricsTags
loggerFactory
);
this.sinkDataSource = requireNonNull(sinkDataSource, "sinkDataSource");
this.schemas = requireNonNull(schemas, "schemas");
@@ -23,9 +23,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.LagInfo;
import org.apache.kafka.streams.StreamsMetadata;
@@ -75,8 +73,6 @@ public interface QueryMetadata {

KafkaStreams getKafkaStreams();

Optional<Sensor> getRestartMetricsSensor();

void close();

void start();
(The remaining changed files are not shown here.)
