Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove jobPersistence dependency #21625

Merged
merged 4 commits into from
Jan 21, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1989,6 +1989,28 @@ paths:
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/jobs/get_last_replication_job:
post:
tags:
- jobs
operationId: getLastReplicationJob
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/ConnectionIdRequestBody"
required: true
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/JobOptionalRead"
"404":
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/jobs/get_light:
post:
tags:
Expand Down Expand Up @@ -3907,6 +3929,9 @@ components:
updatedAt:
type: integer
format: int64
startedAt:
type: integer
format: int64
status:
$ref: "#/components/schemas/JobStatus"
resetConfig:
Expand Down Expand Up @@ -4142,6 +4167,11 @@ components:
properties:
job:
$ref: "#/components/schemas/JobRead"
JobOptionalRead:
type: object
properties:
job:
$ref: "#/components/schemas/JobRead"
JobDebugInfoRead:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@

import io.airbyte.api.generated.JobsApi;
import io.airbyte.api.model.generated.AttemptNormalizationStatusReadList;
import io.airbyte.api.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.model.generated.JobDebugInfoRead;
import io.airbyte.api.model.generated.JobIdRequestBody;
import io.airbyte.api.model.generated.JobInfoLightRead;
import io.airbyte.api.model.generated.JobInfoRead;
import io.airbyte.api.model.generated.JobListRequestBody;
import io.airbyte.api.model.generated.JobOptionalRead;
import io.airbyte.api.model.generated.JobReadList;
import io.airbyte.server.handlers.JobHistoryHandler;
import io.airbyte.server.handlers.SchedulerHandler;
Expand Down Expand Up @@ -63,6 +65,12 @@ public JobInfoLightRead getJobInfoLight(final JobIdRequestBody jobIdRequestBody)
return ApiHelper.execute(() -> jobHistoryHandler.getJobInfoLight(jobIdRequestBody));
}

@Post("/get_last_replication_job")
@Override
public JobOptionalRead getLastReplicationJob(final ConnectionIdRequestBody connectionIdRequestBody) {
return ApiHelper.execute(() -> jobHistoryHandler.getLastReplicationJob(connectionIdRequestBody));
}

@Post("/list")
@Override
public JobReadList listJobsFor(final JobListRequestBody jobListRequestBody) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.airbyte.api.model.generated.JobDebugRead;
import io.airbyte.api.model.generated.JobInfoLightRead;
import io.airbyte.api.model.generated.JobInfoRead;
import io.airbyte.api.model.generated.JobOptionalRead;
import io.airbyte.api.model.generated.JobRead;
import io.airbyte.api.model.generated.JobStatus;
import io.airbyte.api.model.generated.JobWithAttemptsRead;
Expand Down Expand Up @@ -72,6 +73,10 @@ public JobInfoLightRead getJobInfoLightRead(final Job job) {
return new JobInfoLightRead().job(getJobRead(job));
}

public JobOptionalRead getJobOptionalRead(final Job job) {
return new JobOptionalRead().job(getJobRead(job));
}

public static JobDebugRead getDebugJobInfoRead(final JobInfoRead jobInfoRead,
final SourceDefinitionRead sourceDefinitionRead,
final DestinationDefinitionRead destinationDefinitionRead,
Expand Down Expand Up @@ -103,6 +108,7 @@ public static JobRead getJobRead(final Job job) {
.resetConfig(extractResetConfigIfReset(job).orElse(null))
.createdAt(job.getCreatedAtInSecond())
.updatedAt(job.getUpdatedAtInSecond())
.startedAt(job.getStartedAtInSecond().isPresent() ? job.getStartedAtInSecond().get() : null)
.status(Enums.convertTo(job.getStatus(), JobStatus.class));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.airbyte.api.model.generated.AttemptRead;
import io.airbyte.api.model.generated.AttemptStats;
import io.airbyte.api.model.generated.AttemptStreamStats;
import io.airbyte.api.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.model.generated.ConnectionRead;
import io.airbyte.api.model.generated.DestinationDefinitionIdRequestBody;
import io.airbyte.api.model.generated.DestinationDefinitionRead;
Expand All @@ -21,6 +22,7 @@
import io.airbyte.api.model.generated.JobInfoLightRead;
import io.airbyte.api.model.generated.JobInfoRead;
import io.airbyte.api.model.generated.JobListRequestBody;
import io.airbyte.api.model.generated.JobOptionalRead;
import io.airbyte.api.model.generated.JobRead;
import io.airbyte.api.model.generated.JobReadList;
import io.airbyte.api.model.generated.JobWithAttemptsRead;
Expand Down Expand Up @@ -194,6 +196,16 @@ public JobInfoLightRead getJobInfoLight(final JobIdRequestBody jobIdRequestBody)
return jobConverter.getJobInfoLightRead(job);
}

public JobOptionalRead getLastReplicationJob(final ConnectionIdRequestBody connectionIdRequestBody) throws IOException {
Optional<Job> job = jobPersistence.getLastReplicationJob(connectionIdRequestBody.getConnectionId());
if (job.isEmpty()) {
return new JobOptionalRead();
} else {
return jobConverter.getJobOptionalRead(job.get());
}

}

public JobDebugInfoRead getJobDebugInfo(final JobIdRequestBody jobIdRequestBody)
throws ConfigNotFoundException, IOException, JsonValidationException {
final Job job = jobPersistence.getJob(jobIdRequestBody.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.google.common.annotations.VisibleForTesting;
import datadog.trace.api.Trace;
import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.api.client.generated.JobsApi;
import io.airbyte.api.client.generated.WorkspaceApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.ConnectionIdRequestBody;
Expand All @@ -20,11 +21,11 @@
import io.airbyte.api.client.model.generated.ConnectionScheduleDataCron;
import io.airbyte.api.client.model.generated.ConnectionScheduleType;
import io.airbyte.api.client.model.generated.ConnectionStatus;
import io.airbyte.api.client.model.generated.JobOptionalRead;
import io.airbyte.api.client.model.generated.JobRead;
import io.airbyte.api.client.model.generated.WorkspaceRead;
import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.models.Job;
import io.micronaut.context.annotation.Value;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
Expand Down Expand Up @@ -61,19 +62,19 @@ public class ConfigFetchActivityImpl implements ConfigFetchActivity {
UUID.fromString("226edbc1-4a9c-4401-95a9-90435d667d9d"));
private static final long SCHEDULING_NOISE_CONSTANT = 15;

private final JobPersistence jobPersistence;
private final JobsApi jobsApi;
private final WorkspaceApi workspaceApi;
private final Integer syncJobMaxAttempts;
private final Supplier<Long> currentSecondsSupplier;
private final ConnectionApi connectionApi;

@VisibleForTesting
protected ConfigFetchActivityImpl(final JobPersistence jobPersistence,
protected ConfigFetchActivityImpl(final JobsApi jobsApi,
final WorkspaceApi workspaceApi,
@Value("${airbyte.worker.sync.max-attempts}") final Integer syncJobMaxAttempts,
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier,
final ConnectionApi connectionApi) {
this.jobPersistence = jobPersistence;
this.jobsApi = jobsApi;
this.workspaceApi = workspaceApi;
this.syncJobMaxAttempts = syncJobMaxAttempts;
this.currentSecondsSupplier = currentSecondsSupplier;
Expand Down Expand Up @@ -104,21 +105,22 @@ public ScheduleRetrieverOutput getTimeToWait(final ScheduleRetrieverInput input)
*
* This method consumes the `scheduleType` and `scheduleData` fields.
*/
private ScheduleRetrieverOutput getTimeToWaitFromScheduleType(final ConnectionRead connectionRead, final UUID connectionId) throws IOException {
private ScheduleRetrieverOutput getTimeToWaitFromScheduleType(final ConnectionRead connectionRead, final UUID connectionId)
throws IOException, ApiException {
if (connectionRead.getScheduleType() == ConnectionScheduleType.MANUAL || connectionRead.getStatus() != ConnectionStatus.ACTIVE) {
// Manual syncs wait for their first run
return new ScheduleRetrieverOutput(Duration.ofDays(100 * 365));
}

final Optional<Job> previousJobOptional = jobPersistence.getLastReplicationJob(connectionId);
final JobOptionalRead previousJobOptional = jobsApi.getLastReplicationJob(new ConnectionIdRequestBody().connectionId(connectionId));

if (connectionRead.getScheduleType() == ConnectionScheduleType.BASIC) {
if (previousJobOptional.isEmpty()) {
if (previousJobOptional.getJob() == null) {
// Basic schedules don't wait for their first run.
return new ScheduleRetrieverOutput(Duration.ZERO);
}
final Job previousJob = previousJobOptional.get();
final long prevRunStart = previousJob.getStartedAtInSecond().orElse(previousJob.getCreatedAtInSecond());
final long prevRunStart = previousJobOptional.getJob().getStartedAt() != null ? previousJobOptional.getJob().getStartedAt()
: previousJobOptional.getJob().getCreatedAt();
final long nextRunStart = prevRunStart + getIntervalInSecond(connectionRead.getScheduleData().getBasicSchedule());
final Duration timeToWait = Duration.ofSeconds(
Math.max(0, nextRunStart - currentSecondsSupplier.get()));
Expand All @@ -135,9 +137,10 @@ private ScheduleRetrieverOutput getTimeToWaitFromScheduleType(final ConnectionRe
// us from multiple executions for the same scheduled time, since cron only has a 1-minute
// resolution.
final long earliestNextRun = Math.max(currentSecondsSupplier.get() * MS_PER_SECOND,
(previousJobOptional.isPresent()
? previousJobOptional.get().getStartedAtInSecond().orElse(previousJobOptional.get().getCreatedAtInSecond())
+ MIN_CRON_INTERVAL_SECONDS
(previousJobOptional.getJob() != null
? previousJobOptional.getJob().getStartedAt() != null ? previousJobOptional.getJob().getStartedAt() + MIN_CRON_INTERVAL_SECONDS
: previousJobOptional.getJob().getCreatedAt()
+ MIN_CRON_INTERVAL_SECONDS
: currentSecondsSupplier.get()) * MS_PER_SECOND);
final Date nextRunStart = cronExpression.getNextValidTimeAfter(new Date(earliestNextRun));
Duration timeToWait = Duration.ofSeconds(
Expand Down Expand Up @@ -185,21 +188,22 @@ private Duration addSchedulingNoiseForAllowListedWorkspace(Duration timeToWait,
*
* This method consumes the `schedule` field.
*/
private ScheduleRetrieverOutput getTimeToWaitFromLegacy(final ConnectionRead connectionRead, final UUID connectionId) throws IOException {
private ScheduleRetrieverOutput getTimeToWaitFromLegacy(final ConnectionRead connectionRead, final UUID connectionId)
throws IOException, ApiException {
if (connectionRead.getSchedule() == null || connectionRead.getStatus() != ConnectionStatus.ACTIVE) {
// Manual syncs wait for their first run
return new ScheduleRetrieverOutput(Duration.ofDays(100 * 365));
}

final Optional<Job> previousJobOptional = jobPersistence.getLastReplicationJob(connectionId);
final JobOptionalRead previousJobOptional = jobsApi.getLastReplicationJob(new ConnectionIdRequestBody().connectionId(connectionId));

if (previousJobOptional.isEmpty() && connectionRead.getSchedule() != null) {
if (previousJobOptional.getJob() == null && connectionRead.getSchedule() != null) {
// Non-manual syncs don't wait for their first run
return new ScheduleRetrieverOutput(Duration.ZERO);
}

final Job previousJob = previousJobOptional.get();
final long prevRunStart = previousJob.getStartedAtInSecond().orElse(previousJob.getCreatedAtInSecond());
final JobRead previousJob = previousJobOptional.getJob();
final long prevRunStart = previousJob.getStartedAt() != null ? previousJob.getStartedAt() : previousJob.getCreatedAt();

final long nextRunStart = prevRunStart + getIntervalInSecond(connectionRead.getSchedule());

Expand Down
Loading