From bd4bbbb1b25e9ab99d677cb5d2cf7a67285372eb Mon Sep 17 00:00:00 2001 From: Jaime Fullaondo Date: Wed, 2 Dec 2020 18:06:49 +0100 Subject: [PATCH] [aggregator] fix thread leak + dont always start scheduler (#129) * [aggregator] fix thread leak + dont always start scheduler * [aggregator] remove commented lines * [circleci] tie cache to java version --- .circleci/config.yml | 19 ++++++++++++++++--- .../com/timgroup/statsd/StatsDAggregator.java | 9 ++++++--- .../com/timgroup/statsd/StatsDProcessor.java | 1 + 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 83dd06b6..86d8857e 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -1,22 +1,35 @@ version: 2.1 +commands: + create_custom_cache_lock: + description: "Create custom cache lock for java version." + parameters: + filename: + type: string + steps: + - run: + name: Grab java version and dump to file + command: java -version > << parameters.filename >> + default_steps: &default_steps steps: - checkout + - create_custom_cache_lock: + filename: java-version-lock.txt # Download and cache dependencies - restore_cache: keys: - - -v5-dependencies-{{ checksum "pom.xml" }}-{{ arch }} + - -v6-dependencies-{{ checksum "pom.xml" }}-{{ checksum "java-version-lock.txt" }}-{{ arch }} # fallback to using the latest cache if no exact match is found - - -v5-dependencies- + - -v6-dependencies- - run: mvn dependency:go-offline - save_cache: paths: - ~/.m2 - key: -v5-dependencies-{{ checksum "pom.xml" }}-{{ arch }} + key: -v6-dependencies-{{ checksum "pom.xml" }}-{{ checksum "java-version-lock.txt" }}-{{ arch }} - run: | mvn clean install -Dgpg.skip $MVN_EXTRA_OPTS diff --git a/src/main/java/com/timgroup/statsd/StatsDAggregator.java b/src/main/java/com/timgroup/statsd/StatsDAggregator.java index 39345d56..3a1aa5d1 100644 --- a/src/main/java/com/timgroup/statsd/StatsDAggregator.java +++ b/src/main/java/com/timgroup/statsd/StatsDAggregator.java @@ -23,14 +23,13 @@ public class StatsDAggregator { Arrays.asList(Message.Type.COUNT, Message.Type.GAUGE, Message.Type.SET)); protected final ArrayList> aggregateMetrics; - // protected final Map aggregateMetrics = new HashMap<>(); - protected final Timer scheduler = new Timer(AGGREGATOR_THREAD_NAME, true); - protected final int shardGranularity; protected final long flushInterval; private final StatsDProcessor processor; + protected Timer scheduler = null; + private Telemetry telemetry; private class FlushTask extends TimerTask { @@ -54,6 +53,10 @@ public StatsDAggregator(final StatsDProcessor processor, final int shards, final this.shardGranularity = shards; this.aggregateMetrics = new ArrayList<>(shards); + if (flushInterval > 0) { + this.scheduler = new Timer(AGGREGATOR_THREAD_NAME, true); + } + for (int i = 0 ; i < this.shardGranularity ; i++) { this.aggregateMetrics.add(i, new HashMap()); } diff --git a/src/main/java/com/timgroup/statsd/StatsDProcessor.java b/src/main/java/com/timgroup/statsd/StatsDProcessor.java index 9617ca98..2b2bace6 100644 --- a/src/main/java/com/timgroup/statsd/StatsDProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDProcessor.java @@ -182,6 +182,7 @@ boolean isShutdown() { void shutdown() { shutdown = true; + aggregator.stop(); executor.shutdown(); } }