Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bmoric/temporal cleaning cron #16414

Merged
merged 142 commits into from
Sep 26, 2022
Merged
Show file tree
Hide file tree
Changes from 140 commits
Commits
Show all changes
142 commits
Select commit Hold shift + click to select a range
1aaf052
WIP Convert airbyte-workers to Micronaut framework
jdpgrailsdev Aug 31, 2022
affa857
Rebase cleanup
jdpgrailsdev Sep 2, 2022
54bbd2c
Fix broken tests
jdpgrailsdev Sep 2, 2022
8b739fc
Simplify code
jdpgrailsdev Sep 2, 2022
d88941b
Support control vs data plane configuration
jdpgrailsdev Sep 2, 2022
222cad5
make WORFKLOW_PROXY_CACHE non-static to avoid cacheing mocks in unit …
pmossman Sep 2, 2022
8cfe8c8
Formatting
jdpgrailsdev Sep 6, 2022
001b8b5
Pairing on Worker Micronaut (#16364)
pmossman Sep 6, 2022
884d477
Revert temporal proxy changes
jdpgrailsdev Sep 6, 2022
8a71662
Formatting
jdpgrailsdev Sep 6, 2022
ea09c9b
Fix default value
jdpgrailsdev Sep 6, 2022
6db667f
register new route activity in test
pmossman Sep 6, 2022
a2996b2
fix SyncWorkflowTest now that it isn't doing any routing
pmossman Sep 6, 2022
0822d2b
Update dependencies
jdpgrailsdev Sep 7, 2022
ac3a275
More dependency updates
jdpgrailsdev Sep 7, 2022
068c09f
Update dependencies
jdpgrailsdev Sep 7, 2022
2aa0ab8
WIP Convert airbyte-workers to Micronaut framework
jdpgrailsdev Aug 31, 2022
0491117
Rebase cleanup
jdpgrailsdev Sep 2, 2022
a68d7aa
Fix broken tests
jdpgrailsdev Sep 2, 2022
643e36b
Simplify code
jdpgrailsdev Sep 2, 2022
26200ec
Support control vs data plane configuration
jdpgrailsdev Sep 2, 2022
3918a9d
make WORFKLOW_PROXY_CACHE non-static to avoid cacheing mocks in unit …
pmossman Sep 2, 2022
a5af940
Formatting
jdpgrailsdev Sep 6, 2022
c6a9ec3
Pairing on Worker Micronaut (#16364)
pmossman Sep 6, 2022
fbcb3c3
Revert temporal proxy changes
jdpgrailsdev Sep 6, 2022
b89a8f3
Formatting
jdpgrailsdev Sep 6, 2022
ed989a3
Fix default value
jdpgrailsdev Sep 6, 2022
de666c5
register new route activity in test
pmossman Sep 6, 2022
cb4edeb
fix SyncWorkflowTest now that it isn't doing any routing
pmossman Sep 6, 2022
60bb57e
Update dependencies
jdpgrailsdev Sep 7, 2022
4271780
More dependency updates
jdpgrailsdev Sep 7, 2022
0182b29
Update dependencies
jdpgrailsdev Sep 7, 2022
c359a1f
Merge branch 'jonathan/airbyte-worker-micronaut' of github.com:airbyt…
benmoriceau Sep 7, 2022
c574ed6
WIP Convert airbyte-workers to Micronaut framework
jdpgrailsdev Aug 31, 2022
89d86e5
Rebase cleanup
jdpgrailsdev Sep 2, 2022
e879675
Fix broken tests
jdpgrailsdev Sep 2, 2022
a9c1565
Simplify code
jdpgrailsdev Sep 2, 2022
34b72a2
Support control vs data plane configuration
jdpgrailsdev Sep 2, 2022
078957e
make WORFKLOW_PROXY_CACHE non-static to avoid cacheing mocks in unit …
pmossman Sep 2, 2022
dad3186
Formatting
jdpgrailsdev Sep 6, 2022
55f5034
Pairing on Worker Micronaut (#16364)
pmossman Sep 6, 2022
061a368
Revert temporal proxy changes
jdpgrailsdev Sep 6, 2022
78df2fa
Formatting
jdpgrailsdev Sep 6, 2022
ef54b2b
Fix default value
jdpgrailsdev Sep 6, 2022
05e2e2b
register new route activity in test
pmossman Sep 6, 2022
c3d93fa
fix SyncWorkflowTest now that it isn't doing any routing
pmossman Sep 6, 2022
6c96965
Update dependencies
jdpgrailsdev Sep 7, 2022
2ed4bbe
More dependency updates
jdpgrailsdev Sep 7, 2022
43ea323
Update dependencies
jdpgrailsdev Sep 7, 2022
6c92a07
Merge branch 'master' into jonathan/airbyte-worker-micronaut
jdpgrailsdev Sep 8, 2022
edde35b
Add the injected temporal client to airbyte-cron
benmoriceau Sep 7, 2022
ebb873d
Update cron to make it work
benmoriceau Sep 8, 2022
476064a
Merge branch 'jonathan/airbyte-worker-micronaut' of github.com:airbyt…
benmoriceau Sep 8, 2022
4326de7
Remove useless Bean factory
benmoriceau Sep 8, 2022
5ce56c5
PR comments
benmoriceau Sep 8, 2022
800e561
WIP Convert airbyte-workers to Micronaut framework
jdpgrailsdev Aug 31, 2022
896bbab
Rebase cleanup
jdpgrailsdev Sep 2, 2022
858a6de
Fix broken tests
jdpgrailsdev Sep 2, 2022
7abdf1f
Simplify code
jdpgrailsdev Sep 2, 2022
4e36dad
Support control vs data plane configuration
jdpgrailsdev Sep 2, 2022
7e9876a
make WORFKLOW_PROXY_CACHE non-static to avoid cacheing mocks in unit …
pmossman Sep 2, 2022
d58af58
Formatting
jdpgrailsdev Sep 6, 2022
253e191
Pairing on Worker Micronaut (#16364)
pmossman Sep 6, 2022
50b4094
Revert temporal proxy changes
jdpgrailsdev Sep 6, 2022
aa29c8a
Formatting
jdpgrailsdev Sep 6, 2022
5c0e197
Fix default value
jdpgrailsdev Sep 6, 2022
f48953a
register new route activity in test
pmossman Sep 6, 2022
0710776
fix SyncWorkflowTest now that it isn't doing any routing
pmossman Sep 6, 2022
7dc290c
Update dependencies
jdpgrailsdev Sep 7, 2022
80364d7
More dependency updates
jdpgrailsdev Sep 7, 2022
0020252
Update dependencies
jdpgrailsdev Sep 7, 2022
67ebb5c
Merge branch 'jonathan/airbyte-worker-micronaut' of github.com:airbyt…
benmoriceau Sep 8, 2022
6fa6998
Improve conditional bean check
jdpgrailsdev Sep 9, 2022
c60fcdd
Match existing Optional functionality
jdpgrailsdev Sep 9, 2022
1599488
Add notEquals check
jdpgrailsdev Sep 9, 2022
5eaa577
Merge branch 'master' into jonathan/airbyte-worker-micronaut
jdpgrailsdev Sep 9, 2022
0625302
Add missing env var to Helm chart
jdpgrailsdev Sep 9, 2022
9df6ae0
Fix typo
jdpgrailsdev Sep 9, 2022
452c796
Mark LogConfigs as Singleton
jdpgrailsdev Sep 9, 2022
a217fc2
Merge branch 'jonathan/airbyte-worker-micronaut' of github.com:airbyt…
benmoriceau Sep 9, 2022
0711780
WIP Convert airbyte-workers to Micronaut framework
jdpgrailsdev Aug 31, 2022
2c4070d
Rebase cleanup
jdpgrailsdev Sep 2, 2022
ec1b722
Fix broken tests
jdpgrailsdev Sep 2, 2022
096fc50
Simplify code
jdpgrailsdev Sep 2, 2022
30a779f
Support control vs data plane configuration
jdpgrailsdev Sep 2, 2022
4ae75ad
make WORFKLOW_PROXY_CACHE non-static to avoid cacheing mocks in unit …
pmossman Sep 2, 2022
539a20b
Formatting
jdpgrailsdev Sep 6, 2022
9913f80
Pairing on Worker Micronaut (#16364)
pmossman Sep 6, 2022
1405e36
Revert temporal proxy changes
jdpgrailsdev Sep 6, 2022
478ea84
Formatting
jdpgrailsdev Sep 6, 2022
4688d26
Fix default value
jdpgrailsdev Sep 6, 2022
66c4603
register new route activity in test
pmossman Sep 6, 2022
b1ab4ba
fix SyncWorkflowTest now that it isn't doing any routing
pmossman Sep 6, 2022
63103cd
Update dependencies
jdpgrailsdev Sep 7, 2022
63a0cbe
More dependency updates
jdpgrailsdev Sep 7, 2022
02a6e68
Update dependencies
jdpgrailsdev Sep 7, 2022
2469184
Improve conditional bean check
jdpgrailsdev Sep 9, 2022
ae3631e
Match existing Optional functionality
jdpgrailsdev Sep 9, 2022
b0ec8a0
Add notEquals check
jdpgrailsdev Sep 9, 2022
6f817c6
Add missing env var to Helm chart
jdpgrailsdev Sep 9, 2022
a96ad9d
Fix typo
jdpgrailsdev Sep 9, 2022
0ebc898
Mark LogConfigs as Singleton
jdpgrailsdev Sep 9, 2022
acc9c36
Env vars for log/state storage type
jdpgrailsdev Sep 9, 2022
3f70e8a
Remove use of Optional in bean declarations
jdpgrailsdev Sep 9, 2022
c7810d5
Fix typo in config property name
jdpgrailsdev Sep 9, 2022
d43d94e
Support Temporal Cloud namespace
jdpgrailsdev Sep 9, 2022
5631642
Change to @Value
jdpgrailsdev Sep 9, 2022
8b6ef90
Use correct value for conditional check
jdpgrailsdev Sep 9, 2022
d576704
Upgrade Micronaut
jdpgrailsdev Sep 12, 2022
e4944bc
Fix merge conflict
jdpgrailsdev Sep 12, 2022
ce82199
Formatting
jdpgrailsdev Sep 12, 2022
73e0f53
Add missing env var
jdpgrailsdev Sep 12, 2022
b712763
Use sync task queue environment variable
jdpgrailsdev Sep 12, 2022
5074a85
Handle sync task queue as set
jdpgrailsdev Sep 12, 2022
7be1461
format and force http
pmossman Sep 12, 2022
842ae6f
Handle case where sync task queue is empty
jdpgrailsdev Sep 12, 2022
5e176c9
Merge branch 'jonathan/airbyte-worker-micronaut' of github.com:airbyt…
jdpgrailsdev Sep 12, 2022
9e6e517
Add correct path to config property
jdpgrailsdev Sep 12, 2022
c965391
Remove unused import
jdpgrailsdev Sep 12, 2022
f1c9b41
Merge branch 'jonathan/airbyte-worker-micronaut' of github.com:airbyt…
benmoriceau Sep 12, 2022
8a30092
Remove conflict
benmoriceau Sep 12, 2022
7ac1964
Remove unused parameter
jdpgrailsdev Sep 12, 2022
e60e957
Formatting
jdpgrailsdev Sep 12, 2022
41e0e58
Use pattern for condition process factory beans
jdpgrailsdev Sep 12, 2022
5c7bb98
Cleanup
jdpgrailsdev Sep 12, 2022
f0f9516
PR feedback
jdpgrailsdev Sep 13, 2022
3285a0d
Merge branch 'jonathan/airbyte-worker-micronaut' of github.com:airbyt…
benmoriceau Sep 13, 2022
26edba3
Revert hack for testing
jdpgrailsdev Sep 13, 2022
97cd5a0
Fix temporal restart by status (#16447)
benmoriceau Sep 13, 2022
ead1ebb
Merge branch 'jonathan/airbyte-worker-micronaut' of github.com:airbyt…
benmoriceau Sep 13, 2022
0fa4352
Update application.yml
benmoriceau Sep 13, 2022
1ba8513
Merge branch 'master' into bmoric/temporal-cleaning-cron
benmoriceau Sep 15, 2022
3f0c332
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/tem…
benmoriceau Sep 16, 2022
3b2a14f
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/tem…
benmoriceau Sep 20, 2022
96be70f
Re add worker dep
benmoriceau Sep 20, 2022
6a74c9f
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/tem…
benmoriceau Sep 21, 2022
4f1343e
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/tem…
benmoriceau Sep 21, 2022
0851518
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/tem…
benmoriceau Sep 26, 2022
c2de8b6
Add missing env var
benmoriceau Sep 26, 2022
4299e7a
PR comments
benmoriceau Sep 26, 2022
aa249f7
Bmoric/move temporal client (#16778)
benmoriceau Sep 26, 2022
dad504d
Merge branch 'master' into bmoric/temporal-cleaning-cron
benmoriceau Sep 26, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions airbyte-cron/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,19 @@ plugins {
}

dependencies {
implementation 'com.auth0:java-jwt:3.19.2'
implementation 'io.fabric8:kubernetes-client:5.12.2'
implementation 'io.sentry:sentry:6.3.1'
implementation 'io.temporal:temporal-sdk:1.8.1'
implementation 'io.temporal:temporal-serviceclient:1.8.1'

implementation project(':airbyte-api')
implementation project(':airbyte-analytics')
implementation project(':airbyte-config:config-models')
implementation project(':airbyte-config:config-persistence')
implementation project(':airbyte-db:db-lib')
implementation project(':airbyte-metrics:metrics-lib')
implementation project(':airbyte-workers')

runtimeOnly 'io.micronaut:micronaut-http-server-netty:3.6.0'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,27 @@

package io.airbyte.cron.selfhealing;

import io.airbyte.workers.temporal.TemporalClient;
import io.micronaut.scheduling.annotation.Scheduled;
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import javax.inject.Named;
import javax.inject.Singleton;
import lombok.extern.slf4j.Slf4j;

@Singleton
@Slf4j
public class Temporal {

public Temporal() {
log.info("Creating temporal self-healing");
private final TemporalClient temporalClient;

public Temporal(@Named("temporalClient") final TemporalClient temporalClient) {
log.debug("Creating temporal self-healing");
this.temporalClient = temporalClient;
}

@Scheduled(fixedRate = "10s")
void cleanTemporal() {}
void cleanTemporal() {
temporalClient.restartClosedWorkflowByStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED);
}

}
40 changes: 40 additions & 0 deletions airbyte-cron/src/main/resources/application-control.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
datasources:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't need this file in the cron service.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue here is that all the Beans are being initiated because the cron module depends on the worker one which has all the beans being declared.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The eager initiation was set to true, I am removing it and will try to clean this list. Unfortunately we will need more config than what we thought.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah...this is because we depend on airbyte-workers. That causes Micronaut to find all singletons. The solution, ultimately, is to extract the shared code to a library module that both can depend on.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We sync up with @gosusnp and discover that we have ApplicationInitializer. I think that the best way to do it is to have the cron module to not depends on the worker module. I will try to extract the temporal client from it in another PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@benmoriceau Agreed...that is the best course here. It will cause some conflict with the airbyte-workers Micronaut PR, but if you are able to extract and get that merged first, I can easily update the PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jdpgrailsdev After merging and merging the extraction of the TemporalClient, I'll clean this file.

config:
connection-test-query: SELECT 1
connection-timeout: 30000
idle-timeout: 600000
maximum-pool-size: 10
url: ${DATABASE_URL}
driverClassName: org.postgresql.Driver
username: ${DATABASE_USER}
password: ${DATABASE_PASSWORD}
jobs:
connection-test-query: SELECT 1
connection-timeout: 30000
idle-timeout: 600000
maximum-pool-size: 10
url: ${DATABASE_URL}
driverClassName: org.postgresql.Driver
username: ${DATABASE_USER}
password: ${DATABASE_PASSWORD}

flyway:
enabled: true
datasources:
config:
enabled: false
locations:
- 'classpath:io/airbyte/db/instance/configs/migrations'
jobs:
enabled: false
locations:
- 'classpath:io/airbyte/db/instance/jobs/migrations'

jooq:
datasources:
config:
jackson-converter-enabled: true
sql-dialect: POSTGRES
jobs:
jackson-converter-enabled: true
sql-dialect: POSTGRES
219 changes: 219 additions & 0 deletions airbyte-cron/src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
micronaut:
application:
name: airbyte-workers
security:
intercept-url-map:
- pattern: /**
httpMethod: GET
access:
- isAnonymous()
server:
port: 9000

airbyte:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit confused about this part. It looks like we are starting to duplicate configs and a lot of the keys here feel irrelevant to the airbyte-cron itself.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gosusnp Agreed. We shouldn't need the full config from the airbyte-workers branch. We should only need the config bits needed to configure the Temporal client. I would start by commenting them all out and then restoring the ones that are actually used (Micronaut will complain at runtime that it can't find the property).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jdpgrailsdev @gosusnp I answered it here: #16414 (comment) I will bring it to the sit and work.

activity:
initial-delay: ${ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS:30}
max-attempts: ${ACTIVITY_MAX_ATTEMPT:5}
max-delay: ${ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS:600}
max-timeout: ${ACTIVITY_MAX_TIMEOUT_SECOND:120}
cloud:
storage:
logs:
type: ${WORKER_LOGS_STORAGE_TYPE:}
gcs:
application-credentials: ${GOOGLE_APPLICATION_CREDENTIALS:}
bucket: ${GCS_LOG_BUCKET:}
minio:
access-key: ${AWS_ACCESS_KEY_ID:}
bucket: ${S3_LOG_BUCKET:}
endpoint: ${S3_MINIO_ENDPOINT:}
secret-access-key: ${AWS_SECRET_ACCESS_KEY:}
s3:
access-key: ${AWS_ACCESS_KEY_ID:}
bucket: ${S3_LOG_BUCKET:}
region: ${S3_LOG_BUCKET_REGION:}
secret-access-key: ${AWS_SECRET_ACCESS_KEY:}
state:
type: ${WORKER_STATE_STORAGE_TYPE:}
gcs:
application-credentials: ${STATE_STORAGE_GCS_APPLICATION_CREDENTIALS:}
bucket: ${STATE_STORAGE_GCS_BUCKET_NAME:}
minio:
access-key: ${STATE_STORAGE_MINIO_ACCESS_KEY:}
bucket: ${STATE_STORAGE_MINIO_BUCKET_NAME:}
endpoint: ${STATE_STORAGE_MINIO_ENDPOINT:}
secret-access-key: ${STATE_STORAGE_MINIO_SECRET_ACCESS_KEY:}
s3:
access-key: ${STATE_STORAGE_S3_ACCESS_KEY:}
bucket: ${STATE_STORAGE_S3_BUCKET_NAME:}
region: ${STATE_STORAGE_S3_BUCKET_REGION:}
secret-access-key: ${STATE_STORAGE_S3_SECRET_ACCESS_KEY:}
connector:
specific-resource-defaults-enabled: ${CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED:false}
container:
orchestrator:
enabled: ${CONTAINER_ORCHESTRATOR_ENABLED:false}
image: ${CONTAINER_ORCHESTRATOR_IMAGE:}
secret-mount-path: ${CONTAINER_ORCHESTRATOR_SECRET_MOUNT_PATH:}
secret-name: ${CONTAINER_ORCHESTRATOR_SECRET_NAME:}
control:
plane:
auth-endpoint: ${CONTROL_PLANE_AUTH_ENDPOINT:}
data:
sync:
task-queue: ${DATA_SYNC_TASK_QUEUES:SYNC}
plane:
connection-ids-mvp: ${CONNECTION_IDS_FOR_MVP_DATA_PLANE:}
service-account:
credentials-path: ${DATA_PLANE_SERVICE_ACCOUNT_CREDENTIALS_PATH:}
email: ${DATA_PLANE_SERVICE_ACCOUNT_EMAIL:}
deployment-mode: ${DEPLOYMENT_MODE:OSS}
flyway:
configs:
initialization-timeout-ms: ${CONFIGS_DATABASE_INITIALIZATION_TIMEOUT_MS:60000}
minimum-migration-version: ${CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION}
jobs:
initialization-timeout-ms: ${JOBS_DATABASE_INITIALIZATION_TIMEOUT_MS:60000}
minimum-migration-version: ${JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION}
internal:
api:
auth-header:
name: ${AIRBYTE_API_AUTH_HEADER_NAME:}
value: ${AIRBYTE_API_AUTH_HEADER_VALUE:}
host: ${INTERNAL_API_HOST}
local:
docker-mount: ${LOCAL_DOCKER_MOUNT}
root: ${LOCAL_ROOT}
worker:
env: ${WORKER_ENVIRONMENT:DOCKER}
check:
enabled: ${SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS:true}
kube:
annotations: ${CHECK_JOB_KUBE_ANNOTATION:}
node-selectors: ${CHECK_JOB_KUBE_NODE_SELECTORS:}
max-workers: ${MAX_CHECK_WORKERS:5}
main:
container:
cpu:
limit: ${CHECK_JOB_MAIN_CONTAINER_CPU_LIMIT:}
request: ${CHECK_JOB_MAIN_CONTAINER_CPU_REQUEST:}
memory:
limit: ${CHECK_JOB_MAIN_CONTAINER_MEMORY_LIMIT:}
request: ${CHECK_JOB_MAIN_CONTAINER_MEMORY_REQUEST:}
connection:
enabled: ${SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS:true}
discover:
enabled: ${SHOULD_RUN_DISCOVER_WORKFLOWS:true}
kube:
annotations: ${DISCOVER_JOB_KUBE_ANNOTATIONS:}
node-selectors: ${DISCOVER_JOB_KUBE_NODE_SELECTORS:}
max-workers: ${MAX_DISCOVER_WORKERS:5}
job:
error-reporting:
sentry:
dsn: ${JOB_ERROR_REPORTING_SENTRY_DSN}
strategy: ${JOB_ERROR_REPORTING_STRATEGY:LOGGING}
failed:
max-days: ${MAX_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE:14}
max-jobs: ${MAX_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE:100}
kube:
annotations: ${JOB_KUBE_ANNOTATIONS:}
images:
busybox: ${JOB_KUBE_BUSYBOX_IMAGE:`busybox:1.28`}
curl: ${JOB_KUBE_CURL_IMAGE:`curlimages/curl:7.83.1`}
socat: ${JOB_KUBE_SOCAT_IMAGE:`alpine/socat:1.7.4.3-r0`}
main:
container:
image-pull-policy: ${JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY:IfNotPresent}
image-pull-secret: ${JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET:}
namespace: ${JOB_KUBE_NAMESPACE:default}
node-selectors: ${JOB_KUBE_NODE_SELECTORS:}
sidecar:
container:
image-pull-policy: ${JOB_KUBE_SIDECAR_CONTAINER_IMAGE_PULL_POLICY:IfNotPresent}
tolerations: ${JOB_KUBE_TOLERATIONS:}
main:
container:
cpu:
limit: ${JOB_MAIN_CONTAINER_CPU_LIMIT:}
request: ${JOB_MAIN_CONTAINER_CPU_REQUEST:}
memory:
limit: ${JOB_MAIN_CONTAINER_MEMORY_LIMIT:}
request: ${JOB_MAIN_CONTAINER_MEMORY_REQUEST:}
normalization:
main:
container:
cpu:
limit: ${NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT:}
request: ${NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST:}
memory:
limit: ${NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT:}
request: ${NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST:}
plane: ${WORKER_PLANE:CONTROL_PLANE}
replication:
orchestrator:
cpu:
limit: ${REPLICATION_ORCHESTRATOR_CPU_LIMIT:}
request: ${REPLICATION_ORCHESTRATOR_CPU_REQUEST:}
memory:
limit: ${REPLICATION_ORCHESTRATOR_MEMORY_LIMIT:}
request: ${REPLICATION_ORCHESTRATOR_MEMORY_REQUEST:}
spec:
enabled: ${SHOULD_RUN_GET_SPEC_WORKFLOWS:true}
kube:
annotations: ${SPEC_JOB_KUBE_ANNOTATIONS:}
node-selectors: ${SPEC_JOB_KUBE_NODE_SELECTORS:}
max-workers: ${MAX_SPEC_WORKERS:5}
sync:
enabled: ${SHOULD_RUN_SYNC_WORKFLOWS:true}
max-workers: ${MAX_SYNC_WORKERS:5}
max-attempts: ${SYNC_JOB_MAX_ATTEMPTS:3}
max-timeout: ${SYNC_JOB_MAX_TIMEOUT_DAYS:3}
role: ${AIRBYTE_ROLE:}
secret:
persistence: ${SECRET_PERSISTENCE:TESTING_CONFIG_DB_TABLE}
store:
gcp:
credentials: ${SECRET_STORE_GCP_CREDENTIALS:}
project-id: ${SECRET_STORE_GCP_PROJECT_ID:}
vault:
address: ${VAULT_ADDRESS:}
prefix: ${VAULT_PREFIX:}
token: ${VAULT_AUTH_TOKEN:}
temporal:
worker:
ports: ${TEMPORAL_WORKER_PORTS:}
tracking-strategy: ${TRACKING_STRATEGY:LOGGING}
version: ${AIRBYTE_VERSION}
web-app:
url: ${WEBAPP_URL:}
workflow:
failure:
restart-delay: ${WORKFLOW_FAILURE_RESTART_DELAY_SECONDS:600}
workspace:
docker-mount: ${WORKSPACE_DOCKER_MOUNT:}
root: ${WORKSPACE_ROOT}

docker:
network: ${DOCKER_NETWORK:host}

endpoints:
all:
enabled: true

temporal:
cloud:
client:
cert: ${TEMPORAL_CLOUD_CLIENT_CERT:}
key: ${TEMPORAL_CLOUD_CLIENT_KEY:}
enabled: ${TEMPORAL_CLOUD_ENABLED:false}
host: ${TEMPORAL_CLOUD_HOST:}
namespace: ${TEMPORAL_CLOUD_NAMESPACE:}
host: ${TEMPORAL_HOST:`airbyte-temporal:7233`}
retention: ${TEMPORAL_HISTORY_RETENTION_IN_DAYS:30}

logger:
levels:
io.airbyte.bootloader: DEBUG
# Uncomment to help resolve issues with conditional beans
# io.micronaut.context.condition: DEBUG
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
import io.micronaut.context.annotation.Requires;
import io.temporal.api.common.v1.WorkflowType;
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsRequest;
import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsResponse;
import io.temporal.api.workflowservice.v1.ListOpenWorkflowExecutionsRequest;
import io.temporal.api.workflowservice.v1.ListOpenWorkflowExecutionsResponse;
import io.temporal.api.workflowservice.v1.ListWorkflowExecutionsRequest;
import io.temporal.api.workflowservice.v1.ListWorkflowExecutionsResponse;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
Expand Down Expand Up @@ -464,8 +464,8 @@ public ManualOperationResult synchronousResetConnection(final UUID connectionId,
Optional.of(resetJobId), Optional.empty());
}

public void restartWorkflowByStatus(final WorkflowExecutionStatus executionStatus) {
final Set<UUID> workflowExecutionInfos = fetchWorkflowsByStatus(executionStatus);
public void restartClosedWorkflowByStatus(final WorkflowExecutionStatus executionStatus) {
final Set<UUID> workflowExecutionInfos = fetchClosedWorkflowsByStatus(executionStatus);

final Set<UUID> nonRunningWorkflow = filterOutRunningWorkspaceId(workflowExecutionInfos);

Expand All @@ -491,17 +491,17 @@ Optional<UUID> extractConnectionIdFromWorkflowId(final String workflowId) {
stringUUID -> UUID.fromString(stringUUID));
}

Set<UUID> fetchWorkflowsByStatus(final WorkflowExecutionStatus executionStatus) {
Set<UUID> fetchClosedWorkflowsByStatus(final WorkflowExecutionStatus executionStatus) {
ByteString token;
ListWorkflowExecutionsRequest workflowExecutionsRequest =
ListWorkflowExecutionsRequest.newBuilder()
ListClosedWorkflowExecutionsRequest workflowExecutionsRequest =
ListClosedWorkflowExecutionsRequest.newBuilder()
.setNamespace(client.getOptions().getNamespace())
.build();

final Set<UUID> workflowExecutionInfos = new HashSet<>();
do {
final ListWorkflowExecutionsResponse listOpenWorkflowExecutionsRequest =
service.blockingStub().listWorkflowExecutions(workflowExecutionsRequest);
final ListClosedWorkflowExecutionsResponse listOpenWorkflowExecutionsRequest =
service.blockingStub().listClosedWorkflowExecutions(workflowExecutionsRequest);
final WorkflowType connectionManagerWorkflowType = WorkflowType.newBuilder().setName(ConnectionManagerWorkflow.class.getSimpleName()).build();
workflowExecutionInfos.addAll(listOpenWorkflowExecutionsRequest.getExecutionsList().stream()
.filter(workflowExecutionInfo -> workflowExecutionInfo.getType() == connectionManagerWorkflowType ||
Expand All @@ -511,7 +511,7 @@ Set<UUID> fetchWorkflowsByStatus(final WorkflowExecutionStatus executionStatus)
token = listOpenWorkflowExecutionsRequest.getNextPageToken();

workflowExecutionsRequest =
ListWorkflowExecutionsRequest.newBuilder()
ListClosedWorkflowExecutionsRequest.newBuilder()
.setNamespace(client.getOptions().getNamespace())
.setNextPageToken(token)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -765,11 +765,11 @@ void testRestartFailed() {
final Set<UUID> workflowIds = Set.of(connectionId);

doReturn(workflowIds)
.when(temporalClient).fetchWorkflowsByStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED);
.when(temporalClient).fetchClosedWorkflowsByStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED);
doReturn(workflowIds)
.when(temporalClient).filterOutRunningWorkspaceId(workflowIds);
mockWorkflowStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED);
temporalClient.restartWorkflowByStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED);
temporalClient.restartClosedWorkflowByStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED);
verify(mConnectionManagerUtils).safeTerminateWorkflow(eq(workflowClient), eq(connectionId),
anyString());
verify(mConnectionManagerUtils).startConnectionManagerNoSignal(eq(workflowClient), eq(connectionId));
Expand Down
Loading