From adb3a845cd114bbcc00cda32bb1419d46b57608d Mon Sep 17 00:00:00 2001 From: alovew Date: Thu, 19 Jan 2023 13:33:39 -0800 Subject: [PATCH 1/3] Remove JobPersistence from ConfigFetchActivityImpl --- airbyte-api/src/main/openapi/config.yaml | 30 + .../server/apis/JobsApiController.java | 8 + .../server/converters/JobConverter.java | 6 + .../server/handlers/JobHistoryHandler.java | 12 + .../activities/ConfigFetchActivityImpl.java | 40 +- .../api/generated-api-html/index.html | 629 ++++++++++-------- 6 files changed, 434 insertions(+), 291 deletions(-) diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index 79ec520942b2..2107c31066f7 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -1989,6 +1989,28 @@ paths: $ref: "#/components/responses/NotFoundResponse" "422": $ref: "#/components/responses/InvalidInputResponse" + /v1/jobs/get_last_replication_job: + post: + tags: + - jobs + operationId: getLastReplicationJob + requestBody: + content: + application/json: + schema: + $ref: "#/components/schemas/ConnectionIdRequestBody" + required: true + responses: + "200": + description: Successful operation + content: + application/json: + schema: + $ref: "#/components/schemas/JobOptionalRead" + "404": + $ref: "#/components/responses/NotFoundResponse" + "422": + $ref: "#/components/responses/InvalidInputResponse" /v1/jobs/get_light: post: tags: @@ -3907,6 +3929,9 @@ components: updatedAt: type: integer format: int64 + startedAt: + type: integer + format: int64 status: $ref: "#/components/schemas/JobStatus" resetConfig: @@ -4142,6 +4167,11 @@ components: properties: job: $ref: "#/components/schemas/JobRead" + JobOptionalRead: + type: object + properties: + job: + $ref: "#/components/schemas/JobRead" JobDebugInfoRead: type: object required: diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/JobsApiController.java b/airbyte-server/src/main/java/io/airbyte/server/apis/JobsApiController.java index f09d331038a2..9fadce045d05 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/JobsApiController.java +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/JobsApiController.java @@ -6,11 +6,13 @@ import io.airbyte.api.generated.JobsApi; import io.airbyte.api.model.generated.AttemptNormalizationStatusReadList; +import io.airbyte.api.model.generated.ConnectionIdRequestBody; import io.airbyte.api.model.generated.JobDebugInfoRead; import io.airbyte.api.model.generated.JobIdRequestBody; import io.airbyte.api.model.generated.JobInfoLightRead; import io.airbyte.api.model.generated.JobInfoRead; import io.airbyte.api.model.generated.JobListRequestBody; +import io.airbyte.api.model.generated.JobOptionalRead; import io.airbyte.api.model.generated.JobReadList; import io.airbyte.server.handlers.JobHistoryHandler; import io.airbyte.server.handlers.SchedulerHandler; @@ -63,6 +65,12 @@ public JobInfoLightRead getJobInfoLight(final JobIdRequestBody jobIdRequestBody) return ApiHelper.execute(() -> jobHistoryHandler.getJobInfoLight(jobIdRequestBody)); } + @Post("/get_last_replication_job") + @Override + public JobOptionalRead getLastReplicationJob(final ConnectionIdRequestBody connectionIdRequestBody) { + return ApiHelper.execute(() -> jobHistoryHandler.getLastReplicationJob(connectionIdRequestBody)); + } + @Post("/list") @Override public JobReadList listJobsFor(final JobListRequestBody jobListRequestBody) { diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java index 289d1458003e..9a3d479749e0 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java @@ -19,6 +19,7 @@ import io.airbyte.api.model.generated.JobDebugRead; import io.airbyte.api.model.generated.JobInfoLightRead; import io.airbyte.api.model.generated.JobInfoRead; +import io.airbyte.api.model.generated.JobOptionalRead; import io.airbyte.api.model.generated.JobRead; import io.airbyte.api.model.generated.JobStatus; import io.airbyte.api.model.generated.JobWithAttemptsRead; @@ -72,6 +73,10 @@ public JobInfoLightRead getJobInfoLightRead(final Job job) { return new JobInfoLightRead().job(getJobRead(job)); } + public JobOptionalRead getJobOptionalRead(final Job job) { + return new JobOptionalRead().job(getJobRead(job)); + } + public static JobDebugRead getDebugJobInfoRead(final JobInfoRead jobInfoRead, final SourceDefinitionRead sourceDefinitionRead, final DestinationDefinitionRead destinationDefinitionRead, @@ -103,6 +108,7 @@ public static JobRead getJobRead(final Job job) { .resetConfig(extractResetConfigIfReset(job).orElse(null)) .createdAt(job.getCreatedAtInSecond()) .updatedAt(job.getUpdatedAtInSecond()) + .startedAt(job.getStartedAtInSecond().isPresent() ? job.getStartedAtInSecond().get() : null) .status(Enums.convertTo(job.getStatus(), JobStatus.class)); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java index d12fdad8a77c..579e0b4672a4 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java @@ -10,6 +10,7 @@ import io.airbyte.api.model.generated.AttemptRead; import io.airbyte.api.model.generated.AttemptStats; import io.airbyte.api.model.generated.AttemptStreamStats; +import io.airbyte.api.model.generated.ConnectionIdRequestBody; import io.airbyte.api.model.generated.ConnectionRead; import io.airbyte.api.model.generated.DestinationDefinitionIdRequestBody; import io.airbyte.api.model.generated.DestinationDefinitionRead; @@ -21,6 +22,7 @@ import io.airbyte.api.model.generated.JobInfoLightRead; import io.airbyte.api.model.generated.JobInfoRead; import io.airbyte.api.model.generated.JobListRequestBody; +import io.airbyte.api.model.generated.JobOptionalRead; import io.airbyte.api.model.generated.JobRead; import io.airbyte.api.model.generated.JobReadList; import io.airbyte.api.model.generated.JobWithAttemptsRead; @@ -194,6 +196,16 @@ public JobInfoLightRead getJobInfoLight(final JobIdRequestBody jobIdRequestBody) return jobConverter.getJobInfoLightRead(job); } + public JobOptionalRead getLastReplicationJob(final ConnectionIdRequestBody connectionIdRequestBody) throws IOException { + Optional job = jobPersistence.getLastReplicationJob(connectionIdRequestBody.getConnectionId()); + if (job.isEmpty()) { + return new JobOptionalRead(); + } else { + return jobConverter.getJobOptionalRead(job.get()); + } + + } + public JobDebugInfoRead getJobDebugInfo(final JobIdRequestBody jobIdRequestBody) throws ConfigNotFoundException, IOException, JsonValidationException { final Job job = jobPersistence.getJob(jobIdRequestBody.getId()); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java index 74ee4d751b9b..6e935af243a2 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java @@ -10,6 +10,7 @@ import com.google.common.annotations.VisibleForTesting; import datadog.trace.api.Trace; import io.airbyte.api.client.generated.ConnectionApi; +import io.airbyte.api.client.generated.JobsApi; import io.airbyte.api.client.generated.WorkspaceApi; import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.client.model.generated.ConnectionIdRequestBody; @@ -20,11 +21,11 @@ import io.airbyte.api.client.model.generated.ConnectionScheduleDataCron; import io.airbyte.api.client.model.generated.ConnectionScheduleType; import io.airbyte.api.client.model.generated.ConnectionStatus; +import io.airbyte.api.client.model.generated.JobOptionalRead; +import io.airbyte.api.client.model.generated.JobRead; import io.airbyte.api.client.model.generated.WorkspaceRead; import io.airbyte.commons.temporal.exception.RetryableException; import io.airbyte.metrics.lib.ApmTraceUtils; -import io.airbyte.persistence.job.JobPersistence; -import io.airbyte.persistence.job.models.Job; import io.micronaut.context.annotation.Value; import jakarta.inject.Named; import jakarta.inject.Singleton; @@ -61,19 +62,19 @@ public class ConfigFetchActivityImpl implements ConfigFetchActivity { UUID.fromString("226edbc1-4a9c-4401-95a9-90435d667d9d")); private static final long SCHEDULING_NOISE_CONSTANT = 15; - private final JobPersistence jobPersistence; + private final JobsApi jobsApi; private final WorkspaceApi workspaceApi; private final Integer syncJobMaxAttempts; private final Supplier currentSecondsSupplier; private final ConnectionApi connectionApi; @VisibleForTesting - protected ConfigFetchActivityImpl(final JobPersistence jobPersistence, + protected ConfigFetchActivityImpl(final JobsApi jobsApi, final WorkspaceApi workspaceApi, @Value("${airbyte.worker.sync.max-attempts}") final Integer syncJobMaxAttempts, @Named("currentSecondsSupplier") final Supplier currentSecondsSupplier, final ConnectionApi connectionApi) { - this.jobPersistence = jobPersistence; + this.jobsApi = jobsApi; this.workspaceApi = workspaceApi; this.syncJobMaxAttempts = syncJobMaxAttempts; this.currentSecondsSupplier = currentSecondsSupplier; @@ -104,21 +105,22 @@ public ScheduleRetrieverOutput getTimeToWait(final ScheduleRetrieverInput input) * * This method consumes the `scheduleType` and `scheduleData` fields. */ - private ScheduleRetrieverOutput getTimeToWaitFromScheduleType(final ConnectionRead connectionRead, final UUID connectionId) throws IOException { + private ScheduleRetrieverOutput getTimeToWaitFromScheduleType(final ConnectionRead connectionRead, final UUID connectionId) + throws IOException, ApiException { if (connectionRead.getScheduleType() == ConnectionScheduleType.MANUAL || connectionRead.getStatus() != ConnectionStatus.ACTIVE) { // Manual syncs wait for their first run return new ScheduleRetrieverOutput(Duration.ofDays(100 * 365)); } - final Optional previousJobOptional = jobPersistence.getLastReplicationJob(connectionId); + final JobOptionalRead previousJobOptional = jobsApi.getLastReplicationJob(new ConnectionIdRequestBody().connectionId(connectionId)); if (connectionRead.getScheduleType() == ConnectionScheduleType.BASIC) { - if (previousJobOptional.isEmpty()) { + if (previousJobOptional.getJob() == null) { // Basic schedules don't wait for their first run. return new ScheduleRetrieverOutput(Duration.ZERO); } - final Job previousJob = previousJobOptional.get(); - final long prevRunStart = previousJob.getStartedAtInSecond().orElse(previousJob.getCreatedAtInSecond()); + final long prevRunStart = previousJobOptional.getJob().getStartedAt() != null ? previousJobOptional.getJob().getStartedAt() + : previousJobOptional.getJob().getCreatedAt(); final long nextRunStart = prevRunStart + getIntervalInSecond(connectionRead.getScheduleData().getBasicSchedule()); final Duration timeToWait = Duration.ofSeconds( Math.max(0, nextRunStart - currentSecondsSupplier.get())); @@ -135,9 +137,10 @@ private ScheduleRetrieverOutput getTimeToWaitFromScheduleType(final ConnectionRe // us from multiple executions for the same scheduled time, since cron only has a 1-minute // resolution. final long earliestNextRun = Math.max(currentSecondsSupplier.get() * MS_PER_SECOND, - (previousJobOptional.isPresent() - ? previousJobOptional.get().getStartedAtInSecond().orElse(previousJobOptional.get().getCreatedAtInSecond()) - + MIN_CRON_INTERVAL_SECONDS + (previousJobOptional.getJob() != null + ? previousJobOptional.getJob().getStartedAt() != null ? previousJobOptional.getJob().getStartedAt() + MIN_CRON_INTERVAL_SECONDS + : previousJobOptional.getJob().getCreatedAt() + + MIN_CRON_INTERVAL_SECONDS : currentSecondsSupplier.get()) * MS_PER_SECOND); final Date nextRunStart = cronExpression.getNextValidTimeAfter(new Date(earliestNextRun)); Duration timeToWait = Duration.ofSeconds( @@ -185,21 +188,22 @@ private Duration addSchedulingNoiseForAllowListedWorkspace(Duration timeToWait, * * This method consumes the `schedule` field. */ - private ScheduleRetrieverOutput getTimeToWaitFromLegacy(final ConnectionRead connectionRead, final UUID connectionId) throws IOException { + private ScheduleRetrieverOutput getTimeToWaitFromLegacy(final ConnectionRead connectionRead, final UUID connectionId) + throws IOException, ApiException { if (connectionRead.getSchedule() == null || connectionRead.getStatus() != ConnectionStatus.ACTIVE) { // Manual syncs wait for their first run return new ScheduleRetrieverOutput(Duration.ofDays(100 * 365)); } - final Optional previousJobOptional = jobPersistence.getLastReplicationJob(connectionId); + final JobOptionalRead previousJobOptional = jobsApi.getLastReplicationJob(new ConnectionIdRequestBody().connectionId(connectionId)); - if (previousJobOptional.isEmpty() && connectionRead.getSchedule() != null) { + if (previousJobOptional.getJob() == null && connectionRead.getSchedule() != null) { // Non-manual syncs don't wait for their first run return new ScheduleRetrieverOutput(Duration.ZERO); } - final Job previousJob = previousJobOptional.get(); - final long prevRunStart = previousJob.getStartedAtInSecond().orElse(previousJob.getCreatedAtInSecond()); + final JobRead previousJob = previousJobOptional.getJob(); + final long prevRunStart = previousJob.getStartedAt() != null ? previousJob.getStartedAt() : previousJob.getCreatedAt(); final long nextRunStart = prevRunStart + getIntervalInSecond(connectionRead.getSchedule()); diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index 7b3127f63cdb..7489b2e65eef 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -282,6 +282,7 @@

Jobs

  • post /v1/jobs/get_debug_info
  • post /v1/jobs/get
  • post /v1/jobs/get_light
  • +
  • post /v1/jobs/get_last_replication_job
  • post /v1/jobs/list
  • Logs

    @@ -1252,6 +1253,7 @@

    Example data

    "job" : { "createdAt" : 6, "configId" : "configId", + "startedAt" : 5, "id" : 0, "resetConfig" : { "streamsToReset" : [ { @@ -1267,12 +1269,12 @@

    Example data

    "attempts" : [ { "attempt" : { "totalStats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "failureSummary" : { "failures" : [ { @@ -1280,45 +1282,45 @@

    Example data

    "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 } ], "partialSuccess" : true }, - "createdAt" : 5, - "bytesSynced" : 9, - "endedAt" : 7, + "createdAt" : 2, + "bytesSynced" : 3, + "endedAt" : 9, "streamStats" : [ { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, - "recordsSynced" : 3, - "updatedAt" : 2 + "recordsSynced" : 2, + "updatedAt" : 7 }, "logs" : { "logLines" : [ "logLines", "logLines" ] @@ -1326,12 +1328,12 @@

    Example data

    }, { "attempt" : { "totalStats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "failureSummary" : { "failures" : [ { @@ -1339,45 +1341,45 @@

    Example data

    "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 } ], "partialSuccess" : true }, - "createdAt" : 5, - "bytesSynced" : 9, - "endedAt" : 7, + "createdAt" : 2, + "bytesSynced" : 3, + "endedAt" : 9, "streamStats" : [ { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, - "recordsSynced" : 3, - "updatedAt" : 2 + "recordsSynced" : 2, + "updatedAt" : 7 }, "logs" : { "logLines" : [ "logLines", "logLines" ] @@ -1650,6 +1652,7 @@

    Example data

    "job" : { "createdAt" : 6, "configId" : "configId", + "startedAt" : 5, "id" : 0, "resetConfig" : { "streamsToReset" : [ { @@ -1665,12 +1668,12 @@

    Example data

    "attempts" : [ { "attempt" : { "totalStats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "failureSummary" : { "failures" : [ { @@ -1678,45 +1681,45 @@

    Example data

    "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 } ], "partialSuccess" : true }, - "createdAt" : 5, - "bytesSynced" : 9, - "endedAt" : 7, + "createdAt" : 2, + "bytesSynced" : 3, + "endedAt" : 9, "streamStats" : [ { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, - "recordsSynced" : 3, - "updatedAt" : 2 + "recordsSynced" : 2, + "updatedAt" : 7 }, "logs" : { "logLines" : [ "logLines", "logLines" ] @@ -1724,12 +1727,12 @@

    Example data

    }, { "attempt" : { "totalStats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "failureSummary" : { "failures" : [ { @@ -1737,45 +1740,45 @@

    Example data

    "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 } ], "partialSuccess" : true }, - "createdAt" : 5, - "bytesSynced" : 9, - "endedAt" : 7, + "createdAt" : 2, + "bytesSynced" : 3, + "endedAt" : 9, "streamStats" : [ { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, - "recordsSynced" : 3, - "updatedAt" : 2 + "recordsSynced" : 2, + "updatedAt" : 7 }, "logs" : { "logLines" : [ "logLines", "logLines" ] @@ -1971,7 +1974,6 @@

    Return type

    Example data

    Content-Type: application/json
    {
    -  "didUpdateConfiguration" : true,
       "message" : "message",
       "jobInfo" : {
         "createdAt" : 0,
    @@ -2042,7 +2044,6 @@ 

    Return type

    Example data

    Content-Type: application/json
    {
    -  "didUpdateConfiguration" : true,
       "message" : "message",
       "jobInfo" : {
         "createdAt" : 0,
    @@ -4191,6 +4192,7 @@ 

    Example data

    "job" : { "createdAt" : 6, "configId" : "configId", + "startedAt" : 5, "id" : 0, "resetConfig" : { "streamsToReset" : [ { @@ -4206,12 +4208,12 @@

    Example data

    "attempts" : [ { "attempt" : { "totalStats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "failureSummary" : { "failures" : [ { @@ -4219,45 +4221,45 @@

    Example data

    "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 } ], "partialSuccess" : true }, - "createdAt" : 5, - "bytesSynced" : 9, - "endedAt" : 7, + "createdAt" : 2, + "bytesSynced" : 3, + "endedAt" : 9, "streamStats" : [ { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, - "recordsSynced" : 3, - "updatedAt" : 2 + "recordsSynced" : 2, + "updatedAt" : 7 }, "logs" : { "logLines" : [ "logLines", "logLines" ] @@ -4265,12 +4267,12 @@

    Example data

    }, { "attempt" : { "totalStats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "failureSummary" : { "failures" : [ { @@ -4278,45 +4280,45 @@

    Example data

    "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 } ], "partialSuccess" : true }, - "createdAt" : 5, - "bytesSynced" : 9, - "endedAt" : 7, + "createdAt" : 2, + "bytesSynced" : 3, + "endedAt" : 9, "streamStats" : [ { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, - "recordsSynced" : 3, - "updatedAt" : 2 + "recordsSynced" : 2, + "updatedAt" : 7 }, "logs" : { "logLines" : [ "logLines", "logLines" ] @@ -4526,12 +4528,12 @@

    Example data

    "attempts" : [ { "attempt" : { "totalStats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "failureSummary" : { "failures" : [ { @@ -4539,45 +4541,45 @@

    Example data

    "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 } ], "partialSuccess" : true }, - "createdAt" : 5, - "bytesSynced" : 9, - "endedAt" : 7, + "createdAt" : 2, + "bytesSynced" : 3, + "endedAt" : 9, "streamStats" : [ { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, - "recordsSynced" : 3, - "updatedAt" : 2 + "recordsSynced" : 2, + "updatedAt" : 7 }, "logs" : { "logLines" : [ "logLines", "logLines" ] @@ -4585,12 +4587,12 @@

    Example data

    }, { "attempt" : { "totalStats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "failureSummary" : { "failures" : [ { @@ -4598,45 +4600,45 @@

    Example data

    "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 } ], "partialSuccess" : true }, - "createdAt" : 5, - "bytesSynced" : 9, - "endedAt" : 7, + "createdAt" : 2, + "bytesSynced" : 3, + "endedAt" : 9, "streamStats" : [ { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, - "recordsSynced" : 3, - "updatedAt" : 2 + "recordsSynced" : 2, + "updatedAt" : 7 }, "logs" : { "logLines" : [ "logLines", "logLines" ] @@ -4702,6 +4704,7 @@

    Example data

    "job" : { "createdAt" : 6, "configId" : "configId", + "startedAt" : 5, "id" : 0, "resetConfig" : { "streamsToReset" : [ { @@ -4717,12 +4720,12 @@

    Example data

    "attempts" : [ { "attempt" : { "totalStats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "failureSummary" : { "failures" : [ { @@ -4730,45 +4733,45 @@

    Example data

    "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 } ], "partialSuccess" : true }, - "createdAt" : 5, - "bytesSynced" : 9, - "endedAt" : 7, + "createdAt" : 2, + "bytesSynced" : 3, + "endedAt" : 9, "streamStats" : [ { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, - "recordsSynced" : 3, - "updatedAt" : 2 + "recordsSynced" : 2, + "updatedAt" : 7 }, "logs" : { "logLines" : [ "logLines", "logLines" ] @@ -4776,12 +4779,12 @@

    Example data

    }, { "attempt" : { "totalStats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "failureSummary" : { "failures" : [ { @@ -4789,45 +4792,45 @@

    Example data

    "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 } ], "partialSuccess" : true }, - "createdAt" : 5, - "bytesSynced" : 9, - "endedAt" : 7, + "createdAt" : 2, + "bytesSynced" : 3, + "endedAt" : 9, "streamStats" : [ { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, - "recordsSynced" : 3, - "updatedAt" : 2 + "recordsSynced" : 2, + "updatedAt" : 7 }, "logs" : { "logLines" : [ "logLines", "logLines" ] @@ -4893,6 +4896,7 @@

    Example data

    "job" : { "createdAt" : 6, "configId" : "configId", + "startedAt" : 5, "id" : 0, "resetConfig" : { "streamsToReset" : [ { @@ -4926,6 +4930,79 @@

    422

    InvalidInputExceptionInfo
    +
    +
    + Up +
    post /v1/jobs/get_last_replication_job
    +
    (getLastReplicationJob)
    +
    + + +

    Consumes

    + This API call consumes the following media types via the Content-Type request header: +
      +
    • application/json
    • +
    + +

    Request body

    +
    +
    ConnectionIdRequestBody ConnectionIdRequestBody (required)
    + +
    Body Parameter
    + +
    + + + + +

    Return type

    + + + + +

    Example data

    +
    Content-Type: application/json
    +
    {
    +  "job" : {
    +    "createdAt" : 6,
    +    "configId" : "configId",
    +    "startedAt" : 5,
    +    "id" : 0,
    +    "resetConfig" : {
    +      "streamsToReset" : [ {
    +        "name" : "name",
    +        "namespace" : "namespace"
    +      }, {
    +        "name" : "name",
    +        "namespace" : "namespace"
    +      } ]
    +    },
    +    "updatedAt" : 1
    +  }
    +}
    + +

    Produces

    + This API call produces the following media types according to the Accept request header; + the media type will be conveyed by the Content-Type response header. +
      +
    • application/json
    • +
    + +

    Responses

    +

    200

    + Successful operation + JobOptionalRead +

    404

    + Object with given id was not found. + NotFoundKnownExceptionInfo +

    422

    + Input failed validation + InvalidInputExceptionInfo +
    +
    Up @@ -4967,6 +5044,7 @@

    Example data

    "job" : { "createdAt" : 6, "configId" : "configId", + "startedAt" : 5, "id" : 0, "resetConfig" : { "streamsToReset" : [ { @@ -4981,12 +5059,12 @@

    Example data

    }, "attempts" : [ { "totalStats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "failureSummary" : { "failures" : [ { @@ -4994,53 +5072,53 @@

    Example data

    "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 } ], "partialSuccess" : true }, - "createdAt" : 5, - "bytesSynced" : 9, - "endedAt" : 7, + "createdAt" : 2, + "bytesSynced" : 3, + "endedAt" : 9, "streamStats" : [ { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, - "recordsSynced" : 3, - "updatedAt" : 2 + "recordsSynced" : 2, + "updatedAt" : 7 }, { "totalStats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "failureSummary" : { "failures" : [ { @@ -5048,50 +5126,51 @@

    Example data

    "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 } ], "partialSuccess" : true }, - "createdAt" : 5, - "bytesSynced" : 9, - "endedAt" : 7, + "createdAt" : 2, + "bytesSynced" : 3, + "endedAt" : 9, "streamStats" : [ { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, - "recordsSynced" : 3, - "updatedAt" : 2 + "recordsSynced" : 2, + "updatedAt" : 7 } ] }, { "job" : { "createdAt" : 6, "configId" : "configId", + "startedAt" : 5, "id" : 0, "resetConfig" : { "streamsToReset" : [ { @@ -5106,12 +5185,12 @@

    Example data

    }, "attempts" : [ { "totalStats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "failureSummary" : { "failures" : [ { @@ -5119,53 +5198,53 @@

    Example data

    "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 } ], "partialSuccess" : true }, - "createdAt" : 5, - "bytesSynced" : 9, - "endedAt" : 7, + "createdAt" : 2, + "bytesSynced" : 3, + "endedAt" : 9, "streamStats" : [ { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, - "recordsSynced" : 3, - "updatedAt" : 2 + "recordsSynced" : 2, + "updatedAt" : 7 }, { "totalStats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "failureSummary" : { "failures" : [ { @@ -5173,45 +5252,45 @@

    Example data

    "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 6 + "timestamp" : 7 } ], "partialSuccess" : true }, - "createdAt" : 5, - "bytesSynced" : 9, - "endedAt" : 7, + "createdAt" : 2, + "bytesSynced" : 3, + "endedAt" : 9, "streamStats" : [ { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { - "stateMessagesEmitted" : 7, + "stateMessagesEmitted" : 1, "recordsCommitted" : 1, - "bytesEmitted" : 4, - "estimatedBytes" : 1, + "bytesEmitted" : 7, + "estimatedBytes" : 6, "estimatedRecords" : 1, - "recordsEmitted" : 2 + "recordsEmitted" : 4 }, "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, - "recordsSynced" : 3, - "updatedAt" : 2 + "recordsSynced" : 2, + "updatedAt" : 7 } ] } ] }
    @@ -5873,7 +5952,6 @@

    Return type

    Example data

    Content-Type: application/json
    {
    -  "didUpdateConfiguration" : true,
       "message" : "message",
       "jobInfo" : {
         "createdAt" : 0,
    @@ -5941,7 +6019,6 @@ 

    Return type

    Example data

    Content-Type: application/json
    {
    -  "didUpdateConfiguration" : true,
       "message" : "message",
       "jobInfo" : {
         "createdAt" : 0,
    @@ -6167,7 +6244,6 @@ 

    Return type

    Example data

    Content-Type: application/json
    {
    -  "didUpdateConfiguration" : true,
       "message" : "message",
       "jobInfo" : {
         "createdAt" : 0,
    @@ -6238,7 +6314,6 @@ 

    Return type

    Example data

    Content-Type: application/json
    {
    -  "didUpdateConfiguration" : true,
       "message" : "message",
       "jobInfo" : {
         "createdAt" : 0,
    @@ -10273,6 +10348,7 @@ 

    Table of Contents

  • JobInfoLightRead -
  • JobInfoRead -
  • JobListRequestBody -
  • +
  • JobOptionalRead -
  • JobRead -
  • JobReadList -
  • JobStatus -
  • @@ -10566,7 +10642,6 @@

    CheckConnectionRead - Enum:
    succeeded
    failed
    message (optional)
    -
    didUpdateConfiguration (optional)
    jobInfo
    @@ -11192,6 +11267,13 @@

    JobListRequestBody - pagination (optional) +
    +

    JobOptionalRead - Up

    +
    +
    +
    job (optional)
    +
    +

    JobRead - Up

    @@ -11201,6 +11283,7 @@

    JobRead - configId

    createdAt
    Long format: int64
    updatedAt
    Long format: int64
    +
    startedAt (optional)
    Long format: int64
    status
    resetConfig (optional)
    From 34bd14a811e37c9eae9d69581b5cb577c2e6dbed Mon Sep 17 00:00:00 2001 From: alovew Date: Thu, 19 Jan 2023 13:52:44 -0800 Subject: [PATCH 2/3] tests --- .../activities/ConfigFetchActivityTest.java | 74 +++++++++---------- 1 file changed, 37 insertions(+), 37 deletions(-) diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityTest.java index 320df0ac0af4..d372d91b3377 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityTest.java @@ -8,6 +8,7 @@ import static org.mockito.Mockito.when; import io.airbyte.api.client.generated.ConnectionApi; +import io.airbyte.api.client.generated.JobsApi; import io.airbyte.api.client.generated.WorkspaceApi; import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.client.model.generated.ConnectionRead; @@ -18,10 +19,10 @@ import io.airbyte.api.client.model.generated.ConnectionScheduleDataCron; import io.airbyte.api.client.model.generated.ConnectionScheduleType; import io.airbyte.api.client.model.generated.ConnectionStatus; +import io.airbyte.api.client.model.generated.JobOptionalRead; +import io.airbyte.api.client.model.generated.JobRead; import io.airbyte.api.client.model.generated.WorkspaceRead; import io.airbyte.config.persistence.ConfigNotFoundException; -import io.airbyte.persistence.job.JobPersistence; -import io.airbyte.persistence.job.models.Job; import io.airbyte.validation.json.JsonValidationException; import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity.ScheduleRetrieverInput; import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity.ScheduleRetrieverOutput; @@ -29,7 +30,6 @@ import java.time.Duration; import java.time.Instant; import java.util.Calendar; -import java.util.Optional; import java.util.TimeZone; import java.util.UUID; import org.assertj.core.api.Assertions; @@ -47,12 +47,12 @@ class ConfigFetchActivityTest { private static final Integer SYNC_JOB_MAX_ATTEMPTS = 3; @Mock - private JobPersistence mJobPersistence; + private JobsApi mJobsApi; @Mock private WorkspaceApi mWorkspaceApi; @Mock - private Job mJob; + private JobRead mJobRead; @Mock private ConnectionApi mConnectionApi; @@ -103,7 +103,7 @@ class ConfigFetchActivityTest { @BeforeEach void setup() { configFetchActivity = - new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, + new ConfigFetchActivityImpl(mJobsApi, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> Instant.now().getEpochSecond(), mConnectionApi); } @@ -113,8 +113,8 @@ class TimeToWaitTest { @Test @DisplayName("Test that the job gets scheduled if it is not manual and if it is the first run with legacy schedule schema") void testFirstJobNonManual() throws IOException, JsonValidationException, ConfigNotFoundException, ApiException { - when(mJobPersistence.getLastReplicationJob(connectionId)) - .thenReturn(Optional.empty()); + when(mJobsApi.getLastReplicationJob(any())) + .thenReturn(new JobOptionalRead()); when(mConnectionApi.getConnection(any())) .thenReturn(connectionReadWithLegacySchedule); @@ -173,13 +173,13 @@ void testDeleted() throws ApiException { @DisplayName("Test we will wait the required amount of time with legacy config") void testWait() throws IOException, JsonValidationException, ConfigNotFoundException, ApiException { configFetchActivity = - new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi); + new ConfigFetchActivityImpl(mJobsApi, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi); - when(mJob.getStartedAtInSecond()) - .thenReturn(Optional.of(60L)); + when(mJobRead.getStartedAt()) + .thenReturn(60L); - when(mJobPersistence.getLastReplicationJob(connectionId)) - .thenReturn(Optional.of(mJob)); + when(mJobsApi.getLastReplicationJob(any())) + .thenReturn(new JobOptionalRead().job(mJobRead)); when(mConnectionApi.getConnection(any())) .thenReturn(connectionReadWithLegacySchedule); @@ -196,13 +196,13 @@ void testWait() throws IOException, JsonValidationException, ConfigNotFoundExcep @DisplayName("Test we will not wait if we are late in the legacy schedule schema") void testNotWaitIfLate() throws IOException, ApiException { configFetchActivity = - new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 10, mConnectionApi); + new ConfigFetchActivityImpl(mJobsApi, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 10, mConnectionApi); - when(mJob.getStartedAtInSecond()) - .thenReturn(Optional.of(60L)); + when(mJobRead.getStartedAt()) + .thenReturn(60L); - when(mJobPersistence.getLastReplicationJob(connectionId)) - .thenReturn(Optional.of(mJob)); + when(mJobsApi.getLastReplicationJob(any())) + .thenReturn(new JobOptionalRead().job(mJobRead)); when(mConnectionApi.getConnection(any())) .thenReturn(connectionReadWithLegacySchedule); @@ -234,8 +234,8 @@ void testManualScheduleType() throws ApiException { @Test @DisplayName("Test that the job will be immediately scheduled if it is a BASIC_SCHEDULE type on the first run") void testBasicScheduleTypeFirstRun() throws IOException, ApiException { - when(mJobPersistence.getLastReplicationJob(connectionId)) - .thenReturn(Optional.empty()); + when(mJobsApi.getLastReplicationJob(any())) + .thenReturn(new JobOptionalRead()); when(mConnectionApi.getConnection(any())) .thenReturn(connectionReadWithBasicScheduleType); @@ -251,13 +251,13 @@ void testBasicScheduleTypeFirstRun() throws IOException, ApiException { @Test @DisplayName("Test that we will wait the required amount of time with a BASIC_SCHEDULE type on a subsequent run") void testBasicScheduleSubsequentRun() throws IOException, ApiException { - configFetchActivity = new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi); + configFetchActivity = new ConfigFetchActivityImpl(mJobsApi, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi); - when(mJob.getStartedAtInSecond()) - .thenReturn(Optional.of(60L)); + when(mJobRead.getStartedAt()) + .thenReturn(60L); - when(mJobPersistence.getLastReplicationJob(connectionId)) - .thenReturn(Optional.of(mJob)); + when(mJobsApi.getLastReplicationJob(any())) + .thenReturn(new JobOptionalRead().job(mJobRead)); when(mConnectionApi.getConnection(any())) .thenReturn(connectionReadWithBasicScheduleType); @@ -282,11 +282,11 @@ void testCronScheduleSubsequentRun() throws IOException, JsonValidationException when(mWorkspaceApi.getWorkspaceByConnectionId(any())).thenReturn(new WorkspaceRead().workspaceId(UUID.randomUUID())); configFetchActivity = - new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, + new ConfigFetchActivityImpl(mJobsApi, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> mockRightNow.getTimeInMillis() / 1000L, mConnectionApi); - when(mJobPersistence.getLastReplicationJob(connectionId)) - .thenReturn(Optional.of(mJob)); + when(mJobsApi.getLastReplicationJob(any())) + .thenReturn(new JobOptionalRead().job(mJobRead)); when(mConnectionApi.getConnection(any())) .thenReturn(connectionReadWithCronScheduleType); @@ -311,12 +311,12 @@ void testCronScheduleMinimumInterval() throws IOException, JsonValidationExcepti when(mWorkspaceApi.getWorkspaceByConnectionId(any())).thenReturn(new WorkspaceRead().workspaceId(UUID.randomUUID())); configFetchActivity = - new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, + new ConfigFetchActivityImpl(mJobsApi, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> mockRightNow.getTimeInMillis() / 1000L, mConnectionApi); - when(mJob.getStartedAtInSecond()).thenReturn(Optional.of(mockRightNow.getTimeInMillis() / 1000L)); - when(mJobPersistence.getLastReplicationJob(connectionId)) - .thenReturn(Optional.of(mJob)); + when(mJobRead.getStartedAt()).thenReturn(mockRightNow.getTimeInMillis() / 1000L); + when(mJobsApi.getLastReplicationJob(any())) + .thenReturn(new JobOptionalRead().job(mJobRead)); when(mConnectionApi.getConnection(any())) .thenReturn(connectionReadWithCronScheduleType); @@ -342,12 +342,12 @@ void testCronSchedulingNoise() throws IOException, JsonValidationException, Conf .thenReturn(new WorkspaceRead().workspaceId(UUID.fromString("226edbc1-4a9c-4401-95a9-90435d667d9d"))); configFetchActivity = - new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, + new ConfigFetchActivityImpl(mJobsApi, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> mockRightNow.getTimeInMillis() / 1000L, mConnectionApi); - when(mJob.getStartedAtInSecond()).thenReturn(Optional.of(mockRightNow.getTimeInMillis() / 1000L)); - when(mJobPersistence.getLastReplicationJob(connectionId)) - .thenReturn(Optional.of(mJob)); + when(mJobRead.getStartedAt()).thenReturn(mockRightNow.getTimeInMillis() / 1000L); + when(mJobsApi.getLastReplicationJob(any())) + .thenReturn(new JobOptionalRead().job(mJobRead)); when(mConnectionApi.getConnection(any())) .thenReturn(connectionReadWithCronScheduleType); @@ -368,7 +368,7 @@ class TestGetMaxAttempt { void testGetMaxAttempt() { final int maxAttempt = 15031990; configFetchActivity = - new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, maxAttempt, () -> Instant.now().getEpochSecond(), mConnectionApi); + new ConfigFetchActivityImpl(mJobsApi, mWorkspaceApi, maxAttempt, () -> Instant.now().getEpochSecond(), mConnectionApi); Assertions.assertThat(configFetchActivity.getMaxAttempt().getMaxAttempt()) .isEqualTo(maxAttempt); } From 8cc48de23390ba70583a052d5c11c608eb34e8f0 Mon Sep 17 00:00:00 2001 From: alovew Date: Fri, 20 Jan 2023 15:13:45 -0800 Subject: [PATCH 3/3] add jobsapi to api client bean factory --- .../io/airbyte/workers/config/ApiClientBeanFactory.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/config/ApiClientBeanFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/config/ApiClientBeanFactory.java index d7e5d80407c0..47f2c7cded30 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/config/ApiClientBeanFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/config/ApiClientBeanFactory.java @@ -11,6 +11,7 @@ import io.airbyte.api.client.AirbyteApiClient; import io.airbyte.api.client.generated.ConnectionApi; import io.airbyte.api.client.generated.DestinationApi; +import io.airbyte.api.client.generated.JobsApi; import io.airbyte.api.client.generated.SourceApi; import io.airbyte.api.client.generated.WorkspaceApi; import io.airbyte.api.client.invoker.generated.ApiClient; @@ -72,6 +73,11 @@ public SourceApi sourceApi(@Named("apiClient") final ApiClient apiClient) { return new SourceApi(apiClient); } + @Singleton + public JobsApi jobsApi(@Named("apiClient") final ApiClient apiClient) { + return new JobsApi(apiClient); + } + @Singleton public DestinationApi destinationApi(final ApiClient apiClient) { return new DestinationApi(apiClient);