load_stac: support loading unfinished results #489
As discussed: this works similarly to SHub batch process polling.
- SHub batch processes: when starting a batch job, detect a batch process dependency and await its completion.
- load_stac: when starting a batch job, detect a dependency on unfinished ("partial") job results and await it in the same way.
"openeo:status" is in the root of the STAC Collection object. Some considerations/decisions that make sense at this time but might need revisiting:
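A minimal sketch of what such polling could look like, assuming a `fetch` callable that returns the (partial) job results document as a dict; the helper names here are hypothetical, not the driver's actual API:

```python
import time
from typing import Callable

def results_status(collection: dict) -> str:
    # Per the proposal, "openeo:status" sits in the root of the STAC
    # Collection; treat an absent field as finished results.
    return collection.get("openeo:status", "finished")

def await_dependency(fetch: Callable[[], dict], poll_interval: float = 60.0,
                     max_polls: int = 100) -> dict:
    # `fetch` would typically be something like
    # `lambda: requests.get(url).json()` on the canonical "?partial=true"
    # results URL of the dependency job.
    for _ in range(max_polls):
        collection = fetch()
        status = results_status(collection)
        if status == "finished":
            return collection
        if status in ("error", "canceled"):
            raise RuntimeError(f"dependency job ended with status {status!r}")
        time.sleep(poll_interval)
    raise TimeoutError("dependency results still unfinished after polling")
```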
Traceback (most recent call last):
  File "/home/bossie/PycharmProjects/openeo/venv38/lib/python3.8/site-packages/flask/app.py", line 1516, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/bossie/PycharmProjects/openeo/venv38/lib/python3.8/site-packages/flask/app.py", line 1502, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**req.view_args)
  File "/home/bossie/PycharmProjects/openeo/venv38/lib/python3.8/site-packages/openeo_driver/users/auth.py", line 88, in decorated
    return f(*args, **kwargs)
  File "/home/bossie/PycharmProjects/openeo/venv38/lib/python3.8/site-packages/openeo_driver/views.py", line 865, in queue_job
    backend_implementation.batch_jobs.start_job(job_id=job_id, user=user)
  File "/home/bossie/PycharmProjects/openeo/openeo-geopyspark-driver/openeogeotrellis/backend.py", line 1827, in start_job
    self._start_job(job_id, user, _get_vault_token)
  File "/home/bossie/PycharmProjects/openeo/openeo-geopyspark-driver/openeogeotrellis/backend.py", line 2185, in _start_job
    args.append(serialize_dependencies())
  File "/home/bossie/PycharmProjects/openeo/openeo-geopyspark-driver/openeogeotrellis/backend.py", line 1957, in serialize_dependencies
    dependencies = dependencies or job_info.get('dependencies') or []
UnboundLocalError: local variable 'dependencies' referenced before assignment
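The UnboundLocalError boils down to reading a local name that is only assigned on some code paths; a minimal sketch of the pattern and one way to fix it (simplified, not the actual backend.py code):

```python
def serialize_dependencies(job_info: dict, dependencies=None) -> list:
    # Buggy variant: `dependencies` was only bound inside a conditional
    # branch, so `dependencies or ...` could read an unassigned local.
    # Binding it up front (here: as a defaulted parameter) fixes that.
    return dependencies or job_info.get("dependencies") or []
```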
Currently implemented as follows:
We schedule batch processes ourselves, then pass on their source_location (and card4l flag) to the batch job that is subsequently started upon their completion ("DONE"). The load_collections that get invoked while evaluating the process graph in the batch job only rely on source_location and card4l, not on the arguments actually passed to load_collection. In the case of unfinished job results dependencies, however, there's no need to serialize them and pass them on to the subsequent batch job, because load_stacs evaluated in that batch job will simply act upon the very same "running" URL that made it a dependency in the first place (but this time it will actually read the job results because they have "finished" in the meanwhile).
…489

>       batch_processes = reduce(partial(dict_merge_recursive, overwrite=True),
                                (batch_request_details(dependency) for dependency in batch_process_dependencies))
E       TypeError: reduce() of empty sequence with no initial value
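The TypeError happens because `reduce()` over an empty iterable needs an explicit initial value; a sketch of the failure and fix, with a simplified stand-in for the driver's `dict_merge_recursive` helper:

```python
from functools import partial, reduce

def dict_merge_recursive(a: dict, b: dict, overwrite: bool = False) -> dict:
    # Simplified stand-in for the driver's helper: recursively merge b into a.
    merged = dict(a)
    for key, value in b.items():
        if key in merged and isinstance(merged[key], dict) and isinstance(value, dict):
            merged[key] = dict_merge_recursive(merged[key], value, overwrite=overwrite)
        elif overwrite or key not in merged:
            merged[key] = value
    return merged

details = []  # no SHub batch process dependencies this time
# reduce() raises TypeError on an empty sequence unless it gets an
# initial value; passing {} makes the empty case yield an empty mapping.
batch_processes = reduce(partial(dict_merge_recursive, overwrite=True), details, {})
assert batch_processes == {}
```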
…finished-results start job: await unfinished job results dependencies #489
Did some testing and loading partial job results from OpenEO dev takes a very long time (but not always):
Not sure what the deal is with:
In step 2, the web app driver will attempt to fetch A's …
If I run job A with the first process graph below, and job B, which loads A's partial results via load_stac, with the second:
{
"process_graph": {
"load1": {
"arguments": {
"id": "SENTINEL2_L2A",
"spatial_extent": {
"coordinates": [
[
[
14.20922527067026,
40.855657765536336
],
[
14.20922527067026,
40.95056915081699
],
[
14.316342442933973,
40.95056915081699
],
[
14.316342442933973,
40.855657765536336
],
[
14.20922527067026,
40.855657765536336
]
]
],
"type": "Polygon"
},
"temporal_extent": [
"2022-04-17T00:00:00Z",
"2022-04-17T00:00:00Z"
]
},
"process_id": "load_collection"
},
"save2": {
"arguments": {
"data": {
"from_node": "load1"
},
"format": "GTIFF"
},
"process_id": "save_result",
"result": true
}
}
}
{
"process_graph": {
"load1": {
"arguments": {
"url": "https://openeo-staging.dataspace.copernicus.eu/openeo/1.1/jobs/j-735dfda9c31849efba3895cf8b8cf64c/results/...&partial=true"
},
"process_id": "load_stac"
},
"save2": {
"arguments": {
"data": {
"from_node": "load1"
},
"format": "GTiff"
},
"process_id": "save_result",
"result": true
}
}
}
However, B does not yet run to completion; there are 2 errors in its logs:
Open-EO/openeo-geopyspark-driver#489
OpenEO batch job failed: java.lang.IllegalArgumentException: requirement failed: Server doesn't support ranged byte reads
Traceback (most recent call last):
  File "batch_job.py", line 1291, in <module>
    main(sys.argv)
  File "batch_job.py", line 1028, in main
    run_driver()
  File "batch_job.py", line 999, in run_driver
    run_job(
  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/utils.py", line 52, in memory_logging_wrapper
    return function(*args, **kwargs)
  File "batch_job.py", line 1092, in run_job
    result = ProcessGraphDeserializer.evaluate(process_graph, env=env, do_dry_run=tracer)
  File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 348, in evaluate
    result = convert_node(result_node, env=env)
  File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 368, in convert_node
    process_result = apply_process(process_id=process_id, args=processGraph.get('arguments', {}),
  File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 1480, in apply_process
    args = {name: convert_node(expr, env=env) for (name, expr) in sorted(args.items())}
  File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 1480, in <dictcomp>
    args = {name: convert_node(expr, env=env) for (name, expr) in sorted(args.items())}
  File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 380, in convert_node
    return convert_node(processGraph['node'], env=env)
  File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 368, in convert_node
    process_result = apply_process(process_id=process_id, args=processGraph.get('arguments', {}),
  File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 1512, in apply_process
    return process_function(args=ProcessArgs(args, process_id=process_id), env=env)
  File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 2091, in load_stac
    return env.backend_implementation.load_stac(url=url, load_params=load_params, env=env)
  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/backend.py", line 1078, in load_stac
    pyramid = pyramid_factory.datacube_seq(projected_polygons, from_date, to_date, metadata_properties,
  File "/opt/spark3_4_0/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/opt/spark3_4_0/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o1635.datacube_seq.
: java.io.IOException: Exception while determining data type of collection https://openeo-staging.dataspace.copernicus.eu/openeo/1.1/jobs/j-735dfda9c31849efba3895cf8b8cf64c/results/...&partial=true and item https://openeo-staging.dataspace.copernicus.eu/openeo/1.1/jobs/j-735dfda9c31849efba3895cf8b8cf64c/results/assets/.../openEO_2022-04-17Z.tif?expires=1694006445.
Detailed message: requirement failed: Server doesn't support ranged byte reads
	at org.openeo.geotrellis.layers.FileLayerProvider.determineCelltype(FileLayerProvider.scala:662)
	at org.openeo.geotrellis.layers.FileLayerProvider.readKeysToRasterSources(FileLayerProvider.scala:690)
	at org.openeo.geotrellis.layers.FileLayerProvider.readMultibandTileLayer(FileLayerProvider.scala:862)
	at org.openeo.geotrellis.file.PyramidFactory.datacube(PyramidFactory.scala:111)
	at org.openeo.geotrellis.file.PyramidFactory.datacube_seq(PyramidFactory.scala:84)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: requirement failed: Server doesn't support ranged byte reads
	at scala.Predef$.require(Predef.scala:281)
	at org.openeo.geotrellis.CustomizableHttpRangeReader.totalLength$lzycompute(CustomizableHttpRangeReader.scala:31)
	at org.openeo.geotrellis.CustomizableHttpRangeReader.totalLength(CustomizableHttpRangeReader.scala:10)
	at geotrellis.util.StreamingByteReader.ensureChunk(StreamingByteReader.scala:109)
	at geotrellis.util.StreamingByteReader.get(StreamingByteReader.scala:130)
	at geotrellis.raster.io.geotiff.reader.GeoTiffInfo$.read(GeoTiffInfo.scala:127)
	at geotrellis.raster.io.geotiff.reader.GeoTiffReader$.readMultiband(GeoTiffReader.scala:211)
	at geotrellis.raster.geotiff.GeoTiffReprojectRasterSource.$anonfun$tiff$1(GeoTiffReprojectRasterSource.scala:46)
	at scala.Option.getOrElse(Option.scala:189)
	at geotrellis.raster.geotiff.GeoTiffReprojectRasterSource.tiff$lzycompute(GeoTiffReprojectRasterSource.scala:43)
	at geotrellis.raster.geotiff.GeoTiffReprojectRasterSource.tiff(GeoTiffReprojectRasterSource.scala:40)
	at geotrellis.raster.geotiff.GeoTiffReprojectRasterSource.$anonfun$cellType$1(GeoTiffReprojectRasterSource.scala:50)
	at scala.Option.getOrElse(Option.scala:189)
	at geotrellis.raster.geotiff.GeoTiffReprojectRasterSource.cellType(GeoTiffReprojectRasterSource.scala:50)
	at org.openeo.geotrellis.layers.BandCompositeRasterSource.$anonfun$cellType$1(FileLayerProvider.scala:79)
	at cats.data.NonEmptyList.map(NonEmptyList.scala:87)
	at org.openeo.geotrellis.layers.BandCompositeRasterSource.cellType(FileLayerProvider.scala:79)
	at org.openeo.geotrellis.layers.FileLayerProvider.determineCelltype(FileLayerProvider.scala:656)
	... 16 more
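The underlying complaint is that the server behind the signed asset URL did not advertise HTTP byte-range support, which GeoTrellis' streaming GeoTIFF reader requires. A small sketch for checking that from response headers (the header semantics are standard HTTP; the function name is made up):

```python
def supports_range_reads(headers: dict) -> bool:
    # Servers signal byte-range support via "Accept-Ranges: bytes";
    # "none" or a missing header suggests ranged reads may be rejected.
    return headers.get("Accept-Ranges", "").lower() == "bytes"

# Typically applied to e.g. requests.head(asset_url, allow_redirects=True).headers
```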
Failed status sync for job_id='j-7c5e66fbba9f402b9ce7c19f3d42c29c': unexpected KeyError: 'batch_request_id'
Traceback (most recent call last):
  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/job_tracker_v2.py", line 383, in update_statuses
    self._sync_job_status(
  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/job_tracker_v2.py", line 460, in _sync_job_status
    dependency_sources = list(set(get_dependency_sources(job_info)))
  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/job_registry.py", line 433, in get_dependency_sources
    return [source for dependency in (job_info.get("dependencies") or []) for source in sources(dependency)]
  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/job_registry.py", line 433, in <listcomp>
    return [source for dependency in (job_info.get("dependencies") or []) for source in sources(dependency)]
  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/job_registry.py", line 427, in sources
    subfolder = dependency.get("subfolder") or dependency["batch_request_id"]
KeyError: 'batch_request_id'
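The KeyError suggests `sources()` assumed every dependency is a SHub batch process carrying a `batch_request_id`; a hedged sketch of making it tolerate other dependency kinds (the key handling mirrors the traceback, but the S3 prefix is illustrative, not the real job_registry schema):

```python
def get_dependency_sources(job_info: dict) -> list:
    def sources(dependency: dict) -> list:
        subfolder = dependency.get("subfolder") or dependency.get("batch_request_id")
        if subfolder is None:
            # e.g. an unfinished-job-results dependency: there is no SHub
            # results folder to track or clean up for it.
            return []
        return [f"s3://openeo-sentinelhub/{subfolder}"]  # illustrative prefix
    return [s for dep in (job_info.get("dependencies") or []) for s in sources(dep)]
```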
With both jobs on Terrascope, it still hangs. There's a TimingLogger when starting the job, and a TimingLogger in the subsequent get partial job results call; in both cases, there's no corresponding "end/elapsed" log. Added some more logging to the implementation.
I think the mutex introduced in 656b7ce leads to a deadlock.
The mutex is acquired for the whole of …
Confirmed and subsequently fixed by splitting up the …
Why was this …
Both original + …
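The deadlock pattern described above is a non-reentrant lock being acquired again while already held by the same thread; a minimal illustration, with a re-entrant lock as one way out (narrowing or splitting the critical section, as the actual fix did, is the other; the function names here are hypothetical):

```python
import threading

lock = threading.RLock()  # a plain threading.Lock() would deadlock below

def get_partial_job_results():
    with lock:
        # ... does some work, then calls into another lock-protected helper ...
        return load_results_metadata()

def load_results_metadata():
    with lock:  # re-entered while already held; fine for RLock, fatal for Lock
        return "metadata"
```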
Using load_stac, we can normally also load openEO-generated results provided by a signed URL.
Using the 'partial' query parameter, it seems possible to get a canonical link to a job that is still running:
https://api.openeo.org/#tag/Data-Processing/operation/list-results
(support for 'partial' needs to be added to our backend)
So if our backend receives a process graph with a load_stac, it should only start the actual processing when all dependencies are finished!
This is an important piece of the puzzle for federated processing, because it allows the aggregator to perform job splitting without keeping track of the various jobs itself: it can just schedule all jobs and forward them to the respective backends. Hence it also no longer needs a long-lived token.
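On the aggregator side, constructing the dependent sub-job's graph is mechanical; a sketch (the helper name is made up) mirroring the load_stac process graph shown earlier:

```python
def make_load_stac_subgraph(partial_results_url: str) -> dict:
    # partial_results_url: canonical link to another job's results,
    # requested with "?partial=true" so it also resolves while that
    # job is still running.
    return {
        "process_graph": {
            "load1": {
                "process_id": "load_stac",
                "arguments": {"url": partial_results_url},
            },
            "save2": {
                "process_id": "save_result",
                "arguments": {"data": {"from_node": "load1"}, "format": "GTiff"},
                "result": True,
            },
        }
    }
```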