Skip to content

Commit

Permalink
update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
alovew committed Jun 29, 2022
1 parent 60b8a72 commit e923d14
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@
import io.airbyte.metrics.lib.MetricQueries;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.time.OffsetDateTime;
Expand Down Expand Up @@ -995,4 +995,5 @@ public ConfiguredAirbyteCatalog getConfiguredCatalogForConnection(final UUID con
final StandardSync standardSync = getStandardSync(connectionId);
return standardSync.getCatalog();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,9 @@ public static StreamDescriptor streamDescriptorToApi(final io.airbyte.protocol.m
return new StreamDescriptor().name(protocolStreamDescriptor.getName()).namespace(protocolStreamDescriptor.getNamespace());
}

public static io.airbyte.protocol.models.StreamDescriptor streamDescriptorToProtocol(final StreamDescriptor apiStreamDescriptor) {
return new io.airbyte.protocol.models.StreamDescriptor().withName(apiStreamDescriptor.getName())
.withNamespace(apiStreamDescriptor.getNamespace());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public ConnectionRead getConnection(final UUID connectionId)
return buildConnectionRead(connectionId);
}

public static CatalogDiff getDiff(final AirbyteCatalog oldCatalog, final AirbyteCatalog newCatalog) {
public CatalogDiff getDiff(final AirbyteCatalog oldCatalog, final AirbyteCatalog newCatalog) {
return new CatalogDiff().transforms(CatalogHelpers.getCatalogDiff(
CatalogHelpers.configuredCatalogToCatalog(CatalogConverter.toProtocolKeepAllStreams(oldCatalog)),
CatalogHelpers.configuredCatalogToCatalog(CatalogConverter.toProtocolKeepAllStreams(newCatalog)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import io.airbyte.api.model.generated.SourceDiscoverSchemaRequestBody;
import io.airbyte.api.model.generated.SourceIdRequestBody;
import io.airbyte.api.model.generated.SourceRead;
import io.airbyte.api.model.generated.StreamDescriptor;
import io.airbyte.api.model.generated.StreamTransform;
import io.airbyte.api.model.generated.WebBackendConnectionCreate;
import io.airbyte.api.model.generated.WebBackendConnectionRead;
import io.airbyte.api.model.generated.WebBackendConnectionReadList;
Expand All @@ -52,8 +54,8 @@
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.scheduler.client.EventRunner;
import io.airbyte.server.converters.ProtocolConverters;
import io.airbyte.server.handlers.helpers.CatalogConverter;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult;
Expand Down Expand Up @@ -256,7 +258,7 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti
* but was present at time of configuration will appear in the diff as an added stream which is
* confusing. We need to figure out why source_catalog_id is not always populated in the db.
*/
diff = ConnectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(configuredCatalog), refreshedCatalog.get().getCatalog());
diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(configuredCatalog), refreshedCatalog.get().getCatalog());
} else if (catalogUsedToMakeConfiguredCatalog.isPresent()) {
// reconstructs a full picture of the full schema at the time the catalog was configured.
syncCatalog = updateSchemaWithDiscovery(configuredCatalog, catalogUsedToMakeConfiguredCatalog.get());
Expand Down Expand Up @@ -365,13 +367,24 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne
connectionRead = connectionsHandler.updateConnection(connectionUpdate);

if (needReset) {
final ConfiguredAirbyteCatalog existingConfiguredCatalog =
configRepository.getConfiguredCatalogForConnection(webBackendConnectionUpdate.getConnectionId());
final io.airbyte.protocol.models.AirbyteCatalog existingCatalog = CatalogHelpers.configuredCatalogToCatalog(existingConfiguredCatalog);
final AirbyteCatalog apiExistingCatalog = CatalogConverter.toApi(existingCatalog);
final AirbyteCatalog newAirbyteCatalog = webBackendConnectionUpdate.getSyncCatalog();
final CatalogDiff catalogDiff = ConnectionsHandler.getDiff(apiExistingCatalog, newAirbyteCatalog);
final List<StreamDescriptor> streamsToReset = getStreamsToReset(catalogDiff);
final UUID connectionId = webBackendConnectionUpdate.getConnectionId();
final ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody().connectionId(connectionId);
final ConnectionStateType stateType = getStateType(connectionIdRequestBody);
final List<io.airbyte.protocol.models.StreamDescriptor> streamsToReset;

if (stateType == ConnectionStateType.LEGACY || stateType == ConnectionStateType.NOT_SET) {
streamsToReset = configRepository.getAllStreamsForConnection(connectionId);
} else {
final ConfiguredAirbyteCatalog existingConfiguredCatalog =
configRepository.getConfiguredCatalogForConnection(connectionId);
final io.airbyte.protocol.models.AirbyteCatalog existingCatalog = CatalogHelpers.configuredCatalogToCatalog(existingConfiguredCatalog);
final AirbyteCatalog apiExistingCatalog = CatalogConverter.toApi(existingCatalog);
final AirbyteCatalog newAirbyteCatalog = webBackendConnectionUpdate.getSyncCatalog();
final CatalogDiff catalogDiff = connectionsHandler.getDiff(apiExistingCatalog, newAirbyteCatalog);
final List<StreamDescriptor> apiStreamsToReset = getStreamsToReset(catalogDiff);
streamsToReset = apiStreamsToReset.stream().map(streamDescriptor -> ProtocolConverters.streamDescriptorToProtocol(streamDescriptor)).toList();
}

ManualOperationResult manualOperationResult = eventRunner.synchronousResetConnection(
webBackendConnectionUpdate.getConnectionId(),
streamsToReset);
Expand Down Expand Up @@ -497,8 +510,9 @@ protected static ConnectionSearch toConnectionSearch(final WebBackendConnectionS
.status(webBackendConnectionSearch.getStatus());
}

public static List<StreamDescriptor> getStreamsToReset(final CatalogDiff catalogDiff) {
return catalogDiff.getTransforms().stream().map(sT -> sT.getStreamDescriptor()).toList();
@VisibleForTesting
protected static List<StreamDescriptor> getStreamsToReset(final CatalogDiff catalogDiff) {
return catalogDiff.getTransforms().stream().map(StreamTransform::getStreamDescriptor).toList();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import io.airbyte.api.model.generated.ConnectionSchedule;
import io.airbyte.api.model.generated.ConnectionSchedule.TimeUnitEnum;
import io.airbyte.api.model.generated.ConnectionSearch;
import io.airbyte.api.model.generated.ConnectionState;
import io.airbyte.api.model.generated.ConnectionStateType;
import io.airbyte.api.model.generated.ConnectionStatus;
import io.airbyte.api.model.generated.ConnectionUpdate;
import io.airbyte.api.model.generated.DestinationIdRequestBody;
Expand Down Expand Up @@ -102,6 +104,7 @@ class WebBackendConnectionsHandlerTest {
private ConnectionsHandler connectionsHandler;
private OperationsHandler operationsHandler;
private SchedulerHandler schedulerHandler;
private StateHandler stateHandler;
private WebBackendConnectionsHandler wbHandler;

private SourceRead sourceRead;
Expand All @@ -115,7 +118,7 @@ class WebBackendConnectionsHandlerTest {
@BeforeEach
public void setup() throws IOException, JsonValidationException, ConfigNotFoundException {
connectionsHandler = mock(ConnectionsHandler.class);
final StateHandler stateHandler = mock(StateHandler.class);
stateHandler = mock(StateHandler.class);
operationsHandler = mock(OperationsHandler.class);
final SourceHandler sourceHandler = mock(SourceHandler.class);
final DestinationHandler destinationHandler = mock(DestinationHandler.class);
Expand Down Expand Up @@ -236,7 +239,7 @@ public void setup() throws IOException, JsonValidationException, ConfigNotFoundE
.isSyncing(expected.getIsSyncing())
.catalogDiff(new CatalogDiff().transforms(List.of(
new StreamTransform().transformType(TransformTypeEnum.ADD_STREAM)
.streamDescriptor(new StreamDescriptor().name("users-data1"))
.streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name("users-data1"))
.updateStream(null))))
.resourceRequirements(new ResourceRequirements()
.cpuRequest(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS.getCpuRequest())
Expand Down Expand Up @@ -364,6 +367,7 @@ public WebBackendConnectionRead testWebBackendGetConnection(final boolean withCa

@Test
public void testWebBackendGetConnectionWithDiscovery() throws ConfigNotFoundException, IOException, JsonValidationException {
when(connectionsHandler.getDiff(any(), any())).thenReturn(expectedWithNewSchema.getCatalogDiff());
final WebBackendConnectionRead result = testWebBackendGetConnection(true);
verify(schedulerHandler).discoverSchemaForSourceFromSourceId(any());
assertEquals(expectedWithNewSchema, result);
Expand Down Expand Up @@ -586,7 +590,7 @@ void testUpdateConnectionWithOperations() throws JsonValidationException, Config
}

@Test
void testUpdateConnectionWithUpdatedSchema() throws JsonValidationException, ConfigNotFoundException, IOException {
void testUpdateConnectionWithUpdatedSchemaLegacy() throws JsonValidationException, ConfigNotFoundException, IOException {
final WebBackendConnectionUpdate updateBody = new WebBackendConnectionUpdate()
.namespaceDefinition(expected.getNamespaceDefinition())
.namespaceFormat(expected.getNamespaceFormat())
Expand All @@ -596,7 +600,9 @@ void testUpdateConnectionWithUpdatedSchema() throws JsonValidationException, Con
.status(expected.getStatus())
.syncCatalog(expectedWithNewSchema.getSyncCatalog())
.withRefreshedCatalog(true);
final ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody().connectionId(expected.getConnectionId());

when(stateHandler.getState(connectionIdRequestBody)).thenReturn(new ConnectionState().stateType(ConnectionStateType.LEGACY));
when(operationsHandler.listOperationsForConnection(any())).thenReturn(operationReadList);
when(connectionsHandler.getConnection(expected.getConnectionId())).thenReturn(
new ConnectionRead().connectionId(expected.getConnectionId()));
Expand Down Expand Up @@ -634,6 +640,75 @@ void testUpdateConnectionWithUpdatedSchema() throws JsonValidationException, Con
orderVerifier.verify(eventRunner, times(1)).startNewManualSync(connectionId.getConnectionId());
}

@Test
void testUpdateConnectionWithUpdatedSchemaPerStream() throws JsonValidationException, ConfigNotFoundException, IOException {
final WebBackendConnectionUpdate updateBody = new WebBackendConnectionUpdate()
.namespaceDefinition(expected.getNamespaceDefinition())
.namespaceFormat(expected.getNamespaceFormat())
.prefix(expected.getPrefix())
.connectionId(expected.getConnectionId())
.schedule(expected.getSchedule())
.status(expected.getStatus())
.syncCatalog(expectedWithNewSchema.getSyncCatalog())
.withRefreshedCatalog(true);

// state is per-stream
final ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody().connectionId(expected.getConnectionId());
when(stateHandler.getState(connectionIdRequestBody)).thenReturn(new ConnectionState().stateType(ConnectionStateType.STREAM));
when(configRepository.getConfiguredCatalogForConnection(expected.getConnectionId()))
.thenReturn(ConnectionHelpers.generateBasicConfiguredAirbyteCatalog());

final StreamDescriptor streamDescriptorAdd = new StreamDescriptor().name("addStream");
final StreamDescriptor streamDescriptorRemove = new StreamDescriptor().name("removeStream");
final StreamDescriptor streamDescriptorUpdate = new StreamDescriptor().name("updateStream");

final StreamTransform streamTransformAdd =
new StreamTransform().streamDescriptor(streamDescriptorAdd).transformType(TransformTypeEnum.ADD_STREAM);
final StreamTransform streamTransformRemove =
new StreamTransform().streamDescriptor(streamDescriptorRemove).transformType(TransformTypeEnum.REMOVE_STREAM);
final StreamTransform streamTransformUpdate =
new StreamTransform().streamDescriptor(streamDescriptorUpdate).transformType(TransformTypeEnum.UPDATE_STREAM);

final CatalogDiff catalogDiff = new CatalogDiff().transforms(List.of(streamTransformAdd, streamTransformRemove, streamTransformUpdate));
when(connectionsHandler.getDiff(any(), any())).thenReturn(catalogDiff);

when(operationsHandler.listOperationsForConnection(any())).thenReturn(operationReadList);
when(connectionsHandler.getConnection(expected.getConnectionId())).thenReturn(
new ConnectionRead().connectionId(expected.getConnectionId()));
final ConnectionRead connectionRead = new ConnectionRead()
.connectionId(expected.getConnectionId())
.sourceId(expected.getSourceId())
.destinationId(expected.getDestinationId())
.name(expected.getName())
.namespaceDefinition(expected.getNamespaceDefinition())
.namespaceFormat(expected.getNamespaceFormat())
.prefix(expected.getPrefix())
.syncCatalog(expectedWithNewSchema.getSyncCatalog())
.status(expected.getStatus())
.schedule(expected.getSchedule());
when(connectionsHandler.updateConnection(any())).thenReturn(connectionRead);
when(connectionsHandler.getConnection(expected.getConnectionId())).thenReturn(connectionRead);

final ManualOperationResult successfulResult = ManualOperationResult.builder().jobId(Optional.empty()).failingReason(Optional.empty()).build();
when(eventRunner.synchronousResetConnection(any(), any())).thenReturn(successfulResult);
when(eventRunner.startNewManualSync(any())).thenReturn(successfulResult);

final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnection(updateBody);

assertEquals(expectedWithNewSchema.getSyncCatalog(), result.getSyncCatalog());

final ConnectionIdRequestBody connectionId = new ConnectionIdRequestBody().connectionId(result.getConnectionId());
verify(schedulerHandler, times(0)).resetConnection(connectionId);
verify(schedulerHandler, times(0)).syncConnection(connectionId);
verify(connectionsHandler, times(1)).updateConnection(any());
final InOrder orderVerifier = inOrder(eventRunner);
orderVerifier.verify(eventRunner, times(1)).synchronousResetConnection(connectionId.getConnectionId(),
List.of(new io.airbyte.protocol.models.StreamDescriptor().withName("addStream"),
new io.airbyte.protocol.models.StreamDescriptor().withName("removeStream"),
new io.airbyte.protocol.models.StreamDescriptor().withName("updateStream")));
orderVerifier.verify(eventRunner, times(1)).startNewManualSync(connectionId.getConnectionId());
}

@Test
public void testUpdateSchemaWithDiscoveryFromEmpty() {
final AirbyteCatalog original = new AirbyteCatalog().streams(List.of());
Expand Down Expand Up @@ -854,13 +929,13 @@ public void testGetStreamsToReset() {
final List<StreamDescriptor> resultList = WebBackendConnectionsHandler.getStreamsToReset(catalogDiff);
assertTrue(
resultList.stream().anyMatch(
streamDescriptor -> streamDescriptor == new StreamDescriptor().name("added_stream")));
streamDescriptor -> streamDescriptor.getName() == "added_stream"));
assertTrue(
resultList.stream().anyMatch(
streamDescriptor -> streamDescriptor == new StreamDescriptor().name("updated_stream")));
streamDescriptor -> streamDescriptor.getName() == "removed_stream"));
assertTrue(
resultList.stream().anyMatch(
streamDescriptor -> streamDescriptor == new StreamDescriptor().name("removed_stream")));
streamDescriptor -> streamDescriptor.getName() == "updated_stream"));
}

}

0 comments on commit e923d14

Please sign in to comment.