Skip to content

Commit

Permalink
Rename TableConfig to TableCreateConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
banmoy committed Dec 2, 2023
1 parent 19f96a4 commit cbc25a5
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,17 @@ public class StarRocksDataSink implements DataSink, Serializable {
private final StarRocksSinkOptions sinkOptions;

/** Configurations for creating a StarRocks table. */
private final TableConfig tableConfig;
private final TableCreateConfig tableCreateConfig;

/** Configurations for schema change. */
private final SchemaChangeConfig schemaChangeConfig;

public StarRocksDataSink(
StarRocksSinkOptions sinkOptions,
TableConfig tableConfig,
TableCreateConfig tableCreateConfig,
SchemaChangeConfig schemaChangeConfig) {
this.sinkOptions = sinkOptions;
this.tableConfig = tableConfig;
this.tableCreateConfig = tableCreateConfig;
this.schemaChangeConfig = schemaChangeConfig;
}

Expand All @@ -65,6 +65,6 @@ public MetadataApplier getMetadataApplier() {
sinkOptions.getJdbcUrl(),
sinkOptions.getUsername(),
sinkOptions.getPassword());
return new StarRocksMetadataApplier(catalog, tableConfig, schemaChangeConfig);
return new StarRocksMetadataApplier(catalog, tableCreateConfig, schemaChangeConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ public class StarRocksDataSinkFactory implements DataSinkFactory {
@Override
public DataSink createDataSink(Context context) {
StarRocksSinkOptions sinkOptions = buildSinkConnectorOptions(context.getConfiguration());
TableConfig tableConfig = TableConfig.from(context.getConfiguration());
TableCreateConfig tableCreateConfig = TableCreateConfig.from(context.getConfiguration());
SchemaChangeConfig schemaChangeConfig = SchemaChangeConfig.from(context.getConfiguration());
return new StarRocksDataSink(sinkOptions, tableConfig, schemaChangeConfig);
return new StarRocksDataSink(sinkOptions, tableCreateConfig, schemaChangeConfig);
}

private StarRocksSinkOptions buildSinkConnectorOptions(Configuration cdcConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,16 @@ public class StarRocksMetadataApplier implements MetadataApplier {
private static final Logger LOG = LoggerFactory.getLogger(StarRocksMetadataApplier.class);

private final StarRocksCatalog catalog;
private final TableConfig tableConfig;
private final TableCreateConfig tableCreateConfig;
private final SchemaChangeConfig schemaChangeConfig;
private boolean isOpened;

public StarRocksMetadataApplier(
StarRocksCatalog catalog,
TableConfig tableConfig,
TableCreateConfig tableCreateConfig,
SchemaChangeConfig schemaChangeConfig) {
this.catalog = catalog;
this.tableConfig = tableConfig;
this.tableCreateConfig = tableCreateConfig;
this.schemaChangeConfig = schemaChangeConfig;
this.isOpened = false;
}
Expand Down Expand Up @@ -86,7 +86,9 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
private void applyCreateTable(CreateTableEvent createTableEvent) {
StarRocksTable starRocksTable =
StarRocksUtils.toStarRocksTable(
createTableEvent.tableId(), createTableEvent.getSchema(), tableConfig);
createTableEvent.tableId(),
createTableEvent.getSchema(),
tableCreateConfig);
if (!catalog.databaseExists(starRocksTable.getDatabaseName())) {
catalog.createDatabase(starRocksTable.getDatabaseName(), true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class StarRocksUtils {

/** Convert a source table to {@link StarRocksTable}. */
public static StarRocksTable toStarRocksTable(
TableId tableId, Schema schema, TableConfig tableConfig) {
TableId tableId, Schema schema, TableCreateConfig tableCreateConfig) {
if (schema.primaryKeys().isEmpty()) {
throw new RuntimeException(
String.format(
Expand Down Expand Up @@ -96,10 +96,10 @@ public static StarRocksTable toStarRocksTable(
// use primary keys as distribution keys by default
.setDistributionKeys(schema.primaryKeys())
.setComment(schema.comment());
if (tableConfig.getNumBuckets().isPresent()) {
tableBuilder.setNumBuckets(tableConfig.getNumBuckets().get());
if (tableCreateConfig.getNumBuckets().isPresent()) {
tableBuilder.setNumBuckets(tableCreateConfig.getNumBuckets().get());
}
tableBuilder.setTableProperties(tableConfig.getProperties());
tableBuilder.setTableProperties(tableCreateConfig.getProperties());
return tableBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
* primary_key_table/#create-a-table">StarRocks Documentation</a> for how to create a StarRocks
* primary key table.
*/
public class TableConfig implements Serializable {
public class TableCreateConfig implements Serializable {

private static final long serialVersionUID = 1L;

Expand All @@ -46,7 +46,7 @@ public class TableConfig implements Serializable {
/** Properties for the table. */
private final Map<String, String> properties;

public TableConfig(@Nullable Integer numBuckets, Map<String, String> properties) {
public TableCreateConfig(@Nullable Integer numBuckets, Map<String, String> properties) {
this.numBuckets = numBuckets;
this.properties = new HashMap<>(properties);
}
Expand All @@ -59,7 +59,7 @@ public Map<String, String> getProperties() {
return Collections.unmodifiableMap(properties);
}

public static TableConfig from(Configuration config) {
public static TableCreateConfig from(Configuration config) {
Integer numBuckets = config.get(TABLE_CREATE_NUM_BUCKETS);
Map<String, String> tableProperties =
config.toMap().entrySet().stream()
Expand All @@ -73,6 +73,6 @@ public static TableConfig from(Configuration config) {
.length())
.toLowerCase(),
Map.Entry::getValue));
return new TableConfig(numBuckets, tableProperties);
return new TableCreateConfig(numBuckets, tableProperties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import com.ververica.cdc.common.types.TimestampType;
import com.ververica.cdc.connectors.starrocks.sink.SchemaChangeConfig;
import com.ververica.cdc.connectors.starrocks.sink.StarRocksMetadataApplier;
import com.ververica.cdc.connectors.starrocks.sink.TableConfig;
import com.ververica.cdc.connectors.starrocks.sink.TableCreateConfig;
import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -64,10 +64,10 @@ public void setup() {
.put("table.create.properties.replication_num", "5")
.build());
SchemaChangeConfig schemaChangeConfig = SchemaChangeConfig.from(configuration);
TableConfig tableConfig = TableConfig.from(configuration);
TableCreateConfig tableCreateConfig = TableCreateConfig.from(configuration);
this.catalog = new MockStarRocksCatalog();
this.metadataApplier =
new StarRocksMetadataApplier(catalog, tableConfig, schemaChangeConfig);
new StarRocksMetadataApplier(catalog, tableCreateConfig, schemaChangeConfig);
}

@Test
Expand Down

0 comments on commit cbc25a5

Please sign in to comment.