diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/ConcurrentTransactionException.java b/kernel/kernel-api/src/main/java/io/delta/kernel/ConcurrentTransactionException.java new file mode 100644 index 00000000000..1f28547a34c --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/ConcurrentTransactionException.java @@ -0,0 +1,36 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel; + +import java.util.ConcurrentModificationException; + +import io.delta.kernel.annotation.Evolving; + +/** + * Thrown when two concurrent transactions both attempt to update the table using the same + * idempotent transaction identifier. + * + * @since 3.2.0 + */ +@Evolving +public class ConcurrentTransactionException extends ConcurrentModificationException { + private static final String message = "This error occurs when multiple updates are " + + "using the same transaction identifier to write into this table.\n" + + "Application ID: %s, Attempted version: %s, Latest version in table: %s"; + + public ConcurrentTransactionException(String appId, long txnVersion, long lastUpdated) { + super(String.format(message, appId, txnVersion, lastUpdated)); + } +}
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/ConcurrentWriteException.java b/kernel/kernel-api/src/main/java/io/delta/kernel/ConcurrentWriteException.java new file mode 100644 index 00000000000..f47c82119e9 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/ConcurrentWriteException.java @@ -0,0 +1,34 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel; + +import java.util.ConcurrentModificationException; + +import io.delta.kernel.annotation.Evolving; + +/** + * Thrown when a concurrent transaction has written data after the current transaction read the + * table. + * + * @since 3.2.0 + */ +@Evolving +public class ConcurrentWriteException extends ConcurrentModificationException { + public ConcurrentWriteException() { + super("Transaction has encountered a conflict and cannot be committed. " + "Query needs to be re-executed using the latest version of the table."); + } +}
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/DataWriteContext.java b/kernel/kernel-api/src/main/java/io/delta/kernel/DataWriteContext.java new file mode 100644 index 00000000000..8137d8dbfb6 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/DataWriteContext.java @@ -0,0 +1,81 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel; + +import java.util.List; +import java.util.Map; +import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableMap; + +import io.delta.kernel.annotation.Evolving; +import io.delta.kernel.expressions.Column; +import io.delta.kernel.expressions.Literal; + +/** + * Contains the context for writing data related to a partition to a Delta table. + * + * @since 3.2.0 + */ +@Evolving +public class DataWriteContext { + private final String targetDirectory; + private final Map<String, Literal> partitionValues; + private final List<Column> statsColumns; + + /** + * Creates a new instance of DataWriteContext. + * + * @param partitionPath fully qualified path of the target directory + * @param partitionValues partition values + * @param statsColumns columns for which statistics should be collected + */ + public DataWriteContext( + String partitionPath, + Map<String, Literal> partitionValues, + List<Column> statsColumns) { + this.targetDirectory = partitionPath; + this.partitionValues = unmodifiableMap(partitionValues); + this.statsColumns = unmodifiableList(statsColumns); + } + + /** + * Returns the target directory where the data should be written. + * + * @return fully qualified path of the target directory + */ + public String getTargetDirectory() { + return targetDirectory; + } + + /** + * Returns the partition values for the data to be written. + * + * @return partition values + */ + public Map<String, Literal> getPartitionValues() { + return partitionValues; + } + + /** + * Returns the list of columns for which statistics are needed for the data to be written. + * Statistics collection is optional, but when present it can be used to optimize query + * performance. + * + * @return columns for which statistics should be collected + */ + public List<Column> getStatisticsColumns() { + return statsColumns; + } +}
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/Table.java b/kernel/kernel-api/src/main/java/io/delta/kernel/Table.java index c039a17dade..7895e1d1ae5 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/Table.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/Table.java @@ -114,6 +114,21 @@ Snapshot getSnapshotAsOfVersion(TableClient tableClient, long versionId) Snapshot getSnapshotAsOfTimestamp(TableClient tableClient, long millisSinceEpochUTC) throws TableNotFoundException; + /** + * Create a {@link TransactionBuilder} which can create a {@link Transaction} object to mutate + * the table. + * + * @param tableClient {@link TableClient} instance to use.
+ * @param engineInfo information about the engine that is making the updates. + * @param operation metadata of operation that is being performed. E.g. "insert", "delete". + * @return {@link TransactionBuilder} instance to build the transaction. + * @since 3.2.0 + */ + TransactionBuilder createTransactionBuilder( + TableClient tableClient, + String engineInfo, + String operation); + /** * Checkpoint the table at given version. It writes a single checkpoint file. * diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/Transaction.java b/kernel/kernel-api/src/main/java/io/delta/kernel/Transaction.java new file mode 100644 index 00000000000..055a6668792 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/Transaction.java @@ -0,0 +1,199 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel; + +import java.net.URI; +import java.util.*; + +import io.delta.kernel.annotation.Evolving; +import io.delta.kernel.client.TableClient; +import io.delta.kernel.data.*; +import io.delta.kernel.expressions.Literal; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.*; + +import io.delta.kernel.internal.actions.AddFile; +import io.delta.kernel.internal.actions.SingleAction; +import io.delta.kernel.internal.fs.Path; +import static io.delta.kernel.internal.TransactionImpl.getStatisticsColumns; +import static io.delta.kernel.internal.data.TransactionStateRow.*; +import static io.delta.kernel.internal.util.PartitionUtils.getTargetDirectory; +import static io.delta.kernel.internal.util.PartitionUtils.validatePartitionValues; +import static io.delta.kernel.internal.util.Preconditions.checkArgument; + +/** + * Represents a transaction to mutate a Delta table. + * + * @since 3.2.0 + */ +@Evolving +public interface Transaction { + /** + * Get the schema of the data that is being written to the table. + */ + StructType getSchema(TableClient tableClient); + + /** + * Get the list of logical names of the partition columns. This helps the connector to do + * physical partitioning of the data before asking the Kernel to stage the data per partition. + */ + List getPartitionColumns(TableClient tableClient); + + /** + * Get the state of the transaction. The state helps Kernel do the transformations to logical + * data according to the Delta protocol and table features enabled on the table. The engine + * should use this at the data writer task to transform the logical data that the engine wants + * to write to the table in to physical data that goes in data files using + * {@link Transaction#transformLogicalData(TableClient, Row, CloseableIterator, Map)} + */ + Row getState(TableClient tableClient); + + /** + * Commit the transaction including the staged data rows generated by + * {@link Transaction#generateAppendActions}. + * + * @param tableClient {@link TableClient} instance. + * @param stagedData Iterable of data actions to commit. 
These data actions are generated by + * the {@link Transaction#generateAppendActions(TableClient, Row, + * CloseableIterator, DataWriteContext)}. The {@link CloseableIterable} + * allows the Kernel to access the list of actions multiple times (in case + * of retries to resolve the conflicts due to other writers to the table). + * @return {@link TransactionCommitResult} status of the successful transaction. + * @throws ConcurrentWriteException when the transaction has encountered a non-retryable + * conflict or has exceeded the maximum number of retries. + * The connector needs to rerun the query on top of the latest + * table state and retry the transaction. + */ + TransactionCommitResult commit(TableClient tableClient, CloseableIterable<Row> stagedData) + throws ConcurrentWriteException; + + /** + * Given the logical data that needs to be written to the table, convert it into the required + * physical data depending upon the table's Delta protocol and the features enabled on the + * table. Kernel takes care of adding any additional columns or removing existing columns that + * don't need to be in the physical data files. All these transformations are driven by the + * Delta protocol and the table features enabled on the table. + *
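+ * <p>
+ * A rough usage sketch (assuming the connector has already split the data by partition;
+ * variable names here are illustrative, not part of the API):
+ * <pre>{@code
+ * Map<String, Literal> partitionValues =
+ *     Collections.singletonMap("part1", Literal.ofInt(1));
+ * CloseableIterator<FilteredColumnarBatch> physicalData =
+ *     Transaction.transformLogicalData(tableClient, txnState, logicalData, partitionValues);
+ * }</pre>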
+ * <p>
+ * The given data should belong to exactly one partition. It is the job of the connector to do + * partitioning of the data before calling the API. Partition values are provided as a map of + * column name to partition value (as {@link Literal}). If the table is an un-partitioned table, + * then the map should be empty. + * + * @param tableClient {@link TableClient} instance to use. + * @param transactionState The transaction state + * @param dataIter Iterator of logical data to transform to physical data. All the data + * in this iterator should belong to one physical partition. + * @param partitionValues The partition values for the data. If the table is un-partitioned, + * the map should be empty + * @return Iterator of physical data to write to the data files. + */ + static CloseableIterator<FilteredColumnarBatch> transformLogicalData( + TableClient tableClient, + Row transactionState, + CloseableIterator<FilteredColumnarBatch> dataIter, + Map<String, Literal> partitionValues) { + validatePartitionValues( + getPartitionColumnsList(transactionState), partitionValues); + // TODO: add support for: + // - enforcing the constraints + // - generating the default value columns + // - generating the generated columns + + // Remove the partition columns from the data as they are already part of the file metadata + // and are not needed in the data files. TODO: once we start supporting Uniform-compliant + // tables, we may conditionally skip this step. + + // TODO: set the correct schema once writing into column mapping enabled tables is supported. + return dataIter.map( + filteredColumnarBatch -> { + ColumnarBatch data = filteredColumnarBatch.getData(); + for (String partitionColName : partitionValues.keySet()) { + int partitionColIndex = data.getSchema().indexOf(partitionColName); + checkArgument( + partitionColIndex >= 0, + "Partition column %s not found in the data", + partitionColName); + data = data.withDeletedColumnAt(partitionColIndex); + } + return new FilteredColumnarBatch(data, + filteredColumnarBatch.getSelectionVector()); + } + ); + } + + /** + * Get the context for writing data into a table. The context tells the connector where the data + * should be written and what the target file size should be. For a partitioned table, the + * context is generated per partition. So, the connector should call this API for each + * partition. For an un-partitioned table, the context is the same for all the data. + * + * @param tableClient {@link TableClient} instance to use. + * @param transactionState The transaction state + * @param partitionValues The partition values for the data. If the table is un-partitioned, + * the map should be empty + * @return {@link DataWriteContext} containing metadata about where and how the data for the + * partition should be written. + */ + static DataWriteContext getWriteContext( + TableClient tableClient, + Row transactionState, + Map<String, Literal> partitionValues) { + validatePartitionValues( + getPartitionColumnsList(transactionState), + partitionValues); + + String targetDirectory = getTargetDirectory( + getTableRoot(transactionState), + getPartitionColumnsList(transactionState), + partitionValues); + return new DataWriteContext( + targetDirectory, + partitionValues, + getStatisticsColumns(tableClient, transactionState)); + } + + /** + * For the given newly written data files, generate Delta actions that can be committed in a + * transaction. These data files are the result of writing the data returned by + * {@link Transaction#transformLogicalData} with the context returned by + * {@link Transaction#getWriteContext}.
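+ * <p>
+ * A rough end-to-end sketch of the append flow (the Parquet-writing step in the middle is
+ * engine-specific; {@code writeParquetFiles} is a hypothetical connector helper, and the
+ * other variable names are illustrative):
+ * <pre>{@code
+ * Row txnState = txn.getState(tableClient);
+ * DataWriteContext writeContext =
+ *     Transaction.getWriteContext(tableClient, txnState, partitionValues);
+ * CloseableIterator<FilteredColumnarBatch> physicalData =
+ *     Transaction.transformLogicalData(tableClient, txnState, logicalData, partitionValues);
+ * CloseableIterator<DataFileStatus> writtenFiles = writeParquetFiles(
+ *     writeContext.getTargetDirectory(), physicalData, writeContext.getStatisticsColumns());
+ * CloseableIterator<Row> stagedActions = Transaction.generateAppendActions(
+ *     tableClient, txnState, writtenFiles, writeContext);
+ * }</pre>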
+ * + * @param tableClient {@link TableClient} instance. + * @param transactionState State of the transaction. + * @param fileStatusIter Iterator of row objects representing each data file written. + * @param dataWriteContext The context used when writing the data files given in + * {@code fileStatusIter} + * @return {@link CloseableIterator} of {@link Row} representing the actions to commit using + * {@link Transaction#commit}. + */ + static CloseableIterator generateAppendActions( + TableClient tableClient, + Row transactionState, + CloseableIterator fileStatusIter, + DataWriteContext dataWriteContext) { + URI tableRoot = new Path(getTableRoot(transactionState)).toUri(); + return fileStatusIter.map( + dataFileStatus -> { + Row addFileRow = AddFile.convertDataFileStatus( + tableRoot, + dataFileStatus, + dataWriteContext.getPartitionValues(), + true /* dataChange */); + return SingleAction.createAddFileSingleAction(addFileRow); + } + ); + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionBuilder.java b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionBuilder.java new file mode 100644 index 00000000000..c625f926fcf --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionBuilder.java @@ -0,0 +1,90 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel; + +import java.util.Set; + +import io.delta.kernel.annotation.Evolving; +import io.delta.kernel.client.TableClient; +import io.delta.kernel.expressions.Predicate; +import io.delta.kernel.types.StructType; + +/** + * Builder for creating a {@link Transaction} to mutate a Delta table. + * + * @since 3.2.0 + */ +@Evolving +public interface TransactionBuilder { + /** + * Set the new schema of the table. This is used when creating the table for the first time. + * + * @param tableClient {@link TableClient} instance to use. + * @param schema The new schema of the table. + * @return + */ + TransactionBuilder withSchema(TableClient tableClient, StructType schema); + + /** + * Set the partition columns of the table. Partition columns can only be set when creating the + * table for the first time. Subsequent updates to the partition columns are not allowed. + * + * @param tableClient {@link TableClient} instance to use. + * @param partitionColumns The partition columns of the table. These should be a subset of the + * columns in the schema. + * @return + */ + TransactionBuilder withPartitionColumns( + TableClient tableClient, + Set partitionColumns); + + /** + * Set the transaction identifier for idempotent writes. + * + * @param tableClient {@link TableClient} instance to use. + * @param applicationId The application ID that is writing to the table. + * @param transactionVersion The version of the transaction. 
same transaction is not committed into the table more than once.
This is used to ensure that the + * @return + */ + TransactionBuilder withTransactionId( + TableClient tableClient, + String applicationId, + long transactionVersion); + + /** + * Set the predicate of what qualifying files from and what version of the table are read in + * order to generate the updates for this transaction. + * + * @param tableClient {@link TableClient} instance + * @param readVersion What version of the table is read for generating the updates? + * @param predicate What set of files are read for generating the updates + * @return + */ + TransactionBuilder withReadSet( + TableClient tableClient, + long readVersion, + Predicate predicate); + + /** + * Build the transaction. + * + * @param tableClient {@link TableClient} instance to use. + * @throws ConcurrentTransactionException if the table already has a committed transaction with + * the same given transaction identifier (using + * #withTransactionId) as this transaction. + */ + Transaction build(TableClient tableClient); +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java new file mode 100644 index 00000000000..d81f1c476d0 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java @@ -0,0 +1,58 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel; + +import io.delta.kernel.annotation.Evolving; +import io.delta.kernel.client.TableClient; +import io.delta.kernel.utils.CloseableIterable; + +/** + * Contains the result of a successful transaction commit. Returned by + * {@link Transaction#commit(TableClient, CloseableIterable)}. + * + * @since 3.2.0 + */ +@Evolving +public class TransactionCommitResult { + private final long version; + private final boolean isReadyForCheckpoint; + + public TransactionCommitResult(long version, boolean isReadyForCheckpoint) { + this.version = version; + this.isReadyForCheckpoint = isReadyForCheckpoint; + } + + /** + * Contains the version of the transaction committed as. + * + * @return version the transaction is committed as. + */ + public long getVersion() { + return version; + } + + /** + * Is the table ready for checkpoint (i.e. there are enough commits since the last checkpoint)? + * If yes the connector can choose to checkpoint as the version the transaction is committed as + * using {@link Table#checkpoint(TableClient, long)} + * + * @return Is the table ready for checkpointing? 
+ */ + public boolean isReadyForCheckpoint() { + return isReadyForCheckpoint; + } +} + diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java index 0363d1cfb09..834faa29b0e 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java @@ -28,13 +28,14 @@ import io.delta.kernel.internal.replay.CreateCheckpointIterator; import io.delta.kernel.internal.replay.LogReplay; import io.delta.kernel.internal.snapshot.LogSegment; -import io.delta.kernel.internal.snapshot.SnapshotHint; import static io.delta.kernel.internal.TableConfig.TOMBSTONE_RETENTION; + /** * Implementation of {@link Snapshot}. */ public class SnapshotImpl implements Snapshot { + private final Path logPath; private final Path dataPath; private final long version; private final LogReplay logReplay; @@ -45,23 +46,17 @@ public class SnapshotImpl implements Snapshot { public SnapshotImpl( Path logPath, Path dataPath, - long version, LogSegment logSegment, - TableClient tableClient, - long timestamp, - Optional<SnapshotHint> snapshotHint) { + LogReplay logReplay, + Protocol protocol, + Metadata metadata) { + this.logPath = logPath; this.dataPath = dataPath; - this.version = version; + this.version = logSegment.version; this.logSegment = logSegment; - this.logReplay = new LogReplay( - logPath, - dataPath, - version, - tableClient, - logSegment, - snapshotHint); - this.protocol = logReplay.getProtocol(); - this.metadata = logReplay.getMetadata(); + this.logReplay = logReplay; + this.protocol = protocol; + this.metadata = metadata; } @Override @@ -122,4 +117,12 @@ public Optional<Long> getLatestTransactionVersion(String applicationId) { public LogSegment getLogSegment() { return logSegment; } + + public Path getLogPath() { + return logPath; + } + + public Path getDataPath() { + return dataPath; + } }
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java index bc3ed026ec6..736c3d76c33 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java @@ -48,6 +48,18 @@ public class TableConfig { " and years are not accepted. You may specify '365 days' for a year instead." ); + /** + * How often to checkpoint the delta log? For every N (this config) commits to the log, we + * suggest writing out a checkpoint file that can speed up the Delta table state reconstruction. + */ + public static final TableConfig<Integer> CHECKPOINT_INTERVAL = new TableConfig<>( + "delta.checkpointInterval", + "10", + Integer::valueOf, + value -> value > 0, + "needs to be a positive integer."
+ ); + private final String key; private final String defaultValue; private final Function fromString; diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java index 03124b02479..11ab5eac474 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java @@ -18,6 +18,8 @@ import java.io.IOException; import io.delta.kernel.*; +import io.delta.kernel.TableNotFoundException; +import io.delta.kernel.TransactionBuilder; import io.delta.kernel.client.TableClient; import io.delta.kernel.internal.fs.Path; @@ -71,4 +73,20 @@ public void checkpoint(TableClient tableClient, long version) throws TableNotFoundException, CheckpointAlreadyExistsException, IOException { snapshotManager.checkpoint(tableClient, version); } + + @Override + public TransactionBuilder createTransactionBuilder( + TableClient tableClient, + String engineInfo, + String operation) { + return new TransactionBuilderImpl(this, engineInfo, operation); + } + + protected Path getDataPath() { + return new Path(tablePath); + } + + protected Path getLogPath() { + return new Path(tablePath, "_delta_log"); + } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java new file mode 100644 index 00000000000..75c2893ae03 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java @@ -0,0 +1,237 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.delta.kernel.internal; + +import java.util.*; +import static java.util.Objects.requireNonNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.delta.kernel.*; +import io.delta.kernel.client.TableClient; +import io.delta.kernel.expressions.Predicate; +import io.delta.kernel.types.*; + +import io.delta.kernel.internal.actions.*; +import io.delta.kernel.internal.fs.Path; +import io.delta.kernel.internal.replay.LogReplay; +import io.delta.kernel.internal.snapshot.*; +import io.delta.kernel.internal.util.Tuple2; +import static io.delta.kernel.internal.TransactionImpl.DEFAULT_READ_VERSION; +import static io.delta.kernel.internal.TransactionImpl.DEFAULT_WRITE_VERSION; +import static io.delta.kernel.internal.util.VectorUtils.stringArrayValue; +import static io.delta.kernel.internal.util.VectorUtils.stringStringMapValue; + +public class TransactionBuilderImpl implements TransactionBuilder { + private static final Logger logger = LoggerFactory.getLogger(TransactionBuilderImpl.class); + + private final long currentTimeMillis = System.currentTimeMillis(); + private final TableImpl table; + private final String engineInfo; + private final String operation; + private Optional schema = Optional.empty(); + private Optional> partitionColumns = Optional.empty(); + private Optional readPredicate = Optional.empty(); + private Optional readVersion = Optional.empty(); + private Optional transactionId = Optional.empty(); + + public TransactionBuilderImpl(TableImpl table, String engineInfo, String operation) { + this.table = table; + this.engineInfo = engineInfo; + this.operation = operation; + } + + @Override + public TransactionBuilder withSchema(TableClient tableClient, StructType newSchema) { + this.schema = Optional.of(newSchema); + // TODO: this needs to verified + return this; + } + + @Override + public TransactionBuilder withPartitionColumns( + TableClient tableClient, + Set partitionColumns) { + if (!partitionColumns.isEmpty()) { + this.partitionColumns = Optional.of(partitionColumns); + } + return this; + } + + @Override + public TransactionBuilder withReadSet( + TableClient tableClient, + long readVersion, + Predicate readPredicate) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + @Override + public TransactionBuilder withTransactionId( + TableClient tableClient, + String applicationId, + long transactionVersion) { + SetTransaction txnId = new SetTransaction( + requireNonNull(applicationId, "applicationId is null"), + transactionVersion, + Optional.of(currentTimeMillis)); + this.transactionId = Optional.of(txnId); + return this; + } + + @Override + public Transaction build(TableClient tableClient) { + SnapshotImpl snapshot; + try { + snapshot = (SnapshotImpl) table.getLatestSnapshot(tableClient); + } catch (TableNotFoundException tblf) { + logger.info( + "Table {} doesn't exist yet. Trying to create a new table.", + table.getPath(tableClient)); + schema.orElseThrow(() -> new IllegalArgumentException( + "Table doesn't exist yet. Must provide a new schema.")); + // Table doesn't exist yet. Create an initial snapshot with the new schema. 
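+            // The initial snapshot reports version -1 (it wraps an empty LogSegment), so the
+            // first commit flows through the same commit path as an append and lands as version 0.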
+ Metadata metadata = getInitialMetadata(); + Protocol protocol = getInitialProtocol(); + LogReplay logReplay = getEmptyLogReplay(tableClient, metadata, protocol); + snapshot = new InitialSnapshot( + table.getLogPath(), + table.getDataPath(), + logReplay, + metadata, + protocol); + } + + validate(tableClient, snapshot); + + return new TransactionImpl( + table.getDataPath(), + table.getLogPath(), + snapshot, + engineInfo, + operation, + schema, + snapshot.getProtocol(), + snapshot.getMetadata(), + readVersion, + readPredicate, + transactionId); + } + + /** + * Validate the given parameters for the transaction. + */ + private void validate(TableClient tableClient, SnapshotImpl snapshot) { + // Validate the table has no features that Kernel doesn't yet support writing into it. + TableFeatures.validateWriteSupportedTable( + snapshot.getProtocol(), + snapshot.getMetadata(), + snapshot.getMetadata().getSchema()); + // TODO: if a new schema is given make sure it is valid + + long currentSnapshotVersion = snapshot.getVersion(tableClient); + if (readVersion.isPresent() && readVersion.get() != currentSnapshotVersion) { + throw new IllegalArgumentException( + "Read version doesn't match the current snapshot version. " + + "Read version: " + readVersion.get() + + " Current snapshot version: " + currentSnapshotVersion); + } + if (currentSnapshotVersion >= 0 && partitionColumns.isPresent()) { + throw new IllegalArgumentException( + "Partition columns can only be set on a table that doesn't exist yet."); + } + + if (transactionId.isPresent()) { + Optional lastTxnVersion = + snapshot.getLatestTransactionVersion(transactionId.get().getAppId()); + if (lastTxnVersion.isPresent() && + lastTxnVersion.get() >= transactionId.get().getVersion()) { + throw new ConcurrentTransactionException( + transactionId.get().getAppId(), + transactionId.get().getVersion(), + lastTxnVersion.get()); + } + } + } + + + private class InitialSnapshot extends SnapshotImpl { + InitialSnapshot( + Path logPath, + Path dataPath, + LogReplay logReplay, + Metadata metadata, + Protocol protocol) { + super( + logPath, + dataPath, + LogSegment.empty(table.getLogPath()), + logReplay, + protocol, + metadata); + } + } + + private LogReplay getEmptyLogReplay( + TableClient tableClient, + Metadata metadata, + Protocol protocol) { + return new LogReplay( + table.getLogPath(), + table.getDataPath(), + -1, + tableClient, + LogSegment.empty(table.getLogPath()), + Optional.empty()) { + + @Override + protected Tuple2 loadTableProtocolAndMetadata( + Optional snapshotHint, long snapshotVersion) { + return new Tuple2<>(protocol, metadata); + } + + @Override + public Optional getLatestTransactionIdentifier(String applicationId) { + return Optional.empty(); + } + }; + } + + private Metadata getInitialMetadata() { + return new Metadata( + java.util.UUID.randomUUID().toString(), /* id */ + Optional.empty(), /* name */ + Optional.empty(), /* description */ + new Format(), /* format */ + schema.get().toJson(), /* schemaString */ + schema.get(), /* schema */ + stringArrayValue(partitionColumns.isPresent() ? 
+ new ArrayList<>(partitionColumns.get()) : + Collections.emptyList()), /* partitionColumns */ + Optional.of(currentTimeMillis), /* createdTime */ + stringStringMapValue(Collections.emptyMap()) /* configuration */ + ); + } + + private Protocol getInitialProtocol() { + return new Protocol( + DEFAULT_READ_VERSION, + DEFAULT_WRITE_VERSION, + null /* readerFeatures */, + null /* writerFeatures */); + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java new file mode 100644 index 00000000000..fbca322d56c --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java @@ -0,0 +1,195 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal; + +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.util.*; + +import io.delta.kernel.*; +import io.delta.kernel.client.TableClient; +import io.delta.kernel.data.Row; +import io.delta.kernel.expressions.Column; +import io.delta.kernel.expressions.Predicate; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterable; +import io.delta.kernel.utils.CloseableIterator; + +import io.delta.kernel.internal.actions.*; +import io.delta.kernel.internal.data.TransactionStateRow; +import io.delta.kernel.internal.fs.Path; +import io.delta.kernel.internal.util.FileNames; +import static io.delta.kernel.internal.TableConfig.CHECKPOINT_INTERVAL; +import static io.delta.kernel.internal.actions.SingleAction.*; +import static io.delta.kernel.internal.util.Preconditions.checkState; + +public class TransactionImpl implements Transaction { + public static final int DEFAULT_READ_VERSION = 1; + public static final int DEFAULT_WRITE_VERSION = 2; + + private final String engineInfo; + private final String operation; + private final Path dataPath; + private final Path logPath; + private final Optional newSchema; + private final Protocol protocol; + private final Metadata metadata; + private final Optional readVersion; + private final Optional readPredicate; + private final SnapshotImpl readSnapshot; + private final Optional txnIdentifier; + + private boolean committed; // To avoid trying to commit the same transaction again. 
+ + public TransactionImpl( + Path dataPath, + Path logPath, + SnapshotImpl readSnapshot, + String engineInfo, + String operation, + Optional newSchema, + Protocol protocol, + Metadata metadata, + Optional readVersion, + Optional readPredicate, + Optional txnIdentifier) { + this.dataPath = dataPath; + this.logPath = logPath; + this.readSnapshot = readSnapshot; + this.engineInfo = engineInfo; + this.operation = operation; + this.newSchema = newSchema; + this.protocol = protocol; + this.metadata = metadata; + this.readVersion = readVersion; + this.readPredicate = readPredicate; + this.txnIdentifier = txnIdentifier; + } + + @Override + public Row getState(TableClient tableClient) { + return TransactionStateRow.of(metadata, dataPath.toString()); + } + + @Override + public List getPartitionColumns(TableClient tableClient) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + @Override + public StructType getSchema(TableClient tableClient) { + return newSchema.orElseGet(() -> readSnapshot.getSchema(tableClient)); + } + + @Override + public TransactionCommitResult commit( + TableClient tableClient, + CloseableIterable stagedData) throws ConcurrentWriteException { + checkState(!committed, "Transaction is already committed. Create a new transaction."); + List metadataActions = new ArrayList<>(); + metadataActions.add(createCommitInfoSingleAction(generateCommitAction())); + metadataActions.add(createMetadataSingleAction(metadata.toRow())); + metadataActions.add(createProtocolSingleAction(protocol.toRow())); + addSetTransactionIfPresent(metadataActions); + + CloseableIterator stageDataIter = stagedData.iterator(); + + // Create a new CloseableIterator that will return the commit action, metadata action, and + // the stagedData + CloseableIterator stagedDataWithCommitInfo = new CloseableIterator() { + private int metadataActionReturned = -1; + + @Override + public boolean hasNext() { + return metadataActionReturned < metadataActions.size() - 1 || + stageDataIter.hasNext(); + } + + @Override + public Row next() { + if (metadataActionReturned < metadataActions.size() - 1) { + metadataActionReturned += 1; + return metadataActions.get(metadataActionReturned); + } else { + return stageDataIter.next(); + } + } + + @Override + public void close() throws IOException { + stageDataIter.close(); + } + }; + + try { + long readVersion = readSnapshot.getVersion(tableClient); + if (readVersion == -1) { + // New table, create a delta log directory + tableClient.getFileSystemClient().mkdir(logPath.toString()); + } + + long newVersion = readVersion + 1; + // Write the staged data to a delta file + tableClient.getJsonHandler().writeJsonFileAtomically( + FileNames.deltaFile(logPath, newVersion), + stagedDataWithCommitInfo, + false /* overwrite */ + ); + + committed = true; + + return new TransactionCommitResult(newVersion, readyForCheckpoint(newVersion)); + } catch (FileAlreadyExistsException e) { + // TODO: Resolve conflicts and retry commit + throw new ConcurrentWriteException(); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + + public Optional getReadVersion() { + return readVersion; + } + + public Optional getReadPredicate() { + return readPredicate; + } + + /** + * Get the part of the schema of the table that needs the statistics to be collected per file. + * + * @param tableClient {@link TableClient} instance to use. 
+ * @param transactionState State of the transaction + * @return + */ + public static List getStatisticsColumns(TableClient tableClient, Row transactionState) { + // TODO: implement this once we start supporting collecting stats + return Collections.emptyList(); + } + + private Row generateCommitAction() { + return new CommitInfo(System.currentTimeMillis(), engineInfo, operation).toRow(); + } + + private void addSetTransactionIfPresent(List actions) { + txnIdentifier.ifPresent(txnId -> actions.add(createTxnSingleAction(txnId.toRow()))); + } + + private boolean readyForCheckpoint(long newVersion) { + int checkpointInterval = CHECKPOINT_INTERVAL.fromMetadata(metadata); + return newVersion > 0 && newVersion % checkpointInterval == 0; + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java index 66bf43cbce0..553422ca53d 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java @@ -15,7 +15,19 @@ */ package io.delta.kernel.internal.actions; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; + +import io.delta.kernel.data.Row; +import io.delta.kernel.expressions.Literal; import io.delta.kernel.types.*; +import io.delta.kernel.utils.DataFileStatus; + +import io.delta.kernel.internal.data.GenericRow; +import io.delta.kernel.internal.fs.Path; +import static io.delta.kernel.internal.fs.FileOperations.relativizePath; +import static io.delta.kernel.internal.util.PartitionUtils.serializePartitionMap; /** * Delta log action representing an `AddFile` @@ -57,4 +69,33 @@ public class AddFile { true /* nullable */); // There are more fields which are added when row-id tracking and clustering is enabled. // When Kernel starts supporting row-ids and clustering, we should add those fields here. + + private static final int PATH_ORDINAL = FULL_SCHEMA.indexOf("path"); + private static final int PARTITION_VALUES_ORDINAL = FULL_SCHEMA.indexOf("partitionValues"); + private static final int SIZE_ORDINAL = FULL_SCHEMA.indexOf("size"); + private static final int MODIFICATION_TIME_ORDINAL = FULL_SCHEMA.indexOf("modificationTime"); + private static final int DATA_CHANGE_ORDINAL = FULL_SCHEMA.indexOf("dataChange"); + private static final int STATS_ORDINAL = FULL_SCHEMA.indexOf("stats"); + + /** + * Utility to generate `AddFile` row from the given {@link DataFileStatus} and partition values. 
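+ * <p>
+ * A sketch with illustrative values: given a table root of {@code s3://bucket/table} and a
+ * file written to {@code s3://bucket/table/part1=1/f1.parquet}, the resulting row stores the
+ * relative {@code path} {@code part1=1/f1.parquet}; {@code size}, {@code modificationTime} and
+ * {@code dataChange} are filled in, {@code stats} only when statistics are present, and any
+ * remaining fields are left null.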
+ */ + public static Row convertDataFileStatus( + URI tableRoot, + DataFileStatus dataFileStatus, + Map partitionValues, + boolean dataChange) { + Path filePath = new Path(dataFileStatus.getPath()); + Map valueMap = new HashMap<>(); + valueMap.put(PATH_ORDINAL, relativizePath(filePath, tableRoot).toString()); + valueMap.put(PARTITION_VALUES_ORDINAL, serializePartitionMap(partitionValues)); + valueMap.put(SIZE_ORDINAL, dataFileStatus.getSize()); + valueMap.put(MODIFICATION_TIME_ORDINAL, dataFileStatus.getModificationTime()); + valueMap.put(DATA_CHANGE_ORDINAL, dataChange); + if (dataFileStatus.getStatistics().isPresent()) { + valueMap.put(STATS_ORDINAL, dataFileStatus.getStatistics().get().serializeAsJson()); + } + // any fields not present in the valueMap are considered null + return new GenericRow(FULL_SCHEMA, valueMap); + } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java index c757428303b..3571b1889ba 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java @@ -17,8 +17,7 @@ import java.util.*; -import io.delta.kernel.data.ColumnVector; -import io.delta.kernel.data.Row; +import io.delta.kernel.data.*; import io.delta.kernel.types.ArrayType; import io.delta.kernel.types.IntegerType; import io.delta.kernel.types.StringType; @@ -103,9 +102,14 @@ public Row toRow() { Map protocolMap = new HashMap<>(); protocolMap.put(0, minReaderVersion); protocolMap.put(1, minWriterVersion); - protocolMap.put(2, readerFeatures == null ? null : stringArrayValue(readerFeatures)); - protocolMap.put(3, writerFeatures == null ? null : stringArrayValue(writerFeatures)); + protocolMap.put(2, arrayValue(readerFeatures)); + protocolMap.put(3, arrayValue(writerFeatures)); return new GenericRow(Protocol.FULL_SCHEMA, protocolMap); } + + private static ArrayValue arrayValue(List values) { + // don't write empty array. + return values == null || values.isEmpty() ? null : stringArrayValue(values); + } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/TransactionStateRow.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/TransactionStateRow.java new file mode 100644 index 00000000000..7a0ae7d7c3d --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/TransactionStateRow.java @@ -0,0 +1,73 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.delta.kernel.internal.data; + +import java.util.*; +import java.util.stream.IntStream; +import static java.util.stream.Collectors.toMap; + +import io.delta.kernel.Transaction; +import io.delta.kernel.client.TableClient; +import io.delta.kernel.data.Row; +import io.delta.kernel.types.*; + +import io.delta.kernel.internal.actions.Metadata; +import io.delta.kernel.internal.util.VectorUtils; + +public class TransactionStateRow extends GenericRow { + private static final StructType SCHEMA = new StructType() + .add("partitionColumns", new ArrayType(StringType.STRING, false)) + .add("tablePath", StringType.STRING); + + private static final Map COL_NAME_TO_ORDINAL = + IntStream.range(0, SCHEMA.length()) + .boxed() + .collect(toMap(i -> SCHEMA.at(i).getName(), i -> i)); + + public static TransactionStateRow of(Metadata metadata, String tablePath) { + HashMap valueMap = new HashMap<>(); + valueMap.put(COL_NAME_TO_ORDINAL.get("partitionColumns"), metadata.getPartitionColumns()); + valueMap.put(COL_NAME_TO_ORDINAL.get("tablePath"), tablePath); + return new TransactionStateRow(valueMap); + } + + private TransactionStateRow(HashMap valueMap) { + super(SCHEMA, valueMap); + } + + /** + * Get the list of partition column names from the write state {@link Row} returned by + * {@link Transaction#getState(TableClient)}. + * + * @param transactionState Scan state {@link Row} + * @return List of partition column names according to the scan state. + */ + public static List getPartitionColumnsList(Row transactionState) { + return VectorUtils.toJavaList( + transactionState.getArray(COL_NAME_TO_ORDINAL.get("partitionColumns"))); + } + + /** + * Get the table root from scan state {@link Row} returned by + * {@link Transaction#getState(TableClient)} + * + * @param transactionState Transaction state state {@link Row} + * @return Fully qualified path to the location of the table. + */ + public static String getTableRoot(Row transactionState) { + return transactionState.getString(COL_NAME_TO_ORDINAL.get("tablePath")); + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/fs/FileOperations.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/fs/FileOperations.java new file mode 100644 index 00000000000..7e52b64eed1 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/fs/FileOperations.java @@ -0,0 +1,40 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.fs; + +import java.net.URI; + +public class FileOperations { + private FileOperations() {} + + /** + * Relativize the given child path with respect to the given root URI. If the child path is + * already a relative path, it is returned as is. + * + * @param child + * @param root Root directory as URI. Relativization is done with respect to this root. 
+ * The relativize operation requires conversion to URI, so the caller is expected to + * convert the root directory to URI once and use it for relativizing for multiple + * child paths. + * @return + */ + public static Path relativizePath(Path child, URI root) { + if (child.isAbsolute()) { + return new Path(root.relativize(child.toUri())); + } + return child; + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java index 5cdfb8a2f0e..923150c49ad 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java @@ -188,7 +188,7 @@ public CloseableIterator getAddFilesAsColumnarBatches( * delta files newer than the hint to search for any new P & M. If we don't find them, we can * just use the P and/or M from the hint. */ - private Tuple2 loadTableProtocolAndMetadata( + protected Tuple2 loadTableProtocolAndMetadata( Optional snapshotHint, long snapshotVersion) { diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java index 97bdb7e1b11..c8bb8894a97 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java @@ -38,6 +38,7 @@ import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.lang.ListUtils; import io.delta.kernel.internal.replay.CreateCheckpointIterator; +import io.delta.kernel.internal.replay.LogReplay; import io.delta.kernel.internal.util.FileNames; import io.delta.kernel.internal.util.Tuple2; import static io.delta.kernel.internal.TableFeatures.validateWriteSupportedTable; @@ -365,19 +366,26 @@ private SnapshotImpl createSnapshot( .orElse("."); logger.info("{}: Loading version {} {}", tablePath, initSegment.version, startingFromStr); + + LogReplay logReplay = new LogReplay( + logPath, + tablePath, + initSegment.version, + tableClient, + initSegment, + Optional.ofNullable(latestSnapshotHint.get())); + long startTimeMillis = System.currentTimeMillis(); assertLogFilesBelongToTable(logPath, initSegment.allLogFilesUnsorted()); final SnapshotImpl snapshot = new SnapshotImpl( - logPath, - tablePath, - initSegment.version, - initSegment, - tableClient, - initSegment.lastCommitTimestamp, - Optional.ofNullable(latestSnapshotHint.get()) - ); + logPath, + tablePath, + initSegment, + logReplay, + logReplay.getProtocol(), + logReplay.getMetadata()); logger.info( "{}: Took {}ms to construct the snapshot (loading protocol and metadata) for {} {}", tablePath, diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/PartitionUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/PartitionUtils.java index e02b5c3b4c1..937a4c770d4 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/PartitionUtils.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/PartitionUtils.java @@ -111,6 +111,56 @@ public static ColumnarBatch withPartitionColumns( return dataBatch; } + /** + * Convert the given partition values to a {@link MapValue} that can be serialized to a Delta + * commit file. 
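+ * <p>
+ * A sketch of the expected shape (serialized values follow the Delta protocol's string
+ * representation of partition values):
+ * <pre>{@code
+ * Map<String, Literal> input = new HashMap<>();
+ * input.put("part1", Literal.ofInt(1));
+ * input.put("part2", null); // a null partition value stays null
+ * MapValue serialized = serializePartitionMap(input); // {"part1" -> "1", "part2" -> null}
+ * }</pre>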
+ * + * @param partitionValueMap + * @return + */ + public static MapValue serializePartitionMap(Map partitionValueMap) { + if (partitionValueMap == null || partitionValueMap.size() == 0) { + return VectorUtils.stringStringMapValue(Collections.emptyMap()); + } + + Map serializedPartValues = + partitionValueMap.entrySet().stream().map(entry -> { + String partColName = entry.getKey(); + Literal partValue = entry.getValue(); + + if (partValue == null) { + return new Tuple2<>(partColName, (String) null); + } else { + return new Tuple2<>(partColName, serializePartitionValue(partValue)); + } + }).collect(Collectors.toMap( + tuple2 -> tuple2._1, + tuple2 -> tuple2._2 + )); + + return VectorUtils.stringStringMapValue(serializedPartValues); + } + + /** + * Validate {@code partitionValues} contains values for every partition column. + * + * @param partitionColumns List of partition columns + * @param partitionValues Map of partition column to value map. + */ + public static void validatePartitionValues( + List partitionColumns, + Map partitionValues) { + Set partitionColNames = new HashSet<>(partitionColumns); + Set partitionValueColNames = partitionValues.keySet(); + if (!partitionColNames.equals(partitionValueColNames)) { + throw new IllegalArgumentException( + String.format( + "Partition values provided are not matching the partition columns. " + + "Partition columns: %s, Partition values: %s", + partitionColNames, partitionValueColNames)); + } + } + /** * Split the given predicate into predicate on partition columns and predicate on data columns. * diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/utils/CloseableIterable.java b/kernel/kernel-api/src/main/java/io/delta/kernel/utils/CloseableIterable.java new file mode 100644 index 00000000000..2be2cc0b89f --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/utils/CloseableIterable.java @@ -0,0 +1,41 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.utils; + +import java.io.Closeable; +import java.util.Spliterator; + +/** + * Extend the Java {@link Iterable} interface to provide a way to close the iterator. + * + * @param + */ +public interface CloseableIterable extends Iterable, Closeable { + + /** + * Overrides the default iterator method to return a {@link CloseableIterator}. + * + * @return a {@link CloseableIterator} instance. + */ + @Override + CloseableIterator iterator(); + + @Override + default Spliterator spliterator() { + // We need a way to close the iterator, so we don't support spliterator for now. 
+ throw new UnsupportedOperationException("spliterator is not supported"); + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/utils/DataFileStatistics.java b/kernel/kernel-api/src/main/java/io/delta/kernel/utils/DataFileStatistics.java index 6596cd00372..80f914b8a8f 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/utils/DataFileStatistics.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/utils/DataFileStatistics.java @@ -91,4 +91,9 @@ public Map getMaxValues() { public Map getNullCounts() { return nullCounts; } + + public String serializeAsJson() { + // TODO: implement this + return "{}"; + } } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala new file mode 100644 index 00000000000..f3ad25d59dc --- /dev/null +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala @@ -0,0 +1,385 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.defaults + +import io.delta.golden.GoldenTableUtils +import io.delta.kernel.data.{ColumnarBatch, FilteredColumnarBatch, Row} +import io.delta.kernel.defaults.internal.data.DefaultColumnarBatch +import io.delta.kernel.defaults.internal.parquet.ParquetSuiteBase +import io.delta.kernel.defaults.utils.{TestRow, TestUtils} +import io.delta.kernel.expressions.Literal +import io.delta.kernel.internal.util.Utils.toCloseableIterator +import io.delta.kernel.types.IntegerType.INTEGER +import io.delta.kernel.types.StructType +import io.delta.kernel.utils.{CloseableIterable, CloseableIterator} +import io.delta.kernel.{ConcurrentTransactionException, Table, Transaction, TransactionBuilder, TransactionCommitResult} +import org.scalatest.funsuite.AnyFunSuite + +import java.util.Optional +import scala.collection.JavaConverters._ + +class DeltaTableWritesSuite extends AnyFunSuite with TestUtils with ParquetSuiteBase { + + /** Test table schemas */ + val testSchema = new StructType().add("id", INTEGER) + val testPartitionSchema = new StructType() + .add("id", INTEGER) + .add("part1", INTEGER) // partition column + .add("part2", INTEGER) // partition column + + val dataBatch1 = testBatch(200) + val dataBatch2 = testBatch(400) + + val dataPartitionBatch1 = testPartitionBatch(size = 236, part1 = 1, part2 = 2) + val dataPartitionBatch2 = testPartitionBatch(size = 876, part1 = 4, part2 = 5) + + test("insert into table - table created from scratch") { + withTempDir { tempDir => + val tblPath = tempDir.getAbsolutePath + + val table = Table.forPath(defaultTableClient, tblPath) + val newVersion1 = writeData(isNewTable = true, table, dataBatch1, dataBatch2) + assert(newVersion1.getVersion == 0) + val expectedAnswer = dataBatch1.toTestRows ++ dataBatch2.toTestRows + + verifyWrittenContent(tblPath, expectedAnswer) + } + } + + test("insert into table - already existing table") { + 
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala
new file mode 100644
index 00000000000..f3ad25d59dc
--- /dev/null
+++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala
@@ -0,0 +1,385 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.defaults
+
+import io.delta.golden.GoldenTableUtils
+import io.delta.kernel.data.{ColumnarBatch, FilteredColumnarBatch, Row}
+import io.delta.kernel.defaults.internal.data.DefaultColumnarBatch
+import io.delta.kernel.defaults.internal.parquet.ParquetSuiteBase
+import io.delta.kernel.defaults.utils.{TestRow, TestUtils}
+import io.delta.kernel.expressions.Literal
+import io.delta.kernel.internal.util.Utils.toCloseableIterator
+import io.delta.kernel.types.IntegerType.INTEGER
+import io.delta.kernel.types.StructType
+import io.delta.kernel.utils.{CloseableIterable, CloseableIterator}
+import io.delta.kernel.{ConcurrentTransactionException, Table, Transaction, TransactionBuilder, TransactionCommitResult}
+import org.scalatest.funsuite.AnyFunSuite
+
+import java.util.Optional
+import scala.collection.JavaConverters._
+
+class DeltaTableWritesSuite extends AnyFunSuite with TestUtils with ParquetSuiteBase {
+
+  /** Test table schemas */
+  val testSchema = new StructType().add("id", INTEGER)
+  val testPartitionSchema = new StructType()
+    .add("id", INTEGER)
+    .add("part1", INTEGER) // partition column
+    .add("part2", INTEGER) // partition column
+
+  val dataBatch1 = testBatch(200)
+  val dataBatch2 = testBatch(400)
+
+  val dataPartitionBatch1 = testPartitionBatch(size = 236, part1 = 1, part2 = 2)
+  val dataPartitionBatch2 = testPartitionBatch(size = 876, part1 = 4, part2 = 5)
+
+  test("insert into table - table created from scratch") {
+    withTempDir { tempDir =>
+      val tblPath = tempDir.getAbsolutePath
+
+      val table = Table.forPath(defaultTableClient, tblPath)
+      val newVersion1 = writeData(isNewTable = true, table, dataBatch1, dataBatch2)
+      assert(newVersion1.getVersion == 0)
+      val expectedAnswer = dataBatch1.toTestRows ++ dataBatch2.toTestRows
+
+      verifyWrittenContent(tblPath, expectedAnswer)
+    }
+  }
+
+  test("insert into table - already existing table") {
+    withTempDir { tempDir =>
+      val tblPath = tempDir.getAbsolutePath
+
+      val table = Table.forPath(defaultTableClient, tblPath)
+
+      {
+        val newVersion1 = writeData(isNewTable = true, table, dataBatch1, dataBatch2)
+        assert(newVersion1.getVersion == 0)
+
+        val expectedAnswer1 = dataBatch1.toTestRows ++ dataBatch2.toTestRows
+        verifyWrittenContent(tblPath, expectedAnswer1)
+      }
+      {
+        val newVersion2 = writeData(isNewTable = false, table, dataBatch2)
+        assert(newVersion2.getVersion == 1)
+
+        val expectedAnswer2 = dataBatch1.toTestRows ++ dataBatch2.toTestRows ++ // version 0
+          dataBatch2.toTestRows // version 1
+
+        verifyWrittenContent(tblPath, expectedAnswer2)
+      }
+    }
+  }
+
+  test("insert into table - fails when creating table without schema") {
+    withTempDir { tempDir =>
+      val tblPath = tempDir.getAbsolutePath
+      val table = Table.forPath(defaultTableClient, tblPath)
+
+      val ex = intercept[IllegalArgumentException] {
+        createTxnBuilder(table)
+          .build(defaultTableClient)
+      }
+      assert(ex.getMessage.contains("Table doesn't exist yet. Must provide a new schema"))
+    }
+  }
+
+  test("insert into table - fails when committing the same txn twice") {
+    withTempDir { tempDir =>
+      val tblPath = tempDir.getAbsolutePath
+      val table = Table.forPath(defaultTableClient, tblPath)
+
+      val txn = createTxnBuilder(table)
+        .withSchema(defaultTableClient, testSchema)
+        .build(defaultTableClient)
+
+      val txnState = txn.getState(defaultTableClient)
+
+      val closeableIterBatches =
+        toCloseableIterator(Seq(dataBatch1).toIterator.asJava)
+      val stagedFiles = stageData(txnState, Map.empty, closeableIterBatches)
+
+      val stagedActionsIterable = inMemoryIterable(stagedFiles)
+      val newVersion = txn.commit(defaultTableClient, stagedActionsIterable)
+      assert(newVersion.getVersion == 0)
+
+      // try to commit the same transaction and expect failure
+      val ex = intercept[IllegalStateException] {
+        txn.commit(defaultTableClient, stagedActionsIterable)
+      }
+      assert(ex.getMessage.contains(
+        "Transaction is already committed. Create a new transaction."))
+    }
+  }
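The tests above drive the full write path end to end. The same flow in Java, as a minimal sketch of an unpartitioned append: the engine-info string and the materialize helper are illustrative assumptions, while the Transaction calls mirror the ones exercised by stageData and writeDataWithPartitions further down.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import io.delta.kernel.DataWriteContext;
    import io.delta.kernel.Table;
    import io.delta.kernel.Transaction;
    import io.delta.kernel.TransactionCommitResult;
    import io.delta.kernel.client.TableClient;
    import io.delta.kernel.data.FilteredColumnarBatch;
    import io.delta.kernel.data.Row;
    import io.delta.kernel.types.StructType;
    import io.delta.kernel.utils.CloseableIterable;
    import io.delta.kernel.utils.CloseableIterator;

    final class AppendExample {
        // Create the table if needed and commit one set of batches as a new version.
        static TransactionCommitResult createAndAppend(
                TableClient tableClient,
                String tablePath,
                StructType schema,
                CloseableIterator<FilteredColumnarBatch> logicalData) throws IOException {
            Transaction txn = Table.forPath(tableClient, tablePath)
                .createTransactionBuilder(tableClient, "MyEngine/1.0", "INSERT")
                .withSchema(tableClient, schema) // only needed when creating the table
                .build(tableClient);

            Row txnState = txn.getState(tableClient);

            // 1. Transform logical data into the physical form stored in the files.
            CloseableIterator<FilteredColumnarBatch> physicalData = Transaction
                .transformLogicalData(tableClient, txnState, logicalData, Collections.emptyMap());

            // 2. Ask the transaction where to write and which columns need statistics.
            DataWriteContext ctx =
                Transaction.getWriteContext(tableClient, txnState, Collections.emptyMap());

            // 3. Write the Parquet files, then turn the write results into Delta log actions.
            CloseableIterator<Row> actions = Transaction.generateAppendActions(
                tableClient, txnState,
                tableClient.getParquetHandler().writeParquetFiles(
                    ctx.getTargetDirectory(), physicalData, ctx.getStatisticsColumns()),
                ctx);

            // 4. Commit. The actions are materialized so they can be re-read if needed.
            return txn.commit(tableClient, materialize(actions));
        }

        private static CloseableIterable<Row> materialize(CloseableIterator<Row> actions)
                throws IOException {
            List<Row> rows = new ArrayList<>();
            try (CloseableIterator<Row> it = actions) {
                it.forEachRemaining(rows::add);
            }
            return new InMemoryActions(rows); // the in-memory iterable sketched earlier
        }
    }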
+
+  test("insert into partitioned table - table created from scratch") {
+    withTempDir { tempDir =>
+      val tblPath = tempDir.getAbsolutePath
+
+      val table = Table.forPath(defaultTableClient, tblPath)
+      val newVersion1 = writeDataWithPartitions(
+        tblPath,
+        isNewTable = true,
+        dataPartitionBatch1.getData.getSchema,
+        Set("part1", "part2"),
+        Map("part1" -> Literal.ofInt(1), "part2" -> Literal.ofInt(2)),
+        dataPartitionBatch1)
+      assert(newVersion1.getVersion == 0)
+
+      verifyWrittenContent(tblPath, dataPartitionBatch1.toTestRows)
+    }
+  }
+
+  test("insert into partitioned table - already existing table") {
+    withTempDir { tempDir =>
+      val tblPath = tempDir.getAbsolutePath
+
+      val table = Table.forPath(defaultTableClient, tblPath)
+
+      {
+        val newVersion1 = writeDataWithPartitions(
+          tblPath,
+          isNewTable = true,
+          dataPartitionBatch1.getData.getSchema,
+          Set("part1", "part2"),
+          Map("part1" -> Literal.ofInt(1), "part2" -> Literal.ofInt(2)),
+          dataPartitionBatch1)
+        assert(newVersion1.getVersion == 0)
+
+        verifyWrittenContent(tblPath, dataPartitionBatch1.toTestRows)
+      }
+      {
+        val newVersion2 = writeDataWithPartitions(
+          tblPath,
+          isNewTable = false,
+          dataPartitionBatch2.getData.getSchema,
+          Set("part1", "part2"),
+          Map("part1" -> Literal.ofInt(4), "part2" -> Literal.ofInt(5)),
+          dataPartitionBatch2)
+        assert(newVersion2.getVersion == 1)
+
+        val expectedAnswer2 = dataPartitionBatch1.toTestRows /* version 0 */ ++
+          dataPartitionBatch2.toTestRows /* version 1 */
+        verifyWrittenContent(tblPath, expectedAnswer2)
+      }
+    }
+  }
+
+  test("insert into table - idempotent writes") {
+    withTempDir { tempDir =>
+      val tblPath = tempDir.getAbsolutePath
+
+      val table = Table.forPath(defaultTableClient, tblPath)
+
+      def addDataWithTxnId(appId: String, txnVersion: Long, expCommitVersion: Long): Unit = {
+        val txn = createTxnBuilder(table)
+          .withSchema(defaultTableClient, testSchema)
+          .withTransactionId(defaultTableClient, appId, txnVersion)
+          .build(defaultTableClient)
+
+        val txnState = txn.getState(defaultTableClient)
+
+        val closeableIterBatches =
+          toCloseableIterator(Seq(dataBatch1, dataBatch2).toIterator.asJava)
+        val stagedFiles = stageData(txnState, Map.empty, closeableIterBatches)
+
+        val stagedActionsIterable = inMemoryIterable(stagedFiles)
+        val newVersion = txn.commit(defaultTableClient, stagedActionsIterable)
+        assertCommit(expCommitVersion, newVersion)
+      }
+
+      def testData(): Seq[TestRow] = dataBatch1.toTestRows ++ dataBatch2.toTestRows
+
+      {
+        // Create a transaction with id (txnAppId1, 0) and commit it
+        addDataWithTxnId(appId = "txnAppId1", txnVersion = 0, expCommitVersion = 0)
+        val expectedAnswer = testData() /* v0 */
+        checkTable(path = tblPath, expectedAnswer)
+      }
+      {
+        // Create a transaction with id (txnAppId1, 1) and commit it - should be valid
+        addDataWithTxnId("txnAppId1", txnVersion = 1, expCommitVersion = 1)
+        val expectedAnswer = testData() /* v0 */ ++ testData() /* v1 */
+        verifyWrittenContent(tblPath, expectedAnswer)
+      }
+      {
+        // Try to create another transaction with id (txnAppId1, 1) and commit it.
+        // Should fail as that version is already committed above.
+        val ex = intercept[ConcurrentTransactionException] {
+          addDataWithTxnId("txnAppId1", txnVersion = 1, expCommitVersion = 2)
+        }
+        assert(ex.getMessage.contains(
+          "This error occurs when multiple updates are using the same transaction " +
+            "identifier to write into this table.\n" +
+            "Application ID: txnAppId1, Attempted version: 1, Latest version in table: 1"))
+      }
+      {
+        // Try to create a transaction with id (txnAppId2, 1) and commit it
+        // Should be successful as the transaction app id is different
+        addDataWithTxnId("txnAppId2", txnVersion = 1, expCommitVersion = 2)
+
+        val expectedAnswer = testData() /* v0 */ ++ testData() /* v1 */ ++ testData() /* v2 */
+        verifyWrittenContent(tblPath, expectedAnswer)
+      }
+      {
+        // Try to create a transaction with id (txnAppId2, 0) and commit it.
+        // Should fail as the transaction app id is the same but the version is
+        // not newer than the last committed version for that app id.
+        val ex = intercept[ConcurrentTransactionException] {
+          addDataWithTxnId("txnAppId2", txnVersion = 0, expCommitVersion = 3)
+        }
+        assert(ex.getMessage.contains(
+          "This error occurs when multiple updates are using the same transaction " +
+            "identifier to write into this table.\n" +
+            "Application ID: txnAppId2, Attempted version: 0, Latest version in table: 1"))
+      }
+      {
+        // TODO: Add a test case where there are concurrent transactions with same app id
+        // and only one of them succeeds.
+      }
+    }
+  }
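The idempotent-write test relies on withTransactionId to fence out duplicate commits. The same pattern in Java, as a minimal sketch; the app id, version, and engine-info values are placeholders, and the catch block shows the recovery a writer such as a streaming sink would typically perform.

    import io.delta.kernel.ConcurrentTransactionException;
    import io.delta.kernel.Table;
    import io.delta.kernel.Transaction;
    import io.delta.kernel.client.TableClient;

    final class IdempotentAppendExample {
        // Tag each commit with (appId, version); replays of an already-committed
        // version surface as ConcurrentTransactionException instead of duplicating data.
        static void appendExactlyOnce(TableClient tableClient, Table table, long version) {
            try {
                Transaction txn = table
                    .createTransactionBuilder(tableClient, "MyEngine/1.0", "INSERT")
                    .withTransactionId(tableClient, "myAppId", version)
                    .build(tableClient);
                // ... stage data and call txn.commit(...) as in AppendExample above ...
            } catch (ConcurrentTransactionException e) {
                // This (appId, version) was already committed, e.g. by a retried
                // micro-batch; the write can safely be treated as done.
            }
        }
    }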
+
+  ignore("TODO: fix - read and write all types delta table file") {
+    withTempDir { tempDir =>
+      val tblPath = tempDir.getAbsolutePath
+      val inputTblPath = GoldenTableUtils.goldenTablePath("parquet-all-types")
+      val schema = tableSchema(inputTblPath)
+      val inputData = readParquetUsingKernelAsColumnarBatches(inputTblPath, schema)
+
+      // write data using Kernel
+      val table = Table.forPath(defaultTableClient, tblPath)
+      val newVersion1 = writeData(isNewTable = true, table, inputData.map(_.toFiltered): _*)
+      assert(newVersion1.getVersion == 0)
+      verifyWrittenContent(tblPath, inputData.map(_.toFiltered).flatMap(_.toTestRows))
+    }
+  }
+
+  def assertCommit(expVersion: Long, commitStatus: TransactionCommitResult): Unit = {
+    assert(commitStatus.getVersion == expVersion)
+  }
+
+  def writeData(isNewTable: Boolean, table: Table, batches: FilteredColumnarBatch*)
+    : TransactionCommitResult = {
+    val tblPath = table.getPath(defaultTableClient)
+    val schema = batches.head.getData.getSchema
+    writeDataWithPartitions(tblPath, isNewTable, schema, Set.empty, Map.empty, batches: _*)
+  }
+
+  def writeNewTableUnpartitioned(
+      tblPath: String,
+      schema: StructType,
+      batches: FilteredColumnarBatch*): TransactionCommitResult = {
+    writeDataUnpartitioned(tblPath, isNewTable = true, schema, batches: _*)
+  }
+
+  def writeDataUnpartitioned(
+      tblPath: String,
+      isNewTable: Boolean,
+      schema: StructType,
+      batches: FilteredColumnarBatch*): TransactionCommitResult = {
+    writeDataWithPartitions(tblPath, isNewTable, schema, Set.empty, Map.empty, batches: _*)
+  }
+
+  def writeDataWithPartitions(
+      tblPath: String,
+      isNewTable: Boolean,
+      schema: StructType,
+      partitionCols: Set[String],
+      partitionValues: Map[String, Literal],
+      data: FilteredColumnarBatch*): TransactionCommitResult = {
+    val table = Table.forPath(defaultTableClient, tblPath)
+    var txn1Builder = createTxnBuilder(table)
+    if (isNewTable) {
+      txn1Builder = txn1Builder
+        .withSchema(defaultTableClient, schema)
+        .withPartitionColumns(defaultTableClient, partitionCols.asJava)
+    }
+    val txn1 = txn1Builder.build(defaultTableClient)
+
+    val txn1State = txn1.getState(defaultTableClient)
+
+    val closeableIterBatches =
+      toCloseableIterator(data.toIterator.asJava)
+    val stagedFiles1 = stageData(txn1State, partitionValues, closeableIterBatches)
+
+    val stagedActionsIterable = inMemoryIterable(stagedFiles1)
+    txn1.commit(defaultTableClient, stagedActionsIterable)
+  }
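writeDataWithPartitions above shows that partition columns are declared only when the table is created, while partition values accompany every staged batch. The same split in Java, as a minimal sketch (the engine-info and operation strings, column names, and values are placeholders); validatePartitionValues from the partition utilities earlier in this diff is what rejects a mismatched map.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    import io.delta.kernel.Table;
    import io.delta.kernel.Transaction;
    import io.delta.kernel.client.TableClient;
    import io.delta.kernel.expressions.Literal;
    import io.delta.kernel.types.StructType;

    final class PartitionedCreateExample {
        static Transaction createPartitioned(
                TableClient tableClient, String tablePath, StructType schema) {
            // Partition columns are part of the table definition, so they are
            // supplied once, on the transaction that creates the table.
            return Table.forPath(tableClient, tablePath)
                .createTransactionBuilder(tableClient, "MyEngine/1.0", "CREATE TABLE")
                .withSchema(tableClient, schema)
                .withPartitionColumns(tableClient, Arrays.asList("part1", "part2"))
                .build(tableClient);
        }

        static Map<String, Literal> partitionValuesFor(int part1, int part2) {
            // Each staged batch must carry a literal for every partition column;
            // the same map is passed to transformLogicalData and getWriteContext.
            Map<String, Literal> values = new HashMap<>();
            values.put("part1", Literal.ofInt(part1));
            values.put("part2", Literal.ofInt(part2));
            return values;
        }
    }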
+
+  def createTxnBuilder(table: Table): TransactionBuilder = {
+    table.createTransactionBuilder(
+      defaultTableClient,
+      "Delta Kernel 3.1.0", // engine info
+      "INSERT"
+    )
+  }
+
+  def testBatch(size: Integer): FilteredColumnarBatch = {
+    val intVector = testColumnVector(size, INTEGER)
+    val batch1: ColumnarBatch =
+      new DefaultColumnarBatch(intVector.getSize, testSchema, Seq(intVector).toArray)
+    new FilteredColumnarBatch(batch1, Optional.empty())
+  }
+
+  def testPartitionBatch(size: Integer, part1: Int, part2: Int): FilteredColumnarBatch = {
+    val intVector = testColumnVector(size, INTEGER)
+    val intPart1Vector = testSingleValueVector(INTEGER, size, part1)
+    val intPart2Vector = testSingleValueVector(INTEGER, size, part2)
+    val batch1: ColumnarBatch =
+      new DefaultColumnarBatch(
+        intVector.getSize,
+        testPartitionSchema,
+        Seq(intVector, intPart1Vector, intPart2Vector).toArray)
+    new FilteredColumnarBatch(batch1, Optional.empty())
+  }
+
+  def stageData(
+      state: Row,
+      partitionValues: Map[String, Literal],
+      data: CloseableIterator[FilteredColumnarBatch])
+    : CloseableIterator[Row] = {
+    val physicalDataIter =
+      Transaction.transformLogicalData(defaultTableClient, state, data, partitionValues.asJava)
+
+    val writeContext = Transaction.getWriteContext(
+      defaultTableClient, state, partitionValues.asJava)
+
+    val writeResultIter = defaultTableClient
+      .getParquetHandler
+      .writeParquetFiles(
+        writeContext.getTargetDirectory,
+        physicalDataIter,
+        writeContext.getStatisticsColumns)
+
+    Transaction.generateAppendActions(defaultTableClient, state, writeResultIter, writeContext)
+  }
+
+  def inMemoryIterable(actionsIter: CloseableIterator[Row]): CloseableIterable[Row] = {
+    val actions: Seq[Row] = actionsIter.toSeq
+    new CloseableIterable[Row] {
+      override def iterator(): CloseableIterator[Row] =
+        toCloseableIterator(actions.iterator.asJava)
+
+      override def close(): Unit = {}
+    }
+  }
+
+  def verifyWrittenContent(tablePath: String, expected: Seq[TestRow]): Unit = {
+    checkTable(tablePath, expected) // verify using Kernel reads
+
+    // verify using Spark
+    val resultSpark = spark.sql(s"SELECT * FROM delta.`$tablePath`").collect().map(TestRow(_))
+    checkAnswer(resultSpark, expected)
+  }
+}
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestUtils.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestUtils.scala
index 7a11471fff6..366e5d1556b 100644
--- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestUtils.scala
+++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestUtils.scala
@@ -109,6 +109,12 @@ trait TestUtils extends Assertions with SQLHelper {
     }
   }
 
+  implicit class FilteredColumnarBatchOps(batch: FilteredColumnarBatch) {
+    def toTestRows: Seq[TestRow] = {
+      batch.getRows.toSeq.map(TestRow(_))
+    }
+  }
+
   implicit class ColumnOps(column: Column) {
     def toPath: String = column.getNames.mkString(".")
   }
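One last detail worth calling out: every iterator crossing the new API boundary is a CloseableIterator, so callers outside of tests should release it deterministically. A minimal Java sketch of draining one with try-with-resources, assuming (as the CloseableIterable javadoc above implies) that CloseableIterator is itself Closeable:

    import java.io.IOException;

    import io.delta.kernel.data.FilteredColumnarBatch;
    import io.delta.kernel.utils.CloseableIterator;

    final class IterationExample {
        // Drain a CloseableIterator; try-with-resources guarantees close() runs
        // even if processing a batch throws.
        static long countBatches(CloseableIterator<FilteredColumnarBatch> batches)
                throws IOException {
            long count = 0;
            try (CloseableIterator<FilteredColumnarBatch> it = batches) {
                while (it.hasNext()) {
                    it.next();
                    count++;
                }
            }
            return count;
        }
    }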