Skip to content

Commit

Permalink
feat: add cleanup service metrics (#8779)
Browse files Browse the repository at this point in the history
* feat: add metrics for transient query cleanup service

* feat: add cleanup service metrics

* more metrics

* dot

* nit

* nit
  • Loading branch information
cprasad1 authored Mar 2, 2022
1 parent 7f7a076 commit 1cb7846
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,22 @@ public ImmutableAnalysis analyzeQueryWithNoOutputTopic(
);
}

/**
 * Reports the leaked-topic count currently held by the transient query cleanup service.
 *
 * @return the value of {@code transientQueryCleanupService.getNumLeakedTopics()}
 */
public int reportNumberOfLeakedTopics() {
return transientQueryCleanupService.getNumLeakedTopics();
}

/**
 * Reports the leaked-state-directory count currently held by the transient query
 * cleanup service.
 *
 * @return the value of {@code transientQueryCleanupService.getNumLeakedStateDirs()}
 */
public int reportNumberOfLeakedStateDirs() {
return transientQueryCleanupService.getNumLeakedStateDirs();
}

/**
 * Reports how many leaked topics the transient query cleanup service failed to clean up.
 *
 * @return the value of
 *     {@code transientQueryCleanupService.getNumLeakedTopicsFailedToCleanUp()}
 */
public int reportNumLeakedTopicsAfterCleanup() {
return transientQueryCleanupService.getNumLeakedTopicsFailedToCleanUp();
}

/**
 * Reports how many leaked state directories the transient query cleanup service failed
 * to clean up.
 *
 * @return the value of
 *     {@code transientQueryCleanupService.getNumLeakedStateDirsFailedToCleanUp()}
 */
public int reportNumLeakedStateDirsAfterCleanup() {
return transientQueryCleanupService.getNumLeakedStateDirsFailedToCleanUp();
}

private static final class CleanupListener implements QueryEventListener {
final QueryCleanupService cleanupService;
final ServiceContext serviceContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ public class TransientQueryCleanupService extends AbstractScheduledService {
private Optional<Set<String>> localCommandsQueryAppIds;
private QueryRegistry queryRegistry;
// Metrics counters. Each one is assigned during a cleanup pass (see runOneIteration) and
// returned by the matching public getter. Those getters are presumably polled from a
// different thread than the one running the cleanup pass (a metrics reporter on its own
// scheduled executor), so the counters with getters are volatile to guarantee that the
// reader observes the most recently written value.

// Number of leaked topics found at the start of the most recent topic cleanup pass.
private volatile int numLeakedTopics;
// NOTE(review): no getter for this counter is visible in this view; left unchanged.
private int numLeakedStateFiles;
// Number of leaked state directories found at the start of the most recent
// state-directory cleanup pass.
private volatile int numLeakedStateDirs;
// Number of topics still reported as leaked after the most recent deletion attempt.
private volatile int numLeakedTopicsFailedToCleanUp;
// Number of state directories still reported as leaked after the most recent deletion
// attempt.
private volatile int numLeakedStateDirsFailedToCleanUp;

public TransientQueryCleanupService(final ServiceContext serviceContext,
final KsqlConfig ksqlConfig) {
Expand All @@ -80,48 +82,58 @@ public TransientQueryCleanupService(final ServiceContext serviceContext,
this.localCommandsQueryAppIds = Optional.empty();
this.queriesGuaranteedToBeRunningAtSomePoint = new HashSet<>();
this.numLeakedTopics = 0;
this.numLeakedStateFiles = 0;
this.numLeakedStateDirs = 0;
}

/**
 * Runs one cleanup pass: leaked topics are handled first, then leaked state directories.
 *
 * <p>Each helper catches and logs its own failures, so a failure while cleaning topics
 * does not prevent the state-directory cleanup from running.
 */
@Override
protected void runOneIteration() {
findAndDeleteLeakedTopics();
findAndDeleteLeakedStateDirs();
}

/**
 * Builds the fixed-rate schedule for this service.
 *
 * @return a schedule that uses {@code initialDelay} and {@code intervalPeriod}, both
 *     interpreted as seconds
 */
@Override
public Scheduler scheduler() {
return Scheduler.newFixedRateSchedule(initialDelay, intervalPeriod, TimeUnit.SECONDS);
}

/**
 * Stores the query registry this service should use.
 *
 * @param queryRegistry the registry to assign to the {@code queryRegistry} field
 */
public void setQueryRegistry(final QueryRegistry queryRegistry) {
this.queryRegistry = queryRegistry;
}

/**
 * Records the given ids as the local-commands query application ids.
 *
 * <p>The set is stored by reference, without a defensive copy, which is why the
 * EI_EXPOSE_REP2 warning is suppressed.
 *
 * @param ids the ids to record; must not be {@code null} because {@code Optional.of}
 *     rejects null values
 */
@SuppressFBWarnings(value = "EI_EXPOSE_REP2")
public void setLocalCommandsQueryAppIds(final Set<String> ids) {
this.localCommandsQueryAppIds = Optional.of(ids);
}

private void findAndDeleteLeakedTopics() {
try {
final List<String> leakedTopics = findLeakedTopics();
this.numLeakedTopics = leakedTopics.size();
LOG.info("Cleaning up {} leaked topics: {}", numLeakedTopics, leakedTopics);
getTopicClient().deleteTopics(leakedTopics);
this.numLeakedTopicsFailedToCleanUp = findLeakedTopics().size();
} catch (Throwable t) {
LOG.error(
"Failed to clean up topics with exception: " + t.getMessage(), t);
"Failed to clean up topics with exception: " + t.getMessage(), t);
}
}

private void findAndDeleteLeakedStateDirs() {
try {
final List<String> leakedStateDirs = findLeakedStateDirs();
this.numLeakedStateFiles = leakedStateDirs.size();
this.numLeakedStateDirs = leakedStateDirs.size();
LOG.info("Cleaning up {} leaked state directories: {}",
numLeakedStateFiles,
leakedStateDirs.stream().map(file -> stateDir + "/" + file)
.collect(Collectors.toList()));
numLeakedStateDirs,
leakedStateDirs.stream().map(file -> stateDir + "/" + file)
.collect(Collectors.toList()));
leakedStateDirs.forEach(this::deleteLeakedStateDir);
this.numLeakedStateDirsFailedToCleanUp = findLeakedStateDirs().size();
} catch (Throwable t) {
LOG.error(
"Failed to clean up state directories with exception: " + t.getMessage(), t);
"Failed to clean up state directories with exception: " + t.getMessage(), t);
}
}

/**
 * Builds the fixed-rate schedule for this service.
 *
 * @return a schedule that uses {@code initialDelay} and {@code intervalPeriod}, both
 *     interpreted as seconds
 */
@Override
public Scheduler scheduler() {
return Scheduler.newFixedRateSchedule(initialDelay, intervalPeriod, TimeUnit.SECONDS);
}

/**
 * Stores the query registry this service should use.
 *
 * @param queryRegistry the registry to assign to the {@code queryRegistry} field
 */
public void setQueryRegistry(final QueryRegistry queryRegistry) {
this.queryRegistry = queryRegistry;
}

/**
 * Records the given ids as the local-commands query application ids.
 *
 * <p>The set is stored by reference, without a defensive copy, which is why the
 * EI_EXPOSE_REP2 warning is suppressed.
 *
 * @param ids the ids to record; must not be {@code null} because {@code Optional.of}
 *     rejects null values
 */
@SuppressFBWarnings(value = "EI_EXPOSE_REP2")
public void setLocalCommandsQueryAppIds(final Set<String> ids) {
this.localCommandsQueryAppIds = Optional.of(ids);
}

private void deleteLeakedStateDir(final String filename) {
final String path = stateDir + "/" + filename;
final Path pathName = Paths.get(path);
Expand Down Expand Up @@ -207,4 +219,20 @@ KafkaTopicClient getTopicClient() {
/**
 * Package-private accessor for the configured state directory.
 *
 * @return the value of the {@code stateDir} field
 */
String getStateDir() {
return stateDir;
}

/**
 * Returns the leaked-topic counter.
 *
 * @return the current value of {@code numLeakedTopics}, which
 *     {@code findAndDeleteLeakedTopics} sets to the number of leaked topics it found
 *     before attempting deletion
 */
public int getNumLeakedTopics() {
return numLeakedTopics;
}

/**
 * Returns the leaked-state-directory counter.
 *
 * @return the current value of {@code numLeakedStateDirs}, which
 *     {@code findAndDeleteLeakedStateDirs} sets to the number of leaked state
 *     directories it found before attempting deletion
 */
public int getNumLeakedStateDirs() {
return numLeakedStateDirs;
}

/**
 * Returns the counter of leaked topics that survived cleanup.
 *
 * @return the current value of {@code numLeakedTopicsFailedToCleanUp}, which
 *     {@code findAndDeleteLeakedTopics} sets to the number of leaked topics still found
 *     after the deletion call
 */
public int getNumLeakedTopicsFailedToCleanUp() {
return numLeakedTopicsFailedToCleanUp;
}

/**
 * Returns the counter of leaked state directories that survived cleanup.
 *
 * @return the current value of {@code numLeakedStateDirsFailedToCleanUp}, which
 *     {@code findAndDeleteLeakedStateDirs} sets to the number of leaked state
 *     directories still found after the deletion step
 */
public int getNumLeakedStateDirsFailedToCleanUp() {
return numLeakedStateDirsFailedToCleanUp;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.internal;

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.engine.KsqlEngine;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Task that reads the leaked-resource counters exposed by {@link KsqlEngine} and
 * publishes each of them as a data point through a {@link MetricsReporter}.
 *
 * <p>Every run emits four metrics: {@code leaked-topics}, {@code leaked-state-dirs},
 * {@code leaked-topics-after-cleanup} and {@code leaked-state-dirs-after-cleanup}. All
 * data points of one run share the same timestamp and the same custom tags.
 */
public class LeakedResourcesMetrics implements Runnable {
  private static final Logger LOGGER
      = LoggerFactory.getLogger(LeakedResourcesMetrics.class);

  private final KsqlEngine engine;
  private final MetricsReporter reporter;
  private final Map<String, String> customTags;
  private final Supplier<Instant> time;

  /**
   * Creates a metrics task that timestamps its data points with {@link Instant#now()}.
   *
   * @param ksqlEngine the engine whose leaked-resource counters are read
   * @param jmxDataPointsReporter the reporter that receives the data points
   * @param customTags tags attached to every data point
   * @throws NullPointerException if any argument is {@code null}
   */
  public LeakedResourcesMetrics(
      final KsqlEngine ksqlEngine,
      final JmxDataPointsReporter jmxDataPointsReporter,
      final Map<String, String> customTags
  ) {
    this(Instant::now, ksqlEngine, jmxDataPointsReporter, customTags);
  }

  /**
   * Creates a metrics task with an explicit time source.
   *
   * @param time supplies the timestamp used for the data points of each run
   * @param ksqlEngine the engine whose leaked-resource counters are read
   * @param jmxDataPointsReporter the reporter that receives the data points
   * @param customTags tags attached to every data point
   * @throws NullPointerException if any argument is {@code null}
   */
  LeakedResourcesMetrics(
      final Supplier<Instant> time,
      final KsqlEngine ksqlEngine,
      final JmxDataPointsReporter jmxDataPointsReporter,
      final Map<String, String> customTags) {
    this.time = Objects.requireNonNull(time, "time");
    this.engine = Objects.requireNonNull(ksqlEngine, "ksqlEngine");
    this.reporter = Objects.requireNonNull(jmxDataPointsReporter, "jmxDataPointsReporter");
    this.customTags = Objects.requireNonNull(customTags, "customTags");
  }

  /**
   * Reads all four counters from the engine, then reports them in a fixed order.
   *
   * @throws RuntimeException whatever the engine or the reporter throws, after logging it
   */
  @Override
  public void run() {
    final Instant now = time.get();

    try {
      // Read every counter before reporting any of them, so that a failing read
      // publishes nothing rather than a partial set.
      final int numLeakedTopics = engine.reportNumberOfLeakedTopics();
      final int numLeakedStateDirs = engine.reportNumberOfLeakedStateDirs();
      final int numLeakedTopicsAfterCleanup = engine.reportNumLeakedTopicsAfterCleanup();
      final int numLeakedStateDirsAfterCleanup = engine.reportNumLeakedStateDirsAfterCleanup();

      reportNumLeakedTopics(now, numLeakedTopics);
      reportNumLeakedStateDirs(now, numLeakedStateDirs);
      reportNumLeakedTopicsAfterCleanup(now, numLeakedTopicsAfterCleanup);
      reportNumLeakedStateDirsAfterCleanup(now, numLeakedStateDirsAfterCleanup);
    } catch (final RuntimeException e) {
      LOGGER.error("Error collecting leaked resources metrics", e);
      // NOTE(review): when this task is driven by
      // ScheduledExecutorService#scheduleAtFixedRate, an exception thrown from run()
      // suppresses all subsequent executions. The rethrow is kept to preserve the
      // existing contract; confirm that stopping the reporter on failure is intended.
      throw e;
    }
  }

  private void reportNumLeakedTopics(final Instant now, final int numLeakedTopics) {
    LOGGER.info("Reporting number of leaked topics: {}", numLeakedTopics);
    report(now, "leaked-topics", numLeakedTopics);
  }

  private void reportNumLeakedStateDirs(final Instant now, final int numLeakedStateDirs) {
    LOGGER.info("Reporting number of leaked state directories: {}", numLeakedStateDirs);
    report(now, "leaked-state-dirs", numLeakedStateDirs);
  }

  private void reportNumLeakedTopicsAfterCleanup(
      final Instant now,
      final int numLeakedTopicsAfterCleanup) {
    LOGGER.info(
        "Reporting number of leaked topics after cleanup: {}",
        numLeakedTopicsAfterCleanup);
    report(now, "leaked-topics-after-cleanup", numLeakedTopicsAfterCleanup);
  }

  private void reportNumLeakedStateDirsAfterCleanup(
      final Instant now,
      final int numLeakedStateDirsAfterCleanup) {
    LOGGER.info(
        "Reporting number of leaked state directories after cleanup: {}",
        numLeakedStateDirsAfterCleanup);
    report(now, "leaked-state-dirs-after-cleanup", numLeakedStateDirsAfterCleanup);
  }

  /** Sends a single data point with the given metric name and value to the reporter. */
  private void report(final Instant now, final String metricName, final int value) {
    reporter.report(
        ImmutableList.of(
            new MetricsReporter.DataPoint(
                now,
                metricName,
                value,
                customTags
            )
        )
    );
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.confluent.ksql.function.MutableFunctionRegistry;
import io.confluent.ksql.function.UserFunctionLoader;
import io.confluent.ksql.internal.JmxDataPointsReporter;
import io.confluent.ksql.internal.LeakedResourcesMetrics;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.internal.ScalablePushQueryMetrics;
import io.confluent.ksql.internal.StorageUtilizationMetricsReporter;
Expand Down Expand Up @@ -740,6 +741,13 @@ static KsqlRestApplication buildApplication(
.setNameFormat("ksql-csu-metrics-reporter-%d")
.build()
);

final ScheduledExecutorService leakedResourcesReporter =
Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder()
.setNameFormat("ksql-leaked-resources-metrics-reporter-%d")
.build());

final KsqlEngine ksqlEngine = new KsqlEngine(
serviceContext,
processingLogContext,
Expand All @@ -750,7 +758,6 @@ static KsqlRestApplication buildApplication(
Collections.emptyList(),
metricCollectors
);

final PersistentQuerySaturationMetrics saturation = new PersistentQuerySaturationMetrics(
ksqlEngine,
new JmxDataPointsReporter(
Expand All @@ -766,6 +773,26 @@ static KsqlRestApplication buildApplication(
TimeUnit.MILLISECONDS
);

final int transientQueryCleanupServicePeriod =
ksqlConfig.getInt(
KsqlConfig.KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_PERIOD_SECONDS);
final LeakedResourcesMetrics leaked = new LeakedResourcesMetrics(
ksqlEngine,
new JmxDataPointsReporter(
metricCollectors.getMetrics(),
ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX
+ ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)
+ ".leaked_resources_metrics",
Duration.ofSeconds(transientQueryCleanupServicePeriod)),
ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS)
);
leakedResourcesReporter.scheduleAtFixedRate(
leaked,
0,
transientQueryCleanupServicePeriod,
TimeUnit.SECONDS
);

UserFunctionLoader.newInstance(
ksqlConfig,
functionRegistry,
Expand Down

0 comments on commit 1cb7846

Please sign in to comment.