Skip to content

Commit

Permalink
Remove config repo dependency for getStatus (#21033)
Browse files Browse the repository at this point in the history
  • Loading branch information
alovew authored Jan 4, 2023
1 parent 40b4ad1 commit c6715bd
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

package io.airbyte.workers.temporal.scheduling.activities;

import io.airbyte.api.client.model.generated.ConnectionStatus;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.Status;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.validation.json.JsonValidationException;
import io.temporal.activity.ActivityInterface;
Expand All @@ -25,7 +25,7 @@ public interface ConfigFetchActivity {
Optional<UUID> getSourceId(UUID connectionId);

@ActivityMethod
Optional<Status> getStatus(UUID connectionId);
Optional<ConnectionStatus> getStatus(UUID connectionId);

@Data
@NoArgsConstructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.ConnectionRead;
import io.airbyte.api.client.model.generated.ConnectionStatus;
import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.config.Cron;
Expand Down Expand Up @@ -248,11 +249,13 @@ public Optional<UUID> getSourceId(final UUID connectionId) {
}

@Override
public Optional<Status> getStatus(final UUID connectionId) {
public Optional<ConnectionStatus> getStatus(final UUID connectionId) {
try {
final StandardSync standardSync = getStandardSync(connectionId);
return Optional.ofNullable(standardSync.getStatus());
} catch (final JsonValidationException | ConfigNotFoundException | IOException e) {
final io.airbyte.api.client.model.generated.ConnectionIdRequestBody requestBody =
new io.airbyte.api.client.model.generated.ConnectionIdRequestBody().connectionId(connectionId);
final ConnectionRead connectionRead = connectionApi.getConnection(requestBody);
return Optional.ofNullable(connectionRead.getStatus());
} catch (ApiException e) {
log.info("Encountered an error fetching the connection's status: ", e);
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.api.client.model.generated.ConnectionStatus;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.temporal.TemporalUtils;
import io.airbyte.commons.temporal.scheduling.SyncWorkflow;
Expand All @@ -26,7 +27,6 @@
import io.airbyte.config.OperatorWebhookInput;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.Status;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardSyncOperation.OperatorType;
Expand Down Expand Up @@ -159,7 +159,7 @@ void setUp() {

when(configFetchActivity.getSourceId(sync.getConnectionId())).thenReturn(Optional.of(SOURCE_ID));
when(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID)).thenReturn(true);
when(configFetchActivity.getStatus(sync.getConnectionId())).thenReturn(Optional.of(Status.ACTIVE));
when(configFetchActivity.getStatus(sync.getConnectionId())).thenReturn(Optional.of(ConnectionStatus.ACTIVE));

longActivityOptions = ActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofDays(3))
Expand Down Expand Up @@ -405,7 +405,7 @@ void testWebhookOperation() {
@Test
@Disabled("Temporarily disabled to address OC issue #1210")
void testSkipReplicationAfterRefreshSchema() {
when(configFetchActivity.getStatus(any())).thenReturn(Optional.of(Status.INACTIVE));
when(configFetchActivity.getStatus(any())).thenReturn(Optional.of(ConnectionStatus.INACTIVE));
final StandardSyncOutput output = execute();
verifyShouldRefreshSchema(refreshSchemaActivity);
verifyRefreshSchema(refreshSchemaActivity, sync);
Expand Down

0 comments on commit c6715bd

Please sign in to comment.