Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][API] Make sure the table name in TablePath not be null #7252

Merged
merged 5 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,19 +153,16 @@ protected void dropTable() {

protected void createTable() {
if (!catalog.databaseExists(tablePath.getDatabaseName())) {
TablePath databasePath = TablePath.of(tablePath.getDatabaseName(), "");
try {
log.info(
"Creating database {} with action {}",
tablePath.getDatabaseName(),
catalog.previewAction(
Catalog.ActionType.CREATE_DATABASE,
databasePath,
Optional.empty()));
Catalog.ActionType.CREATE_DATABASE, tablePath, Optional.empty()));
} catch (UnsupportedOperationException ignore) {
log.info("Creating database {}", tablePath.getDatabaseName());
}
catalog.createDatabase(databasePath, true);
catalog.createDatabase(tablePath, true);
}
try {
log.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@

package org.apache.seatunnel.api.table.catalog;

import org.apache.commons.lang3.StringUtils;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.NonNull;

import java.io.Serializable;

@Getter
@EqualsAndHashCode
@RequiredArgsConstructor
public final class TableIdentifier implements Serializable {
private static final long serialVersionUID = 1L;

Expand All @@ -35,7 +36,18 @@ public final class TableIdentifier implements Serializable {

private final String schemaName;

private final String tableName;
@NonNull private final String tableName;

public TableIdentifier(
String catalogName, String databaseName, String schemaName, @NonNull String tableName) {
this.catalogName = catalogName;
this.databaseName = databaseName;
this.schemaName = schemaName;
this.tableName = tableName;
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("tableName cannot be empty");
}
}

public static TableIdentifier of(String catalogName, String databaseName, String tableName) {
return new TableIdentifier(catalogName, databaseName, null, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,32 @@

package org.apache.seatunnel.api.table.catalog;

import org.apache.commons.lang3.StringUtils;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.NonNull;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

@Getter
@EqualsAndHashCode
@RequiredArgsConstructor
public final class TablePath implements Serializable {
private static final long serialVersionUID = 1L;
private final String databaseName;
private final String schemaName;
private final String tableName;
@NonNull private final String tableName;

public TablePath(String databaseName, String schemaName, @NonNull String tableName) {
this.databaseName = databaseName;
this.schemaName = schemaName;
this.tableName = tableName;
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("tableName cannot be empty");
}
}

public static final TablePath DEFAULT = TablePath.of("default", "default", "default");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void testSinkOptions() {
@Test
public void testSinkOptionsWithNoTablePath() {
ReadonlyConfig config = createConfig();
CatalogTable table = createTestTableWithNoTablePath();
CatalogTable table = createTestTableWithNoDatabaseAndSchemaName();
ReadonlyConfig newConfig = TablePlaceholder.replaceTablePlaceholder(config, table);

Assertions.assertEquals("xyz_default_db_test", newConfig.get(DATABASE));
Expand All @@ -95,7 +95,7 @@ public void testSinkOptionsWithNoTablePath() {
@Test
public void testSinkOptionsWithExcludeKeys() {
ReadonlyConfig config = createConfig();
CatalogTable table = createTestTableWithNoTablePath();
CatalogTable table = createTestTableWithNoDatabaseAndSchemaName();
ReadonlyConfig newConfig =
TablePlaceholder.replaceTablePlaceholder(
config, table, Arrays.asList(DATABASE.key()));
Expand All @@ -116,7 +116,7 @@ public void testSinkOptionsWithExcludeKeys() {
public void testSinkOptionsWithMultiTable() {
ReadonlyConfig config = createConfig();
CatalogTable table1 = createTestTable();
CatalogTable table2 = createTestTableWithNoTablePath();
CatalogTable table2 = createTestTableWithNoDatabaseAndSchemaName();
ReadonlyConfig newConfig1 =
TablePlaceholder.replaceTablePlaceholder(config, table1, Arrays.asList());
ReadonlyConfig newConfig2 =
Expand Down Expand Up @@ -159,8 +159,8 @@ private static ReadonlyConfig createConfig() {
return ReadonlyConfig.fromMap(configMap);
}

private static CatalogTable createTestTableWithNoTablePath() {
TableIdentifier tableId = TableIdentifier.of("my-catalog", null, null, null);
private static CatalogTable createTestTableWithNoDatabaseAndSchemaName() {
TableIdentifier tableId = TableIdentifier.of("my-catalog", null, null, "default_table");
TableSchema tableSchema =
TableSchema.builder()
.primaryKey(PrimaryKey.of("my-pk", Arrays.asList("f1", "f2")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void testJsonParseError() {
SeaTunnelRowType schema =
new SeaTunnelRowType(new String[] {"name"}, new SeaTunnelDataType[] {STRING_TYPE});

CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "", "", "", schema);
CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "", "", "test", schema);

final DeserializationSchema<SeaTunnelRow> deser =
new JsonDeserializationSchema(catalogTables, false, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.type.BasicType;
Expand Down Expand Up @@ -146,7 +147,7 @@ protected void buildSchemaWithConfig(Config pluginConfig) {
}
} else {
TableIdentifier tableIdentifier =
TableIdentifier.of(HttpConfig.CONNECTOR_IDENTITY, null, null);
TableIdentifier.of(HttpConfig.CONNECTOR_IDENTITY, TablePath.DEFAULT);
TableSchema tableSchema =
TableSchema.builder()
.column(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void decoder() throws IOException {

SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(fieldNames, dataTypes);
CatalogTable catalogTables =
CatalogTableUtil.getCatalogTable("", "", "", "", seaTunnelRowType);
CatalogTableUtil.getCatalogTable("", "", "", "test", seaTunnelRowType);
CanalJsonDeserializationSchema canalJsonDeserializationSchema =
CanalJsonDeserializationSchema.builder(catalogTables).build();
PulsarCanalDecorator pulsarCanalDecorator =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ public void testFakeSourceToKafkaAvroFormat(TestContainer container)
};
SeaTunnelRowType fake_source_row_type = new SeaTunnelRowType(fieldNames, fieldTypes);
CatalogTable catalogTable =
CatalogTableUtil.getCatalogTable("", "", "", "", fake_source_row_type);
CatalogTableUtil.getCatalogTable("", "", "", "test", fake_source_row_type);
AvroDeserializationSchema avroDeserializationSchema =
new AvroDeserializationSchema(catalogTable);
List<SeaTunnelRow> kafkaSTRow =
Expand Down Expand Up @@ -446,7 +446,7 @@ public void testKafkaAvroToAssert(TestContainer container)
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

CatalogTable catalogTable =
CatalogTableUtil.getCatalogTable("", "", "", "", SEATUNNEL_ROW_TYPE);
CatalogTableUtil.getCatalogTable("", "", "", "test", SEATUNNEL_ROW_TYPE);

AvroDeserializationSchema avroDeserializationSchema =
new AvroDeserializationSchema(catalogTable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,11 @@ public SeaTunnelSourceCollector(
tablePaths.forEach(
tablePath ->
sourceReceivedCountPerTable.put(
getFullName(tablePath),
tablePath.getFullName(),
metricsContext.counter(
SOURCE_RECEIVED_COUNT + "#" + getFullName(tablePath))));
SOURCE_RECEIVED_COUNT
+ "#"
+ tablePath.getFullName())));
}
sourceReceivedCount = metricsContext.counter(SOURCE_RECEIVED_COUNT);
sourceReceivedQPS = metricsContext.meter(SOURCE_RECEIVED_QPS);
Expand All @@ -131,7 +133,7 @@ public void collect(T row) {
sourceReceivedBytesPerSeconds.markEvent(size);
flowControlGate.audit((SeaTunnelRow) row);
if (StringUtils.isNotEmpty(tableId)) {
String tableName = getFullName(TablePath.of(tableId));
String tableName = TablePath.of(tableId).getFullName();
Counter sourceTableCounter = sourceReceivedCountPerTable.get(tableName);
if (Objects.nonNull(sourceTableCounter)) {
sourceTableCounter.inc();
Expand Down Expand Up @@ -232,12 +234,4 @@ public void sendRecordToNext(Record<?> record) throws IOException {
}
}
}

private String getFullName(TablePath tablePath) {
if (StringUtils.isBlank(tablePath.getTableName())) {
tablePath =
TablePath.of(tablePath.getDatabaseName(), tablePath.getSchemaName(), "default");
}
return tablePath.getFullName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ public SinkFlowLifeCycle(
sinkTables.forEach(
tablePath ->
sinkWriteCountPerTable.put(
getFullName(tablePath),
tablePath.getFullName(),
metricsContext.counter(
SINK_WRITE_COUNT + "#" + getFullName(tablePath))));
SINK_WRITE_COUNT + "#" + tablePath.getFullName())));
}
}

Expand Down Expand Up @@ -275,7 +275,7 @@ public void received(Record<?> record) {
sinkWriteBytesPerSeconds.markEvent(size);
String tableId = ((SeaTunnelRow) record.getData()).getTableId();
if (StringUtils.isNotBlank(tableId)) {
String tableName = getFullName(TablePath.of(tableId));
String tableName = TablePath.of(tableId).getFullName();
Counter sinkTableCounter = sinkWriteCountPerTable.get(tableName);
if (Objects.nonNull(sinkTableCounter)) {
sinkTableCounter.inc();
Expand Down Expand Up @@ -345,12 +345,4 @@ public void restoreState(List<ActionSubtaskState> actionStateList) throws Except
((SupportResourceShare) this.writer).setMultiTableResourceManager(resourceManager, 0);
}
}

private String getFullName(TablePath tablePath) {
if (StringUtils.isBlank(tablePath.getTableName())) {
tablePath =
TablePath.of(tablePath.getDatabaseName(), tablePath.getSchemaName(), "default");
}
return tablePath.getFullName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ private SeaTunnelRowType buildSeaTunnelRowType() {
@Test
public void testSerialization() throws IOException {
SeaTunnelRowType rowType = buildSeaTunnelRowType();
CatalogTable catalogTable = CatalogTableUtil.getCatalogTable("", "", "", "", rowType);
CatalogTable catalogTable = CatalogTableUtil.getCatalogTable("", "", "", "test", rowType);
SeaTunnelRow seaTunnelRow = buildSeaTunnelRow();
AvroSerializationSchema serializationSchema = new AvroSerializationSchema(rowType);
byte[] bytes = serializationSchema.serialize(seaTunnelRow);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public void testSerDe() throws Exception {
new MapType(STRING_TYPE, new MapType(STRING_TYPE, INT_TYPE))
})
});
CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "", "", "", schema);
CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "", "", "test", schema);
JsonDeserializationSchema deserializationSchema =
new JsonDeserializationSchema(catalogTables, false, false);

Expand Down Expand Up @@ -230,7 +230,7 @@ public void testSerDeMultiRows() throws Exception {
new SeaTunnelDataType[] {STRING_TYPE, INT_TYPE})
});

CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "", "", "", schema);
CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "", "", "test", schema);

JsonDeserializationSchema deserializationSchema =
new JsonDeserializationSchema(catalogTables, false, false);
Expand Down Expand Up @@ -308,7 +308,7 @@ public void testSerDeMultiRowsWithNullValues() throws Exception {
new MapType(STRING_TYPE, DOUBLE_TYPE)
});

CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "", "", "", rowType);
CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "", "", "test", rowType);

JsonDeserializationSchema deserializationSchema =
new JsonDeserializationSchema(catalogTables, false, true);
Expand All @@ -327,7 +327,7 @@ public void testSerDeMultiRowsWithNullValues() throws Exception {
public void testDeserializationNullRow() throws Exception {
SeaTunnelRowType schema =
new SeaTunnelRowType(new String[] {"name"}, new SeaTunnelDataType[] {STRING_TYPE});
CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "", "", "", schema);
CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "", "", "test", schema);

JsonDeserializationSchema deserializationSchema =
new JsonDeserializationSchema(catalogTables, true, false);
Expand All @@ -339,7 +339,7 @@ public void testDeserializationNullRow() throws Exception {
public void testDeserializationMissingNode() throws Exception {
SeaTunnelRowType schema =
new SeaTunnelRowType(new String[] {"name"}, new SeaTunnelDataType[] {STRING_TYPE});
CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "", "", "", schema);
CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "", "", "test", schema);

JsonDeserializationSchema deserializationSchema =
new JsonDeserializationSchema(catalogTables, true, false);
Expand All @@ -359,7 +359,7 @@ public void testDeserializationPassMissingField() throws Exception {
SeaTunnelRowType schema =
new SeaTunnelRowType(new String[] {"name"}, new SeaTunnelDataType[] {STRING_TYPE});

CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "", "", "", schema);
CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "", "", "test", schema);

// pass on missing field
final JsonDeserializationSchema deser =
Expand All @@ -382,7 +382,7 @@ public void testDeserializationMissingField() throws Exception {
SeaTunnelRowType schema =
new SeaTunnelRowType(new String[] {"name"}, new SeaTunnelDataType[] {STRING_TYPE});

CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "", "", "", schema);
CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "", "", "test", schema);

// fail on missing field
final JsonDeserializationSchema deser =
Expand Down Expand Up @@ -418,7 +418,7 @@ public void testDeserializationIgnoreParseError() throws Exception {
SeaTunnelRowType schema =
new SeaTunnelRowType(new String[] {"name"}, new SeaTunnelDataType[] {STRING_TYPE});
SeaTunnelRow expected = new SeaTunnelRow(1);
CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "", "", "", schema);
CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "", "", "test", schema);

// ignore on parse error
final JsonDeserializationSchema deser =
Expand Down Expand Up @@ -446,7 +446,7 @@ public void testDeserializationNoJson() throws Exception {
SeaTunnelRowType schema =
new SeaTunnelRowType(new String[] {"name"}, new SeaTunnelDataType[] {STRING_TYPE});

CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "", "", "", schema);
CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "", "", "test", schema);

String noJson = "{]";
final JsonDeserializationSchema deser =
Expand Down
Loading
Loading