duckdb filesystem custom secrets #2017

Open · wants to merge 12 commits into base: devel
8 changes: 4 additions & 4 deletions .github/workflows/test_common.yml
@@ -93,7 +93,7 @@ jobs:
run: poetry install --no-interaction --with sentry-sdk

- run: |
-poetry run pytest tests/common tests/normalize tests/reflection tests/load/test_dummy_client.py tests/extract/test_extract.py tests/extract/test_sources.py tests/pipeline/test_pipeline_state.py
+poetry run pytest tests/common tests/normalize tests/reflection tests/load/test_dummy_client.py tests/extract/test_extract.py tests/extract/test_sources.py tests/pipeline/test_pipeline_state.py
if: runner.os != 'Windows'
name: Run common tests with minimum dependencies Linux/MAC
- run: |
@@ -103,14 +103,14 @@ jobs:
shell: cmd

- name: Install duckdb dependencies
-run: poetry install --no-interaction -E duckdb --with sentry-sdk
+run: poetry install --no-interaction -E duckdb -E filesystem --with sentry-sdk

- run: |
-poetry run pytest tests/pipeline/test_pipeline.py tests/pipeline/test_import_export_schema.py
+poetry run pytest tests/pipeline/test_pipeline.py tests/pipeline/test_import_export_schema.py tests/pipeline/test_filesystem_sql_secrets.py
if: runner.os != 'Windows'
name: Run pipeline smoke tests with minimum deps Linux/MAC
- run: |
-poetry run pytest tests/pipeline/test_pipeline.py tests/pipeline/test_import_export_schema.py -m "not forked"
+poetry run pytest tests/pipeline/test_pipeline.py tests/pipeline/test_import_export_schema.py tests/pipeline/test_filesystem_sql_secrets.py -m "not forked"
if: runner.os == 'Windows'
name: Run smoke tests with minimum deps Windows
shell: cmd
3 changes: 0 additions & 3 deletions .github/workflows/test_destinations.yml
@@ -82,9 +82,6 @@ jobs:
- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

-      - name: clear duckdb secrets and cache
-        run: rm -rf ~/.duckdb

- run: |
poetry run pytest tests/load --ignore tests/load/sources -m "essential"
name: Run essential tests Linux
3 changes: 0 additions & 3 deletions .github/workflows/test_pyarrow17.yml
@@ -74,9 +74,6 @@ jobs:
- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

-      - name: clear duckdb secrets and cache
-        run: rm -rf ~/.duckdb

- name: Run needspyarrow17 tests Linux
run: |
poetry run pytest tests/libs -m "needspyarrow17"
29 changes: 24 additions & 5 deletions dlt/destinations/impl/filesystem/sql_client.py
@@ -12,7 +12,6 @@
from contextlib import contextmanager

from dlt.common.destination.reference import DBApiCursor
-from dlt.common.destination.typing import PreparedTableSchema

from dlt.destinations.sql_client import raise_database_error

@@ -25,6 +24,8 @@
)
from dlt.destinations.utils import is_compression_disabled

+from pathlib import Path

SUPPORTED_PROTOCOLS = ["gs", "gcs", "s3", "file", "memory", "az", "abfss"]

if TYPE_CHECKING:
@@ -74,12 +75,32 @@ def drop_authentication(self, secret_name: str = None) -> None:
self._conn.sql(f"DROP PERSISTENT SECRET IF EXISTS {secret_name}")

def create_authentication(self, persistent: bool = False, secret_name: str = None) -> None:
-# TODO: allow users to set explicit path on filesystem where secrets are stored
-# https://duckdb.org/docs/configuration/secrets_manager.html#persistent-secrets
-# home dir is a bad choice, it should be more explicit
if not secret_name:
    secret_name = self._create_default_secret_name()

if persistent and self.memory_db:
    raise Exception("Creating persistent secrets for in memory db is not allowed.")

+secrets_path = Path(
+    self._conn.sql(
+        "SELECT current_setting('secret_directory') AS secret_directory;"
+    ).fetchone()[0]
+)
+
+is_default_secrets_directory = (
+    len(secrets_path.parts) >= 2
+    and secrets_path.parts[-1] == "stored_secrets"
+    and secrets_path.parts[-2] == ".duckdb"
+)
+
+if is_default_secrets_directory and persistent:
+    logger.warning(
+        "You are persisting duckdb secrets but are storing them in the default folder"
+        f" {secrets_path}. These secrets are saved there unencrypted; we"
+        " recommend using a custom secret directory."
+    )

persistent_stmt = ""
if persistent:
persistent_stmt = " PERSISTENT "
@@ -172,8 +193,6 @@ def open_connection(self) -> duckdb.DuckDBPyConnection:
if not self.has_dataset():
    self.create_dataset()
self._conn.sql(f"USE {self.fully_qualified_dataset_name()}")

-# create authentication to data provider
-self.create_authentication()

return self._conn
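For reference, the directory check introduced above can be exercised directly against DuckDB. Below is a minimal sketch (assuming only the `duckdb` package; the database path, secret directory, secret name, and credentials are illustrative) of reading `secret_directory` and persisting a secret outside the unencrypted default under `~/.duckdb/stored_secrets`:

```python
import duckdb
from pathlib import Path

conn = duckdb.connect("demo.duckdb")

# Point DuckDB at an explicit secret directory before persisting anything.
conn.sql("SET secret_directory = '/tmp/duck_secrets';")

# This is the same setting create_authentication() now inspects.
secrets_path = Path(conn.sql("SELECT current_setting('secret_directory');").fetchone()[0])

# True only when the unencrypted default (~/.duckdb/stored_secrets) is still in effect.
print(secrets_path.parts[-2:] == (".duckdb", "stored_secrets"))

# Persistent secrets survive reconnects and are written as files into secret_directory.
conn.sql(
    "CREATE PERSISTENT SECRET demo_secret "
    "(TYPE S3, KEY_ID 'key', SECRET 'secret_key', SCOPE 's3://demo-bucket')"
)
conn.sql("DROP PERSISTENT SECRET IF EXISTS demo_secret")
```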
2 changes: 1 addition & 1 deletion pytest.ini
@@ -1,7 +1,7 @@
[pytest]
pythonpath= dlt docs/website/docs
norecursedirs= .direnv .eggs build dist
-addopts= -v --showlocals --durations 10
Comment from Collaborator Author: ok, somehow it does not work on ci...

+addopts= -v --showlocals --durations 10 --capture=tee-sys
xfail_strict= true
log_cli= 1
log_cli_level= INFO
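The author's comment above hints at why this changed: under the previous capture mode the warning emitted by the new secrets test apparently never reached `capfd` on CI. With `--capture=tee-sys`, captured output is simultaneously passed through to the real streams, so assertions and CI logs both see it. A minimal sketch of the pattern (a hypothetical test, not part of this PR):

```python
import logging
import sys

import pytest


def test_warning_reaches_capfd(capfd: pytest.CaptureFixture[str]) -> None:
    # Emit a warning to stderr the way a library logger would.
    handler = logging.StreamHandler(sys.stderr)
    logger = logging.getLogger("demo")
    logger.addHandler(handler)
    logger.warning("You are persisting duckdb secrets")

    # With --capture=tee-sys the text is captured for this assertion
    # while still being echoed to the terminal/CI log.
    assert "persisting duckdb secrets" in capfd.readouterr().err
```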
28 changes: 23 additions & 5 deletions tests/load/filesystem/test_sql_client.py
@@ -6,6 +6,9 @@
import pytest
import dlt
import os
+import shutil
+import logging


from dlt import Pipeline
from dlt.common.utils import uniq_id
@@ -16,15 +19,24 @@
GCS_BUCKET,
SFTP_BUCKET,
MEMORY_BUCKET,
+AWS_BUCKET,
)
from dlt.destinations import filesystem
from tests.utils import TEST_STORAGE_ROOT
from dlt.destinations.exceptions import DatabaseUndefinedRelation


+@pytest.fixture(scope="function", autouse=True)
+def secret_directory():
+    secrets_dir = f"{TEST_STORAGE_ROOT}/duck_secrets_{uniq_id()}"
+    yield secrets_dir
+    shutil.rmtree(secrets_dir, ignore_errors=True)


def _run_dataset_checks(
    pipeline: Pipeline,
    destination_config: DestinationTestConfiguration,
+   secret_directory: str,
    table_format: Any = None,
    alternate_access_pipeline: Pipeline = None,
) -> None:
@@ -131,6 +143,7 @@ def _external_duckdb_connection() -> duckdb.DuckDBPyConnection:
external_db = duckdb.connect(duck_db_location)
# the line below solves problems with certificate path lookup on linux, see duckdb docs
external_db.sql("SET azure_transport_option_type = 'curl';")
external_db.sql(f"SET secret_directory = '{secret_directory}';")
return external_db

def _fs_sql_client_for_external_db(
@@ -160,7 +173,7 @@ def _fs_sql_client_for_external_db(
assert len(external_db.sql("SELECT * FROM first.items").fetchall()) == 3
external_db.close()

-# in case we are not connecting to a bucket, views should still be here after connection reopen
+# in case we are not connecting to a bucket that needs secrets, views should still be here after connection reopen
if not needs_persistent_secrets and not unsupported_persistent_secrets:
external_db = _external_duckdb_connection()
assert (
@@ -232,7 +245,9 @@ def _fs_sql_client_for_external_db(
)
@pytest.mark.parametrize("disable_compression", [True, False])
def test_read_interfaces_filesystem(
-destination_config: DestinationTestConfiguration, disable_compression: bool
+destination_config: DestinationTestConfiguration,
+disable_compression: bool,
+secret_directory: str,
) -> None:
# we force multiple files per table, they may only hold 700 items
os.environ["DATA_WRITER__FILE_MAX_ITEMS"] = "700"
@@ -250,7 +265,7 @@ def test_read_interfaces_filesystem(
dev_mode=True,
)

-_run_dataset_checks(pipeline, destination_config)
+_run_dataset_checks(pipeline, destination_config, secret_directory=secret_directory)

# for gcs buckets we additionally test the s3 compat layer
if destination_config.bucket_url == GCS_BUCKET:
Expand All @@ -260,7 +275,7 @@ def test_read_interfaces_filesystem(
pipeline = destination_config.setup_pipeline(
"read_pipeline", dataset_name="read_test", dev_mode=True, destination=gcp_bucket
)
-_run_dataset_checks(pipeline, destination_config)
+_run_dataset_checks(pipeline, destination_config, secret_directory=secret_directory)


@pytest.mark.essential
@@ -274,7 +289,9 @@ def test_read_interfaces_filesystem(
),
ids=lambda x: x.name,
)
-def test_delta_tables(destination_config: DestinationTestConfiguration) -> None:
+def test_delta_tables(
+    destination_config: DestinationTestConfiguration, secret_directory: str
+) -> None:
os.environ["DATA_WRITER__FILE_MAX_ITEMS"] = "700"

pipeline = destination_config.setup_pipeline(
Expand All @@ -297,6 +314,7 @@ def test_delta_tables(destination_config: DestinationTestConfiguration) -> None:
_run_dataset_checks(
    pipeline,
    destination_config,
+   secret_directory=secret_directory,
    table_format="delta",
    alternate_access_pipeline=access_pipeline,
)
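Taken together with the sql_client changes above (the automatic `create_authentication()` call was removed from `open_connection`), the user-facing flow that the new test below exercises looks roughly like this. A hedged sketch, assuming an existing filesystem destination pipeline; the database path, secret directory, and names are illustrative:

```python
import dlt
import duckdb

from dlt.destinations.impl.filesystem.sql_client import (
    FilesystemSqlClient,
    DuckDbCredentials,
)

# An existing filesystem pipeline; bucket and credentials are assumed
# to come from dlt config/secrets.
pipeline = dlt.pipeline("read_pipeline", destination="filesystem", dataset_name="read_test")

external_db = duckdb.connect("analytics.duckdb")
# Keep persisted secrets out of the unencrypted default directory.
external_db.sql("SET secret_directory = '/tmp/dlt_duck_secrets';")

fs_sql_client = FilesystemSqlClient(
    dataset_name="read_test",
    fs_client=pipeline.destination_client(),  # type: ignore
    credentials=DuckDbCredentials(external_db),
)
with fs_sql_client as sql_client:
    # Explicitly persist the filesystem credentials as a DuckDB secret...
    sql_client.create_authentication(persistent=True)
    # ...and drop it again once it is no longer needed.
    sql_client.drop_authentication()
```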
106 changes: 106 additions & 0 deletions tests/pipeline/test_filesystem_sql_secrets.py
@@ -0,0 +1,106 @@
from typing import Any

import pytest
import os

from tests.utils import TEST_STORAGE_ROOT
from tests.load.utils import (
    destinations_configs,
    DestinationTestConfiguration,
    AWS_BUCKET,
)
from dlt.common.utils import uniq_id


@pytest.mark.essential
@pytest.mark.parametrize(
    "destination_config",
    destinations_configs(all_buckets_filesystem_configs=True, bucket_subset=(AWS_BUCKET,)),
    ids=lambda x: x.name,
)
def test_secrets_management(
    destination_config: DestinationTestConfiguration, capfd: pytest.CaptureFixture[Any]
) -> None:
    """Test the handling of secrets by the sql_client. We only need to do this on S3,
    as the other destinations behave the same way."""

    # we can use fake keys
    os.environ["DESTINATION__FILESYSTEM__CREDENTIALS__AWS_SECRET_ACCESS_KEY"] = "secret_key"
    os.environ["DESTINATION__FILESYSTEM__CREDENTIALS__AWS_ACCESS_KEY_ID"] = "key"

    warning_message = "You are persisting duckdb secrets but are storing them in the default folder"

    pipeline = destination_config.setup_pipeline(
        "read_pipeline",
        dataset_name="read_test",
    )

    import duckdb
    from duckdb import HTTPException
    from dlt.destinations.impl.filesystem.sql_client import (
        FilesystemSqlClient,
        DuckDbCredentials,
    )

    duck_db_location = TEST_STORAGE_ROOT + "/" + uniq_id()
    secrets_dir = f"{TEST_STORAGE_ROOT}/duck_secrets_{uniq_id()}"

    def _external_duckdb_connection() -> duckdb.DuckDBPyConnection:
        external_db = duckdb.connect(duck_db_location)
        external_db.sql(f"SET secret_directory = '{secrets_dir}';")
        external_db.execute("CREATE SCHEMA IF NOT EXISTS first;")
        return external_db

    def _fs_sql_client_for_external_db(
        connection: duckdb.DuckDBPyConnection,
    ) -> FilesystemSqlClient:
        return FilesystemSqlClient(
            dataset_name="second",
            fs_client=pipeline.destination_client(),  # type: ignore
            credentials=DuckDbCredentials(connection),
        )

    def _secrets_exist() -> bool:
        return os.path.isdir(secrets_dir) and len(os.listdir(secrets_dir)) > 0

    # first test what happens if there are no external secrets
    external_db = _external_duckdb_connection()
    fs_sql_client = _fs_sql_client_for_external_db(external_db)
    with fs_sql_client as sql_client:
        sql_client.create_views_for_tables({"items": "items"})
    external_db.close()
    assert not _secrets_exist()

    # add secrets and check that they are there
    external_db = _external_duckdb_connection()
    fs_sql_client = _fs_sql_client_for_external_db(external_db)
    with fs_sql_client as sql_client:
        fs_sql_client.create_authentication(persistent=True)
    assert _secrets_exist()

    # remove secrets and check that they are removed
    with fs_sql_client as sql_client:
        fs_sql_client.drop_authentication()
    assert not _secrets_exist()
    external_db.close()

    # prevent creating persistent secrets on in-memory databases
    fs_sql_client = FilesystemSqlClient(
        dataset_name="second",
        fs_client=pipeline.destination_client(),  # type: ignore
    )
    with pytest.raises(Exception):
        with fs_sql_client as sql_client:
            fs_sql_client.create_authentication(persistent=True)

    # check that no warning was logged
    assert warning_message not in capfd.readouterr().err

    # check that a warning is logged when secrets are persisted in the default folder
    duck_db_location = TEST_STORAGE_ROOT + "/" + uniq_id()
    secrets_dir = f"{TEST_STORAGE_ROOT}/duck_secrets_{uniq_id()}"
    duck_db = duckdb.connect(duck_db_location)
    fs_sql_client = _fs_sql_client_for_external_db(duck_db)
    with fs_sql_client as sql_client:
        sql_client.create_authentication(persistent=True)
    assert warning_message in capfd.readouterr().err