From 311a53108bc73bad9e3d1f9d0eb12543c58e2d2c Mon Sep 17 00:00:00 2001
From: dave
Date: Thu, 31 Oct 2024 17:34:31 +0100
Subject: [PATCH] add ibis backend support for the filesystem destination

Extract destination client creation into a shared
_get_client_for_destination helper and teach create_ibis_backend to
handle the filesystem destination: the dataset's tables are exposed as
views on an in-memory duckdb instance via FilesystemSqlClient, and ibis
then attaches to that same duckdb connection.
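A rough usage sketch of the new path, assuming a configured filesystem
destination (bucket_url etc.); the pipeline name and data are made up
for illustration, and _ibis() is the internal accessor exercised in the
test below:

    import dlt

    pipeline = dlt.pipeline("fs_pipeline", destination="filesystem")
    pipeline.run([{"id": 1}], table_name="items")

    # ibis attaches to an in-memory duckdb that serves the loaded files as views
    ibis_connection = pipeline._ibis()
    print(ibis_connection.list_tables())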
---
 dlt/destinations/dataset.py                   | 58 +++++++++++++------
 .../impl/filesystem/sql_client.py             |  4 ++++
 tests/load/test_read_interfaces.py            |  1 +
 3 files changed, 45 insertions(+), 18 deletions(-)

diff --git a/dlt/destinations/dataset.py b/dlt/destinations/dataset.py
index 5e24a0d9a3..3c53cbb2be 100644
--- a/dlt/destinations/dataset.py
+++ b/dlt/destinations/dataset.py
@@ -1,4 +1,5 @@
-from typing import Any, Generator, Optional, Sequence, Union, List, TYPE_CHECKING
+from typing import Any, Generator, Optional, Sequence, Union, List, TYPE_CHECKING, cast
 
 from dlt.common.json import json
 from copy import deepcopy
+from dlt.common.destination.reference import JobClientBase
@@ -266,12 +267,7 @@ def sql_client(self) -> SqlClientBase[Any]:
         return self._sql_client
 
     def _destination_client(self, schema: Schema) -> JobClientBase:
-        client_spec = self._destination.spec()
-        if isinstance(client_spec, DestinationClientDwhConfiguration):
-            client_spec._bind_dataset_name(
-                dataset_name=self._dataset_name, default_schema_name=schema.name
-            )
-        return self._destination.client(schema, client_spec)
+        return _get_client_for_destination(self._destination, schema, self._dataset_name)
 
     def _ensure_client_and_schema(self) -> None:
         """Lazy load schema and client"""
@@ -339,27 +335,53 @@ def dataset(
     raise NotImplementedError(f"Dataset of type {dataset_type} not implemented")
 
 
 def create_ibis_backend(
     destination: TDestinationReferenceArg, dataset_name: str, schema: Schema
 ) -> IbisBackend:
     from dlt.common.libs.ibis import ibis
+    import duckdb
+    from dlt.destinations.impl.duckdb.factory import DuckDbCredentials
 
-    # TODO: abstract out destination client related stuff
     destination = Destination.from_reference(destination)
-    client_spec = destination.spec()
-    if isinstance(client_spec, DestinationClientDwhConfiguration):
-        client_spec._bind_dataset_name(dataset_name=dataset_name, default_schema_name=schema.name)
-    client = destination.client(schema, client_spec)
+    client = _get_client_for_destination(destination, schema, dataset_name)
+
+    if destination.destination_type in ["dlt.destinations.postgres", "dlt.destinations.duckdb"]:
+        credentials = client.config.credentials.to_native_representation()
+        ibis = ibis.connect(credentials)
+    elif destination.destination_type == "dlt.destinations.filesystem":
+        from dlt.destinations.impl.filesystem.sql_client import (
+            FilesystemClient,
+            FilesystemSqlClient,
+        )
+
+        # create an in-memory duckdb instance and expose all schema tables on it as views
+        duck = duckdb.connect(":memory:")
+        fs_client = cast(FilesystemClient, client)
+        creds = DuckDbCredentials(duck)
+        sql_client = FilesystemSqlClient(
+            fs_client, dataset_name=fs_client.dataset_name, credentials=creds
+        )
+
+        # NOTE: we could give the user the option to create views for only a subset of tables here
+        with sql_client:
+            sql_client.create_views_for_all_tables()
+        ibis = ibis.duckdb.from_connection(duck)
 
-    if destination.destination_type not in [
-        "dlt.destinations.postgres",
-        "dlt.destinations.duckdb",
-        "dlt.destinations.filesystem",
-    ]:
+    else:
         raise NotImplementedError()
 
-    ibis = ibis.connect(client.config.credentials.to_native_representation())
     # NOTE: there seems to be no standardized way to set the current dataset / schema in ibis
     ibis.raw_sql(f"SET search_path TO {dataset_name};")
 
     return ibis
+
+
+# helpers
+def _get_client_for_destination(
+    destination: TDestinationReferenceArg, schema: Schema, dataset_name: str
+) -> JobClientBase:
+    destination = Destination.from_reference(destination)
+    client_spec = destination.spec()
+    if isinstance(client_spec, DestinationClientDwhConfiguration):
+        client_spec._bind_dataset_name(dataset_name=dataset_name, default_schema_name=schema.name)
+    return destination.client(schema, client_spec)
diff --git a/dlt/destinations/impl/filesystem/sql_client.py b/dlt/destinations/impl/filesystem/sql_client.py
index 64e76c96c8..fff4ef3846 100644
--- a/dlt/destinations/impl/filesystem/sql_client.py
+++ b/dlt/destinations/impl/filesystem/sql_client.py
@@ -178,6 +178,10 @@
 
         return self._conn
 
+    def create_views_for_all_tables(self) -> None:
+        """Create views on the duckdb instance for all tables of the schema"""
+        self.create_views_for_tables({v: v for v in self.fs_client.schema.tables.keys()})
+
     @raise_database_error
     def create_views_for_tables(self, tables: Dict[str, str]) -> None:
         """Add the required tables as views to the duckdb in memory instance"""
diff --git a/tests/load/test_read_interfaces.py b/tests/load/test_read_interfaces.py
index 0fa1fba5af..6288e5ffcf 100644
--- a/tests/load/test_read_interfaces.py
+++ b/tests/load/test_read_interfaces.py
@@ -305,6 +305,7 @@ def test_ibis_dataset_access(populated_pipeline: Pipeline) -> None:
     total_records = _total_records(populated_pipeline)
 
     ibis_connection = populated_pipeline._ibis()
+    # just do a basic check to see whether ibis can connect
     assert ibis_connection.list_tables() == [
         "_dlt_loads",
         "_dlt_pipeline_state",
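Note on the mechanism: FilesystemSqlClient registers the dataset's
files as views on a duckdb connection, and ibis then attaches to that
very same connection. A minimal standalone sketch of the idea with
plain duckdb + ibis (no dlt wiring), assuming a recent ibis with the
duckdb backend and a hypothetical local file ./data/items.parquet:

    import duckdb
    import ibis

    # serve a parquet file as a view on an in-memory duckdb instance
    duck = duckdb.connect(":memory:")
    duck.sql("CREATE VIEW items AS SELECT * FROM read_parquet('./data/items.parquet')")

    # attach ibis to the same connection; the view is listed like a table
    con = ibis.duckdb.from_connection(duck)
    print(con.list_tables())  # ['items']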