From 5f6f68ba6f7b154aeeb72bba2d06c4d3088d8858 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Wed, 9 Nov 2022 13:05:27 -0800 Subject: [PATCH] Change where a connection is deleted (#19096) * Tmp * Move when the deletion is performed * Re-enable disable test * PR comments * Use cancel * rename * Fix test and version check position * Log exception --- .../temporal/ConnectionManagerUtils.java | 16 ++++++ .../commons/temporal/TemporalClient.java | 14 ++--- .../commons/temporal/TemporalClientTest.java | 53 ++----------------- .../java/io/airbyte/server/ServerApp.java | 6 ++- .../server/handlers/ConnectionsHandler.java | 16 ++++-- .../server/handlers/SourceHandler.java | 8 ++- .../airbyte/server/scheduler/EventRunner.java | 4 +- .../server/scheduler/TemporalEventRunner.java | 4 +- .../handlers/ConnectionsHandlerTest.java | 14 +++-- .../test/acceptance/BasicAcceptanceTests.java | 1 - .../ConnectionManagerWorkflowImpl.java | 15 +++++- .../ConnectionDeletionActivityImpl.java | 1 + .../ConnectionManagerWorkflowTest.java | 3 +- 13 files changed, 79 insertions(+), 76 deletions(-) diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/ConnectionManagerUtils.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/ConnectionManagerUtils.java index 47ccaa36ab1f..0c73ceeb63a7 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/ConnectionManagerUtils.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/ConnectionManagerUtils.java @@ -30,6 +30,22 @@ @Slf4j public class ConnectionManagerUtils { + /** + * Send a cancellation to the workflow. It will swallow any exception and won't check if the + * workflow is already deleted when being cancel. + */ + public void deleteWorkflowIfItExist(final WorkflowClient client, + final UUID connectionId) { + try { + final ConnectionManagerWorkflow connectionManagerWorkflow = + client.newWorkflowStub(ConnectionManagerWorkflow.class, getConnectionManagerName(connectionId)); + connectionManagerWorkflow.deleteConnection(); + } catch (final Exception e) { + log.warn("The workflow is not reachable when trying to cancel it", e); + } + + } + /** * Attempts to send a signal to the existing ConnectionManagerWorkflow for the provided connection. * diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java index db543d0b11d0..7cb76408ac9e 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java @@ -476,13 +476,13 @@ public ConnectionManagerWorkflow submitConnectionUpdaterAsync(final UUID connect return connectionManagerWorkflow; } - public void deleteConnection(final UUID connectionId) { - try { - connectionManagerUtils.signalWorkflowAndRepairIfNecessary(client, connectionId, - connectionManagerWorkflow -> connectionManagerWorkflow::deleteConnection); - } catch (final DeletedWorkflowException e) { - log.info("Connection {} has already been deleted.", connectionId); - } + /** + * This will cancel a workflow even if the connection is deleted already + * + * @param connectionId - connectionId to cancel + */ + public void forceDeleteWorkflow(final UUID connectionId) { + connectionManagerUtils.deleteWorkflowIfItExist(client, connectionId); } public void update(final UUID connectionId) { diff --git a/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java b/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java index 82b161e5eb05..08e7fd986552 100644 --- a/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java +++ b/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java @@ -326,12 +326,12 @@ void migrateCalled() { @Nested @DisplayName("Test delete connection method.") - class DeleteConnection { + class ForceCancelConnection { @Test @SuppressWarnings(UNCHECKED) @DisplayName("Test delete connection method when workflow is in a running state.") - void testDeleteConnection() { + void testforceCancelConnection() { final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); final WorkflowState mWorkflowState = mock(WorkflowState.class); when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState); @@ -349,54 +349,9 @@ void testDeleteConnection() { .withConfiguredAirbyteCatalog(new ConfiguredAirbyteCatalog()); temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, CONNECTION_ID); - temporalClient.deleteConnection(CONNECTION_ID); + temporalClient.forceDeleteWorkflow(CONNECTION_ID); - verify(workflowClient, Mockito.never()).newSignalWithStartRequest(); - verify(mConnectionManagerWorkflow).deleteConnection(); - } - - @Test - @SuppressWarnings(UNCHECKED) - @DisplayName("Test delete connection method when workflow is in an unexpected state") - void testDeleteConnectionInUnexpectedState() { - final ConnectionManagerWorkflow mTerminatedConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); - when(mTerminatedConnectionManagerWorkflow.getState()) - .thenThrow(new IllegalStateException(EXCEPTION_MESSAGE)); - when(workflowClient.newWorkflowStub(any(Class.class), any(String.class))).thenReturn(mTerminatedConnectionManagerWorkflow); - - final ConnectionManagerWorkflow mNewConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); - when(workflowClient.newWorkflowStub(any(Class.class), any(WorkflowOptions.class))).thenReturn(mNewConnectionManagerWorkflow); - final BatchRequest mBatchRequest = mock(BatchRequest.class); - when(workflowClient.newSignalWithStartRequest()).thenReturn(mBatchRequest); - - temporalClient.deleteConnection(CONNECTION_ID); - verify(workflowClient).signalWithStart(mBatchRequest); - - // Verify that the deleteConnection signal was passed to the batch request by capturing the - // argument, - // executing the signal, and verifying that the desired signal was executed - final ArgumentCaptor batchRequestAddArgCaptor = ArgumentCaptor.forClass(Proc.class); - verify(mBatchRequest).add(batchRequestAddArgCaptor.capture()); - final Proc signal = batchRequestAddArgCaptor.getValue(); - signal.apply(); - verify(mNewConnectionManagerWorkflow).deleteConnection(); - } - - @Test - @SuppressWarnings(UNCHECKED) - @DisplayName("Test delete connection method when workflow has already been deleted") - void testDeleteConnectionOnDeletedWorkflow() { - final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); - final WorkflowState mWorkflowState = mock(WorkflowState.class); - when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState); - when(mWorkflowState.isDeleted()).thenReturn(true); - when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow); - mockWorkflowStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED); - - temporalClient.deleteConnection(CONNECTION_ID); - - verify(temporalClient).deleteConnection(CONNECTION_ID); - verifyNoMoreInteractions(temporalClient); + verify(connectionManagerUtils).deleteWorkflowIfItExist(workflowClient, CONNECTION_ID); } } diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index 530e61074d44..9936fa19ee27 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -70,6 +70,7 @@ import io.airbyte.server.scheduler.EventRunner; import io.airbyte.server.scheduler.TemporalEventRunner; import io.airbyte.validation.json.JsonSchemaValidator; +import io.airbyte.workers.helper.ConnectionHelper; import io.airbyte.workers.normalization.NormalizationRunnerFactory; import io.temporal.serviceclient.WorkflowServiceStubs; import java.net.http.HttpClient; @@ -258,11 +259,14 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final AttemptHandler attemptHandler = new AttemptHandler(jobPersistence); + final ConnectionHelper connectionHelper = new ConnectionHelper(configRepository, workspaceHelper); + final ConnectionsHandler connectionsHandler = new ConnectionsHandler( configRepository, workspaceHelper, trackingClient, - eventRunner); + eventRunner, + connectionHelper); final DestinationHandler destinationHandler = new DestinationHandler( configRepository, diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java index fe1df0a0dfd7..41c013e2a43b 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java @@ -78,29 +78,34 @@ public class ConnectionsHandler { private final WorkspaceHelper workspaceHelper; private final TrackingClient trackingClient; private final EventRunner eventRunner; + private final ConnectionHelper connectionHelper; @VisibleForTesting ConnectionsHandler(final ConfigRepository configRepository, final Supplier uuidGenerator, final WorkspaceHelper workspaceHelper, final TrackingClient trackingClient, - final EventRunner eventRunner) { + final EventRunner eventRunner, + final ConnectionHelper connectionHelper) { this.configRepository = configRepository; this.uuidGenerator = uuidGenerator; this.workspaceHelper = workspaceHelper; this.trackingClient = trackingClient; this.eventRunner = eventRunner; + this.connectionHelper = connectionHelper; } public ConnectionsHandler(final ConfigRepository configRepository, final WorkspaceHelper workspaceHelper, final TrackingClient trackingClient, - final EventRunner eventRunner) { + final EventRunner eventRunner, + final ConnectionHelper connectionHelper) { this(configRepository, UUID::randomUUID, workspaceHelper, trackingClient, - eventRunner); + eventRunner, + connectionHelper); } @@ -545,8 +550,9 @@ public boolean matchSearch(final DestinationSearch destinationSearch, final Dest return (destinationReadFromSearch == null || destinationReadFromSearch.equals(destinationRead)); } - public void deleteConnection(final UUID connectionId) { - eventRunner.deleteConnection(connectionId); + public void deleteConnection(final UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException { + connectionHelper.deleteConnection(connectionId); + eventRunner.forceDeleteConnection(connectionId); } private ConnectionRead buildConnectionRead(final UUID connectionId) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java index 5751d271900c..ab1e1092d235 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java @@ -213,11 +213,15 @@ public void deleteSource(final SourceRead source) final var workspaceIdRequestBody = new WorkspaceIdRequestBody() .workspaceId(source.getWorkspaceId()); - connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody) + final List uuidsToDelete = connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody) .getConnections().stream() .filter(con -> con.getSourceId().equals(source.getSourceId())) .map(ConnectionRead::getConnectionId) - .forEach(connectionsHandler::deleteConnection); + .toList(); + + for (final UUID uuidToDelete : uuidsToDelete) { + connectionsHandler.deleteConnection(uuidToDelete); + } final var spec = getSpecFromSourceId(source.getSourceId()); final var fullConfig = secretsRepositoryReader.getSourceConnectionWithSecrets(source.getSourceId()).getConfiguration(); diff --git a/airbyte-server/src/main/java/io/airbyte/server/scheduler/EventRunner.java b/airbyte-server/src/main/java/io/airbyte/server/scheduler/EventRunner.java index ca2b47bb6cc3..bc4a83a07042 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/scheduler/EventRunner.java +++ b/airbyte-server/src/main/java/io/airbyte/server/scheduler/EventRunner.java @@ -20,8 +20,10 @@ public interface EventRunner { ManualOperationResult resetConnection(final UUID connectionId, final List streamsToReset, final boolean runSyncImmediately); - void deleteConnection(final UUID connectionId); + void forceDeleteConnection(final UUID connectionId); + // TODO: Delete + @Deprecated(forRemoval = true) void migrateSyncIfNeeded(final Set connectionIds); void update(final UUID connectionId); diff --git a/airbyte-server/src/main/java/io/airbyte/server/scheduler/TemporalEventRunner.java b/airbyte-server/src/main/java/io/airbyte/server/scheduler/TemporalEventRunner.java index 79b54ce9a058..5aa469a1a9d5 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/scheduler/TemporalEventRunner.java +++ b/airbyte-server/src/main/java/io/airbyte/server/scheduler/TemporalEventRunner.java @@ -40,8 +40,8 @@ public ManualOperationResult resetConnection(final UUID connectionId, } @Override - public void deleteConnection(final UUID connectionId) { - temporalClient.deleteConnection(connectionId); + public void forceDeleteConnection(final UUID connectionId) { + temporalClient.forceDeleteWorkflow(connectionId); } @Override diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java index 08086c7e3bae..9e48179153ac 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java @@ -66,6 +66,7 @@ import io.airbyte.server.helpers.ConnectionHelpers; import io.airbyte.server.scheduler.EventRunner; import io.airbyte.validation.json.JsonValidationException; +import io.airbyte.workers.helper.ConnectionHelper; import java.io.IOException; import java.util.Collections; import java.util.List; @@ -98,6 +99,7 @@ class ConnectionsHandlerTest { private WorkspaceHelper workspaceHelper; private TrackingClient trackingClient; private EventRunner eventRunner; + private ConnectionHelper connectionHelper; private static final String PRESTO_TO_HUDI = "presto to hudi"; private static final String PRESTO_TO_HUDI_PREFIX = "presto_to_hudi"; @@ -173,7 +175,7 @@ void setUp() throws IOException, JsonValidationException, ConfigNotFoundExceptio workspaceHelper = mock(WorkspaceHelper.class); trackingClient = mock(TrackingClient.class); eventRunner = mock(EventRunner.class); - + connectionHelper = mock(ConnectionHelper.class); when(workspaceHelper.getWorkspaceForSourceIdIgnoreExceptions(sourceId)).thenReturn(workspaceId); when(workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(destinationId)).thenReturn(workspaceId); when(workspaceHelper.getWorkspaceForOperationIdIgnoreExceptions(operationId)).thenReturn(workspaceId); @@ -190,7 +192,8 @@ void setUp() throws JsonValidationException, ConfigNotFoundException, IOExceptio uuidGenerator, workspaceHelper, trackingClient, - eventRunner); + eventRunner, + connectionHelper); when(uuidGenerator.get()).thenReturn(standardSync.getConnectionId()); final StandardSourceDefinition sourceDefinition = new StandardSourceDefinition() @@ -831,10 +834,10 @@ void testSearchConnections() throws JsonValidationException, ConfigNotFoundExcep } @Test - void testDeleteConnection() { + void testDeleteConnection() throws JsonValidationException, ConfigNotFoundException, IOException { connectionsHandler.deleteConnection(connectionId); - verify(eventRunner).deleteConnection(connectionId); + verify(connectionHelper).deleteConnection(connectionId); } @Test @@ -904,7 +907,8 @@ void setUp() { uuidGenerator, workspaceHelper, trackingClient, - eventRunner); + eventRunner, + connectionHelper); } @Test diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java index af778f649cbe..4f9baa0bc91b 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java @@ -658,7 +658,6 @@ void testIncrementalSync() throws Exception { } - @Disabled @Test @Order(14) void testDeleteConnection() throws Exception { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index acd374bc50c8..17486aaa8b0a 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -116,6 +116,9 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow private static final String CHECK_JOB_OUTPUT_TAG = "check_job_output"; private static final int CHECK_JOB_OUTPUT_TAG_CURRENT_VERSION = 1; + private static final String DONT_DELETE_IN_TEMPORAL_TAG = "dont_delete_in_temporal"; + private static final int DONT_DELETE_IN_TEMPORAL_TAG_CURRENT_VERSION = 1; + private static final String DELETE_RESET_JOB_STREAMS_TAG = "delete_reset_job_streams"; private static final int DELETE_RESET_JOB_STREAMS_CURRENT_VERSION = 1; private static final String RECORD_METRIC_TAG = "record_metric"; @@ -182,10 +185,17 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr if (workflowState.isDeleted()) { if (workflowState.isRunning()) { log.info("Cancelling the current running job because a connection deletion was requested"); + // This call is not needed anymore since this will be cancel using the the cancellation state reportCancelled(connectionUpdaterInput.getConnectionId()); } - log.info("Workflow deletion was requested. Calling deleteConnection activity before terminating the workflow."); - deleteConnectionBeforeTerminatingTheWorkflow(); + + final int dontDeleteInTemporal = + Workflow.getVersion(DONT_DELETE_IN_TEMPORAL_TAG, Workflow.DEFAULT_VERSION, DONT_DELETE_IN_TEMPORAL_TAG_CURRENT_VERSION); + + if (dontDeleteInTemporal < DONT_DELETE_IN_TEMPORAL_TAG_CURRENT_VERSION) { + log.info("Workflow deletion was requested. Calling deleteConnection activity before terminating the workflow."); + deleteConnectionBeforeTerminatingTheWorkflow(); + } return; } @@ -503,6 +513,7 @@ public void cancelJob() { cancellableSyncWorkflow.cancel(); } + // TODO: Delete when the don't delete in temporal is removed @Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME) @Override public void deleteConnection() { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConnectionDeletionActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConnectionDeletionActivityImpl.java index 28c00e99fe0e..820662445cee 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConnectionDeletionActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConnectionDeletionActivityImpl.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.Map; +// TODO: Deleted when version is removed @Singleton @Requires(env = WorkerMode.CONTROL_PLANE) public class ConnectionDeletionActivityImpl implements ConnectionDeletionActivity { diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java index 008f89e51f55..258aae6dd58d 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java @@ -486,6 +486,7 @@ void cancelNonRunning() throws InterruptedException { Mockito.verifyNoInteractions(mJobCreationAndStatusUpdateActivity); } + // TODO: delete when the signal method can be removed @Test @Timeout(value = 10, unit = TimeUnit.SECONDS) @@ -533,7 +534,7 @@ void deleteSync() throws InterruptedException { && changedStateEvent.isValue()) .isEmpty(); - Mockito.verify(mConnectionDeletionActivity, Mockito.times(1)).deleteConnection(Mockito.any()); + Mockito.verify(mConnectionDeletionActivity, Mockito.times(0)).deleteConnection(Mockito.any()); } @Test