Skip to content

Commit

Permalink
[#1313][#1471] feat(iceberg): Support struct column for iceberg (#2089)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

add  struct column support for iceberg 

### Why are the changes needed?

Fix: #1313
Fix: #1471 

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

UT added

Co-authored-by: Tianhang Li <[email protected]>
Co-authored-by: teo <[email protected]>
  • Loading branch information
3 people authored Feb 6, 2024
1 parent ff0e1e7 commit 38ee3f5
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergColumn;
import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTable;
import java.util.Arrays;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
Expand All @@ -16,11 +17,14 @@ public class ConvertUtil {
/**
* Convert the Iceberg Table to the corresponding schema information in the Iceberg.
*
* @param icebergTable Iceberg table.
* @return iceberg schema.
* @param gravitinoTable Gravitino table of Iceberg.
* @return Iceberg schema.
*/
public static Schema toIcebergSchema(IcebergTable icebergTable) {
Type converted = ToIcebergTypeVisitor.visit(icebergTable, new ToIcebergType(icebergTable));
public static Schema toIcebergSchema(IcebergTable gravitinoTable) {
com.datastrato.gravitino.rel.types.Types.StructType gravitinoStructType =
toGravitinoStructType(gravitinoTable);
Type converted =
ToIcebergTypeVisitor.visit(gravitinoStructType, new ToIcebergType(gravitinoStructType));
return new Schema(converted.asNestedType().asStructType().fields());
}

Expand Down Expand Up @@ -50,7 +54,7 @@ public static com.datastrato.gravitino.rel.types.Type formIcebergType(Type type)
* Convert the nested field of Iceberg to the Iceberg column.
*
* @param nestedField Iceberg nested field.
* @return
* @return Gravitino iceberg column
*/
public static IcebergColumn fromNestedField(Types.NestedField nestedField) {
return new IcebergColumn.Builder()
Expand All @@ -61,4 +65,22 @@ public static IcebergColumn fromNestedField(Types.NestedField nestedField) {
.withType(ConvertUtil.formIcebergType(nestedField.type()))
.build();
}

/**
* Convert the Gravitino iceberg table to the Gravitino StructType
*
* @param icebergTable Gravitino iceberg table
* @return Gravitino StructType
*/
private static com.datastrato.gravitino.rel.types.Types.StructType toGravitinoStructType(
IcebergTable icebergTable) {
com.datastrato.gravitino.rel.types.Types.StructType.Field[] fields =
Arrays.stream(icebergTable.columns())
.map(
column ->
com.datastrato.gravitino.rel.types.Types.StructType.Field.of(
column.name(), column.dataType(), column.nullable(), column.comment()))
.toArray(com.datastrato.gravitino.rel.types.Types.StructType.Field[]::new);
return com.datastrato.gravitino.rel.types.Types.StructType.of(fields);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package com.datastrato.gravitino.catalog.lakehouse.iceberg.converter;

import com.datastrato.gravitino.rel.types.Type;
import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.TypeUtil;
Expand All @@ -26,7 +27,20 @@ public Type schema(Schema schema, Type structType) {

@Override
public Type struct(Types.StructType struct, List<Type> fieldResults) {
throw new UnsupportedOperationException("Data conversion of struct type is not supported");
List<com.datastrato.gravitino.rel.types.Types.StructType.Field> fieldsList = new ArrayList<>();
List<Types.NestedField> originalFields = struct.fields();

for (int i = 0; i < originalFields.size(); i++) {
Types.NestedField nestedField = originalFields.get(i);
fieldsList.add(
com.datastrato.gravitino.rel.types.Types.StructType.Field.of(
nestedField.name(),
fieldResults.get(i),
nestedField.isOptional(),
nestedField.doc()));
}
return com.datastrato.gravitino.rel.types.Types.StructType.of(
fieldsList.toArray(new com.datastrato.gravitino.rel.types.Types.StructType.Field[0]));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,8 @@
*/
package com.datastrato.gravitino.catalog.lakehouse.iceberg.converter;

import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergColumn;
import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTable;
import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

Expand All @@ -19,7 +15,7 @@
* <p>Referred from core/src/main/java/org/apache/iceberg/spark/SparkTypeToType.java
*/
public class ToIcebergType extends ToIcebergTypeVisitor<Type> {
private final IcebergTable root;
private final com.datastrato.gravitino.rel.types.Types.StructType root;
private int nextId = 0;
private boolean nullable;

Expand All @@ -28,42 +24,47 @@ public ToIcebergType(boolean nullable) {
this.nullable = nullable;
}

public ToIcebergType(IcebergTable root) {
public ToIcebergType(com.datastrato.gravitino.rel.types.Types.StructType root) {
this.root = root;
// the root struct's fields use the first ids
this.nextId = root.columns().length;
this.nextId = root.fields().length;
}

private int getNextId() {
return nextId++;
}

@Override
public Type struct(IcebergTable struct, List<Type> types) {
List<IcebergColumn> fields =
Arrays.stream(struct.columns())
.map(column -> (IcebergColumn) column)
.collect(Collectors.toList());
List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(fields.size());
public Type struct(com.datastrato.gravitino.rel.types.Types.StructType struct, List<Type> types) {
com.datastrato.gravitino.rel.types.Types.StructType.Field[] fields = struct.fields();
List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(fields.length);
boolean isRoot = root == struct;

for (int i = 0; i < fields.size(); i += 1) {
IcebergColumn field = fields.get(i);
for (int i = 0; i < fields.length; i += 1) {
com.datastrato.gravitino.rel.types.Types.StructType.Field field = fields[i];
Type type = types.get(i);

// for new conversions, use ordinals for ids in the root struct
int id = isRoot ? i : getNextId();
int id;
if (isRoot) {
// for new conversions, use ordinals for ids in the root struct
id = i;
} else {
id = getNextId();
}

String doc = field.comment();

if (field.nullable()) {
newFields.add(Types.NestedField.optional(id, field.name(), type, field.comment()));
newFields.add(Types.NestedField.optional(id, field.name(), type, doc));
} else {
newFields.add(Types.NestedField.required(id, field.name(), type, field.comment()));
newFields.add(Types.NestedField.required(id, field.name(), type, doc));
}
}
return Types.StructType.of(newFields);
}

@Override
public Type field(IcebergColumn field, Type typeResult) {
public Type field(
com.datastrato.gravitino.rel.types.Types.StructType.Field field, Type typeResult) {
return typeResult;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
*/
package com.datastrato.gravitino.catalog.lakehouse.iceberg.converter;

import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergColumn;
import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTable;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.types.Type;
import com.datastrato.gravitino.rel.types.Types;
import com.google.common.collect.Lists;
Expand All @@ -20,30 +18,12 @@
public class ToIcebergTypeVisitor<T> {

/**
* Traverse the gravitino table and convert the fields into iceberg fields.
* Traverse the Gravitino data type and convert the fields into Iceberg fields.
*
* @param table iceberg table.
* @param visitor
* @param <T>
* @return
*/
public static <T> T visit(IcebergTable table, ToIcebergTypeVisitor<T> visitor) {
Column[] columns = table.columns();
List<T> fieldResults = Lists.newArrayListWithExpectedSize(columns.length);

for (Column field : columns) {
fieldResults.add(visitor.field((IcebergColumn) field, visit(field.dataType(), visitor)));
}
return visitor.struct(table, fieldResults);
}

/**
* Convert the type mapping of gravitino to Iceberg.
*
* @param type TODO Abstract a data type in a gravitino.
* @param visitor
* @return
* @param <T>
* @param type Gravitino a data type in a gravitino.
* @param visitor Visitor of Iceberg type
* @param <T> Iceberg type
* @return Iceberg type
*/
public static <T> T visit(Type type, ToIcebergTypeVisitor<T> visitor) {
if (type instanceof Types.MapType) {
Expand All @@ -52,6 +32,14 @@ public static <T> T visit(Type type, ToIcebergTypeVisitor<T> visitor) {
} else if (type instanceof Types.ListType) {
Types.ListType list = (Types.ListType) type;
return visitor.array(list, visit(list.elementType(), visitor));
} else if (type instanceof Types.StructType) {
Types.StructType.Field[] fields = ((Types.StructType) type).fields();
List<T> fieldResults = Lists.newArrayListWithExpectedSize(fields.length);
for (Types.StructType.Field field : fields) {
fieldResults.add(visitor.field(field, visit(field.type(), visitor)));
}
return visitor.struct(
(com.datastrato.gravitino.rel.types.Types.StructType) type, fieldResults);
} else {
return visitor.atomic((Type.PrimitiveType) type);
}
Expand All @@ -61,7 +49,12 @@ public T struct(IcebergTable struct, List<T> fieldResults) {
throw new UnsupportedOperationException();
}

public T field(IcebergColumn field, T typeResult) {
public T struct(
com.datastrato.gravitino.rel.types.Types.StructType struct, List<T> fieldResults) {
throw new UnsupportedOperationException();
}

public T field(Types.StructType.Field field, T typeResult) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,25 @@ public void testCreateIcebergTable() {
.withComment(ICEBERG_COMMENT)
.withNullable(false)
.build();
Column[] columns = new Column[] {col1, col2};
Types.StructType structTypeInside =
Types.StructType.of(
Types.StructType.Field.notNullField("integer_field_inside", Types.IntegerType.get()),
Types.StructType.Field.notNullField(
"string_field_inside", Types.StringType.get(), "string field inside"));
Types.StructType structType =
Types.StructType.of(
Types.StructType.Field.notNullField("integer_field", Types.IntegerType.get()),
Types.StructType.Field.notNullField(
"string_field", Types.StringType.get(), "string field"),
Types.StructType.Field.nullableField("struct_field", structTypeInside, "struct field"));
IcebergColumn col3 =
new IcebergColumn.Builder()
.withName("col_3")
.withType(structType)
.withComment(ICEBERG_COMMENT)
.withNullable(false)
.build();
Column[] columns = new Column[] {col1, col2, col3};

SortOrder[] sortOrders = createSortOrder();
Table table =
Expand All @@ -166,6 +184,7 @@ public void testCreateIcebergTable() {
Assertions.assertEquals("val2", loadedTable.properties().get("key2"));
Assertions.assertTrue(loadedTable.columns()[0].nullable());
Assertions.assertFalse(loadedTable.columns()[1].nullable());
Assertions.assertFalse(loadedTable.columns()[2].nullable());

Assertions.assertTrue(icebergCatalog.asTableCatalog().tableExists(tableIdentifier));
NameIdentifier[] tableIdents =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,101 @@ public void testFormIcebergType() {
Assertions.assertTrue(
((com.datastrato.gravitino.rel.types.Types.ListType) gravitinoListType).elementType()
instanceof com.datastrato.gravitino.rel.types.Types.StringType);

Types.StructType structTypeInside =
Types.StructType.of(
Types.NestedField.optional(
2, "integer_type_inside", Types.IntegerType.get(), "integer type"),
Types.NestedField.optional(
3, "string_type_inside", Types.StringType.get(), "string type"));
Types.StructType structType =
Types.StructType.of(
Types.NestedField.optional(0, "integer_type", Types.IntegerType.get(), "integer type"),
Types.NestedField.optional(1, "struct_type", structTypeInside, "struct type inside"));
com.datastrato.gravitino.rel.types.Type gravitinoStructType =
ConvertUtil.formIcebergType(structType);
// check for type
Assertions.assertTrue(
(gravitinoStructType) instanceof com.datastrato.gravitino.rel.types.Types.StructType);
Assertions.assertTrue(
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[0].type()
instanceof com.datastrato.gravitino.rel.types.Types.IntegerType);
Assertions.assertTrue(
((com.datastrato.gravitino.rel.types.Types.StructType)
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[1].type())
.fields()[0].type()
instanceof com.datastrato.gravitino.rel.types.Types.IntegerType);
Assertions.assertTrue(
((com.datastrato.gravitino.rel.types.Types.StructType)
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[1].type())
.fields()[1].type()
instanceof com.datastrato.gravitino.rel.types.Types.StringType);
// check for name
Assertions.assertEquals(
structType.fields().get(0).name(),
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[0].name());
Assertions.assertEquals(
structType.fields().get(1).name(),
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[1].name());
Assertions.assertEquals(
structTypeInside.fields().get(0).name(),
((com.datastrato.gravitino.rel.types.Types.StructType)
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[1].type())
.fields()[0].name());
Assertions.assertEquals(
structTypeInside.fields().get(1).name(),
((com.datastrato.gravitino.rel.types.Types.StructType)
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[1].type())
.fields()[1].name());
// check for comment
Assertions.assertEquals(
structType.fields().get(0).doc(),
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[0].comment());
Assertions.assertEquals(
structType.fields().get(1).doc(),
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[1].comment());
Assertions.assertEquals(
structTypeInside.fields().get(0).doc(),
((com.datastrato.gravitino.rel.types.Types.StructType)
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[1].type())
.fields()[0].comment());
Assertions.assertEquals(
structTypeInside.fields().get(1).doc(),
((com.datastrato.gravitino.rel.types.Types.StructType)
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[1].type())
.fields()[1].comment());
// check for nullable
Assertions.assertEquals(
structType.fields().get(0).isOptional(),
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[0].nullable());
Assertions.assertEquals(
structType.fields().get(1).isOptional(),
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[1].nullable());
Assertions.assertEquals(
structTypeInside.fields().get(0).isOptional(),
((com.datastrato.gravitino.rel.types.Types.StructType)
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[1].type())
.fields()[0].nullable());
Assertions.assertEquals(
structTypeInside.fields().get(1).isOptional(),
((com.datastrato.gravitino.rel.types.Types.StructType)
((com.datastrato.gravitino.rel.types.Types.StructType) gravitinoStructType)
.fields()[1].type())
.fields()[1].nullable());
}

@Test
Expand Down
Loading

0 comments on commit 38ee3f5

Please sign in to comment.