diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java index 6e9b5be2333c..b66dfa5cfd74 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java @@ -112,10 +112,10 @@ public List syncActivities( final PersistStateActivity persistStateActivity, final NormalizationSummaryCheckActivity normalizationSummaryCheckActivity, final WebhookOperationActivity webhookOperationActivity, - final ConfigFetchActivity configFetchActivity, + // final ConfigFetchActivity configFetchActivity, final RefreshSchemaActivity refreshSchemaActivity) { return List.of(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity, normalizationSummaryCheckActivity, - webhookOperationActivity, configFetchActivity, refreshSchemaActivity); + webhookOperationActivity, /* configFetchActivity, */ refreshSchemaActivity); } @Singleton diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java index c93bbd3ad2f9..c839a8a28f23 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java @@ -12,7 +12,6 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME; import datadog.trace.api.Trace; -import io.airbyte.api.client.model.generated.ConnectionStatus; import io.airbyte.commons.temporal.scheduling.SyncWorkflow; import io.airbyte.config.NormalizationInput; import io.airbyte.config.NormalizationSummary; @@ -22,16 +21,12 @@ import io.airbyte.config.StandardSyncOperation; import io.airbyte.config.StandardSyncOperation.OperatorType; import io.airbyte.config.StandardSyncOutput; -import io.airbyte.config.StandardSyncSummary; -import io.airbyte.config.StandardSyncSummary.ReplicationStatus; -import io.airbyte.config.SyncStats; import io.airbyte.config.WebhookOperationSummary; import io.airbyte.metrics.lib.ApmTraceUtils; import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.workers.temporal.annotations.TemporalActivityStub; -import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity; import io.temporal.workflow.Workflow; import java.util.Map; import java.util.Optional; @@ -62,10 +57,10 @@ public class SyncWorkflowImpl implements SyncWorkflow { private NormalizationSummaryCheckActivity normalizationSummaryCheckActivity; @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") private WebhookOperationActivity webhookOperationActivity; - @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") - private RefreshSchemaActivity refreshSchemaActivity; - @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") - private ConfigFetchActivity configFetchActivity; + // @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") + // private RefreshSchemaActivity refreshSchemaActivity; + // @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") + // private ConfigFetchActivity configFetchActivity; @Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME) @Override @@ -84,26 +79,28 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, final int version = Workflow.getVersion(VERSION_LABEL, Workflow.DEFAULT_VERSION, CURRENT_VERSION); final String taskQueue = Workflow.getInfo().getTaskQueue(); - final int autoDetectSchemaVersion = - Workflow.getVersion(AUTO_DETECT_SCHEMA_TAG, Workflow.DEFAULT_VERSION, AUTO_DETECT_SCHEMA_VERSION); - - if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) { - final Optional sourceId = configFetchActivity.getSourceId(connectionId); - - if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) { - LOGGER.info("Refreshing source schema..."); - refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId); - } - - final Optional status = configFetchActivity.getStatus(connectionId); - if (!status.isEmpty() && ConnectionStatus.INACTIVE == status.get()) { - LOGGER.info("Connection is disabled. Cancelling run."); - final StandardSyncOutput output = - new StandardSyncOutput() - .withStandardSyncSummary(new StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats())); - return output; - } - } + // final int autoDetectSchemaVersion = + // Workflow.getVersion(AUTO_DETECT_SCHEMA_TAG, Workflow.DEFAULT_VERSION, + // AUTO_DETECT_SCHEMA_VERSION); + // + // if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) { + // final Optional sourceId = configFetchActivity.getSourceId(connectionId); + // + // if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) { + // LOGGER.info("Refreshing source schema..."); + // refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId); + // } + // + // final Optional status = configFetchActivity.getStatus(connectionId); + // if (!status.isEmpty() && ConnectionStatus.INACTIVE == status.get()) { + // LOGGER.info("Connection is disabled. Cancelling run."); + // final StandardSyncOutput output = + // new StandardSyncOutput() + // .withStandardSyncSummary(new + // StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats())); + // return output; + // } + // } StandardSyncOutput syncOutput = replicationActivity.replicate(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput, taskQueue); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java index 9a39dd0b9a82..5c73bf469afa 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java @@ -229,8 +229,8 @@ void testSuccess() { verifyNormalize(normalizationActivity, normalizationInput); verifyDbtTransform(dbtTransformationActivity, syncInput.getResourceRequirements(), operatorDbtInput); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); assertEquals( replicationSuccessOutput.withNormalizationSummary(normalizationSummary).getStandardSyncSummary(), actualOutput.getStandardSyncSummary()); @@ -246,8 +246,8 @@ void testReplicationFailure() { assertThrows(WorkflowFailedException.class, this::execute); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyNoInteractions(persistStateActivity); verifyNoInteractions(normalizationActivity); @@ -269,8 +269,8 @@ void testReplicationFailedGracefully() { final StandardSyncOutput actualOutput = execute(); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationFailOutput, syncInput.getCatalog()); verifyNormalize(normalizationActivity, normalizationInput); @@ -296,8 +296,8 @@ void testNormalizationFailure() { assertThrows(WorkflowFailedException.class, this::execute); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog()); verifyNormalize(normalizationActivity, normalizationInput); @@ -317,8 +317,8 @@ void testCancelDuringReplication() { assertThrows(WorkflowFailedException.class, this::execute); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyNoInteractions(persistStateActivity); verifyNoInteractions(normalizationActivity); @@ -343,8 +343,8 @@ void testCancelDuringNormalization() { assertThrows(WorkflowFailedException.class, this::execute); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog()); verifyNormalize(normalizationActivity, normalizationInput); @@ -368,8 +368,8 @@ void testSkipNormalization() { execute(); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationSuccessOutputNoRecordsCommitted, syncInput.getCatalog()); verifyNoInteractions(normalizationActivity); @@ -396,16 +396,16 @@ void testWebhookOperation() { assertEquals(actualOutput.getWebhookOperationSummary().getSuccesses().get(0), WEBHOOK_CONFIG_ID); } - @Test - void testSkipReplicationAfterRefreshSchema() { - when(configFetchActivity.getStatus(any())).thenReturn(Optional.of(ConnectionStatus.INACTIVE)); - final StandardSyncOutput output = execute(); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); - verifyNoInteractions(replicationActivity); - verifyNoInteractions(normalizationActivity); - assertEquals(output.getStandardSyncSummary().getStatus(), ReplicationStatus.CANCELLED); - } + // @Test + // void testSkipReplicationAfterRefreshSchema() { + // when(configFetchActivity.getStatus(any())).thenReturn(Optional.of(ConnectionStatus.INACTIVE)); + // final StandardSyncOutput output = execute(); + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); + // verifyNoInteractions(replicationActivity); + // verifyNoInteractions(normalizationActivity); + // assertEquals(output.getStandardSyncSummary().getStatus(), ReplicationStatus.CANCELLED); + // } @SuppressWarnings("ResultOfMethodCallIgnored") private void cancelWorkflow() {