Skip to content

Commit

Permalink
feat: add thread pool monitor
Browse files Browse the repository at this point in the history
Cache thread pools used to run async cache tasks need to be monitored to observe scenarios where tasks start to be queued.
  • Loading branch information
jeqo committed Sep 18, 2024
1 parent 29435d1 commit e54d401
Show file tree
Hide file tree
Showing 8 changed files with 275 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -35,6 +35,7 @@
import io.aiven.kafka.tieredstorage.fetch.ChunkManager;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter;
import io.aiven.kafka.tieredstorage.metrics.ThreadPoolMonitor;
import io.aiven.kafka.tieredstorage.storage.BytesRange;
import io.aiven.kafka.tieredstorage.storage.ObjectKey;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
Expand All @@ -47,9 +48,10 @@

public abstract class ChunkCache<T> implements ChunkManager, Configurable {
private static final String METRIC_GROUP = "chunk-cache-metrics";
private static final String THREAD_POOL_METRIC_GROUP = "chunk-cache-thread-pool-metrics";

private final ChunkManager chunkManager;
private Executor executor;
private ExecutorService executor;

final CaffeineStatsCounter statsCounter;

Expand Down Expand Up @@ -136,8 +138,10 @@ public InputStream getChunk(final ObjectKey objectKey,

protected AsyncCache<ChunkKey, T> buildCache(final ChunkCacheConfig config) {
this.executor = config.threadPoolSize().map(ForkJoinPool::new).orElse(new ForkJoinPool());
new ThreadPoolMonitor(THREAD_POOL_METRIC_GROUP, this.executor);
this.getTimeout = config.getTimeout();
this.prefetchingSize = config.cachePrefetchingSize();

final Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
config.cacheSize().ifPresent(maximumWeight -> cacheBuilder.maximumWeight(maximumWeight).weigher(weigher()));
config.cacheRetention().ifPresent(cacheBuilder::expireAfterAccess);
Expand All @@ -146,7 +150,9 @@ protected AsyncCache<ChunkKey, T> buildCache(final ChunkCacheConfig config) {
.executor(executor)
.recordStats(() -> statsCounter)
.buildAsync();

statsCounter.registerSizeMetric(cache.synchronous()::estimatedSize);

return cache;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -33,6 +33,7 @@

import io.aiven.kafka.tieredstorage.config.CacheConfig;
import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter;
import io.aiven.kafka.tieredstorage.metrics.ThreadPoolMonitor;
import io.aiven.kafka.tieredstorage.storage.ObjectKey;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;

Expand All @@ -49,10 +50,11 @@ public class MemorySegmentIndexesCache implements SegmentIndexesCache {

private static final long DEFAULT_MAX_SIZE_BYTES = 10 * 1024 * 1024;
private static final String METRIC_GROUP = "segment-indexes-cache";
private static final String THREAD_POOL_METRIC_GROUP = "segment-indexes-thread-pool-cache";

private final CaffeineStatsCounter statsCounter = new CaffeineStatsCounter(METRIC_GROUP);

private Executor executor;
private ExecutorService executor;
protected AsyncCache<SegmentIndexKey, byte[]> cache;
private Duration getTimeout;

Expand All @@ -68,7 +70,9 @@ private static Weigher<SegmentIndexKey, byte[]> weigher() {

protected AsyncCache<SegmentIndexKey, byte[]> buildCache(final CacheConfig config) {
this.executor = config.threadPoolSize().map(ForkJoinPool::new).orElse(new ForkJoinPool());
new ThreadPoolMonitor(THREAD_POOL_METRIC_GROUP, executor);
this.getTimeout = config.getTimeout();

final Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
config.cacheSize().ifPresent(maximumWeight -> cacheBuilder.maximumWeight(maximumWeight).weigher(weigher()));
config.cacheRetention().ifPresent(cacheBuilder::expireAfterAccess);
Expand All @@ -77,7 +81,9 @@ protected AsyncCache<SegmentIndexKey, byte[]> buildCache(final CacheConfig confi
.executor(executor)
.recordStats(() -> statsCounter)
.buildAsync();

statsCounter.registerSizeMetric(cache.synchronous()::estimatedSize);

return cache;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.aiven.kafka.tieredstorage.config.CacheConfig;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter;
import io.aiven.kafka.tieredstorage.metrics.ThreadPoolMonitor;
import io.aiven.kafka.tieredstorage.storage.ObjectFetcher;
import io.aiven.kafka.tieredstorage.storage.ObjectKey;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
Expand All @@ -44,6 +45,7 @@
public class MemorySegmentManifestCache implements SegmentManifestCache {
private static final Logger log = LoggerFactory.getLogger(MemorySegmentManifestCache.class);
private static final String METRIC_GROUP = "segment-manifest-cache-metrics";
private static final String THREAD_POOL_METRIC_GROUP = "segment-manifest-cache-thread-pool-metrics";
private static final long DEFAULT_MAX_SIZE = 1000L;
private static final long DEFAULT_RETENTION_MS = 3_600_000;

Expand Down Expand Up @@ -97,7 +99,9 @@ private static Weigher<ObjectKey, SegmentManifest> weigher() {

protected AsyncLoadingCache<ObjectKey, SegmentManifest> buildCache(final CacheConfig config) {
final var executor = config.threadPoolSize().map(ForkJoinPool::new).orElse(new ForkJoinPool());
new ThreadPoolMonitor(THREAD_POOL_METRIC_GROUP, executor);
getTimeout = config.getTimeout();

final Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
config.cacheSize().ifPresent(maximumWeight -> cacheBuilder.maximumWeight(maximumWeight).weigher(weigher()));
config.cacheRetention().ifPresent(cacheBuilder::expireAfterAccess);
Expand All @@ -110,7 +114,9 @@ protected AsyncLoadingCache<ObjectKey, SegmentManifest> buildCache(final CacheCo
return mapper.readValue(is, SegmentManifest.class);
}
});

statsCounter.registerSizeMetric(cache.synchronous()::estimatedSize);

return cache;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.utils.Time;

import com.github.benmanes.caffeine.cache.RemovalCause;
Expand Down Expand Up @@ -227,21 +225,4 @@ private static long negativeToMaxValue(final long value) {
return (value >= 0) ? value : Long.MAX_VALUE;
}

/**
 * A {@link Value} whose reading is pulled on demand from a {@code Long} {@link Supplier}.
 *
 * <p>Reading through the supplier avoids calling {@link Sensor#record()}, which uses a synchronized block
 * internally and therefore hurts performance.
 */
private static class MeasurableValue extends Value {
    private final Supplier<Long> supplier;

    MeasurableValue(final Supplier<Long> value) {
        this.supplier = value;
    }

    /**
     * Returns the supplier's current value; {@code config} and {@code now} are not used.
     */
    @Override
    public double measure(final MetricConfig config, final long now) {
        return supplier.get().doubleValue();
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.metrics;

import java.util.function.Supplier;

import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Value;

/**
 * A {@link Value} whose reading is pulled on demand from a {@code Long} {@link Supplier}.
 *
 * <p>Reading through the supplier avoids calling {@link Sensor#record()}, which uses a synchronized block
 * internally and therefore hurts performance.
 */
class MeasurableValue extends Value {
    private final Supplier<Long> supplier;

    MeasurableValue(final Supplier<Long> value) {
        this.supplier = value;
    }

    /**
     * Returns the supplier's current value; {@code config} and {@code now} are not used.
     */
    @Override
    public double measure(final MetricConfig config, final long now) {
        return supplier.get().doubleValue();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.metrics;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;

import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;

/**
 * Exposes the state of a {@link ForkJoinPool} as metrics through a dedicated {@link Metrics} registry
 * reported via {@link JmxReporter} under the {@code aiven.kafka.server.tieredstorage.thread-pool} context.
 *
 * <p>Every metric is registered in the metric group given at construction time and is read lazily from the
 * pool each time it is measured, so no recording calls are needed on the hot path.
 *
 * <p>The monitor owns its {@link Metrics} registry; call {@link #close()} when the monitored pool is no
 * longer in use so that the registry and its reporter are released.
 *
 * <p>NOTE(review): per the {@link ForkJoinPool} documentation, {@code getQueuedTaskCount()} does not include
 * tasks submitted to the pool that have not begun executing (see {@code getQueuedSubmissionCount()});
 * confirm whether that figure should be exposed as well.
 */
public class ThreadPoolMonitor implements AutoCloseable {
    private static final String ACTIVE_THREADS = "active-thread-count";
    private static final String ACTIVE_THREADS_TOTAL = ACTIVE_THREADS + "-total";
    private static final String RUNNING_THREADS = "running-thread-count";
    private static final String RUNNING_THREADS_TOTAL = RUNNING_THREADS + "-total";
    private static final String POOL_SIZE = "pool-size";
    private static final String POOL_SIZE_TOTAL = POOL_SIZE + "-total";
    private static final String PARALLELISM = "parallelism";
    private static final String PARALLELISM_TOTAL = PARALLELISM + "-total";
    private static final String QUEUED_TASK_COUNT = "queued-task-count";
    private static final String QUEUED_TASK_COUNT_TOTAL = QUEUED_TASK_COUNT + "-total";
    private static final String STEAL_TASK_COUNT = "steal-task-count";
    private static final String STEAL_TASK_COUNT_TOTAL = STEAL_TASK_COUNT + "-total";

    // only fork-join pool is supported; but could be extended to other fixed-sized pools
    final ForkJoinPool pool;
    final String groupName;
    private final Metrics metrics;

    /**
     * Creates the monitor and registers all thread-pool metrics.
     *
     * @param groupName metric group under which the metrics are registered
     * @param pool      pool to observe; must be a {@link ForkJoinPool}
     * @throws UnsupportedOperationException if {@code pool} is not a {@link ForkJoinPool} (including {@code null})
     */
    public ThreadPoolMonitor(final String groupName, final ExecutorService pool) {
        // Validate before allocating the metrics registry so a rejected pool leaves nothing behind.
        if (!(pool instanceof ForkJoinPool)) {
            throw new UnsupportedOperationException("Only ForkJoinPool supported at the moment.");
        }
        this.groupName = groupName;
        this.pool = (ForkJoinPool) pool;

        final JmxReporter reporter = new JmxReporter();
        metrics = new Metrics(
            new MetricConfig(), List.of(reporter), Time.SYSTEM,
            new KafkaMetricsContext("aiven.kafka.server.tieredstorage.thread-pool")
        );

        registerSensor(ACTIVE_THREADS_TOTAL, ACTIVE_THREADS, this::activeThreadCount);
        registerSensor(RUNNING_THREADS_TOTAL, RUNNING_THREADS, this::runningThreadCount);
        registerSensor(POOL_SIZE_TOTAL, POOL_SIZE, this::poolSize);
        registerSensor(PARALLELISM_TOTAL, PARALLELISM, this::parallelism);
        registerSensor(QUEUED_TASK_COUNT_TOTAL, QUEUED_TASK_COUNT, this::queuedTaskCount);
        registerSensor(STEAL_TASK_COUNT_TOTAL, STEAL_TASK_COUNT, this::stealTaskCount);
    }

    /**
     * Registers one supplier-backed metric in this monitor's group.
     *
     * @param metricName name of the metric as exposed by the reporter
     * @param sensorName name of the sensor that holds the metric
     * @param supplier   source of the current value, queried on each measurement
     */
    void registerSensor(final String metricName, final String sensorName, final Supplier<Long> supplier) {
        final var name = new MetricNameTemplate(metricName, groupName, "");
        new SensorProvider(metrics, sensorName)
            .with(name, new MeasurableValue(supplier))
            .get();
    }

    /** Value of {@link ForkJoinPool#getActiveThreadCount()} for the monitored pool. */
    long activeThreadCount() {
        return pool.getActiveThreadCount();
    }

    /** Value of {@link ForkJoinPool#getRunningThreadCount()} for the monitored pool. */
    long runningThreadCount() {
        return pool.getRunningThreadCount();
    }

    /** Value of {@link ForkJoinPool#getPoolSize()} for the monitored pool. */
    long poolSize() {
        return pool.getPoolSize();
    }

    /** Value of {@link ForkJoinPool#getParallelism()} for the monitored pool. */
    long parallelism() {
        return pool.getParallelism();
    }

    /** Value of {@link ForkJoinPool#getQueuedTaskCount()} for the monitored pool. */
    long queuedTaskCount() {
        return pool.getQueuedTaskCount();
    }

    /** Value of {@link ForkJoinPool#getStealCount()} for the monitored pool. */
    long stealTaskCount() {
        return pool.getStealCount();
    }

    /**
     * Closes the metrics registry owned by this monitor. The monitored pool itself is not shut down.
     */
    @Override
    public void close() {
        metrics.close();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.ByteArrayInputStream;
import java.lang.management.ManagementFactory;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Map;
import java.util.stream.Stream;

Expand All @@ -42,6 +43,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.InstanceOfAssertFactories.DOUBLE;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -71,14 +73,16 @@ private static Stream<Arguments> caches() {
Map.of(
"retention.ms", "-1",
"size", "-1",
"path", baseCachePath.toString()
"path", baseCachePath.toString(),
"fetch.thread.pool.size", 4
)
),
Arguments.of(
MemoryChunkCache.class,
Map.of(
"retention.ms", "-1",
"size", "-1"
"size", "-1",
"fetch.thread.pool.size", 4
)
));
}
Expand Down Expand Up @@ -132,5 +136,22 @@ void shouldRecordMetrics(final Class<ChunkCache<?>> chunkCacheClass, final Map<S
.isEqualTo(0.0);
assertThat(MBEAN_SERVER.getAttribute(objectName, "cache-eviction-weight-total"))
.isEqualTo(0.0);

final var threadPoolObjectName =
new ObjectName("aiven.kafka.server.tieredstorage.thread-pool:type=chunk-cache-thread-pool-metrics");

// The following assertions are relaxed, just to show that metrics are collected
assertThat(MBEAN_SERVER.getAttribute(threadPoolObjectName, "parallelism-total"))
.isEqualTo(4.0);
// approximation to completed tasks
assertThat(MBEAN_SERVER.getAttribute(threadPoolObjectName, "steal-task-count-total"))
.asInstanceOf(DOUBLE)
.isGreaterThanOrEqualTo(0.0);
// wait for thread-pool to drain queued tasks
await()
.atMost(Duration.ofSeconds(5))
.untilAsserted(() ->
assertThat(MBEAN_SERVER.getAttribute(threadPoolObjectName, "queued-task-count-total"))
.isEqualTo(0.0));
}
}
Loading

0 comments on commit e54d401

Please sign in to comment.