diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java index 7c333e8a0b75..5a5a55332009 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java @@ -58,8 +58,8 @@ public List checkConnectionActivities( @Requires(env = WorkerMode.CONTROL_PLANE) @Named("notifyActivities") public List notifyActivities(final NotifySchemaChangeActivity notifySchemaChangeActivity, - SlackConfigActivity slackConfigActivity, - ConfigFetchActivity configFetchActivity) { + final SlackConfigActivity slackConfigActivity, + final ConfigFetchActivity configFetchActivity) { return List.of(notifySchemaChangeActivity, slackConfigActivity, configFetchActivity); } @@ -112,10 +112,12 @@ public List syncActivities( final PersistStateActivity persistStateActivity, final NormalizationSummaryCheckActivity normalizationSummaryCheckActivity, final WebhookOperationActivity webhookOperationActivity, - final ConfigFetchActivity configFetchActivity, + /* + * Temporarily disabled to address OC issue #1210 final ConfigFetchActivity configFetchActivity, + */ final RefreshSchemaActivity refreshSchemaActivity) { return List.of(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity, normalizationSummaryCheckActivity, - webhookOperationActivity, configFetchActivity, refreshSchemaActivity); + webhookOperationActivity, /* configFetchActivity, */ refreshSchemaActivity); } @Singleton diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java index 6e48ccfef5a9..514a56eba4f9 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java @@ -8,6 +8,7 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY; import datadog.trace.api.Trace; +import io.airbyte.commons.temporal.config.WorkerMode; import io.airbyte.commons.temporal.exception.RetryableException; import io.airbyte.config.Cron; import io.airbyte.config.StandardSync; @@ -20,6 +21,7 @@ import io.airbyte.persistence.job.JobPersistence; import io.airbyte.persistence.job.models.Job; import io.airbyte.validation.json.JsonValidationException; +import io.micronaut.context.annotation.Requires; import io.micronaut.context.annotation.Value; import jakarta.inject.Named; import jakarta.inject.Singleton; @@ -39,6 +41,7 @@ @Slf4j @Singleton +@Requires(env = WorkerMode.CONTROL_PLANE) public class ConfigFetchActivityImpl implements ConfigFetchActivity { private final static long MS_PER_SECOND = 1000L; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java index e0c4b0f55af8..5c4c19245ed7 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java @@ -17,21 +17,16 @@ import io.airbyte.config.NormalizationSummary; import io.airbyte.config.OperatorDbtInput; import io.airbyte.config.OperatorWebhookInput; -import io.airbyte.config.StandardSync.Status; import io.airbyte.config.StandardSyncInput; import io.airbyte.config.StandardSyncOperation; import io.airbyte.config.StandardSyncOperation.OperatorType; import io.airbyte.config.StandardSyncOutput; -import io.airbyte.config.StandardSyncSummary; -import io.airbyte.config.StandardSyncSummary.ReplicationStatus; -import io.airbyte.config.SyncStats; import io.airbyte.config.WebhookOperationSummary; import io.airbyte.metrics.lib.ApmTraceUtils; import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.workers.temporal.annotations.TemporalActivityStub; -import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity; import io.temporal.workflow.Workflow; import java.util.Map; import java.util.Optional; @@ -62,10 +57,11 @@ public class SyncWorkflowImpl implements SyncWorkflow { private NormalizationSummaryCheckActivity normalizationSummaryCheckActivity; @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") private WebhookOperationActivity webhookOperationActivity; - @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") - private RefreshSchemaActivity refreshSchemaActivity; - @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") - private ConfigFetchActivity configFetchActivity; + // Temporarily disabled to address OC issue #1210 + // @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") + // private RefreshSchemaActivity refreshSchemaActivity; + // @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") + // private ConfigFetchActivity configFetchActivity; @Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME) @Override @@ -84,26 +80,30 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, final int version = Workflow.getVersion(VERSION_LABEL, Workflow.DEFAULT_VERSION, CURRENT_VERSION); final String taskQueue = Workflow.getInfo().getTaskQueue(); + // Temporarily suppressed to address OC issue #1210 + @SuppressWarnings("PMD.UnusedLocalVariable") final int autoDetectSchemaVersion = Workflow.getVersion(AUTO_DETECT_SCHEMA_TAG, Workflow.DEFAULT_VERSION, AUTO_DETECT_SCHEMA_VERSION); - if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) { - final Optional sourceId = configFetchActivity.getSourceId(connectionId); - - if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) { - LOGGER.info("Refreshing source schema..."); - refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId); - } - - final Optional status = configFetchActivity.getStatus(connectionId); - if (!status.isEmpty() && Status.INACTIVE == status.get()) { - LOGGER.info("Connection is disabled. Cancelling run."); - final StandardSyncOutput output = - new StandardSyncOutput() - .withStandardSyncSummary(new StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats())); - return output; - } - } + // Temporarily disabled to address OC issue #1210 + // if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) { + // final Optional sourceId = configFetchActivity.getSourceId(connectionId); + // + // if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) { + // LOGGER.info("Refreshing source schema..."); + // refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId); + // } + // + // final Optional status = configFetchActivity.getStatus(connectionId); + // if (!status.isEmpty() && Status.INACTIVE == status.get()) { + // LOGGER.info("Connection is disabled. Cancelling run."); + // final StandardSyncOutput output = + // new StandardSyncOutput() + // .withStandardSyncSummary(new + // StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats())); + // return output; + // } + // } StandardSyncOutput syncOutput = replicationActivity.replicate(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput, taskQueue); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java index 6ffac60aeef3..15c4a7b87a9c 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java @@ -229,8 +229,9 @@ void testSuccess() { verifyNormalize(normalizationActivity, normalizationInput); verifyDbtTransform(dbtTransformationActivity, syncInput.getResourceRequirements(), operatorDbtInput); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); + // Temporarily disabled to address OC issue #1210 + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); assertEquals( replicationSuccessOutput.withNormalizationSummary(normalizationSummary).getStandardSyncSummary(), actualOutput.getStandardSyncSummary()); @@ -246,8 +247,9 @@ void testReplicationFailure() { assertThrows(WorkflowFailedException.class, this::execute); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); + // Temporarily disabled to address OC issue #1210 + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyNoInteractions(persistStateActivity); verifyNoInteractions(normalizationActivity); @@ -269,8 +271,9 @@ void testReplicationFailedGracefully() { final StandardSyncOutput actualOutput = execute(); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); + // Temporarily disabled to address OC issue #1210 + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationFailOutput, syncInput.getCatalog()); verifyNormalize(normalizationActivity, normalizationInput); @@ -296,8 +299,9 @@ void testNormalizationFailure() { assertThrows(WorkflowFailedException.class, this::execute); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); + // Temporarily disabled to address OC issue #1210 + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog()); verifyNormalize(normalizationActivity, normalizationInput); @@ -317,8 +321,9 @@ void testCancelDuringReplication() { assertThrows(WorkflowFailedException.class, this::execute); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); + // Temporarily disabled to address OC issue #1210 + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyNoInteractions(persistStateActivity); verifyNoInteractions(normalizationActivity); @@ -343,8 +348,9 @@ void testCancelDuringNormalization() { assertThrows(WorkflowFailedException.class, this::execute); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); + // Temporarily disabled to address OC issue #1210 + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog()); verifyNormalize(normalizationActivity, normalizationInput); @@ -397,9 +403,10 @@ void testWebhookOperation() { } @Test + @Disabled("Temporarily disabled to address OC issue #1210") void testSkipReplicationAfterRefreshSchema() { when(configFetchActivity.getStatus(any())).thenReturn(Optional.of(Status.INACTIVE)); - StandardSyncOutput output = execute(); + final StandardSyncOutput output = execute(); verifyShouldRefreshSchema(refreshSchemaActivity); verifyRefreshSchema(refreshSchemaActivity, sync); verifyNoInteractions(replicationActivity);