Skip to content

Commit

Permalink
[sqlserver] Add table filter to speed up SqlServerSchema reading
Browse files Browse the repository at this point in the history
  • Loading branch information
GOODBOY008 committed Aug 5, 2023
1 parent 61b9a2a commit 4c9fdeb
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.ververica.cdc.connectors.base.options.StartupOptions;
import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.relational.RelationalTableFilters;

import java.time.Duration;
import java.util.List;
Expand Down Expand Up @@ -85,12 +84,4 @@ public SqlServerSourceConfig(
public SqlServerConnectorConfig getDbzConnectorConfig() {
return new SqlServerConnectorConfig(getDbzConfiguration());
}

public Configuration getOriginDbzConnectorConfig() {
return super.getDbzConfiguration();
}

public RelationalTableFilters getTableFilters() {
return getDbzConnectorConfig().getTableFilters();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.SqlServerConnector;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.UUID;

Expand All @@ -34,17 +32,6 @@ public class SqlServerSourceConfigFactory extends JdbcSourceConfigFactory {

private static final String DATABASE_SERVER_NAME = "sqlserver_transaction_log_source";
private static final String DRIVER_ClASS_NAME = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
private List<String> schemaList;

/**
* An optional list of regular expressions that match schema names to be monitored; any schema
* name not included in the whitelist will be excluded from monitoring. By default, all
* non-system schemas will be monitored.
*/
public JdbcSourceConfigFactory schemaList(String... schemaList) {
this.schemaList = Arrays.asList(schemaList);
return this;
}

@Override
public SqlServerSourceConfig create(int subtask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,10 @@
public class SqlServerDialect implements JdbcDataSourceDialect {

private static final long serialVersionUID = 1L;
private final SqlServerSourceConfigFactory configFactory;
private final SqlServerSourceConfig sourceConfig;
private transient SqlServerSchema sqlserverSchema;

public SqlServerDialect(SqlServerSourceConfigFactory configFactory) {
this.configFactory = configFactory;
this.sourceConfig = configFactory.create(0);
}

Expand All @@ -75,7 +73,6 @@ public Offset displayCurrentOffset(JdbcSourceConfig sourceConfig) {

@Override
public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig sourceConfig) {
// todo: need to check the case sensitive of the database
return true;
}

Expand All @@ -100,7 +97,7 @@ public List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig) {
try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig)) {
return SqlServerConnectionUtils.listTables(
jdbcConnection,
sqlserverSourceConfig.getTableFilters(),
sqlserverSourceConfig.getDbzConnectorConfig().getTableFilters(),
sqlserverSourceConfig.getDatabaseList());
} catch (SQLException e) {
throw new FlinkRuntimeException("Error to discover tables: " + e.getMessage(), e);
Expand Down Expand Up @@ -131,7 +128,10 @@ public TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
if (sqlserverSchema == null) {
sqlserverSchema = new SqlServerSchema();
}
return sqlserverSchema.getTableSchema(jdbc, tableId);
return sqlserverSchema.getTableSchema(
jdbc,
tableId,
sourceConfig.getDbzConnectorConfig().getTableFilters().dataCollectionFilter());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@

import java.sql.SQLException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** A component used to get schema by table path. */
Expand All @@ -42,28 +40,28 @@ public SqlServerSchema() {
this.schemasByTableId = new ConcurrentHashMap<>();
}

public TableChange getTableSchema(JdbcConnection jdbc, TableId tableId) {
public TableChange getTableSchema(
JdbcConnection jdbc, TableId tableId, Tables.TableFilter tableFilters) {
// read schema from cache first
TableChange schema = schemasByTableId.get(tableId);
if (schema == null) {
schema = readTableSchema(jdbc, tableId);
schema = readTableSchema(jdbc, tableId, tableFilters);
schemasByTableId.put(tableId, schema);
}
return schema;
}

private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
private TableChange readTableSchema(
JdbcConnection jdbc, TableId tableId, Tables.TableFilter tableFilters) {
SqlServerConnection sqlServerConnection = (SqlServerConnection) jdbc;
Set<TableId> tableIdSet = new HashSet<>();
tableIdSet.add(tableId);

final Map<TableId, TableChange> tableChangeMap = new HashMap<>();
Tables tables = new Tables();
tables.overwriteTable(tables.editOrCreateTable(tableId).create());

try {
sqlServerConnection.readSchema(
tables, tableId.catalog(), tableId.schema(), null, null, false);
tables, tableId.catalog(), tableId.schema(), tableFilters, null, false);
Table table = tables.forTable(tableId);
TableChange tableChange = new TableChange(TableChanges.TableChangeType.CREATE, table);
tableChangeMap.put(tableId, tableChange);
Expand Down

0 comments on commit 4c9fdeb

Please sign in to comment.