Add entries metadata table #551

Merged (9 commits, Apr 4, 2024)

160 changes: 159 additions & 1 deletion mkdocs/docs/api.md
@@ -370,7 +370,165 @@ manifest_list: [["s3://warehouse/default/table_metadata_snapshots/metadata/snap-
summary: [[keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","1","0","3","5459","0","0"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-records",...,"total-equality-deletes","total-files-size","deleted-data-files","deleted-records","removed-files-size"]values:["5459","1","3","1","3",...,"0","5459","1","3","5459"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","2","0","6","10918","0","0"]]]
```

### Add Files
### Entries

To show all the table's current manifest entries for both data and delete files:

```python
table.inspect.entries()
```

```
pyarrow.Table
status: int8 not null
snapshot_id: int64 not null
sequence_number: int64 not null
file_sequence_number: int64 not null
data_file: struct<content: int8 not null, file_path: string not null, file_format: string not null, partition: struct<> not null, record_count: int64 not null, file_size_in_bytes: int64 not null, column_sizes: map<int32, int64>, value_counts: map<int32, int64>, null_value_counts: map<int32, int64>, nan_value_counts: map<int32, int64>, lower_bounds: map<int32, binary>, upper_bounds: map<int32, binary>, key_metadata: binary, split_offsets: list<item: int64>, equality_ids: list<item: int32>, sort_order_id: int32> not null
child 0, content: int8 not null
child 1, file_path: string not null
child 2, file_format: string not null
child 3, partition: struct<> not null
child 4, record_count: int64 not null
child 5, file_size_in_bytes: int64 not null
child 6, column_sizes: map<int32, int64>
child 0, entries: struct<key: int32 not null, value: int64> not null
child 0, key: int32 not null
child 1, value: int64
child 7, value_counts: map<int32, int64>
child 0, entries: struct<key: int32 not null, value: int64> not null
child 0, key: int32 not null
child 1, value: int64
child 8, null_value_counts: map<int32, int64>
child 0, entries: struct<key: int32 not null, value: int64> not null
child 0, key: int32 not null
child 1, value: int64
child 9, nan_value_counts: map<int32, int64>
child 0, entries: struct<key: int32 not null, value: int64> not null
child 0, key: int32 not null
child 1, value: int64
child 10, lower_bounds: map<int32, binary>
child 0, entries: struct<key: int32 not null, value: binary> not null
child 0, key: int32 not null
child 1, value: binary
child 11, upper_bounds: map<int32, binary>
child 0, entries: struct<key: int32 not null, value: binary> not null
child 0, key: int32 not null
child 1, value: binary
child 12, key_metadata: binary
child 13, split_offsets: list<item: int64>
child 0, item: int64
child 14, equality_ids: list<item: int32>
child 0, item: int32
child 15, sort_order_id: int32
readable_metrics: struct<city: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string> not null, lat: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null, long: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null>
child 0, city: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string> not null
child 0, column_size: int64
child 1, value_count: int64
child 2, null_value_count: int64
child 3, nan_value_count: int64
child 4, lower_bound: string
child 5, upper_bound: string
child 1, lat: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null
child 0, column_size: int64
child 1, value_count: int64
child 2, null_value_count: int64
child 3, nan_value_count: int64
child 4, lower_bound: double
child 5, upper_bound: double
child 2, long: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null
child 0, column_size: int64
child 1, value_count: int64
child 2, null_value_count: int64
child 3, nan_value_count: int64
child 4, lower_bound: double
child 5, upper_bound: double
----
status: [[1]]
snapshot_id: [[6245626162224016531]]
sequence_number: [[1]]
file_sequence_number: [[1]]
data_file: [
-- is_valid: all not null
-- child 0 type: int8
[0]
-- child 1 type: string
["s3://warehouse/default/cities/data/00000-0-80766b66-e558-4150-a5cf-85e4c609b9fe.parquet"]
-- child 2 type: string
["PARQUET"]
-- child 3 type: struct<>
-- is_valid: all not null
-- child 4 type: int64
[4]
-- child 5 type: int64
[1656]
-- child 6 type: map<int32, int64>
[keys:[1,2,3]values:[140,135,135]]
-- child 7 type: map<int32, int64>
[keys:[1,2,3]values:[4,4,4]]
-- child 8 type: map<int32, int64>
[keys:[1,2,3]values:[0,0,0]]
-- child 9 type: map<int32, int64>
[keys:[]values:[]]
-- child 10 type: map<int32, binary>
[keys:[1,2,3]values:[416D7374657264616D,8602B68311E34240,3A77BB5E9A9B5EC0]]
-- child 11 type: map<int32, binary>
[keys:[1,2,3]values:[53616E204672616E636973636F,F5BEF1B5678E4A40,304CA60A46651840]]
-- child 12 type: binary
[null]
-- child 13 type: list<item: int64>
[[4]]
-- child 14 type: list<item: int32>
[null]
-- child 15 type: int32
[null]]
readable_metrics: [
-- is_valid: all not null
-- child 0 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string>
-- is_valid: all not null
-- child 0 type: int64
[140]
-- child 1 type: int64
[4]
-- child 2 type: int64
[0]
-- child 3 type: int64
[null]
-- child 4 type: string
["Amsterdam"]
-- child 5 type: string
["San Francisco"]
-- child 1 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double>
-- is_valid: all not null
-- child 0 type: int64
[135]
-- child 1 type: int64
[4]
-- child 2 type: int64
[0]
-- child 3 type: int64
[null]
-- child 4 type: double
[37.773972]
-- child 5 type: double
[53.11254]
-- child 2 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double>
-- is_valid: all not null
-- child 0 type: int64
[135]
-- child 1 type: int64
[4]
-- child 2 type: int64
[0]
-- child 3 type: int64
[null]
-- child 4 type: double
[-122.431297]
-- child 5 type: double
[6.0989]]
```
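
Since the result is an ordinary PyArrow table, it can be post-processed with PyArrow directly. As a minimal sketch (assuming a loaded `table`), the entries could be narrowed down to just the files added by a snapshot:

```python
import pyarrow.compute as pc

entries = table.inspect.entries()

# Status 1 marks manifest entries that were ADDED by the snapshot that wrote
# the manifest (0 = EXISTING, 2 = DELETED).
added = entries.filter(pc.equal(entries["status"], 1))

# Keep only a couple of columns for a quick overview.
print(added.select(["snapshot_id", "data_file"]).to_pylist())
```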

## Add Files

Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
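
A hedged sketch of what that might look like (hypothetical file paths; the Parquet files must already exist, match the table's schema, and be reachable by the table's FileIO):

```python
# Hypothetical Parquet files written outside of PyIceberg.
file_paths = [
    "s3://warehouse/default/external/data-1.parquet",
    "s3://warehouse/default/external/data-2.parquet",
]

# Commit the existing files to the table as data files without rewriting them.
table.add_files(file_paths=file_paths)
```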

124 changes: 124 additions & 0 deletions pyiceberg/table/__init__.py
@@ -47,6 +47,7 @@
from typing_extensions import Annotated

import pyiceberg.expressions.parser as parser
from pyiceberg.conversions import from_bytes
from pyiceberg.exceptions import CommitFailedException, ResolveError, ValidationError
from pyiceberg.expressions import (
AlwaysTrue,
@@ -3264,3 +3265,126 @@ def snapshots(self) -> "pa.Table":
snapshots,
schema=snapshots_schema,
)

def entries(self) -> "pa.Table":
import pyarrow as pa

from pyiceberg.io.pyarrow import schema_to_pyarrow

schema = self.tbl.metadata.schema()

readable_metrics_struct = []

def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
pa_bound_type = schema_to_pyarrow(bound_type)
return pa.struct([
pa.field("column_size", pa.int64(), nullable=True),
pa.field("value_count", pa.int64(), nullable=True),
pa.field("null_value_count", pa.int64(), nullable=True),
pa.field("nan_value_count", pa.int64(), nullable=True),
pa.field("lower_bound", pa_bound_type, nullable=True),
pa.field("upper_bound", pa_bound_type, nullable=True),
])

for field in self.tbl.metadata.schema().fields:
readable_metrics_struct.append(
pa.field(schema.find_column_name(field.field_id), _readable_metrics_struct(field.field_type), nullable=False)
)

partition_record = self.tbl.metadata.specs_struct()
pa_record_struct = schema_to_pyarrow(partition_record)

entries_schema = pa.schema([
pa.field('status', pa.int8(), nullable=False),
pa.field('snapshot_id', pa.int64(), nullable=False),
pa.field('sequence_number', pa.int64(), nullable=False),
pa.field('file_sequence_number', pa.int64(), nullable=False),
pa.field(
'data_file',
pa.struct([
pa.field('content', pa.int8(), nullable=False),
pa.field('file_path', pa.string(), nullable=False),
pa.field('file_format', pa.string(), nullable=False),
pa.field('partition', pa_record_struct, nullable=False),
pa.field('record_count', pa.int64(), nullable=False),
pa.field('file_size_in_bytes', pa.int64(), nullable=False),
pa.field('column_sizes', pa.map_(pa.int32(), pa.int64()), nullable=True),
pa.field('value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True),
pa.field('null_value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True),
pa.field('nan_value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True),
pa.field('lower_bounds', pa.map_(pa.int32(), pa.binary()), nullable=True),
pa.field('upper_bounds', pa.map_(pa.int32(), pa.binary()), nullable=True),
pa.field('key_metadata', pa.binary(), nullable=True),
pa.field('split_offsets', pa.list_(pa.int64()), nullable=True),
pa.field('equality_ids', pa.list_(pa.int32()), nullable=True),
pa.field('sort_order_id', pa.int32(), nullable=True),
]),
nullable=False,
),
pa.field('readable_metrics', pa.struct(readable_metrics_struct), nullable=True),
])

entries = []
if snapshot := self.tbl.metadata.current_snapshot():
for manifest in snapshot.manifests(self.tbl.io):
for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
column_sizes = entry.data_file.column_sizes or {}
value_counts = entry.data_file.value_counts or {}
null_value_counts = entry.data_file.null_value_counts or {}
nan_value_counts = entry.data_file.nan_value_counts or {}
lower_bounds = entry.data_file.lower_bounds or {}
upper_bounds = entry.data_file.upper_bounds or {}
readable_metrics = {
schema.find_column_name(field.field_id): {
"column_size": column_sizes.get(field.field_id),
"value_count": value_counts.get(field.field_id),
"null_value_count": null_value_counts.get(field.field_id),
"nan_value_count": nan_value_counts.get(field.field_id),
# Convert the serialized bounds into readable values
"lower_bound": from_bytes(field.field_type, lower_bound)
if (lower_bound := lower_bounds.get(field.field_id))
else None,
"upper_bound": from_bytes(field.field_type, upper_bound)
if (upper_bound := upper_bounds.get(field.field_id))
else None,
}
for field in self.tbl.metadata.schema().fields
}

partition = entry.data_file.partition
partition_record_dict = {
field.name: partition[pos]
for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
}

entries.append({
'status': entry.status.value,
'snapshot_id': entry.snapshot_id,
'sequence_number': entry.data_sequence_number,
'file_sequence_number': entry.file_sequence_number,
'data_file': {
"content": entry.data_file.content,
"file_path": entry.data_file.file_path,
"file_format": entry.data_file.file_format,
"partition": partition_record_dict,
"record_count": entry.data_file.record_count,
"file_size_in_bytes": entry.data_file.file_size_in_bytes,
"column_sizes": dict(entry.data_file.column_sizes),
"value_counts": dict(entry.data_file.value_counts),
"null_value_counts": dict(entry.data_file.null_value_counts),
"nan_value_counts": entry.data_file.nan_value_counts,
"lower_bounds": entry.data_file.lower_bounds,
"upper_bounds": entry.data_file.upper_bounds,
"key_metadata": entry.data_file.key_metadata,
"split_offsets": entry.data_file.split_offsets,
"equality_ids": entry.data_file.equality_ids,
"sort_order_id": entry.data_file.sort_order_id,
"spec_id": entry.data_file.spec_id,
},
'readable_metrics': readable_metrics,
})

return pa.Table.from_pylist(
entries,
schema=entries_schema,
)
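
For reference, the `from_bytes` conversion used for `readable_metrics` above decodes the single-value serialized bounds stored in the manifest. A small illustration using values from the documentation example earlier in this diff:

```python
from pyiceberg.conversions import from_bytes
from pyiceberg.types import DoubleType, StringType

# String bounds are stored as UTF-8 bytes: 416D7374657264616D -> "Amsterdam".
print(from_bytes(StringType(), bytes.fromhex("416D7374657264616D")))

# Float/double bounds are stored little-endian: 8602B68311E34240 -> 37.773972.
print(from_bytes(DoubleType(), bytes.fromhex("8602B68311E34240")))
```
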
27 changes: 26 additions & 1 deletion pyiceberg/table/metadata.py
@@ -49,7 +49,7 @@
IcebergRootModel,
Properties,
)
from pyiceberg.types import transform_dict_value_to_str
from pyiceberg.types import NestedField, StructType, transform_dict_value_to_str
from pyiceberg.utils.config import Config
from pyiceberg.utils.datetime import datetime_to_millis

@@ -245,6 +245,31 @@ def specs(self) -> Dict[int, PartitionSpec]:
"""Return a dict the partition specs this table."""
return {spec.spec_id: spec for spec in self.partition_specs}

def specs_struct(self) -> StructType:
"""Produce a struct of all the combined PartitionSpecs.

The partition fields should be optional: Partition fields may be added later,
in which case not all files would have the result field, and it may be null.

:return: A StructType that represents all the combined PartitionSpecs of the table
"""
specs = self.specs()

# Collect all the fields
struct_fields = {field.field_id: field for spec in specs.values() for field in spec.fields}

schema = self.schema()

nested_fields = []
# Sort them by field_id in order to get a deterministic output
for field_id in sorted(struct_fields):
field = struct_fields[field_id]
source_type = schema.find_type(field.source_id)
result_type = field.transform.result_type(source_type)
nested_fields.append(NestedField(field_id=field.field_id, name=field.name, type=result_type, required=False))

return StructType(*nested_fields)

def new_snapshot_id(self) -> int:
"""Generate a new snapshot-id that's not in use."""
snapshot_id = _generate_snapshot_id()
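
The `specs_struct` helper above unions the fields of every partition spec into a single struct, marking each field optional. A hedged illustration of the resulting shape (hypothetical specs, not part of this PR): a table first partitioned by `bucket(id)` and later also by `day(ts)` would combine into:

```python
from pyiceberg.types import DateType, IntegerType, NestedField, StructType

# bucket(...) transforms yield int partition values; day(...) yields dates.
# Every field is required=False, since older files predate newer spec fields.
combined = StructType(
    NestedField(field_id=1000, name="id_bucket", type=IntegerType(), required=False),
    NestedField(field_id=1001, name="ts_day", type=DateType(), required=False),
)
print(combined)
```
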
4 changes: 4 additions & 0 deletions pyiceberg/utils/lazydict.py
@@ -66,3 +66,7 @@ def __len__(self) -> int:
"""Return the number of items in the dictionary."""
source = self._dict or self._build_dict()
return len(source)

def __dict__(self) -> Dict[K, V]: # type: ignore
"""Convert the lazy dict in a dict."""
return self._dict or self._build_dict()
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -2043,5 +2043,5 @@ def pa_schema() -> "pa.Schema":
def arrow_table_with_null(pa_schema: "pa.Schema") -> "pa.Table":
import pyarrow as pa

"""PyArrow table with all kinds of columns"""
"""Pyarrow table with all kinds of columns."""
return pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=pa_schema)