Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename data_sequence_number to sequence_number in ManifestEntry #900

Merged
merged 6 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 26 additions & 32 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ def __eq__(self, other: Any) -> bool:
2: Schema(
NestedField(0, "status", IntegerType(), required=True),
NestedField(1, "snapshot_id", LongType(), required=False),
NestedField(3, "data_sequence_number", LongType(), required=False),
NestedField(3, "sequence_number", LongType(), required=False),
NestedField(4, "file_sequence_number", LongType(), required=False),
NestedField(2, "data_file", DATA_FILE_TYPE[2], required=True),
),
Expand All @@ -394,10 +394,10 @@ def manifest_entry_schema_with_data_file(format_version: TableVersion, data_file


class ManifestEntry(Record):
__slots__ = ("status", "snapshot_id", "data_sequence_number", "file_sequence_number", "data_file")
__slots__ = ("status", "snapshot_id", "sequence_number", "file_sequence_number", "data_file")
status: ManifestEntryStatus
snapshot_id: Optional[int]
data_sequence_number: Optional[int]
sequence_number: Optional[int]
file_sequence_number: Optional[int]
data_file: DataFile

Expand All @@ -408,43 +408,39 @@ def _wrap(
self,
new_status: ManifestEntryStatus,
new_snapshot_id: Optional[int],
new_data_sequence_number: Optional[int],
new_sequence_number: Optional[int],
new_file_sequence_number: Optional[int],
new_file: DataFile,
) -> ManifestEntry:
self.status = new_status
self.snapshot_id = new_snapshot_id
self.data_sequence_number = new_data_sequence_number
self.sequence_number = new_sequence_number
self.file_sequence_number = new_file_sequence_number
self.data_file = new_file
return self

def _wrap_append(
self, new_snapshot_id: Optional[int], new_data_sequence_number: Optional[int], new_file: DataFile
self, new_snapshot_id: Optional[int], new_sequence_number: Optional[int], new_file: DataFile
) -> ManifestEntry:
return self._wrap(ManifestEntryStatus.ADDED, new_snapshot_id, new_data_sequence_number, None, new_file)
return self._wrap(ManifestEntryStatus.ADDED, new_snapshot_id, new_sequence_number, None, new_file)

def _wrap_delete(
self,
new_snapshot_id: Optional[int],
new_data_sequence_number: Optional[int],
new_sequence_number: Optional[int],
new_file_sequence_number: Optional[int],
new_file: DataFile,
) -> ManifestEntry:
return self._wrap(
ManifestEntryStatus.DELETED, new_snapshot_id, new_data_sequence_number, new_file_sequence_number, new_file
)
return self._wrap(ManifestEntryStatus.DELETED, new_snapshot_id, new_sequence_number, new_file_sequence_number, new_file)

def _wrap_existing(
self,
new_snapshot_id: Optional[int],
new_data_sequence_number: Optional[int],
new_sequence_number: Optional[int],
new_file_sequence_number: Optional[int],
new_file: DataFile,
) -> ManifestEntry:
return self._wrap(
ManifestEntryStatus.EXISTING, new_snapshot_id, new_data_sequence_number, new_file_sequence_number, new_file
)
return self._wrap(ManifestEntryStatus.EXISTING, new_snapshot_id, new_sequence_number, new_file_sequence_number, new_file)


PARTITION_FIELD_SUMMARY_TYPE = StructType(
Expand Down Expand Up @@ -665,10 +661,10 @@ def _inherit_from_manifest(entry: ManifestEntry, manifest: ManifestFile) -> Mani
if entry.snapshot_id is None:
entry.snapshot_id = manifest.added_snapshot_id

# in v1 tables, the data sequence number is not persisted and can be safely defaulted to 0
# in v2 tables, the data sequence number should be inherited iff the entry status is ADDED
if entry.data_sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED):
entry.data_sequence_number = manifest.sequence_number
# in v1 tables, the sequence number is not persisted and can be safely defaulted to 0
# in v2 tables, the sequence number should be inherited iff the entry status is ADDED
if entry.sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED):
entry.sequence_number = manifest.sequence_number

# in v1 tables, the file sequence number is not persisted and can be safely defaulted to 0
# in v2 tables, the file sequence number should be inherited iff the entry status is ADDED
Expand All @@ -695,7 +691,7 @@ class ManifestWriter(ABC):
_existing_rows: int
_deleted_files: int
_deleted_rows: int
_min_data_sequence_number: Optional[int]
_min_sequence_number: Optional[int]
_partitions: List[Record]
_reused_entry_wrapper: ManifestEntry

Expand All @@ -712,7 +708,7 @@ def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile,
self._existing_rows = 0
self._deleted_files = 0
self._deleted_rows = 0
self._min_data_sequence_number = None
self._min_sequence_number = None
self._partitions = []
self._reused_entry_wrapper = ManifestEntry()

Expand Down Expand Up @@ -774,7 +770,7 @@ def to_manifest_file(self) -> ManifestFile:
"""Return the manifest file."""
# once the manifest file is generated, no more entries can be added
self.closed = True
min_sequence_number = self._min_data_sequence_number or UNASSIGNED_SEQ
min_sequence_number = self._min_sequence_number or UNASSIGNED_SEQ
return ManifestFile(
manifest_path=self._output_file.location,
manifest_length=len(self._writer.output_file),
Expand Down Expand Up @@ -812,35 +808,33 @@ def add_entry(self, entry: ManifestEntry) -> ManifestWriter:

if (
(entry.status == ManifestEntryStatus.ADDED or entry.status == ManifestEntryStatus.EXISTING)
and entry.data_sequence_number is not None
and (self._min_data_sequence_number is None or entry.data_sequence_number < self._min_data_sequence_number)
and entry.sequence_number is not None
and (self._min_sequence_number is None or entry.sequence_number < self._min_sequence_number)
):
self._min_data_sequence_number = entry.data_sequence_number
self._min_sequence_number = entry.sequence_number

self._writer.write_block([self.prepare_entry(entry)])
return self

def add(self, entry: ManifestEntry) -> ManifestWriter:
if entry.data_sequence_number is not None and entry.data_sequence_number >= 0:
self.add_entry(
self._reused_entry_wrapper._wrap_append(self._snapshot_id, entry.data_sequence_number, entry.data_file)
)
if entry.sequence_number is not None and entry.sequence_number >= 0:
self.add_entry(self._reused_entry_wrapper._wrap_append(self._snapshot_id, entry.sequence_number, entry.data_file))
else:
self.add_entry(self._reused_entry_wrapper._wrap_append(self._snapshot_id, None, entry.data_file))
return self

def delete(self, entry: ManifestEntry) -> ManifestWriter:
self.add_entry(
self._reused_entry_wrapper._wrap_delete(
self._snapshot_id, entry.data_sequence_number, entry.file_sequence_number, entry.data_file
self._snapshot_id, entry.sequence_number, entry.file_sequence_number, entry.data_file
)
)
return self

def existing(self, entry: ManifestEntry) -> ManifestWriter:
self.add_entry(
self._reused_entry_wrapper._wrap_existing(
entry.snapshot_id, entry.data_sequence_number, entry.file_sequence_number, entry.data_file
entry.snapshot_id, entry.sequence_number, entry.file_sequence_number, entry.data_file
)
)
return self
Expand Down Expand Up @@ -885,7 +879,7 @@ def _meta(self) -> Dict[str, str]:
}

def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
if entry.data_sequence_number is None:
if entry.sequence_number is None:
if entry.snapshot_id is not None and entry.snapshot_id != self._snapshot_id:
raise ValueError(f"Found unassigned sequence number for an entry from snapshot: {entry.snapshot_id}")
if entry.status != ManifestEntryStatus.ADDED:
Expand Down
24 changes: 12 additions & 12 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1888,7 +1888,7 @@ def _open_manifest(
]


def _min_data_file_sequence_number(manifests: List[ManifestFile]) -> int:
def _min_sequence_number(manifests: List[ManifestFile]) -> int:
try:
return min(
manifest.min_sequence_number or INITIAL_SEQUENCE_NUMBER
Expand Down Expand Up @@ -1949,11 +1949,11 @@ def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]
# shared instance across multiple threads.
return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)

def _check_sequence_number(self, min_data_sequence_number: int, manifest: ManifestFile) -> bool:
def _check_sequence_number(self, min_sequence_number: int, manifest: ManifestFile) -> bool:
"""Ensure that no manifests are loaded that contain deletes that are older than the data.

Args:
min_data_sequence_number (int): The minimal sequence number.
min_sequence_number (int): The minimal sequence number.
manifest (ManifestFile): A ManifestFile that can be either data or deletes.

Returns:
Expand All @@ -1962,7 +1962,7 @@ def _check_sequence_number(self, min_data_sequence_number: int, manifest: Manife
return manifest.content == ManifestContent.DATA or (
# Not interested in deletes that are older than the data
manifest.content == ManifestContent.DELETES
and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_data_sequence_number
and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number
)

def plan_files(self) -> Iterable[FileScanTask]:
Expand Down Expand Up @@ -1994,10 +1994,10 @@ def plan_files(self) -> Iterable[FileScanTask]:
self.table_metadata.schema(), self.row_filter, self.case_sensitive, self.options.get("include_empty_files") == "true"
).eval

min_data_sequence_number = _min_data_file_sequence_number(manifests)
min_sequence_number = _min_sequence_number(manifests)

data_entries: List[ManifestEntry] = []
positional_delete_entries = SortedList(key=lambda entry: entry.data_sequence_number or INITIAL_SEQUENCE_NUMBER)
positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER)

executor = ExecutorFactory.get_or_create()
for manifest_entry in chain(
Expand All @@ -2011,7 +2011,7 @@ def plan_files(self) -> Iterable[FileScanTask]:
metrics_evaluator,
)
for manifest in manifests
if self._check_sequence_number(min_data_sequence_number, manifest)
if self._check_sequence_number(min_sequence_number, manifest)
],
)
):
Expand Down Expand Up @@ -3150,7 +3150,7 @@ def _write_added_manifest() -> List[ManifestFile]:
ManifestEntry(
status=ManifestEntryStatus.ADDED,
snapshot_id=self._snapshot_id,
data_sequence_number=None,
sequence_number=None,
file_sequence_number=None,
data_file=data_file,
)
Expand Down Expand Up @@ -3353,7 +3353,7 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
return ManifestEntry(
status=status,
snapshot_id=entry.snapshot_id,
data_sequence_number=entry.data_sequence_number,
sequence_number=entry.sequence_number,
file_sequence_number=entry.file_sequence_number,
data_file=entry.data_file,
)
Expand Down Expand Up @@ -3537,7 +3537,7 @@ def _existing_manifests(self) -> List[ManifestFile]:
ManifestEntry(
status=ManifestEntryStatus.EXISTING,
snapshot_id=entry.snapshot_id,
data_sequence_number=entry.data_sequence_number,
sequence_number=entry.sequence_number,
file_sequence_number=entry.file_sequence_number,
data_file=entry.data_file,
)
Expand Down Expand Up @@ -3568,7 +3568,7 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]:
ManifestEntry(
status=ManifestEntryStatus.DELETED,
snapshot_id=entry.snapshot_id,
data_sequence_number=entry.data_sequence_number,
sequence_number=entry.sequence_number,
file_sequence_number=entry.file_sequence_number,
data_file=entry.data_file,
)
Expand Down Expand Up @@ -4016,7 +4016,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
entries.append({
"status": entry.status.value,
"snapshot_id": entry.snapshot_id,
"sequence_number": entry.data_sequence_number,
"sequence_number": entry.sequence_number,
"file_sequence_number": entry.file_sequence_number,
"data_file": {
"content": entry.data_file.content,
Expand Down
10 changes: 5 additions & 5 deletions tests/avro/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v1() -> None:
entry = ManifestEntry(
status=ManifestEntryStatus.ADDED,
snapshot_id=8638475580105682862,
data_sequence_number=0,
sequence_number=0,
file_sequence_number=0,
data_file=data_file,
)
Expand Down Expand Up @@ -173,7 +173,7 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v1() -> None:
v2_entry = todict(entry)

# These are not written in V1
del v2_entry["data_sequence_number"]
del v2_entry["sequence_number"]
del v2_entry["file_sequence_number"]
del v2_entry["data_file"]["content"]
del v2_entry["data_file"]["equality_ids"]
Expand Down Expand Up @@ -206,7 +206,7 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v2() -> None:
entry = ManifestEntry(
status=ManifestEntryStatus.ADDED,
snapshot_id=8638475580105682862,
data_sequence_number=0,
sequence_number=0,
file_sequence_number=0,
data_file=data_file,
)
Expand Down Expand Up @@ -263,7 +263,7 @@ def test_write_manifest_entry_with_fastavro_read_with_iceberg(format_version: in
entry = ManifestEntry(
status=ManifestEntryStatus.ADDED,
snapshot_id=8638475580105682862,
data_sequence_number=0,
sequence_number=0,
file_sequence_number=0,
data_file=data_file,
)
Expand Down Expand Up @@ -305,7 +305,7 @@ def test_write_manifest_entry_with_fastavro_read_with_iceberg(format_version: in
status=ManifestEntryStatus.ADDED,
snapshot_id=8638475580105682862,
# Not part of v1
data_sequence_number=None,
sequence_number=None,
file_sequence_number=None,
data_file=v1_datafile,
)
Expand Down
10 changes: 5 additions & 5 deletions tests/utils/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def test_read_manifest_entry(generated_manifest_entry_file: str) -> None:

assert manifest_entry.status == ManifestEntryStatus.ADDED
soumya-ghosh marked this conversation as resolved.
Show resolved Hide resolved
assert manifest_entry.snapshot_id == 8744736658442914487
assert manifest_entry.data_sequence_number == 0
assert manifest_entry.sequence_number == 0
assert isinstance(manifest_entry.data_file, DataFile)

data_file = manifest_entry.data_file
Expand Down Expand Up @@ -250,7 +250,7 @@ def test_read_manifest_v1(generated_manifest_file_file_v1: str) -> None:

entry = entries[0]

assert entry.data_sequence_number == 0
assert entry.sequence_number == 0
assert entry.file_sequence_number == 0
assert entry.snapshot_id == 8744736658442914487
assert entry.status == ManifestEntryStatus.ADDED
Expand Down Expand Up @@ -300,7 +300,7 @@ def test_read_manifest_v2(generated_manifest_file_file_v2: str) -> None:

entry = entries[0]

assert entry.data_sequence_number == 3
assert entry.sequence_number == 3
assert entry.file_sequence_number == 3
assert entry.snapshot_id == 8744736658442914487
assert entry.status == ManifestEntryStatus.ADDED
Expand Down Expand Up @@ -379,7 +379,7 @@ def test_write_manifest(

assert manifest_entry.status == ManifestEntryStatus.ADDED
assert manifest_entry.snapshot_id == 8744736658442914487
assert manifest_entry.data_sequence_number == -1 if format_version == 1 else 3
assert manifest_entry.sequence_number == -1 if format_version == 1 else 3
assert isinstance(manifest_entry.data_file, DataFile)

data_file = manifest_entry.data_file
Expand Down Expand Up @@ -556,7 +556,7 @@ def test_write_manifest_list(

entry = entries[0]

assert entry.data_sequence_number == 0 if format_version == 1 else 3
assert entry.sequence_number == 0 if format_version == 1 else 3
assert entry.file_sequence_number == 0 if format_version == 1 else 3
assert entry.snapshot_id == 8744736658442914487
assert entry.status == ManifestEntryStatus.ADDED