Skip to content

Commit

Permalink
Uncomment auto detect schema (#21731)
Browse files Browse the repository at this point in the history
  • Loading branch information
alovew authored Jan 23, 2023
1 parent 39ec49b commit 1fba39a
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,10 @@ public List<Object> syncActivities(
final PersistStateActivity persistStateActivity,
final NormalizationSummaryCheckActivity normalizationSummaryCheckActivity,
final WebhookOperationActivity webhookOperationActivity,
// final ConfigFetchActivity configFetchActivity,
final ConfigFetchActivity configFetchActivity,
final RefreshSchemaActivity refreshSchemaActivity) {
return List.of(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity, normalizationSummaryCheckActivity,
webhookOperationActivity, /* configFetchActivity, */ refreshSchemaActivity);
webhookOperationActivity, configFetchActivity, refreshSchemaActivity);
}

@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.api.client.model.generated.ConnectionStatus;
import io.airbyte.commons.temporal.scheduling.SyncWorkflow;
import io.airbyte.config.NormalizationInput;
import io.airbyte.config.NormalizationSummary;
Expand All @@ -21,12 +22,16 @@
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardSyncOperation.OperatorType;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.StandardSyncSummary.ReplicationStatus;
import io.airbyte.config.SyncStats;
import io.airbyte.config.WebhookOperationSummary;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.workers.temporal.annotations.TemporalActivityStub;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity;
import io.temporal.workflow.Workflow;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -57,10 +62,10 @@ public class SyncWorkflowImpl implements SyncWorkflow {
private NormalizationSummaryCheckActivity normalizationSummaryCheckActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private WebhookOperationActivity webhookOperationActivity;
// @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
// private RefreshSchemaActivity refreshSchemaActivity;
// @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
// private ConfigFetchActivity configFetchActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private RefreshSchemaActivity refreshSchemaActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private ConfigFetchActivity configFetchActivity;

@Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME)
@Override
Expand All @@ -79,28 +84,27 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,
final int version = Workflow.getVersion(VERSION_LABEL, Workflow.DEFAULT_VERSION, CURRENT_VERSION);
final String taskQueue = Workflow.getInfo().getTaskQueue();

// final int autoDetectSchemaVersion =
// Workflow.getVersion(AUTO_DETECT_SCHEMA_TAG, Workflow.DEFAULT_VERSION,
// AUTO_DETECT_SCHEMA_VERSION);
//
// if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) {
// final Optional<UUID> sourceId = configFetchActivity.getSourceId(connectionId);
//
// if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) {
// LOGGER.info("Refreshing source schema...");
// refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId);
// }
//
// final Optional<ConnectionStatus> status = configFetchActivity.getStatus(connectionId);
// if (!status.isEmpty() && ConnectionStatus.INACTIVE == status.get()) {
// LOGGER.info("Connection is disabled. Cancelling run.");
// final StandardSyncOutput output =
// new StandardSyncOutput()
// .withStandardSyncSummary(new
// StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats()));
// return output;
// }
// }
final int autoDetectSchemaVersion =
Workflow.getVersion(AUTO_DETECT_SCHEMA_TAG, Workflow.DEFAULT_VERSION,
AUTO_DETECT_SCHEMA_VERSION);

if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) {
final Optional<UUID> sourceId = configFetchActivity.getSourceId(connectionId);

if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) {
LOGGER.info("Refreshing source schema...");
refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId);
}

final Optional<ConnectionStatus> status = configFetchActivity.getStatus(connectionId);
if (!status.isEmpty() && ConnectionStatus.INACTIVE == status.get()) {
LOGGER.info("Connection is disabled. Cancelling run.");
final StandardSyncOutput output =
new StandardSyncOutput()
.withStandardSyncSummary(new StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats()));
return output;
}
}

StandardSyncOutput syncOutput =
replicationActivity.replicate(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput, taskQueue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ void testSuccess() {
verifyNormalize(normalizationActivity, normalizationInput);
verifyDbtTransform(dbtTransformationActivity, syncInput.getResourceRequirements(),
operatorDbtInput);
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
assertEquals(
replicationSuccessOutput.withNormalizationSummary(normalizationSummary).getStandardSyncSummary(),
actualOutput.getStandardSyncSummary());
Expand All @@ -246,8 +246,8 @@ void testReplicationFailure() {

assertThrows(WorkflowFailedException.class, this::execute);

// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyNoInteractions(persistStateActivity);
verifyNoInteractions(normalizationActivity);
Expand All @@ -269,8 +269,8 @@ void testReplicationFailedGracefully() {

final StandardSyncOutput actualOutput = execute();

// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyPersistState(persistStateActivity, sync, replicationFailOutput, syncInput.getCatalog());
verifyNormalize(normalizationActivity, normalizationInput);
Expand All @@ -296,8 +296,8 @@ void testNormalizationFailure() {

assertThrows(WorkflowFailedException.class, this::execute);

// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog());
verifyNormalize(normalizationActivity, normalizationInput);
Expand All @@ -317,8 +317,8 @@ void testCancelDuringReplication() {

assertThrows(WorkflowFailedException.class, this::execute);

// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyNoInteractions(persistStateActivity);
verifyNoInteractions(normalizationActivity);
Expand All @@ -343,8 +343,8 @@ void testCancelDuringNormalization() {

assertThrows(WorkflowFailedException.class, this::execute);

// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog());
verifyNormalize(normalizationActivity, normalizationInput);
Expand All @@ -368,8 +368,8 @@ void testSkipNormalization() {

execute();

// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyPersistState(persistStateActivity, sync, replicationSuccessOutputNoRecordsCommitted, syncInput.getCatalog());
verifyNoInteractions(normalizationActivity);
Expand All @@ -396,16 +396,16 @@ void testWebhookOperation() {
assertEquals(actualOutput.getWebhookOperationSummary().getSuccesses().get(0), WEBHOOK_CONFIG_ID);
}

// @Test
// void testSkipReplicationAfterRefreshSchema() {
// when(configFetchActivity.getStatus(any())).thenReturn(Optional.of(ConnectionStatus.INACTIVE));
// final StandardSyncOutput output = execute();
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
// verifyNoInteractions(replicationActivity);
// verifyNoInteractions(normalizationActivity);
// assertEquals(output.getStandardSyncSummary().getStatus(), ReplicationStatus.CANCELLED);
// }
@Test
void testSkipReplicationAfterRefreshSchema() {
when(configFetchActivity.getStatus(any())).thenReturn(Optional.of(ConnectionStatus.INACTIVE));
final StandardSyncOutput output = execute();
verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
verifyNoInteractions(replicationActivity);
verifyNoInteractions(normalizationActivity);
assertEquals(output.getStandardSyncSummary().getStatus(), ReplicationStatus.CANCELLED);
}

@SuppressWarnings("ResultOfMethodCallIgnored")
private void cancelWorkflow() {
Expand Down

0 comments on commit 1fba39a

Please sign in to comment.