Skip to content

Commit

Permalink
feat: add config for custom metrics tags (5.3.x) (#2996)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia authored Jul 19, 2019
1 parent 6f4ae2c commit 76f5590
Show file tree
Hide file tree
Showing 13 changed files with 133 additions and 30 deletions.
10 changes: 10 additions & 0 deletions docs/installation/server-config/config-reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,16 @@ The corresponding environment variable in the
`KSQL Server image <https://hub.docker.com/r/confluentinc/cp-ksql-server/>`__ is
``KSQL_LISTENERS``.

.. _ksql-metrics-tags-custom:

------------------------
ksql.metrics.tags.custom
------------------------

A list of tags to be included with emitted :ref:`JMX metrics <ksql-monitoring-and-metrics>`,
formatted as a string of ``key:value`` pairs separated by commas.
For example, ``key1:value1,key2:value2``.

.. _ksql-c3-settings:

|c3| Settings
Expand Down
28 changes: 28 additions & 0 deletions ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.util;

import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.config.ConfigItem;
Expand Down Expand Up @@ -143,6 +144,11 @@ public class KsqlConfig extends AbstractConfig {
private static final String
defaultSchemaRegistryUrl = "http://localhost:8081";

public static final String KSQL_CUSTOM_METRICS_TAGS = "ksql.metrics.tags.custom";
private static final String KSQL_CUSTOM_METRICS_TAGS_DOC =
"A list of tags to be included with emitted JMX metrics, formatted as a string of key:value "
+ "pairs separated by commas. For example, 'key1:value1,key2:value2'.";

public static final String KSQL_STREAMS_PREFIX = "ksql.streams.";

public static final String KSQL_COLLECT_UDF_METRICS = "ksql.udf.collect.metrics";
Expand Down Expand Up @@ -447,6 +453,12 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
KSQL_SECURITY_EXTENSION_DEFAULT,
ConfigDef.Importance.LOW,
KSQL_SECURITY_EXTENSION_DOC
).define(
KSQL_CUSTOM_METRICS_TAGS,
ConfigDef.Type.STRING,
"",
ConfigDef.Importance.LOW,
KSQL_CUSTOM_METRICS_TAGS_DOC
)
.withClientSslSupport();
for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down Expand Up @@ -684,6 +696,22 @@ public KsqlConfig overrideBreakingConfigsWithOriginalValues(final Map<String, St
return new KsqlConfig(ConfigGeneration.LEGACY, mergedProperties, mergedStreamConfigProps);
}

public Map<String, String> getStringAsMap(final String key) {
final String value = getString(key).trim();
try {
return value.equals("")
? Collections.emptyMap()
: Splitter.on(",").trimResults().withKeyValueSeparator(":").split(value);
} catch (IllegalArgumentException e) {
throw new KsqlException(
String.format(
"Invalid config value for '%s'. value: %s. reason: %s",
key,
value,
e.getMessage()));
}
}

private static Set<String> sslConfigNames() {
final ConfigDef sslConfig = new ConfigDef();
SslConfigs.addClientSslSupport(sslConfig);
Expand Down
4 changes: 2 additions & 2 deletions ksql-engine/src/main/java/io/confluent/ksql/KsqlContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ public static KsqlContext create(
final ServiceContext serviceContext = DefaultServiceContext.create(ksqlConfig);
final MutableFunctionRegistry functionRegistry = new InternalFunctionRegistry();
UdfLoader.newInstance(ksqlConfig, functionRegistry, ".").load();
final String serviceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
final ServiceInfo serviceInfo = ServiceInfo.create(ksqlConfig);
final KsqlEngine engine = new KsqlEngine(
serviceContext,
processingLogContext,
functionRegistry,
serviceId);
serviceInfo);

return new KsqlContext(
serviceContext,
Expand Down
55 changes: 55 additions & 0 deletions ksql-engine/src/main/java/io/confluent/ksql/ServiceInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql;

import io.confluent.ksql.util.KsqlConfig;
import java.util.Map;
import java.util.Objects;

public final class ServiceInfo {

private final String serviceId;
private final Map<String, String> customMetricsTags;

/**
* Create an object to be passed from the KSQL context down to the KSQL engine.
*/
public static ServiceInfo create(final KsqlConfig ksqlConfig) {
Objects.requireNonNull(ksqlConfig, "ksqlConfig cannot be null.");

final String serviceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
final Map<String, String> customMetricsTags =
ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS);

return new ServiceInfo(serviceId, customMetricsTags);
}

private ServiceInfo(
final String serviceId,
final Map<String, String> customMetricsTags
) {
this.serviceId = Objects.requireNonNull(serviceId, "serviceId");
this.customMetricsTags = Objects.requireNonNull(customMetricsTags, "customMetricsTags");
}

public String serviceId() {
return serviceId;
}

public Map<String, String> customMetricsTags() {
return customMetricsTags;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.ddl.commands.DdlCommandExec;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.internal.KsqlEngineMetrics;
Expand Down Expand Up @@ -67,14 +68,14 @@ public KsqlEngine(
final ServiceContext serviceContext,
final ProcessingLogContext processingLogContext,
final FunctionRegistry functionRegistry,
final String serviceId
final ServiceInfo serviceInfo
) {
this(
serviceContext,
processingLogContext,
serviceId,
serviceInfo.serviceId(),
new MetaStoreImpl(functionRegistry),
KsqlEngineMetrics::new);
(engine) -> new KsqlEngineMetrics(engine, serviceInfo.customMetricsTags()));
}

KsqlEngine(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.kafka.common.MetricName;
Expand Down Expand Up @@ -55,24 +57,28 @@ public class KsqlEngineMetrics implements Closeable {
private final Sensor errorRate;

private final String ksqlServiceId;

private final Map<String, String> customMetricsTags;

private final KsqlEngine ksqlEngine;
private final Metrics metrics;

public KsqlEngineMetrics(final KsqlEngine ksqlEngine) {
this(METRIC_GROUP_PREFIX, ksqlEngine, MetricCollectors.getMetrics());
public KsqlEngineMetrics(
final KsqlEngine ksqlEngine,
final Map<String, String> customMetricsTags) {
this(METRIC_GROUP_PREFIX, ksqlEngine, MetricCollectors.getMetrics(), customMetricsTags);
}

KsqlEngineMetrics(
final String metricGroupPrefix,
final KsqlEngine ksqlEngine,
final Metrics metrics) {
final Metrics metrics,
final Map<String, String> customMetricsTags) {
this.ksqlEngine = ksqlEngine;
this.ksqlServiceId = KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX + ksqlEngine.getServiceId();
this.sensors = new ArrayList<>();
this.countMetrics = new ArrayList<>();
this.metricGroupName = metricGroupPrefix + "-query-stats";
this.customMetricsTags = customMetricsTags;

this.metrics = metrics;

Expand Down Expand Up @@ -269,7 +275,8 @@ private void configureMetric(
statSupplier.get());
// new
sensor.add(
metrics.metricName(metricName, ksqlServiceId + metricGroupName, description),
metrics.metricName(
metricName, ksqlServiceId + metricGroupName, description, customMetricsTags),
statSupplier.get());
}

Expand All @@ -293,6 +300,7 @@ private void configureGaugeForState(
final Metrics metrics,
final String name,
final String group,
final Map<String, String> tags,
final KafkaStreams.State state
) {
final Gauge<Long> gauge =
Expand All @@ -302,7 +310,7 @@ private void configureGaugeForState(
.filter(queryMetadata -> queryMetadata.getState().equals(state.toString()))
.count();
final String description = String.format("Count of queries in %s state.", state.toString());
final MetricName metricName = metrics.metricName(name, group, description);
final MetricName metricName = metrics.metricName(name, group, description, tags);
final CountMetric countMetric = new CountMetric(metricName, gauge);
metrics.addMetric(metricName, gauge);
countMetrics.add(countMetric);
Expand All @@ -315,15 +323,17 @@ private void configureNumActiveQueriesForGivenState(
// legacy
configureGaugeForState(
metrics,
ksqlServiceId + metricGroupName + "-" + name,
ksqlServiceId + metricGroupName + "-" + name,
metricGroupName,
Collections.emptyMap(),
state
);
// new
configureGaugeForState(
metrics,
name,
ksqlServiceId + metricGroupName,
customMetricsTags,
state
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public static KsqlContext create(
serviceContext,
ProcessingLogContext.create(),
functionRegistry,
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)
ServiceInfo.create(ksqlConfig)
);

return new KsqlContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.QueryMetadata;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -49,7 +51,7 @@ public static KsqlEngine createKsqlEngine(
ProcessingLogContext.create(),
"test_instance_",
metaStore,
KsqlEngineMetrics::new
(engine) -> new KsqlEngineMetrics(engine, Collections.emptyMap())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.confluent.common.utils.IntegrationTest;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.KsqlConfigTestUtil;
import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.KsqlEngineTestUtil;
import io.confluent.ksql.function.InternalFunctionRegistry;
Expand Down Expand Up @@ -96,7 +97,7 @@ public void before() throws Exception {
serviceContext,
processingLogContext,
new InternalFunctionRegistry(),
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG));
ServiceInfo.create(ksqlConfig));

topicClient = serviceContext.getTopicClient();
metaStore = ksqlEngine.getMetaStore();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.ops;
import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.prefixedResource;
import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.resource;
import static io.confluent.ksql.util.KsqlConfig.KSQL_CUSTOM_METRICS_TAGS;
import static io.confluent.ksql.util.KsqlConfig.KSQL_SERVICE_ID_CONFIG;
import static org.apache.kafka.common.acl.AclOperation.ALL;
import static org.apache.kafka.common.acl.AclOperation.CREATE;
Expand All @@ -37,6 +38,7 @@

import io.confluent.common.utils.IntegrationTest;
import io.confluent.ksql.KsqlConfigTestUtil;
import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.KsqlEngineTestUtil;
import io.confluent.ksql.function.InternalFunctionRegistry;
Expand Down Expand Up @@ -269,7 +271,7 @@ private void givenTestSetupWithConfig(final Map<String, Object> ksqlConfigs) {
serviceContext,
ProcessingLogContext.create(),
new InternalFunctionRegistry(),
ksqlConfig.getString(KSQL_SERVICE_ID_CONFIG));
ServiceInfo.create(ksqlConfig));

execInitCreateStreamQueries();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class KsqlEngineMetricsTest {
private KsqlEngineMetrics engineMetrics;
private static final String KSQL_SERVICE_ID = "test-ksql-service-id";
private static final String metricNamePrefix = KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX + KSQL_SERVICE_ID;
private static final Map<String, String> CUSTOM_TAGS = ImmutableMap.of("tag1", "value1", "tag2", "value2");

@Mock
private KsqlEngine ksqlEngine;
Expand All @@ -75,7 +76,7 @@ public void setUp() {
when(ksqlEngine.getServiceId()).thenReturn(KSQL_SERVICE_ID);
when(query1.getQueryApplicationId()).thenReturn("app-1");

engineMetrics = new KsqlEngineMetrics(METRIC_GROUP, ksqlEngine, MetricCollectors.getMetrics());
engineMetrics = new KsqlEngineMetrics(METRIC_GROUP, ksqlEngine, MetricCollectors.getMetrics(), CUSTOM_TAGS);
}

@After
Expand All @@ -93,15 +94,6 @@ public void shouldRemoveAllSensorsOnClose() {
engineMetrics.registeredSensors().forEach(sensor -> assertThat(engineMetrics.getMetrics().getSensor(sensor.name()), is(nullValue())));
}

private void shouldRecordRate(final String name, final double expected, final double error) {
assertThat(
Math.floor(getMetricValueLegacy(name)),
closeTo(expected, error));
assertThat(
Math.floor(getMetricValue(name)),
closeTo(expected, error));
}

@Test
public void shouldRecordNumberOfActiveQueries() {
when(ksqlEngine.numberOfLiveQueries()).thenReturn(3);
Expand Down Expand Up @@ -264,7 +256,7 @@ private double getMetricValue(final String metricName) {
return Double.valueOf(
metrics.metric(
metrics.metricName(
metricName, metricNamePrefix + METRIC_GROUP + "-query-stats")
metricName, metricNamePrefix + METRIC_GROUP + "-query-stats", CUSTOM_TAGS)
).metricValue().toString()
);
}
Expand All @@ -274,7 +266,7 @@ private long getLongMetricValue(final String metricName) {
return Long.parseLong(
metrics.metric(
metrics.metricName(
metricName, metricNamePrefix + METRIC_GROUP + "-query-stats")
metricName, metricNamePrefix + METRIC_GROUP + "-query-stats", CUSTOM_TAGS)
).metricValue().toString()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.ddl.DdlConfig;
import io.confluent.ksql.ddl.commands.CreateStreamCommand;
import io.confluent.ksql.ddl.commands.RegisterTopicCommand;
Expand Down Expand Up @@ -468,7 +469,7 @@ static KsqlRestApplication buildApplication(
serviceContext,
processingLogContext,
functionRegistry,
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG));
ServiceInfo.create(ksqlConfig));

UdfLoader.newInstance(ksqlConfig, functionRegistry, ksqlInstallDir).load();

Expand Down
Loading

0 comments on commit 76f5590

Please sign in to comment.