Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bmoric/temporal cleaning cron #16414

Merged
merged 142 commits into from
Sep 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
142 commits
Select commit Hold shift + click to select a range
1aaf052
WIP Convert airbyte-workers to Micronaut framework
jdpgrailsdev Aug 31, 2022
affa857
Rebase cleanup
jdpgrailsdev Sep 2, 2022
54bbd2c
Fix broken tests
jdpgrailsdev Sep 2, 2022
8b739fc
Simplify code
jdpgrailsdev Sep 2, 2022
d88941b
Support control vs data plane configuration
jdpgrailsdev Sep 2, 2022
222cad5
make WORFKLOW_PROXY_CACHE non-static to avoid cacheing mocks in unit …
pmossman Sep 2, 2022
8cfe8c8
Formatting
jdpgrailsdev Sep 6, 2022
001b8b5
Pairing on Worker Micronaut (#16364)
pmossman Sep 6, 2022
884d477
Revert temporal proxy changes
jdpgrailsdev Sep 6, 2022
8a71662
Formatting
jdpgrailsdev Sep 6, 2022
ea09c9b
Fix default value
jdpgrailsdev Sep 6, 2022
6db667f
register new route activity in test
pmossman Sep 6, 2022
a2996b2
fix SyncWorkflowTest now that it isn't doing any routing
pmossman Sep 6, 2022
0822d2b
Update dependencies
jdpgrailsdev Sep 7, 2022
ac3a275
More dependency updates
jdpgrailsdev Sep 7, 2022
068c09f
Update dependencies
jdpgrailsdev Sep 7, 2022
2aa0ab8
WIP Convert airbyte-workers to Micronaut framework
jdpgrailsdev Aug 31, 2022
0491117
Rebase cleanup
jdpgrailsdev Sep 2, 2022
a68d7aa
Fix broken tests
jdpgrailsdev Sep 2, 2022
643e36b
Simplify code
jdpgrailsdev Sep 2, 2022
26200ec
Support control vs data plane configuration
jdpgrailsdev Sep 2, 2022
3918a9d
make WORFKLOW_PROXY_CACHE non-static to avoid cacheing mocks in unit …
pmossman Sep 2, 2022
a5af940
Formatting
jdpgrailsdev Sep 6, 2022
c6a9ec3
Pairing on Worker Micronaut (#16364)
pmossman Sep 6, 2022
fbcb3c3
Revert temporal proxy changes
jdpgrailsdev Sep 6, 2022
b89a8f3
Formatting
jdpgrailsdev Sep 6, 2022
ed989a3
Fix default value
jdpgrailsdev Sep 6, 2022
de666c5
register new route activity in test
pmossman Sep 6, 2022
cb4edeb
fix SyncWorkflowTest now that it isn't doing any routing
pmossman Sep 6, 2022
60bb57e
Update dependencies
jdpgrailsdev Sep 7, 2022
4271780
More dependency updates
jdpgrailsdev Sep 7, 2022
0182b29
Update dependencies
jdpgrailsdev Sep 7, 2022
c359a1f
Merge branch 'jonathan/airbyte-worker-micronaut' of github.com:airbyt…
benmoriceau Sep 7, 2022
c574ed6
WIP Convert airbyte-workers to Micronaut framework
jdpgrailsdev Aug 31, 2022
89d86e5
Rebase cleanup
jdpgrailsdev Sep 2, 2022
e879675
Fix broken tests
jdpgrailsdev Sep 2, 2022
a9c1565
Simplify code
jdpgrailsdev Sep 2, 2022
34b72a2
Support control vs data plane configuration
jdpgrailsdev Sep 2, 2022
078957e
make WORFKLOW_PROXY_CACHE non-static to avoid cacheing mocks in unit …
pmossman Sep 2, 2022
dad3186
Formatting
jdpgrailsdev Sep 6, 2022
55f5034
Pairing on Worker Micronaut (#16364)
pmossman Sep 6, 2022
061a368
Revert temporal proxy changes
jdpgrailsdev Sep 6, 2022
78df2fa
Formatting
jdpgrailsdev Sep 6, 2022
ef54b2b
Fix default value
jdpgrailsdev Sep 6, 2022
05e2e2b
register new route activity in test
pmossman Sep 6, 2022
c3d93fa
fix SyncWorkflowTest now that it isn't doing any routing
pmossman Sep 6, 2022
6c96965
Update dependencies
jdpgrailsdev Sep 7, 2022
2ed4bbe
More dependency updates
jdpgrailsdev Sep 7, 2022
43ea323
Update dependencies
jdpgrailsdev Sep 7, 2022
6c92a07
Merge branch 'master' into jonathan/airbyte-worker-micronaut
jdpgrailsdev Sep 8, 2022
edde35b
Add the injected temporal client to airbyte-cron
benmoriceau Sep 7, 2022
ebb873d
Update cron to make it work
benmoriceau Sep 8, 2022
476064a
Merge branch 'jonathan/airbyte-worker-micronaut' of github.com:airbyt…
benmoriceau Sep 8, 2022
4326de7
Remove useless Bean factory
benmoriceau Sep 8, 2022
5ce56c5
PR comments
benmoriceau Sep 8, 2022
800e561
WIP Convert airbyte-workers to Micronaut framework
jdpgrailsdev Aug 31, 2022
896bbab
Rebase cleanup
jdpgrailsdev Sep 2, 2022
858a6de
Fix broken tests
jdpgrailsdev Sep 2, 2022
7abdf1f
Simplify code
jdpgrailsdev Sep 2, 2022
4e36dad
Support control vs data plane configuration
jdpgrailsdev Sep 2, 2022
7e9876a
make WORFKLOW_PROXY_CACHE non-static to avoid cacheing mocks in unit …
pmossman Sep 2, 2022
d58af58
Formatting
jdpgrailsdev Sep 6, 2022
253e191
Pairing on Worker Micronaut (#16364)
pmossman Sep 6, 2022
50b4094
Revert temporal proxy changes
jdpgrailsdev Sep 6, 2022
aa29c8a
Formatting
jdpgrailsdev Sep 6, 2022
5c0e197
Fix default value
jdpgrailsdev Sep 6, 2022
f48953a
register new route activity in test
pmossman Sep 6, 2022
0710776
fix SyncWorkflowTest now that it isn't doing any routing
pmossman Sep 6, 2022
7dc290c
Update dependencies
jdpgrailsdev Sep 7, 2022
80364d7
More dependency updates
jdpgrailsdev Sep 7, 2022
0020252
Update dependencies
jdpgrailsdev Sep 7, 2022
67ebb5c
Merge branch 'jonathan/airbyte-worker-micronaut' of github.com:airbyt…
benmoriceau Sep 8, 2022
6fa6998
Improve conditional bean check
jdpgrailsdev Sep 9, 2022
c60fcdd
Match existing Optional functionality
jdpgrailsdev Sep 9, 2022
1599488
Add notEquals check
jdpgrailsdev Sep 9, 2022
5eaa577
Merge branch 'master' into jonathan/airbyte-worker-micronaut
jdpgrailsdev Sep 9, 2022
0625302
Add missing env var to Helm chart
jdpgrailsdev Sep 9, 2022
9df6ae0
Fix typo
jdpgrailsdev Sep 9, 2022
452c796
Mark LogConfigs as Singleton
jdpgrailsdev Sep 9, 2022
a217fc2
Merge branch 'jonathan/airbyte-worker-micronaut' of github.com:airbyt…
benmoriceau Sep 9, 2022
0711780
WIP Convert airbyte-workers to Micronaut framework
jdpgrailsdev Aug 31, 2022
2c4070d
Rebase cleanup
jdpgrailsdev Sep 2, 2022
ec1b722
Fix broken tests
jdpgrailsdev Sep 2, 2022
096fc50
Simplify code
jdpgrailsdev Sep 2, 2022
30a779f
Support control vs data plane configuration
jdpgrailsdev Sep 2, 2022
4ae75ad
make WORFKLOW_PROXY_CACHE non-static to avoid cacheing mocks in unit …
pmossman Sep 2, 2022
539a20b
Formatting
jdpgrailsdev Sep 6, 2022
9913f80
Pairing on Worker Micronaut (#16364)
pmossman Sep 6, 2022
1405e36
Revert temporal proxy changes
jdpgrailsdev Sep 6, 2022
478ea84
Formatting
jdpgrailsdev Sep 6, 2022
4688d26
Fix default value
jdpgrailsdev Sep 6, 2022
66c4603
register new route activity in test
pmossman Sep 6, 2022
b1ab4ba
fix SyncWorkflowTest now that it isn't doing any routing
pmossman Sep 6, 2022
63103cd
Update dependencies
jdpgrailsdev Sep 7, 2022
63a0cbe
More dependency updates
jdpgrailsdev Sep 7, 2022
02a6e68
Update dependencies
jdpgrailsdev Sep 7, 2022
2469184
Improve conditional bean check
jdpgrailsdev Sep 9, 2022
ae3631e
Match existing Optional functionality
jdpgrailsdev Sep 9, 2022
b0ec8a0
Add notEquals check
jdpgrailsdev Sep 9, 2022
6f817c6
Add missing env var to Helm chart
jdpgrailsdev Sep 9, 2022
a96ad9d
Fix typo
jdpgrailsdev Sep 9, 2022
0ebc898
Mark LogConfigs as Singleton
jdpgrailsdev Sep 9, 2022
acc9c36
Env vars for log/state storage type
jdpgrailsdev Sep 9, 2022
3f70e8a
Remove use of Optional in bean declarations
jdpgrailsdev Sep 9, 2022
c7810d5
Fix typo in config property name
jdpgrailsdev Sep 9, 2022
d43d94e
Support Temporal Cloud namespace
jdpgrailsdev Sep 9, 2022
5631642
Change to @Value
jdpgrailsdev Sep 9, 2022
8b6ef90
Use correct value for conditional check
jdpgrailsdev Sep 9, 2022
d576704
Upgrade Micronaut
jdpgrailsdev Sep 12, 2022
e4944bc
Fix merge conflict
jdpgrailsdev Sep 12, 2022
ce82199
Formatting
jdpgrailsdev Sep 12, 2022
73e0f53
Add missing env var
jdpgrailsdev Sep 12, 2022
b712763
Use sync task queue environment variable
jdpgrailsdev Sep 12, 2022
5074a85
Handle sync task queue as set
jdpgrailsdev Sep 12, 2022
7be1461
format and force http
pmossman Sep 12, 2022
842ae6f
Handle case where sync task queue is empty
jdpgrailsdev Sep 12, 2022
5e176c9
Merge branch 'jonathan/airbyte-worker-micronaut' of github.com:airbyt…
jdpgrailsdev Sep 12, 2022
9e6e517
Add correct path to config property
jdpgrailsdev Sep 12, 2022
c965391
Remove unused import
jdpgrailsdev Sep 12, 2022
f1c9b41
Merge branch 'jonathan/airbyte-worker-micronaut' of github.com:airbyt…
benmoriceau Sep 12, 2022
8a30092
Remove conflict
benmoriceau Sep 12, 2022
7ac1964
Remove unused parameter
jdpgrailsdev Sep 12, 2022
e60e957
Formatting
jdpgrailsdev Sep 12, 2022
41e0e58
Use pattern for condition process factory beans
jdpgrailsdev Sep 12, 2022
5c7bb98
Cleanup
jdpgrailsdev Sep 12, 2022
f0f9516
PR feedback
jdpgrailsdev Sep 13, 2022
3285a0d
Merge branch 'jonathan/airbyte-worker-micronaut' of github.com:airbyt…
benmoriceau Sep 13, 2022
26edba3
Revert hack for testing
jdpgrailsdev Sep 13, 2022
97cd5a0
Fix temporal restart by status (#16447)
benmoriceau Sep 13, 2022
ead1ebb
Merge branch 'jonathan/airbyte-worker-micronaut' of github.com:airbyt…
benmoriceau Sep 13, 2022
0fa4352
Update application.yml
benmoriceau Sep 13, 2022
1ba8513
Merge branch 'master' into bmoric/temporal-cleaning-cron
benmoriceau Sep 15, 2022
3f0c332
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/tem…
benmoriceau Sep 16, 2022
3b2a14f
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/tem…
benmoriceau Sep 20, 2022
96be70f
Re add worker dep
benmoriceau Sep 20, 2022
6a74c9f
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/tem…
benmoriceau Sep 21, 2022
4f1343e
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/tem…
benmoriceau Sep 21, 2022
0851518
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/tem…
benmoriceau Sep 26, 2022
c2de8b6
Add missing env var
benmoriceau Sep 26, 2022
4299e7a
PR comments
benmoriceau Sep 26, 2022
aa249f7
Bmoric/move temporal client (#16778)
benmoriceau Sep 26, 2022
dad504d
Merge branch 'master' into bmoric/temporal-cleaning-cron
benmoriceau Sep 26, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions airbyte-commons-temporal/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import org.jsonschema2pojo.SourceType

plugins {
id "java-library"
id 'com.github.eirnym.js2p' version '1.0'
}

dependencies {
annotationProcessor platform(libs.micronaut.bom)
annotationProcessor libs.bundles.micronaut.annotation.processor

implementation platform(libs.micronaut.bom)
implementation libs.bundles.micronaut

implementation 'io.temporal:temporal-sdk:1.8.1'
implementation 'io.temporal:temporal-serviceclient:1.8.1'

testAnnotationProcessor platform(libs.micronaut.bom)
testAnnotationProcessor libs.bundles.micronaut.test.annotation.processor


implementation project(':airbyte-config:config-models')
implementation project(':airbyte-metrics:metrics-lib')

testImplementation 'io.temporal:temporal-testing:1.8.1'
// Needed to be able to mock final class
testImplementation 'org.mockito:mockito-inline:4.7.0'
}

jsonSchema2Pojo {
sourceType = SourceType.YAMLSCHEMA
source = files("${sourceSets.main.output.resourcesDir}/workers_models")
targetDirectory = new File(project.buildDir, 'generated/src/gen/java/')
removeOldOutput = true

targetPackage = 'io.airbyte.persistence.job.models'

useLongIntegers = true
generateBuilders = true
includeConstructors = false
includeSetters = true
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal;
package io.airbyte.commons.temporal;

import io.temporal.activity.ActivityExecutionContext;
import io.temporal.client.ActivityCompletionException;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.temporal;

import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow;
import io.airbyte.commons.temporal.scheduling.ConnectionUpdaterInput;
import io.temporal.client.WorkflowClient;
import java.util.UUID;
import javax.inject.Singleton;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@NoArgsConstructor
@Singleton
@Slf4j
public class ConnectionManagerUtils {

void safeTerminateWorkflow(final WorkflowClient client, final String workflowId, final String reason) {
log.info("Attempting to terminate existing workflow for workflowId {}.", workflowId);
try {
client.newUntypedWorkflowStub(workflowId).terminate(reason);
} catch (final Exception e) {
log.warn(
"Could not terminate temporal workflow due to the following error; "
+ "this may be because there is currently no running workflow for this connection.",
e);
}
}

public void safeTerminateWorkflow(final WorkflowClient client, final UUID connectionId, final String reason) {
safeTerminateWorkflow(client, getConnectionManagerName(connectionId), reason);
}

public String getConnectionManagerName(final UUID connectionId) {
return "connection_manager_" + connectionId;
}

public ConnectionManagerWorkflow startConnectionManagerNoSignal(final WorkflowClient client, final UUID connectionId) {
final ConnectionManagerWorkflow connectionManagerWorkflow = newConnectionManagerWorkflowStub(client, connectionId);
final ConnectionUpdaterInput input = TemporalWorkflowUtils.buildStartWorkflowInput(connectionId);
WorkflowClient.start(connectionManagerWorkflow::run, input);

return connectionManagerWorkflow;
}

public ConnectionManagerWorkflow newConnectionManagerWorkflowStub(final WorkflowClient client, final UUID connectionId) {
return client.newWorkflowStub(ConnectionManagerWorkflow.class,
TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.CONNECTION_UPDATER, getConnectionManagerName(connectionId)));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.temporal;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow;
import io.micronaut.context.annotation.Requires;
import io.temporal.api.common.v1.WorkflowType;
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsRequest;
import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsResponse;
import io.temporal.api.workflowservice.v1.ListOpenWorkflowExecutionsRequest;
import io.temporal.api.workflowservice.v1.ListOpenWorkflowExecutionsResponse;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

@AllArgsConstructor
@NoArgsConstructor
@Slf4j
@Singleton
@Requires(property = "airbyte.worker.plane",
notEquals = "DATA_PLANE")
public class TemporalClient {

@Inject
private WorkflowClient client;
@Inject
private WorkflowServiceStubs service;
@Inject
private ConnectionManagerUtils connectionManagerUtils;

private final Set<String> workflowNames = new HashSet<>();

public void restartClosedWorkflowByStatus(final WorkflowExecutionStatus executionStatus) {
final Set<UUID> workflowExecutionInfos = fetchClosedWorkflowsByStatus(executionStatus);

final Set<UUID> nonRunningWorkflow = filterOutRunningWorkspaceId(workflowExecutionInfos);

nonRunningWorkflow.forEach(connectionId -> {
connectionManagerUtils.safeTerminateWorkflow(client, connectionId, "Terminating workflow in "
+ "unreachable state before starting a new workflow for this connection");
connectionManagerUtils.startConnectionManagerNoSignal(client, connectionId);
});
}

Set<UUID> fetchClosedWorkflowsByStatus(final WorkflowExecutionStatus executionStatus) {
ByteString token;
ListClosedWorkflowExecutionsRequest workflowExecutionsRequest =
ListClosedWorkflowExecutionsRequest.newBuilder()
.setNamespace(client.getOptions().getNamespace())
.build();

final Set<UUID> workflowExecutionInfos = new HashSet<>();
do {
final ListClosedWorkflowExecutionsResponse listOpenWorkflowExecutionsRequest =
service.blockingStub().listClosedWorkflowExecutions(workflowExecutionsRequest);
final WorkflowType connectionManagerWorkflowType = WorkflowType.newBuilder().setName(ConnectionManagerWorkflow.class.getSimpleName()).build();
workflowExecutionInfos.addAll(listOpenWorkflowExecutionsRequest.getExecutionsList().stream()
.filter(workflowExecutionInfo -> workflowExecutionInfo.getType() == connectionManagerWorkflowType ||
workflowExecutionInfo.getStatus() == executionStatus)
.flatMap((workflowExecutionInfo -> extractConnectionIdFromWorkflowId(workflowExecutionInfo.getExecution().getWorkflowId()).stream()))
.collect(Collectors.toSet()));
token = listOpenWorkflowExecutionsRequest.getNextPageToken();

workflowExecutionsRequest =
ListClosedWorkflowExecutionsRequest.newBuilder()
.setNamespace(client.getOptions().getNamespace())
.setNextPageToken(token)
.build();

} while (token != null && token.size() > 0);

return workflowExecutionInfos;
}

@VisibleForTesting
Set<UUID> filterOutRunningWorkspaceId(final Set<UUID> workflowIds) {
refreshRunningWorkflow();

final Set<UUID> runningWorkflowByUUID =
workflowNames.stream().flatMap(name -> extractConnectionIdFromWorkflowId(name).stream()).collect(Collectors.toSet());

return workflowIds.stream().filter(workflowId -> !runningWorkflowByUUID.contains(workflowId)).collect(Collectors.toSet());
}

@VisibleForTesting
void refreshRunningWorkflow() {
workflowNames.clear();
ByteString token;
ListOpenWorkflowExecutionsRequest openWorkflowExecutionsRequest =
ListOpenWorkflowExecutionsRequest.newBuilder()
.setNamespace(client.getOptions().getNamespace())
.build();
do {
final ListOpenWorkflowExecutionsResponse listOpenWorkflowExecutionsRequest =
service.blockingStub().listOpenWorkflowExecutions(openWorkflowExecutionsRequest);
final Set<String> workflowExecutionInfos = listOpenWorkflowExecutionsRequest.getExecutionsList().stream()
.map((workflowExecutionInfo -> workflowExecutionInfo.getExecution().getWorkflowId()))
.collect(Collectors.toSet());
workflowNames.addAll(workflowExecutionInfos);
token = listOpenWorkflowExecutionsRequest.getNextPageToken();

openWorkflowExecutionsRequest =
ListOpenWorkflowExecutionsRequest.newBuilder()
.setNamespace(client.getOptions().getNamespace())
.setNextPageToken(token)
.build();

} while (token != null && token.size() > 0);
}

Optional<UUID> extractConnectionIdFromWorkflowId(final String workflowId) {
if (!workflowId.startsWith("connection_manager_")) {
return Optional.empty();
}
return Optional.ofNullable(StringUtils.removeStart(workflowId, "connection_manager_"))
.map(
stringUUID -> UUID.fromString(stringUUID));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal;
package io.airbyte.commons.temporal;

public enum TemporalJobType {
GET_SPEC,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal;
package io.airbyte.commons.temporal;

import com.uber.m3.tally.RootScopeBuilder;
import com.uber.m3.tally.Scope;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal;
package io.airbyte.commons.temporal;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.temporal.scheduling.ConnectionUpdaterInput;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.workers.temporal.scheduling.ConnectionUpdaterInput;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowClientOptions;
import io.temporal.client.WorkflowOptions;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.temporal.config;

import io.airbyte.commons.temporal.TemporalUtils;
import io.airbyte.commons.temporal.TemporalWorkflowUtils;
import io.micronaut.context.annotation.Factory;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import javax.inject.Singleton;

/**
* Micronaut bean factory for Temporal-related singletons.
*/
@Factory
public class TemporalBeanFactory {

@Singleton
public WorkflowServiceStubs temporalService(final TemporalUtils temporalUtils) {
return temporalUtils.createTemporalService();
}

@Singleton
public WorkflowClient workflowClient(
final TemporalUtils temporalUtils,
final WorkflowServiceStubs temporalService) {
return TemporalWorkflowUtils.createWorkflowClient(temporalService, temporalUtils.getNamespace());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.scheduling;
package io.airbyte.commons.temporal.scheduling;

import io.airbyte.workers.temporal.scheduling.state.WorkflowState;
import io.airbyte.commons.temporal.scheduling.state.WorkflowState;
import io.temporal.workflow.QueryMethod;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.WorkflowInterface;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.scheduling;
package io.airbyte.commons.temporal.scheduling;

import io.airbyte.workers.temporal.scheduling.state.WorkflowState;
import io.airbyte.commons.temporal.scheduling.state.WorkflowState;
import java.util.UUID;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
Expand Down Expand Up @@ -39,7 +39,7 @@ public class ConnectionUpdaterInput {
private WorkflowState workflowState;
private boolean resetConnection;
@Builder.Default
private boolean fromJobResetFailure = false;
private final boolean fromJobResetFailure = false;

@Builder.Default
private boolean skipScheduling = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.scheduling.state;
package io.airbyte.commons.temporal.scheduling.state;

import io.airbyte.config.FailureReason;
import java.util.HashSet;
Expand All @@ -20,7 +20,7 @@ public class WorkflowInternalState {
private Integer attemptNumber = null;

// StandardSyncOutput standardSyncOutput = null;
private final Set<FailureReason> failures = new HashSet<>();
private Set<FailureReason> failures = new HashSet<>();
private Boolean partialSuccess = null;

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.scheduling.state;
package io.airbyte.commons.temporal.scheduling.state;

import io.airbyte.workers.temporal.scheduling.state.listener.WorkflowStateChangedListener;
import io.airbyte.workers.temporal.scheduling.state.listener.WorkflowStateChangedListener.ChangedStateEvent;
import io.airbyte.workers.temporal.scheduling.state.listener.WorkflowStateChangedListener.StateField;
import io.airbyte.commons.temporal.scheduling.state.listener.WorkflowStateChangedListener;
import io.airbyte.commons.temporal.scheduling.state.listener.WorkflowStateChangedListener.ChangedStateEvent;
import io.airbyte.commons.temporal.scheduling.state.listener.WorkflowStateChangedListener.StateField;
import java.util.UUID;
import lombok.Getter;
import lombok.NoArgsConstructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.scheduling.state.listener;
package io.airbyte.commons.temporal.scheduling.state.listener;

import java.util.LinkedList;
import java.util.Queue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.scheduling.state.listener;
package io.airbyte.commons.temporal.scheduling.state.listener;

import java.util.LinkedList;
import java.util.Optional;
Expand Down
Loading