Skip to content

Commit

Permalink
Rename data_sequence_number to sequence_number in ManifestEntry (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
soumya-ghosh authored Jul 11, 2024
1 parent 77a07c9 commit 5aa451d
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 54 deletions.
58 changes: 26 additions & 32 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ def __eq__(self, other: Any) -> bool:
2: Schema(
NestedField(0, "status", IntegerType(), required=True),
NestedField(1, "snapshot_id", LongType(), required=False),
NestedField(3, "data_sequence_number", LongType(), required=False),
NestedField(3, "sequence_number", LongType(), required=False),
NestedField(4, "file_sequence_number", LongType(), required=False),
NestedField(2, "data_file", DATA_FILE_TYPE[2], required=True),
),
Expand All @@ -394,10 +394,10 @@ def manifest_entry_schema_with_data_file(format_version: TableVersion, data_file


class ManifestEntry(Record):
__slots__ = ("status", "snapshot_id", "data_sequence_number", "file_sequence_number", "data_file")
__slots__ = ("status", "snapshot_id", "sequence_number", "file_sequence_number", "data_file")
status: ManifestEntryStatus
snapshot_id: Optional[int]
data_sequence_number: Optional[int]
sequence_number: Optional[int]
file_sequence_number: Optional[int]
data_file: DataFile

Expand All @@ -408,43 +408,39 @@ def _wrap(
self,
new_status: ManifestEntryStatus,
new_snapshot_id: Optional[int],
new_data_sequence_number: Optional[int],
new_sequence_number: Optional[int],
new_file_sequence_number: Optional[int],
new_file: DataFile,
) -> ManifestEntry:
self.status = new_status
self.snapshot_id = new_snapshot_id
self.data_sequence_number = new_data_sequence_number
self.sequence_number = new_sequence_number
self.file_sequence_number = new_file_sequence_number
self.data_file = new_file
return self

def _wrap_append(
self, new_snapshot_id: Optional[int], new_data_sequence_number: Optional[int], new_file: DataFile
self, new_snapshot_id: Optional[int], new_sequence_number: Optional[int], new_file: DataFile
) -> ManifestEntry:
return self._wrap(ManifestEntryStatus.ADDED, new_snapshot_id, new_data_sequence_number, None, new_file)
return self._wrap(ManifestEntryStatus.ADDED, new_snapshot_id, new_sequence_number, None, new_file)

def _wrap_delete(
self,
new_snapshot_id: Optional[int],
new_data_sequence_number: Optional[int],
new_sequence_number: Optional[int],
new_file_sequence_number: Optional[int],
new_file: DataFile,
) -> ManifestEntry:
return self._wrap(
ManifestEntryStatus.DELETED, new_snapshot_id, new_data_sequence_number, new_file_sequence_number, new_file
)
return self._wrap(ManifestEntryStatus.DELETED, new_snapshot_id, new_sequence_number, new_file_sequence_number, new_file)

def _wrap_existing(
self,
new_snapshot_id: Optional[int],
new_data_sequence_number: Optional[int],
new_sequence_number: Optional[int],
new_file_sequence_number: Optional[int],
new_file: DataFile,
) -> ManifestEntry:
return self._wrap(
ManifestEntryStatus.EXISTING, new_snapshot_id, new_data_sequence_number, new_file_sequence_number, new_file
)
return self._wrap(ManifestEntryStatus.EXISTING, new_snapshot_id, new_sequence_number, new_file_sequence_number, new_file)


PARTITION_FIELD_SUMMARY_TYPE = StructType(
Expand Down Expand Up @@ -665,10 +661,10 @@ def _inherit_from_manifest(entry: ManifestEntry, manifest: ManifestFile) -> Mani
if entry.snapshot_id is None:
entry.snapshot_id = manifest.added_snapshot_id

# in v1 tables, the data sequence number is not persisted and can be safely defaulted to 0
# in v2 tables, the data sequence number should be inherited iff the entry status is ADDED
if entry.data_sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED):
entry.data_sequence_number = manifest.sequence_number
# in v1 tables, the sequence number is not persisted and can be safely defaulted to 0
# in v2 tables, the sequence number should be inherited iff the entry status is ADDED
if entry.sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED):
entry.sequence_number = manifest.sequence_number

# in v1 tables, the file sequence number is not persisted and can be safely defaulted to 0
# in v2 tables, the file sequence number should be inherited iff the entry status is ADDED
Expand All @@ -695,7 +691,7 @@ class ManifestWriter(ABC):
_existing_rows: int
_deleted_files: int
_deleted_rows: int
_min_data_sequence_number: Optional[int]
_min_sequence_number: Optional[int]
_partitions: List[Record]
_reused_entry_wrapper: ManifestEntry

Expand All @@ -712,7 +708,7 @@ def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile,
self._existing_rows = 0
self._deleted_files = 0
self._deleted_rows = 0
self._min_data_sequence_number = None
self._min_sequence_number = None
self._partitions = []
self._reused_entry_wrapper = ManifestEntry()

Expand Down Expand Up @@ -774,7 +770,7 @@ def to_manifest_file(self) -> ManifestFile:
"""Return the manifest file."""
# once the manifest file is generated, no more entries can be added
self.closed = True
min_sequence_number = self._min_data_sequence_number or UNASSIGNED_SEQ
min_sequence_number = self._min_sequence_number or UNASSIGNED_SEQ
return ManifestFile(
manifest_path=self._output_file.location,
manifest_length=len(self._writer.output_file),
Expand Down Expand Up @@ -812,35 +808,33 @@ def add_entry(self, entry: ManifestEntry) -> ManifestWriter:

if (
(entry.status == ManifestEntryStatus.ADDED or entry.status == ManifestEntryStatus.EXISTING)
and entry.data_sequence_number is not None
and (self._min_data_sequence_number is None or entry.data_sequence_number < self._min_data_sequence_number)
and entry.sequence_number is not None
and (self._min_sequence_number is None or entry.sequence_number < self._min_sequence_number)
):
self._min_data_sequence_number = entry.data_sequence_number
self._min_sequence_number = entry.sequence_number

self._writer.write_block([self.prepare_entry(entry)])
return self

def add(self, entry: ManifestEntry) -> ManifestWriter:
if entry.data_sequence_number is not None and entry.data_sequence_number >= 0:
self.add_entry(
self._reused_entry_wrapper._wrap_append(self._snapshot_id, entry.data_sequence_number, entry.data_file)
)
if entry.sequence_number is not None and entry.sequence_number >= 0:
self.add_entry(self._reused_entry_wrapper._wrap_append(self._snapshot_id, entry.sequence_number, entry.data_file))
else:
self.add_entry(self._reused_entry_wrapper._wrap_append(self._snapshot_id, None, entry.data_file))
return self

def delete(self, entry: ManifestEntry) -> ManifestWriter:
self.add_entry(
self._reused_entry_wrapper._wrap_delete(
self._snapshot_id, entry.data_sequence_number, entry.file_sequence_number, entry.data_file
self._snapshot_id, entry.sequence_number, entry.file_sequence_number, entry.data_file
)
)
return self

def existing(self, entry: ManifestEntry) -> ManifestWriter:
self.add_entry(
self._reused_entry_wrapper._wrap_existing(
entry.snapshot_id, entry.data_sequence_number, entry.file_sequence_number, entry.data_file
entry.snapshot_id, entry.sequence_number, entry.file_sequence_number, entry.data_file
)
)
return self
Expand Down Expand Up @@ -885,7 +879,7 @@ def _meta(self) -> Dict[str, str]:
}

def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
if entry.data_sequence_number is None:
if entry.sequence_number is None:
if entry.snapshot_id is not None and entry.snapshot_id != self._snapshot_id:
raise ValueError(f"Found unassigned sequence number for an entry from snapshot: {entry.snapshot_id}")
if entry.status != ManifestEntryStatus.ADDED:
Expand Down
24 changes: 12 additions & 12 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1888,7 +1888,7 @@ def _open_manifest(
]


def _min_data_file_sequence_number(manifests: List[ManifestFile]) -> int:
def _min_sequence_number(manifests: List[ManifestFile]) -> int:
try:
return min(
manifest.min_sequence_number or INITIAL_SEQUENCE_NUMBER
Expand Down Expand Up @@ -1949,11 +1949,11 @@ def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]
# shared instance across multiple threads.
return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)

def _check_sequence_number(self, min_data_sequence_number: int, manifest: ManifestFile) -> bool:
def _check_sequence_number(self, min_sequence_number: int, manifest: ManifestFile) -> bool:
"""Ensure that no manifests are loaded that contain deletes that are older than the data.
Args:
min_data_sequence_number (int): The minimal sequence number.
min_sequence_number (int): The minimal sequence number.
manifest (ManifestFile): A ManifestFile that can be either data or deletes.
Returns:
Expand All @@ -1962,7 +1962,7 @@ def _check_sequence_number(self, min_data_sequence_number: int, manifest: Manife
return manifest.content == ManifestContent.DATA or (
# Not interested in deletes that are older than the data
manifest.content == ManifestContent.DELETES
and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_data_sequence_number
and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number
)

def plan_files(self) -> Iterable[FileScanTask]:
Expand Down Expand Up @@ -1994,10 +1994,10 @@ def plan_files(self) -> Iterable[FileScanTask]:
self.table_metadata.schema(), self.row_filter, self.case_sensitive, self.options.get("include_empty_files") == "true"
).eval

min_data_sequence_number = _min_data_file_sequence_number(manifests)
min_sequence_number = _min_sequence_number(manifests)

data_entries: List[ManifestEntry] = []
positional_delete_entries = SortedList(key=lambda entry: entry.data_sequence_number or INITIAL_SEQUENCE_NUMBER)
positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER)

executor = ExecutorFactory.get_or_create()
for manifest_entry in chain(
Expand All @@ -2011,7 +2011,7 @@ def plan_files(self) -> Iterable[FileScanTask]:
metrics_evaluator,
)
for manifest in manifests
if self._check_sequence_number(min_data_sequence_number, manifest)
if self._check_sequence_number(min_sequence_number, manifest)
],
)
):
Expand Down Expand Up @@ -3150,7 +3150,7 @@ def _write_added_manifest() -> List[ManifestFile]:
ManifestEntry(
status=ManifestEntryStatus.ADDED,
snapshot_id=self._snapshot_id,
data_sequence_number=None,
sequence_number=None,
file_sequence_number=None,
data_file=data_file,
)
Expand Down Expand Up @@ -3353,7 +3353,7 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
return ManifestEntry(
status=status,
snapshot_id=entry.snapshot_id,
data_sequence_number=entry.data_sequence_number,
sequence_number=entry.sequence_number,
file_sequence_number=entry.file_sequence_number,
data_file=entry.data_file,
)
Expand Down Expand Up @@ -3537,7 +3537,7 @@ def _existing_manifests(self) -> List[ManifestFile]:
ManifestEntry(
status=ManifestEntryStatus.EXISTING,
snapshot_id=entry.snapshot_id,
data_sequence_number=entry.data_sequence_number,
sequence_number=entry.sequence_number,
file_sequence_number=entry.file_sequence_number,
data_file=entry.data_file,
)
Expand Down Expand Up @@ -3568,7 +3568,7 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]:
ManifestEntry(
status=ManifestEntryStatus.DELETED,
snapshot_id=entry.snapshot_id,
data_sequence_number=entry.data_sequence_number,
sequence_number=entry.sequence_number,
file_sequence_number=entry.file_sequence_number,
data_file=entry.data_file,
)
Expand Down Expand Up @@ -4016,7 +4016,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
entries.append({
"status": entry.status.value,
"snapshot_id": entry.snapshot_id,
"sequence_number": entry.data_sequence_number,
"sequence_number": entry.sequence_number,
"file_sequence_number": entry.file_sequence_number,
"data_file": {
"content": entry.data_file.content,
Expand Down
10 changes: 5 additions & 5 deletions tests/avro/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v1() -> None:
entry = ManifestEntry(
status=ManifestEntryStatus.ADDED,
snapshot_id=8638475580105682862,
data_sequence_number=0,
sequence_number=0,
file_sequence_number=0,
data_file=data_file,
)
Expand Down Expand Up @@ -173,7 +173,7 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v1() -> None:
v2_entry = todict(entry)

# These are not written in V1
del v2_entry["data_sequence_number"]
del v2_entry["sequence_number"]
del v2_entry["file_sequence_number"]
del v2_entry["data_file"]["content"]
del v2_entry["data_file"]["equality_ids"]
Expand Down Expand Up @@ -206,7 +206,7 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v2() -> None:
entry = ManifestEntry(
status=ManifestEntryStatus.ADDED,
snapshot_id=8638475580105682862,
data_sequence_number=0,
sequence_number=0,
file_sequence_number=0,
data_file=data_file,
)
Expand Down Expand Up @@ -263,7 +263,7 @@ def test_write_manifest_entry_with_fastavro_read_with_iceberg(format_version: in
entry = ManifestEntry(
status=ManifestEntryStatus.ADDED,
snapshot_id=8638475580105682862,
data_sequence_number=0,
sequence_number=0,
file_sequence_number=0,
data_file=data_file,
)
Expand Down Expand Up @@ -305,7 +305,7 @@ def test_write_manifest_entry_with_fastavro_read_with_iceberg(format_version: in
status=ManifestEntryStatus.ADDED,
snapshot_id=8638475580105682862,
# Not part of v1
data_sequence_number=None,
sequence_number=None,
file_sequence_number=None,
data_file=v1_datafile,
)
Expand Down
10 changes: 5 additions & 5 deletions tests/utils/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def test_read_manifest_entry(generated_manifest_entry_file: str) -> None:

assert manifest_entry.status == ManifestEntryStatus.ADDED
assert manifest_entry.snapshot_id == 8744736658442914487
assert manifest_entry.data_sequence_number == 0
assert manifest_entry.sequence_number == 0
assert isinstance(manifest_entry.data_file, DataFile)

data_file = manifest_entry.data_file
Expand Down Expand Up @@ -250,7 +250,7 @@ def test_read_manifest_v1(generated_manifest_file_file_v1: str) -> None:

entry = entries[0]

assert entry.data_sequence_number == 0
assert entry.sequence_number == 0
assert entry.file_sequence_number == 0
assert entry.snapshot_id == 8744736658442914487
assert entry.status == ManifestEntryStatus.ADDED
Expand Down Expand Up @@ -300,7 +300,7 @@ def test_read_manifest_v2(generated_manifest_file_file_v2: str) -> None:

entry = entries[0]

assert entry.data_sequence_number == 3
assert entry.sequence_number == 3
assert entry.file_sequence_number == 3
assert entry.snapshot_id == 8744736658442914487
assert entry.status == ManifestEntryStatus.ADDED
Expand Down Expand Up @@ -379,7 +379,7 @@ def test_write_manifest(

assert manifest_entry.status == ManifestEntryStatus.ADDED
assert manifest_entry.snapshot_id == 8744736658442914487
assert manifest_entry.data_sequence_number == -1 if format_version == 1 else 3
assert manifest_entry.sequence_number == -1 if format_version == 1 else 3
assert isinstance(manifest_entry.data_file, DataFile)

data_file = manifest_entry.data_file
Expand Down Expand Up @@ -556,7 +556,7 @@ def test_write_manifest_list(

entry = entries[0]

assert entry.data_sequence_number == 0 if format_version == 1 else 3
assert entry.sequence_number == 0 if format_version == 1 else 3
assert entry.file_sequence_number == 0 if format_version == 1 else 3
assert entry.snapshot_id == 8744736658442914487
assert entry.status == ManifestEntryStatus.ADDED

0 comments on commit 5aa451d

Please sign in to comment.