Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert uncommenting auto detect schema in sync workflow #21614

Merged
merged 2 commits into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,10 @@ public List<Object> syncActivities(
final PersistStateActivity persistStateActivity,
final NormalizationSummaryCheckActivity normalizationSummaryCheckActivity,
final WebhookOperationActivity webhookOperationActivity,
final ConfigFetchActivity configFetchActivity,
// final ConfigFetchActivity configFetchActivity,
final RefreshSchemaActivity refreshSchemaActivity) {
return List.of(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity, normalizationSummaryCheckActivity,
webhookOperationActivity, configFetchActivity, refreshSchemaActivity);
webhookOperationActivity, /* configFetchActivity, */ refreshSchemaActivity);
}

@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.api.client.model.generated.ConnectionStatus;
import io.airbyte.commons.temporal.scheduling.SyncWorkflow;
import io.airbyte.config.NormalizationInput;
import io.airbyte.config.NormalizationSummary;
Expand All @@ -22,16 +21,12 @@
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardSyncOperation.OperatorType;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.StandardSyncSummary.ReplicationStatus;
import io.airbyte.config.SyncStats;
import io.airbyte.config.WebhookOperationSummary;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.workers.temporal.annotations.TemporalActivityStub;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity;
import io.temporal.workflow.Workflow;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -62,10 +57,10 @@ public class SyncWorkflowImpl implements SyncWorkflow {
private NormalizationSummaryCheckActivity normalizationSummaryCheckActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private WebhookOperationActivity webhookOperationActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private RefreshSchemaActivity refreshSchemaActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private ConfigFetchActivity configFetchActivity;
// @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
// private RefreshSchemaActivity refreshSchemaActivity;
// @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
// private ConfigFetchActivity configFetchActivity;

@Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME)
@Override
Expand All @@ -84,26 +79,28 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,
final int version = Workflow.getVersion(VERSION_LABEL, Workflow.DEFAULT_VERSION, CURRENT_VERSION);
final String taskQueue = Workflow.getInfo().getTaskQueue();

final int autoDetectSchemaVersion =
Workflow.getVersion(AUTO_DETECT_SCHEMA_TAG, Workflow.DEFAULT_VERSION, AUTO_DETECT_SCHEMA_VERSION);

if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) {
final Optional<UUID> sourceId = configFetchActivity.getSourceId(connectionId);

if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) {
LOGGER.info("Refreshing source schema...");
refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId);
}

final Optional<ConnectionStatus> status = configFetchActivity.getStatus(connectionId);
if (!status.isEmpty() && ConnectionStatus.INACTIVE == status.get()) {
LOGGER.info("Connection is disabled. Cancelling run.");
final StandardSyncOutput output =
new StandardSyncOutput()
.withStandardSyncSummary(new StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats()));
return output;
}
}
// final int autoDetectSchemaVersion =
// Workflow.getVersion(AUTO_DETECT_SCHEMA_TAG, Workflow.DEFAULT_VERSION,
// AUTO_DETECT_SCHEMA_VERSION);
//
// if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) {
// final Optional<UUID> sourceId = configFetchActivity.getSourceId(connectionId);
//
// if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) {
// LOGGER.info("Refreshing source schema...");
// refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId);
// }
//
// final Optional<ConnectionStatus> status = configFetchActivity.getStatus(connectionId);
// if (!status.isEmpty() && ConnectionStatus.INACTIVE == status.get()) {
// LOGGER.info("Connection is disabled. Cancelling run.");
// final StandardSyncOutput output =
// new StandardSyncOutput()
// .withStandardSyncSummary(new
// StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats()));
// return output;
// }
// }

StandardSyncOutput syncOutput =
replicationActivity.replicate(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput, taskQueue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ void testSuccess() {
verifyNormalize(normalizationActivity, normalizationInput);
verifyDbtTransform(dbtTransformationActivity, syncInput.getResourceRequirements(),
operatorDbtInput);
verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
assertEquals(
replicationSuccessOutput.withNormalizationSummary(normalizationSummary).getStandardSyncSummary(),
actualOutput.getStandardSyncSummary());
Expand All @@ -246,8 +246,8 @@ void testReplicationFailure() {

assertThrows(WorkflowFailedException.class, this::execute);

verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyNoInteractions(persistStateActivity);
verifyNoInteractions(normalizationActivity);
Expand All @@ -269,8 +269,8 @@ void testReplicationFailedGracefully() {

final StandardSyncOutput actualOutput = execute();

verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyPersistState(persistStateActivity, sync, replicationFailOutput, syncInput.getCatalog());
verifyNormalize(normalizationActivity, normalizationInput);
Expand All @@ -296,8 +296,8 @@ void testNormalizationFailure() {

assertThrows(WorkflowFailedException.class, this::execute);

verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog());
verifyNormalize(normalizationActivity, normalizationInput);
Expand All @@ -317,8 +317,8 @@ void testCancelDuringReplication() {

assertThrows(WorkflowFailedException.class, this::execute);

verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyNoInteractions(persistStateActivity);
verifyNoInteractions(normalizationActivity);
Expand All @@ -343,8 +343,8 @@ void testCancelDuringNormalization() {

assertThrows(WorkflowFailedException.class, this::execute);

verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog());
verifyNormalize(normalizationActivity, normalizationInput);
Expand All @@ -368,8 +368,8 @@ void testSkipNormalization() {

execute();

verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyPersistState(persistStateActivity, sync, replicationSuccessOutputNoRecordsCommitted, syncInput.getCatalog());
verifyNoInteractions(normalizationActivity);
Expand All @@ -396,16 +396,16 @@ void testWebhookOperation() {
assertEquals(actualOutput.getWebhookOperationSummary().getSuccesses().get(0), WEBHOOK_CONFIG_ID);
}

@Test
void testSkipReplicationAfterRefreshSchema() {
when(configFetchActivity.getStatus(any())).thenReturn(Optional.of(ConnectionStatus.INACTIVE));
final StandardSyncOutput output = execute();
verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
verifyNoInteractions(replicationActivity);
verifyNoInteractions(normalizationActivity);
assertEquals(output.getStandardSyncSummary().getStatus(), ReplicationStatus.CANCELLED);
}
// @Test
// void testSkipReplicationAfterRefreshSchema() {
// when(configFetchActivity.getStatus(any())).thenReturn(Optional.of(ConnectionStatus.INACTIVE));
// final StandardSyncOutput output = execute();
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
// verifyNoInteractions(replicationActivity);
// verifyNoInteractions(normalizationActivity);
// assertEquals(output.getStandardSyncSummary().getStatus(), ReplicationStatus.CANCELLED);
// }

@SuppressWarnings("ResultOfMethodCallIgnored")
private void cancelWorkflow() {
Expand Down