From 54725284745c767e0ae20c7f33988a433991bea6 Mon Sep 17 00:00:00 2001 From: "Pedro S. Lopez" Date: Tue, 7 Feb 2023 15:42:52 -0800 Subject: [PATCH] Revert "Revert "fix: refresh actor configuration and state between sync attempts"" (#22281) * Revert "Revert "fix: refresh actor configuration and state between sync attempts (#21629)" (#22211)" This reverts commit 7978862f623fb0e71dec5dd24ed421a67c1521e7. * fmt --- airbyte-api/src/main/openapi/config.yaml | 45 ++++ .../io/airbyte/bootloader/BootloaderTest.java | 7 +- .../server/converters/ApiPojoConverters.java | 45 ++++ .../server/handlers/AttemptHandler.java | 15 ++ .../server/handlers/AttemptHandlerTest.java | 52 +++- .../handlers/JobHistoryHandlerTest.java | 2 +- .../commons/temporal/TemporalClient.java | 13 +- .../commons/temporal/TemporalClientTest.java | 22 +- .../workers/config/ApiClientBeanFactory.java | 12 + .../workers/helper/ProtocolConverters.java | 5 + .../workers/helper/StateConverter.java | 46 ++++ .../resources/types/AttemptSyncConfig.yaml | 22 ++ .../types/JobResetConnectionConfig.yaml | 8 - .../main/resources/types/JobSyncConfig.yaml | 13 - .../V0_40_28_001__AddAttemptSyncConfig.java | 39 +++ .../resources/jobs_database/Attempts.yaml | 2 + .../resources/jobs_database/schema_dump.txt | 1 + .../persistence/job/DefaultJobCreator.java | 20 +- .../job/DefaultJobPersistence.java | 16 ++ .../persistence/job/JobPersistence.java | 12 + .../persistence/job/models/Attempt.java | 13 +- .../airbyte/persistence/job/models/Job.java | 7 + .../persistence/job/tracker/JobTracker.java | 31 ++- .../job/DefaultJobCreatorTest.java | 30 +-- .../job/DefaultJobPersistenceTest.java | 24 ++ .../persistence/job/models/AttemptTest.java | 2 +- .../persistence/job/models/JobTest.java | 9 +- .../job/tracker/JobTrackerTest.java | 11 +- .../job/tracker/TrackingMetadataTest.java | 4 +- .../server/apis/AttemptApiController.java | 9 + airbyte-workers/build.gradle | 1 + .../config/ApplicationBeanFactory.java | 7 +- .../workers/run/TemporalWorkerRunFactory.java | 18 +- .../activities/GenerateInputActivityImpl.java | 88 +++++-- .../run/TemporalWorkerRunFactoryTest.java | 21 +- .../activities/GenerateInputActivityTest.java | 234 ++++++++++++++++++ ...obCreationAndStatusUpdateActivityTest.java | 12 +- .../api/generated-api-html/index.html | 126 ++++++++++ 38 files changed, 902 insertions(+), 142 deletions(-) create mode 100644 airbyte-config/config-models/src/main/resources/types/AttemptSyncConfig.yaml create mode 100644 airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/jobs/migrations/V0_40_28_001__AddAttemptSyncConfig.java create mode 100644 airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityTest.java diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index b88dc1bcfb5f..31c248e30927 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -2214,6 +2214,26 @@ paths: application/json: schema: $ref: "#/components/schemas/InternalOperationResult" + /v1/attempt/save_sync_config: + post: + tags: + - attempt + - internal + summary: For worker to save the AttemptSyncConfig for an attempt. + operationId: saveSyncConfig + requestBody: + content: + application/json: + schema: + $ref: "#/components/schemas/SaveAttemptSyncConfigRequestBody" + required: true + responses: + "200": + description: Successful Operation + content: + application/json: + schema: + $ref: "#/components/schemas/InternalOperationResult" components: securitySchemes: @@ -5026,6 +5046,31 @@ components: type: array items: $ref: "#/components/schemas/AttemptStreamStats" + AttemptSyncConfig: + type: object + required: + - sourceConfiguration + - destinationConfiguration + properties: + sourceConfiguration: + $ref: "#/components/schemas/SourceConfiguration" + destinationConfiguration: + $ref: "#/components/schemas/DestinationConfiguration" + state: + $ref: "#/components/schemas/ConnectionState" + SaveAttemptSyncConfigRequestBody: + type: object + required: + - jobId + - attemptNumber + - syncConfig + properties: + jobId: + $ref: "#/components/schemas/JobId" + attemptNumber: + $ref: "#/components/schemas/AttemptNumber" + syncConfig: + $ref: "#/components/schemas/AttemptSyncConfig" InternalOperationResult: type: object required: diff --git a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderTest.java b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderTest.java index 032ec01e66c4..c31d6acdb644 100644 --- a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderTest.java +++ b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderTest.java @@ -82,7 +82,8 @@ class BootloaderTest { // ⚠️ This line should change with every new migration to show that you meant to make a new // migration to the prod database - private static final String CURRENT_MIGRATION_VERSION = "0.40.28.001"; + private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.40.28.001"; + private static final String CURRENT_JOBS_MIGRATION_VERSION = "0.40.28.001"; @BeforeEach void setup() { @@ -147,10 +148,10 @@ void testBootloaderAppBlankDb() throws Exception { bootloader.load(); val jobsMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway); - assertEquals("0.40.26.001", jobsMigrator.getLatestMigration().getVersion().getVersion()); + assertEquals(CURRENT_JOBS_MIGRATION_VERSION, jobsMigrator.getLatestMigration().getVersion().getVersion()); val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway); - assertEquals(CURRENT_MIGRATION_VERSION, configsMigrator.getLatestMigration().getVersion().getVersion()); + assertEquals(CURRENT_CONFIGS_MIGRATION_VERSION, configsMigrator.getLatestMigration().getVersion().getVersion()); assertEquals(VERSION_0330_ALPHA, jobsPersistence.getVersion().get()); assertEquals(new Version(PROTOCOL_VERSION_123), jobsPersistence.getAirbyteProtocolVersionMin().get()); diff --git a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/converters/ApiPojoConverters.java b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/converters/ApiPojoConverters.java index ea6309030676..713aefd7d715 100644 --- a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/converters/ApiPojoConverters.java +++ b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/converters/ApiPojoConverters.java @@ -5,11 +5,14 @@ package io.airbyte.commons.server.converters; import io.airbyte.api.model.generated.ActorDefinitionResourceRequirements; +import io.airbyte.api.model.generated.AttemptSyncConfig; import io.airbyte.api.model.generated.ConnectionRead; import io.airbyte.api.model.generated.ConnectionSchedule; import io.airbyte.api.model.generated.ConnectionScheduleData; import io.airbyte.api.model.generated.ConnectionScheduleDataBasicSchedule; import io.airbyte.api.model.generated.ConnectionScheduleDataCron; +import io.airbyte.api.model.generated.ConnectionState; +import io.airbyte.api.model.generated.ConnectionStateType; import io.airbyte.api.model.generated.ConnectionStatus; import io.airbyte.api.model.generated.Geography; import io.airbyte.api.model.generated.JobType; @@ -22,6 +25,12 @@ import io.airbyte.config.BasicSchedule; import io.airbyte.config.Schedule; import io.airbyte.config.StandardSync; +import io.airbyte.config.State; +import io.airbyte.config.StateWrapper; +import io.airbyte.config.helpers.StateMessageHelper; +import io.airbyte.workers.helper.StateConverter; +import java.util.Optional; +import java.util.UUID; import java.util.stream.Collectors; public class ApiPojoConverters { @@ -42,6 +51,42 @@ public static io.airbyte.config.ActorDefinitionResourceRequirements actorDefReso .collect(Collectors.toList())); } + public static io.airbyte.config.AttemptSyncConfig attemptSyncConfigToInternal(final AttemptSyncConfig attemptSyncConfig) { + if (attemptSyncConfig == null) { + return null; + } + + final io.airbyte.config.AttemptSyncConfig internalAttemptSyncConfig = new io.airbyte.config.AttemptSyncConfig() + .withSourceConfiguration(attemptSyncConfig.getSourceConfiguration()) + .withDestinationConfiguration(attemptSyncConfig.getDestinationConfiguration()); + + final ConnectionState connectionState = attemptSyncConfig.getState(); + if (connectionState != null && connectionState.getStateType() != ConnectionStateType.NOT_SET) { + final StateWrapper stateWrapper = StateConverter.toInternal(attemptSyncConfig.getState()); + final io.airbyte.config.State state = StateMessageHelper.getState(stateWrapper); + internalAttemptSyncConfig.setState(state); + } + + return internalAttemptSyncConfig; + } + + public static io.airbyte.api.client.model.generated.AttemptSyncConfig attemptSyncConfigToClient(final io.airbyte.config.AttemptSyncConfig attemptSyncConfig, + final UUID connectionId, + final boolean useStreamCapableState) { + if (attemptSyncConfig == null) { + return null; + } + + final State state = attemptSyncConfig.getState(); + final Optional optStateWrapper = state != null ? StateMessageHelper.getTypedState( + state.getState(), useStreamCapableState) : Optional.empty(); + + return new io.airbyte.api.client.model.generated.AttemptSyncConfig() + .sourceConfiguration(attemptSyncConfig.getSourceConfiguration()) + .destinationConfiguration(attemptSyncConfig.getDestinationConfiguration()) + .state(StateConverter.toClient(connectionId, optStateWrapper.orElse(null))); + } + public static ActorDefinitionResourceRequirements actorDefResourceReqsToApi(final io.airbyte.config.ActorDefinitionResourceRequirements actorDefResourceReqs) { if (actorDefResourceReqs == null) { return null; diff --git a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/AttemptHandler.java b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/AttemptHandler.java index c7132665fe92..0302bc63b37b 100644 --- a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/AttemptHandler.java +++ b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/AttemptHandler.java @@ -5,8 +5,10 @@ package io.airbyte.commons.server.handlers; import io.airbyte.api.model.generated.InternalOperationResult; +import io.airbyte.api.model.generated.SaveAttemptSyncConfigRequestBody; import io.airbyte.api.model.generated.SaveStatsRequestBody; import io.airbyte.api.model.generated.SetWorkflowInAttemptRequestBody; +import io.airbyte.commons.server.converters.ApiPojoConverters; import io.airbyte.config.StreamSyncStats; import io.airbyte.config.SyncStats; import io.airbyte.persistence.job.JobPersistence; @@ -63,4 +65,17 @@ public InternalOperationResult saveStats(final SaveStatsRequestBody requestBody) return new InternalOperationResult().succeeded(true); } + public InternalOperationResult saveSyncConfig(final SaveAttemptSyncConfigRequestBody requestBody) { + try { + jobPersistence.writeAttemptSyncConfig( + requestBody.getJobId(), + requestBody.getAttemptNumber(), + ApiPojoConverters.attemptSyncConfigToInternal(requestBody.getSyncConfig())); + } catch (final IOException ioe) { + LOGGER.error("IOException when saving AttemptSyncConfig for attempt;", ioe); + return new InternalOperationResult().succeeded(false); + } + return new InternalOperationResult().succeeded(true); + } + } diff --git a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/AttemptHandlerTest.java b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/AttemptHandlerTest.java index eeb2c0ff48c7..a3415ab24fff 100644 --- a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/AttemptHandlerTest.java +++ b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/AttemptHandlerTest.java @@ -12,9 +12,18 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doThrow; +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.api.model.generated.AttemptSyncConfig; +import io.airbyte.api.model.generated.ConnectionState; +import io.airbyte.api.model.generated.ConnectionStateType; +import io.airbyte.api.model.generated.GlobalState; +import io.airbyte.api.model.generated.SaveAttemptSyncConfigRequestBody; import io.airbyte.api.model.generated.SetWorkflowInAttemptRequestBody; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.server.converters.ApiPojoConverters; import io.airbyte.persistence.job.JobPersistence; import java.io.IOException; +import java.util.Map; import java.util.UUID; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -26,6 +35,7 @@ class AttemptHandlerTest { JobPersistence jobPersistence; AttemptHandler handler; + private static final UUID CONNECTION_ID = UUID.randomUUID(); private static final long JOB_ID = 10002L; private static final int ATTEMPT_NUMBER = 1; @@ -39,14 +49,14 @@ public void init() { @Test void testInternalWorkerHandlerSetsTemporalWorkflowId() throws Exception { - String workflowId = UUID.randomUUID().toString(); + final String workflowId = UUID.randomUUID().toString(); final ArgumentCaptor attemptNumberCapture = ArgumentCaptor.forClass(Integer.class); final ArgumentCaptor jobIdCapture = ArgumentCaptor.forClass(Long.class); final ArgumentCaptor workflowIdCapture = ArgumentCaptor.forClass(String.class); final ArgumentCaptor queueCapture = ArgumentCaptor.forClass(String.class); - SetWorkflowInAttemptRequestBody requestBody = + final SetWorkflowInAttemptRequestBody requestBody = new SetWorkflowInAttemptRequestBody().attemptNumber(ATTEMPT_NUMBER).jobId(JOB_ID).workflowId(workflowId) .processingTaskQueue(PROCESSING_TASK_QUEUE); @@ -63,7 +73,7 @@ void testInternalWorkerHandlerSetsTemporalWorkflowId() throws Exception { @Test void testInternalWorkerHandlerSetsTemporalWorkflowIdThrows() throws Exception { - String workflowId = UUID.randomUUID().toString(); + final String workflowId = UUID.randomUUID().toString(); doThrow(IOException.class).when(jobPersistence).setAttemptTemporalWorkflowInfo(anyLong(), anyInt(), any(), any()); @@ -73,7 +83,7 @@ void testInternalWorkerHandlerSetsTemporalWorkflowIdThrows() throws Exception { final ArgumentCaptor workflowIdCapture = ArgumentCaptor.forClass(String.class); final ArgumentCaptor queueCapture = ArgumentCaptor.forClass(String.class); - SetWorkflowInAttemptRequestBody requestBody = + final SetWorkflowInAttemptRequestBody requestBody = new SetWorkflowInAttemptRequestBody().attemptNumber(ATTEMPT_NUMBER).jobId(JOB_ID).workflowId(workflowId) .processingTaskQueue(PROCESSING_TASK_QUEUE); @@ -88,4 +98,38 @@ void testInternalWorkerHandlerSetsTemporalWorkflowIdThrows() throws Exception { assertEquals(PROCESSING_TASK_QUEUE, queueCapture.getValue()); } + @Test + void testInternalHandlerSetsAttemptSyncConfig() throws Exception { + final ArgumentCaptor attemptNumberCapture = ArgumentCaptor.forClass(Integer.class); + final ArgumentCaptor jobIdCapture = ArgumentCaptor.forClass(Long.class); + final ArgumentCaptor attemptSyncConfigCapture = + ArgumentCaptor.forClass(io.airbyte.config.AttemptSyncConfig.class); + + final JsonNode sourceConfig = Jsons.jsonNode(Map.of("source_key", "source_val")); + final JsonNode destinationConfig = Jsons.jsonNode(Map.of("destination_key", "destination_val")); + final ConnectionState state = new ConnectionState() + .connectionId(CONNECTION_ID) + .stateType(ConnectionStateType.GLOBAL) + .streamState(null) + .globalState(new GlobalState().sharedState(Jsons.jsonNode(Map.of("state_key", "state_val")))); + + final AttemptSyncConfig attemptSyncConfig = new AttemptSyncConfig() + .destinationConfiguration(destinationConfig) + .sourceConfiguration(sourceConfig) + .state(state); + + final SaveAttemptSyncConfigRequestBody requestBody = + new SaveAttemptSyncConfigRequestBody().attemptNumber(ATTEMPT_NUMBER).jobId(JOB_ID).syncConfig(attemptSyncConfig); + + assertTrue(handler.saveSyncConfig(requestBody).getSucceeded()); + + Mockito.verify(jobPersistence).writeAttemptSyncConfig(jobIdCapture.capture(), attemptNumberCapture.capture(), attemptSyncConfigCapture.capture()); + + final io.airbyte.config.AttemptSyncConfig expectedAttemptSyncConfig = ApiPojoConverters.attemptSyncConfigToInternal(attemptSyncConfig); + + assertEquals(ATTEMPT_NUMBER, attemptNumberCapture.getValue()); + assertEquals(JOB_ID, jobIdCapture.getValue()); + assertEquals(expectedAttemptSyncConfig, attemptSyncConfigCapture.getValue()); + } + } diff --git a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/JobHistoryHandlerTest.java b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/JobHistoryHandlerTest.java index 3ab2582afdb4..81adf7f43ec0 100644 --- a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/JobHistoryHandlerTest.java +++ b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/JobHistoryHandlerTest.java @@ -157,7 +157,7 @@ private static AttemptRead toAttemptRead(final Attempt a) { } private static Attempt createAttempt(final long jobId, final long timestamps, final AttemptStatus status) { - return new Attempt(ATTEMPT_NUMBER, jobId, LOG_PATH, null, status, null, null, timestamps, timestamps, timestamps); + return new Attempt(ATTEMPT_NUMBER, jobId, LOG_PATH, null, null, status, null, null, timestamps, timestamps, timestamps); } @BeforeEach diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java index 129b7ae44c96..5fa35f1414e4 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java @@ -16,6 +16,7 @@ import io.airbyte.commons.temporal.scheduling.SpecWorkflow; import io.airbyte.commons.temporal.scheduling.SyncWorkflow; import io.airbyte.commons.temporal.scheduling.state.WorkflowState; +import io.airbyte.config.AttemptSyncConfig; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.JobCheckConnectionConfig; import io.airbyte.config.JobDiscoverCatalogConfig; @@ -374,7 +375,11 @@ public TemporalResponse submitDiscoverSchema(final UUID jobI () -> getWorkflowStubWithTaskQueue(DiscoverCatalogWorkflow.class, taskQueue).run(jobRunConfig, launcherConfig, input)); } - public TemporalResponse submitSync(final long jobId, final int attempt, final JobSyncConfig config, final UUID connectionId) { + public TemporalResponse submitSync(final long jobId, + final int attempt, + final JobSyncConfig config, + final AttemptSyncConfig attemptConfig, + final UUID connectionId) { final JobRunConfig jobRunConfig = TemporalWorkflowUtils.createJobRunConfig(jobId, attempt); final IntegrationLauncherConfig sourceLauncherConfig = new IntegrationLauncherConfig() @@ -395,11 +400,11 @@ public TemporalResponse submitSync(final long jobId, final i .withNamespaceDefinition(config.getNamespaceDefinition()) .withNamespaceFormat(config.getNamespaceFormat()) .withPrefix(config.getPrefix()) - .withSourceConfiguration(config.getSourceConfiguration()) - .withDestinationConfiguration(config.getDestinationConfiguration()) + .withSourceConfiguration(attemptConfig.getSourceConfiguration()) + .withDestinationConfiguration(attemptConfig.getDestinationConfiguration()) .withOperationSequence(config.getOperationSequence()) .withCatalog(config.getConfiguredAirbyteCatalog()) - .withState(config.getState()) + .withState(attemptConfig.getState()) .withResourceRequirements(config.getResourceRequirements()) .withSourceResourceRequirements(config.getSourceResourceRequirements()) .withDestinationResourceRequirements(config.getDestinationResourceRequirements()) diff --git a/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java b/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java index fad2eff33288..95bd9ad5418d 100644 --- a/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java +++ b/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java @@ -31,6 +31,7 @@ import io.airbyte.commons.temporal.scheduling.SpecWorkflow; import io.airbyte.commons.temporal.scheduling.SyncWorkflow; import io.airbyte.commons.temporal.scheduling.state.WorkflowState; +import io.airbyte.config.AttemptSyncConfig; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.FailureReason; import io.airbyte.config.JobCheckConnectionConfig; @@ -274,26 +275,27 @@ void testSubmitSync() { final JobSyncConfig syncConfig = new JobSyncConfig() .withSourceDockerImage(IMAGE_NAME1) .withDestinationDockerImage(IMAGE_NAME2) - .withSourceConfiguration(Jsons.emptyObject()) - .withDestinationConfiguration(Jsons.emptyObject()) .withOperationSequence(List.of()) .withConfiguredAirbyteCatalog(new ConfiguredAirbyteCatalog()); + final AttemptSyncConfig attemptSyncConfig = new AttemptSyncConfig() + .withSourceConfiguration(Jsons.emptyObject()) + .withDestinationConfiguration(Jsons.emptyObject()); final StandardSyncInput input = new StandardSyncInput() .withNamespaceDefinition(syncConfig.getNamespaceDefinition()) .withNamespaceFormat(syncConfig.getNamespaceFormat()) .withPrefix(syncConfig.getPrefix()) - .withSourceConfiguration(syncConfig.getSourceConfiguration()) - .withDestinationConfiguration(syncConfig.getDestinationConfiguration()) + .withSourceConfiguration(attemptSyncConfig.getSourceConfiguration()) + .withDestinationConfiguration(attemptSyncConfig.getDestinationConfiguration()) .withOperationSequence(syncConfig.getOperationSequence()) .withCatalog(syncConfig.getConfiguredAirbyteCatalog()) - .withState(syncConfig.getState()); + .withState(attemptSyncConfig.getState()); final IntegrationLauncherConfig destinationLauncherConfig = new IntegrationLauncherConfig() .withJobId(String.valueOf(JOB_ID)) .withAttemptId((long) ATTEMPT_ID) .withDockerImage(IMAGE_NAME2); - temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, CONNECTION_ID); + temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, attemptSyncConfig, CONNECTION_ID); discoverCatalogWorkflow.run(JOB_RUN_CONFIG, LAUNCHER_CONFIG, destinationLauncherConfig, input, CONNECTION_ID); verify(workflowClient).newWorkflowStub(SyncWorkflow.class, TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.SYNC)); } @@ -343,15 +345,17 @@ void testforceCancelConnection() { doReturn(true).when(temporalClient).isWorkflowReachable(any(UUID.class)); when(workflowClient.newWorkflowStub(any(Class.class), anyString())).thenReturn(mConnectionManagerWorkflow); + final AttemptSyncConfig attemptSyncConfig = new AttemptSyncConfig() + .withSourceConfiguration(Jsons.emptyObject()) + .withDestinationConfiguration(Jsons.emptyObject()); + final JobSyncConfig syncConfig = new JobSyncConfig() .withSourceDockerImage(IMAGE_NAME1) .withDestinationDockerImage(IMAGE_NAME2) - .withSourceConfiguration(Jsons.emptyObject()) - .withDestinationConfiguration(Jsons.emptyObject()) .withOperationSequence(List.of()) .withConfiguredAirbyteCatalog(new ConfiguredAirbyteCatalog()); - temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, CONNECTION_ID); + temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, attemptSyncConfig, CONNECTION_ID); temporalClient.forceDeleteWorkflow(CONNECTION_ID); verify(connectionManagerUtils).deleteWorkflowIfItExist(workflowClient, CONNECTION_ID); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/config/ApiClientBeanFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/config/ApiClientBeanFactory.java index 0577d5fd463b..64b9647287b7 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/config/ApiClientBeanFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/config/ApiClientBeanFactory.java @@ -9,10 +9,12 @@ import com.auth0.jwt.algorithms.Algorithm; import com.google.auth.oauth2.ServiceAccountCredentials; import io.airbyte.api.client.AirbyteApiClient; +import io.airbyte.api.client.generated.AttemptApi; import io.airbyte.api.client.generated.ConnectionApi; import io.airbyte.api.client.generated.DestinationApi; import io.airbyte.api.client.generated.JobsApi; import io.airbyte.api.client.generated.SourceApi; +import io.airbyte.api.client.generated.StateApi; import io.airbyte.api.client.generated.WorkspaceApi; import io.airbyte.api.client.invoker.generated.ApiClient; import io.airbyte.commons.temporal.config.WorkerMode; @@ -96,6 +98,16 @@ public WorkspaceApi workspaceApi(final ApiClient apiClient) { return new WorkspaceApi(apiClient); } + @Singleton + public AttemptApi attemptApi(final ApiClient apiClient) { + return new AttemptApi(apiClient); + } + + @Singleton + public StateApi stateApi(final ApiClient apiClient) { + return new StateApi(apiClient); + } + @Singleton public HttpClient httpClient() { return HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build(); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/ProtocolConverters.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/ProtocolConverters.java index b06eb17ee0e9..313263173825 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/ProtocolConverters.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/ProtocolConverters.java @@ -26,4 +26,9 @@ public static io.airbyte.protocol.models.StreamDescriptor streamDescriptorToProt .withNamespace(apiStreamDescriptor.getNamespace()); } + public static io.airbyte.protocol.models.StreamDescriptor clientStreamDescriptorToProtocol(final io.airbyte.api.client.model.generated.StreamDescriptor clientStreamDescriptor) { + return new io.airbyte.protocol.models.StreamDescriptor().withName(clientStreamDescriptor.getName()) + .withNamespace(clientStreamDescriptor.getNamespace()); + } + } diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/StateConverter.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/StateConverter.java index 73fe752537ce..0765e242d93f 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/StateConverter.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/StateConverter.java @@ -13,6 +13,7 @@ import io.airbyte.config.StateWrapper; import io.airbyte.protocol.models.AirbyteGlobalState; import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType; import io.airbyte.protocol.models.AirbyteStreamState; import java.util.List; import java.util.Optional; @@ -68,6 +69,15 @@ public static StateWrapper toInternal(final @Nullable ConnectionState apiConnect } + public static StateWrapper clientToInternal(final @Nullable io.airbyte.api.client.model.generated.ConnectionState clientConnectionState) { + return new StateWrapper() + .withStateType(clientConnectionState != null ? convertClientStateTypeToInternal(clientConnectionState.getStateType()) : null) + .withGlobal(clientGlobalStateToInternal(clientConnectionState).orElse(null)) + .withLegacyState(clientConnectionState != null ? clientConnectionState.getState() : null) + .withStateMessages(clientStreamStateToInternal(clientConnectionState).orElse(null)); + + } + public static StateType convertClientStateTypeToInternal(final @Nullable io.airbyte.api.client.model.generated.ConnectionStateType connectionStateType) { if (connectionStateType == null || connectionStateType.equals(io.airbyte.api.client.model.generated.ConnectionStateType.NOT_SET)) { return null; @@ -191,6 +201,23 @@ private static Optional globalStateToInternal(final @Nullab } } + private static Optional clientGlobalStateToInternal(final @Nullable io.airbyte.api.client.model.generated.ConnectionState connectionState) { + if (connectionState != null + && connectionState.getStateType() == io.airbyte.api.client.model.generated.ConnectionStateType.GLOBAL + && connectionState.getGlobalState() != null) { + return Optional.of(new AirbyteStateMessage() + .withType(AirbyteStateType.GLOBAL) + .withGlobal(new AirbyteGlobalState() + .withSharedState(connectionState.getGlobalState().getSharedState()) + .withStreamStates(connectionState.getGlobalState().getStreamStates() + .stream() + .map(StateConverter::clientStreamStateStructToInternal) + .toList()))); + } else { + return Optional.empty(); + } + } + /** * If wrapper is of type stream state, returns API representation of stream state. Otherwise, empty * optional. @@ -251,6 +278,19 @@ private static Optional> streamStateToInternal(final @ } } + private static Optional> clientStreamStateToInternal(final @Nullable io.airbyte.api.client.model.generated.ConnectionState connectionState) { + if (connectionState != null && connectionState.getStateType() == io.airbyte.api.client.model.generated.ConnectionStateType.STREAM + && connectionState.getStreamState() != null) { + return Optional.ofNullable(connectionState.getStreamState() + .stream() + .map(StateConverter::clientStreamStateStructToInternal) + .map(s -> new AirbyteStateMessage().withType(AirbyteStateType.STREAM).withStream(s)) + .toList()); + } else { + return Optional.empty(); + } + } + private static StreamState streamStateStructToApi(final AirbyteStreamState streamState) { return new StreamState() .streamDescriptor(ProtocolConverters.streamDescriptorToApi(streamState.getStreamDescriptor())) @@ -269,4 +309,10 @@ private static AirbyteStreamState streamStateStructToInternal(final StreamState .withStreamState(streamState.getStreamState()); } + private static AirbyteStreamState clientStreamStateStructToInternal(final io.airbyte.api.client.model.generated.StreamState streamState) { + return new AirbyteStreamState() + .withStreamDescriptor(ProtocolConverters.clientStreamDescriptorToProtocol(streamState.getStreamDescriptor())) + .withStreamState(streamState.getStreamState()); + } + } diff --git a/airbyte-config/config-models/src/main/resources/types/AttemptSyncConfig.yaml b/airbyte-config/config-models/src/main/resources/types/AttemptSyncConfig.yaml new file mode 100644 index 000000000000..7b28faea7bbf --- /dev/null +++ b/airbyte-config/config-models/src/main/resources/types/AttemptSyncConfig.yaml @@ -0,0 +1,22 @@ +--- +"$schema": http://json-schema.org/draft-07/schema# +"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/AttemptSyncConfig.yaml +title: AttemptSyncConfig +description: attempt sync config +type: object +additionalProperties: false +required: + - sourceConfiguration + - destinationConfiguration +properties: + sourceConfiguration: + description: Integration specific blob. Must be a valid JSON string. + type: object + existingJavaType: com.fasterxml.jackson.databind.JsonNode + destinationConfiguration: + description: Integration specific blob. Must be a valid JSON string. + type: object + existingJavaType: com.fasterxml.jackson.databind.JsonNode + state: + description: optional state of the previous run. this object is defined per integration. + "$ref": State.yaml diff --git a/airbyte-config/config-models/src/main/resources/types/JobResetConnectionConfig.yaml b/airbyte-config/config-models/src/main/resources/types/JobResetConnectionConfig.yaml index 462a8ab1229d..73dcd898f93c 100644 --- a/airbyte-config/config-models/src/main/resources/types/JobResetConnectionConfig.yaml +++ b/airbyte-config/config-models/src/main/resources/types/JobResetConnectionConfig.yaml @@ -6,7 +6,6 @@ description: job reset connection config type: object additionalProperties: false required: - - destinationConfiguration - configuredAirbyteCatalog - destinationDockerImage properties: @@ -19,10 +18,6 @@ properties: prefix: description: Prefix that will be prepended to the name of each stream when it is written to the destination. type: string - destinationConfiguration: - description: Integration specific blob. Must be a valid JSON string. - type: object - existingJavaType: com.fasterxml.jackson.databind.JsonNode configuredAirbyteCatalog: description: the configured airbyte catalog type: object @@ -49,9 +44,6 @@ properties: existingJavaType: io.airbyte.config.ResourceRequirements resetSourceConfiguration: "$ref": ResetSourceConfiguration.yaml - state: - description: optional current state of the connection - "$ref": State.yaml isSourceCustomConnector: description: determine if the running image of the source is a custom connector. type: boolean diff --git a/airbyte-config/config-models/src/main/resources/types/JobSyncConfig.yaml b/airbyte-config/config-models/src/main/resources/types/JobSyncConfig.yaml index 652996a9b5c0..7fe334ef5d0a 100644 --- a/airbyte-config/config-models/src/main/resources/types/JobSyncConfig.yaml +++ b/airbyte-config/config-models/src/main/resources/types/JobSyncConfig.yaml @@ -6,8 +6,6 @@ description: job sync config type: object additionalProperties: false required: - - sourceConfiguration - - destinationConfiguration - configuredAirbyteCatalog - sourceDockerImage - destinationDockerImage @@ -21,14 +19,6 @@ properties: prefix: description: Prefix that will be prepended to the name of each stream when it is written to the destination. type: string - sourceConfiguration: - description: Integration specific blob. Must be a valid JSON string. - type: object - existingJavaType: com.fasterxml.jackson.databind.JsonNode - destinationConfiguration: - description: Integration specific blob. Must be a valid JSON string. - type: object - existingJavaType: com.fasterxml.jackson.databind.JsonNode configuredAirbyteCatalog: description: the configured airbyte catalog type: object @@ -64,9 +54,6 @@ properties: description: The webhook operation configs belonging to this workspace. Must conform to WebhookOperationConfigs.yaml. type: object existingJavaType: com.fasterxml.jackson.databind.JsonNode - state: - description: optional state of the previous run. this object is defined per integration. - "$ref": State.yaml resourceRequirements: type: object description: optional resource requirements to run sync workers - this is used for containers other than the source/dest containers diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/jobs/migrations/V0_40_28_001__AddAttemptSyncConfig.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/jobs/migrations/V0_40_28_001__AddAttemptSyncConfig.java new file mode 100644 index 000000000000..076e879368b4 --- /dev/null +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/jobs/migrations/V0_40_28_001__AddAttemptSyncConfig.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.instance.jobs.migrations; + +import org.flywaydb.core.api.migration.BaseJavaMigration; +import org.flywaydb.core.api.migration.Context; +import org.jooq.DSLContext; +import org.jooq.impl.DSL; +import org.jooq.impl.SQLDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class V0_40_28_001__AddAttemptSyncConfig extends BaseJavaMigration { + + private static final Logger LOGGER = LoggerFactory.getLogger(V0_40_28_001__AddAttemptSyncConfig.class); + + @Override + public void migrate(final Context context) throws Exception { + LOGGER.info("Running migration: {}", this.getClass().getSimpleName()); + + // Warning: please do not use any jOOQ generated code to write a migration. + // As database schema changes, the generated jOOQ code can be deprecated. So + // old migration may not compile if there is any generated code. + try (final DSLContext ctx = DSL.using(context.getConnection())) { + addAttemptSyncConfigToAttempts(ctx); + } + } + + private static void addAttemptSyncConfigToAttempts(final DSLContext ctx) { + ctx.alterTable("attempts") + .addColumnIfNotExists(DSL.field( + "attempt_sync_config", + SQLDataType.JSONB.nullable(true))) + .execute(); + } + +} diff --git a/airbyte-db/db-lib/src/main/resources/jobs_database/Attempts.yaml b/airbyte-db/db-lib/src/main/resources/jobs_database/Attempts.yaml index 758f53c322f6..5efdc4ef1097 100644 --- a/airbyte-db/db-lib/src/main/resources/jobs_database/Attempts.yaml +++ b/airbyte-db/db-lib/src/main/resources/jobs_database/Attempts.yaml @@ -19,6 +19,8 @@ properties: type: number attempt_number: type: number + attempt_sync_config: + type: ["null", object] log_path: type: string output: diff --git a/airbyte-db/db-lib/src/main/resources/jobs_database/schema_dump.txt b/airbyte-db/db-lib/src/main/resources/jobs_database/schema_dump.txt index 100af44d0893..6e959d581f7a 100644 --- a/airbyte-db/db-lib/src/main/resources/jobs_database/schema_dump.txt +++ b/airbyte-db/db-lib/src/main/resources/jobs_database/schema_dump.txt @@ -35,6 +35,7 @@ create table "public"."attempts"( "temporal_workflow_id" varchar(256) null, "failure_summary" jsonb null, "processing_task_queue" varchar(255) null, + "attempt_sync_config" jsonb null, constraint "attempts_pkey" primary key ("id") ); diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobCreator.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobCreator.java index e09056525a28..4b02f2c51884 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobCreator.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobCreator.java @@ -19,9 +19,6 @@ import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSyncOperation; -import io.airbyte.config.State; -import io.airbyte.config.helpers.StateMessageHelper; -import io.airbyte.config.persistence.StatePersistence; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.DestinationSyncMode; @@ -39,14 +36,11 @@ public class DefaultJobCreator implements JobCreator { private final JobPersistence jobPersistence; private final ResourceRequirements workerResourceRequirements; - private final StatePersistence statePersistence; public DefaultJobCreator(final JobPersistence jobPersistence, - final ResourceRequirements workerResourceRequirements, - final StatePersistence statePersistence) { + final ResourceRequirements workerResourceRequirements) { this.jobPersistence = jobPersistence; this.workerResourceRequirements = workerResourceRequirements; - this.statePersistence = statePersistence; } @Override @@ -85,14 +79,11 @@ public Optional createSyncJob(final SourceConnection source, .withPrefix(standardSync.getPrefix()) .withSourceDockerImage(sourceDockerImageName) .withSourceProtocolVersion(sourceProtocolVersion) - .withSourceConfiguration(source.getConfiguration()) .withDestinationDockerImage(destinationDockerImageName) .withDestinationProtocolVersion(destinationProtocolVersion) - .withDestinationConfiguration(destination.getConfiguration()) .withOperationSequence(standardSyncOperations) .withWebhookOperationConfigs(webhookOperationConfigs) .withConfiguredAirbyteCatalog(standardSync.getCatalog()) - .withState(null) .withResourceRequirements(mergedOrchestratorResourceReq) .withSourceResourceRequirements(mergedSrcResourceReq) .withDestinationResourceRequirements(mergedDstResourceReq) @@ -100,8 +91,6 @@ public Optional createSyncJob(final SourceConnection source, .withIsDestinationCustomConnector(destinationDefinition.getCustom()) .withWorkspaceId(workspaceId); - getCurrentConnectionState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState); - final JobConfig jobConfig = new JobConfig() .withConfigType(ConfigType.SYNC) .withSync(jobSyncConfig); @@ -141,7 +130,6 @@ public Optional createResetConnectionJob(final DestinationConnection desti .withPrefix(standardSync.getPrefix()) .withDestinationDockerImage(destinationDockerImage) .withDestinationProtocolVersion(destinationProtocolVersion) - .withDestinationConfiguration(destination.getConfiguration()) .withOperationSequence(standardSyncOperations) .withConfiguredAirbyteCatalog(configuredAirbyteCatalog) .withResourceRequirements(ResourceRequirementsUtils.getResourceRequirements( @@ -151,16 +139,10 @@ public Optional createResetConnectionJob(final DestinationConnection desti .withIsSourceCustomConnector(false) .withIsDestinationCustomConnector(isDestinationCustomConnector); - getCurrentConnectionState(standardSync.getConnectionId()).ifPresent(resetConnectionConfig::withState); - final JobConfig jobConfig = new JobConfig() .withConfigType(ConfigType.RESET_CONNECTION) .withResetConnection(resetConnectionConfig); return jobPersistence.enqueueJob(standardSync.getConnectionId().toString(), jobConfig); } - private Optional getCurrentConnectionState(final UUID connectionId) throws IOException { - return statePersistence.getCurrentState(connectionId).map(StateMessageHelper::getState); - } - } diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java index 3dbc9398e796..b8336b0f41a1 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java @@ -31,6 +31,7 @@ import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.commons.version.Version; import io.airbyte.config.AttemptFailureSummary; +import io.airbyte.config.AttemptSyncConfig; import io.airbyte.config.FailureReason; import io.airbyte.config.JobConfig; import io.airbyte.config.JobConfig.ConfigType; @@ -156,6 +157,7 @@ private static String jobSelectAndJoin(final String jobsSubquery) { + "jobs.created_at AS job_created_at,\n" + "jobs.updated_at AS job_updated_at,\n" + "attempts.attempt_number AS attempt_number,\n" + + "attempts.attempt_sync_config AS attempt_sync_config,\n" + "attempts.log_path AS log_path,\n" + "attempts.output AS attempt_output,\n" + "attempts.status AS attempt_status,\n" @@ -490,6 +492,18 @@ private static void saveToStreamStatsTable(final OffsetDateTime now, }); } + @Override + public void writeAttemptSyncConfig(final long jobId, final int attemptNumber, final AttemptSyncConfig attemptSyncConfig) throws IOException { + final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC); + + jobDatabase.transaction( + ctx -> ctx.update(ATTEMPTS) + .set(ATTEMPTS.ATTEMPT_SYNC_CONFIG, JSONB.valueOf(Jsons.serialize(attemptSyncConfig))) + .set(ATTEMPTS.UPDATED_AT, now) + .where(ATTEMPTS.JOB_ID.eq(jobId), ATTEMPTS.ATTEMPT_NUMBER.eq(attemptNumber)) + .execute()); + } + @Override public void writeAttemptFailureSummary(final long jobId, final int attemptNumber, final AttemptFailureSummary failureSummary) throws IOException { final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC); @@ -948,6 +962,8 @@ private static Attempt getAttemptFromRecord(final Record record) { record.get(ATTEMPT_NUMBER, int.class), record.get(JOB_ID, Long.class), Path.of(record.get("log_path", String.class)), + record.get("attempt_sync_config", String.class) == null ? null + : Jsons.deserialize(record.get("attempt_sync_config", String.class), AttemptSyncConfig.class), attemptOutputString == null ? null : parseJobOutputFromString(attemptOutputString), Enums.toEnum(record.get("attempt_status", String.class), AttemptStatus.class).orElseThrow(), record.get("processing_task_queue", String.class), diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java index da7b3a98474e..98db2975077d 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java @@ -8,6 +8,7 @@ import io.airbyte.commons.version.AirbyteProtocolVersionRange; import io.airbyte.commons.version.Version; import io.airbyte.config.AttemptFailureSummary; +import io.airbyte.config.AttemptSyncConfig; import io.airbyte.config.JobConfig; import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobOutput; @@ -191,6 +192,17 @@ void writeStats(long jobId, */ void writeAttemptFailureSummary(long jobId, int attemptNumber, AttemptFailureSummary failureSummary) throws IOException; + /** + * Writes the attempt-specific configuration used to build the sync input during the attempt. + * + * @param jobId job id + * @param attemptNumber attempt number + * @param attemptSyncConfig attempt-specific configuration used to build the sync input for this + * attempt + * @throws IOException exception due to interaction with persistence + */ + void writeAttemptSyncConfig(long jobId, int attemptNumber, AttemptSyncConfig attemptSyncConfig) throws IOException; + /** * @param configTypes - the type of config, e.g. sync * @param connectionId - ID of the connection for which the job count should be retrieved diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Attempt.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Attempt.java index 5b585e42e39f..e0984bee6a77 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Attempt.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Attempt.java @@ -5,6 +5,7 @@ package io.airbyte.persistence.job.models; import io.airbyte.config.AttemptFailureSummary; +import io.airbyte.config.AttemptSyncConfig; import io.airbyte.config.JobOutput; import java.nio.file.Path; import java.util.Objects; @@ -19,6 +20,7 @@ public class Attempt { private final AttemptStatus status; private final String processingTaskQueue; private final AttemptFailureSummary failureSummary; + private final AttemptSyncConfig syncConfig; private final Path logPath; private final long updatedAtInSecond; private final long createdAtInSecond; @@ -27,6 +29,7 @@ public class Attempt { public Attempt(final int attemptNumber, final long jobId, final Path logPath, + final @Nullable AttemptSyncConfig syncConfig, final @Nullable JobOutput output, final AttemptStatus status, final String processingTaskQueue, @@ -36,6 +39,7 @@ public Attempt(final int attemptNumber, final @Nullable Long endedAtInSecond) { this.attemptNumber = attemptNumber; this.jobId = jobId; + this.syncConfig = syncConfig; this.output = output; this.status = status; this.processingTaskQueue = processingTaskQueue; @@ -54,6 +58,10 @@ public long getJobId() { return jobId; } + public Optional getSyncConfig() { + return Optional.ofNullable(syncConfig); + } + public Optional getOutput() { return Optional.ofNullable(output); } @@ -103,6 +111,7 @@ public boolean equals(final Object o) { jobId == attempt.jobId && updatedAtInSecond == attempt.updatedAtInSecond && createdAtInSecond == attempt.createdAtInSecond && + Objects.equals(syncConfig, attempt.syncConfig) && Objects.equals(output, attempt.output) && status == attempt.status && Objects.equals(failureSummary, attempt.failureSummary) && @@ -112,7 +121,8 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(attemptNumber, jobId, output, status, failureSummary, logPath, updatedAtInSecond, createdAtInSecond, endedAtInSecond); + return Objects.hash(attemptNumber, jobId, syncConfig, output, status, failureSummary, logPath, updatedAtInSecond, createdAtInSecond, + endedAtInSecond); } @Override @@ -120,6 +130,7 @@ public String toString() { return "Attempt{" + "id=" + attemptNumber + ", jobId=" + jobId + + ", syncConfig=" + syncConfig + ", output=" + output + ", status=" + status + ", failureSummary=" + failureSummary + diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Job.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Job.java index ada40c6ed08f..5911f28dcdb4 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Job.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Job.java @@ -132,6 +132,13 @@ public Optional getLastAttempt() { .max(Comparator.comparing(Attempt::getCreatedAtInSecond)); } + public Optional getAttemptByNumber(final int attemptNumber) { + return getAttempts() + .stream() + .filter(a -> a.getAttemptNumber() == attemptNumber) + .findFirst(); + } + public boolean hasRunningAttempt() { return getAttempts().stream().anyMatch(a -> !Attempt.isAttemptInTerminalState(a)); } diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/tracker/JobTracker.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/tracker/JobTracker.java index 86c0b154891c..25dd7d570acb 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/tracker/JobTracker.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/tracker/JobTracker.java @@ -11,10 +11,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import edu.umd.cs.findbugs.annotations.Nullable; import io.airbyte.analytics.TrackingClient; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; import io.airbyte.commons.map.MoreMaps; +import io.airbyte.config.AttemptSyncConfig; import io.airbyte.config.JobConfig; import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.StandardCheckConnectionOutput; @@ -27,6 +29,7 @@ import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.persistence.job.JobPersistence; import io.airbyte.persistence.job.WorkspaceHelper; +import io.airbyte.persistence.job.models.Attempt; import io.airbyte.persistence.job.models.Job; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; @@ -39,6 +42,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.UUID; public class JobTracker { @@ -124,6 +128,9 @@ public void trackSync(final Job job, final JobState jobState) { final boolean allowedJob = configType == ConfigType.SYNC || configType == ConfigType.RESET_CONNECTION; Preconditions.checkArgument(allowedJob, "Job type " + configType + " is not allowed!"); final long jobId = job.getId(); + final Optional lastAttempt = job.getLastAttempt(); + final Optional attemptSyncConfig = lastAttempt.flatMap(Attempt::getSyncConfig); + final UUID connectionId = UUID.fromString(job.getScope()); final StandardSourceDefinition sourceDefinition = configRepository.getSourceDefinitionFromConnection(connectionId); final StandardDestinationDefinition destinationDefinition = configRepository.getDestinationDefinitionFromConnection(connectionId); @@ -136,6 +143,7 @@ public void trackSync(final Job job, final JobState jobState) { final Map stateMetadata = generateStateMetadata(jobState); final Map syncConfigMetadata = generateSyncConfigMetadata( job.getConfig(), + attemptSyncConfig.orElse(null), sourceDefinition.getSpec().getConnectionSpecification(), destinationDefinition.getSpec().getConnectionSpecification()); @@ -184,18 +192,27 @@ public void trackSyncForInternalFailure(final Long jobId, }); } - private Map generateSyncConfigMetadata(final JobConfig config, + private Map generateSyncConfigMetadata( + final JobConfig config, + @Nullable final AttemptSyncConfig attemptSyncConfig, final JsonNode sourceConfigSchema, final JsonNode destinationConfigSchema) { if (config.getConfigType() == ConfigType.SYNC) { - final JsonNode sourceConfiguration = config.getSync().getSourceConfiguration(); - final JsonNode destinationConfiguration = config.getSync().getDestinationConfiguration(); + final Map actorConfigMetadata = new HashMap<>(); - final Map sourceMetadata = configToMetadata(CONFIG + ".source", sourceConfiguration, sourceConfigSchema); - final Map destinationMetadata = configToMetadata(CONFIG + ".destination", destinationConfiguration, destinationConfigSchema); - final Map catalogMetadata = getCatalogMetadata(config.getSync().getConfiguredAirbyteCatalog()); + if (attemptSyncConfig != null) { + final JsonNode sourceConfiguration = attemptSyncConfig.getSourceConfiguration(); + final JsonNode destinationConfiguration = attemptSyncConfig.getDestinationConfiguration(); + + final Map sourceMetadata = configToMetadata(CONFIG + ".source", sourceConfiguration, sourceConfigSchema); + final Map destinationMetadata = configToMetadata(CONFIG + ".destination", destinationConfiguration, destinationConfigSchema); - return MoreMaps.merge(sourceMetadata, destinationMetadata, catalogMetadata); + actorConfigMetadata.putAll(sourceMetadata); + actorConfigMetadata.putAll(destinationMetadata); + } + + final Map catalogMetadata = getCatalogMetadata(config.getSync().getConfiguredAirbyteCatalog()); + return MoreMaps.merge(actorConfigMetadata, catalogMetadata); } else { return emptyMap(); } diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobCreatorTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobCreatorTest.java index 7311ade09c97..3fdbf3e6d4dd 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobCreatorTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobCreatorTest.java @@ -34,9 +34,6 @@ import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSyncOperation; import io.airbyte.config.StandardSyncOperation.OperatorType; -import io.airbyte.config.State; -import io.airbyte.config.helpers.StateMessageHelper; -import io.airbyte.config.persistence.StatePersistence; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; @@ -47,7 +44,6 @@ import io.airbyte.protocol.models.SyncMode; import java.io.IOException; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.UUID; import org.junit.jupiter.api.BeforeEach; @@ -78,7 +74,6 @@ class DefaultJobCreatorTest { private static final UUID WORKSPACE_ID = UUID.randomUUID(); private JobPersistence jobPersistence; - private StatePersistence statePersistence; private JobCreator jobCreator; private ResourceRequirements workerResourceRequirements; @@ -163,13 +158,12 @@ class DefaultJobCreatorTest { @BeforeEach void setup() { jobPersistence = mock(JobPersistence.class); - statePersistence = mock(StatePersistence.class); workerResourceRequirements = new ResourceRequirements() .withCpuLimit("0.2") .withCpuRequest("0.2") .withMemoryLimit("200Mi") .withMemoryRequest("200Mi"); - jobCreator = new DefaultJobCreator(jobPersistence, workerResourceRequirements, statePersistence); + jobCreator = new DefaultJobCreator(jobPersistence, workerResourceRequirements); } @Test @@ -178,10 +172,8 @@ void testCreateSyncJob() throws IOException { .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) .withNamespaceFormat(STANDARD_SYNC.getNamespaceFormat()) .withPrefix(STANDARD_SYNC.getPrefix()) - .withSourceConfiguration(SOURCE_CONNECTION.getConfiguration()) .withSourceDockerImage(SOURCE_IMAGE_NAME) .withSourceProtocolVersion(SOURCE_PROTOCOL_VERSION) - .withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration()) .withDestinationDockerImage(DESTINATION_IMAGE_NAME) .withDestinationProtocolVersion(DESTINATION_PROTOCOL_VERSION) .withConfiguredAirbyteCatalog(STANDARD_SYNC.getCatalog()) @@ -222,10 +214,8 @@ void testCreateSyncJobEnsureNoQueuing() throws IOException { .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) .withNamespaceFormat(STANDARD_SYNC.getNamespaceFormat()) .withPrefix(STANDARD_SYNC.getPrefix()) - .withSourceConfiguration(SOURCE_CONNECTION.getConfiguration()) .withSourceDockerImage(SOURCE_IMAGE_NAME) .withDestinationProtocolVersion(SOURCE_PROTOCOL_VERSION) - .withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration()) .withDestinationDockerImage(DESTINATION_IMAGE_NAME) .withDestinationProtocolVersion(DESTINATION_PROTOCOL_VERSION) .withConfiguredAirbyteCatalog(STANDARD_SYNC.getCatalog()) @@ -270,10 +260,8 @@ void testCreateSyncJobDefaultWorkerResourceReqs() throws IOException { .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) .withNamespaceFormat(STANDARD_SYNC.getNamespaceFormat()) .withPrefix(STANDARD_SYNC.getPrefix()) - .withSourceConfiguration(SOURCE_CONNECTION.getConfiguration()) .withSourceDockerImage(SOURCE_IMAGE_NAME) .withSourceProtocolVersion(SOURCE_PROTOCOL_VERSION) - .withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration()) .withDestinationDockerImage(DESTINATION_IMAGE_NAME) .withDestinationProtocolVersion(DESTINATION_PROTOCOL_VERSION) .withConfiguredAirbyteCatalog(STANDARD_SYNC.getCatalog()) @@ -319,10 +307,8 @@ void testCreateSyncJobConnectionResourceReqs() throws IOException { .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) .withNamespaceFormat(STANDARD_SYNC.getNamespaceFormat()) .withPrefix(STANDARD_SYNC.getPrefix()) - .withSourceConfiguration(SOURCE_CONNECTION.getConfiguration()) .withSourceDockerImage(SOURCE_IMAGE_NAME) .withSourceProtocolVersion(SOURCE_PROTOCOL_VERSION) - .withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration()) .withDestinationDockerImage(DESTINATION_IMAGE_NAME) .withDestinationProtocolVersion(DESTINATION_PROTOCOL_VERSION) .withConfiguredAirbyteCatalog(STANDARD_SYNC.getCatalog()) @@ -375,10 +361,8 @@ void testCreateSyncJobSourceAndDestinationResourceReqs() throws IOException { .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) .withNamespaceFormat(STANDARD_SYNC.getNamespaceFormat()) .withPrefix(STANDARD_SYNC.getPrefix()) - .withSourceConfiguration(SOURCE_CONNECTION.getConfiguration()) .withSourceDockerImage(SOURCE_IMAGE_NAME) .withSourceProtocolVersion(SOURCE_PROTOCOL_VERSION) - .withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration()) .withDestinationDockerImage(DESTINATION_IMAGE_NAME) .withDestinationProtocolVersion(DESTINATION_PROTOCOL_VERSION) .withConfiguredAirbyteCatalog(STANDARD_SYNC.getCatalog()) @@ -417,22 +401,16 @@ void testCreateResetConnectionJob() throws IOException { .withSyncMode(SyncMode.FULL_REFRESH) .withDestinationSyncMode(DestinationSyncMode.APPEND))); - final State connectionState = new State().withState(Jsons.jsonNode(Map.of("key", "val"))); - when(statePersistence.getCurrentState(STANDARD_SYNC.getConnectionId())) - .thenReturn(StateMessageHelper.getTypedState(connectionState.getState(), false)); - final JobResetConnectionConfig jobResetConnectionConfig = new JobResetConnectionConfig() .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) .withNamespaceFormat(STANDARD_SYNC.getNamespaceFormat()) .withPrefix(STANDARD_SYNC.getPrefix()) - .withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration()) .withDestinationDockerImage(DESTINATION_IMAGE_NAME) .withDestinationProtocolVersion(DESTINATION_PROTOCOL_VERSION) .withConfiguredAirbyteCatalog(expectedCatalog) .withOperationSequence(List.of(STANDARD_SYNC_OPERATION)) .withResourceRequirements(workerResourceRequirements) .withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(streamsToReset)) - .withState(connectionState) .withIsSourceCustomConnector(false) .withIsDestinationCustomConnector(false); @@ -475,22 +453,16 @@ void testCreateResetConnectionJobEnsureNoQueuing() throws IOException { .withSyncMode(SyncMode.FULL_REFRESH) .withDestinationSyncMode(DestinationSyncMode.APPEND))); - final State connectionState = new State().withState(Jsons.jsonNode(Map.of("key", "val"))); - when(statePersistence.getCurrentState(STANDARD_SYNC.getConnectionId())) - .thenReturn(StateMessageHelper.getTypedState(connectionState.getState(), false)); - final JobResetConnectionConfig jobResetConnectionConfig = new JobResetConnectionConfig() .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) .withNamespaceFormat(STANDARD_SYNC.getNamespaceFormat()) .withPrefix(STANDARD_SYNC.getPrefix()) - .withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration()) .withDestinationDockerImage(DESTINATION_IMAGE_NAME) .withDestinationProtocolVersion(DESTINATION_PROTOCOL_VERSION) .withConfiguredAirbyteCatalog(expectedCatalog) .withOperationSequence(List.of(STANDARD_SYNC_OPERATION)) .withResourceRequirements(workerResourceRequirements) .withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(streamsToReset)) - .withState(connectionState) .withIsSourceCustomConnector(false) .withIsDestinationCustomConnector(false); diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java index 0652e2a8251e..fe02440aea08 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import io.airbyte.commons.json.Jsons; @@ -30,6 +31,7 @@ import io.airbyte.commons.version.AirbyteProtocolVersionRange; import io.airbyte.commons.version.Version; import io.airbyte.config.AttemptFailureSummary; +import io.airbyte.config.AttemptSyncConfig; import io.airbyte.config.FailureReason; import io.airbyte.config.FailureReason.FailureOrigin; import io.airbyte.config.FailureReason.FailureType; @@ -41,6 +43,7 @@ import io.airbyte.config.NormalizationSummary; import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardSyncSummary; +import io.airbyte.config.State; import io.airbyte.config.StreamSyncStats; import io.airbyte.config.SyncStats; import io.airbyte.db.Database; @@ -150,6 +153,7 @@ private static Attempt createAttempt(final int id, final long jobId, final Attem jobId, logPath, null, + null, status, null, null, @@ -164,6 +168,7 @@ private static Attempt createUnfinishedAttempt(final int id, final long jobId, f jobId, logPath, null, + null, status, null, null, @@ -312,6 +317,25 @@ void testWriteOutput() throws IOException { assertEquals(List.of(failureReason1, failureReason2), storedNormalizationSummary.getFailures()); } + @Test + @DisplayName("Should be able to read AttemptSyncConfig that was written") + void testWriteAttemptSyncConfig() throws IOException { + final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow(); + final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH); + final Job created = jobPersistence.getJob(jobId); + final AttemptSyncConfig attemptSyncConfig = new AttemptSyncConfig() + .withSourceConfiguration(Jsons.jsonNode(Map.of("source", "s_config_value"))) + .withDestinationConfiguration(Jsons.jsonNode(Map.of("destination", "d_config_value"))) + .withState(new State().withState(Jsons.jsonNode(ImmutableMap.of("state_key", "state_value")))); + + when(timeSupplier.get()).thenReturn(Instant.ofEpochMilli(4242)); + jobPersistence.writeAttemptSyncConfig(jobId, attemptNumber, attemptSyncConfig); + + final Job updated = jobPersistence.getJob(jobId); + assertEquals(Optional.of(attemptSyncConfig), updated.getAttempts().get(0).getSyncConfig()); + assertNotEquals(created.getAttempts().get(0).getUpdatedAtInSecond(), updated.getAttempts().get(0).getUpdatedAtInSecond()); + } + @Test @DisplayName("Should be able to read attemptFailureSummary that was written") void testWriteAttemptFailureSummary() throws IOException { diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/AttemptTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/AttemptTest.java index f8660bd4cb57..92ea7c68dad6 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/AttemptTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/AttemptTest.java @@ -19,7 +19,7 @@ void testIsAttemptInTerminalState() { } private static Attempt attemptWithStatus(final AttemptStatus attemptStatus) { - return new Attempt(1, 1L, null, null, attemptStatus, null, null, 0L, 0L, null); + return new Attempt(1, 1L, null, null, null, attemptStatus, null, null, 0L, 0L, null); } } diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/JobTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/JobTest.java index 9dc147ae99b5..335a7a30cfd6 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/JobTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/JobTest.java @@ -43,7 +43,7 @@ void testHasRunningAttempt() { private static Job jobWithAttemptWithStatus(final AttemptStatus... attemptStatuses) { final List attempts = IntStream.range(0, attemptStatuses.length) - .mapToObj(idx -> new Attempt(idx + 1, 1L, null, null, attemptStatuses[idx], null, null, idx, 0L, null)) + .mapToObj(idx -> new Attempt(idx + 1, 1L, null, null, null, attemptStatuses[idx], null, null, idx, 0L, null)) .collect(Collectors.toList()); return new Job(1L, null, null, null, attempts, null, 0L, 0L, 0L); } @@ -78,6 +78,13 @@ void testGetLastAttempt() { assertEquals(3, job.getLastAttempt().get().getAttemptNumber()); } + @Test + void testGetAttemptByNumber() { + final Job job = jobWithAttemptWithStatus(AttemptStatus.FAILED, AttemptStatus.FAILED, AttemptStatus.SUCCEEDED); + assertTrue(job.getAttemptByNumber(2).isPresent()); + assertEquals(2, job.getAttemptByNumber(2).get().getAttemptNumber()); + } + @Test void testValidateStatusTransitionFromPending() { final Job pendingJob = jobWithStatus(JobStatus.PENDING); diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/JobTrackerTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/JobTrackerTest.java index b5e3361b2cec..086a42eb87d1 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/JobTrackerTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/JobTrackerTest.java @@ -20,6 +20,7 @@ import io.airbyte.commons.map.MoreMaps; import io.airbyte.commons.resources.MoreResources; import io.airbyte.config.AttemptFailureSummary; +import io.airbyte.config.AttemptSyncConfig; import io.airbyte.config.FailureReason; import io.airbyte.config.JobConfig; import io.airbyte.config.JobConfig.ConfigType; @@ -550,10 +551,12 @@ private Job getJobMock(final ConfigType configType, final long jobId) throws Con .withDestinationSyncMode(DestinationSyncMode.APPEND))); final JobSyncConfig jobSyncConfig = new JobSyncConfig() - .withSourceConfiguration(Jsons.jsonNode(ImmutableMap.of("key", "some_value"))) - .withDestinationConfiguration(Jsons.jsonNode(ImmutableMap.of("key", false))) .withConfiguredAirbyteCatalog(catalog); + final AttemptSyncConfig attemptSyncConfig = new AttemptSyncConfig() + .withSourceConfiguration(Jsons.jsonNode(ImmutableMap.of("key", "some_value"))) + .withDestinationConfiguration(Jsons.jsonNode(ImmutableMap.of("key", false))); + final JobConfig jobConfig = mock(JobConfig.class); when(jobConfig.getConfigType()).thenReturn(configType); @@ -561,11 +564,15 @@ private Job getJobMock(final ConfigType configType, final long jobId) throws Con when(jobConfig.getSync()).thenReturn(jobSyncConfig); } + final Attempt attempt = mock(Attempt.class); + when(attempt.getSyncConfig()).thenReturn(Optional.of(attemptSyncConfig)); + final Job job = mock(Job.class); when(job.getId()).thenReturn(jobId); when(job.getConfig()).thenReturn(jobConfig); when(job.getConfigType()).thenReturn(configType); when(job.getScope()).thenReturn(CONNECTION_ID.toString()); + when(job.getLastAttempt()).thenReturn(Optional.of(attempt)); when(job.getAttemptsCount()).thenReturn(700); return job; } diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/TrackingMetadataTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/TrackingMetadataTest.java index d8ffb69eaac1..9dd50747ba51 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/TrackingMetadataTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/TrackingMetadataTest.java @@ -8,6 +8,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import io.airbyte.config.AttemptSyncConfig; import io.airbyte.config.JobOutput; import io.airbyte.config.ResourceRequirements; import io.airbyte.config.StandardSync; @@ -60,8 +61,9 @@ void testgenerateJobAttemptMetadataWithNulls() { .withMeanSecondsBeforeSourceStateMessageEmitted(2L).withMaxSecondsBetweenStateMessageEmittedandCommitted(null); final StandardSyncSummary standardSyncSummary = new StandardSyncSummary().withTotalStats(syncStats); final StandardSyncOutput standardSyncOutput = new StandardSyncOutput().withStandardSyncSummary(standardSyncSummary); + final AttemptSyncConfig attemptSyncConfig = mock(AttemptSyncConfig.class); final JobOutput jobOutput = new JobOutput().withSync(standardSyncOutput); - final Attempt attempt = new Attempt(0, 10L, Path.of("test"), jobOutput, AttemptStatus.SUCCEEDED, null, null, 100L, 100L, 99L); + final Attempt attempt = new Attempt(0, 10L, Path.of("test"), attemptSyncConfig, jobOutput, AttemptStatus.SUCCEEDED, null, null, 100L, 100L, 99L); final Job job = mock(Job.class); when(job.getAttempts()).thenReturn(List.of(attempt)); diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/AttemptApiController.java b/airbyte-server/src/main/java/io/airbyte/server/apis/AttemptApiController.java index 64e8c4730d8f..acc500ec9a54 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/AttemptApiController.java +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/AttemptApiController.java @@ -8,6 +8,7 @@ import io.airbyte.api.generated.AttemptApi; import io.airbyte.api.model.generated.InternalOperationResult; +import io.airbyte.api.model.generated.SaveAttemptSyncConfigRequestBody; import io.airbyte.api.model.generated.SaveStatsRequestBody; import io.airbyte.api.model.generated.SetWorkflowInAttemptRequestBody; import io.airbyte.commons.server.handlers.AttemptHandler; @@ -46,4 +47,12 @@ public InternalOperationResult setWorkflowInAttempt(@Body final SetWorkflowInAtt return ApiHelper.execute(() -> attemptHandler.setWorkflowInAttempt(requestBody)); } + @Override + @Post(uri = "/save_sync_config", + processes = MediaType.APPLICATION_JSON) + @Secured({ADMIN}) + public InternalOperationResult saveSyncConfig(@Body final SaveAttemptSyncConfigRequestBody requestBody) { + return ApiHelper.execute(() -> attemptHandler.saveSyncConfig(requestBody)); + } + } diff --git a/airbyte-workers/build.gradle b/airbyte-workers/build.gradle index 1d104a4c4b4e..4f8416a939ea 100644 --- a/airbyte-workers/build.gradle +++ b/airbyte-workers/build.gradle @@ -58,6 +58,7 @@ dependencies { implementation project(':airbyte-commons-protocol') implementation project(':airbyte-commons-temporal') implementation project(':airbyte-commons-worker') + implementation project(':airbyte-commons-server') implementation project(':airbyte-config:config-models') implementation project(':airbyte-config:config-persistence') implementation project(':airbyte-config:init') diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ApplicationBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ApplicationBeanFactory.java index 49edebdf7266..63fdd1bb3b96 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ApplicationBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ApplicationBeanFactory.java @@ -14,7 +14,6 @@ import io.airbyte.config.Configs.SecretPersistenceType; import io.airbyte.config.Configs.TrackingStrategy; import io.airbyte.config.persistence.ConfigRepository; -import io.airbyte.config.persistence.StatePersistence; import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor; import io.airbyte.metrics.lib.MetricClient; import io.airbyte.metrics.lib.MetricClientFactory; @@ -81,12 +80,10 @@ public Supplier currentSecondsSupplier() { @Singleton public DefaultJobCreator defaultJobCreator(final JobPersistence jobPersistence, - @Named("defaultWorkerConfigs") final WorkerConfigs defaultWorkerConfigs, - final StatePersistence statePersistence) { + @Named("defaultWorkerConfigs") final WorkerConfigs defaultWorkerConfigs) { return new DefaultJobCreator( jobPersistence, - defaultWorkerConfigs.getResourceRequirements(), - statePersistence); + defaultWorkerConfigs.getResourceRequirements()); } @Singleton diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/run/TemporalWorkerRunFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/run/TemporalWorkerRunFactory.java index 739ba9d8efc0..8aaa51bb68b4 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/run/TemporalWorkerRunFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/run/TemporalWorkerRunFactory.java @@ -6,16 +6,17 @@ import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.functional.CheckedSupplier; -import io.airbyte.commons.json.Jsons; import io.airbyte.commons.temporal.TemporalClient; import io.airbyte.commons.temporal.TemporalJobType; import io.airbyte.commons.temporal.TemporalResponse; +import io.airbyte.config.AttemptSyncConfig; import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobOutput; import io.airbyte.config.JobResetConnectionConfig; import io.airbyte.config.JobSyncConfig; import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardSyncSummary.ReplicationStatus; +import io.airbyte.persistence.job.models.Attempt; import io.airbyte.persistence.job.models.Job; import io.airbyte.workers.JobStatus; import io.airbyte.workers.OutputAndStatus; @@ -44,14 +45,18 @@ public WorkerRun create(final Job job) { public CheckedSupplier, Exception> createSupplier(final Job job, final int attemptId) { final TemporalJobType temporalJobType = toTemporalJobType(job.getConfigType()); final UUID connectionId = UUID.fromString(job.getScope()); + return switch (job.getConfigType()) { case SYNC -> () -> { + final AttemptSyncConfig attemptConfig = getAttemptSyncConfig(job, attemptId); final TemporalResponse output = temporalClient.submitSync(job.getId(), - attemptId, job.getConfig().getSync(), connectionId); + attemptId, job.getConfig().getSync(), attemptConfig, connectionId); return toOutputAndStatus(output); }; case RESET_CONNECTION -> () -> { final JobResetConnectionConfig resetConnection = job.getConfig().getResetConnection(); + final AttemptSyncConfig attemptConfig = getAttemptSyncConfig(job, attemptId); + final JobSyncConfig config = new JobSyncConfig() .withNamespaceDefinition(resetConnection.getNamespaceDefinition()) .withNamespaceFormat(resetConnection.getNamespaceFormat()) @@ -59,8 +64,6 @@ public CheckedSupplier, Exception> createSupplier(fin .withSourceDockerImage(WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB) .withDestinationDockerImage(resetConnection.getDestinationDockerImage()) .withDestinationProtocolVersion(resetConnection.getDestinationProtocolVersion()) - .withSourceConfiguration(Jsons.emptyObject()) - .withDestinationConfiguration(resetConnection.getDestinationConfiguration()) .withConfiguredAirbyteCatalog(resetConnection.getConfiguredAirbyteCatalog()) .withOperationSequence(resetConnection.getOperationSequence()) .withResourceRequirements(resetConnection.getResourceRequirements()) @@ -69,13 +72,18 @@ public CheckedSupplier, Exception> createSupplier(fin .withIsSourceCustomConnector(false) .withIsDestinationCustomConnector(resetConnection.getIsDestinationCustomConnector()); - final TemporalResponse output = temporalClient.submitSync(job.getId(), attemptId, config, connectionId); + final TemporalResponse output = temporalClient.submitSync(job.getId(), attemptId, config, attemptConfig, connectionId); return toOutputAndStatus(output); }; default -> throw new IllegalArgumentException("Does not support job type: " + temporalJobType); }; } + private static AttemptSyncConfig getAttemptSyncConfig(final Job job, final int attemptId) { + return job.getAttemptByNumber(attemptId).flatMap(Attempt::getSyncConfig).orElseThrow( + () -> new IllegalStateException(String.format("AttemptSyncConfig for job %s attemptId %s not found", job.getId(), attemptId))); + } + private static TemporalJobType toTemporalJobType(final ConfigType jobType) { return switch (jobType) { case GET_SPEC -> TemporalJobType.GET_SPEC; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java index 2f03d0735783..30dce2a4b57d 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java @@ -9,18 +9,33 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; import datadog.trace.api.Trace; +import io.airbyte.api.client.AirbyteApiClient; +import io.airbyte.api.client.generated.AttemptApi; +import io.airbyte.api.client.generated.StateApi; +import io.airbyte.api.client.model.generated.ConnectionIdRequestBody; +import io.airbyte.api.client.model.generated.ConnectionState; +import io.airbyte.api.client.model.generated.ConnectionStateType; +import io.airbyte.api.client.model.generated.SaveAttemptSyncConfigRequestBody; +import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.server.converters.ApiPojoConverters; import io.airbyte.commons.temporal.TemporalWorkflowUtils; import io.airbyte.commons.temporal.config.WorkerMode; import io.airbyte.commons.temporal.exception.RetryableException; +import io.airbyte.config.AttemptSyncConfig; +import io.airbyte.config.DestinationConnection; import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobResetConnectionConfig; import io.airbyte.config.JobSyncConfig; import io.airbyte.config.ResetSourceConfiguration; +import io.airbyte.config.SourceConnection; import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSyncInput; +import io.airbyte.config.State; +import io.airbyte.config.StateWrapper; +import io.airbyte.config.helpers.StateMessageHelper; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.metrics.lib.ApmTraceUtils; import io.airbyte.persistence.job.JobPersistence; @@ -28,11 +43,13 @@ import io.airbyte.persistence.job.models.Job; import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.workers.WorkerConstants; +import io.airbyte.workers.helper.StateConverter; import io.airbyte.workers.utils.ConfigReplacer; import io.micronaut.context.annotation.Requires; import jakarta.inject.Singleton; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,12 +60,43 @@ public class GenerateInputActivityImpl implements GenerateInputActivity { private final JobPersistence jobPersistence; private final ConfigRepository configRepository; + private final AttemptApi attemptApi; + private final StateApi stateApi; + private final FeatureFlags featureFlags; + private static final Logger LOGGER = LoggerFactory.getLogger(GenerateInputActivity.class); public GenerateInputActivityImpl(final JobPersistence jobPersistence, - final ConfigRepository configRepository) { + final ConfigRepository configRepository, + final StateApi stateApi, + final AttemptApi attemptApi, + final FeatureFlags featureFlags) { this.jobPersistence = jobPersistence; this.configRepository = configRepository; + this.stateApi = stateApi; + this.attemptApi = attemptApi; + this.featureFlags = featureFlags; + } + + private Optional getCurrentConnectionState(final UUID connectionId) { + final ConnectionState state = AirbyteApiClient.retryWithJitter( + () -> stateApi.getState(new ConnectionIdRequestBody().connectionId(connectionId)), + "get state"); + + if (state.getStateType() == ConnectionStateType.NOT_SET) + return Optional.empty(); + + final StateWrapper internalState = StateConverter.clientToInternal(state); + return Optional.of(StateMessageHelper.getState(internalState)); + } + + private void saveAttemptSyncConfig(final long jobId, final int attemptNumber, final UUID connectionId, final AttemptSyncConfig attemptSyncConfig) { + AirbyteApiClient.retryWithJitter( + () -> attemptApi.saveSyncConfig(new SaveAttemptSyncConfigRequestBody() + .jobId(jobId) + .attemptNumber(attemptNumber) + .syncConfig(ApiPojoConverters.attemptSyncConfigToClient(attemptSyncConfig, connectionId, featureFlags.useStreamCapableState()))), + "set attempt sync config"); } @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) @@ -64,11 +112,26 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { final Job job = jobPersistence.getJob(jobId); final ConfigType jobConfigType = job.getConfig().getConfigType(); + + final UUID connectionId = UUID.fromString(job.getScope()); + final StandardSync standardSync = configRepository.getStandardSync(connectionId); + + final AttemptSyncConfig attemptSyncConfig = new AttemptSyncConfig(); + getCurrentConnectionState(connectionId).ifPresent(attemptSyncConfig::setState); + if (ConfigType.SYNC.equals(jobConfigType)) { config = job.getConfig().getSync(); + final SourceConnection source = configRepository.getSourceConnection(standardSync.getSourceId()); + attemptSyncConfig.setSourceConfiguration(source.getConfiguration()); } else if (ConfigType.RESET_CONNECTION.equals(jobConfigType)) { final JobResetConnectionConfig resetConnection = job.getConfig().getResetConnection(); final ResetSourceConfiguration resetSourceConfiguration = resetConnection.getResetSourceConfiguration(); + + // null check for backwards compatibility with reset jobs that did not have a + // resetSourceConfiguration + attemptSyncConfig + .setSourceConfiguration(resetSourceConfiguration == null ? Jsons.emptyObject() : Jsons.jsonNode(resetSourceConfiguration)); + config = new JobSyncConfig() .withNamespaceDefinition(resetConnection.getNamespaceDefinition()) .withNamespaceFormat(resetConnection.getNamespaceFormat()) @@ -76,14 +139,9 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { .withSourceDockerImage(WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB) .withDestinationDockerImage(resetConnection.getDestinationDockerImage()) .withDestinationProtocolVersion(resetConnection.getDestinationProtocolVersion()) - // null check for backwards compatibility with reset jobs that did not have a - // resetSourceConfiguration - .withSourceConfiguration(resetSourceConfiguration == null ? Jsons.emptyObject() : Jsons.jsonNode(resetSourceConfiguration)) - .withDestinationConfiguration(resetConnection.getDestinationConfiguration()) .withConfiguredAirbyteCatalog(resetConnection.getConfiguredAirbyteCatalog()) .withOperationSequence(resetConnection.getOperationSequence()) .withResourceRequirements(resetConnection.getResourceRequirements()) - .withState(resetConnection.getState()) .withIsSourceCustomConnector(resetConnection.getIsSourceCustomConnector()) .withIsDestinationCustomConnector(resetConnection.getIsDestinationCustomConnector()) .withWorkspaceId(resetConnection.getWorkspaceId()); @@ -97,14 +155,14 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { final JobRunConfig jobRunConfig = TemporalWorkflowUtils.createJobRunConfig(jobId, attempt); - final UUID connectionId = UUID.fromString(job.getScope()); - final StandardSync standardSync = configRepository.getStandardSync(connectionId); + final DestinationConnection destination = configRepository.getDestinationConnection(standardSync.getDestinationId()); + attemptSyncConfig.setDestinationConfiguration(destination.getConfiguration()); final StandardSourceDefinition sourceDefinition = configRepository.getSourceDefinitionFromSource(standardSync.getSourceId()); final StandardDestinationDefinition destinationDefinition = - configRepository.getDestinationDefinitionFromDestination(standardSync.getDestinationId()); + configRepository.getStandardDestinationDefinition(destination.getDestinationDefinitionId()); final String destinationNormalizationDockerImage = destinationDefinition.getNormalizationConfig() != null ? destinationDefinition.getNormalizationConfig().getNormalizationRepository() + ":" + destinationDefinition.getNormalizationConfig().getNormalizationTag() @@ -120,7 +178,7 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { .withProtocolVersion(config.getSourceProtocolVersion()) .withIsCustomConnector(config.getIsSourceCustomConnector()) .withAllowedHosts(ConfigType.RESET_CONNECTION.equals(jobConfigType) ? null - : configReplacer.getAllowedHosts(sourceDefinition.getAllowedHosts(), config.getSourceConfiguration())); + : configReplacer.getAllowedHosts(sourceDefinition.getAllowedHosts(), attemptSyncConfig.getSourceConfiguration())); final IntegrationLauncherConfig destinationLauncherConfig = new IntegrationLauncherConfig() .withJobId(String.valueOf(jobId)) @@ -131,7 +189,7 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { .withNormalizationDockerImage(destinationNormalizationDockerImage) .withSupportsDbt(destinationDefinition.getSupportsDbt()) .withNormalizationIntegrationType(normalizationIntegrationType) - .withAllowedHosts(configReplacer.getAllowedHosts(destinationDefinition.getAllowedHosts(), config.getDestinationConfiguration())); + .withAllowedHosts(configReplacer.getAllowedHosts(destinationDefinition.getAllowedHosts(), attemptSyncConfig.getDestinationConfiguration())); final StandardSyncInput syncInput = new StandardSyncInput() .withNamespaceDefinition(config.getNamespaceDefinition()) @@ -139,18 +197,20 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { .withPrefix(config.getPrefix()) .withSourceId(standardSync.getSourceId()) .withDestinationId(standardSync.getDestinationId()) - .withSourceConfiguration(config.getSourceConfiguration()) - .withDestinationConfiguration(config.getDestinationConfiguration()) + .withSourceConfiguration(attemptSyncConfig.getSourceConfiguration()) + .withDestinationConfiguration(attemptSyncConfig.getDestinationConfiguration()) .withOperationSequence(config.getOperationSequence()) .withWebhookOperationConfigs(config.getWebhookOperationConfigs()) .withCatalog(config.getConfiguredAirbyteCatalog()) - .withState(config.getState()) + .withState(attemptSyncConfig.getState()) .withResourceRequirements(config.getResourceRequirements()) .withSourceResourceRequirements(config.getSourceResourceRequirements()) .withDestinationResourceRequirements(config.getDestinationResourceRequirements()) .withConnectionId(standardSync.getConnectionId()) .withWorkspaceId(config.getWorkspaceId()); + saveAttemptSyncConfig(jobId, attempt, connectionId, attemptSyncConfig); + return new GeneratedJobInput(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput); } catch (final Exception e) { diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/run/TemporalWorkerRunFactoryTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/run/TemporalWorkerRunFactoryTest.java index 14d1f99749f0..6dbc107926a7 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/run/TemporalWorkerRunFactoryTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/run/TemporalWorkerRunFactoryTest.java @@ -11,16 +11,16 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.common.collect.ImmutableMap; import io.airbyte.commons.features.FeatureFlags; -import io.airbyte.commons.json.Jsons; import io.airbyte.commons.temporal.TemporalClient; import io.airbyte.commons.temporal.TemporalResponse; +import io.airbyte.config.AttemptSyncConfig; import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobResetConnectionConfig; import io.airbyte.config.JobSyncConfig; import io.airbyte.config.StandardSyncOperation; import io.airbyte.config.StandardSyncOutput; +import io.airbyte.persistence.job.models.Attempt; import io.airbyte.persistence.job.models.Job; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.workers.WorkerConstants; @@ -28,6 +28,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.List; +import java.util.Optional; import java.util.UUID; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -43,6 +44,7 @@ class TemporalWorkerRunFactoryTest { private TemporalClient temporalClient; private TemporalWorkerRunFactory workerRunFactory; private Job job; + private AttemptSyncConfig attemptSyncConfig; @BeforeEach void setup() throws IOException { @@ -55,7 +57,11 @@ void setup() throws IOException { "unknown airbyte version", mock(FeatureFlags.class)); job = mock(Job.class, RETURNS_DEEP_STUBS); + final Attempt attempt = mock(Attempt.class, RETURNS_DEEP_STUBS); + attemptSyncConfig = mock(AttemptSyncConfig.class); + when(attempt.getSyncConfig()).thenReturn(Optional.of(attemptSyncConfig)); when(job.getId()).thenReturn(JOB_ID); + when(job.getAttemptByNumber(ATTEMPT_ID)).thenReturn(Optional.of(attempt)); when(job.getAttemptsCount()).thenReturn(ATTEMPT_ID); when(job.getScope()).thenReturn(CONNECTION_ID.toString()); } @@ -65,11 +71,11 @@ void setup() throws IOException { void testSync() throws Exception { when(job.getConfigType()).thenReturn(ConfigType.SYNC); final TemporalResponse mockResponse = mock(TemporalResponse.class); - when(temporalClient.submitSync(JOB_ID, ATTEMPT_ID, job.getConfig().getSync(), + when(temporalClient.submitSync(JOB_ID, ATTEMPT_ID, job.getConfig().getSync(), attemptSyncConfig, CONNECTION_ID)).thenReturn(mockResponse); final WorkerRun workerRun = workerRunFactory.create(job); workerRun.call(); - verify(temporalClient).submitSync(JOB_ID, ATTEMPT_ID, job.getConfig().getSync(), CONNECTION_ID); + verify(temporalClient).submitSync(JOB_ID, ATTEMPT_ID, job.getConfig().getSync(), attemptSyncConfig, CONNECTION_ID); assertEquals(jobRoot, workerRun.getJobRoot()); } @@ -78,7 +84,6 @@ void testSync() throws Exception { void testResetConnection() throws Exception { final JobResetConnectionConfig resetConfig = new JobResetConnectionConfig() .withDestinationDockerImage("airbyte/fusion_reactor") - .withDestinationConfiguration(Jsons.jsonNode(ImmutableMap.of("a", 1))) .withOperationSequence(List.of(new StandardSyncOperation().withName("b"))) .withConfiguredAirbyteCatalog(new ConfiguredAirbyteCatalog()) .withIsSourceCustomConnector(false) @@ -86,22 +91,20 @@ void testResetConnection() throws Exception { final JobSyncConfig syncConfig = new JobSyncConfig() .withSourceDockerImage(WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB) .withDestinationDockerImage(resetConfig.getDestinationDockerImage()) - .withDestinationConfiguration(resetConfig.getDestinationConfiguration()) .withOperationSequence(List.of(new StandardSyncOperation().withName("b"))) - .withSourceConfiguration(Jsons.emptyObject()) .withConfiguredAirbyteCatalog(resetConfig.getConfiguredAirbyteCatalog()) .withIsSourceCustomConnector(false) .withIsDestinationCustomConnector(false); when(job.getConfigType()).thenReturn(ConfigType.RESET_CONNECTION); when(job.getConfig().getResetConnection()).thenReturn(resetConfig); final TemporalResponse mockResponse = mock(TemporalResponse.class); - when(temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, CONNECTION_ID)).thenReturn(mockResponse); + when(temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, attemptSyncConfig, CONNECTION_ID)).thenReturn(mockResponse); final WorkerRun workerRun = workerRunFactory.create(job); workerRun.call(); final ArgumentCaptor argument = ArgumentCaptor.forClass(JobSyncConfig.class); - verify(temporalClient).submitSync(eq(JOB_ID), eq(ATTEMPT_ID), argument.capture(), eq(CONNECTION_ID)); + verify(temporalClient).submitSync(eq(JOB_ID), eq(ATTEMPT_ID), argument.capture(), eq(attemptSyncConfig), eq(CONNECTION_ID)); assertEquals(syncConfig, argument.getValue()); assertEquals(jobRoot, workerRun.getJobRoot()); } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityTest.java new file mode 100644 index 000000000000..1a71296660e2 --- /dev/null +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityTest.java @@ -0,0 +1,234 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.temporal.scheduling.activities; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.api.client.generated.AttemptApi; +import io.airbyte.api.client.generated.StateApi; +import io.airbyte.api.client.invoker.generated.ApiException; +import io.airbyte.api.client.model.generated.ConnectionIdRequestBody; +import io.airbyte.api.client.model.generated.ConnectionState; +import io.airbyte.api.client.model.generated.ConnectionStateType; +import io.airbyte.api.client.model.generated.SaveAttemptSyncConfigRequestBody; +import io.airbyte.commons.features.FeatureFlags; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.server.converters.ApiPojoConverters; +import io.airbyte.config.AttemptSyncConfig; +import io.airbyte.config.DestinationConnection; +import io.airbyte.config.JobConfig; +import io.airbyte.config.JobConfig.ConfigType; +import io.airbyte.config.JobResetConnectionConfig; +import io.airbyte.config.JobSyncConfig; +import io.airbyte.config.SourceConnection; +import io.airbyte.config.StandardDestinationDefinition; +import io.airbyte.config.StandardSourceDefinition; +import io.airbyte.config.StandardSync; +import io.airbyte.config.StandardSyncInput; +import io.airbyte.config.State; +import io.airbyte.config.persistence.ConfigNotFoundException; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.persistence.job.JobPersistence; +import io.airbyte.persistence.job.models.IntegrationLauncherConfig; +import io.airbyte.persistence.job.models.Job; +import io.airbyte.persistence.job.models.JobRunConfig; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.validation.json.JsonValidationException; +import io.airbyte.workers.WorkerConstants; +import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity.GeneratedJobInput; +import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity.SyncInput; +import java.io.IOException; +import java.util.Map; +import java.util.UUID; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class GenerateInputActivityTest { + + static private AttemptApi attemptApi; + static private JobPersistence jobPersistence; + static private ConfigRepository configRepository; + static private GenerateInputActivityImpl generateInputActivity; + static private Job job; + + static private final JsonNode SOURCE_CONFIGURATION = Jsons.jsonNode(Map.of("source_key", "source_value")); + static private final JsonNode DESTINATION_CONFIGURATION = Jsons.jsonNode(Map.of("destination_key", "destination_value")); + static private final State STATE = new State().withState(Jsons.jsonNode(Map.of("state_key", "state_value"))); + + static private final long JOB_ID = 1; + static private final int ATTEMPT_ID = 1; + static private final UUID SOURCE_ID = UUID.randomUUID(); + static private final UUID DESTINATION_ID = UUID.randomUUID(); + static private final UUID CONNECTION_ID = UUID.randomUUID(); + + @BeforeEach + void setUp() throws IOException, JsonValidationException, ConfigNotFoundException, ApiException { + final StateApi stateApi = mock(StateApi.class); + final FeatureFlags featureFlags = mock(FeatureFlags.class); + + attemptApi = mock(AttemptApi.class); + jobPersistence = mock(JobPersistence.class); + configRepository = mock(ConfigRepository.class); + generateInputActivity = new GenerateInputActivityImpl(jobPersistence, configRepository, stateApi, attemptApi, featureFlags); + + job = mock(Job.class); + + when(jobPersistence.getJob(JOB_ID)).thenReturn(job); + + final UUID destinationDefinitionId = UUID.randomUUID(); + + final DestinationConnection destinationConnection = new DestinationConnection() + .withDestinationId(DESTINATION_ID) + .withDestinationDefinitionId(destinationDefinitionId) + .withConfiguration(DESTINATION_CONFIGURATION); + when(configRepository.getDestinationConnection(DESTINATION_ID)).thenReturn(destinationConnection); + when(configRepository.getStandardDestinationDefinition(destinationDefinitionId)).thenReturn(mock(StandardDestinationDefinition.class)); + when(configRepository.getSourceDefinitionFromSource(SOURCE_ID)).thenReturn(mock(StandardSourceDefinition.class)); + + final StandardSync standardSync = new StandardSync() + .withSourceId(SOURCE_ID) + .withDestinationId(DESTINATION_ID); + when(configRepository.getStandardSync(CONNECTION_ID)).thenReturn(standardSync); + + when(stateApi.getState(new ConnectionIdRequestBody().connectionId(CONNECTION_ID))) + .thenReturn(new ConnectionState() + .stateType(ConnectionStateType.LEGACY) + .state(STATE.getState())); + } + + @Test + void testGetSyncWorkflowInput() throws JsonValidationException, ConfigNotFoundException, IOException, ApiException { + final SyncInput syncInput = new SyncInput(ATTEMPT_ID, JOB_ID); + + final SourceConnection sourceConnection = new SourceConnection() + .withSourceId(SOURCE_ID) + .withConfiguration(SOURCE_CONFIGURATION); + when(configRepository.getSourceConnection(SOURCE_ID)).thenReturn(sourceConnection); + + final JobSyncConfig jobSyncConfig = new JobSyncConfig() + .withWorkspaceId(UUID.randomUUID()) + .withDestinationDockerImage("destinationDockerImage") + .withSourceDockerImage("sourceDockerImage") + .withConfiguredAirbyteCatalog(mock(ConfiguredAirbyteCatalog.class)); + + final JobConfig jobConfig = new JobConfig() + .withConfigType(ConfigType.SYNC) + .withSync(jobSyncConfig); + + when(job.getConfig()).thenReturn(jobConfig); + when(job.getScope()).thenReturn(CONNECTION_ID.toString()); + + final StandardSyncInput expectedStandardSyncInput = new StandardSyncInput() + .withWorkspaceId(jobSyncConfig.getWorkspaceId()) + .withSourceId(SOURCE_ID) + .withDestinationId(DESTINATION_ID) + .withSourceConfiguration(SOURCE_CONFIGURATION) + .withDestinationConfiguration(DESTINATION_CONFIGURATION) + .withState(STATE) + .withCatalog(jobSyncConfig.getConfiguredAirbyteCatalog()) + .withWorkspaceId(jobSyncConfig.getWorkspaceId()); + + final JobRunConfig expectedJobRunConfig = new JobRunConfig() + .withJobId(String.valueOf(JOB_ID)) + .withAttemptId((long) ATTEMPT_ID); + + final IntegrationLauncherConfig expectedSourceLauncherConfig = new IntegrationLauncherConfig() + .withJobId(String.valueOf(JOB_ID)) + .withAttemptId((long) ATTEMPT_ID) + .withDockerImage(jobSyncConfig.getSourceDockerImage()); + + final IntegrationLauncherConfig expectedDestinationLauncherConfig = new IntegrationLauncherConfig() + .withJobId(String.valueOf(JOB_ID)) + .withAttemptId((long) ATTEMPT_ID) + .withDockerImage(jobSyncConfig.getDestinationDockerImage()); + + final GeneratedJobInput expectedGeneratedJobInput = new GeneratedJobInput( + expectedJobRunConfig, + expectedSourceLauncherConfig, + expectedDestinationLauncherConfig, + expectedStandardSyncInput); + + final GeneratedJobInput generatedJobInput = generateInputActivity.getSyncWorkflowInput(syncInput); + assertEquals(expectedGeneratedJobInput, generatedJobInput); + + final AttemptSyncConfig expectedAttemptSyncConfig = new AttemptSyncConfig() + .withSourceConfiguration(SOURCE_CONFIGURATION) + .withDestinationConfiguration(DESTINATION_CONFIGURATION) + .withState(STATE); + + verify(attemptApi).saveSyncConfig(new SaveAttemptSyncConfigRequestBody() + .jobId(JOB_ID) + .attemptNumber(ATTEMPT_ID) + .syncConfig(ApiPojoConverters.attemptSyncConfigToClient(expectedAttemptSyncConfig, CONNECTION_ID, true))); + } + + @Test + void testGetResetSyncWorkflowInput() throws IOException, ApiException { + final SyncInput syncInput = new SyncInput(ATTEMPT_ID, JOB_ID); + + final JobResetConnectionConfig jobResetConfig = new JobResetConnectionConfig() + .withWorkspaceId(UUID.randomUUID()) + .withDestinationDockerImage("destinationDockerImage") + .withConfiguredAirbyteCatalog(mock(ConfiguredAirbyteCatalog.class)); + + final JobConfig jobConfig = new JobConfig() + .withConfigType(ConfigType.RESET_CONNECTION) + .withResetConnection(jobResetConfig); + + when(job.getConfig()).thenReturn(jobConfig); + when(job.getScope()).thenReturn(CONNECTION_ID.toString()); + + final StandardSyncInput expectedStandardSyncInput = new StandardSyncInput() + .withWorkspaceId(jobResetConfig.getWorkspaceId()) + .withSourceId(SOURCE_ID) + .withDestinationId(DESTINATION_ID) + .withSourceConfiguration(Jsons.emptyObject()) + .withDestinationConfiguration(DESTINATION_CONFIGURATION) + .withState(STATE) + .withCatalog(jobResetConfig.getConfiguredAirbyteCatalog()) + .withWorkspaceId(jobResetConfig.getWorkspaceId()); + + final JobRunConfig expectedJobRunConfig = new JobRunConfig() + .withJobId(String.valueOf(JOB_ID)) + .withAttemptId((long) ATTEMPT_ID); + + final IntegrationLauncherConfig expectedSourceLauncherConfig = new IntegrationLauncherConfig() + .withJobId(String.valueOf(JOB_ID)) + .withAttemptId((long) ATTEMPT_ID) + .withDockerImage(WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB); + + final IntegrationLauncherConfig expectedDestinationLauncherConfig = new IntegrationLauncherConfig() + .withJobId(String.valueOf(JOB_ID)) + .withAttemptId((long) ATTEMPT_ID) + .withDockerImage(jobResetConfig.getDestinationDockerImage()); + + final GeneratedJobInput expectedGeneratedJobInput = new GeneratedJobInput( + expectedJobRunConfig, + expectedSourceLauncherConfig, + expectedDestinationLauncherConfig, + expectedStandardSyncInput); + + final GeneratedJobInput generatedJobInput = generateInputActivity.getSyncWorkflowInput(syncInput); + assertEquals(expectedGeneratedJobInput, generatedJobInput); + + final AttemptSyncConfig expectedAttemptSyncConfig = new AttemptSyncConfig() + .withSourceConfiguration(Jsons.emptyObject()) + .withDestinationConfiguration(DESTINATION_CONFIGURATION) + .withState(STATE); + + verify(attemptApi).saveSyncConfig(new SaveAttemptSyncConfigRequestBody() + .jobId(JOB_ID) + .attemptNumber(ATTEMPT_ID) + .syncConfig(ApiPojoConverters.attemptSyncConfigToClient(expectedAttemptSyncConfig, CONNECTION_ID, true))); + } + +} diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java index a246a2c3faae..bee505326122 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java @@ -194,7 +194,7 @@ void createResetJob() throws JsonValidationException, ConfigNotFoundException, I @Test void isLastJobOrAttemptFailureTrueTest() throws Exception { final int activeAttemptNumber = 0; - final Attempt activeAttempt = new Attempt(activeAttemptNumber, 1, Path.of(""), null, AttemptStatus.RUNNING, null, null, 4L, 5L, null); + final Attempt activeAttempt = new Attempt(activeAttemptNumber, 1, Path.of(""), null, null, AttemptStatus.RUNNING, null, null, 4L, 5L, null); final Job previousJob = new Job(PREVIOUS_JOB_ID, ConfigType.SYNC, CONNECTION_ID.toString(), new JobConfig(), List.of(), JobStatus.SUCCEEDED, 4L, 4L, 5L); @@ -214,7 +214,7 @@ void isLastJobOrAttemptFailureTrueTest() throws Exception { @Test void isLastJobOrAttemptFailureFalseTest() throws Exception { final int activeAttemptNumber = 0; - final Attempt activeAttempt = new Attempt(activeAttemptNumber, 1, Path.of(""), null, AttemptStatus.RUNNING, null, null, 4L, 5L, null); + final Attempt activeAttempt = new Attempt(activeAttemptNumber, 1, Path.of(""), null, null, AttemptStatus.RUNNING, null, null, 4L, 5L, null); final Job previousJob = new Job(PREVIOUS_JOB_ID, ConfigType.SYNC, CONNECTION_ID.toString(), new JobConfig(), List.of(), JobStatus.FAILED, 4L, 4L, 5L); @@ -233,9 +233,9 @@ void isLastJobOrAttemptFailureFalseTest() throws Exception { @Test void isLastJobOrAttemptFailurePreviousAttemptFailureTest() throws Exception { - final Attempt previousAttempt = new Attempt(0, 1, Path.of(""), null, AttemptStatus.FAILED, null, null, 2L, 3L, 3L); + final Attempt previousAttempt = new Attempt(0, 1, Path.of(""), null, null, AttemptStatus.FAILED, null, null, 2L, 3L, 3L); final int activeAttemptNumber = 1; - final Attempt activeAttempt = new Attempt(activeAttemptNumber, 1, Path.of(""), null, AttemptStatus.RUNNING, null, null, 4L, 5L, null); + final Attempt activeAttempt = new Attempt(activeAttemptNumber, 1, Path.of(""), null, null, AttemptStatus.RUNNING, null, null, 4L, 5L, null); final Job previousJob = new Job(PREVIOUS_JOB_ID, ConfigType.SYNC, CONNECTION_ID.toString(), new JobConfig(), List.of(), JobStatus.SUCCEEDED, 4L, 4L, 5L); @@ -473,9 +473,9 @@ void setJobCancelledWrapException() throws IOException { @Test void ensureCleanJobState() throws IOException { - final Attempt failedAttempt = new Attempt(0, 1, Path.of(""), null, AttemptStatus.FAILED, null, null, 2L, 3L, 3L); + final Attempt failedAttempt = new Attempt(0, 1, Path.of(""), null, null, AttemptStatus.FAILED, null, null, 2L, 3L, 3L); final int runningAttemptNumber = 1; - final Attempt runningAttempt = new Attempt(runningAttemptNumber, 1, Path.of(""), null, AttemptStatus.RUNNING, null, null, 4L, 5L, null); + final Attempt runningAttempt = new Attempt(runningAttemptNumber, 1, Path.of(""), null, null, AttemptStatus.RUNNING, null, null, 4L, 5L, null); final Job runningJob = new Job(1, ConfigType.SYNC, CONNECTION_ID.toString(), new JobConfig(), List.of(failedAttempt, runningAttempt), JobStatus.RUNNING, 2L, 2L, 3L); diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index c772bcb886bc..feb39b876cdc 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -214,6 +214,7 @@

Table of Contents

Attempt

Connection

@@ -273,6 +274,7 @@

Internal

  • post /v1/state/create_or_update
  • post /v1/jobs/get_normalization_status
  • post /v1/attempt/save_stats
  • +
  • post /v1/attempt/save_sync_config
  • post /v1/attempt/set_workflow_in_attempt
  • post /v1/sources/write_discover_catalog_result
  • @@ -407,6 +409,58 @@

    Request body

    +

    Return type

    + + + + +

    Example data

    +
    Content-Type: application/json
    +
    {
    +  "succeeded" : true
    +}
    + +

    Produces

    + This API call produces the following media types according to the Accept request header; + the media type will be conveyed by the Content-Type response header. +
      +
    • application/json
    • +
    + +

    Responses

    +

    200

    + Successful Operation + InternalOperationResult + +
    +
    +
    + Up +
    post /v1/attempt/save_sync_config
    +
    For worker to save the AttemptSyncConfig for an attempt. (saveSyncConfig)
    +
    + + +

    Consumes

    + This API call consumes the following media types via the Content-Type request header: +
      +
    • application/json
    • +
    + +

    Request body

    +
    +
    SaveAttemptSyncConfigRequestBody SaveAttemptSyncConfigRequestBody (required)
    + +
    Body Parameter
    + +
    + + + +

    Return type

    InternalOperationResult @@ -4093,6 +4147,58 @@

    Request body

    +

    Return type

    + + + + +

    Example data

    +
    Content-Type: application/json
    +
    {
    +  "succeeded" : true
    +}
    + +

    Produces

    + This API call produces the following media types according to the Accept request header; + the media type will be conveyed by the Content-Type response header. +
      +
    • application/json
    • +
    + +

    Responses

    +

    200

    + Successful Operation + InternalOperationResult +
    +
    +
    +
    + Up +
    post /v1/attempt/save_sync_config
    +
    For worker to save the AttemptSyncConfig for an attempt. (saveSyncConfig)
    +
    + + +

    Consumes

    + This API call consumes the following media types via the Content-Type request header: +
      +
    • application/json
    • +
    + +

    Request body

    +
    +
    SaveAttemptSyncConfigRequestBody SaveAttemptSyncConfigRequestBody (required)
    + +
    Body Parameter
    + +
    + + + +

    Return type

    +
    +

    AttemptSyncConfig - Up

    +
    +
    +
    sourceConfiguration
    +
    destinationConfiguration
    +
    state (optional)
    +
    +
    +
    +

    SaveAttemptSyncConfigRequestBody - Up

    +
    +
    +
    jobId
    Long format: int64
    +
    attemptNumber
    Integer format: int32
    +
    syncConfig
    +
    +