Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add back in Auto detect schema functionality in sync workflow #21361

Merged
merged 5 commits into from
Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,10 @@ public List<Object> syncActivities(
final PersistStateActivity persistStateActivity,
final NormalizationSummaryCheckActivity normalizationSummaryCheckActivity,
final WebhookOperationActivity webhookOperationActivity,
/*
* Temporarily disabled to address OC issue #1210 final ConfigFetchActivity configFetchActivity,
*/
final ConfigFetchActivity configFetchActivity,
final RefreshSchemaActivity refreshSchemaActivity) {
return List.of(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity, normalizationSummaryCheckActivity,
webhookOperationActivity, /* configFetchActivity, */ refreshSchemaActivity);
webhookOperationActivity, configFetchActivity, refreshSchemaActivity);
}

@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@
import io.airbyte.api.client.model.generated.ConnectionScheduleType;
import io.airbyte.api.client.model.generated.ConnectionStatus;
import io.airbyte.api.client.model.generated.WorkspaceRead;
import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.models.Job;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
Expand All @@ -50,7 +48,6 @@

@Slf4j
@Singleton
@Requires(env = WorkerMode.CONTROL_PLANE)
public class ConfigFetchActivityImpl implements ConfigFetchActivity {

private static final Logger LOGGER = LoggerFactory.getLogger(ConfigFetchActivityImpl.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.api.client.model.generated.ConnectionStatus;
import io.airbyte.commons.temporal.scheduling.SyncWorkflow;
import io.airbyte.config.NormalizationInput;
import io.airbyte.config.NormalizationSummary;
Expand All @@ -21,12 +22,16 @@
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardSyncOperation.OperatorType;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.StandardSyncSummary.ReplicationStatus;
import io.airbyte.config.SyncStats;
import io.airbyte.config.WebhookOperationSummary;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.workers.temporal.annotations.TemporalActivityStub;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity;
import io.temporal.workflow.Workflow;
import java.util.Map;
import java.util.Optional;
Expand All @@ -43,7 +48,7 @@ public class SyncWorkflowImpl implements SyncWorkflow {
private static final String NORMALIZATION_SUMMARY_CHECK_TAG = "normalization_summary_check";
private static final int NORMALIZATION_SUMMARY_CHECK_CURRENT_VERSION = 1;
private static final String AUTO_DETECT_SCHEMA_TAG = "auto_detect_schema";
private static final int AUTO_DETECT_SCHEMA_VERSION = 1;
private static final int AUTO_DETECT_SCHEMA_VERSION = 2;

@TemporalActivityStub(activityOptionsBeanName = "longRunActivityOptions")
private ReplicationActivity replicationActivity;
Expand All @@ -57,11 +62,10 @@ public class SyncWorkflowImpl implements SyncWorkflow {
private NormalizationSummaryCheckActivity normalizationSummaryCheckActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private WebhookOperationActivity webhookOperationActivity;
// Temporarily disabled to address OC issue #1210
// @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
// private RefreshSchemaActivity refreshSchemaActivity;
// @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
// private ConfigFetchActivity configFetchActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private RefreshSchemaActivity refreshSchemaActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private ConfigFetchActivity configFetchActivity;

@Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME)
@Override
Expand All @@ -80,30 +84,26 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,
final int version = Workflow.getVersion(VERSION_LABEL, Workflow.DEFAULT_VERSION, CURRENT_VERSION);
final String taskQueue = Workflow.getInfo().getTaskQueue();

// Temporarily suppressed to address OC issue #1210
@SuppressWarnings("PMD.UnusedLocalVariable")
final int autoDetectSchemaVersion =
Workflow.getVersion(AUTO_DETECT_SCHEMA_TAG, Workflow.DEFAULT_VERSION, AUTO_DETECT_SCHEMA_VERSION);

// Temporarily disabled to address OC issue #1210
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to double check, are all the issue related to this ticket solved? and is there a way to test it in dev?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the config repository is removed from all activities (config fetch activity & refresh schema activity). I'm not sure how to test this in the data plane. @jdpgrailsdev do you know?

Copy link
Contributor

@jdpgrailsdev jdpgrailsdev Jan 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can at least test it locally to verify that the application starts up in "data plane" mode. This will at least test the problem we saw in production with the missing data source. Additionally, cloud now has "data plane" acceptance tests, which performs a sync in the data plane. That should also fail if there is still a problem related to this.

// if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) {
// final Optional<UUID> sourceId = configFetchActivity.getSourceId(connectionId);
//
// if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) {
// LOGGER.info("Refreshing source schema...");
// refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId);
// }
//
// final Optional<Status> status = configFetchActivity.getStatus(connectionId);
// if (!status.isEmpty() && Status.INACTIVE == status.get()) {
// LOGGER.info("Connection is disabled. Cancelling run.");
// final StandardSyncOutput output =
// new StandardSyncOutput()
// .withStandardSyncSummary(new
// StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats()));
// return output;
// }
// }
if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we will need a new version here. Since it wasn't commented out, the version will be in the history and even old workflow will run into undeterministic exception.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated AUTO_DETECT_SCHEMA_VERSION to 2. is that what you mean?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that will work

final Optional<UUID> sourceId = configFetchActivity.getSourceId(connectionId);

if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) {
LOGGER.info("Refreshing source schema...");
refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId);
}

final Optional<ConnectionStatus> status = configFetchActivity.getStatus(connectionId);
if (!status.isEmpty() && ConnectionStatus.INACTIVE == status.get()) {
LOGGER.info("Connection is disabled. Cancelling run.");
final StandardSyncOutput output =
new StandardSyncOutput()
.withStandardSyncSummary(new StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats()));
return output;
}
}

StandardSyncOutput syncOutput =
replicationActivity.replicate(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput, taskQueue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,8 @@ void testSuccess() {
verifyNormalize(normalizationActivity, normalizationInput);
verifyDbtTransform(dbtTransformationActivity, syncInput.getResourceRequirements(),
operatorDbtInput);
// Temporarily disabled to address OC issue #1210
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
assertEquals(
replicationSuccessOutput.withNormalizationSummary(normalizationSummary).getStandardSyncSummary(),
actualOutput.getStandardSyncSummary());
Expand All @@ -247,9 +246,8 @@ void testReplicationFailure() {

assertThrows(WorkflowFailedException.class, this::execute);

// Temporarily disabled to address OC issue #1210
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyNoInteractions(persistStateActivity);
verifyNoInteractions(normalizationActivity);
Expand All @@ -271,9 +269,8 @@ void testReplicationFailedGracefully() {

final StandardSyncOutput actualOutput = execute();

// Temporarily disabled to address OC issue #1210
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyPersistState(persistStateActivity, sync, replicationFailOutput, syncInput.getCatalog());
verifyNormalize(normalizationActivity, normalizationInput);
Expand All @@ -299,9 +296,8 @@ void testNormalizationFailure() {

assertThrows(WorkflowFailedException.class, this::execute);

// Temporarily disabled to address OC issue #1210
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog());
verifyNormalize(normalizationActivity, normalizationInput);
Expand All @@ -321,9 +317,8 @@ void testCancelDuringReplication() {

assertThrows(WorkflowFailedException.class, this::execute);

// Temporarily disabled to address OC issue #1210
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyNoInteractions(persistStateActivity);
verifyNoInteractions(normalizationActivity);
Expand All @@ -348,9 +343,8 @@ void testCancelDuringNormalization() {

assertThrows(WorkflowFailedException.class, this::execute);

// Temporarily disabled to address OC issue #1210
// verifyShouldRefreshSchema(refreshSchemaActivity);
// verifyRefreshSchema(refreshSchemaActivity, sync);
verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
verifyReplication(replicationActivity, syncInput);
verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog());
verifyNormalize(normalizationActivity, normalizationInput);
Expand Down Expand Up @@ -403,7 +397,6 @@ void testWebhookOperation() {
}

@Test
@Disabled("Temporarily disabled to address OC issue #1210")
void testSkipReplicationAfterRefreshSchema() {
when(configFetchActivity.getStatus(any())).thenReturn(Optional.of(ConnectionStatus.INACTIVE));
final StandardSyncOutput output = execute();
Expand Down