diff --git a/kedro-datasets/RELEASE.md b/kedro-datasets/RELEASE.md index d4584f299..109d4e6fe 100755 --- a/kedro-datasets/RELEASE.md +++ b/kedro-datasets/RELEASE.md @@ -13,6 +13,8 @@ | `plotly.HTMLDataset` | A dataset for saving a `plotly` figure as HTML | `kedro_datasets.plotly` | ## Bug fixes and other changes +* Refactored all datasets to set `fs_args` defaults in the same way as `load_args` and `save_args` and not have hardcoded values in the save methods. + ## Breaking Changes ## Community contributions Many thanks to the following Kedroids for contributing PRs to this release: diff --git a/kedro-datasets/kedro_datasets/biosequence/biosequence_dataset.py b/kedro-datasets/kedro_datasets/biosequence/biosequence_dataset.py index bba06f185..6c44d3d19 100644 --- a/kedro-datasets/kedro_datasets/biosequence/biosequence_dataset.py +++ b/kedro-datasets/kedro_datasets/biosequence/biosequence_dataset.py @@ -44,6 +44,10 @@ class BioSequenceDataset(AbstractDataset[list, list]): DEFAULT_LOAD_ARGS: dict[str, Any] = {} DEFAULT_SAVE_ARGS: dict[str, Any] = {} + DEFAULT_FS_ARGS: dict[str, Any] = { + "open_args_save": {"mode": "w"}, + "open_args_load": {"mode": "r"}, + } def __init__( # noqa: PLR0913 self, @@ -96,18 +100,17 @@ def __init__( # noqa: PLR0913 self._fs = fsspec.filesystem(self._protocol, **_credentials, **_fs_args) - # Handle default load and save arguments - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) - - _fs_open_args_load.setdefault("mode", "r") - _fs_open_args_save.setdefault("mode", "w") - self._fs_open_args_load = _fs_open_args_load - self._fs_open_args_save = _fs_open_args_save + # Handle default load and save and fs arguments + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} + 
self._fs_open_args_load = { + **self.DEFAULT_FS_ARGS.get("open_args_load", {}), + **(_fs_open_args_load or {}), + } + self._fs_open_args_save = { + **self.DEFAULT_FS_ARGS.get("open_args_save", {}), + **(_fs_open_args_save or {}), + } self.metadata = metadata diff --git a/kedro-datasets/kedro_datasets/dask/csv_dataset.py b/kedro-datasets/kedro_datasets/dask/csv_dataset.py index 0e02f6ade..faee81a01 100644 --- a/kedro-datasets/kedro_datasets/dask/csv_dataset.py +++ b/kedro-datasets/kedro_datasets/dask/csv_dataset.py @@ -84,12 +84,8 @@ def __init__( # noqa: PLR0913 self.metadata = metadata # Handle default load and save arguments - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} @property def fs_args(self) -> dict[str, Any]: diff --git a/kedro-datasets/kedro_datasets/dask/parquet_dataset.py b/kedro-datasets/kedro_datasets/dask/parquet_dataset.py index c75d067aa..402e97a27 100644 --- a/kedro-datasets/kedro_datasets/dask/parquet_dataset.py +++ b/kedro-datasets/kedro_datasets/dask/parquet_dataset.py @@ -114,12 +114,8 @@ def __init__( # noqa: PLR0913 self.metadata = metadata # Handle default load and save arguments - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} @property def fs_args(self) -> dict[str, Any]: diff --git a/kedro-datasets/kedro_datasets/email/message_dataset.py b/kedro-datasets/kedro_datasets/email/message_dataset.py index 
df60a3c2a..97c3b203f 100644 --- a/kedro-datasets/kedro_datasets/email/message_dataset.py +++ b/kedro-datasets/kedro_datasets/email/message_dataset.py @@ -55,6 +55,10 @@ class EmailMessageDataset(AbstractVersionedDataset[Message, Message]): DEFAULT_LOAD_ARGS: dict[str, Any] = {} DEFAULT_SAVE_ARGS: dict[str, Any] = {} + DEFAULT_FS_ARGS: dict[str, Any] = { + "open_args_save": {"mode": "w"}, + "open_args_load": {"mode": "r"}, + } def __init__( # noqa: PLR0913 self, @@ -129,22 +133,21 @@ def __init__( # noqa: PLR0913 glob_function=self._fs.glob, ) - # Handle default load arguments - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) + # Handle default load and save and fs arguments + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} self._parser_args = self._load_args.pop("parser", {"policy": default}) - # Handle default save arguments - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} self._generator_args = self._save_args.pop("generator", {}) - _fs_open_args_load.setdefault("mode", "r") - _fs_open_args_save.setdefault("mode", "w") - self._fs_open_args_load = _fs_open_args_load - self._fs_open_args_save = _fs_open_args_save + self._fs_open_args_load = { + **self.DEFAULT_FS_ARGS.get("open_args_load", {}), + **(_fs_open_args_load or {}), + } + self._fs_open_args_save = { + **self.DEFAULT_FS_ARGS.get("open_args_save", {}), + **(_fs_open_args_save or {}), + } def _describe(self) -> dict[str, Any]: return { diff --git a/kedro-datasets/kedro_datasets/holoviews/holoviews_writer.py b/kedro-datasets/kedro_datasets/holoviews/holoviews_writer.py index 97e6446a9..d2c310df4 100644 --- a/kedro-datasets/kedro_datasets/holoviews/holoviews_writer.py +++ b/kedro-datasets/kedro_datasets/holoviews/holoviews_writer.py @@ -100,9 +100,7 @@ def __init__( # noqa: PLR0913 
self._fs_open_args_save = _fs_open_args_save # Handle default save arguments - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} def _describe(self) -> dict[str, Any]: return { diff --git a/kedro-datasets/kedro_datasets/json/json_dataset.py b/kedro-datasets/kedro_datasets/json/json_dataset.py index cc882e75f..76f6e15d0 100644 --- a/kedro-datasets/kedro_datasets/json/json_dataset.py +++ b/kedro-datasets/kedro_datasets/json/json_dataset.py @@ -54,6 +54,7 @@ class JSONDataset(AbstractVersionedDataset[Any, Any]): """ DEFAULT_SAVE_ARGS: dict[str, Any] = {"indent": 2} + DEFAULT_FS_ARGS: dict[str, Any] = {"open_args_save": {"mode": "w"}} def __init__( # noqa: PLR0913 self, @@ -89,16 +90,15 @@ def __init__( # noqa: PLR0913 `open_args_load` and `open_args_save`. Here you can find all available arguments for `open`: https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open - All defaults are preserved, except `mode`, which is set to `r` when loading - and to `w` when saving. + All defaults are preserved, except `mode`, which is set to `w` when saving. metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. 
""" _fs_args = deepcopy(fs_args) or {} _fs_open_args_load = _fs_args.pop("open_args_load", {}) _fs_open_args_save = _fs_args.pop("open_args_save", {}) - _credentials = deepcopy(credentials) or {} + _credentials = deepcopy(credentials) or {} protocol, path = get_protocol_and_path(filepath, version) self._protocol = protocol @@ -115,14 +115,16 @@ def __init__( # noqa: PLR0913 glob_function=self._fs.glob, ) - # Handle default save arguments - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) - - _fs_open_args_save.setdefault("mode", "w") - self._fs_open_args_load = _fs_open_args_load - self._fs_open_args_save = _fs_open_args_save + # Handle default save and fs arguments + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} + self._fs_open_args_load = { + **self.DEFAULT_FS_ARGS.get("open_args_load", {}), + **(_fs_open_args_load or {}), + } + self._fs_open_args_save = { + **self.DEFAULT_FS_ARGS.get("open_args_save", {}), + **(_fs_open_args_save or {}), + } def _describe(self) -> dict[str, Any]: return { diff --git a/kedro-datasets/kedro_datasets/matlab/matlab_dataset.py b/kedro-datasets/kedro_datasets/matlab/matlab_dataset.py index a74d74209..916f5e940 100644 --- a/kedro-datasets/kedro_datasets/matlab/matlab_dataset.py +++ b/kedro-datasets/kedro_datasets/matlab/matlab_dataset.py @@ -53,6 +53,7 @@ class MatlabDataset(AbstractVersionedDataset[np.ndarray, np.ndarray]): """ DEFAULT_SAVE_ARGS: dict[str, Any] = {"indent": 2} + DEFAULT_FS_ARGS: dict[str, Any] = {"open_args_save": {"mode": "wb"}} def __init__( # noqa = PLR0913 self, @@ -83,8 +84,7 @@ def __init__( # noqa = PLR0913 `open_args_load` and `open_args_save`. Here you can find all available arguments for `open`: https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open - All defaults are preserved, except `mode`, which is set to `r` when loading - and to `w` when saving. 
+ All defaults are preserved, except `mode`, which is set to `wb` when saving. metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. """ @@ -106,14 +106,16 @@ def __init__( # noqa = PLR0913 exists_function=self._fs.exists, glob_function=self._fs.glob, ) - # Handle default save arguments - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) - - _fs_open_args_save.setdefault("mode", "w") - self._fs_open_args_load = _fs_open_args_load - self._fs_open_args_save = _fs_open_args_save + # Handle default save and fs arguments + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} + self._fs_open_args_load = { + **self.DEFAULT_FS_ARGS.get("open_args_load", {}), + **(_fs_open_args_load or {}), + } + self._fs_open_args_save = { + **self.DEFAULT_FS_ARGS.get("open_args_save", {}), + **(_fs_open_args_save or {}), + } def _describe(self) -> dict[str, Any]: return { @@ -134,7 +136,7 @@ def _load(self) -> np.ndarray: def _save(self, data: np.ndarray) -> None: save_path = get_filepath_str(self._get_save_path(), self._protocol) - with self._fs.open(save_path, mode="wb") as f: + with self._fs.open(save_path, **self._fs_open_args_save) as f: io.savemat(f, {"data": data}) self._invalidate_cache() diff --git a/kedro-datasets/kedro_datasets/matplotlib/matplotlib_writer.py b/kedro-datasets/kedro_datasets/matplotlib/matplotlib_writer.py index 377c4dfbd..33cc6d12c 100644 --- a/kedro-datasets/kedro_datasets/matplotlib/matplotlib_writer.py +++ b/kedro-datasets/kedro_datasets/matplotlib/matplotlib_writer.py @@ -178,9 +178,7 @@ def __init__( # noqa: PLR0913 self._fs_open_args_save = _fs_open_args_save # Handle default save arguments - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} if overwrite and version is not None: warn( diff 
--git a/kedro-datasets/kedro_datasets/networkx/gml_dataset.py b/kedro-datasets/kedro_datasets/networkx/gml_dataset.py index dec121bee..ff6916b61 100644 --- a/kedro-datasets/kedro_datasets/networkx/gml_dataset.py +++ b/kedro-datasets/kedro_datasets/networkx/gml_dataset.py @@ -40,6 +40,10 @@ class GMLDataset(AbstractVersionedDataset[networkx.Graph, networkx.Graph]): DEFAULT_LOAD_ARGS: dict[str, Any] = {} DEFAULT_SAVE_ARGS: dict[str, Any] = {} + DEFAULT_FS_ARGS: dict[str, Any] = { + "open_args_save": {"mode": "wb"}, + "open_args_load": {"mode": "rb"}, + } def __init__( # noqa: PLR0913 self, @@ -74,9 +78,9 @@ def __init__( # noqa: PLR0913 `open_args_load` and `open_args_save`. Here you can find all available arguments for `open`: https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open - All defaults are preserved, except `mode`, which is set to `r` when loading - and to `w` when saving. - metadata: Any Any arbitrary metadata. + All defaults are preserved, except `mode`, which is set to `rb` when loading + and to `wb` when saving. + metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. 
""" _fs_args = deepcopy(fs_args) or {} @@ -100,17 +104,17 @@ def __init__( # noqa: PLR0913 glob_function=self._fs.glob, ) - # Handle default load and save arguments - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) - _fs_open_args_load.setdefault("mode", "rb") - _fs_open_args_save.setdefault("mode", "wb") - self._fs_open_args_load = _fs_open_args_load - self._fs_open_args_save = _fs_open_args_save + # Handle default load and save and fs arguments + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} + self._fs_open_args_load = { + **self.DEFAULT_FS_ARGS.get("open_args_load", {}), + **(_fs_open_args_load or {}), + } + self._fs_open_args_save = { + **self.DEFAULT_FS_ARGS.get("open_args_save", {}), + **(_fs_open_args_save or {}), + } def _load(self) -> networkx.Graph: load_path = get_filepath_str(self._get_load_path(), self._protocol) diff --git a/kedro-datasets/kedro_datasets/networkx/graphml_dataset.py b/kedro-datasets/kedro_datasets/networkx/graphml_dataset.py index f69113533..cccffee20 100644 --- a/kedro-datasets/kedro_datasets/networkx/graphml_dataset.py +++ b/kedro-datasets/kedro_datasets/networkx/graphml_dataset.py @@ -39,6 +39,10 @@ class GraphMLDataset(AbstractVersionedDataset[networkx.Graph, networkx.Graph]): DEFAULT_LOAD_ARGS: dict[str, Any] = {} DEFAULT_SAVE_ARGS: dict[str, Any] = {} + DEFAULT_FS_ARGS: dict[str, Any] = { + "open_args_save": {"mode": "wb"}, + "open_args_load": {"mode": "rb"}, + } def __init__( # noqa: PLR0913 self, @@ -73,9 +77,9 @@ def __init__( # noqa: PLR0913 `open_args_load` and `open_args_save`. 
Here you can find all available arguments for `open`: https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open - All defaults are preserved, except `mode`, which is set to `r` when loading - and to `w` when saving. - metadata: Any arbitrary Any arbitrary metadata. + All defaults are preserved, except `mode`, which is set to `rb` when loading + and to `wb` when saving. + metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. """ _fs_args = deepcopy(fs_args) or {} @@ -99,17 +103,17 @@ def __init__( # noqa: PLR0913 glob_function=self._fs.glob, ) - # Handle default load and save arguments - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) - _fs_open_args_load.setdefault("mode", "rb") - _fs_open_args_save.setdefault("mode", "wb") - self._fs_open_args_load = _fs_open_args_load - self._fs_open_args_save = _fs_open_args_save + # Handle default load and save and fs arguments + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} + self._fs_open_args_load = { + **self.DEFAULT_FS_ARGS.get("open_args_load", {}), + **(_fs_open_args_load or {}), + } + self._fs_open_args_save = { + **self.DEFAULT_FS_ARGS.get("open_args_save", {}), + **(_fs_open_args_save or {}), + } def _load(self) -> networkx.Graph: load_path = get_filepath_str(self._get_load_path(), self._protocol) diff --git a/kedro-datasets/kedro_datasets/networkx/json_dataset.py b/kedro-datasets/kedro_datasets/networkx/json_dataset.py index 1bef8dc3d..a40d7b3e1 100644 --- a/kedro-datasets/kedro_datasets/networkx/json_dataset.py +++ b/kedro-datasets/kedro_datasets/networkx/json_dataset.py @@ -40,6 +40,7 @@ class JSONDataset(AbstractVersionedDataset[networkx.Graph, 
networkx.Graph]): DEFAULT_LOAD_ARGS: dict[str, Any] = {} DEFAULT_SAVE_ARGS: dict[str, Any] = {} + DEFAULT_FS_ARGS: dict[str, Any] = {"open_args_save": {"mode": "w"}} def __init__( # noqa: PLR0913 self, @@ -74,9 +75,8 @@ def __init__( # noqa: PLR0913 `open_args_load` and `open_args_save`. Here you can find all available arguments for `open`: https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open - All defaults are preserved, except `mode`, which is set to `r` when loading - and to `w` when saving. - metadata: Any Any arbitrary metadata. + All defaults are preserved, except `mode`, which is set to `w` when saving. + metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. """ _fs_args = deepcopy(fs_args) or {} @@ -100,17 +100,17 @@ def __init__( # noqa: PLR0913 glob_function=self._fs.glob, ) - # Handle default load and save arguments - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) - - _fs_open_args_save.setdefault("mode", "w") - self._fs_open_args_load = _fs_open_args_load - self._fs_open_args_save = _fs_open_args_save + # Handle default load and save and fs arguments + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} + self._fs_open_args_load = { + **self.DEFAULT_FS_ARGS.get("open_args_load", {}), + **(_fs_open_args_load or {}), + } + self._fs_open_args_save = { + **self.DEFAULT_FS_ARGS.get("open_args_save", {}), + **(_fs_open_args_save or {}), + } def _load(self) -> networkx.Graph: load_path = get_filepath_str(self._get_load_path(), self._protocol) diff --git a/kedro-datasets/kedro_datasets/pandas/csv_dataset.py b/kedro-datasets/kedro_datasets/pandas/csv_dataset.py index 2c43e13c6..e9487b216 100644 --- 
a/kedro-datasets/kedro_datasets/pandas/csv_dataset.py +++ b/kedro-datasets/kedro_datasets/pandas/csv_dataset.py @@ -5,7 +5,6 @@ import logging from copy import deepcopy -from io import BytesIO from pathlib import PurePosixPath from typing import Any @@ -70,6 +69,7 @@ class CSVDataset(AbstractVersionedDataset[pd.DataFrame, pd.DataFrame]): DEFAULT_LOAD_ARGS: dict[str, Any] = {} DEFAULT_SAVE_ARGS: dict[str, Any] = {"index": False} + DEFAULT_FS_ARGS: dict[str, Any] = {"open_args_save": {"mode": "w"}} def __init__( # noqa: PLR0913 self, @@ -97,7 +97,7 @@ def __init__( # noqa: PLR0913 save_args: Pandas options for saving CSV files. Here you can find all available arguments: https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.to_csv.html - All defaults are preserved, but "index", which is set to False. + Defaults are preserved, apart from "index", which is set to False. version: If specified, should be an instance of ``kedro.io.core.Version``. If its ``load`` attribute is None, the latest version will be loaded. If its ``save`` @@ -106,10 +106,14 @@ def __init__( # noqa: PLR0913 E.g. for ``GCSFileSystem`` it should look like `{"token": None}`. fs_args: Extra arguments to pass into underlying filesystem class constructor (e.g. `{"project": "my-project"}` for ``GCSFileSystem``). - metadata: Any Any arbitrary metadata. + Defaults are preserved, apart from the `open_args_save` `mode` which is set to `w`. + metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. 
""" _fs_args = deepcopy(fs_args) or {} + _fs_open_args_load = _fs_args.pop("open_args_load", {}) + _fs_open_args_save = _fs_args.pop("open_args_save", {}) + _credentials = deepcopy(credentials) or {} protocol, path = get_protocol_and_path(filepath, version) @@ -119,7 +123,6 @@ def __init__( # noqa: PLR0913 self._protocol = protocol self._storage_options = {**_credentials, **_fs_args} self._fs = fsspec.filesystem(self._protocol, **self._storage_options) - self.metadata = metadata super().__init__( @@ -129,13 +132,17 @@ def __init__( # noqa: PLR0913 glob_function=self._fs.glob, ) - # Handle default load and save arguments - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) + # Handle default load and save and fs arguments + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} + self._fs_open_args_load = { + **self.DEFAULT_FS_ARGS.get("open_args_load", {}), + **(_fs_open_args_load or {}), + } + self._fs_open_args_save = { + **self.DEFAULT_FS_ARGS.get("open_args_save", {}), + **(_fs_open_args_save or {}), + } if "storage_options" in self._save_args or "storage_options" in self._load_args: logger.warning( @@ -172,11 +179,8 @@ def _load(self) -> pd.DataFrame: def _save(self, data: pd.DataFrame) -> None: save_path = get_filepath_str(self._get_save_path(), self._protocol) - buf = BytesIO() - data.to_csv(path_or_buf=buf, **self._save_args) - - with self._fs.open(save_path, mode="wb") as fs_file: - fs_file.write(buf.getvalue()) + with self._fs.open(save_path, **self._fs_open_args_save) as fs_file: + data.to_csv(path_or_buf=fs_file, **self._save_args) self._invalidate_cache() diff --git a/kedro-datasets/kedro_datasets/pandas/deltatable_dataset.py b/kedro-datasets/kedro_datasets/pandas/deltatable_dataset.py index 
fcc680d16..4e5db2f45 100644 --- a/kedro-datasets/kedro_datasets/pandas/deltatable_dataset.py +++ b/kedro-datasets/kedro_datasets/pandas/deltatable_dataset.py @@ -144,13 +144,9 @@ def __init__( # noqa: PLR0913 self.is_empty_dir: bool = False self._delta_table: DeltaTable | None = None - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args: - self._load_args.update(load_args) - - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args: - self._save_args.update(save_args) + # Handle default load and save arguments + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} write_mode = self._save_args.get("mode", None) if write_mode not in self.ACCEPTED_WRITE_MODES: diff --git a/kedro-datasets/kedro_datasets/pandas/excel_dataset.py b/kedro-datasets/kedro_datasets/pandas/excel_dataset.py index 601ef377e..d08250bdf 100644 --- a/kedro-datasets/kedro_datasets/pandas/excel_dataset.py +++ b/kedro-datasets/kedro_datasets/pandas/excel_dataset.py @@ -5,7 +5,6 @@ import logging from copy import deepcopy -from io import BytesIO from pathlib import PurePosixPath from typing import Any, Union @@ -109,6 +108,7 @@ class ExcelDataset( DEFAULT_LOAD_ARGS = {"engine": "openpyxl"} DEFAULT_SAVE_ARGS = {"index": False} + DEFAULT_FS_ARGS: dict[str, Any] = {"open_args_save": {"mode": "wb"}} def __init__( # noqa: PLR0913 self, @@ -153,6 +153,8 @@ def __init__( # noqa: PLR0913 E.g. for ``GCSFileSystem`` it should look like `{"token": None}`. fs_args: Extra arguments to pass into underlying filesystem class constructor (e.g. `{"project": "my-project"}` for ``GCSFileSystem``). + Defaults are preserved, apart from the `open_args_save` `mode` which is set to `wb`. + Note that the save method requires bytes, so any save mode provided should include "b" for bytes. metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. 
@@ -160,6 +162,8 @@ def __init__( # noqa: PLR0913 DatasetError: If versioning is enabled while in append mode. """ _fs_args = deepcopy(fs_args) or {} + _fs_open_args_load = _fs_args.pop("open_args_load", {}) + _fs_open_args_save = _fs_args.pop("open_args_save", {}) _credentials = deepcopy(credentials) or {} protocol, path = get_protocol_and_path(filepath, version) @@ -179,15 +183,18 @@ def __init__( # noqa: PLR0913 glob_function=self._fs.glob, ) - # Handle default load arguments - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) + # Handle default load and save and fs arguments + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} + self._fs_open_args_load = { + **self.DEFAULT_FS_ARGS.get("open_args_load", {}), + **(_fs_open_args_load or {}), + } + self._fs_open_args_save = { + **self.DEFAULT_FS_ARGS.get("open_args_save", {}), + **(_fs_open_args_save or {}), + } - # Handle default save arguments - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) self._writer_args = self._save_args.pop("writer", {}) # type: ignore self._writer_args.setdefault("engine", engine or "openpyxl") # type: ignore @@ -230,20 +237,17 @@ def _load(self) -> pd.DataFrame | dict[str, pd.DataFrame]: ) def _save(self, data: pd.DataFrame | dict[str, pd.DataFrame]) -> None: - output = BytesIO() save_path = get_filepath_str(self._get_save_path(), self._protocol) - with pd.ExcelWriter(output, **self._writer_args) as writer: - if isinstance(data, dict): - for sheet_name, sheet_data in data.items(): - sheet_data.to_excel( - writer, sheet_name=sheet_name, **self._save_args - ) - else: - data.to_excel(writer, **self._save_args) - - with self._fs.open(save_path, mode="wb") as fs_file: - fs_file.write(output.getvalue()) + with self._fs.open(save_path, **self._fs_open_args_save) as fs_file: + with 
pd.ExcelWriter(fs_file, **self._writer_args) as writer: + if isinstance(data, dict): + for sheet_name, sheet_data in data.items(): + sheet_data.to_excel( + writer, sheet_name=sheet_name, **self._save_args + ) + else: + data.to_excel(writer, **self._save_args) self._invalidate_cache() diff --git a/kedro-datasets/kedro_datasets/pandas/feather_dataset.py b/kedro-datasets/kedro_datasets/pandas/feather_dataset.py index eb1f115f0..706eaee1b 100644 --- a/kedro-datasets/kedro_datasets/pandas/feather_dataset.py +++ b/kedro-datasets/kedro_datasets/pandas/feather_dataset.py @@ -6,7 +6,6 @@ import logging from copy import deepcopy -from io import BytesIO from pathlib import PurePosixPath from typing import Any @@ -69,6 +68,7 @@ class FeatherDataset(AbstractVersionedDataset[pd.DataFrame, pd.DataFrame]): DEFAULT_LOAD_ARGS: dict[str, Any] = {} DEFAULT_SAVE_ARGS: dict[str, Any] = {} + DEFAULT_FS_ARGS: dict[str, Any] = {"open_args_save": {"mode": "wb"}} def __init__( # noqa: PLR0913 self, @@ -105,10 +105,14 @@ def __init__( # noqa: PLR0913 E.g. for ``GCSFileSystem`` it should look like `{"token": None}`. fs_args: Extra arguments to pass into underlying filesystem class constructor (e.g. `{"project": "my-project"}` for ``GCSFileSystem``). + Defaults are preserved, apart from the `open_args_save` `mode` which is set to `wb`. + Note that the save method requires bytes, so any save mode provided should include "b" for bytes. metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. 
""" _fs_args = deepcopy(fs_args) or {} + _fs_open_args_load = _fs_args.pop("open_args_load", {}) + _fs_open_args_save = _fs_args.pop("open_args_save", {}) _credentials = deepcopy(credentials) or {} protocol, path = get_protocol_and_path(filepath, version) @@ -128,13 +132,17 @@ def __init__( # noqa: PLR0913 glob_function=self._fs.glob, ) - # Handle default load argument - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) + # Handle default load and save and fs arguments + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} + self._fs_open_args_load = { + **self.DEFAULT_FS_ARGS.get("open_args_load", {}), + **(_fs_open_args_load or {}), + } + self._fs_open_args_save = { + **self.DEFAULT_FS_ARGS.get("open_args_save", {}), + **(_fs_open_args_save or {}), + } if "storage_options" in self._save_args or "storage_options" in self._load_args: logger.warning( @@ -170,11 +178,8 @@ def _load(self) -> pd.DataFrame: def _save(self, data: pd.DataFrame) -> None: save_path = get_filepath_str(self._get_save_path(), self._protocol) - buf = BytesIO() - data.to_feather(buf, **self._save_args) - - with self._fs.open(save_path, mode="wb") as fs_file: - fs_file.write(buf.getvalue()) + with self._fs.open(save_path, **self._fs_open_args_save) as fs_file: + data.to_feather(fs_file, **self._save_args) self._invalidate_cache() diff --git a/kedro-datasets/kedro_datasets/pandas/gbq_dataset.py b/kedro-datasets/kedro_datasets/pandas/gbq_dataset.py index 060316204..f16f828f7 100644 --- a/kedro-datasets/kedro_datasets/pandas/gbq_dataset.py +++ b/kedro-datasets/kedro_datasets/pandas/gbq_dataset.py @@ -107,12 +107,8 @@ def __init__( # noqa: PLR0913 are different. 
""" # Handle default load and save arguments - self._load_args = copy.deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = copy.deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} self._validate_location() validate_on_forbidden_chars(dataset=dataset, table_name=table_name) @@ -262,9 +258,7 @@ def __init__( # noqa: PLR0913 ) # Handle default load arguments - self._load_args = copy.deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} self._project_id = project diff --git a/kedro-datasets/kedro_datasets/pandas/generic_dataset.py b/kedro-datasets/kedro_datasets/pandas/generic_dataset.py index 4a4ec2726..77da9caf2 100644 --- a/kedro-datasets/kedro_datasets/pandas/generic_dataset.py +++ b/kedro-datasets/kedro_datasets/pandas/generic_dataset.py @@ -87,6 +87,7 @@ class GenericDataset(AbstractVersionedDataset[pd.DataFrame, pd.DataFrame]): DEFAULT_LOAD_ARGS: dict[str, Any] = {} DEFAULT_SAVE_ARGS: dict[str, Any] = {} + DEFAULT_FS_ARGS: dict[str, Any] = {"open_args_save": {"mode": "w"}} def __init__( # noqa: PLR0913 self, @@ -137,8 +138,7 @@ def __init__( # noqa: PLR0913 `open_args_load` and `open_args_save`. Here you can find all available arguments for `open`: https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open - All defaults are preserved, except `mode`, which is set to `r` when loading - and to `w` when saving. + All defaults are preserved, except `mode`, which is set to `w` when saving. metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. 
@@ -170,16 +170,17 @@ def __init__( # noqa: PLR0913 glob_function=self._fs.glob, ) - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) - - _fs_open_args_save.setdefault("mode", "w") - self._fs_open_args_load = _fs_open_args_load - self._fs_open_args_save = _fs_open_args_save + # Handle default load and save and fs arguments + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} + self._fs_open_args_load = { + **self.DEFAULT_FS_ARGS.get("open_args_load", {}), + **(_fs_open_args_load or {}), + } + self._fs_open_args_save = { + **self.DEFAULT_FS_ARGS.get("open_args_save", {}), + **(_fs_open_args_save or {}), + } def _ensure_file_system_target(self) -> None: # Fail fast if provided a known non-filesystem target diff --git a/kedro-datasets/kedro_datasets/pandas/hdf_dataset.py b/kedro-datasets/kedro_datasets/pandas/hdf_dataset.py index 227b26133..b3c56e2c7 100644 --- a/kedro-datasets/kedro_datasets/pandas/hdf_dataset.py +++ b/kedro-datasets/kedro_datasets/pandas/hdf_dataset.py @@ -60,6 +60,7 @@ class HDFDataset(AbstractVersionedDataset[pd.DataFrame, pd.DataFrame]): _lock = Lock() DEFAULT_LOAD_ARGS: dict[str, Any] = {} DEFAULT_SAVE_ARGS: dict[str, Any] = {} + DEFAULT_FS_ARGS: dict[str, Any] = {"open_args_save": {"mode": "wb"}} def __init__( # noqa: PLR0913 self, @@ -102,7 +103,7 @@ def __init__( # noqa: PLR0913 `open_args_load` and `open_args_save`. Here you can find all available arguments for `open`: https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open - All defaults are preserved, except `mode`, which is set `wb` when saving. + All defaults are preserved, except `open_args_save` `mode`, which is set to `wb` when saving. metadata: Any arbitrary metadata. 
This is ignored by Kedro, but may be consumed by users or external plugins. """ @@ -129,17 +130,17 @@ def __init__( # noqa: PLR0913 self._key = key - # Handle default load and save arguments - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) - - _fs_open_args_save.setdefault("mode", "wb") - self._fs_open_args_load = _fs_open_args_load - self._fs_open_args_save = _fs_open_args_save + # Handle default load and save and fs arguments + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} + self._fs_open_args_load = { + **self.DEFAULT_FS_ARGS.get("open_args_load", {}), + **(_fs_open_args_load or {}), + } + self._fs_open_args_save = { + **self.DEFAULT_FS_ARGS.get("open_args_save", {}), + **(_fs_open_args_save or {}), + } def _describe(self) -> dict[str, Any]: return { diff --git a/kedro-datasets/kedro_datasets/pandas/json_dataset.py b/kedro-datasets/kedro_datasets/pandas/json_dataset.py index 578c494ce..d4d0c87d6 100644 --- a/kedro-datasets/kedro_datasets/pandas/json_dataset.py +++ b/kedro-datasets/kedro_datasets/pandas/json_dataset.py @@ -5,7 +5,6 @@ import logging from copy import deepcopy -from io import BytesIO from pathlib import PurePosixPath from typing import Any @@ -66,6 +65,7 @@ class JSONDataset(AbstractVersionedDataset[pd.DataFrame, pd.DataFrame]): DEFAULT_LOAD_ARGS: dict[str, Any] = {} DEFAULT_SAVE_ARGS: dict[str, Any] = {} + DEFAULT_FS_ARGS: dict[str, Any] = {"open_args_save": {"mode": "w"}} def __init__( # noqa: PLR0913 self, @@ -93,7 +93,7 @@ def __init__( # noqa: PLR0913 save_args: Pandas options for saving JSON files. 
Here you can find all available arguments: https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.to_json.html - All defaults are preserved, but "index", which is set to False. + All defaults are preserved. version: If specified, should be an instance of ``kedro.io.core.Version``. If its ``load`` attribute is None, the latest version will be loaded. If its ``save`` @@ -102,10 +102,14 @@ def __init__( # noqa: PLR0913 E.g. for ``GCSFileSystem`` it should look like `{'token': None}`. fs_args: Extra arguments to pass into underlying filesystem class constructor (e.g. `{"project": "my-project"}` for ``GCSFileSystem``). + Defaults are preserved, apart from the `open_args_save` `mode` which is set to `w`. metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. """ _fs_args = deepcopy(fs_args) or {} + _fs_open_args_load = _fs_args.pop("open_args_load", {}) + _fs_open_args_save = _fs_args.pop("open_args_save", {}) + _credentials = deepcopy(credentials) or {} protocol, path = get_protocol_and_path(filepath, version) if protocol == "file": @@ -124,13 +128,17 @@ def __init__( # noqa: PLR0913 glob_function=self._fs.glob, ) - # Handle default load and save arguments - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) + # Handle default load and save and fs arguments + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} + self._fs_open_args_load = { + **self.DEFAULT_FS_ARGS.get("open_args_load", {}), + **(_fs_open_args_load or {}), + } + self._fs_open_args_save = { + **self.DEFAULT_FS_ARGS.get("open_args_save", {}), + **(_fs_open_args_save or {}), + } if "storage_options" in self._save_args or "storage_options" in self._load_args: logger.warning( @@ 
-167,11 +175,8 @@ def _load(self) -> pd.DataFrame: def _save(self, data: pd.DataFrame) -> None: save_path = get_filepath_str(self._get_save_path(), self._protocol) - buf = BytesIO() - data.to_json(path_or_buf=buf, **self._save_args) - - with self._fs.open(save_path, mode="wb") as fs_file: - fs_file.write(buf.getvalue()) + with self._fs.open(save_path, **self._fs_open_args_save) as fs_file: + data.to_json(path_or_buf=fs_file, **self._save_args) self._invalidate_cache() diff --git a/kedro-datasets/kedro_datasets/pandas/parquet_dataset.py b/kedro-datasets/kedro_datasets/pandas/parquet_dataset.py index 760d5a8f3..7aefe675e 100644 --- a/kedro-datasets/kedro_datasets/pandas/parquet_dataset.py +++ b/kedro-datasets/kedro_datasets/pandas/parquet_dataset.py @@ -5,7 +5,6 @@ import logging from copy import deepcopy -from io import BytesIO from pathlib import Path, PurePosixPath from typing import Any @@ -77,6 +76,7 @@ class ParquetDataset(AbstractVersionedDataset[pd.DataFrame, pd.DataFrame]): DEFAULT_LOAD_ARGS: dict[str, Any] = {} DEFAULT_SAVE_ARGS: dict[str, Any] = {} + DEFAULT_FS_ARGS: dict[str, Any] = {"open_args_save": {"mode": "wb"}} def __init__( # noqa: PLR0913 self, @@ -116,10 +116,14 @@ def __init__( # noqa: PLR0913 E.g. for ``GCSFileSystem`` it should look like `{"token": None}`. fs_args: Extra arguments to pass into underlying filesystem class constructor (e.g. `{"project": "my-project"}` for ``GCSFileSystem``). + Defaults are preserved, apart from the `open_args_save` `mode` which is set to `wb`. + Note that the save method requires bytes, so any save mode provided should include "b" for bytes. metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. 
""" _fs_args = deepcopy(fs_args) or {} + _fs_open_args_load = _fs_args.pop("open_args_load", {}) + _fs_open_args_save = _fs_args.pop("open_args_save", {}) _credentials = deepcopy(credentials) or {} protocol, path = get_protocol_and_path(filepath, version) @@ -139,13 +143,17 @@ def __init__( # noqa: PLR0913 glob_function=self._fs.glob, ) - # Handle default load and save arguments - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) + # Handle default load and save and fs arguments + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} + self._fs_open_args_load = { + **self.DEFAULT_FS_ARGS.get("open_args_load", {}), + **(_fs_open_args_load or {}), + } + self._fs_open_args_save = { + **self.DEFAULT_FS_ARGS.get("open_args_save", {}), + **(_fs_open_args_save or {}), + } if "storage_options" in self._save_args or "storage_options" in self._load_args: logger.warning( @@ -193,11 +201,8 @@ def _save(self, data: pd.DataFrame) -> None: f"'partition_cols'. Please use 'kedro.io.PartitionedDataset' instead." 
) - bytes_buffer = BytesIO() - data.to_parquet(bytes_buffer, **self._save_args) - - with self._fs.open(save_path, mode="wb") as fs_file: - fs_file.write(bytes_buffer.getvalue()) + with self._fs.open(save_path, **self._fs_open_args_save) as fs_file: + data.to_parquet(fs_file, **self._save_args) self._invalidate_cache() diff --git a/kedro-datasets/kedro_datasets/pandas/sql_dataset.py b/kedro-datasets/kedro_datasets/pandas/sql_dataset.py index f750ab685..3c5769c62 100644 --- a/kedro-datasets/kedro_datasets/pandas/sql_dataset.py +++ b/kedro-datasets/kedro_datasets/pandas/sql_dataset.py @@ -210,12 +210,8 @@ def __init__( # noqa: PLR0913 ) # Handle default load and save arguments - self._load_args = copy.deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = copy.deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} self._load_args["table_name"] = table_name self._save_args["name"] = table_name diff --git a/kedro-datasets/kedro_datasets/pandas/xml_dataset.py b/kedro-datasets/kedro_datasets/pandas/xml_dataset.py index b1173f43e..8b86909dc 100644 --- a/kedro-datasets/kedro_datasets/pandas/xml_dataset.py +++ b/kedro-datasets/kedro_datasets/pandas/xml_dataset.py @@ -5,7 +5,6 @@ import logging from copy import deepcopy -from io import BytesIO from pathlib import PurePosixPath from typing import Any @@ -47,6 +46,7 @@ class XMLDataset(AbstractVersionedDataset[pd.DataFrame, pd.DataFrame]): DEFAULT_LOAD_ARGS: dict[str, Any] = {} DEFAULT_SAVE_ARGS: dict[str, Any] = {"index": False} + DEFAULT_FS_ARGS: dict[str, Any] = {"open_args_save": {"mode": "wb"}} def __init__( # noqa: PLR0913 self, @@ -83,10 +83,14 @@ def __init__( # noqa: PLR0913 E.g. for ``GCSFileSystem`` it should look like `{"token": None}`. 
fs_args: Extra arguments to pass into underlying filesystem class constructor (e.g. `{"project": "my-project"}` for ``GCSFileSystem``). + Defaults are preserved, apart from the `open_args_save` `mode` which is set to `wb`. + Note that the save method requires bytes, so any save mode provided should include "b" for bytes. metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. """ _fs_args = deepcopy(fs_args) or {} + _fs_open_args_load = _fs_args.pop("open_args_load", {}) + _fs_open_args_save = _fs_args.pop("open_args_save", {}) _credentials = deepcopy(credentials) or {} protocol, path = get_protocol_and_path(filepath, version) @@ -106,13 +110,17 @@ def __init__( # noqa: PLR0913 glob_function=self._fs.glob, ) - # Handle default load and save arguments - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) + # Handle default load and save and fs arguments + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} + self._fs_open_args_load = { + **self.DEFAULT_FS_ARGS.get("open_args_load", {}), + **(_fs_open_args_load or {}), + } + self._fs_open_args_save = { + **self.DEFAULT_FS_ARGS.get("open_args_save", {}), + **(_fs_open_args_save or {}), + } if "storage_options" in self._save_args or "storage_options" in self._load_args: logger.warning( @@ -149,11 +157,8 @@ def _load(self) -> pd.DataFrame: def _save(self, data: pd.DataFrame) -> None: save_path = get_filepath_str(self._get_save_path(), self._protocol) - buf = BytesIO() - data.to_xml(path_or_buffer=buf, **self._save_args) - - with self._fs.open(save_path, mode="wb") as fs_file: - fs_file.write(buf.getvalue()) + with self._fs.open(save_path, **self._fs_open_args_save) as fs_file: + 
data.to_xml(path_or_buffer=fs_file, **self._save_args) self._invalidate_cache() diff --git a/kedro-datasets/kedro_datasets/pickle/pickle_dataset.py b/kedro-datasets/kedro_datasets/pickle/pickle_dataset.py index 3ef071e6c..30b4d3630 100644 --- a/kedro-datasets/kedro_datasets/pickle/pickle_dataset.py +++ b/kedro-datasets/kedro_datasets/pickle/pickle_dataset.py @@ -74,6 +74,7 @@ class PickleDataset(AbstractVersionedDataset[Any, Any]): DEFAULT_LOAD_ARGS: dict[str, Any] = {} DEFAULT_SAVE_ARGS: dict[str, Any] = {} + DEFAULT_FS_ARGS: dict[str, Any] = {"open_args_save": {"mode": "wb"}} def __init__( # noqa: PLR0913 self, @@ -193,16 +194,16 @@ def __init__( # noqa: PLR0913 self._backend = backend # Handle default load and save arguments - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) - - _fs_open_args_save.setdefault("mode", "wb") - self._fs_open_args_load = _fs_open_args_load - self._fs_open_args_save = _fs_open_args_save + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} + self._fs_open_args_load = { + **self.DEFAULT_FS_ARGS.get("open_args_load", {}), + **(_fs_open_args_load or {}), + } + self._fs_open_args_save = { + **self.DEFAULT_FS_ARGS.get("open_args_save", {}), + **(_fs_open_args_save or {}), + } def _describe(self) -> dict[str, Any]: return { diff --git a/kedro-datasets/kedro_datasets/pillow/image_dataset.py b/kedro-datasets/kedro_datasets/pillow/image_dataset.py index d3e2b838d..2f4ddae13 100644 --- a/kedro-datasets/kedro_datasets/pillow/image_dataset.py +++ b/kedro-datasets/kedro_datasets/pillow/image_dataset.py @@ -45,6 +45,7 @@ class ImageDataset(AbstractVersionedDataset[Image.Image, Image.Image]): """ DEFAULT_SAVE_ARGS: dict[str, Any] = {} + DEFAULT_FS_ARGS: dict[str, Any] = {"open_args_save": 
{"mode": "wb"}} def __init__( # noqa: PLR0913 self, @@ -80,8 +81,7 @@ def __init__( # noqa: PLR0913 `open_args_load` and `open_args_save`. Here you can find all available arguments for `open`: https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open - All defaults are preserved, except `mode`, which is set to `r` when loading - and to `w` when saving. + All defaults are preserved, except `mode`, which is set to `wb` when saving. metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. """ @@ -106,14 +106,16 @@ def __init__( # noqa: PLR0913 glob_function=self._fs.glob, ) - # Handle default save argument - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) - - _fs_open_args_save.setdefault("mode", "wb") - self._fs_open_args_load = _fs_open_args_load - self._fs_open_args_save = _fs_open_args_save + # Handle default save and fs arguments + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} + self._fs_open_args_load = { + **self.DEFAULT_FS_ARGS.get("open_args_load", {}), + **(_fs_open_args_load or {}), + } + self._fs_open_args_save = { + **self.DEFAULT_FS_ARGS.get("open_args_save", {}), + **(_fs_open_args_save or {}), + } def _describe(self) -> dict[str, Any]: return { diff --git a/kedro-datasets/kedro_datasets/plotly/json_dataset.py b/kedro-datasets/kedro_datasets/plotly/json_dataset.py index 4e5182f69..f1a097909 100644 --- a/kedro-datasets/kedro_datasets/plotly/json_dataset.py +++ b/kedro-datasets/kedro_datasets/plotly/json_dataset.py @@ -57,6 +57,7 @@ class JSONDataset( DEFAULT_LOAD_ARGS: dict[str, Any] = {} DEFAULT_SAVE_ARGS: dict[str, Any] = {} + DEFAULT_FS_ARGS: dict[str, Any] = {"open_args_save": {"mode": "w"}} def __init__( # noqa: PLR0913 self, @@ -123,17 +124,17 @@ def __init__( # noqa: PLR0913 glob_function=self._fs.glob, ) - # Handle default load and save arguments - self._load_args = 
deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) - - _fs_open_args_save.setdefault("mode", "w") - self._fs_open_args_load = _fs_open_args_load - self._fs_open_args_save = _fs_open_args_save + # Handle default load and save and fs arguments + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} + self._fs_open_args_load = { + **self.DEFAULT_FS_ARGS.get("open_args_load", {}), + **(_fs_open_args_load or {}), + } + self._fs_open_args_save = { + **self.DEFAULT_FS_ARGS.get("open_args_save", {}), + **(_fs_open_args_save or {}), + } def _describe(self) -> dict[str, Any]: return { diff --git a/kedro-datasets/kedro_datasets/plotly/plotly_dataset.py b/kedro-datasets/kedro_datasets/plotly/plotly_dataset.py index 68b64fd71..c15df71d7 100644 --- a/kedro-datasets/kedro_datasets/plotly/plotly_dataset.py +++ b/kedro-datasets/kedro_datasets/plotly/plotly_dataset.py @@ -70,6 +70,8 @@ class PlotlyDataset(JSONDataset): """ + DEFAULT_FS_ARGS: dict[str, Any] = {"open_args_save": {"mode": "w"}} + def __init__( # noqa: PLR0913 self, *, @@ -131,10 +133,16 @@ def __init__( # noqa: PLR0913 _fs_args = deepcopy(fs_args) or {} _fs_open_args_load = _fs_args.pop("open_args_load", {}) _fs_open_args_save = _fs_args.pop("open_args_save", {}) - _fs_open_args_save.setdefault("mode", "w") - self._fs_open_args_load = _fs_open_args_load - self._fs_open_args_save = _fs_open_args_save + # Handle default fs arguments + self._fs_open_args_load = { + **self.DEFAULT_FS_ARGS.get("open_args_load", {}), + **(_fs_open_args_load or {}), + } + self._fs_open_args_save = { + **self.DEFAULT_FS_ARGS.get("open_args_save", {}), + **(_fs_open_args_save or {}), + } self.metadata = metadata diff --git a/kedro-datasets/kedro_datasets/polars/csv_dataset.py 
b/kedro-datasets/kedro_datasets/polars/csv_dataset.py index 1195ce295..44fd427af 100644 --- a/kedro-datasets/kedro_datasets/polars/csv_dataset.py +++ b/kedro-datasets/kedro_datasets/polars/csv_dataset.py @@ -5,7 +5,6 @@ import logging from copy import deepcopy -from io import BytesIO from pathlib import PurePosixPath from typing import Any @@ -67,6 +66,7 @@ class CSVDataset(AbstractVersionedDataset[pl.DataFrame, pl.DataFrame]): DEFAULT_LOAD_ARGS: dict[str, Any] = {"rechunk": True} DEFAULT_SAVE_ARGS: dict[str, Any] = {} + DEFAULT_FS_ARGS: dict[str, Any] = {"open_args_save": {"mode": "w"}} def __init__( # noqa: PLR0913 self, @@ -92,8 +92,8 @@ def __init__( # noqa: PLR0913 load_args: Polars options for loading CSV files. Here you can find all available arguments: https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.read_csv.html#polars.read_csv - All defaults are preserved, but we explicity use `rechunk=True` for `seaborn` - compability. + All defaults are preserved, but we explicitly use `rechunk=True` for `seaborn` + compatibility. save_args: Polars options for saving CSV files. Here you can find all available arguments: https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.DataFrame.write_csv.html @@ -106,10 +106,14 @@ def __init__( # noqa: PLR0913 E.g. for ``GCSFileSystem`` it should look like `{"token": None}`. fs_args: Extra arguments to pass into underlying filesystem class constructor (e.g. `{"project": "my-project"}` for ``GCSFileSystem``). + Defaults are preserved, apart from the `open_args_save` `mode` which is set to `w`. + metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. 
""" _fs_args = deepcopy(fs_args) or {} + _fs_open_args_load = _fs_args.pop("open_args_load", {}) + _fs_open_args_save = _fs_args.pop("open_args_save", {}) _credentials = deepcopy(credentials) or {} protocol, path = get_protocol_and_path(filepath, version) @@ -129,13 +133,17 @@ def __init__( # noqa: PLR0913 glob_function=self._fs.glob, ) - # Handle default load and save arguments - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) + # Handle default load and save and fs arguments + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} + self._fs_open_args_load = { + **self.DEFAULT_FS_ARGS.get("open_args_load", {}), + **(_fs_open_args_load or {}), + } + self._fs_open_args_save = { + **self.DEFAULT_FS_ARGS.get("open_args_save", {}), + **(_fs_open_args_save or {}), + } if "storage_options" in self._save_args or "storage_options" in self._load_args: logger.warning( @@ -172,11 +180,8 @@ def _load(self) -> pl.DataFrame: def _save(self, data: pl.DataFrame) -> None: save_path = get_filepath_str(self._get_save_path(), self._protocol) - buf = BytesIO() - data.write_csv(file=buf, **self._save_args) - - with self._fs.open(save_path, mode="wb") as fs_file: - fs_file.write(buf.getvalue()) + with self._fs.open(save_path, **self._fs_open_args_save) as fs_file: + data.write_csv(file=fs_file, **self._save_args) self._invalidate_cache() diff --git a/kedro-datasets/kedro_datasets/polars/lazy_polars_dataset.py b/kedro-datasets/kedro_datasets/polars/lazy_polars_dataset.py index 2e650e52e..fc9733100 100644 --- a/kedro-datasets/kedro_datasets/polars/lazy_polars_dataset.py +++ b/kedro-datasets/kedro_datasets/polars/lazy_polars_dataset.py @@ -6,7 +6,6 @@ import logging from copy import deepcopy -from io import BytesIO from pathlib 
import PurePosixPath from typing import Any, ClassVar, Union @@ -74,6 +73,7 @@ class LazyPolarsDataset(AbstractVersionedDataset[pl.LazyFrame, PolarsFrame]): DEFAULT_LOAD_ARGS: ClassVar[dict[str, Any]] = {} DEFAULT_SAVE_ARGS: ClassVar[dict[str, Any]] = {} + DEFAULT_FS_ARGS: dict[str, Any] = {"open_args_save": {"mode": "wb"}} def __init__( # noqa: PLR0913 self, @@ -126,8 +126,7 @@ def __init__( # noqa: PLR0913 `open_args_load` and `open_args_save`. Here you can find all available arguments for `open`: https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open - All defaults are preserved, except `mode`, which is set to `r` when loading - and to `w` when saving. + All defaults are preserved, except `mode`, which is set to `wb` when saving. metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. Raises: @@ -145,6 +144,8 @@ def __init__( # noqa: PLR0913 ) _fs_args = deepcopy(fs_args) or {} + _fs_open_args_load = _fs_args.pop("open_args_load", {}) + _fs_open_args_save = _fs_args.pop("open_args_save", {}) _credentials = deepcopy(credentials) or {} protocol, path = get_protocol_and_path(filepath, version) @@ -164,13 +165,17 @@ def __init__( # noqa: PLR0913 glob_function=self._fs.glob, ) - # Handle default load and save arguments - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) + # Handle default load and save and fs arguments + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} + self._fs_open_args_load = { + **self.DEFAULT_FS_ARGS.get("open_args_load", {}), + **(_fs_open_args_load or {}), + } + self._fs_open_args_save = { + **self.DEFAULT_FS_ARGS.get("open_args_save", {}), + **(_fs_open_args_save or {}), + } if 
"storage_options" in self._save_args or "storage_options" in self._load_args: logger.warning( @@ -218,10 +223,9 @@ def _save(self, data: pl.DataFrame | pl.LazyFrame) -> None: # https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.DataFrame.write_parquet.html save_method = getattr(collected_data, f"write_{self._file_format}", None) if save_method: - buf = BytesIO() - save_method(file=buf, **self._save_args) - with self._fs.open(save_path, mode="wb") as fs_file: - fs_file.write(buf.getvalue()) + with self._fs.open(save_path, **self._fs_open_args_save) as fs_file: + save_method(file=fs_file, **self._save_args) + self._invalidate_cache() # How the LazyPolarsDataset logic is currently written with # ACCEPTED_FILE_FORMATS and a check in the `__init__` method, diff --git a/kedro-datasets/kedro_datasets/snowflake/snowpark_dataset.py b/kedro-datasets/kedro_datasets/snowflake/snowpark_dataset.py index 249f5d001..c8685aa49 100644 --- a/kedro-datasets/kedro_datasets/snowflake/snowpark_dataset.py +++ b/kedro-datasets/kedro_datasets/snowflake/snowpark_dataset.py @@ -3,7 +3,6 @@ from __future__ import annotations import logging -from copy import deepcopy from typing import Any import snowflake.snowpark as sp @@ -156,12 +155,8 @@ def __init__( # noqa: PLR0913 ) schema = credentials["schema"] # Handle default load and save arguments - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} self._table_name = table_name self._database = database diff --git a/kedro-datasets/kedro_datasets/spark/spark_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_dataset.py index 2fc9bd16c..e077d6390 100644 --- a/kedro-datasets/kedro_datasets/spark/spark_dataset.py +++ 
b/kedro-datasets/kedro_datasets/spark/spark_dataset.py @@ -366,12 +366,8 @@ def __init__( # noqa: PLR0913 ) # Handle default load and save arguments - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} # Handle schema load argument self._schema = self._load_args.pop("schema", None) diff --git a/kedro-datasets/kedro_datasets/spark/spark_jdbc_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_jdbc_dataset.py index 60e1443c0..3e4c7b2f4 100644 --- a/kedro-datasets/kedro_datasets/spark/spark_jdbc_dataset.py +++ b/kedro-datasets/kedro_datasets/spark/spark_jdbc_dataset.py @@ -1,7 +1,6 @@ """SparkJDBCDataset to load and save a PySpark DataFrame via JDBC.""" from __future__ import annotations -from copy import deepcopy from typing import Any from kedro.io.core import AbstractDataset, DatasetError @@ -126,12 +125,8 @@ def __init__( # noqa: PLR0913 self.metadata = metadata # Handle default load and save arguments - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} # Update properties in load_args and save_args with credentials. 
if credentials is not None: diff --git a/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py index f30770852..c63bd80cb 100644 --- a/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py +++ b/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py @@ -1,7 +1,6 @@ """SparkStreamingDataset to load and save a PySpark Streaming DataFrame.""" from __future__ import annotations -from copy import deepcopy from pathlib import PurePosixPath from typing import Any @@ -87,12 +86,8 @@ def __init__( # noqa: PLR0913 self._filepath = PurePosixPath(filepath) # Handle default load and save arguments - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} # Handle schema load argument self._schema = self._load_args.pop("schema", None) diff --git a/kedro-datasets/kedro_datasets/svmlight/svmlight_dataset.py b/kedro-datasets/kedro_datasets/svmlight/svmlight_dataset.py index 9d6818eaf..8e879dff7 100644 --- a/kedro-datasets/kedro_datasets/svmlight/svmlight_dataset.py +++ b/kedro-datasets/kedro_datasets/svmlight/svmlight_dataset.py @@ -91,6 +91,10 @@ class SVMLightDataset(AbstractVersionedDataset[_DI, _DO]): DEFAULT_LOAD_ARGS: dict[str, Any] = {} DEFAULT_SAVE_ARGS: dict[str, Any] = {} + DEFAULT_FS_ARGS: dict[str, Any] = { + "open_args_save": {"mode": "wb"}, + "open_args_load": {"mode": "rb"}, + } def __init__( # noqa: PLR0913 self, @@ -123,6 +127,8 @@ def __init__( # noqa: PLR0913 E.g. for ``GCSFileSystem`` it should look like `{"token": None}`. fs_args: Extra arguments to pass into underlying filesystem class constructor (e.g. `{"project": "my-project"}` for ``GCSFileSystem``). 
+ All defaults are preserved, except `mode`, which is set to `rb` when loading + and to `wb` when saving. metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. """ @@ -147,17 +153,17 @@ def __init__( # noqa: PLR0913 glob_function=self._fs.glob, ) - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) - - _fs_open_args_load.setdefault("mode", "rb") - _fs_open_args_save.setdefault("mode", "wb") - self._fs_open_args_load = _fs_open_args_load - self._fs_open_args_save = _fs_open_args_save + # Handle default load and save and fs arguments + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} + self._fs_open_args_load = { + **self.DEFAULT_FS_ARGS.get("open_args_load", {}), + **(_fs_open_args_load or {}), + } + self._fs_open_args_save = { + **self.DEFAULT_FS_ARGS.get("open_args_save", {}), + **(_fs_open_args_save or {}), + } def _describe(self): return { diff --git a/kedro-datasets/kedro_datasets/tensorflow/tensorflow_model_dataset.py b/kedro-datasets/kedro_datasets/tensorflow/tensorflow_model_dataset.py index 5a9bbc5a8..5c5dc27a1 100644 --- a/kedro-datasets/kedro_datasets/tensorflow/tensorflow_model_dataset.py +++ b/kedro-datasets/kedro_datasets/tensorflow/tensorflow_model_dataset.py @@ -129,12 +129,8 @@ def __init__( # noqa: PLR0913 self._tmp_prefix = "kedro_tensorflow_tmp" # temp prefix pattern # Handle default load and save arguments - self._load_args = copy.deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = copy.deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args 
= {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} self._is_h5 = self._save_args.get("save_format") == "h5" diff --git a/kedro-datasets/kedro_datasets/text/text_dataset.py b/kedro-datasets/kedro_datasets/text/text_dataset.py index 0432d066f..2f5c5684a 100644 --- a/kedro-datasets/kedro_datasets/text/text_dataset.py +++ b/kedro-datasets/kedro_datasets/text/text_dataset.py @@ -47,6 +47,11 @@ class TextDataset(AbstractVersionedDataset[str, str]): """ + DEFAULT_FS_ARGS: dict[str, Any] = { + "open_args_save": {"mode": "w"}, + "open_args_load": {"mode": "r"}, + } + def __init__( # noqa: PLR0913 self, *, @@ -102,10 +107,15 @@ def __init__( # noqa: PLR0913 glob_function=self._fs.glob, ) - _fs_open_args_load.setdefault("mode", "r") - _fs_open_args_save.setdefault("mode", "w") - self._fs_open_args_load = _fs_open_args_load - self._fs_open_args_save = _fs_open_args_save + # Handle default fs arguments + self._fs_open_args_load = { + **self.DEFAULT_FS_ARGS.get("open_args_load", {}), + **(_fs_open_args_load or {}), + } + self._fs_open_args_save = { + **self.DEFAULT_FS_ARGS.get("open_args_save", {}), + **(_fs_open_args_save or {}), + } def _describe(self) -> dict[str, Any]: return { diff --git a/kedro-datasets/kedro_datasets/yaml/yaml_dataset.py b/kedro-datasets/kedro_datasets/yaml/yaml_dataset.py index 8e6325b68..b08060a12 100644 --- a/kedro-datasets/kedro_datasets/yaml/yaml_dataset.py +++ b/kedro-datasets/kedro_datasets/yaml/yaml_dataset.py @@ -54,6 +54,7 @@ class YAMLDataset(AbstractVersionedDataset[dict, dict]): """ DEFAULT_SAVE_ARGS: dict[str, Any] = {"default_flow_style": False} + DEFAULT_FS_ARGS: dict[str, Any] = {"open_args_save": {"mode": "w"}} def __init__( # noqa: PLR0913 self, @@ -89,8 +90,7 @@ def __init__( # noqa: PLR0913 `open_args_load` and `open_args_save`. 
Here you can find all available arguments for `open`: https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open - All defaults are preserved, except `mode`, which is set to `r` when loading - and to `w` when saving. + All defaults are preserved, except `mode`, which is set to `w` when saving. metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. """ @@ -116,13 +116,15 @@ def __init__( # noqa: PLR0913 ) # Handle default save arguments - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) - - _fs_open_args_save.setdefault("mode", "w") - self._fs_open_args_load = _fs_open_args_load - self._fs_open_args_save = _fs_open_args_save + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} + self._fs_open_args_load = { + **self.DEFAULT_FS_ARGS.get("open_args_load", {}), + **(_fs_open_args_load or {}), + } + self._fs_open_args_save = { + **self.DEFAULT_FS_ARGS.get("open_args_save", {}), + **(_fs_open_args_save or {}), + } def _describe(self) -> dict[str, Any]: return { diff --git a/kedro-datasets/kedro_datasets_experimental/netcdf/netcdf_dataset.py b/kedro-datasets/kedro_datasets_experimental/netcdf/netcdf_dataset.py index da83c1139..391d5521b 100644 --- a/kedro-datasets/kedro_datasets_experimental/netcdf/netcdf_dataset.py +++ b/kedro-datasets/kedro_datasets_experimental/netcdf/netcdf_dataset.py @@ -129,12 +129,8 @@ def __init__( # noqa self.metadata = metadata # Handle default load and save arguments - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} # Determine if multiple NetCDF files are being 
loaded in. self._is_multifile = ( diff --git a/kedro-datasets/kedro_datasets_experimental/rioxarray/geotiff_dataset.py b/kedro-datasets/kedro_datasets_experimental/rioxarray/geotiff_dataset.py index 5b290888a..08fa52419 100644 --- a/kedro-datasets/kedro_datasets_experimental/rioxarray/geotiff_dataset.py +++ b/kedro-datasets/kedro_datasets_experimental/rioxarray/geotiff_dataset.py @@ -3,7 +3,6 @@ returns a xarray.DataArray object. """ import logging -from copy import deepcopy from pathlib import PurePosixPath from typing import Any @@ -106,12 +105,8 @@ def __init__( # noqa: PLR0913 ) # Handle default load and save arguments - self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} def _describe(self) -> dict[str, Any]: return { diff --git a/kedro-datasets/tests/matlab/test_matlab_dataset.py b/kedro-datasets/tests/matlab/test_matlab_dataset.py index a7a935962..331702db9 100644 --- a/kedro-datasets/tests/matlab/test_matlab_dataset.py +++ b/kedro-datasets/tests/matlab/test_matlab_dataset.py @@ -41,7 +41,7 @@ def test_save_and_load(self, matlab_dataset, dummy_data): reloaded = matlab_dataset.load() assert (dummy_data == reloaded["data"]).all() assert matlab_dataset._fs_open_args_load == {} - assert matlab_dataset._fs_open_args_save == {"mode": "w"} + assert matlab_dataset._fs_open_args_save == {"mode": "wb"} def test_exists(self, matlab_dataset, dummy_data): """Test `exists` method invocation for both existing and @@ -65,7 +65,7 @@ def test_save_extra_params(self, matlab_dataset, save_args): ) def test_open_extra_args(self, matlab_dataset, fs_args): assert matlab_dataset._fs_open_args_load == fs_args["open_args_load"] - assert matlab_dataset._fs_open_args_save == 
{"mode": "w"} # default unchanged + assert matlab_dataset._fs_open_args_save == {"mode": "wb"} # default unchanged def test_load_missing_file(self, matlab_dataset): """Check the error when trying to load missing file.""" diff --git a/kedro-datasets/tests/pandas/test_csv_dataset.py b/kedro-datasets/tests/pandas/test_csv_dataset.py index 0954b4b9b..6a5c52464 100644 --- a/kedro-datasets/tests/pandas/test_csv_dataset.py +++ b/kedro-datasets/tests/pandas/test_csv_dataset.py @@ -115,6 +115,16 @@ def test_save_extra_params(self, csv_dataset, save_args): for key, value in save_args.items(): assert csv_dataset._save_args[key] == value + @pytest.mark.parametrize( + "fs_args", + [{"open_args_load": {"k1": "v1"}, "open_args_save": {"index": "value"}}], + indirect=True, + ) + def test_fs_extra_params(self, csv_dataset, fs_args): + """Test overriding the default fs arguments.""" + assert csv_dataset._fs_open_args_load == {"k1": "v1"} + assert csv_dataset._fs_open_args_save == {"mode": "w", "index": "value"} + @pytest.mark.parametrize( "load_args,save_args", [ diff --git a/kedro-datasets/tests/pandas/test_feather_dataset.py b/kedro-datasets/tests/pandas/test_feather_dataset.py index 317921258..38d1f0e31 100644 --- a/kedro-datasets/tests/pandas/test_feather_dataset.py +++ b/kedro-datasets/tests/pandas/test_feather_dataset.py @@ -58,6 +58,16 @@ def test_load_extra_params(self, feather_dataset, load_args): for key, value in load_args.items(): assert feather_dataset._load_args[key] == value + @pytest.mark.parametrize( + "fs_args", + [{"open_args_load": {"k1": "v1"}, "open_args_save": {"index": "value"}}], + indirect=True, + ) + def test_fs_extra_params(self, feather_dataset, fs_args): + """Test overriding the default fs arguments.""" + assert feather_dataset._fs_open_args_load == {"k1": "v1"} + assert feather_dataset._fs_open_args_save == {"index": "value", "mode": "wb"} + @pytest.mark.parametrize( "load_args,save_args", [ diff --git a/kedro-datasets/tests/pandas/test_json_dataset.py 
b/kedro-datasets/tests/pandas/test_json_dataset.py index 04536c20d..20f0a1e21 100644 --- a/kedro-datasets/tests/pandas/test_json_dataset.py +++ b/kedro-datasets/tests/pandas/test_json_dataset.py @@ -83,6 +83,16 @@ def test_save_extra_params(self, json_dataset, save_args): for key, value in save_args.items(): assert json_dataset._save_args[key] == value + @pytest.mark.parametrize( + "fs_args", + [{"open_args_load": {"k1": "v1"}, "open_args_save": {"index": "value"}}], + indirect=True, + ) + def test_fs_extra_params(self, json_dataset, fs_args): + """Test overriding the default fs arguments.""" + assert json_dataset._fs_open_args_load == {"k1": "v1"} + assert json_dataset._fs_open_args_save == {"index": "value", "mode": "w"} + @pytest.mark.parametrize( "load_args,save_args", [ diff --git a/kedro-datasets/tests/spark/test_spark_streaming_dataset.py b/kedro-datasets/tests/spark/test_spark_streaming_dataset.py index 6ac8d8b0b..330c8d10d 100644 --- a/kedro-datasets/tests/spark/test_spark_streaming_dataset.py +++ b/kedro-datasets/tests/spark/test_spark_streaming_dataset.py @@ -94,7 +94,7 @@ def test_load(self, tmp_path, sample_spark_streaming_df): schema_path = (tmp_path / SCHEMA_FILE_NAME).as_posix() spark_json_ds = SparkDataset( - filepath=filepath, file_format="json", save_args=[{"mode", "overwrite"}] + filepath=filepath, file_format="json", save_args={"mode": "overwrite"} ) spark_json_ds.save(sample_spark_streaming_df) @@ -115,7 +115,7 @@ def test_load_options_schema_path_with_credentials( schema_path = (tmp_path / SCHEMA_FILE_NAME).as_posix() spark_json_ds = SparkDataset( - filepath=filepath, file_format="json", save_args=[{"mode", "overwrite"}] + filepath=filepath, file_format="json", save_args={"mode": "overwrite"} ) spark_json_ds.save(sample_spark_streaming_df) @@ -144,7 +144,7 @@ def test_save(self, tmp_path, sample_spark_streaming_df): spark_json_ds = SparkDataset( filepath=filepath_json, file_format="json", - save_args=[{"mode", "overwrite"}], + 
save_args={"mode": "overwrite"}, ) spark_json_ds.save(sample_spark_streaming_df)