Skip to content

Commit

Permalink
[FLINK-35056][cdc-connector/sqlserver] Fix scale mapping from SQL Ser…
Browse files Browse the repository at this point in the history
…ver TIMESTAMP to Flink SQL timestamp

This closes #3561.
  • Loading branch information
morozov authored Aug 22, 2024
1 parent 3837887 commit 52f2019
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ private static DataType convertFromColumn(Column column) {
return DataTypes.DATE();
case Types.TIMESTAMP:
case Types.TIMESTAMP_WITH_TIMEZONE:
return column.length() >= 0
? DataTypes.TIMESTAMP(column.length())
return column.scale().isPresent()
? DataTypes.TIMESTAMP(column.scale().get())
: DataTypes.TIMESTAMP();
case Types.BOOLEAN:
return DataTypes.BOOLEAN();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.cdc.connectors.sqlserver.testutils.RecordsFormatter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
Expand All @@ -50,6 +51,8 @@
import java.util.stream.Collectors;

import static org.apache.flink.cdc.connectors.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.testcontainers.containers.MSSQLServerContainer.MS_SQL_SERVER_PORT;
Expand Down Expand Up @@ -189,6 +192,29 @@ public void testInsertDataInSnapshotScan() throws Exception {
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}

@Test
public void testDateTimePrimaryKey() throws Exception {
String databaseName = "pk";
String tableName = "dbo.dt_pk";

initializeSqlServerTable(databaseName);

SqlServerSourceConfigFactory sourceConfigFactory =
getConfigFactory(databaseName, new String[] {tableName}, 8096);
SqlServerSourceConfig sourceConfig = sourceConfigFactory.create(0);
SqlServerDialect sqlServerDialect = new SqlServerDialect(sourceConfig);

List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig, sqlServerDialect);
assertFalse(snapshotSplits.isEmpty());

RowType expectedType =
(RowType)
DataTypes.ROW(DataTypes.FIELD("dt", DataTypes.TIMESTAMP(3).notNull()))
.getLogicalType();

snapshotSplits.forEach(s -> assertEquals(expectedType, s.getSplitKeyType()));
}

@Test
public void testDeleteDataInSnapshotScan() throws Exception {
String databaseName = "customer";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.

CREATE DATABASE pk;

USE pk;
EXEC sys.sp_cdc_enable_db;

CREATE TABLE dt_pk (
dt datetime NOT NULL PRIMARY KEY,
val INT
);

EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'dt_pk', @role_name = NULL, @supports_net_changes = 0;

0 comments on commit 52f2019

Please sign in to comment.