From a0c811ec03d51c3b184c28f6f58b79466061ae71 Mon Sep 17 00:00:00 2001 From: Jeyhun Karimov Date: Wed, 13 Mar 2024 22:20:32 +0100 Subject: [PATCH] [FLINK-34640][metrics] Replace DummyMetricGroup usage with UnregisteredMetricsGroup --- .../eventtime/WatermarkStrategyTest.java | 74 +------------------ .../CheckpointCoordinatorMasterHooksTest.java | 4 +- .../CheckpointCoordinatorTestingUtils.java | 4 +- 3 files changed, 7 insertions(+), 75 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategyTest.java b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategyTest.java index b76252e2e2d73..2ce7b88d3f60d 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategyTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategyTest.java @@ -20,18 +20,12 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.java.ClosureCleaner; -import org.apache.flink.metrics.CharacterFilter; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.Histogram; -import org.apache.flink.metrics.Meter; -import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.junit.jupiter.api.Test; import java.io.Serializable; import java.time.Duration; -import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; @@ -173,72 +167,10 @@ public long extractTimestamp(Object element, long recordTimestamp) { } static TimestampAssignerSupplier.Context assignerContext() { - return DummyMetricGroup::new; + return UnregisteredMetricsGroup::new; } static WatermarkGeneratorSupplier.Context generatorContext() { - return DummyMetricGroup::new; - } - - /** - * A dummy {@link MetricGroup} to be used when a group is required as an argument but not - * actually used. - */ - public static class DummyMetricGroup implements MetricGroup { - - @Override - public Counter counter(String name) { - return null; - } - - @Override - public C counter(String name, C counter) { - return null; - } - - @Override - public > G gauge(String name, G gauge) { - return null; - } - - @Override - public H histogram(String name, H histogram) { - return null; - } - - @Override - public M meter(String name, M meter) { - return null; - } - - @Override - public MetricGroup addGroup(String name) { - return null; - } - - @Override - public MetricGroup addGroup(String key, String value) { - return null; - } - - @Override - public String[] getScopeComponents() { - return new String[0]; - } - - @Override - public Map getAllVariables() { - return null; - } - - @Override - public String getMetricIdentifier(String metricName) { - return null; - } - - @Override - public String getMetricIdentifier(String metricName, CharacterFilter filter) { - return null; - } + return UnregisteredMetricsGroup::new; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java index 75e637dd07835..a7332d0719486 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java @@ -19,8 +19,8 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.eventtime.WatermarkStrategyTest.DummyMetricGroup; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionGraphCheckpointPlanCalculatorContext; @@ -500,7 +500,7 @@ private CheckpointCoordinator instantiateCheckpointCoordinator( new ExecutionGraphCheckpointPlanCalculatorContext(graph), graph.getVerticesTopologically(), false), - new CheckpointStatsTracker(1, new DummyMetricGroup(), new JobID())); + new CheckpointStatsTracker(1, new UnregisteredMetricsGroup(), new JobID())); } private static T mockGeneric(Class clazz) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java index aac4ab51b362a..829445287d92e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java @@ -19,11 +19,11 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.eventtime.WatermarkStrategyTest.DummyMetricGroup; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.Path; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.OperatorIDPair; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; @@ -781,7 +781,7 @@ public static class CheckpointCoordinatorBuilder { private boolean allowCheckpointsAfterTasksFinished; private CheckpointStatsTracker checkpointStatsTracker = - new CheckpointStatsTracker(1, new DummyMetricGroup(), new JobID()); + new CheckpointStatsTracker(1, new UnregisteredMetricsGroup(), new JobID()); private BiFunction< Set,