From fbd01aabbb4420676558f320579d3fa895b45bc6 Mon Sep 17 00:00:00 2001 From: Lake Mossman Date: Thu, 12 May 2022 17:43:19 -0700 Subject: [PATCH] Repair temporal state when performing manual actions (#12289) * Repair temporal state when performing manual actions * refactor temporal client and fix tests * add unreachable workflow exception * format * test repeated deletion * add acceptance tests for automatic workflow repair * rename and DRY up manual operation methods in SchedulerHandler * refactor temporal client to batch signal and start requests together in repair case * add comment * remove main method * fix job id fetching * only overwrite workflowState if reset flags are true on input * fix test * fix cancel endpoint * Clean job state before creating new jobs in connection manager workflow (#12589) * first working iteration of cleaning job state on first workflow run * second iteration, with tests * undo local testing changes * move method * add comment explaining placement of clean job state logic * change connection_workflow failure origin value to platform * remove cast from new query * create static var for non terminal job statuses * change failure origin value to airbyte_platform * tweak external message wording * remove unused variable * reword external message * fix merge conflict * remove log lines * move cleaning job state to beginning of workflow * do not clean job state if there is already a job id for this workflow, and add test * see if sleeping fixes test on CI * add repeated test annotation to protect from flakiness * fail jobs before creating new ones to protect from quarantined state * update external message for cleaning job state error --- airbyte-api/src/main/openapi/config.yaml | 1 + .../main/resources/types/FailureReason.yaml | 1 + .../airbyte/scheduler/client/EventRunner.java | 10 +- .../scheduler/client/TemporalEventRunner.java | 12 +- .../airbyte/scheduler/models/JobStatus.java | 1 + .../persistence/DefaultJobPersistence.java | 12 + 
.../scheduler/persistence/JobPersistence.java | 2 + .../DefaultJobPersistenceTest.java | 39 ++ .../server/handlers/SchedulerHandler.java | 47 ++- .../server/handlers/SchedulerHandlerTest.java | 6 +- .../test/acceptance/AcceptanceTests.java | 99 +++++ .../airbyte/workers/helper/FailureHelper.java | 12 + .../temporal/ConnectionManagerUtils.java | 194 ++++++++++ .../workers/temporal/TemporalClient.java | 231 +++++------- .../exception/DeletedWorkflowException.java | 13 + .../UnreachableWorkflowException.java | 13 + .../ConnectionManagerWorkflowImpl.java | 35 +- .../JobCreationAndStatusUpdateActivity.java | 12 + ...obCreationAndStatusUpdateActivityImpl.java | 44 +++ .../workers/temporal/TemporalClientTest.java | 356 ++++++++++++++++-- .../ConnectionManagerWorkflowTest.java | 22 ++ ...obCreationAndStatusUpdateActivityTest.java | 44 ++- 22 files changed, 994 insertions(+), 212 deletions(-) create mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/temporal/ConnectionManagerUtils.java create mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/temporal/exception/DeletedWorkflowException.java create mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/temporal/exception/UnreachableWorkflowException.java diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index dd4f72314461..26adc3dfd97c 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -3862,6 +3862,7 @@ components: - persistence - normalization - dbt + - airbyte_platform AttemptFailureType: description: Categorizes well known errors into types for programmatic handling. If not set, the type of error is not well known. 
type: string diff --git a/airbyte-config/models/src/main/resources/types/FailureReason.yaml b/airbyte-config/models/src/main/resources/types/FailureReason.yaml index bae623d6da64..72dced892a78 100644 --- a/airbyte-config/models/src/main/resources/types/FailureReason.yaml +++ b/airbyte-config/models/src/main/resources/types/FailureReason.yaml @@ -17,6 +17,7 @@ properties: - persistence - normalization - dbt + - airbyte_platform failureType: description: Categorizes well known errors into types for programmatic handling. If not set, the type of error is not well known. type: string diff --git a/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/EventRunner.java b/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/EventRunner.java index f1784a8baff4..95bec9f3f8a9 100644 --- a/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/EventRunner.java +++ b/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/EventRunner.java @@ -4,7 +4,7 @@ package io.airbyte.scheduler.client; -import io.airbyte.workers.temporal.TemporalClient.ManualSyncSubmissionResult; +import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult; import java.util.Set; import java.util.UUID; @@ -12,13 +12,13 @@ public interface EventRunner { void createNewSchedulerWorkflow(final UUID connectionId); - ManualSyncSubmissionResult startNewManualSync(final UUID connectionId); + ManualOperationResult startNewManualSync(final UUID connectionId); - ManualSyncSubmissionResult startNewCancelation(final UUID connectionId); + ManualOperationResult startNewCancellation(final UUID connectionId); - ManualSyncSubmissionResult resetConnection(final UUID connectionId); + ManualOperationResult resetConnection(final UUID connectionId); - ManualSyncSubmissionResult synchronousResetConnection(final UUID connectionId); + ManualOperationResult synchronousResetConnection(final UUID connectionId); void deleteConnection(final UUID connectionId); diff --git 
a/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/TemporalEventRunner.java b/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/TemporalEventRunner.java index 9eb7df68198f..d9d9b075b008 100644 --- a/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/TemporalEventRunner.java +++ b/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/TemporalEventRunner.java @@ -5,7 +5,7 @@ package io.airbyte.scheduler.client; import io.airbyte.workers.temporal.TemporalClient; -import io.airbyte.workers.temporal.TemporalClient.ManualSyncSubmissionResult; +import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult; import java.util.Set; import java.util.UUID; import lombok.AllArgsConstructor; @@ -19,19 +19,19 @@ public void createNewSchedulerWorkflow(final UUID connectionId) { temporalClient.submitConnectionUpdaterAsync(connectionId); } - public ManualSyncSubmissionResult startNewManualSync(final UUID connectionId) { + public ManualOperationResult startNewManualSync(final UUID connectionId) { return temporalClient.startNewManualSync(connectionId); } - public ManualSyncSubmissionResult startNewCancelation(final UUID connectionId) { - return temporalClient.startNewCancelation(connectionId); + public ManualOperationResult startNewCancellation(final UUID connectionId) { + return temporalClient.startNewCancellation(connectionId); } - public ManualSyncSubmissionResult resetConnection(final UUID connectionId) { + public ManualOperationResult resetConnection(final UUID connectionId) { return temporalClient.resetConnection(connectionId); } - public ManualSyncSubmissionResult synchronousResetConnection(final UUID connectionId) { + public ManualOperationResult synchronousResetConnection(final UUID connectionId) { return temporalClient.synchronousResetConnection(connectionId); } diff --git a/airbyte-scheduler/models/src/main/java/io/airbyte/scheduler/models/JobStatus.java 
b/airbyte-scheduler/models/src/main/java/io/airbyte/scheduler/models/JobStatus.java index dca3be7b7b92..7fbe68d97c26 100644 --- a/airbyte-scheduler/models/src/main/java/io/airbyte/scheduler/models/JobStatus.java +++ b/airbyte-scheduler/models/src/main/java/io/airbyte/scheduler/models/JobStatus.java @@ -17,5 +17,6 @@ public enum JobStatus { CANCELLED; public static final Set TERMINAL_STATUSES = Sets.newHashSet(FAILED, SUCCEEDED, CANCELLED); + public static final Set NON_TERMINAL_STATUSES = Sets.difference(Set.of(values()), TERMINAL_STATUSES); } diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java index 28b5d553c354..7a45dd8e2630 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java @@ -386,6 +386,18 @@ public List listJobsWithStatus(final ConfigType configType, final JobStatus return listJobsWithStatus(Sets.newHashSet(configType), status); } + @Override + public List listJobsForConnectionWithStatuses(final UUID connectionId, final Set configTypes, final Set statuses) + throws IOException { + return jobDatabase.query(ctx -> getJobsFromResult(ctx + .fetch(BASE_JOB_SELECT_AND_JOIN + "WHERE " + + "scope = ? 
AND " + + "config_type IN " + Sqls.toSqlInFragment(configTypes) + " AND " + + "jobs.status IN " + Sqls.toSqlInFragment(statuses) + " " + + ORDER_BY_JOB_TIME_ATTEMPT_TIME, + connectionId.toString()))); + } + @Override public List listJobStatusAndTimestampWithConnection(final UUID connectionId, final Set configTypes, diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java index 39124cab645e..b73310ea7cd7 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java @@ -161,6 +161,8 @@ public interface JobPersistence { List listJobsWithStatus(JobConfig.ConfigType configType, JobStatus status) throws IOException; + List listJobsForConnectionWithStatuses(UUID connectionId, Set configTypes, Set statuses) throws IOException; + /** * @param connectionId The ID of the connection * @param configTypes The types of jobs diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java index 1d283d767f43..e4d44447d99a 100644 --- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java +++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java @@ -1168,6 +1168,45 @@ public void testListJobsWithStatusAndConfigType() throws IOException, Interrupte assertEquals(expectedIncompleteJob, actualIncompleteJob); } + @Test + @DisplayName("Should only list jobs for the requested connection and with the requested statuses and config types") + public void testListJobsWithStatusesAndConfigTypesForConnection() throws 
IOException, InterruptedException { + final UUID desiredConnectionId = UUID.randomUUID(); + final UUID otherConnectionId = UUID.randomUUID(); + + // desired connection, statuses, and config types + final long desiredJobId1 = jobPersistence.enqueueJob(desiredConnectionId.toString(), SYNC_JOB_CONFIG).orElseThrow(); + jobPersistence.succeedAttempt(desiredJobId1, jobPersistence.createAttempt(desiredJobId1, LOG_PATH)); + final long desiredJobId2 = jobPersistence.enqueueJob(desiredConnectionId.toString(), SYNC_JOB_CONFIG).orElseThrow(); + final long desiredJobId3 = jobPersistence.enqueueJob(desiredConnectionId.toString(), CHECK_JOB_CONFIG).orElseThrow(); + jobPersistence.succeedAttempt(desiredJobId3, jobPersistence.createAttempt(desiredJobId3, LOG_PATH)); + final long desiredJobId4 = jobPersistence.enqueueJob(desiredConnectionId.toString(), CHECK_JOB_CONFIG).orElseThrow(); + + // right connection id and status, wrong config type + final long otherJobId1 = jobPersistence.enqueueJob(desiredConnectionId.toString(), SPEC_JOB_CONFIG).orElseThrow(); + // right config type and status, wrong connection id + final long otherJobId2 = jobPersistence.enqueueJob(otherConnectionId.toString(), SYNC_JOB_CONFIG).orElseThrow(); + // right connection id and config type, wrong status + final long otherJobId3 = jobPersistence.enqueueJob(desiredConnectionId.toString(), CHECK_JOB_CONFIG).orElseThrow(); + jobPersistence.failAttempt(otherJobId3, jobPersistence.createAttempt(otherJobId3, LOG_PATH)); + + final List actualJobs = jobPersistence.listJobsForConnectionWithStatuses(desiredConnectionId, + Set.of(ConfigType.SYNC, ConfigType.CHECK_CONNECTION_DESTINATION), Set.of(JobStatus.PENDING, JobStatus.SUCCEEDED)); + + final Job expectedDesiredJob1 = createJob(desiredJobId1, SYNC_JOB_CONFIG, JobStatus.SUCCEEDED, + Lists.newArrayList(createAttempt(0L, desiredJobId1, AttemptStatus.SUCCEEDED, LOG_PATH)), + NOW.getEpochSecond(), desiredConnectionId.toString()); + final Job expectedDesiredJob2 = + 
createJob(desiredJobId2, SYNC_JOB_CONFIG, JobStatus.PENDING, Lists.newArrayList(), NOW.getEpochSecond(), desiredConnectionId.toString()); + final Job expectedDesiredJob3 = createJob(desiredJobId3, CHECK_JOB_CONFIG, JobStatus.SUCCEEDED, + Lists.newArrayList(createAttempt(0L, desiredJobId3, AttemptStatus.SUCCEEDED, LOG_PATH)), + NOW.getEpochSecond(), desiredConnectionId.toString()); + final Job expectedDesiredJob4 = + createJob(desiredJobId4, CHECK_JOB_CONFIG, JobStatus.PENDING, Lists.newArrayList(), NOW.getEpochSecond(), desiredConnectionId.toString()); + + assertEquals(Sets.newHashSet(expectedDesiredJob1, expectedDesiredJob2, expectedDesiredJob3, expectedDesiredJob4), Sets.newHashSet(actualJobs)); + } + } @Nested diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index b4f3ba95d296..0a5bd696f81d 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -71,7 +71,7 @@ import io.airbyte.server.handlers.helpers.CatalogConverter; import io.airbyte.validation.json.JsonSchemaValidator; import io.airbyte.validation.json.JsonValidationException; -import io.airbyte.workers.temporal.TemporalClient.ManualSyncSubmissionResult; +import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult; import io.airbyte.workers.temporal.TemporalUtils; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.workflowservice.v1.RequestCancelWorkflowExecutionRequest; @@ -364,7 +364,7 @@ public DestinationDefinitionSpecificationRead getDestinationSpecification( public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequestBody) throws ConfigNotFoundException, IOException, JsonValidationException { if (featureFlags.usesNewScheduler()) { - return createManualRun(connectionIdRequestBody.getConnectionId()); + 
return submitManualSyncToWorker(connectionIdRequestBody.getConnectionId()); } final UUID connectionId = connectionIdRequestBody.getConnectionId(); final StandardSync standardSync = configRepository.getStandardSync(connectionId); @@ -411,7 +411,7 @@ public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequ public JobInfoRead resetConnection(final ConnectionIdRequestBody connectionIdRequestBody) throws IOException, JsonValidationException, ConfigNotFoundException { if (featureFlags.usesNewScheduler()) { - return resetConnectionWithNewScheduler(connectionIdRequestBody.getConnectionId()); + return submitResetConnectionToWorker(connectionIdRequestBody.getConnectionId()); } final UUID connectionId = connectionIdRequestBody.getConnectionId(); final StandardSync standardSync = configRepository.getStandardSync(connectionId); @@ -447,7 +447,7 @@ public ConnectionState getState(final ConnectionIdRequestBody connectionIdReques // todo (cgardens) - this method needs a test. public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) throws IOException { if (featureFlags.usesNewScheduler()) { - return createNewSchedulerCancellation(jobIdRequestBody.getId()); + return submitCancellationToWorker(jobIdRequestBody.getId()); } final long jobId = jobIdRequestBody.getId(); @@ -509,39 +509,36 @@ private ConnectorSpecification getSpecFromDestinationDefinitionId(final UUID des return destinationDef.getSpec(); } - private JobInfoRead createNewSchedulerCancellation(final Long id) throws IOException { - final Job job = jobPersistence.getJob(id); - - final ManualSyncSubmissionResult cancellationSubmissionResult = eventRunner.startNewCancelation(UUID.fromString(job.getScope())); + private JobInfoRead submitCancellationToWorker(final Long jobId) throws IOException { + final Job job = jobPersistence.getJob(jobId); - if (cancellationSubmissionResult.getFailingReason().isPresent()) { - throw new 
IllegalStateException(cancellationSubmissionResult.getFailingReason().get()); + final ManualOperationResult cancellationResult = eventRunner.startNewCancellation(UUID.fromString(job.getScope())); + if (cancellationResult.getFailingReason().isPresent()) { + throw new IllegalStateException(cancellationResult.getFailingReason().get()); } - final Job cancelledJob = jobPersistence.getJob(id); - return jobConverter.getJobInfoRead(cancelledJob); + // query same job ID again to get updated job info after cancellation + return jobConverter.getJobInfoRead(jobPersistence.getJob(jobId)); } - private JobInfoRead createManualRun(final UUID connectionId) throws IOException { - final ManualSyncSubmissionResult manualSyncSubmissionResult = eventRunner.startNewManualSync(connectionId); + private JobInfoRead submitManualSyncToWorker(final UUID connectionId) throws IOException { + final ManualOperationResult manualSyncResult = eventRunner.startNewManualSync(connectionId); - if (manualSyncSubmissionResult.getFailingReason().isPresent()) { - throw new IllegalStateException(manualSyncSubmissionResult.getFailingReason().get()); - } + return readJobFromResult(manualSyncResult); + } - final Job job = jobPersistence.getJob(manualSyncSubmissionResult.getJobId().get()); + private JobInfoRead submitResetConnectionToWorker(final UUID connectionId) throws IOException { + final ManualOperationResult resetConnectionResult = eventRunner.resetConnection(connectionId); - return jobConverter.getJobInfoRead(job); + return readJobFromResult(resetConnectionResult); } - private JobInfoRead resetConnectionWithNewScheduler(final UUID connectionId) throws IOException { - final ManualSyncSubmissionResult manualSyncSubmissionResult = eventRunner.resetConnection(connectionId); - - if (manualSyncSubmissionResult.getFailingReason().isPresent()) { - throw new IllegalStateException(manualSyncSubmissionResult.getFailingReason().get()); + private JobInfoRead readJobFromResult(final ManualOperationResult 
manualOperationResult) throws IOException, IllegalStateException { + if (manualOperationResult.getFailingReason().isPresent()) { + throw new IllegalStateException(manualOperationResult.getFailingReason().get()); } - final Job job = jobPersistence.getJob(manualSyncSubmissionResult.getJobId().get()); + final Job job = jobPersistence.getJob(manualOperationResult.getJobId().get()); return jobConverter.getJobInfoRead(job); } diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index b45044993f72..433071145b34 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -85,7 +85,7 @@ import io.airbyte.server.helpers.SourceHelpers; import io.airbyte.validation.json.JsonSchemaValidator; import io.airbyte.validation.json.JsonValidationException; -import io.airbyte.workers.temporal.TemporalClient.ManualSyncSubmissionResult; +import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult; import io.temporal.serviceclient.WorkflowServiceStubs; import java.io.IOException; import java.net.URI; @@ -720,14 +720,14 @@ void testNewSchedulerSync() throws JsonValidationException, ConfigNotFoundExcept final UUID connectionId = UUID.randomUUID(); final long jobId = 123L; - final ManualSyncSubmissionResult manualSyncSubmissionResult = ManualSyncSubmissionResult + final ManualOperationResult manualOperationResult = ManualOperationResult .builder() .failingReason(Optional.empty()) .jobId(Optional.of(jobId)) .build(); when(eventRunner.startNewManualSync(connectionId)) - .thenReturn(manualSyncSubmissionResult); + .thenReturn(manualOperationResult); doReturn(new JobInfoRead()) .when(jobConverter).getJobInfoRead(any()); diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java 
b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index 030a83b8dd47..0638c6a58a70 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -1193,6 +1193,7 @@ public void testDeleteConnection() throws Exception { waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.RUNNING)); // test normal deletion of connection + LOGGER.info("Calling delete connection..."); apiClient.getConnectionApi().deleteConnection(new ConnectionIdRequestBody().connectionId(connectionId)); // remove connection to avoid exception during tear down @@ -1205,6 +1206,10 @@ public void testDeleteConnection() throws Exception { apiClient.getConnectionApi().getConnection(new ConnectionIdRequestBody().connectionId(connectionId)).getStatus(); assertEquals(ConnectionStatus.DEPRECATED, connectionStatus); + // test that repeated deletion call for same connection is successful + LOGGER.info("Calling delete connection a second time to test repeat call behavior..."); + apiClient.getConnectionApi().deleteConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + // test deletion of connection when temporal workflow is in a bad state, only when using new // scheduler final FeatureFlags featureFlags = new EnvVariableFeatureFlags(); @@ -1268,6 +1273,100 @@ public void testUpdateConnectionWhenWorkflowUnreachable() throws Exception { } } + @Test + @Order(24) + public void testManualSyncRepairsWorkflowWhenWorkflowUnreachable() throws Exception { + // This test only covers the specific behavior of submitting a manual sync for a connection whose + // underlying temporal workflow is unreachable, and verifying that the workflow gets repaired. + // This case only occurs with the new scheduler, so the entire test is inside the feature flag + // conditional. 
+ final FeatureFlags featureFlags = new EnvVariableFeatureFlags(); + if (featureFlags.usesNewScheduler()) { + final String connectionName = "test-connection"; + final SourceDefinitionRead sourceDefinition = createE2eSourceDefinition(); + final SourceRead source = createSource( + "E2E Test Source -" + UUID.randomUUID(), + workspaceId, + sourceDefinition.getSourceDefinitionId(), + Jsons.jsonNode(ImmutableMap.builder() + .put("type", "INFINITE_FEED") + .put("max_records", 5000) + .put("message_interval", 100) + .build())); + final UUID sourceId = source.getSourceId(); + final UUID destinationId = createDestination().getDestinationId(); + final UUID operationId = createOperation().getOperationId(); + final AirbyteCatalog catalog = discoverSourceSchema(sourceId); + final SyncMode syncMode = SyncMode.INCREMENTAL; + final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND_DEDUP; + catalog.getStreams().forEach(s -> s.getConfig() + .syncMode(syncMode) + .cursorField(List.of(COLUMN_ID)) + .destinationSyncMode(destinationSyncMode) + .primaryKey(List.of(List.of(COLUMN_NAME)))); + + LOGGER.info("Testing manual sync when temporal is in a terminal state"); + final UUID connectionId = createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + + LOGGER.info("Starting first manual sync"); + final JobInfoRead firstJobInfo = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + LOGGER.info("Terminating workflow during first sync"); + terminateTemporalWorkflow(connectionId); + + LOGGER.info("Submitted another manual sync"); + apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + + LOGGER.info("Waiting for workflow to be recreated..."); + Thread.sleep(500); + + final WorkflowState workflowState = getWorkflowState(connectionId); + assertTrue(workflowState.isRunning()); + 
assertTrue(workflowState.isSkipScheduling()); + + // verify that the first manual sync was marked as failed + final JobInfoRead terminatedJobInfo = apiClient.getJobsApi().getJobInfo(new JobIdRequestBody().id(firstJobInfo.getJob().getId())); + assertEquals(JobStatus.FAILED, terminatedJobInfo.getJob().getStatus()); + } + } + + @Test + @Order(25) + public void testResetConnectionRepairsWorkflowWhenWorkflowUnreachable() throws Exception { + // This test only covers the specific behavior of resetting a connection whose underlying temporal + // workflow is unreachable, and verifying that the workflow gets repaired. + // This case only occurs with the new scheduler, so the entire test is inside the feature flag + // conditional. + final FeatureFlags featureFlags = new EnvVariableFeatureFlags(); + if (featureFlags.usesNewScheduler()) { + final String connectionName = "test-connection"; + final UUID sourceId = createPostgresSource().getSourceId(); + final UUID destinationId = createDestination().getDestinationId(); + final UUID operationId = createOperation().getOperationId(); + final AirbyteCatalog catalog = discoverSourceSchema(sourceId); + final SyncMode syncMode = SyncMode.INCREMENTAL; + final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND_DEDUP; + catalog.getStreams().forEach(s -> s.getConfig() + .syncMode(syncMode) + .cursorField(List.of(COLUMN_ID)) + .destinationSyncMode(destinationSyncMode) + .primaryKey(List.of(List.of(COLUMN_NAME)))); + + LOGGER.info("Testing reset connection when temporal is in a terminal state"); + final UUID connectionId = createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + + terminateTemporalWorkflow(connectionId); + + apiClient.getConnectionApi().resetConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + + LOGGER.info("Waiting for workflow to be recreated..."); + Thread.sleep(500); + + final WorkflowState workflowState = getWorkflowState(connectionId); + 
assertTrue(workflowState.isRunning()); + assertTrue(workflowState.isResetConnection()); + } + } + private WorkflowClient getWorkflowClient() { final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService("localhost:7233"); return WorkflowClient.newInstance(temporalService); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/helper/FailureHelper.java b/airbyte-workers/src/main/java/io/airbyte/workers/helper/FailureHelper.java index b01995e42cda..644bcafff8e8 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/helper/FailureHelper.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/helper/FailureHelper.java @@ -95,6 +95,18 @@ public static AttemptFailureSummary failureSummaryForCancellation(final Long job return failureSummary(failures, partialSuccess); } + public static AttemptFailureSummary failureSummaryForTemporalCleaningJobState(final Long jobId, final Integer attemptNumber) { + final FailureReason failureReason = new FailureReason() + .withFailureOrigin(FailureOrigin.AIRBYTE_PLATFORM) + .withFailureType(FailureType.SYSTEM_ERROR) + .withInternalMessage( + "Setting attempt to FAILED because the temporal workflow for this connection was restarted, and existing job state was cleaned.") + .withExternalMessage("An internal Airbyte error has occurred. 
This sync will need to be retried.") + .withTimestamp(System.currentTimeMillis()) + .withMetadata(jobAndAttemptMetadata(jobId, attemptNumber)); + return new AttemptFailureSummary().withFailures(List.of(failureReason)); + } + public static FailureReason failureReasonFromWorkflowAndActivity( final String workflowType, final String activityType, diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/ConnectionManagerUtils.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/ConnectionManagerUtils.java new file mode 100644 index 000000000000..fcc152c76c19 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/ConnectionManagerUtils.java @@ -0,0 +1,194 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.temporal; + +import io.airbyte.workers.temporal.exception.DeletedWorkflowException; +import io.airbyte.workers.temporal.exception.UnreachableWorkflowException; +import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow; +import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflowImpl; +import io.airbyte.workers.temporal.scheduling.ConnectionUpdaterInput; +import io.airbyte.workers.temporal.scheduling.state.WorkflowState; +import io.temporal.client.BatchRequest; +import io.temporal.client.WorkflowClient; +import io.temporal.workflow.Functions.Proc; +import io.temporal.workflow.Functions.Proc1; +import io.temporal.workflow.Functions.TemporalFunctionalInterfaceMarker; +import java.util.Optional; +import java.util.UUID; +import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; + +/** + * Encapsulates logic specific to retrieving, starting, and signaling the ConnectionManagerWorkflow. + */ +@Slf4j +public class ConnectionManagerUtils { + + /** + * Attempts to send a signal to the existing ConnectionManagerWorkflow for the provided connection. 
+ * + * If the workflow is unreachable, this will restart the workflow and send the signal in a single + * batched request. Batching is used to avoid race conditions between starting the workflow and + * executing the signal. + * + * @param client the WorkflowClient for interacting with temporal + * @param connectionId the connection ID to execute this operation for + * @param signalMethod a function that takes in a connection manager workflow and executes a signal + * method on it, with no arguments + * @return the healthy connection manager workflow that was signaled + * @throws DeletedWorkflowException if the connection manager workflow was deleted + */ + static ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(final WorkflowClient client, + final UUID connectionId, + final Function signalMethod) + throws DeletedWorkflowException { + return signalWorkflowAndRepairIfNecessary(client, connectionId, signalMethod, Optional.empty()); + } + + /** + * Attempts to send a signal to the existing ConnectionManagerWorkflow for the provided connection. + * + * If the workflow is unreachable, this will restart the workflow and send the signal in a single + * batched request. Batching is used to avoid race conditions between starting the workflow and + * executing the signal. 
+ * + * @param client the WorkflowClient for interacting with temporal + * @param connectionId the connection ID to execute this operation for + * @param signalMethod a function that takes in a connection manager workflow and executes a signal + * method on it, with 1 argument + * @param signalArgument the single argument to be input to the signal + * @return the healthy connection manager workflow that was signaled + * @throws DeletedWorkflowException if the connection manager workflow was deleted + */ + static ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(final WorkflowClient client, + final UUID connectionId, + final Function> signalMethod, + final T signalArgument) + throws DeletedWorkflowException { + return signalWorkflowAndRepairIfNecessary(client, connectionId, signalMethod, Optional.of(signalArgument)); + } + + // This method unifies the logic of the above two, by using the optional signalArgument parameter to + // indicate if an argument is being provided to the signal or not. + // Keeping this private and only exposing the above methods outside this class provides a strict + // type enforcement for external calls, and means this method can assume consistent type + // implementations for both cases. + private static ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(final WorkflowClient client, + final UUID connectionId, + final Function signalMethod, + final Optional signalArgument) + throws DeletedWorkflowException { + try { + final ConnectionManagerWorkflow connectionManagerWorkflow = getConnectionManagerWorkflow(client, connectionId); + log.info("Retrieved existing connection manager workflow for connection {}. 
Executing signal.", connectionId); + // retrieve the signal from the lambda + final TemporalFunctionalInterfaceMarker signal = signalMethod.apply(connectionManagerWorkflow); + // execute the signal + if (signalArgument.isPresent()) { + ((Proc1) signal).apply(signalArgument.get()); + } else { + ((Proc) signal).apply(); + } + return connectionManagerWorkflow; + } catch (final UnreachableWorkflowException e) { + log.error( + String.format( + "Failed to retrieve ConnectionManagerWorkflow for connection %s. Repairing state by creating new workflow and starting with the signal.", + connectionId), + e); + + final ConnectionManagerWorkflow connectionManagerWorkflow = newConnectionManagerWorkflowStub(client, connectionId); + final ConnectionUpdaterInput startWorkflowInput = buildStartWorkflowInput(connectionId); + + final BatchRequest batchRequest = client.newSignalWithStartRequest(); + batchRequest.add(connectionManagerWorkflow::run, startWorkflowInput); + + // retrieve the signal from the lambda + final TemporalFunctionalInterfaceMarker signal = signalMethod.apply(connectionManagerWorkflow); + // add signal to batch request + if (signalArgument.isPresent()) { + batchRequest.add((Proc1) signal, signalArgument.get()); + } else { + batchRequest.add((Proc) signal); + } + + client.signalWithStart(batchRequest); + log.info("Connection manager workflow for connection {} has been started and signaled.", connectionId); + + return connectionManagerWorkflow; + } + } + + static ConnectionManagerWorkflow startConnectionManagerNoSignal(final WorkflowClient client, final UUID connectionId) { + final ConnectionManagerWorkflow connectionManagerWorkflow = newConnectionManagerWorkflowStub(client, connectionId); + final ConnectionUpdaterInput input = buildStartWorkflowInput(connectionId); + + WorkflowClient.start(connectionManagerWorkflow::run, input); + + return connectionManagerWorkflow; + } + + /** + * Attempts to retrieve the connection manager workflow for the provided connection. 
+ * + * @param connectionId the ID of the connection whose workflow should be retrieved + * @return the healthy ConnectionManagerWorkflow + * @throws DeletedWorkflowException if the workflow was deleted, according to the workflow state + * @throws UnreachableWorkflowException if the workflow is unreachable + */ + static ConnectionManagerWorkflow getConnectionManagerWorkflow(final WorkflowClient client, final UUID connectionId) + throws DeletedWorkflowException, UnreachableWorkflowException { + final ConnectionManagerWorkflow connectionManagerWorkflow; + final WorkflowState workflowState; + try { + connectionManagerWorkflow = client.newWorkflowStub(ConnectionManagerWorkflow.class, getConnectionManagerName(connectionId)); + workflowState = connectionManagerWorkflow.getState(); + } catch (final Exception e) { + throw new UnreachableWorkflowException( + String.format("Failed to retrieve ConnectionManagerWorkflow for connection %s due to the following error:", connectionId), + e); + } + + if (workflowState.isDeleted()) { + throw new DeletedWorkflowException(String.format( + "The connection manager workflow for connection %s is deleted, so no further operations cannot be performed on it.", + connectionId)); + } + + return connectionManagerWorkflow; + } + + static long getCurrentJobId(final WorkflowClient client, final UUID connectionId) { + try { + final ConnectionManagerWorkflow connectionManagerWorkflow = ConnectionManagerUtils.getConnectionManagerWorkflow(client, connectionId); + return connectionManagerWorkflow.getJobInformation().getJobId(); + } catch (final Exception e) { + return ConnectionManagerWorkflowImpl.NON_RUNNING_JOB_ID; + } + } + + static ConnectionManagerWorkflow newConnectionManagerWorkflowStub(final WorkflowClient client, final UUID connectionId) { + return client.newWorkflowStub(ConnectionManagerWorkflow.class, + TemporalUtils.getWorkflowOptionsWithWorkflowId(TemporalJobType.CONNECTION_UPDATER, getConnectionManagerName(connectionId))); + } + + static 
String getConnectionManagerName(final UUID connectionId) { + return "connection_manager_" + connectionId; + } + + static ConnectionUpdaterInput buildStartWorkflowInput(final UUID connectionId) { + return ConnectionUpdaterInput.builder() + .connectionId(connectionId) + .jobId(null) + .attemptId(null) + .fromFailure(false) + .attemptNumber(1) + .workflowState(null) + .resetConnection(false) + .fromJobResetFailure(false) + .build(); + } + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java index e65920e6083f..a8ced41f3839 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java @@ -23,14 +23,14 @@ import io.airbyte.workers.WorkerUtils; import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflow; import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflow; +import io.airbyte.workers.temporal.exception.DeletedWorkflowException; +import io.airbyte.workers.temporal.exception.UnreachableWorkflowException; import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow; -import io.airbyte.workers.temporal.scheduling.ConnectionUpdaterInput; import io.airbyte.workers.temporal.scheduling.state.WorkflowState; import io.airbyte.workers.temporal.spec.SpecWorkflow; import io.airbyte.workers.temporal.sync.SyncWorkflow; import io.temporal.api.workflowservice.v1.ListOpenWorkflowExecutionsRequest; import io.temporal.api.workflowservice.v1.ListOpenWorkflowExecutionsResponse; -import io.temporal.client.BatchRequest; import io.temporal.client.WorkflowClient; import io.temporal.serviceclient.WorkflowServiceStubs; import java.nio.file.Path; @@ -164,7 +164,7 @@ public void migrateSyncIfNeeded(final Set connectionIds) { connectionIds.forEach((connectionId) -> { final StopWatch singleSyncMigrationWatch = new 
StopWatch(); singleSyncMigrationWatch.start(); - if (!isInRunningWorkflowCache(getConnectionManagerName(connectionId))) { + if (!isInRunningWorkflowCache(ConnectionManagerUtils.getConnectionManagerName(connectionId))) { log.info("Migrating: " + connectionId); try { submitConnectionUpdaterAsync(connectionId); @@ -214,32 +214,16 @@ void refreshRunningWorkflow() { } while (token != null && token.size() > 0); } - public void submitConnectionUpdaterAsync(final UUID connectionId) { + public ConnectionManagerWorkflow submitConnectionUpdaterAsync(final UUID connectionId) { log.info("Starting the scheduler temporal wf"); - final ConnectionManagerWorkflow connectionManagerWorkflow = getWorkflowOptionsWithWorkflowId(ConnectionManagerWorkflow.class, - TemporalJobType.CONNECTION_UPDATER, getConnectionManagerName(connectionId)); - final BatchRequest signalRequest = client.newSignalWithStartRequest(); - final ConnectionUpdaterInput input = ConnectionUpdaterInput.builder() - .connectionId(connectionId) - .jobId(null) - .attemptId(null) - .fromFailure(false) - .attemptNumber(1) - .workflowState(null) - .resetConnection(false) - .fromJobResetFailure(false) - .build(); - - signalRequest.add(connectionManagerWorkflow::run, input); - - WorkflowClient.start(connectionManagerWorkflow::run, input); + final ConnectionManagerWorkflow connectionManagerWorkflow = ConnectionManagerUtils.startConnectionManagerNoSignal(client, connectionId); try { CompletableFuture.supplyAsync(() -> { try { do { Thread.sleep(DELAY_BETWEEN_QUERY_MS); - } while (!isWorkflowReachable(getConnectionManagerName(connectionId))); + } while (!isWorkflowReachable(connectionId)); } catch (final InterruptedException e) {} return null; @@ -249,86 +233,72 @@ public void submitConnectionUpdaterAsync(final UUID connectionId) { } catch (final TimeoutException e) { log.error("Can't create a new connection manager workflow due to timeout", e); } + + return connectionManagerWorkflow; } public void deleteConnection(final UUID 
connectionId) { try { - final ConnectionManagerWorkflow connectionManagerWorkflow = getConnectionUpdateWorkflow(connectionId); - connectionManagerWorkflow.deleteConnection(); - } catch (final IllegalStateException e) { - log.info("Connection in an illegal state; Creating new workflow and sending delete signal"); - - final ConnectionManagerWorkflow connectionManagerWorkflow = getWorkflowOptionsWithWorkflowId(ConnectionManagerWorkflow.class, - TemporalJobType.CONNECTION_UPDATER, getConnectionManagerName(connectionId)); - - final ConnectionUpdaterInput input = ConnectionUpdaterInput.builder() - .connectionId(connectionId) - .jobId(null) - .attemptId(null) - .fromFailure(false) - .attemptNumber(1) - .workflowState(null) - .resetConnection(false) - .fromJobResetFailure(false) - .build(); - - final BatchRequest signalRequest = client.newSignalWithStartRequest(); - signalRequest.add(connectionManagerWorkflow::run, input); - signalRequest.add(connectionManagerWorkflow::deleteConnection); - client.signalWithStart(signalRequest); - log.info("New start request and delete signal submitted"); + ConnectionManagerUtils.signalWorkflowAndRepairIfNecessary(client, connectionId, + connectionManagerWorkflow -> connectionManagerWorkflow::deleteConnection); + } catch (final DeletedWorkflowException e) { + log.info("Connection {} has already been deleted.", connectionId); } } public void update(final UUID connectionId) { - final boolean workflowReachable = isWorkflowReachable(getConnectionManagerName(connectionId)); - - if (!workflowReachable) { - // if a workflow is not reachable for update, create a new workflow + final ConnectionManagerWorkflow connectionManagerWorkflow; + try { + connectionManagerWorkflow = ConnectionManagerUtils.getConnectionManagerWorkflow(client, connectionId); + } catch (final DeletedWorkflowException e) { + log.info("Connection {} is deleted, and therefore cannot be updated.", connectionId); + return; + } catch (final UnreachableWorkflowException e) { + 
log.error( + String.format("Failed to retrieve ConnectionManagerWorkflow for connection %s. Repairing state by creating new workflow.", connectionId), + e); submitConnectionUpdaterAsync(connectionId); - } else { - final ConnectionManagerWorkflow connectionManagerWorkflow = getConnectionUpdateWorkflow(connectionId); - connectionManagerWorkflow.connectionUpdated(); + return; } + + connectionManagerWorkflow.connectionUpdated(); } @Value @Builder - public static class ManualSyncSubmissionResult { + public static class ManualOperationResult { final Optional failingReason; final Optional jobId; } - public ManualSyncSubmissionResult startNewManualSync(final UUID connectionId) { + public ManualOperationResult startNewManualSync(final UUID connectionId) { log.info("Manual sync request"); - final boolean workflowReachable = isWorkflowReachable(getConnectionManagerName(connectionId)); - - if (!workflowReachable) { - return new ManualSyncSubmissionResult( - Optional.of("No scheduler workflow is reachable for: " + connectionId), - Optional.empty()); - } - - final ConnectionManagerWorkflow connectionManagerWorkflow = - getExistingWorkflow(ConnectionManagerWorkflow.class, getConnectionManagerName(connectionId)); - final WorkflowState workflowState = connectionManagerWorkflow.getState(); - if (workflowState.isRunning()) { + if (isWorkflowStateRunning(connectionId)) { // TODO Bmoric: Error is running - return new ManualSyncSubmissionResult( + return new ManualOperationResult( Optional.of("A sync is already running for: " + connectionId), Optional.empty()); } - connectionManagerWorkflow.submitManualSync(); + final ConnectionManagerWorkflow connectionManagerWorkflow; + try { + connectionManagerWorkflow = + ConnectionManagerUtils.signalWorkflowAndRepairIfNecessary(client, connectionId, workflow -> workflow::submitManualSync); + } catch (final DeletedWorkflowException e) { + log.error("Can't sync a deleted connection.", e); + return new ManualOperationResult( + 
Optional.of(e.getMessage()), + Optional.empty()); + } do { try { Thread.sleep(DELAY_BETWEEN_QUERY_MS); } catch (final InterruptedException e) { - return new ManualSyncSubmissionResult( + return new ManualOperationResult( Optional.of("Didn't managed to start a sync for: " + connectionId), Optional.empty()); } @@ -338,79 +308,64 @@ public ManualSyncSubmissionResult startNewManualSync(final UUID connectionId) { final long jobId = connectionManagerWorkflow.getJobInformation().getJobId(); - return new ManualSyncSubmissionResult( + return new ManualOperationResult( Optional.empty(), Optional.of(jobId)); } - @Value - public class NewCancellationSubmissionResult { - - final Optional failingReason; - final Optional jobId; + public ManualOperationResult startNewCancellation(final UUID connectionId) { + log.info("Manual cancellation request"); - } + final long jobId = ConnectionManagerUtils.getCurrentJobId(client, connectionId); - public ManualSyncSubmissionResult startNewCancelation(final UUID connectionId) { - log.info("Manual sync request"); - - final boolean workflowReachable = isWorkflowReachable(getConnectionManagerName(connectionId)); - - if (!workflowReachable) { - log.error("Can't cancel a non running workflow"); - return new ManualSyncSubmissionResult( - Optional.of("No scheduler workflow is reachable for: " + connectionId), + try { + ConnectionManagerUtils.signalWorkflowAndRepairIfNecessary(client, connectionId, workflow -> workflow::cancelJob); + } catch (final DeletedWorkflowException e) { + log.error("Can't cancel a deleted workflow", e); + return new ManualOperationResult( + Optional.of(e.getMessage()), Optional.empty()); } - final ConnectionManagerWorkflow connectionManagerWorkflow = - getExistingWorkflow(ConnectionManagerWorkflow.class, getConnectionManagerName(connectionId)); - - connectionManagerWorkflow.cancelJob(); - do { try { Thread.sleep(DELAY_BETWEEN_QUERY_MS); } catch (final InterruptedException e) { - return new ManualSyncSubmissionResult( - 
Optional.of("Didn't manage cancel a sync for: " + connectionId), + return new ManualOperationResult( + Optional.of("Didn't manage to cancel a sync for: " + connectionId), Optional.empty()); } - } while (isWorkflowStateRunning(getConnectionManagerName(connectionId))); + } while (isWorkflowStateRunning(connectionId)); log.info("end of manual cancellation"); - final long jobId = connectionManagerWorkflow.getJobInformation().getJobId(); - - return new ManualSyncSubmissionResult( + return new ManualOperationResult( Optional.empty(), Optional.of(jobId)); } - public ManualSyncSubmissionResult resetConnection(final UUID connectionId) { + public ManualOperationResult resetConnection(final UUID connectionId) { log.info("reset sync request"); - final boolean workflowReachable = isWorkflowReachable(getConnectionManagerName(connectionId)); + // get the job ID before the reset, defaulting to NON_RUNNING_JOB_ID if workflow is unreachable + final long oldJobId = ConnectionManagerUtils.getCurrentJobId(client, connectionId); - if (!workflowReachable) { - log.error("Can't reset a non-reachable workflow"); - return new ManualSyncSubmissionResult( - Optional.of("No scheduler workflow is reachable for: " + connectionId), + final ConnectionManagerWorkflow connectionManagerWorkflow; + try { + connectionManagerWorkflow = + ConnectionManagerUtils.signalWorkflowAndRepairIfNecessary(client, connectionId, workflow -> workflow::resetConnection); + } catch (final DeletedWorkflowException e) { + log.error("Can't reset a deleted workflow", e); + return new ManualOperationResult( + Optional.of(e.getMessage()), Optional.empty()); } - final ConnectionManagerWorkflow connectionManagerWorkflow = - getExistingWorkflow(ConnectionManagerWorkflow.class, getConnectionManagerName(connectionId)); - - final long oldJobId = connectionManagerWorkflow.getJobInformation().getJobId(); - - connectionManagerWorkflow.resetConnection(); - do { try { Thread.sleep(DELAY_BETWEEN_QUERY_MS); } catch (final 
InterruptedException e) { - return new ManualSyncSubmissionResult( + return new ManualOperationResult( Optional.of("Didn't manage to reset a sync for: " + connectionId), Optional.empty()); } @@ -420,7 +375,7 @@ public ManualSyncSubmissionResult resetConnection(final UUID connectionId) { final long jobId = connectionManagerWorkflow.getJobInformation().getJobId(); - return new ManualSyncSubmissionResult( + return new ManualOperationResult( Optional.empty(), Optional.of(jobId)); } @@ -431,14 +386,21 @@ public ManualSyncSubmissionResult resetConnection(final UUID connectionId) { * The way to do so is to wait for the jobId to change, either to a new job id or the default id * that signal that a workflow is waiting to be submitted */ - public ManualSyncSubmissionResult synchronousResetConnection(final UUID connectionId) { - final ManualSyncSubmissionResult resetResult = resetConnection(connectionId); + public ManualOperationResult synchronousResetConnection(final UUID connectionId) { + final ManualOperationResult resetResult = resetConnection(connectionId); if (resetResult.getFailingReason().isPresent()) { return resetResult; } - final ConnectionManagerWorkflow connectionManagerWorkflow = - getExistingWorkflow(ConnectionManagerWorkflow.class, getConnectionManagerName(connectionId)); + final ConnectionManagerWorkflow connectionManagerWorkflow; + try { + connectionManagerWorkflow = ConnectionManagerUtils.getConnectionManagerWorkflow(client, connectionId); + } catch (final Exception e) { + log.error("Encountered exception retrieving workflow after reset.", e); + return new ManualOperationResult( + Optional.of(e.getMessage()), + Optional.empty()); + } final long oldJobId = connectionManagerWorkflow.getJobInformation().getJobId(); @@ -446,7 +408,7 @@ public ManualSyncSubmissionResult synchronousResetConnection(final UUID connecti try { Thread.sleep(DELAY_BETWEEN_QUERY_MS); } catch (final InterruptedException e) { - return new ManualSyncSubmissionResult( + return new 
ManualOperationResult( Optional.of("Didn't manage to reset a sync for: " + connectionId), Optional.empty()); } @@ -456,7 +418,7 @@ public ManualSyncSubmissionResult synchronousResetConnection(final UUID connecti final long jobId = connectionManagerWorkflow.getJobInformation().getJobId(); - return new ManualSyncSubmissionResult( + return new ManualOperationResult( Optional.empty(), Optional.of(jobId)); } @@ -469,23 +431,6 @@ private T getWorkflowOptionsWithWorkflowId(final Class workflowClass, fin return client.newWorkflowStub(workflowClass, TemporalUtils.getWorkflowOptionsWithWorkflowId(jobType, name)); } - private T getExistingWorkflow(final Class workflowClass, final String name) { - return client.newWorkflowStub(workflowClass, name); - } - - ConnectionManagerWorkflow getConnectionUpdateWorkflow(final UUID connectionId) { - final boolean workflowReachable = isWorkflowReachable(getConnectionManagerName(connectionId)); - - if (!workflowReachable) { - throw new IllegalStateException("No reachable workflow for the connection {} while trying to delete it"); - } - - final ConnectionManagerWorkflow connectionManagerWorkflow = - getExistingWorkflow(ConnectionManagerWorkflow.class, getConnectionManagerName(connectionId)); - - return connectionManagerWorkflow; - } - @VisibleForTesting TemporalResponse execute(final JobRunConfig jobRunConfig, final Supplier executor) { final Path jobRoot = WorkerUtils.getJobRoot(workspaceRoot, jobRunConfig); @@ -506,14 +451,12 @@ TemporalResponse execute(final JobRunConfig jobRunConfig, final Supplier< /** * Check if a workflow is reachable for signal calls by attempting to query for current state. If - * the query succeeds, the workflow is reachable. + * the query succeeds, and the workflow is not marked as deleted, the workflow is reachable. 
*/ @VisibleForTesting - boolean isWorkflowReachable(final String workflowName) { + boolean isWorkflowReachable(final UUID connectionId) { try { - final ConnectionManagerWorkflow connectionManagerWorkflow = getExistingWorkflow(ConnectionManagerWorkflow.class, workflowName); - connectionManagerWorkflow.getState(); - + ConnectionManagerUtils.getConnectionManagerWorkflow(client, connectionId); return true; } catch (final Exception e) { return false; @@ -524,9 +467,9 @@ boolean isWorkflowReachable(final String workflowName) { * Check if a workflow is reachable and has state {@link WorkflowState#isRunning()} */ @VisibleForTesting - boolean isWorkflowStateRunning(final String workflowName) { + boolean isWorkflowStateRunning(final UUID connectionId) { try { - final ConnectionManagerWorkflow connectionManagerWorkflow = getExistingWorkflow(ConnectionManagerWorkflow.class, workflowName); + final ConnectionManagerWorkflow connectionManagerWorkflow = ConnectionManagerUtils.getConnectionManagerWorkflow(client, connectionId); return connectionManagerWorkflow.getState().isRunning(); } catch (final Exception e) { @@ -534,8 +477,4 @@ boolean isWorkflowStateRunning(final String workflowName) { } } - static String getConnectionManagerName(final UUID connectionId) { - return "connection_manager_" + connectionId; - } - } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/exception/DeletedWorkflowException.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/exception/DeletedWorkflowException.java new file mode 100644 index 000000000000..234dcfac612d --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/exception/DeletedWorkflowException.java @@ -0,0 +1,13 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.workers.temporal.exception; + +public class DeletedWorkflowException extends Exception { + + public DeletedWorkflowException(final String message) { + super(message); + } + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/exception/UnreachableWorkflowException.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/exception/UnreachableWorkflowException.java new file mode 100644 index 000000000000..8ae77e3e5041 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/exception/UnreachableWorkflowException.java @@ -0,0 +1,13 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.temporal.exception; + +public class UnreachableWorkflowException extends Exception { + + public UnreachableWorkflowException(final String message, final Throwable t) { + super(message, t); + } + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index 96b31fdc2e1b..310efd5db39a 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -29,6 +29,7 @@ import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptFailureInput; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptNumberCreationOutput; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptNumberFailureInput; +import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.EnsureCleanJobStateInput; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCancelledInput; 
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCancelledInputWithAttemptNumber; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationInput; @@ -69,6 +70,9 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow private static final String RENAME_ATTEMPT_ID_TO_NUMBER_TAG = "rename_attempt_id_to_number"; private static final int RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION = 1; + private static final String ENSURE_CLEAN_JOB_STATE = "ensure_clean_job_state"; + private static final int ENSURE_CLEAN_JOB_STATE_CURRENT_VERSION = 1; + private WorkflowState workflowState = new WorkflowState(UUID.randomUUID(), new NoopStateListener()); private final WorkflowInternalState workflowInternalState = new WorkflowInternalState(); @@ -128,6 +132,11 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn return Workflow.newCancellationScope(() -> { connectionId = connectionUpdaterInput.getConnectionId(); + // Clean the job state by failing any jobs for this connection that are currently non-terminal. + // This catches cases where the temporal workflow was terminated and restarted while a job was + // actively running, leaving that job in an orphaned and non-terminal state. + ensureCleanJobState(connectionUpdaterInput); + // workflow state is only ever set in test cases. for production cases, it will always be null. if (connectionUpdaterInput.getWorkflowState() != null) { workflowState = connectionUpdaterInput.getWorkflowState(); @@ -136,9 +145,12 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn // when a reset is triggered, the previous attempt, cancels itself (unless it is already a reset, in // which case it does nothing). 
the previous run that cancels itself then passes on the // resetConnection flag to the next run so that that run can execute the actual reset - workflowState.setResetConnection(connectionUpdaterInput.isResetConnection()); - - workflowState.setResetWithScheduling(connectionUpdaterInput.isFromJobResetFailure()); + if (connectionUpdaterInput.isResetConnection()) { + workflowState.setResetConnection(true); + } + if (connectionUpdaterInput.isFromJobResetFailure()) { + workflowState.setResetWithScheduling(true); + } final Duration timeToWait = getTimeToWait(connectionUpdaterInput.getConnectionId()); @@ -432,6 +444,23 @@ private Duration getTimeToWait(final UUID connectionId) { return scheduleRetrieverOutput.getTimeToWait(); } + private void ensureCleanJobState(final ConnectionUpdaterInput connectionUpdaterInput) { + final int ensureCleanJobStateVersion = + Workflow.getVersion(ENSURE_CLEAN_JOB_STATE, Workflow.DEFAULT_VERSION, ENSURE_CLEAN_JOB_STATE_CURRENT_VERSION); + + // For backwards compatibility and determinism, skip if workflow existed before this change + if (ensureCleanJobStateVersion < ENSURE_CLEAN_JOB_STATE_CURRENT_VERSION) { + return; + } + + if (connectionUpdaterInput.getJobId() != null) { + log.info("This workflow is already attached to a job, so no need to clean job state."); + return; + } + + runMandatoryActivity(jobCreationAndStatusUpdateActivity::ensureCleanJobState, new EnsureCleanJobStateInput(connectionId)); + } + /** * Creates a new job if it is not present in the input. If the jobId is specified in the input of * the connectionManagerWorkflow, we will return it. 
Otherwise we will create a job and return its diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivity.java index 2de587c2a15b..87c0b9ef488c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivity.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivity.java @@ -222,4 +222,16 @@ class ReportJobStartInput { @ActivityMethod void reportJobStart(ReportJobStartInput reportJobStartInput); + @Data + @NoArgsConstructor + @AllArgsConstructor + class EnsureCleanJobStateInput { + + private UUID connectionId; + + } + + @ActivityMethod + void ensureCleanJobState(EnsureCleanJobStateInput input); + } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index 84643c243c60..2f7cf4dec795 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -23,6 +23,7 @@ import io.airbyte.metrics.lib.DogStatsDMetricSingleton; import io.airbyte.metrics.lib.MetricTags; import io.airbyte.metrics.lib.OssMetricsRegistry; +import io.airbyte.scheduler.models.Attempt; import io.airbyte.scheduler.models.Job; import io.airbyte.scheduler.persistence.JobCreator; import io.airbyte.scheduler.persistence.JobNotifier; @@ -32,6 +33,7 @@ import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState; import io.airbyte.validation.json.JsonValidationException; 
import io.airbyte.workers.JobStatus; +import io.airbyte.workers.helper.FailureHelper; import io.airbyte.workers.temporal.exception.RetryableException; import io.airbyte.workers.worker_run.TemporalWorkerRunFactory; import io.airbyte.workers.worker_run.WorkerRun; @@ -60,6 +62,12 @@ public class JobCreationAndStatusUpdateActivityImpl implements JobCreationAndSta @Override public JobCreationOutput createNewJob(final JobCreationInput input) { try { + // Fail non-terminal jobs first to prevent this activity from repeatedly trying to create a new job + // and failing, potentially resulting in the workflow ending up in a quarantined state. + // Another non-terminal job is not expected to exist at this point in the normal case, but this + // could happen in special edge cases for example when migrating to this from the old scheduler. + failNonTerminalJobs(input.getConnectionId()); + final StandardSync standardSync = configRepository.getStandardSync(input.getConnectionId()); if (input.isReset()) { @@ -262,6 +270,42 @@ public void reportJobStart(final ReportJobStartInput input) { } } + @Override + public void ensureCleanJobState(final EnsureCleanJobStateInput input) { + failNonTerminalJobs(input.getConnectionId()); + } + + private void failNonTerminalJobs(final UUID connectionId) { + try { + final List jobs = jobPersistence.listJobsForConnectionWithStatuses(connectionId, Job.REPLICATION_TYPES, + io.airbyte.scheduler.models.JobStatus.NON_TERMINAL_STATUSES); + for (final Job job : jobs) { + final long jobId = job.getId(); + log.info("Failing non-terminal job {}", jobId); + jobPersistence.failJob(jobId); + + // fail all non-terminal attempts + for (final Attempt attempt : job.getAttempts()) { + if (Attempt.isAttemptInTerminalState(attempt)) { + continue; + } + + // the Attempt object 'id' is actually the value of the attempt_number column in the db + final int attemptNumber = (int) attempt.getId(); + jobPersistence.failAttempt(jobId, attemptNumber); + 
jobPersistence.writeAttemptFailureSummary(jobId, attemptNumber, + FailureHelper.failureSummaryForTemporalCleaningJobState(jobId, attemptNumber)); + } + + final Job failedJob = jobPersistence.getJob(jobId); + jobNotifier.failJob("Failing job in order to start from clean job state for new temporal workflow run.", failedJob); + trackCompletion(failedJob, JobStatus.FAILED); + } + } catch (final IOException e) { + throw new RetryableException(e); + } + } + private void emitJobIdToReleaseStagesMetric(final OssMetricsRegistry metric, final long jobId) throws IOException { final var releaseStages = configRepository.getJobIdToReleaseStages(jobId); if (releaseStages == null || releaseStages.size() == 0) { diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java index 8292744b3bc5..bd42a178d877 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java @@ -12,11 +12,11 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import com.google.common.collect.Sets; @@ -33,11 +33,12 @@ import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; -import io.airbyte.workers.temporal.TemporalClient.ManualSyncSubmissionResult; +import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult; import 
io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflow; import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflow; import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow; import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow.JobInformation; +import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflowImpl; import io.airbyte.workers.temporal.scheduling.state.WorkflowState; import io.airbyte.workers.temporal.spec.SpecWorkflow; import io.airbyte.workers.temporal.sync.SyncWorkflow; @@ -45,6 +46,7 @@ import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.workflow.Functions.Proc; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -55,6 +57,7 @@ import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; class TemporalClientTest { @@ -209,6 +212,9 @@ void testSubmitSync() { @Test public void testSynchronousResetConnection() { final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); + final WorkflowState mWorkflowState = mock(WorkflowState.class); + when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState); + when(mWorkflowState.isDeleted()).thenReturn(false); final long jobId1 = 1L; final long jobId2 = 2L; final long jobId3 = 3L; @@ -221,15 +227,15 @@ public void testSynchronousResetConnection() { new JobInformation(jobId3, 0), new JobInformation(jobId3, 0)); - doReturn(true).when(temporalClient).isWorkflowReachable(anyString()); + doReturn(true).when(temporalClient).isWorkflowReachable(any(UUID.class)); when(workflowClient.newWorkflowStub(any(Class.class), anyString())).thenReturn(mConnectionManagerWorkflow); - final ManualSyncSubmissionResult manualSyncSubmissionResult = 
temporalClient.synchronousResetConnection(CONNECTION_ID); + final ManualOperationResult manualOperationResult = temporalClient.synchronousResetConnection(CONNECTION_ID); verify(mConnectionManagerWorkflow).resetConnection(); - assertEquals(manualSyncSubmissionResult.getJobId().get(), jobId3); + assertEquals(manualOperationResult.getJobId().get(), jobId3); } } @@ -245,13 +251,14 @@ public void migrateCalled() { final UUID migratedId = UUID.randomUUID(); doReturn(false) - .when(temporalClient).isInRunningWorkflowCache(TemporalClient.getConnectionManagerName(nonMigratedId)); + .when(temporalClient).isInRunningWorkflowCache(ConnectionManagerUtils.getConnectionManagerName(nonMigratedId)); doReturn(true) - .when(temporalClient).isInRunningWorkflowCache(TemporalClient.getConnectionManagerName(migratedId)); + .when(temporalClient).isInRunningWorkflowCache(ConnectionManagerUtils.getConnectionManagerName(migratedId)); doNothing() .when(temporalClient).refreshRunningWorkflow(); - doNothing() + final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); + doReturn(mConnectionManagerWorkflow) .when(temporalClient).submitConnectionUpdaterAsync(nonMigratedId); temporalClient.migrateSyncIfNeeded(Sets.newHashSet(nonMigratedId, migratedId)); @@ -268,11 +275,14 @@ class DeleteConnection { @Test @SuppressWarnings("unchecked") - @DisplayName("Test delete connection method.") + @DisplayName("Test delete connection method when workflow is in a running state.") void testDeleteConnection() { final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); + final WorkflowState mWorkflowState = mock(WorkflowState.class); + when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState); + when(mWorkflowState.isDeleted()).thenReturn(false); - doReturn(true).when(temporalClient).isWorkflowReachable(anyString()); + doReturn(true).when(temporalClient).isWorkflowReachable(any(UUID.class)); 
when(workflowClient.newWorkflowStub(any(Class.class), anyString())).thenReturn(mConnectionManagerWorkflow); final JobSyncConfig syncConfig = new JobSyncConfig() @@ -294,20 +304,43 @@ void testDeleteConnection() { @SuppressWarnings("unchecked") @DisplayName("Test delete connection method when workflow is in an unexpected state") void testDeleteConnectionInUnexpectedState() { - final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); - final BatchRequest mBatchRequest = mock(BatchRequest.class); + final ConnectionManagerWorkflow mTerminatedConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); + when(mTerminatedConnectionManagerWorkflow.getState()) + .thenThrow(new IllegalStateException("Force state exception to simulate workflow not running")); + when(workflowClient.newWorkflowStub(any(Class.class), any(String.class))).thenReturn(mTerminatedConnectionManagerWorkflow); - doThrow(new IllegalStateException("Force illegal state")).when(temporalClient).getConnectionUpdateWorkflow(CONNECTION_ID); - when(workflowClient.newWorkflowStub(any(Class.class), any(WorkflowOptions.class))).thenReturn(mConnectionManagerWorkflow); + final ConnectionManagerWorkflow mNewConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); + when(workflowClient.newWorkflowStub(any(Class.class), any(WorkflowOptions.class))).thenReturn(mNewConnectionManagerWorkflow); + final BatchRequest mBatchRequest = mock(BatchRequest.class); when(workflowClient.newSignalWithStartRequest()).thenReturn(mBatchRequest); temporalClient.deleteConnection(CONNECTION_ID); + verify(workflowClient).signalWithStart(mBatchRequest); - // this is only called when getting existing workflow - verify(workflowClient, Mockito.never()).newWorkflowStub(any(), anyString()); + // Verify that the deleteConnection signal was passed to the batch request by capturing the + // argument, + // executing the signal, and verifying that the desired signal was executed + final 
ArgumentCaptor batchRequestAddArgCaptor = ArgumentCaptor.forClass(Proc.class); + verify(mBatchRequest).add(batchRequestAddArgCaptor.capture()); + final Proc signal = batchRequestAddArgCaptor.getValue(); + signal.apply(); + verify(mNewConnectionManagerWorkflow).deleteConnection(); + } - verify(workflowClient).newSignalWithStartRequest(); - verify(workflowClient).signalWithStart(mBatchRequest); + @Test + @SuppressWarnings("unchecked") + @DisplayName("Test delete connection method when workflow has already been deleted") + void testDeleteConnectionOnDeletedWorkflow() { + final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); + final WorkflowState mWorkflowState = mock(WorkflowState.class); + when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState); + when(mWorkflowState.isDeleted()).thenReturn(true); + when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow); + + temporalClient.deleteConnection(CONNECTION_ID); + + verify(temporalClient).deleteConnection(CONNECTION_ID); + verifyNoMoreInteractions(temporalClient); } } @@ -324,6 +357,7 @@ void testUpdateConnection() { final WorkflowState mWorkflowState = mock(WorkflowState.class); when(mWorkflowState.isRunning()).thenReturn(true); + when(mWorkflowState.isDeleted()).thenReturn(false); when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState); when(workflowClient.newWorkflowStub(any(Class.class), any(String.class))).thenReturn(mConnectionManagerWorkflow); @@ -337,13 +371,10 @@ void testUpdateConnection() { @DisplayName("Test update connection method starts a new workflow when workflow is in an unexpected state") void testUpdateConnectionInUnexpectedState() { final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); - final BatchRequest mBatchRequest = mock(BatchRequest.class); when(mConnectionManagerWorkflow.getState()).thenThrow(new IllegalStateException("Force state 
exception to simulate workflow not running")); when(workflowClient.newWorkflowStub(any(Class.class), any(String.class))).thenReturn(mConnectionManagerWorkflow); - when(workflowClient.newWorkflowStub(any(Class.class), any(WorkflowOptions.class))).thenReturn(mConnectionManagerWorkflow); - when(workflowClient.newSignalWithStartRequest()).thenReturn(mBatchRequest); - doNothing().when(temporalClient).submitConnectionUpdaterAsync(CONNECTION_ID); + doReturn(mConnectionManagerWorkflow).when(temporalClient).submitConnectionUpdaterAsync(CONNECTION_ID); temporalClient.update(CONNECTION_ID); @@ -353,6 +384,287 @@ void testUpdateConnectionInUnexpectedState() { verify(temporalClient, Mockito.times(1)).submitConnectionUpdaterAsync(CONNECTION_ID); } + @Test + @SuppressWarnings("unchecked") + @DisplayName("Test update connection method does nothing when connection is deleted") + void testUpdateConnectionDeletedWorkflow() { + final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); + final WorkflowState mWorkflowState = mock(WorkflowState.class); + when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState); + when(mWorkflowState.isDeleted()).thenReturn(true); + when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow); + + temporalClient.update(CONNECTION_ID); + + // this is only called when updating an existing workflow + verify(mConnectionManagerWorkflow, Mockito.never()).connectionUpdated(); + verify(temporalClient).update(CONNECTION_ID); + verifyNoMoreInteractions(temporalClient); + } + + } + + @Nested + @DisplayName("Test manual sync behavior") + class ManualSync { + + @Test + @DisplayName("Test startNewManualSync successful") + void testStartNewManualSyncSuccess() { + final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); + final WorkflowState mWorkflowState = mock(WorkflowState.class); + 
when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState); + when(mWorkflowState.isDeleted()).thenReturn(false); + when(mWorkflowState.isRunning()).thenReturn(false).thenReturn(true); + when(mConnectionManagerWorkflow.getJobInformation()).thenReturn(new JobInformation(JOB_ID, ATTEMPT_ID)); + when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow); + + final ManualOperationResult result = temporalClient.startNewManualSync(CONNECTION_ID); + + assertTrue(result.getJobId().isPresent()); + assertEquals(JOB_ID, result.getJobId().get()); + assertFalse(result.getFailingReason().isPresent()); + verify(mConnectionManagerWorkflow).submitManualSync(); + } + + @Test + @DisplayName("Test startNewManualSync fails if job is already running") + void testStartNewManualSyncAlreadyRunning() { + final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); + final WorkflowState mWorkflowState = mock(WorkflowState.class); + when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState); + when(mWorkflowState.isDeleted()).thenReturn(false); + when(mWorkflowState.isRunning()).thenReturn(true); + when(mConnectionManagerWorkflow.getJobInformation()).thenReturn(new JobInformation(JOB_ID, ATTEMPT_ID)); + when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow); + + final ManualOperationResult result = temporalClient.startNewManualSync(CONNECTION_ID); + + assertFalse(result.getJobId().isPresent()); + assertTrue(result.getFailingReason().isPresent()); + verify(mConnectionManagerWorkflow, times(0)).submitManualSync(); + } + + @Test + @DisplayName("Test startNewManualSync repairs the workflow if it is in a bad state") + void testStartNewManualSyncRepairsBadWorkflowState() { + final ConnectionManagerWorkflow mTerminatedConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); + when(mTerminatedConnectionManagerWorkflow.getState()) + .thenThrow(new 
IllegalStateException("Force state exception to simulate workflow not running")); + when(mTerminatedConnectionManagerWorkflow.getJobInformation()).thenReturn(new JobInformation(JOB_ID, ATTEMPT_ID)); + when(workflowClient.newWorkflowStub(any(Class.class), any(String.class))).thenReturn(mTerminatedConnectionManagerWorkflow); + + final ConnectionManagerWorkflow mNewConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); + final WorkflowState mWorkflowState = mock(WorkflowState.class); + when(mNewConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState); + when(mWorkflowState.isDeleted()).thenReturn(false); + when(mWorkflowState.isRunning()).thenReturn(false).thenReturn(true); + when(mNewConnectionManagerWorkflow.getJobInformation()).thenReturn(new JobInformation(JOB_ID, ATTEMPT_ID)); + when(workflowClient.newWorkflowStub(any(Class.class), any(WorkflowOptions.class))).thenReturn(mNewConnectionManagerWorkflow); + final BatchRequest mBatchRequest = mock(BatchRequest.class); + when(workflowClient.newSignalWithStartRequest()).thenReturn(mBatchRequest); + + final ManualOperationResult result = temporalClient.startNewManualSync(CONNECTION_ID); + + assertTrue(result.getJobId().isPresent()); + assertEquals(JOB_ID, result.getJobId().get()); + assertFalse(result.getFailingReason().isPresent()); + verify(workflowClient).signalWithStart(mBatchRequest); + + // Verify that the submitManualSync signal was passed to the batch request by capturing the + // argument, + // executing the signal, and verifying that the desired signal was executed + final ArgumentCaptor batchRequestAddArgCaptor = ArgumentCaptor.forClass(Proc.class); + verify(mBatchRequest).add(batchRequestAddArgCaptor.capture()); + final Proc signal = batchRequestAddArgCaptor.getValue(); + signal.apply(); + verify(mNewConnectionManagerWorkflow).submitManualSync(); + } + + @Test + @SuppressWarnings("unchecked") + @DisplayName("Test startNewManualSync returns a failure reason when connection is deleted") + 
void testStartNewManualSyncDeletedWorkflow() { + final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); + final WorkflowState mWorkflowState = mock(WorkflowState.class); + when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState); + when(mWorkflowState.isDeleted()).thenReturn(true); + when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow); + + final ManualOperationResult result = temporalClient.startNewManualSync(CONNECTION_ID); + + // this is only called when updating an existing workflow + assertFalse(result.getJobId().isPresent()); + assertTrue(result.getFailingReason().isPresent()); + verify(mConnectionManagerWorkflow, times(0)).submitManualSync(); + } + + } + + @Nested + @DisplayName("Test cancellation behavior") + class Cancellation { + + @Test + @DisplayName("Test startNewCancellation successful") + void testStartNewCancellationSuccess() { + final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); + final WorkflowState mWorkflowState = mock(WorkflowState.class); + when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState); + when(mWorkflowState.isDeleted()).thenReturn(false); + when(mWorkflowState.isRunning()).thenReturn(true).thenReturn(false); + when(mConnectionManagerWorkflow.getJobInformation()).thenReturn(new JobInformation(JOB_ID, ATTEMPT_ID)); + when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow); + + final ManualOperationResult result = temporalClient.startNewCancellation(CONNECTION_ID); + + assertTrue(result.getJobId().isPresent()); + assertEquals(JOB_ID, result.getJobId().get()); + assertFalse(result.getFailingReason().isPresent()); + verify(mConnectionManagerWorkflow).cancelJob(); + } + + @Test + @DisplayName("Test startNewCancellation repairs the workflow if it is in a bad state") + void testStartNewCancellationRepairsBadWorkflowState() { + final 
ConnectionManagerWorkflow mTerminatedConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); + when(mTerminatedConnectionManagerWorkflow.getState()) + .thenThrow(new IllegalStateException("Force state exception to simulate workflow not running")); + when(mTerminatedConnectionManagerWorkflow.getJobInformation()).thenReturn(new JobInformation(JOB_ID, ATTEMPT_ID)); + when(workflowClient.newWorkflowStub(any(Class.class), any(String.class))).thenReturn(mTerminatedConnectionManagerWorkflow); + + final ConnectionManagerWorkflow mNewConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); + final WorkflowState mWorkflowState = mock(WorkflowState.class); + when(mNewConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState); + when(mWorkflowState.isDeleted()).thenReturn(false); + when(mWorkflowState.isRunning()).thenReturn(true).thenReturn(false); + when(mNewConnectionManagerWorkflow.getJobInformation()).thenReturn(new JobInformation(JOB_ID, ATTEMPT_ID)); + when(workflowClient.newWorkflowStub(any(Class.class), any(WorkflowOptions.class))).thenReturn(mNewConnectionManagerWorkflow); + final BatchRequest mBatchRequest = mock(BatchRequest.class); + when(workflowClient.newSignalWithStartRequest()).thenReturn(mBatchRequest); + + final ManualOperationResult result = temporalClient.startNewCancellation(CONNECTION_ID); + + assertTrue(result.getJobId().isPresent()); + assertEquals(ConnectionManagerWorkflowImpl.NON_RUNNING_JOB_ID, result.getJobId().get()); + assertFalse(result.getFailingReason().isPresent()); + verify(workflowClient).signalWithStart(mBatchRequest); + + // Verify that the cancelJob signal was passed to the batch request by capturing the argument, + // executing the signal, and verifying that the desired signal was executed + final ArgumentCaptor batchRequestAddArgCaptor = ArgumentCaptor.forClass(Proc.class); + verify(mBatchRequest).add(batchRequestAddArgCaptor.capture()); + final Proc signal = batchRequestAddArgCaptor.getValue(); + 
signal.apply(); + verify(mNewConnectionManagerWorkflow).cancelJob(); + } + + @Test + @SuppressWarnings("unchecked") + @DisplayName("Test startNewCancellation returns a failure reason when connection is deleted") + void testStartNewCancellationDeletedWorkflow() { + final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); + final WorkflowState mWorkflowState = mock(WorkflowState.class); + when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState); + when(mWorkflowState.isDeleted()).thenReturn(true); + when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow); + + final ManualOperationResult result = temporalClient.startNewCancellation(CONNECTION_ID); + + // this is only called when updating an existing workflow + assertFalse(result.getJobId().isPresent()); + assertTrue(result.getFailingReason().isPresent()); + verify(mConnectionManagerWorkflow, times(0)).cancelJob(); + } + + } + + @Nested + @DisplayName("Test reset connection behavior") + class ResetConnection { + + @Test + @DisplayName("Test resetConnection successful") + void testResetConnectionSuccess() { + final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); + final WorkflowState mWorkflowState = mock(WorkflowState.class); + when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState); + when(mWorkflowState.isDeleted()).thenReturn(false); + when(mWorkflowState.isRunning()).thenReturn(false); + final long jobId1 = 1; + final long jobId2 = 2; + when(mConnectionManagerWorkflow.getJobInformation()).thenReturn( + new JobInformation(jobId1, 0), + new JobInformation(jobId1, 0), + new JobInformation(jobId2, 0), + new JobInformation(jobId2, 0)); + when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow); + + final ManualOperationResult result = temporalClient.resetConnection(CONNECTION_ID); + + assertTrue(result.getJobId().isPresent()); 
+ assertEquals(jobId2, result.getJobId().get()); + assertFalse(result.getFailingReason().isPresent()); + verify(mConnectionManagerWorkflow).resetConnection(); + } + + @Test + @DisplayName("Test resetConnection repairs the workflow if it is in a bad state") + void testResetConnectionRepairsBadWorkflowState() { + final ConnectionManagerWorkflow mTerminatedConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); + when(mTerminatedConnectionManagerWorkflow.getState()) + .thenThrow(new IllegalStateException("Force state exception to simulate workflow not running")); + when(mTerminatedConnectionManagerWorkflow.getJobInformation()).thenReturn(new JobInformation(JOB_ID, ATTEMPT_ID)); + when(workflowClient.newWorkflowStub(any(Class.class), any(String.class))).thenReturn(mTerminatedConnectionManagerWorkflow); + + final ConnectionManagerWorkflow mNewConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); + final WorkflowState mWorkflowState = mock(WorkflowState.class); + when(mNewConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState); + when(mWorkflowState.isDeleted()).thenReturn(false); + when(mWorkflowState.isRunning()).thenReturn(false); + when(mNewConnectionManagerWorkflow.getJobInformation()).thenReturn( + new JobInformation(ConnectionManagerWorkflowImpl.NON_RUNNING_JOB_ID, 0), + new JobInformation(ConnectionManagerWorkflowImpl.NON_RUNNING_JOB_ID, 0), + new JobInformation(JOB_ID, 0), + new JobInformation(JOB_ID, 0)); + when(workflowClient.newWorkflowStub(any(Class.class), any(WorkflowOptions.class))).thenReturn(mNewConnectionManagerWorkflow); + final BatchRequest mBatchRequest = mock(BatchRequest.class); + when(workflowClient.newSignalWithStartRequest()).thenReturn(mBatchRequest); + + final ManualOperationResult result = temporalClient.resetConnection(CONNECTION_ID); + + assertTrue(result.getJobId().isPresent()); + assertEquals(JOB_ID, result.getJobId().get()); + assertFalse(result.getFailingReason().isPresent()); + 
verify(workflowClient).signalWithStart(mBatchRequest); + + // Verify that the resetConnection signal was passed to the batch request by capturing the argument, + // executing the signal, and verifying that the desired signal was executed + final ArgumentCaptor batchRequestAddArgCaptor = ArgumentCaptor.forClass(Proc.class); + verify(mBatchRequest).add(batchRequestAddArgCaptor.capture()); + final Proc signal = batchRequestAddArgCaptor.getValue(); + signal.apply(); + verify(mNewConnectionManagerWorkflow).resetConnection(); + } + + @Test + @SuppressWarnings("unchecked") + @DisplayName("Test resetConnection returns a failure reason when connection is deleted") + void testResetConnectionDeletedWorkflow() { + final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); + final WorkflowState mWorkflowState = mock(WorkflowState.class); + when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState); + when(mWorkflowState.isDeleted()).thenReturn(true); + when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow); + + final ManualOperationResult result = temporalClient.resetConnection(CONNECTION_ID); + + // this is only called when updating an existing workflow + assertFalse(result.getJobId().isPresent()); + assertTrue(result.getFailingReason().isPresent()); + verify(mConnectionManagerWorkflow, times(0)).resetConnection(); + } + } } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java index 9aadf0279269..85a661917fc4 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java @@ -413,6 +413,28 @@ public void deleteSync() throws InterruptedException { 
Mockito.verify(mConnectionDeletionActivity, Mockito.times(1)).deleteConnection(Mockito.any()); } + @RepeatedTest(10) + @Timeout(value = 2, + unit = TimeUnit.SECONDS) + @DisplayName("Test that fresh workflow cleans the job state") + public void testStartFromCleanJobState() throws InterruptedException { + final ConnectionUpdaterInput input = ConnectionUpdaterInput.builder() + .connectionId(UUID.randomUUID()) + .jobId(null) + .attemptId(null) + .fromFailure(false) + .attemptNumber(1) + .workflowState(null) + .resetConnection(false) + .fromJobResetFailure(false) + .build(); + + startWorkflowAndWaitUntilReady(workflow, input); + testEnv.sleep(Duration.ofSeconds(30L)); + + Mockito.verify(mJobCreationAndStatusUpdateActivity, Mockito.times(1)).ensureCleanJobState(Mockito.any()); + } + } @Nested diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java index 5448541f87cd..c1379d618a4c 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java @@ -4,10 +4,15 @@ package io.airbyte.workers.temporal.scheduling.activities; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; + import io.airbyte.config.AttemptFailureSummary; import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.FailureReason; import io.airbyte.config.FailureReason.FailureOrigin; +import io.airbyte.config.JobConfig; +import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobOutput; import io.airbyte.config.NormalizationSummary; import io.airbyte.config.StandardSync; @@ -18,7 +23,10 @@ import 
io.airbyte.config.helpers.LogConfigs; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.scheduler.models.Attempt; +import io.airbyte.scheduler.models.AttemptStatus; import io.airbyte.scheduler.models.Job; +import io.airbyte.scheduler.models.JobStatus; import io.airbyte.scheduler.persistence.JobNotifier; import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory; @@ -30,6 +38,7 @@ import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationOutput; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptFailureInput; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptNumberCreationOutput; +import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.EnsureCleanJobStateInput; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCancelledInput; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationInput; import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationOutput; @@ -40,6 +49,7 @@ import java.io.IOException; import java.nio.file.Path; import java.util.Collections; +import java.util.List; import java.util.UUID; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.DisplayName; @@ -224,7 +234,7 @@ public void setJobSuccess() throws IOException { Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput); Mockito.verify(mJobPersistence).succeedAttempt(JOB_ID, ATTEMPT_ID); Mockito.verify(mJobNotifier).successJob(Mockito.any()); - Mockito.verify(mJobtracker).trackSync(Mockito.any(), Mockito.eq(JobState.SUCCEEDED)); + Mockito.verify(mJobtracker).trackSync(Mockito.any(), 
eq(JobState.SUCCEEDED)); } @Test @@ -242,7 +252,7 @@ public void setJobFailure() throws IOException { jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput(JOB_ID, "reason")); Mockito.verify(mJobPersistence).failJob(JOB_ID); - Mockito.verify(mJobNotifier).failJob(Mockito.eq("reason"), Mockito.any()); + Mockito.verify(mJobNotifier).failJob(eq("reason"), Mockito.any()); } @Test @@ -295,6 +305,36 @@ public void setJobCancelledWrapException() throws IOException { .hasCauseInstanceOf(IOException.class); } + @Test + public void ensureCleanJobState() throws IOException { + final Attempt failedAttempt = new Attempt(0, 1, Path.of(""), null, AttemptStatus.FAILED, null, 2L, 3L, 3L); + final int runningAttemptNumber = 1; + final Attempt runningAttempt = new Attempt(runningAttemptNumber, 1, Path.of(""), null, AttemptStatus.RUNNING, null, 4L, 5L, null); + final Job runningJob = new Job(1, ConfigType.SYNC, CONNECTION_ID.toString(), new JobConfig(), List.of(failedAttempt, runningAttempt), + JobStatus.RUNNING, 2L, 2L, 3L); + + final Job pendingJob = new Job(2, ConfigType.SYNC, CONNECTION_ID.toString(), new JobConfig(), List.of(), JobStatus.PENDING, 4L, 4L, 5L); + + Mockito.when(mJobPersistence.listJobsForConnectionWithStatuses(CONNECTION_ID, Job.REPLICATION_TYPES, JobStatus.NON_TERMINAL_STATUSES)) + .thenReturn(List.of(runningJob, pendingJob)); + Mockito.when(mJobPersistence.getJob(runningJob.getId())).thenReturn(runningJob); + Mockito.when(mJobPersistence.getJob(pendingJob.getId())).thenReturn(pendingJob); + + jobCreationAndStatusUpdateActivity.ensureCleanJobState(new EnsureCleanJobStateInput(CONNECTION_ID)); + + Mockito.verify(mJobPersistence).failJob(runningJob.getId()); + Mockito.verify(mJobPersistence).failJob(pendingJob.getId()); + Mockito.verify(mJobPersistence).failAttempt(runningJob.getId(), runningAttemptNumber); + Mockito.verify(mJobPersistence).writeAttemptFailureSummary(eq(runningJob.getId()), eq(runningAttemptNumber), any()); + 
Mockito.verify(mJobPersistence).getJob(runningJob.getId()); + Mockito.verify(mJobPersistence).getJob(pendingJob.getId()); + Mockito.verify(mJobNotifier).failJob(any(), eq(runningJob)); + Mockito.verify(mJobNotifier).failJob(any(), eq(pendingJob)); + Mockito.verify(mJobtracker).trackSync(runningJob, JobState.FAILED); + Mockito.verify(mJobtracker).trackSync(pendingJob, JobState.FAILED); + Mockito.verifyNoMoreInteractions(mJobPersistence, mJobNotifier, mJobtracker); + } + } }