Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

676 export workspace support file copy #262

Merged
merged 14 commits into from
Feb 28, 2024
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ and start a new "In Progress" section above it.

## In progress

## 0.91.0

- Support `export_workspace` process and `DiskWorkspace` implementation ([Open-EO/openeo-geopyspark-driver#676](https://github.com/Open-EO/openeo-geopyspark-driver/issues/676))


## 0.90.1

Expand Down
24 changes: 22 additions & 2 deletions openeo_driver/ProcessGraphDeserializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -734,15 +734,16 @@ def apply_dimension(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:


@process
def save_result(args: Dict, env: EvalEnv) -> SaveResult:
def save_result(args: Dict, env: EvalEnv) -> SaveResult: # TODO: return type no longer holds
data = extract_arg(args, 'data')
format = extract_arg(args, 'format')
options = args.get('options', {})

if isinstance(data, SaveResult):
# TODO: Is this an expected code path? `save_result` should be terminal node in a graph
# so chaining `save_result` calls should not be valid
data.set_format(format, options)
# https://github.com/Open-EO/openeo-geopyspark-driver/issues/295
data = data.with_format(format, options)
if ENV_SAVE_RESULT in env:
env[ENV_SAVE_RESULT].append(data)
return data
Expand Down Expand Up @@ -2209,6 +2210,25 @@ def if_(value: Union[bool, None], accept, reject=None):
return accept if value else reject


@process_registry_2xx.add_function(spec=read_spec("openeo-processes/experimental/export_workspace.json"))
def export_workspace(args: ProcessArgs, env: EvalEnv) -> SaveResult:
    """Register a workspace export on the SaveResult flowing through the process graph.

    Mutates the SaveResult (see SaveResult.add_workspace_export) and passes it through.
    """
    data = args.get_required("data")
    workspace_id = args.get_required("workspace", expected_type=str)
    merge = args.get_optional("merge", expected_type=str)

    if not isinstance(data, SaveResult):
        # TODO: work around save_result returning a data cube instead of a SaveResult (#295)
        collected_results = env[ENV_SAVE_RESULT]
        if len(collected_results) != 1:
            raise FeatureUnsupportedException("only process graphs with a single save_result node are supported")
        data = collected_results[0]

    data.add_workspace_export(workspace_id, merge=merge)
    return data


# Finally: register some fallback implementation if possible
_register_fallback_implementations_by_process_graph(process_registry_100)
_register_fallback_implementations_by_process_graph(process_registry_2xx)
2 changes: 1 addition & 1 deletion openeo_driver/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.90.1a1"
__version__ = "0.91.0a1"
7 changes: 5 additions & 2 deletions openeo_driver/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from openeo_driver.server import build_backend_deploy_metadata
from openeo_driver.urlsigning import UrlSigner
from openeo_driver.users.oidc import OidcProvider
from openeo_driver.workspace import Workspace


class ConfigException(ValueError):
Expand Down Expand Up @@ -69,5 +70,7 @@ class OpenEoBackendConfig:

url_signer: Optional[UrlSigner] = None

collection_exclusion_list: Dict[str,List[str]] = {} # e.g. {"1.1.0":["my_collection_id"]}
processes_exclusion_list: Dict[str,List[str]] = {} # e.g. {"1.1.0":["my_process_id"]}
collection_exclusion_list: Dict[str, List[str]] = {} # e.g. {"1.1.0":["my_collection_id"]}
processes_exclusion_list: Dict[str, List[str]] = {} # e.g. {"1.1.0":["my_process_id"]}

workspaces: Dict[str, Workspace] = attrs.Factory(dict)
4 changes: 4 additions & 0 deletions openeo_driver/dummy/dummy_config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from pathlib import Path

from openeo_driver.config import OpenEoBackendConfig
from openeo_driver.server import build_backend_deploy_metadata
from openeo_driver.users.oidc import OidcProvider
from openeo_driver.workspace import DiskWorkspace

oidc_providers = [
OidcProvider(
Expand Down Expand Up @@ -75,4 +78,5 @@ def _valid_basic_auth(username: str, password: str) -> bool:
oidc_providers=oidc_providers,
enable_basic_auth=True,
valid_basic_auth=_valid_basic_auth,
workspaces={"tmp": DiskWorkspace(root_directory=Path("/tmp"))},
)
42 changes: 38 additions & 4 deletions openeo_driver/save_result.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import copy
import glob
import os
import re
import tempfile
import warnings
import logging
from dataclasses import dataclass
from datetime import datetime, date
from pathlib import Path
from shutil import copy
import shutil
from tempfile import mkstemp
from typing import Union, Dict, List, Optional, Any
from zipfile import ZipFile
Expand All @@ -27,6 +29,7 @@
from openeo_driver.errors import OpenEOApiException, FeatureUnsupportedException, InternalException
from openeo_driver.util.ioformats import IOFORMATS
from openeo_driver.utils import replace_nan_values
from openeo_driver.workspacerepository import WorkspaceRepository

_log = logging.getLogger(__name__)

Expand All @@ -45,14 +48,21 @@ class SaveResult:
def __init__(self, format: Optional[str] = None, options: Optional[dict] = None):
    """Initialize with an output *format* (falls back to DEFAULT_FORMAT) and format *options*."""
    self.format = format or self.DEFAULT_FORMAT
    self.options = options or {}
    # Pending workspace exports, registered via add_workspace_export() and
    # consumed by export_workspace().
    self._workspace_exports: List['SaveResult._WorkspaceExport'] = []

def is_format(self, *args):
    """Case-insensitively check whether this result's format is one of *args*."""
    candidates = {candidate.lower() for candidate in args}
    return self.format.lower() in candidates

def set_format(self, format: str, options: dict = None):
self.format = format.lower()
self.format = format
self.options = options or {}

def with_format(self, format: str, options: dict = None) -> 'SaveResult':
    """Return a shallow copy of this result with *format* and *options* replaced."""
    clone = copy.copy(self)
    clone.format = format
    clone.options = options or {}
    return clone

def write_assets(self, directory: Union[str, Path]) -> Dict[str, StacAsset]:
    """Write this result's assets into *directory* and return asset name → STAC asset metadata.

    Abstract: subclasses must implement.
    """
    raise NotImplementedError

Expand Down Expand Up @@ -82,6 +92,30 @@ def flask_response_from_write_assets(self) -> Response:
def get_mimetype(self, default="application/octet-stream"):
    """Return the MIME type corresponding to this result's format.

    NOTE(review): the *default* parameter is currently unused — presumably
    IOFORMATS.get_mimetype applies its own fallback; confirm and either wire
    *default* through or drop the parameter.
    """
    return IOFORMATS.get_mimetype(self.format)

def add_workspace_export(self, workspace_id: str, merge: Optional[str]):
    """Register that this result should be exported to workspace *workspace_id*.

    A *merge* of None means: fall back to the default merge path at export time
    (see export_workspace()). Mutates this instance.
    """
    # TODO: should probably return a copy (like with_format) but does not work well with evaluate() returning
    #  results stored in env[ENV_SAVE_RESULT] instead of what ultimately comes out of the process graph.
    self._workspace_exports.append(self._WorkspaceExport(workspace_id, merge))

def export_workspace(self, workspace_repository: WorkspaceRepository, files: List[Path], default_merge: str):
    """Copy *files* into every workspace registered via add_workspace_export().

    Merge resolution: None → *default_merge*; empty string → workspace root (".").
    """
    for export in self._workspace_exports:
        target_workspace = workspace_repository.get_by_id(export.workspace_id)

        # The effective merge path does not depend on the file, so resolve it once.
        if export.merge is None:
            merge = default_merge
        elif export.merge == "":
            merge = "."
        else:
            merge = export.merge

        for file in files:
            target_workspace.import_file(file, merge)

@dataclass
class _WorkspaceExport:
    # A deferred export destination; resolved against a WorkspaceRepository
    # when export_workspace() runs.
    workspace_id: str
    merge: Optional[str]  # None → fall back to default_merge at export time


def get_temp_file(suffix="", prefix="openeo-pydrvr-"):
# TODO: make sure temp files are cleaned up when read
Expand Down Expand Up @@ -608,7 +642,7 @@ def to_csv(self, destination=None):
if(destination == None):
return csv_paths[0]
else:
copy(csv_paths[0],destination)
shutil.copy(csv_paths[0], destination)
return destination


Expand Down Expand Up @@ -704,7 +738,7 @@ def write_assets(self, directory: Union[str, Path]) -> Dict[str, StacAsset]:
filename = str(directory / "timeseries.csv")
asset["type"] = IOFORMATS.get_mimetype(self.format)

copy(self._csv_path(), filename)
shutil.copy(self._csv_path(), filename)
elif self.is_format("parquet"):
filename = str(directory / "timeseries.parquet")
asset["type"] = IOFORMATS.get_mimetype(self.format)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
{
"id": "export_workspace",
"summary": "Export data to a cloud user workspace",
"description": "Exports the given processing results made available through a STAC resource (e.g., a STAC Collection) to the given user workspace. The STAC resource itself is exported with all STAC resources and assets underneath.",
"categories": [
"export",
"stac"
],
"experimental": true,
"parameters": [
{
"name": "data",
"description": "The data to export to the user workspace as a STAC resource.",
"schema": {
"type": "object",
"subtype": "stac"
}
},
{
"name": "workspace",
"description": "The identifier of the workspace to export to.",
"schema": {
"type": "string",
"pattern": "^[\\w\\-\\.~]+$",
"subtype": "workspace-id"
}
},
{
"name": "merge",
"description": "Provides a cloud-specific path identifier to a STAC resource to merge the given STAC resource into. If not provided, the STAC resource is kept separate from any other STAC resources in the workspace.",
"schema": {
"type": [
"string",
"null"
]
},
"optional": true,
"default": null
}
],
"returns": {
"description": "Returns the potentially updated STAC resource.",
"schema": {
"type": "object",
"subtype": "stac"
}
}
}
26 changes: 26 additions & 0 deletions openeo_driver/workspace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import abc
import os.path
import shutil
from pathlib import Path


class Workspace(abc.ABC):
    """Interface for a user workspace that processing results can be exported to."""

    @abc.abstractmethod
    def import_file(self, file: Path, merge: str):
        """Copy *file* into this workspace under the workspace-specific *merge* path."""
        raise NotImplementedError


class DiskWorkspace(Workspace):
    """Workspace backed by a directory on local disk."""

    def __init__(self, root_directory: Path):
        self.root_directory = root_directory

    def import_file(self, file: Path, merge: str):
        """Copy *file* into the subdirectory of the workspace root identified by *merge*."""
        normalized = os.path.normpath(merge)
        # Interpret an "absolute" merge path as relative to the workspace root.
        if normalized.startswith("/"):
            subdirectory = normalized[1:]
        else:
            subdirectory = normalized
        target_directory = self.root_directory / subdirectory
        # Raises ValueError if merge would escape the workspace root (e.g. via "..").
        target_directory.relative_to(self.root_directory)

        target_directory.mkdir(parents=True, exist_ok=True)
        shutil.copy(file, target_directory)
18 changes: 18 additions & 0 deletions openeo_driver/workspacerepository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import abc

from openeo_driver.config import get_backend_config
from openeo_driver.workspace import Workspace


class WorkspaceRepository(abc.ABC):
    """Lookup interface resolving a workspace id to a Workspace implementation."""

    @abc.abstractmethod
    def get_by_id(self, workspace_id: str) -> Workspace:
        """Return the workspace registered under *workspace_id*."""
        raise NotImplementedError


class BackendConfigWorkspaceRepository(WorkspaceRepository):
    """Resolves workspaces from the backend config's ``workspaces`` mapping."""

    def get_by_id(self, workspace_id: str) -> Workspace:
        # A KeyError propagates for unknown workspace ids.
        configured_workspaces = get_backend_config().workspaces
        return configured_workspaces[workspace_id]


backend_config_workspace_repository = BackendConfigWorkspaceRepository()
50 changes: 50 additions & 0 deletions tests/test_dry_run.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from pathlib import Path
import pytest
import shapely.geometry
from unittest import mock

from openeo.internal.graph_building import PGNode
from openeo.rest.datacube import DataCube
Expand All @@ -14,6 +16,8 @@
from openeo_driver.testing import DictSubSet, approxify
from openeo_driver.util.geometry import as_geojson_feature_collection
from openeo_driver.utils import EvalEnv
from openeo_driver.workspace import Workspace
from openeo_driver.workspacerepository import WorkspaceRepository
from tests.data import get_path, load_json


Expand Down Expand Up @@ -1726,3 +1730,49 @@ def test_invalid_latlon_in_geojson(dry_run_env):
}
cube = init_cube.filter_spatial(geometries=multipoint_many_coordinates)
evaluate(cube.flat_graph(), env=dry_run_env)


def test_export_workspace(dry_run_tracer, backend_implementation):
    """Dry run of save_result + export_workspace: the format is preserved and
    each file is imported into the requested workspace under the merge path."""
    workspace_repository = mock.Mock(WorkspaceRepository)
    workspace = workspace_repository.get_by_id.return_value

    dry_run_env = EvalEnv(
        {
            ENV_DRY_RUN_TRACER: dry_run_tracer,
            "backend_implementation": backend_implementation,
            "version": "2.0.0",
        }
    )

    process_graph = {
        "loadcollection1": {
            "process_id": "load_collection",
            "arguments": {"id": "S2_FOOBAR"},
        },
        "saveresult1": {
            "process_id": "save_result",
            "arguments": {
                "data": {"from_node": "loadcollection1"},
                "format": "GTiff",
            },
        },
        "exportworkspace1": {
            "process_id": "export_workspace",
            "arguments": {
                "data": {"from_node": "saveresult1"},
                "workspace": "some-workspace",
                "merge": "some/path",
            },
            "result": True,
        },
    }

    save_result = evaluate(process_graph, env=dry_run_env)

    assert save_result.is_format("GTiff")

    save_result.export_workspace(
        workspace_repository,
        files=[Path("file1"), Path("file2")],
        default_merge="/some/unique/path",
    )
    workspace.import_file.assert_has_calls(
        [
            mock.call(Path("file1"), "some/path"),
            mock.call(Path("file2"), "some/path"),
        ]
    )
Loading