Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bmoric/remove dep server worker #17894

Merged
merged 40 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
33b83bf
test [ci skip]
benmoriceau Oct 4, 2022
4134048
Remove the dependency between orchestrator APP and the worker
benmoriceau Oct 10, 2022
0d20286
Merge branch 'master' into bmoric/remove-dep-container-orchestrator-w…
benmoriceau Oct 10, 2022
d37bc19
Autogenerated files
benmoriceau Oct 10, 2022
bcf3431
Add missing annotation
benmoriceau Oct 10, 2022
db1f95d
Remove unused json2Schema block from worker
benmoriceau Oct 10, 2022
e7d1592
Move tess
benmoriceau Oct 10, 2022
0881cee
Missing deps and format
benmoriceau Oct 10, 2022
5c74045
Fix test build
benmoriceau Oct 10, 2022
7e2d449
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/rem…
benmoriceau Oct 10, 2022
203071a
TMP
benmoriceau Oct 11, 2022
390b8eb
Add missing dependencies
benmoriceau Oct 11, 2022
4b6634d
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/rem…
benmoriceau Oct 11, 2022
cbae067
PR comments
benmoriceau Oct 11, 2022
7efa2c7
Tmp
benmoriceau Oct 11, 2022
636e507
[ci skip] Tmp
benmoriceau Oct 11, 2022
628b721
Fix acceptance test and add the seed dependency
benmoriceau Oct 11, 2022
4c0de3c
Fix build
benmoriceau Oct 11, 2022
a3bce17
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/rem…
benmoriceau Oct 11, 2022
279447f
Merge branch 'bmoric/remove-dep-container-orchestrator-worker' of git…
benmoriceau Oct 11, 2022
b1defe5
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/rem…
benmoriceau Oct 12, 2022
4b2c8f2
For diff
benmoriceau Oct 12, 2022
72a11f8
tmp
benmoriceau Oct 12, 2022
dcfb012
Build pass
benmoriceau Oct 13, 2022
ae2748b
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/rem…
benmoriceau Oct 13, 2022
5424358
make the worker to be on the platform only
benmoriceau Oct 13, 2022
b243256
fix setting.yaml
benmoriceau Oct 13, 2022
68843af
Fix pmd
benmoriceau Oct 13, 2022
4898e50
Fix Cron
benmoriceau Oct 13, 2022
25a26d4
Add chart
benmoriceau Oct 13, 2022
6b8465b
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/rem…
benmoriceau Oct 14, 2022
6e4e1aa
Fix cron
benmoriceau Oct 14, 2022
7f915cb
Merge branch 'master' into bmoric/remove-dep-server-worker
benmoriceau Oct 14, 2022
d241c64
Fix server build.gradle
benmoriceau Oct 14, 2022
fada02f
Fix jar conflict
benmoriceau Oct 14, 2022
2f12031
Merge branch 'bmoric/remove-dep-server-worker' of github.com:airbyteh…
benmoriceau Oct 14, 2022
5d0cac0
Merge branch 'master' into bmoric/remove-dep-server-worker
benmoriceau Oct 17, 2022
77d3402
PR comments
benmoriceau Oct 17, 2022
4e6ab3f
Add cron micronaut environemnt
benmoriceau Oct 17, 2022
9f8c233
Merge branch 'master' into bmoric/remove-dep-server-worker
benmoriceau Oct 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 4 additions & 18 deletions airbyte-commons-temporal/build.gradle
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import org.jsonschema2pojo.SourceType

// Gradle plugins applied to this module.
plugins {
id "java-library"
// NOTE(review): presumably the plugin that supplies the jsonSchema2Pojo extension configured in
// this script — confirm before removing either one independently.
id 'com.github.eirnym.js2p' version '1.0'
}

dependencies {
Expand All @@ -18,27 +15,16 @@ dependencies {
testAnnotationProcessor platform(libs.micronaut.bom)
testAnnotationProcessor libs.bundles.micronaut.test.annotation.processor


implementation project(':airbyte-config:config-models')
implementation project(':airbyte-config:config-persistence')
implementation project(':airbyte-metrics:metrics-lib')
implementation project(':airbyte-persistence:job-persistence')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-worker-models')

testImplementation 'io.temporal:temporal-testing:1.8.1'
// Needed to be able to mock final class
testImplementation 'org.mockito:mockito-inline:4.7.0'
}

// Configuration of the jsonSchema2Pojo extension for this module.
jsonSchema2Pojo {
// Schemas are declared as YAML schema sources.
sourceType = SourceType.YAMLSCHEMA
// Schemas are read from the "workers_models" directory under the main source set's resources output.
source = files("${sourceSets.main.output.resourcesDir}/workers_models")
// Generated sources are written below the project's build directory.
targetDirectory = new File(project.buildDir, 'generated/src/gen/java/')
// NOTE(review): presumably clears previously generated output before regenerating — confirm
// against the plugin documentation.
removeOldOutput = true

// Java package assigned to the generated classes.
targetPackage = 'io.airbyte.persistence.job.models'

// Code-generation switches; their exact effect is defined by the jsonschema2pojo plugin.
useLongIntegers = true
generateBuilders = true
includeConstructors = false
includeSetters = true
}

Task publishArtifactsTask = getPublishArtifactsTask("$rootProject.ext.version", project)
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,24 @@

package io.airbyte.commons.temporal;

import io.airbyte.commons.temporal.exception.DeletedWorkflowException;
import io.airbyte.commons.temporal.exception.UnreachableWorkflowException;
import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow;
import io.airbyte.commons.temporal.scheduling.ConnectionUpdaterInput;
import io.airbyte.commons.temporal.scheduling.state.WorkflowState;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest;
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse;
import io.temporal.client.BatchRequest;
import io.temporal.client.WorkflowClient;
import io.temporal.workflow.Functions.Proc;
import io.temporal.workflow.Functions.Proc1;
import io.temporal.workflow.Functions.TemporalFunctionalInterfaceMarker;
import jakarta.inject.Singleton;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -17,6 +30,105 @@
@Slf4j
public class ConnectionManagerUtils {

/**
 * Sends a no-argument signal to the ConnectionManagerWorkflow of the provided connection.
 *
 * This overload delegates to the shared private implementation with an empty signal argument. If
 * the existing workflow is unreachable, that implementation restarts the workflow and sends the
 * signal in a single batched request, which avoids race conditions between starting the workflow
 * and executing the signal.
 *
 * @param client the WorkflowClient for interacting with temporal
 * @param connectionId the connection ID to execute this operation for
 * @param signalMethod a function that takes in a connection manager workflow and returns the
 *        no-argument signal method to execute on it
 * @return the connection manager workflow that was signaled
 * @throws DeletedWorkflowException if the connection manager workflow was deleted
 */
public ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(final WorkflowClient client,
                                                                    final UUID connectionId,
                                                                    final Function<ConnectionManagerWorkflow, Proc> signalMethod)
    throws DeletedWorkflowException {
  // An empty Optional tells the shared implementation that the signal takes no argument.
  final Optional<Object> noSignalArgument = Optional.empty();
  return signalWorkflowAndRepairIfNecessary(client, connectionId, signalMethod, noSignalArgument);
}

/**
 * Sends a single-argument signal to the ConnectionManagerWorkflow of the provided connection.
 *
 * This overload wraps the argument and delegates to the shared private implementation. If the
 * existing workflow is unreachable, that implementation restarts the workflow and sends the signal
 * in a single batched request, which avoids race conditions between starting the workflow and
 * executing the signal.
 *
 * @param client the WorkflowClient for interacting with temporal
 * @param connectionId the connection ID to execute this operation for
 * @param signalMethod a function that takes in a connection manager workflow and returns the
 *        one-argument signal method to execute on it
 * @param signalArgument the single argument to pass to the signal; must not be null, because it is
 *        wrapped with {@link Optional#of(Object)}
 * @return the connection manager workflow that was signaled
 * @throws DeletedWorkflowException if the connection manager workflow was deleted
 */
public <T> ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(final WorkflowClient client,
                                                                        final UUID connectionId,
                                                                        final Function<ConnectionManagerWorkflow, Proc1<T>> signalMethod,
                                                                        final T signalArgument)
    throws DeletedWorkflowException {
  // A present Optional tells the shared implementation that the signal takes exactly one argument.
  final Optional<T> wrappedSignalArgument = Optional.of(signalArgument);
  return signalWorkflowAndRepairIfNecessary(client, connectionId, signalMethod, wrappedSignalArgument);
}

// Shared implementation behind the two public overloads. The optional signalArgument says whether
// the signal takes one argument (present) or none (empty).
// It stays private so that external callers go through the strictly typed public overloads; this
// lets the casts below assume that signalMethod yields a Proc when the argument is empty and a
// Proc1<T> when it is present.
private <T> ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(final WorkflowClient client,
                                                                         final UUID connectionId,
                                                                         final Function<ConnectionManagerWorkflow, ? extends TemporalFunctionalInterfaceMarker> signalMethod,
                                                                         final Optional<T> signalArgument)
    throws DeletedWorkflowException {
  try {
    final ConnectionManagerWorkflow existingWorkflow = getConnectionManagerWorkflow(client, connectionId);
    log.info("Retrieved existing connection manager workflow for connection {}. Executing signal.", connectionId);

    // resolve the signal method against the existing workflow, then invoke it
    final TemporalFunctionalInterfaceMarker boundSignal = signalMethod.apply(existingWorkflow);
    if (!signalArgument.isPresent()) {
      ((Proc) boundSignal).apply();
    } else {
      ((Proc1<T>) boundSignal).apply(signalArgument.get());
    }
    return existingWorkflow;
  } catch (final UnreachableWorkflowException unreachable) {
    final String repairMessage = String.format(
        "Failed to retrieve ConnectionManagerWorkflow for connection %s. Repairing state by creating new workflow and starting with the signal.",
        connectionId);
    log.error(repairMessage, unreachable);

    // an existing workflow may still be around in a bad state, so try to terminate it before a
    // replacement is started
    safeTerminateWorkflow(client, connectionId, "Terminating workflow in unreachable state before starting a new workflow for this connection");

    final ConnectionManagerWorkflow replacementWorkflow = newConnectionManagerWorkflowStub(client, connectionId);
    final ConnectionUpdaterInput startInput = TemporalWorkflowUtils.buildStartWorkflowInput(connectionId);

    // start and signal travel in the same batch request so the signal cannot race the start
    final BatchRequest signalWithStartBatch = client.newSignalWithStartRequest();
    signalWithStartBatch.add(replacementWorkflow::run, startInput);

    // resolve the signal method against the replacement workflow and queue it in the batch
    final TemporalFunctionalInterfaceMarker boundSignal = signalMethod.apply(replacementWorkflow);
    if (!signalArgument.isPresent()) {
      signalWithStartBatch.add((Proc) boundSignal);
    } else {
      signalWithStartBatch.add((Proc1<T>) boundSignal, signalArgument.get());
    }

    client.signalWithStart(signalWithStartBatch);
    log.info("Connection manager workflow for connection {} has been started and signaled.", connectionId);

    return replacementWorkflow;
  }
}

void safeTerminateWorkflow(final WorkflowClient client, final String workflowId, final String reason) {
log.info("Attempting to terminate existing workflow for workflowId {}.", workflowId);
try {
Expand All @@ -33,10 +145,6 @@ public void safeTerminateWorkflow(final WorkflowClient client, final UUID connec
safeTerminateWorkflow(client, getConnectionManagerName(connectionId), reason);
}

/**
 * Builds the workflow ID of the connection manager workflow for a connection.
 *
 * @param connectionId the connection whose manager workflow name is wanted
 * @return the text "connection_manager_" followed by the string form of the connection ID
 */
public String getConnectionManagerName(final UUID connectionId) {
  return String.format("connection_manager_%s", connectionId);
}

public ConnectionManagerWorkflow startConnectionManagerNoSignal(final WorkflowClient client, final UUID connectionId) {
final ConnectionManagerWorkflow connectionManagerWorkflow = newConnectionManagerWorkflowStub(client, connectionId);
final ConnectionUpdaterInput input = TemporalWorkflowUtils.buildStartWorkflowInput(connectionId);
Expand All @@ -45,9 +153,89 @@ public ConnectionManagerWorkflow startConnectionManagerNoSignal(final WorkflowCl
return connectionManagerWorkflow;
}

/**
 * Attempts to retrieve the connection manager workflow for the provided connection.
 *
 * The workflow state is queried from the workflow stub and the execution status from the Temporal
 * service. Any failure while doing so is reported as an unreachable workflow.
 *
 * @param client the WorkflowClient used to build the workflow stub and to look up the execution
 *        status
 * @param connectionId the ID of the connection whose workflow should be retrieved
 * @return the healthy ConnectionManagerWorkflow
 * @throws DeletedWorkflowException if the workflow was deleted, according to the workflow state
 * @throws UnreachableWorkflowException if the workflow is in an unreachable state
 */
public ConnectionManagerWorkflow getConnectionManagerWorkflow(final WorkflowClient client, final UUID connectionId)
    throws DeletedWorkflowException, UnreachableWorkflowException {

  final ConnectionManagerWorkflow connectionManagerWorkflow;
  final WorkflowState workflowState;
  final WorkflowExecutionStatus workflowExecutionStatus;
  try {
    connectionManagerWorkflow = client.newWorkflowStub(ConnectionManagerWorkflow.class, getConnectionManagerName(connectionId));
    workflowState = connectionManagerWorkflow.getState();
    workflowExecutionStatus = getConnectionManagerWorkflowStatus(client, connectionId);
  } catch (final Exception e) {
    // Whatever went wrong while querying the workflow, callers only need to know that it cannot
    // be reached; the original failure is kept as the cause.
    throw new UnreachableWorkflowException(
        String.format("Failed to retrieve ConnectionManagerWorkflow for connection %s due to the following error:", connectionId),
        e);
  }

  if (WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED.equals(workflowExecutionStatus)) {
    // A completed workflow is only legitimate when its state says the connection was deleted.
    if (workflowState.isDeleted()) {
      throw new DeletedWorkflowException(String.format(
          "The connection manager workflow for connection %s is deleted, so no further operations can be performed on it.",
          connectionId));
    }

    // A non-deleted workflow being in a COMPLETED state is unexpected, and should be corrected
    throw new UnreachableWorkflowException(
        String.format("ConnectionManagerWorkflow for connection %s is unreachable due to having COMPLETED status.", connectionId));
  }

  if (workflowState.isQuarantined()) {
    throw new UnreachableWorkflowException(
        String.format("ConnectionManagerWorkflow for connection %s is unreachable due to being in a quarantined state.", connectionId));
  }

  return connectionManagerWorkflow;
}

/**
 * Best-effort check of whether the connection manager workflow state reports that it is running.
 *
 * @param client the WorkflowClient used to build the workflow stub
 * @param connectionId the connection whose manager workflow should be queried
 * @return the value of the workflow state's running flag, or {@code false} if the state could not
 *         be queried for any reason
 */
boolean isWorkflowStateRunning(final WorkflowClient client, final UUID connectionId) {
  try {
    final ConnectionManagerWorkflow connectionManagerWorkflow = client.newWorkflowStub(ConnectionManagerWorkflow.class,
        getConnectionManagerName(connectionId));
    return connectionManagerWorkflow.getState().isRunning();
  } catch (final Exception e) {
    // Deliberately best-effort: a workflow whose state cannot be queried is reported as not
    // running. The failure is still logged so that it does not vanish without a trace.
    log.debug("Unable to query the state of the connection manager workflow for connection {}; treating it as not running.",
        connectionId, e);
    return false;
  }
}

/**
 * Asks the Temporal service for the execution status of a connection's manager workflow.
 *
 * The describe request names the workflow by its connection manager workflow ID only and uses the
 * namespace configured on the client.
 *
 * @param workflowClient the WorkflowClient providing the namespace and the service stubs
 * @param connectionId the connection whose manager workflow should be described
 * @return the status found in the workflow execution info of the describe response
 */
public WorkflowExecutionStatus getConnectionManagerWorkflowStatus(final WorkflowClient workflowClient, final UUID connectionId) {
  final WorkflowExecution targetExecution = WorkflowExecution.newBuilder()
      .setWorkflowId(getConnectionManagerName(connectionId))
      .build();
  final String namespace = workflowClient.getOptions().getNamespace();
  final DescribeWorkflowExecutionRequest request = DescribeWorkflowExecutionRequest.newBuilder()
      .setExecution(targetExecution)
      .setNamespace(namespace)
      .build();

  final DescribeWorkflowExecutionResponse response =
      workflowClient.getWorkflowServiceStubs().blockingStub().describeWorkflowExecution(request);
  return response.getWorkflowExecutionInfo().getStatus();
}

/**
 * Best-effort lookup of the job ID reported by the connection manager workflow.
 *
 * @param client the WorkflowClient for interacting with temporal
 * @param connectionId the connection whose manager workflow should be queried
 * @return the job ID from the workflow's job information, or
 *         {@link ConnectionManagerWorkflow#NON_RUNNING_JOB_ID} if the workflow could not be
 *         retrieved or queried for any reason
 */
public long getCurrentJobId(final WorkflowClient client, final UUID connectionId) {
  try {
    final ConnectionManagerWorkflow connectionManagerWorkflow = getConnectionManagerWorkflow(client, connectionId);
    return connectionManagerWorkflow.getJobInformation().getJobId();
  } catch (final Exception e) {
    // Deliberately best-effort: any failure, including a deleted or unreachable workflow, maps
    // to the "no running job" sentinel. The failure is still logged so that it can be diagnosed.
    log.debug("Unable to retrieve the current job id for connection {}; returning the non-running job id.", connectionId, e);
    return ConnectionManagerWorkflow.NON_RUNNING_JOB_ID;
  }
}

/**
 * Creates a new ConnectionManagerWorkflow stub for the provided connection.
 *
 * The stub is built with workflow options for the CONNECTION_UPDATER job type and the connection
 * manager workflow name of the connection.
 *
 * @param client the WorkflowClient used to create the stub
 * @param connectionId the connection the stub is created for
 * @return the newly created workflow stub
 */
public ConnectionManagerWorkflow newConnectionManagerWorkflowStub(final WorkflowClient client, final UUID connectionId) {
  final String workflowName = getConnectionManagerName(connectionId);
  return client.newWorkflowStub(
      ConnectionManagerWorkflow.class,
      TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.CONNECTION_UPDATER, workflowName));
}

/**
 * Derives the workflow ID used for the connection manager workflow of a connection.
 *
 * @param connectionId the connection whose manager workflow name is wanted
 * @return the prefix "connection_manager_" followed by the string form of the connection ID
 */
public String getConnectionManagerName(final UUID connectionId) {
  final String prefix = "connection_manager_";
  return prefix.concat(String.valueOf(connectionId));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal;
package io.airbyte.commons.temporal;

public enum ErrorCode {
UNKNOWN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal;
package io.airbyte.commons.temporal;

import java.nio.file.Path;
import java.util.Objects;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal;
package io.airbyte.commons.temporal;

import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.persistence.StreamResetPersistence;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.workers.temporal.exception.RetryableException;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.List;
Expand Down
Loading