Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add error code to ManualOperationResult #14657

Merged
merged 2 commits into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import io.airbyte.api.model.generated.AdvancedAuth;
Expand Down Expand Up @@ -58,9 +59,11 @@
import io.airbyte.server.converters.ConfigurationUpdate;
import io.airbyte.server.converters.JobConverter;
import io.airbyte.server.converters.OauthModelConverter;
import io.airbyte.server.errors.ValueConflictKnownException;
import io.airbyte.server.handlers.helpers.CatalogConverter;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.temporal.ErrorCode;
import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult;
import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -74,6 +77,9 @@ public class SchedulerHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerHandler.class);
private static final HashFunction HASH_FUNCTION = Hashing.md5();

private static final ImmutableSet<ErrorCode> VALUE_CONFLICT_EXCEPTION_ERROR_CODE_SET =
ImmutableSet.of(ErrorCode.WORKFLOW_DELETED, ErrorCode.WORKFLOW_RUNNING);

private final ConfigRepository configRepository;
private final SecretsRepositoryWriter secretsRepositoryWriter;
private final SynchronousSchedulerClient synchronousSchedulerClient;
Expand Down Expand Up @@ -381,7 +387,11 @@ private JobInfoRead submitResetConnectionToWorker(final UUID connectionId) throw

private JobInfoRead readJobFromResult(final ManualOperationResult manualOperationResult) throws IOException, IllegalStateException {
if (manualOperationResult.getFailingReason().isPresent()) {
throw new IllegalStateException(manualOperationResult.getFailingReason().get());
if (VALUE_CONFLICT_EXCEPTION_ERROR_CODE_SET.contains(manualOperationResult.getErrorCode().get())) {
throw new ValueConflictKnownException(manualOperationResult.getFailingReason().get());
} else {
throw new IllegalStateException(manualOperationResult.getFailingReason().get());
}
}

final Job job = jobPersistence.getJob(manualOperationResult.getJobId().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -67,10 +68,12 @@
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.server.converters.ConfigurationUpdate;
import io.airbyte.server.converters.JobConverter;
import io.airbyte.server.errors.ValueConflictKnownException;
import io.airbyte.server.helpers.DestinationHelpers;
import io.airbyte.server.helpers.SourceHelpers;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.temporal.ErrorCode;
import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult;
import java.io.IOException;
import java.net.URI;
Expand Down Expand Up @@ -587,6 +590,25 @@ void testSyncConnection() throws IOException {
verify(eventRunner).startNewManualSync(connectionId);
}

@Test
void testSyncConnectionFailWithOtherSyncRunning() throws IOException {
final UUID connectionId = UUID.randomUUID();

final ManualOperationResult manualOperationResult = ManualOperationResult
.builder()
.failingReason(Optional.of("another sync running"))
.jobId(Optional.empty())
.errorCode(Optional.of(ErrorCode.WORKFLOW_RUNNING))
.build();

when(eventRunner.startNewManualSync(connectionId))
.thenReturn(manualOperationResult);

assertThrows(ValueConflictKnownException.class,
() -> schedulerHandler.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)));

}

@Test
void testResetConnection() throws IOException, JsonValidationException, ConfigNotFoundException {
final UUID connectionId = UUID.randomUUID();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal;

public enum ErrorCode {
UNKNOWN,
WORKFLOW_DELETED,
WORKFLOW_RUNNING
}
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ public static class ManualOperationResult {

final Optional<String> failingReason;
final Optional<Long> jobId;
final Optional<ErrorCode> errorCode;

}

Expand All @@ -300,7 +301,7 @@ public ManualOperationResult startNewManualSync(final UUID connectionId) {
// TODO Bmoric: Error is running
return new ManualOperationResult(
Optional.of("A sync is already running for: " + connectionId),
Optional.empty());
Optional.empty(), Optional.of(ErrorCode.WORKFLOW_RUNNING));
}

try {
Expand All @@ -309,7 +310,7 @@ public ManualOperationResult startNewManualSync(final UUID connectionId) {
log.error("Can't sync a deleted connection.", e);
return new ManualOperationResult(
Optional.of(e.getMessage()),
Optional.empty());
Optional.empty(), Optional.of(ErrorCode.WORKFLOW_DELETED));
}

do {
Expand All @@ -318,7 +319,7 @@ public ManualOperationResult startNewManualSync(final UUID connectionId) {
} catch (final InterruptedException e) {
return new ManualOperationResult(
Optional.of("Didn't managed to start a sync for: " + connectionId),
Optional.empty());
Optional.empty(), Optional.of(ErrorCode.UNKNOWN));
}
} while (!ConnectionManagerUtils.isWorkflowStateRunning(client, connectionId));

Expand All @@ -328,7 +329,7 @@ public ManualOperationResult startNewManualSync(final UUID connectionId) {

return new ManualOperationResult(
Optional.empty(),
Optional.of(jobId));
Optional.of(jobId), Optional.empty());
}

public ManualOperationResult startNewCancellation(final UUID connectionId) {
Expand All @@ -342,7 +343,7 @@ public ManualOperationResult startNewCancellation(final UUID connectionId) {
log.error("Can't cancel a deleted workflow", e);
return new ManualOperationResult(
Optional.of(e.getMessage()),
Optional.empty());
Optional.empty(), Optional.of(ErrorCode.WORKFLOW_DELETED));
}

do {
Expand All @@ -351,15 +352,15 @@ public ManualOperationResult startNewCancellation(final UUID connectionId) {
} catch (final InterruptedException e) {
return new ManualOperationResult(
Optional.of("Didn't manage to cancel a sync for: " + connectionId),
Optional.empty());
Optional.empty(), Optional.of(ErrorCode.UNKNOWN));
}
} while (ConnectionManagerUtils.isWorkflowStateRunning(client, connectionId));

log.info("end of manual cancellation");

return new ManualOperationResult(
Optional.empty(),
Optional.of(jobId));
Optional.of(jobId), Optional.empty());
}

public ManualOperationResult resetConnection(final UUID connectionId, final List<StreamDescriptor> streamsToReset) {
Expand All @@ -371,7 +372,7 @@ public ManualOperationResult resetConnection(final UUID connectionId, final List
log.error("Could not persist streams to reset.", e);
return new ManualOperationResult(
Optional.of(e.getMessage()),
Optional.empty());
Optional.empty(), Optional.of(ErrorCode.UNKNOWN));
}

// get the job ID before the reset, defaulting to NON_RUNNING_JOB_ID if workflow is unreachable
Expand All @@ -383,7 +384,7 @@ public ManualOperationResult resetConnection(final UUID connectionId, final List
log.error("Can't reset a deleted workflow", e);
return new ManualOperationResult(
Optional.of(e.getMessage()),
Optional.empty());
Optional.empty(), Optional.of(ErrorCode.UNKNOWN));
}

Optional<Long> newJobId;
Expand All @@ -394,7 +395,7 @@ public ManualOperationResult resetConnection(final UUID connectionId, final List
} catch (final InterruptedException e) {
return new ManualOperationResult(
Optional.of("Didn't manage to reset a sync for: " + connectionId),
Optional.empty());
Optional.empty(), Optional.of(ErrorCode.UNKNOWN));
}
newJobId = getNewJobId(connectionId, oldJobId);
} while (newJobId.isEmpty());
Expand All @@ -403,7 +404,7 @@ public ManualOperationResult resetConnection(final UUID connectionId, final List

return new ManualOperationResult(
Optional.empty(),
newJobId);
newJobId, Optional.empty());
}

private Optional<Long> getNewJobId(final UUID connectionId, final long oldJobId) {
Expand All @@ -428,22 +429,21 @@ public ManualOperationResult synchronousResetConnection(final UUID connectionId,
}

final long resetJobId = resetResult.getJobId().get();

do {
try {
Thread.sleep(DELAY_BETWEEN_QUERY_MS);
} catch (final InterruptedException e) {
return new ManualOperationResult(
Optional.of("Didn't manage to reset a sync for: " + connectionId),
Optional.empty());
Optional.empty(), Optional.of(ErrorCode.UNKNOWN));
}
} while (ConnectionManagerUtils.getCurrentJobId(client, connectionId) == resetJobId);

log.info("End of reset");

return new ManualOperationResult(
Optional.empty(),
Optional.of(resetJobId));
Optional.of(resetJobId), Optional.empty());
}

private <T> T getWorkflowStub(final Class<T> workflowClass, final TemporalJobType jobType) {
Expand Down