Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][API] Check catalog table fields name legal before send to downstream #7358

Merged
merged 3 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run seatunnel zeta integration test
if: needs.changes.outputs.api == 'true'
run: |
Expand Down Expand Up @@ -609,6 +611,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run transform-v2 integration test (part-1)
if: needs.changes.outputs.api == 'true'
run: |
Expand All @@ -633,6 +637,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run transform-v2 integration test (part-2)
if: needs.changes.outputs.api == 'true'
run: |
Expand All @@ -657,6 +663,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run connector-v2 integration test (part-1)
if: needs.changes.outputs.api == 'true'
run: |
Expand Down Expand Up @@ -684,6 +692,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run connector-v2 integration test (part-2)
if: needs.changes.outputs.api == 'true'
run: |
Expand Down Expand Up @@ -711,6 +721,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run connector-v2 integration test (part-3)
if: needs.changes.outputs.api == 'true'
run: |
Expand Down Expand Up @@ -738,6 +750,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run connector-v2 integration test (part-4)
if: needs.changes.outputs.api == 'true'
run: |
Expand Down Expand Up @@ -765,6 +779,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run connector-v2 integration test (part-5)
if: needs.changes.outputs.api == 'true'
run: |
Expand Down Expand Up @@ -792,6 +808,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run connector-v2 integration test (part-6)
if: needs.changes.outputs.api == 'true'
run: |
Expand Down Expand Up @@ -819,6 +837,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run connector-v2 integration test (part-7)
if: needs.changes.outputs.api == 'true'
run: |
Expand Down Expand Up @@ -898,6 +918,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run jdbc connectors integration test (part-3)
if: needs.changes.outputs.api == 'true'
run: |
Expand All @@ -922,6 +944,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run jdbc connectors integration test (part-4)
if: needs.changes.outputs.api == 'true'
run: |
Expand All @@ -946,6 +970,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run jdbc connectors integration test (part-5)
if: needs.changes.outputs.api == 'true'
run: |
Expand Down Expand Up @@ -996,6 +1022,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run jdbc connectors integration test (part-7)
if: needs.changes.outputs.api == 'true'
run: |
Expand All @@ -1020,6 +1048,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run kudu connector integration test
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-kudu-e2e -am -Pci
Expand All @@ -1043,6 +1073,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run amazonsqs connector integration test
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-amazonsqs-e2e -am -Pci
Expand All @@ -1066,6 +1098,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run kafka connector integration test
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-kafka-e2e -am -Pci
Expand All @@ -1089,6 +1123,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run rocket connector integration test
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-rocketmq-e2e -am -Pci
Expand Down Expand Up @@ -1139,6 +1175,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run oracle cdc connector integration test
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-cdc-oracle-e2e -am -Pci
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@
package org.apache.seatunnel.api.table.factory;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.common.utils.SeaTunnelException;

import org.apache.commons.lang3.StringUtils;

import lombok.Getter;

import java.util.ArrayList;
import java.util.List;

@Getter
public abstract class TableFactoryContext {

Expand All @@ -31,4 +38,25 @@ public TableFactoryContext(ReadonlyConfig options, ClassLoader classLoader) {
this.options = options;
this.classLoader = classLoader;
}

protected static void checkCatalogTableIllegal(List<CatalogTable> catalogTables) {
for (CatalogTable catalogTable : catalogTables) {
List<String> alreadyChecked = new ArrayList<>();
for (String fieldName : catalogTable.getTableSchema().getFieldNames()) {
if (StringUtils.isBlank(fieldName)) {
throw new SeaTunnelException(
String.format(
"Table %s field name cannot be empty",
catalogTable.getTablePath().getFullName()));
}
if (alreadyChecked.contains(fieldName)) {
throw new SeaTunnelException(
String.format(
"Table %s field %s duplicate",
catalogTable.getTablePath().getFullName(), fieldName));
}
alreadyChecked.add(fieldName);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,24 @@
import org.apache.seatunnel.api.sink.TablePlaceholder;
import org.apache.seatunnel.api.table.catalog.CatalogTable;

import com.google.common.annotations.VisibleForTesting;
import lombok.Getter;

import java.util.Collection;
import java.util.Collections;

@Getter
public class TableSinkFactoryContext extends TableFactoryContext {

private final CatalogTable catalogTable;

protected TableSinkFactoryContext(
@VisibleForTesting
public TableSinkFactoryContext(
CatalogTable catalogTable, ReadonlyConfig options, ClassLoader classLoader) {
super(options, classLoader);
if (catalogTable != null) {
checkCatalogTableIllegal(Collections.singletonList(catalogTable));
}
this.catalogTable = catalogTable;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class TableTransformFactoryContext extends TableFactoryContext {
public TableTransformFactoryContext(
List<CatalogTable> catalogTables, ReadonlyConfig options, ClassLoader classLoader) {
super(options, classLoader);
checkCatalogTableIllegal(catalogTables);
this.catalogTables = catalogTables;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
package org.apache.seatunnel.api.table.catalog;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.common.utils.SeaTunnelException;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -89,4 +93,62 @@ public void testReadCatalogTableWithUnsupportedType() {
});
Assertions.assertEquals(result, exception.getParamsValueAs("tableUnsupportedTypes"));
}

@Test
public void testCatalogTableWithIllegalFieldNames() {
CatalogTable catalogTable =
CatalogTable.of(
TableIdentifier.of("catalog", "database", "table"),
TableSchema.builder()
.column(
PhysicalColumn.of(
" ", BasicType.STRING_TYPE, 1L, true, null, ""))
.build(),
Collections.emptyMap(),
Collections.emptyList(),
"comment");
SeaTunnelException exception =
Assertions.assertThrows(
SeaTunnelException.class,
() ->
new TableTransformFactoryContext(
Collections.singletonList(catalogTable), null, null));
SeaTunnelException exception2 =
Assertions.assertThrows(
SeaTunnelException.class,
() -> new TableSinkFactoryContext(catalogTable, null, null));
Assertions.assertEquals(
"Table database.table field name cannot be empty", exception.getMessage());
Assertions.assertEquals(
"Table database.table field name cannot be empty", exception2.getMessage());

CatalogTable catalogTable2 =
CatalogTable.of(
TableIdentifier.of("catalog", "database", "table"),
TableSchema.builder()
.column(
PhysicalColumn.of(
"name1", BasicType.STRING_TYPE, 1L, true, null, ""))
.column(
PhysicalColumn.of(
"name1", BasicType.STRING_TYPE, 1L, true, null, ""))
.build(),
Collections.emptyMap(),
Collections.emptyList(),
"comment");
SeaTunnelException exception3 =
Assertions.assertThrows(
SeaTunnelException.class,
() ->
new TableTransformFactoryContext(
Collections.singletonList(catalogTable2), null, null));
SeaTunnelException exception4 =
Assertions.assertThrows(
SeaTunnelException.class,
() -> new TableSinkFactoryContext(catalogTable2, null, null));
Assertions.assertEquals(
"Table database.table field name1 duplicate", exception3.getMessage());
Assertions.assertEquals(
"Table database.table field name1 duplicate", exception4.getMessage());
}
}
Loading