Skip to content

Commit

Permalink
Job history purging (#4575)
Browse files Browse the repository at this point in the history
* WIP: Job history purging

* Created test cases that handle variations of job history purging configuration

* Typo fix

* Expanded test cases to control for job history on multiple connections at once.

* Handle latest job with saved state correctly regardless of order of ids

* Whitespace

* Externalized sql. Cleaned up constants.

* Cleaned up test case persistence code and structure

* Whitespace and formatting per standard tooling.
  • Loading branch information
airbyte-jenny authored Jul 9, 2021
1 parent 0aed33e commit 902587c
Show file tree
Hide file tree
Showing 5 changed files with 321 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ public void start() throws IOException {
() -> {
MDC.setContextMap(mdc);
jobCleaner.run();
jobPersistence.purgeJobHistory();
},
CLEANING_DELAY.toSeconds(),
CLEANING_DELAY.toSeconds(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.common.collect.Sets;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.text.Names;
import io.airbyte.commons.text.Sqls;
import io.airbyte.commons.version.AirbyteVersion;
Expand Down Expand Up @@ -82,6 +83,11 @@

public class DefaultJobPersistence implements JobPersistence {

// not static because job history test case manipulates these.
private final int JOB_HISTORY_MINIMUM_AGE_IN_DAYS;
private final int JOB_HISTORY_MINIMUM_RECENCY;
private final int JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS;

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultJobPersistence.class);
private static final Set<String> SYSTEM_SCHEMA = Set
.of("pg_toast", "information_schema", "pg_catalog", "import_backup", "pg_internal",
Expand Down Expand Up @@ -119,13 +125,20 @@ public class DefaultJobPersistence implements JobPersistence {
private final Supplier<Instant> timeSupplier;

@VisibleForTesting
DefaultJobPersistence(Database database, Supplier<Instant> timeSupplier) {
DefaultJobPersistence(Database database,
Supplier<Instant> timeSupplier,
int minimumAgeInDays,
int excessiveNumberOfJobs,
int minimumRecencyCount) {
this.database = new ExceptionWrappingDatabase(database);
this.timeSupplier = timeSupplier;
JOB_HISTORY_MINIMUM_AGE_IN_DAYS = minimumAgeInDays;
JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS = excessiveNumberOfJobs;
JOB_HISTORY_MINIMUM_RECENCY = minimumRecencyCount;
}

public DefaultJobPersistence(Database database) {
this(database, Instant::now);
this(database, Instant::now, 30, 500, 10);
}

@Override
Expand Down Expand Up @@ -506,6 +519,26 @@ private List<String> listTables(final String schema) throws IOException {
}
}

@Override
public void purgeJobHistory() {
purgeJobHistory(LocalDateTime.now());
}

@VisibleForTesting
public void purgeJobHistory(LocalDateTime asOfDate) {
try {
String JOB_HISTORY_PURGE_SQL = MoreResources.readResource("job_history_purge.sql");
// interval '?' days cannot use a ? bind, so we're using %d instead.
String sql = String.format(JOB_HISTORY_PURGE_SQL, (JOB_HISTORY_MINIMUM_AGE_IN_DAYS - 1));
final Integer rows = database.query(ctx -> ctx.execute(sql,
asOfDate.format(DateTimeFormatter.ofPattern("YYYY-MM-dd")),
JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS,
JOB_HISTORY_MINIMUM_RECENCY));
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private List<String> listAllTables(final String schema) throws IOException {
if (schema != null) {
return database.query(context -> context.meta().getSchemas(schema).stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,11 @@ public interface JobPersistence {
*/
void importDatabase(String airbyteVersion, Map<DatabaseSchema, Stream<JsonNode>> data) throws IOException;

/**
* Purges job history while ensuring that the latest saved-state information is maintained.
*
* @throws IOException
*/
void purgeJobHistory();

}
100 changes: 100 additions & 0 deletions airbyte-scheduler/persistence/src/main/resources/job_history_purge.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
DELETE
FROM
jobs
WHERE
jobs.id IN(
SELECT
jobs.id
FROM
jobs
LEFT JOIN(
SELECT
SCOPE,
COUNT( jobs.id ) AS jobCount
FROM
jobs
GROUP BY
SCOPE
) counts ON
jobs.scope = counts.scope
WHERE
-- job must be at least MINIMUM_AGE_IN_DAYS old or connection has more than EXCESSIVE_NUMBER_OF_JOBS
(
jobs.created_at <(
TO_TIMESTAMP(
?,
'YYYY-MM-DD'
)- INTERVAL '%d' DAY
)
OR counts.jobCount >?
)
AND jobs.id NOT IN(
-- cannot be the most recent job with saved state
SELECT
job_id AS latest_job_id_with_state
FROM
(
SELECT
jobs.scope,
jobs.id AS job_id,
jobs.config_type,
jobs.created_at,
jobs.status,
bool_or(
attempts."output" -> 'sync' -> 'state' -> 'state' IS NOT NULL
) AS outputStateExists,
ROW_NUMBER() OVER(
PARTITION BY SCOPE
ORDER BY
jobs.created_at DESC,
jobs.id DESC
) AS stateRecency
FROM
jobs
LEFT JOIN attempts ON
jobs.id = attempts.job_id
GROUP BY
SCOPE,
jobs.id
HAVING
bool_or(
attempts."output" -> 'sync' -> 'state' -> 'state' IS NOT NULL
)= TRUE
ORDER BY
SCOPE,
jobs.created_at DESC,
jobs.id DESC
) jobs_with_state
WHERE
stateRecency = 1
)
AND jobs.id NOT IN(
-- cannot be one of the last MINIMUM_RECENCY jobs for that connection/scope
SELECT
id
FROM
(
SELECT
jobs.scope,
jobs.id,
jobs.created_at,
ROW_NUMBER() OVER(
PARTITION BY SCOPE
ORDER BY
jobs.created_at DESC,
jobs.id DESC
) AS recency
FROM
jobs
GROUP BY
SCOPE,
jobs.id
ORDER BY
SCOPE,
jobs.created_at DESC,
jobs.id DESC
) jobs_by_recency
WHERE
recency <=?
)
)
Loading

0 comments on commit 902587c

Please sign in to comment.