chore: add ThroughputMetricsReporter for consumed/produced metrics from KIP-846 (#9187)

Introduces a ThroughputMetricsReporter to report the consumed/produced total throughput metrics added to Kafka Streams in KIP-846.

Metrics are aggregated to report the total number of records and bytes consumed/produced per topic, per query.
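For each query, the reporter registers one aggregated gauge per throughput metric and per topic, tagged with the query id. Illustratively (the query and topic names below are hypothetical):

    records-consumed-total {query-id=CTAS_MY_QUERY_1, topic=my-input-topic, ...}
    bytes-consumed-total   {query-id=CTAS_MY_QUERY_1, topic=my-input-topic, ...}
    records-produced-total {query-id=CTAS_MY_QUERY_1, topic=my-output-topic, ...}
    bytes-produced-total   {query-id=CTAS_MY_QUERY_1, topic=my-output-topic, ...}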
ableegoldman authored Jun 14, 2022
1 parent b8590f5 commit b832626
Showing 6 changed files with 726 additions and 6 deletions.
12 changes: 12 additions & 0 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
@@ -1799,6 +1799,18 @@ public Map<String, String> getStringAsMap(final String key) {
     return parseStringAsMap(key, value);
   }
+
+  public static Map<String, String> getStringAsMap(
+      final String key,
+      final Map<String, ?> configMap
+  ) {
+    final String value = (String) configMap.get(key);
+    if (value != null) {
+      return parseStringAsMap(key, value);
+    } else {
+      return Collections.emptyMap();
+    }
+  }
 
   public static Map<String, String> parseStringAsMap(final String key, final String value) {
     try {
       return value.equals("")
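A minimal usage sketch of the new static getStringAsMap overload, e.g. from inside a MetricsReporter#configure implementation; the comma-separated "key:value" tag format is an assumption about how parseStringAsMap handles KSQL_CUSTOM_METRICS_TAGS:

    import io.confluent.ksql.util.KsqlConfig;
    import java.util.Map;

    // Hypothetical config map, like the one handed to MetricsReporter#configure
    final Map<String, ?> configMap = Map.of(
        KsqlConfig.KSQL_CUSTOM_METRICS_TAGS, "cluster:prod,region:us-west");
    final Map<String, String> tags =
        KsqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS, configMap);
    // tags -> {cluster=prod, region=us-west}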
@@ -57,10 +57,7 @@ public class StorageUtilizationMetricsReporter implements MetricsReporter {
   private Map<String, Map<String, TaskStorageMetric>> metricsSeen;
   private Metrics metricRegistry;
   private static Map<String, String> customTags = new HashMap<>();
-  private static AtomicInteger numberStatefulTasks = new AtomicInteger(0);
-
-  public StorageUtilizationMetricsReporter() {
-  }
+  private static final AtomicInteger numberStatefulTasks = new AtomicInteger(0);
 
   @Override
   public void init(final List<KafkaMetric> list) {
@@ -281,6 +278,7 @@ private String getQueryId(final KafkaMetric metric) {
     if (matcher.find()) {
       return matcher.group(1);
     } else {
+      LOGGER.error("Can't parse query id from metric {}", metric);
       throw new KsqlException("Missing query ID when reporting utilization metrics");
     }
   }
@@ -0,0 +1,269 @@
/*
 * Copyright 2021 Confluent Inc.
 *
 * Licensed under the Confluent Community License (the "License"); you may not use
 * this file except in compliance with the License. You may obtain a copy of the
 * License at
 *
 * http://www.confluent.io/confluent-community-license
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */

package io.confluent.ksql.internal;

import static java.util.Objects.requireNonNull;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_ID_TAG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_ID_TAG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOPIC_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOPIC_NAME_TAG;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThroughputMetricsReporter implements MetricsReporter {
  private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputMetricsReporter.class);
  private static final String QUERY_ID_TAG = "query-id";
  private static final String RECORDS_CONSUMED = "records-consumed-total";
  private static final String BYTES_CONSUMED = "bytes-consumed-total";
  private static final String RECORDS_PRODUCED = "records-produced-total";
  private static final String BYTES_PRODUCED = "bytes-produced-total";
  private static final Set<String> THROUGHPUT_METRIC_NAMES =
      mkSet(RECORDS_CONSUMED, BYTES_CONSUMED, RECORDS_PRODUCED, BYTES_PRODUCED);
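  // The query id is recovered either from the task id (for named topologies), which
  // has the form <queryId>__<subtopologyId>_<partition>, or else from the thread name,
  // where it sits between the "query_"/"transient_" prefix and the next '-'.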
  private static final Pattern NAMED_TOPOLOGY_PATTERN = Pattern.compile("(.*?)__\\d*_\\d*");
  private static final Pattern QUERY_ID_PATTERN =
      Pattern.compile("(?<=query_|transient_)(.*?)(?=-)");

  private static final Map<String, Map<String, Map<MetricName, ThroughputTotalMetric>>> metrics =
      new HashMap<>();
  private static final Map<String, String> customTags = new HashMap<>();
  private Metrics metricRegistry;

  @Override
  public void init(final List<KafkaMetric> initial) {
  }

  @VisibleForTesting
  static void reset() {
    metrics.clear();
  }

  @Override
  public synchronized void configure(final Map<String, ?> configMap) {
    this.metricRegistry = (Metrics) requireNonNull(
        configMap.get(KsqlConfig.KSQL_INTERNAL_METRICS_CONFIG)
    );
    customTags.putAll(KsqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS, configMap));
  }

  @Override
  public void metricChange(final KafkaMetric metric) {
    if (!THROUGHPUT_METRIC_NAMES.contains(metric.metricName().name())
        || !TOPIC_LEVEL_GROUP.equals(metric.metricName().group())) {
      return;
    }
    addMetric(
        metric,
        getQueryId(metric),
        getTopic(metric)
    );
  }

  private synchronized void addMetric(
      final KafkaMetric metric,
      final String queryId,
      final String topic
  ) {
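    // Aggregate task-level throughput metrics into a single per-(query, topic) total:
    // the first metric seen for a given aggregated name registers a summed gauge in
    // the registry, and later ones are folded into the existing aggregate.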
    final MetricName throughputTotalMetricName =
        getThroughputTotalMetricName(queryId, metric.metricName());
    LOGGER.debug("Adding metric {}", throughputTotalMetricName);
    if (!metrics.containsKey(queryId)) {
      metrics.put(queryId, new HashMap<>());
    }
    if (!metrics.get(queryId).containsKey(topic)) {
      metrics.get(queryId).put(topic, new HashMap<>());
    }

    final ThroughputTotalMetric existingThroughputMetric =
        metrics.get(queryId).get(topic).get(throughputTotalMetricName);

    if (existingThroughputMetric == null) {
      final ThroughputTotalMetric newThroughputMetric = new ThroughputTotalMetric(metric);

      metrics.get(queryId).get(topic)
          .put(throughputTotalMetricName, newThroughputMetric);
      metricRegistry.addMetric(
          throughputTotalMetricName,
          (config, now) -> newThroughputMetric.getValue()
      );
    } else {
      existingThroughputMetric.add(metric);
    }
  }

  @Override
  public void metricRemoval(final KafkaMetric metric) {
    if (!THROUGHPUT_METRIC_NAMES.contains(metric.metricName().name())
        || !TOPIC_LEVEL_GROUP.equals(metric.metricName().group())) {
      return;
    }

    removeMetric(
        metric,
        getQueryId(metric),
        getTopic(metric)
    );
  }

  private synchronized void removeMetric(
      final KafkaMetric metric,
      final String queryId,
      final String topic
  ) {
    final MetricName throughputTotalMetricName =
        getThroughputTotalMetricName(queryId, metric.metricName());

    LOGGER.debug("Removing metric {}", throughputTotalMetricName);

    if (metrics.containsKey(queryId)
        && metrics.get(queryId).containsKey(topic)
        && metrics.get(queryId).get(topic).containsKey(throughputTotalMetricName)) {

      final ThroughputTotalMetric throughputTotalMetric =
          metrics.get(queryId).get(topic).get(throughputTotalMetricName);

      throughputTotalMetric.remove(metric.metricName());

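      // Once the last underlying task-level metric is gone, deregister the aggregated
      // gauge and prune the now-empty per-topic and per-query map entries.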
      if (throughputTotalMetric.throughputTotalMetrics.isEmpty()) {
        metrics.get(queryId).get(topic).remove(throughputTotalMetricName);
        metricRegistry.removeMetric(throughputTotalMetricName);
        if (metrics.get(queryId).get(topic).isEmpty()) {
          metrics.get(queryId).remove(topic);
          if (metrics.get(queryId).isEmpty()) {
            metrics.remove(queryId);
          }
        }
      }
    }
  }

  @Override
  public void close() {
  }

  @Override
  public Set<String> reconfigurableConfigs() {
    return null;
  }

  @Override
  public void validateReconfiguration(final Map<String, ?> configs) throws ConfigException {
  }

  @Override
  public void reconfigure(final Map<String, ?> configs) {
  }

  @Override
  public void contextChange(final MetricsContext metricsContext) {
  }

  private MetricName getThroughputTotalMetricName(
      final String queryId,
      final MetricName metricName
  ) {
    return new MetricName(
        metricName.name(),
        TOPIC_LEVEL_GROUP,
        metricName.description() + " by this query",
        getThroughputTotalMetricTags(queryId, metricName.tags())
    );
  }

  private String getQueryId(final KafkaMetric metric) {
    final String taskName = metric.metricName().tags().getOrDefault(TASK_ID_TAG, "");
    final Matcher namedTopologyMatcher = NAMED_TOPOLOGY_PATTERN.matcher(taskName);
    if (namedTopologyMatcher.find()) {
      return namedTopologyMatcher.group(1);
    }

    final String queryIdTag = metric.metricName().tags().getOrDefault(THREAD_ID_TAG, "");
    final Matcher matcher = QUERY_ID_PATTERN.matcher(queryIdTag);
    if (matcher.find()) {
      return matcher.group(1);
    } else {
      LOGGER.error("Can't parse query id from metric {}", metric.metricName());
      throw new KsqlException("Missing query ID when reporting total throughput metrics");
    }
  }

  private String getTopic(final KafkaMetric metric) {
    final String topic = metric.metricName().tags().getOrDefault(TOPIC_NAME_TAG, "");
    if (topic.equals("")) {
      LOGGER.error("Can't parse topic name from metric {}", metric);
      throw new KsqlException("Missing topic name when reporting total throughput metrics");
    } else {
      return topic;
    }
  }

  private Map<String, String> getThroughputTotalMetricTags(
      final String queryId,
      final Map<String, String> originalStreamsMetricTags
  ) {
    final Map<String, String> queryMetricTags = new HashMap<>(customTags);
    queryMetricTags.putAll(originalStreamsMetricTags);
    // Remove the taskId and processorNodeId tags as the throughput total metric sums over them
    queryMetricTags.remove(TASK_ID_TAG);
    queryMetricTags.remove(PROCESSOR_NODE_ID_TAG);
    queryMetricTags.put(QUERY_ID_TAG, queryId);
    return ImmutableMap.copyOf(queryMetricTags);
  }

  private static class ThroughputTotalMetric {
    final Map<MetricName, KafkaMetric> throughputTotalMetrics = new HashMap<>();

    ThroughputTotalMetric(final KafkaMetric metric) {
      add(metric);
    }

    private void add(final KafkaMetric metric) {
      throughputTotalMetrics.put(metric.metricName(), metric);
    }

    private void remove(final MetricName metric) {
      throughputTotalMetrics.remove(metric);
    }

    public Double getValue() {
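      // Report the sum of the current values of all underlying task-level metrics.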
      return throughputTotalMetrics
          .values()
          .stream()
          .map(m -> (Double) m.metricValue())
          .reduce(Double::sum)
          .orElse(0D);
    }
  }

}
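For orientation, a minimal sketch of reading one of the aggregated gauges back out of the internal registry; the group name "stream-topic-metrics" (assumed to be the value of TOPIC_LEVEL_GROUP) and all tag values are hypothetical, and MetricName equality ignores the description field:

    import java.util.Map;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.KafkaMetric;
    import org.apache.kafka.common.metrics.Metrics;

    public final class ThroughputLookupExample {
      public static void main(final String[] args) {
        // Assume this registry is the one the reporter was configured with via
        // KsqlConfig.KSQL_INTERNAL_METRICS_CONFIG, after some metrics have flowed in.
        final Metrics registry = new Metrics();

        final MetricName aggregated = new MetricName(
            "records-consumed-total",
            "stream-topic-metrics",                    // assumed TOPIC_LEVEL_GROUP value
            "",                                        // description is ignored by equals()
            Map.of(
                "query-id", "CTAS_MY_QUERY_1",         // hypothetical query id
                "topic", "my-input-topic",             // hypothetical topic
                "thread-id", "app-StreamThread-1"));   // remaining Streams tags are kept
        final KafkaMetric metric = registry.metric(aggregated);
        System.out.println(metric == null ? "not registered" : metric.metricValue());
      }
    }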
@@ -42,6 +42,7 @@
 import io.confluent.ksql.execution.util.KeyUtil;
 import io.confluent.ksql.function.FunctionRegistry;
 import io.confluent.ksql.internal.StorageUtilizationMetricsReporter;
+import io.confluent.ksql.internal.ThroughputMetricsReporter;
 import io.confluent.ksql.logging.processing.ProcessingLogContext;
 import io.confluent.ksql.logging.processing.ProcessingLogger;
 import io.confluent.ksql.metastore.model.DataSource;
@@ -568,6 +569,11 @@ public static Map<String, Object> buildStreamsProperties(
         StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
         StorageUtilizationMetricsReporter.class.getName()
     );
+    updateListProperty(
+        newStreamsProperties,
+        StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
+        ThroughputMetricsReporter.class.getName()
+    );
 
     if (config.getBoolean(KsqlConfig.KSQL_SHARED_RUNTIME_ENABLED)) {
       newStreamsProperties.put(StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE,
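The updateListProperty helper is pre-existing and not shown in this diff; a plausible sketch of the behavior relied on here, appending a reporter class name to the possibly already-populated metric reporters config, with all implementation details assumed:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;

    // Hypothetical re-implementation for illustration only: append `value` to the
    // list-valued config `key`, tolerating an existing comma-separated String or List.
    static void updateListProperty(
        final Map<String, Object> properties,
        final String key,
        final String value
    ) {
      final Object current = properties.get(key);
      final List<String> updated = new ArrayList<>();
      if (current instanceof String) {
        updated.addAll(Arrays.asList(((String) current).split(",")));
      } else if (current instanceof List<?>) {
        ((List<?>) current).forEach(v -> updated.add(String.valueOf(v)));
      }
      updated.add(value);
      properties.put(key, updated);
    }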