diff --git a/dlt/common/libs/ibis.py b/dlt/common/libs/ibis.py
index 397489c036..3e4221bb52 100644
--- a/dlt/common/libs/ibis.py
+++ b/dlt/common/libs/ibis.py
@@ -5,5 +5,6 @@
     import ibis  # type: ignore[import-untyped]
     from ibis import Table
     from ibis import memtable
+    from ibis import BaseBackend
 except ModuleNotFoundError:
     raise MissingDependencyException("dlt ibis Helpers", ["ibis"])
diff --git a/dlt/destinations/dataset.py b/dlt/destinations/dataset.py
index 0c2f8f86ea..27c5989e5b 100644
--- a/dlt/destinations/dataset.py
+++ b/dlt/destinations/dataset.py
@@ -25,10 +25,13 @@
 if TYPE_CHECKING:
     try:
         from dlt.common.libs.ibis import Table as IbisTable
+        from dlt.common.libs.ibis import BaseBackend as IbisBackend
     except MissingDependencyException:
         IbisTable = Any
+        IbisBackend = Any
 else:
     IbisTable = Any
+    IbisBackend = Any


 class DatasetException(DltException):
@@ -319,3 +322,29 @@ def dataset(
     if dataset_type == "dbapi":
         return ReadableDBAPIDataset(destination, dataset_name, schema)
     raise NotImplementedError(f"Dataset of type {dataset_type} not implemented")
+
+
+def create_ibis_backend(
+    destination: TDestinationReferenceArg, dataset_name: str, schema: Schema
+) -> IbisBackend:
+    from dlt.common.libs.ibis import ibis
+
+    # TODO: abstract out destination client related stuff
+    destination = Destination.from_reference(destination)
+    client_spec = destination.spec()
+    if isinstance(client_spec, DestinationClientDwhConfiguration):
+        client_spec._bind_dataset_name(dataset_name=dataset_name, default_schema_name=schema.name)
+    client = destination.client(schema, client_spec)
+
+    if destination.destination_type not in [
+        "dlt.destinations.postgres",
+        "dlt.destinations.duckdb",
+        "dlt.destinations.filesystem",
+    ]:
+        raise NotImplementedError()
+
+    ibis = ibis.connect(client.config.credentials.to_native_representation())
+    # NOTE: there seems to be no standardized way to set the current dataset / schema in ibis
+    ibis.raw_sql(f"SET search_path TO {dataset_name};")
+
+    return ibis
diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py
index 7c13d76e26..26f3ad7715 100644
--- a/dlt/pipeline/pipeline.py
+++ b/dlt/pipeline/pipeline.py
@@ -110,7 +110,7 @@
 from dlt.destinations.sql_client import SqlClientBase, WithSqlClient
 from dlt.destinations.fs_client import FSClientBase
 from dlt.destinations.job_client_impl import SqlJobClientBase
-from dlt.destinations.dataset import dataset
+from dlt.destinations.dataset import dataset, create_ibis_backend

 from dlt.load.configuration import LoaderConfiguration
 from dlt.load import Load
@@ -149,6 +149,11 @@
 from dlt.common.storages.load_package import TLoadPackageState
 from dlt.pipeline.helpers import refresh_source

+try:
+    from dlt.common.libs.ibis import BaseBackend as IbisBackend
+except MissingDependencyException:
+    IbisBackend = None
+

 def with_state_sync(may_extract_state: bool = False) -> Callable[[TFun], TFun]:
     def decorator(f: TFun) -> TFun:
@@ -1784,3 +1789,11 @@ def _dataset(self, dataset_type: TDatasetType = "dbapi") -> SupportsReadableData
         schema=(self.default_schema if self.default_schema_name else None),
         dataset_type=dataset_type,
     )
+
+    def _ibis(self) -> IbisBackend:
+        """return a connected ibis backend"""
+        return create_ibis_backend(
+            self.destination,
+            self.dataset_name,
+            schema=(self.default_schema if self.default_schema_name else None),
+        )
diff --git a/tests/load/test_read_interfaces.py b/tests/load/test_read_interfaces.py
index 1aab6a06d1..b5b0b662ac 100644
--- a/tests/load/test_read_interfaces.py
+++ b/tests/load/test_read_interfaces.py
@@ -132,7 +132,7 @@ def double_items():
     yield pipeline

     # NOTE: we need to drop pipeline data here since we are keeping the pipelines around for the whole module
-    drop_pipeline_data(pipeline)
+    # drop_pipeline_data(pipeline)


 # NOTE: we collect all destination configs centrally, this way the session based
@@ -260,7 +260,7 @@ def test_db_cursor_access(populated_pipeline: Pipeline) -> None:
     indirect=True,
     ids=lambda x: x.name,
 )
-def test_ibis_access(populated_pipeline: Pipeline) -> None:
+def test_ibis_tables_access(populated_pipeline: Pipeline) -> None:
     table_relationship = populated_pipeline._dataset().items
     total_records = _total_records(populated_pipeline)
     chunk_size = _chunk_size(populated_pipeline)
@@ -284,6 +284,31 @@ def test_ibis_access(populated_pipeline: Pipeline) -> None:
     assert set(ids) == set(range(total_records))


+@pytest.mark.no_load
+@pytest.mark.essential
+@pytest.mark.parametrize(
+    "populated_pipeline",
+    configs,
+    indirect=True,
+    ids=lambda x: x.name,
+)
+def test_ibis_dataset_access(populated_pipeline: Pipeline) -> None:
+    total_records = _total_records(populated_pipeline)
+    ibis_connection = populated_pipeline._ibis()
+
+    assert ibis_connection.list_tables() == [
+        "_dlt_loads",
+        "_dlt_pipeline_state",
+        "_dlt_version",
+        "double_items",
+        "items",
+        "items__children",
+    ]
+
+    items_table = ibis_connection.table("items")
+    assert items_table.count().to_pandas() == total_records
+
+
 @pytest.mark.no_load
 @pytest.mark.essential
 @pytest.mark.parametrize(
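
For context, a minimal usage sketch of the new `pipeline._ibis()` helper, mirroring what `test_ibis_dataset_access` exercises. This is an illustration, not part of the diff: the pipeline name, dataset name, and sample rows are made up, while `_ibis()`, `list_tables()`, `table()` and `count().to_pandas()` come from the changes above and from the ibis API.

import dlt

# illustrative pipeline; duckdb is one of the destinations create_ibis_backend accepts
pipeline = dlt.pipeline(
    pipeline_name="ibis_demo",
    destination="duckdb",
    dataset_name="items_data",
)
pipeline.run([{"id": i} for i in range(10)], table_name="items")

# returns a natively connected ibis backend, unlike _dataset() which goes through dbapi
ibis_backend = pipeline._ibis()
print(ibis_backend.list_tables())  # dlt system tables plus "items"

items = ibis_backend.table("items")
print(items.count().to_pandas())  # -> 10

Note that `_ibis()` appears to hand out a live connection into the destination with no lifecycle management added in this diff, so the caller is presumably responsible for disconnecting the backend when done.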