
[Kernel][Writes] Initial insert APIs and implementation #2944

Closed
36 changes: 36 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/ConcurrentTransactionException.java
@@ -0,0 +1,36 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel;

import java.util.ConcurrentModificationException;

import io.delta.kernel.annotation.Evolving;

/**
* Thrown when concurrent transactions both attempt to update the same idempotent transaction.
*
* @since 3.2.0
*/
@Evolving
public class ConcurrentTransactionException extends ConcurrentModificationException {
private static final String message = "This error occurs when multiple updates are " +
"using the same transaction identifier to write into this table.\n" +
"Application ID: %s, Attempted version: %s, Latest version in table: %s";

public ConcurrentTransactionException(String appId, long txnVersion, long lastUpdated) {
super(String.format(message, appId, txnVersion, lastUpdated));
}
}
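A minimal sketch of how a connector might handle this exception when it issues idempotent writes: if another writer already committed with the same transaction identifier, the write has been applied exactly once and can be treated as a success. The helper name and its arguments are hypothetical; only Transaction#commit and the exception type come from this PR.

import io.delta.kernel.ConcurrentTransactionException;
import io.delta.kernel.Transaction;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.Row;
import io.delta.kernel.utils.CloseableIterable;

public class IdempotentCommitSketch {
    // Hypothetical helper: commit, but treat a duplicate idempotent write as success.
    static void commitIdempotently(
            Transaction txn, TableClient tableClient, CloseableIterable<Row> stagedActions) {
        try {
            txn.commit(tableClient, stagedActions);
        } catch (ConcurrentTransactionException e) {
            // Another writer used the same (appId, txnVersion); the data is already in the table.
            System.out.println("Skipping already-applied write: " + e.getMessage());
        }
    }
}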
34 changes: 34 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/ConcurrentWriteException.java
@@ -0,0 +1,34 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel;

import java.util.ConcurrentModificationException;

import io.delta.kernel.annotation.Evolving;

/**
* Thrown when a concurrent transaction has written data after the current transaction read the
* table.
*
* @since 3.2.0
*/
@Evolving
public class ConcurrentWriteException extends ConcurrentModificationException {
public ConcurrentWriteException() {
super("Transaction has encountered a conflict and can not be committed. " +
"Query needs to be re-executed using the latest version of the table.");
}
}
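Since this exception asks the connector to re-execute against the latest table version, a bounded retry loop is the natural pattern. A minimal sketch, assuming hypothetical buildTxn/stageData callbacks supplied by the connector; only Transaction#commit, TransactionCommitResult, and ConcurrentWriteException come from this PR.

import java.util.function.BiFunction;

import io.delta.kernel.ConcurrentWriteException;
import io.delta.kernel.Table;
import io.delta.kernel.Transaction;
import io.delta.kernel.TransactionCommitResult;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.Row;
import io.delta.kernel.utils.CloseableIterable;

public class RetryOnConflictSketch {
    // Hypothetical driver: rebuild the transaction (re-reading the latest table state)
    // each time the commit reports a conflict, up to maxAttempts times.
    static TransactionCommitResult commitWithRetries(
            Table table,
            TableClient tableClient,
            BiFunction<Table, TableClient, Transaction> buildTxn,                   // hypothetical
            BiFunction<Transaction, TableClient, CloseableIterable<Row>> stageData, // hypothetical
            int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        ConcurrentWriteException lastConflict = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Transaction txn = buildTxn.apply(table, tableClient);
            CloseableIterable<Row> actions = stageData.apply(txn, tableClient);
            try {
                return txn.commit(tableClient, actions);
            } catch (ConcurrentWriteException e) {
                lastConflict = e; // re-read the table and regenerate the actions next time
            }
        }
        throw lastConflict;
    }
}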
81 changes: 81 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/DataWriteContext.java
@@ -0,0 +1,81 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel;

import java.util.List;
import java.util.Map;
import static java.util.Collections.unmodifiableList;
import static java.util.Collections.unmodifiableMap;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.expressions.Column;
import io.delta.kernel.expressions.Literal;

/**
* Contains the context for writing data related to a partition to a Delta table.
*
* @since 3.2.0
*/
@Evolving
public class DataWriteContext {
private final String targetDirectory;
private final Map<String, Literal> partitionValues;
private final List<Column> statsColumns;

/**
* Creates a new instance of DataWriteContext.
*
* @param partitionPath fully qualified path of the target directory
* @param partitionValues partition values
* @param statsColumns columns for which statistics should be collected
*/
public DataWriteContext(
String partitionPath,
Map<String, Literal> partitionValues,
List<Column> statsColumns) {
this.targetDirectory = partitionPath;
this.partitionValues = unmodifiableMap(partitionValues);
this.statsColumns = unmodifiableList(statsColumns);
}

/**
* Returns the target directory where the data should be written.
*
* @return fully qualified path of the target directory
*/
public String getTargetDirectory() {
return targetDirectory;
}

/**
* Returns the partition values for the data to be written.
*
* @return partition values
*/
public Map<String, Literal> getPartitionValues() {
return partitionValues;
}

/**
* Returns the list of columns for which statistics need to be collected for the data to be
* written. Statistics collection is optional, but when present it can be used to optimize
* query performance.
*
* @return list of columns to collect statistics for
*/
public List<Column> getStatisticsColumns() {
return statsColumns;
}
}
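A sketch of how a connector might consume a DataWriteContext when writing the physical data for one partition. It assumes the Parquet handler on TableClient exposes a writeParquetFiles(directory, data, statsColumns) method returning DataFileStatus entries; that method is not part of this diff, so treat it as an assumption and substitute the engine's own writer if needed.

import java.io.IOException;

import io.delta.kernel.DataWriteContext;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.DataFileStatus;

public class WritePartitionSketch {
    // Write the already-transformed physical data for a single partition using the
    // directory and statistics columns carried by the context.
    static CloseableIterator<DataFileStatus> writePartition(
            TableClient tableClient,
            DataWriteContext writeContext,
            CloseableIterator<FilteredColumnarBatch> physicalData) throws IOException {
        // writeParquetFiles(...) is assumed here, not defined in this diff.
        return tableClient.getParquetHandler().writeParquetFiles(
                writeContext.getTargetDirectory(),      // where the files should land
                physicalData,                           // output of Transaction.transformLogicalData
                writeContext.getStatisticsColumns());   // columns to collect stats for
    }
}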
15 changes: 15 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/Table.java
@@ -114,6 +114,21 @@ Snapshot getSnapshotAsOfVersion(TableClient tableClient, long versionId)
Snapshot getSnapshotAsOfTimestamp(TableClient tableClient, long millisSinceEpochUTC)
throws TableNotFoundException;

/**
* Create a {@link TransactionBuilder} which can create a {@link Transaction} object to mutate
* the table.
*
* @param tableClient {@link TableClient} instance to use.
* @param engineInfo information about the engine that is making the updates.
* @param operation metadata of the operation being performed, e.g. "insert", "delete".
* @return {@link TransactionBuilder} instance to build the transaction.
* @since 3.2.0
*/
TransactionBuilder createTransactionBuilder(
TableClient tableClient,
String engineInfo,
String operation);

/**
* Checkpoint the table at given version. It writes a single checkpoint file.
*
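For reference, a sketch of how a connector would obtain a Transaction from this new API. Table.forPath is the existing factory on Table; the TransactionBuilder methods used below (build, plus withSchema for a brand-new table) are assumptions, since TransactionBuilder itself is not shown in this excerpt.

import io.delta.kernel.Table;
import io.delta.kernel.Transaction;
import io.delta.kernel.TransactionBuilder;
import io.delta.kernel.client.TableClient;

public class CreateTransactionSketch {
    static Transaction startInsert(TableClient tableClient, String tablePath) {
        Table table = Table.forPath(tableClient, tablePath);
        TransactionBuilder txnBuilder = table.createTransactionBuilder(
                tableClient,
                "ExampleEngine/1.0.0",  // engineInfo: identifies the writer
                "INSERT");              // operation being performed
        // For a brand-new table the schema (and partition columns) would be set on the
        // builder first; those builder methods are assumed and not part of this diff.
        return txnBuilder.build(tableClient);
    }
}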
199 changes: 199 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/Transaction.java
@@ -0,0 +1,199 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel;

import java.net.URI;
import java.util.*;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.*;
import io.delta.kernel.expressions.Literal;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.*;

import io.delta.kernel.internal.actions.AddFile;
import io.delta.kernel.internal.actions.SingleAction;
import io.delta.kernel.internal.fs.Path;
import static io.delta.kernel.internal.TransactionImpl.getStatisticsColumns;
import static io.delta.kernel.internal.data.TransactionStateRow.*;
import static io.delta.kernel.internal.util.PartitionUtils.getTargetDirectory;
import static io.delta.kernel.internal.util.PartitionUtils.validatePartitionValues;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;

/**
* Represents a transaction to mutate a Delta table.
*
* @since 3.2.0
*/
@Evolving
public interface Transaction {
/**
* Get the schema of the data that is being written to the table.
*/
StructType getSchema(TableClient tableClient);

/**
* Get the list of logical names of the partition columns. This helps the connector to do
* physical partitioning of the data before asking the Kernel to stage the data per partition.
*/
List<String> getPartitionColumns(TableClient tableClient);

/**
* Get the state of the transaction. The state helps Kernel apply the transformations to the
* logical data according to the Delta protocol and table features enabled on the table. The
* engine should use this at the data writer task to transform the logical data that it wants
* to write to the table into physical data that goes in data files, using
* {@link Transaction#transformLogicalData(TableClient, Row, CloseableIterator, Map)}.
*/
Row getState(TableClient tableClient);

/**
* Commit the transaction including the staged data rows generated by
* {@link Transaction#generateAppendActions}.
*
* @param tableClient {@link TableClient} instance.
* @param stagedData Iterable of data actions to commit. These data actions are generated by
* the {@link Transaction#generateAppendActions(TableClient, Row,
* CloseableIterator, DataWriteContext)}. The {@link CloseableIterable}
* allows the Kernel to access the list of actions multiple times (in case
* of retries to resolve the conflicts due to other writers to the table).
* @return {@link TransactionCommitResult} status of the successful transaction.
* @throws ConcurrentWriteException when the transaction has encountered a non-retryable
* conflict or the maximum number of retries has been exceeded.
* The connector needs to rerun the query on top of the latest
* table state and retry the transaction.
*/
TransactionCommitResult commit(TableClient tableClient, CloseableIterable<Row> stagedData)
throws ConcurrentWriteException;

/**
* Given the logical data that needs to be written to the table, convert it into the required
* physical data depending upon the Delta protocol and table features enabled on the table.
* Kernel takes care of adding any additional columns or removing existing columns that don't
* need to be in the physical data files. All these transformations are driven by the Delta protocol
* and table features enabled on the table.
* <p>
* The given data should belong to exactly one partition. It is the job of the connector to do
* partitioning of the data before calling the API. Partition values are provided as map of
* column name to partition value (as {@link Literal}). If the table is an un-partitioned table,
* then the map should be empty.
*
* @param tableClient {@link TableClient} instance to use.
* @param transactionState The transaction state
* @param dataIter Iterator of logical data to transform to physical data. All the data
* in this iterator should belong to one physical partition.
* @param partitionValues The partition values for the data. If the table is un-partitioned,
* the map should be empty
* @return Iterator of physical data to write to the data files.
*/
static CloseableIterator<FilteredColumnarBatch> transformLogicalData(
Collaborator: can we make it clearer that this is for a single partition? You can push back and
hold that the API docs are sufficient. transformLogicalDataForSinglePartition?
transformLogicalDataUnpartitioned?

Collaborator (Author): I will open this up for discussion.

TableClient tableClient,
Row transactionState,
CloseableIterator<FilteredColumnarBatch> dataIter,
Map<String, Literal> partitionValues) {
validatePartitionValues(
getPartitionColumnsList(transactionState), partitionValues);
// TODO: add support for:
// - enforcing the constraints
// - generating the default value columns
// - generating the generated columns

// Remove the partition columns from the data as they are already part of file metadata
// and are not needed in the data files. TODO: once we start supporting UniForm-compliant
// tables, we may conditionally skip this step.

// TODO: set the correct schema once writing into column mapping enabled table is supported.
return dataIter.map(
filteredColumnarBatch -> {
ColumnarBatch data = filteredColumnarBatch.getData();
for (String partitionColName : partitionValues.keySet()) {
int partitionColIndex = data.getSchema().indexOf(partitionColName);
Collaborator: any checks / validation we can do here? any chance that the index is -1? can we
throw a better error than index out of bounds?

checkArgument(
partitionColIndex >= 0,
"Partition column %s not found in the data",
partitionColName);
data = data.withDeletedColumnAt(partitionColIndex);
}
return new FilteredColumnarBatch(data,
filteredColumnarBatch.getSelectionVector());
}
);
}

/**
* Get the context for writing data into a table. The context tells the connector where the data
Collaborator: for a single partition, right? (or, for unpartitioned)

Collaborator (Author): added more context.

* should be written and what the target file size should be. For a partitioned table, the
* context is generated per partition, so the connector should call this API for each partition.
* For an un-partitioned table, the context is the same for all the data.
*
* @param tableClient {@link TableClient} instance to use.
* @param transactionState The transaction state
* @param partitionValues The partition values for the data. If the table is un-partitioned,
* the map should be empty
* @return {@link DataWriteContext} containing metadata about where and how the data for
* the partition should be written.
*/
static DataWriteContext getWriteContext(
TableClient tableClient,
Row transactionState,
Map<String, Literal> partitionValues) {
validatePartitionValues(
getPartitionColumnsList(transactionState),
partitionValues);

String targetDirectory = getTargetDirectory(
getTableRoot(transactionState),
getPartitionColumnsList(transactionState),
partitionValues);
return new DataWriteContext(
targetDirectory,
partitionValues,
getStatisticsColumns(tableClient, transactionState));
}

/**
* For the given newly written data files, generate Delta actions that can be committed in a transaction.
* These data files are the result of writing the data returned by
* {@link Transaction#transformLogicalData} with the context returned by
* {@link Transaction#getWriteContext}.
*
* @param tableClient {@link TableClient} instance.
* @param transactionState State of the transaction.
* @param fileStatusIter Iterator of row objects representing each data file written.
* @param dataWriteContext The context used when writing the data files given in
* {@code fileStatusIter}
* @return {@link CloseableIterator} of {@link Row} representing the actions to commit using
* {@link Transaction#commit}.
*/
static CloseableIterator<Row> generateAppendActions(
TableClient tableClient,
Row transactionState,
CloseableIterator<DataFileStatus> fileStatusIter,
DataWriteContext dataWriteContext) {
URI tableRoot = new Path(getTableRoot(transactionState)).toUri();
return fileStatusIter.map(
dataFileStatus -> {
Row addFileRow = AddFile.convertDataFileStatus(
tableRoot,
dataFileStatus,
dataWriteContext.getPartitionValues(),
true /* dataChange */);
return SingleAction.createAddFileSingleAction(addFileRow);
}
);
}
}
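Putting the pieces above together, a sketch of the intended single-partition append flow: transform the logical data, get the write context, write the files, generate the append actions, and commit. The Parquet-handler call and CloseableIterable.inMemoryIterable are assumptions not defined in this diff; everything else is the API introduced here.

import java.io.IOException;
import java.util.Map;

import io.delta.kernel.DataWriteContext;
import io.delta.kernel.Transaction;
import io.delta.kernel.TransactionCommitResult;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.expressions.Literal;
import io.delta.kernel.utils.CloseableIterable;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.DataFileStatus;

public class AppendOnePartitionSketch {
    // Append the logical data for one partition (empty map for an un-partitioned table)
    // and commit the result.
    static TransactionCommitResult appendPartition(
            TableClient tableClient,
            Transaction txn,
            CloseableIterator<FilteredColumnarBatch> logicalData,
            Map<String, Literal> partitionValues) throws IOException {
        Row txnState = txn.getState(tableClient);

        // 1. Logical -> physical data for this single partition.
        CloseableIterator<FilteredColumnarBatch> physicalData =
                Transaction.transformLogicalData(tableClient, txnState, logicalData, partitionValues);

        // 2. Where and how the files for this partition should be written.
        DataWriteContext writeContext =
                Transaction.getWriteContext(tableClient, txnState, partitionValues);

        // 3. Write the files (writeParquetFiles is an assumed Parquet-handler method;
        //    the engine may use its own writer as long as it produces DataFileStatus entries).
        CloseableIterator<DataFileStatus> writtenFiles =
                tableClient.getParquetHandler().writeParquetFiles(
                        writeContext.getTargetDirectory(),
                        physicalData,
                        writeContext.getStatisticsColumns());

        // 4. Turn the written files into Delta log actions.
        CloseableIterator<Row> actions = Transaction.generateAppendActions(
                tableClient, txnState, writtenFiles, writeContext);

        // 5. Commit; inMemoryIterable (assumed utility) lets Kernel re-scan the actions
        //    on conflict retries.
        return txn.commit(tableClient, CloseableIterable.inMemoryIterable(actions));
    }
}

Partition values would typically be built with the Literal factories (for example Literal.ofInt or Literal.ofString), keyed by partition column name, one entry per partition column.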