diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index f0eb56ebd5..4a461ccc0d 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -24,7 +24,7 @@ hide: # Catalogs -PyIceberg currently has native support for REST, SQL, Hive, Glue and DynamoDB. +PyIceberg currently has native support for REST, SQL, Hive, Glue, DynamoDB, and In-Memory catalogs. There are three ways to pass in configuration: @@ -260,6 +260,11 @@ catalog: region_name: ``` +## In-Memory Catalog + +The In-Memory catalog uses in-memory data structures to store information. +This is useful for testing, demos, and playgrounds. Do not use it in production, as the data is not persisted. + # Concurrency PyIceberg uses multiple threads to parallelize operations. The number of workers can be configured by supplying a `max-workers` entry in the configuration file, or by setting the `PYICEBERG_MAX_WORKERS` environment variable. The default value depends on the system hardware and Python version. See [the Python documentation](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) for more details. diff --git a/pyiceberg/cli/output.py b/pyiceberg/cli/output.py index d9772a3991..3377ef3d2a 100644 --- a/pyiceberg/cli/output.py +++ b/pyiceberg/cli/output.py @@ -157,7 +157,7 @@ def describe_properties(self, properties: Properties) -> None: Console().print(output_table) def text(self, response: str) -> None: - Console().print(response, soft_wrap=True) + Console(soft_wrap=True).print(response) def schema(self, schema: Schema) -> None: output_table = self._table diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index c7d3f01ff1..ae943c384b 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -16,6 +16,9 @@ # under the License. # pylint:disable=redefined-outer-name + +import uuid +from pathlib import PosixPath from typing import ( Dict, List, @@ -42,7 +45,7 @@ NoSuchTableError, TableAlreadyExistsError, ) -from pyiceberg.io import load_file_io +from pyiceberg.io import WAREHOUSE, load_file_io from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import ( @@ -55,15 +58,21 @@ TableIdentifier, update_table_metadata, ) -from pyiceberg.table.metadata import TableMetadataV1 +from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.transforms import IdentityTransform from pyiceberg.typedef import EMPTY_DICT from pyiceberg.types import IntegerType, LongType, NestedField +DEFAULT_WAREHOUSE_LOCATION = "file:///tmp/warehouse" + class InMemoryCatalog(Catalog): - """An in-memory catalog implementation for testing purposes.""" + """ + An in-memory catalog implementation that uses in-memory data structures to store the namespaces and tables. + + This is useful for testing, demos, and playgrounds, but not for production, as the data is not persisted.
+ """ __tables: Dict[Identifier, Table] __namespaces: Dict[Identifier, Properties] @@ -72,6 +81,7 @@ def __init__(self, name: str, **properties: str) -> None: super().__init__(name, **properties) self.__tables = {} self.__namespaces = {} + self._warehouse_location = properties.get(WAREHOUSE, None) or DEFAULT_WAREHOUSE_LOCATION def create_table( self, @@ -81,6 +91,7 @@ def create_table( partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, + table_uuid: Optional[uuid.UUID] = None, ) -> Table: schema: Schema = self._convert_schema_if_needed(schema) # type: ignore @@ -93,24 +104,26 @@ def create_table( if namespace not in self.__namespaces: self.__namespaces[namespace] = {} - new_location = location or f's3://warehouse/{"/".join(identifier)}/data' - metadata = TableMetadataV1(**{ - "format-version": 1, - "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c", - "location": new_location, - "last-updated-ms": 1602638573874, - "last-column-id": schema.highest_field_id, - "schema": schema.model_dump(), - "partition-spec": partition_spec.model_dump()["fields"], - "properties": properties, - "current-snapshot-id": -1, - "snapshots": [{"snapshot-id": 1925, "timestamp-ms": 1602638573822}], - }) + if not location: + location = f'{self._warehouse_location}/{"/".join(identifier)}' + + metadata_location = self._get_metadata_location(location=location) + metadata = new_table_metadata( + schema=schema, + partition_spec=partition_spec, + sort_order=sort_order, + location=location, + properties=properties, + table_uuid=table_uuid, + ) + io = load_file_io({**self.properties, **properties}, location=location) + self._write_metadata(metadata, io, metadata_location) + table = Table( identifier=identifier, metadata=metadata, - metadata_location=f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json', - io=load_file_io(), + metadata_location=metadata_location, + io=io, catalog=self, ) self.__tables[identifier] = table @@ -120,14 +133,29 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: raise NotImplementedError def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: - identifier = tuple(table_request.identifier.namespace.root) + (table_request.identifier.name,) - table = self.__tables[identifier] - table.metadata = update_table_metadata(base_metadata=table.metadata, updates=table_request.updates) - - return CommitTableResponse( - metadata=table.metadata.model_dump(), - metadata_location=table.location(), + identifier_tuple = self.identifier_to_tuple_without_catalog( + tuple(table_request.identifier.namespace.root + [table_request.identifier.name]) ) + current_table = self.load_table(identifier_tuple) + base_metadata = current_table.metadata + + for requirement in table_request.requirements: + requirement.validate(base_metadata) + + updated_metadata = update_table_metadata(base_metadata, table_request.updates) + if updated_metadata == base_metadata: + # no changes, do nothing + return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) + + # write new metadata + new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 + new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version) + self._write_metadata(updated_metadata, current_table.io, new_metadata_location) + + # update table state + current_table.metadata = updated_metadata + + return 
CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) def load_table(self, identifier: Union[str, Identifier]) -> Table: identifier = self.identifier_to_tuple_without_catalog(identifier) @@ -162,7 +190,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U identifier=to_identifier, metadata=table.metadata, metadata_location=table.metadata_location, - io=load_file_io(), + io=self._load_file_io(properties=table.metadata.properties, location=table.metadata_location), catalog=self, ) return self.__tables[to_identifier] @@ -234,8 +262,8 @@ def update_namespace_properties( @pytest.fixture -def catalog() -> InMemoryCatalog: - return InMemoryCatalog("test.in.memory.catalog", **{"test.key": "test.value"}) +def catalog(tmp_path: PosixPath) -> InMemoryCatalog: + return InMemoryCatalog("test.in_memory.catalog", **{WAREHOUSE: tmp_path.absolute().as_posix(), "test.key": "test.value"}) TEST_TABLE_IDENTIFIER = ("com", "organization", "department", "my_table") @@ -246,7 +274,6 @@ def catalog() -> InMemoryCatalog: NestedField(2, "y", LongType(), doc="comment"), NestedField(3, "z", LongType()), ) -TEST_TABLE_LOCATION = "protocol://some/location" TEST_TABLE_PARTITION_SPEC = PartitionSpec(PartitionField(name="x", transform=IdentityTransform(), source_id=1, field_id=1000)) TEST_TABLE_PROPERTIES = {"key1": "value1", "key2": "value2"} NO_SUCH_TABLE_ERROR = "Table does not exist: \\('com', 'organization', 'department', 'my_table'\\)" @@ -263,7 +290,6 @@ def given_catalog_has_a_table( return catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=properties or TEST_TABLE_PROPERTIES, ) @@ -309,13 +335,25 @@ def test_create_table(catalog: InMemoryCatalog) -> None: table = catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table +def test_create_table_location_override(catalog: InMemoryCatalog) -> None: + new_location = f"{catalog._warehouse_location}/new_location" + table = catalog.create_table( + identifier=TEST_TABLE_IDENTIFIER, + schema=TEST_TABLE_SCHEMA, + location=new_location, + partition_spec=TEST_TABLE_PARTITION_SPEC, + properties=TEST_TABLE_PROPERTIES, + ) + assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table + assert table.location() == new_location + + @pytest.mark.parametrize( "schema,expected", [ @@ -337,8 +375,6 @@ def test_create_table_pyarrow_schema(catalog: InMemoryCatalog, pyarrow_schema_si table = catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=pyarrow_schema_simple_without_ids, - location=TEST_TABLE_LOCATION, - partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table @@ -584,6 +620,7 @@ def test_commit_table(catalog: InMemoryCatalog) -> None: NestedField(2, "y", LongType(), doc="comment"), NestedField(3, "z", LongType()), NestedField(4, "add", LongType()), + schema_id=1, ) # When @@ -664,7 +701,7 @@ def test_add_column_with_statement(catalog: InMemoryCatalog) -> None: def test_catalog_repr(catalog: InMemoryCatalog) -> None: s = repr(catalog) - assert s == "test.in.memory.catalog ()" + assert s == "test.in_memory.catalog ()" def test_table_properties_int_value(catalog: InMemoryCatalog) -> None: diff --git a/tests/cli/test_console.py 
b/tests/cli/test_console.py index 140e83fa15..60672f9596 100644 --- a/tests/cli/test_console.py +++ b/tests/cli/test_console.py @@ -58,13 +58,6 @@ def fixture_warehouse(tmp_path_factory: TempPathFactory) -> Path: return tmp_path_factory.mktemp("test_sql") -@pytest.fixture() -def mock_datetime_now(monkeypatch: pytest.MonkeyPatch) -> None: - datetime_mock = MagicMock(wraps=datetime.datetime) - datetime_mock.now.return_value = datetime.datetime.fromtimestamp(TEST_TIMESTAMP / 1000.0).astimezone() - monkeypatch.setattr(datetime, "datetime", datetime_mock) - - @pytest.fixture() def mock_uuids(mocker: MockFixture) -> None: return mocker.patch('uuid.uuid4', return_value=TEST_TABLE_UUID) @@ -82,6 +75,13 @@ def fixture_namespace_properties() -> Properties: return TEST_NAMESPACE_PROPERTIES.copy() +@pytest.fixture() +def mock_datetime_now(monkeypatch: pytest.MonkeyPatch) -> None: + datetime_mock = MagicMock(wraps=datetime.datetime) + datetime_mock.now.return_value = datetime.datetime.fromtimestamp(TEST_TIMESTAMP / 1000.0).astimezone() + monkeypatch.setattr(datetime, "datetime", datetime_mock) + + TEST_TABLE_IDENTIFIER = ("default", "my_table") TEST_TABLE_NAMESPACE = "default" TEST_NAMESPACE_PROPERTIES = {"location": "s3://warehouse/database/location"} @@ -91,11 +91,10 @@ def fixture_namespace_properties() -> Properties: NestedField(2, "y", LongType(), doc="comment"), NestedField(3, "z", LongType()), ) -TEST_TABLE_LOCATION = "s3://bucket/test/location" TEST_TABLE_PARTITION_SPEC = PartitionSpec(PartitionField(name="x", transform=IdentityTransform(), source_id=1, field_id=1000)) TEST_TABLE_PROPERTIES = {"read.split.target.size": "134217728"} -TEST_TIMESTAMP = 1602638573874 TEST_TABLE_UUID = uuid.UUID("d20125c8-7284-442c-9aea-15fee620737c") +TEST_TIMESTAMP = 1602638573874 MOCK_ENVIRONMENT = {"PYICEBERG_CATALOG__PRODUCTION__URI": "test://doesnotexist"} @@ -145,11 +144,12 @@ def test_describe_namespace_does_not_exists(catalog: Catalog) -> None: @pytest.fixture() -def test_describe_table(catalog: Catalog) -> None: +def test_describe_table(catalog: Catalog, mock_datetime_now: None) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, partition_spec=TEST_TABLE_PARTITION_SPEC, + table_uuid=TEST_TABLE_UUID, ) runner = CliRunner() @@ -249,6 +249,7 @@ def test_uuid(catalog: Catalog) -> None: identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, partition_spec=TEST_TABLE_PARTITION_SPEC, + table_uuid=TEST_TABLE_UUID, ) runner = CliRunner() @@ -267,16 +268,15 @@ def test_uuid_does_not_exists(catalog: Catalog) -> None: def test_location(catalog: Catalog) -> None: - tbl = catalog.create_table( + catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, partition_spec=TEST_TABLE_PARTITION_SPEC, ) - runner = CliRunner() result = runner.invoke(run, ["location", "default.my_table"]) assert result.exit_code == 0 - assert result.output == f"""{tbl.metadata.location}\n""" + assert result.output == f"""{catalog._warehouse_location}/default/my_table\n""" def test_location_does_not_exists(catalog: Catalog) -> None: @@ -579,11 +579,12 @@ def test_json_describe_namespace_does_not_exists(catalog: Catalog) -> None: @pytest.fixture() -def test_json_describe_table(catalog: Catalog) -> None: +def test_json_describe_table(catalog: Catalog, mock_datetime_now: None) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, partition_spec=TEST_TABLE_PARTITION_SPEC, + table_uuid=TEST_TABLE_UUID, ) runner = CliRunner() @@ -591,7 +592,7 @@ def 
test_json_describe_table(catalog: Catalog) -> None: assert result.exit_code == 0 assert ( result.output - == """{"identifier":["default","my_table"],"metadata_location":"s3://warehouse/default/my_table/metadata/metadata.json","metadata":{"location":"s3://bucket/test/location","table-uuid":"d20125c8-7284-442c-9aea-15fee620737c","last-updated-ms":1602638573874,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{},"snapshots":[{"snapshot-id":1925,"timestamp-ms":1602638573822}],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":1,"schema":{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},"partition-spec":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}}\n""" + == """{"identifier":["default","my_table"],"metadata_location":"s3://warehouse/default/my_table/metadata/metadata.json","metadata":{"location":"s3://bucket/test/location","table-uuid":"d20125c8-7284-442c-9aea-15fee620737c","last-updated-ms":1602638573874,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{},"snapshots":[{"snapshot-id":1925,"timestamp-ms":1602638573822}],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":2,"schema":{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},"partition-spec":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}}\n""" ) @@ -660,6 +661,7 @@ def test_json_uuid(catalog: Catalog) -> None: identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, partition_spec=TEST_TABLE_PARTITION_SPEC, + table_uuid=TEST_TABLE_UUID, ) runner = CliRunner() @@ -678,7 +680,7 @@ def test_json_uuid_does_not_exists(catalog: Catalog) -> None: def test_json_location(catalog: Catalog) -> None: - tbl = catalog.create_table( + catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, partition_spec=TEST_TABLE_PARTITION_SPEC, @@ -687,7 +689,7 @@ def test_json_location(catalog: Catalog) -> None: runner = CliRunner() result = runner.invoke(run, ["--output=json", "location", "default.my_table"]) assert result.exit_code == 0 - assert result.output == f""""{tbl.metadata.location}"\n""" + assert result.output == f'"{catalog._warehouse_location}/default/my_table"\n' def test_json_location_does_not_exists(catalog: Catalog) -> None:
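
The sketch below is not part of the diff; it is a minimal, hypothetical illustration of how the reworked `InMemoryCatalog` test helper from `tests/catalog/test_base.py` could be exercised after this change, where table locations and metadata files derive from a configurable warehouse location instead of hard-coded `s3://warehouse/...` paths. The catalog name `demo`, the `file:///tmp/demo-warehouse` path, and the assumption that the helper is importable from the tests package are illustrative only.

```python
# Illustrative only: exercises the test-only InMemoryCatalog defined above,
# assuming tests/catalog/test_base.py is importable and a local FileIO
# implementation (e.g. pyarrow) is installed.
from pyiceberg.io import WAREHOUSE
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField

from tests.catalog.test_base import InMemoryCatalog  # test helper, not a public API

# The warehouse property now controls where table locations (and metadata JSON
# files) are placed; without it the catalog falls back to
# DEFAULT_WAREHOUSE_LOCATION ("file:///tmp/warehouse").
catalog = InMemoryCatalog("demo", **{WAREHOUSE: "file:///tmp/demo-warehouse"})

table = catalog.create_table(
    identifier=("default", "events"),
    schema=Schema(NestedField(1, "id", LongType())),
)

# The location is derived from the warehouse root and the identifier, and a
# real metadata file is written through the configured FileIO.
assert table.location() == "file:///tmp/demo-warehouse/default/events"
print(table.metadata_location)  # typically .../default/events/metadata/00000-<uuid>.metadata.json
```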