Skip to content

Commit

Permalink
Cloud Dashboard 1 (#10628)
Browse files Browse the repository at this point in the history
Publish metrics for:
- created jobs tagged by release stage
- failed jobs tagged by release stage
- cancelled jobs tagged by release stage
- succeed jobs tagged by release stage
  • Loading branch information
davinchia authored Feb 25, 2022
1 parent 3a5beb7 commit 5bc6d81
Show file tree
Hide file tree
Showing 14 changed files with 322 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,17 @@ public ConfigRepository(final ConfigPersistence persistence,
this.database = new ExceptionWrappingDatabase(database);
}

public ExceptionWrappingDatabase getDatabase() {
return database;
}

public StandardWorkspace getStandardWorkspace(final UUID workspaceId, final boolean includeTombstone)
throws JsonValidationException, IOException, ConfigNotFoundException {
final StandardWorkspace workspace = persistence.getConfig(ConfigSchema.STANDARD_WORKSPACE, workspaceId.toString(), StandardWorkspace.class);

if (!MoreBooleans.isTruthy(workspace.getTombstone()) || includeTombstone) {
return workspace;
}

throw new ConfigNotFoundException(ConfigSchema.STANDARD_WORKSPACE, workspaceId.toString());
}

Expand Down
5 changes: 5 additions & 0 deletions airbyte-metrics/lib/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ plugins {
dependencies {
implementation project(':airbyte-commons')
implementation project(':airbyte-config:models')
implementation project(':airbyte-db:jooq')
implementation project(':airbyte-db:lib')

implementation 'com.datadoghq:java-dogstatsd-client:4.0.0'

testImplementation project(':airbyte-config:persistence')
testImplementation 'org.testcontainers:postgresql:1.15.3'
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ public synchronized static void flush() {
* @param amt to adjust.
* @param tags
*/
public static void count(final AirbyteMetricsRegistry metric, final double amt, final String... tags) {
public static void count(final MetricsRegistry metric, final double amt, final String... tags) {
if (instancePublish) {
if (statsDClient == null) {
// do not loudly fail to prevent application disruption
log.warn("singleton not initialized, count {} not emitted", metric.metricName);
return;
}

log.info("publishing count, name: {}, value: {}", metric.metricName, amt);
log.info("publishing count, name: {}, value: {}, tags: {}", metric.metricName, amt, tags);
statsDClient.count(metric.metricName, amt, tags);
}
}
Expand All @@ -82,15 +82,15 @@ public static void count(final AirbyteMetricsRegistry metric, final double amt,
* @param val to record.
* @param tags
*/
public static void gauge(final AirbyteMetricsRegistry metric, final double val, final String... tags) {
public static void gauge(final MetricsRegistry metric, final double val, final String... tags) {
if (instancePublish) {
if (statsDClient == null) {
// do not loudly fail to prevent application disruption
log.warn("singleton not initialized, gauge {} not emitted", metric.metricName);
return;
}

log.info("publishing gauge, name: {}, value: {}", metric, val);
log.info("publishing gauge, name: {}, value: {}, tags: {}", metric, val, tags);
statsDClient.gauge(metric.metricName, val, tags);
}
}
Expand All @@ -109,15 +109,15 @@ public static void gauge(final AirbyteMetricsRegistry metric, final double val,
* @param val of time to record.
* @param tags
*/
public static void recordTimeLocal(final AirbyteMetricsRegistry metric, final double val, final String... tags) {
public static void recordTimeLocal(final MetricsRegistry metric, final double val, final String... tags) {
if (instancePublish) {
if (statsDClient == null) {
// do not loudly fail to prevent application disruption
log.warn("singleton not initialized, histogram {} not emitted", metric.metricName);
return;
}

log.info("recording histogram, name: {}, value: {}", metric.metricName, val);
log.info("recording histogram, name: {}, value: {}, tags: {}", metric.metricName, val, tags);
statsDClient.histogram(metric.metricName, val, tags);
}
}
Expand All @@ -130,28 +130,28 @@ public static void recordTimeLocal(final AirbyteMetricsRegistry metric, final do
* @param val of time to record.
* @param tags
*/
public static void recordTimeGlobal(final AirbyteMetricsRegistry metric, final double val, final String... tags) {
public static void recordTimeGlobal(final MetricsRegistry metric, final double val, final String... tags) {
if (instancePublish) {
if (statsDClient == null) {
// do not loudly fail to prevent application disruption
log.warn("singleton not initialized, distribution {} not emitted", metric.metricName);
return;
}

log.info("recording distribution, name: {}, value: {}", metric.metricName, val);
log.info("recording distribution, name: {}, value: {}, tags: {}", metric.metricName, val, tags);
statsDClient.distribution(metric.metricName, val, tags);
}
}

/**
* Wrapper of {@link #recordTimeGlobal(AirbyteMetricsRegistry, double, String...)} with a runnable
* for convenience.
* Wrapper of {@link #recordTimeGlobal(MetricsRegistry, double, String...)} with a runnable for
* convenience.
*
* @param metric
* @param runnable to time
* @param tags
*/
public static void recordTimeGlobal(final AirbyteMetricsRegistry metric, final Runnable runnable, final String... tags) {
public static void recordTimeGlobal(final MetricsRegistry metric, final Runnable runnable, final String... tags) {
final long start = System.currentTimeMillis();
runnable.run();
final long end = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
/**
* Interface representing an Airbyte Application to collect metrics for. This interface is present
* as Java doesn't support enum inheritance as of Java 17. We use a shared interface so this
* interface can be used in the {@link AirbyteMetricsRegistry} enum.
* interface can be used in the {@link MetricsRegistry} enum.
*/
public interface MetricEmittingApp {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

/**
* Enum containing all applications metrics are emitted for. Used to initialize
* {@link DogStatsDMetricSingleton#initialize(MetricEmittingApp, boolean)}.
* {@link DogStatsDMetricSingleton#initialize(MetricEmittingApp, DatadogClientConfiguration)}.
*
* Application Name Conventions:
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.metrics.lib;

import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR;
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_DEFINITION;

import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage;
import java.util.List;
import java.util.UUID;
import org.jooq.DSLContext;

/**
* Keep track of all metric queries.
*/
public class MetricQueries {

public static List<ReleaseStage> jobIdToReleaseStages(final DSLContext ctx, final long jobId) {
final var srcRelStageCol = "src_release_stage";
final var dstRelStageCol = "dst_release_stage";

final var query = String.format("""
SELECT src_def_data.release_stage AS %s,
dest_def_data.release_stage AS %s
FROM connection
INNER JOIN jobs ON connection.id=CAST(jobs.scope AS uuid)
INNER JOIN actor AS dest_data ON connection.destination_id = dest_data.id
INNER JOIN actor_definition AS dest_def_data ON dest_data.actor_definition_id = dest_def_data.id
INNER JOIN actor AS src_data ON connection.source_id = src_data.id
INNER JOIN actor_definition AS src_def_data ON src_data.actor_definition_id = src_def_data.id
WHERE jobs.id = '%d';""", srcRelStageCol, dstRelStageCol, jobId);

final var res = ctx.fetch(query);
final var stages = res.getValues(srcRelStageCol, ReleaseStage.class);
stages.addAll(res.getValues(dstRelStageCol, ReleaseStage.class));
return stages;
}

public static List<ReleaseStage> srcIdAndDestIdToReleaseStages(final DSLContext ctx, final UUID srcId, final UUID dstId) {
return ctx.select(ACTOR_DEFINITION.RELEASE_STAGE).from(ACTOR).join(ACTOR_DEFINITION).on(ACTOR.ACTOR_DEFINITION_ID.eq(ACTOR_DEFINITION.ID))
.where(ACTOR.ID.eq(srcId))
.or(ACTOR.ID.eq(dstId)).fetch().getValues(ACTOR_DEFINITION.RELEASE_STAGE);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.metrics.lib;

import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage;

/**
* Keep track of all metric tags.
*/
public class MetricTags {

private static final String RELEASE_STAGE = "release_stage:";

public static String getReleaseStage(final ReleaseStage stage) {
return RELEASE_STAGE + ":" + stage.getLiteral();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,24 @@
* - Add units at name end if applicable. This is especially relevant for time units. versioning
* tactic and present at the end of the metric.
*/
public enum AirbyteMetricsRegistry {
public enum MetricsRegistry {

JOB_CANCELLED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"job_cancelled_by_release_stage",
"increments when a job is cancelled. jobs are double counted as this is tagged by release stage."),
JOB_CREATED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"job_created_by_release_stage",
"increments when a new job is created. jobs are double counted as this is tagged by release stage."),
JOB_FAILED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"job_failed_by_release_stage",
"increments when a job fails. jobs are double counted as this is tagged by release stage."),
JOB_SUCCEEDED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"job_succeeded_by_release_stage",
"increments when a job succeeds. jobs are double counted as this is tagged by release stage."),
KUBE_POD_PROCESS_CREATE_TIME_MILLISECS(
MetricEmittingApps.WORKER,
"kube_pod_process_create_time_millisecs",
Expand All @@ -39,7 +55,7 @@ public enum AirbyteMetricsRegistry {
public final String metricName;
public final String metricDescription;

AirbyteMetricsRegistry(final MetricEmittingApp application, final String metricName, final String metricDescription) {
MetricsRegistry(final MetricEmittingApp application, final String metricName, final String metricDescription) {
Preconditions.checkNotNull(metricDescription);
Preconditions.checkNotNull(application);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,8 @@
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.metrics.llib;
package io.airbyte.metrics.lib;

import io.airbyte.metrics.lib.AirbyteMetricsRegistry;
import io.airbyte.metrics.lib.DatadogClientConfiguration;
import io.airbyte.metrics.lib.DogStatsDMetricSingleton;
import io.airbyte.metrics.lib.MetricEmittingApps;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
Expand All @@ -25,7 +21,7 @@ void tearDown() {
public void testPublishTrueNoEmitError() {
Assertions.assertDoesNotThrow(() -> {
DogStatsDMetricSingleton.initialize(MetricEmittingApps.WORKER, new DatadogClientConfiguration("localhost", "1000", false));
DogStatsDMetricSingleton.gauge(AirbyteMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1);
DogStatsDMetricSingleton.gauge(MetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1);
});
}

Expand All @@ -34,15 +30,15 @@ public void testPublishTrueNoEmitError() {
public void testPublishFalseNoEmitError() {
Assertions.assertDoesNotThrow(() -> {
DogStatsDMetricSingleton.initialize(MetricEmittingApps.WORKER, new DatadogClientConfiguration("localhost", "1000", true));
DogStatsDMetricSingleton.gauge(AirbyteMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1);
DogStatsDMetricSingleton.gauge(MetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1);
});
}

@Test
@DisplayName("there should be no exception if we attempt to emit metrics without initializing")
public void testNoInitializeNoEmitError() {
Assertions.assertDoesNotThrow(() -> {
DogStatsDMetricSingleton.gauge(AirbyteMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1);
DogStatsDMetricSingleton.gauge(MetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1);
});
}

Expand Down
Loading

0 comments on commit 5bc6d81

Please sign in to comment.