[3.2][Kernel][Writes] Add support of inserting data into tables (#3030)
(Split from #2944)

Adds support for inserting data into the table.

Tests cover inserting into partitioned and unpartitioned tables with
various combinations of types, partition values, etc. They also verify that
the table correctly reports when it is ready for checkpoint creation.
vkorukanti committed May 5, 2024
1 parent fe5d931 commit 6453fe5
Showing 20 changed files with 1,085 additions and 86 deletions.
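
The new public API in Transaction.java is used in three steps: transform the connector's logical data into physical data, obtain a write context that tells the connector where to write the files, and convert the written files' statuses into Delta log actions to commit. A minimal connector-side sketch of the flow (editor's illustration, not part of this commit; `engine`, `txn`, `dataBatches`, and the Parquet-writing call are assumptions):

    Row txnState = txn.getTransactionState(engine);
    Map<String, Literal> partitionValues =
        Collections.singletonMap("part", Literal.ofString("a"));

    // Step 1: logical -> physical data; partition columns are dropped from the batches.
    CloseableIterator<FilteredColumnarBatch> physicalData =
        Transaction.transformLogicalData(engine, txnState, dataBatches, partitionValues);

    // Step 2: the write context carries the target directory and statistics columns.
    DataWriteContext writeContext =
        Transaction.getWriteContext(engine, txnState, partitionValues);

    // Step 3: write the files via the engine, then generate and commit AddFile actions.
    CloseableIterator<DataFileStatus> writtenFiles =
        engine.getParquetHandler().writeParquetFiles(
            writeContext.getTargetDirectory(), physicalData,
            writeContext.getStatisticsColumns());
    CloseableIterator<Row> actions = Transaction.generateAppendActions(
        engine, txnState, writtenFiles, writeContext);
    txn.commit(engine, CloseableIterable.inMemoryIterable(actions));

For an unpartitioned table, partitionValues is simply an empty map.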
84 changes: 79 additions & 5 deletions kernel/kernel-api/src/main/java/io/delta/kernel/Transaction.java
@@ -15,18 +15,31 @@
*/
package io.delta.kernel;

import java.net.URI;
import java.util.List;
import java.util.Map;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.data.*;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.ConcurrentWriteException;
import io.delta.kernel.expressions.Literal;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.*;

import io.delta.kernel.internal.DataWriteContextImpl;
import io.delta.kernel.internal.actions.AddFile;
import io.delta.kernel.internal.actions.SingleAction;
import io.delta.kernel.internal.fs.Path;
import static io.delta.kernel.internal.DeltaErrors.dataSchemaMismatch;
import static io.delta.kernel.internal.DeltaErrors.partitionColumnMissingInData;
import static io.delta.kernel.internal.TransactionImpl.getStatisticsColumns;
import static io.delta.kernel.internal.data.TransactionStateRow.*;
import static io.delta.kernel.internal.util.PartitionUtils.getTargetDirectory;
import static io.delta.kernel.internal.util.PartitionUtils.validateAndSanitizePartitionValues;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static io.delta.kernel.internal.util.SchemaUtils.findColIndex;

/**
* Represents a transaction to mutate a Delta table.
*
@@ -104,7 +117,41 @@ static CloseableIterator<FilteredColumnarBatch> transformLogicalData(
Row transactionState,
CloseableIterator<FilteredColumnarBatch> dataIter,
Map<String, Literal> partitionValues) {
- throw new UnsupportedOperationException("Not implemented yet");

// Note: `partitionValues` is not used as of now in this API, but taking the partition
// values as input forces the connector not to pass data from multiple partitions to this
// API in a single call.
StructType tableSchema = getLogicalSchema(engine, transactionState);
List<String> partitionColNames = getPartitionColumnsList(transactionState);
validateAndSanitizePartitionValues(tableSchema, partitionColNames, partitionValues);

// TODO: add support for:
// - enforcing the constraints
// - generating the default value columns
// - generating the generated columns

// Remove the partition columns from the data as they are already part of the file metadata
// and are not needed in the data files. TODO: once we start supporting UniForm-compliant
// tables, we may conditionally skip this step.

// TODO: set the correct schema once writing into column mapping enabled table is supported.
String tablePath = getTablePath(transactionState);
return dataIter.map(
filteredBatch -> {
ColumnarBatch data = filteredBatch.getData();
if (!data.getSchema().equals(tableSchema)) {
throw dataSchemaMismatch(tablePath, tableSchema, data.getSchema());
}
for (String partitionColName : partitionColNames) {
int partitionColIndex = findColIndex(data.getSchema(), partitionColName);
if (partitionColIndex < 0) {
throw partitionColumnMissingInData(tablePath, partitionColName);
}
data = data.withDeletedColumnAt(partitionColIndex);
}
return new FilteredColumnarBatch(data, filteredBatch.getSelectionVector());
}
);
}
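
// Editor's example (not part of this diff): connectors call this API once per
// partition; `engine`, `txnState`, and `batchesForPartitionA` are assumed names.
//
//   Map<String, Literal> partValues =
//       Collections.singletonMap("part", Literal.ofString("a"));
//   CloseableIterator<FilteredColumnarBatch> physicalData =
//       Transaction.transformLogicalData(engine, txnState, batchesForPartitionA, partValues);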

/**
@@ -124,7 +171,21 @@ static DataWriteContext getWriteContext(
Engine engine,
Row transactionState,
Map<String, Literal> partitionValues) {
- throw new UnsupportedOperationException("Not implemented yet");
StructType tableSchema = getLogicalSchema(engine, transactionState);
List<String> partitionColNames = getPartitionColumnsList(transactionState);

partitionValues =
validateAndSanitizePartitionValues(tableSchema, partitionColNames, partitionValues);

String targetDirectory = getTargetDirectory(
getTablePath(transactionState),
partitionColNames,
partitionValues);

return new DataWriteContextImpl(
targetDirectory,
partitionValues,
getStatisticsColumns(engine, transactionState));
}
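
// Editor's example (not part of this diff): for a table rooted at /tbl and
// partitioned by `part`, the context for partition value "a" points at the
// Hive-style partition directory (path shown is illustrative):
//
//   DataWriteContext ctx = Transaction.getWriteContext(engine, txnState, partValues);
//   ctx.getTargetDirectory();  // -> /tbl/part=a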

/**
@@ -146,6 +207,19 @@ static CloseableIterator<Row> generateAppendActions(
Row transactionState,
CloseableIterator<DataFileStatus> fileStatusIter,
DataWriteContext dataWriteContext) {
- throw new UnsupportedOperationException("Not implemented yet");
checkArgument(dataWriteContext instanceof DataWriteContextImpl,
"DataWriteContext was not created by `Transaction.getWriteContext()`");

URI tableRoot = new Path(getTablePath(transactionState)).toUri();
return fileStatusIter.map(
dataFileStatus -> {
Row addFileRow = AddFile.convertDataFileStatus(
tableRoot,
dataFileStatus,
((DataWriteContextImpl) dataWriteContext).getPartitionValues(),
true /* dataChange */);
return SingleAction.createAddFileSingleAction(addFileRow);
}
);
}
}
kernel/kernel-api/src/main/java/io/delta/kernel/engine/JsonHandler.java
@@ -110,7 +110,8 @@ CloseableIterator<ColumnarBatch> readJsonFiles(
* <ul>
* <li>Primitive types: {@code boolean, byte, short, int, long, float, double, string}</li>
* <li>{@code struct}: any element whose value is null is not written to file</li>
- * <li>{@code map}: only a {@code map} with {@code string} key type is supported</li>
* <li>{@code map}: only a {@code map} with {@code string} key type is supported. If an
* entry value is {@code null}, it should be written to the file.</li>
* <li>{@code array}: {@code null} value elements are written to file</li>
* </ul>
*
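To illustrate the null-handling rules above (editor's example): a row with struct value {a: null, b: 1}, map value {"k": null}, and array value [null, 2] serializes as "s":{"b":1}, "m":{"k":null}, and "arr":[null,2]; the null struct field is omitted, while the null map-entry value and the null array element are written.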
kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java
@@ -172,6 +172,22 @@ public static KernelException tableAlreadyExists(String tablePath, String message) {
return new TableAlreadyExistsException(tablePath, message);
}

public static KernelException dataSchemaMismatch(
String tablePath,
StructType tableSchema,
StructType dataSchema) {
String msgT = "The schema of the data to be written to the table doesn't match " +
"the table schema.\nTable: %s\nTable schema: %s\nData schema: %s";
return new KernelException(format(msgT, tablePath, tableSchema, dataSchema));
}

public static KernelException partitionColumnMissingInData(
String tablePath,
String partitionColumn) {
String msgT = "Missing partition column '%s' in the data to be written to the table '%s'.";
return new KernelException(format(msgT, partitionColumn, tablePath));
}

/* ------------------------ HELPER METHODS ----------------------------- */
private static String formatTimestamp(long millisSinceEpochUTC) {
return new Timestamp(millisSinceEpochUTC).toInstant().toString();
kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java
@@ -24,6 +24,7 @@
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.ConcurrentWriteException;
import io.delta.kernel.expressions.Column;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterable;
import io.delta.kernel.utils.CloseableIterator;
@@ -158,7 +159,9 @@ private boolean isReadyForCheckpoint(long newVersion) {
}

private boolean isBlindAppend() {
- return isNewTable; // Later can add more conditions to determine if it is a blind append
// For now, Kernel supports only blind appends.
// Change this when read-after-write is supported.
return true;
}

private Map<String, String> getOperationParameters() {
@@ -171,4 +174,16 @@ private Map<String, String> getOperationParameters() {
}
return Collections.emptyMap();
}

/**
* Get the columns of the table schema for which statistics need to be collected per file.
*
* @param engine {@link Engine} instance to use.
* @param transactionState State of the transaction.
* @return List of columns to collect statistics for; currently always empty, until
* statistics collection is supported.
*/
public static List<Column> getStatisticsColumns(Engine engine, Row transactionState) {
// TODO: implement this once we start supporting collecting stats
return Collections.emptyList();
}
}
kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java
@@ -15,7 +15,21 @@
*/
package io.delta.kernel.internal.actions;

import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.IntStream;
import static java.util.stream.Collectors.toMap;

import io.delta.kernel.data.Row;
import io.delta.kernel.expressions.Literal;
import io.delta.kernel.types.*;
import io.delta.kernel.utils.DataFileStatus;

import io.delta.kernel.internal.data.GenericRow;
import io.delta.kernel.internal.fs.Path;
import static io.delta.kernel.internal.util.InternalUtils.relativizePath;
import static io.delta.kernel.internal.util.PartitionUtils.serializePartitionMap;

/**
* Delta log action representing an `AddFile`
@@ -57,4 +71,35 @@ public class AddFile {
true /* nullable */);
// There are more fields which are added when row-id tracking and clustering are enabled.
// When Kernel starts supporting row-ids and clustering, we should add those fields here.

private static final Map<String, Integer> COL_NAME_TO_ORDINAL =
IntStream.range(0, FULL_SCHEMA.length())
.boxed()
.collect(toMap(i -> FULL_SCHEMA.at(i).getName(), i -> i));

/**
* Utility to generate `AddFile` row from the given {@link DataFileStatus} and partition values.
*/
public static Row convertDataFileStatus(
URI tableRoot,
DataFileStatus dataFileStatus,
Map<String, Literal> partitionValues,
boolean dataChange) {
Path filePath = new Path(dataFileStatus.getPath());
Map<Integer, Object> valueMap = new HashMap<>();
valueMap.put(COL_NAME_TO_ORDINAL.get("path"),
relativizePath(filePath, tableRoot).toString());
valueMap.put(COL_NAME_TO_ORDINAL.get("partitionValues"),
serializePartitionMap(partitionValues));
valueMap.put(COL_NAME_TO_ORDINAL.get("size"), dataFileStatus.getSize());
valueMap.put(COL_NAME_TO_ORDINAL.get("modificationTime"),
dataFileStatus.getModificationTime());
valueMap.put(COL_NAME_TO_ORDINAL.get("dataChange"), dataChange);
if (dataFileStatus.getStatistics().isPresent()) {
valueMap.put(COL_NAME_TO_ORDINAL.get("stats"),
dataFileStatus.getStatistics().get().serializeAsJson());
}
// any fields not present in the valueMap are considered null
return new GenericRow(FULL_SCHEMA, valueMap);
}
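
// Editor's example (not part of this diff): converting a written file's status into
// an AddFile row; assumes a DataFileStatus(path, size, modificationTime, statistics)
// constructor and illustrative values.
//
//   DataFileStatus status = new DataFileStatus(
//       "/tbl/part=a/f1.parquet", 1024L /* size */, 1714867200000L /* modTime */,
//       Optional.empty() /* no statistics */);
//   Row addFile = AddFile.convertDataFileStatus(
//       new Path("/tbl").toUri(), status,
//       Collections.singletonMap("part", Literal.ofString("a")),
//       true /* dataChange */);
//   // The row's "path" field is relativized to "part=a/f1.parquet".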
}
kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/TransactionStateRow.java
@@ -29,6 +29,7 @@

public class TransactionStateRow extends GenericRow {
private static final StructType SCHEMA = new StructType()
.add("logicalSchemaString", StringType.STRING)
.add("partitionColumns", new ArrayType(StringType.STRING, false))
.add("tablePath", StringType.STRING);

@@ -39,6 +40,7 @@ public class TransactionStateRow extends GenericRow {

public static TransactionStateRow of(Metadata metadata, String tablePath) {
HashMap<Integer, Object> valueMap = new HashMap<>();
valueMap.put(COL_NAME_TO_ORDINAL.get("logicalSchemaString"), metadata.getSchemaString());
valueMap.put(COL_NAME_TO_ORDINAL.get("partitionColumns"), metadata.getPartitionColumns());
valueMap.put(COL_NAME_TO_ORDINAL.get("tablePath"), tablePath);
return new TransactionStateRow(valueMap);
@@ -48,11 +50,24 @@ private TransactionStateRow(HashMap<Integer, Object> valueMap) {
super(SCHEMA, valueMap);
}

/**
* Get the logical schema of the table from the transaction state {@link Row} returned by
* {@link Transaction#getTransactionState(Engine)}.
*
* @param engine {@link Engine} instance to use for parsing the schema
* @param transactionState Transaction state {@link Row}
* @return Logical schema of the table as {@link StructType}
*/
public static StructType getLogicalSchema(Engine engine, Row transactionState) {
return engine.getJsonHandler().deserializeStructType(
transactionState.getString(COL_NAME_TO_ORDINAL.get("logicalSchemaString")));
}

/**
* Get the list of partition column names from the write state {@link Row} returned by
* {@link Transaction#getTransactionState(Engine)}
*
- * @param transactionState Scan state {@link Row}
* @param transactionState Transaction state {@link Row}
* @return List of partition column names according to the scan state.
*/
public static List<String> getPartitionColumnsList(Row transactionState) {
kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/InternalUtils.java
@@ -16,13 +16,17 @@
package io.delta.kernel.internal.util;

import java.io.IOException;
import java.net.URI;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
@@ -31,6 +35,8 @@
import io.delta.kernel.types.StringType;
import io.delta.kernel.utils.CloseableIterator;

import io.delta.kernel.internal.fs.Path;

public class InternalUtils {
private static final LocalDate EPOCH_DAY = LocalDate.ofEpochDay(0);
private static final LocalDateTime EPOCH_DATETIME =
@@ -152,4 +158,26 @@ public static ColumnVector requireNonNull(ColumnVector vector, int rowId, String
}
return vector;
}

/**
* Relativize the given child path with respect to the given root URI. If the child path is
* already a relative path, it is returned as is.
*
* @param child Child path to relativize.
* @param root Root directory as URI. Relativization is done with respect to this root.
* The relativize operation requires conversion to URI, so the caller is expected to
* convert the root directory to URI once and use it for relativizing multiple
* child paths.
* @return The relativized path, or the child path itself if it is already relative.
*/
public static Path relativizePath(Path child, URI root) {
if (child.isAbsolute()) {
return new Path(root.relativize(child.toUri()));
}
return child;
}
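
// Editor's example (not part of this diff):
//
//   URI root = new Path("s3://bucket/tbl").toUri();
//   relativizePath(new Path("s3://bucket/tbl/part=a/f1.parquet"), root);
//   // -> part=a/f1.parquet
//   relativizePath(new Path("data/f2.parquet"), root);
//   // -> data/f2.parquet (already relative, returned as is)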

public static Set<String> toLowerCaseSet(Collection<String> set) {
return set.stream().map(String::toLowerCase).collect(Collectors.toSet());
}
}