Skip to content

Commit

Permalink
implemented the jobs API (tests still missing)
Browse files Browse the repository at this point in the history
  • Loading branch information
forman committed Jul 12, 2023
1 parent 7d0e061 commit 11b4500
Show file tree
Hide file tree
Showing 12 changed files with 480 additions and 200 deletions.
48 changes: 48 additions & 0 deletions test/webapi/compute/test_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# The MIT License (MIT)
# Copyright (c) 2023 by the xcube team and contributors
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.

from unittest import TestCase

from xcube.util.jsonschema import JsonObjectSchema
from xcube.webapi.compute.config import CONFIG_SCHEMA


class ComputeConfigTest(TestCase):
def test_config_schema(self):
self.assertIsInstance(CONFIG_SCHEMA, JsonObjectSchema)
self.assertEqual(
{
'type': 'object',
'properties': {
'Compute': {
'type': 'object',
'properties': {
'MaxWorkers': {
'type': 'integer',
'minimum': 1,
}
},
'additionalProperties': False,
}
},
},
CONFIG_SCHEMA.to_dict()
)
4 changes: 2 additions & 2 deletions test/webapi/compute/test_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import xarray as xr

from xcube.core.new import new_cube
from xcube.webapi.compute.op import get_operations
from xcube.webapi.compute.op.registry import OP_REGISTRY
from xcube.webapi.compute.operations import spatial_subset


class ComputeOperationsTest(TestCase):
def test_operations_registered(self):
ops = get_operations()
ops = OP_REGISTRY.ops
self.assertIn("spatial_subset", ops)
self.assertTrue(callable(ops["spatial_subset"]))

Expand Down
12 changes: 11 additions & 1 deletion xcube/webapi/compute/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,19 @@
# DEALINGS IN THE SOFTWARE.

from xcube.util.jsonschema import JsonObjectSchema
from xcube.util.jsonschema import JsonIntegerSchema

CONFIG_SCHEMA = JsonObjectSchema(
COMPUTE_CONFIG_SCHEMA = JsonObjectSchema(
properties=dict(
MaxWorkers=JsonIntegerSchema(minimum=1),
# Executor=JsonObjectSchema(),
# OpRegistry=JsonStringSchema(),
),
additional_properties=False
)

CONFIG_SCHEMA = JsonObjectSchema(
properties=dict(
Compute=COMPUTE_CONFIG_SCHEMA,
)
)
138 changes: 137 additions & 1 deletion xcube/webapi/compute/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,34 @@
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.

import importlib
import concurrent.futures
import datetime
from typing import Dict, Any

import xarray as xr

from xcube.server.api import Context
from xcube.webapi.common.context import ResourcesContext
from xcube.webapi.datasets.context import DatasetsContext
from xcube.webapi.places import PlacesContext
from xcube.constants import LOG
from .op.info import OpInfo
from .op.registry import OpRegistry
from .op.registry import OP_REGISTRY

# Register default operations:
importlib.import_module("xcube.webapi.compute.operations")


LocalExecutor = concurrent.futures.ThreadPoolExecutor


class ComputeContext(ResourcesContext):

def __init__(self, server_ctx: Context):
def __init__(self,
server_ctx: Context,
op_registry: OpRegistry = OP_REGISTRY):
super().__init__(server_ctx)
self._datasets_ctx: DatasetsContext \
= server_ctx.get_api_ctx("datasets")
Expand All @@ -36,10 +55,127 @@ def __init__(self, server_ctx: Context):
= server_ctx.get_api_ctx("places")
assert isinstance(self._places_ctx, PlacesContext)

self._op_registry = op_registry
assert isinstance(self._op_registry, OpRegistry)

compute_config = server_ctx.config.get("Compute", {})
max_workers = compute_config.get("MaxWorkers", 3)

self.next_job_id = 0
self.jobs: Dict[int, Any] = {}
self.job_futures: Dict[int, concurrent.futures.Future] = {}
self.job_executor = LocalExecutor(max_workers=max_workers,
thread_name_prefix='xcube-job-')

def on_dispose(self):
self.job_executor.shutdown(cancel_futures=True)

@property
def op_registry(self) -> OpRegistry:
return self._op_registry

@property
def datasets_ctx(self) -> DatasetsContext:
return self._datasets_ctx

@property
def places_ctx(self) -> PlacesContext:
return self._places_ctx

def schedule_job(self, job_request: Dict[str, Any]):
"""Schedule a new job given by *job_request*,
which is expected to be validated already.
Job status transitions:
scheduled
scheduled --> cancelled
scheduled --> running
scheduled --> running --> completed
scheduled --> running --> failed
scheduled --> running --> cancelled
"""

with self.rlock:
job_id = self.next_job_id
self.next_job_id += 1

job = {
"id": job_id,
"request": job_request,
"state": {
"status": "scheduled"
},
"createTime": datetime.datetime.utcnow().isoformat(),
"startTime": None,
}

LOG.info(f"Scheduled job #{job_id}")
self.jobs[job_id] = job
job_future = self.job_executor.submit(self.invoke_job, job_id)
self.job_futures[job_id] = job_future

return job

def invoke_job(self, job_id: int):
job = self.jobs.get(job_id)
if job is None:
return

if job["state"]["status"] == "cancelled":
return

job_request = job["request"]

op_id = job_request["operationId"]
output_ds_id = job_request["datasetId"]
parameters = job_request.get("parameters", {})

LOG.info(f"Started job #{job_id}")
job["state"]["status"] = "running"

op = self.op_registry.get_op(op_id)
op_info = OpInfo.get_op_info(op)
param_py_types = op_info.effective_param_py_types

parameters = parameters.copy()
for param_name, param_py_type in param_py_types.items():
if param_py_type is xr.Dataset:
input_ds_id = parameters.get(param_name)
if input_ds_id is not None:
input_ds = self._datasets_ctx.get_dataset(input_ds_id)
parameters[param_name] = input_ds

try:
output_ds = op(**parameters)
except Exception as e:
LOG.error(f"Job #{job_id} failed:", e)
job["state"]["status"] = "failed"
job["state"]["error"] = {
"message": str(e),
"type": type(e).__name__
}
return

if job["state"]["status"] == "cancelled":
return

# TODO: get other dataset properties from "output"
self.datasets_ctx.add_dataset(
output_ds,
ds_id=output_ds_id
)

LOG.info(f"Completed job #{job_id}")
job["state"]["status"] = "completed"

def cancel_job(self, job_id: int):
job = self.jobs.get(job_id)
if job is None:
return

LOG.info(f"Job #{job_id} cancelled:")
job["state"]["status"] = "cancelled"

future = self.job_futures.pop(job_id, None)
if future is not None:
future.cancel()
24 changes: 10 additions & 14 deletions xcube/webapi/compute/controllers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,40 +19,36 @@
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.

import importlib
import inspect
from typing import Callable, Any, Dict, List
from typing import Callable, Any, Dict

from xcube.server.api import ApiError

from .context import ComputeContext
from .op import get_operations
from .op import get_op_params_schema

importlib.import_module("xcube.webapi.compute.operations")
from .op.info import OpInfo


def get_compute_operations(ctx: ComputeContext):
ops = get_operations()
ops = ctx.op_registry.ops
return {
"operations": [encode_op(op_id, f) for op_id, f in ops.items()]
}


def get_compute_operation(ctx: ComputeContext, op_id: str):
ops = get_operations()
f = ops.get(op_id)
if f is None:
op = ctx.op_registry.get_op(op_id)
if op is None:
raise ApiError.NotFound(f'operation {op_id!r} not found')
return encode_op(op_id, f)
return encode_op(op_id, op)


def encode_op(op_id: str, f: Callable) -> Dict[str, Any]:
def encode_op(op_id: str, op: Callable) -> Dict[str, Any]:
op_info = OpInfo.get_op_info(op)
op_json = {
"operationId": op_id,
"parametersSchema": get_op_params_schema(f)
"parametersSchema": op_info.params_schema
}
doc = inspect.getdoc(f)
doc = inspect.getdoc(op)
if doc:
op_json.update(description=doc)
return op_json
Loading

0 comments on commit 11b4500

Please sign in to comment.