Skip to content

Commit

Permalink
chore: update KSQL metrics context labels (confluentinc#5757)
Browse files Browse the repository at this point in the history
* chore: update KSQL metrics context labels

* remove cluster.id label

* Revert "remove cluster.id label"

This reverts commit 6977a0c.

* use cluster.id instead of service.id for label
  • Loading branch information
stevenpyzhang authored Jul 15, 2020
1 parent 7303aed commit 6edd20b
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public final class MetricCollectors {
private static final String KSQL_JMX_PREFIX = "io.confluent.ksql.metrics";
public static final String RESOURCE_LABEL_PREFIX =
CommonClientConfigs.METRICS_CONTEXT_PREFIX + "resource.";
private static final String KSQL_RESOURCE_TYPE = "KSQL";
private static final String KSQL_RESOURCE_TYPE = "ksql";

public static final String RESOURCE_LABEL_TYPE =
RESOURCE_LABEL_PREFIX + "type";
Expand All @@ -56,8 +56,8 @@ public final class MetricCollectors {
RESOURCE_LABEL_PREFIX + "commit.id";
public static final String RESOURCE_LABEL_CLUSTER_ID =
RESOURCE_LABEL_PREFIX + "cluster.id";
public static final String RESOURCE_LABEL_KSQL_SERVICE_ID =
RESOURCE_LABEL_PREFIX + KsqlConfig.KSQL_SERVICE_ID_CONFIG;
public static final String RESOURCE_LABEL_KAFKA_CLUSTER_ID =
RESOURCE_LABEL_PREFIX + "kafka.cluster.id";

private static Map<String, MetricCollector> collectorMap;
private static Metrics metrics;
Expand Down Expand Up @@ -128,12 +128,9 @@ public static void addConfigurableReporter(
ksqlServiceId));

if (reporters.size() > 0) {
final Map<String, Object> props = ksqlConfig.originals();
props.putAll(addConfluentMetricsContextConfigsForKsql(ksqlServiceId));
final KsqlConfig ksqlConfigWithMetricsContext = new KsqlConfig(props);
final MetricsContext metricsContext = new KafkaMetricsContext(
KSQL_JMX_PREFIX,
ksqlConfigWithMetricsContext.originalsWithPrefix(
ksqlConfig.originalsWithPrefix(
CommonClientConfigs.METRICS_CONTEXT_PREFIX));
for (final MetricsReporter reporter : reporters) {
reporter.contextChange(metricsContext);
Expand All @@ -143,22 +140,15 @@ public static void addConfigurableReporter(
}

public static Map<String, Object> addConfluentMetricsContextConfigs(
final String ksqlServiceId
final String ksqlServiceId,
final String kafkaClusterId
) {
final Map<String, Object> updatedProps = new HashMap<>();
updatedProps.put(RESOURCE_LABEL_TYPE, KSQL_RESOURCE_TYPE);
updatedProps.put(RESOURCE_LABEL_CLUSTER_ID, ksqlServiceId);
return updatedProps;
}

public static Map<String, Object> addConfluentMetricsContextConfigsForKsql(
final String ksqlServiceId
) {
final Map<String, Object> updatedProps = new HashMap<>();
updatedProps.put(RESOURCE_LABEL_KSQL_SERVICE_ID, ksqlServiceId);
updatedProps.put(RESOURCE_LABEL_KAFKA_CLUSTER_ID, kafkaClusterId);
updatedProps.put(RESOURCE_LABEL_VERSION, AppInfo.getVersion());
updatedProps.put(RESOURCE_LABEL_COMMIT_ID, AppInfo.getCommitId());
updatedProps.putAll(addConfluentMetricsContextConfigs(ksqlServiceId));
return updatedProps;
}

Expand Down
21 changes: 5 additions & 16 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -781,10 +781,7 @@ public Map<String, Object> getKsqlStreamConfigProps(final String applicationId)
+ StreamsConfig.APPLICATION_ID_CONFIG,
applicationId
);
map.putAll(
addConfluentMetricsContextConfigsKafka(
Collections.emptyMap(),
getString(KSQL_SERVICE_ID_CONFIG)));
map.putAll(addConfluentMetricsContextConfigsKafka(Collections.emptyMap()));
return Collections.unmodifiableMap(map);
}

Expand All @@ -799,33 +796,25 @@ public Map<String, Object> getKsqlStreamConfigProps() {
public Map<String, Object> getKsqlAdminClientConfigProps() {
final Map<String, Object> map = new HashMap<>();
map.putAll(getConfigsFor(AdminClientConfig.configNames()));
map.putAll(
addConfluentMetricsContextConfigsKafka(Collections.emptyMap(),
getString(KSQL_SERVICE_ID_CONFIG)));
map.putAll(addConfluentMetricsContextConfigsKafka(Collections.emptyMap()));
return Collections.unmodifiableMap(map);
}

public Map<String, Object> getProducerClientConfigProps() {
final Map<String, Object> map = new HashMap<>();
map.putAll(getConfigsFor(ProducerConfig.configNames()));
map.putAll(
addConfluentMetricsContextConfigsKafka(Collections.emptyMap(),
getString(KSQL_SERVICE_ID_CONFIG)));
map.putAll(addConfluentMetricsContextConfigsKafka(Collections.emptyMap()));
return Collections.unmodifiableMap(map);
}

public Map<String, Object> addConfluentMetricsContextConfigsKafka(
final Map<String,Object> props,
final String ksqlServiceId
final Map<String,Object> props
) {
final Map<String, Object> updatedProps = new HashMap<>(props);
final AppInfoParser.AppInfo appInfo = new AppInfoParser.AppInfo(System.currentTimeMillis());
updatedProps.putAll(getConfigsForPrefix(REPORTER_CONFIGS_PREFIXES));
updatedProps.put(MetricCollectors.RESOURCE_LABEL_VERSION, appInfo.getVersion());
updatedProps.put(MetricCollectors.RESOURCE_LABEL_COMMIT_ID, appInfo.getCommitId());

updatedProps.putAll(
MetricCollectors.addConfluentMetricsContextConfigs(ksqlServiceId));
updatedProps.putAll(getConfigsForPrefix(REPORTER_CONFIGS_PREFIXES));
return updatedProps;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import io.confluent.ksql.security.KsqlDefaultSecurityExtension;
import io.confluent.ksql.security.KsqlSecurityContext;
import io.confluent.ksql.security.KsqlSecurityExtension;
import io.confluent.ksql.services.KafkaClusterUtil;
import io.confluent.ksql.services.LazyServiceContext;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.services.SimpleKsqlClient;
Expand Down Expand Up @@ -549,16 +550,29 @@ Optional<URL> getInternalListener() {
}

public static KsqlRestApplication buildApplication(final KsqlRestConfig restConfig) {
final Map<String, Object> updatedRestProps = restConfig.getOriginals();
final KsqlConfig ksqlConfig = new KsqlConfig(restConfig.getKsqlConfigProperties());
final Supplier<SchemaRegistryClient> schemaRegistryClientFactory =
new KsqlSchemaRegistryClientFactory(ksqlConfig, Collections.emptyMap())::get;
final ServiceContext serviceContext = new LazyServiceContext(() ->

final ServiceContext tempServiceContext = new LazyServiceContext(() ->
RestServiceContextFactory.create(ksqlConfig, Optional.empty(),
schemaRegistryClientFactory));
final String kafkaClusterId = KafkaClusterUtil.getKafkaClusterId(tempServiceContext);
final String ksqlServerId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
updatedRestProps.putAll(
MetricCollectors.addConfluentMetricsContextConfigs(ksqlServerId, kafkaClusterId));
final KsqlRestConfig updatedRestConfig = new KsqlRestConfig(updatedRestProps);

final ServiceContext serviceContext = new LazyServiceContext(() ->
RestServiceContextFactory.create(
new KsqlConfig(updatedRestConfig.getKsqlConfigProperties()),
Optional.empty(),
schemaRegistryClientFactory));

return buildApplication(
"",
restConfig,
updatedRestConfig,
KsqlVersionCheckerAgent::new,
Integer.MAX_VALUE,
serviceContext,
Expand Down Expand Up @@ -605,17 +619,14 @@ static KsqlRestApplication buildApplication(

final String commandTopicName = ReservedInternalTopics.commandTopic(ksqlConfig);

final String serviceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
final CommandStore commandStore = CommandStore.Factory.create(
commandTopicName,
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
ksqlConfig.addConfluentMetricsContextConfigsKafka(
restConfig.getCommandConsumerProperties(),
serviceId),
restConfig.getCommandConsumerProperties()),
ksqlConfig.addConfluentMetricsContextConfigsKafka(
restConfig.getCommandProducerProperties(),
serviceId)
restConfig.getCommandProducerProperties())
);

final InteractiveStatementExecutor statementExecutor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import io.confluent.ksql.function.UserFunctionLoader;
import io.confluent.ksql.logging.processing.ProcessingLogConfig;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.query.id.SequentialQueryIdGenerator;
import io.confluent.ksql.rest.server.computation.ConfigStore;
import io.confluent.ksql.rest.server.computation.KafkaConfigStore;
import io.confluent.ksql.rest.util.KsqlInternalTopicUtils;
import io.confluent.ksql.services.DisabledKsqlClient;
import io.confluent.ksql.services.KafkaClusterUtil;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.services.ServiceContextFactory;
import io.confluent.ksql.statement.Injector;
Expand All @@ -52,11 +54,24 @@ public static StandaloneExecutor create(
final String queriesFile,
final String installDir
) {
final KsqlConfig tempConfig = new KsqlConfig(properties);

final Function<KsqlConfig, ServiceContext> serviceContextFactory =
config -> ServiceContextFactory.create(config, DisabledKsqlClient::instance);
final ServiceContext tempServiceContext =
serviceContextFactory.apply(tempConfig);

final String kafkaClusterId = KafkaClusterUtil.getKafkaClusterId(tempServiceContext);
final String ksqlServerId = tempConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
final Map<String, Object> updatedProperties = tempConfig.originals();
updatedProperties.putAll(
MetricCollectors.addConfluentMetricsContextConfigs(ksqlServerId, kafkaClusterId));

return create(
properties,
updatedProperties,
queriesFile,
installDir,
config -> ServiceContextFactory.create(config, DisabledKsqlClient::instance),
serviceContextFactory,
KafkaConfigStore::new,
KsqlVersionCheckerAgent::new,
StandaloneExecutor::new
Expand All @@ -80,7 +95,7 @@ StandaloneExecutor create(

@VisibleForTesting
static StandaloneExecutor create(
final Map<String, String> properties,
final Map<String, Object> properties,
final String queriesFile,
final String installDir,
final Function<KsqlConfig, ServiceContext> serviceContextFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class StandaloneExecutorFactoryTest {
private static final String QUERIES_FILE = "queries";
private static final String INSTALL_DIR = "install";

private final Map<String, String> properties = Collections.emptyMap();
private final Map<String, Object> properties = Collections.emptyMap();
private final KsqlConfig baseConfig = new KsqlConfig(properties);
private final KsqlConfig mergedConfig = new KsqlConfig(Collections.emptyMap());
private final String configTopicName = ReservedInternalTopics.configsTopic(baseConfig);
Expand Down

0 comments on commit 6edd20b

Please sign in to comment.