Skip to content

Commit

Permalink
[Improve][Jdbc] Skip all index when auto create table to improve perf…
Browse files Browse the repository at this point in the history
…ormance of write (apache#7288)
  • Loading branch information
dailai authored Aug 9, 2024
1 parent 16950a6 commit dc3c239
Show file tree
Hide file tree
Showing 46 changed files with 1,089 additions and 84 deletions.
7 changes: 7 additions & 0 deletions docs/en/connector-v2/sink/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it.
| custom_sql | String | No | - |
| enable_upsert | Boolean | No | true |
| use_copy_statement | Boolean | No | false |
| create_index | Boolean | No | true |

### driver [string]

Expand Down Expand Up @@ -205,6 +206,12 @@ Use `COPY ${table} FROM STDIN` statement to import data. Only drivers with `getC

NOTICE: `MAP`, `ARRAY`, `ROW` types are not supported.

### create_index [boolean]

Create the index(contains primary key and any other indexes) or not when auto-create table. You can use this option to improve the performance of jdbc writes when migrating large tables.

Notice: Note that this will sacrifice read performance, so you'll need to manually create indexes after the table migration to improve read performance

## tips

In the case of is_exactly_once = "true", Xa transactions are used. This requires database support, and some databases require some setup :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ protected void dropTable() {
catalog.dropTable(tablePath, true);
}

protected void createTable() {
protected void createTablePreCheck() {
if (!catalog.databaseExists(tablePath.getDatabaseName())) {
try {
log.info(
Expand All @@ -175,6 +175,10 @@ protected void createTable() {
} catch (UnsupportedOperationException ignore) {
log.info("Creating table {}", tablePath);
}
}

protected void createTable() {
createTablePreCheck();
catalog.createTable(tablePath, catalogTable, true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,25 @@ default <T> void buildColumnsWithErrorCheck(
void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;

/**
* Create a new table in this catalog.
*
* @param tablePath Path of the table
* @param table The table definition
* @param ignoreIfExists Flag to specify behavior when a table with the given name already exist
* @param createIndex If you want to create index or not
* @throws TableAlreadyExistException thrown if the table already exists in the catalog and
* ignoreIfExists is false
* @throws DatabaseNotExistException thrown if the database in tablePath doesn't exist in the
* catalog
* @throws CatalogException in case of any runtime exception
*/
default void createTable(
TablePath tablePath, CatalogTable table, boolean ignoreIfExists, boolean createIndex)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
createTable(tablePath, table, ignoreIfExists);
}

/**
* Drop an existing table in this catalog.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,13 @@ && listTables(tablePath.getDatabaseName())
@Override
public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
createTable(tablePath, table, ignoreIfExists, true);
}

@Override
public void createTable(
TablePath tablePath, CatalogTable table, boolean ignoreIfExists, boolean createIndex)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
checkNotNull(tablePath, "Table path cannot be null");

if (!databaseExists(tablePath.getDatabaseName())) {
Expand All @@ -393,22 +400,25 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreI
throw new TableAlreadyExistException(catalogName, tablePath);
}

createTableInternal(tablePath, table);
createTableInternal(tablePath, table, createIndex);
}

protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {
protected String getCreateTableSql(
TablePath tablePath, CatalogTable table, boolean createIndex) {
throw new UnsupportedOperationException();
}

protected List<String> getCreateTableSqls(TablePath tablePath, CatalogTable table) {
return Collections.singletonList(getCreateTableSql(tablePath, table));
protected List<String> getCreateTableSqls(
TablePath tablePath, CatalogTable table, boolean createIndex) {
return Collections.singletonList(getCreateTableSql(tablePath, table, createIndex));
}

protected void createTableInternal(TablePath tablePath, CatalogTable table)
protected void createTableInternal(TablePath tablePath, CatalogTable table, boolean createIndex)
throws CatalogException {
String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
try {
final List<String> createTableSqlList = getCreateTableSqls(tablePath, table);
final List<String> createTableSqlList =
getCreateTableSqls(tablePath, table, createIndex);
for (String sql : createTableSqlList) {
executeInternal(dbUrl, sql);
}
Expand Down Expand Up @@ -646,7 +656,7 @@ public PreviewResult previewAction(
ActionType actionType, TablePath tablePath, Optional<CatalogTable> catalogTable) {
if (actionType == ActionType.CREATE_TABLE) {
checkArgument(catalogTable.isPresent(), "CatalogTable cannot be null");
return new SQLPreviewResult(getCreateTableSql(tablePath, catalogTable.get()));
return new SQLPreviewResult(getCreateTableSql(tablePath, catalogTable.get(), true));
} else if (actionType == ActionType.DROP_TABLE) {
return new SQLPreviewResult(getDropTableSql(tablePath));
} else if (actionType == ActionType.TRUNCATE_TABLE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,10 @@ public interface JdbcCatalogOptions {
.noDefaultValue()
.withDescription(
"The table suffix name added when the table is automatically created");

Option<Boolean> CREATE_INDEX =
Options.key("create_index")
.booleanType()
.defaultValue(true)
.withDescription("Create index or not when auto create table");
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ protected String getListDatabaseSql() {
}

@Override
protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {
protected String getCreateTableSql(
TablePath tablePath, CatalogTable table, boolean createIndex) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ public IrisCatalog(
}

@Override
protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {
return new IrisCreateTableSqlBuilder(table).build(tablePath);
protected String getCreateTableSql(
TablePath tablePath, CatalogTable table, boolean createIndex) {
return new IrisCreateTableSqlBuilder(table, createIndex).build(tablePath);
}

@Override
Expand Down Expand Up @@ -224,7 +225,8 @@ public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
}

@Override
public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
public void createTable(
TablePath tablePath, CatalogTable table, boolean ignoreIfExists, boolean createIndex)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
checkNotNull(tablePath, "Table path cannot be null");
if (defaultSchema.isPresent()) {
Expand All @@ -242,7 +244,7 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreI
throw new TableAlreadyExistException(catalogName, tablePath);
}

createTableInternal(tablePath, table);
createTableInternal(tablePath, table, createIndex);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,16 @@ public class IrisCreateTableSqlBuilder {
private String fieldIde;

private String comment;
private boolean createIndex;

public IrisCreateTableSqlBuilder(CatalogTable catalogTable) {
public IrisCreateTableSqlBuilder(CatalogTable catalogTable, boolean createIndex) {
this.columns = catalogTable.getTableSchema().getColumns();
this.primaryKey = catalogTable.getTableSchema().getPrimaryKey();
this.constraintKeys = catalogTable.getTableSchema().getConstraintKeys();
this.sourceCatalogName = catalogTable.getCatalogName();
this.fieldIde = catalogTable.getOptions().get("fieldIde");
this.comment = catalogTable.getComment();
this.createIndex = createIndex;
}

public String build(TablePath tablePath) {
Expand All @@ -64,12 +66,13 @@ public String build(TablePath tablePath) {
.collect(Collectors.toList());

// Add primary key directly in the create table statement
if (primaryKey != null
if (createIndex
&& primaryKey != null
&& primaryKey.getColumnNames() != null
&& primaryKey.getColumnNames().size() > 0) {
columnSqls.add(buildPrimaryKeySql(primaryKey));
}
if (CollectionUtils.isNotEmpty(constraintKeys)) {
if (createIndex && CollectionUtils.isNotEmpty(constraintKeys)) {
for (ConstraintKey constraintKey : constraintKeys) {
if (StringUtils.isBlank(constraintKey.getConstraintName())
|| (primaryKey != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,18 @@

@Slf4j
public class IrisSaveModeHandler extends DefaultSaveModeHandler {
public boolean createIndex;

public IrisSaveModeHandler(
@Nonnull SchemaSaveMode schemaSaveMode,
@Nonnull DataSaveMode dataSaveMode,
@Nonnull Catalog catalog,
@Nonnull TablePath tablePath,
@Nullable CatalogTable catalogTable,
@Nullable String customSql) {
@Nullable String customSql,
boolean createIndex) {
super(schemaSaveMode, dataSaveMode, catalog, tablePath, catalogTable, customSql);
this.createIndex = createIndex;
}

@Override
Expand All @@ -53,7 +57,7 @@ protected void createTable() {
Catalog.ActionType.CREATE_TABLE,
tablePath,
Optional.ofNullable(catalogTable)));
catalog.createTable(tablePath, catalogTable, true);
catalog.createTable(tablePath, catalogTable, true, createIndex);
} catch (UnsupportedOperationException ignore) {
log.info("Creating table {}", tablePath);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,9 @@ protected Column buildColumn(ResultSet resultSet) throws SQLException {
}

@Override
protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {
return MysqlCreateTableSqlBuilder.builder(tablePath, table, typeConverter)
protected String getCreateTableSql(
TablePath tablePath, CatalogTable table, boolean createIndex) {
return MysqlCreateTableSqlBuilder.builder(tablePath, table, typeConverter, createIndex)
.build(table.getCatalogName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,22 +61,28 @@ public class MysqlCreateTableSqlBuilder {
private String fieldIde;

private final MySqlTypeConverter typeConverter;
private boolean createIndex;

private MysqlCreateTableSqlBuilder(String tableName, MySqlTypeConverter typeConverter) {
private MysqlCreateTableSqlBuilder(
String tableName, MySqlTypeConverter typeConverter, boolean createIndex) {
checkNotNull(tableName, "tableName must not be null");
this.tableName = tableName;
this.typeConverter = typeConverter;
this.createIndex = createIndex;
}

public static MysqlCreateTableSqlBuilder builder(
TablePath tablePath, CatalogTable catalogTable, MySqlTypeConverter typeConverter) {
TablePath tablePath,
CatalogTable catalogTable,
MySqlTypeConverter typeConverter,
boolean createIndex) {
checkNotNull(tablePath, "tablePath must not be null");
checkNotNull(catalogTable, "catalogTable must not be null");

TableSchema tableSchema = catalogTable.getTableSchema();
checkNotNull(tableSchema, "tableSchema must not be null");

return new MysqlCreateTableSqlBuilder(tablePath.getTableName(), typeConverter)
return new MysqlCreateTableSqlBuilder(tablePath.getTableName(), typeConverter, createIndex)
.comment(catalogTable.getComment())
// todo: set charset and collate
.engine(null)
Expand Down Expand Up @@ -156,10 +162,10 @@ private String buildColumnsIdentifySql(String catalogName) {
for (Column column : columns) {
columnSqls.add("\t" + buildColumnIdentifySql(column, catalogName, columnTypeMap));
}
if (primaryKey != null) {
if (createIndex && primaryKey != null) {
columnSqls.add("\t" + buildPrimaryKeySql());
}
if (CollectionUtils.isNotEmpty(constraintKeys)) {
if (createIndex && CollectionUtils.isNotEmpty(constraintKeys)) {
for (ConstraintKey constraintKey : constraintKeys) {
if (StringUtils.isBlank(constraintKey.getConstraintName())) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public List<String> listTables(String databaseName)
}

@Override
public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
public void createTable(
TablePath tablePath, CatalogTable table, boolean ignoreIfExists, boolean createIndex)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
checkNotNull(tablePath, "Table path cannot be null");

Expand All @@ -99,6 +100,6 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreI
throw new TableAlreadyExistException(catalogName, tablePath);
}

createTableInternal(tablePath, table);
createTableInternal(tablePath, table, createIndex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,14 @@ protected String getListDatabaseSql() {
}

@Override
protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {
return new OracleCreateTableSqlBuilder(table).build(tablePath).get(0);
protected String getCreateTableSql(
TablePath tablePath, CatalogTable table, boolean createIndex) {
return new OracleCreateTableSqlBuilder(table, createIndex).build(tablePath).get(0);
}

protected List<String> getCreateTableSqls(TablePath tablePath, CatalogTable table) {
return new OracleCreateTableSqlBuilder(table).build(tablePath);
protected List<String> getCreateTableSqls(
TablePath tablePath, CatalogTable table, boolean createIndex) {
return new OracleCreateTableSqlBuilder(table, createIndex).build(tablePath);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@ public class OracleCreateTableSqlBuilder {
private PrimaryKey primaryKey;
private String sourceCatalogName;
private String fieldIde;
private boolean createIndex;

public OracleCreateTableSqlBuilder(CatalogTable catalogTable) {
public OracleCreateTableSqlBuilder(CatalogTable catalogTable, boolean createIndex) {
this.columns = catalogTable.getTableSchema().getColumns();
this.primaryKey = catalogTable.getTableSchema().getPrimaryKey();
this.sourceCatalogName = catalogTable.getCatalogName();
this.fieldIde = catalogTable.getOptions().get("fieldIde");
this.createIndex = createIndex;
}

public List<String> build(TablePath tablePath) {
Expand All @@ -60,7 +62,8 @@ public List<String> build(TablePath tablePath) {
.collect(Collectors.toList());

// Add primary key directly in the create table statement
if (primaryKey != null
if (createIndex
&& primaryKey != null
&& primaryKey.getColumnNames() != null
&& primaryKey.getColumnNames().size() > 0) {
columnSqls.add(buildPrimaryKeySql(primaryKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,10 @@ protected Column buildColumn(ResultSet resultSet) throws SQLException {
}

@Override
protected void createTableInternal(TablePath tablePath, CatalogTable table)
protected void createTableInternal(TablePath tablePath, CatalogTable table, boolean createIndex)
throws CatalogException {
PostgresCreateTableSqlBuilder postgresCreateTableSqlBuilder =
new PostgresCreateTableSqlBuilder(table);
new PostgresCreateTableSqlBuilder(table, createIndex);
String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
try {
String createTableSql = postgresCreateTableSqlBuilder.build(tablePath);
Expand All @@ -199,9 +199,10 @@ protected void createTableInternal(TablePath tablePath, CatalogTable table)
}

@Override
protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {
protected String getCreateTableSql(
TablePath tablePath, CatalogTable table, boolean createIndex) {
PostgresCreateTableSqlBuilder postgresCreateTableSqlBuilder =
new PostgresCreateTableSqlBuilder(table);
new PostgresCreateTableSqlBuilder(table, createIndex);
return postgresCreateTableSqlBuilder.build(tablePath);
}

Expand Down
Loading

0 comments on commit dc3c239

Please sign in to comment.