diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index 3c27be01ddc7..9d995521fa72 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -6,6 +6,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableSet; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; import io.airbyte.api.model.generated.AdvancedAuth; @@ -58,9 +59,11 @@ import io.airbyte.server.converters.ConfigurationUpdate; import io.airbyte.server.converters.JobConverter; import io.airbyte.server.converters.OauthModelConverter; +import io.airbyte.server.errors.ValueConflictKnownException; import io.airbyte.server.handlers.helpers.CatalogConverter; import io.airbyte.validation.json.JsonSchemaValidator; import io.airbyte.validation.json.JsonValidationException; +import io.airbyte.workers.temporal.ErrorCode; import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult; import java.io.IOException; import java.util.ArrayList; @@ -74,6 +77,9 @@ public class SchedulerHandler { private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerHandler.class); private static final HashFunction HASH_FUNCTION = Hashing.md5(); + private static final ImmutableSet VALUE_CONFLICT_EXCEPTION_ERROR_CODE_SET = + ImmutableSet.of(ErrorCode.WORKFLOW_DELETED, ErrorCode.WORKFLOW_RUNNING); + private final ConfigRepository configRepository; private final SecretsRepositoryWriter secretsRepositoryWriter; private final SynchronousSchedulerClient synchronousSchedulerClient; @@ -381,7 +387,11 @@ private JobInfoRead submitResetConnectionToWorker(final UUID connectionId) throw private JobInfoRead readJobFromResult(final ManualOperationResult manualOperationResult) throws IOException, IllegalStateException { if (manualOperationResult.getFailingReason().isPresent()) { - throw new IllegalStateException(manualOperationResult.getFailingReason().get()); + if (VALUE_CONFLICT_EXCEPTION_ERROR_CODE_SET.contains(manualOperationResult.getErrorCode().get())) { + throw new ValueConflictKnownException(manualOperationResult.getFailingReason().get()); + } else { + throw new IllegalStateException(manualOperationResult.getFailingReason().get()); + } } final Job job = jobPersistence.getJob(manualOperationResult.getJobId().get()); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index f67f6e0321cd..1f214fdacee9 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -8,6 +8,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -67,10 +68,12 @@ import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.server.converters.ConfigurationUpdate; import io.airbyte.server.converters.JobConverter; +import io.airbyte.server.errors.ValueConflictKnownException; import io.airbyte.server.helpers.DestinationHelpers; import io.airbyte.server.helpers.SourceHelpers; import io.airbyte.validation.json.JsonSchemaValidator; import io.airbyte.validation.json.JsonValidationException; +import io.airbyte.workers.temporal.ErrorCode; import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult; import java.io.IOException; import java.net.URI; @@ -587,6 +590,25 @@ void testSyncConnection() throws IOException { verify(eventRunner).startNewManualSync(connectionId); } + @Test + void testSyncConnectionFailWithOtherSyncRunning() throws IOException { + final UUID connectionId = UUID.randomUUID(); + + final ManualOperationResult manualOperationResult = ManualOperationResult + .builder() + .failingReason(Optional.of("another sync running")) + .jobId(Optional.empty()) + .errorCode(Optional.of(ErrorCode.WORKFLOW_RUNNING)) + .build(); + + when(eventRunner.startNewManualSync(connectionId)) + .thenReturn(manualOperationResult); + + assertThrows(ValueConflictKnownException.class, + () -> schedulerHandler.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId))); + + } + @Test void testResetConnection() throws IOException, JsonValidationException, ConfigNotFoundException { final UUID connectionId = UUID.randomUUID(); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/ErrorCode.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/ErrorCode.java new file mode 100644 index 000000000000..cd0a14b0c4c9 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/ErrorCode.java @@ -0,0 +1,11 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.temporal; + +public enum ErrorCode { + UNKNOWN, + WORKFLOW_DELETED, + WORKFLOW_RUNNING +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java index cff240870404..8d98cc2f3160 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java @@ -290,6 +290,7 @@ public static class ManualOperationResult { final Optional failingReason; final Optional jobId; + final Optional errorCode; } @@ -300,7 +301,7 @@ public ManualOperationResult startNewManualSync(final UUID connectionId) { // TODO Bmoric: Error is running return new ManualOperationResult( Optional.of("A sync is already running for: " + connectionId), - Optional.empty()); + Optional.empty(), Optional.of(ErrorCode.WORKFLOW_RUNNING)); } try { @@ -309,7 +310,7 @@ public ManualOperationResult startNewManualSync(final UUID connectionId) { log.error("Can't sync a deleted connection.", e); return new ManualOperationResult( Optional.of(e.getMessage()), - Optional.empty()); + Optional.empty(), Optional.of(ErrorCode.WORKFLOW_DELETED)); } do { @@ -318,7 +319,7 @@ public ManualOperationResult startNewManualSync(final UUID connectionId) { } catch (final InterruptedException e) { return new ManualOperationResult( Optional.of("Didn't managed to start a sync for: " + connectionId), - Optional.empty()); + Optional.empty(), Optional.of(ErrorCode.UNKNOWN)); } } while (!ConnectionManagerUtils.isWorkflowStateRunning(client, connectionId)); @@ -328,7 +329,7 @@ public ManualOperationResult startNewManualSync(final UUID connectionId) { return new ManualOperationResult( Optional.empty(), - Optional.of(jobId)); + Optional.of(jobId), Optional.empty()); } public ManualOperationResult startNewCancellation(final UUID connectionId) { @@ -342,7 +343,7 @@ public ManualOperationResult startNewCancellation(final UUID connectionId) { log.error("Can't cancel a deleted workflow", e); return new ManualOperationResult( Optional.of(e.getMessage()), - Optional.empty()); + Optional.empty(), Optional.of(ErrorCode.WORKFLOW_DELETED)); } do { @@ -351,7 +352,7 @@ public ManualOperationResult startNewCancellation(final UUID connectionId) { } catch (final InterruptedException e) { return new ManualOperationResult( Optional.of("Didn't manage to cancel a sync for: " + connectionId), - Optional.empty()); + Optional.empty(), Optional.of(ErrorCode.UNKNOWN)); } } while (ConnectionManagerUtils.isWorkflowStateRunning(client, connectionId)); @@ -359,7 +360,7 @@ public ManualOperationResult startNewCancellation(final UUID connectionId) { return new ManualOperationResult( Optional.empty(), - Optional.of(jobId)); + Optional.of(jobId), Optional.empty()); } public ManualOperationResult resetConnection(final UUID connectionId, final List streamsToReset) { @@ -371,7 +372,7 @@ public ManualOperationResult resetConnection(final UUID connectionId, final List log.error("Could not persist streams to reset.", e); return new ManualOperationResult( Optional.of(e.getMessage()), - Optional.empty()); + Optional.empty(), Optional.of(ErrorCode.UNKNOWN)); } // get the job ID before the reset, defaulting to NON_RUNNING_JOB_ID if workflow is unreachable @@ -383,7 +384,7 @@ public ManualOperationResult resetConnection(final UUID connectionId, final List log.error("Can't reset a deleted workflow", e); return new ManualOperationResult( Optional.of(e.getMessage()), - Optional.empty()); + Optional.empty(), Optional.of(ErrorCode.UNKNOWN)); } Optional newJobId; @@ -394,7 +395,7 @@ public ManualOperationResult resetConnection(final UUID connectionId, final List } catch (final InterruptedException e) { return new ManualOperationResult( Optional.of("Didn't manage to reset a sync for: " + connectionId), - Optional.empty()); + Optional.empty(), Optional.of(ErrorCode.UNKNOWN)); } newJobId = getNewJobId(connectionId, oldJobId); } while (newJobId.isEmpty()); @@ -403,7 +404,7 @@ public ManualOperationResult resetConnection(final UUID connectionId, final List return new ManualOperationResult( Optional.empty(), - newJobId); + newJobId, Optional.empty()); } private Optional getNewJobId(final UUID connectionId, final long oldJobId) { @@ -428,14 +429,13 @@ public ManualOperationResult synchronousResetConnection(final UUID connectionId, } final long resetJobId = resetResult.getJobId().get(); - do { try { Thread.sleep(DELAY_BETWEEN_QUERY_MS); } catch (final InterruptedException e) { return new ManualOperationResult( Optional.of("Didn't manage to reset a sync for: " + connectionId), - Optional.empty()); + Optional.empty(), Optional.of(ErrorCode.UNKNOWN)); } } while (ConnectionManagerUtils.getCurrentJobId(client, connectionId) == resetJobId); @@ -443,7 +443,7 @@ public ManualOperationResult synchronousResetConnection(final UUID connectionId, return new ManualOperationResult( Optional.empty(), - Optional.of(resetJobId)); + Optional.of(resetJobId), Optional.empty()); } private T getWorkflowStub(final Class workflowClass, final TemporalJobType jobType) {