Skip to content

Commit

Permalink
Add metrics for temporal workflow resets (#15016)
Browse files Browse the repository at this point in the history
* Add metrics for temporal workflow resets

* Properly record metric attributes

* PR feedback

* Formatting

* Record general workflow attempts/failures

* Refactor metric methods

* PR feedback

* Formatting
  • Loading branch information
jdpgrailsdev authored Aug 1, 2022
1 parent e65627b commit d0b15fe
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import com.timgroup.statsd.NonBlockingStatsDClientBuilder;
import com.timgroup.statsd.StatsDClient;
import io.airbyte.config.Configs;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -18,11 +20,17 @@
* <p>
* Open source users are free to turn this on and consume the same metrics.
* <p>
* This class is intended to be used in conjection with {@link Configs#getPublishMetrics()}.
* This class is intended to be used in conjunction with {@link Configs#getPublishMetrics()}.
* <p>
* Any {@link MetricAttribute}s provided with the metric data are sent as tags created by joining
* the {@code key} and {@code value} property of each {@link MetricAttribute} with a
* {@link #TAG_DELIMITER} delimiter.
*/
@Slf4j
public class DogStatsDMetricClient implements MetricClient {

private static final String TAG_DELIMITER = ":";

private boolean instancePublish = false;
private StatsDClient statsDClient;

Expand Down Expand Up @@ -62,19 +70,19 @@ public synchronized void shutdown() {
*
* @param metric
* @param amt to adjust.
* @param tags
* @param attributes
*/
@Override
public void count(final MetricsRegistry metric, final long amt, final String... tags) {
public void count(final MetricsRegistry metric, final long amt, final MetricAttribute... attributes) {
if (instancePublish) {
if (statsDClient == null) {
// do not loudly fail to prevent application disruption
log.warn("singleton not initialized, count {} not emitted", metric);
return;
}

log.info("publishing count, name: {}, value: {}, tags: {}", metric, amt, tags);
statsDClient.count(metric.getMetricName(), amt, tags);
log.info("publishing count, name: {}, value: {}, attributes: {}", metric, amt, attributes);
statsDClient.count(metric.getMetricName(), amt, toTags(attributes));
}
}

Expand All @@ -83,34 +91,44 @@ public void count(final MetricsRegistry metric, final long amt, final String...
*
* @param metric
* @param val to record.
* @param tags
* @param attributes
*/
@Override
public void gauge(final MetricsRegistry metric, final double val, final String... tags) {
public void gauge(final MetricsRegistry metric, final double val, final MetricAttribute... attributes) {
if (instancePublish) {
if (statsDClient == null) {
// do not loudly fail to prevent application disruption
log.warn("singleton not initialized, gauge {} not emitted", metric);
return;
}

log.info("publishing gauge, name: {}, value: {}, tags: {}", metric, val, tags);
statsDClient.gauge(metric.getMetricName(), val, tags);
log.info("publishing gauge, name: {}, value: {}, attributes: {}", metric, val, attributes);
statsDClient.gauge(metric.getMetricName(), val, toTags(attributes));
}
}

@Override
public void distribution(MetricsRegistry metric, double val, final String... tags) {
public void distribution(final MetricsRegistry metric, final double val, final MetricAttribute... attributes) {
if (instancePublish) {
if (statsDClient == null) {
// do not loudly fail to prevent application disruption
log.warn("singleton not initialized, distribution {} not emitted", metric);
return;
}

log.info("recording distribution, name: {}, value: {}, tags: {}", metric, val, tags);
statsDClient.distribution(metric.getMetricName(), val, tags);
log.info("recording distribution, name: {}, value: {}, attributes: {}", metric, val, attributes);
statsDClient.distribution(metric.getMetricName(), val, toTags(attributes));
}
}

/**
 * Converts the supplied {@link MetricAttribute}s into the tag strings handed to the StatsD client.
 * <p>
 * Each tag is the attribute's {@code key} and {@code value} joined by {@link #TAG_DELIMITER}. Tags
 * are produced in the same order as the attributes were supplied.
 *
 * @param attributes the {@link MetricAttribute} tuples to convert; may be empty.
 * @return an array holding one tag per attribute; empty when no attributes are supplied.
 */
private String[] toTags(final MetricAttribute... attributes) {
  // Map straight into a String[] instead of collecting into an intermediate List first.
  return Stream.of(attributes)
      .map(attribute -> String.join(TAG_DELIMITER, attribute.key(), attribute.value()))
      .toArray(String[]::new);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.metrics.lib;

/**
 * Key/value pair that can be attached to a metric when it is recorded.
 * <p>
 * How the pair is used is decided by the individual {@link MetricClient} implementation that
 * receives it; refer to the implementations of that interface for the details.
 *
 * @param key the name of the attribute.
 * @param value the value associated with {@code key}.
 */
public record MetricAttribute(
    String key,
    String value) {

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ public interface MetricClient {
*
* @param metric
* @param val to record.
* @param tags
* @param attributes
*/
void count(MetricsRegistry metric, long val, final String... tags);
void count(MetricsRegistry metric, long val, final MetricAttribute... attributes);

/**
* Record the latest value for a gauge.
*
* @param metric
* @param val to record.
* @param tags
* @param attributes
*/
void gauge(MetricsRegistry metric, double val, final String... tags);
void gauge(MetricsRegistry metric, double val, final MetricAttribute... attributes);

/*
* Accepts values for the metric and reports the distribution of these values. Useful for analyzing how
Expand All @@ -35,9 +35,9 @@ public interface MetricClient {
*
* @param val to record.
*
* @param tags
* @param attributes
*/
void distribution(MetricsRegistry metric, double val, final String... tags);
void distribution(MetricsRegistry metric, double val, final MetricAttribute... attributes);

/*
* Reset initialization. Can be used in a unit test to reset metric client state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,24 @@
*/
public class MetricTags {

private static final String RELEASE_STAGE = "release_stage";
private static final String FAILURE_ORIGIN = "failure_origin";
private static final String JOB_STATUS = "job_status";
public static final String CONNECTION_ID = "connection_id";
public static final String FAILURE_ORIGIN = "failure_origin";
public static final String JOB_ID = "job_id";
public static final String JOB_STATUS = "job_status";
public static final String RELEASE_STAGE = "release_stage";
public static final String RESET_WORKFLOW_FAILURE_CAUSE = "failure_cause";
public static final String WORKFLOW_TYPE = "workflow_type";

public static String getReleaseStage(final ReleaseStage stage) {
return tagDelimit(RELEASE_STAGE, stage.getLiteral());
return stage.getLiteral();
}

public static String getFailureOrigin(final FailureOrigin origin) {
return tagDelimit(FAILURE_ORIGIN, origin.value());
return origin.value();
}

public static String getJobStatus(final JobStatus status) {
return tagDelimit(JOB_STATUS, status.getLiteral());
}

private static String tagDelimit(final String tagName, final String tagVal) {
return String.join(":", tagName, tagVal);
return status.getLiteral();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@
public class NotImplementedMetricClient implements MetricClient {

@Override
public void count(MetricsRegistry metric, long val, String... tags) {
public void count(final MetricsRegistry metric, final long val, final MetricAttribute... attributes) {
// Not Implemented.
}

@Override
public void gauge(MetricsRegistry metric, double val, String... tags) {
public void gauge(final MetricsRegistry metric, final double val, final MetricAttribute... attributes) {
// Not Implemented.
}

@Override
public void distribution(MetricsRegistry metric, double val, String... tags) {
public void distribution(final MetricsRegistry metric, final double val, final MetricAttribute... attributes) {
// Not Implemented.
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,58 +27,54 @@
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;

/**
* Implementation of the {@link MetricClient} that sends the provided metric data to an
* OpenTelemetry compliant metrics store.
* <p>
* Any {@link MetricAttribute}s provided along with the metric data are passed as key/value pairs
* annotating the metric.
*/
public class OpenTelemetryMetricClient implements MetricClient {

private Meter meter;
private SdkMeterProvider meterProvider;

@Override
public void count(MetricsRegistry metric, long val, String... tags) {
LongCounter counter = meter
public void count(final MetricsRegistry metric, final long val, final MetricAttribute... attributes) {
final LongCounter counter = meter
.counterBuilder(metric.getMetricName())
.setDescription(metric.getMetricDescription())
.build();

AttributesBuilder attributesBuilder = Attributes.builder();
for (String tag : tags) {
attributesBuilder.put(stringKey(tag), tag);
}

final AttributesBuilder attributesBuilder = buildAttributes(attributes);
counter.add(val, attributesBuilder.build());
}

@Override
public void gauge(MetricsRegistry metric, double val, String... tags) {
AttributesBuilder attributesBuilder = Attributes.builder();
for (String tag : tags) {
attributesBuilder.put(stringKey(tag), tag);
}
public void gauge(final MetricsRegistry metric, final double val, final MetricAttribute... attributes) {
final AttributesBuilder attributesBuilder = buildAttributes(attributes);
meter.gaugeBuilder(metric.getMetricName()).setDescription(metric.getMetricDescription())
.buildWithCallback(measurement -> measurement.record(val, attributesBuilder.build()));
}

@Override
public void distribution(MetricsRegistry metric, double val, String... tags) {
DoubleHistogram histogramMeter = meter.histogramBuilder(metric.getMetricName()).setDescription(metric.getMetricDescription()).build();
AttributesBuilder attributesBuilder = Attributes.builder();

for (String tag : tags) {
attributesBuilder.put(stringKey(tag), tag);
}
public void distribution(final MetricsRegistry metric, final double val, final MetricAttribute... attributes) {
final DoubleHistogram histogramMeter = meter.histogramBuilder(metric.getMetricName()).setDescription(metric.getMetricDescription()).build();
final AttributesBuilder attributesBuilder = buildAttributes(attributes);
histogramMeter.record(val, attributesBuilder.build());
}

public void initialize(MetricEmittingApp metricEmittingApp, String otelEndpoint) {
Resource resource = Resource.getDefault().toBuilder().put(SERVICE_NAME, metricEmittingApp.getApplicationName()).build();
public void initialize(final MetricEmittingApp metricEmittingApp, final String otelEndpoint) {
final Resource resource = Resource.getDefault().toBuilder().put(SERVICE_NAME, metricEmittingApp.getApplicationName()).build();

SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
final SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
.addSpanProcessor(
BatchSpanProcessor
.builder(OtlpGrpcSpanExporter.builder().setEndpoint(otelEndpoint).build())
.build())
.setResource(resource)
.build();
MetricExporter metricExporter = OtlpGrpcMetricExporter.builder()
final MetricExporter metricExporter = OtlpGrpcMetricExporter.builder()
.setEndpoint(otelEndpoint).build();
initialize(metricEmittingApp, metricExporter, sdkTracerProvider, resource);
}
Expand All @@ -89,13 +85,17 @@ SdkMeterProvider getSdkMeterProvider() {
}

@VisibleForTesting
void initialize(MetricEmittingApp metricEmittingApp, MetricExporter metricExporter, SdkTracerProvider sdkTracerProvider, Resource resource) {
void initialize(
final MetricEmittingApp metricEmittingApp,
final MetricExporter metricExporter,
final SdkTracerProvider sdkTracerProvider,
final Resource resource) {
meterProvider = SdkMeterProvider.builder()
.registerMetricReader(PeriodicMetricReader.builder(metricExporter).build())
.setResource(resource)
.build();

OpenTelemetry openTelemetry = OpenTelemetrySdk.builder()
final OpenTelemetry openTelemetry = OpenTelemetrySdk.builder()
.setTracerProvider(sdkTracerProvider)
.setMeterProvider(meterProvider)
.setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
Expand All @@ -110,4 +110,12 @@ public void shutdown() {
resetForTest();
}

/**
 * Copies the supplied {@link MetricAttribute}s into a new {@link AttributesBuilder}.
 * <p>
 * Each attribute is added as a string-keyed entry named after the attribute's {@code key} and
 * holding the attribute's {@code value}.
 *
 * @param attributes the {@link MetricAttribute} tuples to add; may be empty.
 * @return the builder holding the supplied attributes; callers invoke {@code build()} on it.
 */
private AttributesBuilder buildAttributes(final MetricAttribute... attributes) {
  final AttributesBuilder builder = Attributes.builder();
  for (int index = 0; index < attributes.length; index++) {
    final MetricAttribute current = attributes[index];
    builder.put(stringKey(current.key()), current.value());
  }
  return builder;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,19 @@ public enum OssMetricsRegistry implements MetricsRegistry {
"oldest running job in seconds"),
OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS(MetricEmittingApps.METRICS_REPORTER,
"overall_job_runtime_in_last_hour_by_terminal_state_secs",
"overall job runtime - scheduling and execution for all attempts - for jobs that reach terminal states in the last hour. tagged by terminal states.");
"overall job runtime - scheduling and execution for all attempts - for jobs that reach terminal states in the last hour. tagged by terminal states."),

TEMPORAL_WORKFLOW_ATTEMPT(MetricEmittingApps.WORKER,
"temporal_workflow_attempt",
"count of the number of workflow attempts"),

TEMPORAL_WORKFLOW_SUCCESS(MetricEmittingApps.WORKER,
"temporal_workflow_success",
"count of the number of successful workflow syncs."),

TEMPORAL_WORKFLOW_FAILURE(MetricEmittingApps.WORKER,
"temporal_workflow_failure",
"count of the number of workflow failures");

private final MetricEmittingApp application;
private final String metricName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ void testCountSuccess() {
@Test
@DisplayName("Tags should be passed into metrics")
void testCountWithTagSuccess() {
openTelemetryMetricClient.count(OssMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1, TAG);
openTelemetryMetricClient.count(OssMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1, new MetricAttribute(TAG, TAG));

metricProvider.forceFlush();
final List<MetricData> metricDataList = metricExporter.getFinishedMetricItems();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import io.airbyte.commons.lang.Exceptions.Procedure;
import io.airbyte.db.instance.jobs.jooq.generated.enums.JobStatus;
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.MetricQueries;
import io.airbyte.metrics.lib.MetricTags;
Expand Down Expand Up @@ -52,7 +53,8 @@ public enum ToEmit {
final var times = ReporterApp.configDatabase.query(MetricQueries::overallJobRuntimeForTerminalJobsInLastHour);
for (Pair<JobStatus, Double> pair : times) {
MetricClientFactory.getMetricClient().distribution(
OssMetricsRegistry.OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS, pair.getRight(), MetricTags.getJobStatus(pair.getLeft()));
OssMetricsRegistry.OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS, pair.getRight(),
new MetricAttribute(MetricTags.JOB_STATUS, MetricTags.getJobStatus(pair.getLeft())));
}
}), 1, TimeUnit.HOURS);

Expand Down
Loading

0 comments on commit d0b15fe

Please sign in to comment.