

load_stac gives no data cross backend #786

Closed
bossie opened this issue May 30, 2024 · 16 comments · Fixed by #814

bossie (Collaborator) commented May 30, 2024

The aggregator delegates a load_collection to Terrascope and then loads the result with load_stac, but this fails with a "NoDataAvailable" error.
Adding a spatial and temporal extent to the load_stac node in the JSON makes the job come through.

meteo_cube = connection.load_stac("https://stac.openeo.vito.be/collections/agera5_daily", spatial_extent=EXTENT, temporal_extent=[STARTDATE, ENDDATE], bands=["2m_temperature_mean"])

gives a 429 error.

bossie (Collaborator, Author) commented May 30, 2024

Some context.

The original job by Darius C. was as follows:

# Now we extract the same input cube with openeo gfmap
import openeo
# assumed import: BackendContext and Backend are not imported in the original snippet;
# they presumably come from the openeo_gfmap package
from openeo_gfmap import Backend, BackendContext

connection = openeo.connect("openeofed.dataspace.copernicus.eu").authenticate_oidc()

backend_context = BackendContext(Backend.FED)


EXTENT = dict(zip(["west", "south", "east", "north"], [5.318868004541495, 50.628576059801816, 5.3334400271343725, 50.637843899562576]))
EXTENT['crs'] = "EPSG:4326"
STARTDATE = '2022-01-01'
ENDDATE = '2022-03-31'

s2_cube = connection.load_collection("SENTINEL2_L2A", spatial_extent=EXTENT, temporal_extent=[STARTDATE, ENDDATE], bands=["B04"])
meteo_cube = connection.load_collection("AGERA5", spatial_extent=EXTENT, temporal_extent=[STARTDATE, ENDDATE], bands=["temperature-mean"])

s2_cube = s2_cube.aggregate_temporal_period(period='month', reducer='median', dimension='t')
meteo_cube = meteo_cube.aggregate_temporal_period(period='month', reducer='mean', dimension='t')

inputs = s2_cube.merge_cubes(meteo_cube)

job = inputs.create_job(
    out_format="NetCDF",
    title="Test extraction job",
    job_options={
        "split_strategy": "crossbackend",
        "driver-memory": "2G",
        "driver-memoryOverhead": "2G",
        "driver-cores": "1",
        "executor-memory": "1800m",
        "executor-memoryOverhead": "1900m",
    }
)
job.start_and_wait()

The openeofed aggregator split up this job between:

  • a job on CDSE to gather SENTINEL2_L2A (j-240528eeb7ce4b8bbd00e51746f0d01b) and
  • a job on Terrascope to gather AGERA5 (j-24052834d2d340839c3d047f60d4b8da)

where it essentially replaced the load_collection("AGERA5") in the CDSE job with a load_stac("j-24052834d2d340839c3d047f60d4b8da?partial=true"), the intention being that j-240528eeb7ce4b8bbd00e51746f0d01b waits until the results of j-24052834d2d340839c3d047f60d4b8da are available before resuming (#489).
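
Expressed with the openeo Python client, the substituted node is roughly equivalent to the following (a sketch; the URL is a placeholder for the signed partial-results URL the aggregator actually injects):

import openeo

connection = openeo.connect("openeo.dataspace.copernicus.eu").authenticate_oidc()

# placeholder for the dependency job's signed partial-results URL
partial_results_url = (
    "https://openeo-cdse.vito.be/openeo/1.1/jobs/j-24052834d2d340839c3d047f60d4b8da"
    "/results?partial=true"
)

# note: no spatial/temporal extent and no bands, exactly as in the failing job
meteo_cube = connection.load_stac(partial_results_url)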

Unfortunately, j-240528eeb7ce4b8bbd00e51746f0d01b failed with this error:

load_stac from url 'https://openeo-cdse.vito.be/openeo/1.1/jobs/j-24052834d2d340839c3d047f60d4b8da/results/YmJlNzQ2YzAtMzRjYy00NTFiLTk5YzYtNDhjYTU1Y2RhMzQ5/0be8a4541d6c7a44e276d2c44b13655d?expires=1717501136&partial=true' with load params {'temporal_extent': ['1970-01-01', '2070-01-01'], 'spatial_extent': {}, 'global_extent': None, 'bands': None, 'properties': {}, 'aggregate_spatial_geometries': None, 'sar_backscatter': None, 'process_types': set(), 'custom_mask': {}, 'data_mask': None, 'target_crs': None, 'target_resolution': None, 'resample_method': 'near', 'pixel_buffer': None}

Traceback (most recent call last):
  File "/opt/openeo/lib64/python3.8/site-packages/openeogeotrellis/deploy/batch_job.py", line 1374, in <module>
    main(sys.argv)
  File "/opt/openeo/lib64/python3.8/site-packages/openeogeotrellis/deploy/batch_job.py", line 1039, in main
    run_driver()
  File "/opt/openeo/lib64/python3.8/site-packages/openeogeotrellis/deploy/batch_job.py", line 1010, in run_driver
    run_job(
  File "/opt/openeo/lib/python3.8/site-packages/openeogeotrellis/utils.py", line 56, in memory_logging_wrapper
    return function(*args, **kwargs)
  File "/opt/openeo/lib64/python3.8/site-packages/openeogeotrellis/deploy/batch_job.py", line 1103, in run_job
    result = ProcessGraphDeserializer.evaluate(process_graph, env=env, do_dry_run=tracer)
  File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 377, in evaluate
    result = convert_node(result_node, env=env)
  File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 402, in convert_node
    process_result = apply_process(process_id=process_id, args=processGraph.get('arguments', {}),
  File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 1581, in apply_process
    args = {name: convert_node(expr, env=env) for (name, expr) in sorted(args.items())}
  File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 1581, in <dictcomp>
    args = {name: convert_node(expr, env=env) for (name, expr) in sorted(args.items())}
  File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 416, in convert_node
    return convert_node(processGraph['node'], env=env)
  File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 402, in convert_node
    process_result = apply_process(process_id=process_id, args=processGraph.get('arguments', {}),
  File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 1581, in apply_process
    args = {name: convert_node(expr, env=env) for (name, expr) in sorted(args.items())}
  File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 1581, in <dictcomp>
    args = {name: convert_node(expr, env=env) for (name, expr) in sorted(args.items())}
  File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 416, in convert_node
    return convert_node(processGraph['node'], env=env)
  File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 402, in convert_node
    process_result = apply_process(process_id=process_id, args=processGraph.get('arguments', {}),
  File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 1581, in apply_process
    args = {name: convert_node(expr, env=env) for (name, expr) in sorted(args.items())}
  File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 1581, in <dictcomp>
    args = {name: convert_node(expr, env=env) for (name, expr) in sorted(args.items())}
  File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 416, in convert_node
    return convert_node(processGraph['node'], env=env)
  File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 402, in convert_node
    process_result = apply_process(process_id=process_id, args=processGraph.get('arguments', {}),
  File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 1613, in apply_process
    return process_function(args=ProcessArgs(args, process_id=process_id), env=env)
  File "/opt/openeo/lib/python3.8/site-packages/openeo_driver/ProcessGraphDeserializer.py", line 2234, in load_stac
    return env.backend_implementation.load_stac(url=url, load_params=load_params, env=env)
  File "/opt/openeo/lib/python3.8/site-packages/openeogeotrellis/backend.py", line 760, in load_stac
    return load_stac.load_stac(url, load_params, env, layer_properties={}, batch_jobs=self.batch_jobs)
  File "/opt/openeo/lib/python3.8/site-packages/openeogeotrellis/load_stac.py", line 342, in load_stac
    raise no_data_available_exception
openeo_driver.errors.OpenEOApiException: There is no data available for the given extents.

bossie (Collaborator, Author) commented May 30, 2024

In the end, Terrascope job j-24052834d2d340839c3d047f60d4b8da finished successfully and produced 89 STAC Items.

At this moment load_stac is able to load its canonical URL into a data cube (without specifying a spatial or temporal extent), but apparently it couldn't in the context of j-240528eeb7ce4b8bbd00e51746f0d01b.

bossie (Collaborator, Author) commented May 30, 2024

It should be noted that the dependent (main) job ran on CDSE, but this environment doesn't have the necessary infrastructure to poll its dependency job.

bossie (Collaborator, Author) commented May 30, 2024

This seems to be the point where it decides that CDSE does not in fact support dependencies:

async_tasks_supported = not ConfigParams().is_kube_deploy
if (
    async_tasks_supported
    and dependencies is None
    and job_info.get("dependency_status")
    not in [
        DEPENDENCY_STATUS.AWAITING,
        DEPENDENCY_STATUS.AWAITING_RETRY,
        DEPENDENCY_STATUS.AVAILABLE,
    ]
):
    job_dependencies = self._schedule_and_get_dependencies(
        process_graph=job_process_graph,
        api_version=api_version,
        user_id=user_id,
        job_id=job_id,
        job_options=job_options,
        sentinel_hub_client_alias=sentinel_hub_client_alias,
        get_vault_token=get_vault_token,
        logger_adapter=log,
    )
    log.debug(f"job_dependencies: {job_dependencies}")
    if job_dependencies:
        with self._double_job_registry as dbl_registry:
            dbl_registry.set_dependencies(
                job_id=job_id, user_id=user_id, dependencies=job_dependencies
            )
            async_task.schedule_await_job_dependencies(
                batch_job_id=job_id,
                user_id=user_id,
                sentinel_hub_client_alias=sentinel_hub_client_alias,
                vault_token=None
                if sentinel_hub_client_alias == "default"
                else get_vault_token(sentinel_hub_client_alias),
            )
            dbl_registry.set_dependency_status(
                job_id, user_id, DEPENDENCY_STATUS.AWAITING
            )
            dbl_registry.set_status(job_id, user_id, JOB_STATUS.QUEUED)
            return

CDSE batch job j-240528eeb7ce4b8bbd00e51746f0d01b therefore just proceeded with evaluating its process graph; at the point where it tried to load_stac("j-24052834d2d340839c3d047f60d4b8da?partial=true") the latter's results were not yet computed and there were no STAC Items to load, hence the original error:

File "/opt/openeo/lib/python3.8/site-packages/openeogeotrellis/load_stac.py", line 342, in load_stac
    raise no_data_available_exception
openeo_driver.errors.OpenEOApiException: There is no data available for the given extents.

This also explains why the CDSE job did not fail fast: it skipped putting the poll-message on Kafka altogether.

soxofaan (Member) commented:

async_tasks_supported = not ConfigParams().is_kube_deploy

I was afraid there would be something like that somewhere in the code path.

I'm really not a fan of such bazooka-style configs.

bossie (Collaborator, Author) commented May 30, 2024

Reversing the arguments of merge_cubes makes the main job run on Terrascope, but it does not get into the "running" state because of this EJR error upon starting the job:

inputs = meteo_cube.merge_cubes(s2_cube)
Traceback (most recent call last):
  File "/opt/venv/lib64/python3.8/site-packages/flask/app.py", line 1484, in full_dispatch_request
    rv = self.dispatch_request()
  File "/opt/venv/lib64/python3.8/site-packages/flask/app.py", line 1469, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
  File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/users/auth.py", line 88, in decorated
    return f(*args, **kwargs)
  File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/views.py", line 939, in queue_job
    backend_implementation.batch_jobs.start_job(job_id=job_id, user=user)
  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/backend.py", line 1745, in start_job
    self._start_job(job_id, user, _get_vault_token)
  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/backend.py", line 1812, in _start_job
    dbl_registry.set_dependencies(
  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/job_registry.py", line 851, in set_dependencies
    self.elastic_job_registry.set_dependencies(
  File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/jobregistry.py", line 498, in set_dependencies
    return self._update(job_id=job_id, data={"dependencies": dependencies})
  File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/jobregistry.py", line 493, in _update
    return self._do_request("PATCH", f"/jobs/{job_id}", json=data)
  File "/opt/venv/lib64/python3.8/site-packages/openeo_driver/jobregistry.py", line 330, in _do_request
    raise EjrHttpError.from_response(response=response)
openeo_driver.jobregistry.EjrHttpError: EJR API error: 400 'Bad Request' on `PATCH 'https://jobregistry.vgt.vito.be/jobs/j-2405308c6a2b4f1197bc4b429e7199b2'`: {"statusCode":400,"message":["dependencies.0.property partial_job_results_url should not exist"],"error":"Bad Request"}

In this case, dependencies look like this:

[{'partial_job_results_url': 'https://openeo.dataspace.copernicus.eu/openeo/1.1/jobs/j-2405304298c14e609b85ffc70b5b6382/results/OTc2MGU1OWItNTY2ZC00MmQxLWI2NWItMzRlZDE4NzlkYThh/a21f52059a2d099abf9ff73539d4618b?expires=1717678778&partial=true'}]

It will happily accept dependencies that look like:

[{'collection_id': 'SENTINEL1_GRD', 'batch_request_ids': ['9ecd1d54-3b44-492f-9021-6eed00ea8a30'], 'results_location': 's3://openeo-sentinelhub/9ecd1d54-3b44-492f-9021-6eed00ea8a30', 'card4l': False}]

or even

[{'collection_id': 'SENTINEL1_GRD'}]

even though the ES mapping type for dependencies is flattened, just like e.g. job_options, and that one does seem to allow arbitrary properties. 🤷

Maybe @JanssenBrm can explain this?

bossie (Collaborator, Author) commented May 31, 2024

Possible course of action:

  • 1. fix saving dependencies of type partial_job_results_url in ES: should work regardless of where the main job runs;
  • 2. force the main job to run on Terrascope: either by manipulating the process graph or explicitly (Option to explicitly select primary backend for crossbackend processing openeo-aggregator#144);
  • 3. explicitly raise an error if the main job runs on CDSE: currently this typically fails further on with a "no data available" error but could theoretically also finish successfully with incomplete results;
  • 4. short-term workaround for not supporting async_task on CDSE: consider polling from within the main batch job;
  • 5. support async_task on CDSE.

JanssenBrm commented:

A new version has been deployed that should fix this issue from the EJR side. Can you verify whether you still see the bad request error?

bossie (Collaborator, Author) commented May 31, 2024

EJR is fixed, dependencies are good.

This job ran to completion; the only difference is the order of the arguments of merge_cubes so there's that:

import openeo

connection = openeo.connect("openeofed.dataspace.copernicus.eu").authenticate_oidc()

EXTENT = dict(zip(["west", "south", "east", "north"],
                  [5.318868004541495, 50.628576059801816, 5.3334400271343725, 50.637843899562576]))
EXTENT['crs'] = "EPSG:4326"
STARTDATE = '2022-01-01'
ENDDATE = '2022-03-31'

s2_cube = connection.load_collection("SENTINEL2_L2A", spatial_extent=EXTENT,
                                     temporal_extent=[STARTDATE, ENDDATE], bands=["B04"])
meteo_cube = connection.load_collection("AGERA5", spatial_extent=EXTENT, temporal_extent=[STARTDATE, ENDDATE],
                                        bands=["temperature-mean"])

s2_cube = s2_cube.aggregate_temporal_period(period='month', reducer='median', dimension='t')
meteo_cube = meteo_cube.aggregate_temporal_period(period='month', reducer='mean', dimension='t')

inputs = meteo_cube.merge_cubes(s2_cube)

job = inputs.create_job(
    out_format="NetCDF",
    title="Test extraction job",
    job_options={
        "split_strategy": "crossbackend",
        "driver-memory": "2G",
        "driver-memoryOverhead": "2G",
        "driver-cores": "1",
        "executor-memory": "1800m",
        "executor-memoryOverhead": "1900m",
    }
)
job.start_and_wait()

bossie (Collaborator, Author) commented May 31, 2024

To avoid confusion and wrong results in the short term, main jobs on CDSE will now fail fast with an error like this:

E               openeo.rest.OpenEoApiError: [501] Internal: this backend does not support loading unfinished results from https://openeo.dataspace.copernicus.eu/openeo/1.2/jobs/j-240531df993e4a1eaf8f149ecd40e053/results/OTc2MGU1OWItNTY2ZC00MmQxLWI2NWItMzRlZDE4NzlkYThh/7fc817d5f9b343abd84d7b89acfe5bfe?expires=1717766014&partial=true with load_stac (ref: r-24053154a1224203a1a1e9b462da4797)
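
For reference, this fail-fast behaviour amounts to a guard along these lines (a minimal sketch with hypothetical parameter names, not the actual openeo-geotrellis implementation):

from openeo_driver.errors import OpenEOApiException

def load_stac(url: str, supports_async_tasks: bool):
    # hypothetical sketch: refuse to load unfinished (partial) job results when the
    # deployment cannot await the dependency job before evaluation starts
    if "partial=true" in url and not supports_async_tasks:
        raise OpenEOApiException(
            status_code=501,
            message=f"this backend does not support loading unfinished results from {url} with load_stac",
        )
    ...  # otherwise proceed with the regular load_stac implementation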

bossie (Collaborator, Author) commented May 31, 2024

429 errors are being handled in Open-EO/openeo-geotrellis-extensions#299.

@bossie bossie assigned bossie and unassigned EmileSonneveld May 31, 2024
@JeroenVerstraelen JeroenVerstraelen changed the title load_stac gives no data in aggregator load_stac gives no data cross backend Jun 4, 2024
bossie (Collaborator, Author) commented Jun 4, 2024

Darius is unblocked so this became less urgent.

bossie (Collaborator, Author) commented Jun 18, 2024

As discussed: as a first implementation, try polling from within the batch job. This then becomes a default implementation that requires no extra infrastructure and is effectively platform-agnostic.

bossie (Collaborator, Author) commented Jun 25, 2024

Random ramblings to summarize things and refresh my memory.

In the case of OG SHub:

  1. start_job interprets load_collection(some_extent) and schedules an appropriate SHub batch process to dump its results at a particular S3 "directory";
  2. then offloads polling this batch process to an async_task;
  3. if DONE, async_task submits a batch job on YARN, passing said S3 "directory" as a parameter;
  4. in the batch job, instead of doing anything with SHub at all, process graph evaluation will map the original load_collection to the S3 "directory" and load a data cube from there.

In the case of unfinished job results:

  1. start_job interprets load_stac(partial_results_url); there is nothing to schedule because the dependency job is managed externally;
  2. then offloads polling this dependency job to an async_task;
  3. if finished: async_task submits a batch job on YARN; no extra parameters are passed because none are needed;
  4. in the batch job, process graph evaluation will simply do a load_stac(partial_results_url); this will succeed because the dependency job has finished while polling.

bossie (Collaborator, Author) commented Jun 25, 2024

The easiest way to do the polling in the main batch job seems to be to just poll partial_results_url in the load_stac method itself, only actually loading a data cube once the dependency job eventually finishes (see the sketch after the notes below), because:

  • load_stac already has the partial_results_url so there's no mapping involved;
  • yes, this stalls the process graph evaluation but it does not affect job duration (dependency jobs still run in parallel);
  • if an async_task actually did the polling in advance (like on YARN), nothing has to be changed in load_stac: its polling will finish immediately, so there's no additional delay.

Notes:

  • As such, async_task essentially becomes a way to do the polling in a separate process rather than in the batch job itself.
  • A user is not able to load_stac from actual partial results (load_stac will await their completion instead), but that was also not possible in the current implementation (and supporting it would probably require an additional parameter to control the behavior anyway).
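
A minimal sketch of that polling approach, assuming the partial job results document exposes the dependency job's state via an "openeo:status" property (helper and parameter names are hypothetical, not the actual openeogeotrellis implementation):

import time

import requests


def await_dependency_job(partial_results_url: str, poll_interval_s: float = 60, max_poll_time_s: float = 6 * 3600):
    """Block until the unfinished job results behind the URL become available."""
    deadline = time.time() + max_poll_time_s
    while True:
        # partial job results advertise the dependency job's state, e.g.
        # "running", "finished", "error" or "canceled"
        status = requests.get(partial_results_url, timeout=60).json().get("openeo:status", "finished")
        if status == "finished":
            return
        if status in ("error", "canceled"):
            raise RuntimeError(f"dependency job ended with status {status!r}")
        if time.time() >= deadline:
            raise TimeoutError(f"timed out awaiting dependency job at {partial_results_url}")
        time.sleep(poll_interval_s)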

bossie added a commit that referenced this issue Jun 26, 2024
bossie added a commit that referenced this issue Jun 27, 2024
bossie added a commit that referenced this issue Jun 27, 2024
bossie (Collaborator, Author) commented Jun 27, 2024

The original job ran successfully on openeofed-staging (agg-pj-20240627-113805).
