Skip to content

Commit

Permalink
Remove jobPersistence dependency (#21625)
Browse files Browse the repository at this point in the history
* Remove JobPersistence from ConfigFetchActivityImpl
  • Loading branch information
alovew authored Jan 21, 2023
1 parent 680f5da commit 6026465
Show file tree
Hide file tree
Showing 8 changed files with 477 additions and 321 deletions.
30 changes: 30 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1989,6 +1989,28 @@ paths:
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/jobs/get_last_replication_job:
post:
tags:
- jobs
operationId: getLastReplicationJob
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/ConnectionIdRequestBody"
required: true
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/JobOptionalRead"
"404":
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/jobs/get_light:
post:
tags:
Expand Down Expand Up @@ -3907,6 +3929,9 @@ components:
updatedAt:
type: integer
format: int64
startedAt:
type: integer
format: int64
status:
$ref: "#/components/schemas/JobStatus"
resetConfig:
Expand Down Expand Up @@ -4142,6 +4167,11 @@ components:
properties:
job:
$ref: "#/components/schemas/JobRead"
JobOptionalRead:
type: object
properties:
job:
$ref: "#/components/schemas/JobRead"
JobDebugInfoRead:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.api.client.generated.DestinationApi;
import io.airbyte.api.client.generated.JobsApi;
import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.generated.WorkspaceApi;
import io.airbyte.api.client.invoker.generated.ApiClient;
Expand Down Expand Up @@ -72,6 +73,11 @@ public SourceApi sourceApi(@Named("apiClient") final ApiClient apiClient) {
return new SourceApi(apiClient);
}

@Singleton
public JobsApi jobsApi(@Named("apiClient") final ApiClient apiClient) {
return new JobsApi(apiClient);
}

@Singleton
public DestinationApi destinationApi(final ApiClient apiClient) {
return new DestinationApi(apiClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@

import io.airbyte.api.generated.JobsApi;
import io.airbyte.api.model.generated.AttemptNormalizationStatusReadList;
import io.airbyte.api.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.model.generated.JobDebugInfoRead;
import io.airbyte.api.model.generated.JobIdRequestBody;
import io.airbyte.api.model.generated.JobInfoLightRead;
import io.airbyte.api.model.generated.JobInfoRead;
import io.airbyte.api.model.generated.JobListRequestBody;
import io.airbyte.api.model.generated.JobOptionalRead;
import io.airbyte.api.model.generated.JobReadList;
import io.airbyte.server.handlers.JobHistoryHandler;
import io.airbyte.server.handlers.SchedulerHandler;
Expand Down Expand Up @@ -63,6 +65,12 @@ public JobInfoLightRead getJobInfoLight(final JobIdRequestBody jobIdRequestBody)
return ApiHelper.execute(() -> jobHistoryHandler.getJobInfoLight(jobIdRequestBody));
}

@Post("/get_last_replication_job")
@Override
public JobOptionalRead getLastReplicationJob(final ConnectionIdRequestBody connectionIdRequestBody) {
return ApiHelper.execute(() -> jobHistoryHandler.getLastReplicationJob(connectionIdRequestBody));
}

@Post("/list")
@Override
public JobReadList listJobsFor(final JobListRequestBody jobListRequestBody) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.airbyte.api.model.generated.JobDebugRead;
import io.airbyte.api.model.generated.JobInfoLightRead;
import io.airbyte.api.model.generated.JobInfoRead;
import io.airbyte.api.model.generated.JobOptionalRead;
import io.airbyte.api.model.generated.JobRead;
import io.airbyte.api.model.generated.JobStatus;
import io.airbyte.api.model.generated.JobWithAttemptsRead;
Expand Down Expand Up @@ -72,6 +73,10 @@ public JobInfoLightRead getJobInfoLightRead(final Job job) {
return new JobInfoLightRead().job(getJobRead(job));
}

public JobOptionalRead getJobOptionalRead(final Job job) {
return new JobOptionalRead().job(getJobRead(job));
}

public static JobDebugRead getDebugJobInfoRead(final JobInfoRead jobInfoRead,
final SourceDefinitionRead sourceDefinitionRead,
final DestinationDefinitionRead destinationDefinitionRead,
Expand Down Expand Up @@ -103,6 +108,7 @@ public static JobRead getJobRead(final Job job) {
.resetConfig(extractResetConfigIfReset(job).orElse(null))
.createdAt(job.getCreatedAtInSecond())
.updatedAt(job.getUpdatedAtInSecond())
.startedAt(job.getStartedAtInSecond().isPresent() ? job.getStartedAtInSecond().get() : null)
.status(Enums.convertTo(job.getStatus(), JobStatus.class));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.airbyte.api.model.generated.AttemptRead;
import io.airbyte.api.model.generated.AttemptStats;
import io.airbyte.api.model.generated.AttemptStreamStats;
import io.airbyte.api.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.model.generated.ConnectionRead;
import io.airbyte.api.model.generated.DestinationDefinitionIdRequestBody;
import io.airbyte.api.model.generated.DestinationDefinitionRead;
Expand All @@ -21,6 +22,7 @@
import io.airbyte.api.model.generated.JobInfoLightRead;
import io.airbyte.api.model.generated.JobInfoRead;
import io.airbyte.api.model.generated.JobListRequestBody;
import io.airbyte.api.model.generated.JobOptionalRead;
import io.airbyte.api.model.generated.JobRead;
import io.airbyte.api.model.generated.JobReadList;
import io.airbyte.api.model.generated.JobWithAttemptsRead;
Expand Down Expand Up @@ -194,6 +196,16 @@ public JobInfoLightRead getJobInfoLight(final JobIdRequestBody jobIdRequestBody)
return jobConverter.getJobInfoLightRead(job);
}

public JobOptionalRead getLastReplicationJob(final ConnectionIdRequestBody connectionIdRequestBody) throws IOException {
Optional<Job> job = jobPersistence.getLastReplicationJob(connectionIdRequestBody.getConnectionId());
if (job.isEmpty()) {
return new JobOptionalRead();
} else {
return jobConverter.getJobOptionalRead(job.get());
}

}

public JobDebugInfoRead getJobDebugInfo(final JobIdRequestBody jobIdRequestBody)
throws ConfigNotFoundException, IOException, JsonValidationException {
final Job job = jobPersistence.getJob(jobIdRequestBody.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.google.common.annotations.VisibleForTesting;
import datadog.trace.api.Trace;
import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.api.client.generated.JobsApi;
import io.airbyte.api.client.generated.WorkspaceApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.ConnectionIdRequestBody;
Expand All @@ -20,11 +21,11 @@
import io.airbyte.api.client.model.generated.ConnectionScheduleDataCron;
import io.airbyte.api.client.model.generated.ConnectionScheduleType;
import io.airbyte.api.client.model.generated.ConnectionStatus;
import io.airbyte.api.client.model.generated.JobOptionalRead;
import io.airbyte.api.client.model.generated.JobRead;
import io.airbyte.api.client.model.generated.WorkspaceRead;
import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.models.Job;
import io.micronaut.context.annotation.Value;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
Expand Down Expand Up @@ -61,19 +62,19 @@ public class ConfigFetchActivityImpl implements ConfigFetchActivity {
UUID.fromString("226edbc1-4a9c-4401-95a9-90435d667d9d"));
private static final long SCHEDULING_NOISE_CONSTANT = 15;

private final JobPersistence jobPersistence;
private final JobsApi jobsApi;
private final WorkspaceApi workspaceApi;
private final Integer syncJobMaxAttempts;
private final Supplier<Long> currentSecondsSupplier;
private final ConnectionApi connectionApi;

@VisibleForTesting
protected ConfigFetchActivityImpl(final JobPersistence jobPersistence,
protected ConfigFetchActivityImpl(final JobsApi jobsApi,
final WorkspaceApi workspaceApi,
@Value("${airbyte.worker.sync.max-attempts}") final Integer syncJobMaxAttempts,
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier,
final ConnectionApi connectionApi) {
this.jobPersistence = jobPersistence;
this.jobsApi = jobsApi;
this.workspaceApi = workspaceApi;
this.syncJobMaxAttempts = syncJobMaxAttempts;
this.currentSecondsSupplier = currentSecondsSupplier;
Expand Down Expand Up @@ -104,21 +105,22 @@ public ScheduleRetrieverOutput getTimeToWait(final ScheduleRetrieverInput input)
*
* This method consumes the `scheduleType` and `scheduleData` fields.
*/
private ScheduleRetrieverOutput getTimeToWaitFromScheduleType(final ConnectionRead connectionRead, final UUID connectionId) throws IOException {
private ScheduleRetrieverOutput getTimeToWaitFromScheduleType(final ConnectionRead connectionRead, final UUID connectionId)
throws IOException, ApiException {
if (connectionRead.getScheduleType() == ConnectionScheduleType.MANUAL || connectionRead.getStatus() != ConnectionStatus.ACTIVE) {
// Manual syncs wait for their first run
return new ScheduleRetrieverOutput(Duration.ofDays(100 * 365));
}

final Optional<Job> previousJobOptional = jobPersistence.getLastReplicationJob(connectionId);
final JobOptionalRead previousJobOptional = jobsApi.getLastReplicationJob(new ConnectionIdRequestBody().connectionId(connectionId));

if (connectionRead.getScheduleType() == ConnectionScheduleType.BASIC) {
if (previousJobOptional.isEmpty()) {
if (previousJobOptional.getJob() == null) {
// Basic schedules don't wait for their first run.
return new ScheduleRetrieverOutput(Duration.ZERO);
}
final Job previousJob = previousJobOptional.get();
final long prevRunStart = previousJob.getStartedAtInSecond().orElse(previousJob.getCreatedAtInSecond());
final long prevRunStart = previousJobOptional.getJob().getStartedAt() != null ? previousJobOptional.getJob().getStartedAt()
: previousJobOptional.getJob().getCreatedAt();
final long nextRunStart = prevRunStart + getIntervalInSecond(connectionRead.getScheduleData().getBasicSchedule());
final Duration timeToWait = Duration.ofSeconds(
Math.max(0, nextRunStart - currentSecondsSupplier.get()));
Expand All @@ -135,9 +137,10 @@ private ScheduleRetrieverOutput getTimeToWaitFromScheduleType(final ConnectionRe
// us from multiple executions for the same scheduled time, since cron only has a 1-minute
// resolution.
final long earliestNextRun = Math.max(currentSecondsSupplier.get() * MS_PER_SECOND,
(previousJobOptional.isPresent()
? previousJobOptional.get().getStartedAtInSecond().orElse(previousJobOptional.get().getCreatedAtInSecond())
+ MIN_CRON_INTERVAL_SECONDS
(previousJobOptional.getJob() != null
? previousJobOptional.getJob().getStartedAt() != null ? previousJobOptional.getJob().getStartedAt() + MIN_CRON_INTERVAL_SECONDS
: previousJobOptional.getJob().getCreatedAt()
+ MIN_CRON_INTERVAL_SECONDS
: currentSecondsSupplier.get()) * MS_PER_SECOND);
final Date nextRunStart = cronExpression.getNextValidTimeAfter(new Date(earliestNextRun));
Duration timeToWait = Duration.ofSeconds(
Expand Down Expand Up @@ -185,21 +188,22 @@ private Duration addSchedulingNoiseForAllowListedWorkspace(Duration timeToWait,
*
* This method consumes the `schedule` field.
*/
private ScheduleRetrieverOutput getTimeToWaitFromLegacy(final ConnectionRead connectionRead, final UUID connectionId) throws IOException {
private ScheduleRetrieverOutput getTimeToWaitFromLegacy(final ConnectionRead connectionRead, final UUID connectionId)
throws IOException, ApiException {
if (connectionRead.getSchedule() == null || connectionRead.getStatus() != ConnectionStatus.ACTIVE) {
// Manual syncs wait for their first run
return new ScheduleRetrieverOutput(Duration.ofDays(100 * 365));
}

final Optional<Job> previousJobOptional = jobPersistence.getLastReplicationJob(connectionId);
final JobOptionalRead previousJobOptional = jobsApi.getLastReplicationJob(new ConnectionIdRequestBody().connectionId(connectionId));

if (previousJobOptional.isEmpty() && connectionRead.getSchedule() != null) {
if (previousJobOptional.getJob() == null && connectionRead.getSchedule() != null) {
// Non-manual syncs don't wait for their first run
return new ScheduleRetrieverOutput(Duration.ZERO);
}

final Job previousJob = previousJobOptional.get();
final long prevRunStart = previousJob.getStartedAtInSecond().orElse(previousJob.getCreatedAtInSecond());
final JobRead previousJob = previousJobOptional.getJob();
final long prevRunStart = previousJob.getStartedAt() != null ? previousJob.getStartedAt() : previousJob.getCreatedAt();

final long nextRunStart = prevRunStart + getIntervalInSecond(connectionRead.getSchedule());

Expand Down
Loading

0 comments on commit 6026465

Please sign in to comment.