Commit

Merge branch 'apache:main' into main

omkenge authored Nov 6, 2024
2 parents 2364936 + 36e4de6 commit 06aa879
Showing 8 changed files with 111 additions and 37 deletions.
66 changes: 36 additions & 30 deletions mkdocs/docs/configuration.md
@@ -24,6 +24,30 @@ hide:

# Configuration

## Setting Configuration Values

There are three ways to pass in configuration:

- Using the `~/.pyiceberg.yaml` configuration file
- Through environment variables
- By passing in credentials through the CLI or the Python API

The configuration file is recommended, since it is the easiest way to manage credentials.

To change the path searched for the `.pyiceberg.yaml`, set the `PYICEBERG_HOME` environment variable.

Another option is through environment variables:

```sh
export PYICEBERG_CATALOG__DEFAULT__URI=thrift://localhost:9083
export PYICEBERG_CATALOG__DEFAULT__S3__ACCESS_KEY_ID=username
export PYICEBERG_CATALOG__DEFAULT__S3__SECRET_ACCESS_KEY=password
```

Environment variables picked up by PyIceberg start with `PYICEBERG_` and follow the YAML structure below, where a double underscore `__` represents a nested field and a single underscore `_` is converted into a dash `-`.

For example, `PYICEBERG_CATALOG__DEFAULT__S3__ACCESS_KEY_ID` sets `s3.access-key-id` on the `default` catalog.
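
As a sketch, the equivalent settings in `~/.pyiceberg.yaml` look like this (the catalog name and credential values simply mirror the environment-variable example above):

```yaml
catalog:
  default:
    uri: thrift://localhost:9083
    s3.access-key-id: username
    s3.secret-access-key: password
```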

## Tables

Iceberg tables support table properties to configure table behavior.
@@ -36,7 +60,7 @@ Iceberg tables support table properties to configure table behavior.
| `write.parquet.compression-level` | Integer | null | Parquet compression level for the codec. If not set, it is up to PyIceberg |
| `write.parquet.row-group-limit` | Number of rows | 1048576 | The upper bound of the number of entries within a single row group |
| `write.parquet.page-size-bytes` | Size in bytes | 1MB | Set a target threshold for the approximate encoded size of data pages within a column chunk |
| `write.parquet.page-row-limit` | Number of rows | 20000 | Set a target threshold for the maximum number of rows within a column chunk |
| `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group |
| `write.metadata.previous-versions-max` | Integer | 100 | The maximum number of previous-version metadata files to keep before they are deleted after a commit. |
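
Table properties can also be updated from code. A minimal sketch using the Python API, assuming a configured catalog named `default` and an existing table `docs.example` (both placeholder names):

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
table = catalog.load_table("docs.example")

# Stage the property change in a transaction; it is committed on exit
with table.transaction() as transaction:
    transaction.set_properties({"write.parquet.row-group-limit": "100000"})
```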

@@ -77,18 +101,20 @@ For the FileIO there are several configuration options available:

<!-- markdown-link-check-disable -->

| Key                  | Example                    | Description |
|----------------------|----------------------------|-------------|
| s3.endpoint          | <https://10.0.19.25/>      | Configure an alternative endpoint of the S3 service for the FileIO to access. This could be used with any S3-compatible object storage service that has a different endpoint, or to access a private S3 endpoint in a virtual private cloud. |
| s3.access-key-id     | admin                      | Configure the static access key id used to access the FileIO. |
| s3.secret-access-key | password                   | Configure the static secret access key used to access the FileIO. |
| s3.session-token     | AQoDYXdzEJr...             | Configure the static session token used to access the FileIO. |
| s3.session-name      | session                    | An optional identifier for the assumed role session. |
| s3.role-arn          | arn:aws:...                | AWS Role ARN. If provided instead of access_key and secret_key, temporary credentials will be fetched by assuming this role. |
| s3.signer            | bearer                     | Configure the signature version of the FileIO. |
| s3.signer.uri        | <http://my.signer:8080/s3> | Configure the remote signing uri if it differs from the catalog uri. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `<s3.signer.uri>/<s3.signer.endpoint>`. |
| s3.signer.endpoint   | v1/main/s3-sign            | Configure the remote signing endpoint. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `<s3.signer.uri>/<s3.signer.endpoint>`. (default: v1/aws/s3/sign) |
| s3.region            | us-west-2                  | Sets the region of the bucket. |
| s3.proxy-uri         | <http://my.proxy.com:8080> | Configure the proxy server to be used by the FileIO. |
| s3.connect-timeout   | 60.0                       | Configure socket connection timeout, in seconds. |

<!-- markdown-link-check-enable-->
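
These properties can also be passed when loading the catalog from Python. A minimal sketch, assuming the catalog connection itself is configured in `.pyiceberg.yaml` and using placeholder endpoint/ARN values:

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "default",
    **{
        "s3.endpoint": "http://localhost:9000",
        "s3.region": "us-west-2",
        # Instead of static keys, assume a role and use temporary credentials
        "s3.role-arn": "arn:aws:iam::123456789012:role/example-role",
        "s3.session-name": "session",
    },
)
```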

@@ -161,26 +187,6 @@ Alternatively, you can also directly set the catalog implementation:
| type | rest | Type of catalog, one of `rest`, `sql`, `hive`, `glue`, `dynamodb`. Defaults to `rest` |
| py-catalog-impl | mypackage.mymodule.MyCatalog | Sets the catalog explicitly to an implementation, and fails if it cannot be loaded |
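
As a sketch, pinning the implementation in `.pyiceberg.yaml` reuses the hypothetical `mypackage.mymodule.MyCatalog` from the table above:

```yaml
catalog:
  default:
    py-catalog-impl: mypackage.mymodule.MyCatalog
```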


### REST Catalog

```yaml
4 changes: 2 additions & 2 deletions mkdocs/requirements.txt
@@ -16,13 +16,13 @@
# under the License.

mkdocs==1.6.1
griffe==1.5.1
jinja2==3.1.4
mkdocstrings==0.26.2
mkdocstrings-python==1.11.1
mkdocs-literate-nav==0.6.1
mkdocs-autorefs==1.2.0
mkdocs-gen-files==0.5.0
mkdocs-material==9.5.43
mkdocs-material-extensions==1.3.1
mkdocs-section-index==0.3.9
7 changes: 7 additions & 0 deletions pyiceberg/catalog/rest.py
@@ -525,6 +525,7 @@ def _response_to_table(self, identifier_tuple: Tuple[str, ...], table_response:
{**table_response.metadata.properties, **table_response.config}, table_response.metadata_location
),
catalog=self,
config=table_response.config,
)

def _response_to_staged_table(self, identifier_tuple: Tuple[str, ...], table_response: TableResponse) -> StagedTable:
@@ -777,9 +778,15 @@ def commit_table(
identifier = self._identifier_to_tuple_without_catalog(table.identifier)
table_identifier = TableIdentifier(namespace=identifier[:-1], name=identifier[-1])
table_request = CommitTableRequest(identifier=table_identifier, requirements=requirements, updates=updates)

headers = self._session.headers
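# If the catalog returned a table-scoped token, use it to authorize the commit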
if table_token := table.config.get(TOKEN):
headers[AUTHORIZATION_HEADER] = f"{BEARER_PREFIX} {table_token}"

response = self._session.post(
self.url(Endpoints.update_table, prefixed=True, **self._split_identifier_for_path(table_request.identifier)),
data=table_request.model_dump_json().encode(UTF8),
headers=headers,
)
try:
response.raise_for_status()
4 changes: 4 additions & 0 deletions pyiceberg/io/__init__.py
@@ -60,6 +60,8 @@
AWS_ACCESS_KEY_ID = "client.access-key-id"
AWS_SECRET_ACCESS_KEY = "client.secret-access-key"
AWS_SESSION_TOKEN = "client.session-token"
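# Optional role to assume; the S3-specific s3.role-arn / s3.session-name keys below take precedence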
AWS_ROLE_ARN = "aws.role-arn"
AWS_SESSION_NAME = "aws.session-name"
S3_ENDPOINT = "s3.endpoint"
S3_ACCESS_KEY_ID = "s3.access-key-id"
S3_SECRET_ACCESS_KEY = "s3.secret-access-key"
@@ -70,6 +72,8 @@
S3_SIGNER_URI = "s3.signer.uri"
S3_SIGNER_ENDPOINT = "s3.signer.endpoint"
S3_SIGNER_ENDPOINT_DEFAULT = "v1/aws/s3/sign"
S3_ROLE_ARN = "s3.role-arn"
S3_SESSION_NAME = "s3.session-name"
HDFS_HOST = "hdfs.host"
HDFS_PORT = "hdfs.port"
HDFS_USER = "hdfs.user"
10 changes: 10 additions & 0 deletions pyiceberg/io/pyarrow.py
@@ -85,7 +85,9 @@
from pyiceberg.io import (
AWS_ACCESS_KEY_ID,
AWS_REGION,
AWS_ROLE_ARN,
AWS_SECRET_ACCESS_KEY,
AWS_SESSION_NAME,
AWS_SESSION_TOKEN,
GCS_DEFAULT_LOCATION,
GCS_ENDPOINT,
@@ -101,7 +103,9 @@
S3_ENDPOINT,
S3_PROXY_URI,
S3_REGION,
S3_ROLE_ARN,
S3_SECRET_ACCESS_KEY,
S3_SESSION_NAME,
S3_SESSION_TOKEN,
FileIO,
InputFile,
@@ -362,6 +366,12 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste
if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
client_kwargs["connect_timeout"] = float(connect_timeout)

if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN):
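    # pyarrow's S3FileSystem assumes this IAM role and fetches temporary credentials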
client_kwargs["role_arn"] = role_arn

if session_name := get_first_property_value(self.properties, S3_SESSION_NAME, AWS_SESSION_NAME):
client_kwargs["session_name"] = session_name

return S3FileSystem(**client_kwargs)
elif scheme in ("hdfs", "viewfs"):
from pyarrow.fs import HadoopFileSystem
10 changes: 9 additions & 1 deletion pyiceberg/table/__init__.py
@@ -734,15 +734,23 @@ class Table:
metadata_location: str = Field()
io: FileIO
catalog: Catalog
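# Per-table configuration returned by the catalog, e.g. a table-scoped auth token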
config: Dict[str, str]

def __init__(
self,
identifier: Identifier,
metadata: TableMetadata,
metadata_location: str,
io: FileIO,
catalog: Catalog,
config: Dict[str, str] = EMPTY_DICT,
) -> None:
self._identifier = identifier
self.metadata = metadata
self.metadata_location = metadata_location
self.io = io
self.catalog = catalog
self.config = config

def transaction(self) -> Transaction:
"""Create a new transaction object to first stage the changes, and then commit them to the catalog.
8 changes: 7 additions & 1 deletion pyiceberg/table/update/schema.py
@@ -770,7 +770,13 @@ def _update_column(self, field: NestedField, existing_field: NestedField) -> Non
self.update_schema.make_column_optional(full_name)

if field.field_type.is_primitive and field.field_type != existing_field.field_type:
try:
# If the current type is wider than the new type, then
# we perform a noop
_ = promote(field.field_type, existing_field.field_type)
except ResolveError:
# If this is not the case, perform the type evolution
self.update_schema.update_column(full_name, field_type=field.field_type)

if field.doc is not None and field.doc != existing_field.doc:
self.update_schema.update_column(full_name, doc=field.doc)
39 changes: 36 additions & 3 deletions tests/test_schema.py
@@ -1189,6 +1189,17 @@ def test_detect_invalid_top_level_maps() -> None:
_ = UpdateSchema(transaction=None, schema=current_schema).union_by_name(new_schema)._apply() # type: ignore


def test_allow_double_to_float() -> None:
current_schema = Schema(NestedField(field_id=1, name="aCol", field_type=DoubleType(), required=False))
new_schema = Schema(NestedField(field_id=1, name="aCol", field_type=FloatType(), required=False))

applied = UpdateSchema(transaction=None, schema=current_schema).union_by_name(new_schema)._apply() # type: ignore

assert applied.as_struct() == current_schema.as_struct()
assert len(applied.fields) == 1
assert isinstance(applied.fields[0].field_type, DoubleType)


def test_promote_float_to_double() -> None:
current_schema = Schema(NestedField(field_id=1, name="aCol", field_type=FloatType(), required=False))
new_schema = Schema(NestedField(field_id=1, name="aCol", field_type=DoubleType(), required=False))
@@ -1200,11 +1211,33 @@ def test_promote_float_to_double() -> None:
assert isinstance(applied.fields[0].field_type, DoubleType)


def test_allow_long_to_int() -> None:
current_schema = Schema(NestedField(field_id=1, name="aCol", field_type=LongType(), required=False))
new_schema = Schema(NestedField(field_id=1, name="aCol", field_type=IntegerType(), required=False))

applied = UpdateSchema(transaction=None, schema=current_schema).union_by_name(new_schema)._apply() # type: ignore

assert applied.as_struct() == current_schema.as_struct()
assert len(applied.fields) == 1
assert isinstance(applied.fields[0].field_type, LongType)


def test_promote_int_to_long() -> None:
current_schema = Schema(NestedField(field_id=1, name="aCol", field_type=IntegerType(), required=False))
new_schema = Schema(NestedField(field_id=1, name="aCol", field_type=LongType(), required=False))

applied = UpdateSchema(transaction=None, schema=current_schema).union_by_name(new_schema)._apply() # type: ignore

assert applied.as_struct() == new_schema.as_struct()
assert len(applied.fields) == 1
assert isinstance(applied.fields[0].field_type, LongType)


def test_detect_invalid_promotion_string_to_float() -> None:
current_schema = Schema(NestedField(field_id=1, name="aCol", field_type=StringType(), required=False))
new_schema = Schema(NestedField(field_id=1, name="aCol", field_type=FloatType(), required=False))

with pytest.raises(ValidationError, match="Cannot change column type: aCol: double -> float"):
with pytest.raises(ValidationError, match="Cannot change column type: aCol: string -> float"):
_ = UpdateSchema(transaction=None, schema=current_schema).union_by_name(new_schema)._apply() # type: ignore


