diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index 5e3ef890edee..718f37906907 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -77,6 +77,10 @@ public ConfigRepository(final ConfigPersistence persistence, this.database = new ExceptionWrappingDatabase(database); } + public ExceptionWrappingDatabase getDatabase() { + return database; + } + public StandardWorkspace getStandardWorkspace(final UUID workspaceId, final boolean includeTombstone) throws JsonValidationException, IOException, ConfigNotFoundException { final StandardWorkspace workspace = persistence.getConfig(ConfigSchema.STANDARD_WORKSPACE, workspaceId.toString(), StandardWorkspace.class); @@ -84,7 +88,6 @@ public StandardWorkspace getStandardWorkspace(final UUID workspaceId, final bool if (!MoreBooleans.isTruthy(workspace.getTombstone()) || includeTombstone) { return workspace; } - throw new ConfigNotFoundException(ConfigSchema.STANDARD_WORKSPACE, workspaceId.toString()); } diff --git a/airbyte-metrics/lib/build.gradle b/airbyte-metrics/lib/build.gradle index 1941dfd836fa..fddad3b6e31e 100644 --- a/airbyte-metrics/lib/build.gradle +++ b/airbyte-metrics/lib/build.gradle @@ -5,6 +5,11 @@ plugins { dependencies { implementation project(':airbyte-commons') implementation project(':airbyte-config:models') + implementation project(':airbyte-db:jooq') + implementation project(':airbyte-db:lib') implementation 'com.datadoghq:java-dogstatsd-client:4.0.0' + + testImplementation project(':airbyte-config:persistence') + testImplementation 'org.testcontainers:postgresql:1.15.3' } diff --git a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/DogStatsDMetricSingleton.java b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/DogStatsDMetricSingleton.java index e327309ab9c5..5cfec14e606b 100644 --- a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/DogStatsDMetricSingleton.java +++ b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/DogStatsDMetricSingleton.java @@ -62,7 +62,7 @@ public synchronized static void flush() { * @param amt to adjust. * @param tags */ - public static void count(final AirbyteMetricsRegistry metric, final double amt, final String... tags) { + public static void count(final MetricsRegistry metric, final double amt, final String... tags) { if (instancePublish) { if (statsDClient == null) { // do not loudly fail to prevent application disruption @@ -70,7 +70,7 @@ public static void count(final AirbyteMetricsRegistry metric, final double amt, return; } - log.info("publishing count, name: {}, value: {}", metric.metricName, amt); + log.info("publishing count, name: {}, value: {}, tags: {}", metric.metricName, amt, tags); statsDClient.count(metric.metricName, amt, tags); } } @@ -82,7 +82,7 @@ public static void count(final AirbyteMetricsRegistry metric, final double amt, * @param val to record. * @param tags */ - public static void gauge(final AirbyteMetricsRegistry metric, final double val, final String... tags) { + public static void gauge(final MetricsRegistry metric, final double val, final String... tags) { if (instancePublish) { if (statsDClient == null) { // do not loudly fail to prevent application disruption @@ -90,7 +90,7 @@ public static void gauge(final AirbyteMetricsRegistry metric, final double val, return; } - log.info("publishing gauge, name: {}, value: {}", metric, val); + log.info("publishing gauge, name: {}, value: {}, tags: {}", metric, val, tags); statsDClient.gauge(metric.metricName, val, tags); } } @@ -109,7 +109,7 @@ public static void gauge(final AirbyteMetricsRegistry metric, final double val, * @param val of time to record. * @param tags */ - public static void recordTimeLocal(final AirbyteMetricsRegistry metric, final double val, final String... tags) { + public static void recordTimeLocal(final MetricsRegistry metric, final double val, final String... tags) { if (instancePublish) { if (statsDClient == null) { // do not loudly fail to prevent application disruption @@ -117,7 +117,7 @@ public static void recordTimeLocal(final AirbyteMetricsRegistry metric, final do return; } - log.info("recording histogram, name: {}, value: {}", metric.metricName, val); + log.info("recording histogram, name: {}, value: {}, tags: {}", metric.metricName, val, tags); statsDClient.histogram(metric.metricName, val, tags); } } @@ -130,7 +130,7 @@ public static void recordTimeLocal(final AirbyteMetricsRegistry metric, final do * @param val of time to record. * @param tags */ - public static void recordTimeGlobal(final AirbyteMetricsRegistry metric, final double val, final String... tags) { + public static void recordTimeGlobal(final MetricsRegistry metric, final double val, final String... tags) { if (instancePublish) { if (statsDClient == null) { // do not loudly fail to prevent application disruption @@ -138,20 +138,20 @@ public static void recordTimeGlobal(final AirbyteMetricsRegistry metric, final d return; } - log.info("recording distribution, name: {}, value: {}", metric.metricName, val); + log.info("recording distribution, name: {}, value: {}, tags: {}", metric.metricName, val, tags); statsDClient.distribution(metric.metricName, val, tags); } } /** - * Wrapper of {@link #recordTimeGlobal(AirbyteMetricsRegistry, double, String...)} with a runnable - * for convenience. + * Wrapper of {@link #recordTimeGlobal(MetricsRegistry, double, String...)} with a runnable for + * convenience. * * @param metric * @param runnable to time * @param tags */ - public static void recordTimeGlobal(final AirbyteMetricsRegistry metric, final Runnable runnable, final String... tags) { + public static void recordTimeGlobal(final MetricsRegistry metric, final Runnable runnable, final String... tags) { final long start = System.currentTimeMillis(); runnable.run(); final long end = System.currentTimeMillis(); diff --git a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricEmittingApp.java b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricEmittingApp.java index c47700d23b17..5c538bb55e8e 100644 --- a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricEmittingApp.java +++ b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricEmittingApp.java @@ -7,7 +7,7 @@ /** * Interface representing an Airbyte Application to collect metrics for. This interface is present * as Java doesn't support enum inheritance as of Java 17. We use a shared interface so this - * interface can be used in the {@link AirbyteMetricsRegistry} enum. + * interface can be used in the {@link MetricsRegistry} enum. */ public interface MetricEmittingApp { diff --git a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricEmittingApps.java b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricEmittingApps.java index 104af5835bd2..cde70fcf671a 100644 --- a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricEmittingApps.java +++ b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricEmittingApps.java @@ -8,7 +8,7 @@ /** * Enum containing all applications metrics are emitted for. Used to initialize - * {@link DogStatsDMetricSingleton#initialize(MetricEmittingApp, boolean)}. + * {@link DogStatsDMetricSingleton#initialize(MetricEmittingApp, DatadogClientConfiguration)}. * * Application Name Conventions: *

diff --git a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricQueries.java b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricQueries.java new file mode 100644 index 000000000000..21e83cec2728 --- /dev/null +++ b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricQueries.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.metrics.lib; + +import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR; +import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_DEFINITION; + +import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage; +import java.util.List; +import java.util.UUID; +import org.jooq.DSLContext; + +/** + * Keep track of all metric queries. + */ +public class MetricQueries { + + public static List jobIdToReleaseStages(final DSLContext ctx, final long jobId) { + final var srcRelStageCol = "src_release_stage"; + final var dstRelStageCol = "dst_release_stage"; + + final var query = String.format(""" + SELECT src_def_data.release_stage AS %s, + dest_def_data.release_stage AS %s + FROM connection + INNER JOIN jobs ON connection.id=CAST(jobs.scope AS uuid) + INNER JOIN actor AS dest_data ON connection.destination_id = dest_data.id + INNER JOIN actor_definition AS dest_def_data ON dest_data.actor_definition_id = dest_def_data.id + INNER JOIN actor AS src_data ON connection.source_id = src_data.id + INNER JOIN actor_definition AS src_def_data ON src_data.actor_definition_id = src_def_data.id + WHERE jobs.id = '%d';""", srcRelStageCol, dstRelStageCol, jobId); + + final var res = ctx.fetch(query); + final var stages = res.getValues(srcRelStageCol, ReleaseStage.class); + stages.addAll(res.getValues(dstRelStageCol, ReleaseStage.class)); + return stages; + } + + public static List srcIdAndDestIdToReleaseStages(final DSLContext ctx, final UUID srcId, final UUID dstId) { + return ctx.select(ACTOR_DEFINITION.RELEASE_STAGE).from(ACTOR).join(ACTOR_DEFINITION).on(ACTOR.ACTOR_DEFINITION_ID.eq(ACTOR_DEFINITION.ID)) + .where(ACTOR.ID.eq(srcId)) + .or(ACTOR.ID.eq(dstId)).fetch().getValues(ACTOR_DEFINITION.RELEASE_STAGE); + } + +} diff --git a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java new file mode 100644 index 000000000000..9e2b98d5659b --- /dev/null +++ b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.metrics.lib; + +import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage; + +/** + * Keep track of all metric tags. + */ +public class MetricTags { + + private static final String RELEASE_STAGE = "release_stage:"; + + public static String getReleaseStage(final ReleaseStage stage) { + return RELEASE_STAGE + ":" + stage.getLiteral(); + } + +} diff --git a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/AirbyteMetricsRegistry.java b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsRegistry.java similarity index 64% rename from airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/AirbyteMetricsRegistry.java rename to airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsRegistry.java index 515abfbca1af..e80b27845618 100644 --- a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/AirbyteMetricsRegistry.java +++ b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsRegistry.java @@ -28,8 +28,24 @@ * - Add units at name end if applicable. This is especially relevant for time units. versioning * tactic and present at the end of the metric. */ -public enum AirbyteMetricsRegistry { +public enum MetricsRegistry { + JOB_CANCELLED_BY_RELEASE_STAGE( + MetricEmittingApps.WORKER, + "job_cancelled_by_release_stage", + "increments when a job is cancelled. jobs are double counted as this is tagged by release stage."), + JOB_CREATED_BY_RELEASE_STAGE( + MetricEmittingApps.WORKER, + "job_created_by_release_stage", + "increments when a new job is created. jobs are double counted as this is tagged by release stage."), + JOB_FAILED_BY_RELEASE_STAGE( + MetricEmittingApps.WORKER, + "job_failed_by_release_stage", + "increments when a job fails. jobs are double counted as this is tagged by release stage."), + JOB_SUCCEEDED_BY_RELEASE_STAGE( + MetricEmittingApps.WORKER, + "job_succeeded_by_release_stage", + "increments when a job succeeds. jobs are double counted as this is tagged by release stage."), KUBE_POD_PROCESS_CREATE_TIME_MILLISECS( MetricEmittingApps.WORKER, "kube_pod_process_create_time_millisecs", @@ -39,7 +55,7 @@ public enum AirbyteMetricsRegistry { public final String metricName; public final String metricDescription; - AirbyteMetricsRegistry(final MetricEmittingApp application, final String metricName, final String metricDescription) { + MetricsRegistry(final MetricEmittingApp application, final String metricName, final String metricDescription) { Preconditions.checkNotNull(metricDescription); Preconditions.checkNotNull(application); diff --git a/airbyte-metrics/lib/src/test/java/io/airbyte/metrics/llib/DogStatsDMetricSingletonTest.java b/airbyte-metrics/lib/src/test/java/io/airbyte/metrics/lib/DogStatsDMetricSingletonTest.java similarity index 68% rename from airbyte-metrics/lib/src/test/java/io/airbyte/metrics/llib/DogStatsDMetricSingletonTest.java rename to airbyte-metrics/lib/src/test/java/io/airbyte/metrics/lib/DogStatsDMetricSingletonTest.java index 00fad98f848b..dc6a929bea2d 100644 --- a/airbyte-metrics/lib/src/test/java/io/airbyte/metrics/llib/DogStatsDMetricSingletonTest.java +++ b/airbyte-metrics/lib/src/test/java/io/airbyte/metrics/lib/DogStatsDMetricSingletonTest.java @@ -2,12 +2,8 @@ * Copyright (c) 2021 Airbyte, Inc., all rights reserved. */ -package io.airbyte.metrics.llib; +package io.airbyte.metrics.lib; -import io.airbyte.metrics.lib.AirbyteMetricsRegistry; -import io.airbyte.metrics.lib.DatadogClientConfiguration; -import io.airbyte.metrics.lib.DogStatsDMetricSingleton; -import io.airbyte.metrics.lib.MetricEmittingApps; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.DisplayName; @@ -25,7 +21,7 @@ void tearDown() { public void testPublishTrueNoEmitError() { Assertions.assertDoesNotThrow(() -> { DogStatsDMetricSingleton.initialize(MetricEmittingApps.WORKER, new DatadogClientConfiguration("localhost", "1000", false)); - DogStatsDMetricSingleton.gauge(AirbyteMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1); + DogStatsDMetricSingleton.gauge(MetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1); }); } @@ -34,7 +30,7 @@ public void testPublishTrueNoEmitError() { public void testPublishFalseNoEmitError() { Assertions.assertDoesNotThrow(() -> { DogStatsDMetricSingleton.initialize(MetricEmittingApps.WORKER, new DatadogClientConfiguration("localhost", "1000", true)); - DogStatsDMetricSingleton.gauge(AirbyteMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1); + DogStatsDMetricSingleton.gauge(MetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1); }); } @@ -42,7 +38,7 @@ public void testPublishFalseNoEmitError() { @DisplayName("there should be no exception if we attempt to emit metrics without initializing") public void testNoInitializeNoEmitError() { Assertions.assertDoesNotThrow(() -> { - DogStatsDMetricSingleton.gauge(AirbyteMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1); + DogStatsDMetricSingleton.gauge(MetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1); }); } diff --git a/airbyte-metrics/lib/src/test/java/io/airbyte/metrics/lib/MetrisQueriesTest.java b/airbyte-metrics/lib/src/test/java/io/airbyte/metrics/lib/MetrisQueriesTest.java new file mode 100644 index 000000000000..b55bd1c51ae5 --- /dev/null +++ b/airbyte-metrics/lib/src/test/java/io/airbyte/metrics/lib/MetrisQueriesTest.java @@ -0,0 +1,138 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.metrics.lib; + +import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR; +import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_DEFINITION; +import static io.airbyte.db.instance.configs.jooq.Tables.CONNECTION; +import static io.airbyte.db.instance.jobs.jooq.Tables.*; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import io.airbyte.db.Database; +import io.airbyte.db.instance.configs.jooq.enums.ActorType; +import io.airbyte.db.instance.configs.jooq.enums.NamespaceDefinitionType; +import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage; +import io.airbyte.db.instance.test.TestDatabaseProviders; +import java.io.IOException; +import java.sql.SQLException; +import java.util.List; +import java.util.UUID; +import org.jooq.JSONB; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.PostgreSQLContainer; + +public class MetrisQueriesTest { + + private static final String USER = "user"; + private static final String PASS = "hunter2"; + + private static final UUID SRC_DEF_ID = UUID.randomUUID(); + private static final UUID DST_DEF_ID = UUID.randomUUID(); + + private static PostgreSQLContainer container; + private static Database configDb; + + @BeforeAll + static void setUpAll() throws IOException, SQLException { + container = new PostgreSQLContainer<>("postgres:13-alpine") + .withUsername(USER) + .withPassword(PASS); + container.start(); + + final TestDatabaseProviders databaseProviders = new TestDatabaseProviders(container); + configDb = databaseProviders.createNewConfigsDatabase(); + databaseProviders.createNewJobsDatabase(); + + // create src and dst def + configDb.transaction(ctx -> ctx + .insertInto(ACTOR_DEFINITION, ACTOR_DEFINITION.ID, ACTOR_DEFINITION.NAME, ACTOR_DEFINITION.DOCKER_REPOSITORY, + ACTOR_DEFINITION.DOCKER_IMAGE_TAG, ACTOR_DEFINITION.SPEC, ACTOR_DEFINITION.ACTOR_TYPE, ACTOR_DEFINITION.RELEASE_STAGE) + .values(SRC_DEF_ID, "srcDef", "repository", "tag", JSONB.valueOf("{}"), ActorType.source, ReleaseStage.beta) + .values(DST_DEF_ID, "dstDef", "repository", "tag", JSONB.valueOf("{}"), ActorType.destination, ReleaseStage.generally_available) + .values(UUID.randomUUID(), "dstDef", "repository", "tag", JSONB.valueOf("{}"), ActorType.destination, ReleaseStage.alpha).execute()); + + // drop the constraint to simplify following test set up + configDb.transaction(ctx -> ctx.alterTable(ACTOR).dropForeignKey("actor_workspace_id_fkey").execute()); + } + + @AfterEach + void tearDown() throws SQLException { + configDb.transaction(ctx -> ctx.truncate(ACTOR)); + } + + @Nested + class srcIdAndDestIdToReleaseStages { + + @Test + @DisplayName("should return the right release stages") + void shouldReturnReleaseStages() throws SQLException { + final var srcId = UUID.randomUUID(); + final var dstId = UUID.randomUUID(); + + // create src and dst + configDb.transaction( + ctx -> ctx.insertInto(ACTOR, ACTOR.ID, ACTOR.WORKSPACE_ID, ACTOR.ACTOR_DEFINITION_ID, ACTOR.NAME, ACTOR.CONFIGURATION, ACTOR.ACTOR_TYPE) + .values(srcId, UUID.randomUUID(), SRC_DEF_ID, "src", JSONB.valueOf("{}"), ActorType.source) + .values(dstId, UUID.randomUUID(), DST_DEF_ID, "dst", JSONB.valueOf("{}"), ActorType.destination) + .execute()); + final var res = configDb.query(ctx -> MetricQueries.srcIdAndDestIdToReleaseStages(ctx, srcId, dstId)); + assertEquals(List.of(ReleaseStage.beta, ReleaseStage.generally_available), res); + } + + @Test + @DisplayName("should not error out or return any result if not applicable") + void shouldReturnNothingIfNotApplicable() throws SQLException { + final var res = configDb.query(ctx -> MetricQueries.srcIdAndDestIdToReleaseStages(ctx, UUID.randomUUID(), UUID.randomUUID())); + assertEquals(0, res.size()); + } + + } + + @Nested + class jobIdToReleaseStages { + + @Test + @DisplayName("should return the right release stages") + void shouldReturnReleaseStages() throws SQLException { + final var srcId = UUID.randomUUID(); + final var dstId = UUID.randomUUID(); + // create src and dst + configDb.transaction( + ctx -> ctx.insertInto(ACTOR, ACTOR.ID, ACTOR.WORKSPACE_ID, ACTOR.ACTOR_DEFINITION_ID, ACTOR.NAME, ACTOR.CONFIGURATION, ACTOR.ACTOR_TYPE) + .values(srcId, UUID.randomUUID(), SRC_DEF_ID, "src", JSONB.valueOf("{}"), ActorType.source) + .values(dstId, UUID.randomUUID(), DST_DEF_ID, "dst", JSONB.valueOf("{}"), ActorType.destination) + .execute()); + final var connId = UUID.randomUUID(); + // create connection + configDb.transaction( + ctx -> ctx + .insertInto(CONNECTION, CONNECTION.ID, CONNECTION.NAMESPACE_DEFINITION, CONNECTION.SOURCE_ID, CONNECTION.DESTINATION_ID, + CONNECTION.NAME, CONNECTION.CATALOG, CONNECTION.MANUAL) + .values(connId, NamespaceDefinitionType.source, srcId, dstId, "conn", JSONB.valueOf("{}"), true) + .execute()); + // create job + final var jobId = 1L; + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE).values(jobId, connId.toString()).execute()); + + final var res = configDb.query(ctx -> MetricQueries.jobIdToReleaseStages(ctx, jobId)); + assertEquals(List.of(ReleaseStage.beta, ReleaseStage.generally_available), res); + } + + @Test + @DisplayName("should not error out or return any result if not applicable") + void shouldReturnNothingIfNotApplicable() throws SQLException { + final var missingJobId = 100000L; + final var res = configDb.query(ctx -> MetricQueries.jobIdToReleaseStages(ctx, missingJobId)); + assertEquals(0, res.size()); + } + + } + +} diff --git a/airbyte-workers/build.gradle b/airbyte-workers/build.gradle index 020400c9339f..e37a5b9a4482 100644 --- a/airbyte-workers/build.gradle +++ b/airbyte-workers/build.gradle @@ -24,6 +24,7 @@ dependencies { implementation project(':airbyte-commons-docker') implementation project(':airbyte-config:models') implementation project(':airbyte-config:persistence') + implementation project(':airbyte-db:jooq') implementation project(':airbyte-db:lib') implementation project(':airbyte-metrics:lib') implementation project(':airbyte-json-validation') diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java index b56126881c2c..ee6c9754d369 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java @@ -10,8 +10,8 @@ import io.airbyte.commons.resources.MoreResources; import io.airbyte.config.ResourceRequirements; import io.airbyte.config.TolerationPOJO; -import io.airbyte.metrics.lib.AirbyteMetricsRegistry; import io.airbyte.metrics.lib.DogStatsDMetricSingleton; +import io.airbyte.metrics.lib.MetricsRegistry; import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.ContainerBuilder; import io.fabric8.kubernetes.api.model.ContainerPort; @@ -521,7 +521,7 @@ public KubePodProcess(final boolean isOrchestrator, final boolean isReady = Objects.nonNull(p) && Readiness.getInstance().isReady(p); return isReady || isTerminal(p); }, 20, TimeUnit.MINUTES); - DogStatsDMetricSingleton.recordTimeGlobal(AirbyteMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, System.currentTimeMillis() - start); + DogStatsDMetricSingleton.recordTimeGlobal(MetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, System.currentTimeMillis() - start); // allow writing stdin to pod LOGGER.info("Reading pod IP..."); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index b64b8197c717..47e49165ac50 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -17,6 +17,11 @@ import io.airbyte.config.helpers.LogConfigs; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage; +import io.airbyte.metrics.lib.DogStatsDMetricSingleton; +import io.airbyte.metrics.lib.MetricQueries; +import io.airbyte.metrics.lib.MetricTags; +import io.airbyte.metrics.lib.MetricsRegistry; import io.airbyte.scheduler.models.Job; import io.airbyte.scheduler.persistence.JobCreator; import io.airbyte.scheduler.persistence.JobNotifier; @@ -33,6 +38,7 @@ import java.nio.file.Path; import java.util.List; import java.util.Optional; +import java.util.UUID; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -53,8 +59,8 @@ public class JobCreationAndStatusUpdateActivityImpl implements JobCreationAndSta @Override public JobCreationOutput createNewJob(final JobCreationInput input) { try { + final StandardSync standardSync = configRepository.getStandardSync(input.getConnectionId()); if (input.isReset()) { - final StandardSync standardSync = configRepository.getStandardSync(input.getConnectionId()); final DestinationConnection destination = configRepository.getDestinationConnection(standardSync.getDestinationId()); @@ -80,6 +86,7 @@ public JobCreationOutput createNewJob(final JobCreationInput input) { final long jobId = jobFactory.create(input.getConnectionId()); log.info("New job created, with id: " + jobId); + emitSrcIdDstIdToReleaseStagesMetric(standardSync.getSourceId(), standardSync.getDestinationId()); return new JobCreationOutput(jobId); } @@ -88,6 +95,17 @@ public JobCreationOutput createNewJob(final JobCreationInput input) { } } + private void emitSrcIdDstIdToReleaseStagesMetric(final UUID srcId, final UUID dstId) throws IOException { + final var releaseStages = configRepository.getDatabase().query(ctx -> MetricQueries.srcIdAndDestIdToReleaseStages(ctx, srcId, dstId)); + if (releaseStages == null) { + return; + } + + for (final ReleaseStage stage : releaseStages) { + DogStatsDMetricSingleton.count(MetricsRegistry.JOB_CREATED_BY_RELEASE_STAGE, 1, MetricTags.getReleaseStage(stage)); + } + } + @Override public AttemptCreationOutput createNewAttempt(final AttemptCreationInput input) throws RetryableException { try { @@ -108,15 +126,20 @@ public AttemptCreationOutput createNewAttempt(final AttemptCreationInput input) @Override public void jobSuccess(final JobSuccessInput input) { try { + final long jobId = input.getJobId(); + final int attemptId = input.getAttemptId(); + if (input.getStandardSyncOutput() != null) { final JobOutput jobOutput = new JobOutput().withSync(input.getStandardSyncOutput()); - jobPersistence.writeOutput(input.getJobId(), input.getAttemptId(), jobOutput); + jobPersistence.writeOutput(jobId, attemptId, jobOutput); } else { - log.warn("The job {} doesn't have any output for the attempt {}", input.getJobId(), input.getAttemptId()); + log.warn("The job {} doesn't have any output for the attempt {}", jobId, attemptId); } - jobPersistence.succeedAttempt(input.getJobId(), input.getAttemptId()); - final Job job = jobPersistence.getJob(input.getJobId()); + jobPersistence.succeedAttempt(jobId, attemptId); + final Job job = jobPersistence.getJob(jobId); + jobNotifier.successJob(job); + emitJobIdToReleaseStagesMetric(MetricsRegistry.JOB_SUCCEEDED_BY_RELEASE_STAGE, jobId); trackCompletion(job, JobStatus.SUCCEEDED); } catch (final IOException e) { throw new RetryableException(e); @@ -126,9 +149,12 @@ public void jobSuccess(final JobSuccessInput input) { @Override public void jobFailure(final JobFailureInput input) { try { - jobPersistence.failJob(input.getJobId()); - final Job job = jobPersistence.getJob(input.getJobId()); + final var jobId = input.getJobId(); + jobPersistence.failJob(jobId); + final Job job = jobPersistence.getJob(jobId); + jobNotifier.failJob(input.getReason(), job); + emitJobIdToReleaseStagesMetric(MetricsRegistry.JOB_FAILED_BY_RELEASE_STAGE, jobId); trackCompletion(job, JobStatus.FAILED); } catch (final IOException e) { throw new RetryableException(e); @@ -154,12 +180,15 @@ public void attemptFailure(final AttemptFailureInput input) { @Override public void jobCancelled(final JobCancelledInput input) { try { - jobPersistence.cancelJob(input.getJobId()); - jobPersistence.failAttempt(input.getJobId(), input.getAttemptId()); - jobPersistence.writeAttemptFailureSummary(input.getJobId(), input.getAttemptId(), input.getAttemptFailureSummary()); + final long jobId = input.getJobId(); + jobPersistence.cancelJob(jobId); + final int attemptId = input.getAttemptId(); + jobPersistence.failAttempt(jobId, attemptId); + jobPersistence.writeAttemptFailureSummary(jobId, attemptId, input.getAttemptFailureSummary()); - final Job job = jobPersistence.getJob(input.getJobId()); + final Job job = jobPersistence.getJob(jobId); trackCompletion(job, JobStatus.FAILED); + emitJobIdToReleaseStagesMetric(MetricsRegistry.JOB_SUCCEEDED_BY_RELEASE_STAGE, jobId); jobNotifier.failJob("Job was cancelled", job); } catch (final IOException e) { throw new RetryableException(e); @@ -176,6 +205,17 @@ public void reportJobStart(final ReportJobStartInput input) { } } + private void emitJobIdToReleaseStagesMetric(final MetricsRegistry metric, final long jobId) throws IOException { + final var releaseStages = configRepository.getDatabase().query(ctx -> MetricQueries.jobIdToReleaseStages(ctx, jobId)); + if (releaseStages == null) { + return; + } + + for (final ReleaseStage stage : releaseStages) { + DogStatsDMetricSingleton.count(metric, 1, MetricTags.getReleaseStage(stage)); + } + } + private void trackCompletion(final Job job, final io.airbyte.workers.JobStatus status) { jobTracker.trackSync(job, Enums.convertTo(status, JobState.class)); } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java index 097aabd0947e..475f93c7dffc 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java @@ -9,17 +9,22 @@ import io.airbyte.config.FailureReason; import io.airbyte.config.FailureReason.FailureOrigin; import io.airbyte.config.JobOutput; +import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardSyncSummary; import io.airbyte.config.StandardSyncSummary.ReplicationStatus; import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.config.helpers.LogConfigs; +import io.airbyte.config.persistence.ConfigNotFoundException; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.db.ExceptionWrappingDatabase; import io.airbyte.scheduler.models.Job; import io.airbyte.scheduler.persistence.JobNotifier; import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory; import io.airbyte.scheduler.persistence.job_tracker.JobTracker; import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState; +import io.airbyte.validation.json.JsonValidationException; import io.airbyte.workers.temporal.exception.RetryableException; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationInput; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationOutput; @@ -70,6 +75,9 @@ public class JobCreationAndStatusUpdateActivityTest { @Mock private JobTracker mJobtracker; + @Mock + private ConfigRepository mConfigRepository; + @InjectMocks private JobCreationAndStatusUpdateActivityImpl jobCreationAndStatusUpdateActivity; @@ -93,9 +101,13 @@ class Creation { @Test @DisplayName("Test job creation") - public void createJob() { + public void createJob() throws JsonValidationException, ConfigNotFoundException, IOException { Mockito.when(mJobFactory.create(CONNECTION_ID)) .thenReturn(JOB_ID); + Mockito.when(mConfigRepository.getStandardSync(CONNECTION_ID)) + .thenReturn(Mockito.mock(StandardSync.class)); + Mockito.when(mConfigRepository.getDatabase()) + .thenReturn(Mockito.mock(ExceptionWrappingDatabase.class)); final JobCreationOutput output = jobCreationAndStatusUpdateActivity.createNewJob(new JobCreationInput(CONNECTION_ID, false)); @@ -157,6 +169,8 @@ class Update { @Test public void setJobSuccess() throws IOException { + Mockito.when(mConfigRepository.getDatabase()).thenReturn(Mockito.mock(ExceptionWrappingDatabase.class)); + jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput(JOB_ID, ATTEMPT_ID, standardSyncOutput)); Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput); @@ -177,6 +191,8 @@ public void setJobSuccessWrapException() throws IOException { @Test public void setJobFailure() throws IOException { + Mockito.when(mConfigRepository.getDatabase()).thenReturn(Mockito.mock(ExceptionWrappingDatabase.class)); + jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput(JOB_ID, "reason")); Mockito.verify(mJobPersistence).failJob(JOB_ID); @@ -216,6 +232,8 @@ public void setAttemptFailureWrapException() throws IOException { @Test public void setJobCancelled() throws IOException { + Mockito.when(mConfigRepository.getDatabase()).thenReturn(Mockito.mock(ExceptionWrappingDatabase.class)); + jobCreationAndStatusUpdateActivity.jobCancelled(new JobCancelledInput(JOB_ID, ATTEMPT_ID, failureSummary)); Mockito.verify(mJobPersistence).cancelJob(JOB_ID);