Skip to content

Commit

Permalink
[pipeline-connector][mysql] Remove unnecessary serverTimeZone in Debe…
Browse files Browse the repository at this point in the history
…ziumEventDeserializationSchema (#2816)
  • Loading branch information
Jiabao-Sun authored Dec 5, 2023
1 parent 18fb0d8 commit 7d7c1af
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import com.ververica.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter;
import com.ververica.cdc.debezium.table.DebeziumChangelogMode;

import java.time.ZoneId;

/** A {@link DataSource} for mysql cdc connector. */
@Internal
public class MySqlDataSource implements DataSource {
Expand All @@ -46,9 +44,7 @@ public MySqlDataSource(MySqlSourceConfigFactory configFactory) {
public EventSourceProvider getEventSourceProvider() {
MySqlEventDeserializer deserializer =
new MySqlEventDeserializer(
DebeziumChangelogMode.ALL,
ZoneId.of(sourceConfig.getServerTimeZone()),
sourceConfig.isIncludeSchemaChanges());
DebeziumChangelogMode.ALL, sourceConfig.isIncludeSchemaChanges());

MySqlSource<Event> source =
new MySqlSource<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -63,10 +62,8 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {
private transient CustomMySqlAntlrDdlParser customParser;

public MySqlEventDeserializer(
DebeziumChangelogMode changelogMode,
ZoneId serverTimeZone,
boolean includeSchemaChanges) {
super(new MySqlSchemaDataTypeInference(), changelogMode, serverTimeZone);
DebeziumChangelogMode changelogMode, boolean includeSchemaChanges) {
super(new MySqlSchemaDataTypeInference(), changelogMode);
this.includeSchemaChanges = includeSchemaChanges;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -83,16 +81,10 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve
/** Changelog Mode to use for encoding changes in Flink internal data structure. */
protected final DebeziumChangelogMode changelogMode;

/** The session time zone in database server. */
protected final ZoneId serverTimeZone;

public DebeziumEventDeserializationSchema(
SchemaDataTypeInference schemaDataTypeInference,
DebeziumChangelogMode changelogMode,
ZoneId serverTimeZone) {
SchemaDataTypeInference schemaDataTypeInference, DebeziumChangelogMode changelogMode) {
this.schemaDataTypeInference = schemaDataTypeInference;
this.changelogMode = changelogMode;
this.serverTimeZone = serverTimeZone;
}

@Override
Expand Down Expand Up @@ -322,8 +314,11 @@ protected Object convertToTimestamp(Object dbzObj, Schema schema) {
return TimestampData.fromMillis(nano / 1000_000, (int) (nano % 1000_000));
}
}
LocalDateTime localDateTime = TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
return TimestampData.fromLocalDateTime(localDateTime);
throw new IllegalArgumentException(
"Unable to convert to TIMESTAMP from unexpected value '"
+ dbzObj
+ "' of type "
+ dbzObj.getClass().getName());
}

protected Object convertToLocalTimeZoneTimestamp(Object dbzObj, Schema schema) {
Expand All @@ -334,7 +329,7 @@ protected Object convertToLocalTimeZoneTimestamp(Object dbzObj, Schema schema) {
return LocalZonedTimestampData.fromInstant(instant);
}
throw new IllegalArgumentException(
"Unable to convert to TimestampData from unexpected value '"
"Unable to convert to TIMESTAMP_LTZ from unexpected value '"
+ dbzObj
+ "' of type "
+ dbzObj.getClass().getName());
Expand Down

0 comments on commit 7d7c1af

Please sign in to comment.