Skip to content

Commit

Permalink
CDC Acceptance Test (#14370)
Browse files Browse the repository at this point in the history
* Use Debezium Postgres image for CDC tests

* Formatting

* add cdc acceptance tests

* make method public

* format

* add update destination definition version method

* wait for successful job

* add acceptance tests for incremental with old dest, and for delete + some refactoring

* format

* fix assignment of sourceDbConfig

* fix init

* remove logs

* increase timeout on deleteConnection test to prevent transient failures

* TEMPORARY add logs to help with debugging CI failures

* add -i to acceptance test for debugging

* add back destinationPsql start to see if it fixes the issue

* Revert "add -i to acceptance test for debugging"

This reverts commit 693a955.

* Revert "TEMPORARY add logs to help with debugging CI failures"

This reverts commit a30efd2.

* fix timestamp comparison

Co-authored-by: jdpgrailsdev <[email protected]>
  • Loading branch information
lmossman and jdpgrailsdev authored Jul 8, 2022
1 parent a2c194a commit f809270
Show file tree
Hide file tree
Showing 4 changed files with 429 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.airbyte.api.client.model.generated.DestinationCreate;
import io.airbyte.api.client.model.generated.DestinationDefinitionCreate;
import io.airbyte.api.client.model.generated.DestinationDefinitionRead;
import io.airbyte.api.client.model.generated.DestinationDefinitionUpdate;
import io.airbyte.api.client.model.generated.DestinationIdRequestBody;
import io.airbyte.api.client.model.generated.DestinationRead;
import io.airbyte.api.client.model.generated.JobIdRequestBody;
Expand Down Expand Up @@ -135,6 +136,8 @@ public class AirbyteAcceptanceTestHarness {
public static final String COOL_EMPLOYEES_TABLE_NAME = "cool_employees";
public static final String AWESOME_PEOPLE_TABLE_NAME = "awesome_people";

private static final String DEFAULT_POSTGRES_INIT_SQL_FILE = "postgres_init.sql";

private static boolean isKube;
private static boolean isMinikube;
private static boolean isGke;
Expand All @@ -151,6 +154,7 @@ public class AirbyteAcceptanceTestHarness {
private AirbyteTestContainer airbyteTestContainer;
private AirbyteApiClient apiClient;
private final UUID defaultWorkspaceId;
private final String postgresSqlInitFile;

private KubernetesClient kubernetesClient = null;

Expand All @@ -175,12 +179,13 @@ public void setApiClient(final AirbyteApiClient apiClient) {
this.apiClient = apiClient;
}

public AirbyteAcceptanceTestHarness(final AirbyteApiClient apiClient, final UUID defaultWorkspaceId)
public AirbyteAcceptanceTestHarness(final AirbyteApiClient apiClient, final UUID defaultWorkspaceId, final String postgresSqlInitFile)
throws URISyntaxException, IOException, InterruptedException {
// reads env vars to assign static variables
assignEnvVars();
this.apiClient = apiClient;
this.defaultWorkspaceId = defaultWorkspaceId;
this.postgresSqlInitFile = postgresSqlInitFile;

if (isGke && !isKube) {
throw new RuntimeException("KUBE Flag should also be enabled if GKE flag is enabled");
Expand Down Expand Up @@ -219,6 +224,11 @@ public AirbyteAcceptanceTestHarness(final AirbyteApiClient apiClient, final UUID
}
}

public AirbyteAcceptanceTestHarness(final AirbyteApiClient apiClient, final UUID defaultWorkspaceId)
throws URISyntaxException, IOException, InterruptedException {
this(apiClient, defaultWorkspaceId, DEFAULT_POSTGRES_INIT_SQL_FILE);
}

public void stopDbAndContainers() {
if (!isGke) {
sourcePsql.stop();
Expand All @@ -239,7 +249,7 @@ public void setup() throws SQLException, URISyntaxException, IOException {
if (isGke) {
// seed database.
final Database database = getSourceDatabase();
final Path path = Path.of(MoreResources.readResourceAsFile("postgres_init.sql").toURI());
final Path path = Path.of(MoreResources.readResourceAsFile(postgresSqlInitFile).toURI());
final StringBuilder query = new StringBuilder();
for (final String line : java.nio.file.Files.readAllLines(path, StandardCharsets.UTF_8)) {
if (line != null && !line.isEmpty()) {
Expand All @@ -248,7 +258,7 @@ public void setup() throws SQLException, URISyntaxException, IOException {
}
database.query(context -> context.execute(query.toString()));
} else {
PostgreSQLContainerHelper.runSqlScript(MountableFile.forClasspathResource("postgres_init.sql"), sourcePsql);
PostgreSQLContainerHelper.runSqlScript(MountableFile.forClasspathResource(postgresSqlInitFile), sourcePsql);

destinationPsql = new PostgreSQLContainer("postgres:13-alpine");
destinationPsql.start();
Expand Down Expand Up @@ -527,7 +537,7 @@ private List<JsonNode> retrieveDestinationRecords(final Database database, final
.collect(Collectors.toList());
}

private List<JsonNode> retrieveRawDestinationRecords(final SchemaTableNamePair pair) throws Exception {
public List<JsonNode> retrieveRawDestinationRecords(final SchemaTableNamePair pair) throws Exception {
final Database destination = getDestinationDatabase();
final Set<SchemaTableNamePair> namePairs = listAllTables(destination);

Expand Down Expand Up @@ -639,11 +649,25 @@ public UUID getPostgresSourceDefinitionId() throws ApiException {
.getSourceDefinitionId();
}

public UUID getPostgresDestinationDefinitionId() throws ApiException {
return apiClient.getDestinationDefinitionApi().listDestinationDefinitions().getDestinationDefinitions()
.stream()
.filter(destRead -> destRead.getName().equalsIgnoreCase("postgres"))
.findFirst()
.orElseThrow()
.getDestinationDefinitionId();
}

public void updateSourceDefinitionVersion(final UUID sourceDefinitionId, final String dockerImageTag) throws ApiException {
apiClient.getSourceDefinitionApi().updateSourceDefinition(new SourceDefinitionUpdate()
.sourceDefinitionId(sourceDefinitionId).dockerImageTag(dockerImageTag));
}

public void updateDestinationDefinitionVersion(final UUID destDefinitionId, final String dockerImageTag) throws ApiException {
apiClient.getDestinationDefinitionApi().updateDestinationDefinition(new DestinationDefinitionUpdate()
.destinationDefinitionId(destDefinitionId).dockerImageTag(dockerImageTag));
}

private void clearSourceDbData() throws SQLException {
final Database database = getSourceDatabase();
final Set<SchemaTableNamePair> pairs = listAllTables(database);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ public void testDeleteConnection() throws Exception {
testHarness.removeConnection(connectionId);

LOGGER.info("Waiting for connection to be deleted...");
Thread.sleep(500);
Thread.sleep(5000);

ConnectionStatus connectionStatus =
apiClient.getConnectionApi().getConnection(new ConnectionIdRequestBody().connectionId(connectionId)).getStatus();
Expand All @@ -553,7 +553,7 @@ public void testDeleteConnection() throws Exception {
apiClient.getConnectionApi().deleteConnection(new ConnectionIdRequestBody().connectionId(connectionId));

LOGGER.info("Waiting for connection to be deleted...");
Thread.sleep(500);
Thread.sleep(5000);

connectionStatus = apiClient.getConnectionApi().getConnection(new ConnectionIdRequestBody().connectionId(connectionId)).getStatus();
assertEquals(ConnectionStatus.DEPRECATED, connectionStatus);
Expand Down
Loading

0 comments on commit f809270

Please sign in to comment.