From 3f574d389b4b5cd17654638a40963eacf65563f1 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Tue, 9 Jul 2024 11:36:43 +0200 Subject: [PATCH] Support partial deletes (#569) * Add option to delete datafiles This is done through the Iceberg metadata, resulting in efficient deletes if the data is partitioned correctly * Pull in main * WIP * Change DataScan to accept Metadata and io For the partial deletes I want to do a scan on in memory metadata. Changing this API allows this. * fix name-mapping issue * WIP * WIP * Moar tests * Oops * Cleanup * WIP * WIP * Fix summary generation * Last few bits * Fix the requirement * Make ruff happy * Comments, thanks Kevin! * Comments * Append rather than truncate * Fix merge conflicts * Make the tests pass * Add another test * Conflicts * Add docs (#33) * docs * docs * Add a partitioned overwrite test * Fix comment * Skip empty manifests --------- Co-authored-by: HonahX Co-authored-by: Sung Yun <107272191+syun64@users.noreply.github.com> --- mkdocs/docs/api.md | 21 +- pyiceberg/io/pyarrow.py | 64 ++- pyiceberg/manifest.py | 2 +- pyiceberg/table/__init__.py | 451 +++++++++++++++--- pyiceberg/table/snapshots.py | 2 +- tests/catalog/integration_test_glue.py | 3 +- tests/catalog/test_sql.py | 3 +- tests/conftest.py | 32 +- tests/integration/test_deletes.py | 419 ++++++++++++++++ tests/integration/test_inspect_table.py | 18 +- tests/integration/test_rest_schema.py | 18 +- .../test_writes/test_partitioned_writes.py | 20 +- tests/integration/test_writes/test_writes.py | 107 +++-- tests/table/test_snapshots.py | 4 - 14 files changed, 1025 insertions(+), 139 deletions(-) create mode 100644 tests/integration/test_deletes.py diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index 0e80b6eb5e..7386d0297a 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -331,12 +331,25 @@ df = pa.Table.from_pylist( table.append(df) ``` - +You can delete some of the data from the table by calling `tbl.delete()` with a desired `delete_filter`. + +```python +tbl.delete(delete_filter="city == 'Paris'") +``` -!!! example "Under development" - Writing using PyIceberg is still under development. Support for [partial overwrites](https://github.com/apache/iceberg-python/issues/268) and writing to [partitioned tables](https://github.com/apache/iceberg-python/issues/208) is planned and being worked on. +In the above example, any records where the city field value equals to `Paris` will be deleted. 
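+
+The `delete_filter` can also be passed as a `BooleanExpression` rather than a string; for example, the string filter above should be equivalent to:
+
+```python
+from pyiceberg.expressions import EqualTo
+
+tbl.delete(delete_filter=EqualTo("city", "Paris"))
+```
+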
+Running `tbl.scan().to_arrow()` will now yield: - +``` +pyarrow.Table +city: string +lat: double +long: double +---- +city: [["Amsterdam","San Francisco","Drachten"],["Groningen"]] +lat: [[52.371807,37.773972,53.11254],[53.21917]] +long: [[4.896029,-122.431297,6.0989],[6.56667]] +``` ## Inspecting tables diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index e6490ae156..50406972a7 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -31,6 +31,7 @@ import logging import os import re +import uuid from abc import ABC, abstractmethod from concurrent.futures import Future from copy import copy @@ -126,7 +127,6 @@ visit, visit_with_partner, ) -from pyiceberg.table import PropertyUtil, TableProperties, WriteTask from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.name_mapping import NameMapping from pyiceberg.transforms import TruncateTransform @@ -159,7 +159,7 @@ from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string if TYPE_CHECKING: - from pyiceberg.table import FileScanTask + from pyiceberg.table import FileScanTask, WriteTask logger = logging.getLogger(__name__) @@ -1563,6 +1563,8 @@ class PyArrowStatisticsCollector(PreOrderSchemaVisitor[List[StatisticsCollector] _default_mode: str def __init__(self, schema: Schema, properties: Dict[str, str]): + from pyiceberg.table import TableProperties + self._schema = schema self._properties = properties self._default_mode = self._properties.get( @@ -1598,6 +1600,8 @@ def map( return k + v def primitive(self, primitive: PrimitiveType) -> List[StatisticsCollector]: + from pyiceberg.table import TableProperties + column_name = self._schema.find_column_name(self._field_id) if column_name is None: return [] @@ -1895,6 +1899,8 @@ def data_file_statistics_from_parquet_metadata( def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: + from pyiceberg.table import PropertyUtil, TableProperties + parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties) row_group_size = PropertyUtil.property_as_int( properties=table_metadata.properties, @@ -2005,6 +2011,8 @@ def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_ def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]: + from pyiceberg.table import PropertyUtil, TableProperties + for key_pattern in [ TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, TableProperties.PARQUET_PAGE_ROW_LIMIT, @@ -2042,3 +2050,55 @@ def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]: default=TableProperties.PARQUET_PAGE_ROW_LIMIT_DEFAULT, ), } + + +def _dataframe_to_data_files( + table_metadata: TableMetadata, + df: pa.Table, + io: FileIO, + write_uuid: Optional[uuid.UUID] = None, + counter: Optional[itertools.count[int]] = None, +) -> Iterable[DataFile]: + """Convert a PyArrow table into a DataFile. + + Returns: + An iterable that supplies datafiles that represent the table. + """ + from pyiceberg.table import PropertyUtil, TableProperties, WriteTask + + counter = counter or itertools.count(0) + write_uuid = write_uuid or uuid.uuid4() + target_file_size: int = PropertyUtil.property_as_int( # type: ignore # The property is set with non-None value. 
+ properties=table_metadata.properties, + property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, + default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, + ) + + if table_metadata.spec().is_unpartitioned(): + yield from write_file( + io=io, + table_metadata=table_metadata, + tasks=iter([ + WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=table_metadata.schema()) + for batches in bin_pack_arrow_table(df, target_file_size) + ]), + ) + else: + from pyiceberg.table import _determine_partitions + + partitions = _determine_partitions(spec=table_metadata.spec(), schema=table_metadata.schema(), arrow_table=df) + yield from write_file( + io=io, + table_metadata=table_metadata, + tasks=iter([ + WriteTask( + write_uuid=write_uuid, + task_id=next(counter), + record_batches=batches, + partition_key=partition.partition_key, + schema=table_metadata.schema(), + ) + for partition in partitions + for batches in bin_pack_arrow_table(partition.arrow_table_partition, target_file_size) + ]), + ) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 4fd82fec1a..e6a81d2a6a 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -341,7 +341,7 @@ class DataFile(Record): split_offsets: Optional[List[int]] equality_ids: Optional[List[int]] sort_order_id: Optional[int] - spec_id: Optional[int] + spec_id: int def __setattr__(self, name: str, value: Any) -> None: """Assign a key/value to a DataFile.""" diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 2eec4d3036..8eea9859bc 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -50,19 +50,27 @@ from pyiceberg.conversions import from_bytes from pyiceberg.exceptions import CommitFailedException, ResolveError, ValidationError from pyiceberg.expressions import ( + AlwaysFalse, AlwaysTrue, And, BooleanExpression, EqualTo, + Not, + Or, Reference, ) from pyiceberg.expressions.visitors import ( + ROWS_CANNOT_MATCH, + ROWS_MUST_MATCH, _InclusiveMetricsEvaluator, + _StrictMetricsEvaluator, + bind, expression_evaluator, inclusive_projection, manifest_evaluator, ) from pyiceberg.io import FileIO, load_file_io +from pyiceberg.io.pyarrow import _dataframe_to_data_files, expression_to_pyarrow, project_table from pyiceberg.manifest import ( POSITIONAL_DELETE_SCHEMA, DataFile, @@ -238,6 +246,11 @@ class TableProperties: WRITE_PARTITION_SUMMARY_LIMIT = "write.summary.partition-limit" WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT = 0 + DELETE_MODE = "write.delete.mode" + DELETE_MODE_COPY_ON_WRITE = "copy-on-write" + DELETE_MODE_MERGE_ON_READ = "merge-on-read" + DELETE_MODE_DEFAULT = DELETE_MODE_COPY_ON_WRITE + DEFAULT_NAME_MAPPING = "schema.name-mapping.default" FORMAT_VERSION = "format-version" DEFAULT_FORMAT_VERSION = 2 @@ -305,7 +318,13 @@ def _apply(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequ requirement.validate(self.table_metadata) self._updates += updates - self._requirements += requirements + + # For the requirements, it does not make sense to add a requirement more than once + # For example, you cannot assert that the current schema has two different IDs + existing_requirements = {type(requirement) for requirement in self._requirements} + for new_requirement in requirements: + if type(new_requirement) not in existing_requirements: + self._requirements = self._requirements + requirements self.table_metadata = update_table_metadata(self.table_metadata, updates) @@ -316,6 +335,14 @@ def _apply(self, updates: Tuple[TableUpdate, ...], requirements: 
Tuple[TableRequ return self + def _scan(self, row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE) -> DataScan: + """Minimal data scan the table with the current state of the transaction.""" + return DataScan( + table_metadata=self.table_metadata, + io=self._table.io, + row_filter=row_filter, + ) + def upgrade_table_version(self, format_version: TableVersion) -> Transaction: """Set the table to a certain version. @@ -499,11 +526,20 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) update_snapshot.append_data_file(data_file) def overwrite( - self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT + self, + df: pa.Table, + overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, + snapshot_properties: Dict[str, str] = EMPTY_DICT, ) -> None: """ Shorthand for adding a table overwrite with a PyArrow table to the transaction. + An overwrite may produce zero or more snapshots based on the operation: + + - DELETE: In case existing Parquet files can be dropped completely. + - REPLACE: In case existing Parquet files need to be rewritten. + - APPEND: In case new data is being inserted into the table. + Args: df: The Arrow dataframe that will be used to overwrite the table overwrite_filter: ALWAYS_TRUE when you overwrite all the data, @@ -518,11 +554,12 @@ def overwrite( if not isinstance(df, pa.Table): raise ValueError(f"Expected PyArrow table, got: {df}") - if overwrite_filter != AlwaysTrue(): - raise NotImplementedError("Cannot overwrite a subset of a table") - - if len(self._table.spec().fields) > 0: - raise ValueError("Cannot write to partitioned tables") + if unsupported_partitions := [ + field for field in self.table_metadata.spec().fields if not field.transform.supports_pyarrow_transform + ]: + raise ValueError( + f"Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: {unsupported_partitions}." + ) _check_schema_compatible(self._table.schema(), other_schema=df.schema) # cast if the two schemas are compatible but not equal @@ -530,7 +567,9 @@ def overwrite( if table_arrow_schema != df.schema: df = df.cast(table_arrow_schema) - with self.update_snapshot(snapshot_properties=snapshot_properties).overwrite() as update_snapshot: + self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties) + + with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot: # skip writing data files if the dataframe is empty if df.shape[0] > 0: data_files = _dataframe_to_data_files( @@ -539,6 +578,86 @@ def overwrite( for data_file in data_files: update_snapshot.append_data_file(data_file) + def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: + """ + Shorthand for deleting record from a table. + + An deletee may produce zero or more snapshots based on the operation: + + - DELETE: In case existing Parquet files can be dropped completely. 
+ - REPLACE: In case existing Parquet files need to be rewritten + + Args: + delete_filter: A boolean expression to delete rows from a table + snapshot_properties: Custom properties to be added to the snapshot summary + """ + if ( + self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_DEFAULT) + == TableProperties.DELETE_MODE_MERGE_ON_READ + ): + warnings.warn("Merge on read is not yet supported, falling back to copy-on-write") + + if isinstance(delete_filter, str): + delete_filter = _parse_row_filter(delete_filter) + + with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot: + delete_snapshot.delete_by_predicate(delete_filter) + + # Check if there are any files that require an actual rewrite of a data file + if delete_snapshot.rewrites_needed is True: + bound_delete_filter = bind(self._table.schema(), delete_filter, case_sensitive=True) + preserve_row_filter = expression_to_pyarrow(Not(bound_delete_filter)) + + files = self._scan(row_filter=delete_filter).plan_files() + + commit_uuid = uuid.uuid4() + counter = itertools.count(0) + + replaced_files: List[Tuple[DataFile, List[DataFile]]] = [] + # This will load the Parquet file into memory, including: + # - Filter out the rows based on the delete filter + # - Projecting it to the current schema + # - Applying the positional deletes if they are there + # When writing + # - Apply the latest partition-spec + # - And sort order when added + for original_file in files: + df = project_table( + tasks=[original_file], + table_metadata=self._table.metadata, + io=self._table.io, + row_filter=AlwaysTrue(), + projected_schema=self.table_metadata.schema(), + ) + filtered_df = df.filter(preserve_row_filter) + + # Only rewrite if there are records being deleted + if len(df) != len(filtered_df): + replaced_files.append(( + original_file.file, + list( + _dataframe_to_data_files( + io=self._table.io, + df=filtered_df, + table_metadata=self._table.metadata, + write_uuid=commit_uuid, + counter=counter, + ) + ), + )) + + if len(replaced_files) > 0: + with self.update_snapshot(snapshot_properties=snapshot_properties).overwrite( + commit_uuid=commit_uuid + ) as overwrite_snapshot: + for original_data_file, replaced_data_files in replaced_files: + overwrite_snapshot.delete_data_file(original_data_file) + for replaced_data_file in replaced_data_files: + overwrite_snapshot.append_data_file(replaced_data_file) + + if not delete_snapshot.files_affected and not delete_snapshot.rewrites_needed: + warnings.warn("Delete operation did not match any records") + def add_files(self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: """ Shorthand API for adding files as data files to the table transaction. 
@@ -1381,6 +1500,9 @@ def current_snapshot(self) -> Optional[Snapshot]: return self.snapshot_by_id(self.metadata.current_snapshot_id) return None + def snapshots(self) -> List[Snapshot]: + return self.metadata.snapshots + def snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]: """Get the snapshot of this table with the given id, or None if there is no matching snapshot.""" return self.metadata.snapshot_by_id(snapshot_id) @@ -1455,11 +1577,20 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) tx.append(df=df, snapshot_properties=snapshot_properties) def overwrite( - self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT + self, + df: pa.Table, + overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, + snapshot_properties: Dict[str, str] = EMPTY_DICT, ) -> None: """ Shorthand for overwriting the table with a PyArrow table. + An overwrite may produce zero or more snapshots based on the operation: + + - DELETE: In case existing Parquet files can be dropped completely. + - REPLACE: In case existing Parquet files need to be rewritten. + - APPEND: In case new data is being inserted into the table. + Args: df: The Arrow dataframe that will be used to overwrite the table overwrite_filter: ALWAYS_TRUE when you overwrite all the data, @@ -1469,6 +1600,19 @@ def overwrite( with self.transaction() as tx: tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties) + def delete( + self, delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT + ) -> None: + """ + Shorthand for deleting rows from the table. + + Args: + delete_filter: The predicate that used to remove rows + snapshot_properties: Custom properties to be added to the snapshot summary + """ + with self.transaction() as tx: + tx.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties) + def add_files(self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: """ Shorthand API for adding files as data files to the table. @@ -2904,52 +3048,6 @@ def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: int, return f"{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro" -def _dataframe_to_data_files( - table_metadata: TableMetadata, df: pa.Table, io: FileIO, write_uuid: Optional[uuid.UUID] = None -) -> Iterable[DataFile]: - """Convert a PyArrow table into a DataFile. - - Returns: - An iterable that supplies datafiles that represent the table. - """ - from pyiceberg.io.pyarrow import bin_pack_arrow_table, write_file - - counter = itertools.count(0) - write_uuid = write_uuid or uuid.uuid4() - target_file_size: int = PropertyUtil.property_as_int( # type: ignore # The property is set with non-None value. 
- properties=table_metadata.properties, - property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, - default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, - ) - - if len(table_metadata.spec().fields) > 0: - partitions = _determine_partitions(spec=table_metadata.spec(), schema=table_metadata.schema(), arrow_table=df) - yield from write_file( - io=io, - table_metadata=table_metadata, - tasks=iter([ - WriteTask( - write_uuid=write_uuid, - task_id=next(counter), - record_batches=batches, - partition_key=partition.partition_key, - schema=table_metadata.schema(), - ) - for partition in partitions - for batches in bin_pack_arrow_table(partition.arrow_table_partition, target_file_size) - ]), - ) - else: - yield from write_file( - io=io, - table_metadata=table_metadata, - tasks=iter([ - WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=table_metadata.schema()) - for batches in bin_pack_arrow_table(df, target_file_size) - ]), - ) - - def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]: """Convert a list files into DataFiles. @@ -2961,12 +3059,14 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, file_paths=iter(file_paths)) -class _MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]): +class _MergingSnapshotProducer(UpdateTableMetadata[U], Generic[U]): commit_uuid: uuid.UUID _operation: Operation _snapshot_id: int _parent_snapshot_id: Optional[int] _added_data_files: List[DataFile] + _deleted_data_files: Set[DataFile] + _manifest_counter: itertools.count[int] def __init__( self, @@ -2986,12 +3086,18 @@ def __init__( snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None ) self._added_data_files = [] + self._deleted_data_files = set() self.snapshot_properties = snapshot_properties + self._manifest_counter = itertools.count(0) - def append_data_file(self, data_file: DataFile) -> _MergingSnapshotProducer: + def append_data_file(self, data_file: DataFile) -> _MergingSnapshotProducer[U]: self._added_data_files.append(data_file) return self + def delete_data_file(self, data_file: DataFile) -> _MergingSnapshotProducer[U]: + self._deleted_data_files.add(data_file) + return self + @abstractmethod def _deleted_entries(self) -> List[ManifestEntry]: ... 
@@ -3002,7 +3108,9 @@ def _manifests(self) -> List[ManifestFile]: def _write_added_manifest() -> List[ManifestFile]: if self._added_data_files: output_file_location = _new_manifest_path( - location=self._transaction.table_metadata.location, num=0, commit_uuid=self.commit_uuid + location=self._transaction.table_metadata.location, + num=next(self._manifest_counter), + commit_uuid=self.commit_uuid, ) with write_manifest( format_version=self._transaction.table_metadata.format_version, @@ -3030,7 +3138,9 @@ def _write_delete_manifest() -> List[ManifestFile]: deleted_entries = self._deleted_entries() if len(deleted_entries) > 0: output_file_location = _new_manifest_path( - location=self._transaction.table_metadata.location, num=1, commit_uuid=self.commit_uuid + location=self._transaction.table_metadata.location, + num=next(self._manifest_counter), + commit_uuid=self.commit_uuid, ) with write_manifest( @@ -3070,6 +3180,15 @@ def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary: schema=self._transaction.table_metadata.schema(), ) + if len(self._deleted_data_files) > 0: + specs = self._transaction.table_metadata.specs() + for data_file in self._deleted_data_files: + ssc.remove_file( + data_file=data_file, + partition_spec=specs[data_file.spec_id], + schema=self._transaction.table_metadata.schema(), + ) + previous_snapshot = ( self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id) if self._parent_snapshot_id is not None @@ -3119,11 +3238,153 @@ def _commit(self) -> UpdatesAndRequirements: snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name="main", type="branch" ), ), - (AssertRefSnapshotId(snapshot_id=self._parent_snapshot_id, ref="main"),), + (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),), ) -class FastAppendFiles(_MergingSnapshotProducer): +class DeleteFiles(_MergingSnapshotProducer["DeleteFiles"]): + """Will delete manifest entries from the current snapshot based on the predicate. + + This will produce a DELETE snapshot: + Data files were removed and their contents logically deleted and/or delete + files were added to delete rows. 
+ + From the specification + """ + + _predicate: BooleanExpression + + def __init__( + self, + operation: Operation, + transaction: Transaction, + io: FileIO, + commit_uuid: Optional[uuid.UUID] = None, + snapshot_properties: Dict[str, str] = EMPTY_DICT, + ): + super().__init__(operation, transaction, io, commit_uuid, snapshot_properties) + self._predicate = AlwaysFalse() + + def _commit(self) -> UpdatesAndRequirements: + # Only produce a commit when there is something to delete + if self.files_affected: + return super()._commit() + else: + return (), () + + def _build_partition_projection(self, spec_id: int) -> BooleanExpression: + schema = self._transaction.table_metadata.schema() + spec = self._transaction.table_metadata.specs()[spec_id] + project = inclusive_projection(schema, spec) + return project(self._predicate) + + @cached_property + def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: + return KeyDefaultDict(self._build_partition_projection) + + def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]: + schema = self._transaction.table_metadata.schema() + spec = self._transaction.table_metadata.specs()[spec_id] + return manifest_evaluator(spec, schema, self.partition_filters[spec_id], case_sensitive=True) + + def delete_by_predicate(self, predicate: BooleanExpression) -> None: + self._predicate = Or(self._predicate, predicate) + + @cached_property + def _compute_deletes(self) -> Tuple[List[ManifestFile], List[ManifestEntry], bool]: + """Computes all the delete operation and cache it when nothing changes. + + Returns: + - List of existing manifests that are not affected by the delete operation. + - The manifest-entries that are deleted based on the metadata. + - Flag indicating that rewrites of data-files are needed. 
+ """ + schema = self._transaction.table_metadata.schema() + + def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> ManifestEntry: + return ManifestEntry( + status=status, + snapshot_id=entry.snapshot_id, + data_sequence_number=entry.data_sequence_number, + file_sequence_number=entry.file_sequence_number, + data_file=entry.data_file, + ) + + manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) + strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=True).eval + inclusive_metrics_evaluator = _InclusiveMetricsEvaluator(schema, self._predicate, case_sensitive=True).eval + + existing_manifests = [] + total_deleted_entries = [] + partial_rewrites_needed = False + self._deleted_data_files = set() + if snapshot := self._transaction.table_metadata.current_snapshot(): + for manifest_file in snapshot.manifests(io=self._io): + if manifest_file.content == ManifestContent.DATA: + if not manifest_evaluators[manifest_file.partition_spec_id](manifest_file): + # If the manifest isn't relevant, we can just keep it in the manifest-list + existing_manifests.append(manifest_file) + else: + # It is relevant, let's check out the content + deleted_entries = [] + existing_entries = [] + for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True): + if strict_metrics_evaluator(entry.data_file) == ROWS_MUST_MATCH: + deleted_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.DELETED)) + self._deleted_data_files.add(entry.data_file) + elif inclusive_metrics_evaluator(entry.data_file) == ROWS_CANNOT_MATCH: + existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING)) + else: + # Based on the metadata, it is unsure to say if the file can be deleted + partial_rewrites_needed = True + + if len(deleted_entries) > 0: + total_deleted_entries += deleted_entries + + # Rewrite the manifest + if len(existing_entries) > 0: + output_file_location = _new_manifest_path( + location=self._transaction.table_metadata.location, + num=next(self._manifest_counter), + commit_uuid=self.commit_uuid, + ) + with write_manifest( + format_version=self._transaction.table_metadata.format_version, + spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id], + schema=self._transaction.table_metadata.schema(), + output_file=self._io.new_output(output_file_location), + snapshot_id=self._snapshot_id, + ) as writer: + for existing_entry in existing_entries: + writer.add_entry(existing_entry) + existing_manifests.append(writer.to_manifest_file()) + # else: + # deleted_manifests.append() + else: + existing_manifests.append(manifest_file) + else: + existing_manifests.append(manifest_file) + + return existing_manifests, total_deleted_entries, partial_rewrites_needed + + def _existing_manifests(self) -> List[ManifestFile]: + return self._compute_deletes[0] + + def _deleted_entries(self) -> List[ManifestEntry]: + return self._compute_deletes[1] + + @property + def rewrites_needed(self) -> bool: + """Indicate if data files need to be rewritten.""" + return self._compute_deletes[2] + + @property + def files_affected(self) -> bool: + """Indicate if any manifest-entries can be dropped.""" + return len(self._deleted_entries()) > 0 + + +class FastAppendFiles(_MergingSnapshotProducer["FastAppendFiles"]): def _existing_manifests(self) -> List[ManifestFile]: """To determine if there are any existing manifest files. 
@@ -3152,14 +3413,53 @@ def _deleted_entries(self) -> List[ManifestEntry]: return [] -class OverwriteFiles(_MergingSnapshotProducer): +class OverwriteFiles(_MergingSnapshotProducer["OverwriteFiles"]): + """Overwrites data from the table. This will produce an OVERWRITE snapshot. + + Data and delete files were added and removed in a logical overwrite operation. + """ + def _existing_manifests(self) -> List[ManifestFile]: - """To determine if there are any existing manifest files. + """Determine if there are any existing manifest files.""" + existing_files = [] - In the of a full overwrite, all the previous manifests are - considered deleted. - """ - return [] + if snapshot := self._transaction.table_metadata.current_snapshot(): + for manifest_file in snapshot.manifests(io=self._io): + entries = manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True) + found_deleted_data_files = [entry.data_file for entry in entries if entry.data_file in self._deleted_data_files] + + if len(found_deleted_data_files) == 0: + existing_files.append(manifest_file) + else: + # We have to rewrite the + output_file_location = _new_manifest_path( + location=self._transaction.table_metadata.location, + num=next(self._manifest_counter), + commit_uuid=self.commit_uuid, + ) + if any(entry.data_file not in found_deleted_data_files for entry in entries): + with write_manifest( + format_version=self._transaction.table_metadata.format_version, + spec=self._transaction.table_metadata.spec(), + schema=self._transaction.table_metadata.schema(), + output_file=self._io.new_output(output_file_location), + snapshot_id=self._snapshot_id, + ) as writer: + [ + writer.add_entry( + ManifestEntry( + status=ManifestEntryStatus.EXISTING, + snapshot_id=entry.snapshot_id, + data_sequence_number=entry.data_sequence_number, + file_sequence_number=entry.file_sequence_number, + data_file=entry.data_file, + ) + ) + for entry in entries + if entry.data_file not in found_deleted_data_files + ] + existing_files.append(writer.to_manifest_file()) + return existing_files def _deleted_entries(self) -> List[ManifestEntry]: """To determine if we need to record any deleted entries. 
@@ -3186,7 +3486,7 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]: data_file=entry.data_file, ) for entry in manifest.fetch_manifest_entry(self._io, discard_deleted=True) - if entry.data_file.content == DataFileContent.DATA + if entry.data_file.content == DataFileContent.DATA and entry.data_file in self._deleted_data_files ] list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._io)) @@ -3200,7 +3500,7 @@ class UpdateSnapshot: _io: FileIO _snapshot_properties: Dict[str, str] - def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str]) -> None: + def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: self._transaction = transaction self._io = io self._snapshot_properties = snapshot_properties @@ -3210,8 +3510,9 @@ def fast_append(self) -> FastAppendFiles: operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties ) - def overwrite(self) -> OverwriteFiles: + def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> OverwriteFiles: return OverwriteFiles( + commit_uuid=commit_uuid, operation=Operation.OVERWRITE if self._transaction.table_metadata.current_snapshot() is not None else Operation.APPEND, @@ -3220,6 +3521,14 @@ def overwrite(self) -> OverwriteFiles: snapshot_properties=self._snapshot_properties, ) + def delete(self) -> DeleteFiles: + return DeleteFiles( + operation=Operation.DELETE, + transaction=self._transaction, + io=self._io, + snapshot_properties=self._snapshot_properties, + ) + class UpdateSpec(UpdateTableMetadata["UpdateSpec"]): _transaction: Transaction @@ -4056,7 +4365,7 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T {'year': [2020, 2022, 2022, 2021, 2022, 2022, 2022, 2019, 2021], 'n_legs': [2, 2, 2, 4, 4, 4, 4, 5, 100], 'animal': ["Flamingo", "Parrot", "Parrot", "Dog", "Horse", "Horse", "Horse","Brittle stars", "Centipede"]}. - The algrithm: + The algorithm: Firstly we group the rows into partitions by sorting with sort order [('n_legs', 'descending'), ('year', 'descending')] and null_placement of "at_end". This gives the same table as raw input. 
diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 842d42522a..1ccb079922 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -352,7 +352,7 @@ def get_prop(prop: str) -> int: def update_snapshot_summaries( summary: Summary, previous_summary: Optional[Mapping[str, str]] = None, truncate_full_table: bool = False ) -> Summary: - if summary.operation not in {Operation.APPEND, Operation.OVERWRITE}: + if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, Operation.DELETE}: raise ValueError(f"Operation not implemented: {summary.operation}") if truncate_full_table and summary.operation == Operation.OVERWRITE and previous_summary is not None: diff --git a/tests/catalog/integration_test_glue.py b/tests/catalog/integration_test_glue.py index 21c415212a..c69bc86ca8 100644 --- a/tests/catalog/integration_test_glue.py +++ b/tests/catalog/integration_test_glue.py @@ -33,9 +33,8 @@ NoSuchTableError, TableAlreadyExistsError, ) -from pyiceberg.io.pyarrow import schema_to_pyarrow +from pyiceberg.io.pyarrow import _dataframe_to_data_files, schema_to_pyarrow from pyiceberg.schema import Schema -from pyiceberg.table import _dataframe_to_data_files from pyiceberg.types import IntegerType from tests.conftest import clean_up, get_bucket_name, get_s3_path diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py index bb8fcea3ae..9251d717f8 100644 --- a/tests/catalog/test_sql.py +++ b/tests/catalog/test_sql.py @@ -39,10 +39,9 @@ TableAlreadyExistsError, ) from pyiceberg.io import FSSPEC_FILE_IO, PY_IO_IMPL -from pyiceberg.io.pyarrow import schema_to_pyarrow +from pyiceberg.io.pyarrow import _dataframe_to_data_files, schema_to_pyarrow from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC from pyiceberg.schema import Schema -from pyiceberg.table import _dataframe_to_data_files from pyiceberg.table.snapshots import Operation from pyiceberg.table.sorting import ( NullOrder, diff --git a/tests/conftest.py b/tests/conftest.py index d200f3ab3c..95e1128af6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2299,7 +2299,37 @@ def arrow_table_with_null(pa_schema: "pa.Schema") -> "pa.Table": """Pyarrow table with all kinds of columns.""" import pyarrow as pa - return pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=pa_schema) + return pa.Table.from_pydict( + { + "bool": [False, None, True], + "string": ["a", None, "z"], + # Go over the 16 bytes to kick in truncation + "string_long": ["a" * 22, None, "z" * 22], + "int": [1, None, 9], + "long": [1, None, 9], + "float": [0.0, None, 0.9], + "double": [0.0, None, 0.9], + # 'time': [1_000_000, None, 3_000_000], # Example times: 1s, none, and 3s past midnight #Spark does not support time fields + "timestamp": [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)], + "timestamptz": [ + datetime(2023, 1, 1, 19, 25, 00, tzinfo=timezone.utc), + None, + datetime(2023, 3, 1, 19, 25, 00, tzinfo=timezone.utc), + ], + "date": [date(2023, 1, 1), None, date(2023, 3, 1)], + # Not supported by Spark + # 'time': [time(1, 22, 0), None, time(19, 25, 0)], + # Not natively supported by Arrow + # 'uuid': [uuid.UUID('00000000-0000-0000-0000-000000000000').bytes, None, uuid.UUID('11111111-1111-1111-1111-111111111111').bytes], + "binary": [b"\01", None, b"\22"], + "fixed": [ + uuid.UUID("00000000-0000-0000-0000-000000000000").bytes, + None, + uuid.UUID("11111111-1111-1111-1111-111111111111").bytes, + ], + }, + schema=pa_schema, + ) @pytest.fixture(scope="session") diff --git 
a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py new file mode 100644 index 0000000000..ad3adedeca --- /dev/null +++ b/tests/integration/test_deletes.py @@ -0,0 +1,419 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# pylint:disable=redefined-outer-name +from typing import List + +import pyarrow as pa +import pytest +from pyspark.sql import SparkSession + +from pyiceberg.catalog.rest import RestCatalog +from pyiceberg.exceptions import NoSuchTableError +from pyiceberg.expressions import AlwaysTrue, EqualTo +from pyiceberg.manifest import ManifestEntryStatus +from pyiceberg.schema import Schema +from pyiceberg.table.snapshots import Operation, Summary +from pyiceberg.types import IntegerType, NestedField + + +def run_spark_commands(spark: SparkSession, sqls: List[str]) -> None: + for sql in sqls: + spark.sql(sql) + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_partitioned_table_delete_full_file(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None: + identifier = "default.table_partitioned_delete" + + run_spark_commands( + spark, + [ + f"DROP TABLE IF EXISTS {identifier}", + f""" + CREATE TABLE {identifier} ( + number_partitioned int, + number int + ) + USING iceberg + PARTITIONED BY (number_partitioned) + TBLPROPERTIES('format-version' = {format_version}) + """, + f""" + INSERT INTO {identifier} VALUES (10, 20), (10, 30) + """, + f""" + INSERT INTO {identifier} VALUES (11, 20), (11, 30) + """, + ], + ) + + tbl = session_catalog.load_table(identifier) + tbl.delete(EqualTo("number_partitioned", 10)) + + # No overwrite operation + assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "append", "delete"] + assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [11, 11], "number": [20, 30]} + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_partitioned_table_rewrite(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None: + identifier = "default.table_partitioned_delete" + + run_spark_commands( + spark, + [ + f"DROP TABLE IF EXISTS {identifier}", + f""" + CREATE TABLE {identifier} ( + number_partitioned int, + number int + ) + USING iceberg + PARTITIONED BY (number_partitioned) + TBLPROPERTIES('format-version' = {format_version}) + """, + f""" + INSERT INTO {identifier} VALUES (10, 20), (10, 30) + """, + f""" + INSERT INTO {identifier} VALUES (11, 20), (11, 30) + """, + ], + ) + + tbl = session_catalog.load_table(identifier) + tbl.delete(EqualTo("number", 20)) + + # We don't delete a whole partition, so there is only a overwrite + assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "append", "overwrite"] + assert 
tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [11, 10], "number": [30, 30]} + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_partitioned_table_no_match(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None: + identifier = "default.table_partitioned_delete" + + run_spark_commands( + spark, + [ + f"DROP TABLE IF EXISTS {identifier}", + f""" + CREATE TABLE {identifier} ( + number_partitioned int, + number int + ) + USING iceberg + PARTITIONED BY (number_partitioned) + TBLPROPERTIES('format-version' = {format_version}) + """, + f""" + INSERT INTO {identifier} VALUES (10, 20), (10, 30) + """, + ], + ) + + tbl = session_catalog.load_table(identifier) + tbl.delete(EqualTo("number_partitioned", 22)) # Does not affect any data + + assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append"] + assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10, 10], "number": [20, 30]} + + +@pytest.mark.integration +def test_delete_partitioned_table_positional_deletes(spark: SparkSession, session_catalog: RestCatalog) -> None: + identifier = "default.table_partitioned_delete" + + run_spark_commands( + spark, + [ + f"DROP TABLE IF EXISTS {identifier}", + f""" + CREATE TABLE {identifier} ( + number_partitioned int, + number int + ) + USING iceberg + PARTITIONED BY (number_partitioned) + TBLPROPERTIES( + 'format-version' = 2, + 'write.delete.mode'='merge-on-read', + 'write.update.mode'='merge-on-read', + 'write.merge.mode'='merge-on-read' + ) + """, + f""" + INSERT INTO {identifier} VALUES (10, 20), (10, 30), (10, 40) + """, + # Generate a positional delete + f""" + DELETE FROM {identifier} WHERE number = 30 + """, + ], + ) + + tbl = session_catalog.load_table(identifier) + + # Assert that there is just a single Parquet file, that has one merge on read file + files = list(tbl.scan().plan_files()) + assert len(files) == 1 + assert len(files[0].delete_files) == 1 + + # Will rewrite a data file without the positional delete + tbl.delete(EqualTo("number", 40)) + + # One positional delete has been added, but an OVERWRITE status is set + # https://github.com/apache/iceberg/issues/10122 + assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "overwrite", "overwrite"] + assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10], "number": [20]} + + +@pytest.mark.integration +def test_overwrite_partitioned_table(spark: SparkSession, session_catalog: RestCatalog) -> None: + identifier = "default.table_partitioned_delete" + + run_spark_commands( + spark, + [ + f"DROP TABLE IF EXISTS {identifier}", + f""" + CREATE TABLE {identifier} ( + number_partitioned int, + number int + ) + USING iceberg + PARTITIONED BY (number_partitioned) + TBLPROPERTIES( + 'format-version' = 2, + 'write.delete.mode'='merge-on-read', + 'write.update.mode'='merge-on-read', + 'write.merge.mode'='merge-on-read' + ) + """, + f""" + INSERT INTO {identifier} VALUES (10, 1), (10, 2), (20, 3) + """, + ], + ) + + tbl = session_catalog.load_table(identifier) + + files = list(tbl.scan().plan_files()) + assert len(files) == 2 + + arrow_schema = pa.schema([pa.field("number_partitioned", pa.int32()), pa.field("number", pa.int32())]) + arrow_tbl = pa.Table.from_pylist( + [ + {"number_partitioned": 10, "number": 4}, + {"number_partitioned": 10, "number": 5}, + ], + schema=arrow_schema, + ) + + # Will rewrite a data file without the positional delete + tbl.overwrite(arrow_tbl, "number_partitioned == 
10") + + # One positional delete has been added, but an OVERWRITE status is set + # https://github.com/apache/iceberg/issues/10122 + assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "delete", "append"] + assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10, 10, 20], "number": [4, 5, 3]} + + +@pytest.mark.integration +def test_partitioned_table_positional_deletes_sequence_number(spark: SparkSession, session_catalog: RestCatalog) -> None: + identifier = "default.table_partitioned_delete_sequence_number" + + # This test case is a bit more complex. Here we run a MoR delete on a file, we make sure that + # the manifest gets rewritten (but not the data file with a MoR), and check if the delete is still there + # to assure that the sequence numbers are maintained + + run_spark_commands( + spark, + [ + f"DROP TABLE IF EXISTS {identifier}", + f""" + CREATE TABLE {identifier} ( + number_partitioned int, + number int + ) + USING iceberg + PARTITIONED BY (number_partitioned) + TBLPROPERTIES( + 'format-version' = 2, + 'write.delete.mode'='merge-on-read', + 'write.update.mode'='merge-on-read', + 'write.merge.mode'='merge-on-read' + ) + """, + f""" + INSERT INTO {identifier} VALUES (10, 100), (10, 101), (20, 200), (20, 201), (20, 202) + """, + # Generate a positional delete + f""" + DELETE FROM {identifier} WHERE number = 101 + """, + ], + ) + + tbl = session_catalog.load_table(identifier) + + files = list(tbl.scan().plan_files()) + assert len(files) == 2 + + # Will rewrite a data file without a positional delete + tbl.delete(EqualTo("number", 201)) + + # One positional delete has been added, but an OVERWRITE status is set + # https://github.com/apache/iceberg/issues/10122 + snapshots = tbl.snapshots() + assert len(snapshots) == 3 + + # Snapshots produced by Spark + assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()[0:2]] == ["append", "overwrite"] + + # Will rewrite one parquet file + assert snapshots[2].summary == Summary( + Operation.OVERWRITE, + **{ + "added-files-size": "1145", + "added-data-files": "1", + "added-records": "2", + "changed-partition-count": "1", + "total-files-size": snapshots[2].summary["total-files-size"], + "total-delete-files": "0", + "total-data-files": "1", + "total-position-deletes": "0", + "total-records": "2", + "total-equality-deletes": "0", + "deleted-data-files": "2", + "removed-delete-files": "1", + "deleted-records": "5", + "removed-files-size": snapshots[2].summary["removed-files-size"], + "removed-position-deletes": "1", + }, + ) + + assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [20, 20, 10], "number": [200, 202, 100]} + + +@pytest.mark.integration +def test_delete_no_match(session_catalog: RestCatalog) -> None: + arrow_schema = pa.schema([pa.field("ints", pa.int32())]) + arrow_tbl = pa.Table.from_pylist( + [ + {"ints": 1}, + {"ints": 3}, + ], + schema=arrow_schema, + ) + + iceberg_schema = Schema(NestedField(1, "ints", IntegerType())) + + tbl_identifier = "default.test_delete_no_match" + + try: + session_catalog.drop_table(tbl_identifier) + except NoSuchTableError: + pass + + tbl = session_catalog.create_table(tbl_identifier, iceberg_schema) + tbl.append(arrow_tbl) + + assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == [Operation.APPEND] + + tbl.delete("ints == 2") # Only 1 and 3 in the file, but is between the lower and upper bound + + assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == [Operation.APPEND] + + 
+@pytest.mark.integration +def test_delete_overwrite(session_catalog: RestCatalog) -> None: + arrow_schema = pa.schema([pa.field("ints", pa.int32())]) + arrow_tbl = pa.Table.from_pylist( + [ + {"ints": 1}, + {"ints": 2}, + ], + schema=arrow_schema, + ) + + iceberg_schema = Schema(NestedField(1, "ints", IntegerType())) + + tbl_identifier = "default.test_delete_overwrite" + + try: + session_catalog.drop_table(tbl_identifier) + except NoSuchTableError: + pass + + tbl = session_catalog.create_table(tbl_identifier, iceberg_schema) + tbl.append(arrow_tbl) + + assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == [Operation.APPEND] + + arrow_tbl_overwrite = pa.Table.from_pylist( + [ + {"ints": 3}, + {"ints": 4}, + ], + schema=arrow_schema, + ) + tbl.overwrite(arrow_tbl_overwrite, "ints == 2") # Should rewrite one file + + assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == [ + Operation.APPEND, + Operation.OVERWRITE, + Operation.APPEND, + ] + + assert tbl.scan().to_arrow()["ints"].to_pylist() == [3, 4, 1] + + +@pytest.mark.integration +def test_delete_truncate(session_catalog: RestCatalog) -> None: + arrow_schema = pa.schema([pa.field("ints", pa.int32())]) + arrow_tbl = pa.Table.from_pylist( + [ + {"ints": 1}, + ], + schema=arrow_schema, + ) + + iceberg_schema = Schema(NestedField(1, "ints", IntegerType())) + + tbl_identifier = "default.test_delete_overwrite" + + try: + session_catalog.drop_table(tbl_identifier) + except NoSuchTableError: + pass + + tbl = session_catalog.create_table(tbl_identifier, iceberg_schema) + tbl.append(arrow_tbl) + + # Effectively a truncate + tbl.delete(delete_filter=AlwaysTrue()) + + manifests = tbl.current_snapshot().manifests(tbl.io) + assert len(manifests) == 1 + + entries = manifests[0].fetch_manifest_entry(tbl.io, discard_deleted=False) + assert len(entries) == 1 + + assert entries[0].status == ManifestEntryStatus.DELETED diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index 834fe83d5f..d8a83e0df7 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -103,13 +103,14 @@ def test_inspect_snapshots( assert isinstance(snapshot_id.as_py(), int) assert df["parent_id"][0].as_py() is None - assert df["parent_id"][1:] == df["snapshot_id"][:2] + assert df["parent_id"][1:].to_pylist() == df["snapshot_id"][:-1].to_pylist() - assert [operation.as_py() for operation in df["operation"]] == ["append", "overwrite", "append"] + assert [operation.as_py() for operation in df["operation"]] == ["append", "delete", "append", "append"] for manifest_list in df["manifest_list"]: assert manifest_list.as_py().startswith("s3://") + # Append assert df["summary"][0].as_py() == [ ("added-files-size", "5459"), ("added-data-files", "1"), @@ -122,6 +123,19 @@ def test_inspect_snapshots( ("total-equality-deletes", "0"), ] + # Delete + assert df["summary"][1].as_py() == [ + ("removed-files-size", "5459"), + ("deleted-data-files", "1"), + ("deleted-records", "3"), + ("total-data-files", "0"), + ("total-delete-files", "0"), + ("total-records", "0"), + ("total-files-size", "0"), + ("total-position-deletes", "0"), + ("total-equality-deletes", "0"), + ] + lhs = spark.table(f"{identifier}.snapshots").toPandas() rhs = df.to_pandas() for column in df.column_names: diff --git a/tests/integration/test_rest_schema.py b/tests/integration/test_rest_schema.py index f4ab98a883..644cb8053d 100644 --- a/tests/integration/test_rest_schema.py +++ b/tests/integration/test_rest_schema.py @@ 
-2512,14 +2512,18 @@ def test_two_add_schemas_in_a_single_transaction(catalog: Catalog) -> None: ), ) - with pytest.raises(CommitFailedException) as exc_info: - with tbl.transaction() as tr: - with tr.update_schema() as update: - update.add_column("bar", field_type=StringType()) - with tr.update_schema() as update: - update.add_column("baz", field_type=StringType()) + with tbl.transaction() as tr: + with tr.update_schema() as update: + update.add_column("bar", field_type=StringType()) + with tr.update_schema() as update: + update.add_column("baz", field_type=StringType()) - assert "CommitFailedException: Requirement failed: current schema changed: expected id 1 != 0" in str(exc_info.value) + assert tbl.schema().schema_id == 2 + assert tbl.schema() == Schema( + NestedField(field_id=1, name="foo", field_type=StringType()), + NestedField(field_id=2, name="bar", field_type=StringType()), + NestedField(field_id=3, name="baz", field_type=StringType()), + ) @pytest.mark.integration diff --git a/tests/integration/test_writes/test_partitioned_writes.py b/tests/integration/test_writes/test_partitioned_writes.py index f6e6e93c11..59bb76933e 100644 --- a/tests/integration/test_writes/test_partitioned_writes.py +++ b/tests/integration/test_writes/test_partitioned_writes.py @@ -38,7 +38,6 @@ TruncateTransform, YearTransform, ) -from tests.conftest import TEST_DATA_WITH_NULL from utils import TABLE_SCHEMA, _create_table @@ -70,7 +69,7 @@ def test_query_filter_null_partitioned( assert tbl.format_version == format_version, f"Expected v{format_version}, got: v{tbl.format_version}" df = spark.table(identifier) assert df.count() == 3, f"Expected 3 total rows for {identifier}" - for col in TEST_DATA_WITH_NULL.keys(): + for col in arrow_table_with_null.column_names: assert df.where(f"{col} is not null").count() == 2, f"Expected 2 non-null rows for {col}" assert df.where(f"{col} is null").count() == 1, f"Expected 1 null row for {col} is null" @@ -81,7 +80,12 @@ def test_query_filter_null_partitioned( ) @pytest.mark.parametrize("format_version", [1, 2]) def test_query_filter_without_data_partitioned( - session_catalog: Catalog, spark: SparkSession, arrow_table_without_data: pa.Table, part_col: str, format_version: int + session_catalog: Catalog, + spark: SparkSession, + arrow_table_without_data: pa.Table, + part_col: str, + arrow_table_with_null: pa.Table, + format_version: int, ) -> None: # Given identifier = f"default.arrow_table_v{format_version}_without_data_partitioned_on_col_{part_col}" @@ -102,7 +106,7 @@ def test_query_filter_without_data_partitioned( # Then assert tbl.format_version == format_version, f"Expected v{format_version}, got: v{tbl.format_version}" df = spark.table(identifier) - for col in TEST_DATA_WITH_NULL.keys(): + for col in arrow_table_with_null.column_names: assert df.where(f"{col} is null").count() == 0, f"Expected 0 row for {col}" assert df.where(f"{col} is not null").count() == 0, f"Expected 0 row for {col}" @@ -134,7 +138,7 @@ def test_query_filter_only_nulls_partitioned( # Then assert tbl.format_version == format_version, f"Expected v{format_version}, got: v{tbl.format_version}" df = spark.table(identifier) - for col in TEST_DATA_WITH_NULL.keys(): + for col in arrow_table_with_only_nulls.column_names: assert df.where(f"{col} is null").count() == 2, f"Expected 2 row for {col}" assert df.where(f"{col} is not null").count() == 0, f"Expected 0 rows for {col}" @@ -169,7 +173,7 @@ def test_query_filter_appended_null_partitioned( # Then assert tbl.format_version == format_version, 
f"Expected v{format_version}, got: v{tbl.format_version}" df = spark.table(identifier) - for col in TEST_DATA_WITH_NULL.keys(): + for col in arrow_table_with_null.column_names: assert df.where(f"{col} is not null").count() == 6, f"Expected 6 non-null rows for {col}" assert df.where(f"{col} is null").count() == 3, f"Expected 3 null rows for {col}" # expecting 6 files: first append with [A], [B], [C], second append with [A, A], [B, B], [C, C] @@ -212,7 +216,7 @@ def test_query_filter_v1_v2_append_null( # Then assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" - for col in TEST_DATA_WITH_NULL.keys(): # type: ignore + for col in arrow_table_with_null.column_names: # type: ignore df = spark.table(identifier) assert df.where(f"{col} is not null").count() == 4, f"Expected 4 non-null rows for {col}" assert df.where(f"{col} is null").count() == 2, f"Expected 2 null rows for {col}" @@ -422,7 +426,7 @@ def test_append_ymd_transform_partitioned( assert tbl.format_version == format_version, f"Expected v{format_version}, got: v{tbl.format_version}" df = spark.table(identifier) assert df.count() == 3, f"Expected 3 total rows for {identifier}" - for col in TEST_DATA_WITH_NULL.keys(): + for col in arrow_table_with_null.column_names: assert df.where(f"{col} is not null").count() == 2, f"Expected 2 non-null rows for {col}" assert df.where(f"{col} is null").count() == 1, f"Expected 1 null row for {col} is null" diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 4585406cbb..9a541bc8c8 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -37,12 +37,12 @@ from pyiceberg.catalog.rest import RestCatalog from pyiceberg.catalog.sql import SqlCatalog from pyiceberg.exceptions import NoSuchTableError +from pyiceberg.io.pyarrow import _dataframe_to_data_files from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.table import TableProperties, _dataframe_to_data_files +from pyiceberg.table import TableProperties from pyiceberg.transforms import IdentityTransform from pyiceberg.types import IntegerType, NestedField -from tests.conftest import TEST_DATA_WITH_NULL from utils import _create_table @@ -124,52 +124,55 @@ def test_query_count(spark: SparkSession, format_version: int) -> None: @pytest.mark.integration -@pytest.mark.parametrize("col", TEST_DATA_WITH_NULL.keys()) @pytest.mark.parametrize("format_version", [1, 2]) -def test_query_filter_null(spark: SparkSession, col: str, format_version: int) -> None: +def test_query_filter_null(spark: SparkSession, arrow_table_with_null: pa.Table, format_version: int) -> None: identifier = f"default.arrow_table_v{format_version}_with_null" df = spark.table(identifier) - assert df.where(f"{col} is null").count() == 1, f"Expected 1 row for {col}" - assert df.where(f"{col} is not null").count() == 2, f"Expected 2 rows for {col}" + for col in arrow_table_with_null.column_names: + assert df.where(f"{col} is null").count() == 1, f"Expected 1 row for {col}" + assert df.where(f"{col} is not null").count() == 2, f"Expected 2 rows for {col}" @pytest.mark.integration -@pytest.mark.parametrize("col", TEST_DATA_WITH_NULL.keys()) @pytest.mark.parametrize("format_version", [1, 2]) -def test_query_filter_without_data(spark: SparkSession, col: str, format_version: int) -> None: +def test_query_filter_without_data(spark: SparkSession, arrow_table_with_null: pa.Table, format_version: int) -> None: 
identifier = f"default.arrow_table_v{format_version}_without_data" df = spark.table(identifier) - assert df.where(f"{col} is null").count() == 0, f"Expected 0 row for {col}" - assert df.where(f"{col} is not null").count() == 0, f"Expected 0 row for {col}" + for col in arrow_table_with_null.column_names: + assert df.where(f"{col} is null").count() == 0, f"Expected 0 row for {col}" + assert df.where(f"{col} is not null").count() == 0, f"Expected 0 row for {col}" @pytest.mark.integration -@pytest.mark.parametrize("col", TEST_DATA_WITH_NULL.keys()) @pytest.mark.parametrize("format_version", [1, 2]) -def test_query_filter_only_nulls(spark: SparkSession, col: str, format_version: int) -> None: +def test_query_filter_only_nulls(spark: SparkSession, arrow_table_with_null: pa.Table, format_version: int) -> None: identifier = f"default.arrow_table_v{format_version}_with_only_nulls" df = spark.table(identifier) - assert df.where(f"{col} is null").count() == 2, f"Expected 2 rows for {col}" - assert df.where(f"{col} is not null").count() == 0, f"Expected 0 row for {col}" + for col in arrow_table_with_null.column_names: + assert df.where(f"{col} is null").count() == 2, f"Expected 2 rows for {col}" + assert df.where(f"{col} is not null").count() == 0, f"Expected 0 row for {col}" @pytest.mark.integration -@pytest.mark.parametrize("col", TEST_DATA_WITH_NULL.keys()) @pytest.mark.parametrize("format_version", [1, 2]) -def test_query_filter_appended_null(spark: SparkSession, col: str, format_version: int) -> None: +def test_query_filter_appended_null(spark: SparkSession, arrow_table_with_null: pa.Table, format_version: int) -> None: identifier = f"default.arrow_table_v{format_version}_appended_with_null" df = spark.table(identifier) - assert df.where(f"{col} is null").count() == 2, f"Expected 1 row for {col}" - assert df.where(f"{col} is not null").count() == 4, f"Expected 2 rows for {col}" + for col in arrow_table_with_null.column_names: + assert df.where(f"{col} is null").count() == 2, f"Expected 1 row for {col}" + assert df.where(f"{col} is not null").count() == 4, f"Expected 2 rows for {col}" @pytest.mark.integration -@pytest.mark.parametrize("col", TEST_DATA_WITH_NULL.keys()) -def test_query_filter_v1_v2_append_null(spark: SparkSession, col: str) -> None: +def test_query_filter_v1_v2_append_null( + spark: SparkSession, + arrow_table_with_null: pa.Table, +) -> None: identifier = "default.arrow_table_v1_v2_appended_with_null" df = spark.table(identifier) - assert df.where(f"{col} is null").count() == 2, f"Expected 1 row for {col}" - assert df.where(f"{col} is not null").count() == 4, f"Expected 2 rows for {col}" + for col in arrow_table_with_null.column_names: + assert df.where(f"{col} is null").count() == 2, f"Expected 1 row for {col}" + assert df.where(f"{col} is not null").count() == 4, f"Expected 2 rows for {col}" @pytest.mark.integration @@ -187,10 +190,11 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi ).collect() operations = [row.operation for row in rows] - assert operations == ["append", "append", "overwrite"] + assert operations == ["append", "append", "delete", "append"] summaries = [row.summary for row in rows] + # Append assert summaries[0] == { "added-data-files": "1", "added-files-size": "5459", @@ -203,6 +207,7 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi "total-records": "3", } + # Append assert summaries[1] == { "added-data-files": "1", "added-files-size": "5459", @@ -215,13 +220,24 @@ def test_summaries(spark: 
         "total-records": "6",
     }
 
+    # Delete
     assert summaries[2] == {
-        "added-data-files": "1",
-        "added-files-size": "5459",
-        "added-records": "3",
         "deleted-data-files": "2",
         "deleted-records": "6",
         "removed-files-size": "10918",
+        "total-data-files": "0",
+        "total-delete-files": "0",
+        "total-equality-deletes": "0",
+        "total-files-size": "0",
+        "total-position-deletes": "0",
+        "total-records": "0",
+    }
+
+    # Overwrite
+    assert summaries[3] == {
+        "added-data-files": "1",
+        "added-files-size": "5459",
+        "added-records": "3",
         "total-data-files": "1",
         "total-delete-files": "0",
         "total-equality-deletes": "0",
@@ -249,9 +265,9 @@ def test_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_w
         """
     ).collect()
 
-    assert [row.added_data_files_count for row in rows] == [1, 1, 0, 1, 1]
+    assert [row.added_data_files_count for row in rows] == [1, 0, 1, 1, 1]
     assert [row.existing_data_files_count for row in rows] == [0, 0, 0, 0, 0]
-    assert [row.deleted_data_files_count for row in rows] == [0, 0, 1, 0, 0]
+    assert [row.deleted_data_files_count for row in rows] == [0, 1, 0, 0, 0]
 
 
 @pytest.mark.integration
@@ -556,7 +572,7 @@ def test_summaries_with_only_nulls(
     ).collect()
 
     operations = [row.operation for row in rows]
-    assert operations == ["append", "append", "overwrite"]
+    assert operations == ["append", "append", "delete", "append"]
 
     summaries = [row.summary for row in rows]
 
@@ -582,14 +598,23 @@ def test_summaries_with_only_nulls(
     }
 
     assert summaries[2] == {
+        "deleted-data-files": "1",
+        "deleted-records": "2",
         "removed-files-size": "4239",
+        "total-data-files": "0",
+        "total-delete-files": "0",
         "total-equality-deletes": "0",
+        "total-files-size": "0",
         "total-position-deletes": "0",
-        "deleted-data-files": "1",
+        "total-records": "0",
+    }
+
+    assert summaries[3] == {
+        "total-data-files": "0",
         "total-delete-files": "0",
+        "total-equality-deletes": "0",
         "total-files-size": "0",
-        "deleted-records": "2",
-        "total-data-files": "0",
+        "total-position-deletes": "0",
         "total-records": "0",
     }
 
@@ -812,13 +837,14 @@ def test_inspect_snapshots(
         assert isinstance(snapshot_id.as_py(), int)
 
     assert df["parent_id"][0].as_py() is None
-    assert df["parent_id"][1:] == df["snapshot_id"][:2]
+    assert df["parent_id"][1:].to_pylist() == df["snapshot_id"][:-1].to_pylist()
 
-    assert [operation.as_py() for operation in df["operation"]] == ["append", "overwrite", "append"]
+    assert [operation.as_py() for operation in df["operation"]] == ["append", "delete", "append", "append"]
 
     for manifest_list in df["manifest_list"]:
         assert manifest_list.as_py().startswith("s3://")
 
+    # Append
     assert df["summary"][0].as_py() == [
         ("added-files-size", "5459"),
         ("added-data-files", "1"),
@@ -831,6 +857,19 @@ def test_inspect_snapshots(
         ("total-equality-deletes", "0"),
     ]
 
+    # Delete
+    assert df["summary"][1].as_py() == [
+        ("removed-files-size", "5459"),
+        ("deleted-data-files", "1"),
+        ("deleted-records", "3"),
+        ("total-data-files", "0"),
+        ("total-delete-files", "0"),
+        ("total-records", "0"),
+        ("total-files-size", "0"),
+        ("total-position-deletes", "0"),
+        ("total-equality-deletes", "0"),
+    ]
+
     lhs = spark.table(f"{identifier}.snapshots").toPandas()
     rhs = df.to_pandas()
     for column in df.column_names:
diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py
index fa3464052a..ff9d92cea3 100644
--- a/tests/table/test_snapshots.py
+++ b/tests/table/test_snapshots.py
@@ -314,10 +314,6 @@ def test_invalid_operation() -> None:
         update_snapshot_summaries(summary=Summary(Operation.REPLACE))
     assert "Operation not implemented: Operation.REPLACE" in str(e.value)
 
-    with pytest.raises(ValueError) as e:
-        update_snapshot_summaries(summary=Summary(Operation.DELETE))
-    assert "Operation not implemented: Operation.DELETE" in str(e.value)
-
 
 def test_invalid_type() -> None:
     with pytest.raises(ValueError) as e: