Skip to content

Commit

Permalink
Change where a connection is deleted (#19096)
Browse files Browse the repository at this point in the history
* Tmp

* Move when the deletion is performed

* Re-enable disable test

* PR comments

* Use cancel

* rename

* Fix test and version check position

* Log exception
  • Loading branch information
benmoriceau authored and akashkulk committed Nov 17, 2022
1 parent 90697fb commit 5f6f68b
Show file tree
Hide file tree
Showing 13 changed files with 79 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,22 @@
@Slf4j
public class ConnectionManagerUtils {

/**
* Send a cancellation to the workflow. It will swallow any exception and won't check if the
* workflow is already deleted when being cancel.
*/
public void deleteWorkflowIfItExist(final WorkflowClient client,
final UUID connectionId) {
try {
final ConnectionManagerWorkflow connectionManagerWorkflow =
client.newWorkflowStub(ConnectionManagerWorkflow.class, getConnectionManagerName(connectionId));
connectionManagerWorkflow.deleteConnection();
} catch (final Exception e) {
log.warn("The workflow is not reachable when trying to cancel it", e);
}

}

/**
* Attempts to send a signal to the existing ConnectionManagerWorkflow for the provided connection.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,13 +476,13 @@ public ConnectionManagerWorkflow submitConnectionUpdaterAsync(final UUID connect
return connectionManagerWorkflow;
}

public void deleteConnection(final UUID connectionId) {
try {
connectionManagerUtils.signalWorkflowAndRepairIfNecessary(client, connectionId,
connectionManagerWorkflow -> connectionManagerWorkflow::deleteConnection);
} catch (final DeletedWorkflowException e) {
log.info("Connection {} has already been deleted.", connectionId);
}
/**
* This will cancel a workflow even if the connection is deleted already
*
* @param connectionId - connectionId to cancel
*/
public void forceDeleteWorkflow(final UUID connectionId) {
connectionManagerUtils.deleteWorkflowIfItExist(client, connectionId);
}

public void update(final UUID connectionId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,12 +326,12 @@ void migrateCalled() {

@Nested
@DisplayName("Test delete connection method.")
class DeleteConnection {
class ForceCancelConnection {

@Test
@SuppressWarnings(UNCHECKED)
@DisplayName("Test delete connection method when workflow is in a running state.")
void testDeleteConnection() {
void testforceCancelConnection() {
final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
final WorkflowState mWorkflowState = mock(WorkflowState.class);
when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState);
Expand All @@ -349,54 +349,9 @@ void testDeleteConnection() {
.withConfiguredAirbyteCatalog(new ConfiguredAirbyteCatalog());

temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, CONNECTION_ID);
temporalClient.deleteConnection(CONNECTION_ID);
temporalClient.forceDeleteWorkflow(CONNECTION_ID);

verify(workflowClient, Mockito.never()).newSignalWithStartRequest();
verify(mConnectionManagerWorkflow).deleteConnection();
}

@Test
@SuppressWarnings(UNCHECKED)
@DisplayName("Test delete connection method when workflow is in an unexpected state")
void testDeleteConnectionInUnexpectedState() {
final ConnectionManagerWorkflow mTerminatedConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
when(mTerminatedConnectionManagerWorkflow.getState())
.thenThrow(new IllegalStateException(EXCEPTION_MESSAGE));
when(workflowClient.newWorkflowStub(any(Class.class), any(String.class))).thenReturn(mTerminatedConnectionManagerWorkflow);

final ConnectionManagerWorkflow mNewConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
when(workflowClient.newWorkflowStub(any(Class.class), any(WorkflowOptions.class))).thenReturn(mNewConnectionManagerWorkflow);
final BatchRequest mBatchRequest = mock(BatchRequest.class);
when(workflowClient.newSignalWithStartRequest()).thenReturn(mBatchRequest);

temporalClient.deleteConnection(CONNECTION_ID);
verify(workflowClient).signalWithStart(mBatchRequest);

// Verify that the deleteConnection signal was passed to the batch request by capturing the
// argument,
// executing the signal, and verifying that the desired signal was executed
final ArgumentCaptor<Proc> batchRequestAddArgCaptor = ArgumentCaptor.forClass(Proc.class);
verify(mBatchRequest).add(batchRequestAddArgCaptor.capture());
final Proc signal = batchRequestAddArgCaptor.getValue();
signal.apply();
verify(mNewConnectionManagerWorkflow).deleteConnection();
}

@Test
@SuppressWarnings(UNCHECKED)
@DisplayName("Test delete connection method when workflow has already been deleted")
void testDeleteConnectionOnDeletedWorkflow() {
final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
final WorkflowState mWorkflowState = mock(WorkflowState.class);
when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState);
when(mWorkflowState.isDeleted()).thenReturn(true);
when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow);
mockWorkflowStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED);

temporalClient.deleteConnection(CONNECTION_ID);

verify(temporalClient).deleteConnection(CONNECTION_ID);
verifyNoMoreInteractions(temporalClient);
verify(connectionManagerUtils).deleteWorkflowIfItExist(workflowClient, CONNECTION_ID);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import io.airbyte.server.scheduler.EventRunner;
import io.airbyte.server.scheduler.TemporalEventRunner;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.workers.helper.ConnectionHelper;
import io.airbyte.workers.normalization.NormalizationRunnerFactory;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.net.http.HttpClient;
Expand Down Expand Up @@ -258,11 +259,14 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,

final AttemptHandler attemptHandler = new AttemptHandler(jobPersistence);

final ConnectionHelper connectionHelper = new ConnectionHelper(configRepository, workspaceHelper);

final ConnectionsHandler connectionsHandler = new ConnectionsHandler(
configRepository,
workspaceHelper,
trackingClient,
eventRunner);
eventRunner,
connectionHelper);

final DestinationHandler destinationHandler = new DestinationHandler(
configRepository,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,29 +78,34 @@ public class ConnectionsHandler {
private final WorkspaceHelper workspaceHelper;
private final TrackingClient trackingClient;
private final EventRunner eventRunner;
private final ConnectionHelper connectionHelper;

@VisibleForTesting
ConnectionsHandler(final ConfigRepository configRepository,
final Supplier<UUID> uuidGenerator,
final WorkspaceHelper workspaceHelper,
final TrackingClient trackingClient,
final EventRunner eventRunner) {
final EventRunner eventRunner,
final ConnectionHelper connectionHelper) {
this.configRepository = configRepository;
this.uuidGenerator = uuidGenerator;
this.workspaceHelper = workspaceHelper;
this.trackingClient = trackingClient;
this.eventRunner = eventRunner;
this.connectionHelper = connectionHelper;
}

public ConnectionsHandler(final ConfigRepository configRepository,
final WorkspaceHelper workspaceHelper,
final TrackingClient trackingClient,
final EventRunner eventRunner) {
final EventRunner eventRunner,
final ConnectionHelper connectionHelper) {
this(configRepository,
UUID::randomUUID,
workspaceHelper,
trackingClient,
eventRunner);
eventRunner,
connectionHelper);

}

Expand Down Expand Up @@ -545,8 +550,9 @@ public boolean matchSearch(final DestinationSearch destinationSearch, final Dest
return (destinationReadFromSearch == null || destinationReadFromSearch.equals(destinationRead));
}

public void deleteConnection(final UUID connectionId) {
eventRunner.deleteConnection(connectionId);
public void deleteConnection(final UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException {
connectionHelper.deleteConnection(connectionId);
eventRunner.forceDeleteConnection(connectionId);
}

private ConnectionRead buildConnectionRead(final UUID connectionId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,15 @@ public void deleteSource(final SourceRead source)
final var workspaceIdRequestBody = new WorkspaceIdRequestBody()
.workspaceId(source.getWorkspaceId());

connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody)
final List<UUID> uuidsToDelete = connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody)
.getConnections().stream()
.filter(con -> con.getSourceId().equals(source.getSourceId()))
.map(ConnectionRead::getConnectionId)
.forEach(connectionsHandler::deleteConnection);
.toList();

for (final UUID uuidToDelete : uuidsToDelete) {
connectionsHandler.deleteConnection(uuidToDelete);
}

final var spec = getSpecFromSourceId(source.getSourceId());
final var fullConfig = secretsRepositoryReader.getSourceConnectionWithSecrets(source.getSourceId()).getConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ public interface EventRunner {

ManualOperationResult resetConnection(final UUID connectionId, final List<StreamDescriptor> streamsToReset, final boolean runSyncImmediately);

void deleteConnection(final UUID connectionId);
void forceDeleteConnection(final UUID connectionId);

// TODO: Delete
@Deprecated(forRemoval = true)
void migrateSyncIfNeeded(final Set<UUID> connectionIds);

void update(final UUID connectionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public ManualOperationResult resetConnection(final UUID connectionId,
}

@Override
public void deleteConnection(final UUID connectionId) {
temporalClient.deleteConnection(connectionId);
public void forceDeleteConnection(final UUID connectionId) {
temporalClient.forceDeleteWorkflow(connectionId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import io.airbyte.server.helpers.ConnectionHelpers;
import io.airbyte.server.scheduler.EventRunner;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.helper.ConnectionHelper;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -98,6 +99,7 @@ class ConnectionsHandlerTest {
private WorkspaceHelper workspaceHelper;
private TrackingClient trackingClient;
private EventRunner eventRunner;
private ConnectionHelper connectionHelper;

private static final String PRESTO_TO_HUDI = "presto to hudi";
private static final String PRESTO_TO_HUDI_PREFIX = "presto_to_hudi";
Expand Down Expand Up @@ -173,7 +175,7 @@ void setUp() throws IOException, JsonValidationException, ConfigNotFoundExceptio
workspaceHelper = mock(WorkspaceHelper.class);
trackingClient = mock(TrackingClient.class);
eventRunner = mock(EventRunner.class);

connectionHelper = mock(ConnectionHelper.class);
when(workspaceHelper.getWorkspaceForSourceIdIgnoreExceptions(sourceId)).thenReturn(workspaceId);
when(workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(destinationId)).thenReturn(workspaceId);
when(workspaceHelper.getWorkspaceForOperationIdIgnoreExceptions(operationId)).thenReturn(workspaceId);
Expand All @@ -190,7 +192,8 @@ void setUp() throws JsonValidationException, ConfigNotFoundException, IOExceptio
uuidGenerator,
workspaceHelper,
trackingClient,
eventRunner);
eventRunner,
connectionHelper);

when(uuidGenerator.get()).thenReturn(standardSync.getConnectionId());
final StandardSourceDefinition sourceDefinition = new StandardSourceDefinition()
Expand Down Expand Up @@ -831,10 +834,10 @@ void testSearchConnections() throws JsonValidationException, ConfigNotFoundExcep
}

@Test
void testDeleteConnection() {
void testDeleteConnection() throws JsonValidationException, ConfigNotFoundException, IOException {
connectionsHandler.deleteConnection(connectionId);

verify(eventRunner).deleteConnection(connectionId);
verify(connectionHelper).deleteConnection(connectionId);
}

@Test
Expand Down Expand Up @@ -904,7 +907,8 @@ void setUp() {
uuidGenerator,
workspaceHelper,
trackingClient,
eventRunner);
eventRunner,
connectionHelper);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,6 @@ void testIncrementalSync() throws Exception {

}

@Disabled
@Test
@Order(14)
void testDeleteConnection() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow
private static final String CHECK_JOB_OUTPUT_TAG = "check_job_output";
private static final int CHECK_JOB_OUTPUT_TAG_CURRENT_VERSION = 1;

private static final String DONT_DELETE_IN_TEMPORAL_TAG = "dont_delete_in_temporal";
private static final int DONT_DELETE_IN_TEMPORAL_TAG_CURRENT_VERSION = 1;

private static final String DELETE_RESET_JOB_STREAMS_TAG = "delete_reset_job_streams";
private static final int DELETE_RESET_JOB_STREAMS_CURRENT_VERSION = 1;
private static final String RECORD_METRIC_TAG = "record_metric";
Expand Down Expand Up @@ -182,10 +185,17 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
if (workflowState.isDeleted()) {
if (workflowState.isRunning()) {
log.info("Cancelling the current running job because a connection deletion was requested");
// This call is not needed anymore since this will be cancel using the the cancellation state
reportCancelled(connectionUpdaterInput.getConnectionId());
}
log.info("Workflow deletion was requested. Calling deleteConnection activity before terminating the workflow.");
deleteConnectionBeforeTerminatingTheWorkflow();

final int dontDeleteInTemporal =
Workflow.getVersion(DONT_DELETE_IN_TEMPORAL_TAG, Workflow.DEFAULT_VERSION, DONT_DELETE_IN_TEMPORAL_TAG_CURRENT_VERSION);

if (dontDeleteInTemporal < DONT_DELETE_IN_TEMPORAL_TAG_CURRENT_VERSION) {
log.info("Workflow deletion was requested. Calling deleteConnection activity before terminating the workflow.");
deleteConnectionBeforeTerminatingTheWorkflow();
}
return;
}

Expand Down Expand Up @@ -503,6 +513,7 @@ public void cancelJob() {
cancellableSyncWorkflow.cancel();
}

// TODO: Delete when the don't delete in temporal is removed
@Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME)
@Override
public void deleteConnection() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.IOException;
import java.util.Map;

// TODO: Deleted when version is removed
@Singleton
@Requires(env = WorkerMode.CONTROL_PLANE)
public class ConnectionDeletionActivityImpl implements ConnectionDeletionActivity {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ void cancelNonRunning() throws InterruptedException {
Mockito.verifyNoInteractions(mJobCreationAndStatusUpdateActivity);
}

// TODO: delete when the signal method can be removed
@Test
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
Expand Down Expand Up @@ -533,7 +534,7 @@ void deleteSync() throws InterruptedException {
&& changedStateEvent.isValue())
.isEmpty();

Mockito.verify(mConnectionDeletionActivity, Mockito.times(1)).deleteConnection(Mockito.any());
Mockito.verify(mConnectionDeletionActivity, Mockito.times(0)).deleteConnection(Mockito.any());
}

@Test
Expand Down

0 comments on commit 5f6f68b

Please sign in to comment.