Skip to content

Commit

Permalink
Add back in Auto detect schema functionality in sync workflow (#21361)
Browse files Browse the repository at this point in the history
* Add back in Auto detect schema functionality in sync workflow
  • Loading branch information
alovew authored Jan 17, 2023
1 parent 72a9b29 commit dd0e83e
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,10 @@ public List<Object> syncActivities(
final PersistStateActivity persistStateActivity,
final NormalizationSummaryCheckActivity normalizationSummaryCheckActivity,
final WebhookOperationActivity webhookOperationActivity,
/*
* Temporarily disabled to address OC issue #1210 final ConfigFetchActivity configFetchActivity,
*/
final ConfigFetchActivity configFetchActivity,
final RefreshSchemaActivity refreshSchemaActivity) {
return List.of(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity, normalizationSummaryCheckActivity,
webhookOperationActivity, /* configFetchActivity, */ refreshSchemaActivity);
webhookOperationActivity, configFetchActivity, refreshSchemaActivity);
}

@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@
import io.airbyte.api.client.model.generated.ConnectionScheduleType;
import io.airbyte.api.client.model.generated.ConnectionStatus;
import io.airbyte.api.client.model.generated.WorkspaceRead;
import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.models.Job;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
Expand All @@ -50,7 +48,6 @@

@Slf4j
@Singleton
@Requires(env = WorkerMode.CONTROL_PLANE)
public class ConfigFetchActivityImpl implements ConfigFetchActivity {

private static final Logger LOGGER = LoggerFactory.getLogger(ConfigFetchActivityImpl.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.api.client.model.generated.ConnectionStatus;
import io.airbyte.commons.temporal.scheduling.SyncWorkflow;
import io.airbyte.config.NormalizationInput;
import io.airbyte.config.NormalizationSummary;
Expand All @@ -21,12 +22,16 @@
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardSyncOperation.OperatorType;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.StandardSyncSummary.ReplicationStatus;
import io.airbyte.config.SyncStats;
import io.airbyte.config.WebhookOperationSummary;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.workers.temporal.annotations.TemporalActivityStub;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity;
import io.temporal.workflow.Workflow;
import java.util.Map;
import java.util.Optional;
Expand All @@ -43,7 +48,7 @@ public class SyncWorkflowImpl implements SyncWorkflow {
private static final String NORMALIZATION_SUMMARY_CHECK_TAG = "normalization_summary_check";
private static final int NORMALIZATION_SUMMARY_CHECK_CURRENT_VERSION = 1;
private static final String AUTO_DETECT_SCHEMA_TAG = "auto_detect_schema";
private static final int AUTO_DETECT_SCHEMA_VERSION = 1;
private static final int AUTO_DETECT_SCHEMA_VERSION = 2;

@TemporalActivityStub(activityOptionsBeanName = "longRunActivityOptions")
private ReplicationActivity replicationActivity;
Expand All @@ -57,11 +62,10 @@ public class SyncWorkflowImpl implements SyncWorkflow {
private NormalizationSummaryCheckActivity normalizationSummaryCheckActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private WebhookOperationActivity webhookOperationActivity;
// Temporarily disabled to address OC issue #1210
// @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
// private RefreshSchemaActivity refreshSchemaActivity;
// @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
// private ConfigFetchActivity configFetchActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private RefreshSchemaActivity refreshSchemaActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private ConfigFetchActivity configFetchActivity;

@Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME)
@Override
Expand All @@ -80,30 +84,26 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,
final int version = Workflow.getVersion(VERSION_LABEL, Workflow.DEFAULT_VERSION, CURRENT_VERSION);
final String taskQueue = Workflow.getInfo().getTaskQueue();

// Temporarily suppressed to address OC issue #1210
@SuppressWarnings("PMD.UnusedLocalVariable")
final int autoDetectSchemaVersion =
Workflow.getVersion(AUTO_DETECT_SCHEMA_TAG, Workflow.DEFAULT_VERSION, AUTO_DETECT_SCHEMA_VERSION);

// Temporarily disabled to address OC issue #1210
// if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) {
// final Optional<UUID> sourceId = configFetchActivity.getSourceId(connectionId);
//
// if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) {
// LOGGER.info("Refreshing source schema...");
// refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId);
// }
//
// final Optional<Status> status = configFetchActivity.getStatus(connectionId);
// if (!status.isEmpty() && Status.INACTIVE == status.get()) {
// LOGGER.info("Connection is disabled. Cancelling run.");
// final StandardSyncOutput output =
// new StandardSyncOutput()
// .withStandardSyncSummary(new
// StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats()));
// return output;
// }
// }
if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) {
final Optional<UUID> sourceId = configFetchActivity.getSourceId(connectionId);

if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) {
LOGGER.info("Refreshing source schema...");
refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId);
}

final Optional<ConnectionStatus> status = configFetchActivity.getStatus(connectionId);
if (!status.isEmpty() && ConnectionStatus.INACTIVE == status.get()) {
LOGGER.info("Connection is disabled. Cancelling run.");
final StandardSyncOutput output =
new StandardSyncOutput()
.withStandardSyncSummary(new StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats()));
return output;
}
}

StandardSyncOutput syncOutput =
replicationActivity.replicate(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput, taskQueue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,8 @@ void testSuccess() {
verifyNormalize(normalizationActivity, normalizationInput);
verifyDbtTransform(dbtTransformationActivity, syncInput.getResourceRequirements(),
operatorDbtInput);
// Temporarily disabled to address OC issue #1210
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
assertEquals(
replicationSuccessOutput.withNormalizationSummary(normalizationSummary).getStandardSyncSummary(),
actualOutput.getStandardSyncSummary());
Expand All @@ -247,9 +246,8 @@ void testReplicationFailure() {

assertThrows(WorkflowFailedException.class, this::execute);

// Temporarily disabled to address OC issue #1210
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyNoInteractions(persistStateActivity);
verifyNoInteractions(normalizationActivity);
Expand All @@ -271,9 +269,8 @@ void testReplicationFailedGracefully() {

final StandardSyncOutput actualOutput = execute();

// Temporarily disabled to address OC issue #1210
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyPersistState(persistStateActivity, sync, replicationFailOutput, syncInput.getCatalog());
verifyNormalize(normalizationActivity, normalizationInput);
Expand All @@ -299,9 +296,8 @@ void testNormalizationFailure() {

assertThrows(WorkflowFailedException.class, this::execute);

// Temporarily disabled to address OC issue #1210
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog());
verifyNormalize(normalizationActivity, normalizationInput);
Expand All @@ -321,9 +317,8 @@ void testCancelDuringReplication() {

assertThrows(WorkflowFailedException.class, this::execute);

// Temporarily disabled to address OC issue #1210
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyNoInteractions(persistStateActivity);
verifyNoInteractions(normalizationActivity);
Expand All @@ -348,9 +343,8 @@ void testCancelDuringNormalization() {

assertThrows(WorkflowFailedException.class, this::execute);

// Temporarily disabled to address OC issue #1210
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog());
verifyNormalize(normalizationActivity, normalizationInput);
Expand Down Expand Up @@ -403,7 +397,6 @@ void testWebhookOperation() {
}

@Test
@Disabled("Temporarily disabled to address OC issue #1210")
void testSkipReplicationAfterRefreshSchema() {
when(configFetchActivity.getStatus(any())).thenReturn(Optional.of(ConnectionStatus.INACTIVE));
final StandardSyncOutput output = execute();
Expand Down

0 comments on commit dd0e83e

Please sign in to comment.