Python: Support iceberg base catalog in python library (#3245) #4706

Merged
merged 32 commits into from
May 27, 2022
Changes from 8 commits
f619c13
Python: Add base Catalog and NamespacedCatalog, with placeholder for …
dhruv-pratap May 5, 2022
da2e057
Python: Add base Catalog and NamespacedCatalog, with placeholder for …
dhruv-pratap May 5, 2022
adba63e
Python: Change Docstring format from sphinx to google for Catalog (#3…
dhruv-pratap May 5, 2022
794fa04
Python: Collapse NamespacedCatalog into Catalog(ABC) and use collecti…
dhruv-pratap May 6, 2022
68ce34c
Python: Added a TableSpec class to hold user provided table specifica…
dhruv-pratap May 6, 2022
1ab5b89
Python: Added an In-Memory Catalog implementation with table APIs imp…
dhruv-pratap May 9, 2022
7f3b810
Python: Completed the In-Memory Catalog implementation with all APIs …
dhruv-pratap May 10, 2022
e699d6b
Python: Rename Catalog API get_namespace_metadata() to load_namespace…
dhruv-pratap May 10, 2022
33a2714
Python: Change namespace to be a Tuple of levels. Remove TableSpec cl…
dhruv-pratap May 11, 2022
f904c50
Python: Move custom errors to exceptions.py (#3245)
dhruv-pratap May 11, 2022
bc3b3c6
Python: Move catalog fixture to conftest.py. Remove InMemoryTable. (#…
dhruv-pratap May 13, 2022
5520d62
Python: Change catalog API to accept string or tuple of strings as ta…
dhruv-pratap May 16, 2022
0f2b240
Python: Fix list_tables return type to List of Identifier. (#3245)
dhruv-pratap May 16, 2022
b08349c
Python: Rename type alias Metadata to Properties. Move helper identif…
dhruv-pratap May 18, 2022
edf11c1
Python: Add base Catalog and NamespacedCatalog, with placeholder for …
dhruv-pratap May 5, 2022
b9c0230
Python: Add base Catalog and NamespacedCatalog, with placeholder for …
dhruv-pratap May 5, 2022
011240b
Python: Change Docstring format from sphinx to google for Catalog (#3…
dhruv-pratap May 5, 2022
1564329
Python: Collapse NamespacedCatalog into Catalog(ABC) and use collecti…
dhruv-pratap May 6, 2022
bb6f98d
Python: Added a TableSpec class to hold user provided table specifica…
dhruv-pratap May 6, 2022
c8a87e2
Python: Added an In-Memory Catalog implementation with table APIs imp…
dhruv-pratap May 9, 2022
8300905
Python: Completed the In-Memory Catalog implementation with all APIs …
dhruv-pratap May 10, 2022
a3a1689
Python: Rename Catalog API get_namespace_metadata() to load_namespace…
dhruv-pratap May 10, 2022
b11c334
Python: Change namespace to be a Tuple of levels. Remove TableSpec cl…
dhruv-pratap May 11, 2022
f7414a2
Python: Move custom errors to exceptions.py (#3245)
dhruv-pratap May 11, 2022
e577988
Python: Move catalog fixture to conftest.py. Remove InMemoryTable. (#…
dhruv-pratap May 13, 2022
d884cb6
Python: Change catalog API to accept string or tuple of strings as ta…
dhruv-pratap May 16, 2022
fe39f02
Python: Fix list_tables return type to List of Identifier. (#3245)
dhruv-pratap May 16, 2022
c147259
Python: Rename type alias Metadata to Properties. Move helper identif…
dhruv-pratap May 18, 2022
c4030f6
Python: Sync from master to pull in all recent PRs. (#3245)
dhruv-pratap May 18, 2022
5af795b
Python: Add namespace to spellcheck-dictionary.txt (#3245)
dhruv-pratap May 19, 2022
7e00c96
Python: Fix linter errors. (#3245)
dhruv-pratap May 19, 2022
5bae65d
Python: Standardize error messages. (#3245)
dhruv-pratap May 19, 2022
1 change: 1 addition & 0 deletions python/setup.cfg
@@ -43,6 +43,7 @@ package_dir =
packages = find:
python_requires = >=3.7
install_requires =
    attrs
    mmh3
    singledispatch
[options.extras_require]
222 changes: 222 additions & 0 deletions python/src/iceberg/catalog/base.py
@@ -0,0 +1,222 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Tuple

from iceberg.table.base import Table, TableSpec


class Catalog(ABC):
    """Base Catalog for table operations like - create, drop, load, list and others.

    Attributes:
        name(str): Name of the catalog
        properties(Dict[str, str]): Catalog properties
    """

    def __init__(self, name: str, properties: Dict[str, str]):
        self._name = name
        self._properties = properties

    @property
    def name(self) -> str:
        return self._name

    @property
    def properties(self) -> Dict[str, str]:
        return self._properties

    @abstractmethod
    def create_table(self, table_spec: TableSpec) -> Table:
        """Create a table

        Args:
            table_spec: A specification to create a table

        Returns:
            Table: the created table instance

        Raises:
            AlreadyExistsError: If a table with the name already exists
        """

    @abstractmethod
    def table(self, namespace: str, name: str) -> Table:
        """Loads the table's metadata and returns the table instance.

        To check whether a table exists, call this method inside a try block and catch TableNotFoundError.

        Note: This method does not load the table's data in any form.

        Args:
            namespace: Table's namespace
            name: Table's name.

        Returns:
            Table: the table instance with its metadata

        Raises:
            TableNotFoundError: If a table with the name does not exist
        """

    @abstractmethod
    def drop_table(self, namespace: str, name: str, purge: bool = True) -> None:
        """Drop a table; optionally purge all data and metadata files.

        Args:
            namespace: table namespace
            name: table name
            purge: Optional. If True (the default), deletes all data and metadata files in the table

        Raises:
            TableNotFoundError: If a table with the name does not exist
        """

    @abstractmethod
    def rename_table(self, from_namespace: str, from_name: str, to_namespace: str, to_name: str) -> Table:
        """Rename a table identified by its fully qualified name

        Args:
            from_namespace: Existing table's namespace.
            from_name: Existing table's name.
            to_namespace: New table namespace to be assigned.
            to_name: New table name to be assigned.

        Returns:
            Table: the updated table instance with its metadata

        Raises:
            TableNotFoundError: If a table with the name does not exist
        """

    @abstractmethod
    def replace_table(self, table_spec: TableSpec) -> Table:
        """Starts a transaction and replaces the table with the provided spec.

        Args:
            table_spec: A specification to replace a table

        Returns:
            Table: the replaced table instance with the updated state

        Raises:
            TableNotFoundError: If a table with the name does not exist
        """

    @abstractmethod
    def create_namespace(self, namespace: str, properties: Optional[Dict[str, str]] = None) -> None:
        """Create a namespace in the catalog.

        Args:
            namespace: The namespace to be created.
            properties: A string dictionary of properties for the given namespace

        Raises:
            AlreadyExistsError: If a namespace with the name already exists in the catalog
        """

    @abstractmethod
    def drop_namespace(self, namespace: str) -> None:
        """Drop a namespace.

        Args:
            namespace: The namespace to be dropped.

        Raises:
            NamespaceNotFoundError: If a namespace with the name does not exist in the catalog
            NamespaceNotEmptyError: If the namespace is not empty
        """

    @abstractmethod
    def list_tables(self, namespace: Optional[str] = None) -> List[Tuple[str, str]]:
        """List tables under the given namespace in the catalog.

        If no namespace is provided, lists all tables in the catalog.

        Args:
            namespace: the namespace to search

        Returns:
            List[Tuple[str, str]]: list of (namespace, table name) tuples.

        Raises:
            NamespaceNotFoundError: If a namespace with the name does not exist in the catalog
        """

    @abstractmethod
    def list_namespaces(self) -> List[str]:
        """List the namespaces in the catalog.

        Returns:
            List[str]: a list of namespace strings
        """

    @abstractmethod
    def load_namespace_metadata(self, namespace: str) -> Dict[str, str]:
        """Get metadata dictionary for a namespace.

        Args:
            namespace: the namespace

        Returns:
            Dict[str, str]: a string dictionary of properties for the given namespace

        Raises:
            NamespaceNotFoundError: If a namespace with the name does not exist in the catalog
        """

    @abstractmethod
    def set_namespace_metadata(self, namespace: str, metadata: Dict[str, str]) -> None:
        """Update or remove metadata for a namespace.

        Note: Existing metadata is overwritten. To change individual entries, load the metadata, mutate it, and then set it.

        Args:
            namespace: the namespace
            metadata: a string dictionary of properties for the given namespace

        Raises:
            NamespaceNotFoundError: If a namespace with the name does not exist in the catalog
        """


class TableNotFoundError(Exception):
    """Exception when a table is not found in the catalog"""

    def __init__(self, name: str):
        # Pass only the message; super().__init__ already binds self.
        super().__init__(f"Table {name} not found in the catalog")


class NamespaceNotFoundError(Exception):
    """Exception when a Namespace is not found in the catalog"""

    def __init__(self, namespace: str):
        # Pass only the message; super().__init__ already binds self.
        super().__init__(f"Namespace {namespace} not found in the catalog")


class NamespaceNotEmptyError(Exception):
    """Exception when a Namespace is not empty"""

    def __init__(self, namespace: str):
        # Pass only the message; super().__init__ already binds self.
        super().__init__(f"Namespace {namespace} not empty")


class AlreadyExistsError(Exception):
    """Exception when an entity like table or namespace already exists in the catalog"""

    def __init__(self, name: str):
        # Pass only the message; super().__init__ already binds self.
        super().__init__(f"Table or namespace {name} already exists")
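
Passing `self` explicitly to `super().__init__` stores the exception instance itself as the first element of `args`, so the message ends up in second place. The sketch below contrasts the two forms and shows the try/except existence check that the `table()` docstring describes. `BrokenTableNotFoundError`, `table_exists`, and the dict-backed `load_table` are hypothetical stand-ins for illustration and are not part of this PR.

```python
from typing import Callable, Dict, Tuple


class BrokenTableNotFoundError(Exception):
    """Hypothetical: forwards `self` to Exception.__init__ along with the message."""

    def __init__(self, name: str):
        super().__init__(self, f"Table {name} not found in the catalog")


class TableNotFoundError(Exception):
    """Forwards only the message, so str(error) is the message itself."""

    def __init__(self, name: str):
        super().__init__(f"Table {name} not found in the catalog")


broken = BrokenTableNotFoundError("events")
fixed = TableNotFoundError("events")

# With the extra `self`, args holds two items: the exception and the message.
broken_arg_count = len(broken.args)
fixed_message = str(fixed)

# Hypothetical table store keyed by (namespace, name), standing in for a catalog.
_tables: Dict[Tuple[str, str], str] = {("db", "events"): "metadata"}


def load_table(namespace: str, name: str) -> str:
    """Return the stored entry or raise TableNotFoundError."""
    try:
        return _tables[(namespace, name)]
    except KeyError:
        raise TableNotFoundError(name) from None


def table_exists(loader: Callable[[str, str], object], namespace: str, name: str) -> bool:
    """Existence check built on the loader's TableNotFoundError."""
    try:
        loader(namespace, name)
    except TableNotFoundError:
        return False
    return True
```

A caller would use it as `table_exists(load_table, "db", "events")`; with a real catalog the loader would presumably be the bound `catalog.table` method.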
139 changes: 139 additions & 0 deletions python/src/iceberg/catalog/in_memory.py
@@ -0,0 +1,139 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import Dict, List, Optional, Tuple, cast

from iceberg.catalog.base import (
    AlreadyExistsError,
    Catalog,
    NamespaceNotEmptyError,
    NamespaceNotFoundError,
    TableNotFoundError,
)
from iceberg.table.base import Table, TableSpec


class InMemoryCatalog(Catalog):
    __tables: Dict[tuple, Table]
    __namespaces: Dict[str, Dict[str, str]]

    def __init__(self, name: str, properties: Dict[str, str]):
        super().__init__(name, properties)
        self.__tables = {}
        self.__namespaces = {}

    def create_table(self, spec: TableSpec) -> Table:
        if (spec.namespace, spec.name) in self.__tables:
            raise AlreadyExistsError(spec.name)
        else:
            if spec.namespace not in self.__namespaces:
                self.__namespaces[spec.namespace] = {}

            table = Table(spec)
            self.__tables[(spec.namespace, spec.name)] = table
            return table

    def table(self, namespace: str, name: str) -> Table:
        try:
            return self.__tables[(namespace, name)]
        except KeyError:
            raise TableNotFoundError(name)

    def drop_table(self, namespace: str, name: str, purge: bool = True) -> None:
        try:
            self.__tables.pop((namespace, name))
        except KeyError:
            raise TableNotFoundError(name)

    def rename_table(self, from_namespace: str, from_name: str, to_namespace: str, to_name: str) -> Table:
        try:
            table = self.__tables.pop((from_namespace, from_name))
        except KeyError:
            raise TableNotFoundError(from_name)

        renamed_table = Table(
            TableSpec(
                namespace=to_namespace,
                name=to_name,
                schema=table.spec.schema,
                location=table.spec.location,
                partition_spec=table.spec.partition_spec,
                properties=table.spec.properties,
            )
        )
        if to_namespace not in self.__namespaces:
            self.__namespaces[to_namespace] = {}

        self.__tables[(to_namespace, to_name)] = renamed_table
        return renamed_table

    def replace_table(self, table_spec: TableSpec) -> Table:
        try:
            table = self.__tables.pop((table_spec.namespace, table_spec.name))
        except KeyError:
            raise TableNotFoundError(table_spec.name)

        replaced_table = Table(
            TableSpec(
                namespace=table_spec.namespace if table_spec.namespace else table.spec.namespace,
                name=table_spec.name if table_spec.name else table.spec.name,
                schema=table_spec.schema if table_spec.schema else table.spec.schema,
                location=table_spec.location if table_spec.location else table.spec.location,
                partition_spec=table_spec.partition_spec if table_spec.partition_spec else table.spec.partition_spec,
                properties={**table.spec.properties, **table_spec.properties},
            )
        )
        self.__tables[(replaced_table.spec.namespace, replaced_table.spec.name)] = replaced_table
        return replaced_table

    def create_namespace(self, namespace: str, properties: Optional[Dict[str, str]] = None) -> None:
        if namespace in self.__namespaces:
            raise AlreadyExistsError(namespace)
        else:
            self.__namespaces[namespace] = properties if properties else {}

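    def drop_namespace(self, namespace: str) -> None:
        # Compare only the namespace component of each (namespace, name) key,
        # so a table whose name equals the namespace does not count as a match.
        if [table_name_tuple for table_name_tuple in self.__tables.keys() if table_name_tuple[0] == namespace]:
            raise NamespaceNotEmptyError(namespace)
        try:
            self.__namespaces.pop(namespace)
        except KeyError:
            raise NamespaceNotFoundError(namespace)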

    def list_tables(self, namespace: Optional[str] = None) -> List[Tuple[str, str]]:
        if namespace:
            # Match on the namespace component only, not on the table name.
            list_tables = [table_name_tuple for table_name_tuple in self.__tables.keys() if table_name_tuple[0] == namespace]
        else:
            list_tables = list(self.__tables.keys())

        # Casting to make mypy happy
        return cast(List[Tuple[str, str]], list_tables)

    def list_namespaces(self) -> List[str]:
        return list(self.__namespaces.keys())

    def load_namespace_metadata(self, namespace: str) -> Dict[str, str]:
        try:
            return self.__namespaces[namespace]
        except KeyError:
            raise NamespaceNotFoundError(namespace)

    def set_namespace_metadata(self, namespace: str, metadata: Dict[str, str]) -> None:
        if namespace in self.__namespaces:
            self.__namespaces[namespace] = metadata
        else:
            raise NamespaceNotFoundError(namespace)
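
When tables are keyed by `(namespace, name)` tuples, filtering with a tuple membership test (`namespace in key`) also matches tables whose name happens to equal the namespace. A minimal sketch of the difference follows, using a plain dict as a hypothetical stand-in for the catalog's private table map; the function names and sample data are assumptions made for illustration.

```python
from typing import Dict, List, Tuple

# Hypothetical stand-in for the table map: (namespace, name) -> table.
tables: Dict[Tuple[str, str], str] = {
    ("sales", "orders"): "table-1",
    ("staging", "sales"): "table-2",  # a table *named* "sales" in another namespace
}


def tables_by_membership(namespace: str) -> List[Tuple[str, str]]:
    """Membership test on the whole key: matches the namespace or the table name."""
    return [key for key in tables if namespace in key]


def tables_by_namespace(namespace: str) -> List[Tuple[str, str]]:
    """Compare only the namespace component of the key."""
    return [key for key in tables if key[0] == namespace]


membership_result = tables_by_membership("sales")
namespace_result = tables_by_namespace("sales")
```

Here `membership_result` contains both keys, while `namespace_result` contains only `("sales", "orders")`. The same distinction applies to the emptiness check performed before dropping a namespace.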