
Python: Add PartitionSpec #4717

Merged: 12 commits, Jun 12, 2022
Conversation

dramaticlly (Contributor):

Second step toward completing #3228.

This change includes PartitionSpec but not its builder, as I want to keep the changelist as small as possible. As suggested in #4631, construction of a PartitionSpec should rely on its Builder class with proper checks, and that logic should not be duplicated in its dunder init method.

from iceberg.schema import Schema
from iceberg.transforms import bucket
from iceberg.types import BooleanType, IntegerType, NestedField, StringType
from iceberg.table.partitioning import PartitionSpec, PartitionField

table_schema = Schema(
    NestedField(field_id=1, name="foo", field_type=StringType(), is_optional=False),
    NestedField(field_id=2, name="bar", field_type=IntegerType(), is_optional=True),
    NestedField(field_id=3, name="baz", field_type=BooleanType(), is_optional=False),
    schema_id=1,
    identifier_field_ids=[1],
)
bucket_transform = bucket(IntegerType(), 100)
foo_field = PartitionField(source_id=1, field_id=1001, transform=bucket_transform, name="foo_bucket")
partition_spec = PartitionSpec(schema=table_schema, spec_id=0, fields=(foo_field,), last_assigned_field_id=1001)

>>> partition_spec
PartitionSpec: [
  1001: foo_bucket: bucket[100](1)
]

I will follow up with a separate PR for the PartitionSpecBuilder class.

CC @samredai @rdblue @dhruv-pratap

@github-actions github-actions bot added the python label May 6, 2022
@@ -64,3 +67,100 @@ def __str__(self):

def __repr__(self):
    return f"PartitionField(field_id={self.field_id}, name={self.name}, transform={repr(self.transform)}, source_id={self.source_id})"

def __hash__(self):
    return hash((self.source_id, self.field_id, self.name, self.transform))
Contributor:

Why is __hash__ needed? Are partition fields used as map keys?

Contributor Author:

Yeah, I added it back because it's used in the hash of PartitionSpec at line 129.

Contributor:

@dramaticlly Why not use @dataclasses.dataclass or @attrs.frozen, which will implement these dunder methods for you and reduce the boilerplate code?

Contributor:

because it's being used in the hash of the PartitionSpec at line 129

Why does PartitionSpec define __hash__? Will a spec be used as a key in a dict or map? I doubt it, so I'd probably remove both.
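As background on why __hash__ had to be added back: defining __eq__ on a class without also defining __hash__ makes instances unhashable. A minimal demonstration (hypothetical class name):

```python
class WithEq:
    """Minimal class defining __eq__ but not __hash__."""

    def __eq__(self, other):
        return isinstance(other, WithEq)

# Python sets __hash__ to None when a class defines __eq__ without
# __hash__, so instances cannot be used as dict keys or set members.
assert WithEq.__hash__ is None
```

So if PartitionField is compared with __eq__ and also hashed inside PartitionSpec.__hash__, it needs its own __hash__ (or a generated one).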

delimiter = "\n "
partition_fields_in_str = (str(partition_field) for partition_field in self.fields)
head = f"[{delimiter}"
tail = "\n]"
Contributor:

Why not inline these?

Contributor Author:

Python's str.join(iterable) does not have a way to attach a head and tail, so I figured it might be easier to read this way than inlined as f"{head}{delimiter.join(partition_fields_in_str)}{tail}".

But I can certainly change it quickly if the above is what you prefer.
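For reference, the inlined version under discussion might look like this sketch (format_spec is a hypothetical stand-alone helper; the real code works on self.fields):

```python
def format_spec(field_strs):
    # Join the per-field strings with a newline-plus-indent delimiter,
    # wrapping the "[" head and "\n]" tail in a single f-string.
    delimiter = "\n  "
    return f"[{delimiter}{delimiter.join(field_strs)}\n]"

print(format_spec(["1001: foo_bucket: bucket[100](1)"]))
```

This produces the same bracketed, one-field-per-line layout shown in the PR description.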

Contributor:

IMO, and this is just me, I feel like this is too much code for just __str__ and __repr__. I would just take what dataclass or attrs gives me, even for the special unpartitioned case.

return self.spec_id == other.spec_id and self.fields == other.fields

def __str__(self):
    if self.is_unpartitioned():
Contributor:

Rather than special casing is_unpartitioned, I think this should just construct the inner string differently. The Java implementation doesn't use special cases.

Contributor Author (@dramaticlly, May 10, 2022):

I think it's a bit nasty to construct it exactly the way the Java implementation does, so the special casing is essentially a simplification of https://github.com/apache/iceberg/blob/master/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L299. The unpartitioned case differs from the rest by one fewer \n right before the ].

So here's my alternative way to construct it; not sure how you like it compared to what I have right now.

    def __str__(self):
        result_str = "["
        for partition_field in self.fields:
            result_str += f"\n  {str(partition_field)}"
        if self.is_unpartitioned():
            result_str += "]"
        else:
            result_str += "\n]"
        return result_str

Contributor:

@dramaticlly Again, I would just use @dataclasses.dataclass or @attrs.frozen that will implement all the dunder methods and override them wherever you need special behavior.

Contributor:

I think it's a bit nasty to construct it exactly like what Java implementation does

I don't think it is necessary to construct it exactly like Java. But it should not have a special case here, since is_unpartitioned can include specs with void transforms. This should show all of the transforms and not special case.

Contributor Author:

I don't think it is necessary to construct it exactly like Java.

Got it, thanks for clarifying.

is_unpartitioned can include specs with void transforms

But I'm a bit behind on this. From what I can tell, a spec with a void transform can never be unpartitioned per the existing Java code:

public boolean isPartitioned() {
  return fields.length > 0;
}

public boolean isUnpartitioned() {
  return !isPartitioned();
}
And if one void transform is added to such a PartitionSpec, then there's one partition field with voidTransform as its Transform, which is not unpartitioned, as below:

> PartitionSpec ps = PartitionSpec.builderFor(SCHEMA).alwaysNull("ts").build();
> System.out.println(ps.toString());
[
  1000: ts_null: void(5)
]

> System.out.println(ps.isUnpartitioned());
false
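The same behavior can be sketched in Python (SpecSketch is a hypothetical stand-in for PartitionSpec, not the PR's actual class):

```python
from dataclasses import dataclass
from typing import Tuple

@dataclass(frozen=True)
class SpecSketch:
    # Per-field description strings stand in for PartitionField objects.
    fields: Tuple[str, ...] = ()

    def is_unpartitioned(self) -> bool:
        # Mirrors the Java logic: unpartitioned means zero partition
        # fields, regardless of which transforms those fields use.
        return len(self.fields) == 0

assert SpecSketch().is_unpartitioned()
assert not SpecSketch(fields=("1000: ts_null: void(5)",)).is_unpartitioned()
```

A field with a void transform is still a field, so the spec counts as partitioned.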


last_assigned_field_id(int): auto-increment partition field id starting from PARTITION_DATA_ID_START
"""

def __init__(self, schema: Schema, spec_id: int, fields: Iterable[PartitionField], last_assigned_field_id: int):
Contributor:

Since we do not have a "builder" for it, should we enforce keyword-only arguments here for better readability? The same goes for PartitionField as well.

Contributor Author (@dramaticlly, May 13, 2022):

Hey @dhruv-pratap, as I mentioned, this is the second part of rolling out partitioning. We will have a follow-up PR for the PartitionSpec builder, as that will be the right way to construct a PartitionSpec; more context in #4631. I purposely left it outside the scope of this PR.


Comment on lines 135 to 142
if not self._fields_by_source_id:
    for partition_field in self.fields:
        source_column = self.schema.find_column_name(partition_field.source_id)
        if not source_column:
            raise ValueError(f"Cannot find source column: {partition_field.source_id}")
        existing = self._fields_by_source_id.get(partition_field.source_id, [])
        existing.append(partition_field)
        self._fields_by_source_id[partition_field.source_id] = existing
Contributor:

I feel this field value should be derived in __init__, or __post_init__ if you are using @dataclass or @attrs. Reason being, it validates the correctness of the object state and should raise ValueError as soon as the object is created. This seems too late to raise that error.

Contributor (@Fokko) left a comment:

Thanks for working on this @dramaticlly 👍🏻 A few comments

from iceberg.transforms import Transform

_PARTITION_DATA_ID_START: int = 1000


class PartitionField:
Contributor:

Instead of implementing __eq__ and __hash__ ourselves, we could leverage the dataclass library. If we set eq=True and frozen=True (which makes it immutable, which is also nice), then we get __hash__ automatically:

If eq and frozen are both true, by default dataclass() will generate a __hash__() method for you. If eq is true and frozen is false, __hash__() will be set to None, marking it unhashable (which it is, since it is mutable). If eq is false, __hash__() will be left untouched meaning the __hash__() method of the superclass will be used (if the superclass is object, this means it will fall back to id-based hashing).

More information here: https://docs.python.org/3/library/dataclasses.html
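A minimal demonstration of that documented behavior (Point is just an illustrative class):

```python
from dataclasses import dataclass

@dataclass(eq=True, frozen=True)
class Point:
    x: int
    y: int

# eq=True together with frozen=True yields a generated __hash__, so
# equal instances hash equally and can serve as dict keys.
assert Point(1, 2) == Point(1, 2)
assert hash(Point(1, 2)) == hash(Point(1, 2))
assert {Point(1, 2): "a"}[Point(1, 2)] == "a"
```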

Contributor Author (@dramaticlly, May 13, 2022):

Yeah, I agree that PartitionField is immutable after construction, so a dataclass with both eq and frozen sounds fair to me.

For reference, this is what the immutable PartitionField would look like, with all test cases passing (a small ordering change in repr, but I think the default one is very close to what we have today in the Java impl):

@dataclass(frozen=True)
class PartitionField:
    """
    PartitionField is a single element with name and unique id,
    It represents how one partition value is derived from the source column via transformation

    Attributes:
        source_id(int): The source column id of table's schema
        field_id(int): The partition field id across all the table metadata's partition specs
        transform(Transform): The transform used to produce partition values from source column
        name(str): The name of this partition field
    """
    source_id: int
    field_id: int
    transform: Transform
    name: str

    def __str__(self):
        return f"{self.field_id}: {self.name}: {self.transform}({self.source_id})"

On the other side, I think the biggest benefit of the dataclass is the __post_init__ method, which allows for Java-like builder-pattern processing when we build the PartitionSpec. There's a collection of validations that need to happen, and I am discussing this with @samredai in #4631 (comment).

From what I can tell, we will need a PartitionSpecBuilder class with a convenient way to construct the PartitionSpec, but we also want to make sure we avoid duplicating the builder logic in an overly complex init method for PartitionSpec.

Collaborator (@samredai, May 13, 2022):

@dramaticlly that's an interesting idea I haven't thought of. A big argument for using the builder pattern was that we wanted PartitionSpec to be immutable, which would require us to include a ton of validation logic (everything that would be in a builder) in the __init__ method. If I understand your suggestion, using __post_init__ would allow us to have a typical init method, but then include the builder-type validation logic in the __post_init__ which would fail the initialization of any invalid PartitionSpec. cc: @rdblue what do you think?
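A sketch of that idea, assuming validation lives in __post_init__ of a frozen dataclass (ValidatedSpec and its check are hypothetical, not the PR's actual validation logic):

```python
from dataclasses import dataclass
from typing import Tuple

@dataclass(frozen=True)
class ValidatedSpec:
    spec_id: int
    fields: Tuple[str, ...]

    def __post_init__(self):
        # Builder-style validation runs at construction time, so no
        # invalid instance can ever be observed by callers.
        if self.spec_id < 0:
            raise ValueError("spec_id must be non-negative")

ValidatedSpec(spec_id=0, fields=())    # constructs fine
# ValidatedSpec(spec_id=-1, fields=()) would raise ValueError
```

The object stays immutable, but any spec that fails the builder-type checks never finishes initializing.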

Contributor:

Is the plan to use @dataclass? I like that idea, but I won't hold off on reviewing if we want to get it in like this first.

Contributor:

We could also go fully Pythonic and bump the dataclass up to a Pydantic model. With Pydantic you can annotate the fields with validators: https://pydantic-docs.helpmanual.io/usage/validators/

We could use the generated OpenAPI classes as the base classes and extend from those:
https://github.com/apache/iceberg/pull/4858/files#diff-4f32e455c8da9fc5dc641048dc398741b72e928f359bfb9e5ef3640e7d32873e

This also allows us to add validation. For example, BaseUserModel is the one generated from OpenAPI, and UserModel is the one extended with all the (convenience) methods attached to it:

from pydantic import BaseModel, ValidationError, validator

class BaseUserModel(BaseModel):
    name: str
    username: str
    password1: str
    password2: str


class UserModel(BaseUserModel):
    @validator('name')
    def name_must_contain_space(cls, v):
        if ' ' not in v:
            raise ValueError('must contain a space')
        return v.title()
    @validator('password2')
    def passwords_match(cls, v, values, **kwargs):
        if 'password1' in values and v != values['password1']:
            raise ValueError('passwords do not match')
        return v
    @validator('username')
    def username_alphanumeric(cls, v):
        assert v.isalnum(), 'must be alphanumeric'
        return v

user = UserModel(
    name='samuel colvin',
    username='scolvin',
    password1='zxcvbn',
    password2='zxcvbn',
)
print(user)
#> name='Samuel Colvin' username='scolvin' password1='zxcvbn' password2='zxcvbn'

try:
    UserModel(
        name='samuel',
        username='scolvin',
        password1='zxcvbn',
        password2='zxcvbn2',
    )
except ValidationError as e:
    print(e)
    """
    2 validation errors for UserModel
    name
      must contain a space (type=value_error)
    password2
      passwords do not match (type=value_error)
    """

Collaborator:

I really like this idea of wrapping the classes generated from the OpenAPI spec. The class naming here might be tricky. Module namespacing allows us to re-use the same name if we want, something like:

from iceberg.openapi import rest_catalog

class PartitionField(rest_catalog.PartitionField):
    ...

We shouldn't expect users to import from the openapi module directly so we shouldn't need to worry about naming conflicts, right? Maybe we should name it _openapi just to be super clear about that.

Contributor:

I'll hold off on commenting too much until I have a chance to look into the pydantic project as well as look at the other PR.

My first question would be: how many dependencies are we bringing in if we add pydantic? I know some folks were concerned about adding too many external Python dependencies, so as not to conflict with their own, but if the benefit is very large I'm not personally opposed to it (I believe it was somebody from Netflix who originally requested we keep the number of required dependencies down).

But validation, either via a library or via a common pattern we settle on, is something that would be very beneficial.

Collaborator:

Great point, I just checked the dependencies for pydantic, and the good news is that all it requires is typing-extensions, which is probably just for some Python 3.7 backports. That will probably even get dropped at some point when they no longer support 3.7.

Contributor:

I would not consider this blocking for now.

dramaticlly (Contributor Author):

Updated the PR with what dataclass can offer for PartitionField and PartitionSpec. The scope of this change stays the same; a follow-up PR will add a convenient Builder for PartitionSpec.

Appreciate the review, if you can take a look that would be great! @rdblue @samredai @Fokko

existing = source_id_to_fields_map.get(partition_field.source_id, [])
existing.append(partition_field)
source_id_to_fields_map[partition_field.source_id] = existing
object.__setattr__(self, "source_id_to_fields_map", source_id_to_fields_map)
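For context on the object.__setattr__ call above: frozen=True blocks normal attribute assignment even inside __post_init__, so a derived index has to be set by bypassing the frozen check. A self-contained sketch of the pattern (FieldSketch and IndexedSpec are hypothetical names):

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

@dataclass(frozen=True)
class FieldSketch:
    source_id: int
    name: str

@dataclass(frozen=True)
class IndexedSpec:
    fields: Tuple[FieldSketch, ...]
    # Derived index; excluded from comparison since it is computed.
    source_id_to_fields_map: Dict[int, List[FieldSketch]] = field(
        default_factory=dict, compare=False
    )

    def __post_init__(self):
        mapping: Dict[int, List[FieldSketch]] = {}
        for f in self.fields:
            mapping.setdefault(f.source_id, []).append(f)
        # Normal assignment would raise FrozenInstanceError here, so
        # set the derived attribute through object.__setattr__.
        object.__setattr__(self, "source_id_to_fields_map", mapping)
```

This keeps the public API immutable while still letting construction populate a computed lookup table.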

def __str__(self):
    return f"{self.field_id}: {self.name}: {self.transform}({self.source_id})"

"""
PartitionSpec str method highlight the partition field only
Contributor:

This isn't really a docstring. It is more of a comment to explain the logic in the method. If you want to add it as a docstring, then I think it should be real docs (like "Produce a human-readable string representation of PartitionSpec") and have this as an additional note.

Contributor Author:

Thanks @rdblue , updated per your suggestion

@@ -29,8 +30,37 @@ def test_partition_field_init():
assert partition_field.transform == bucket_transform
assert partition_field.name == "id"
assert partition_field == partition_field
print(str(partition_field))
print("repr")
print(repr(partition_field))
Contributor:

Can you remove all print statements from the tests?

Contributor Author:

Thank you, I was testing and forgot about these; removed.

schema(Schema): the schema of data table
spec_id(int): any change to PartitionSpec will produce a new specId
fields(List[PartitionField]): list of partition fields to produce partition values
last_assigned_field_id(int): auto-increment partition field id starting from PARTITION_DATA_ID_START
Contributor:

Would be great if we can also add an example: https://github.com/apache/iceberg/blob/master/python/src/iceberg/types.py#L83-L87 this will also test the str method as the examples are executed as tests as well 👍🏻

Contributor Author (@dramaticlly, May 31, 2022):

Thanks Fokko, I think this is a case where I want to leave out the example, since I intend to construct the PartitionSpec via a dedicated builder (not included in this PR); that's the desired way to construct a PartitionSpec, with convenient transform helper methods and built-in validation. I can include the example there; what do you think?

Contributor (@Fokko) left a comment:

Some small comments, but apart from that it looks good to me.

Note that it is still a stub, for example last_assigned_field_id isn't being used (yet).

dramaticlly (Contributor Author):

Thanks @Fokko for reviewing, appreciate one more pass on this @rdblue and @samredai

Collaborator (@samredai) left a comment:

This LGTM, thanks @dramaticlly!

Contributor (@Fokko) left a comment:

I think there is a bug in the compatible_with, apart from that it looks good! Thanks!

dramaticlly (Contributor Author):

Thank you Fokko and Sam for reviewing, @rdblue can you take another look?

@rdblue rdblue merged commit de909f0 into apache:master Jun 12, 2022

rdblue commented Jun 12, 2022

Looks great. Thanks, @dramaticlly! I merged this.

namrathamyske pushed a commit to namrathamyske/iceberg that referenced this pull request Jul 10, 2022
Co-authored-by: Fokko Driesprong <[email protected]>
Co-authored-by: Steve Zhang <[email protected]>