2003 - PoC: ibis support #2004
Draft · wants to merge 10 commits into base: devel
9 changes: 9 additions & 0 deletions dlt/common/destination/reference.py
@@ -75,12 +75,16 @@
    try:
        from dlt.common.libs.pandas import DataFrame
        from dlt.common.libs.pyarrow import Table as ArrowTable
+        from dlt.common.libs.ibis import BaseBackend as IbisBackend
    except MissingDependencyException:
        DataFrame = Any
        ArrowTable = Any
+        IbisBackend = Any

else:
    DataFrame = Any
    ArrowTable = Any
+    IbisBackend = Any
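For readers new to this pattern: the hunk sits inside the module's `if TYPE_CHECKING:` guard (the guard itself is just above the excerpt, and the same construct is added verbatim to dlt/destinations/dataset.py below), so the trailing `else:` pairs with that `if`, not with the `try`. Type checkers see the real classes; at runtime every name degrades to `Any` and none of the optional libraries have to be installed. A minimal standalone sketch of the pattern, using a plain `ImportError` in place of dlt's `MissingDependencyException`:

from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    # evaluated by static type checkers only, never at runtime
    try:
        from ibis import BaseBackend as IbisBackend
    except ImportError:
        IbisBackend = Any
else:
    # at runtime the name is a plain alias, so ibis need not be installed
    IbisBackend = Any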


class StorageSchemaInfo(NamedTuple):
@@ -566,12 +570,17 @@ def close(self) -> None: ...
class SupportsReadableDataset(Protocol):
    """A readable dataset retrieved from a destination; supports creating readable relations for a query or table"""

+    @property
+    def schema(self) -> Schema: ...

    def __call__(self, query: Any) -> SupportsReadableRelation: ...

    def __getitem__(self, table: str) -> SupportsReadableRelation: ...

    def __getattr__(self, table: str) -> SupportsReadableRelation: ...

+    def ibis(self) -> IbisBackend: ...


class JobClientBase(ABC):
    def __init__(
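To make the protocol concrete, a usage sketch; `dataset` stands for any object satisfying `SupportsReadableDataset` (for instance the `ReadableDBAPIDataset` changed below), and the table and query names are illustrative:

# every access path below resolves to a SupportsReadableRelation
items = dataset["items"]                # __getitem__: look a table up by name
items = dataset.items                   # __getattr__: the same lookup, attribute-style
adhoc = dataset("SELECT * FROM items")  # __call__: wrap an ad-hoc query
print(dataset.schema.name)              # schema property added in this hunk
con = dataset.ibis()                    # ibis() added in this hunk: a live ibis backend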
59 changes: 59 additions & 0 deletions dlt/common/libs/ibis.py
@@ -0,0 +1,59 @@
from typing import cast

from dlt.common.exceptions import MissingDependencyException
from dlt.common.schema import Schema

from dlt.common.destination.reference import TDestinationReferenceArg, Destination, JobClientBase

try:
    import ibis  # type: ignore[import-not-found]
    from ibis import BaseBackend
except ModuleNotFoundError:
    raise MissingDependencyException("dlt ibis Helpers", ["ibis"])


SUPPORTED_DESTINATIONS = [
    "dlt.destinations.postgres",
    "dlt.destinations.duckdb",
    "dlt.destinations.filesystem",
]


def create_ibis_backend(
    destination: TDestinationReferenceArg, dataset_name: str, client: JobClientBase
) -> BaseBackend:
    """Create an ibis backend for the given destination client and dataset"""
    import duckdb
    from dlt.destinations.impl.duckdb.factory import DuckDbCredentials

    # check if the destination is supported
    destination_type = Destination.from_reference(destination).destination_type
    if destination_type not in SUPPORTED_DESTINATIONS:
        raise NotImplementedError(f"Destination of type {destination_type} not supported")

    if destination_type in ["dlt.destinations.postgres", "dlt.destinations.duckdb"]:
        credentials = client.config.credentials.to_native_representation()
        con = ibis.connect(credentials)
    elif destination_type == "dlt.destinations.filesystem":
        from dlt.destinations.impl.filesystem.sql_client import (
            FilesystemClient,
            FilesystemSqlClient,
        )

        # we create an in-memory duckdb and expose all tables as views on it
        duck = duckdb.connect(":memory:")
        fs_client = cast(FilesystemClient, client)
        creds = DuckDbCredentials(duck)
        sql_client = FilesystemSqlClient(
            fs_client, dataset_name=fs_client.dataset_name, credentials=creds
        )

        # NOTE: we should probably have the option for the user to only select a subset of tables here
        with sql_client as _:
            sql_client.create_views_for_all_tables()
        con = ibis.duckdb.from_connection(duck)

    # NOTE: there seems to be no standardized way to set the current dataset / schema in ibis
    con.raw_sql(f"SET search_path TO {dataset_name};")

    return con
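Taken together with the dataset changes below, the intended end-to-end flow looks roughly as follows; this sketch is distilled from the test added in this PR, `_dataset()` is the pipeline's internal dataset accessor used there, and the pipeline and dataset names are illustrative:

import dlt

pipeline = dlt.pipeline(
    pipeline_name="demo", destination="duckdb", dataset_name="demo_data"
)
pipeline.run([{"id": 1}, {"id": 2}], table_name="items")

con = pipeline._dataset().ibis()  # a BaseBackend built by create_ibis_backend
print(con.list_tables())          # dlt system tables plus "items"
print(con.table("items").count().to_pandas())  # 2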
40 changes: 33 additions & 7 deletions dlt/destinations/dataset.py
@@ -1,8 +1,9 @@
-from typing import Any, Generator, Optional, Sequence, Union, List
+from typing import Any, Generator, Optional, Sequence, Union, List, TYPE_CHECKING, cast
from dlt.common.json import json
from copy import deepcopy

from dlt.common.normalizers.naming.naming import NamingConvention
+from dlt.common.exceptions import MissingDependencyException

from contextlib import contextmanager
from dlt.common.destination.reference import (
@@ -21,6 +22,14 @@
from dlt.common.schema import Schema
from dlt.common.exceptions import DltException

+if TYPE_CHECKING:
+    try:
+        from dlt.common.libs.ibis import BaseBackend as IbisBackend
+    except MissingDependencyException:
+        IbisBackend = Any
+else:
+    IbisBackend = Any


class DatasetException(DltException):
    pass
@@ -228,6 +237,17 @@ def __init__(
        self._sql_client: SqlClientBase[Any] = None
        self._schema: Schema = None

+    def ibis(self) -> IbisBackend:
+        """Return a connected ibis backend"""
+        from dlt.common.libs.ibis import create_ibis_backend
+
+        self._ensure_client_and_schema()
+        return create_ibis_backend(
+            self._destination,
+            self._dataset_name,
+            self._destination_client(self.schema),
+        )

    @property
    def schema(self) -> Schema:
        self._ensure_client_and_schema()
@@ -239,12 +259,7 @@ def sql_client(self) -> SqlClientBase[Any]:
        return self._sql_client

    def _destination_client(self, schema: Schema) -> JobClientBase:
-        client_spec = self._destination.spec()
-        if isinstance(client_spec, DestinationClientDwhConfiguration):
-            client_spec._bind_dataset_name(
-                dataset_name=self._dataset_name, default_schema_name=schema.name
-            )
-        return self._destination.client(schema, client_spec)
+        return _get_client_for_destination(self._destination, schema, self._dataset_name)

    def _ensure_client_and_schema(self) -> None:
        """Lazy load schema and client"""
@@ -310,3 +325,14 @@ def dataset(
    if dataset_type == "dbapi":
        return ReadableDBAPIDataset(destination, dataset_name, schema)
    raise NotImplementedError(f"Dataset of type {dataset_type} not implemented")


+# helpers
+def _get_client_for_destination(
+    destination: TDestinationReferenceArg, schema: Schema, dataset_name: str
+) -> JobClientBase:
+    destination = Destination.from_reference(destination)
+    client_spec = destination.spec()
+    if isinstance(client_spec, DestinationClientDwhConfiguration):
+        client_spec._bind_dataset_name(dataset_name=dataset_name, default_schema_name=schema.name)
+    return destination.client(schema, client_spec)
6 changes: 2 additions & 4 deletions dlt/destinations/impl/duckdb/sql_client.py
@@ -52,10 +52,8 @@ def iter_arrow(self, chunk_size: int) -> Generator[ArrowTable, None, None]:
            yield self.native_cursor.fetch_arrow_table()
            return
        # iterate
-        try:
-            yield from self.native_cursor.fetch_record_batch(chunk_size)
-        except StopIteration:
-            pass
+        for item in self.native_cursor.fetch_record_batch(chunk_size):
+            yield ArrowTable.from_batches([item])


class DuckDbSqlClient(SqlClientBase[duckdb.DuckDBPyConnection], DBTransaction):
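The old code handed `pyarrow.RecordBatch` objects straight through from `fetch_record_batch` (and swallowed a stray `StopIteration`), even though the signature promises `ArrowTable`s; the new loop wraps each batch in a table. A standalone sketch of the new pattern against plain duckdb, with an illustrative query and chunk size:

import duckdb
import pyarrow as pa

con = duckdb.connect()  # in-memory database
cur = con.execute("SELECT * FROM range(10000) AS t(n)")
reader = cur.fetch_record_batch(2048)       # pyarrow.RecordBatchReader
for batch in reader:                        # plain iteration, no StopIteration juggling
    table = pa.Table.from_batches([batch])  # promote each RecordBatch to a Table
    print(table.num_rows)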
3 changes: 3 additions & 0 deletions dlt/destinations/impl/filesystem/sql_client.py
@@ -178,6 +178,9 @@ def open_connection(self) -> duckdb.DuckDBPyConnection:

        return self._conn

+    def create_views_for_all_tables(self) -> None:
+        self.create_views_for_tables({v: v for v in self.fs_client.schema.tables.keys()})

    @raise_database_error
    def create_views_for_tables(self, tables: Dict[str, str]) -> None:
        """Add the required tables as views to the in-memory duckdb instance"""
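For orientation, a runnable approximation of what one generated view boils down to; the exact SQL dlt emits depends on the table's file format and bucket layout, so the local parquet file and the `read_parquet` call here are stand-in assumptions:

import duckdb
import pyarrow as pa
import pyarrow.parquet as pq

# a local parquet file standing in for one table of a filesystem destination
pq.write_table(pa.table({"id": [1, 2, 3]}), "items.parquet")

duck = duckdb.connect(":memory:")
# hypothetical equivalent of one generated view (view name == table name)
duck.execute("CREATE VIEW items AS SELECT * FROM read_parquet('items.parquet')")
print(duck.execute("SELECT count(*) FROM items").fetchone())  # (3,)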
82 changes: 80 additions & 2 deletions poetry.lock

(generated file; diff not rendered)

1 change: 1 addition & 0 deletions pyproject.toml
@@ -166,6 +166,7 @@ pytest-mock = "^3.14.0"
types-regex = "^2024.5.15.20240519"
flake8-print = "^5.0.0"
mimesis = "^7.0.0"
+ibis-framework = { version = ">=9.0.0", markers = "python_version >= '3.10'"}

[tool.poetry.group.sources]
optional = true
2 changes: 1 addition & 1 deletion tests/destinations/test_readable_dbapi_dataset.py
@@ -91,7 +91,7 @@ def test_computed_schema_columns() -> None:

    # now add columns
    relation = dataset.items
-    dataset.schema.tables["items"] = {  # type: ignore[attr-defined]
+    dataset.schema.tables["items"] = {
        "columns": {"one": {"data_type": "text"}, "two": {"data_type": "json"}}
    }

41 changes: 40 additions & 1 deletion tests/load/test_read_interfaces.py
@@ -131,7 +131,7 @@ def double_items():
    yield pipeline

    # NOTE: we need to drop pipeline data here since we are keeping the pipelines around for the whole module
-    drop_pipeline_data(pipeline)
+    # drop_pipeline_data(pipeline)


# NOTE: we collect all destination configs centrally, this way the session based
@@ -251,6 +251,45 @@ def test_db_cursor_access(populated_pipeline: Pipeline) -> None:
    assert set(ids) == set(range(total_records))


+@pytest.mark.no_load
+@pytest.mark.essential
+@pytest.mark.parametrize(
+    "populated_pipeline",
+    configs,
+    indirect=True,
+    ids=lambda x: x.name,
+)
+def test_ibis_dataset_access(populated_pipeline: Pipeline) -> None:
+    # NOTE: we could generalize this with a context for certain deps
+    import subprocess
+
+    subprocess.check_call(["pip", "install", "ibis-framework[duckdb,postgres,bigquery]"])
+
+    from dlt.common.libs.ibis import SUPPORTED_DESTINATIONS
+
+    # check for the correct error if the destination is not supported
+    if populated_pipeline.destination.destination_type not in SUPPORTED_DESTINATIONS:
+        with pytest.raises(NotImplementedError):
+            populated_pipeline._dataset().ibis()
+        return
+
+    total_records = _total_records(populated_pipeline)
+    ibis_connection = populated_pipeline._dataset().ibis()
+
+    # just a basic check to see whether ibis can connect
+    assert ibis_connection.list_tables() == [
+        "_dlt_loads",
+        "_dlt_pipeline_state",
+        "_dlt_version",
+        "double_items",
+        "items",
+        "items__children",
+    ]
+
+    items_table = ibis_connection.table("items")
+    assert items_table.count().to_pandas() == total_records

@pytest.mark.no_load
@pytest.mark.essential
@pytest.mark.parametrize(
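The in-test `pip install` above is flagged by its own NOTE as a stopgap; a lighter-weight alternative (a suggestion, not something this PR does) is to let pytest skip when the dependency is absent:

import pytest

# skip the test cleanly when ibis is not installed, instead of installing it inline
ibis = pytest.importorskip("ibis")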