Support UUID type when converting Iceberg to Delta
danielhumanmod committed Sep 20, 2024
1 parent 15cc0f8 commit 04b2ce2
Showing 13 changed files with 329 additions and 24 deletions.
@@ -78,6 +78,8 @@ public enum MetadataValue {
MILLIS
}

public static final String XTABLE_LOGICAL_TYPE = "xtableLogicalType";

/**
* Performs a level-order traversal of the schema and returns a list of all fields. Use this
* method to get a list that includes nested fields. Use {@link InternalSchema#getFields()} when
@@ -38,6 +38,7 @@ public enum InternalType {
LIST,
MAP,
UNION,
UUID,
FIXED,
STRING,
BYTES,
@@ -132,8 +132,13 @@ private InternalSchema toInternalSchema(
break;
}
if (schema.getType() == Schema.Type.FIXED) {
metadata.put(InternalSchema.MetadataKey.FIXED_BYTES_SIZE, schema.getFixedSize());
newDataType = InternalType.FIXED;
String xtableLogicalType = schema.getProp(InternalSchema.XTABLE_LOGICAL_TYPE);
if ("uuid".equals(xtableLogicalType)) {
newDataType = InternalType.UUID;
} else {
metadata.put(InternalSchema.MetadataKey.FIXED_BYTES_SIZE, schema.getFixedSize());
newDataType = InternalType.FIXED;
}
} else {
newDataType = InternalType.BYTES;
}
@@ -435,6 +440,11 @@ private Schema fromInternalSchema(InternalSchema internalSchema, String currentP
Schema.createFixed(
internalSchema.getName(), internalSchema.getComment(), null, fixedSize),
internalSchema);
case UUID:
Schema uuidSchema =
Schema.createFixed(internalSchema.getName(), internalSchema.getComment(), null, 16);
uuidSchema.addProp(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid");
return finalizeSchema(uuidSchema, internalSchema);
default:
throw new UnsupportedSchemaTypeException(
"Encountered unhandled type during InternalSchema to Avro conversion: "
@@ -33,6 +33,7 @@
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

@@ -73,7 +74,7 @@ public StructType fromInternalSchema(InternalSchema internalSchema) {
field.getName(),
convertFieldType(field),
field.getSchema().isNullable(),
Metadata.empty()))
getMetaData(field.getSchema().getDataType())))
.toArray(StructField[]::new);
return new StructType(fields);
}
@@ -90,6 +91,7 @@ private DataType convertFieldType(InternalField field) {
return DataTypes.LongType;
case BYTES:
case FIXED:
case UUID:
return DataTypes.BinaryType;
case BOOLEAN:
return DataTypes.BooleanType;
@@ -142,12 +144,24 @@ private DataType convertFieldType(InternalField field) {
}
}

private Metadata getMetaData(InternalType type) {
if (type == InternalType.UUID) {
return new MetadataBuilder().putString(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid").build();
} else {
return Metadata.empty();
}
}

public InternalSchema toInternalSchema(StructType structType) {
return toInternalSchema(structType, null, false, null);
return toInternalSchema(structType, null, false, null, null);
}

private InternalSchema toInternalSchema(
DataType dataType, String parentPath, boolean nullable, String comment) {
DataType dataType,
String parentPath,
boolean nullable,
String comment,
Metadata originalMetadata) {
Map<InternalSchema.MetadataKey, Object> metadata = null;
List<InternalField> fields = null;
InternalType type;
@@ -172,7 +186,12 @@ private InternalSchema toInternalSchema(
type = InternalType.DOUBLE;
break;
case "binary":
type = InternalType.BYTES;
// Array and map elements pass null metadata, so guard before dereferencing.
if (originalMetadata != null
&& originalMetadata.contains(InternalSchema.XTABLE_LOGICAL_TYPE)
&& "uuid".equals(originalMetadata.getString(InternalSchema.XTABLE_LOGICAL_TYPE))) {
type = InternalType.UUID;
} else {
type = InternalType.BYTES;
}
break;
case "long":
type = InternalType.LONG;
@@ -210,7 +229,8 @@ private InternalSchema toInternalSchema(
field.dataType(),
SchemaUtils.getFullyQualifiedPath(parentPath, field.name()),
field.nullable(),
fieldComment);
fieldComment,
field.metadata());
return InternalField.builder()
.name(field.name())
.fieldId(fieldId)
@@ -238,6 +258,7 @@ private InternalSchema toInternalSchema(
SchemaUtils.getFullyQualifiedPath(
parentPath, InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME),
arrayType.containsNull(),
null,
null);
InternalField elementField =
InternalField.builder()
@@ -256,6 +277,7 @@ private InternalSchema toInternalSchema(
SchemaUtils.getFullyQualifiedPath(
parentPath, InternalField.Constants.MAP_VALUE_FIELD_NAME),
false,
null,
null);
InternalField keyField =
InternalField.builder()
@@ -269,6 +291,7 @@ private InternalSchema toInternalSchema(
SchemaUtils.getFullyQualifiedPath(
parentPath, InternalField.Constants.MAP_VALUE_FIELD_NAME),
mapType.valueContainsNull(),
null,
null);
InternalField valueField =
InternalField.builder()
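For context, a minimal standalone sketch (not part of this commit) of the Spark-side representation this change introduces: a UUID column surfaces as BinaryType plus the metadata flag that the extended five-argument toInternalSchema reads back. The field name "uuid_field" is made up for illustration.

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class UuidSparkSketch {
public static void main(String[] args) {
// What getMetaData produces for InternalType.UUID.
Metadata meta = new MetadataBuilder().putString("xtableLogicalType", "uuid").build();
// A UUID column as it would appear in the Delta/Spark schema.
StructType schema =
new StructType(
new StructField[] {new StructField("uuid_field", DataTypes.BinaryType, false, meta)});
// toInternalSchema inspects this flag to recover InternalType.UUID instead of plain BYTES.
System.out.println(schema.fields()[0].metadata().getString("xtableLogicalType")); // uuid
}
}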
@@ -159,6 +159,7 @@ private static PartialResult parseValue(
break;
case FIXED:
case BYTES:
case UUID:
parsedValue = valueAsString.getBytes(StandardCharsets.UTF_8);
break;
case BOOLEAN:
@@ -199,6 +199,8 @@ Type toIcebergType(InternalField field, AtomicInteger fieldIdTracker) {
return Types.DecimalType.of(precision, scale);
case RECORD:
return Types.StructType.of(convertFields(field.getSchema(), fieldIdTracker));
case UUID:
return Types.UUIDType.get();
case MAP:
InternalField key =
field.getSchema().getFields().stream()
@@ -305,7 +307,7 @@ private InternalSchema fromIcebergType(
InternalSchema.MetadataKey.FIXED_BYTES_SIZE, fixedType.length());
break;
case UUID:
type = InternalType.FIXED;
type = InternalType.UUID;
metadata = Collections.singletonMap(InternalSchema.MetadataKey.FIXED_BYTES_SIZE, 16);
break;
case STRUCT:
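For context, a minimal standalone sketch (not part of this commit) of the Iceberg side of the mapping, using Iceberg's public types API:

import org.apache.iceberg.types.Types;

public class UuidIcebergSketch {
public static void main(String[] args) {
// toIcebergType: InternalType.UUID maps to Iceberg's dedicated UUID type.
Types.UUIDType icebergUuid = Types.UUIDType.get();
// fromIcebergType: the reverse mapping now yields InternalType.UUID
// (previously InternalType.FIXED) while keeping the 16-byte size metadata.
System.out.println(icebergUuid); // uuid
}
}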
16 changes: 16 additions & 0 deletions xtable-core/src/test/java/org/apache/xtable/GenericTable.java
@@ -127,6 +127,22 @@ static GenericTable getInstanceWithCustomPartitionConfig(
}
}

static GenericTable getInstanceWithUUIDColumns(
String tableName,
Path tempDir,
SparkSession sparkSession,
JavaSparkContext jsc,
String sourceFormat,
boolean isPartitioned) {
switch (sourceFormat) {
case ICEBERG:
return TestIcebergTable.forSchemaWithUUIDColumns(
tableName, isPartitioned ? "level" : null, tempDir, jsc.hadoopConfiguration());
default:
throw new IllegalArgumentException("Unsupported source format: " + sourceFormat);
}
}

static String getTableName() {
return "test_table_" + UUID.randomUUID().toString().replaceAll("-", "_");
}
@@ -27,6 +27,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;

import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -36,11 +37,13 @@
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -79,6 +82,9 @@

import org.apache.spark.sql.delta.DeltaLog;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;

import org.apache.xtable.conversion.ConversionController;
@@ -100,6 +106,7 @@ public class ITConversionController {

private static JavaSparkContext jsc;
private static SparkSession sparkSession;
private static final ObjectMapper objectMapper = new ObjectMapper();

@BeforeAll
public static void setupOnce() {
@@ -139,6 +146,19 @@ private static Stream<Arguments> generateTestParametersForFormatsSyncModesAndPar
return arguments.stream();
}

private static Stream<Arguments> generateTestParametersForUUID() {
List<Arguments> arguments = new ArrayList<>();
for (SyncMode syncMode : SyncMode.values()) {
for (boolean isPartitioned : new boolean[] {true, false}) {
// TODO: Add Hudi UUID support later (https://github.com/apache/incubator-xtable/issues/543)
// The current Spark Parquet reader cannot handle fixed-size byte arrays with the UUID logical type
List<String> targetTableFormats = Arrays.asList(DELTA);
arguments.add(Arguments.of(ICEBERG, targetTableFormats, syncMode, isPartitioned));
}
}
return arguments.stream();
}

private static Stream<Arguments> testCasesWithSyncModes() {
return Stream.of(Arguments.of(SyncMode.INCREMENTAL), Arguments.of(SyncMode.FULL));
}
@@ -260,6 +280,54 @@ public void testVariousOperations(
}
}

// This test is a simplified version of testVariousOperations; the difference is that the
// Iceberg source data contains UUID columns.
@ParameterizedTest
@MethodSource("generateTestParametersForUUID")
public void testVariousOperationsWithUUID(
String sourceTableFormat,
List<String> targetTableFormats,
SyncMode syncMode,
boolean isPartitioned) {
String tableName = getTableName();
ConversionController conversionController = new ConversionController(jsc.hadoopConfiguration());
String partitionConfig = null;
if (isPartitioned) {
partitionConfig = "level:VALUE";
}
ConversionSourceProvider<?> conversionSourceProvider =
getConversionSourceProvider(sourceTableFormat);
List<?> insertRecords;
try (GenericTable table =
GenericTable.getInstanceWithUUIDColumns(
tableName, tempDir, sparkSession, jsc, sourceTableFormat, isPartitioned)) {
insertRecords = table.insertRows(100);

ConversionConfig conversionConfig =
getTableSyncConfig(
sourceTableFormat,
syncMode,
tableName,
table,
targetTableFormats,
partitionConfig,
null);
conversionController.sync(conversionConfig, conversionSourceProvider);
checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 100);

// Upsert some records and sync again
table.upsertRows(insertRecords.subList(0, 20));
conversionController.sync(conversionConfig, conversionSourceProvider);
checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 100);

table.deleteRows(insertRecords.subList(30, 50));
conversionController.sync(conversionConfig, conversionSourceProvider);
checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 80);
checkDatasetEquivalenceWithFilter(
sourceTableFormat, table, targetTableFormats, table.getFilterQuery());
}
}

@ParameterizedTest
@MethodSource("testCasesWithPartitioningAndSyncModes")
public void testConcurrentInsertWritesInSource(
@@ -812,13 +880,72 @@ private void checkDatasetEquivalence(
// if count is not known ahead of time, ensure datasets are non-empty
assertFalse(dataset1Rows.isEmpty());
}

if (containsUUIDFields(dataset1Rows) && containsUUIDFields(dataset2Rows)) {
compareDatasetWithUUID(dataset1Rows, dataset2Rows);
} else {
assertEquals(
dataset1Rows,
dataset2Rows,
String.format(
"Datasets are not equivalent when reading from Spark. Source: %s, Target: %s",
sourceFormat, format));
}
});
}

/**
* Compares two datasets where dataset1Rows is for Iceberg and dataset2Rows is for other formats
* (such as Delta or Hudi).
*
* <ul>
*   <li>For the "uuid_field", if present, the UUID from dataset1 (Iceberg) is compared with the
*       Base64-encoded UUID from dataset2 (other formats), after decoding.
*   <li>For all other fields, the values are compared directly.
*   <li>If neither row contains the "uuid_field", the rows are compared as plain JSON strings.
* </ul>
*
* @param dataset1Rows List of JSON rows representing the dataset in Iceberg format (UUID is
*     stored as a string).
* @param dataset2Rows List of JSON rows representing the dataset in other formats (UUID might be
*     Base64-encoded).
*/
private void compareDatasetWithUUID(List<String> dataset1Rows, List<String> dataset2Rows) {
for (int i = 0; i < dataset1Rows.size(); i++) {
String row1 = dataset1Rows.get(i);
String row2 = dataset2Rows.get(i);
if (row1.contains("uuid_field") && row2.contains("uuid_field")) {
try {
JsonNode node1 = objectMapper.readTree(row1);
JsonNode node2 = objectMapper.readTree(row2);
String uuidStr1 = node1.get("uuid_field").asText();
byte[] bytes = Base64.getDecoder().decode(node2.get("uuid_field").asText());
ByteBuffer bb = ByteBuffer.wrap(bytes);
UUID uuid2 = new UUID(bb.getLong(), bb.getLong());
String uuidStr2 = uuid2.toString();
assertEquals(
uuidStr1,
uuidStr2,
String.format(
"Datasets are not equivalent when reading from Spark. Source: %s, Target: %s",
uuidStr1, uuidStr2));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
} else {
assertEquals(
row1,
row2,
String.format(
"Datasets are not equivalent when reading from Spark. Source: %s, Target: %s",
row1, row2));
}
}
}

private boolean containsUUIDFields(List<String> rows) {
for (String row : rows) {
if (row.contains("\"uuid_field\"")) {
return true;
}
}
return false;
}
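The Base64-to-UUID decoding used in compareDatasetWithUUID above is easiest to verify in isolation. A minimal standalone sketch (not part of this commit; the input value is fabricated and encodes the bytes 0x00 through 0x0f):

import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

public class UuidDecodeSketch {
public static void main(String[] args) {
// 16 bytes, Base64-encoded the way Delta/Spark JSON output renders binary values.
byte[] bytes = Base64.getDecoder().decode("AAECAwQFBgcICQoLDA0ODw==");
ByteBuffer bb = ByteBuffer.wrap(bytes);
// Big-endian: the first 8 bytes become the most significant bits, the last 8 the least.
UUID uuid = new UUID(bb.getLong(), bb.getLong());
System.out.println(uuid); // 00010203-0405-0607-0809-0a0b0c0d0e0f
}
}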

private static Stream<Arguments> addBasicPartitionCases(Stream<Arguments> arguments) {