Skip to content

Commit

Permalink
adding logging, fix checkstyle
Browse files Browse the repository at this point in the history
  • Loading branch information
lct45 committed Sep 7, 2021
1 parent b78616c commit 0e1c370
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StorageUtilizationMetricsReporter implements MetricsReporter {
private static final Logger LOGGER
= LoggerFactory.getLogger(StorageUtilizationMetricsReporter.class);
private static final String METRIC_GROUP = "ksqldb_utilization";

private final Map<String, Map<String, TaskStorageMetric>> metricsSeen;
Expand Down Expand Up @@ -73,6 +76,7 @@ public static void configureShared(final File baseDir, final Metrics metricRegis
if (registeredNodeMetrics.getAndSet(true)) {
return;
}
LOGGER.info("Adding node level storage usage gauges");
final MetricName nodeAvailable =
metricRegistry.metricName("node_storage_free_bytes", METRIC_GROUP);
final MetricName nodeTotal =
Expand Down Expand Up @@ -158,6 +162,7 @@ private synchronized void handleNewSstFilesSizeMetric(
final String taskId,
final String queryId
) {
LOGGER.debug("Updating disk usage metrics");
// if we haven't seen a task for this query yet
if (!metricsSeen.containsKey(queryId)) {
metricRegistry.addMetric(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -702,11 +702,14 @@ static KsqlRestApplication buildApplication(
new SpecificQueryIdGenerator();

final String stateDir = ksqlConfig.getKsqlStreamConfigProps().getOrDefault(
StreamsConfig.STATE_DIR_CONFIG,
StreamsConfig.configDef().defaultValues().get(StreamsConfig.STATE_DIR_CONFIG))
.toString();
StreamsConfig.STATE_DIR_CONFIG,
StreamsConfig.configDef().defaultValues().get(StreamsConfig.STATE_DIR_CONFIG))
.toString();

StorageUtilizationMetricsReporter.configureShared(new File(stateDir), MetricCollectors.getMetrics());
StorageUtilizationMetricsReporter.configureShared(
new File(stateDir),
MetricCollectors.getMetrics()
);

final KsqlEngine ksqlEngine = new KsqlEngine(
serviceContext,
Expand Down

0 comments on commit 0e1c370

Please sign in to comment.