From aaeec26de3579e382cda90a1d13b30a8960c1bb1 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Thu, 3 Nov 2022 14:40:16 -0700 Subject: [PATCH] Job Persistence Refactor in preparation for progress bar. (#18891) Some refactoring in preparation for the progress bar persistence changes. The main change here was to simplify some of the JobPersistence methods by moving the logic to calculate attemptId into the JobPersistence implementation. This logic currently sits outside the class and is duplicated in multiple places. We could expose a helper method to calculate this logic; however, that felt unnecessary at this point. The alternative is further duplicating this logic as the progress bar logic is implemented, so I want to get that out of the way. The other reason it's cleaner to use jobId and attemptNumber is that these concepts/terms are more familiar throughout the rest of the codebase, and it feels more intuitive to continue speaking this language (in my opinion). I also cleaned up some random bits along the way. I will leave comments in the files as appropriate. 
--- .../job/DefaultJobPersistence.java | 74 +++++++++++-------- .../persistence/job/JobPersistence.java | 9 ++- .../job/DefaultJobPersistenceTest.java | 12 +-- 3 files changed, 52 insertions(+), 43 deletions(-) diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java index 1f5f5b2a2381..2f31c3020b21 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java @@ -344,26 +344,9 @@ public void writeOutput(final long jobId, final int attemptNumber, final JobOutp .set(ATTEMPTS.UPDATED_AT, now) .where(ATTEMPTS.JOB_ID.eq(jobId), ATTEMPTS.ATTEMPT_NUMBER.eq(attemptNumber)) .execute(); - final Optional record = - ctx.fetch("SELECT id from attempts where job_id = ? AND attempt_number = ?", jobId, - attemptNumber).stream().findFirst(); - final Long attemptId = record.get().get("id", Long.class); - - ctx.insertInto(SYNC_STATS) - .set(SYNC_STATS.ID, UUID.randomUUID()) - .set(SYNC_STATS.UPDATED_AT, now) - .set(SYNC_STATS.CREATED_AT, now) - .set(SYNC_STATS.ATTEMPT_ID, attemptId) - .set(SYNC_STATS.BYTES_EMITTED, syncStats.getBytesEmitted()) - .set(SYNC_STATS.RECORDS_EMITTED, syncStats.getRecordsEmitted()) - .set(SYNC_STATS.RECORDS_COMMITTED, syncStats.getRecordsCommitted()) - .set(SYNC_STATS.SOURCE_STATE_MESSAGES_EMITTED, syncStats.getSourceStateMessagesEmitted()) - .set(SYNC_STATS.DESTINATION_STATE_MESSAGES_EMITTED, syncStats.getDestinationStateMessagesEmitted()) - .set(SYNC_STATS.MAX_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED, syncStats.getMaxSecondsBeforeSourceStateMessageEmitted()) - .set(SYNC_STATS.MEAN_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED, syncStats.getMeanSecondsBeforeSourceStateMessageEmitted()) - 
.set(SYNC_STATS.MAX_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, syncStats.getMaxSecondsBetweenStateMessageEmittedandCommitted()) - .set(SYNC_STATS.MEAN_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, syncStats.getMeanSecondsBetweenStateMessageEmittedandCommitted()) - .execute(); + final Long attemptId = getAttemptId(jobId, attemptNumber, ctx); + + writeSyncStats(now, syncStats, attemptId, ctx); if (normalizationSummary != null) { ctx.insertInto(NORMALIZATION_SUMMARIES) @@ -382,6 +365,24 @@ public void writeOutput(final long jobId, final int attemptNumber, final JobOutp } + private static void writeSyncStats(final OffsetDateTime now, final SyncStats syncStats, final Long attemptId, final DSLContext ctx) { + ctx.insertInto(SYNC_STATS) + .set(SYNC_STATS.ID, UUID.randomUUID()) + .set(SYNC_STATS.UPDATED_AT, now) + .set(SYNC_STATS.CREATED_AT, now) + .set(SYNC_STATS.ATTEMPT_ID, attemptId) + .set(SYNC_STATS.BYTES_EMITTED, syncStats.getBytesEmitted()) + .set(SYNC_STATS.RECORDS_EMITTED, syncStats.getRecordsEmitted()) + .set(SYNC_STATS.RECORDS_COMMITTED, syncStats.getRecordsCommitted()) + .set(SYNC_STATS.SOURCE_STATE_MESSAGES_EMITTED, syncStats.getSourceStateMessagesEmitted()) + .set(SYNC_STATS.DESTINATION_STATE_MESSAGES_EMITTED, syncStats.getDestinationStateMessagesEmitted()) + .set(SYNC_STATS.MAX_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED, syncStats.getMaxSecondsBeforeSourceStateMessageEmitted()) + .set(SYNC_STATS.MEAN_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED, syncStats.getMeanSecondsBeforeSourceStateMessageEmitted()) + .set(SYNC_STATS.MAX_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, syncStats.getMaxSecondsBetweenStateMessageEmittedandCommitted()) + .set(SYNC_STATS.MEAN_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, syncStats.getMeanSecondsBetweenStateMessageEmittedandCommitted()) + .execute(); + } + @Override public void writeAttemptFailureSummary(final long jobId, final int attemptNumber, final AttemptFailureSummary failureSummary) throws 
IOException { final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC); @@ -395,21 +396,34 @@ public void writeAttemptFailureSummary(final long jobId, final int attemptNumber } @Override - public List getSyncStats(final Long attemptId) throws IOException { + public List getSyncStats(final long jobId, final int attemptNumber) throws IOException { return jobDatabase - .query(ctx -> ctx.select(DSL.asterisk()).from(DSL.table("sync_stats")).where(SYNC_STATS.ATTEMPT_ID.eq(attemptId)) - .fetch(getSyncStatsRecordMapper()) - .stream() - .toList()); + .query(ctx -> { + final Long attemptId = getAttemptId(jobId, attemptNumber, ctx); + return ctx.select(DSL.asterisk()).from(DSL.table("sync_stats")).where(SYNC_STATS.ATTEMPT_ID.eq(attemptId)) + .fetch(getSyncStatsRecordMapper()) + .stream() + .toList(); + }); } @Override - public List getNormalizationSummary(final Long attemptId) throws IOException, JsonProcessingException { + public List getNormalizationSummary(final long jobId, final int attemptNumber) throws IOException { return jobDatabase - .query(ctx -> ctx.select(DSL.asterisk()).from(NORMALIZATION_SUMMARIES).where(NORMALIZATION_SUMMARIES.ATTEMPT_ID.eq(attemptId)) - .fetch(getNormalizationSummaryRecordMapper()) - .stream() - .toList()); + .query(ctx -> { + final Long attemptId = getAttemptId(jobId, attemptNumber, ctx); + return ctx.select(DSL.asterisk()).from(NORMALIZATION_SUMMARIES).where(NORMALIZATION_SUMMARIES.ATTEMPT_ID.eq(attemptId)) + .fetch(getNormalizationSummaryRecordMapper()) + .stream() + .toList(); + }); + } + + private static Long getAttemptId(final long jobId, final int attemptNumber, final DSLContext ctx) { + final Optional record = + ctx.fetch("SELECT id from attempts where job_id = ? 
AND attempt_number = ?", jobId, + attemptNumber).stream().findFirst(); + return record.get().get("id", Long.class); } private static RecordMapper getSyncStatsRecordMapper() { diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java index e5b389cae2f4..91f3905ca6ca 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java @@ -29,14 +29,15 @@ import java.util.stream.Stream; /** - * TODO Introduce a locking mechanism so that no DB operation is allowed when automatic migration is - * running + * General interface methods for persistence to the Jobs database. This database is separate from + * the config database as job-related tables have an order of magnitude higher load and scale + * differently from the config tables. 
*/ public interface JobPersistence { - List getSyncStats(Long attemptId) throws IOException; + List getSyncStats(long jobId, int attemptNumber) throws IOException; - List getNormalizationSummary(Long attemptId) throws IOException; + List getNormalizationSummary(long jobId, int attemptNumber) throws IOException; Job getJob(long jobId) throws IOException; diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java index 06cdf5c4a2c5..b41c691fbbc8 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java @@ -257,7 +257,7 @@ void testCompleteAttemptSuccess() throws IOException { @Test @DisplayName("Should be able to read what is written") - void testWriteOutput() throws IOException, SQLException { + void testWriteOutput() throws IOException { final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow(); final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH); final Job created = jobPersistence.getJob(jobId); @@ -285,13 +285,7 @@ void testWriteOutput() throws IOException, SQLException { assertEquals(Optional.of(jobOutput), updated.getAttempts().get(0).getOutput()); assertNotEquals(created.getAttempts().get(0).getUpdatedAtInSecond(), updated.getAttempts().get(0).getUpdatedAtInSecond()); - final Optional record = - jobDatabase.query(ctx -> ctx.fetch("SELECT id from attempts where job_id = ? 
AND attempt_number = ?", jobId, - attemptNumber).stream().findFirst()); - - final Long attemptId = record.get().get("id", Long.class); - - final SyncStats storedSyncStats = jobPersistence.getSyncStats(attemptId).stream().findFirst().get(); + final SyncStats storedSyncStats = jobPersistence.getSyncStats(jobId, attemptNumber).stream().findFirst().get(); assertEquals(100L, storedSyncStats.getBytesEmitted()); assertEquals(9L, storedSyncStats.getRecordsEmitted()); assertEquals(10L, storedSyncStats.getRecordsCommitted()); @@ -302,7 +296,7 @@ void testWriteOutput() throws IOException, SQLException { assertEquals(10L, storedSyncStats.getMaxSecondsBetweenStateMessageEmittedandCommitted()); assertEquals(3L, storedSyncStats.getMeanSecondsBetweenStateMessageEmittedandCommitted()); - final NormalizationSummary storedNormalizationSummary = jobPersistence.getNormalizationSummary(attemptId).stream().findFirst().get(); + final NormalizationSummary storedNormalizationSummary = jobPersistence.getNormalizationSummary(jobId, attemptNumber).stream().findFirst().get(); assertEquals(10L, storedNormalizationSummary.getStartTime()); assertEquals(500L, storedNormalizationSummary.getEndTime()); assertEquals(List.of(failureReason1, failureReason2), storedNormalizationSummary.getFailures());