diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java index b66dfa5cfd74..6e9b5be2333c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java @@ -112,10 +112,10 @@ public List syncActivities( final PersistStateActivity persistStateActivity, final NormalizationSummaryCheckActivity normalizationSummaryCheckActivity, final WebhookOperationActivity webhookOperationActivity, - // final ConfigFetchActivity configFetchActivity, + final ConfigFetchActivity configFetchActivity, final RefreshSchemaActivity refreshSchemaActivity) { return List.of(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity, normalizationSummaryCheckActivity, - webhookOperationActivity, /* configFetchActivity, */ refreshSchemaActivity); + webhookOperationActivity, configFetchActivity, refreshSchemaActivity); } @Singleton diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java index c839a8a28f23..0aa8eb8e9dc6 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java @@ -12,6 +12,7 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME; import datadog.trace.api.Trace; +import io.airbyte.api.client.model.generated.ConnectionStatus; import io.airbyte.commons.temporal.scheduling.SyncWorkflow; import io.airbyte.config.NormalizationInput; import io.airbyte.config.NormalizationSummary; @@ -21,12 +22,16 @@ import io.airbyte.config.StandardSyncOperation; import io.airbyte.config.StandardSyncOperation.OperatorType; import io.airbyte.config.StandardSyncOutput; +import io.airbyte.config.StandardSyncSummary; +import io.airbyte.config.StandardSyncSummary.ReplicationStatus; +import io.airbyte.config.SyncStats; import io.airbyte.config.WebhookOperationSummary; import io.airbyte.metrics.lib.ApmTraceUtils; import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.workers.temporal.annotations.TemporalActivityStub; +import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity; import io.temporal.workflow.Workflow; import java.util.Map; import java.util.Optional; @@ -57,10 +62,10 @@ public class SyncWorkflowImpl implements SyncWorkflow { private NormalizationSummaryCheckActivity normalizationSummaryCheckActivity; @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") private WebhookOperationActivity webhookOperationActivity; - // @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") - // private RefreshSchemaActivity refreshSchemaActivity; - // @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") - // private ConfigFetchActivity configFetchActivity; + @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") + private RefreshSchemaActivity refreshSchemaActivity; + @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") + private ConfigFetchActivity configFetchActivity; @Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME) @Override @@ -79,28 +84,27 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, final int version = Workflow.getVersion(VERSION_LABEL, Workflow.DEFAULT_VERSION, CURRENT_VERSION); final String taskQueue = Workflow.getInfo().getTaskQueue(); - // final int autoDetectSchemaVersion = - // Workflow.getVersion(AUTO_DETECT_SCHEMA_TAG, Workflow.DEFAULT_VERSION, - // AUTO_DETECT_SCHEMA_VERSION); - // - // if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) { - // final Optional sourceId = configFetchActivity.getSourceId(connectionId); - // - // if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) { - // LOGGER.info("Refreshing source schema..."); - // refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId); - // } - // - // final Optional status = configFetchActivity.getStatus(connectionId); - // if (!status.isEmpty() && ConnectionStatus.INACTIVE == status.get()) { - // LOGGER.info("Connection is disabled. Cancelling run."); - // final StandardSyncOutput output = - // new StandardSyncOutput() - // .withStandardSyncSummary(new - // StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats())); - // return output; - // } - // } + final int autoDetectSchemaVersion = + Workflow.getVersion(AUTO_DETECT_SCHEMA_TAG, Workflow.DEFAULT_VERSION, + AUTO_DETECT_SCHEMA_VERSION); + + if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) { + final Optional sourceId = configFetchActivity.getSourceId(connectionId); + + if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) { + LOGGER.info("Refreshing source schema..."); + refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId); + } + + final Optional status = configFetchActivity.getStatus(connectionId); + if (!status.isEmpty() && ConnectionStatus.INACTIVE == status.get()) { + LOGGER.info("Connection is disabled. Cancelling run."); + final StandardSyncOutput output = + new StandardSyncOutput() + .withStandardSyncSummary(new StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats())); + return output; + } + } StandardSyncOutput syncOutput = replicationActivity.replicate(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput, taskQueue); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java index 5c73bf469afa..9a39dd0b9a82 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java @@ -229,8 +229,8 @@ void testSuccess() { verifyNormalize(normalizationActivity, normalizationInput); verifyDbtTransform(dbtTransformationActivity, syncInput.getResourceRequirements(), operatorDbtInput); - // verifyShouldRefreshSchema(refreshSchemaActivity); - // verifyRefreshSchema(refreshSchemaActivity, sync); + verifyShouldRefreshSchema(refreshSchemaActivity); + verifyRefreshSchema(refreshSchemaActivity, sync); assertEquals( replicationSuccessOutput.withNormalizationSummary(normalizationSummary).getStandardSyncSummary(), actualOutput.getStandardSyncSummary()); @@ -246,8 +246,8 @@ void testReplicationFailure() { assertThrows(WorkflowFailedException.class, this::execute); - // verifyShouldRefreshSchema(refreshSchemaActivity); - // verifyRefreshSchema(refreshSchemaActivity, sync); + verifyShouldRefreshSchema(refreshSchemaActivity); + verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyNoInteractions(persistStateActivity); verifyNoInteractions(normalizationActivity); @@ -269,8 +269,8 @@ void testReplicationFailedGracefully() { final StandardSyncOutput actualOutput = execute(); - // verifyShouldRefreshSchema(refreshSchemaActivity); - // verifyRefreshSchema(refreshSchemaActivity, sync); + verifyShouldRefreshSchema(refreshSchemaActivity); + verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationFailOutput, syncInput.getCatalog()); verifyNormalize(normalizationActivity, normalizationInput); @@ -296,8 +296,8 @@ void testNormalizationFailure() { assertThrows(WorkflowFailedException.class, this::execute); - // verifyShouldRefreshSchema(refreshSchemaActivity); - // verifyRefreshSchema(refreshSchemaActivity, sync); + verifyShouldRefreshSchema(refreshSchemaActivity); + verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog()); verifyNormalize(normalizationActivity, normalizationInput); @@ -317,8 +317,8 @@ void testCancelDuringReplication() { assertThrows(WorkflowFailedException.class, this::execute); - // verifyShouldRefreshSchema(refreshSchemaActivity); - // verifyRefreshSchema(refreshSchemaActivity, sync); + verifyShouldRefreshSchema(refreshSchemaActivity); + verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyNoInteractions(persistStateActivity); verifyNoInteractions(normalizationActivity); @@ -343,8 +343,8 @@ void testCancelDuringNormalization() { assertThrows(WorkflowFailedException.class, this::execute); - // verifyShouldRefreshSchema(refreshSchemaActivity); - // verifyRefreshSchema(refreshSchemaActivity, sync); + verifyShouldRefreshSchema(refreshSchemaActivity); + verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog()); verifyNormalize(normalizationActivity, normalizationInput); @@ -368,8 +368,8 @@ void testSkipNormalization() { execute(); - // verifyShouldRefreshSchema(refreshSchemaActivity); - // verifyRefreshSchema(refreshSchemaActivity, sync); + verifyShouldRefreshSchema(refreshSchemaActivity); + verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationSuccessOutputNoRecordsCommitted, syncInput.getCatalog()); verifyNoInteractions(normalizationActivity); @@ -396,16 +396,16 @@ void testWebhookOperation() { assertEquals(actualOutput.getWebhookOperationSummary().getSuccesses().get(0), WEBHOOK_CONFIG_ID); } - // @Test - // void testSkipReplicationAfterRefreshSchema() { - // when(configFetchActivity.getStatus(any())).thenReturn(Optional.of(ConnectionStatus.INACTIVE)); - // final StandardSyncOutput output = execute(); - // verifyShouldRefreshSchema(refreshSchemaActivity); - // verifyRefreshSchema(refreshSchemaActivity, sync); - // verifyNoInteractions(replicationActivity); - // verifyNoInteractions(normalizationActivity); - // assertEquals(output.getStandardSyncSummary().getStatus(), ReplicationStatus.CANCELLED); - // } + @Test + void testSkipReplicationAfterRefreshSchema() { + when(configFetchActivity.getStatus(any())).thenReturn(Optional.of(ConnectionStatus.INACTIVE)); + final StandardSyncOutput output = execute(); + verifyShouldRefreshSchema(refreshSchemaActivity); + verifyRefreshSchema(refreshSchemaActivity, sync); + verifyNoInteractions(replicationActivity); + verifyNoInteractions(normalizationActivity); + assertEquals(output.getStandardSyncSummary().getStatus(), ReplicationStatus.CANCELLED); + } @SuppressWarnings("ResultOfMethodCallIgnored") private void cancelWorkflow() {