From 2dd0e37f44fe02289eda4261ea0ddab62d3910dd Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Fri, 16 Feb 2024 12:05:09 +0100 Subject: [PATCH 01/12] reference process description https://github.com/Open-EO/openeo-geopyspark-driver/issues/676 --- openeo_driver/specs/openeo-processes/2.x | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openeo_driver/specs/openeo-processes/2.x b/openeo_driver/specs/openeo-processes/2.x index 965bbaeb..3ed4396e 160000 --- a/openeo_driver/specs/openeo-processes/2.x +++ b/openeo_driver/specs/openeo-processes/2.x @@ -1 +1 @@ -Subproject commit 965bbaebd4d5984203a0437076c85a66a72a23e0 +Subproject commit 3ed4396e9859acb772b40ccc549ab648d1f77417 From 5f4cc9b522f60cf48e6cd3ba9cc924e0cd5dd493 Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Wed, 21 Feb 2024 14:28:32 +0100 Subject: [PATCH 02/12] support export_workspace https://github.com/Open-EO/openeo-geopyspark-driver/issues/676 --- openeo_driver/ProcessGraphDeserializer.py | 18 ++++++++++-- openeo_driver/config/config.py | 7 +++-- openeo_driver/save_result.py | 35 +++++++++++++++++++++-- openeo_driver/workspace.py | 21 ++++++++++++++ 4 files changed, 74 insertions(+), 7 deletions(-) create mode 100644 openeo_driver/workspace.py diff --git a/openeo_driver/ProcessGraphDeserializer.py b/openeo_driver/ProcessGraphDeserializer.py index b77f7436..f2d4092c 100644 --- a/openeo_driver/ProcessGraphDeserializer.py +++ b/openeo_driver/ProcessGraphDeserializer.py @@ -730,7 +730,7 @@ def apply_dimension(args: ProcessArgs, env: EvalEnv) -> DriverDataCube: @process -def save_result(args: Dict, env: EvalEnv) -> SaveResult: +def save_result(args: Dict, env: EvalEnv) -> SaveResult: # TODO: return type no longer holds data = extract_arg(args, 'data') format = extract_arg(args, 'format') options = args.get('options', {}) @@ -738,7 +738,8 @@ def save_result(args: Dict, env: EvalEnv) -> SaveResult: if isinstance(data, SaveResult): # TODO: Is this an expected code path? `save_result` should be terminal node in a graph # so chaining `save_result` calls should not be valid - data.set_format(format, options) + # https://github.com/Open-EO/openeo-geopyspark-driver/issues/295 + data = data.with_format(format, options) if ENV_SAVE_RESULT in env: env[ENV_SAVE_RESULT].append(data) return data @@ -2205,6 +2206,19 @@ def if_(value: Union[bool, None], accept, reject=None): return accept if value else reject +# TODO: is it ok to update the submodule? +@process_registry_2xx.add_function(spec=read_spec("openeo-processes/2.x/proposals/export_workspace.json")) +def export_workspace(args: ProcessArgs, env: EvalEnv) -> SaveResult: + data = args.get_required("data") + workspace_id = args.get_required("workspace", expected_type=str) + merge = args.get_optional("merge", expected_type=str) + + result = to_save_result(data) + result.add_workspace_export(workspace_id, merge=merge) + + return result + + # Finally: register some fallback implementation if possible _register_fallback_implementations_by_process_graph(process_registry_100) _register_fallback_implementations_by_process_graph(process_registry_2xx) diff --git a/openeo_driver/config/config.py b/openeo_driver/config/config.py index 96008290..c8f41e1f 100644 --- a/openeo_driver/config/config.py +++ b/openeo_driver/config/config.py @@ -7,6 +7,7 @@ from openeo_driver.server import build_backend_deploy_metadata from openeo_driver.urlsigning import UrlSigner from openeo_driver.users.oidc import OidcProvider +from openeo_driver.workspace import Workspace class ConfigException(ValueError): @@ -69,5 +70,7 @@ class OpenEoBackendConfig: url_signer: Optional[UrlSigner] = None - collection_exclusion_list: Dict[str,List[str]] = {} # e.g. {"1.1.0":["my_collection_id"]} - processes_exclusion_list: Dict[str,List[str]] = {} # e.g. {"1.1.0":["my_process_id"]} \ No newline at end of file + collection_exclusion_list: Dict[str, List[str]] = {} # e.g. {"1.1.0":["my_collection_id"]} + processes_exclusion_list: Dict[str, List[str]] = {} # e.g. {"1.1.0":["my_process_id"]} + + workspaces: Dict[str, Workspace] = attrs.Factory(dict) diff --git a/openeo_driver/save_result.py b/openeo_driver/save_result.py index c505fb87..8816699a 100644 --- a/openeo_driver/save_result.py +++ b/openeo_driver/save_result.py @@ -1,3 +1,4 @@ +import copy import glob import os import re @@ -5,7 +6,7 @@ import warnings import logging from pathlib import Path -from shutil import copy +import shutil from tempfile import mkstemp from typing import Union, Dict, List, Optional, Any from zipfile import ZipFile @@ -44,6 +45,7 @@ class SaveResult: def __init__(self, format: Optional[str] = None, options: Optional[dict] = None): self.format = format or self.DEFAULT_FORMAT self.options = options or {} + self._workspace_exports = [] def is_format(self, *args): return self.format.lower() in {f.lower() for f in args} @@ -52,6 +54,12 @@ def set_format(self, format: str, options: dict = None): self.format = format.lower() self.options = options or {} + def with_format(self, format: str, options: dict = None) -> 'SaveResult': + shallow_copy = copy.copy(self) + shallow_copy.format = format.lower() + shallow_copy.options = options or {} + return shallow_copy + def write_assets(self, directory: Union[str, Path]) -> Dict[str, StacAsset]: raise NotImplementedError @@ -81,6 +89,27 @@ def flask_response_from_write_assets(self) -> Response: def get_mimetype(self, default="application/octet-stream"): return IOFORMATS.get_mimetype(self.format) + def add_workspace_export(self, workspace_id: str, merge: Optional[str]): + # TODO: should probably return a copy as well (~ with_format) + self._workspace_exports.append(dict(workspace_id=workspace_id, merge=merge)) + + def export_workspace(self, source_files: List[Path], default_merge: str): + # TODO: move this somewhere else? + from openeo_driver.config import get_backend_config + + for export in self._workspace_exports: + workspace = get_backend_config().workspaces[export["workspace_id"]] + + for source_file in source_files: + merge = export["merge"] + + if merge is None: + merge = default_merge + elif merge == "": + merge = "." + + workspace.write(source_file, merge) + def get_temp_file(suffix="", prefix="openeo-pydrvr-"): # TODO: make sure temp files are cleaned up when read @@ -596,7 +625,7 @@ def to_csv(self, destination=None): if(destination == None): return csv_paths[0] else: - copy(csv_paths[0],destination) + shutil.copy(csv_paths[0], destination) return destination @@ -692,7 +721,7 @@ def write_assets(self, directory: Union[str, Path]) -> Dict[str, StacAsset]: filename = str(directory / "timeseries.csv") asset["type"] = IOFORMATS.get_mimetype(self.format) - copy(self._csv_path(), filename) + shutil.copy(self._csv_path(), filename) elif self.is_format("parquet"): filename = str(directory / "timeseries.parquet") asset["type"] = IOFORMATS.get_mimetype(self.format) diff --git a/openeo_driver/workspace.py b/openeo_driver/workspace.py new file mode 100644 index 00000000..bab363fa --- /dev/null +++ b/openeo_driver/workspace.py @@ -0,0 +1,21 @@ +import abc +import shutil +from abc import ABC +from pathlib import Path + + +class Workspace(ABC): + @abc.abstractmethod + def write(self, source_file: str, merge: str): + raise NotImplementedError + + +class DiskWorkspace(Workspace): + def __init__(self, root_directory: Path): + self.root_directory = root_directory + + def write(self, + source_file: str, # TODO: assumes source files are on disk + merge: str): + # TODO: create missing directories? + shutil.copy(source_file, self.root_directory / merge) From b0a1a229c3e5de70269b3cd430934350bd601e88 Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Fri, 23 Feb 2024 19:37:12 +0100 Subject: [PATCH 03/12] work around save_result not returning a SaveResult https://github.com/Open-EO/openeo-geopyspark-driver/issues/676 --- openeo_driver/ProcessGraphDeserializer.py | 11 +++++++++-- openeo_driver/save_result.py | 14 ++++++-------- openeo_driver/workspace.py | 10 +++++----- 3 files changed, 20 insertions(+), 15 deletions(-) diff --git a/openeo_driver/ProcessGraphDeserializer.py b/openeo_driver/ProcessGraphDeserializer.py index 2de0c16f..2a5f6ed4 100644 --- a/openeo_driver/ProcessGraphDeserializer.py +++ b/openeo_driver/ProcessGraphDeserializer.py @@ -2217,9 +2217,16 @@ def export_workspace(args: ProcessArgs, env: EvalEnv) -> SaveResult: workspace_id = args.get_required("workspace", expected_type=str) merge = args.get_optional("merge", expected_type=str) - result = to_save_result(data) - result.add_workspace_export(workspace_id, merge=merge) + if isinstance(data, SaveResult): + result = data + else: + # TODO: work around save_result returning a data cube instead of a SaveResult (#295) + results = env[ENV_SAVE_RESULT] + if len(results) != 1: + raise FeatureUnsupportedException("only process graphs with a single save_result node are supported") + result = results[0] + result.add_workspace_export(workspace_id, merge=merge) return result diff --git a/openeo_driver/save_result.py b/openeo_driver/save_result.py index 6f10251b..1df77ef9 100644 --- a/openeo_driver/save_result.py +++ b/openeo_driver/save_result.py @@ -9,7 +9,7 @@ from pathlib import Path import shutil from tempfile import mkstemp -from typing import Union, Dict, List, Optional, Any +from typing import Callable, Union, Dict, List, Optional, Any from zipfile import ZipFile import geopandas as gpd @@ -28,6 +28,7 @@ from openeo_driver.errors import OpenEOApiException, FeatureUnsupportedException, InternalException from openeo_driver.util.ioformats import IOFORMATS from openeo_driver.utils import replace_nan_values +from openeo_driver.workspace import Workspace _log = logging.getLogger(__name__) @@ -94,14 +95,11 @@ def add_workspace_export(self, workspace_id: str, merge: Optional[str]): # TODO: should probably return a copy as well (~ with_format) self._workspace_exports.append(dict(workspace_id=workspace_id, merge=merge)) - def export_workspace(self, source_files: List[Path], default_merge: str): - # TODO: move this somewhere else? - from openeo_driver.config import get_backend_config - + def export_workspace(self, get_workspace_by_id: Callable[[str], Workspace], files: List[Path], default_merge: str): for export in self._workspace_exports: - workspace = get_backend_config().workspaces[export["workspace_id"]] + workspace = get_workspace_by_id(export["workspace_id"]) - for source_file in source_files: + for file in files: merge = export["merge"] if merge is None: @@ -109,7 +107,7 @@ def export_workspace(self, source_files: List[Path], default_merge: str): elif merge == "": merge = "." - workspace.write(source_file, merge) + workspace.import_file(file, merge) def get_temp_file(suffix="", prefix="openeo-pydrvr-"): diff --git a/openeo_driver/workspace.py b/openeo_driver/workspace.py index bab363fa..54e6fd95 100644 --- a/openeo_driver/workspace.py +++ b/openeo_driver/workspace.py @@ -6,7 +6,7 @@ class Workspace(ABC): @abc.abstractmethod - def write(self, source_file: str, merge: str): + def import_file(self, file: Path, merge: str): raise NotImplementedError @@ -14,8 +14,8 @@ class DiskWorkspace(Workspace): def __init__(self, root_directory: Path): self.root_directory = root_directory - def write(self, - source_file: str, # TODO: assumes source files are on disk - merge: str): + def import_file(self, + file: Path, + merge: str): # TODO: create missing directories? - shutil.copy(source_file, self.root_directory / merge) + shutil.copy(file, self.root_directory / merge) From b1f9c8349e986974f3e44dde0c1d071d2235f24b Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Mon, 26 Feb 2024 15:13:46 +0100 Subject: [PATCH 04/12] revert submodule update in favor of copy https://github.com/Open-EO/openeo-geopyspark-driver/issues/676 --- openeo_driver/ProcessGraphDeserializer.py | 3 +- openeo_driver/specs/openeo-processes/2.x | 2 +- .../experimental/export_workspace.json | 48 +++++++++++++++++++ 3 files changed, 50 insertions(+), 3 deletions(-) create mode 100644 openeo_driver/specs/openeo-processes/experimental/export_workspace.json diff --git a/openeo_driver/ProcessGraphDeserializer.py b/openeo_driver/ProcessGraphDeserializer.py index 2a5f6ed4..38bc9ed2 100644 --- a/openeo_driver/ProcessGraphDeserializer.py +++ b/openeo_driver/ProcessGraphDeserializer.py @@ -2210,8 +2210,7 @@ def if_(value: Union[bool, None], accept, reject=None): return accept if value else reject -# TODO: is it ok to update the submodule? -@process_registry_2xx.add_function(spec=read_spec("openeo-processes/2.x/proposals/export_workspace.json")) +@process_registry_2xx.add_function(spec=read_spec("openeo-processes/experimental/export_workspace.json")) def export_workspace(args: ProcessArgs, env: EvalEnv) -> SaveResult: data = args.get_required("data") workspace_id = args.get_required("workspace", expected_type=str) diff --git a/openeo_driver/specs/openeo-processes/2.x b/openeo_driver/specs/openeo-processes/2.x index 3ed4396e..965bbaeb 160000 --- a/openeo_driver/specs/openeo-processes/2.x +++ b/openeo_driver/specs/openeo-processes/2.x @@ -1 +1 @@ -Subproject commit 3ed4396e9859acb772b40ccc549ab648d1f77417 +Subproject commit 965bbaebd4d5984203a0437076c85a66a72a23e0 diff --git a/openeo_driver/specs/openeo-processes/experimental/export_workspace.json b/openeo_driver/specs/openeo-processes/experimental/export_workspace.json new file mode 100644 index 00000000..c469254f --- /dev/null +++ b/openeo_driver/specs/openeo-processes/experimental/export_workspace.json @@ -0,0 +1,48 @@ +{ + "id": "export_workspace", + "summary": "Export data to a cloud user workspace", + "description": "Exports the given processing results made available through a STAC resource (e.g., a STAC Collection) to the given user workspace. The STAC resource itself is exported with all STAC resources and assets underneath.", + "categories": [ + "export", + "stac" + ], + "experimental": true, + "parameters": [ + { + "name": "data", + "description": "The data to export to the user workspace as a STAC resource.", + "schema": { + "type": "object", + "subtype": "stac" + } + }, + { + "name": "workspace", + "description": "The identifier of the workspace to export to.", + "schema": { + "type": "string", + "pattern": "^[\\w\\-\\.~]+$", + "subtype": "workspace-id" + } + }, + { + "name": "merge", + "description": "Provides a cloud-specific path identifier to a STAC resource to merge the given STAC resource into. If not provided, the STAC resource is kept separate from any other STAC resources in the workspace.", + "schema": { + "type": [ + "string", + "null" + ] + }, + "optional": true, + "default": null + } + ], + "returns": { + "description": "Returns the potentially updated STAC resource.", + "schema": { + "type": "object", + "subtype": "stac" + } + } +} From bf08f85b00dc22dbed6e0d41877e6046a541dff1 Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Mon, 26 Feb 2024 16:33:41 +0100 Subject: [PATCH 05/12] test export_workspace process https://github.com/Open-EO/openeo-geopyspark-driver/issues/676 --- tests/test_dry_run.py | 53 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/tests/test_dry_run.py b/tests/test_dry_run.py index f5e3e295..000e136c 100644 --- a/tests/test_dry_run.py +++ b/tests/test_dry_run.py @@ -1,5 +1,7 @@ +from pathlib import Path import pytest import shapely.geometry +from unittest import mock from openeo.internal.graph_building import PGNode from openeo.rest.datacube import DataCube @@ -14,6 +16,7 @@ from openeo_driver.testing import DictSubSet, approxify from openeo_driver.util.geometry import as_geojson_feature_collection from openeo_driver.utils import EvalEnv +from openeo_driver.workspace import Workspace from tests.data import get_path, load_json @@ -1726,3 +1729,53 @@ def test_invalid_latlon_in_geojson(dry_run_env): } cube = init_cube.filter_spatial(geometries=multipoint_many_coordinates) evaluate(cube.flat_graph(), env=dry_run_env) + + +@pytest.mark.parametrize("merge,expected_workspace_path", [ + ("some-path", "some-path"), + ("", "."), + (None, "/some/unique/path") +]) +def test_export_workspace(dry_run_tracer, backend_implementation, merge, expected_workspace_path): + mock_workspace = mock.Mock(spec=Workspace) + + dry_run_env = EvalEnv({ + ENV_DRY_RUN_TRACER: dry_run_tracer, + "backend_implementation": backend_implementation, + "version": "2.0.0" + }) + + pg = { + "loadcollection1": { + "process_id": "load_collection", + "arguments": {"id": "S2_FOOBAR"}, + }, + "saveresult1": { + "process_id": "save_result", + "arguments": { + "data": {"from_node": "loadcollection1"}, + "format": "GTiff", + }, + }, + "exportworkspace1": { + "process_id": "export_workspace", + "arguments": { + "data": {"from_node": "saveresult1"}, + "workspace": "some-workspace", + "merge": merge, + }, + "result": True, + } + } + + save_result = evaluate(pg, env=dry_run_env) + + assert save_result.is_format("GTiff") + + save_result.export_workspace(lambda workspace_id: mock_workspace, + files=[Path("file1"), Path("file2")], + default_merge="/some/unique/path") + mock_workspace.import_file.assert_has_calls([ + mock.call(Path("file1"), expected_workspace_path), + mock.call(Path("file2"), expected_workspace_path), + ]) From 6eb3e44044f18f62b9d0a53faf4996118ea144b4 Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Tue, 27 Feb 2024 08:56:38 +0100 Subject: [PATCH 06/12] test SaveResult changes https://github.com/Open-EO/openeo-geopyspark-driver/issues/676 --- openeo_driver/save_result.py | 4 ++-- tests/test_save_result.py | 22 ++++++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/openeo_driver/save_result.py b/openeo_driver/save_result.py index 1df77ef9..56b0888f 100644 --- a/openeo_driver/save_result.py +++ b/openeo_driver/save_result.py @@ -53,12 +53,12 @@ def is_format(self, *args): return self.format.lower() in {f.lower() for f in args} def set_format(self, format: str, options: dict = None): - self.format = format.lower() + self.format = format self.options = options or {} def with_format(self, format: str, options: dict = None) -> 'SaveResult': shallow_copy = copy.copy(self) - shallow_copy.format = format.lower() + shallow_copy.format = format shallow_copy.options = options or {} return shallow_copy diff --git a/tests/test_save_result.py b/tests/test_save_result.py index b8d57362..0cad192a 100644 --- a/tests/test_save_result.py +++ b/tests/test_save_result.py @@ -1,6 +1,7 @@ import datetime import json from pathlib import Path +from unittest import mock import geopandas as gpd import numpy as np @@ -11,6 +12,7 @@ from openeo_driver.datacube import DriverVectorCube from openeo_driver.save_result import AggregatePolygonResult, SaveResult, AggregatePolygonSpatialResult, \ AggregatePolygonResultCSV, JSONResult +from openeo_driver.workspace import Workspace from .data import load_json, json_normalize, get_path @@ -19,6 +21,7 @@ Polygon([(6, 1), (1, 7), (9, 9)]) ]) + def test_is_format(): r = SaveResult("GTiff") assert r.is_format("gtiff") @@ -26,6 +29,25 @@ def test_is_format(): assert not r.is_format("geotiff") +def test_with_format(): + g = SaveResult("GTiff", options={"ZLEVEL": 9}) + n = g.with_format("netCDF", options={}) + + assert (g.format, g.options) == ("GTiff", {"ZLEVEL": 9}) + assert (n.format, n.options) == ("netCDF", {}) + + +def test_export_workspace(): + mock_workspace = mock.Mock(spec=Workspace) + + r = SaveResult() + r.add_workspace_export(workspace_id="some-workspace", merge="some/path") + r.export_workspace(get_workspace_by_id=lambda workspace_id: mock_workspace, files=[Path("/some/file")], + default_merge="some/unique/path") + + mock_workspace.import_file.assert_called_with(Path("/some/file"), "some/path") + + def test_aggregate_polygon_result_basic(): timeseries = { "2019-10-15T08:15:45Z": [[1, 2, 3], [4, 5, 6]], From 5baa4236af0bb1c61049be6907f69b80dfa6baaf Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Tue, 27 Feb 2024 13:30:20 +0100 Subject: [PATCH 07/12] more SaveResult tests https://github.com/Open-EO/openeo-geopyspark-driver/issues/676 --- openeo_driver/save_result.py | 3 ++- tests/test_dry_run.py | 13 ++++--------- tests/test_save_result.py | 13 +++++++++---- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/openeo_driver/save_result.py b/openeo_driver/save_result.py index 56b0888f..1cde7849 100644 --- a/openeo_driver/save_result.py +++ b/openeo_driver/save_result.py @@ -92,7 +92,8 @@ def get_mimetype(self, default="application/octet-stream"): return IOFORMATS.get_mimetype(self.format) def add_workspace_export(self, workspace_id: str, merge: Optional[str]): - # TODO: should probably return a copy as well (~ with_format) + # TODO: should probably return a copy (like with_format) but does not work well with evaluate() returning + # results stored in env[ENV_SAVE_RESULT] instead of what ultimately comes out of the process graph. self._workspace_exports.append(dict(workspace_id=workspace_id, merge=merge)) def export_workspace(self, get_workspace_by_id: Callable[[str], Workspace], files: List[Path], default_merge: str): diff --git a/tests/test_dry_run.py b/tests/test_dry_run.py index 000e136c..df27acdd 100644 --- a/tests/test_dry_run.py +++ b/tests/test_dry_run.py @@ -1731,12 +1731,7 @@ def test_invalid_latlon_in_geojson(dry_run_env): evaluate(cube.flat_graph(), env=dry_run_env) -@pytest.mark.parametrize("merge,expected_workspace_path", [ - ("some-path", "some-path"), - ("", "."), - (None, "/some/unique/path") -]) -def test_export_workspace(dry_run_tracer, backend_implementation, merge, expected_workspace_path): +def test_export_workspace(dry_run_tracer, backend_implementation): mock_workspace = mock.Mock(spec=Workspace) dry_run_env = EvalEnv({ @@ -1762,7 +1757,7 @@ def test_export_workspace(dry_run_tracer, backend_implementation, merge, expecte "arguments": { "data": {"from_node": "saveresult1"}, "workspace": "some-workspace", - "merge": merge, + "merge": "some/path", }, "result": True, } @@ -1776,6 +1771,6 @@ def test_export_workspace(dry_run_tracer, backend_implementation, merge, expecte files=[Path("file1"), Path("file2")], default_merge="/some/unique/path") mock_workspace.import_file.assert_has_calls([ - mock.call(Path("file1"), expected_workspace_path), - mock.call(Path("file2"), expected_workspace_path), + mock.call(Path("file1"), "some/path"), + mock.call(Path("file2"), "some/path"), ]) diff --git a/tests/test_save_result.py b/tests/test_save_result.py index 0cad192a..3986a57a 100644 --- a/tests/test_save_result.py +++ b/tests/test_save_result.py @@ -37,15 +37,20 @@ def test_with_format(): assert (n.format, n.options) == ("netCDF", {}) -def test_export_workspace(): +@pytest.mark.parametrize(["merge", "expected_workspace_path"], [ + ("some/path", "some/path"), + ("", "."), + (None, "/some/unique/path") +]) +def test_export_workspace(merge, expected_workspace_path): mock_workspace = mock.Mock(spec=Workspace) r = SaveResult() - r.add_workspace_export(workspace_id="some-workspace", merge="some/path") + r.add_workspace_export(workspace_id="some-workspace", merge=merge) r.export_workspace(get_workspace_by_id=lambda workspace_id: mock_workspace, files=[Path("/some/file")], - default_merge="some/unique/path") + default_merge="/some/unique/path") - mock_workspace.import_file.assert_called_with(Path("/some/file"), "some/path") + mock_workspace.import_file.assert_called_with(Path("/some/file"), expected_workspace_path) def test_aggregate_polygon_result_basic(): From 93142bb10dde055d6cb76c19483ffe761ec614a4 Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Tue, 27 Feb 2024 15:03:25 +0100 Subject: [PATCH 08/12] test DiskWorkspace https://github.com/Open-EO/openeo-geopyspark-driver/issues/676 --- openeo_driver/workspace.py | 10 ++++++++-- tests/test_workspace.py | 24 ++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 2 deletions(-) create mode 100644 tests/test_workspace.py diff --git a/openeo_driver/workspace.py b/openeo_driver/workspace.py index 54e6fd95..1a709bc1 100644 --- a/openeo_driver/workspace.py +++ b/openeo_driver/workspace.py @@ -1,4 +1,5 @@ import abc +import os.path import shutil from abc import ABC from pathlib import Path @@ -17,5 +18,10 @@ def __init__(self, root_directory: Path): def import_file(self, file: Path, merge: str): - # TODO: create missing directories? - shutil.copy(file, self.root_directory / merge) + merge = os.path.normpath(merge) + subdirectory = merge[1:] if merge.startswith("/") else merge + target_directory = self.root_directory / subdirectory + target_directory.relative_to(self.root_directory) # assert target_directory is in root_directory + + target_directory.mkdir(parents=True, exist_ok=True) + shutil.copy(file, target_directory) diff --git a/tests/test_workspace.py b/tests/test_workspace.py new file mode 100644 index 00000000..475416fc --- /dev/null +++ b/tests/test_workspace.py @@ -0,0 +1,24 @@ +import os +from pathlib import Path + +import pytest + +from openeo_driver.workspace import DiskWorkspace + + +@pytest.mark.parametrize("merge", [ + "subdirectory", + "/subdirectory", + "path/to/subdirectory", + "/path/to/subdirectory", +]) +def test_disk_workspace(tmp_path, merge): + workspace = DiskWorkspace(root_directory=tmp_path) + + subdirectory = merge[1:] if merge.startswith("/") else merge + target_directory = tmp_path / subdirectory + + input_file = Path(__file__) + workspace.import_file(file=input_file, merge=merge) + + assert "test_workspace.py" in os.listdir(target_directory) From d7edd2006b1b6873b9c51293b28b2f2d092aee3c Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Tue, 27 Feb 2024 15:19:49 +0100 Subject: [PATCH 09/12] test DiskWorkspace actually supports "." https://github.com/Open-EO/openeo-geopyspark-driver/issues/676 --- tests/test_workspace.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_workspace.py b/tests/test_workspace.py index 475416fc..98489779 100644 --- a/tests/test_workspace.py +++ b/tests/test_workspace.py @@ -11,6 +11,7 @@ "/subdirectory", "path/to/subdirectory", "/path/to/subdirectory", + ".", ]) def test_disk_workspace(tmp_path, merge): workspace = DiskWorkspace(root_directory=tmp_path) From 0d023bf0a1e518ae25a0236b2a931e5a3f1a58af Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Tue, 27 Feb 2024 15:55:15 +0100 Subject: [PATCH 10/12] refactor: introduce WorkspaceRepository https://github.com/Open-EO/openeo-geopyspark-driver/issues/676 --- openeo_driver/dummy/dummy_config.py | 4 ++++ openeo_driver/save_result.py | 8 ++++---- openeo_driver/workspace.py | 3 +-- openeo_driver/workspacerepository.py | 18 ++++++++++++++++++ tests/test_dry_run.py | 6 ++++-- tests/test_save_result.py | 7 ++++--- tests/test_workspacerepository.py | 17 +++++++++++++++++ 7 files changed, 52 insertions(+), 11 deletions(-) create mode 100644 openeo_driver/workspacerepository.py create mode 100644 tests/test_workspacerepository.py diff --git a/openeo_driver/dummy/dummy_config.py b/openeo_driver/dummy/dummy_config.py index eefc0211..3ca74784 100644 --- a/openeo_driver/dummy/dummy_config.py +++ b/openeo_driver/dummy/dummy_config.py @@ -1,6 +1,9 @@ +from pathlib import Path + from openeo_driver.config import OpenEoBackendConfig from openeo_driver.server import build_backend_deploy_metadata from openeo_driver.users.oidc import OidcProvider +from openeo_driver.workspace import DiskWorkspace oidc_providers = [ OidcProvider( @@ -75,4 +78,5 @@ def _valid_basic_auth(username: str, password: str) -> bool: oidc_providers=oidc_providers, enable_basic_auth=True, valid_basic_auth=_valid_basic_auth, + workspaces={"tmp": DiskWorkspace(root_directory=Path("/tmp"))}, ) diff --git a/openeo_driver/save_result.py b/openeo_driver/save_result.py index 1cde7849..3cc84f4c 100644 --- a/openeo_driver/save_result.py +++ b/openeo_driver/save_result.py @@ -9,7 +9,7 @@ from pathlib import Path import shutil from tempfile import mkstemp -from typing import Callable, Union, Dict, List, Optional, Any +from typing import Union, Dict, List, Optional, Any from zipfile import ZipFile import geopandas as gpd @@ -28,7 +28,7 @@ from openeo_driver.errors import OpenEOApiException, FeatureUnsupportedException, InternalException from openeo_driver.util.ioformats import IOFORMATS from openeo_driver.utils import replace_nan_values -from openeo_driver.workspace import Workspace +from openeo_driver.workspacerepository import WorkspaceRepository _log = logging.getLogger(__name__) @@ -96,9 +96,9 @@ def add_workspace_export(self, workspace_id: str, merge: Optional[str]): # results stored in env[ENV_SAVE_RESULT] instead of what ultimately comes out of the process graph. self._workspace_exports.append(dict(workspace_id=workspace_id, merge=merge)) - def export_workspace(self, get_workspace_by_id: Callable[[str], Workspace], files: List[Path], default_merge: str): + def export_workspace(self, workspace_repository: WorkspaceRepository, files: List[Path], default_merge: str): for export in self._workspace_exports: - workspace = get_workspace_by_id(export["workspace_id"]) + workspace = workspace_repository.get_by_id(export["workspace_id"]) for file in files: merge = export["merge"] diff --git a/openeo_driver/workspace.py b/openeo_driver/workspace.py index 1a709bc1..a91cb7f7 100644 --- a/openeo_driver/workspace.py +++ b/openeo_driver/workspace.py @@ -1,11 +1,10 @@ import abc import os.path import shutil -from abc import ABC from pathlib import Path -class Workspace(ABC): +class Workspace(abc.ABC): @abc.abstractmethod def import_file(self, file: Path, merge: str): raise NotImplementedError diff --git a/openeo_driver/workspacerepository.py b/openeo_driver/workspacerepository.py new file mode 100644 index 00000000..5102e9ce --- /dev/null +++ b/openeo_driver/workspacerepository.py @@ -0,0 +1,18 @@ +import abc + +from openeo_driver.config import get_backend_config +from openeo_driver.workspace import Workspace + + +class WorkspaceRepository(abc.ABC): + @abc.abstractmethod + def get_by_id(self, workspace_id: str) -> Workspace: + raise NotImplementedError + + +class BackendConfigWorkspaceRepository(WorkspaceRepository): + def get_by_id(self, workspace_id: str) -> Workspace: + return get_backend_config().workspaces[workspace_id] + + +backend_config_workspace_repository = BackendConfigWorkspaceRepository() diff --git a/tests/test_dry_run.py b/tests/test_dry_run.py index df27acdd..554bd5f3 100644 --- a/tests/test_dry_run.py +++ b/tests/test_dry_run.py @@ -17,6 +17,7 @@ from openeo_driver.util.geometry import as_geojson_feature_collection from openeo_driver.utils import EvalEnv from openeo_driver.workspace import Workspace +from openeo_driver.workspacerepository import WorkspaceRepository from tests.data import get_path, load_json @@ -1732,7 +1733,8 @@ def test_invalid_latlon_in_geojson(dry_run_env): def test_export_workspace(dry_run_tracer, backend_implementation): - mock_workspace = mock.Mock(spec=Workspace) + mock_workspace_repository = mock.Mock(WorkspaceRepository) + mock_workspace = mock_workspace_repository.get_by_id.return_value dry_run_env = EvalEnv({ ENV_DRY_RUN_TRACER: dry_run_tracer, @@ -1767,7 +1769,7 @@ def test_export_workspace(dry_run_tracer, backend_implementation): assert save_result.is_format("GTiff") - save_result.export_workspace(lambda workspace_id: mock_workspace, + save_result.export_workspace(mock_workspace_repository, files=[Path("file1"), Path("file2")], default_merge="/some/unique/path") mock_workspace.import_file.assert_has_calls([ diff --git a/tests/test_save_result.py b/tests/test_save_result.py index 3986a57a..d70bc96e 100644 --- a/tests/test_save_result.py +++ b/tests/test_save_result.py @@ -12,7 +12,7 @@ from openeo_driver.datacube import DriverVectorCube from openeo_driver.save_result import AggregatePolygonResult, SaveResult, AggregatePolygonSpatialResult, \ AggregatePolygonResultCSV, JSONResult -from openeo_driver.workspace import Workspace +from openeo_driver.workspacerepository import WorkspaceRepository from .data import load_json, json_normalize, get_path @@ -43,11 +43,12 @@ def test_with_format(): (None, "/some/unique/path") ]) def test_export_workspace(merge, expected_workspace_path): - mock_workspace = mock.Mock(spec=Workspace) + mock_workspace_repository = mock.Mock(spec=WorkspaceRepository) + mock_workspace = mock_workspace_repository.get_by_id.return_value r = SaveResult() r.add_workspace_export(workspace_id="some-workspace", merge=merge) - r.export_workspace(get_workspace_by_id=lambda workspace_id: mock_workspace, files=[Path("/some/file")], + r.export_workspace(workspace_repository=mock_workspace_repository, files=[Path("/some/file")], default_merge="/some/unique/path") mock_workspace.import_file.assert_called_with(Path("/some/file"), expected_workspace_path) diff --git a/tests/test_workspacerepository.py b/tests/test_workspacerepository.py new file mode 100644 index 00000000..752213f3 --- /dev/null +++ b/tests/test_workspacerepository.py @@ -0,0 +1,17 @@ +import pytest + +from openeo_driver.workspace import Workspace +from openeo_driver.workspacerepository import backend_config_workspace_repository + + +def test_backend_config_workspace_repository(): + workspace_repository = backend_config_workspace_repository + workspace = workspace_repository.get_by_id("tmp") + + assert isinstance(workspace, Workspace) + + +def test_backend_config_workspace_repository_unknown_workspace(): + with pytest.raises(KeyError): + workspace_repository = backend_config_workspace_repository + workspace_repository.get_by_id("retteketet") From fd6f9138218ea8e6c27ecc9354a0bc9d00315e39 Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Wed, 28 Feb 2024 09:15:09 +0100 Subject: [PATCH 11/12] refactor: use dataclass iso/ dict https://github.com/Open-EO/openeo-geopyspark-driver/issues/676 --- openeo_driver/save_result.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/openeo_driver/save_result.py b/openeo_driver/save_result.py index 3cc84f4c..926badd0 100644 --- a/openeo_driver/save_result.py +++ b/openeo_driver/save_result.py @@ -5,6 +5,7 @@ import tempfile import warnings import logging +from dataclasses import dataclass from datetime import datetime, date from pathlib import Path import shutil @@ -47,7 +48,7 @@ class SaveResult: def __init__(self, format: Optional[str] = None, options: Optional[dict] = None): self.format = format or self.DEFAULT_FORMAT self.options = options or {} - self._workspace_exports = [] + self._workspace_exports: List['SaveResult._WorkspaceExport'] = [] def is_format(self, *args): return self.format.lower() in {f.lower() for f in args} @@ -94,14 +95,14 @@ def get_mimetype(self, default="application/octet-stream"): def add_workspace_export(self, workspace_id: str, merge: Optional[str]): # TODO: should probably return a copy (like with_format) but does not work well with evaluate() returning # results stored in env[ENV_SAVE_RESULT] instead of what ultimately comes out of the process graph. - self._workspace_exports.append(dict(workspace_id=workspace_id, merge=merge)) + self._workspace_exports.append(self._WorkspaceExport(workspace_id, merge)) def export_workspace(self, workspace_repository: WorkspaceRepository, files: List[Path], default_merge: str): for export in self._workspace_exports: - workspace = workspace_repository.get_by_id(export["workspace_id"]) + workspace = workspace_repository.get_by_id(export.workspace_id) for file in files: - merge = export["merge"] + merge = export.merge if merge is None: merge = default_merge @@ -110,6 +111,11 @@ def export_workspace(self, workspace_repository: WorkspaceRepository, files: Lis workspace.import_file(file, merge) + @dataclass + class _WorkspaceExport: + workspace_id: str + merge: str + def get_temp_file(suffix="", prefix="openeo-pydrvr-"): # TODO: make sure temp files are cleaned up when read From 98011b949096c9166e68fa57adbf9b8ebc039051 Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Wed, 28 Feb 2024 09:26:03 +0100 Subject: [PATCH 12/12] update version and CHANGELOG https://github.com/Open-EO/openeo-geopyspark-driver/issues/676 --- CHANGELOG.md | 4 ++++ openeo_driver/_version.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ec419044..4e039a60 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,10 @@ and start a new "In Progress" section above it. ## In progress +## 0.91.0 + +- Support `export_workspace` process and `DiskWorkspace` implementation ([Open-EO/openeo-geopyspark-driver#676](https://github.com/Open-EO/openeo-geopyspark-driver/issues/676)) + ## 0.90.1 diff --git a/openeo_driver/_version.py b/openeo_driver/_version.py index 348385fc..6f275e2c 100644 --- a/openeo_driver/_version.py +++ b/openeo_driver/_version.py @@ -1 +1 @@ -__version__ = "0.90.1a1" +__version__ = "0.91.0a1"