Skip to content

Commit

Permalink
Job Persistence Refactor in preparation for progress bar. (#18891)
Browse files Browse the repository at this point in the history
Some refactoring in preparation for the progress bar persistence changes.

The main change here was to simplify some of the JobPersistence methods by moving the logic to calculate attemptId into the JobPersistence implementation. This logic currently sits outside the class and is duplicated in multiple places. We could expose a helper method to calculate this logic, however that felt unnecessary at this point.

The alternative is further duplicating this logic as the progress bar logic is implemented, so I want to get that out of the way.

The other reason it's cleaner to use jobId and attemptNumber is these concepts/terms are more familiar throughout the rest of the codebase and it feels more intuitive to continue speaking this language (in my opinion).

Some random bits I wanted to clean up on the way as well. I will leave comments in the files as appropriate.
  • Loading branch information
davinchia authored Nov 3, 2022
1 parent 64736f0 commit aaeec26
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -344,26 +344,9 @@ public void writeOutput(final long jobId, final int attemptNumber, final JobOutp
.set(ATTEMPTS.UPDATED_AT, now)
.where(ATTEMPTS.JOB_ID.eq(jobId), ATTEMPTS.ATTEMPT_NUMBER.eq(attemptNumber))
.execute();
final Optional<Record> record =
ctx.fetch("SELECT id from attempts where job_id = ? AND attempt_number = ?", jobId,
attemptNumber).stream().findFirst();
final Long attemptId = record.get().get("id", Long.class);

ctx.insertInto(SYNC_STATS)
.set(SYNC_STATS.ID, UUID.randomUUID())
.set(SYNC_STATS.UPDATED_AT, now)
.set(SYNC_STATS.CREATED_AT, now)
.set(SYNC_STATS.ATTEMPT_ID, attemptId)
.set(SYNC_STATS.BYTES_EMITTED, syncStats.getBytesEmitted())
.set(SYNC_STATS.RECORDS_EMITTED, syncStats.getRecordsEmitted())
.set(SYNC_STATS.RECORDS_COMMITTED, syncStats.getRecordsCommitted())
.set(SYNC_STATS.SOURCE_STATE_MESSAGES_EMITTED, syncStats.getSourceStateMessagesEmitted())
.set(SYNC_STATS.DESTINATION_STATE_MESSAGES_EMITTED, syncStats.getDestinationStateMessagesEmitted())
.set(SYNC_STATS.MAX_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED, syncStats.getMaxSecondsBeforeSourceStateMessageEmitted())
.set(SYNC_STATS.MEAN_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED, syncStats.getMeanSecondsBeforeSourceStateMessageEmitted())
.set(SYNC_STATS.MAX_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, syncStats.getMaxSecondsBetweenStateMessageEmittedandCommitted())
.set(SYNC_STATS.MEAN_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, syncStats.getMeanSecondsBetweenStateMessageEmittedandCommitted())
.execute();
final Long attemptId = getAttemptId(jobId, attemptNumber, ctx);

writeSyncStats(now, syncStats, attemptId, ctx);

if (normalizationSummary != null) {
ctx.insertInto(NORMALIZATION_SUMMARIES)
Expand All @@ -382,6 +365,24 @@ public void writeOutput(final long jobId, final int attemptNumber, final JobOutp

}

private static void writeSyncStats(final OffsetDateTime now, final SyncStats syncStats, final Long attemptId, final DSLContext ctx) {
ctx.insertInto(SYNC_STATS)
.set(SYNC_STATS.ID, UUID.randomUUID())
.set(SYNC_STATS.UPDATED_AT, now)
.set(SYNC_STATS.CREATED_AT, now)
.set(SYNC_STATS.ATTEMPT_ID, attemptId)
.set(SYNC_STATS.BYTES_EMITTED, syncStats.getBytesEmitted())
.set(SYNC_STATS.RECORDS_EMITTED, syncStats.getRecordsEmitted())
.set(SYNC_STATS.RECORDS_COMMITTED, syncStats.getRecordsCommitted())
.set(SYNC_STATS.SOURCE_STATE_MESSAGES_EMITTED, syncStats.getSourceStateMessagesEmitted())
.set(SYNC_STATS.DESTINATION_STATE_MESSAGES_EMITTED, syncStats.getDestinationStateMessagesEmitted())
.set(SYNC_STATS.MAX_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED, syncStats.getMaxSecondsBeforeSourceStateMessageEmitted())
.set(SYNC_STATS.MEAN_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED, syncStats.getMeanSecondsBeforeSourceStateMessageEmitted())
.set(SYNC_STATS.MAX_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, syncStats.getMaxSecondsBetweenStateMessageEmittedandCommitted())
.set(SYNC_STATS.MEAN_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, syncStats.getMeanSecondsBetweenStateMessageEmittedandCommitted())
.execute();
}

@Override
public void writeAttemptFailureSummary(final long jobId, final int attemptNumber, final AttemptFailureSummary failureSummary) throws IOException {
final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC);
Expand All @@ -395,21 +396,34 @@ public void writeAttemptFailureSummary(final long jobId, final int attemptNumber
}

@Override
public List<SyncStats> getSyncStats(final Long attemptId) throws IOException {
public List<SyncStats> getSyncStats(final long jobId, final int attemptNumber) throws IOException {
return jobDatabase
.query(ctx -> ctx.select(DSL.asterisk()).from(DSL.table("sync_stats")).where(SYNC_STATS.ATTEMPT_ID.eq(attemptId))
.fetch(getSyncStatsRecordMapper())
.stream()
.toList());
.query(ctx -> {
final Long attemptId = getAttemptId(jobId, attemptNumber, ctx);
return ctx.select(DSL.asterisk()).from(DSL.table("sync_stats")).where(SYNC_STATS.ATTEMPT_ID.eq(attemptId))
.fetch(getSyncStatsRecordMapper())
.stream()
.toList();
});
}

@Override
public List<NormalizationSummary> getNormalizationSummary(final Long attemptId) throws IOException, JsonProcessingException {
public List<NormalizationSummary> getNormalizationSummary(final long jobId, final int attemptNumber) throws IOException {
return jobDatabase
.query(ctx -> ctx.select(DSL.asterisk()).from(NORMALIZATION_SUMMARIES).where(NORMALIZATION_SUMMARIES.ATTEMPT_ID.eq(attemptId))
.fetch(getNormalizationSummaryRecordMapper())
.stream()
.toList());
.query(ctx -> {
final Long attemptId = getAttemptId(jobId, attemptNumber, ctx);
return ctx.select(DSL.asterisk()).from(NORMALIZATION_SUMMARIES).where(NORMALIZATION_SUMMARIES.ATTEMPT_ID.eq(attemptId))
.fetch(getNormalizationSummaryRecordMapper())
.stream()
.toList();
});
}

private static Long getAttemptId(final long jobId, final int attemptNumber, final DSLContext ctx) {
final Optional<Record> record =
ctx.fetch("SELECT id from attempts where job_id = ? AND attempt_number = ?", jobId,
attemptNumber).stream().findFirst();
return record.get().get("id", Long.class);
}

private static RecordMapper<Record, SyncStats> getSyncStatsRecordMapper() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@
import java.util.stream.Stream;

/**
* TODO Introduce a locking mechanism so that no DB operation is allowed when automatic migration is
* running
* General interface methods for persistence to the Jobs database. This database is separate from
* the config database as job-related tables has an order of magnitude higher load and scale
* differently from the config tables.
*/
public interface JobPersistence {

List<SyncStats> getSyncStats(Long attemptId) throws IOException;
List<SyncStats> getSyncStats(long jobId, int attemptNumber) throws IOException;

List<NormalizationSummary> getNormalizationSummary(Long attemptId) throws IOException;
List<NormalizationSummary> getNormalizationSummary(long jobId, int attemptNumber) throws IOException;

Job getJob(long jobId) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ void testCompleteAttemptSuccess() throws IOException {

@Test
@DisplayName("Should be able to read what is written")
void testWriteOutput() throws IOException, SQLException {
void testWriteOutput() throws IOException {
final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH);
final Job created = jobPersistence.getJob(jobId);
Expand Down Expand Up @@ -285,13 +285,7 @@ void testWriteOutput() throws IOException, SQLException {
assertEquals(Optional.of(jobOutput), updated.getAttempts().get(0).getOutput());
assertNotEquals(created.getAttempts().get(0).getUpdatedAtInSecond(), updated.getAttempts().get(0).getUpdatedAtInSecond());

final Optional<Record> record =
jobDatabase.query(ctx -> ctx.fetch("SELECT id from attempts where job_id = ? AND attempt_number = ?", jobId,
attemptNumber).stream().findFirst());

final Long attemptId = record.get().get("id", Long.class);

final SyncStats storedSyncStats = jobPersistence.getSyncStats(attemptId).stream().findFirst().get();
final SyncStats storedSyncStats = jobPersistence.getSyncStats(jobId, attemptNumber).stream().findFirst().get();
assertEquals(100L, storedSyncStats.getBytesEmitted());
assertEquals(9L, storedSyncStats.getRecordsEmitted());
assertEquals(10L, storedSyncStats.getRecordsCommitted());
Expand All @@ -302,7 +296,7 @@ void testWriteOutput() throws IOException, SQLException {
assertEquals(10L, storedSyncStats.getMaxSecondsBetweenStateMessageEmittedandCommitted());
assertEquals(3L, storedSyncStats.getMeanSecondsBetweenStateMessageEmittedandCommitted());

final NormalizationSummary storedNormalizationSummary = jobPersistence.getNormalizationSummary(attemptId).stream().findFirst().get();
final NormalizationSummary storedNormalizationSummary = jobPersistence.getNormalizationSummary(jobId, attemptNumber).stream().findFirst().get();
assertEquals(10L, storedNormalizationSummary.getStartTime());
assertEquals(500L, storedNormalizationSummary.getEndTime());
assertEquals(List.of(failureReason1, failureReason2), storedNormalizationSummary.getFailures());
Expand Down

0 comments on commit aaeec26

Please sign in to comment.