Skip to content

Commit

Permalink
Change the block logic and block after the job creation
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau committed Feb 23, 2022
1 parent 91933c2 commit 503e406
Showing 1 changed file with 29 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Value;
Expand Down Expand Up @@ -220,6 +224,22 @@ public void submitConnectionUpdaterAsync(final UUID connectionId) {
signalRequest.add(connectionManagerWorkflow::run, input);

WorkflowClient.start(connectionManagerWorkflow::run, input);

try {
CompletableFuture.supplyAsync(() -> {
try {
do {
Thread.sleep(DELAY_BETWEEN_QUERY_MS);
} while (!isWorkflowRunning(getConnectionManagerName(connectionId)));
} catch (InterruptedException e) {}

return null;
}).get(60, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException e) {
log.error("Can't create a new workflow because of the exception bellow", e);
} catch (TimeoutException e) {
log.error("Can't create a new workflow within a minute", e);
}
}

public void deleteConnection(final UUID connectionId) {
Expand Down Expand Up @@ -446,37 +466,18 @@ <T> TemporalResponse<T> execute(final JobRunConfig jobRunConfig, final Supplier<
}

/**
* Check if a workflow is currently running. It is using the temporal pagination (see:
* https://temporalio.slack.com/archives/CTRCR8RBP/p1638926310308200)
* Check if a workflow is currently running. Running means that it is query-able, thus we check that
* we can properly launch a query
*/
public boolean isWorkflowRunning(final String workflowName) {
ByteString token;
ListOpenWorkflowExecutionsRequest openWorkflowExecutionsRequest =
ListOpenWorkflowExecutionsRequest.newBuilder()
.setNamespace(client.getOptions().getNamespace())
.setMaximumPageSize(MAXIMUM_SEARCH_PAGE_SIZE)
.build();
do {
final ListOpenWorkflowExecutionsResponse listOpenWorkflowExecutionsRequest =
service.blockingStub().listOpenWorkflowExecutions(openWorkflowExecutionsRequest);
final long matchingWorkflowCount = listOpenWorkflowExecutionsRequest.getExecutionsList().stream()
.filter((workflowExecutionInfo -> workflowExecutionInfo.getExecution().getWorkflowId().equals(workflowName)))
.count();
if (matchingWorkflowCount != 0) {
return true;
}
token = listOpenWorkflowExecutionsRequest.getNextPageToken();

openWorkflowExecutionsRequest =
ListOpenWorkflowExecutionsRequest.newBuilder()
.setNamespace(client.getOptions().getNamespace())
.setNextPageToken(token)
.setMaximumPageSize(MAXIMUM_SEARCH_PAGE_SIZE)
.build();

} while (token != null && token.size() > 0);
try {
ConnectionManagerWorkflow connectionManagerWorkflow = getExistingWorkflow(ConnectionManagerWorkflow.class, workflowName);
connectionManagerWorkflow.getState();

return false;
return true;
} catch (Exception e) {
return false;
}
}

@VisibleForTesting
Expand Down

0 comments on commit 503e406

Please sign in to comment.