From e32183fb4d77371ed3f4b610f5abf68d888c65f4 Mon Sep 17 00:00:00 2001 From: Andy Coates <8012398+big-andy-coates@users.noreply.github.com> Date: Mon, 10 Aug 2020 11:27:04 +0100 Subject: [PATCH] fix: NPE when udf metrics enabled (#5960) * fix: NPE when udf metrics enabled fixes: https://github.com/confluentinc/ksql/issues/5890 Fixes an NPE thrown when the second instance of a UDAF is initialized. The NPE was being thrown due to a bug in the code that failed to initialise the sensor fields in the `UdafAggregateFunction` class if the sensor already existed. The commit also refactors the code used to add function invocation metrics such that all three function types: UDF, UDTF and UDAF, use the same function to register metrics. Unit tests added to ensure this single code path doesn't throw an NPE or return null on the subsequent invocation. Also added `ksql.udf.collect.metrics` metric to the server config file, so users are more likely to be able to find it and try it out. Moved UDTF functions out of the `ksql-udf` group and into `ksql-udtf` group. Having them in the former was another bug. Co-authored-by: Andy Coates --- config/ksql-server.properties | 5 + .../ksql/function/FunctionLoaderUtils.java | 44 ----- .../ksql/function/FunctionMetrics.java | 106 ++++++++++++ .../ksql/function/UdafAggregateFunction.java | 147 +++++----------- .../confluent/ksql/function/UdafLoader.java | 7 +- .../io/confluent/ksql/function/UdfLoader.java | 10 +- .../confluent/ksql/function/UdtfLoader.java | 10 +- .../ksql/function/UserFunctionLoader.java | 5 +- .../ksql/function/FunctionMetricsTest.java | 158 ++++++++++++++++++ 9 files changed, 325 insertions(+), 167 deletions(-) create mode 100644 ksqldb-engine/src/main/java/io/confluent/ksql/function/FunctionMetrics.java create mode 100644 ksqldb-engine/src/test/java/io/confluent/ksql/function/FunctionMetricsTest.java diff --git a/config/ksql-server.properties b/config/ksql-server.properties index 51cfc494a763..c238389cd1a7 100644 --- a/config/ksql-server.properties +++ b/config/ksql-server.properties @@ -53,6 +53,11 @@ ksql.logging.processing.stream.auto.create=true # processing contains sensitive information. #ksql.logging.processing.rows.include=true +#------- Metrics config -------- + +# Turn on collection of metrics of function invocations: +# ksql.udf.collect.metrics=true + #------ External service config ------- #------ Kafka ------- diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/FunctionLoaderUtils.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/FunctionLoaderUtils.java index bf6d5d56be0e..b6dd6c498219 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/FunctionLoaderUtils.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/FunctionLoaderUtils.java @@ -37,24 +37,15 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.Avg; -import org.apache.kafka.common.metrics.stats.Max; -import org.apache.kafka.common.metrics.stats.Rate; -import org.apache.kafka.common.metrics.stats.WindowedCount; /** * Utility class for loading different types of user defined funcrions */ public final class FunctionLoaderUtils { - private static final String UDF_METRIC_GROUP = "ksql-udf"; - private FunctionLoaderUtils() { } @@ -119,41 +110,6 @@ static Object instantiateFunctionInstance( } } - static void addSensor( - final String sensorName, final String udfName, final Optional theMetrics - ) { - theMetrics.ifPresent(metrics -> { - if (metrics.getSensor(sensorName) == null) { - final Sensor sensor = metrics.sensor(sensorName); - sensor.add( - metrics.metricName(sensorName + "-avg", UDF_METRIC_GROUP, - "Average time for an invocation of " + udfName + " udf" - ), - new Avg() - ); - sensor.add( - metrics.metricName(sensorName + "-max", UDF_METRIC_GROUP, - "Max time for an invocation of " + udfName + " udf" - ), - new Max() - ); - sensor.add( - metrics.metricName(sensorName + "-count", UDF_METRIC_GROUP, - "Total number of invocations of " + udfName + " udf" - ), - new WindowedCount() - ); - sensor.add( - metrics.metricName(sensorName + "-rate", UDF_METRIC_GROUP, - "The average number of occurrence of " + udfName + " operation per second " - + udfName + " udf" - ), - new Rate(TimeUnit.SECONDS, new WindowedCount()) - ); - } - }); - } - static ParamType getReturnType( final Method method, final String annotationSchema, final SqlTypeParser typeParser diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/FunctionMetrics.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/FunctionMetrics.java new file mode 100644 index 000000000000..08ce96c990a8 --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/FunctionMetrics.java @@ -0,0 +1,106 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function; + +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.WindowedCount; + +public final class FunctionMetrics { + + static final String AVG_DESC = "Average time for invocations of the %s"; + static final String MAX_DESC = "Max time for invocations of the %s"; + static final String COUNT_DESC = "Total number of invocations of the %s"; + static final String RATE_DESC = "The rate of invocations (invocations per second) of the %s"; + + private FunctionMetrics() { + } + + /** + * Gets an existing invocation sensor, or creates one if needed. + * + *

Sensor created with avg, max, count and rate metrics. + * + * @param metrics the metrics service. + * @param sensorName the name of the sensor + * @param groupName the name of the group + * @param functionDescription the description of the function. + */ + public static void initInvocationSensor( + final Optional metrics, + final String sensorName, + final String groupName, + final String functionDescription + ) { + metrics.ifPresent(m -> getInvocationSensor(m, sensorName, groupName, functionDescription)); + } + + /** + * Gets an existing invocation sensor, or creates one if needed. + * + *

Sensor created with avg, max, count and rate metrics. + * + * @param metrics the metrics service. + * @param sensorName the name of the sensor + * @param groupName the name of the group + * @param functionDescription the description of the function. + */ + public static Sensor getInvocationSensor( + final Metrics metrics, + final String sensorName, + final String groupName, + final String functionDescription + ) { + final Sensor sensor = metrics.sensor(sensorName); + if (sensor.hasMetrics()) { + return sensor; + } + + final BiFunction metricNamer = (suffix, descPattern) -> { + final String description = String.format(descPattern, functionDescription); + return metrics.metricName(sensorName + "-" + suffix, groupName, description); + }; + + sensor.add( + metricNamer.apply("avg", AVG_DESC), + new Avg() + ); + + sensor.add( + metricNamer.apply("max", MAX_DESC), + new Max() + ); + + sensor.add( + metricNamer.apply("count", COUNT_DESC), + new WindowedCount() + ); + + sensor.add( + metricNamer.apply("rate", RATE_DESC), + new Rate(TimeUnit.SECONDS, new WindowedCount()) + ); + + return sensor; + } +} diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafAggregateFunction.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafAggregateFunction.java index 313a9ae31d1d..1a7167c25f40 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafAggregateFunction.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafAggregateFunction.java @@ -18,16 +18,12 @@ import io.confluent.ksql.function.udaf.Udaf; import io.confluent.ksql.schema.ksql.types.SqlType; import java.util.List; +import java.util.Objects; import java.util.Optional; -import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.Avg; -import org.apache.kafka.common.metrics.stats.Max; -import org.apache.kafka.common.metrics.stats.Rate; -import org.apache.kafka.common.metrics.stats.WindowedCount; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.streams.kstream.Merger; @@ -48,115 +44,18 @@ protected UdafAggregateFunction( final List arguments, final String description, final Optional metrics, - final String method) { - + final String method + ) { super(functionName, udafIndex, udaf::initialize, aggregateType, outputType, arguments, description); - this.udaf = udaf; - - final String aggSensorName = String.format("aggregate-%s-%s", functionName, method); - final String mapSensorName = String.format("map-%s-%s", functionName, method); - final String mergeSensorName = String.format("merge-%s-%s", functionName, method); + this.udaf = Objects.requireNonNull(udaf, "udaf"); - initMetrics(metrics, functionName, method, aggSensorName, mapSensorName, mergeSensorName); - } - - private void initMetrics( - final Optional maybeMetrics, - final String name, - final String method, - final String aggSensorName, - final String mapSensorName, - final String mergeSensorName) { - if (maybeMetrics.isPresent()) { - final String groupName = String.format("ksql-udaf-%s-%s", name, method); - final Metrics metrics = maybeMetrics.get(); - - if (metrics.getSensor(aggSensorName) == null) { - final Sensor sensor = metrics.sensor(aggSensorName); - sensor.add(metrics.metricName( - aggSensorName + "-avg", - groupName, - String.format("Average time for an aggregate invocation of %s %s udaf", name, method)), - new Avg()); - sensor.add(metrics.metricName( - aggSensorName + "-max", - groupName, - String.format("Max time for an aggregate invocation of %s %s udaf", name, method)), - new Max()); - sensor.add(metrics.metricName( - aggSensorName + "-count", - groupName, - String.format("Total number of aggregate invocations of %s %s udaf", name, method)), - new WindowedCount()); - sensor.add(metrics.metricName( - aggSensorName + "-rate", - groupName, - String.format("The average number of occurrences of aggregate " - + "%s %s operation per second udaf", name, method)), - new Rate(TimeUnit.SECONDS, new WindowedCount())); - this.aggregateSensor = Optional.of(sensor); - } - - if (metrics.getSensor(mapSensorName) == null) { - final Sensor sensor = metrics.sensor(mapSensorName); - sensor.add(metrics.metricName( - mapSensorName + "-avg", - groupName, - String.format("Average time for a map invocation of %s %s udaf", name, method)), - new Avg()); - sensor.add(metrics.metricName( - mapSensorName + "-max", - groupName, - String.format("Max time for a map invocation of %s %s udaf", name, method)), - new Max()); - sensor.add(metrics.metricName( - mapSensorName + "-count", - groupName, - String.format("Total number of map invocations of %s %s udaf", name, method)), - new WindowedCount()); - sensor.add(metrics.metricName( - mapSensorName + "-rate", - groupName, - String.format("The average number of occurrences of map " - + "%s %s operation per second udaf", name, method)), - new Rate(TimeUnit.SECONDS, new WindowedCount())); - this.mapSensor = Optional.of(sensor); - } - - if (metrics.getSensor(mergeSensorName) == null) { - final Sensor sensor = metrics.sensor(mergeSensorName); - sensor.add(metrics.metricName( - mergeSensorName + "-avg", - groupName, - String.format("Average time for a merge invocation of %s %s udaf", name, method)), - new Avg()); - sensor.add(metrics.metricName( - mergeSensorName + "-max", - groupName, - String.format("Max time for a merge invocation of %s %s udaf", name, method)), - new Max()); - sensor.add(metrics.metricName( - mergeSensorName + "-count", - groupName, - String.format("Total number of merge invocations of %s %s udaf", name, method)), - new WindowedCount()); - sensor.add(metrics.metricName( - mergeSensorName + "-rate", - groupName, - String.format( - "The average number of occurrences of merge %s %s operation per second udaf", - name, method)), - new Rate(TimeUnit.SECONDS, new WindowedCount())); - this.mergeSensor = Optional.of(sensor); - } - } else { - this.aggregateSensor = Optional.empty(); - this.mapSensor = Optional.empty(); - this.mergeSensor = Optional.empty(); - } + final String groupName = String.format("ksql-udaf-%s-%s", functionName, method); + this.aggregateSensor = getSensor(metrics, functionName, method, groupName, "aggregate"); + this.mapSensor = getSensor(metrics, functionName, method, groupName, "map"); + this.mergeSensor = getSensor(metrics, functionName, method, groupName, "merge"); } @Override @@ -174,6 +73,36 @@ public Function getResultMapper() { return (v1) -> timed(mapSensor, () -> udaf.map(v1)); } + private static Optional getSensor( + final Optional maybeMetrics, + final String name, + final String method, + final String groupName, + final String step + ) { + if (!maybeMetrics.isPresent()) { + return Optional.empty(); + } + + final Metrics metrics = maybeMetrics.get(); + + final String sensorName = step + "-" + name + "-" + method; + + final Sensor existing = metrics.getSensor(sensorName); + if (existing != null) { + return Optional.of(existing); + } + + final Sensor newSensor = FunctionMetrics.getInvocationSensor( + metrics, + sensorName, + groupName, + name + " " + method + " udaf's " + step + " step" + ); + + return Optional.of(newSensor); + } + private static T timed(final Optional maybeSensor, final Supplier task) { final long start = Time.SYSTEM.nanoseconds(); try { diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafLoader.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafLoader.java index 45ce2d418358..5aeae30a4e66 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafLoader.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafLoader.java @@ -24,6 +24,7 @@ import java.lang.reflect.Modifier; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.Optional; import org.apache.kafka.common.metrics.Metrics; import org.slf4j.Logger; @@ -45,9 +46,9 @@ class UdafLoader { final Optional metrics, final SqlTypeParser typeParser ) { - this.functionRegistry = functionRegistry; - this.metrics = metrics; - this.typeParser = typeParser; + this.functionRegistry = Objects.requireNonNull(functionRegistry, "functionRegistry"); + this.metrics = Objects.requireNonNull(metrics, "metrics"); + this.typeParser = Objects.requireNonNull(typeParser, "typeParser"); } void loadUdafFromClass(final Class theClass, final String path) { diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java index c7a3f112e956..440cbcd1d131 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java @@ -28,6 +28,7 @@ import io.confluent.ksql.util.KsqlException; import java.lang.reflect.Method; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.function.Function; import org.apache.kafka.common.Configurable; @@ -54,9 +55,9 @@ public UdfLoader( final SqlTypeParser typeParser, final boolean throwExceptionOnLoadFailure ) { - this.functionRegistry = functionRegistry; - this.metrics = metrics; - this.typeParser = typeParser; + this.functionRegistry = Objects.requireNonNull(functionRegistry, "functionRegistry"); + this.metrics = Objects.requireNonNull(metrics, "metrics"); + this.typeParser = Objects.requireNonNull(typeParser, "typeParser"); this.throwExceptionOnLoadFailure = throwExceptionOnLoadFailure; } @@ -84,7 +85,8 @@ public void loadUdfFromClass( @SuppressWarnings("unchecked") final Class udfClass = metrics .map(m -> (Class) UdfMetricProducer.class) .orElse(PluggableUdf.class); - FunctionLoaderUtils.addSensor(sensorName, functionName, metrics); + + FunctionMetrics.initInvocationSensor(metrics, sensorName, "ksql-udf", functionName + " udf"); final UdfFactory factory = new UdfFactory( udfClass, diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdtfLoader.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdtfLoader.java index 2246b06a62d2..1231346cd0f1 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdtfLoader.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdtfLoader.java @@ -27,6 +27,7 @@ import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.util.List; +import java.util.Objects; import java.util.Optional; import org.apache.kafka.common.metrics.Metrics; import org.slf4j.Logger; @@ -50,9 +51,9 @@ public UdtfLoader( final SqlTypeParser typeParser, final boolean throwExceptionOnLoadFailure ) { - this.functionRegistry = functionRegistry; - this.metrics = metrics; - this.typeParser = typeParser; + this.functionRegistry = Objects.requireNonNull(functionRegistry, "functionRegistry"); + this.metrics = Objects.requireNonNull(metrics, "metrics"); + this.typeParser = Objects.requireNonNull(typeParser, "typeParser"); this.throwExceptionOnLoadFailure = throwExceptionOnLoadFailure; } @@ -67,7 +68,8 @@ public void loadUdtfFromClass( } final String functionName = udtfDescriptionAnnotation.name(); final String sensorName = "ksql-udtf-" + functionName; - FunctionLoaderUtils.addSensor(sensorName, functionName, metrics); + + FunctionMetrics.initInvocationSensor(metrics, sensorName, "ksql-udtf", functionName + " udtf"); final UdfMetadata metadata = new UdfMetadata( udtfDescriptionAnnotation.name(), diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/UserFunctionLoader.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/UserFunctionLoader.java index 95c3995ce3e6..23d03b90c3a5 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/UserFunctionLoader.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/UserFunctionLoader.java @@ -71,7 +71,6 @@ public UserFunctionLoader( "parentClassLoader can't be null" ); this.blacklist = Objects.requireNonNull(blacklist, "blacklist can't be null"); - Objects.requireNonNull(metrics, "metrics can't be null"); this.loadCustomerUdfs = loadCustomerUdfs; final SqlTypeParser typeParser = SqlTypeParser.create(TypeRegistry.EMPTY); this.udfLoader = new UdfLoader(functionRegistry, metrics, typeParser, false); @@ -143,8 +142,8 @@ public static UserFunctionLoader newInstance( final MutableFunctionRegistry metaStore, final String ksqlInstallDir ) { - final Boolean loadCustomerUdfs = config.getBoolean(KsqlConfig.KSQL_ENABLE_UDFS); - final Boolean collectMetrics = config.getBoolean(KsqlConfig.KSQL_COLLECT_UDF_METRICS); + final boolean loadCustomerUdfs = config.getBoolean(KsqlConfig.KSQL_ENABLE_UDFS); + final boolean collectMetrics = config.getBoolean(KsqlConfig.KSQL_COLLECT_UDF_METRICS); final String extDirName = config.getString(KsqlConfig.KSQL_EXT_DIR); final File pluginDir = KsqlConfig.DEFAULT_EXT_DIR.equals(extDirName) ? new File(ksqlInstallDir, extDirName) diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/FunctionMetricsTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/FunctionMetricsTest.java new file mode 100644 index 000000000000..20fd5f0f2e8a --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/FunctionMetricsTest.java @@ -0,0 +1,158 @@ +package io.confluent.ksql.function; + +import static io.confluent.ksql.function.FunctionMetrics.AVG_DESC; +import static io.confluent.ksql.function.FunctionMetrics.COUNT_DESC; +import static io.confluent.ksql.function.FunctionMetrics.MAX_DESC; +import static io.confluent.ksql.function.FunctionMetrics.RATE_DESC; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.WindowedCount; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class FunctionMetricsTest { + + private static final String SENSOR_NAME = "sensorName"; + private static final String GROUP_NAME = "groupName"; + private static final String FUNC_NAME = "Func name"; + + @Mock + private Metrics metrics; + @Mock + private Sensor sensor; + @Mock + private MetricName metricName; + @Mock + private MetricName specificMetricName; + + @Before + public void setup() { + when(metrics.sensor(any())).thenReturn(sensor); + + when(metrics.metricName(any(String.class), any(String.class), any(String.class))) + .thenReturn(metricName); + } + + @Test + public void shouldGetSensorWithCorrectName() { + // When: + FunctionMetrics + .getInvocationSensor(metrics, SENSOR_NAME, GROUP_NAME, FUNC_NAME); + + // Then: + verify(metrics).sensor(SENSOR_NAME); + } + + @Test + public void shouldReturnSensorOnFirstCall() { + // When: + final Sensor result = FunctionMetrics + .getInvocationSensor(metrics, SENSOR_NAME, GROUP_NAME, FUNC_NAME); + + // Then: + assertThat(result, is(sensor)); + } + + @Test + public void shouldReturnSensorOnSubsequentCalls() { + // Given: + when(sensor.hasMetrics()).thenReturn(true); + + // When: + final Sensor result = FunctionMetrics + .getInvocationSensor(metrics, SENSOR_NAME, GROUP_NAME, FUNC_NAME); + + // Then: + assertThat(result, is(sensor)); + } + + @Test + public void shouldRegisterAvgMetric() { + // Given: + when(metrics.metricName(SENSOR_NAME + "-avg", GROUP_NAME, description(AVG_DESC))) + .thenReturn(specificMetricName); + + // When: + FunctionMetrics + .getInvocationSensor(metrics, SENSOR_NAME, GROUP_NAME, FUNC_NAME); + + // Then: + verify(sensor).add(eq(specificMetricName), isA(Avg.class)); + } + + @Test + public void shouldRegisterMaxMetric() { + // Given: + when(metrics.metricName(SENSOR_NAME + "-max", GROUP_NAME, description(MAX_DESC))) + .thenReturn(specificMetricName); + + // When: + FunctionMetrics + .getInvocationSensor(metrics, SENSOR_NAME, GROUP_NAME, FUNC_NAME); + + // Then: + verify(sensor).add(eq(specificMetricName), isA(Max.class)); + } + + @Test + public void shouldRegisterCountMetric() { + // Given: + when(metrics.metricName(SENSOR_NAME + "-count", GROUP_NAME, description(COUNT_DESC))) + .thenReturn(specificMetricName); + + // When: + FunctionMetrics + .getInvocationSensor(metrics, SENSOR_NAME, GROUP_NAME, FUNC_NAME); + + // Then: + verify(sensor).add(eq(specificMetricName), isA(WindowedCount.class)); + } + + @Test + public void shouldRegisterRateMetric() { + // Given: + when(metrics.metricName(SENSOR_NAME + "-rate", GROUP_NAME, description(RATE_DESC))) + .thenReturn(specificMetricName); + + // When: + FunctionMetrics + .getInvocationSensor(metrics, SENSOR_NAME, GROUP_NAME, FUNC_NAME); + + // Then: + verify(sensor).add(eq(specificMetricName), isA(Rate.class)); + } + + @Test + public void shouldNotInitializeOnSubsequentCalls() { + // Given: + when(sensor.hasMetrics()).thenReturn(true); + + // When: + FunctionMetrics + .getInvocationSensor(metrics, SENSOR_NAME, GROUP_NAME, FUNC_NAME); + + // Then: + verify(sensor, never()).add(any(MetricName.class), any()); + } + + private static String description(final String formatString) { + return String.format(formatString, FUNC_NAME); + } +} \ No newline at end of file