Skip to content

Commit

Permalink
Merge branch 'kevinliu/test' into kevinjqliu/fix-sql-catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinjqliu committed Mar 1, 2024
2 parents 2dcdd25 + 10adb1c commit 6e8d1b5
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 53 deletions.
7 changes: 6 additions & 1 deletion mkdocs/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ hide:

# Catalogs

PyIceberg currently has native support for REST, SQL, Hive, Glue and DynamoDB.
PyIceberg currently has native support for REST, SQL, Hive, Glue, DynamoDB, and In-Memory catalogs.

There are three ways to pass in configuration:

Expand Down Expand Up @@ -260,6 +260,11 @@ catalog:
region_name: <REGION_NAME>
```

## In-Memory Catalog

The In-Memory catalog uses in-memory data structures to store information.
This is useful for testing, demos, and playgrounds. Do not use it in production, as the data is not persisted.

# Concurrency

PyIceberg uses multiple threads to parallelize operations. The number of workers can be configured by supplying a `max-workers` entry in the configuration file, or by setting the `PYICEBERG_MAX_WORKERS` environment variable. The default value depends on the system hardware and Python version. See [the Python documentation](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) for more details.
2 changes: 1 addition & 1 deletion pyiceberg/cli/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def describe_properties(self, properties: Properties) -> None:
Console().print(output_table)

def text(self, response: str) -> None:
Console().print(response, soft_wrap=True)
Console(soft_wrap=True).print(response)

def schema(self, schema: Schema) -> None:
output_table = self._table
Expand Down
105 changes: 71 additions & 34 deletions tests/catalog/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
# under the License.
# pylint:disable=redefined-outer-name


import uuid
from pathlib import PosixPath
from typing import (
Dict,
List,
Expand All @@ -42,7 +45,7 @@
NoSuchTableError,
TableAlreadyExistsError,
)
from pyiceberg.io import load_file_io
from pyiceberg.io import WAREHOUSE, load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import (
Expand All @@ -55,15 +58,21 @@
TableIdentifier,
update_table_metadata,
)
from pyiceberg.table.metadata import TableMetadataV1
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.transforms import IdentityTransform
from pyiceberg.typedef import EMPTY_DICT
from pyiceberg.types import IntegerType, LongType, NestedField

DEFAULT_WAREHOUSE_LOCATION = "file:///tmp/warehouse"


class InMemoryCatalog(Catalog):
"""An in-memory catalog implementation for testing purposes."""
"""
An in-memory catalog implementation that uses in-memory data-structures to store the namespaces and tables.
This is useful for test, demo, and playground but not in production as data is not persisted.
"""

__tables: Dict[Identifier, Table]
__namespaces: Dict[Identifier, Properties]
Expand All @@ -72,6 +81,7 @@ def __init__(self, name: str, **properties: str) -> None:
super().__init__(name, **properties)
self.__tables = {}
self.__namespaces = {}
self._warehouse_location = properties.get(WAREHOUSE, None) or DEFAULT_WAREHOUSE_LOCATION

def create_table(
self,
Expand All @@ -81,6 +91,7 @@ def create_table(
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
table_uuid: Optional[uuid.UUID] = None,
) -> Table:
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore

Expand All @@ -93,24 +104,26 @@ def create_table(
if namespace not in self.__namespaces:
self.__namespaces[namespace] = {}

new_location = location or f's3://warehouse/{"/".join(identifier)}/data'
metadata = TableMetadataV1(**{
"format-version": 1,
"table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
"location": new_location,
"last-updated-ms": 1602638573874,
"last-column-id": schema.highest_field_id,
"schema": schema.model_dump(),
"partition-spec": partition_spec.model_dump()["fields"],
"properties": properties,
"current-snapshot-id": -1,
"snapshots": [{"snapshot-id": 1925, "timestamp-ms": 1602638573822}],
})
if not location:
location = f'{self._warehouse_location}/{"/".join(identifier)}'

metadata_location = self._get_metadata_location(location=location)
metadata = new_table_metadata(
schema=schema,
partition_spec=partition_spec,
sort_order=sort_order,
location=location,
properties=properties,
table_uuid=table_uuid,
)
io = load_file_io({**self.properties, **properties}, location=location)
self._write_metadata(metadata, io, metadata_location)

table = Table(
identifier=identifier,
metadata=metadata,
metadata_location=f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json',
io=load_file_io(),
metadata_location=metadata_location,
io=io,
catalog=self,
)
self.__tables[identifier] = table
Expand All @@ -120,14 +133,29 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
raise NotImplementedError

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
identifier = tuple(table_request.identifier.namespace.root) + (table_request.identifier.name,)
table = self.__tables[identifier]
table.metadata = update_table_metadata(base_metadata=table.metadata, updates=table_request.updates)

return CommitTableResponse(
metadata=table.metadata.model_dump(),
metadata_location=table.location(),
identifier_tuple = self.identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
)
current_table = self.load_table(identifier_tuple)
base_metadata = current_table.metadata

for requirement in table_request.requirements:
requirement.validate(base_metadata)

updated_metadata = update_table_metadata(base_metadata, table_request.updates)
if updated_metadata == base_metadata:
# no changes, do nothing
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)

# write new metadata
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)

# update table state
current_table.metadata = updated_metadata

return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)

def load_table(self, identifier: Union[str, Identifier]) -> Table:
identifier = self.identifier_to_tuple_without_catalog(identifier)
Expand Down Expand Up @@ -162,7 +190,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
identifier=to_identifier,
metadata=table.metadata,
metadata_location=table.metadata_location,
io=load_file_io(),
io=self._load_file_io(properties=table.metadata.properties, location=table.metadata_location),
catalog=self,
)
return self.__tables[to_identifier]
Expand Down Expand Up @@ -234,8 +262,8 @@ def update_namespace_properties(


@pytest.fixture
def catalog() -> InMemoryCatalog:
return InMemoryCatalog("test.in.memory.catalog", **{"test.key": "test.value"})
def catalog(tmp_path: PosixPath) -> InMemoryCatalog:
return InMemoryCatalog("test.in_memory.catalog", **{WAREHOUSE: tmp_path.absolute().as_posix(), "test.key": "test.value"})


TEST_TABLE_IDENTIFIER = ("com", "organization", "department", "my_table")
Expand All @@ -246,7 +274,6 @@ def catalog() -> InMemoryCatalog:
NestedField(2, "y", LongType(), doc="comment"),
NestedField(3, "z", LongType()),
)
TEST_TABLE_LOCATION = "protocol://some/location"
TEST_TABLE_PARTITION_SPEC = PartitionSpec(PartitionField(name="x", transform=IdentityTransform(), source_id=1, field_id=1000))
TEST_TABLE_PROPERTIES = {"key1": "value1", "key2": "value2"}
NO_SUCH_TABLE_ERROR = "Table does not exist: \\('com', 'organization', 'department', 'my_table'\\)"
Expand All @@ -263,7 +290,6 @@ def given_catalog_has_a_table(
return catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=properties or TEST_TABLE_PROPERTIES,
)
Expand Down Expand Up @@ -309,13 +335,25 @@ def test_create_table(catalog: InMemoryCatalog) -> None:
table = catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=TEST_TABLE_PROPERTIES,
)
assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table


def test_create_table_location_override(catalog: InMemoryCatalog) -> None:
    """An explicit ``location`` argument must win over the warehouse-derived default."""
    custom_location = f"{catalog._warehouse_location}/new_location"
    created = catalog.create_table(
        identifier=TEST_TABLE_IDENTIFIER,
        schema=TEST_TABLE_SCHEMA,
        location=custom_location,
        partition_spec=TEST_TABLE_PARTITION_SPEC,
        properties=TEST_TABLE_PROPERTIES,
    )
    # The table is retrievable under its identifier and kept the overridden location.
    assert catalog.load_table(TEST_TABLE_IDENTIFIER) == created
    assert created.location() == custom_location


@pytest.mark.parametrize(
"schema,expected",
[
Expand All @@ -337,8 +375,6 @@ def test_create_table_pyarrow_schema(catalog: InMemoryCatalog, pyarrow_schema_si
table = catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=pyarrow_schema_simple_without_ids,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=TEST_TABLE_PROPERTIES,
)
assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table
Expand Down Expand Up @@ -584,6 +620,7 @@ def test_commit_table(catalog: InMemoryCatalog) -> None:
NestedField(2, "y", LongType(), doc="comment"),
NestedField(3, "z", LongType()),
NestedField(4, "add", LongType()),
schema_id=1,
)

# When
Expand Down Expand Up @@ -664,7 +701,7 @@ def test_add_column_with_statement(catalog: InMemoryCatalog) -> None:

def test_catalog_repr(catalog: InMemoryCatalog) -> None:
s = repr(catalog)
assert s == "test.in.memory.catalog (<class 'test_base.InMemoryCatalog'>)"
assert s == "test.in_memory.catalog (<class 'test_base.InMemoryCatalog'>)"


def test_table_properties_int_value(catalog: InMemoryCatalog) -> None:
Expand Down
36 changes: 19 additions & 17 deletions tests/cli/test_console.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,6 @@ def fixture_warehouse(tmp_path_factory: TempPathFactory) -> Path:
return tmp_path_factory.mktemp("test_sql")


@pytest.fixture()
def mock_datetime_now(monkeypatch: pytest.MonkeyPatch) -> None:
datetime_mock = MagicMock(wraps=datetime.datetime)
datetime_mock.now.return_value = datetime.datetime.fromtimestamp(TEST_TIMESTAMP / 1000.0).astimezone()
monkeypatch.setattr(datetime, "datetime", datetime_mock)


@pytest.fixture()
def mock_uuids(mocker: MockFixture) -> None:
return mocker.patch('uuid.uuid4', return_value=TEST_TABLE_UUID)
Expand All @@ -82,6 +75,13 @@ def fixture_namespace_properties() -> Properties:
return TEST_NAMESPACE_PROPERTIES.copy()


@pytest.fixture()
def mock_datetime_now(monkeypatch: pytest.MonkeyPatch) -> None:
    """Freeze ``datetime.datetime.now()`` to the fixed TEST_TIMESTAMP for deterministic metadata."""
    frozen_now = datetime.datetime.fromtimestamp(TEST_TIMESTAMP / 1000.0).astimezone()
    datetime_mock = MagicMock(wraps=datetime.datetime)
    datetime_mock.now.return_value = frozen_now
    monkeypatch.setattr(datetime, "datetime", datetime_mock)


TEST_TABLE_IDENTIFIER = ("default", "my_table")
TEST_TABLE_NAMESPACE = "default"
TEST_NAMESPACE_PROPERTIES = {"location": "s3://warehouse/database/location"}
Expand All @@ -91,11 +91,10 @@ def fixture_namespace_properties() -> Properties:
NestedField(2, "y", LongType(), doc="comment"),
NestedField(3, "z", LongType()),
)
TEST_TABLE_LOCATION = "s3://bucket/test/location"
TEST_TABLE_PARTITION_SPEC = PartitionSpec(PartitionField(name="x", transform=IdentityTransform(), source_id=1, field_id=1000))
TEST_TABLE_PROPERTIES = {"read.split.target.size": "134217728"}
TEST_TIMESTAMP = 1602638573874
TEST_TABLE_UUID = uuid.UUID("d20125c8-7284-442c-9aea-15fee620737c")
TEST_TIMESTAMP = 1602638573874
MOCK_ENVIRONMENT = {"PYICEBERG_CATALOG__PRODUCTION__URI": "test://doesnotexist"}


Expand Down Expand Up @@ -145,11 +144,12 @@ def test_describe_namespace_does_not_exists(catalog: Catalog) -> None:


@pytest.fixture()
def test_describe_table(catalog: Catalog) -> None:
def test_describe_table(catalog: Catalog, mock_datetime_now: None) -> None:
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
partition_spec=TEST_TABLE_PARTITION_SPEC,
table_uuid=TEST_TABLE_UUID,
)

runner = CliRunner()
Expand Down Expand Up @@ -249,6 +249,7 @@ def test_uuid(catalog: Catalog) -> None:
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
partition_spec=TEST_TABLE_PARTITION_SPEC,
table_uuid=TEST_TABLE_UUID,
)

runner = CliRunner()
Expand All @@ -267,16 +268,15 @@ def test_uuid_does_not_exists(catalog: Catalog) -> None:


def test_location(catalog: Catalog) -> None:
tbl = catalog.create_table(
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
partition_spec=TEST_TABLE_PARTITION_SPEC,
)

runner = CliRunner()
result = runner.invoke(run, ["location", "default.my_table"])
assert result.exit_code == 0
assert result.output == f"""{tbl.metadata.location}\n"""
assert result.output == f"""{catalog._warehouse_location}/default/my_table\n"""


def test_location_does_not_exists(catalog: Catalog) -> None:
Expand Down Expand Up @@ -579,19 +579,20 @@ def test_json_describe_namespace_does_not_exists(catalog: Catalog) -> None:


@pytest.fixture()
def test_json_describe_table(catalog: Catalog) -> None:
def test_json_describe_table(catalog: Catalog, mock_datetime_now: None) -> None:
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
partition_spec=TEST_TABLE_PARTITION_SPEC,
table_uuid=TEST_TABLE_UUID,
)

runner = CliRunner()
result = runner.invoke(run, ["--output=json", "describe", "default.my_table"])
assert result.exit_code == 0
assert (
result.output
== """{"identifier":["default","my_table"],"metadata_location":"s3://warehouse/default/my_table/metadata/metadata.json","metadata":{"location":"s3://bucket/test/location","table-uuid":"d20125c8-7284-442c-9aea-15fee620737c","last-updated-ms":1602638573874,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{},"snapshots":[{"snapshot-id":1925,"timestamp-ms":1602638573822}],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":1,"schema":{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},"partition-spec":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}}\n"""
== """{"identifier":["default","my_table"],"metadata_location":"s3://warehouse/default/my_table/metadata/metadata.json","metadata":{"location":"s3://bucket/test/location","table-uuid":"d20125c8-7284-442c-9aea-15fee620737c","last-updated-ms":1602638573874,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{},"snapshots":[{"snapshot-id":1925,"timestamp-ms":1602638573822}],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":2,"schema":{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},"partition-spec":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}}\n"""
)


Expand Down Expand Up @@ -660,6 +661,7 @@ def test_json_uuid(catalog: Catalog) -> None:
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
partition_spec=TEST_TABLE_PARTITION_SPEC,
table_uuid=TEST_TABLE_UUID,
)

runner = CliRunner()
Expand All @@ -678,7 +680,7 @@ def test_json_uuid_does_not_exists(catalog: Catalog) -> None:


def test_json_location(catalog: Catalog) -> None:
tbl = catalog.create_table(
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
partition_spec=TEST_TABLE_PARTITION_SPEC,
Expand All @@ -687,7 +689,7 @@ def test_json_location(catalog: Catalog) -> None:
runner = CliRunner()
result = runner.invoke(run, ["--output=json", "location", "default.my_table"])
assert result.exit_code == 0
assert result.output == f""""{tbl.metadata.location}"\n"""
assert result.output == f'"{catalog._warehouse_location}/default/my_table"\n'


def test_json_location_does_not_exists(catalog: Catalog) -> None:
Expand Down

0 comments on commit 6e8d1b5

Please sign in to comment.