From 0fd3acd3aa525f12e26e1f035224605e589185eb Mon Sep 17 00:00:00 2001 From: alovew Date: Thu, 19 Jan 2023 10:04:34 -0800 Subject: [PATCH 1/2] Revert uncommenting auto detect schema in sync workflow --- .../workers/config/ActivityBeanFactory.java | 4 +- .../temporal/sync/SyncWorkflowImpl.java | 48 +++++++++---------- .../temporal/sync/SyncWorkflowTest.java | 48 +++++++++---------- 3 files changed, 50 insertions(+), 50 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java index 6e9b5be2333c..da71da54f4a4 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java @@ -112,10 +112,10 @@ public List syncActivities( final PersistStateActivity persistStateActivity, final NormalizationSummaryCheckActivity normalizationSummaryCheckActivity, final WebhookOperationActivity webhookOperationActivity, - final ConfigFetchActivity configFetchActivity, +// final ConfigFetchActivity configFetchActivity, final RefreshSchemaActivity refreshSchemaActivity) { return List.of(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity, normalizationSummaryCheckActivity, - webhookOperationActivity, configFetchActivity, refreshSchemaActivity); + webhookOperationActivity, /* configFetchActivity, */ refreshSchemaActivity); } @Singleton diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java index c93bbd3ad2f9..47b7d15c96c3 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java @@ -62,10 +62,10 @@ public class SyncWorkflowImpl implements SyncWorkflow { private NormalizationSummaryCheckActivity normalizationSummaryCheckActivity; @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") private WebhookOperationActivity webhookOperationActivity; - @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") - private RefreshSchemaActivity refreshSchemaActivity; - @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") - private ConfigFetchActivity configFetchActivity; +// @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") +// private RefreshSchemaActivity refreshSchemaActivity; +// @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") +// private ConfigFetchActivity configFetchActivity; @Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME) @Override @@ -84,26 +84,26 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, final int version = Workflow.getVersion(VERSION_LABEL, Workflow.DEFAULT_VERSION, CURRENT_VERSION); final String taskQueue = Workflow.getInfo().getTaskQueue(); - final int autoDetectSchemaVersion = - Workflow.getVersion(AUTO_DETECT_SCHEMA_TAG, Workflow.DEFAULT_VERSION, AUTO_DETECT_SCHEMA_VERSION); - - if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) { - final Optional sourceId = configFetchActivity.getSourceId(connectionId); - - if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) { - LOGGER.info("Refreshing source schema..."); - refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId); - } - - final Optional status = configFetchActivity.getStatus(connectionId); - if (!status.isEmpty() && ConnectionStatus.INACTIVE == status.get()) { - LOGGER.info("Connection is disabled. Cancelling run."); - final StandardSyncOutput output = - new StandardSyncOutput() - .withStandardSyncSummary(new StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats())); - return output; - } - } +// final int autoDetectSchemaVersion = +// Workflow.getVersion(AUTO_DETECT_SCHEMA_TAG, Workflow.DEFAULT_VERSION, AUTO_DETECT_SCHEMA_VERSION); +// +// if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) { +// final Optional sourceId = configFetchActivity.getSourceId(connectionId); +// +// if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) { +// LOGGER.info("Refreshing source schema..."); +// refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId); +// } +// +// final Optional status = configFetchActivity.getStatus(connectionId); +// if (!status.isEmpty() && ConnectionStatus.INACTIVE == status.get()) { +// LOGGER.info("Connection is disabled. Cancelling run."); +// final StandardSyncOutput output = +// new StandardSyncOutput() +// .withStandardSyncSummary(new StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats())); +// return output; +// } +// } StandardSyncOutput syncOutput = replicationActivity.replicate(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput, taskQueue); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java index 9a39dd0b9a82..1d3c08c27efd 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java @@ -229,8 +229,8 @@ void testSuccess() { verifyNormalize(normalizationActivity, normalizationInput); verifyDbtTransform(dbtTransformationActivity, syncInput.getResourceRequirements(), operatorDbtInput); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); +// verifyShouldRefreshSchema(refreshSchemaActivity); +// verifyRefreshSchema(refreshSchemaActivity, sync); assertEquals( replicationSuccessOutput.withNormalizationSummary(normalizationSummary).getStandardSyncSummary(), actualOutput.getStandardSyncSummary()); @@ -246,8 +246,8 @@ void testReplicationFailure() { assertThrows(WorkflowFailedException.class, this::execute); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); +// verifyShouldRefreshSchema(refreshSchemaActivity); +// verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyNoInteractions(persistStateActivity); verifyNoInteractions(normalizationActivity); @@ -269,8 +269,8 @@ void testReplicationFailedGracefully() { final StandardSyncOutput actualOutput = execute(); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); +// verifyShouldRefreshSchema(refreshSchemaActivity); +// verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationFailOutput, syncInput.getCatalog()); verifyNormalize(normalizationActivity, normalizationInput); @@ -296,8 +296,8 @@ void testNormalizationFailure() { assertThrows(WorkflowFailedException.class, this::execute); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); +// verifyShouldRefreshSchema(refreshSchemaActivity); +// verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog()); verifyNormalize(normalizationActivity, normalizationInput); @@ -317,8 +317,8 @@ void testCancelDuringReplication() { assertThrows(WorkflowFailedException.class, this::execute); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); +// verifyShouldRefreshSchema(refreshSchemaActivity); +// verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyNoInteractions(persistStateActivity); verifyNoInteractions(normalizationActivity); @@ -343,8 +343,8 @@ void testCancelDuringNormalization() { assertThrows(WorkflowFailedException.class, this::execute); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); +// verifyShouldRefreshSchema(refreshSchemaActivity); +// verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog()); verifyNormalize(normalizationActivity, normalizationInput); @@ -368,8 +368,8 @@ void testSkipNormalization() { execute(); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); +// verifyShouldRefreshSchema(refreshSchemaActivity); +// verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationSuccessOutputNoRecordsCommitted, syncInput.getCatalog()); verifyNoInteractions(normalizationActivity); @@ -396,16 +396,16 @@ void testWebhookOperation() { assertEquals(actualOutput.getWebhookOperationSummary().getSuccesses().get(0), WEBHOOK_CONFIG_ID); } - @Test - void testSkipReplicationAfterRefreshSchema() { - when(configFetchActivity.getStatus(any())).thenReturn(Optional.of(ConnectionStatus.INACTIVE)); - final StandardSyncOutput output = execute(); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); - verifyNoInteractions(replicationActivity); - verifyNoInteractions(normalizationActivity); - assertEquals(output.getStandardSyncSummary().getStatus(), ReplicationStatus.CANCELLED); - } +// @Test +// void testSkipReplicationAfterRefreshSchema() { +// when(configFetchActivity.getStatus(any())).thenReturn(Optional.of(ConnectionStatus.INACTIVE)); +// final StandardSyncOutput output = execute(); +// verifyShouldRefreshSchema(refreshSchemaActivity); +// verifyRefreshSchema(refreshSchemaActivity, sync); +// verifyNoInteractions(replicationActivity); +// verifyNoInteractions(normalizationActivity); +// assertEquals(output.getStandardSyncSummary().getStatus(), ReplicationStatus.CANCELLED); +// } @SuppressWarnings("ResultOfMethodCallIgnored") private void cancelWorkflow() { From 642cbf1c0afc2912010d49cddad06c0e83b45bcb Mon Sep 17 00:00:00 2001 From: alovew Date: Thu, 19 Jan 2023 10:17:11 -0800 Subject: [PATCH 2/2] formatting --- .../workers/config/ActivityBeanFactory.java | 2 +- .../temporal/sync/SyncWorkflowImpl.java | 55 +++++++++---------- .../temporal/sync/SyncWorkflowTest.java | 48 ++++++++-------- 3 files changed, 51 insertions(+), 54 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java index da71da54f4a4..b66dfa5cfd74 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java @@ -112,7 +112,7 @@ public List syncActivities( final PersistStateActivity persistStateActivity, final NormalizationSummaryCheckActivity normalizationSummaryCheckActivity, final WebhookOperationActivity webhookOperationActivity, -// final ConfigFetchActivity configFetchActivity, + // final ConfigFetchActivity configFetchActivity, final RefreshSchemaActivity refreshSchemaActivity) { return List.of(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity, normalizationSummaryCheckActivity, webhookOperationActivity, /* configFetchActivity, */ refreshSchemaActivity); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java index 47b7d15c96c3..c839a8a28f23 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java @@ -12,7 +12,6 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME; import datadog.trace.api.Trace; -import io.airbyte.api.client.model.generated.ConnectionStatus; import io.airbyte.commons.temporal.scheduling.SyncWorkflow; import io.airbyte.config.NormalizationInput; import io.airbyte.config.NormalizationSummary; @@ -22,16 +21,12 @@ import io.airbyte.config.StandardSyncOperation; import io.airbyte.config.StandardSyncOperation.OperatorType; import io.airbyte.config.StandardSyncOutput; -import io.airbyte.config.StandardSyncSummary; -import io.airbyte.config.StandardSyncSummary.ReplicationStatus; -import io.airbyte.config.SyncStats; import io.airbyte.config.WebhookOperationSummary; import io.airbyte.metrics.lib.ApmTraceUtils; import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.workers.temporal.annotations.TemporalActivityStub; -import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity; import io.temporal.workflow.Workflow; import java.util.Map; import java.util.Optional; @@ -62,10 +57,10 @@ public class SyncWorkflowImpl implements SyncWorkflow { private NormalizationSummaryCheckActivity normalizationSummaryCheckActivity; @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") private WebhookOperationActivity webhookOperationActivity; -// @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") -// private RefreshSchemaActivity refreshSchemaActivity; -// @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") -// private ConfigFetchActivity configFetchActivity; + // @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") + // private RefreshSchemaActivity refreshSchemaActivity; + // @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") + // private ConfigFetchActivity configFetchActivity; @Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME) @Override @@ -84,26 +79,28 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, final int version = Workflow.getVersion(VERSION_LABEL, Workflow.DEFAULT_VERSION, CURRENT_VERSION); final String taskQueue = Workflow.getInfo().getTaskQueue(); -// final int autoDetectSchemaVersion = -// Workflow.getVersion(AUTO_DETECT_SCHEMA_TAG, Workflow.DEFAULT_VERSION, AUTO_DETECT_SCHEMA_VERSION); -// -// if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) { -// final Optional sourceId = configFetchActivity.getSourceId(connectionId); -// -// if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) { -// LOGGER.info("Refreshing source schema..."); -// refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId); -// } -// -// final Optional status = configFetchActivity.getStatus(connectionId); -// if (!status.isEmpty() && ConnectionStatus.INACTIVE == status.get()) { -// LOGGER.info("Connection is disabled. Cancelling run."); -// final StandardSyncOutput output = -// new StandardSyncOutput() -// .withStandardSyncSummary(new StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats())); -// return output; -// } -// } + // final int autoDetectSchemaVersion = + // Workflow.getVersion(AUTO_DETECT_SCHEMA_TAG, Workflow.DEFAULT_VERSION, + // AUTO_DETECT_SCHEMA_VERSION); + // + // if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) { + // final Optional sourceId = configFetchActivity.getSourceId(connectionId); + // + // if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) { + // LOGGER.info("Refreshing source schema..."); + // refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId); + // } + // + // final Optional status = configFetchActivity.getStatus(connectionId); + // if (!status.isEmpty() && ConnectionStatus.INACTIVE == status.get()) { + // LOGGER.info("Connection is disabled. Cancelling run."); + // final StandardSyncOutput output = + // new StandardSyncOutput() + // .withStandardSyncSummary(new + // StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats())); + // return output; + // } + // } StandardSyncOutput syncOutput = replicationActivity.replicate(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput, taskQueue); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java index 1d3c08c27efd..5c73bf469afa 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java @@ -229,8 +229,8 @@ void testSuccess() { verifyNormalize(normalizationActivity, normalizationInput); verifyDbtTransform(dbtTransformationActivity, syncInput.getResourceRequirements(), operatorDbtInput); -// verifyShouldRefreshSchema(refreshSchemaActivity); -// verifyRefreshSchema(refreshSchemaActivity, sync); + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); assertEquals( replicationSuccessOutput.withNormalizationSummary(normalizationSummary).getStandardSyncSummary(), actualOutput.getStandardSyncSummary()); @@ -246,8 +246,8 @@ void testReplicationFailure() { assertThrows(WorkflowFailedException.class, this::execute); -// verifyShouldRefreshSchema(refreshSchemaActivity); -// verifyRefreshSchema(refreshSchemaActivity, sync); + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyNoInteractions(persistStateActivity); verifyNoInteractions(normalizationActivity); @@ -269,8 +269,8 @@ void testReplicationFailedGracefully() { final StandardSyncOutput actualOutput = execute(); -// verifyShouldRefreshSchema(refreshSchemaActivity); -// verifyRefreshSchema(refreshSchemaActivity, sync); + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationFailOutput, syncInput.getCatalog()); verifyNormalize(normalizationActivity, normalizationInput); @@ -296,8 +296,8 @@ void testNormalizationFailure() { assertThrows(WorkflowFailedException.class, this::execute); -// verifyShouldRefreshSchema(refreshSchemaActivity); -// verifyRefreshSchema(refreshSchemaActivity, sync); + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog()); verifyNormalize(normalizationActivity, normalizationInput); @@ -317,8 +317,8 @@ void testCancelDuringReplication() { assertThrows(WorkflowFailedException.class, this::execute); -// verifyShouldRefreshSchema(refreshSchemaActivity); -// verifyRefreshSchema(refreshSchemaActivity, sync); + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyNoInteractions(persistStateActivity); verifyNoInteractions(normalizationActivity); @@ -343,8 +343,8 @@ void testCancelDuringNormalization() { assertThrows(WorkflowFailedException.class, this::execute); -// verifyShouldRefreshSchema(refreshSchemaActivity); -// verifyRefreshSchema(refreshSchemaActivity, sync); + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog()); verifyNormalize(normalizationActivity, normalizationInput); @@ -368,8 +368,8 @@ void testSkipNormalization() { execute(); -// verifyShouldRefreshSchema(refreshSchemaActivity); -// verifyRefreshSchema(refreshSchemaActivity, sync); + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationSuccessOutputNoRecordsCommitted, syncInput.getCatalog()); verifyNoInteractions(normalizationActivity); @@ -396,16 +396,16 @@ void testWebhookOperation() { assertEquals(actualOutput.getWebhookOperationSummary().getSuccesses().get(0), WEBHOOK_CONFIG_ID); } -// @Test -// void testSkipReplicationAfterRefreshSchema() { -// when(configFetchActivity.getStatus(any())).thenReturn(Optional.of(ConnectionStatus.INACTIVE)); -// final StandardSyncOutput output = execute(); -// verifyShouldRefreshSchema(refreshSchemaActivity); -// verifyRefreshSchema(refreshSchemaActivity, sync); -// verifyNoInteractions(replicationActivity); -// verifyNoInteractions(normalizationActivity); -// assertEquals(output.getStandardSyncSummary().getStatus(), ReplicationStatus.CANCELLED); -// } + // @Test + // void testSkipReplicationAfterRefreshSchema() { + // when(configFetchActivity.getStatus(any())).thenReturn(Optional.of(ConnectionStatus.INACTIVE)); + // final StandardSyncOutput output = execute(); + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); + // verifyNoInteractions(replicationActivity); + // verifyNoInteractions(normalizationActivity); + // assertEquals(output.getStandardSyncSummary().getStatus(), ReplicationStatus.CANCELLED); + // } @SuppressWarnings("ResultOfMethodCallIgnored") private void cancelWorkflow() {