Add Snapshots table metadata #524

Merged · 3 commits · Mar 21, 2024
32 changes: 32 additions & 0 deletions mkdocs/docs/api.md
@@ -292,6 +292,38 @@ The nested lists indicate the different Arrow buffers, where the first write res

<!-- prettier-ignore-end -->

## Inspecting tables

To explore the table metadata, tables can be inspected via the `inspect` property.

### Snapshots

Inspect the snapshots of the table:

```python
table.inspect.snapshots()
```

```
pyarrow.Table
committed_at: timestamp[ms] not null
snapshot_id: int64 not null
parent_id: int64
operation: string
manifest_list: string not null
summary: map<string, string>
  child 0, entries: struct<key: string not null, value: string> not null
      child 0, key: string not null
      child 1, value: string
----
committed_at: [[2024-03-15 15:01:25.682,2024-03-15 15:01:25.730,2024-03-15 15:01:25.772]]
snapshot_id: [[805611270568163028,3679426539959220963,5588071473139865870]]
parent_id: [[null,805611270568163028,3679426539959220963]]
operation: [["append","overwrite","append"]]
manifest_list: [["s3://warehouse/default/table_metadata_snapshots/metadata/snap-805611270568163028-0-43637daf-ea4b-4ceb-b096-a60c25481eb5.avro","s3://warehouse/default/table_metadata_snapshots/metadata/snap-3679426539959220963-0-8be81019-adf1-4bb6-a127-e15217bd50b3.avro","s3://warehouse/default/table_metadata_snapshots/metadata/snap-5588071473139865870-0-1382dd7e-5fbc-4c51-9776-a832d7d0984e.avro"]]
summary: [[keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","1","0","3","5459","0","0"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-records",...,"total-equality-deletes","total-files-size","deleted-data-files","deleted-records","removed-files-size"]values:["5459","1","3","1","3",...,"0","5459","1","3","5459"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","2","0","6","10918","0","0"]]]
```
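
The result is a regular `pyarrow.Table`, so it can be handed to any Arrow-compatible tooling. A minimal sketch (assuming pandas is installed, and `table` is the loaded table as above):

```python
snapshots = table.inspect.snapshots()

# Convert the Arrow table to pandas for ad-hoc analysis.
df = snapshots.to_pandas()
print(df[["committed_at", "operation"]])
```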

## Schema evolution

PyIceberg supports full schema evolution through the Python API. It takes care of setting the field IDs and ensures that only non-breaking changes are made (this can be overridden).
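
A minimal sketch of what that looks like, using the `update_schema` API (the column name `retries` is just an illustration):

```python
from pyiceberg.types import IntegerType

with table.update_schema() as update:
    # Adding a nullable column is a non-breaking change.
    update.add_column("retries", IntegerType(), doc="Number of retries")

# Breaking changes are rejected unless explicitly allowed:
with table.update_schema(allow_incompatible_changes=True) as update:
    update.delete_column("retries")
```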
51 changes: 51 additions & 0 deletions pyiceberg/table/__init__.py
@@ -958,6 +958,11 @@ def transaction(self) -> Transaction:
"""
return Transaction(self)

@property
def inspect(self) -> InspectTable:
"""Return the InspectTable object to browse the table metadata."""
return InspectTable(self)

def refresh(self) -> Table:
"""Refresh the current table metadata."""
fresh = self.catalog.load_table(self.identifier[1:])
@@ -2962,3 +2967,49 @@ def _new_field_id(self) -> int:

    def _is_duplicate_partition(self, transform: Transform[Any, Any], partition_field: PartitionField) -> bool:
        return partition_field.field_id not in self._deletes and partition_field.transform == transform


class InspectTable:
    tbl: Table

    def __init__(self, tbl: Table) -> None:
        self.tbl = tbl

        try:
            import pyarrow as pa  # noqa
        except ModuleNotFoundError as e:
            raise ModuleNotFoundError("For metadata operations PyArrow needs to be installed") from e

    def snapshots(self) -> "pa.Table":
        import pyarrow as pa

        snapshots_schema = pa.schema([
            pa.field('committed_at', pa.timestamp(unit='ms'), nullable=False),
            pa.field('snapshot_id', pa.int64(), nullable=False),
            pa.field('parent_id', pa.int64(), nullable=True),
            pa.field('operation', pa.string(), nullable=True),
            pa.field('manifest_list', pa.string(), nullable=False),
            pa.field('summary', pa.map_(pa.string(), pa.string()), nullable=True),
        ])
        snapshots = []
        for snapshot in self.tbl.metadata.snapshots:
            if summary := snapshot.summary:
                operation = summary.operation.value
                additional_properties = summary.additional_properties
            else:
                operation = None
                additional_properties = None

            snapshots.append({
                'committed_at': datetime.datetime.fromtimestamp(snapshot.timestamp_ms / 1000.0),
                'snapshot_id': snapshot.snapshot_id,
                'parent_id': snapshot.parent_snapshot_id,
                # Keep a missing operation as None so the nullable column stays null.
                'operation': operation,
                'manifest_list': snapshot.manifest_list,
                'summary': additional_properties,
            })

        return pa.Table.from_pylist(
            snapshots,
            schema=snapshots_schema,
        )
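
One detail worth noting for downstream consumers: the `summary` map column materializes on the Python side as lists of key/value tuples, which the integration test below also relies on. A small illustrative sketch (assuming `tbl` is a loaded table):

```python
snapshots = tbl.inspect.snapshots()

# Arrow map scalars yield lists of (key, value) tuples via as_py().
summary = snapshots["summary"][0].as_py()
properties = dict(summary) if summary is not None else {}
print(properties.get("total-records"))
```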
64 changes: 60 additions & 4 deletions tests/integration/test_writes.py
@@ -20,7 +20,7 @@
import uuid
from datetime import date, datetime
from pathlib import Path
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

import pyarrow as pa
@@ -135,15 +135,19 @@ def arrow_table_with_only_nulls(pa_schema: pa.Schema) -> pa.Table:
    return pa.Table.from_pylist([{}, {}], schema=pa_schema)


def _create_table(session_catalog: Catalog, identifier: str, properties: Properties, data: List[pa.Table]) -> Table:
def _create_table(
    session_catalog: Catalog, identifier: str, properties: Properties, data: Optional[List[pa.Table]] = None
) -> Table:
    try:
        session_catalog.drop_table(identifier=identifier)
    except NoSuchTableError:
        pass

    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties=properties)
    for d in data:
        tbl.append(d)

    if data:
        for d in data:
            tbl.append(d)

    return tbl

Expand Down Expand Up @@ -664,3 +668,55 @@ def test_table_properties_raise_for_none_value(
        session_catalog, identifier, {"format-version": format_version, **property_with_none}, [arrow_table_with_null]
    )
    assert "None type is not a supported value in properties: property_name" in str(exc_info.value)


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_inspect_snapshots(
    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
    identifier = "default.table_metadata_snapshots"
    tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})

    tbl.overwrite(arrow_table_with_null)
    # should produce a DELETE entry
    tbl.overwrite(arrow_table_with_null)
    # Since we don't rewrite, this should produce a new manifest with an ADDED entry
    tbl.append(arrow_table_with_null)

    df = tbl.inspect.snapshots()

    assert df.column_names == [
        'committed_at',
        'snapshot_id',
        'parent_id',
        'operation',
        'manifest_list',
        'summary',
    ]

    for committed_at in df['committed_at']:
        assert isinstance(committed_at.as_py(), datetime)

    for snapshot_id in df['snapshot_id']:
        assert isinstance(snapshot_id.as_py(), int)

    assert df['parent_id'][0].as_py() is None
    assert df['parent_id'][1:] == df['snapshot_id'][:2]

    assert [operation.as_py() for operation in df['operation']] == ['append', 'overwrite', 'append']

    for manifest_list in df['manifest_list']:
        assert manifest_list.as_py().startswith("s3://")

    assert df['summary'][0].as_py() == [
        ('added-files-size', '5459'),
        ('added-data-files', '1'),
        ('added-records', '3'),
        ('total-data-files', '1'),
        ('total-delete-files', '0'),
        ('total-records', '3'),
        ('total-files-size', '5459'),
        ('total-position-deletes', '0'),
        ('total-equality-deletes', '0'),
    ]