query to find out jobs running unusually long (#17978)
* query to find out jobs running unusually long

* comments fix

* add more indents for better readability

* Update airbyte-metrics/reporter/src/main/java/io/airbyte/metrics/reporter/MetricRepository.java

Co-authored-by: Davin Chia <[email protected]>

* formatting

Co-authored-by: Davin Chia <[email protected]>
xiaohansong and davinchia authored Oct 19, 2022
1 parent e361ef6 commit 6f88bdb
Showing 5 changed files with 227 additions and 22 deletions.
@@ -67,10 +67,6 @@ public enum OssMetricsRegistry implements MetricsRegistry {
MetricEmittingApps.WORKER,
"job_succeeded_by_release_stage",
"increments when a job succeeds. jobs are double counted as this is tagged by release stage."),
KUBE_POD_PROCESS_CREATE_TIME_MILLISECS(
MetricEmittingApps.WORKER,
"kube_pod_process_create_time_millisecs",
"time taken to create a new kube pod process"),
JSON_STRING_LENGTH(
MetricEmittingApps.WORKER,
"json_string_length",
@@ -79,30 +75,43 @@ public enum OssMetricsRegistry implements MetricsRegistry {
MetricEmittingApps.WORKER,
"json_size",
"size of the json object"),
KUBE_POD_PROCESS_CREATE_TIME_MILLISECS(
MetricEmittingApps.WORKER,
"kube_pod_process_create_time_millisecs",
"time taken to create a new kube pod process"),
NUM_ABNORMAL_SCHEDULED_SYNCS_IN_LAST_DAY(
MetricEmittingApps.METRICS_REPORTER,
"num_abnormal_scheduled_syncs_last_day",
"number of abnormal syncs that have skipped at least 1 scheduled run in last day."),
NUM_ACTIVE_CONN_PER_WORKSPACE(
MetricEmittingApps.METRICS_REPORTER,
"num_active_conn_per_workspace",
"number of active connections per workspace"),
NUM_PENDING_JOBS(
MetricEmittingApps.METRICS_REPORTER,
"num_pending_jobs",
"number of pending jobs"),
NUM_RUNNING_JOBS(
MetricEmittingApps.METRICS_REPORTER,
"num_running_jobs",
"number of running jobs"),
NUM_ORPHAN_RUNNING_JOBS(
MetricEmittingApps.METRICS_REPORTER,
"num_orphan_running_jobs",
"number of jobs reported as running that are associated with an inactive or deprecated connection"),
NUM_ACTIVE_CONN_PER_WORKSPACE(
MetricEmittingApps.METRICS_REPORTER,
"num_active_conn_per_workspace",
"number of active connections per workspace"),
NUM_ABNORMAL_SCHEDULED_SYNCS_IN_LAST_DAY(
NUM_RUNNING_JOBS(
MetricEmittingApps.METRICS_REPORTER,
"num_abnormal_scheduled_syncs_last_day",
"number of abnormal syncs that have skipped at least 1 scheduled run in last day."),
"num_running_jobs",
"number of running jobs"),
NUM_SOURCE_STREAMS_WITH_RECORD_SCHEMA_VALIDATION_ERRORS(MetricEmittingApps.WORKER,
"record_schema_validation_error",
"number of record schema validation errors"),
NUM_TOTAL_SCHEDULED_SYNCS_IN_LAST_DAY(
MetricEmittingApps.METRICS_REPORTER,
"num_total_scheduled_syncs_last_day",
"number of total syncs runs in last day."),

NUM_UNUSUALLY_LONG_SYNCS(
MetricEmittingApps.METRICS_REPORTER,
"num_unusually_long_syncs",
"number of unusually long syncs compared to their historic performance."),

OLDEST_PENDING_JOB_AGE_SECS(MetricEmittingApps.METRICS_REPORTER,
"oldest_pending_job_age_secs",
"oldest pending job in seconds"),
@@ -112,6 +121,9 @@ public enum OssMetricsRegistry implements MetricsRegistry {
OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS(MetricEmittingApps.METRICS_REPORTER,
"overall_job_runtime_in_last_hour_by_terminal_state_secs",
"overall job runtime - scheduling and execution for all attempts - for jobs that reach terminal states in the last hour. tagged by terminal states."),
STATE_METRIC_TRACKER_ERROR(MetricEmittingApps.WORKER,
"state_timestamp_metric_tracker_error",
"number of syncs where the state timestamp metric tracker ran out of memory or was unable to match destination state message to source state message"),
TEMPORAL_WORKFLOW_ATTEMPT(MetricEmittingApps.WORKER,
"temporal_workflow_attempt",
"count of the number of workflow attempts"),
@@ -120,13 +132,7 @@ public enum OssMetricsRegistry implements MetricsRegistry {
"count of the number of successful workflow syncs."),
TEMPORAL_WORKFLOW_FAILURE(MetricEmittingApps.WORKER,
"temporal_workflow_failure",
"count of the number of workflow failures"),
NUM_SOURCE_STREAMS_WITH_RECORD_SCHEMA_VALIDATION_ERRORS(MetricEmittingApps.WORKER,
"record_schema_validation_error",
"number of record schema validation errors"),
STATE_METRIC_TRACKER_ERROR(MetricEmittingApps.WORKER,
"state_timestamp_metric_tracker_error",
"number of syncs where the state timestamp metric tracker ran out of memory or was unable to match destination state message to source state message");
"count of the number of workflow failures");

private final MetricEmittingApp application;
private final String metricName;
@@ -113,6 +113,24 @@ public Duration getDuration() {

}

@Singleton
final class NumUnusuallyLongSyncs extends Emitter {
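// Gauges how many currently running syncs are taking unusually long relative to their connection's
// recent history. The base Emitter is assumed to invoke the supplied callable on the cadence
// returned by getDuration(), i.e. every 15 minutes here.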

NumUnusuallyLongSyncs(final MetricClient client, final MetricRepository db) {
super(client, () -> {
final var count = db.numberOfJobsRunningUnusuallyLong();
client.gauge(OssMetricsRegistry.NUM_UNUSUALLY_LONG_SYNCS, count);
return null;
});
}

@Override
public Duration getDuration() {
return Duration.ofMinutes(15);
}

}

@Singleton
final class TotalScheduledSyncs extends Emitter {

@@ -140,6 +140,66 @@ having count(*) < 1440 / cast(c.schedule::jsonb->'units' as integer)
+ ctx.fetchOne(queryForAbnormalSyncInMinutesInLastDay).get("cnt", long.class);
}

long numberOfJobsRunningUnusuallyLong() {
// "Unusually long" means the current runtime exceeds the greater of 2x the historic average run
// time and the average run time plus 15 minutes.
// Connections with fewer than 5 succeeded runs in the last week are skipped so that the historic
// average is meaningful and consistent.
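// For example, with a historic average of 2 hours the cutoff is greatest(4 h, 2 h 15 min) = 4 hours.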
final var query =
"""
-- Pick the current attempt's running time and the historic average running time from the attempts table.
select
current_running_attempts.connection_id,
current_running_attempts.running_time,
historic_avg_running_attempts.avg_run_sec
from
(
-- Sub-query-1: query the currently running attempt's running time.
(
select
jobs.scope as connection_id,
extract(epoch from age(NOW(), attempts.created_at)) as running_time
from
jobs
join attempts on
jobs.id = attempts.job_id
where
jobs.status = 'running'
and attempts.status = 'running'
and jobs.config_type = 'sync' )
as current_running_attempts
join
-- Sub-query-2: query historic attempts' average running time within last week.
(
select
jobs.scope as connection_id,
avg(extract(epoch from age(attempts.updated_at, attempts.created_at))) as avg_run_sec
from
jobs
join attempts on
jobs.id = attempts.job_id
where
-- 168 hours is 1 week: we look at all attempts in the last week to calculate the average running time.
attempts.updated_at >= NOW() - interval '168 HOUR'
and jobs.status = 'succeeded'
and attempts.status = 'succeeded'
and jobs.config_type = 'sync'
group by
connection_id
having
count(*) > 4
) as historic_avg_running_attempts
on
current_running_attempts.connection_id = historic_avg_running_attempts.connection_id)
where
-- Count the attempt if its current running time is more than 2x the average running time
-- and more than 15 minutes (900 seconds) over the average, so quick sync jobs don't produce noisy alerts.
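-- e.g. with avg_run_sec = 600 (10 minutes), greatest(1200, 1500) = 1500, so only attempts running longer than 25 minutes are counted.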
current_running_attempts.running_time > greatest(historic_avg_running_attempts.avg_run_sec * 2, historic_avg_running_attempts.avg_run_sec + 900)
""";
final var queryResults = ctx.fetch(query);
return queryResults.getValues("connection_id").size();
}

Map<JobStatus, Double> overallJobRuntimeForTerminalJobsInLastHour() {
final var query = """
SELECT status, extract(epoch from age(updated_at, created_at)) AS sec FROM jobs
@@ -13,6 +13,7 @@
import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_DEFINITION;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.CONNECTION;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.WORKSPACE;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.ATTEMPTS;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.JOBS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -23,6 +24,7 @@
import io.airbyte.db.instance.configs.jooq.generated.enums.NamespaceDefinitionType;
import io.airbyte.db.instance.configs.jooq.generated.enums.ReleaseStage;
import io.airbyte.db.instance.configs.jooq.generated.enums.StatusType;
import io.airbyte.db.instance.jobs.jooq.generated.enums.AttemptStatus;
import io.airbyte.db.instance.jobs.jooq.generated.enums.JobConfigType;
import io.airbyte.db.instance.jobs.jooq.generated.enums.JobStatus;
import io.airbyte.db.instance.test.TestDatabaseProviders;
@@ -92,6 +94,7 @@ void setUp() {
ctx.truncate(ACTOR).execute();
ctx.truncate(CONNECTION).cascade().execute();
ctx.truncate(JOBS).cascade().execute();
ctx.truncate(ATTEMPTS).cascade().execute();
ctx.truncate(WORKSPACE).cascade().execute();
}

@@ -503,4 +506,122 @@ void shouldNotCountNormalJobsInAbnormalMetric() {

}

@Nested
class UnusuallyLongJobs {

@Test
void shouldCountInJobsWithUnusuallyLongTime() throws SQLException {
final var connectionId = UUID.randomUUID();
final var syncConfigType = JobConfigType.sync;

// The current job has been running for 12 hours, while each of the previous 5 jobs ran for 2 hours,
// so the historic average is 2 hours.
// Thus the latest job is counted as an unusually long-running job.
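// Roughly: the cutoff is greatest(2 * 2 h, 2 h + 15 min) = 4 hours, and the ~12-hour runtime exceeds it.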
ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS, JOBS.CREATED_AT, JOBS.UPDATED_AT, JOBS.CONFIG_TYPE)
.values(100L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(28, ChronoUnit.HOURS),
OffsetDateTime.now().minus(26, ChronoUnit.HOURS), syncConfigType)
.values(1L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(20, ChronoUnit.HOURS),
OffsetDateTime.now().minus(18, ChronoUnit.HOURS), syncConfigType)
.values(2L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(18, ChronoUnit.HOURS),
OffsetDateTime.now().minus(16, ChronoUnit.HOURS), syncConfigType)
.values(3L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(16, ChronoUnit.HOURS),
OffsetDateTime.now().minus(14, ChronoUnit.HOURS), syncConfigType)
.values(4L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(14, ChronoUnit.HOURS),
OffsetDateTime.now().minus(12, ChronoUnit.HOURS), syncConfigType)
.values(5L, connectionId.toString(), JobStatus.running, OffsetDateTime.now().minus(12, ChronoUnit.HOURS),
OffsetDateTime.now().minus(12, ChronoUnit.HOURS), syncConfigType)
.execute();

ctx.insertInto(ATTEMPTS, ATTEMPTS.ID, ATTEMPTS.JOB_ID, ATTEMPTS.STATUS, ATTEMPTS.CREATED_AT, ATTEMPTS.UPDATED_AT)
.values(100L, 100L, AttemptStatus.succeeded, OffsetDateTime.now().minus(28, ChronoUnit.HOURS),
OffsetDateTime.now().minus(26, ChronoUnit.HOURS))
.values(1L, 1L, AttemptStatus.succeeded, OffsetDateTime.now().minus(20, ChronoUnit.HOURS),
OffsetDateTime.now().minus(18, ChronoUnit.HOURS))
.values(2L, 2L, AttemptStatus.succeeded, OffsetDateTime.now().minus(18, ChronoUnit.HOURS),
OffsetDateTime.now().minus(16, ChronoUnit.HOURS))
.values(3L, 3L, AttemptStatus.succeeded, OffsetDateTime.now().minus(16, ChronoUnit.HOURS),
OffsetDateTime.now().minus(14, ChronoUnit.HOURS))
.values(4L, 4L, AttemptStatus.succeeded, OffsetDateTime.now().minus(14, ChronoUnit.HOURS),
OffsetDateTime.now().minus(12, ChronoUnit.HOURS))
.values(5L, 5L, AttemptStatus.running, OffsetDateTime.now().minus(12, ChronoUnit.HOURS),
OffsetDateTime.now().minus(12, ChronoUnit.HOURS))
.execute();

final var numOfJobsRunningUnusuallyLong = db.numberOfJobsRunningUnusuallyLong();
assertEquals(1, numOfJobsRunningUnusuallyLong);
}

@Test
void shouldNotCountInJobsWithinFifteenMinutes() throws SQLException {
final var connectionId = UUID.randomUUID();
final var syncConfigType = JobConfigType.sync;

// The latest job has been running for 14 minutes, while the previous 5 jobs averaged only a few
// minutes. Although that is more than 2x the average, it is still within the 15-minute buffer,
// so this job shouldn't be counted.
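// Roughly: the average here is about 3.8 minutes, so the cutoff is greatest(2 * 3.8 min, 3.8 min + 15 min),
// i.e. about 18.8 minutes, and the ~14-minute runtime stays below it.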
ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS, JOBS.CREATED_AT, JOBS.UPDATED_AT, JOBS.CONFIG_TYPE)
.values(100L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(28, ChronoUnit.MINUTES),
OffsetDateTime.now().minus(26, ChronoUnit.MINUTES), syncConfigType)
.values(1L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(20, ChronoUnit.MINUTES),
OffsetDateTime.now().minus(18, ChronoUnit.MINUTES), syncConfigType)
.values(2L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(18, ChronoUnit.MINUTES),
OffsetDateTime.now().minus(16, ChronoUnit.MINUTES), syncConfigType)
.values(3L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(16, ChronoUnit.MINUTES),
OffsetDateTime.now().minus(14, ChronoUnit.MINUTES), syncConfigType)
.values(4L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(14, ChronoUnit.MINUTES),
OffsetDateTime.now().minus(2, ChronoUnit.MINUTES), syncConfigType)
.values(5L, connectionId.toString(), JobStatus.running, OffsetDateTime.now().minus(14, ChronoUnit.MINUTES),
OffsetDateTime.now().minus(2, ChronoUnit.MINUTES), syncConfigType)
.execute();

ctx.insertInto(ATTEMPTS, ATTEMPTS.ID, ATTEMPTS.JOB_ID, ATTEMPTS.STATUS, ATTEMPTS.CREATED_AT, ATTEMPTS.UPDATED_AT)
.values(100L, 100L, AttemptStatus.succeeded, OffsetDateTime.now().minus(28, ChronoUnit.MINUTES),
OffsetDateTime.now().minus(26, ChronoUnit.MINUTES))
.values(1L, 1L, AttemptStatus.succeeded, OffsetDateTime.now().minus(20, ChronoUnit.MINUTES),
OffsetDateTime.now().minus(18, ChronoUnit.MINUTES))
.values(2L, 2L, AttemptStatus.succeeded, OffsetDateTime.now().minus(18, ChronoUnit.MINUTES),
OffsetDateTime.now().minus(16, ChronoUnit.MINUTES))
.values(3L, 3L, AttemptStatus.succeeded, OffsetDateTime.now().minus(26, ChronoUnit.MINUTES),
OffsetDateTime.now().minus(14, ChronoUnit.MINUTES))
.values(4L, 4L, AttemptStatus.succeeded, OffsetDateTime.now().minus(18, ChronoUnit.MINUTES),
OffsetDateTime.now().minus(17, ChronoUnit.MINUTES))
.values(5L, 5L, AttemptStatus.running, OffsetDateTime.now().minus(14, ChronoUnit.MINUTES),
OffsetDateTime.now().minus(14, ChronoUnit.MINUTES))
.execute();

final var numOfJobsRunningUnusuallyLong = db.numberOfJobsRunningUnusuallyLong();
assertEquals(0, numOfJobsRunningUnusuallyLong);
}

@Test
void shouldSkipInsufficientJobRuns() throws SQLException {
final var connectionId = UUID.randomUUID();
final var syncConfigType = JobConfigType.sync;

// Require at least 5 runs in the last week to get a meaningful average runtime.
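// The query's "having count(*) > 4" clause requires at least 5 succeeded attempts per connection in
// the last week; with only 2 completed runs here the connection has no historic average and is not counted.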
ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS, JOBS.CREATED_AT, JOBS.UPDATED_AT, JOBS.CONFIG_TYPE)
.values(100L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(28, ChronoUnit.HOURS),
OffsetDateTime.now().minus(26, ChronoUnit.HOURS), syncConfigType)
.values(1L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(20, ChronoUnit.HOURS),
OffsetDateTime.now().minus(18, ChronoUnit.HOURS), syncConfigType)
.values(2L, connectionId.toString(), JobStatus.running, OffsetDateTime.now().minus(18, ChronoUnit.HOURS),
OffsetDateTime.now().minus(1, ChronoUnit.HOURS), syncConfigType)
.execute();

ctx.insertInto(ATTEMPTS, ATTEMPTS.ID, ATTEMPTS.JOB_ID, ATTEMPTS.STATUS, ATTEMPTS.CREATED_AT, ATTEMPTS.UPDATED_AT)
.values(100L, 100L, AttemptStatus.succeeded, OffsetDateTime.now().minus(28, ChronoUnit.HOURS),
OffsetDateTime.now().minus(26, ChronoUnit.HOURS))
.values(1L, 1L, AttemptStatus.succeeded, OffsetDateTime.now().minus(20, ChronoUnit.HOURS),
OffsetDateTime.now().minus(18, ChronoUnit.HOURS))
.values(2L, 2L, AttemptStatus.running, OffsetDateTime.now().minus(18, ChronoUnit.HOURS),
OffsetDateTime.now().minus(1, ChronoUnit.HOURS))
.execute();

final var numOfJobsRunningUnusuallyLong = db.numberOfJobsRunningUnusuallyLong();
assertEquals(0, numOfJobsRunningUnusuallyLong);
}

}

}
