From 21bcf02fd7eb324145195665f35fb81a6a102d83 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Tue, 9 Jul 2024 21:58:28 +0000 Subject: [PATCH 1/8] schema check --- pyiceberg/io/pyarrow.py | 43 ++++++++++++++ pyiceberg/table/__init__.py | 45 +------------- tests/integration/test_add_files.py | 51 ++++++++++++++++ tests/io/test_pyarrow.py | 91 ++++++++++++++++++++++++++++ tests/table/test_init.py | 92 ----------------------------- 5 files changed, 186 insertions(+), 136 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 50406972a7..3499c50116 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1974,6 +1974,47 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[List[ return bin_packed_record_batches +def _check_schema_compatible(table_schema: Schema, other_schema: pa.Schema) -> None: + """ + Check if the `table_schema` is compatible with `other_schema`. + + Two schemas are considered compatible when they are equal in terms of the Iceberg Schema type. + + Raises: + ValueError: If the schemas are not compatible. + """ + name_mapping = table_schema.name_mapping + try: + task_schema = pyarrow_to_schema(other_schema, name_mapping=name_mapping) + except ValueError as e: + other_schema = _pyarrow_to_schema_without_ids(other_schema) + additional_names = set(other_schema.column_names) - set(table_schema.column_names) + raise ValueError( + f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)." + ) from e + + if table_schema.as_struct() != task_schema.as_struct(): + from rich.console import Console + from rich.table import Table as RichTable + + console = Console(record=True) + + rich_table = RichTable(show_header=True, header_style="bold") + rich_table.add_column("") + rich_table.add_column("Table field") + rich_table.add_column("Dataframe field") + + for lhs in table_schema.fields: + try: + rhs = task_schema.find_field(lhs.field_id) + rich_table.add_row("✅" if lhs == rhs else "❌", str(lhs), str(rhs)) + except ValueError: + rich_table.add_row("❌", str(lhs), "Missing") + + console.print(rich_table) + raise ValueError(f"Mismatch in fields:\n{console.export_text()}") + + def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_paths: Iterator[str]) -> Iterator[DataFile]: for file_path in file_paths: input_file = io.new_input(file_path) @@ -1985,6 +2026,8 @@ def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_ f"Cannot add file {file_path} because it has field IDs. 
`add_files` only supports addition of files without field_ids" ) schema = table_metadata.schema() + _check_schema_compatible(schema, parquet_metadata.schema.to_arrow_schema()) + statistics = data_file_statistics_from_parquet_metadata( parquet_metadata=parquet_metadata, stats_columns=compute_statistics_plan(schema, table_metadata.properties), diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 8eea9859bc..18bec1a78a 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -70,7 +70,7 @@ manifest_evaluator, ) from pyiceberg.io import FileIO, load_file_io -from pyiceberg.io.pyarrow import _dataframe_to_data_files, expression_to_pyarrow, project_table +from pyiceberg.io.pyarrow import _check_schema_compatible, _dataframe_to_data_files, expression_to_pyarrow, project_table from pyiceberg.manifest import ( POSITIONAL_DELETE_SCHEMA, DataFile, @@ -165,49 +165,6 @@ _JAVA_LONG_MAX = 9223372036854775807 -def _check_schema_compatible(table_schema: Schema, other_schema: "pa.Schema") -> None: - """ - Check if the `table_schema` is compatible with `other_schema`. - - Two schemas are considered compatible when they are equal in terms of the Iceberg Schema type. - - Raises: - ValueError: If the schemas are not compatible. - """ - from pyiceberg.io.pyarrow import _pyarrow_to_schema_without_ids, pyarrow_to_schema - - name_mapping = table_schema.name_mapping - try: - task_schema = pyarrow_to_schema(other_schema, name_mapping=name_mapping) - except ValueError as e: - other_schema = _pyarrow_to_schema_without_ids(other_schema) - additional_names = set(other_schema.column_names) - set(table_schema.column_names) - raise ValueError( - f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)." 
- ) from e - - if table_schema.as_struct() != task_schema.as_struct(): - from rich.console import Console - from rich.table import Table as RichTable - - console = Console(record=True) - - rich_table = RichTable(show_header=True, header_style="bold") - rich_table.add_column("") - rich_table.add_column("Table field") - rich_table.add_column("Dataframe field") - - for lhs in table_schema.fields: - try: - rhs = task_schema.find_field(lhs.field_id) - rich_table.add_row("✅" if lhs == rhs else "❌", str(lhs), str(rhs)) - except ValueError: - rich_table.add_row("❌", str(lhs), "Missing") - - console.print(rich_table) - raise ValueError(f"Mismatch in fields:\n{console.export_text()}") - - class TableProperties: PARQUET_ROW_GROUP_SIZE_BYTES = "write.parquet.row-group-size-bytes" PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024 # 128 MB diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index 84729fcca4..18ecaa48bf 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -448,3 +448,54 @@ def test_add_files_snapshot_properties(spark: SparkSession, session_catalog: Cat assert "snapshot_prop_a" in summary assert summary["snapshot_prop_a"] == "test_prop_a" + + +@pytest.mark.integration +def test_add_files_fails_on_schema_mismatch(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: + identifier = f"default.table_schema_mismatch_fails_v{format_version}" + + tbl = _create_table(session_catalog, identifier, format_version) + WRONG_SCHEMA = pa.schema([ + ("foo", pa.bool_()), + ("bar", pa.string()), + ("baz", pa.string()), # should be integer + ("qux", pa.date32()), + ]) + file_path = f"s3://warehouse/default/table_schema_mismatch_fails/v{format_version}/test.parquet" + # write parquet files + fo = tbl.io.new_output(file_path) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=WRONG_SCHEMA) as writer: + writer.write_table( + pa.Table.from_pylist( + [ + { + "foo": True, + "bar": "bar_string", + "baz": "123", + "qux": date(2024, 3, 7), + }, + { + "foo": True, + "bar": "bar_string", + "baz": "124", + "qux": date(2024, 3, 7), + }, + ], + schema=WRONG_SCHEMA, + ) + ) + + expected = """Mismatch in fields: +┏━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┓ +┃ ┃ Table field ┃ Dataframe field ┃ +┡━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━┩ +│ ✅ │ 1: foo: optional boolean │ 1: foo: optional boolean │ +| ✅ │ 2: bar: optional string │ 2: bar: optional string │ +│ ❌ │ 3: baz: optional int │ 3: baz: optional string │ +│ ✅ │ 4: qux: optional date │ 4: qux: optional date │ +└────┴──────────────────────────┴──────────────────────────┘ +""" + + with pytest.raises(ValueError, match=expected): + tbl.add_files(file_paths=[file_path]) diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index ecb946a98b..18004ca2e8 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -60,6 +60,7 @@ PyArrowFile, PyArrowFileIO, StatsAggregator, + _check_schema_compatible, _ConvertToArrowSchema, _primitive_to_physical, _read_deletes, @@ -1718,3 +1719,93 @@ def test_bin_pack_arrow_table(arrow_table_with_null: pa.Table) -> None: # and will produce half the number of files if we double the target size bin_packed = bin_pack_arrow_table(bigger_arrow_tbl, target_file_size=arrow_table_with_null.nbytes * 2) assert len(list(bin_packed)) == 5 + + +def test_schema_mismatch_type(table_schema_simple: Schema) -> None: + other_schema = pa.schema(( + pa.field("foo", pa.string(), 
nullable=True), + pa.field("bar", pa.decimal128(18, 6), nullable=False), + pa.field("baz", pa.bool_(), nullable=True), + )) + + expected = r"""Mismatch in fields: +┏━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ +┃ ┃ Table field ┃ Dataframe field ┃ +┡━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩ +│ ✅ │ 1: foo: optional string │ 1: foo: optional string │ +│ ❌ │ 2: bar: required int │ 2: bar: required decimal\(18, 6\) │ +│ ✅ │ 3: baz: optional boolean │ 3: baz: optional boolean │ +└────┴──────────────────────────┴─────────────────────────────────┘ +""" + + with pytest.raises(ValueError, match=expected): + _check_schema_compatible(table_schema_simple, other_schema) + + +def test_schema_mismatch_nullability(table_schema_simple: Schema) -> None: + other_schema = pa.schema(( + pa.field("foo", pa.string(), nullable=True), + pa.field("bar", pa.int32(), nullable=True), + pa.field("baz", pa.bool_(), nullable=True), + )) + + expected = """Mismatch in fields: +┏━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┓ +┃ ┃ Table field ┃ Dataframe field ┃ +┡━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━┩ +│ ✅ │ 1: foo: optional string │ 1: foo: optional string │ +│ ❌ │ 2: bar: required int │ 2: bar: optional int │ +│ ✅ │ 3: baz: optional boolean │ 3: baz: optional boolean │ +└────┴──────────────────────────┴──────────────────────────┘ +""" + + with pytest.raises(ValueError, match=expected): + _check_schema_compatible(table_schema_simple, other_schema) + + +def test_schema_mismatch_missing_field(table_schema_simple: Schema) -> None: + other_schema = pa.schema(( + pa.field("foo", pa.string(), nullable=True), + pa.field("baz", pa.bool_(), nullable=True), + )) + + expected = """Mismatch in fields: +┏━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┓ +┃ ┃ Table field ┃ Dataframe field ┃ +┡━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━┩ +│ ✅ │ 1: foo: optional string │ 1: foo: optional string │ +│ ❌ │ 2: bar: required int │ Missing │ +│ ✅ │ 3: baz: optional boolean │ 3: baz: optional boolean │ +└────┴──────────────────────────┴──────────────────────────┘ +""" + + with pytest.raises(ValueError, match=expected): + _check_schema_compatible(table_schema_simple, other_schema) + + +def test_schema_mismatch_additional_field(table_schema_simple: Schema) -> None: + other_schema = pa.schema(( + pa.field("foo", pa.string(), nullable=True), + pa.field("bar", pa.int32(), nullable=True), + pa.field("baz", pa.bool_(), nullable=True), + pa.field("new_field", pa.date32(), nullable=True), + )) + + expected = r"PyArrow table contains more columns: new_field. Update the schema first \(hint, use union_by_name\)." 
+ + with pytest.raises(ValueError, match=expected): + _check_schema_compatible(table_schema_simple, other_schema) + + +def test_schema_downcast(table_schema_simple: Schema) -> None: + # large_string type is compatible with string type + other_schema = pa.schema(( + pa.field("foo", pa.large_string(), nullable=True), + pa.field("bar", pa.int32(), nullable=False), + pa.field("baz", pa.bool_(), nullable=True), + )) + + try: + _check_schema_compatible(table_schema_simple, other_schema) + except Exception: + pytest.fail("Unexpected Exception raised when calling `_check_schema`") diff --git a/tests/table/test_init.py b/tests/table/test_init.py index d7c4ffeeaf..4362fc9e5a 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -19,7 +19,6 @@ from copy import copy from typing import Any, Dict -import pyarrow as pa import pytest from pydantic import ValidationError from sortedcontainers import SortedList @@ -63,7 +62,6 @@ TableIdentifier, UpdateSchema, _apply_table_update, - _check_schema_compatible, _determine_partitions, _match_deletes_to_data_file, _TableMetadataUpdateContext, @@ -1124,96 +1122,6 @@ def test_correct_schema() -> None: assert "Snapshot not found: -1" in str(exc_info.value) -def test_schema_mismatch_type(table_schema_simple: Schema) -> None: - other_schema = pa.schema(( - pa.field("foo", pa.string(), nullable=True), - pa.field("bar", pa.decimal128(18, 6), nullable=False), - pa.field("baz", pa.bool_(), nullable=True), - )) - - expected = r"""Mismatch in fields: -┏━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ -┃ ┃ Table field ┃ Dataframe field ┃ -┡━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩ -│ ✅ │ 1: foo: optional string │ 1: foo: optional string │ -│ ❌ │ 2: bar: required int │ 2: bar: required decimal\(18, 6\) │ -│ ✅ │ 3: baz: optional boolean │ 3: baz: optional boolean │ -└────┴──────────────────────────┴─────────────────────────────────┘ -""" - - with pytest.raises(ValueError, match=expected): - _check_schema_compatible(table_schema_simple, other_schema) - - -def test_schema_mismatch_nullability(table_schema_simple: Schema) -> None: - other_schema = pa.schema(( - pa.field("foo", pa.string(), nullable=True), - pa.field("bar", pa.int32(), nullable=True), - pa.field("baz", pa.bool_(), nullable=True), - )) - - expected = """Mismatch in fields: -┏━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┓ -┃ ┃ Table field ┃ Dataframe field ┃ -┡━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━┩ -│ ✅ │ 1: foo: optional string │ 1: foo: optional string │ -│ ❌ │ 2: bar: required int │ 2: bar: optional int │ -│ ✅ │ 3: baz: optional boolean │ 3: baz: optional boolean │ -└────┴──────────────────────────┴──────────────────────────┘ -""" - - with pytest.raises(ValueError, match=expected): - _check_schema_compatible(table_schema_simple, other_schema) - - -def test_schema_mismatch_missing_field(table_schema_simple: Schema) -> None: - other_schema = pa.schema(( - pa.field("foo", pa.string(), nullable=True), - pa.field("baz", pa.bool_(), nullable=True), - )) - - expected = """Mismatch in fields: -┏━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┓ -┃ ┃ Table field ┃ Dataframe field ┃ -┡━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━┩ -│ ✅ │ 1: foo: optional string │ 1: foo: optional string │ -│ ❌ │ 2: bar: required int │ Missing │ -│ ✅ │ 3: baz: optional boolean │ 3: baz: optional boolean │ -└────┴──────────────────────────┴──────────────────────────┘ -""" - - with pytest.raises(ValueError, match=expected): - 
_check_schema_compatible(table_schema_simple, other_schema) - - -def test_schema_mismatch_additional_field(table_schema_simple: Schema) -> None: - other_schema = pa.schema(( - pa.field("foo", pa.string(), nullable=True), - pa.field("bar", pa.int32(), nullable=True), - pa.field("baz", pa.bool_(), nullable=True), - pa.field("new_field", pa.date32(), nullable=True), - )) - - expected = r"PyArrow table contains more columns: new_field. Update the schema first \(hint, use union_by_name\)." - - with pytest.raises(ValueError, match=expected): - _check_schema_compatible(table_schema_simple, other_schema) - - -def test_schema_downcast(table_schema_simple: Schema) -> None: - # large_string type is compatible with string type - other_schema = pa.schema(( - pa.field("foo", pa.large_string(), nullable=True), - pa.field("bar", pa.int32(), nullable=False), - pa.field("baz", pa.bool_(), nullable=True), - )) - - try: - _check_schema_compatible(table_schema_simple, other_schema) - except Exception: - pytest.fail("Unexpected Exception raised when calling `_check_schema`") - - def test_table_properties(example_table_metadata_v2: Dict[str, Any]) -> None: # metadata properties are all strings for k, v in example_table_metadata_v2["properties"].items(): From 177edce939c1611c76f7bb38ecfd3de154ab5cb9 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Wed, 10 Jul 2024 20:24:22 +0000 Subject: [PATCH 2/8] merge --- pyiceberg/table/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 0148d53aa4..9d99dec970 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -151,7 +151,6 @@ ) from pyiceberg.utils.bin_packing import ListPacker from pyiceberg.utils.concurrent import ExecutorFactory -from pyiceberg.utils.config import Config from pyiceberg.utils.datetime import datetime_to_millis from pyiceberg.utils.deprecated import deprecated from pyiceberg.utils.singleton import _convert_to_hashable_type From e9004aebfdf6932316aea0e56b1e934acb8040e6 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Wed, 10 Jul 2024 21:18:02 +0000 Subject: [PATCH 3/8] fix --- pyiceberg/catalog/__init__.py | 3 +-- pyiceberg/io/pyarrow.py | 11 ++++++++--- pyiceberg/table/__init__.py | 1 - 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index ae978329a0..bf6292dc04 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -49,7 +49,6 @@ from pyiceberg.schema import Schema from pyiceberg.serializers import ToOutputFile from pyiceberg.table import ( - DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, CommitTableRequest, CommitTableResponse, CreateTableTransaction, @@ -674,7 +673,7 @@ def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema: try: import pyarrow as pa - from pyiceberg.io.pyarrow import _ConvertToIcebergWithoutIDs, visit_pyarrow + from pyiceberg.io.pyarrow import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, _ConvertToIcebergWithoutIDs, visit_pyarrow downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False if isinstance(schema, pa.Schema): diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 15cb19f829..7e41c4c4ef 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -166,6 +166,8 @@ ONE_MEGABYTE = 1024 * 1024 BUFFER_SIZE = "buffer-size" +DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = 
"downcast-ns-timestamp-to-us-on-write" + ICEBERG_SCHEMA = b"iceberg.schema" # The PARQUET: in front means that it is Parquet specific, in this case the field_id PYARROW_PARQUET_FIELD_ID_KEY = b"PARQUET:field_id" @@ -1934,7 +1936,7 @@ def data_file_statistics_from_parquet_metadata( def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: - from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, PropertyUtil, TableProperties + from pyiceberg.table import PropertyUtil, TableProperties parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties) row_group_size = PropertyUtil.property_as_int( @@ -2025,10 +2027,13 @@ def _check_schema_compatible(table_schema: Schema, other_schema: pa.Schema) -> N ValueError: If the schemas are not compatible. """ name_mapping = table_schema.name_mapping + downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False try: - task_schema = pyarrow_to_schema(other_schema, name_mapping=name_mapping) + task_schema = pyarrow_to_schema( + other_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us + ) except ValueError as e: - other_schema = _pyarrow_to_schema_without_ids(other_schema) + other_schema = _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us) additional_names = set(other_schema.column_names) - set(table_schema.column_names) raise ValueError( f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)." diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 9d99dec970..6fe4595529 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -166,7 +166,6 @@ ALWAYS_TRUE = AlwaysTrue() TABLE_ROOT_ID = -1 -DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write" _JAVA_LONG_MAX = 9223372036854775807 From c33fe1c3051074e3497a406f4ffe5c2b6ed6ae08 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Wed, 10 Jul 2024 21:42:50 +0000 Subject: [PATCH 4/8] Etc/UTC support --- pyiceberg/io/pyarrow.py | 2 +- tests/integration/test_writes/test_writes.py | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 7e41c4c4ef..9bd5082b21 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -939,7 +939,7 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType: else: raise TypeError(f"Unsupported precision for timestamp type: {primitive.unit}") - if primitive.tz == "UTC" or primitive.tz == "+00:00": + if primitive.tz in ("UTC", "+00:00", "Etc/UTC"): return TimestamptzType() elif primitive.tz is None: return TimestampType() diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 2542fbdb38..cb87e69327 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -979,6 +979,7 @@ def test_write_all_timestamp_precision(mocker: MockerFixture, session_catalog: C ("timestamptz_us", pa.timestamp(unit="us", tz="UTC")), ("timestamp_ns", pa.timestamp(unit="ns")), ("timestamptz_ns", pa.timestamp(unit="ns", tz="UTC")), + ("timestamptz_us_etc_utc", pa.timestamp(unit="us", tz="Etc/UTC")), ]) TEST_DATA_WITH_NULL = { "timestamp_s": [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)], @@ -1005,6 +1006,11 @@ def 
test_write_all_timestamp_precision(mocker: MockerFixture, session_catalog: C None, datetime(2023, 3, 1, 19, 25, 00, tzinfo=timezone.utc), ], + "timestamptz_us_etc_utc": [ + datetime(2023, 1, 1, 19, 25, 00, tzinfo=timezone.utc), + None, + datetime(2023, 3, 1, 19, 25, 00, tzinfo=timezone.utc), + ], } input_arrow_table = pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=arrow_table_schema_with_all_timestamp_precisions) mocker.patch.dict(os.environ, values={"PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE": "True"}) @@ -1028,6 +1034,7 @@ def test_write_all_timestamp_precision(mocker: MockerFixture, session_catalog: C ("timestamptz_us", pa.timestamp(unit="us", tz="UTC")), ("timestamp_ns", pa.timestamp(unit="us")), ("timestamptz_ns", pa.timestamp(unit="us", tz="UTC")), + ("timestamptz_us_etc_utc", pa.timestamp(unit="us", tz="UTC")), ]) assert written_arrow_table.schema == expected_schema_in_all_us assert written_arrow_table == input_arrow_table.cast(expected_schema_in_all_us) From 2c95cd4e51fdccc92efec00421e607477eb5a0bf Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Wed, 10 Jul 2024 21:43:53 +0000 Subject: [PATCH 5/8] revert - not the right PR --- pyiceberg/io/pyarrow.py | 2 +- tests/integration/test_writes/test_writes.py | 7 ------- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 9bd5082b21..7e41c4c4ef 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -939,7 +939,7 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType: else: raise TypeError(f"Unsupported precision for timestamp type: {primitive.unit}") - if primitive.tz in ("UTC", "+00:00", "Etc/UTC"): + if primitive.tz == "UTC" or primitive.tz == "+00:00": return TimestamptzType() elif primitive.tz is None: return TimestampType() diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index cb87e69327..2542fbdb38 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -979,7 +979,6 @@ def test_write_all_timestamp_precision(mocker: MockerFixture, session_catalog: C ("timestamptz_us", pa.timestamp(unit="us", tz="UTC")), ("timestamp_ns", pa.timestamp(unit="ns")), ("timestamptz_ns", pa.timestamp(unit="ns", tz="UTC")), - ("timestamptz_us_etc_utc", pa.timestamp(unit="us", tz="Etc/UTC")), ]) TEST_DATA_WITH_NULL = { "timestamp_s": [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)], @@ -1006,11 +1005,6 @@ def test_write_all_timestamp_precision(mocker: MockerFixture, session_catalog: C None, datetime(2023, 3, 1, 19, 25, 00, tzinfo=timezone.utc), ], - "timestamptz_us_etc_utc": [ - datetime(2023, 1, 1, 19, 25, 00, tzinfo=timezone.utc), - None, - datetime(2023, 3, 1, 19, 25, 00, tzinfo=timezone.utc), - ], } input_arrow_table = pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=arrow_table_schema_with_all_timestamp_precisions) mocker.patch.dict(os.environ, values={"PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE": "True"}) @@ -1034,7 +1028,6 @@ def test_write_all_timestamp_precision(mocker: MockerFixture, session_catalog: C ("timestamptz_us", pa.timestamp(unit="us", tz="UTC")), ("timestamp_ns", pa.timestamp(unit="us")), ("timestamptz_ns", pa.timestamp(unit="us", tz="UTC")), - ("timestamptz_us_etc_utc", pa.timestamp(unit="us", tz="UTC")), ]) assert written_arrow_table.schema == expected_schema_in_all_us assert written_arrow_table == input_arrow_table.cast(expected_schema_in_all_us) From 
2641301819f618c30eab50048b5079907bb01a83 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Thu, 11 Jul 2024 14:02:59 +0000 Subject: [PATCH 6/8] lint --- tests/io/test_pyarrow.py | 2 ++ tests/table/test_init.py | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 0a5c138e3c..326eeff195 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -1811,6 +1811,8 @@ def test_schema_downcast(table_schema_simple: Schema) -> None: _check_schema_compatible(table_schema_simple, other_schema) except Exception: pytest.fail("Unexpected Exception raised when calling `_check_schema`") + + def test_partition_for_demo() -> None: test_pa_schema = pa.schema([("year", pa.int64()), ("n_legs", pa.int64()), ("animal", pa.string())]) test_schema = Schema( diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 635321a92f..7a5ea86d7a 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -62,8 +62,6 @@ TableIdentifier, UpdateSchema, _apply_table_update, - _determine_partitions, - _check_schema_compatible, _match_deletes_to_data_file, _TableMetadataUpdateContext, update_table_metadata, From 6c4f5d751f2be96ca1ab765b3d91564275503500 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Thu, 11 Jul 2024 15:41:20 +0000 Subject: [PATCH 7/8] fix --- pyiceberg/catalog/__init__.py | 3 ++- pyiceberg/io/pyarrow.py | 6 ++--- pyiceberg/table/__init__.py | 14 ++++++++---- tests/integration/test_add_files.py | 34 ++++++++++++----------------- 4 files changed, 28 insertions(+), 29 deletions(-) diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index bf6292dc04..ae978329a0 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -49,6 +49,7 @@ from pyiceberg.schema import Schema from pyiceberg.serializers import ToOutputFile from pyiceberg.table import ( + DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, CommitTableRequest, CommitTableResponse, CreateTableTransaction, @@ -673,7 +674,7 @@ def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema: try: import pyarrow as pa - from pyiceberg.io.pyarrow import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, _ConvertToIcebergWithoutIDs, visit_pyarrow + from pyiceberg.io.pyarrow import _ConvertToIcebergWithoutIDs, visit_pyarrow downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False if isinstance(schema, pa.Schema): diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index f7c86273fd..6a231f1b28 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -166,7 +166,6 @@ ONE_MEGABYTE = 1024 * 1024 BUFFER_SIZE = "buffer-size" -DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write" ICEBERG_SCHEMA = b"iceberg.schema" # The PARQUET: in front means that it is Parquet specific, in this case the field_id @@ -1952,7 +1951,7 @@ def data_file_statistics_from_parquet_metadata( def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: - from pyiceberg.table import PropertyUtil, TableProperties + from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, PropertyUtil, TableProperties parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties) row_group_size = PropertyUtil.property_as_int( @@ -2034,7 +2033,7 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[List[ return 
bin_packed_record_batches -def _check_schema_compatible(table_schema: Schema, other_schema: pa.Schema) -> None: +def _check_schema_compatible(table_schema: Schema, other_schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False) -> None: """ Check if the `table_schema` is compatible with `other_schema`. @@ -2044,7 +2043,6 @@ def _check_schema_compatible(table_schema: Schema, other_schema: pa.Schema) -> N ValueError: If the schemas are not compatible. """ name_mapping = table_schema.name_mapping - downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False try: task_schema = pyarrow_to_schema( other_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 6fdfe1535c..62440c4773 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -150,6 +150,7 @@ ) from pyiceberg.utils.bin_packing import ListPacker from pyiceberg.utils.concurrent import ExecutorFactory +from pyiceberg.utils.config import Config from pyiceberg.utils.datetime import datetime_to_millis from pyiceberg.utils.deprecated import deprecated from pyiceberg.utils.singleton import _convert_to_hashable_type @@ -166,6 +167,7 @@ ALWAYS_TRUE = AlwaysTrue() TABLE_ROOT_ID = -1 _JAVA_LONG_MAX = 9223372036854775807 +DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write" class TableProperties: @@ -478,8 +480,10 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) raise ValueError( f"Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: {unsupported_partitions}." ) - - _check_schema_compatible(self._table.schema(), other_schema=df.schema) + downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False + _check_schema_compatible( + self._table.schema(), other_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us + ) # cast if the two schemas are compatible but not equal table_arrow_schema = self._table.schema().as_arrow() if table_arrow_schema != df.schema: @@ -537,8 +541,10 @@ def overwrite( raise ValueError( f"Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: {unsupported_partitions}." 
) - - _check_schema_compatible(self._table.schema(), other_schema=df.schema) + downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False + _check_schema_compatible( + self._table.schema(), other_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us + ) # cast if the two schemas are compatible but not equal table_arrow_schema = self._table.schema().as_arrow() if table_arrow_schema != df.schema: diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index 65c0b0a1ee..984c7d1175 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -17,6 +17,7 @@ # pylint:disable=redefined-outer-name import os +import re from datetime import date from typing import Iterator @@ -569,7 +570,7 @@ def test_add_files_with_large_and_regular_schema(spark: SparkSession, session_ca assert table_schema == arrow_schema_large -def test_timestamp_tz_ns_downcast_on_read(session_catalog: Catalog, format_version: int, mocker: MockerFixture) -> None: +def test_add_files_with_timestamp_tz_ns_fails(session_catalog: Catalog, format_version: int, mocker: MockerFixture) -> None: nanoseconds_schema_iceberg = Schema(NestedField(1, "quux", TimestamptzType())) nanoseconds_schema = pa.schema([ @@ -600,25 +601,18 @@ def test_timestamp_tz_ns_downcast_on_read(session_catalog: Catalog, format_versi partition_spec=PartitionSpec(), ) - file_paths = [f"s3://warehouse/default/test_timestamp_tz/v{format_version}/test-{i}.parquet" for i in range(5)] + file_path = f"s3://warehouse/default/test_timestamp_tz/v{format_version}/test.parquet" # write parquet files - for file_path in file_paths: - fo = tbl.io.new_output(file_path) - with fo.create(overwrite=True) as fos: - with pq.ParquetWriter(fos, schema=nanoseconds_schema) as writer: - writer.write_table(arrow_table) + fo = tbl.io.new_output(file_path) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=nanoseconds_schema) as writer: + writer.write_table(arrow_table) # add the parquet files as data files - tbl.add_files(file_paths=file_paths) - - assert tbl.scan().to_arrow() == pa.concat_tables( - [ - arrow_table.cast( - pa.schema([ - ("quux", pa.timestamp("us", tz="UTC")), - ]), - safe=False, - ) - ] - * 5 - ) + with pytest.raises( + TypeError, + match=re.escape( + "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write." + ), + ): + tbl.add_files(file_paths=[file_path]) From dead345bd8591bf594ec894693879702981f529a Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Thu, 11 Jul 2024 16:22:11 -0400 Subject: [PATCH 8/8] adopt review suggestion - thank you Fokko! Co-authored-by: Fokko Driesprong --- pyiceberg/io/pyarrow.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 6a231f1b28..56f2242514 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -166,7 +166,6 @@ ONE_MEGABYTE = 1024 * 1024 BUFFER_SIZE = "buffer-size" - ICEBERG_SCHEMA = b"iceberg.schema" # The PARQUET: in front means that it is Parquet specific, in this case the field_id PYARROW_PARQUET_FIELD_ID_KEY = b"PARQUET:field_id"
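
A minimal usage sketch of the `_check_schema_compatible` helper that this series moves into
`pyiceberg.io.pyarrow`. It assumes the simple three-field layout of the `table_schema_simple`
fixture used in the tests; the field names and the explicit `downcast_ns_timestamp_to_us=False`
argument are illustrative, not code taken from the patches.

    import pyarrow as pa

    from pyiceberg.io.pyarrow import _check_schema_compatible
    from pyiceberg.schema import Schema
    from pyiceberg.types import BooleanType, IntegerType, NestedField, StringType

    # Iceberg table schema: mirrors the `table_schema_simple` test fixture (assumed layout).
    table_schema = Schema(
        NestedField(1, "foo", StringType(), required=False),
        NestedField(2, "bar", IntegerType(), required=True),
        NestedField(3, "baz", BooleanType(), required=False),
    )

    # Arrow schema of an incoming dataframe (or of a Parquet footer in `add_files`).
    # `large_string` is accepted because it resolves to the Iceberg `string` type.
    df_schema = pa.schema([
        pa.field("foo", pa.large_string(), nullable=True),
        pa.field("bar", pa.int32(), nullable=False),
        pa.field("baz", pa.bool_(), nullable=True),
    ])

    # Returns None when every table field resolves against the Arrow schema;
    # raises ValueError with a rich field-by-field table on a type/nullability
    # mismatch, or with the "contains more columns" hint for extra columns.
    _check_schema_compatible(table_schema, df_schema, downcast_ns_timestamp_to_us=False)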