Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an option to load SQL queries from a file for SQLQueryDataSet #887

Merged
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
dc8a7e7
creating sql_dataset branch
BenjaminLevyQB Sep 9, 2021
ad95db3
Merge branch 'master' into feature/sql-dataset-filepath
BenjaminLevyQB Sep 9, 2021
dd425af
Update kedro/extras/datasets/pandas/sql_dataset.py
BenjaminLevyQB Sep 10, 2021
7fce17a
improving desc for sql dataset
BenjaminLevyQB Sep 10, 2021
f5c95b8
Merge branch 'feature/sql-dataset-filepath' of https://github.com/Ben…
BenjaminLevyQB Sep 10, 2021
68e734e
Updating RELEASE.md
BenjaminLevyQB Sep 10, 2021
19ab35a
Merge branch 'master' into feature/sql-dataset-filepath
BenjaminLevyQB Oct 12, 2021
fdfa53e
Update RELEASE.md
BenjaminLevyQB Oct 13, 2021
53dc89e
Update RELEASE.md
BenjaminLevyQB Oct 13, 2021
a64089b
changed dataset to raise error if both `sql` and `filepath` are provided
BenjaminLevyQB Oct 13, 2021
862fafc
moved `filepath` argument so that this is no longer a breaking change
BenjaminLevyQB Oct 13, 2021
ebe91d6
Merge branch 'master' of https://github.com/quantumblacklabs/kedro in…
BenjaminLevyQB Oct 13, 2021
6376fc2
Merge branch 'master' into feature/sql-dataset-filepath
BenjaminLevyQB Oct 19, 2021
6b48a3e
resolving small nits from PR conversation
BenjaminLevyQB Oct 25, 2021
0d7f627
Merge branch 'feature/sql-dataset-filepath' of https://github.com/Ben…
BenjaminLevyQB Oct 25, 2021
ae65d53
Merge branch 'master' into feature/sql-dataset-filepath
BenjaminLevyQB Oct 25, 2021
7c507b9
Update kedro/extras/datasets/pandas/sql_dataset.py
antonymilne Oct 25, 2021
d36d998
Update kedro/extras/datasets/pandas/sql_dataset.py
antonymilne Oct 25, 2021
c8cabce
Merge branch 'master' into feature/sql-dataset-filepath
antonymilne Oct 25, 2021
a90a1de
Update tests/extras/datasets/pandas/test_sql_dataset.py
antonymilne Oct 25, 2021
6979f25
Merge branch 'master' into feature/sql-dataset-filepath
antonymilne Oct 25, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
* Bumped minimum required `fsspec` version to 2021.04.
* Fixed the `kedro install` and `kedro build-reqs` flows when uninstalled dependencies are present in a project's `settings.py`, `context.py` or `hooks.py` ([Issue #829](https://github.com/quantumblacklabs/kedro/issues/829)).
* Imports are now refactored at `kedro pipeline package` and `kedro pipeline pull` time, so that _aliasing_ a modular pipeline doesn't break it.
* Added option to `pandas.SQLQueryDataSet` to specify a `filepath` with a SQL query, in addition to the current method of supplying the query itself in the `sql` argument.

## Minor breaking changes to the API
* Pinned `dynaconf` to `<3.1.6` because the method signature for `_validate_items` changed which is used in Kedro.

## Upcoming deprecations for Kedro 0.18.0
Expand All @@ -49,7 +52,8 @@

## Thanks for supporting contributions
[Moussa Taifi](https://github.com/moutai),
[Deepyaman Datta](https://github.com/deepyaman)
[Deepyaman Datta](https://github.com/deepyaman),
[Benjamin Levy](https://github.com/BenjaminLevyQB)

# Release 0.17.4

Expand Down
76 changes: 67 additions & 9 deletions kedro/extras/datasets/pandas/sql_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,20 @@

import copy
import re
from pathlib import PurePosixPath
from typing import Any, Dict, Optional

import fsspec
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.exc import NoSuchModuleError

from kedro.io.core import AbstractDataSet, DataSetError
from kedro.io.core import (
AbstractDataSet,
DataSetError,
get_filepath_str,
get_protocol_and_path,
)

__all__ = ["SQLTableDataSet", "SQLQueryDataSet"]

Expand Down Expand Up @@ -278,8 +285,13 @@ class SQLQueryDataSet(AbstractDataSet):

"""

def __init__(
self, sql: str, credentials: Dict[str, Any], load_args: Dict[str, Any] = None
def __init__( # pylint: disable=too-many-arguments
self,
sql: str = None,
credentials: Dict[str, Any] = None,
BenjaminLevyQB marked this conversation as resolved.
Show resolved Hide resolved
load_args: Dict[str, Any] = None,
fs_args: Dict[str, Any] = None,
filepath: str = None,
) -> None:
"""Creates a new ``SQLQueryDataSet``.

Expand All @@ -297,14 +309,29 @@ def __init__(
https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_sql_query.html
To find all supported connection string formats, see here:
https://docs.sqlalchemy.org/en/13/core/engines.html#database-urls
fs_args: Extra arguments to pass into underlying filesystem class constructor
(e.g. `{"project": "my-project"}` for ``GCSFileSystem``), as well as
to pass to the filesystem's `open` method through nested keys
`open_args_load` and `open_args_save`.
Here you can find all available arguments for `open`:
https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open
All defaults are preserved, except `mode`, which is set to `r` when loading
and to `w` when saving.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
and to `w` when saving.
and to `w` when saving.

This can be removed; we don't actually set `mode` to `w` ourselves anymore.

filepath: A path to a file with a sql query statement.

Raises:
DataSetError: When either the ``sql`` or ``con`` parameter is empty.
antonymilne marked this conversation as resolved.
Show resolved Hide resolved
"""
if sql and filepath:
raise DataSetError(
"`sql` and `filepath` arguments cannot both be provided."
"Please only provide one."
)
BenjaminLevyQB marked this conversation as resolved.
Show resolved Hide resolved

if not sql:
if not (sql or filepath):
raise DataSetError(
"`sql` argument cannot be empty. Please provide a sql query"
"`sql` and `filepath` arguments cannot both be empty."
"Please provide a sql query or path to a sql query file."
)

if not (credentials and "con" in credentials and credentials["con"]):
Expand All @@ -321,18 +348,49 @@ def __init__(
else default_load_args
)

self._load_args["sql"] = sql
# load sql query from file
if not sql:
BenjaminLevyQB marked this conversation as resolved.
Show resolved Hide resolved
# filesystem for loading sql file
_fs_args = copy.deepcopy(fs_args) or {}
_fs_open_args_load = _fs_args.pop("open_args_load", {})
datajoely marked this conversation as resolved.
Show resolved Hide resolved
_fs_credentials = _fs_args.pop("credentials", {})
protocol, path = get_protocol_and_path(str(filepath))

self._protocol = protocol
self._fs = fsspec.filesystem(self._protocol, **_fs_credentials, **_fs_args)

_fs_open_args_load.setdefault("mode", "r")
self._fs_open_args_load = _fs_open_args_load

self._load_args["filepath"] = path
BenjaminLevyQB marked this conversation as resolved.
Show resolved Hide resolved
else:
self._load_args["sql"] = sql
self._load_args["con"] = credentials["con"]

def _describe(self) -> Dict[str, Any]:
BenjaminLevyQB marked this conversation as resolved.
Show resolved Hide resolved
load_args = self._load_args.copy()
del load_args["sql"]
desc = {}
if "sql" in load_args:
desc["sql"] = load_args.pop("sql")
if "filepath" in load_args:
desc["filepath"] = str(load_args.pop("filepath"))
BenjaminLevyQB marked this conversation as resolved.
Show resolved Hide resolved
del load_args["con"]
return dict(sql=self._load_args["sql"], load_args=load_args)
desc["load_args"] = load_args

return desc

def _load(self) -> pd.DataFrame:
load_args = self._load_args.copy()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this return a shallow copy instead of a deep copy? I think the latter is the one we generally want for dictionaries. 🤔


if "sql" not in load_args:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to use `if "filepath" in load_args` here? 😅 Or `if self._filepath`, if we go with the previous suggestion.

filepath = load_args.pop("filepath")
load_path = get_filepath_str(PurePosixPath(filepath), self._protocol)
BenjaminLevyQB marked this conversation as resolved.
Show resolved Hide resolved

with self._fs.open(load_path, **self._fs_open_args_load) as fs_file:
load_args["sql"] = fs_file.read()

try:
return pd.read_sql_query(**self._load_args)
return pd.read_sql_query(**load_args)
except ImportError as import_error:
raise _get_missing_module_error(import_error) from import_error
except NoSuchModuleError as exc:
Expand Down
40 changes: 36 additions & 4 deletions tests/extras/datasets/pandas/test_sql_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ def dummy_dataframe():
return pd.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})


@pytest.fixture
def sql_file(tmp_path):
file = tmp_path / "test.sql"
with file.open("w") as f:
f.write(SQL_QUERY)
BenjaminLevyQB marked this conversation as resolved.
Show resolved Hide resolved
return file.as_posix()


@pytest.fixture(params=[{}])
def table_data_set(request):
kwargs = dict(table_name=TABLE_NAME, credentials=dict(con=CONNECTION))
Expand All @@ -65,6 +73,13 @@ def query_data_set(request):
return SQLQueryDataSet(**kwargs)


@pytest.fixture(params=[{}])
def query_file_data_set(request, sql_file):
    """Build a ``SQLQueryDataSet`` whose query comes from the ``sql_file`` path.

    The data set is created with ``filepath`` and a ``con`` credential;
    any keyword arguments supplied through ``request.param`` are applied
    on top of those two before construction.
    """
    init_kwargs = {
        "filepath": sql_file,
        "credentials": {"con": CONNECTION},
    }
    init_kwargs.update(request.param)
    return SQLQueryDataSet(**init_kwargs)


class TestSQLTableDataSetLoad:
@staticmethod
def _assert_pd_called_once():
Expand Down Expand Up @@ -244,10 +259,13 @@ def _assert_pd_called_once():
_callable.assert_called_once_with(sql=SQL_QUERY, con=CONNECTION)

def test_empty_query_error(self):
"""Check the error when instantiating with empty query"""
pattern = r"`sql` argument cannot be empty\. Please provide a sql query"
"""Check the error when instantiating with empty query or file"""
pattern = (
r"`sql` and `filepath` arguments cannot both be empty\."
r"Please provide a sql query or path to a sql query file\."
)
with pytest.raises(DataSetError, match=pattern):
SQLQueryDataSet(sql="", credentials=dict(con=CONNECTION))
SQLQueryDataSet(sql="", filepath="", credentials=dict(con=CONNECTION))

def test_empty_con_error(self):
"""Check the error when instantiating with empty connection string"""
Expand All @@ -264,6 +282,12 @@ def test_load(self, mocker, query_data_set):
query_data_set.load()
self._assert_pd_called_once()

def test_load_query_file(self, mocker, query_file_data_set):
"""Test that `load` reads the query from a file and passes it to pandas"""
# Replace `pandas.read_sql_query` with a mock so the call can be inspected.
mocker.patch("pandas.read_sql_query")
query_file_data_set.load()
# The mock must have been called exactly once with the query text and connection.
self._assert_pd_called_once()

def test_load_driver_missing(self, mocker, query_data_set):
"""Test that if an unknown module/driver is encountered by SQLAlchemy
then the error should contain the original error message"""
Expand Down Expand Up @@ -306,8 +330,16 @@ def test_save_error(self, query_data_set, dummy_dataframe):
with pytest.raises(DataSetError, match=pattern):
query_data_set.save(dummy_dataframe)

def test_str_representation_sql(self, query_data_set):
def test_str_representation_sql(self, query_data_set, sql_file):
"""Test the data set instance string representation"""
str_repr = str(query_data_set)
assert f"SQLQueryDataSet(load_args={{}}, sql={SQL_QUERY})" in str_repr
assert CONNECTION not in str_repr
assert sql_file not in str_repr

def test_str_representation_filepath(self, query_file_data_set, sql_file):
    """Check the string representation of a data set built from ``filepath``.

    The representation must show the file path and the (empty) load args,
    and must not contain the connection string or the query text.
    """
    rendered = str(query_file_data_set)
    expected_prefix = f"SQLQueryDataSet(filepath={str(sql_file)}, load_args={{}}"
    assert expected_prefix in rendered
    for hidden in (CONNECTION, SQL_QUERY):
        assert hidden not in rendered