Skip to content

Commit

Permalink
[pipeline-connector][mysql] fix timestamp with timezone format (apach…
Browse files Browse the repository at this point in the history
…e#2952)

* fix ts with tz parser

* test timestamp with default value

* fix related test

* use timestamp string in test cases
  • Loading branch information
whhe authored and joyCurry30 committed Mar 22, 2024
1 parent 4e3458d commit 69490f0
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Arrays;
Expand Down Expand Up @@ -118,6 +119,7 @@ public void testMysql57TimeDataTypes() throws Throwable {
DataTypes.TIMESTAMP(0),
DataTypes.TIMESTAMP(3),
DataTypes.TIMESTAMP(6),
DataTypes.TIMESTAMP_LTZ(0),
DataTypes.TIMESTAMP_LTZ(0));

Object[] expectedSnapshot =
Expand All @@ -131,10 +133,11 @@ public void testMysql57TimeDataTypes() throws Throwable {
// Because Flink's BinaryWriter force write int value for TIME(6).
// See BinaryWriter#write for detail.
64822123,
TimestampData.fromMillis(1595008822000L),
TimestampData.fromMillis(1595008822123L),
TimestampData.fromMillis(1595008822123L, 456000),
LocalZonedTimestampData.fromEpochMillis(1595008822000L, 0)
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123")),
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123456")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
null
};

Object[] expectedStreamRecord =
Expand All @@ -145,10 +148,11 @@ public void testMysql57TimeDataTypes() throws Throwable {
64822000,
64822123,
null,
TimestampData.fromMillis(1595008822000L),
TimestampData.fromMillis(1595008822123L),
TimestampData.fromMillis(1595008822123L, 456000),
LocalZonedTimestampData.fromEpochMillis(1595008822000L, 0)
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123")),
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123456")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
LocalZonedTimestampData.fromInstant(toInstant("2000-01-01 00:00:00"))
};

testTimeDataTypes(
Expand All @@ -170,7 +174,8 @@ public void testMysql8TimeDataTypes() throws Throwable {
DataTypes.TIMESTAMP(6),
DataTypes.TIMESTAMP_LTZ(0),
DataTypes.TIMESTAMP_LTZ(3),
DataTypes.TIMESTAMP_LTZ(6));
DataTypes.TIMESTAMP_LTZ(6),
DataTypes.TIMESTAMP_LTZ(0));

Object[] expectedSnapshot =
new Object[] {
Expand All @@ -183,13 +188,13 @@ public void testMysql8TimeDataTypes() throws Throwable {
// Because Flink's BinaryWriter force write int value for TIME(6).
// See BinaryWriter#write for detail.
64822123,
TimestampData.fromMillis(1595008822000L),
TimestampData.fromMillis(1595008822123L),
TimestampData.fromMillis(1595008822123L, 456000),
LocalZonedTimestampData.fromInstant(Instant.parse("2020-07-17T18:00:22Z")),
LocalZonedTimestampData.fromInstant(Instant.parse("2020-07-17T18:00:22.123Z")),
LocalZonedTimestampData.fromInstant(
Instant.parse("2020-07-17T18:00:22.123456Z"))
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123")),
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123456")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22.123")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22.123456")),
null
};

Object[] expectedStreamRecord =
Expand All @@ -200,13 +205,13 @@ public void testMysql8TimeDataTypes() throws Throwable {
64822000,
64822123,
null,
TimestampData.fromMillis(1595008822000L),
TimestampData.fromMillis(1595008822123L),
TimestampData.fromMillis(1595008822123L, 456000),
LocalZonedTimestampData.fromInstant(Instant.parse("2020-07-17T18:00:22Z")),
LocalZonedTimestampData.fromInstant(Instant.parse("2020-07-17T18:00:22.123Z")),
LocalZonedTimestampData.fromInstant(
Instant.parse("2020-07-17T18:00:22.123456Z"))
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123")),
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123456")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22.123")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22.123456")),
LocalZonedTimestampData.fromInstant(toInstant("2000-01-01 00:00:00"))
};

testTimeDataTypes(
Expand Down Expand Up @@ -317,6 +322,10 @@ private void testCommonDataTypes(UniqueDatabase database) throws Exception {
assertThat(recordFields(streamRecord, COMMON_TYPES)).isEqualTo(expectedStreamRecord);
}

private Instant toInstant(String ts) {
return Timestamp.valueOf(ts).toLocalDateTime().atZone(ZoneId.of("UTC")).toInstant();
}

private void testTimeDataTypes(
UniqueDatabase database,
RowType recordType,
Expand All @@ -340,7 +349,8 @@ private void testTimeDataTypes(

try (Connection connection = database.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("UPDATE time_types SET time_6_c = null WHERE id = 1;");
statement.execute(
"UPDATE time_types SET time_6_c = null, timestamp_def_c = default WHERE id = 1;");
}

List<Event> streamResults = fetchResults(iterator, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public void testMysql57AccessTimeTypesSchema() {
DataTypes.TIMESTAMP(0),
DataTypes.TIMESTAMP(3),
DataTypes.TIMESTAMP(6),
DataTypes.TIMESTAMP_LTZ(0),
DataTypes.TIMESTAMP_LTZ(0)
},
new String[] {
Expand All @@ -142,7 +143,8 @@ public void testMysql57AccessTimeTypesSchema() {
"datetime_c",
"datetime3_c",
"datetime6_c",
"timestamp_c"
"timestamp_c",
"timestamp_def_c"
}))
.build();
assertThat(actualSchema).isEqualTo(expectedSchema);
Expand Down Expand Up @@ -176,7 +178,8 @@ public void testMysql8AccessTimeTypesSchema() {
DataTypes.TIMESTAMP(6),
DataTypes.TIMESTAMP_LTZ(0),
DataTypes.TIMESTAMP_LTZ(3),
DataTypes.TIMESTAMP_LTZ(6)
DataTypes.TIMESTAMP_LTZ(6),
DataTypes.TIMESTAMP_LTZ(0)
},
new String[] {
"id",
Expand All @@ -190,7 +193,8 @@ public void testMysql8AccessTimeTypesSchema() {
"datetime6_c",
"timestamp_c",
"timestamp3_c",
"timestamp6_c"
"timestamp6_c",
"timestamp_def_c"
}))
.build();
assertThat(actualSchema).isEqualTo(expectedSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ CREATE TABLE time_types
datetime3_c DATETIME(3),
datetime6_c DATETIME(6),
timestamp_c TIMESTAMP NULL,
timestamp_def_c TIMESTAMP NULL DEFAULT '2000-01-01 00:00:00',
PRIMARY KEY (id)
) DEFAULT CHARSET=utf8;

Expand All @@ -119,4 +120,5 @@ VALUES (DEFAULT,
'2020-07-17 18:00:22',
'2020-07-17 18:00:22.123',
'2020-07-17 18:00:22.123456',
'2020-07-17 18:00:22');
'2020-07-17 18:00:22',
NULL);
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ CREATE TABLE time_types
timestamp_c TIMESTAMP(0),
timestamp3_c TIMESTAMP(3),
timestamp6_c TIMESTAMP(6),
timestamp_def_c TIMESTAMP NULL DEFAULT '2000-01-01 00:00:00',
PRIMARY KEY (id)
) DEFAULT CHARSET=utf8;

Expand All @@ -123,4 +124,5 @@ VALUES (DEFAULT,
'2020-07-17 18:00:22.123456',
'2020-07-17 18:00:22',
'2020-07-17 18:00:22.123',
'2020-07-17 18:00:22.123456');
'2020-07-17 18:00:22.123456',
NULL);
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ protected DataType inferString(Object value, Schema schema) {
if (ZonedTimestamp.SCHEMA_NAME.equals(schema.name())) {
int nano =
Optional.ofNullable((String) value)
.map(Instant::parse)
.map(s -> ZonedTimestamp.FORMATTER.parse(s, Instant::from))
.map(Instant::getNano)
.orElse(0);

Expand Down

0 comments on commit 69490f0

Please sign in to comment.