Skip to content

Commit

Permalink
[Bug][Connector-Iceberg]fix create iceberg v2 table with pks (#6895)
Browse files Browse the repository at this point in the history
  • Loading branch information
litiliu authored May 29, 2024
1 parent 3417fb2 commit 40d2c1b
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class IcebergTypeMapper {
private static int fieldId = 1;

public static SeaTunnelDataType<?> mapping(String field, @NonNull Type icebergType) {
switch (icebergType.typeId()) {
Expand Down Expand Up @@ -113,6 +113,10 @@ private static MapType mappingMapType(String field, Types.MapType mapType) {
}

/**
 * Converts a SeaTunnel data type into an Iceberg type.
 *
 * <p>Every call creates its own id counter seeded at 1 and hands it to the private
 * two-argument overload, so the counter is not shared between calls.
 *
 * @param dataType the SeaTunnel type to convert
 * @return whatever the two-argument overload produces for {@code dataType}
 */
public static Type toIcebergType(SeaTunnelDataType dataType) {
    AtomicInteger idCounter = new AtomicInteger(1);
    return toIcebergType(dataType, idCounter);
}

private static Type toIcebergType(SeaTunnelDataType dataType, AtomicInteger nextId) {
switch (dataType.getSqlType()) {
case BOOLEAN:
return Types.BooleanType.get();
Expand All @@ -134,22 +138,27 @@ public static Type toIcebergType(SeaTunnelDataType dataType) {
case ARRAY:
ArrayType arrayType = (ArrayType) dataType;
// converter elementType
Type elementType = toIcebergType(arrayType.getElementType());
return Types.ListType.ofOptional(nextId(), elementType);
Type elementType = toIcebergType(arrayType.getElementType(), nextId);
return Types.ListType.ofOptional(nextId.getAndIncrement(), elementType);
case MAP:
org.apache.seatunnel.api.table.type.MapType mapType =
(org.apache.seatunnel.api.table.type.MapType) dataType;
Type keyType = toIcebergType(mapType.getKeyType());
Type valueType = toIcebergType(mapType.getValueType());
return Types.MapType.ofOptional(nextId(), nextId(), keyType, valueType);
Type keyType = toIcebergType(mapType.getKeyType(), nextId);
Type valueType = toIcebergType(mapType.getValueType(), nextId);
return Types.MapType.ofOptional(
nextId.getAndIncrement(), nextId.getAndIncrement(), keyType, valueType);
case ROW:
SeaTunnelRowType seaTunnelRowType = (SeaTunnelRowType) dataType;
List<Types.NestedField> structFields = new ArrayList<>();
for (int i = 0; i < seaTunnelRowType.getFieldNames().length; i++) {
String field = seaTunnelRowType.getFieldName(i);
SeaTunnelDataType fieldType = seaTunnelRowType.getFieldType(i);
structFields.add(
Types.NestedField.of(nextId(), true, field, toIcebergType(fieldType)));
Types.NestedField.of(
nextId.getAndIncrement(),
true,
field,
toIcebergType(fieldType, nextId)));
}
return Types.StructType.of(structFields);
case DATE:
Expand All @@ -163,8 +172,4 @@ public static Type toIcebergType(SeaTunnelDataType dataType) {
return Types.StringType.get();
}
}

// Returns the current value of the static fieldId counter, then advances it by one.
// NOTE(review): fieldId is a plain static int, so this post-increment is not atomic, and
// nothing visible in this view resets the counter between conversions — confirm before reuse.
private static int nextId() {
return fieldId++;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.schema.SchemaDeleteColumn;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.schema.SchemaModifyColumn;

import org.apache.commons.collections.CollectionUtils;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
Expand All @@ -50,11 +51,17 @@

import org.jetbrains.annotations.NotNull;

import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -94,7 +101,7 @@ public static Table autoCreateTable(
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
TableSchema tableSchema = table.getTableSchema();
// Convert to iceberg schema
Schema schema = toIcebergSchema(tableSchema.toPhysicalRowDataType());
Schema schema = toIcebergSchema(tableSchema.toPhysicalRowDataType(), readonlyConfig);
// Convert sink config
SinkConfig config = new SinkConfig(readonlyConfig);
// build auto create table
Expand All @@ -120,7 +127,7 @@ public static Table autoCreateTable(
SinkConfig config,
SeaTunnelRowType rowType) {
// Generate struct type
Schema schema = toIcebergSchema(rowType);
Schema schema = toIcebergSchema(rowType, config.getReadonlyConfig());
return createTable(catalog, tableIdentifier, config, schema, config.getAutoCreateProps());
}

Expand Down Expand Up @@ -160,9 +167,42 @@ private static Table createTable(
return result.get();
}

@NotNull private static Schema toIcebergSchema(SeaTunnelRowType rowType) {
/**
 * Builds the Iceberg {@link Schema} for {@code rowType}, marking configured primary keys as
 * identifier fields.
 *
 * <p>When {@code readonlyConfig} is non-null and {@code SinkConfig.TABLE_PRIMARY_KEYS} yields a
 * non-empty list, every listed name is looked up among the top-level fields of the converted
 * struct. Matching fields are emitted as required and their ids are passed to the schema as
 * identifier field ids; every other field is emitted as optional.
 *
 * @param rowType the SeaTunnel row type to convert
 * @param readonlyConfig sink options; may be null, in which case no identifier fields are set
 * @return a schema built from the converted fields and the collected identifier field ids
 * @throws IllegalArgumentException if a configured primary key does not match any top-level
 *     field name
 */
@VisibleForTesting
@NotNull protected static Schema toIcebergSchema(
SeaTunnelRowType rowType, ReadonlyConfig readonlyConfig) {
Types.StructType structType = SchemaUtils.toIcebergType(rowType).asStructType();
// NOTE(review): the bare return below appears to be the pre-change line retained by the diff
// rendering; the statements after it are the replacement logic — confirm against the
// committed file.
return new Schema(structType.fields());
// Ids of the fields named as primary keys; stays empty when none are configured.
Set<Integer> identifierFieldIds = new HashSet<>();
if (Objects.nonNull(readonlyConfig)) {
// NOTE(review): presumably splits the configured value on "," — confirm in
// SinkConfig.stringToList.
List<String> pks =
SinkConfig.stringToList(readonlyConfig.get(SinkConfig.TABLE_PRIMARY_KEYS), ",");
if (CollectionUtils.isNotEmpty(pks)) {
for (String pk : pks) {
// Exact (case-sensitive) name match against the struct's top-level fields only.
Optional<Integer> pkId =
structType.fields().stream()
.filter(nestedField -> nestedField.name().equals(pk))
.map(nestedField -> nestedField.fieldId())
.findFirst();
// Fail fast: a configured key that is missing from the row type is a config error.
if (!pkId.isPresent()) {
throw new IllegalArgumentException(
String.format(
"iceberg table pk:%s not present in the incoming struct",
pk));
}
identifierFieldIds.add(pkId.get());
}
}
}
// Re-emit every field: primary-key fields become required, all others become optional.
List<Types.NestedField> fields = new ArrayList<>();
structType
.fields()
.forEach(
field -> {
fields.add(
identifierFieldIds.contains(field.fieldId())
? field.asRequired()
: field.asOptional());
});
return new Schema(fields, identifierFieldIds);
}

public static TableIdentifier toIcebergTableIdentifierFromCatalogTable(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.iceberg.utils;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SinkConfig;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

/**
 * Tests for {@code SchemaUtils.toIcebergSchema}: primary-key (identifier field) handling and
 * the required/optional flag of the generated columns.
 *
 * <p>Like the original assertions, these tests rely on the top-level field ids being 1-based
 * and assigned in declaration order, so {@code FIELD_NAMES[fieldId - 1]} is the column name.
 */
class SchemaUtilsTest {

    private static final String[] FIELD_NAMES = {"id", "name", "description", "weight"};

    /** Creates the row type shared by all tests: one LONG column and three STRING columns. */
    private static SeaTunnelRowType newRowType() {
        SeaTunnelDataType<?>[] dataTypes =
                new SeaTunnelDataType[] {
                    BasicType.LONG_TYPE,
                    BasicType.STRING_TYPE,
                    BasicType.STRING_TYPE,
                    BasicType.STRING_TYPE
                };
        // Clone so no test can mutate the shared name array through the row type.
        return new SeaTunnelRowType(FIELD_NAMES.clone(), dataTypes);
    }

    /** Creates a sink config whose primary-key option is the comma-joined {@code pks}. */
    private static ReadonlyConfig configWithPrimaryKeys(List<String> pks) {
        // A plain map instead of double-brace initialization, which creates an anonymous
        // HashMap subclass per call site.
        HashMap<String, Object> options = new HashMap<>();
        options.put(SinkConfig.TABLE_PRIMARY_KEYS.key(), String.join(",", pks));
        return ReadonlyConfig.fromMap(options);
    }

    @Test
    void testToIcebergSchemaWithPk() {
        List<String> pks = Arrays.asList("id", "name");
        Schema schema = SchemaUtils.toIcebergSchema(newRowType(), configWithPrimaryKeys(pks));
        Assertions.assertNotNull(schema);
        Assertions.assertEquals(FIELD_NAMES.length, schema.columns().size());
        for (Types.NestedField column : schema.columns()) {
            Assertions.assertEquals(FIELD_NAMES[column.fieldId() - 1], column.name());
            // Exactly the primary-key columns must be required.
            Assertions.assertEquals(pks.contains(column.name()), column.isRequired());
        }
        Assertions.assertNotNull(schema.identifierFieldIds());
        Assertions.assertEquals(pks.size(), schema.identifierFieldIds().size());
        for (Integer identifierFieldId : schema.identifierFieldIds()) {
            // Check membership by name; indexing pks by field id would only work while the
            // primary keys happen to be the leading columns in the same order.
            Assertions.assertTrue(pks.contains(FIELD_NAMES[identifierFieldId - 1]));
        }
    }

    @Test
    void testToIcebergSchemaWithoutPk() {
        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(new HashMap<String, Object>());
        Schema schema = SchemaUtils.toIcebergSchema(newRowType(), readonlyConfig);
        Assertions.assertNotNull(schema);
        Assertions.assertEquals(FIELD_NAMES.length, schema.columns().size());
        for (Types.NestedField column : schema.columns()) {
            Assertions.assertEquals(FIELD_NAMES[column.fieldId() - 1], column.name());
            Assertions.assertFalse(column.isRequired());
        }
    }

    @Test
    void testToIcebergSchemaWithUnknownPk() {
        // A configured key that is not a column of the row type must be rejected.
        ReadonlyConfig readonlyConfig = configWithPrimaryKeys(Arrays.asList("id", "missing"));
        Assertions.assertThrows(
                IllegalArgumentException.class,
                () -> SchemaUtils.toIcebergSchema(newRowType(), readonlyConfig));
    }
}

0 comments on commit 40d2c1b

Please sign in to comment.