Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issues related to having catalog_name in identifier #964

Merged
merged 3 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions pyiceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,10 @@ def _identifier_to_validated_tuple(self, identifier: Union[str, Identifier]) ->

def _split_identifier_for_path(self, identifier: Union[str, Identifier, TableIdentifier]) -> Properties:
    """Split an identifier into the ``namespace`` and ``table`` values used to fill a URL path.

    Args:
        identifier: A ``TableIdentifier``, or a string/tuple identifier that is
            first passed through ``self._identifier_to_validated_tuple``.

    Returns:
        A mapping with two keys: ``"namespace"`` (the namespace levels joined
        with ``NAMESPACE_SEPARATOR``) and ``"table"`` (the table name).
    """
    if isinstance(identifier, TableIdentifier):
        namespace_levels = identifier.namespace.root
        # Only drop the first level when it really is this catalog's name; an
        # identifier without the catalog prefix must keep every level. The
        # truthiness check avoids an IndexError on an empty namespace.
        if namespace_levels and namespace_levels[0] == self.name:
            namespace_levels = namespace_levels[1:]
        return {"namespace": NAMESPACE_SEPARATOR.join(namespace_levels), "table": identifier.name}
    # For str/tuple identifiers the last element is the table name and
    # everything before it is the namespace.
    identifier_tuple = self._identifier_to_validated_tuple(identifier)
    return {"namespace": NAMESPACE_SEPARATOR.join(identifier_tuple[:-1]), "table": identifier_tuple[-1]}

Expand Down Expand Up @@ -675,6 +678,17 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U

return self.load_table(to_identifier)

def _remove_catalog_name_from_table_request_identifier(self, table_request: CommitTableRequest) -> CommitTableRequest:
    """Return a commit request whose identifier does not start with this catalog's name.

    Args:
        table_request: The commit request whose ``identifier.namespace`` may
            carry ``self.name`` as its first level.

    Returns:
        A copy of ``table_request`` with the first namespace level removed when
        that level equals ``self.name``; otherwise ``table_request`` itself,
        unchanged.
    """
    namespace_levels = table_request.identifier.namespace.root
    # The truthiness check keeps an empty namespace from raising IndexError.
    if namespace_levels and namespace_levels[0] == self.name:
        # NOTE(review): the update value is the dumped form of the new
        # TableIdentifier, so the copy holds plain data for ``identifier``
        # rather than a model instance -- confirm this is intended.
        stripped_identifier = TableIdentifier(
            namespace=namespace_levels[1:], name=table_request.identifier.name
        ).model_dump()
        return table_request.model_copy(update={"identifier": stripped_identifier})
    return table_request

@retry(**_RETRY_ARGS)
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.
Expand All @@ -692,7 +706,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
"""
response = self._session.post(
self.url(Endpoints.update_table, prefixed=True, **self._split_identifier_for_path(table_request.identifier)),
data=table_request.model_dump_json().encode(UTF8),
data=self._remove_catalog_name_from_table_request_identifier(table_request).model_dump_json().encode(UTF8),
)
try:
response.raise_for_status()
Expand Down
24 changes: 23 additions & 1 deletion tests/catalog/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from pyiceberg.io import load_file_io
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import Table
from pyiceberg.table import CommitTableRequest, Table, TableIdentifier
from pyiceberg.table.metadata import TableMetadataV1
from pyiceberg.table.sorting import SortField, SortOrder
from pyiceberg.transforms import IdentityTransform, TruncateTransform
Expand Down Expand Up @@ -1226,3 +1226,25 @@ def test_catalog_from_parameters_empty_env(rest_mock: Mocker) -> None:

catalog = cast(RestCatalog, load_catalog("production", uri="https://other-service.io/api"))
assert catalog.uri == "https://other-service.io/api"


def test_table_identifier_in_commit_table_request(rest_mock: Mocker, example_table_metadata_v2: Dict[str, Any]) -> None:
    """Commit a request whose namespace starts with the catalog name and check the posted body.

    The mocked endpoint path and the expected request body both omit the
    leading ``catalog_name`` level.
    """
    request_identifier = TableIdentifier(namespace=("catalog_name", "namespace"), name="table_name")
    test_table_request = CommitTableRequest(identifier=request_identifier, updates=[], requirements=[])

    mocked_response_body = {
        "metadata": example_table_metadata_v2,
        "metadata-location": "test",
    }
    rest_mock.post(
        url=f"{TEST_URI}v1/namespaces/namespace/tables/table_name",
        json=mocked_response_body,
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("catalog_name", uri=TEST_URI, token=TEST_TOKEN)
    catalog._commit_table(test_table_request)

    expected_request_body = """{"identifier":{"namespace":["namespace"],"name":"table_name"},"requirements":[],"updates":[]}"""
    assert rest_mock.last_request.text == expected_request_body
13 changes: 12 additions & 1 deletion tests/integration/test_writes/test_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from pyspark.sql import SparkSession
from pytest_mock.plugin import MockerFixture

from pyiceberg.catalog import Catalog
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.catalog.hive import HiveCatalog
from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.catalog.sql import SqlCatalog
Expand Down Expand Up @@ -1282,3 +1282,14 @@ def test_merge_manifests_file_content(session_catalog: Catalog, arrow_table_with
(11, 3),
(12, 3),
]


@pytest.mark.integration
def test_rest_catalog_with_empty_catalog_name_append_data(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
    """Append data through a catalog loaded with an empty name.

    There is no explicit assertion: the test passes when table creation and
    the append complete without raising.
    """
    catalog_properties = session_catalog.properties
    # The catalog name is intentionally empty.
    unnamed_catalog = load_catalog("", **catalog_properties)
    table = _create_table(unnamed_catalog, "default.test_rest_append", data=[])
    table.append(arrow_table_with_null)