Skip to content

Commit

Permalink
feat: update storage utilization metrics to start when app is initial…
Browse files Browse the repository at this point in the history
…ized
  • Loading branch information
lct45 committed Sep 2, 2021
1 parent fd9faf2 commit b78616c
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,9 @@ public void init(final List<KafkaMetric> list) {

@Override
public void configure(final Map<String, ?> map) {
final String dir;
if (map.containsKey(StreamsConfig.STATE_DIR_CONFIG)) {
dir = map.get(StreamsConfig.STATE_DIR_CONFIG).toString();
} else {
dir = StreamsConfig
.configDef()
.defaultValues()
.get(StreamsConfig.STATE_DIR_CONFIG)
.toString();
}
configureShared(new File(dir), this.metricRegistry);
}

private static void configureShared(final File baseDir, final Metrics metricRegistry) {
public static void configureShared(final File baseDir, final Metrics metricRegistry) {
if (registeredNodeMetrics.getAndSet(true)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.confluent.ksql.function.MutableFunctionRegistry;
import io.confluent.ksql.function.UserFunctionLoader;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.internal.StorageUtilizationMetricsReporter;
import io.confluent.ksql.logging.processing.ProcessingLogConfig;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.logging.processing.ProcessingLogServerUtils;
Expand Down Expand Up @@ -699,6 +700,13 @@ static KsqlRestApplication buildApplication(

final SpecificQueryIdGenerator specificQueryIdGenerator =
new SpecificQueryIdGenerator();

final String stateDir = ksqlConfig.getKsqlStreamConfigProps().getOrDefault(
StreamsConfig.STATE_DIR_CONFIG,
StreamsConfig.configDef().defaultValues().get(StreamsConfig.STATE_DIR_CONFIG))
.toString();

StorageUtilizationMetricsReporter.configureShared(new File(stateDir), MetricCollectors.getMetrics());

final KsqlEngine ksqlEngine = new KsqlEngine(
serviceContext,
Expand Down

0 comments on commit b78616c

Please sign in to comment.