Skip to content

Commit

Permalink
[HUDI-7083] Adding support for multiple tables with Prometheus Report…
Browse files Browse the repository at this point in the history
…er (apache#10068)

* Adding support for multiple tables with Prometheus Reporter

* Fixing closure of http server

* Remove entry from port-collector registry map after stopping http server

---------

Co-authored-by: Sagar Sumit <[email protected]>
  • Loading branch information
nsivabalan and codope authored Nov 21, 2023
1 parent baffe1d commit 9e2500c
Showing 1 changed file with 67 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,42 +18,76 @@

package org.apache.hudi.metrics.prometheus;

import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metrics.MetricsReporter;

import com.codahale.metrics.MetricRegistry;
import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.dropwizard.DropwizardExports;
import io.prometheus.client.dropwizard.samplebuilder.DefaultSampleBuilder;
import io.prometheus.client.dropwizard.samplebuilder.SampleBuilder;
import io.prometheus.client.exporter.HTTPServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/**
* Implementation of Prometheus reporter, which connects to the Http server, and get metrics
* from that server.
*/
public class PrometheusReporter extends MetricsReporter {
private static final Pattern LABEL_PATTERN = Pattern.compile("\\s*,\\s*");

private static final Logger LOG = LoggerFactory.getLogger(PrometheusReporter.class);
private static final Map<Integer, CollectorRegistry> PORT_TO_COLLECTOR_REGISTRY = new HashMap<>();
private static final Map<Integer, HTTPServer> PORT_TO_SERVER = new HashMap<>();

private HTTPServer httpServer;
private final DropwizardExports metricExports;
private final CollectorRegistry collectorRegistry;
private final int serverPort;

public PrometheusReporter(HoodieWriteConfig config, MetricRegistry registry) {
int serverPort = config.getPrometheusPort();
collectorRegistry = new CollectorRegistry();
metricExports = new DropwizardExports(registry);
this.serverPort = config.getPrometheusPort();
if (!PORT_TO_SERVER.containsKey(serverPort) || !PORT_TO_COLLECTOR_REGISTRY.containsKey(serverPort)) {
startHttpServer(serverPort);
}
List<String> labelNames = new ArrayList<>();
List<String> labelValues = new ArrayList<>();
if (StringUtils.nonEmpty(config.getPushGatewayLabels())) {
LABEL_PATTERN.splitAsStream(config.getPushGatewayLabels().trim()).map(s -> s.split(":", 2))
.forEach(parts -> {
labelNames.add(parts[0]);
labelValues.add(parts[1]);
});
}
metricExports = new DropwizardExports(registry, new LabeledSampleBuilder(labelNames, labelValues));
this.collectorRegistry = PORT_TO_COLLECTOR_REGISTRY.get(serverPort);
metricExports.register(collectorRegistry);
try {
httpServer = new HTTPServer(new InetSocketAddress(serverPort), collectorRegistry);
} catch (Exception e) {
String msg = "Could not start PrometheusReporter HTTP server on port " + serverPort;
LOG.error(msg, e);
throw new HoodieException(msg, e);
}

private static synchronized void startHttpServer(int serverPort) {
if (!PORT_TO_COLLECTOR_REGISTRY.containsKey(serverPort)) {
PORT_TO_COLLECTOR_REGISTRY.put(serverPort, new CollectorRegistry());
}
if (!PORT_TO_SERVER.containsKey(serverPort)) {
try {
HTTPServer server = new HTTPServer(new InetSocketAddress(serverPort), PORT_TO_COLLECTOR_REGISTRY.get(serverPort));
PORT_TO_SERVER.put(serverPort, server);
Runtime.getRuntime().addShutdownHook(new Thread(server::stop));
} catch (Exception e) {
String msg = "Could not start PrometheusReporter HTTP server on port " + serverPort;
LOG.error(msg, e);
throw new HoodieException(msg, e);
}
}
}

Expand All @@ -68,8 +102,31 @@ public void report() {
@Override
public void stop() {
collectorRegistry.unregister(metricExports);
HTTPServer httpServer = PORT_TO_SERVER.remove(serverPort);
if (httpServer != null) {
httpServer.stop();
}
PORT_TO_COLLECTOR_REGISTRY.remove(serverPort);
}

private static class LabeledSampleBuilder implements SampleBuilder {
private final DefaultSampleBuilder defaultMetricSampleBuilder = new DefaultSampleBuilder();
private final List<String> labelNames;
private final List<String> labelValues;

public LabeledSampleBuilder(List<String> labelNames, List<String> labelValues) {
this.labelNames = labelNames;
this.labelValues = labelValues;
}

@Override
public Collector.MetricFamilySamples.Sample createSample(String dropwizardName, String nameSuffix, List<String> additionalLabelNames, List<String> additionalLabelValues, double value) {
return defaultMetricSampleBuilder.createSample(
dropwizardName,
nameSuffix,
labelNames,
labelValues,
value);
}
}
}

0 comments on commit 9e2500c

Please sign in to comment.