Skip to content

Commit

Permalink
[cdc-connector][sqlserver][tests] Fix UT errors by correcting right o…
Browse files Browse the repository at this point in the history
…utput (#2864)

(cherry picked from commit 2c557c6)
  • Loading branch information
loserwang1024 authored and lvyanquan committed Jan 18, 2024
1 parent 5116d20 commit b124b1f
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ private List<String> testBackfillWhenWritingEvents(
env.enableCheckpointing(1000);
env.setParallelism(1);

ResolvedSchema customersSchame =
ResolvedSchema customersSchema =
new ResolvedSchema(
Arrays.asList(
physical("cid", BIGINT().notNull()),
Expand All @@ -407,7 +407,7 @@ private List<String> testBackfillWhenWritingEvents(
physical("phone_number", STRING())),
new ArrayList<>(),
UniqueConstraint.primaryKey("pk", Collections.singletonList("cid")));
TestTable customerTable = new TestTable(customerDatabase, "customers", customersSchame);
TestTable customerTable = new TestTable(customerDatabase, "customers", customersSchema);
MongoDBSource source =
new MongoDBSourceBuilder()
.hosts(CONTAINER.getHostAndPort())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ private List<String> testBackfillWhenWritingEvents(
env.enableCheckpointing(1000);
env.setParallelism(1);

ResolvedSchema customersSchame =
ResolvedSchema customersSchema =
new ResolvedSchema(
Arrays.asList(
physical("cid", BIGINT().notNull()),
Expand All @@ -311,7 +311,7 @@ private List<String> testBackfillWhenWritingEvents(
physical("phone_number", STRING())),
new ArrayList<>(),
UniqueConstraint.primaryKey("pk", Collections.singletonList("cid")));
TestTable customerTable = new TestTable(customerDatabase, "customers", customersSchame);
TestTable customerTable = new TestTable(customerDatabase, "customers", customersSchema);
MongoDBSource source =
new MongoDBSourceBuilder()
.hosts(CONTAINER.getHostAndPort())
Expand Down Expand Up @@ -345,11 +345,6 @@ private List<String> testBackfillWhenWritingEvents(
mongoCollection.updateOne(
Filters.eq("cid", 2000L), Updates.set("address", "Pittsburgh"));
mongoCollection.deleteOne(Filters.eq("cid", 1019L));
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};

if (hookType == USE_POST_LOWWATERMARK_HOOK) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,11 +528,6 @@ private List<String> testBackfillWhenWritingEvents(
connection.setAutoCommit(false);
connection.execute(statements);
connection.commit();
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};
if (hookType == USE_PRE_HIGHWATERMARK_HOOK) {
hooks.setPreHighWatermarkAction(snapshotPhaseHook);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ private List<String> testBackfillWhenWritingEvents(
env.enableCheckpointing(200L);
env.setParallelism(1);

ResolvedSchema customersSchame =
ResolvedSchema customersSchema =
new ResolvedSchema(
Arrays.asList(
physical("ID", BIGINT().notNull()),
Expand All @@ -285,7 +285,7 @@ private List<String> testBackfillWhenWritingEvents(
new ArrayList<>(),
UniqueConstraint.primaryKey("pk", Collections.singletonList("ID")));
TestTable customerTable =
new TestTable(ORACLE_DATABASE, ORACLE_SCHEMA, "CUSTOMERS", customersSchame);
new TestTable(ORACLE_DATABASE, ORACLE_SCHEMA, "CUSTOMERS", customersSchema);
String tableId = customerTable.getTableId();

OracleSourceBuilder.OracleIncrementalSource source =
Expand Down Expand Up @@ -326,9 +326,6 @@ private List<String> testBackfillWhenWritingEvents(
try (OracleConnection oracleConnection =
OracleConnectionUtils.createOracleConnection(configuration)) {
oracleConnection.execute(statements);
Thread.sleep(500L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ private List<String> testBackfillWhenWritingEvents(
env.enableCheckpointing(1000);
env.setParallelism(1);

ResolvedSchema customersSchame =
ResolvedSchema customersSchema =
new ResolvedSchema(
Arrays.asList(
physical("id", BIGINT().notNull()),
Expand All @@ -492,7 +492,7 @@ private List<String> testBackfillWhenWritingEvents(
new ArrayList<>(),
UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
TestTable customerTable =
new TestTable(customDatabase, "customer", "customers", customersSchame);
new TestTable(customDatabase, "customer", "customers", customersSchema);
String tableId = customerTable.getTableId();

PostgresSourceBuilder.PostgresIncrementalSource source =
Expand Down Expand Up @@ -525,9 +525,6 @@ private List<String> testBackfillWhenWritingEvents(
try (PostgresConnection postgresConnection = dialect.openJdbcConnection()) {
postgresConnection.execute(statements);
postgresConnection.commit();
Thread.sleep(500L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void testReadSingleTableWithSingleParallelismAndSkipBackfill() throws Exc
@Test
public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception {

List<String> records = testBackfillWhenWritingEvents(false, 21, USE_PRE_HIGHWATERMARK_HOOK);
List<String> records = testBackfillWhenWritingEvents(false, 25, USE_PRE_HIGHWATERMARK_HOOK);

List<String> expectedRecords =
Arrays.asList(
Expand All @@ -146,17 +146,23 @@ public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception {
"+I[1016, user_17, Shanghai, 123567891234]",
"+I[1017, user_18, Shanghai, 123567891234]",
"+I[1018, user_19, Shanghai, 123567891234]",
"+I[2000, user_21, Pittsburgh, 123567891234]",
"+I[15213, user_15213, Shanghai, 123567891234]");
// when enable backfill, the wal log between [snapshot, high_watermark) will be
// applied as snapshot image
"+I[1019, user_20, Shanghai, 123567891234]",
"+I[2000, user_21, Shanghai, 123567891234]",
"+I[15213, user_15213, Shanghai, 123567891234]",
"-U[2000, user_21, Shanghai, 123567891234]",
"+U[2000, user_21, Pittsburgh, 123567891234]",
"-D[1019, user_20, Shanghai, 123567891234]");
// In sqlserver database, because the capture process extracts change data from the
// transaction log, there is a built-in latency between the time that a change is committed
// to a source table and the time that the change appears within its associated change
// table.Then in streaming phase, the log which should be ignored will be read again.
assertEqualsInAnyOrder(expectedRecords, records);
}

@Test
public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {

List<String> records = testBackfillWhenWritingEvents(false, 21, USE_POST_LOWWATERMARK_HOOK);
List<String> records = testBackfillWhenWritingEvents(false, 25, USE_POST_LOWWATERMARK_HOOK);

List<String> expectedRecords =
Arrays.asList(
Expand All @@ -180,9 +186,15 @@ public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {
"+I[1017, user_18, Shanghai, 123567891234]",
"+I[1018, user_19, Shanghai, 123567891234]",
"+I[2000, user_21, Pittsburgh, 123567891234]",
"+I[15213, user_15213, Shanghai, 123567891234]");
// when enable backfill, the wal log between [low_watermark, snapshot) will be applied
// as snapshot image
"+I[15213, user_15213, Shanghai, 123567891234]",
"+I[15213, user_15213, Shanghai, 123567891234]",
"-U[2000, user_21, Shanghai, 123567891234]",
"+U[2000, user_21, Pittsburgh, 123567891234]",
"-D[1019, user_20, Shanghai, 123567891234]");
// In sqlserver database, because the capture process extracts change data from the
// transaction log, there is a built-in latency between the time that a change is committed
// to a source table and the time that the change appears within its associated change
// table.Then in streaming phase, the log which should be ignored will be read again.
assertEqualsInAnyOrder(expectedRecords, records);
}

Expand Down Expand Up @@ -272,7 +284,7 @@ private List<String> testBackfillWhenWritingEvents(
env.enableCheckpointing(1000);
env.setParallelism(1);

ResolvedSchema customersSchame =
ResolvedSchema customersSchema =
new ResolvedSchema(
Arrays.asList(
physical("id", BIGINT().notNull()),
Expand All @@ -281,7 +293,7 @@ private List<String> testBackfillWhenWritingEvents(
physical("phone_number", STRING())),
new ArrayList<>(),
UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
TestTable customerTable = new TestTable(databaseName, "dbo", "customers", customersSchame);
TestTable customerTable = new TestTable(databaseName, "dbo", "customers", customersSchema);
String tableId = customerTable.getTableId();

SqlServerSourceBuilder.SqlServerIncrementalSource source =
Expand Down Expand Up @@ -310,14 +322,10 @@ private List<String> testBackfillWhenWritingEvents(
(sourceConfig, split) -> {
SqlServerDialect dialect =
new SqlServerDialect((SqlServerSourceConfig) sourceConfig);
JdbcConnection sqlServerConnection =
dialect.openJdbcConnection((JdbcSourceConfig) sourceConfig);
sqlServerConnection.execute(statements);
sqlServerConnection.commit();
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
try (JdbcConnection sqlServerConnection =
dialect.openJdbcConnection((JdbcSourceConfig) sourceConfig)) {
sqlServerConnection.execute(statements);
sqlServerConnection.commit();
}
};

Expand Down

0 comments on commit b124b1f

Please sign in to comment.