From aa5f82c38d48d1c79ae0d17fe90a68a971e88ece Mon Sep 17 00:00:00 2001 From: Soumya Ghosh Date: Sun, 7 Jul 2024 03:30:03 +0530 Subject: [PATCH 1/6] Rename data_sequence_number to sequence_number in ManifestEntry --- pyiceberg/manifest.py | 18 +++++++++--------- pyiceberg/table/__init__.py | 8 ++++---- tests/avro/test_file.py | 10 +++++----- tests/utils/test_manifest.py | 10 +++++----- 4 files changed, 23 insertions(+), 23 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 6148d9a69a..f2547e8fc6 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -377,7 +377,7 @@ def __eq__(self, other: Any) -> bool: 2: Schema( NestedField(0, "status", IntegerType(), required=True), NestedField(1, "snapshot_id", LongType(), required=False), - NestedField(3, "data_sequence_number", LongType(), required=False), + NestedField(3, "sequence_number", LongType(), required=False), NestedField(4, "file_sequence_number", LongType(), required=False), NestedField(2, "data_file", DATA_FILE_TYPE[2], required=True), ), @@ -394,10 +394,10 @@ def manifest_entry_schema_with_data_file(format_version: TableVersion, data_file class ManifestEntry(Record): - __slots__ = ("status", "snapshot_id", "data_sequence_number", "file_sequence_number", "data_file") + __slots__ = ("status", "snapshot_id", "sequence_number", "file_sequence_number", "data_file") status: ManifestEntryStatus snapshot_id: Optional[int] - data_sequence_number: Optional[int] + sequence_number: Optional[int] file_sequence_number: Optional[int] data_file: DataFile @@ -667,8 +667,8 @@ def _inherit_from_manifest(entry: ManifestEntry, manifest: ManifestFile) -> Mani # in v1 tables, the data sequence number is not persisted and can be safely defaulted to 0 # in v2 tables, the data sequence number should be inherited iff the entry status is ADDED - if entry.data_sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED): - entry.data_sequence_number = manifest.sequence_number + if entry.sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED): + entry.sequence_number = manifest.sequence_number # in v1 tables, the file sequence number is not persisted and can be safely defaulted to 0 # in v2 tables, the file sequence number should be inherited iff the entry status is ADDED @@ -812,10 +812,10 @@ def add_entry(self, entry: ManifestEntry) -> ManifestWriter: if ( (entry.status == ManifestEntryStatus.ADDED or entry.status == ManifestEntryStatus.EXISTING) - and entry.data_sequence_number is not None - and (self._min_data_sequence_number is None or entry.data_sequence_number < self._min_data_sequence_number) + and entry.sequence_number is not None + and (self._min_data_sequence_number is None or entry.sequence_number < self._min_data_sequence_number) ): - self._min_data_sequence_number = entry.data_sequence_number + self._min_data_sequence_number = entry.sequence_number self._writer.write_block([self.prepare_entry(entry)]) return self @@ -885,7 +885,7 @@ def _meta(self) -> Dict[str, str]: } def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry: - if entry.data_sequence_number is None: + if entry.sequence_number is None: if entry.snapshot_id is not None and entry.snapshot_id != self._snapshot_id: raise ValueError(f"Found unassigned sequence number for an entry from snapshot: {entry.snapshot_id}") if entry.status != ManifestEntryStatus.ADDED: diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 39bcfc2ef6..ce71073a91 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1997,7 +1997,7 @@ def plan_files(self) -> Iterable[FileScanTask]: min_data_sequence_number = _min_data_file_sequence_number(manifests) data_entries: List[ManifestEntry] = [] - positional_delete_entries = SortedList(key=lambda entry: entry.data_sequence_number or INITIAL_SEQUENCE_NUMBER) + positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER) executor = ExecutorFactory.get_or_create() for manifest_entry in chain( @@ -3150,7 +3150,7 @@ def _write_added_manifest() -> List[ManifestFile]: ManifestEntry( status=ManifestEntryStatus.ADDED, snapshot_id=self._snapshot_id, - data_sequence_number=None, + sequence_number=None, file_sequence_number=None, data_file=data_file, ) @@ -3568,7 +3568,7 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]: ManifestEntry( status=ManifestEntryStatus.DELETED, snapshot_id=entry.snapshot_id, - data_sequence_number=entry.data_sequence_number, + sequence_number=entry.sequence_number, file_sequence_number=entry.file_sequence_number, data_file=entry.data_file, ) @@ -4016,7 +4016,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType: entries.append({ "status": entry.status.value, "snapshot_id": entry.snapshot_id, - "sequence_number": entry.data_sequence_number, + "sequence_number": entry.sequence_number, "file_sequence_number": entry.file_sequence_number, "data_file": { "content": entry.data_file.content, diff --git a/tests/avro/test_file.py b/tests/avro/test_file.py index 4df132304c..981aab2547 100644 --- a/tests/avro/test_file.py +++ b/tests/avro/test_file.py @@ -140,7 +140,7 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v1() -> None: entry = ManifestEntry( status=ManifestEntryStatus.ADDED, snapshot_id=8638475580105682862, - data_sequence_number=0, + sequence_number=0, file_sequence_number=0, data_file=data_file, ) @@ -173,7 +173,7 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v1() -> None: v2_entry = todict(entry) # These are not written in V1 - del v2_entry["data_sequence_number"] + del v2_entry["sequence_number"] del v2_entry["file_sequence_number"] del v2_entry["data_file"]["content"] del v2_entry["data_file"]["equality_ids"] @@ -206,7 +206,7 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v2() -> None: entry = ManifestEntry( status=ManifestEntryStatus.ADDED, snapshot_id=8638475580105682862, - data_sequence_number=0, + sequence_number=0, file_sequence_number=0, data_file=data_file, ) @@ -263,7 +263,7 @@ def test_write_manifest_entry_with_fastavro_read_with_iceberg(format_version: in entry = ManifestEntry( status=ManifestEntryStatus.ADDED, snapshot_id=8638475580105682862, - data_sequence_number=0, + sequence_number=0, file_sequence_number=0, data_file=data_file, ) @@ -305,7 +305,7 @@ def test_write_manifest_entry_with_fastavro_read_with_iceberg(format_version: in status=ManifestEntryStatus.ADDED, snapshot_id=8638475580105682862, # Not part of v1 - data_sequence_number=None, + sequence_number=None, file_sequence_number=None, data_file=v1_datafile, ) diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index ecb99e281e..ef33b16b00 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -66,7 +66,7 @@ def test_read_manifest_entry(generated_manifest_entry_file: str) -> None: assert manifest_entry.status == ManifestEntryStatus.ADDED assert manifest_entry.snapshot_id == 8744736658442914487 - assert manifest_entry.data_sequence_number == 0 + assert manifest_entry.sequence_number == 0 assert isinstance(manifest_entry.data_file, DataFile) data_file = manifest_entry.data_file @@ -250,7 +250,7 @@ def test_read_manifest_v1(generated_manifest_file_file_v1: str) -> None: entry = entries[0] - assert entry.data_sequence_number == 0 + assert entry.sequence_number == 0 assert entry.file_sequence_number == 0 assert entry.snapshot_id == 8744736658442914487 assert entry.status == ManifestEntryStatus.ADDED @@ -300,7 +300,7 @@ def test_read_manifest_v2(generated_manifest_file_file_v2: str) -> None: entry = entries[0] - assert entry.data_sequence_number == 3 + assert entry.sequence_number == 3 assert entry.file_sequence_number == 3 assert entry.snapshot_id == 8744736658442914487 assert entry.status == ManifestEntryStatus.ADDED @@ -379,7 +379,7 @@ def test_write_manifest( assert manifest_entry.status == ManifestEntryStatus.ADDED assert manifest_entry.snapshot_id == 8744736658442914487 - assert manifest_entry.data_sequence_number == -1 if format_version == 1 else 3 + assert manifest_entry.sequence_number == -1 if format_version == 1 else 3 assert isinstance(manifest_entry.data_file, DataFile) data_file = manifest_entry.data_file @@ -556,7 +556,7 @@ def test_write_manifest_list( entry = entries[0] - assert entry.data_sequence_number == 0 if format_version == 1 else 3 + assert entry.sequence_number == 0 if format_version == 1 else 3 assert entry.file_sequence_number == 0 if format_version == 1 else 3 assert entry.snapshot_id == 8744736658442914487 assert entry.status == ManifestEntryStatus.ADDED From 585d4afc085bff583955f25a7634ccf1c3ab41c2 Mon Sep 17 00:00:00 2001 From: Soumya Ghosh Date: Mon, 8 Jul 2024 23:33:34 +0530 Subject: [PATCH 2/6] Rename _min_data_sequence_number to _min_sequence_number in ManifestWriter --- pyiceberg/manifest.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index f2547e8fc6..a7b0500082 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -712,7 +712,7 @@ def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, self._existing_rows = 0 self._deleted_files = 0 self._deleted_rows = 0 - self._min_data_sequence_number = None + self._min_sequence_number = None self._partitions = [] self._reused_entry_wrapper = ManifestEntry() @@ -774,7 +774,7 @@ def to_manifest_file(self) -> ManifestFile: """Return the manifest file.""" # once the manifest file is generated, no more entries can be added self.closed = True - min_sequence_number = self._min_data_sequence_number or UNASSIGNED_SEQ + min_sequence_number = self._min_sequence_number or UNASSIGNED_SEQ return ManifestFile( manifest_path=self._output_file.location, manifest_length=len(self._writer.output_file), @@ -813,9 +813,9 @@ def add_entry(self, entry: ManifestEntry) -> ManifestWriter: if ( (entry.status == ManifestEntryStatus.ADDED or entry.status == ManifestEntryStatus.EXISTING) and entry.sequence_number is not None - and (self._min_data_sequence_number is None or entry.sequence_number < self._min_data_sequence_number) + and (self._min_sequence_number is None or entry.sequence_number < self._min_sequence_number) ): - self._min_data_sequence_number = entry.sequence_number + self._min_sequence_number = entry.sequence_number self._writer.write_block([self.prepare_entry(entry)]) return self From 3988ee00908d9c63e5721753972ec8e50b4941a2 Mon Sep 17 00:00:00 2001 From: Soumya Ghosh Date: Mon, 8 Jul 2024 23:40:15 +0530 Subject: [PATCH 3/6] Rename min_data_sequence_number to min_sequence_number in pyiceberg.table --- pyiceberg/table/__init__.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index ce71073a91..3d84c5b3b4 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1949,11 +1949,11 @@ def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool] # shared instance across multiple threads. return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition) - def _check_sequence_number(self, min_data_sequence_number: int, manifest: ManifestFile) -> bool: + def _check_sequence_number(self, min_sequence_number: int, manifest: ManifestFile) -> bool: """Ensure that no manifests are loaded that contain deletes that are older than the data. Args: - min_data_sequence_number (int): The minimal sequence number. + min_sequence_number (int): The minimal sequence number. manifest (ManifestFile): A ManifestFile that can be either data or deletes. Returns: @@ -1962,7 +1962,7 @@ def _check_sequence_number(self, min_data_sequence_number: int, manifest: Manife return manifest.content == ManifestContent.DATA or ( # Not interested in deletes that are older than the data manifest.content == ManifestContent.DELETES - and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_data_sequence_number + and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number ) def plan_files(self) -> Iterable[FileScanTask]: @@ -1994,7 +1994,7 @@ def plan_files(self) -> Iterable[FileScanTask]: self.table_metadata.schema(), self.row_filter, self.case_sensitive, self.options.get("include_empty_files") == "true" ).eval - min_data_sequence_number = _min_data_file_sequence_number(manifests) + min_sequence_number = _min_data_file_sequence_number(manifests) data_entries: List[ManifestEntry] = [] positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER) @@ -2011,7 +2011,7 @@ def plan_files(self) -> Iterable[FileScanTask]: metrics_evaluator, ) for manifest in manifests - if self._check_sequence_number(min_data_sequence_number, manifest) + if self._check_sequence_number(min_sequence_number, manifest) ], ) ): From aea36b8a8c669f69bcd0a151dc87aa5c2f88c8b7 Mon Sep 17 00:00:00 2001 From: Soumya Ghosh Date: Tue, 9 Jul 2024 00:13:24 +0530 Subject: [PATCH 4/6] Rename _min_data_sequence_number to _min_sequence_number in ManifestWriter --- pyiceberg/manifest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index a7b0500082..cbc1897448 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -695,7 +695,7 @@ class ManifestWriter(ABC): _existing_rows: int _deleted_files: int _deleted_rows: int - _min_data_sequence_number: Optional[int] + _min_sequence_number: Optional[int] _partitions: List[Record] _reused_entry_wrapper: ManifestEntry From 55b9985271d2ef5c32cd7ce61ed1d563b18f5290 Mon Sep 17 00:00:00 2001 From: Soumya Ghosh Date: Wed, 10 Jul 2024 00:24:11 +0530 Subject: [PATCH 5/6] Fix comments and method name --- pyiceberg/manifest.py | 4 ++-- pyiceberg/table/__init__.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index cbc1897448..4e1bab32bb 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -665,8 +665,8 @@ def _inherit_from_manifest(entry: ManifestEntry, manifest: ManifestFile) -> Mani if entry.snapshot_id is None: entry.snapshot_id = manifest.added_snapshot_id - # in v1 tables, the data sequence number is not persisted and can be safely defaulted to 0 - # in v2 tables, the data sequence number should be inherited iff the entry status is ADDED + # in v1 tables, the sequence number is not persisted and can be safely defaulted to 0 + # in v2 tables, the sequence number should be inherited iff the entry status is ADDED if entry.sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED): entry.sequence_number = manifest.sequence_number diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 3d84c5b3b4..88100d9648 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1888,7 +1888,7 @@ def _open_manifest( ] -def _min_data_file_sequence_number(manifests: List[ManifestFile]) -> int: +def _min_sequence_number(manifests: List[ManifestFile]) -> int: try: return min( manifest.min_sequence_number or INITIAL_SEQUENCE_NUMBER @@ -1994,7 +1994,7 @@ def plan_files(self) -> Iterable[FileScanTask]: self.table_metadata.schema(), self.row_filter, self.case_sensitive, self.options.get("include_empty_files") == "true" ).eval - min_sequence_number = _min_data_file_sequence_number(manifests) + min_sequence_number = _min_sequence_number(manifests) data_entries: List[ManifestEntry] = [] positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER) From 32e07329d528a53040713bebe20d50fe1cd1c014 Mon Sep 17 00:00:00 2001 From: Soumya Ghosh Date: Wed, 10 Jul 2024 20:26:54 +0530 Subject: [PATCH 6/6] Fix variables in upstream code --- pyiceberg/manifest.py | 30 ++++++++++++------------------ pyiceberg/table/__init__.py | 4 ++-- 2 files changed, 14 insertions(+), 20 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 4e1bab32bb..960952d02d 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -408,43 +408,39 @@ def _wrap( self, new_status: ManifestEntryStatus, new_snapshot_id: Optional[int], - new_data_sequence_number: Optional[int], + new_sequence_number: Optional[int], new_file_sequence_number: Optional[int], new_file: DataFile, ) -> ManifestEntry: self.status = new_status self.snapshot_id = new_snapshot_id - self.data_sequence_number = new_data_sequence_number + self.sequence_number = new_sequence_number self.file_sequence_number = new_file_sequence_number self.data_file = new_file return self def _wrap_append( - self, new_snapshot_id: Optional[int], new_data_sequence_number: Optional[int], new_file: DataFile + self, new_snapshot_id: Optional[int], new_sequence_number: Optional[int], new_file: DataFile ) -> ManifestEntry: - return self._wrap(ManifestEntryStatus.ADDED, new_snapshot_id, new_data_sequence_number, None, new_file) + return self._wrap(ManifestEntryStatus.ADDED, new_snapshot_id, new_sequence_number, None, new_file) def _wrap_delete( self, new_snapshot_id: Optional[int], - new_data_sequence_number: Optional[int], + new_sequence_number: Optional[int], new_file_sequence_number: Optional[int], new_file: DataFile, ) -> ManifestEntry: - return self._wrap( - ManifestEntryStatus.DELETED, new_snapshot_id, new_data_sequence_number, new_file_sequence_number, new_file - ) + return self._wrap(ManifestEntryStatus.DELETED, new_snapshot_id, new_sequence_number, new_file_sequence_number, new_file) def _wrap_existing( self, new_snapshot_id: Optional[int], - new_data_sequence_number: Optional[int], + new_sequence_number: Optional[int], new_file_sequence_number: Optional[int], new_file: DataFile, ) -> ManifestEntry: - return self._wrap( - ManifestEntryStatus.EXISTING, new_snapshot_id, new_data_sequence_number, new_file_sequence_number, new_file - ) + return self._wrap(ManifestEntryStatus.EXISTING, new_snapshot_id, new_sequence_number, new_file_sequence_number, new_file) PARTITION_FIELD_SUMMARY_TYPE = StructType( @@ -821,10 +817,8 @@ def add_entry(self, entry: ManifestEntry) -> ManifestWriter: return self def add(self, entry: ManifestEntry) -> ManifestWriter: - if entry.data_sequence_number is not None and entry.data_sequence_number >= 0: - self.add_entry( - self._reused_entry_wrapper._wrap_append(self._snapshot_id, entry.data_sequence_number, entry.data_file) - ) + if entry.sequence_number is not None and entry.sequence_number >= 0: + self.add_entry(self._reused_entry_wrapper._wrap_append(self._snapshot_id, entry.sequence_number, entry.data_file)) else: self.add_entry(self._reused_entry_wrapper._wrap_append(self._snapshot_id, None, entry.data_file)) return self @@ -832,7 +826,7 @@ def add(self, entry: ManifestEntry) -> ManifestWriter: def delete(self, entry: ManifestEntry) -> ManifestWriter: self.add_entry( self._reused_entry_wrapper._wrap_delete( - self._snapshot_id, entry.data_sequence_number, entry.file_sequence_number, entry.data_file + self._snapshot_id, entry.sequence_number, entry.file_sequence_number, entry.data_file ) ) return self @@ -840,7 +834,7 @@ def delete(self, entry: ManifestEntry) -> ManifestWriter: def existing(self, entry: ManifestEntry) -> ManifestWriter: self.add_entry( self._reused_entry_wrapper._wrap_existing( - entry.snapshot_id, entry.data_sequence_number, entry.file_sequence_number, entry.data_file + entry.snapshot_id, entry.sequence_number, entry.file_sequence_number, entry.data_file ) ) return self diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 88100d9648..4080f3a05e 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -3353,7 +3353,7 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> return ManifestEntry( status=status, snapshot_id=entry.snapshot_id, - data_sequence_number=entry.data_sequence_number, + sequence_number=entry.sequence_number, file_sequence_number=entry.file_sequence_number, data_file=entry.data_file, ) @@ -3537,7 +3537,7 @@ def _existing_manifests(self) -> List[ManifestFile]: ManifestEntry( status=ManifestEntryStatus.EXISTING, snapshot_id=entry.snapshot_id, - data_sequence_number=entry.data_sequence_number, + sequence_number=entry.sequence_number, file_sequence_number=entry.file_sequence_number, data_file=entry.data_file, )