diff --git a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/DogStatsDMetricClient.java b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/DogStatsDMetricClient.java index 98624ac84824..bc2373b53dc3 100644 --- a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/DogStatsDMetricClient.java +++ b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/DogStatsDMetricClient.java @@ -8,6 +8,8 @@ import com.timgroup.statsd.NonBlockingStatsDClientBuilder; import com.timgroup.statsd.StatsDClient; import io.airbyte.config.Configs; +import java.util.stream.Collectors; +import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; /** @@ -18,11 +20,17 @@ *

* Open source users are free to turn this on and consume the same metrics. *

- * This class is intended to be used in conjection with {@link Configs#getPublishMetrics()}. + * This class is intended to be used in conjunction with {@link Configs#getPublishMetrics()}. + *

+ * Any {@link MetricAttribute}s provided with the metric data are sent as tags created by joining + * the {@code key} and {@code value} property of each {@link MetricAttribute} with a + * {@link #TAG_DELIMITER} delimiter. */ @Slf4j public class DogStatsDMetricClient implements MetricClient { + private static final String TAG_DELIMITER = ":"; + private boolean instancePublish = false; private StatsDClient statsDClient; @@ -62,10 +70,10 @@ public synchronized void shutdown() { * * @param metric * @param amt to adjust. - * @param tags + * @param attributes */ @Override - public void count(final MetricsRegistry metric, final long amt, final String... tags) { + public void count(final MetricsRegistry metric, final long amt, final MetricAttribute... attributes) { if (instancePublish) { if (statsDClient == null) { // do not loudly fail to prevent application disruption @@ -73,8 +81,8 @@ public void count(final MetricsRegistry metric, final long amt, final String... return; } - log.info("publishing count, name: {}, value: {}, tags: {}", metric, amt, tags); - statsDClient.count(metric.getMetricName(), amt, tags); + log.info("publishing count, name: {}, value: {}, attributes: {}", metric, amt, attributes); + statsDClient.count(metric.getMetricName(), amt, toTags(attributes)); } } @@ -83,10 +91,10 @@ public void count(final MetricsRegistry metric, final long amt, final String... * * @param metric * @param val to record. - * @param tags + * @param attributes */ @Override - public void gauge(final MetricsRegistry metric, final double val, final String... tags) { + public void gauge(final MetricsRegistry metric, final double val, final MetricAttribute... attributes) { if (instancePublish) { if (statsDClient == null) { // do not loudly fail to prevent application disruption @@ -94,13 +102,13 @@ public void gauge(final MetricsRegistry metric, final double val, final String.. return; } - log.info("publishing gauge, name: {}, value: {}, tags: {}", metric, val, tags); - statsDClient.gauge(metric.getMetricName(), val, tags); + log.info("publishing gauge, name: {}, value: {}, attributes: {}", metric, val, attributes); + statsDClient.gauge(metric.getMetricName(), val, toTags(attributes)); } } @Override - public void distribution(MetricsRegistry metric, double val, final String... tags) { + public void distribution(final MetricsRegistry metric, final double val, final MetricAttribute... attributes) { if (instancePublish) { if (statsDClient == null) { // do not loudly fail to prevent application disruption @@ -108,9 +116,19 @@ public void distribution(MetricsRegistry metric, double val, final String... tag return; } - log.info("recording distribution, name: {}, value: {}, tags: {}", metric, val, tags); - statsDClient.distribution(metric.getMetricName(), val, tags); + log.info("recording distribution, name: {}, value: {}, attributes: {}", metric, val, attributes); + statsDClient.distribution(metric.getMetricName(), val, toTags(attributes)); } } + /** + * Converts each {@link MetricAttribute} tuple to a list of tags consumable by StatsD. + * + * @param attributes An array of {@link MetricAttribute} tuples. + * @return An array of tag values. + */ + private String[] toTags(final MetricAttribute... attributes) { + return Stream.of(attributes).map(a -> String.join(TAG_DELIMITER, a.key(), a.value())).collect(Collectors.toList()).toArray(new String[] {}); + } + } diff --git a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricAttribute.java b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricAttribute.java new file mode 100644 index 000000000000..4a752713e553 --- /dev/null +++ b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricAttribute.java @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.metrics.lib; + +/** + * Custom tuple that represents a key/value pair to be included with a metric. + *

+ * It is up to each {@link MetricClient} implementation to decide what data from this record is used + * when generating a metric. See the specific implementations of the {@link MetricClient} interface + * for actual usage. + */ +public record MetricAttribute(String key, String value) {} diff --git a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricClient.java b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricClient.java index 0f916d49d27b..f1c5f200eec5 100644 --- a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricClient.java +++ b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricClient.java @@ -14,18 +14,18 @@ public interface MetricClient { * * @param metric * @param val to record. - * @param tags + * @param attributes */ - void count(MetricsRegistry metric, long val, final String... tags); + void count(MetricsRegistry metric, long val, final MetricAttribute... attributes); /** * Record the latest value for a gauge. * * @param metric * @param val to record. - * @param tags + * @param attributes */ - void gauge(MetricsRegistry metric, double val, final String... tags); + void gauge(MetricsRegistry metric, double val, final MetricAttribute... attributes); /* * Accepts value on the metrics, and report the distribution of these values. Useful to analysis how @@ -35,9 +35,9 @@ public interface MetricClient { * * @param val to record. * - * @param tags + * @param attributes */ - void distribution(MetricsRegistry metric, double val, final String... tags); + void distribution(MetricsRegistry metric, double val, final MetricAttribute... attributes); /* * Reset initialization. Can be used in a unit test to reset metric client state. diff --git a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java index 026223f1e54d..998d290717b8 100644 --- a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java +++ b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java @@ -13,24 +13,24 @@ */ public class MetricTags { - private static final String RELEASE_STAGE = "release_stage"; - private static final String FAILURE_ORIGIN = "failure_origin"; - private static final String JOB_STATUS = "job_status"; + public static final String CONNECTION_ID = "connection_id"; + public static final String FAILURE_ORIGIN = "failure_origin"; + public static final String JOB_ID = "job_id"; + public static final String JOB_STATUS = "job_status"; + public static final String RELEASE_STAGE = "release_stage"; + public static final String RESET_WORKFLOW_FAILURE_CAUSE = "failure_cause"; + public static final String WORKFLOW_TYPE = "workflow_type"; public static String getReleaseStage(final ReleaseStage stage) { - return tagDelimit(RELEASE_STAGE, stage.getLiteral()); + return stage.getLiteral(); } public static String getFailureOrigin(final FailureOrigin origin) { - return tagDelimit(FAILURE_ORIGIN, origin.value()); + return origin.value(); } public static String getJobStatus(final JobStatus status) { - return tagDelimit(JOB_STATUS, status.getLiteral()); - } - - private static String tagDelimit(final String tagName, final String tagVal) { - return String.join(":", tagName, tagVal); + return status.getLiteral(); } } diff --git a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/NotImplementedMetricClient.java b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/NotImplementedMetricClient.java index bda99e097bd2..8ffd3db594b8 100644 --- a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/NotImplementedMetricClient.java +++ b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/NotImplementedMetricClient.java @@ -12,17 +12,17 @@ public class NotImplementedMetricClient implements MetricClient { @Override - public void count(MetricsRegistry metric, long val, String... tags) { + public void count(final MetricsRegistry metric, final long val, final MetricAttribute... attributes) { // Not Implemented. } @Override - public void gauge(MetricsRegistry metric, double val, String... tags) { + public void gauge(final MetricsRegistry metric, final double val, final MetricAttribute... attributes) { // Not Implemented. } @Override - public void distribution(MetricsRegistry metric, double val, String... tags) { + public void distribution(final MetricsRegistry metric, final double val, final MetricAttribute... attributes) { // Not Implemented. } diff --git a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OpenTelemetryMetricClient.java b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OpenTelemetryMetricClient.java index 45d8ed675e7c..120665c93b49 100644 --- a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OpenTelemetryMetricClient.java +++ b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OpenTelemetryMetricClient.java @@ -27,58 +27,54 @@ import io.opentelemetry.sdk.trace.SdkTracerProvider; import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +/** + * Implementation of the {@link MetricClient} that sends the provided metric data to an + * OpenTelemetry compliant metrics store. + *

+ * Any {@link MetricAttribute}s provided along with the metric data are passed as key/value pairs + * annotating the metric. + */ public class OpenTelemetryMetricClient implements MetricClient { private Meter meter; private SdkMeterProvider meterProvider; @Override - public void count(MetricsRegistry metric, long val, String... tags) { - LongCounter counter = meter + public void count(final MetricsRegistry metric, final long val, final MetricAttribute... attributes) { + final LongCounter counter = meter .counterBuilder(metric.getMetricName()) .setDescription(metric.getMetricDescription()) .build(); - AttributesBuilder attributesBuilder = Attributes.builder(); - for (String tag : tags) { - attributesBuilder.put(stringKey(tag), tag); - } - + final AttributesBuilder attributesBuilder = buildAttributes(attributes); counter.add(val, attributesBuilder.build()); } @Override - public void gauge(MetricsRegistry metric, double val, String... tags) { - AttributesBuilder attributesBuilder = Attributes.builder(); - for (String tag : tags) { - attributesBuilder.put(stringKey(tag), tag); - } + public void gauge(final MetricsRegistry metric, final double val, final MetricAttribute... attributes) { + final AttributesBuilder attributesBuilder = buildAttributes(attributes); meter.gaugeBuilder(metric.getMetricName()).setDescription(metric.getMetricDescription()) .buildWithCallback(measurement -> measurement.record(val, attributesBuilder.build())); } @Override - public void distribution(MetricsRegistry metric, double val, String... tags) { - DoubleHistogram histogramMeter = meter.histogramBuilder(metric.getMetricName()).setDescription(metric.getMetricDescription()).build(); - AttributesBuilder attributesBuilder = Attributes.builder(); - - for (String tag : tags) { - attributesBuilder.put(stringKey(tag), tag); - } + public void distribution(final MetricsRegistry metric, final double val, final MetricAttribute... attributes) { + final DoubleHistogram histogramMeter = meter.histogramBuilder(metric.getMetricName()).setDescription(metric.getMetricDescription()).build(); + final AttributesBuilder attributesBuilder = buildAttributes(attributes); histogramMeter.record(val, attributesBuilder.build()); } - public void initialize(MetricEmittingApp metricEmittingApp, String otelEndpoint) { - Resource resource = Resource.getDefault().toBuilder().put(SERVICE_NAME, metricEmittingApp.getApplicationName()).build(); + public void initialize(final MetricEmittingApp metricEmittingApp, final String otelEndpoint) { + final Resource resource = Resource.getDefault().toBuilder().put(SERVICE_NAME, metricEmittingApp.getApplicationName()).build(); - SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder() + final SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder() .addSpanProcessor( BatchSpanProcessor .builder(OtlpGrpcSpanExporter.builder().setEndpoint(otelEndpoint).build()) .build()) .setResource(resource) .build(); - MetricExporter metricExporter = OtlpGrpcMetricExporter.builder() + final MetricExporter metricExporter = OtlpGrpcMetricExporter.builder() .setEndpoint(otelEndpoint).build(); initialize(metricEmittingApp, metricExporter, sdkTracerProvider, resource); } @@ -89,13 +85,17 @@ SdkMeterProvider getSdkMeterProvider() { } @VisibleForTesting - void initialize(MetricEmittingApp metricEmittingApp, MetricExporter metricExporter, SdkTracerProvider sdkTracerProvider, Resource resource) { + void initialize( + final MetricEmittingApp metricEmittingApp, + final MetricExporter metricExporter, + final SdkTracerProvider sdkTracerProvider, + final Resource resource) { meterProvider = SdkMeterProvider.builder() .registerMetricReader(PeriodicMetricReader.builder(metricExporter).build()) .setResource(resource) .build(); - OpenTelemetry openTelemetry = OpenTelemetrySdk.builder() + final OpenTelemetry openTelemetry = OpenTelemetrySdk.builder() .setTracerProvider(sdkTracerProvider) .setMeterProvider(meterProvider) .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) @@ -110,4 +110,12 @@ public void shutdown() { resetForTest(); } + private AttributesBuilder buildAttributes(final MetricAttribute... attributes) { + final AttributesBuilder attributesBuilder = Attributes.builder(); + for (final MetricAttribute attribute : attributes) { + attributesBuilder.put(stringKey(attribute.key()), attribute.value()); + } + return attributesBuilder; + } + } diff --git a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java index dea563de9ad8..d717a64ac339 100644 --- a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java +++ b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java @@ -95,7 +95,19 @@ public enum OssMetricsRegistry implements MetricsRegistry { "oldest running job in seconds"), OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS(MetricEmittingApps.METRICS_REPORTER, "overall_job_runtime_in_last_hour_by_terminal_state_secs", - "overall job runtime - scheduling and execution for all attempts - for jobs that reach terminal states in the last hour. tagged by terminal states."); + "overall job runtime - scheduling and execution for all attempts - for jobs that reach terminal states in the last hour. tagged by terminal states."), + + TEMPORAL_WORKFLOW_ATTEMPT(MetricEmittingApps.WORKER, + "temporal_workflow_attempt", + "count of the number of workflow attempts"), + + TEMPORAL_WORKFLOW_SUCCESS(MetricEmittingApps.WORKER, + "temporal_workflow_success", + "count of the number of successful workflow syncs."), + + TEMPORAL_WORKFLOW_FAILURE(MetricEmittingApps.WORKER, + "temporal_workflow_failure", + "count of the number of workflow failures"); private final MetricEmittingApp application; private final String metricName; diff --git a/airbyte-metrics/metrics-lib/src/test/java/io/airbyte/metrics/lib/OpenTelemetryMetricClientTest.java b/airbyte-metrics/metrics-lib/src/test/java/io/airbyte/metrics/lib/OpenTelemetryMetricClientTest.java index f9784e4d0855..1312cf0c6710 100644 --- a/airbyte-metrics/metrics-lib/src/test/java/io/airbyte/metrics/lib/OpenTelemetryMetricClientTest.java +++ b/airbyte-metrics/metrics-lib/src/test/java/io/airbyte/metrics/lib/OpenTelemetryMetricClientTest.java @@ -65,7 +65,7 @@ void testCountSuccess() { @Test @DisplayName("Tags should be passed into metrics") void testCountWithTagSuccess() { - openTelemetryMetricClient.count(OssMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1, TAG); + openTelemetryMetricClient.count(OssMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1, new MetricAttribute(TAG, TAG)); metricProvider.forceFlush(); final List metricDataList = metricExporter.getFinishedMetricItems(); diff --git a/airbyte-metrics/reporter/src/main/java/io/airbyte/metrics/reporter/ToEmit.java b/airbyte-metrics/reporter/src/main/java/io/airbyte/metrics/reporter/ToEmit.java index b63366ef2141..4e7b07fd5c3f 100644 --- a/airbyte-metrics/reporter/src/main/java/io/airbyte/metrics/reporter/ToEmit.java +++ b/airbyte-metrics/reporter/src/main/java/io/airbyte/metrics/reporter/ToEmit.java @@ -6,6 +6,7 @@ import io.airbyte.commons.lang.Exceptions.Procedure; import io.airbyte.db.instance.jobs.jooq.generated.enums.JobStatus; +import io.airbyte.metrics.lib.MetricAttribute; import io.airbyte.metrics.lib.MetricClientFactory; import io.airbyte.metrics.lib.MetricQueries; import io.airbyte.metrics.lib.MetricTags; @@ -52,7 +53,8 @@ public enum ToEmit { final var times = ReporterApp.configDatabase.query(MetricQueries::overallJobRuntimeForTerminalJobsInLastHour); for (Pair pair : times) { MetricClientFactory.getMetricClient().distribution( - OssMetricsRegistry.OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS, pair.getRight(), MetricTags.getJobStatus(pair.getLeft())); + OssMetricsRegistry.OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS, pair.getRight(), + new MetricAttribute(MetricTags.JOB_STATUS, MetricTags.getJobStatus(pair.getLeft()))); } }), 1, TimeUnit.HOURS); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index 23154161974b..da3cedc2d69c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -16,6 +16,11 @@ import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardSyncSummary; import io.airbyte.config.StandardSyncSummary.ReplicationStatus; +import io.airbyte.metrics.lib.MetricAttribute; +import io.airbyte.metrics.lib.MetricClient; +import io.airbyte.metrics.lib.MetricClientFactory; +import io.airbyte.metrics.lib.MetricTags; +import io.airbyte.metrics.lib.OssMetricsRegistry; import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; import io.airbyte.workers.WorkerConstants; @@ -68,10 +73,14 @@ import io.temporal.workflow.Workflow; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import java.util.UUID; import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -123,10 +132,13 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow private UUID connectionId; + private final MetricClient metricClient = MetricClientFactory.getMetricClient(); + public ConnectionManagerWorkflowImpl() {} @Override public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws RetryableException { + recordWorkflowCountMetric(connectionUpdaterInput, OssMetricsRegistry.TEMPORAL_WORKFLOW_ATTEMPT); try { try { cancellableSyncWorkflow = generateSyncWorkflowRunnable(connectionUpdaterInput); @@ -135,6 +147,7 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr // When a scope is cancelled temporal will throw a CanceledFailure as you can see here: // https://github.com/temporalio/sdk-java/blob/master/temporal-sdk/src/main/java/io/temporal/workflow/CancellationScope.java#L72 // The naming is very misleading, it is not a failure but the expected behavior... + recordWorkflowFailureCountMetric(connectionUpdaterInput, FailureCause.CANCELED); } if (workflowState.isDeleted()) { @@ -160,7 +173,7 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr } catch (final Exception e) { log.error("The connection update workflow has failed, will create a new attempt.", e); - reportFailure(connectionUpdaterInput, null); + reportFailure(connectionUpdaterInput, null, FailureCause.UNKNOWN); prepareForNextRunAndContinueAsNew(connectionUpdaterInput); } } @@ -213,13 +226,13 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn if (syncCheckConnectionFailure.isFailed()) { final StandardSyncOutput checkFailureOutput = syncCheckConnectionFailure.buildFailureOutput(); workflowState.setFailed(getFailStatus(checkFailureOutput)); - reportFailure(connectionUpdaterInput, checkFailureOutput); + reportFailure(connectionUpdaterInput, checkFailureOutput, FailureCause.CONNECTION); } else { standardSyncOutput = runChildWorkflow(jobInputs); workflowState.setFailed(getFailStatus(standardSyncOutput)); if (workflowState.isFailed()) { - reportFailure(connectionUpdaterInput, standardSyncOutput); + reportFailure(connectionUpdaterInput, standardSyncOutput, FailureCause.UNKNOWN); } else { reportSuccess(connectionUpdaterInput, standardSyncOutput); } @@ -232,7 +245,7 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn // silently ignore it. if (childWorkflowFailure.getCause() instanceof CanceledFailure) { // do nothing, cancellation handled by cancellationScope - + recordWorkflowFailureCountMetric(connectionUpdaterInput, FailureCause.CANCELED); } else if (childWorkflowFailure.getCause()instanceof final ActivityFailure af) { // Allows us to classify unhandled failures from the sync workflow. e.g. If the normalization // activity throws an exception, for @@ -243,13 +256,13 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn af.getCause(), workflowInternalState.getJobId(), workflowInternalState.getAttemptNumber())); - reportFailure(connectionUpdaterInput, standardSyncOutput); + reportFailure(connectionUpdaterInput, standardSyncOutput, FailureCause.ACTIVITY); prepareForNextRunAndContinueAsNew(connectionUpdaterInput); } else { workflowInternalState.getFailures().add( FailureHelper.unknownOriginFailure(childWorkflowFailure.getCause(), workflowInternalState.getJobId(), workflowInternalState.getAttemptNumber())); - reportFailure(connectionUpdaterInput, standardSyncOutput); + reportFailure(connectionUpdaterInput, standardSyncOutput, FailureCause.WORKFLOW); prepareForNextRunAndContinueAsNew(connectionUpdaterInput); } } @@ -275,11 +288,15 @@ private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput, deleteResetJobStreams(); + // Record the success metric + recordWorkflowCountMetric(connectionUpdaterInput, OssMetricsRegistry.TEMPORAL_WORKFLOW_SUCCESS); + resetNewConnectionInput(connectionUpdaterInput); } private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput, - final StandardSyncOutput standardSyncOutput) { + final StandardSyncOutput standardSyncOutput, + final FailureCause failureCause) { final int attemptCreationVersion = Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION); @@ -325,6 +342,9 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput, } } + // Record the failure metric + recordWorkflowFailureCountMetric(connectionUpdaterInput, failureCause); + resetNewConnectionInput(connectionUpdaterInput); } } @@ -796,4 +816,58 @@ private void deleteResetJobStreams() { new DeleteStreamResetRecordsForJobInput(connectionId, workflowInternalState.getJobId())); } + /** + * Records a counter metric for a workflow execution failure. + * + * @param connectionUpdaterInput The {@link ConnectionUpdaterInput} that represents the workflow to + * be executed. + * @param failureCause The cause of the workflow failure. + */ + private void recordWorkflowFailureCountMetric(final ConnectionUpdaterInput connectionUpdaterInput, final FailureCause failureCause) { + recordWorkflowCountMetric(connectionUpdaterInput, OssMetricsRegistry.TEMPORAL_WORKFLOW_FAILURE, + new MetricAttribute(MetricTags.RESET_WORKFLOW_FAILURE_CAUSE, failureCause.name())); + } + + /** + * Records a workflow counter for the specified metric. + * + * @param connectionUpdaterInput The {@link ConnectionUpdaterInput} that represents the workflow to + * * be executed. + * @param metricName The name of the metric + * @param metricAttributes Additional metric attributes. + */ + private void recordWorkflowCountMetric(final ConnectionUpdaterInput connectionUpdaterInput, + final OssMetricsRegistry metricName, + final MetricAttribute... metricAttributes) { + final List baseMetricAttributes = generateMetricAttributes(connectionUpdaterInput); + if (metricAttributes != null) { + baseMetricAttributes.addAll(Stream.of(metricAttributes).collect(Collectors.toList())); + } + metricClient.count(metricName, 1L, baseMetricAttributes.toArray(new MetricAttribute[] {})); + } + + /** + * Generates the list of {@link MetricAttribute}s to be included when recording a metric. + * + * @param connectionUpdaterInput The {@link ConnectionUpdaterInput} that represents the workflow to + * be executed. + * @return The list of {@link MetricAttribute}s to be included when recording a metric. + */ + private List generateMetricAttributes(final ConnectionUpdaterInput connectionUpdaterInput) { + final List metricAttributes = new ArrayList<>(); + metricAttributes.add(new MetricAttribute(MetricTags.CONNECTION_ID, String.valueOf(connectionUpdaterInput.getConnectionId()))); + return metricAttributes; + } + + /** + * Enumeration of workflow failure causes. + */ + private enum FailureCause { + ACTIVITY, + CANCELED, + CONNECTION, + UNKNOWN, + WORKFLOW + } + } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index 6ba83e6aa88d..5ab23cf7fcc0 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -21,6 +21,7 @@ import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.StreamResetPersistence; import io.airbyte.db.instance.configs.jooq.generated.enums.ReleaseStage; +import io.airbyte.metrics.lib.MetricAttribute; import io.airbyte.metrics.lib.MetricClientFactory; import io.airbyte.metrics.lib.MetricTags; import io.airbyte.metrics.lib.OssMetricsRegistry; @@ -119,7 +120,8 @@ private void emitSrcIdDstIdToReleaseStagesMetric(final UUID srcId, final UUID ds for (final ReleaseStage stage : releaseStages) { if (stage != null) { - MetricClientFactory.getMetricClient().count(OssMetricsRegistry.JOB_CREATED_BY_RELEASE_STAGE, 1, MetricTags.getReleaseStage(stage)); + MetricClientFactory.getMetricClient().count(OssMetricsRegistry.JOB_CREATED_BY_RELEASE_STAGE, 1, + new MetricAttribute(MetricTags.RELEASE_STAGE, MetricTags.getReleaseStage(stage))); } } } @@ -229,7 +231,7 @@ public void attemptFailure(final AttemptFailureInput input) { emitJobIdToReleaseStagesMetric(OssMetricsRegistry.ATTEMPT_FAILED_BY_RELEASE_STAGE, jobId); for (final FailureReason reason : failureSummary.getFailures()) { MetricClientFactory.getMetricClient().count(OssMetricsRegistry.ATTEMPT_FAILED_BY_FAILURE_ORIGIN, 1, - MetricTags.getFailureOrigin(reason.getFailureOrigin())); + new MetricAttribute(MetricTags.FAILURE_ORIGIN, MetricTags.getFailureOrigin(reason.getFailureOrigin()))); } } catch (final IOException e) { @@ -328,7 +330,8 @@ private void emitJobIdToReleaseStagesMetric(final OssMetricsRegistry metric, fin for (final ReleaseStage stage : releaseStages) { if (stage != null) { - MetricClientFactory.getMetricClient().count(metric, 1, MetricTags.getReleaseStage(stage)); + MetricClientFactory.getMetricClient().count(metric, 1, + new MetricAttribute(MetricTags.RELEASE_STAGE, MetricTags.getReleaseStage(stage))); } } }