Skip to content

Commit

Permalink
add support for filesystem
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Nov 3, 2024
1 parent d369dac commit 311a531
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 18 deletions.
60 changes: 42 additions & 18 deletions dlt/destinations/dataset.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Generator, Optional, Sequence, Union, List, TYPE_CHECKING
from typing import Any, Generator, Optional, Sequence, Union, List, TYPE_CHECKING, cast
from dlt.common.json import json
from copy import deepcopy

Expand Down Expand Up @@ -266,12 +266,7 @@ def sql_client(self) -> SqlClientBase[Any]:
return self._sql_client

def _destination_client(self, schema: Schema) -> JobClientBase:
    """Create a job client for this dataset's destination.

    Delegates to the module-level `_get_client_for_destination` helper,
    passing this dataset's destination and dataset name together with
    `schema`, instead of repeating the spec-binding logic inline.

    Args:
        schema: Schema handed to the destination client.

    Returns:
        The client produced by `_get_client_for_destination`.
    """
    return _get_client_for_destination(self._destination, schema, self._dataset_name)

def _ensure_client_and_schema(self) -> None:
"""Lazy load schema and client"""
Expand Down Expand Up @@ -339,27 +334,56 @@ def dataset(
raise NotImplementedError(f"Dataset of type {dataset_type} not implemented")


from dlt.common.destination.reference import JobClientBase


def create_ibis_backend(
    destination: TDestinationReferenceArg, dataset_name: str, schema: Schema
) -> IbisBackend:
    """Create an ibis backend for `dataset_name` on the given destination.

    For the postgres and duckdb destinations the backend is connected with the
    native representation of the client's credentials. For the filesystem
    destination an in-memory duckdb database is created, a view is registered
    for every table of the client's schema, and ibis is attached to that
    duckdb connection.

    Args:
        destination: Destination reference, resolved via `Destination.from_reference`.
        dataset_name: Name of the dataset; also used for the `search_path`.
        schema: Schema used to create the destination client.

    Returns:
        The connected ibis backend.

    Raises:
        NotImplementedError: If the destination type is not one of postgres,
            duckdb or filesystem.
    """
    from dlt.common.libs.ibis import ibis

    destination = Destination.from_reference(destination)
    client = _get_client_for_destination(destination, schema, dataset_name)
    destination_type = destination.destination_type

    # keep the connection in its own name instead of rebinding the `ibis` module
    if destination_type in ["dlt.destinations.postgres", "dlt.destinations.duckdb"]:
        credentials = client.config.credentials.to_native_representation()
        backend = ibis.connect(credentials)
    elif destination_type == "dlt.destinations.filesystem":
        # duckdb is only needed for this branch, so import it lazily here
        import duckdb
        from dlt.destinations.impl.duckdb.factory import DuckDbCredentials
        from dlt.destinations.impl.filesystem.sql_client import (
            FilesystemClient,
            FilesystemSqlClient,
        )

        # we create an in memory duckdb and create all tables on there
        duck = duckdb.connect(":memory:")
        fs_client = cast(FilesystemClient, client)
        creds = DuckDbCredentials(duck)
        sql_client = FilesystemSqlClient(
            fs_client, dataset_name=fs_client.dataset_name, credentials=creds
        )

        # NOTE: we should probably have the option for the user to only select a subset of tables here
        with sql_client as _:
            sql_client.create_views_for_all_tables()
        backend = ibis.duckdb.from_connection(duck)
    else:
        raise NotImplementedError(
            f"Destination of type {destination_type} is not supported by the ibis backend"
        )

    # NOTE: there seems to be no standardized way to set the current dataset / schema in ibis
    backend.raw_sql(f"SET search_path TO {dataset_name};")

    return backend


# helpers
def _get_client_for_destination(
    destination: TDestinationReferenceArg, schema: Schema, dataset_name: str
) -> JobClientBase:
    """Build a job client for `destination` using `schema`.

    The destination reference is resolved with `Destination.from_reference`
    and a fresh spec is obtained from it. When that spec is a
    `DestinationClientDwhConfiguration`, `dataset_name` and the schema name
    are bound onto it before the client is created.
    """
    resolved = Destination.from_reference(destination)
    spec = resolved.spec()
    # only this configuration type gets the dataset name bound
    if isinstance(spec, DestinationClientDwhConfiguration):
        spec._bind_dataset_name(dataset_name=dataset_name, default_schema_name=schema.name)
    return resolved.client(schema, spec)
5 changes: 5 additions & 0 deletions dlt/destinations/impl/filesystem/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,18 @@ def open_connection(self) -> duckdb.DuckDBPyConnection:
# set up dataset
if not self.has_dataset():
self.create_dataset()
print("CREATE")
print(self.fully_qualified_dataset_name())
self._conn.sql(f"USE {self.fully_qualified_dataset_name()}")

# create authentication to data provider
self.create_authentication()

return self._conn

def create_views_for_all_tables(self) -> None:
    """Create views for every table found in the filesystem client's schema.

    Builds a dict that maps each table name to itself and passes it to
    `create_views_for_tables`.
    """
    table_names = self.fs_client.schema.tables.keys()
    name_to_name = {name: name for name in table_names}
    self.create_views_for_tables(name_to_name)

@raise_database_error
def create_views_for_tables(self, tables: Dict[str, str]) -> None:
"""Add the required tables as views to the duckdb in memory instance"""
Expand Down
1 change: 1 addition & 0 deletions tests/load/test_read_interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ def test_ibis_dataset_access(populated_pipeline: Pipeline) -> None:
total_records = _total_records(populated_pipeline)
ibis_connection = populated_pipeline._ibis()

# just do a basic check to see whether ibis can connect
assert ibis_connection.list_tables() == [
"_dlt_loads",
"_dlt_pipeline_state",
Expand Down

0 comments on commit 311a531

Please sign in to comment.