add PoC for exposing an ibis backend for a destination
sh-rp committed Oct 31, 2024
1 parent 4bf54be commit 5befef9
Showing 4 changed files with 71 additions and 3 deletions.
dlt/common/libs/ibis.py (1 addition, 0 deletions)
@@ -5,5 +5,6 @@
     import ibis  # type: ignore[import-untyped]
     from ibis import Table
    from ibis import memtable
+    from ibis import BaseBackend
 except ModuleNotFoundError:
     raise MissingDependencyException("dlt ibis Helpers", ["ibis"])
dlt/destinations/dataset.py (29 additions, 0 deletions)
@@ -25,10 +25,13 @@
 if TYPE_CHECKING:
     try:
         from dlt.common.libs.ibis import Table as IbisTable
+        from dlt.common.libs.ibis import BaseBackend as IbisBackend
     except MissingDependencyException:
         IbisTable = Any
+        IbisBackend = Any
 else:
     IbisTable = Any
+    IbisBackend = Any


 class DatasetException(DltException):
@@ -319,3 +322,29 @@ def dataset(
     if dataset_type == "dbapi":
         return ReadableDBAPIDataset(destination, dataset_name, schema)
     raise NotImplementedError(f"Dataset of type {dataset_type} not implemented")
+
+
+def create_ibis_backend(
+    destination: TDestinationReferenceArg, dataset_name: str, schema: Schema
+) -> IbisBackend:
+    from dlt.common.libs.ibis import ibis
+
+    # TODO: abstract out destination client related stuff
+    destination = Destination.from_reference(destination)
+    client_spec = destination.spec()
+    if isinstance(client_spec, DestinationClientDwhConfiguration):
+        client_spec._bind_dataset_name(dataset_name=dataset_name, default_schema_name=schema.name)
+    client = destination.client(schema, client_spec)
+
+    if destination.destination_type not in [
+        "dlt.destinations.postgres",
+        "dlt.destinations.duckdb",
+        "dlt.destinations.filesystem",
+    ]:
+        raise NotImplementedError()
+
+    ibis = ibis.connect(client.config.credentials.to_native_representation())
+    # NOTE: there seems to be no standardized way to set the current dataset / schema in ibis
+    ibis.raw_sql(f"SET search_path TO {dataset_name};")
+
+    return ibis
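
For orientation, a minimal usage sketch of the new helper (not part of the commit; the DuckDB file, dataset name, and schema object are illustrative assumptions):

    # illustrative sketch: hand ibis a connection to an existing dlt dataset
    import dlt
    from dlt.common.schema import Schema
    from dlt.destinations.dataset import create_ibis_backend

    backend = create_ibis_backend(
        dlt.destinations.duckdb("quack.duckdb"),  # assumed local DuckDB destination
        dataset_name="my_dataset",                # assumed existing dataset / schema
        schema=Schema("my_dataset"),
    )
    print(backend.list_tables())        # tables dlt created in the dataset
    items = backend.table("items")      # lazy ibis table expression
    print(items.count().to_pandas())    # executes a COUNT(*) on the destination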
dlt/pipeline/pipeline.py (14 additions, 1 deletion)
@@ -110,7 +110,7 @@
 from dlt.destinations.sql_client import SqlClientBase, WithSqlClient
 from dlt.destinations.fs_client import FSClientBase
 from dlt.destinations.job_client_impl import SqlJobClientBase
-from dlt.destinations.dataset import dataset
+from dlt.destinations.dataset import dataset, create_ibis_backend
 from dlt.load.configuration import LoaderConfiguration
 from dlt.load import Load

@@ -149,6 +149,11 @@
 from dlt.common.storages.load_package import TLoadPackageState
 from dlt.pipeline.helpers import refresh_source

+try:
+    from dlt.common.libs.ibis import BaseBackend as IbisBackend
+except MissingDependencyException:
+    IbisBackend = None
+

 def with_state_sync(may_extract_state: bool = False) -> Callable[[TFun], TFun]:
     def decorator(f: TFun) -> TFun:
@@ -1784,3 +1789,11 @@ def _dataset(self, dataset_type: TDatasetType = "dbapi") -> SupportsReadableDataset:
             schema=(self.default_schema if self.default_schema_name else None),
             dataset_type=dataset_type,
         )
+
+    def _ibis(self) -> IbisBackend:
+        """return a connected ibis backend"""
+        return create_ibis_backend(
+            self.destination,
+            self.dataset_name,
+            schema=(self.default_schema if self.default_schema_name else None),
+        )
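
A hedged end-to-end sketch of how the new method would be used (pipeline name, data, and table name are made up for illustration):

    # illustrative sketch: load a little data, then query it through the PoC ibis backend
    import dlt

    pipeline = dlt.pipeline("ibis_poc", destination="duckdb", dataset_name="poc_data")
    pipeline.run([{"id": 1}, {"id": 2}], table_name="items")

    con = pipeline._ibis()              # connected ibis backend for the destination
    items = con.table("items")          # ibis table expression over the loaded table
    print(items.count().to_pandas())    # -> 2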
tests/load/test_read_interfaces.py (27 additions, 2 deletions)
@@ -132,7 +132,7 @@ def double_items():
     yield pipeline

     # NOTE: we need to drop pipeline data here since we are keeping the pipelines around for the whole module
-    drop_pipeline_data(pipeline)
+    # drop_pipeline_data(pipeline)


 # NOTE: we collect all destination configs centrally, this way the session based
@@ -260,7 +260,7 @@ def test_db_cursor_access(populated_pipeline: Pipeline) -> None:
     indirect=True,
     ids=lambda x: x.name,
 )
-def test_ibis_access(populated_pipeline: Pipeline) -> None:
+def test_ibis_tables_access(populated_pipeline: Pipeline) -> None:
     table_relationship = populated_pipeline._dataset().items
     total_records = _total_records(populated_pipeline)
     chunk_size = _chunk_size(populated_pipeline)
@@ -284,6 +284,31 @@ def test_ibis_access(populated_pipeline: Pipeline) -> None:
     assert set(ids) == set(range(total_records))


+@pytest.mark.no_load
+@pytest.mark.essential
+@pytest.mark.parametrize(
+    "populated_pipeline",
+    configs,
+    indirect=True,
+    ids=lambda x: x.name,
+)
+def test_ibis_dataset_access(populated_pipeline: Pipeline) -> None:
+    total_records = _total_records(populated_pipeline)
+    ibis_connection = populated_pipeline._ibis()
+
+    assert ibis_connection.list_tables() == [
+        "_dlt_loads",
+        "_dlt_pipeline_state",
+        "_dlt_version",
+        "double_items",
+        "items",
+        "items__children",
+    ]
+
+    items_table = ibis_connection.table("items")
+    assert items_table.count().to_pandas() == total_records
+
+
 @pytest.mark.no_load
 @pytest.mark.essential
 @pytest.mark.parametrize(
