Skip to content

Commit

Permalink
[test][postgres] Close postgres containers after tests and fix contai…
Browse files Browse the repository at this point in the history
…ner reuse bug (apache#2394)
  • Loading branch information
loserwang1024 authored Aug 17, 2023
1 parent f7df47e commit 90ed963
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
Expand Down Expand Up @@ -97,6 +98,13 @@ public static void startAll() {
LOG.info("Containers are started.");
}

@AfterClass
public static void stopAll() {
LOG.info("Stopping containers...");
POSTGRES_CONTAINER_OLD.stop();
LOG.info("Containers are stopped.");
}

@Before
public void before() {
initializePostgresTable(POSTGRES_CONTAINER_OLD, "inventory");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.AbstractTestBase;

import com.ververica.cdc.connectors.postgres.source.PostgresConnectionPoolFactory;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.jdbc.JdbcConfiguration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -83,17 +85,28 @@ public static void startContainers() {
LOG.info("Containers are started.");
}

@AfterClass
public static void stopContainers() {
LOG.info("Stopping containers...");
POSTGRES_CONTAINER.stop();
LOG.info("Containers are stopped.");
}

protected Connection getJdbcConnection(PostgreSQLContainer container) throws SQLException {
return DriverManager.getConnection(
container.getJdbcUrl(), container.getUsername(), container.getPassword());
}

public static Connection getJdbcConnection(PostgreSQLContainer container, String databaseName)
throws SQLException {
String jdbcUrl =
String.format(
PostgresConnectionPoolFactory.JDBC_URL_PATTERN,
container.getHost(),
container.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT),
databaseName);
return DriverManager.getConnection(
container.withDatabaseName(databaseName).getJdbcUrl(),
container.getUsername(),
container.getPassword());
jdbcUrl, container.getUsername(), container.getPassword());
}

public static String getSlotName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,25 +535,58 @@ public void testMetadataColumns() throws Throwable {

// waiting for change events finished.
waitForSinkSize("sink", 16);
String databaseName = POSTGRES_CONTAINER.getDatabaseName();

List<String> expected =
Arrays.asList(
"+I(postgres,inventory,products,101,scooter,Small 2-wheel scooter,3.140)",
"+I(postgres,inventory,products,102,car battery,12V car battery,8.100)",
"+I(postgres,inventory,products,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800)",
"+I(postgres,inventory,products,104,hammer,12oz carpenter's hammer,0.750)",
"+I(postgres,inventory,products,105,hammer,14oz carpenter's hammer,0.875)",
"+I(postgres,inventory,products,106,hammer,16oz carpenter's hammer,1.000)",
"+I(postgres,inventory,products,107,rocks,box of assorted rocks,5.300)",
"+I(postgres,inventory,products,108,jacket,water resistent black wind breaker,0.100)",
"+I(postgres,inventory,products,109,spare tire,24 inch spare tire,22.200)",
"+I(postgres,inventory,products,110,jacket,water resistent white wind breaker,0.200)",
"+I(postgres,inventory,products,111,scooter,Big 2-wheel scooter ,5.180)",
"+U(postgres,inventory,products,106,hammer,18oz carpenter hammer,1.000)",
"+U(postgres,inventory,products,107,rocks,box of assorted rocks,5.100)",
"+U(postgres,inventory,products,110,jacket,new water resistent white wind breaker,0.500)",
"+U(postgres,inventory,products,111,scooter,Big 2-wheel scooter ,5.170)",
"-D(postgres,inventory,products,111,scooter,Big 2-wheel scooter ,5.170)");
"+I("
+ databaseName
+ ",inventory,products,101,scooter,Small 2-wheel scooter,3.140)",
"+I("
+ databaseName
+ ",inventory,products,102,car battery,12V car battery,8.100)",
"+I("
+ databaseName
+ ",inventory,products,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800)",
"+I("
+ databaseName
+ ",inventory,products,104,hammer,12oz carpenter's hammer,0.750)",
"+I("
+ databaseName
+ ",inventory,products,105,hammer,14oz carpenter's hammer,0.875)",
"+I("
+ databaseName
+ ",inventory,products,106,hammer,16oz carpenter's hammer,1.000)",
"+I("
+ databaseName
+ ",inventory,products,107,rocks,box of assorted rocks,5.300)",
"+I("
+ databaseName
+ ",inventory,products,108,jacket,water resistent black wind breaker,0.100)",
"+I("
+ databaseName
+ ",inventory,products,109,spare tire,24 inch spare tire,22.200)",
"+I("
+ databaseName
+ ",inventory,products,110,jacket,water resistent white wind breaker,0.200)",
"+I("
+ databaseName
+ ",inventory,products,111,scooter,Big 2-wheel scooter ,5.180)",
"+U("
+ databaseName
+ ",inventory,products,106,hammer,18oz carpenter hammer,1.000)",
"+U("
+ databaseName
+ ",inventory,products,107,rocks,box of assorted rocks,5.100)",
"+U("
+ databaseName
+ ",inventory,products,110,jacket,new water resistent white wind breaker,0.500)",
"+U("
+ databaseName
+ ",inventory,products,111,scooter,Big 2-wheel scooter ,5.170)",
"-D("
+ databaseName
+ ",inventory,products,111,scooter,Big 2-wheel scooter ,5.170)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
Collections.sort(actual);
Collections.sort(expected);
Expand Down

0 comments on commit 90ed963

Please sign in to comment.