Skip to content

Commit

Permalink
Add table statistics update
Browse files Browse the repository at this point in the history
  • Loading branch information
ndrluis committed Nov 4, 2024
1 parent 24a0175 commit a70edb2
Show file tree
Hide file tree
Showing 6 changed files with 322 additions and 0 deletions.
39 changes: 39 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
SnapshotLogEntry,
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.table.statistics import StatisticsFile
from pyiceberg.table.update import (
AddPartitionSpecUpdate,
AddSchemaUpdate,
Expand All @@ -94,12 +95,14 @@
AssertTableUUID,
AssignUUIDUpdate,
RemovePropertiesUpdate,
RemoveStatisticsUpdate,
SetCurrentSchemaUpdate,
SetDefaultSortOrderUpdate,
SetDefaultSpecUpdate,
SetLocationUpdate,
SetPropertiesUpdate,
SetSnapshotRefUpdate,
SetStatisticsUpdate,
TableRequirement,
TableUpdate,
UpdatesAndRequirements,
Expand Down Expand Up @@ -663,6 +666,42 @@ def update_location(self, location: str) -> Transaction:
"""
raise NotImplementedError("Not yet implemented")

def set_statistics(self, snapshot_id: int, statistics_file: StatisticsFile) -> Transaction:
    """Set the statistics for a snapshot.

    Any statistics file already registered for the same snapshot id is
    replaced by ``statistics_file`` when the update is applied.

    Args:
        snapshot_id: The snapshot ID to set the statistics for.
        statistics_file: The statistics file to set.

    Returns:
        The transaction with the set-statistics update applied.
    """
    updates = (
        SetStatisticsUpdate(
            snapshot_id=snapshot_id,
            statistics=statistics_file,
        ),
    )

    # No requirements: setting statistics is informational and does not
    # need to assert anything about the current table state.
    return self._apply(updates, ())

def remove_statistics(self, snapshot_id: int) -> Transaction:
    """Remove the statistics for a snapshot.

    Args:
        snapshot_id: The snapshot ID to remove the statistics for.

    Returns:
        The transaction with the remove-statistics update applied.
    """
    updates = (
        RemoveStatisticsUpdate(
            snapshot_id=snapshot_id,
        ),
    )

    # No requirements: removing statistics does not need to assert
    # anything about the current table state.
    return self._apply(updates, ())

def commit_transaction(self) -> Table:
"""Commit the changes to the catalog.
Expand Down
9 changes: 9 additions & 0 deletions pyiceberg/table/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
SortOrder,
assign_fresh_sort_order_ids,
)
from pyiceberg.table.statistics import StatisticsFile
from pyiceberg.typedef import (
EMPTY_DICT,
IcebergBaseModel,
Expand Down Expand Up @@ -221,6 +222,14 @@ class TableMetadataCommonFields(IcebergBaseModel):
There is always a main branch reference pointing to the
current-snapshot-id even if the refs map is null."""

statistics: List[StatisticsFile] = Field(default_factory=list)
"""An optional list of table statistics files.
Table statistics files are valid Puffin files. Statistics are
informational. A reader can choose to ignore statistics
information. Statistics support is not required to read the
table correctly. A table can contain many statistics files
associated with different table snapshots."""

# validators
@field_validator("properties", mode="before")
def transform_properties_dict_value_to_str(cls, properties: Properties) -> Dict[str, str]:
Expand Down
41 changes: 41 additions & 0 deletions pyiceberg/table/statistics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import (
Dict,
List,
Optional,
)

from pydantic import Field

from pyiceberg.typedef import IcebergBaseModel


class BlobMetadata(IcebergBaseModel):
    """Metadata of a single blob stored in a Puffin statistics file."""

    # Blob type identifier (e.g. "ndv" in the test fixtures).
    type: str
    # ID of the snapshot the blob was computed from.
    snapshot_id: int = Field(alias="snapshot-id")
    # Sequence number of the snapshot the blob was computed from.
    sequence_number: int = Field(alias="sequence-number")
    # Field IDs of the columns the blob refers to.
    fields: List[int]
    # Optional additional blob properties; may be absent in metadata JSON.
    properties: Optional[Dict[str, str]] = None


class StatisticsFile(IcebergBaseModel):
    """A table statistics file (a Puffin file) associated with a snapshot.

    Serialized with kebab-case aliases to match the Iceberg table-metadata
    JSON representation.
    """

    # Snapshot the statistics were computed for.
    snapshot_id: int = Field(alias="snapshot-id")
    # Fully qualified path/URI of the statistics file.
    statistics_path: str = Field(alias="statistics-path")
    # Total size of the statistics file, in bytes.
    file_size_in_bytes: int = Field(alias="file-size-in-bytes")
    # Size of the Puffin footer, in bytes.
    file_footer_size_in_bytes: int = Field(alias="file-footer-size-in-bytes")
    # Metadata describing each blob contained in the file.
    blob_metadata: List[BlobMetadata] = Field(alias="blob-metadata")
37 changes: 37 additions & 0 deletions pyiceberg/table/update/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
SnapshotLogEntry,
)
from pyiceberg.table.sorting import SortOrder
from pyiceberg.table.statistics import StatisticsFile
from pyiceberg.typedef import (
IcebergBaseModel,
Properties,
Expand Down Expand Up @@ -172,6 +173,17 @@ class RemovePropertiesUpdate(IcebergBaseModel):
removals: List[str]


class SetStatisticsUpdate(IcebergBaseModel):
    """Table update that registers a statistics file for a snapshot."""

    # Discriminator used by the TableUpdate tagged union.
    action: Literal["set-statistics"] = Field(default="set-statistics")
    # Snapshot the statistics belong to; must match statistics.snapshot_id.
    snapshot_id: int = Field(alias="snapshot-id")
    # The statistics file to register.
    statistics: StatisticsFile


class RemoveStatisticsUpdate(IcebergBaseModel):
    """Table update that removes the statistics file of a snapshot."""

    # Discriminator used by the TableUpdate tagged union.
    action: Literal["remove-statistics"] = Field(default="remove-statistics")
    # Snapshot whose statistics file should be removed.
    snapshot_id: int = Field(alias="snapshot-id")


TableUpdate = Annotated[
Union[
AssignUUIDUpdate,
Expand All @@ -189,6 +201,8 @@ class RemovePropertiesUpdate(IcebergBaseModel):
SetLocationUpdate,
SetPropertiesUpdate,
RemovePropertiesUpdate,
SetStatisticsUpdate,
RemoveStatisticsUpdate,
],
Field(discriminator="action"),
]
Expand Down Expand Up @@ -477,6 +491,29 @@ def _(
return base_metadata.model_copy(update={"default_sort_order_id": new_sort_order_id})


@_apply_table_update.register(SetStatisticsUpdate)
def _(update: SetStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
    """Register (or replace) the statistics file for a snapshot.

    Raises:
        ValueError: If the update's snapshot id disagrees with the snapshot
            id recorded inside the statistics file itself.
    """
    if update.snapshot_id != update.statistics.snapshot_id:
        # Include both ids so a mismatch is diagnosable from the message alone.
        raise ValueError(
            f"Snapshot id in statistics ({update.statistics.snapshot_id}) "
            f"does not match the snapshot id in the update ({update.snapshot_id})"
        )

    # Drop any statistics previously registered for this snapshot so the new
    # file replaces it instead of accumulating duplicates.
    rest_statistics = [stat for stat in base_metadata.statistics if stat.snapshot_id != update.snapshot_id]

    context.add_update(update)
    return base_metadata.model_copy(update={"statistics": rest_statistics + [update.statistics]})


@_apply_table_update.register(RemoveStatisticsUpdate)
def _(update: RemoveStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
    """Remove the statistics file registered for a snapshot.

    Raises:
        ValueError: If no statistics file exists for the given snapshot id.
    """
    remaining = [stat for stat in base_metadata.statistics if stat.snapshot_id != update.snapshot_id]

    # If nothing was filtered out, there was no statistics file to remove.
    if len(remaining) == len(base_metadata.statistics):
        raise ValueError(f"Statistics with snapshot id {update.snapshot_id} does not exist")

    context.add_update(update)

    return base_metadata.model_copy(update={"statistics": remaining})


def update_table_metadata(
base_metadata: TableMetadata,
updates: Tuple[TableUpdate, ...],
Expand Down
98 changes: 98 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,87 @@ def generate_snapshot(
"refs": {"test": {"snapshot-id": 3051729675574597004, "type": "tag", "max-ref-age-ms": 10000000}},
}

# V2 table-metadata fixture carrying two "statistics" entries, one per
# snapshot, each with a single blob-metadata record. Used to exercise
# parsing of StatisticsFile/BlobMetadata and the set/remove-statistics
# table updates.
# NOTE(review): the second snapshot declares "schema-id": 1 while only
# schema 0 is listed under "schemas" — presumably intentional test data,
# but worth confirming.
TABLE_METADATA_V2_WITH_STATISTICS = {
    "format-version": 2,
    "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
    "location": "s3://bucket/test/location",
    "last-sequence-number": 34,
    "last-updated-ms": 1602638573590,
    "last-column-id": 3,
    "current-schema-id": 0,
    "schemas": [
        {
            "type": "struct",
            "schema-id": 0,
            "fields": [
                {
                    "id": 1,
                    "name": "x",
                    "required": True,
                    "type": "long",
                }
            ],
        }
    ],
    "default-spec-id": 0,
    "partition-specs": [{"spec-id": 0, "fields": []}],
    "last-partition-id": 1000,
    "default-sort-order-id": 0,
    "sort-orders": [{"order-id": 0, "fields": []}],
    "properties": {},
    "current-snapshot-id": 3055729675574597004,
    "snapshots": [
        {
            "snapshot-id": 3051729675574597004,
            "timestamp-ms": 1515100955770,
            "sequence-number": 0,
            "summary": {"operation": "append"},
            "manifest-list": "s3://a/b/1.avro",
        },
        {
            "snapshot-id": 3055729675574597004,
            "parent-snapshot-id": 3051729675574597004,
            "timestamp-ms": 1555100955770,
            "sequence-number": 1,
            "summary": {"operation": "append"},
            "manifest-list": "s3://a/b/2.avro",
            "schema-id": 1,
        },
    ],
    "statistics": [
        {
            "snapshot-id": 3051729675574597004,
            "statistics-path": "s3://a/b/stats.puffin",
            "file-size-in-bytes": 413,
            "file-footer-size-in-bytes": 42,
            "blob-metadata": [
                {
                    "type": "ndv",
                    "snapshot-id": 3051729675574597004,
                    "sequence-number": 1,
                    "fields": [1],
                }
            ],
        },
        {
            "snapshot-id": 3055729675574597004,
            "statistics-path": "s3://a/b/stats.puffin",
            "file-size-in-bytes": 413,
            "file-footer-size-in-bytes": 42,
            "blob-metadata": [
                {
                    "type": "ndv",
                    "snapshot-id": 3055729675574597004,
                    "sequence-number": 1,
                    "fields": [1],
                }
            ],
        },
    ],
    "snapshot-log": [],
    "metadata-log": [],
}


@pytest.fixture
def example_table_metadata_v2() -> Dict[str, Any]:
Expand All @@ -929,6 +1010,11 @@ def table_metadata_v2_with_fixed_and_decimal_types() -> Dict[str, Any]:
return TABLE_METADATA_V2_WITH_FIXED_AND_DECIMAL_TYPES


@pytest.fixture
def table_metadata_v2_with_statistics() -> Dict[str, Any]:
    """Raw metadata dict for a v2 table carrying statistics files."""
    metadata = TABLE_METADATA_V2_WITH_STATISTICS
    return metadata


@pytest.fixture(scope="session")
def metadata_location(tmp_path_factory: pytest.TempPathFactory) -> str:
from pyiceberg.io.pyarrow import PyArrowFileIO
Expand Down Expand Up @@ -2170,6 +2256,18 @@ def table_v2_with_extensive_snapshots(example_table_metadata_v2_with_extensive_s
)


@pytest.fixture
def table_v2_with_statistics(table_metadata_v2_with_statistics: Dict[str, Any]) -> Table:
    """A v2 ``Table`` whose metadata carries statistics files."""
    metadata = TableMetadataV2(**table_metadata_v2_with_statistics)
    metadata_location = f"{metadata.location}/uuid.metadata.json"
    return Table(
        identifier=("database", "table"),
        metadata=metadata,
        metadata_location=metadata_location,
        io=load_file_io(),
        catalog=NoopCatalog("NoopCatalog"),
    )


@pytest.fixture
def bound_reference_str() -> BoundReference[str]:
    """A ``BoundReference`` over an optional string field at position 0."""
    string_field = NestedField(1, "field", StringType(), required=False)
    accessor = Accessor(position=0, inner=None)
    return BoundReference(field=string_field, accessor=accessor)
Expand Down
Loading

0 comments on commit a70edb2

Please sign in to comment.