diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PrometheusReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PrometheusReporter.java index 1394e6626268..34fd7a07f653 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PrometheusReporter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PrometheusReporter.java @@ -18,42 +18,76 @@ package org.apache.hudi.metrics.prometheus; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.metrics.MetricsReporter; import com.codahale.metrics.MetricRegistry; +import io.prometheus.client.Collector; import io.prometheus.client.CollectorRegistry; import io.prometheus.client.dropwizard.DropwizardExports; +import io.prometheus.client.dropwizard.samplebuilder.DefaultSampleBuilder; +import io.prometheus.client.dropwizard.samplebuilder.SampleBuilder; import io.prometheus.client.exporter.HTTPServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; /** * Implementation of Prometheus reporter, which connects to the Http server, and get metrics * from that server. */ public class PrometheusReporter extends MetricsReporter { + private static final Pattern LABEL_PATTERN = Pattern.compile("\\s*,\\s*"); private static final Logger LOG = LoggerFactory.getLogger(PrometheusReporter.class); + private static final Map PORT_TO_COLLECTOR_REGISTRY = new HashMap<>(); + private static final Map PORT_TO_SERVER = new HashMap<>(); - private HTTPServer httpServer; private final DropwizardExports metricExports; private final CollectorRegistry collectorRegistry; + private final int serverPort; public PrometheusReporter(HoodieWriteConfig config, MetricRegistry registry) { - int serverPort = config.getPrometheusPort(); - collectorRegistry = new CollectorRegistry(); - metricExports = new DropwizardExports(registry); + this.serverPort = config.getPrometheusPort(); + if (!PORT_TO_SERVER.containsKey(serverPort) || !PORT_TO_COLLECTOR_REGISTRY.containsKey(serverPort)) { + startHttpServer(serverPort); + } + List labelNames = new ArrayList<>(); + List labelValues = new ArrayList<>(); + if (StringUtils.nonEmpty(config.getPushGatewayLabels())) { + LABEL_PATTERN.splitAsStream(config.getPushGatewayLabels().trim()).map(s -> s.split(":", 2)) + .forEach(parts -> { + labelNames.add(parts[0]); + labelValues.add(parts[1]); + }); + } + metricExports = new DropwizardExports(registry, new LabeledSampleBuilder(labelNames, labelValues)); + this.collectorRegistry = PORT_TO_COLLECTOR_REGISTRY.get(serverPort); metricExports.register(collectorRegistry); - try { - httpServer = new HTTPServer(new InetSocketAddress(serverPort), collectorRegistry); - } catch (Exception e) { - String msg = "Could not start PrometheusReporter HTTP server on port " + serverPort; - LOG.error(msg, e); - throw new HoodieException(msg, e); + } + + private static synchronized void startHttpServer(int serverPort) { + if (!PORT_TO_COLLECTOR_REGISTRY.containsKey(serverPort)) { + PORT_TO_COLLECTOR_REGISTRY.put(serverPort, new CollectorRegistry()); + } + if (!PORT_TO_SERVER.containsKey(serverPort)) { + try { + HTTPServer server = new HTTPServer(new InetSocketAddress(serverPort), PORT_TO_COLLECTOR_REGISTRY.get(serverPort)); + PORT_TO_SERVER.put(serverPort, server); + Runtime.getRuntime().addShutdownHook(new Thread(server::stop)); + } catch (Exception e) { + String msg = "Could not start PrometheusReporter HTTP server on port " + serverPort; + LOG.error(msg, e); + throw new HoodieException(msg, e); + } } } @@ -68,8 +102,31 @@ public void report() { @Override public void stop() { collectorRegistry.unregister(metricExports); + HTTPServer httpServer = PORT_TO_SERVER.remove(serverPort); if (httpServer != null) { httpServer.stop(); } + PORT_TO_COLLECTOR_REGISTRY.remove(serverPort); + } + + private static class LabeledSampleBuilder implements SampleBuilder { + private final DefaultSampleBuilder defaultMetricSampleBuilder = new DefaultSampleBuilder(); + private final List labelNames; + private final List labelValues; + + public LabeledSampleBuilder(List labelNames, List labelValues) { + this.labelNames = labelNames; + this.labelValues = labelValues; + } + + @Override + public Collector.MetricFamilySamples.Sample createSample(String dropwizardName, String nameSuffix, List additionalLabelNames, List additionalLabelValues, double value) { + return defaultMetricSampleBuilder.createSample( + dropwizardName, + nameSuffix, + labelNames, + labelValues, + value); + } } }