From 5313df80ffd9720bf0b86492f35903c5b5ebbdb5 Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Tue, 5 Nov 2024 09:17:47 +0100
Subject: [PATCH 1/6] Pass table-token to commit endpoint (#1278)

* Pass table-token to subsequent requests

See open-api spec: https://github.com/apache/iceberg/blob/ea61ee46db17d94f22a5ef11fd913146557bdce7/open-api/rest-catalog-open-api.yaml#L927-L929

Resolves #1113

* Replace with constant
---
 pyiceberg/catalog/rest.py   |  7 +++++++
 pyiceberg/table/__init__.py | 10 +++++++++-
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/pyiceberg/catalog/rest.py b/pyiceberg/catalog/rest.py
index 20a04d9c5b..7c5d774c27 100644
--- a/pyiceberg/catalog/rest.py
+++ b/pyiceberg/catalog/rest.py
@@ -525,6 +525,7 @@ def _response_to_table(self, identifier_tuple: Tuple[str, ...], table_response:
                 {**table_response.metadata.properties, **table_response.config}, table_response.metadata_location
             ),
             catalog=self,
+            config=table_response.config,
         )
 
     def _response_to_staged_table(self, identifier_tuple: Tuple[str, ...], table_response: TableResponse) -> StagedTable:
@@ -777,9 +778,15 @@ def commit_table(
         identifier = self._identifier_to_tuple_without_catalog(table.identifier)
         table_identifier = TableIdentifier(namespace=identifier[:-1], name=identifier[-1])
         table_request = CommitTableRequest(identifier=table_identifier, requirements=requirements, updates=updates)
+
+        headers = self._session.headers
+        if table_token := table.config.get(TOKEN):
+            headers[AUTHORIZATION_HEADER] = f"{BEARER_PREFIX} {table_token}"
+
         response = self._session.post(
             self.url(Endpoints.update_table, prefixed=True, **self._split_identifier_for_path(table_request.identifier)),
             data=table_request.model_dump_json().encode(UTF8),
+            headers=headers,
         )
         try:
             response.raise_for_status()
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index e431101ba9..8055082542 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -734,15 +734,23 @@ class Table:
     metadata_location: str = Field()
     io: FileIO
     catalog: Catalog
+    config: Dict[str, str]
 
     def __init__(
-        self, identifier: Identifier, metadata: TableMetadata, metadata_location: str, io: FileIO, catalog: Catalog
+        self,
+        identifier: Identifier,
+        metadata: TableMetadata,
+        metadata_location: str,
+        io: FileIO,
+        catalog: Catalog,
+        config: Dict[str, str] = EMPTY_DICT,
     ) -> None:
         self._identifier = identifier
         self.metadata = metadata
         self.metadata_location = metadata_location
         self.io = io
         self.catalog = catalog
+        self.config = config
 
     def transaction(self) -> Transaction:
         """Create a new transaction object to first stage the changes, and then commit them to the catalog.
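A minimal sketch of what this change looks like from the user side, assuming a REST catalog whose load-table response carries a table-scoped `token` in its `config`; the catalog and table names below are illustrative:

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.types import StringType

catalog = load_catalog("rest")                 # illustrative catalog name from ~/.pyiceberg.yaml
table = catalog.load_table("examples.orders")  # illustrative table identifier

# The server-provided LoadTableResult config is now kept on the Table object,
# e.g. {"token": "..."} for services that hand out table-scoped credentials.
print(table.config)

# commit_table() reads that token and sends it as "Authorization: Bearer <token>",
# so the commit below is authorized with the table-scoped credential rather than
# the catalog-level one.
with table.update_schema() as update:
    update.add_column("note", StringType())
```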
From 07dbdd4e6640db7d70d24de68d46b9f6455b765a Mon Sep 17 00:00:00 2001
From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com>
Date: Tue, 5 Nov 2024 09:18:10 +0100
Subject: [PATCH 2/6] Bump mkdocs-material from 9.5.42 to 9.5.43 (#1288)

Bumps [mkdocs-material](https://github.com/squidfunk/mkdocs-material) from 9.5.42 to 9.5.43.
- [Release notes](https://github.com/squidfunk/mkdocs-material/releases)
- [Changelog](https://github.com/squidfunk/mkdocs-material/blob/master/CHANGELOG)
- [Commits](https://github.com/squidfunk/mkdocs-material/compare/9.5.42...9.5.43)

---
updated-dependencies:
- dependency-name: mkdocs-material
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot]
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
---
 mkdocs/requirements.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mkdocs/requirements.txt b/mkdocs/requirements.txt
index 290bc27ad9..6fcaf7f36c 100644
--- a/mkdocs/requirements.txt
+++ b/mkdocs/requirements.txt
@@ -23,6 +23,6 @@ mkdocstrings-python==1.11.1
 mkdocs-literate-nav==0.6.1
 mkdocs-autorefs==1.2.0
 mkdocs-gen-files==0.5.0
-mkdocs-material==9.5.42
+mkdocs-material==9.5.43
 mkdocs-material-extensions==1.3.1
 mkdocs-section-index==0.3.9

From 5f123caf17f1e238fca49c724068251695853d29 Mon Sep 17 00:00:00 2001
From: Samuel Hinton
Date: Tue, 5 Nov 2024 19:23:00 +1000
Subject: [PATCH 3/6] Updating configuration docs (#1292)

* Updating configuration docs

* Fixing linting
---
 mkdocs/docs/configuration.md | 46 ++++++++++++++++++++----------------
 1 file changed, 25 insertions(+), 21 deletions(-)

diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md
index 2ed58091bb..606a18ce91 100644
--- a/mkdocs/docs/configuration.md
+++ b/mkdocs/docs/configuration.md
@@ -24,6 +24,30 @@ hide:
 
 # Configuration
 
+## Setting Configuration Values
+
+There are three ways to pass in configuration:
+
+- Using the `~/.pyiceberg.yaml` configuration file
+- Through environment variables
+- By passing in credentials through the CLI or the Python API
+
+The configuration file is recommended since that's the easiest way to manage the credentials.
+
+To change the path searched for the `.pyiceberg.yaml`, you can overwrite the `PYICEBERG_HOME` environment variable.
+
+Another option is through environment variables:
+
+```sh
+export PYICEBERG_CATALOG__DEFAULT__URI=thrift://localhost:9083
+export PYICEBERG_CATALOG__DEFAULT__S3__ACCESS_KEY_ID=username
+export PYICEBERG_CATALOG__DEFAULT__S3__SECRET_ACCESS_KEY=password
+```
+
+The environment variable picked up by Iceberg starts with `PYICEBERG_` and then follows the yaml structure below, where a double underscore `__` represents a nested field, and the underscore `_` is converted into a dash `-`.
+
+For example, `PYICEBERG_CATALOG__DEFAULT__S3__ACCESS_KEY_ID`, sets `s3.access-key-id` on the `default` catalog.
+
 ## Tables
 
 Iceberg tables support table properties to configure table behavior.
@@ -36,7 +60,7 @@ Iceberg tables support table properties to configure table behavior.
 | `write.parquet.compression-level` | Integer | null | Parquet compression level for the codec. If not set, it is up to PyIceberg |
 | `write.parquet.row-group-limit` | Number of rows | 1048576 | The upper bound of the number of entries within a single row group |
 | `write.parquet.page-size-bytes` | Size in bytes | 1MB | Set a target threshold for the approximate encoded size of data pages within a column chunk |
-| `write.parquet.page-row-limit` | Number of rows | 20000 | Set a target threshold for the approximate encoded size of data pages within a column chunk |
+| `write.parquet.page-row-limit` | Number of rows | 20000 | Set a target threshold for the maximum number of rows within a column chunk |
 | `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group |
 | `write.metadata.previous-versions-max` | Integer | 100 | The max number of previous version metadata files to keep before deleting after commit. |
 
@@ -161,26 +185,6 @@ Alternatively, you can also directly set the catalog implementation:
 | type | rest | Type of catalog, one of `rest`, `sql`, `hive`, `glue`, `dymamodb`. Default to `rest` |
 | py-catalog-impl | mypackage.mymodule.MyCatalog | Sets the catalog explicitly to an implementation, and will fail explicitly if it can't be loaded |
 
-There are three ways to pass in configuration:
-
-- Using the `~/.pyiceberg.yaml` configuration file
-- Through environment variables
-- By passing in credentials through the CLI or the Python API
-
-The configuration file is recommended since that's the easiest way to manage the credentials.
-
-Another option is through environment variables:
-
-```sh
-export PYICEBERG_CATALOG__DEFAULT__URI=thrift://localhost:9083
-export PYICEBERG_CATALOG__DEFAULT__S3__ACCESS_KEY_ID=username
-export PYICEBERG_CATALOG__DEFAULT__S3__SECRET_ACCESS_KEY=password
-```
-
-The environment variable picked up by Iceberg starts with `PYICEBERG_` and then follows the yaml structure below, where a double underscore `__` represents a nested field, and the underscore `_` is converted into a dash `-`.
-
-For example, `PYICEBERG_CATALOG__DEFAULT__S3__ACCESS_KEY_ID`, sets `s3.access-key-id` on the `default` catalog.
-
 ### REST Catalog
 
 ```yaml
From 9b8400a6a1927ac6f810a9a44af7b9ac420fd0e2 Mon Sep 17 00:00:00 2001
From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com>
Date: Tue, 5 Nov 2024 12:39:05 +0100
Subject: [PATCH 4/6] Bump griffe from 1.3.1 to 1.5.1 (#1289)

Bumps [griffe](https://github.com/mkdocstrings/griffe) from 1.3.1 to 1.5.1.
- [Release notes](https://github.com/mkdocstrings/griffe/releases)
- [Changelog](https://github.com/mkdocstrings/griffe/blob/main/CHANGELOG.md)
- [Commits](https://github.com/mkdocstrings/griffe/compare/1.3.1...1.5.1)

---
updated-dependencies:
- dependency-name: griffe
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot]
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
---
 mkdocs/requirements.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mkdocs/requirements.txt b/mkdocs/requirements.txt
index 6fcaf7f36c..cf8de1391e 100644
--- a/mkdocs/requirements.txt
+++ b/mkdocs/requirements.txt
@@ -16,7 +16,7 @@
 # under the License.
 
 mkdocs==1.6.1
-griffe==1.3.1
+griffe==1.5.1
 jinja2==3.1.4
 mkdocstrings==0.26.2
 mkdocstrings-python==1.11.1

From c3bf16c3d168159f034ce8f4fc079328c27ecb21 Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Tue, 5 Nov 2024 16:54:27 +0100
Subject: [PATCH 5/6] Allow union of `{int,long}`, `{float,double}`, etc (#1283)

* Allow union of `{int,long}`, `{float,double}`, etc

* Thanks Kevin!

Co-authored-by: Kevin Liu

* Thanks Kevin!

Co-authored-by: Kevin Liu

* MOAR tests

* lint

* Make the tests happy

* Remove redundant test

---------

Co-authored-by: Kevin Liu
---
 pyiceberg/table/update/schema.py |  8 ++++++-
 tests/test_schema.py             | 39 +++++++++++++++++++++++++++++---
 2 files changed, 43 insertions(+), 4 deletions(-)

diff --git a/pyiceberg/table/update/schema.py b/pyiceberg/table/update/schema.py
index 0442a604be..0c83628f37 100644
--- a/pyiceberg/table/update/schema.py
+++ b/pyiceberg/table/update/schema.py
@@ -770,7 +770,13 @@ def _update_column(self, field: NestedField, existing_field: NestedField) -> Non
             self.update_schema.make_column_optional(full_name)
 
         if field.field_type.is_primitive and field.field_type != existing_field.field_type:
-            self.update_schema.update_column(full_name, field_type=field.field_type)
+            try:
+                # If the current type is wider than the new type, then
+                # we perform a noop
+                _ = promote(field.field_type, existing_field.field_type)
+            except ResolveError:
+                # If this is not the case, perform the type evolution
+                self.update_schema.update_column(full_name, field_type=field.field_type)
 
         if field.doc is not None and field.doc != existing_field.doc:
             self.update_schema.update_column(full_name, doc=field.doc)
diff --git a/tests/test_schema.py b/tests/test_schema.py
index 7f2ab906fa..4d894b0d03 100644
--- a/tests/test_schema.py
+++ b/tests/test_schema.py
@@ -1189,6 +1189,17 @@ def test_detect_invalid_top_level_maps() -> None:
         _ = UpdateSchema(transaction=None, schema=current_schema).union_by_name(new_schema)._apply()  # type: ignore
 
 
+def test_allow_double_to_float() -> None:
+    current_schema = Schema(NestedField(field_id=1, name="aCol", field_type=DoubleType(), required=False))
+    new_schema = Schema(NestedField(field_id=1, name="aCol", field_type=FloatType(), required=False))
+
+    applied = UpdateSchema(transaction=None, schema=current_schema).union_by_name(new_schema)._apply()  # type: ignore
+
+    assert applied.as_struct() == current_schema.as_struct()
+    assert len(applied.fields) == 1
+    assert isinstance(applied.fields[0].field_type, DoubleType)
+
+
 def test_promote_float_to_double() -> None:
     current_schema = Schema(NestedField(field_id=1, name="aCol", field_type=FloatType(), required=False))
     new_schema = Schema(NestedField(field_id=1, name="aCol", field_type=DoubleType(), required=False))
@@ -1200,11 +1211,33 @@ def test_promote_float_to_double() -> None:
     assert isinstance(applied.fields[0].field_type, DoubleType)
 
 
-def test_detect_invalid_promotion_double_to_float() -> None:
-    current_schema = Schema(NestedField(field_id=1, name="aCol", field_type=DoubleType(), required=False))
+def test_allow_long_to_int() -> None:
+    current_schema = Schema(NestedField(field_id=1, name="aCol", field_type=LongType(), required=False))
+    new_schema = Schema(NestedField(field_id=1, name="aCol", field_type=IntegerType(), required=False))
+
+    applied = UpdateSchema(transaction=None, schema=current_schema).union_by_name(new_schema)._apply()  # type: ignore
+
+    assert applied.as_struct() == current_schema.as_struct()
+    assert len(applied.fields) == 1
+    assert isinstance(applied.fields[0].field_type, LongType)
+
+
+def test_promote_int_to_long() -> None:
+    current_schema = Schema(NestedField(field_id=1, name="aCol", field_type=IntegerType(), required=False))
+    new_schema = Schema(NestedField(field_id=1, name="aCol", field_type=LongType(), required=False))
+
+    applied = UpdateSchema(transaction=None, schema=current_schema).union_by_name(new_schema)._apply()  # type: ignore
+
+    assert applied.as_struct() == new_schema.as_struct()
+    assert len(applied.fields) == 1
+    assert isinstance(applied.fields[0].field_type, LongType)
+
+
+def test_detect_invalid_promotion_string_to_float() -> None:
+    current_schema = Schema(NestedField(field_id=1, name="aCol", field_type=StringType(), required=False))
     new_schema = Schema(NestedField(field_id=1, name="aCol", field_type=FloatType(), required=False))
 
-    with pytest.raises(ValidationError, match="Cannot change column type: aCol: double -> float"):
+    with pytest.raises(ValidationError, match="Cannot change column type: aCol: string -> float"):
         _ = UpdateSchema(transaction=None, schema=current_schema).union_by_name(new_schema)._apply()  # type: ignore
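A minimal sketch of the behaviour the new tests cover, assuming an already-loaded `table` whose column `col` is a `long` (names are illustrative):

```python
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType, NestedField

# Incoming schema where `col` is an `int`, while the existing table column is a `long`.
incoming = Schema(NestedField(field_id=1, name="col", field_type=IntegerType(), required=False))

# Previously this union failed with "Cannot change column type: col: long -> int".
# With the promote() check above, the wider existing type wins and the union is a
# no-op for that column; widening (int -> long, float -> double) still promotes.
with table.update_schema() as update:  # `table` is an assumed, existing Table
    update.union_by_name(incoming)
```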
From 36e4de6cca1cc99adaf9579a92cb3a5aec6b9434 Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Wed, 6 Nov 2024 03:30:43 +0100
Subject: [PATCH 6/6] Allow passing in ARN Role and Session name (#1296)

---
 mkdocs/docs/configuration.md | 20 +++++++++++---------
 pyiceberg/io/__init__.py     |  4 ++++
 pyiceberg/io/pyarrow.py      | 10 ++++++++++
 3 files changed, 25 insertions(+), 9 deletions(-)

diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md
index 606a18ce91..ba77867ba7 100644
--- a/mkdocs/docs/configuration.md
+++ b/mkdocs/docs/configuration.md
@@ -101,18 +101,20 @@ For the FileIO there are several configuration options available:
 
-| Key | Example | Description |
-| -------------------- | ------------------------ | ------------------------------------------------------------------------------------------------------------ |
+| Key | Example | Description |
+|----------------------|----------------------------|----------------------------------------------------------------------------------------------------------------|
 | s3.endpoint | | Configure an alternative endpoint of the S3 service for the FileIO to access. This could be used to use S3FileIO with any s3-compatible object storage service that has a different endpoint, or access a private S3 endpoint in a virtual private cloud. |
-| s3.access-key-id | admin | Configure the static access key id used to access the FileIO. |
-| s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. |
-| s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. |
-| s3.signer | bearer | Configure the signature version of the FileIO. |
+| s3.access-key-id | admin | Configure the static access key id used to access the FileIO. |
+| s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. |
+| s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. |
+| s3.session-name | session | An optional identifier for the assumed role session. |
+| s3.role-arn | arn:aws:... | AWS Role ARN. If provided instead of access_key and secret_key, temporary credentials will be fetched by assuming this role. |
+| s3.signer | bearer | Configure the signature version of the FileIO. |
 | s3.signer.uri | | Configure the remote signing uri if it differs from the catalog uri. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `/`. |
-| s3.signer.endpoint | v1/main/s3-sign | Configure the remote signing endpoint. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `/`. (default : v1/aws/s3/sign). |
-| s3.region | us-west-2 | Sets the region of the bucket |
+| s3.signer.endpoint | v1/main/s3-sign | Configure the remote signing endpoint. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `/`. (default : v1/aws/s3/sign). |
+| s3.region | us-west-2 | Sets the region of the bucket |
 | s3.proxy-uri | | Configure the proxy server to be used by the FileIO. |
-| s3.connect-timeout | 60.0 | Configure socket connection timeout, in seconds. |
+| s3.connect-timeout | 60.0 | Configure socket connection timeout, in seconds. |
 
diff --git a/pyiceberg/io/__init__.py b/pyiceberg/io/__init__.py
index fe3ea43e10..23a2cf3594 100644
--- a/pyiceberg/io/__init__.py
+++ b/pyiceberg/io/__init__.py
@@ -60,6 +60,8 @@
 AWS_ACCESS_KEY_ID = "client.access-key-id"
 AWS_SECRET_ACCESS_KEY = "client.secret-access-key"
 AWS_SESSION_TOKEN = "client.session-token"
+AWS_ROLE_ARN = "aws.role-arn"
+AWS_SESSION_NAME = "aws.session-name"
 S3_ENDPOINT = "s3.endpoint"
 S3_ACCESS_KEY_ID = "s3.access-key-id"
 S3_SECRET_ACCESS_KEY = "s3.secret-access-key"
@@ -70,6 +72,8 @@
 S3_SIGNER_URI = "s3.signer.uri"
 S3_SIGNER_ENDPOINT = "s3.signer.endpoint"
 S3_SIGNER_ENDPOINT_DEFAULT = "v1/aws/s3/sign"
+S3_ROLE_ARN = "s3.role-arn"
+S3_SESSION_NAME = "s3.session-name"
 HDFS_HOST = "hdfs.host"
 HDFS_PORT = "hdfs.port"
 HDFS_USER = "hdfs.user"
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index ab4de5185b..a053b83ac9 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -85,7 +85,9 @@
 from pyiceberg.io import (
     AWS_ACCESS_KEY_ID,
     AWS_REGION,
+    AWS_ROLE_ARN,
     AWS_SECRET_ACCESS_KEY,
+    AWS_SESSION_NAME,
     AWS_SESSION_TOKEN,
     GCS_DEFAULT_LOCATION,
     GCS_ENDPOINT,
@@ -101,7 +103,9 @@
     S3_ENDPOINT,
     S3_PROXY_URI,
     S3_REGION,
+    S3_ROLE_ARN,
     S3_SECRET_ACCESS_KEY,
+    S3_SESSION_NAME,
     S3_SESSION_TOKEN,
     FileIO,
     InputFile,
@@ -362,6 +366,12 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste
             if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
                 client_kwargs["connect_timeout"] = float(connect_timeout)
 
+            if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN):
+                client_kwargs["role_arn"] = role_arn
+
+            if session_name := get_first_property_value(self.properties, S3_SESSION_NAME, AWS_SESSION_NAME):
+                client_kwargs["session_name"] = session_name
+
             return S3FileSystem(**client_kwargs)
         elif scheme in ("hdfs", "viewfs"):
             from pyarrow.fs import HadoopFileSystem
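A minimal sketch of how the new properties might be passed when loading a catalog (the URI, ARN, and names below are placeholders); with these set, the PyArrow `S3FileSystem` built in `_initialize_fs` assumes the given role instead of relying on static credentials:

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "default",
    **{
        "uri": "https://rest-catalog.example.com",  # placeholder catalog endpoint
        "s3.role-arn": "arn:aws:iam::123456789012:role/iceberg-reader",  # placeholder ARN
        "s3.session-name": "pyiceberg",
    },
)

# Any table loaded from this catalog that uses the PyArrow FileIO will now create
# its S3 filesystem with role_arn/session_name, i.e. temporary assumed-role credentials.
table = catalog.load_table("examples.orders")  # illustrative table identifier
```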