Python: Add PartitionSpec #4717

Merged Jun 12, 2022 (12 commits)

Changes from 10 commits
1 change: 1 addition & 0 deletions python/spellcheck-dictionary.txt
@@ -37,6 +37,7 @@ NaN
nan
NestedField
nullability
PartitionField
pragma
PrimitiveType
pyarrow
102 changes: 76 additions & 26 deletions python/src/iceberg/table/partitioning.py
@@ -14,9 +14,16 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

from iceberg.schema import Schema
from iceberg.transforms import Transform

_PARTITION_DATA_ID_START: int = 1000


@dataclass(frozen=True)
class PartitionField:
Contributor:

Instead of implementing __eq__ and __hash__ ourselves, we could leverage the dataclasses module. If we set eq=True and frozen=True (which makes it immutable, which is also nice), then we get __hash__ automatically:

If eq and frozen are both true, by default dataclass() will generate a __hash__() method for you. If eq is true and frozen is false, __hash__() will be set to None, marking it unhashable (which it is, since it is mutable). If eq is false, __hash__() will be left untouched meaning the __hash__() method of the superclass will be used (if the superclass is object, this means it will fall back to id-based hashing).

More information here: https://docs.python.org/3/library/dataclasses.html
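A quick standalone illustration of those rules (my sketch, not part of this PR):

from dataclasses import dataclass

@dataclass(frozen=True)  # eq=True by default, so __hash__ is generated
class FrozenField:
    source_id: int
    name: str

@dataclass  # eq=True, frozen=False: __hash__ is set to None
class MutableField:
    source_id: int
    name: str

assert hash(FrozenField(3, "id")) == hash(FrozenField(3, "id"))
try:
    hash(MutableField(3, "id"))
except TypeError as err:
    print(err)  # unhashable type: 'MutableField'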

@dramaticlly (Contributor, Author), May 13, 2022:

Yeah, I agree: PartitionField is an immutable class after construction, so a dataclass with both eq and frozen sounds fair to me.

For reference, this is what the immutable PartitionField would look like, with all test cases passing (a small ordering change in repr, but I think the default one is very close to what we have today in the Java impl):

@dataclass(frozen=True)
class PartitionField:
    """
    PartitionField is a single element with name and unique id,
    It represents how one partition value is derived from the source column via transformation

    Attributes:
        source_id(int): The source column id of table's schema
        field_id(int): The partition field id across all the table metadata's partition specs
        transform(Transform): The transform used to produce partition values from source column
        name(str): The name of this partition field
    """
    source_id: int
    field_id: int
    transform: Transform
    name: str

    def __str__(self):
        return f"{self.field_id}: {self.name}: {self.transform}({self.source_id})"

On the other side, I think the biggest benefit of the dataclass is the __post_init__ method, which allows for Java-like builder-pattern processing when we build the PartitionSpec. There is a collection of validations that need to happen, and I am discussing them with @samredai in #4631 (comment).

From what I can tell, we will need a PartitionSpecBuilder class as a convenient way to construct the PartitionSpec, but we also want to avoid duplicating the builder logic in an overly complex init method for PartitionSpec.
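A minimal sketch of that idea, with a hypothetical uniqueness check standing in for the real PartitionSpec validations:

from dataclasses import dataclass
from typing import Tuple

@dataclass(frozen=True)
class SpecSketch:
    field_names: Tuple[str, ...]

    def __post_init__(self):
        # Builder-style validation runs right after the generated __init__,
        # so no invalid instance can ever be constructed.
        if len(set(self.field_names)) != len(self.field_names):
            raise ValueError("partition field names must be unique")

SpecSketch(("id", "ts"))    # constructs fine
# SpecSketch(("id", "id"))  # would raise ValueError at construction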

@samredai (Collaborator), May 13, 2022:

@dramaticlly that's an interesting idea I hadn't thought of. A big argument for using the builder pattern was that we wanted PartitionSpec to be immutable, which would require us to include a ton of validation logic (everything that would be in a builder) in the __init__ method. If I understand your suggestion, using __post_init__ would allow us to have a typical init method but include the builder-type validation logic in __post_init__, which would fail the initialization of any invalid PartitionSpec. cc: @rdblue what do you think?

Contributor:

Is the plan to use @dataclass? I like that idea, but I won't hold off on reviewing if we want to get it in like this first.

Contributor:

We can also go fully Pythonic and bump the dataclass up to a Pydantic model. With Pydantic you can annotate the fields with validators: https://pydantic-docs.helpmanual.io/usage/validators/

We could use the generated OpenAPI classes as the base classes and extend from those:
https://github.com/apache/iceberg/pull/4858/files#diff-4f32e455c8da9fc5dc641048dc398741b72e928f359bfb9e5ef3640e7d32873e

This also allows us to add validation. For example, BaseUserModel is the one generated from OpenAPI, and UserModel is the one extended with all the (convenience) methods attached to it:

from pydantic import BaseModel, ValidationError, validator

class BaseUserModel(BaseModel):
    name: str
    username: str
    password1: str
    password2: str


class UserModel(BaseUserModel):
    @validator('name')
    def name_must_contain_space(cls, v):
        if ' ' not in v:
            raise ValueError('must contain a space')
        return v.title()
    @validator('password2')
    def passwords_match(cls, v, values, **kwargs):
        if 'password1' in values and v != values['password1']:
            raise ValueError('passwords do not match')
        return v
    @validator('username')
    def username_alphanumeric(cls, v):
        assert v.isalnum(), 'must be alphanumeric'
        return v

user = UserModel(
    name='samuel colvin',
    username='scolvin',
    password1='zxcvbn',
    password2='zxcvbn',
)
print(user)
#> name='Samuel Colvin' username='scolvin' password1='zxcvbn' password2='zxcvbn'

try:
    UserModel(
        name='samuel',
        username='scolvin',
        password1='zxcvbn',
        password2='zxcvbn2',
    )
except ValidationError as e:
    print(e)
    """
    2 validation errors for UserModel
    name
      must contain a space (type=value_error)
    password2
      passwords do not match (type=value_error)
    """

Collaborator:

I really like this idea of wrapping the classes generated from the OpenAPI spec. The class naming here might be tricky. Module namespacing allows us to re-use the same name if we want, something like:

from iceberg.openapi import rest_catalog

class PartitionField(rest_catalog.PartitionField):
    ...

We shouldn't expect users to import from the openapi module directly so we shouldn't need to worry about naming conflicts, right? Maybe we should name it _openapi just to be super clear about that.

Contributor:

I'll hold off on commenting too much until I have a chance to look into the pydantic project as well as look at the other PR.

My first ask would be: how many dependencies are we bringing in if we add pydantic? I know that some folks were concerned about adding too many external Python dependencies, so as not to conflict with their own dependencies, but if the benefit is very large I'm not personally opposed to it (I believe it was somebody / some group from Netflix that originally requested we keep the number of required dependencies down).

But validation, either via a library or via a common pattern we settle on, is something that would be very beneficial.

Collaborator:

Great point. I just checked the dependencies for pydantic, and the good news is that all it requires is typing-extensions, which is probably just for some Python 3.7 backports. That will probably even get dropped at some point when they no longer support 3.7.
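For anyone who wants to reproduce that check, one option (assuming a pydantic 1.x install) is importlib.metadata, available since Python 3.8:

from importlib.metadata import requires

# Declared dependencies of the installed pydantic distribution; for 1.x this is
# typing-extensions plus a few extras (e.g. email-validator, python-dotenv)
# that are only pulled in when explicitly requested.
print(requires("pydantic"))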

Contributor:

I would not consider this blocking for now.

"""
PartitionField is a single element with name and unique id,
@@ -29,38 +36,81 @@ class PartitionField:
name(str): The name of this partition field
"""

def __init__(self, source_id: int, field_id: int, transform: Transform, name: str):
self._source_id = source_id
self._field_id = field_id
self._transform = transform
self._name = name
source_id: int
field_id: int
transform: Transform
name: str

def __str__(self):
return f"{self.field_id}: {self.name}: {self.transform}({self.source_id})"


@property
def source_id(self) -> int:
return self._source_id
@dataclass(eq=False, frozen=True)
class PartitionSpec:
"""
PartitionSpec captures the transformation from table data to partition values

@property
def field_id(self) -> int:
return self._field_id
Attributes:
schema(Schema): the schema of data table
spec_id(int): any change to PartitionSpec will produce a new specId
fields(List[PartitionField]): list of partition fields to produce partition values
last_assigned_field_id(int): auto-increment partition field id starting from PARTITION_DATA_ID_START
Contributor:

Would be great if we could also add an example: https://github.com/apache/iceberg/blob/master/python/src/iceberg/types.py#L83-L87. This would also test the str method, as the examples are executed as tests as well 👍🏻

@dramaticlly (Contributor, Author), May 31, 2022:

Thanks Fokko. I think this is where I want to leave out the example, as I intend to construct the PartitionSpec via a dedicated builder (not included in this PR); that is the desired way to construct the PartitionSpec, with convenient transform helper methods and equipped with validation. I can include the example there; what do you think?
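For reference, a doctest-style example built on this PR's constructor could look like the sketch below (table_schema_simple stands in for the pytest fixture used in the tests; the eventual builder would replace this):

>>> from iceberg.table.partitioning import PartitionField, PartitionSpec
>>> from iceberg.transforms import bucket
>>> from iceberg.types import IntegerType
>>> id_field = PartitionField(3, 1001, bucket(IntegerType(), 4), "id")
>>> str(id_field)
'1001: id: bucket[4](3)'
>>> spec = PartitionSpec(table_schema_simple, 0, (id_field,), 1001)
>>> spec.is_unpartitioned()
False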

"""

@property
def name(self) -> str:
return self._name
schema: Schema
spec_id: int
fields: Tuple[PartitionField, ...]
last_assigned_field_id: int
source_id_to_fields_map: Dict[int, List[PartitionField]] = field(init=False, repr=False)

@property
def transform(self) -> Transform:
return self._transform
def __post_init__(self):
source_id_to_fields_map = dict()
for partition_field in self.fields:
source_column = self.schema.find_column_name(partition_field.source_id)
if not source_column:
raise ValueError(f"Cannot find source column: {partition_field.source_id}")
existing = source_id_to_fields_map.get(partition_field.source_id, [])
existing.append(partition_field)
source_id_to_fields_map[partition_field.source_id] = existing
object.__setattr__(self, "source_id_to_fields_map", source_id_to_fields_map)

def __eq__(self, other):
return (
self.field_id == other.field_id
and self.source_id == other.source_id
and self.name == other.name
and self.transform == other.transform
)
"""
Produce a boolean to return True if two objects are considered equal

Note:
Equality of PartitionSpec is determined by spec_id and partition fields only
"""
if not isinstance(other, PartitionSpec):
return False
return self.spec_id == other.spec_id and self.fields == other.fields

def __str__(self):
return f"{self.field_id}: {self.name}: {self.transform}({self.source_id})"
"""
Produce a human-readable string representation of PartitionSpec

def __repr__(self):
return f"PartitionField(field_id={self.field_id}, name={self.name}, transform={repr(self.transform)}, source_id={self.source_id})"
Note:
Only include list of partition fields in the PartitionSpec's string representation
"""
result_str = "["
if self.fields:
result_str += "\n " + "\n ".join([str(field) for field in self.fields]) + "\n"
result_str += "]"
return result_str

def is_unpartitioned(self) -> bool:
return len(self.fields) < 1

def fields_by_source_id(self, field_id: int) -> List[PartitionField]:
return self.source_id_to_fields_map[field_id]

def compatible_with(self, other: "PartitionSpec") -> bool:
"""
Produce a boolean to return True if two PartitionSpec are considered compatible
"""
return all(
this_field.source_id == that_field.source_id
and this_field.transform == that_field.transform
and this_field.name == that_field.name
for this_field, that_field in zip(self.fields, other.fields)
)
32 changes: 30 additions & 2 deletions python/tests/table/test_partitioning.py
@@ -15,7 +15,8 @@
# specific language governing permissions and limitations
# under the License.

from iceberg.table.partitioning import PartitionField
from iceberg.schema import Schema
from iceberg.table.partitioning import PartitionField, PartitionSpec
from iceberg.transforms import bucket
from iceberg.types import IntegerType

@@ -32,5 +33,32 @@ def test_partition_field_init():
assert str(partition_field) == "1000: id: bucket[100](3)"
assert (
repr(partition_field)
== "PartitionField(field_id=1000, name=id, transform=transforms.bucket(source_type=IntegerType(), num_buckets=100), source_id=3)"
== "PartitionField(source_id=3, field_id=1000, transform=transforms.bucket(source_type=IntegerType(), num_buckets=100), name='id')"
)


def test_partition_spec_init(table_schema_simple: Schema):
bucket_transform = bucket(IntegerType(), 4)
id_field1 = PartitionField(3, 1001, bucket_transform, "id")
partition_spec1 = PartitionSpec(table_schema_simple, 0, (id_field1,), 1001)

assert partition_spec1.spec_id == 0
assert partition_spec1.schema == table_schema_simple
assert partition_spec1 == partition_spec1
assert partition_spec1 != id_field1
assert str(partition_spec1) == f"[\n {str(id_field1)}\n]"
assert not partition_spec1.is_unpartitioned()
# only differ by PartitionField field_id
id_field2 = PartitionField(3, 1002, bucket_transform, "id")
partition_spec2 = PartitionSpec(table_schema_simple, 0, (id_field2,), 1001)
assert partition_spec1 != partition_spec2
assert partition_spec1.compatible_with(partition_spec2)
assert partition_spec1.fields_by_source_id(3) == [id_field1]


def test_unpartitioned(table_schema_simple: Schema):
unpartitioned = PartitionSpec(table_schema_simple, 1, tuple(), 1000)

assert not unpartitioned.fields
assert unpartitioned.is_unpartitioned()
assert str(unpartitioned) == "[]"