Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix orchestrator restart problem for cloud #10565

Merged
merged 22 commits into from
Feb 24, 2022
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions airbyte-config/models/src/main/java/io/airbyte/config/Configs.java
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,32 @@ public interface Configs {
*/
MaxWorkersConfig getMaxWorkers();

/**
 * Whether the worker should run get spec workflows. Defaults to true. Internal-use only.
 *
 * @return {@code true} if this worker should run get spec workflows
 */
boolean shouldRunGetSpecWorkflows();

/**
 * Whether the worker should run check connection workflows. Defaults to true. Internal-use only.
 *
 * @return {@code true} if this worker should run check connection workflows
 */
boolean shouldRunCheckConnectionWorkflows();

/**
 * Whether the worker should run discover workflows. Defaults to true. Internal-use only.
 *
 * @return {@code true} if this worker should run discover workflows
 */
boolean shouldRunDiscoverWorkflows();

/**
 * Whether the worker should run sync workflows. Defaults to true. Internal-use only.
 *
 * @return {@code true} if this worker should run sync workflows
 */
boolean shouldRunSyncWorkflows();

/**
 * Whether the worker should run connection manager workflows. Defaults to true. Internal-use
 * only.
 *
 * @return {@code true} if this worker should run connection manager workflows
 */
boolean shouldRunConnectionManagerWorkflows();

// Worker - Kube only
/**
* Define the local ports the Airbyte Worker pod uses to connect to the various Job pods.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ public class EnvConfigs implements Configs {
public static final String ACTIVITY_MAX_ATTEMPT = "ACTIVITY_MAX_ATTEMPT";
public static final String ACTIVITY_DELAY_IN_SECOND_BETWEEN_ATTEMPTS = "ACTIVITY_DELAY_IN_SECOND_BETWEEN_ATTEMPTS";

// Keys read by the shouldRun*Workflows() getters; each getter passes true as its default.
private static final String SHOULD_RUN_GET_SPEC_WORKFLOWS = "SHOULD_RUN_GET_SPEC_WORKFLOWS";
private static final String SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS = "SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS";
private static final String SHOULD_RUN_DISCOVER_WORKFLOWS = "SHOULD_RUN_DISCOVER_WORKFLOWS";
private static final String SHOULD_RUN_SYNC_WORKFLOWS = "SHOULD_RUN_SYNC_WORKFLOWS";
private static final String SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS = "SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS";

// job-type-specific overrides
public static final String SPEC_JOB_KUBE_NODE_SELECTORS = "SPEC_JOB_KUBE_NODE_SELECTORS";
public static final String CHECK_JOB_KUBE_NODE_SELECTORS = "CHECK_JOB_KUBE_NODE_SELECTORS";
Expand Down Expand Up @@ -692,6 +698,31 @@ public MaxWorkersConfig getMaxWorkers() {
Math.toIntExact(getEnvOrDefault(MAX_SYNC_WORKERS, DEFAULT_MAX_SYNC_WORKERS)));
}

/**
 * Resolved through getEnvOrDefault using the SHOULD_RUN_GET_SPEC_WORKFLOWS key, with
 * {@code true} passed as the default.
 */
@Override
public boolean shouldRunGetSpecWorkflows() {
return getEnvOrDefault(SHOULD_RUN_GET_SPEC_WORKFLOWS, true);
}

/**
 * Resolved through getEnvOrDefault using the SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS key, with
 * {@code true} passed as the default.
 */
@Override
public boolean shouldRunCheckConnectionWorkflows() {
return getEnvOrDefault(SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS, true);
}

/**
 * Resolved through getEnvOrDefault using the SHOULD_RUN_DISCOVER_WORKFLOWS key, with
 * {@code true} passed as the default.
 */
@Override
public boolean shouldRunDiscoverWorkflows() {
return getEnvOrDefault(SHOULD_RUN_DISCOVER_WORKFLOWS, true);
}

/**
 * Resolved through getEnvOrDefault using the SHOULD_RUN_SYNC_WORKFLOWS key, with
 * {@code true} passed as the default.
 */
@Override
public boolean shouldRunSyncWorkflows() {
return getEnvOrDefault(SHOULD_RUN_SYNC_WORKFLOWS, true);
}

/**
 * Resolved through getEnvOrDefault using the SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS key, with
 * {@code true} passed as the default.
 */
@Override
public boolean shouldRunConnectionManagerWorkflows() {
return getEnvOrDefault(SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS, true);
}

@Override
public Set<Integer> getTemporalWorkerPorts() {
final var ports = getEnvOrDefault(TEMPORAL_WORKER_PORTS, "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -965,35 +965,66 @@ public void testDowntimeDuringSync() throws Exception {
final String connectionName = "test-connection";
final UUID sourceId = createPostgresSource().getSourceId();
final UUID destinationId = createDestination().getDestinationId();
final UUID operationId = createOperation().getOperationId();
final AirbyteCatalog catalog = discoverSourceSchema(sourceId);
final SyncMode syncMode = SyncMode.FULL_REFRESH;
final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE;
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
for (final var input : List.of("KILL_BOTH_NON_SYNC_SLIGHTLY_FIRST", "KILL_ONLY_SYNC", "KILL_ONLY_NON_SYNC")) {
LOGGER.info("Checking " + input);

Thread.sleep(5000);
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(), catalog, null).getConnectionId();

LOGGER.info("Scaling down workers...");
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0);
Thread.sleep(1000);
JobInfoRead connectionSyncRead = null;

LOGGER.info("Scaling up workers...");
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1);
while (connectionSyncRead == null) {

waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob());
assertSourceAndDestinationDbInSync(false);
try {
connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
} catch (Exception e) {
LOGGER.error("retrying after error", e);
}
}

Thread.sleep(10000);

switch (input) {
case "KILL_BOTH_NON_SYNC_SLIGHTLY_FIRST" -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: It can be a parameterized test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm planning to leave it as-is. I think we want a lot more re-use of the discovered catalog between steps. Can improve that in a separate PR and make this better afterwards.

LOGGER.info("Scaling down both workers at roughly the same time...");
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0);
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(0, true);

LOGGER.info("Scaling up both workers...");
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1);
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(1);
}
case "KILL_ONLY_SYNC" -> {
LOGGER.info("Scaling down only sync worker...");
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(0, true);

LOGGER.info("Scaling up sync worker...");
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(1);
}
case "KILL_ONLY_NON_SYNC" -> {
LOGGER.info("Scaling down only non-sync worker...");
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true);

LOGGER.info("Scaling up non-sync worker...");
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1);
}
}

waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob());

final long numAttempts = apiClient.getJobsApi()
.getJobInfo(new JobIdRequestBody().id(connectionSyncRead.getJob().getId()))
.getAttempts()
.size();
final long numAttempts = apiClient.getJobsApi()
.getJobInfo(new JobIdRequestBody().id(connectionSyncRead.getJob().getId()))
.getAttempts()
.size();

// it should be able to accomplish the resume without an additional attempt!
assertEquals(1, numAttempts);
// it should be able to accomplish the resume without an additional attempt!
assertEquals(1, numAttempts);
}
}

@Test
Expand Down
174 changes: 98 additions & 76 deletions airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,67 +134,31 @@ public void start() {

final WorkerFactory factory = WorkerFactory.newInstance(WorkflowClient.newInstance(temporalService));

final Worker specWorker = factory.newWorker(TemporalJobType.GET_SPEC.name(), getWorkerOptions(maxWorkers.getMaxSpecWorkers()));
specWorker.registerWorkflowImplementationTypes(SpecWorkflowImpl.class);
specWorker.registerActivitiesImplementations(
new SpecActivityImpl(specWorkerConfigs, specProcessFactory, workspaceRoot, workerEnvironment, logConfigs, jobPersistence,
airbyteVersion));
if (configs.shouldRunGetSpecWorkflows()) {
registerGetSpec(factory);
}

final Worker checkConnectionWorker =
factory.newWorker(TemporalJobType.CHECK_CONNECTION.name(), getWorkerOptions(maxWorkers.getMaxCheckWorkers()));
checkConnectionWorker.registerWorkflowImplementationTypes(CheckConnectionWorkflowImpl.class);
checkConnectionWorker
.registerActivitiesImplementations(
new CheckConnectionActivityImpl(checkWorkerConfigs, checkProcessFactory, secretsHydrator, workspaceRoot, workerEnvironment, logConfigs,
jobPersistence, airbyteVersion));
if (configs.shouldRunCheckConnectionWorkflows()) {
registerCheckConnection(factory);
}

final Worker discoverWorker = factory.newWorker(TemporalJobType.DISCOVER_SCHEMA.name(), getWorkerOptions(maxWorkers.getMaxDiscoverWorkers()));
discoverWorker.registerWorkflowImplementationTypes(DiscoverCatalogWorkflowImpl.class);
discoverWorker
.registerActivitiesImplementations(
new DiscoverCatalogActivityImpl(discoverWorkerConfigs, discoverProcessFactory, secretsHydrator, workspaceRoot, workerEnvironment,
logConfigs,
jobPersistence, airbyteVersion));
if (configs.shouldRunDiscoverWorkflows()) {
registerDiscover(factory);
}

final NormalizationActivityImpl normalizationActivity =
new NormalizationActivityImpl(
containerOrchestratorConfig,
defaultWorkerConfigs,
defaultProcessFactory,
secretsHydrator,
workspaceRoot,
workerEnvironment,
logConfigs,
jobPersistence,
airbyteVersion);
final DbtTransformationActivityImpl dbtTransformationActivity =
new DbtTransformationActivityImpl(
containerOrchestratorConfig,
defaultWorkerConfigs,
defaultProcessFactory,
secretsHydrator,
workspaceRoot,
workerEnvironment,
logConfigs,
jobPersistence,
airbyteVersion);
new PersistStateActivityImpl(workspaceRoot, configRepository);
final PersistStateActivityImpl persistStateActivity = new PersistStateActivityImpl(workspaceRoot, configRepository);
final Worker syncWorker = factory.newWorker(TemporalJobType.SYNC.name(), getWorkerOptions(maxWorkers.getMaxSyncWorkers()));
final ReplicationActivityImpl replicationActivity = getReplicationActivityImpl(
containerOrchestratorConfig,
replicationWorkerConfigs,
replicationProcessFactory,
secretsHydrator,
workspaceRoot,
workerEnvironment,
logConfigs,
jobPersistence,
airbyteVersion);
syncWorker.registerWorkflowImplementationTypes(SyncWorkflowImpl.class);
if (configs.shouldRunSyncWorkflows() || configs.shouldRunConnectionManagerWorkflows()) {
jrhizor marked this conversation as resolved.
Show resolved Hide resolved
if (configs.shouldRunSyncWorkflows()) {
registerSync(factory);
}

syncWorker.registerActivitiesImplementations(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity);
if (configs.shouldRunConnectionManagerWorkflows()) {
registerConnectionManager(factory);
}
}
factory.start();
}

private void registerConnectionManager(final WorkerFactory factory) {
final JobCreator jobCreator = new DefaultJobCreator(jobPersistence, configRepository, defaultWorkerConfigs.getResourceRequirements());

final Worker connectionUpdaterWorker =
Expand All @@ -214,29 +178,57 @@ public void start() {
configRepository,
jobCreator),
new ConfigFetchActivityImpl(configRepository, jobPersistence, configs, () -> Instant.now().getEpochSecond()),
new ConnectionDeletionActivityImpl(connectionHelper),
replicationActivity,
normalizationActivity,
dbtTransformationActivity,
persistStateActivity);
new ConnectionDeletionActivityImpl(connectionHelper));
}

factory.start();
/**
 * Creates a worker named after {@code TemporalJobType.SYNC} and registers the sync workflow on
 * it, together with the replication, normalization, dbt transformation and persist-state
 * activities.
 *
 * @param factory worker factory the new worker is created from
 */
private void registerSync(final WorkerFactory factory) {
  // Activities are built first, in the same order they are handed to the worker below.
  final ReplicationActivityImpl replication =
      getReplicationActivityImpl(replicationWorkerConfigs, replicationProcessFactory);
  final NormalizationActivityImpl normalization =
      getNormalizationActivityImpl(defaultWorkerConfigs, defaultProcessFactory);
  final DbtTransformationActivityImpl dbtTransformation =
      getDbtActivityImpl(defaultWorkerConfigs, defaultProcessFactory);
  final PersistStateActivityImpl persistState =
      new PersistStateActivityImpl(workspaceRoot, configRepository);

  final Worker worker = factory.newWorker(
      TemporalJobType.SYNC.name(),
      getWorkerOptions(maxWorkers.getMaxSyncWorkers()));
  worker.registerWorkflowImplementationTypes(SyncWorkflowImpl.class);
  worker.registerActivitiesImplementations(replication, normalization, dbtTransformation, persistState);
}

/**
 * Creates a worker named after {@code TemporalJobType.DISCOVER_SCHEMA} and registers the
 * discover-catalog workflow and its activity implementation on it.
 *
 * @param factory worker factory the new worker is created from
 */
private void registerDiscover(final WorkerFactory factory) {
  final Worker worker = factory.newWorker(
      TemporalJobType.DISCOVER_SCHEMA.name(),
      getWorkerOptions(maxWorkers.getMaxDiscoverWorkers()));
  worker.registerWorkflowImplementationTypes(DiscoverCatalogWorkflowImpl.class);

  final DiscoverCatalogActivityImpl discoverActivity = new DiscoverCatalogActivityImpl(
      discoverWorkerConfigs,
      discoverProcessFactory,
      secretsHydrator,
      workspaceRoot,
      workerEnvironment,
      logConfigs,
      jobPersistence,
      airbyteVersion);
  worker.registerActivitiesImplementations(discoverActivity);
}

/**
 * Creates a worker named after {@code TemporalJobType.CHECK_CONNECTION} and registers the
 * check-connection workflow and its activity implementation on it.
 *
 * @param factory worker factory the new worker is created from
 */
private void registerCheckConnection(final WorkerFactory factory) {
  final Worker worker = factory.newWorker(
      TemporalJobType.CHECK_CONNECTION.name(),
      getWorkerOptions(maxWorkers.getMaxCheckWorkers()));
  worker.registerWorkflowImplementationTypes(CheckConnectionWorkflowImpl.class);

  final CheckConnectionActivityImpl checkActivity = new CheckConnectionActivityImpl(
      checkWorkerConfigs,
      checkProcessFactory,
      secretsHydrator,
      workspaceRoot,
      workerEnvironment,
      logConfigs,
      jobPersistence,
      airbyteVersion);
  worker.registerActivitiesImplementations(checkActivity);
}

public void registerGetSpec(final WorkerFactory factory) {
jrhizor marked this conversation as resolved.
Show resolved Hide resolved
final Worker specWorker = factory.newWorker(TemporalJobType.GET_SPEC.name(), getWorkerOptions(maxWorkers.getMaxSpecWorkers()));
specWorker.registerWorkflowImplementationTypes(SpecWorkflowImpl.class);
specWorker.registerActivitiesImplementations(
new SpecActivityImpl(specWorkerConfigs, specProcessFactory, workspaceRoot, workerEnvironment, logConfigs, jobPersistence,
airbyteVersion));
}

/**
* Switches behavior based on containerOrchestratorEnabled to decide whether to use new container
* launching or not.
*/
private ReplicationActivityImpl getReplicationActivityImpl(
final Optional<ContainerOrchestratorConfig> containerOrchestratorConfig,
final WorkerConfigs workerConfigs,
final ProcessFactory jobProcessFactory,
final SecretsHydrator secretsHydrator,
final Path workspaceRoot,
final WorkerEnvironment workerEnvironment,
final LogConfigs logConfigs,
final JobPersistence jobPersistence,
final String airbyteVersion) {
private ReplicationActivityImpl getReplicationActivityImpl(final WorkerConfigs workerConfigs,
final ProcessFactory jobProcessFactory) {

return new ReplicationActivityImpl(
containerOrchestratorConfig,
Expand All @@ -250,6 +242,36 @@ private ReplicationActivityImpl getReplicationActivityImpl(
airbyteVersion);
}

/**
 * Builds a {@code NormalizationActivityImpl} from the supplied worker configs and process
 * factory; every other constructor argument comes from values held by the enclosing class.
 *
 * @param workerConfigs worker configs passed to the activity
 * @param jobProcessFactory process factory passed to the activity
 * @return the newly constructed normalization activity
 */
private NormalizationActivityImpl getNormalizationActivityImpl(
    final WorkerConfigs workerConfigs,
    final ProcessFactory jobProcessFactory) {
  return new NormalizationActivityImpl(
      containerOrchestratorConfig, workerConfigs, jobProcessFactory,
      secretsHydrator, workspaceRoot, workerEnvironment,
      logConfigs, jobPersistence, airbyteVersion);
}

/**
 * Builds a {@code DbtTransformationActivityImpl} from the supplied worker configs and process
 * factory; every other constructor argument comes from values held by the enclosing class.
 *
 * @param workerConfigs worker configs passed to the activity
 * @param jobProcessFactory process factory passed to the activity
 * @return the newly constructed dbt transformation activity
 */
private DbtTransformationActivityImpl getDbtActivityImpl(
    final WorkerConfigs workerConfigs,
    final ProcessFactory jobProcessFactory) {
  return new DbtTransformationActivityImpl(
      containerOrchestratorConfig, workerConfigs, jobProcessFactory,
      secretsHydrator, workspaceRoot, workerEnvironment,
      logConfigs, jobPersistence, airbyteVersion);
}

private static ProcessFactory getJobProcessFactory(final Configs configs, final WorkerConfigs workerConfigs) throws IOException {
if (configs.getWorkerEnvironment() == Configs.WorkerEnvironment.KUBERNETES) {
final KubernetesClient fabricClient = new DefaultKubernetesClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ private StandardSyncOutput runChildWorkflow(GeneratedJobInput jobInputs) {
final SyncWorkflow childSync = Workflow.newChildWorkflowStub(SyncWorkflow.class,
ChildWorkflowOptions.newBuilder()
.setWorkflowId("sync_" + workflowInternalState.getJobId())
.setTaskQueue(TemporalJobType.CONNECTION_UPDATER.name())
.setTaskQueue(TemporalJobType.SYNC.name())
// This will cancel the child workflow when the parent is terminated
.setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL)
.build());
Expand Down
Loading