Skip to content

Commit

Permalink
fix: NPE when udf metrics enabled (#5960)
Browse files Browse the repository at this point in the history
* fix: NPE when udf metrics enabled

fixes: #5890

Fixes an NPE thrown when the second instance of a UDAF is initialized. The NPE was being thrown due to a bug in the code that failed to initialise the sensor fields in the `UdafAggregateFunction` class if the sensor already existed.

The commit also refactors the code used to add function invocation metrics such that all three function types: UDF, UDTF and UDAF, use the same function to register metrics.

Unit tests added to ensure this single code path doesn't throw an NPE or return null on the subsequent invocation.

Also added `ksql.udf.collect.metrics` metric to the server config file, so users are more likely to be able to find it and try it out.

Moved UDTF functions out of the `ksql-udf` group and into `ksql-udtf` group. Having them in the former was another bug.

Co-authored-by: Andy Coates <[email protected]>
  • Loading branch information
big-andy-coates and big-andy-coates authored Aug 10, 2020
1 parent ce2b6e4 commit e32183f
Show file tree
Hide file tree
Showing 9 changed files with 325 additions and 167 deletions.
5 changes: 5 additions & 0 deletions config/ksql-server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ ksql.logging.processing.stream.auto.create=true
# processing contains sensitive information.
#ksql.logging.processing.rows.include=true

#------- Metrics config --------

# Turn on collection of metrics of function invocations:
# ksql.udf.collect.metrics=true

#------ External service config -------

#------ Kafka -------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,15 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.WindowedCount;

/**
* Utility class for loading different types of user defined funcrions
*/
public final class FunctionLoaderUtils {

private static final String UDF_METRIC_GROUP = "ksql-udf";

private FunctionLoaderUtils() {
}

Expand Down Expand Up @@ -119,41 +110,6 @@ static Object instantiateFunctionInstance(
}
}

static void addSensor(
final String sensorName, final String udfName, final Optional<Metrics> theMetrics
) {
theMetrics.ifPresent(metrics -> {
if (metrics.getSensor(sensorName) == null) {
final Sensor sensor = metrics.sensor(sensorName);
sensor.add(
metrics.metricName(sensorName + "-avg", UDF_METRIC_GROUP,
"Average time for an invocation of " + udfName + " udf"
),
new Avg()
);
sensor.add(
metrics.metricName(sensorName + "-max", UDF_METRIC_GROUP,
"Max time for an invocation of " + udfName + " udf"
),
new Max()
);
sensor.add(
metrics.metricName(sensorName + "-count", UDF_METRIC_GROUP,
"Total number of invocations of " + udfName + " udf"
),
new WindowedCount()
);
sensor.add(
metrics.metricName(sensorName + "-rate", UDF_METRIC_GROUP,
"The average number of occurrence of " + udfName + " operation per second "
+ udfName + " udf"
),
new Rate(TimeUnit.SECONDS, new WindowedCount())
);
}
});
}

static ParamType getReturnType(
final Method method, final String annotationSchema,
final SqlTypeParser typeParser
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function;

import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.WindowedCount;

public final class FunctionMetrics {

static final String AVG_DESC = "Average time for invocations of the %s";
static final String MAX_DESC = "Max time for invocations of the %s";
static final String COUNT_DESC = "Total number of invocations of the %s";
static final String RATE_DESC = "The rate of invocations (invocations per second) of the %s";

private FunctionMetrics() {
}

/**
* Gets an existing invocation sensor, or creates one if needed.
*
* <p>Sensor created with avg, max, count and rate metrics.
*
* @param metrics the metrics service.
* @param sensorName the name of the sensor
* @param groupName the name of the group
* @param functionDescription the description of the function.
*/
public static void initInvocationSensor(
final Optional<Metrics> metrics,
final String sensorName,
final String groupName,
final String functionDescription
) {
metrics.ifPresent(m -> getInvocationSensor(m, sensorName, groupName, functionDescription));
}

/**
* Gets an existing invocation sensor, or creates one if needed.
*
* <p>Sensor created with avg, max, count and rate metrics.
*
* @param metrics the metrics service.
* @param sensorName the name of the sensor
* @param groupName the name of the group
* @param functionDescription the description of the function.
*/
public static Sensor getInvocationSensor(
final Metrics metrics,
final String sensorName,
final String groupName,
final String functionDescription
) {
final Sensor sensor = metrics.sensor(sensorName);
if (sensor.hasMetrics()) {
return sensor;
}

final BiFunction<String, String, MetricName> metricNamer = (suffix, descPattern) -> {
final String description = String.format(descPattern, functionDescription);
return metrics.metricName(sensorName + "-" + suffix, groupName, description);
};

sensor.add(
metricNamer.apply("avg", AVG_DESC),
new Avg()
);

sensor.add(
metricNamer.apply("max", MAX_DESC),
new Max()
);

sensor.add(
metricNamer.apply("count", COUNT_DESC),
new WindowedCount()
);

sensor.add(
metricNamer.apply("rate", RATE_DESC),
new Rate(TimeUnit.SECONDS, new WindowedCount())
);

return sensor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,12 @@
import io.confluent.ksql.function.udaf.Udaf;
import io.confluent.ksql.schema.ksql.types.SqlType;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.kstream.Merger;
Expand All @@ -48,115 +44,18 @@ protected UdafAggregateFunction(
final List<ParameterInfo> arguments,
final String description,
final Optional<Metrics> metrics,
final String method) {

final String method
) {
super(functionName, udafIndex, udaf::initialize, aggregateType,
outputType, arguments, description);

this.udaf = udaf;

final String aggSensorName = String.format("aggregate-%s-%s", functionName, method);
final String mapSensorName = String.format("map-%s-%s", functionName, method);
final String mergeSensorName = String.format("merge-%s-%s", functionName, method);
this.udaf = Objects.requireNonNull(udaf, "udaf");

initMetrics(metrics, functionName, method, aggSensorName, mapSensorName, mergeSensorName);
}

private void initMetrics(
final Optional<Metrics> maybeMetrics,
final String name,
final String method,
final String aggSensorName,
final String mapSensorName,
final String mergeSensorName) {
if (maybeMetrics.isPresent()) {
final String groupName = String.format("ksql-udaf-%s-%s", name, method);
final Metrics metrics = maybeMetrics.get();

if (metrics.getSensor(aggSensorName) == null) {
final Sensor sensor = metrics.sensor(aggSensorName);
sensor.add(metrics.metricName(
aggSensorName + "-avg",
groupName,
String.format("Average time for an aggregate invocation of %s %s udaf", name, method)),
new Avg());
sensor.add(metrics.metricName(
aggSensorName + "-max",
groupName,
String.format("Max time for an aggregate invocation of %s %s udaf", name, method)),
new Max());
sensor.add(metrics.metricName(
aggSensorName + "-count",
groupName,
String.format("Total number of aggregate invocations of %s %s udaf", name, method)),
new WindowedCount());
sensor.add(metrics.metricName(
aggSensorName + "-rate",
groupName,
String.format("The average number of occurrences of aggregate "
+ "%s %s operation per second udaf", name, method)),
new Rate(TimeUnit.SECONDS, new WindowedCount()));
this.aggregateSensor = Optional.of(sensor);
}

if (metrics.getSensor(mapSensorName) == null) {
final Sensor sensor = metrics.sensor(mapSensorName);
sensor.add(metrics.metricName(
mapSensorName + "-avg",
groupName,
String.format("Average time for a map invocation of %s %s udaf", name, method)),
new Avg());
sensor.add(metrics.metricName(
mapSensorName + "-max",
groupName,
String.format("Max time for a map invocation of %s %s udaf", name, method)),
new Max());
sensor.add(metrics.metricName(
mapSensorName + "-count",
groupName,
String.format("Total number of map invocations of %s %s udaf", name, method)),
new WindowedCount());
sensor.add(metrics.metricName(
mapSensorName + "-rate",
groupName,
String.format("The average number of occurrences of map "
+ "%s %s operation per second udaf", name, method)),
new Rate(TimeUnit.SECONDS, new WindowedCount()));
this.mapSensor = Optional.of(sensor);
}

if (metrics.getSensor(mergeSensorName) == null) {
final Sensor sensor = metrics.sensor(mergeSensorName);
sensor.add(metrics.metricName(
mergeSensorName + "-avg",
groupName,
String.format("Average time for a merge invocation of %s %s udaf", name, method)),
new Avg());
sensor.add(metrics.metricName(
mergeSensorName + "-max",
groupName,
String.format("Max time for a merge invocation of %s %s udaf", name, method)),
new Max());
sensor.add(metrics.metricName(
mergeSensorName + "-count",
groupName,
String.format("Total number of merge invocations of %s %s udaf", name, method)),
new WindowedCount());
sensor.add(metrics.metricName(
mergeSensorName + "-rate",
groupName,
String.format(
"The average number of occurrences of merge %s %s operation per second udaf",
name, method)),
new Rate(TimeUnit.SECONDS, new WindowedCount()));
this.mergeSensor = Optional.of(sensor);
}
} else {
this.aggregateSensor = Optional.empty();
this.mapSensor = Optional.empty();
this.mergeSensor = Optional.empty();
}
final String groupName = String.format("ksql-udaf-%s-%s", functionName, method);

this.aggregateSensor = getSensor(metrics, functionName, method, groupName, "aggregate");
this.mapSensor = getSensor(metrics, functionName, method, groupName, "map");
this.mergeSensor = getSensor(metrics, functionName, method, groupName, "merge");
}

@Override
Expand All @@ -174,6 +73,36 @@ public Function<A, O> getResultMapper() {
return (v1) -> timed(mapSensor, () -> udaf.map(v1));
}

private static Optional<Sensor> getSensor(
final Optional<Metrics> maybeMetrics,
final String name,
final String method,
final String groupName,
final String step
) {
if (!maybeMetrics.isPresent()) {
return Optional.empty();
}

final Metrics metrics = maybeMetrics.get();

final String sensorName = step + "-" + name + "-" + method;

final Sensor existing = metrics.getSensor(sensorName);
if (existing != null) {
return Optional.of(existing);
}

final Sensor newSensor = FunctionMetrics.getInvocationSensor(
metrics,
sensorName,
groupName,
name + " " + method + " udaf's " + step + " step"
);

return Optional.of(newSensor);
}

private static <T> T timed(final Optional<Sensor> maybeSensor, final Supplier<T> task) {
final long start = Time.SYSTEM.nanoseconds();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.common.metrics.Metrics;
import org.slf4j.Logger;
Expand All @@ -45,9 +46,9 @@ class UdafLoader {
final Optional<Metrics> metrics,
final SqlTypeParser typeParser
) {
this.functionRegistry = functionRegistry;
this.metrics = metrics;
this.typeParser = typeParser;
this.functionRegistry = Objects.requireNonNull(functionRegistry, "functionRegistry");
this.metrics = Objects.requireNonNull(metrics, "metrics");
this.typeParser = Objects.requireNonNull(typeParser, "typeParser");
}

void loadUdafFromClass(final Class<?> theClass, final String path) {
Expand Down
Loading

0 comments on commit e32183f

Please sign in to comment.