
Extraction pipelines & data fetching blocks #36

Merged
11 commits merged into main from 29-pipelines on May 22, 2024

Conversation

@GriffinBabe (Contributor) commented Mar 20, 2024:

Implemented extraction pipelines for Sentinel-2, Sentinel-1, and AGERA5.

The following pipelines are finished:

  • Sentinel-2
  • Sentinel-1
  • AGERA5

@GriffinBabe (Contributor, Author):

Remaining for the Sentinel-2 Pipeline:

  • Finish the memory optimization that removes observations with very high cloud-probability mask coverage (see the sketch after this list).
  • Analyze backend behavior and optimize possible bottlenecks (@jdries).
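
For illustration, a minimal sketch of what that filtering could look like on an xarray cube; the function name, threshold, and the exact SCL classes used are assumptions, not the actual implementation:

import numpy as np
import xarray as xr


def drop_cloudy_observations(
    cube: xr.DataArray, scl: xr.DataArray, max_cloud_fraction: float = 0.95
) -> xr.DataArray:
    """Drop acquisitions whose cloudy fraction exceeds a threshold (hypothetical helper)."""
    # SCL classes 3 (cloud shadow), 8/9 (cloud medium/high probability) and 10 (thin cirrus)
    cloudy = scl.isin([3, 8, 9, 10])
    # Fraction of cloudy pixels per timestep
    cloud_fraction = cloudy.mean(dim=("x", "y"))
    keep = np.where(cloud_fraction.values < max_cloud_fraction)[0]
    return cube.isel(t=keep)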

@GriffinBabe (Contributor, Author) commented Mar 20, 2024:

@jdries You can start profiling the backend bottlenecks within the scripts/extraction/extract_optical.py file in this branch.

There is a full dataset with a realistic density here:
/data/users/Public/couchard/world_cereal/2019_BE_Flanders_rdm_grid_samples.gpkg

So far, I managed to run jobs with 100 samples, with a peak memory usage slightly above 5 GB and 3 hours of processing, costing 400 credits (CDSE-STAGING: j-2403205396124e2f8d417a0c39667ced). Memory usage seems fine, but it would be nice to speed up the job. The results are in this folder: /data/users/Public/couchard/world_cereal/extractions_optical

I did other tests with far fewer samples, and memory seems to scale up slowly with the number of samples, which is good.

This line subsamples the split openEO jobs down to a single job for testing purposes.
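
Purely as an illustration, and assuming the split jobs live in a pandas DataFrame with one row per job, that subsampling step could look like this:

import pandas as pd


def subsample_to_single_job(job_df: pd.DataFrame) -> pd.DataFrame:
    """Keep only the first of the split openEO jobs, so a test run stays small (hypothetical helper)."""
    return job_df.iloc[[0]]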

@jdries (Contributor) commented Mar 20, 2024:

Memory settings are quite high compared to what's actually used. My first guess would be to try lowering them.
There's some use of resample_cube_spatial that can probably be avoided and would make analysis easier.
I also don't see the conservative cloud masking yet.

The majority of the time is spent reading data. It is clearly still the case that reading many small blocks from object storage is not efficient.

@GriffinBabe (Contributor, Author) commented Mar 21, 2024:

@jdries
Following our last discussion, I generated a new job, which is currently running (cdse-staging j-240321753d184a679a681f8c3c7e4642):

  • A conservative mask is applied
  • The SCL Dilated masks and the distance to clouds are both computed from the same input collection
  • I removed useless resample_cube_spatial and resample_spatial operations
  • I set the memory to 6GB (executor memory + overhead) instead of 8GB
  • Now sampling 500 locations, but will try increasing this later to 1000

@GriffinBabe (Contributor, Author):

The S1 pipeline seems to work well; I managed to extract 500 samples with close to 6 GB of memory.

@jdries (Contributor) commented Mar 22, 2024:

Found another cause of memory usage in S2: the distance-to-cloud computation works on the whole time series, while it should only need individual observations.
So the size argument needs to include the time dimension:

{
    "dimension": "t",
    "unit": null,
    "value": "P1D"
}

This will then also require the UDF to be adjusted; we may need to check with @VincentVerelst.
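
For reference, a hedged sketch of how that size argument could be passed via the openEO Python client's apply_neighborhood; the collection setup, tile size, overlap, and UDF filename are assumptions, not the actual pipeline code:

import openeo

connection = openeo.connect("openeo.dataspace.copernicus.eu").authenticate_oidc()
scl = connection.load_collection(
    "SENTINEL2_L2A", bands=["SCL"], temporal_extent=["2023-08-01", "2023-08-31"]
)
udf = openeo.UDF.from_file("udf_distance_to_cloud.py")  # assumed filename
distance_to_cloud = scl.apply_neighborhood(
    process=udf,
    size=[
        {"dimension": "x", "value": 128, "unit": "px"},  # assumed spatial chunk
        {"dimension": "y", "value": 128, "unit": "px"},
        {"dimension": "t", "value": "P1D"},  # one observation at a time
    ],
    overlap=[
        {"dimension": "x", "value": 16, "unit": "px"},  # assumed overlap
        {"dimension": "y", "value": 16, "unit": "px"},
    ],
)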

This would be my guess for the UDF without the time dimension:

import numpy as np
import xarray as xr
from scipy.ndimage import distance_transform_cdt
from skimage.morphology import binary_erosion, footprints


def apply_datacube(cube: xr.DataArray, context: dict) -> xr.DataArray:
    cube_array: xr.DataArray = cube
    cube_array = cube_array.transpose("bands", "y", "x")

    clouds = np.logical_or(np.logical_and(cube_array < 11, cube_array >= 8), cube_array == 3).isel(
        bands=0
    )

    # Calculate the Distance To Cloud score
    # Erode
    er = footprints.disk(3)

    # Define a function to apply binary erosion
    def erode(image, selem):
        return ~binary_erosion(image, selem)

    # Use apply_ufunc to apply the erosion operation
    eroded = xr.apply_ufunc(
        erode,  # function to apply
        clouds,  # input DataArray
        input_core_dims=[["y", "x"]],  # dimensions over which to apply function
        output_core_dims=[["y", "x"]],  # dimensions of the output
        vectorize=True,  # vectorize the function over non-core dimensions
        dask="parallelized",  # enable dask parallelization
        output_dtypes=[np.int32],  # data type of the output
        kwargs={"selem": er},  # additional keyword arguments to pass to erode
    )

    # Distance to cloud via scipy's chamfer distance transform
    # (default metric is chessboard; pass metric="taxicab" for a true Manhattan distance)
    distance = xr.apply_ufunc(
        distance_transform_cdt,
        eroded,
        input_core_dims=[["y", "x"]],
        output_core_dims=[["y", "x"]],
        vectorize=True,
        dask="parallelized",
        output_dtypes=[np.int32],
    )

    distance_da = xr.DataArray(
        distance,
        coords={
            "y": cube_array.coords["y"],
            "x": cube_array.coords["x"],
        },
        dims=["y", "x"],
    )

    distance_da = distance_da.expand_dims(
        dim={
            "bands": cube_array.coords["bands"],
        },
    )

    distance_da = distance_da.transpose("bands", "y", "x")

    return distance_da

@jdries (Contributor) commented Mar 22, 2024:

Another one: the cloud mask is now applied, but there's a merge_cubes before the mask process.
I think this will still cause a full data load, which we try to avoid. Swapping the order of masking and merging should solve this.
There's then probably still the question of avoiding dates with fully masked data in the output netCDF. We need to see how to solve that, perhaps by setting the fully masked areas to nodata.
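
A rough sketch of the reordering in openEO client terms (the cube variables are placeholders, not the actual pipeline objects):

import openeo


def mask_then_merge(
    optical: openeo.DataCube, cloud_mask: openeo.DataCube, other: openeo.DataCube
) -> openeo.DataCube:
    # Apply the cloud mask before merging, so masked data does not force a full
    # load of the merged cube; the current order is merge_cubes -> mask.
    return optical.mask(cloud_mask).merge_cubes(other)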

@GriffinBabe (Contributor, Author):

@jdries Without the two optimizations you mentioned here, I managed to extract 500 points with 8 GB of peak memory, 580 credits, and 4 hours wall time (cdse-staging j-24032149eeb847fbbfdc5a6bfca9622c). I will run a new one with the same number of samples and compare the memory usage.

I also ran an extraction on S1 data with 6 GB peak memory for 500 points, 160 credits, and ~1h30 wall time. That seems quite efficient to me, but could you take a look to see how we can optimize it? (cdse production j-240321d24a59494d848ae4eedc695edf)

@jdries (Contributor) commented Mar 22, 2024:

For Sentinel-1, can you try setting executor-memory to 1G, keeping memory-overhead the same?

@jdries (Contributor) commented Mar 22, 2024:

For Sentinel-2, I'm going to try writing an alternative loader. It may take a little time, but it might be worth it, as I again notice the big difference between loading data in bulk versus loading samples.

@jdries (Contributor) commented Mar 23, 2024:

The new strategy looks promising: the job ran with 2.5 GB per executor, taking only 57 CPU hours in Spark.
j-240323a9dcdf498898d8be0b12624a4d => 113 credits
Settings:

{
    "driver-memory": "2G",
    "driver-memoryOverhead": "2G",
    "driver-cores": "1",
    "executor-memory": "600m",
    "executor-memoryOverhead": "1900m",
    "executor-cores": "1",
    "max-executors": "34",
    "soft-errors": "true",
    "gdal-dataset-cache-size": 2,
    "gdal-cachemax": 120,
    "executor-threads-jvm": 1
}

I still need to work on ensuring correctness of the result. Also, the end of the job spends too long looking up sample metadata; this could probably be done in parallel as a quickfix (see the sketch below).
Now that the actual reading is a lot faster, there are also other areas to look at.
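
A minimal sketch of that quickfix idea; the lookup function is a hypothetical stand-in for the real metadata call:

from concurrent.futures import ThreadPoolExecutor


def lookup_sample_metadata(sample_id: str) -> dict:
    """Hypothetical per-sample metadata lookup, standing in for the real backend call."""
    raise NotImplementedError


def lookup_all_metadata(sample_ids: list[str], max_workers: int = 16) -> list[dict]:
    # Fan the per-sample lookups out over a thread pool instead of running them sequentially.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(lookup_sample_metadata, sample_ids))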

@GriffinBabe (Contributor, Author):

@jdries For the S1 job, I think that limiting the executor memory overhead to 1 GB is too little. This job seems to crash (I think due to OOM):

j-240325d99399443cabd88a9b4c0fb17f CDSE-STAGING

@jdries (Contributor) commented Mar 25, 2024:

@GriffinBabe my suggestion was:

try setting executor-memory to 1G, keeping memory-overhead the same

You tried it the other way around, which I would indeed expect to crash.
executor-memory is used by Java and Spark, which are fairly good at managing memory.
memory-overhead is for Python and C code like Orfeo and GDAL, which is quite bad at keeping memory within bounds; hence jobs often need less executor-memory and more overhead.
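
To make that concrete, a hedged sketch of the suggested S1 job options; the overhead value is an assumption chosen to keep roughly the ~6 GB total of the earlier run, and the remaining settings are illustrative only:

job_options = {
    "executor-memory": "1G",          # JVM/Spark side, which manages memory well
    "executor-memoryOverhead": "5G",  # assumed: kept near the previous level for Python/Orfeo/GDAL
    "executor-cores": "1",
    "soft-errors": "true",
}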

@jdries (Contributor) commented Mar 26, 2024:

A working version of the performance improvement is available on CDSE staging. Patches look fine upon first inspection, but a deeper comparison is needed. To enable the optimization, we need to set a feature flag on the load_collection call that loads the optical bands. Here's an example of how to do that (the update_arguments call on the last line):

cube = connection.load_collection(
    "SENTINEL2_L2A",
    bands=["B02", "B03", "B04"],
    temporal_extent=["2023-08-01", "2023-08-31"],
    max_cloud_cover=85,
)
cube.result_node().update_arguments(featureflags={"tilesize": 128, "load_per_product": True})

The job options also have a large impact on performance. Note that I run with a small number of executors to minimize losses when they sit idle, but you can increase it if you want to iterate faster:

job_options = {
    "driver-memory": "2G",
    "driver-memoryOverhead": "2G",
    "driver-cores": "1",
    "executor-memory": "600m",
    "executor-memoryOverhead": "1900m",
    "executor-cores": "1",
    "max-executors": "26",
    "soft-errors": "true",
    "gdal-dataset-cache-size": 2,
    "gdal-cachemax": 120,
    "executor-threads-jvm": 1,
}

Note that the feature flag enables a very experimental code path; please don't start using it everywhere without asking or properly checking results. Processes like mask_scl_dilation are definitely not yet supported.

As for next steps, I would recommend starting sample extractions without cloud masking and properly verifying the results. We can then add the cloud masking once the more complex strategy works.

I also still see other things that can be made faster, but it takes some time to integrate everything.

@kvantricht (Contributor) left a review comment:

Nice work, though I have to say the code is quite lengthy and it doesn't feel very modular. If this is what a user typically needs to do, it's quite something. I have the feeling that some of it could be embedded in GFMAP and also written a bit more modularly, so the actual user-facing extraction scripts become more readable. What do you think?

scripts/extractions/extract_meteo.py (resolved)
scripts/extractions/extract_meteo.py (resolved)
scripts/extractions/extract_meteo.py (outdated, resolved)
scripts/extractions/extract_meteo.py (resolved)
scripts/extractions/extract_optical.py (resolved)
scripts/extractions/extract_sar.py (outdated, resolved)
scripts/extractions/extract_sar.py (outdated, resolved)
@GriffinBabe changed the title from "Extraction pipelines" to "Extraction pipelines & data fetching blocks" on Apr 30, 2024
@GriffinBabe (Contributor, Author):

@kvantricht This pull request will be much larger than it used to be, as I also started working on the inference blocks using GFMAP to extract the data in WorldCereal, instead of the usual recycled cropclass/HRL code that was in the repository.

The old code will still be accessible through version control, but I removed it in this branch (to be merged once we confirm that a fetched datacube has the same values here as it had with the previous pipeline).

@kvantricht (Contributor):

> @kvantricht This pull request will be much larger than it used to be, as I also started working on the inference blocks using GFMAP to extract the data in WorldCereal, instead of the usual recycled cropclass/HRL code that was in the repository.
>
> The old code will still be accessible through version control, but I removed it in this branch (to be merged once we confirm that a fetched datacube has the same values here as it had with the previous pipeline).

Did you tackle my previous comments? Wouldn't it be good to have a new stable version (so a merged PR) before you're actually gone?

@GriffinBabe marked this pull request as ready for review on May 22, 2024 at 10:35
@GriffinBabe (Contributor, Author):

@kvantricht I resolved your comments and updated the pipelines. I think there will be a few changes later when extracting S1 and METEO, but we have a solid foundation.


# Rescale to uint16, multiplying by 100 first
cube = cube * 100
Review comment from a Contributor:

So it is confirmed that cube contains the unscaled values, which is why you reintroduce the scaling here?

"description": "Sentinel-2 bands",
"type": "application/x-netcdf",
"roles": ["data"],
"proj:shape": [64, 64],
Review comment from a Contributor:

Does this have to be hardcoded here? What if the patch size changes?

Reply from @GriffinBabe (Contributor, Author):

I agree that ideally it shouldn't be there. We discussed this with @VincentVerelst and found no easy solution, as this definition is for a specific patch size and a specific set of bands; it is therefore application specific.

However, in the long run it should be part of the extraction pipeline in GFMAP. For example, the feature extractor and the post-processing parameters should automatically create those STAC definitions in parallel. But it is not a trivial issue and needs to be designed with a long-term view.
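
As a small illustration of how the hardcoded shape could eventually be derived from a parameter (the helper name is hypothetical):

def sentinel2_asset_definition(patch_size: int = 64) -> dict:
    """Build the STAC asset definition from the patch size instead of hardcoding it (hypothetical)."""
    return {
        "description": "Sentinel-2 bands",
        "type": "application/x-netcdf",
        "roles": ["data"],
        "proj:shape": [patch_size, patch_size],
    }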

@kvantricht (Contributor) left a review comment:

Approved for now after discussion with Darius. Some future changes planned.

@kvantricht merged commit cfa2f76 into main on May 22, 2024 (1 check passed).
@kvantricht deleted the 29-pipelines branch on May 22, 2024 at 13:44.