Skip to content

Commit

Permalink
Disable auto detect schema activity bits (#20615)
Browse files Browse the repository at this point in the history
* Disable auto detect schema activity bits

* Disable impacted tests

* Disable auto detect schema checks

* Add comment as to why code has been disabled

* Fix PMD warnings

* Fix PMD warning

Co-authored-by: Peter Hu <[email protected]>
  • Loading branch information
jdpgrailsdev and git-phu authored Dec 19, 2022
1 parent b7113a2 commit c0838f8
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ public List<Object> checkConnectionActivities(
@Requires(env = WorkerMode.CONTROL_PLANE)
@Named("notifyActivities")
public List<Object> notifyActivities(final NotifySchemaChangeActivity notifySchemaChangeActivity,
SlackConfigActivity slackConfigActivity,
ConfigFetchActivity configFetchActivity) {
final SlackConfigActivity slackConfigActivity,
final ConfigFetchActivity configFetchActivity) {
return List.of(notifySchemaChangeActivity, slackConfigActivity, configFetchActivity);
}

Expand Down Expand Up @@ -112,10 +112,12 @@ public List<Object> syncActivities(
final PersistStateActivity persistStateActivity,
final NormalizationSummaryCheckActivity normalizationSummaryCheckActivity,
final WebhookOperationActivity webhookOperationActivity,
final ConfigFetchActivity configFetchActivity,
/*
* Temporarily disabled to address OC issue #1210 final ConfigFetchActivity configFetchActivity,
*/
final RefreshSchemaActivity refreshSchemaActivity) {
return List.of(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity, normalizationSummaryCheckActivity,
webhookOperationActivity, configFetchActivity, refreshSchemaActivity);
webhookOperationActivity, /* configFetchActivity, */ refreshSchemaActivity);
}

@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY;

import datadog.trace.api.Trace;
import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.config.Cron;
import io.airbyte.config.StandardSync;
Expand All @@ -20,6 +21,7 @@
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.validation.json.JsonValidationException;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
Expand All @@ -39,6 +41,7 @@

@Slf4j
@Singleton
@Requires(env = WorkerMode.CONTROL_PLANE)
public class ConfigFetchActivityImpl implements ConfigFetchActivity {

private final static long MS_PER_SECOND = 1000L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,16 @@
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.OperatorDbtInput;
import io.airbyte.config.OperatorWebhookInput;
import io.airbyte.config.StandardSync.Status;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardSyncOperation.OperatorType;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.StandardSyncSummary.ReplicationStatus;
import io.airbyte.config.SyncStats;
import io.airbyte.config.WebhookOperationSummary;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.workers.temporal.annotations.TemporalActivityStub;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity;
import io.temporal.workflow.Workflow;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -62,10 +57,11 @@ public class SyncWorkflowImpl implements SyncWorkflow {
private NormalizationSummaryCheckActivity normalizationSummaryCheckActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private WebhookOperationActivity webhookOperationActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private RefreshSchemaActivity refreshSchemaActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private ConfigFetchActivity configFetchActivity;
// Temporarily disabled to address OC issue #1210
// @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
// private RefreshSchemaActivity refreshSchemaActivity;
// @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
// private ConfigFetchActivity configFetchActivity;

@Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME)
@Override
Expand All @@ -84,26 +80,30 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,
final int version = Workflow.getVersion(VERSION_LABEL, Workflow.DEFAULT_VERSION, CURRENT_VERSION);
final String taskQueue = Workflow.getInfo().getTaskQueue();

// Temporarily suppressed to address OC issue #1210
@SuppressWarnings("PMD.UnusedLocalVariable")
final int autoDetectSchemaVersion =
Workflow.getVersion(AUTO_DETECT_SCHEMA_TAG, Workflow.DEFAULT_VERSION, AUTO_DETECT_SCHEMA_VERSION);

if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) {
final Optional<UUID> sourceId = configFetchActivity.getSourceId(connectionId);

if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) {
LOGGER.info("Refreshing source schema...");
refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId);
}

final Optional<Status> status = configFetchActivity.getStatus(connectionId);
if (!status.isEmpty() && Status.INACTIVE == status.get()) {
LOGGER.info("Connection is disabled. Cancelling run.");
final StandardSyncOutput output =
new StandardSyncOutput()
.withStandardSyncSummary(new StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats()));
return output;
}
}
// Temporarily disabled to address OC issue #1210
// if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) {
// final Optional<UUID> sourceId = configFetchActivity.getSourceId(connectionId);
//
// if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) {
// LOGGER.info("Refreshing source schema...");
// refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId);
// }
//
// final Optional<Status> status = configFetchActivity.getStatus(connectionId);
// if (!status.isEmpty() && Status.INACTIVE == status.get()) {
// LOGGER.info("Connection is disabled. Cancelling run.");
// final StandardSyncOutput output =
// new StandardSyncOutput()
// .withStandardSyncSummary(new
// StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats()));
// return output;
// }
// }

StandardSyncOutput syncOutput =
replicationActivity.replicate(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput, taskQueue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,9 @@ void testSuccess() {
verifyNormalize(normalizationActivity, normalizationInput);
verifyDbtTransform(dbtTransformationActivity, syncInput.getResourceRequirements(),
operatorDbtInput);
verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
// Temporarily disabled to address OC issue #1210
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
assertEquals(
replicationSuccessOutput.withNormalizationSummary(normalizationSummary).getStandardSyncSummary(),
actualOutput.getStandardSyncSummary());
Expand All @@ -246,8 +247,9 @@ void testReplicationFailure() {

assertThrows(WorkflowFailedException.class, this::execute);

verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
// Temporarily disabled to address OC issue #1210
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyNoInteractions(persistStateActivity);
verifyNoInteractions(normalizationActivity);
Expand All @@ -269,8 +271,9 @@ void testReplicationFailedGracefully() {

final StandardSyncOutput actualOutput = execute();

verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
// Temporarily disabled to address OC issue #1210
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyPersistState(persistStateActivity, sync, replicationFailOutput, syncInput.getCatalog());
verifyNormalize(normalizationActivity, normalizationInput);
Expand All @@ -296,8 +299,9 @@ void testNormalizationFailure() {

assertThrows(WorkflowFailedException.class, this::execute);

verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
// Temporarily disabled to address OC issue #1210
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog());
verifyNormalize(normalizationActivity, normalizationInput);
Expand All @@ -317,8 +321,9 @@ void testCancelDuringReplication() {

assertThrows(WorkflowFailedException.class, this::execute);

verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
// Temporarily disabled to address OC issue #1210
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyNoInteractions(persistStateActivity);
verifyNoInteractions(normalizationActivity);
Expand All @@ -343,8 +348,9 @@ void testCancelDuringNormalization() {

assertThrows(WorkflowFailedException.class, this::execute);

verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
// Temporarily disabled to address OC issue #1210
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog());
verifyNormalize(normalizationActivity, normalizationInput);
Expand Down Expand Up @@ -397,9 +403,10 @@ void testWebhookOperation() {
}

@Test
@Disabled("Temporarily disabled to address OC issue #1210")
void testSkipReplicationAfterRefreshSchema() {
when(configFetchActivity.getStatus(any())).thenReturn(Optional.of(Status.INACTIVE));
StandardSyncOutput output = execute();
final StandardSyncOutput output = execute();
verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
verifyNoInteractions(replicationActivity);
Expand Down

0 comments on commit c0838f8

Please sign in to comment.