Skip to content

Commit

Permalink
Change the block logic and block after the job creation (#10597)
Browse files Browse the repository at this point in the history
This is changing the check to see if a connection exist in order to make it more performant and more accurate. It makes sure that the workflow is reachable by trying to query it.
  • Loading branch information
benmoriceau authored and etsybaev committed Mar 5, 2022
1 parent fd66e8b commit 4f24b09
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -490,10 +490,6 @@ public void testManualSync() throws Exception {
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();
// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}
final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob());
assertSourceAndDestinationDbInSync(false);
Expand All @@ -512,10 +508,6 @@ public void testCancelSync() throws Exception {
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();
// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}
final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForJob(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.RUNNING));

Expand Down Expand Up @@ -548,10 +540,6 @@ public void testIncrementalSync() throws Exception {
.destinationSyncMode(destinationSyncMode));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();
// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}
LOGGER.info("Beginning testIncrementalSync() sync 1");
final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
Expand Down Expand Up @@ -655,10 +643,6 @@ public void testMultipleSchemasAndTablesSync() throws Exception {
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();
// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}
final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob());
assertSourceAndDestinationDbInSync(false);
Expand Down Expand Up @@ -786,10 +770,6 @@ public void testCheckpointing() throws Exception {
.destinationSyncMode(destinationSyncMode));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, null).getConnectionId();
// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}
final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));

Expand Down Expand Up @@ -878,10 +858,6 @@ public void testBackpressure() throws Exception {
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, null)
.getConnectionId();
// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}
final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));

Expand Down Expand Up @@ -959,10 +935,6 @@ public void testFailureTimeout() throws Exception {
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, null)
.getConnectionId();
// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}

final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
Expand Down Expand Up @@ -1001,11 +973,6 @@ public void testDowntimeDuringSync() throws Exception {
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}

final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));

Thread.sleep(5000);
Expand Down Expand Up @@ -1045,10 +1012,6 @@ public void testCancelSyncWithInterruption() throws Exception {
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}
final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForJob(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.RUNNING));

Expand Down Expand Up @@ -1083,10 +1046,6 @@ public void testCuttingOffPodBeforeFilesTransfer() throws Exception {
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

LOGGER.info("Waiting for connection to be available in Temporal...");
// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}

LOGGER.info("Run manual sync...");
final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
Expand Down Expand Up @@ -1131,10 +1090,6 @@ public void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

LOGGER.info("Waiting for connection to be available in Temporal...");
// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}

LOGGER.info("Run manual sync...");
final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
Expand Down Expand Up @@ -1556,17 +1511,4 @@ public enum Type {
DESTINATION
}

private static void waitForTemporalWorkflow(final UUID connectionId) {
/*
* do { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e);
* } } while
* (temporalClient.isWorkflowRunning(temporalClient.getConnectionManagerName(connectionId)));
*/
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Builder;
Expand Down Expand Up @@ -221,6 +225,22 @@ public void submitConnectionUpdaterAsync(final UUID connectionId) {
signalRequest.add(connectionManagerWorkflow::run, input);

WorkflowClient.start(connectionManagerWorkflow::run, input);

try {
CompletableFuture.supplyAsync(() -> {
try {
do {
Thread.sleep(DELAY_BETWEEN_QUERY_MS);
} while (!isWorkflowRunning(getConnectionManagerName(connectionId)));
} catch (InterruptedException e) {}

return null;
}).get(60, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to create a new connection manager workflow", e);
} catch (TimeoutException e) {
log.error("Can't create a new connection manager workflow due to timeout", e);
}
}

public void deleteConnection(final UUID connectionId) {
Expand Down Expand Up @@ -448,37 +468,18 @@ <T> TemporalResponse<T> execute(final JobRunConfig jobRunConfig, final Supplier<
}

/**
* Check if a workflow is currently running. It is using the temporal pagination (see:
* https://temporalio.slack.com/archives/CTRCR8RBP/p1638926310308200)
* Check if a workflow is currently running. Running means that it is query-able, thus we check that
* we can properly launch a query
*/
public boolean isWorkflowRunning(final String workflowName) {
ByteString token;
ListOpenWorkflowExecutionsRequest openWorkflowExecutionsRequest =
ListOpenWorkflowExecutionsRequest.newBuilder()
.setNamespace(client.getOptions().getNamespace())
.setMaximumPageSize(MAXIMUM_SEARCH_PAGE_SIZE)
.build();
do {
final ListOpenWorkflowExecutionsResponse listOpenWorkflowExecutionsRequest =
service.blockingStub().listOpenWorkflowExecutions(openWorkflowExecutionsRequest);
final long matchingWorkflowCount = listOpenWorkflowExecutionsRequest.getExecutionsList().stream()
.filter((workflowExecutionInfo -> workflowExecutionInfo.getExecution().getWorkflowId().equals(workflowName)))
.count();
if (matchingWorkflowCount != 0) {
return true;
}
token = listOpenWorkflowExecutionsRequest.getNextPageToken();

openWorkflowExecutionsRequest =
ListOpenWorkflowExecutionsRequest.newBuilder()
.setNamespace(client.getOptions().getNamespace())
.setNextPageToken(token)
.setMaximumPageSize(MAXIMUM_SEARCH_PAGE_SIZE)
.build();

} while (token != null && token.size() > 0);
try {
ConnectionManagerWorkflow connectionManagerWorkflow = getExistingWorkflow(ConnectionManagerWorkflow.class, workflowName);
connectionManagerWorkflow.getState();

return false;
return true;
} catch (Exception e) {
return false;
}
}

@VisibleForTesting
Expand Down

0 comments on commit 4f24b09

Please sign in to comment.