diff --git a/CHANGES.rst b/CHANGES.rst index 17fad9b..75b335c 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -20,6 +20,14 @@ Changes ======= + +Version 0.8.3 (2022-10-03) +-------------------------- + +- Add support to customize data cube path and data cube item (`#236 `_) +- Review docs related with new path format cubes + + Version 0.8.2 (2022-09-21) -------------------------- diff --git a/INSTALL.rst b/INSTALL.rst index 68506cf..ca83585 100644 --- a/INSTALL.rst +++ b/INSTALL.rst @@ -77,6 +77,7 @@ Install in development mode: .. code-block:: shell + $ pip3 install -U pip setuptools wheel $ pip3 install -e .[all] diff --git a/USING.rst b/USING.rst index 3763d66..577ea20 100644 --- a/USING.rst +++ b/USING.rst @@ -126,14 +126,15 @@ The response will have status code ``201`` and the body:: Creating data cube Landsat-8 ---------------------------- -In order to create data cube ``Landsat-8`` monthly using the composite function ``Least Cloud Cover First`` (`LC8_30_1M_LCF`), use the following command to create data cube metadata:: +In order to create data cube ``Landsat-8`` monthly using the composite function ``Least Cloud Cover First`` (`LC8-1M`), use the following command to create data cube metadata:: curl --location \ --request POST '127.0.0.1:5000/cubes' \ --header 'Content-Type: application/json' \ --data-raw ' { - "datacube": "LC8", + "datacube": "LC8-1M", + "datacube_identity": "LC8", "grs": "BRAZIL_MD", "title": "Landsat-8 (OLI) Cube Monthly - v001", "resolution": 30, @@ -230,7 +231,7 @@ Brazil Data Cube environment. If you don't have any account, please, refer to `B Once the data cube definition is created, you can trigger a data cube using the following command:: SQLALCHEMY_DATABASE_URI="postgresql://postgres:postgres@localhost/bdc" \ - cube-builder build LC8_30_1M_LCF \ + cube-builder build LC8-1M \ --stac-url https://brazildatacube.dpi.inpe.br/stac/ \ --collections=LC8_SR-1 \ --tiles=011009 \ @@ -244,12 +245,12 @@ Once the data cube definition is created, you can trigger a data cube using the # Using curl (Make sure to execute cube-builder run) curl --location \ - --request POST '127.0.0.1:5000/start-cube' \ + --request POST '127.0.0.1:5000/start' \ --header 'Content-Type: application/json' \ --data-raw '{ "stac_url": "https://brazildatacube.dpi.inpe.br/stac/", "token": "", - "datacube": "LC8_30_1M_LCF", + "datacube": "LC8-1M", "collections": ["LC8_SR-1"], "tiles": ["011009"], "start_date": "2019-01-01", @@ -278,7 +279,8 @@ In order to create data cube Sentinel 2, use the following command to create dat --header 'Content-Type: application/json' \ --data-raw ' { - "datacube": "S2", + "datacube": "S2-16D", + "datacube_identity": "S2", "grs": "BRAZIL_SM", "title": "Sentinel-2 SR - Cube LCF 16 days -v001", "resolution": 10, @@ -364,11 +366,11 @@ In order to create data cube Sentinel 2, use the following command to create dat } }' -In order to trigger a data cube, we are going to use a collection `S2_10_16D_LCF-1` made with Surface Reflectance using Sen2Cor:: +In order to trigger a data cube, we are going to use a collection `S2-16-1` made with Surface Reflectance using Sen2Cor:: # Using cube-builder command line SQLALCHEMY_DATABASE_URI="postgresql://postgres:postgres@localhost/bdc" \ - cube-builder build S2_10_16D_LCF \ + cube-builder build S2-16D \ --stac-url https://brazildatacube.dpi.inpe.br/stac/ \ --collections=S2_L2A-1 \ --tiles=017019 \ @@ -389,7 +391,8 @@ In order to create data cube CBERS4 AWFI, use the following command to create da --header 'Content-Type: application/json' \ 
--data-raw ' { - "datacube": "CB4", + "datacube": "CB4-16D", + "datacube_identity": "CB4", "grs": "BRAZIL_LG", "title": "CBERS-4 (AWFI) SR - Data Cube LCF 16 days - v001", "resolution": 64, @@ -473,7 +476,7 @@ Trigger data cube generation with following command: # Using cube-builder command line SQLALCHEMY_DATABASE_URI="postgresql://postgres:postgres@localhost/bdc" \ - cube-builder build CB4_64_16D_LCF \ + cube-builder build CB4-16D \ --stac-url https://brazildatacube.dpi.inpe.br/stac/ \ --collections=CBERS4_AWFI_L4_SR \ --tiles=005004 \ @@ -489,10 +492,10 @@ When the ``Cube-Builder`` could not generate data cube for any unknown issue, yo with the same command you have dispatched:: SQLALCHEMY_DATABASE_URI="postgresql://postgres:postgres@localhost/bdc" \ - cube-builder build CB4_64_16D_LCF \ + cube-builder build CB4-16D \ --stac-url https://brazildatacube.dpi.inpe.br/stac/ \ --collections=CBERS4_AWFI_L4_SR \ - --tiles=022024 \ + --tiles=005004 \ --start=2019-01-01 \ --end=2019-01-31 \ --token @@ -500,10 +503,10 @@ with the same command you have dispatched:: It will reuse most of files that were already processed, executing only the failed tasks. If you notice anything suspicious or want to re-create theses files again, use the option ``--force``:: SQLALCHEMY_DATABASE_URI="postgresql://postgres:postgres@localhost/bdc" \ - cube-builder build CB4_64_16D_LCF \ + cube-builder build CB4-16D \ --stac-url https://brazildatacube.dpi.inpe.br/stac/ \ --collections=CBERS4_AWFI_L4_SR \ - --tiles=022024 \ + --tiles=005004 \ --start=2019-01-01 \ --end=2019-01-31 \ --token \ @@ -515,10 +518,10 @@ Data Cube Parameters The ``Cube-Builder`` supports a few parameters to be set during the data cube execution. -In order to check the parameters associated with data cube ``CB4_64_16D_STK-1``, use the command:: +In order to check the parameters associated with data cube ``CB4-16D-1``, use the command:: SQLALCHEMY_DATABASE_URI="postgresql://postgres:postgres@localhost/bdc" \ - cube-builder show-parameters CB4_64_16D_LCF-1 + cube-builder show-parameters CB4-16D-1 The following output represents all the parameters related with the given data cube:: @@ -532,7 +535,7 @@ The following output represents all the parameters related with the given data c You can change any parameter with the command ``cube-builder configure`` with ``DataCubeName-Version``:: SQLALCHEMY_DATABASE_URI="postgresql://postgres:postgres@localhost/bdc" \ - cube-builder configure CB4_64_16D_LCF-1 --stac-url=AnySTAC + cube-builder configure CB4-16D-1 --stac-url=AnySTAC .. 
note:: diff --git a/cube_builder/celery/tasks.py b/cube_builder/celery/tasks.py index ccfe0c5..80594b2 100644 --- a/cube_builder/celery/tasks.py +++ b/cube_builder/celery/tasks.py @@ -34,9 +34,8 @@ from ..models import Activity from ..utils import get_srid_column from ..utils.image import check_file_integrity, create_empty_raster, match_histogram_with_merges -from ..utils.processing import DataCubeFragments from ..utils.processing import blend as blend_processing -from ..utils.processing import build_cube_path, compute_data_set_stats, get_cube_id, get_item_id, get_or_create_model +from ..utils.processing import build_cube_path, compute_data_set_stats, get_item_id, get_or_create_model from ..utils.processing import merge as merge_processing from ..utils.processing import post_processing_quality, publish_datacube, publish_merge from ..utils.timeline import temporal_priority_timeline @@ -107,16 +106,16 @@ def warp_merge(activity, band_map, mask, force=False, data_dir=None, **kwargs): collection_id = f'{record.collection_id}-{version}' if kwargs.get('reuse_data_cube'): - ref_cube_idt = get_cube_id(kwargs['reuse_data_cube']['name']) + ref_cube_idt = kwargs['reuse_data_cube']['name'] # TODO: Should we search in Activity instead? merge_file_path = build_cube_path(ref_cube_idt, merge_date, tile_id, version=kwargs['reuse_data_cube']['version'], band=record.band, - prefix=data_dir) # check published dir + prefix=data_dir, composed=False, **kwargs) # check published dir if merge_file_path is None: merge_file_path = build_cube_path(record.warped_collection_id, merge_date, tile_id, version=version, band=record.band, - prefix=data_dir) + prefix=data_dir, composed=False, **kwargs) if activity['band'] == quality_band and len(activity['args']['datasets']): kwargs['build_provenance'] = True @@ -267,9 +266,16 @@ def prepare_blend(merges, band_map: dict, reuse_data_cube=None, **kwargs): version = merges[0]['args']['version'] identity_cube = merges[0]['warped_collection_id'] + collection_ref: Collection = ( + Collection.query() + .filter(Collection.name == merges[0]['collection_id'], + Collection.version == version) + .first() + ) + composite_function = collection_ref.composite_function.alias if reuse_data_cube: - identity_cube = get_cube_id(reuse_data_cube['name']) + identity_cube = reuse_data_cube['name'] version = reuse_data_cube['version'] kwargs['mask'] = kwargs['mask'] or dict() @@ -283,20 +289,20 @@ def prepare_blend(merges, band_map: dict, reuse_data_cube=None, **kwargs): if not was_reused: logging.info(f'Applying post-processing in {str(quality_file)}') post_processing_quality(quality_file, bands, identity_cube, - period, merges[0]['tile_id'], quality_band, band_map, + period, merges[0]['tile_id'], + band_map=band_map, version=version, block_size=block_size, - datasets=merges[0]['args']['datasets']) + datasets=merges[0]['args']['datasets'], + **kwargs) else: logging.info(f'Skipping post-processing {str(quality_file)}') - def _is_not_stk(_merge): + def _is_not_stk(merge): """Control flag to generate cloud mask. This function is a utility to dispatch the cloud mask generation only for STK data cubes. 
""" - collection_id = _merge['collection_id'] - fragments = DataCubeFragments(collection_id) - return _merge['band'] == quality_band and fragments.composite_function not in ('STK', 'LCF') + return merge['band'] == quality_band and composite_function not in ('STK', 'LCF') for _merge in merges: # Skip quality generation for MEDIAN, AVG @@ -306,6 +312,7 @@ def _is_not_stk(_merge): activity = activities.get(_merge['band'], dict(scenes=dict())) + activity['composite_function'] = composite_function activity['datacube'] = _merge['collection_id'] activity['warped_datacube'] = _merge['warped_collection_id'] activity['band'] = _merge['band'] @@ -382,10 +389,8 @@ def _is_not_stk(_merge): # Prepare list of activities to dispatch activity_list = list(activities.values()) - datacube = activity_list[0]['datacube'] - # For IDENTITY data cube trigger, just publish - if DataCubeFragments(datacube).composite_function == 'IDT': + if composite_function == 'IDT': task = publish.s(list(activities.values()), reuse_data_cube=reuse_data_cube, band_map=band_map, **kwargs) return task.apply_async() @@ -427,9 +432,9 @@ def blend(activity, band_map, build_clear_observation=False, reuse_data_cube=Non logging.warning('Executing blend - {} - {}'.format(activity.get('datacube'), activity.get('band'))) - return blend_processing(activity, band_map, kwargs['quality_band'], build_clear_observation, - block_size=block_size, reuse_data_cube=reuse_data_cube, - apply_valid_range=kwargs.get('apply_valid_range')) + return blend_processing(activity, band_map, + build_clear_observation=build_clear_observation, + block_size=block_size, reuse_data_cube=reuse_data_cube, **kwargs) @celery_app.task(queue=Config.QUEUE_PUBLISH_CUBE) @@ -468,7 +473,7 @@ def publish(blends, band_map, quality_band: str, reuse_data_cube=None, **kwargs) merges = dict() blend_files = dict() - composite_function = DataCubeFragments(cube.name).composite_function + composite_function = cube.composite_function.alias quality_blend = dict(efficacy=100, cloudratio=0) @@ -525,7 +530,8 @@ def publish(blends, band_map, quality_band: str, reuse_data_cube=None, **kwargs) continue _merge_result[merge_date] = publish_merge(quick_look_bands, wcube, tile_id, merge_date, definition, - reuse_data_cube=reuse_data_cube, srid=srid, data_dir=kwargs.get('data_dir')) + reuse_data_cube=reuse_data_cube, srid=srid, + **kwargs) try: db.session.commit() diff --git a/cube_builder/controller.py b/cube_builder/controller.py index ce148bc..592d848 100644 --- a/cube_builder/controller.py +++ b/cube_builder/controller.py @@ -18,27 +18,27 @@ """Define Cube Builder business interface.""" +import warnings from copy import deepcopy from datetime import datetime from typing import Tuple, Union # 3rdparty import sqlalchemy -from bdc_catalog.models import (Band, BandSRC, Collection, CompositeFunction, GridRefSys, Item, MimeType, Quicklook, - ResolutionUnit, SpatialRefSys, Tile, db) +from bdc_catalog.models import (Band, BandSRC, Collection, CollectionSRC, CompositeFunction, GridRefSys, Item, MimeType, + Quicklook, ResolutionUnit, SpatialRefSys, Tile, db) from geoalchemy2 import func from rasterio.crs import CRS from werkzeug.exceptions import NotFound, abort from .constants import (CLEAR_OBSERVATION_ATTRIBUTES, CLEAR_OBSERVATION_NAME, COG_MIME_TYPE, DATASOURCE_ATTRIBUTES, - PROVENANCE_ATTRIBUTES, PROVENANCE_NAME, SRID_ALBERS_EQUAL_AREA, TOTAL_OBSERVATION_ATTRIBUTES, - TOTAL_OBSERVATION_NAME) + PROVENANCE_ATTRIBUTES, PROVENANCE_NAME, TOTAL_OBSERVATION_ATTRIBUTES, TOTAL_OBSERVATION_NAME) from .forms import 
CollectionForm from .grids import create_grids from .models import Activity, CubeParameters from .utils import get_srid_column from .utils.image import validate_merges -from .utils.processing import get_cube_parts, get_or_create_model +from .utils.processing import get_or_create_model from .utils.serializer import Serializer from .utils.timeline import Timeline @@ -122,11 +122,7 @@ def _create_cube_definition(cls, cube_id: str, params: dict) -> dict: Returns: A serialized data cube information. """ - cube_parts = get_cube_parts(cube_id) - - function = cube_parts.composite_function - - cube_id = cube_parts.datacube + function = params['composite_function'] cube = Collection.query().filter(Collection.name == cube_id, Collection.version == params['version']).first() @@ -170,7 +166,7 @@ def _create_cube_definition(cls, cube_id: str, params: dict) -> dict: for band in params['bands']: name = band['name'].strip() - if name in default_bands: + if name.lower() in default_bands: continue is_not_cloud = params['quality_band'] != band['name'] if params.get('quality_band') is not None else False @@ -248,16 +244,18 @@ def create(cls, params): Returns: Tuple with serialized cube and HTTP Status code, respectively. """ - cube_name = '{}_{}'.format( - params['datacube'], - int(params['resolution']) - ) + cube_name = cube_identity = params['datacube'] + # When custom idt name given, use it. + if params.get('datacube_identity'): + cube_identity = params['datacube_identity'] params['bands'].extend(params['indexes']) with db.session.begin_nested(): # Create data cube Identity - cube = cls._create_cube_definition(cube_name, params) + identity_params = deepcopy(params) + identity_params['composite_function'] = 'IDT' + cube = cls._create_cube_definition(cube_identity, identity_params) cube_serialized = [cube] @@ -266,12 +264,26 @@ def create(cls, params): unit = params['temporal_composition']['unit'][0].upper() temporal_str = f'{step}{unit}' - cube_name_composite = f'{cube_name}_{temporal_str}_{params["composite_function"]}' + if cube_name == cube_identity: + cube_name_composite = f'{cube_name}-{temporal_str}' + warnings.warn(f'Data cube composed {cube_name} with same name of identity {cube_identity}.' + f'Renaming {cube_name} to {cube_name_composite}. ' + f'It will be deprecated in Cube-Builder 1.0. 
' + f'Use the parameters "datacube" and "datacube_identity".', + DeprecationWarning, stacklevel=2) + cube_name = cube_name_composite # Create data cube with temporal composition - cube_composite = cls._create_cube_definition(cube_name_composite, params) + cube_composite = cls._create_cube_definition(cube_name, params) cube_serialized.append(cube_composite) + # Create relationship between identity and composed + collection_src_opts = { + 'collection_id': cube_composite['id'], + 'collection_src_id': cube['id'] + } + _, _ = get_or_create_model(CollectionSRC, defaults=collection_src_opts, **collection_src_opts) + db.session.commit() return cube_serialized, 201 @@ -568,9 +580,13 @@ def create_grs_schema(cls, names, description, projection, meridian, shape, tile spatial_index, _ = get_or_create_model(SpatialRefSys, defaults=data, srid=srid) - grs = GridRefSys.create_geometry_table(table_name=name, - features=grid['features'], - srid=srid) + try: + grs = GridRefSys.create_geometry_table(table_name=name, + features=grid['features'], + srid=srid) + except RuntimeError: + abort(409, f'GRS / Table {name} already exists.') + grs.description = description db.session.add(grs) for tile_obj in grid['tiles']: diff --git a/cube_builder/forms.py b/cube_builder/forms.py index 637b97a..0605639 100644 --- a/cube_builder/forms.py +++ b/cube_builder/forms.py @@ -120,6 +120,7 @@ class DataCubeForm(Schema): """Define parser for datacube creation.""" datacube = fields.String(required=True, allow_none=False, validate=Regexp('^[a-zA-Z0-9-]*$', error=INVALID_CUBE_NAME)) + datacube_identity = fields.String(required=False, allow_none=False, validate=Regexp('^[a-zA-Z0-9-]*$', error=INVALID_CUBE_NAME)) grs = fields.String(required=True, allow_none=False) resolution = fields.Integer(required=True, allow_none=False) temporal_composition = fields.Dict(required=True, allow_none=False) @@ -130,12 +131,12 @@ class DataCubeForm(Schema): indexes = fields.Nested(BandDefinition, many=True) metadata = fields.Dict(required=True, allow_none=True) description = fields.String(required=True, allow_none=False) - version = fields.Integer(required=True, allow_none=False, default=1) + version = fields.Integer(required=True, allow_none=False, dump_default=1) title = fields.String(required=True, allow_none=False) - # Set cubes as public by default. - public = fields.Boolean(required=False, allow_none=False, default=True) + # Set cubes as public by dump_default. + public = fields.Boolean(required=False, allow_none=False, dump_default=True) # Is Data cube generated from Combined Collections? 
- is_combined = fields.Boolean(required=False, allow_none=False, default=False) + is_combined = fields.Boolean(required=False, allow_none=False, dump_default=False) parameters = fields.Nested(CubeParametersSchema, required=True, allow_none=False, many=False) @pre_load @@ -188,7 +189,7 @@ class DataCubeMetadataForm(Schema): metadata = fields.Dict(required=False, allow_none=True) description = fields.String(required=False, allow_none=False) title = fields.String(required=False, allow_none=False) - public = fields.Boolean(required=False, allow_none=False, default=True) + public = fields.Boolean(required=False, allow_none=False, dump_default=True) bands = fields.Nested(BandForm, required=False, many=True) @@ -201,15 +202,15 @@ class DataCubeProcessForm(Schema): start_date = fields.Date() end_date = fields.Date() bands = fields.List(fields.String, required=False) - force = fields.Boolean(required=False, default=False) - with_rgb = fields.Boolean(required=False, default=False) + force = fields.Boolean(required=False, dump_default=False) + with_rgb = fields.Boolean(required=False, dump_default=False) token = fields.String(required=False, allow_none=True) stac_url = fields.String(required=False, allow_none=True) shape = fields.List(fields.Integer(required=False)) - block_size = fields.Integer(required=False, default=512) + block_size = fields.Integer(required=False, dump_default=512) # Reuse data cube from another data cube reuse_from = fields.String(required=False, allow_none=True) - histogram_matching = fields.Boolean(required=False, default=False) + histogram_matching = fields.Boolean(required=False, dump_default=False) mask = fields.Dict() diff --git a/cube_builder/maestro.py b/cube_builder/maestro.py index 0686d63..f9307c6 100644 --- a/cube_builder/maestro.py +++ b/cube_builder/maestro.py @@ -32,7 +32,7 @@ import numpy import shapely.geometry import sqlalchemy -from bdc_catalog.models import Band, Collection, GridRefSys, Tile, db +from bdc_catalog.models import Band, Collection, CollectionSRC, GridRefSys, Tile, db from celery import chain, group from geoalchemy2 import func from geoalchemy2.shape import to_shape @@ -44,7 +44,7 @@ from .constants import CLEAR_OBSERVATION_NAME, DATASOURCE_NAME, PROVENANCE_NAME, TOTAL_OBSERVATION_NAME from .models import CubeParameters from .utils import get_srid_column -from .utils.processing import get_cube_id, get_or_create_activity +from .utils.processing import get_or_create_activity from .utils.timeline import Timeline @@ -180,7 +180,11 @@ def _stac(self, collection: str, url: str, **kwargs) -> STAC: def orchestrate(self): """Orchestrate datacube defintion and prepare temporal resolutions.""" - self.datacube = Collection.query().filter(Collection.name == self.params['datacube']).one() + self.datacube = ( + Collection.query() + .filter(Collection.name == self.params['datacube']) + .first_or_404(f'Cube {self.params["datacube"]} not found.') + ) temporal_schema = self.datacube.temporal_composition_schema @@ -331,12 +335,52 @@ def warped_datacube(self) -> Collection: self._warped = reused_datacube else: - datacube_warped = get_cube_id(self.datacube.name) + source = self.datacube + if self.datacube.composite_function.alias != 'IDT': + source = self.source(self.datacube, composite_function='IDT') + if source is None: + raise RuntimeError(f'Missing Identity cube for {self.datacube.name}-{self.datacube.version}') - self._warped = Collection.query().filter(Collection.name == datacube_warped).first() + self._warped = source return self._warped + @staticmethod + def 
sources(cube: Collection) -> List[Collection]: + """Trace data cube collection origin. + + It traces all the collection origin from the given datacube using + ``bdc_catalog.models.CollectionSRC``. + """ + out = [] + ref = cube + while ref is not None: + source: CollectionSRC = ( + CollectionSRC.query() + .filter(CollectionSRC.collection_id == ref.id) + .first() + ) + if source is None: + break + + ref = Collection.query().get(source.collection_src_id) + out.append(ref) + return out + + @staticmethod + def source(cube: Collection, composite_function=None): + """Trace the first data cube origin. + + Args: + cube (Collection): Data cube to trace. + composite_function (str): String composite function to filter. + """ + sources = Maestro.sources(cube) + for collection in sources: + if (composite_function and collection.composite_function and + collection.composite_function.alias == composite_function): + return collection + @property def datacube_bands(self) -> List[Band]: """Retrieve data cube bands based int user input.""" diff --git a/cube_builder/utils/image.py b/cube_builder/utils/image.py index cfd6577..af2bfd7 100644 --- a/cube_builder/utils/image.py +++ b/cube_builder/utils/image.py @@ -28,10 +28,11 @@ import numpy import rasterio from rasterio._warp import Affine +from rio_cogeo.cogeo import cog_translate +from rio_cogeo.profiles import cog_profiles from sqlalchemy.engine.result import ResultProxy, RowProxy from ..config import Config -from .processing import SmartDataSet, generate_cogs, save_as_cog LANDSAT_BANDS = dict( int16=['band1', 'band2', 'band3', 'band4', 'band5', 'band6', 'band7', 'evi', 'ndvi'], @@ -185,7 +186,7 @@ def create_empty_raster(location: str, proj4: str, dtype: str, xmin: float, ymax return str(location) -def match_histogram_with_merges(source: str, source_mask: str, reference: str, reference_mask: str, block_size: int = None): +def match_histogram_with_merges(source: str, source_mask: str, reference: str, reference_mask: str, **kwargs): """Normalize the source image histogram with reference image. This functions implements the `skimage.exposure.match_histograms`, which consists in the manipulate the pixels of an @@ -204,6 +205,8 @@ def match_histogram_with_merges(source: str, source_mask: str, reference: str, r """ from skimage.exposure import match_histograms as _match_histograms + block_size = kwargs.get('block_size') + with rasterio.open(source) as source_data_set, rasterio.open(source_mask) as source_mask_data_set: source_arr = source_data_set.read(1, masked=True) source_mask_arr = source_mask_data_set.read(1) @@ -293,3 +296,108 @@ def check_file_integrity(file_path: Union[str, Path], read_bytes: bool = False) return True except (rasterio.RasterioIOError, Exception): return False + + +def save_as_cog(destination: str, raster, mode='w', tags=None, block_size=None, **profile): + """Save the raster file as Cloud Optimized GeoTIFF. + + See Also: + Cloud Optimized GeoTiff https://gdal.org/drivers/raster/cog.html + + Args: + destination: Path to store the data set. + raster: Numpy raster values to persist in disk + mode: Default rasterio mode. Default is 'w' but you also can set 'r+'. + tags: Tag values (Dict[str, str]) to write on dataset. + **profile: Rasterio profile values to add in dataset. 
+ """ + with rasterio.open(str(destination), mode, **profile) as dataset: + if profile.get('nodata'): + dataset.nodata = profile['nodata'] + + dataset.write_band(1, raster) + + if tags: + dataset.update_tags(**tags) + + generate_cogs(str(destination), str(destination), block_size=block_size) + + +def generate_cogs(input_data_set_path, file_path, profile='deflate', block_size=None, profile_options=None, **options): + """Generate Cloud Optimized GeoTIFF files (COG). + + Args: + input_data_set_path (str) - Path to the input data set + file_path (str) - Target data set filename + profile (str) - A COG profile based in `rio_cogeo.profiles`. + profile_options (dict) - Custom options to the profile. + block_size (int) - Custom block size. + + Returns: + Path to COG. + """ + if profile_options is None: + profile_options = dict() + + output_profile = cog_profiles.get(profile) + output_profile.update(dict(BIGTIFF="IF_SAFER")) + output_profile.update(profile_options) + + if block_size: + output_profile["blockxsize"] = block_size + output_profile["blockysize"] = block_size + + # Dataset Open option (see gdalwarp `-oo` option) + config = dict( + GDAL_NUM_THREADS="ALL_CPUS", + GDAL_TIFF_INTERNAL_MASK=True, + GDAL_TIFF_OVR_BLOCKSIZE="128", + ) + + cog_translate( + str(input_data_set_path), + str(file_path), + output_profile, + config=config, + in_memory=False, + quiet=True, + **options, + ) + return str(file_path) + + +class SmartDataSet: + """Defines utility class to auto close rasterio data set. + + This class is class helper to avoid memory leak of opened data set in memory. + """ + + def __init__(self, file_path: str, mode='r', tags=None, **properties): + """Initialize SmartDataSet definition and open rasterio data set.""" + self.path = Path(file_path) + self.mode = mode + self.dataset = rasterio.open(file_path, mode=mode, **properties) + self.tags = tags + self.mode = mode + + def __del__(self): + """Close dataset on delete object.""" + self.close() + + def __enter__(self): + """Use data set context with operator ``with``.""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Close data set on exit with clause.""" + self.close() + + def close(self): + """Close rasterio data set.""" + if not self.dataset.closed: + logging.debug('Closing dataset {}'.format(str(self.path))) + + if self.mode == 'w' and self.tags: + self.dataset.update_tags(**self.tags) + + self.dataset.close() diff --git a/cube_builder/utils/index_generator.py b/cube_builder/utils/index_generator.py index 245cef0..7443178 100644 --- a/cube_builder/utils/index_generator.py +++ b/cube_builder/utils/index_generator.py @@ -24,13 +24,15 @@ import numpy from bdc_catalog.models import Band, Collection +from .image import SmartDataSet, generate_cogs from .interpreter import execute BandMapFile = Dict[str, str] """Type which a key (represented as data cube band name) points to generated file in disk.""" -def generate_band_indexes(cube: Collection, scenes: dict, period: str, tile_id: str, reuse_data_cube: Collection = None) -> BandMapFile: +def generate_band_indexes(cube: Collection, scenes: dict, period: str, tile_id: str, reuse_data_cube: Collection = None, + **kwargs) -> BandMapFile: """Generate data cube custom bands based in string-expression on table `band_indexes`. This method seeks for custom bands on Collection Band definition. A custom band must have @@ -45,7 +47,7 @@ def generate_band_indexes(cube: Collection, scenes: dict, period: str, tile_id: Returns: A dict values with generated bands. 
""" - from .processing import SmartDataSet, build_cube_path, generate_cogs + from .processing import build_cube_path cube_band_indexes: List[Band] = [] @@ -92,7 +94,8 @@ def generate_band_indexes(cube: Collection, scenes: dict, period: str, tile_id: profile['dtype'] = band_data_type profile['nodata'] = float(band_index.nodata) - custom_band_path = build_cube_path(cube_name, period, tile_id, version=cube_version, band=band_name) + custom_band_path = build_cube_path(cube_name, period, tile_id, version=cube_version, band=band_name, + **kwargs) output_dataset = SmartDataSet(str(custom_band_path), mode='w', **profile) logging.info(f'Generating band {band_name} for cube {cube_name} - {custom_band_path.stem}...') diff --git a/cube_builder/utils/processing.py b/cube_builder/utils/processing.py index ccc1a49..d57d409 100644 --- a/cube_builder/utils/processing.py +++ b/cube_builder/utils/processing.py @@ -39,14 +39,11 @@ import shapely.geometry from bdc_catalog.models import Collection, Item, SpatialRefSys, Tile, db from bdc_catalog.utils import multihash_checksum_sha256 -from flask import abort from geoalchemy2 import func from geoalchemy2.shape import from_shape, to_shape from numpngw import write_png from rasterio import Affine, MemoryFile from rasterio.warp import Resampling, reproject -from rio_cogeo.cogeo import cog_translate -from rio_cogeo.profiles import cog_profiles from ..config import Config # Constant to define required bands to generate both NDVI and EVI @@ -55,9 +52,12 @@ TOTAL_OBSERVATION_NAME) # Builder from . import get_srid_column +from .image import SmartDataSet, generate_cogs, save_as_cog from .index_generator import generate_band_indexes +from .strings import StringFormatter VEGETATION_INDEX_BANDS = {'red', 'nir', 'blue'} +FORMATTER = StringFormatter() def get_rasterio_config() -> dict: @@ -113,93 +113,20 @@ def get_or_create_activity(cube: str, warped: str, activity_type: str, scene_typ ) -class DataCubeFragments(list): - """Parse a data cube name and retrieve their parts. - - A data cube is composed by the following structure: - ``Collections_Resolution_TemporalPeriod_CompositeFunction``. - - An IDT data cube does not have TemporalPeriod and CompositeFunction. - - Examples: - >>> # Parse Sentinel 2 Monthly MEDIAN - >>> cube_parts = DataCubeFragments('S2_10_1M_MED') # ['S2', '10', '1M', 'MED'] - >>> cube_parts.composite_function - ... 'MED' - >>> # Parse Sentinel 2 IDENTITY - >>> cube_parts = DataCubeFragments('S2_10') # ['S2', '10'] - >>> cube_parts.composite_function - ... 'IDT' - >>> DataCubeFragments('S2-10') # ValueError Invalid data cube name - """ - - def __init__(self, datacube: str): - """Construct a Data Cube Fragments parser. - - Exceptions: - ValueError when data cube name is invalid. - """ - cube_fragments = self.parse(datacube) - - self.datacube = '_'.join(cube_fragments) - - super(DataCubeFragments, self).__init__(cube_fragments) - - @staticmethod - def parse(datacube: str) -> List[str]: - """Parse a data cube name.""" - cube_fragments = datacube.split('_') - - if len(cube_fragments) > 4 or len(cube_fragments) < 2: - abort(400, 'Invalid data cube name. "{}"'.format(datacube)) - - return cube_fragments - - def __str__(self): - """Retrieve the data cube name.""" - return self.datacube - - @property - def composite_function(self): - """Retrieve data cube composite function based. 
- - TODO: Add reference to document User Guide - Convention Data Cube Names - """ - if len(self) < 4: - return 'IDT' - - return self[-1] - - -def get_cube_parts(datacube: str) -> DataCubeFragments: - """Build a `DataCubeFragments` and validate data cube name policy.""" - return DataCubeFragments(datacube) - - -def get_cube_id(datacube: str, func=None): - """Prepare data cube name based on temporal function.""" - cube_fragments = get_cube_parts(datacube) - - if not func or func.upper() == 'IDT': - return f"{'_'.join(cube_fragments[:2])}" - - # Ensure that data cube with composite function must have a - # temporal resolution - if len(cube_fragments) <= 3: - raise ValueError('Invalid cube id without temporal resolution. "{}"'.format(datacube)) - - # When data cube have temporal resolution (S2_10_1M) use it - # Otherwise, just remove last cube part (composite function) - cube = datacube if len(cube_fragments) == 3 else '_'.join(cube_fragments[:-1]) - - return f'{cube}_{func}' - - -def get_item_id(datacube: str, version: int, tile: str, date: str) -> str: +def get_item_id(datacube: str, version: int, tile: str, date: str, fmt=None) -> str: """Prepare a data cube item structure.""" - version_str = '{0:03d}'.format(version) - - return f'{datacube}_v{version_str}_{tile}_{date}' + if fmt is None: + fmt = '{datacube:upper}_V{version}_{tile_id}_{start_date}' + + return FORMATTER.format( + fmt, + datacube=datacube, + version=str(version), + version_legacy='{0:03d}'.format(version), + tile_id=tile, + date=date, + start_date=date.replace('-', '')[:8] + ) def prepare_asset_url(url: str) -> str: @@ -241,6 +168,7 @@ def merge(merge_file: str, mask: dict, assets: List[dict], band: str, resx, resy = kwargs['resx'], kwargs['resy'] block_size = kwargs.get('block_size') shape = kwargs.get('shape', None) + transform = None if native_grid: tile_id = kwargs['tile_id'] @@ -472,7 +400,8 @@ def _check_rio_file_access(url: str, access_token: str = None): def post_processing_quality(quality_file: str, bands: List[str], cube: str, - date: str, tile_id, quality_band: str, band_map: dict, version: int, block_size:int=None, datasets=None): + date: str, tile_id, quality_band: str, band_map: dict, version: int, + datasets=None, **kwargs): """Stack the merge bands in order to apply a filter on the quality band. 
We have faced some issues regarding `nodata` value in spectral bands, which was resulting @@ -495,7 +424,9 @@ def post_processing_quality(quality_file: str, bands: List[str], cube: str, tile_id: Brazil data cube tile identifier quality_band: Quality band name version: Data cube version + datasets: List of related data sets used """ + block_size = kwargs.get('block_size') # Get quality profile and chunks with rasterio.open(str(quality_file)) as merge_dataset: blocks = list(merge_dataset.block_windows()) @@ -524,7 +455,8 @@ def post_processing_quality(quality_file: str, bands: List[str], cube: str, nodata_scl = raster_merge[block.row_off: row_offset, block.col_off: col_offset] == nodata for band in bands_without_quality: - band_file = build_cube_path(get_cube_id(cube), date, tile_id, version=version, band=band) + band_file = build_cube_path(cube, date, tile_id, version=version, band=band, + prefix=Config.WORK_DIR, composed=False, **kwargs) with rasterio.open(str(band_file)) as ds: raster = ds.read(1, window=block) @@ -542,43 +474,6 @@ def post_processing_quality(quality_file: str, bands: List[str], cube: str, save_as_cog(str(quality_file), raster_merge, block_size=block_size, **profile) -class SmartDataSet: - """Defines utility class to auto close rasterio data set. - - This class is class helper to avoid memory leak of opened data set in memory. - """ - - def __init__(self, file_path: str, mode='r', tags=None, **properties): - """Initialize SmartDataSet definition and open rasterio data set.""" - self.path = Path(file_path) - self.mode = mode - self.dataset = rasterio.open(file_path, mode=mode, **properties) - self.tags = tags - self.mode = mode - - def __del__(self): - """Close dataset on delete object.""" - self.close() - - def __enter__(self): - """Use data set context with operator ``with``.""" - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """Close data set on exit with clause.""" - self.close() - - def close(self): - """Close rasterio data set.""" - if not self.dataset.closed: - logging.debug('Closing dataset {}'.format(str(self.path))) - - if self.mode == 'w' and self.tags: - self.dataset.update_tags(**self.tags) - - self.dataset.close() - - def compute_data_set_stats(file_path: str, mask: dict, compute: bool = True) -> Tuple[float, float]: """Compute data set efficacy and cloud ratio. @@ -599,14 +494,15 @@ def compute_data_set_stats(file_path: str, mask: dict, compute: bool = True) -> return efficacy, cloud_ratio -def blend(activity, band_map, quality_band, build_clear_observation=False, block_size=None, reuse_data_cube=None, apply_valid_range=None): +def blend(activity, band_map, quality_band, build_clear_observation=False, block_size=None, + reuse_data_cube=None, apply_valid_range=None, **kwargs): """Apply blend and generate raster from activity. Basically, the blend operation consists in stack all the images (merges) in period. The stack is based in best pixel image (Best clear ratio). The cloud pixels are masked with `numpy.ma` module, enabling to apply temporal composite function MEDIAN, AVG over these rasters. - The following example represents a data cube Landsat-8 16 days using function Best Pixel (Stack - STK) and + The following example represents a data cube Landsat-8 16 days using function Best Pixel (Stack - LCF) and Median (MED) in period of 16 days from 1/1 to 16/1. 
The images from `10/1` and `15/1` were found and the values as described below:: @@ -700,6 +596,7 @@ def blend(activity, band_map, quality_band, build_clear_observation=False, block bandlist = [] provenance_merge_map = dict() + merges_band_map = {} for m in sorted(mask_tuples, reverse=True): key = m[1] @@ -718,6 +615,7 @@ def blend(activity, band_map, quality_band, build_clear_observation=False, block raise IOError('FileError while opening {} - {}'.format(filename, e)) filename = scene['ARDfiles'][band] + merges_band_map[filename] = key provenance_merge_map.setdefault(key, None) @@ -764,12 +662,13 @@ def blend(activity, band_map, quality_band, build_clear_observation=False, block datacube = reuse_data_cube['name'] version = reuse_data_cube['version'] - cube_file = build_cube_path(datacube, period, tile_id, version=version, band=band, suffix='.tif') + cube_file = build_cube_path(datacube, period, tile_id, version=version, band=band, suffix='.tif', + composed=True, **kwargs) # Create directory cube_file.parent.mkdir(parents=True, exist_ok=True) - cube_function = DataCubeFragments(datacube).composite_function + cube_function = activity['composite_function'] if cube_function == 'MED': median_raster = numpy.full((height, width), fill_value=nodata, dtype=profile['dtype']) @@ -777,8 +676,10 @@ def blend(activity, band_map, quality_band, build_clear_observation=False, block if build_clear_observation: logging.warning('Creating and computing Clear Observation (ClearOb) file...') - clear_ob_file_path = build_cube_path(datacube, period, tile_id, version=version, band=CLEAR_OBSERVATION_NAME, suffix='.tif') - dataset_file_path = build_cube_path(datacube, period, tile_id, version=version, band=DATASOURCE_NAME, suffix='.tif') + clear_ob_file_path = build_cube_path(datacube, period, tile_id, version=version, + band=CLEAR_OBSERVATION_NAME, suffix='.tif', composed=True, **kwargs) + dataset_file_path = build_cube_path(datacube, period, tile_id, version=version, + band=DATASOURCE_NAME, suffix='.tif', composed=True, **kwargs) clear_ob_profile = profile.copy() clear_ob_profile['dtype'] = CLEAR_OBSERVATION_ATTRIBUTES['data_type'] @@ -852,8 +753,8 @@ def blend(activity, band_map, quality_band, build_clear_observation=False, block stack_total_observation[window.row_off: row_offset, window.col_off: col_offset] += copy_mask.astype(numpy.uint8) # Get current observation file name - file_name = Path(bandlist[order].name).stem - file_date = datetime.strptime(file_name.split('_')[4], '%Y-%m-%d') + file_path = bandlist[order].name + file_date = datetime.strptime(merges_band_map[file_path], '%Y-%m-%d') day_of_year = file_date.timetuple().tm_yday # Find all no data in destination STACK image @@ -948,7 +849,8 @@ def blend(activity, band_map, quality_band, build_clear_observation=False, block clear_ob_data_set.close() logging.warning('Clear Observation (ClearOb) file generated successfully.') - total_observation_file = build_cube_path(datacube, period, tile_id, version=version, band=TOTAL_OBSERVATION_NAME) + total_observation_file = build_cube_path(datacube, period, tile_id, version=version, + band=TOTAL_OBSERVATION_NAME, composed=True, **kwargs) total_observation_profile = profile.copy() total_observation_profile.pop('nodata', None) total_observation_profile['dtype'] = 'uint8' @@ -966,7 +868,8 @@ def blend(activity, band_map, quality_band, build_clear_observation=False, block save_as_cog(str(cube_file), stack_raster, block_size=block_size, mode='w', **profile) if build_clear_observation: - provenance_file = 
build_cube_path(datacube, period, tile_id, version=version, band=PROVENANCE_NAME) + provenance_file = build_cube_path(datacube, period, tile_id, version=version, + band=PROVENANCE_NAME, composed=True, **kwargs) provenance_profile = profile.copy() provenance_profile.pop('nodata', -1) provenance_profile['dtype'] = PROVENANCE_ATTRIBUTES['data_type'] @@ -1054,7 +957,7 @@ def _item_prefix(absolute_path: Path, data_dir: str = None) -> Path: return concat_path(Config.ITEM_PREFIX, relative_path) -def publish_datacube(cube, bands, tile_id, period, scenes, cloudratio, reuse_data_cube=None, +def publish_datacube(cube: Collection, bands, tile_id, period, scenes, cloudratio, reuse_data_cube=None, srid=SRID_ALBERS_EQUAL_AREA, data_dir=None, **kwargs): """Generate quicklook and catalog datacube on database.""" start_date, end_date = period.split('_') @@ -1066,17 +969,15 @@ def publish_datacube(cube, bands, tile_id, period, scenes, cloudratio, reuse_dat datacube = reuse_data_cube['name'] version = reuse_data_cube['version'] - cube_parts = get_cube_parts(datacube) + format_item_cube = kwargs.get('format_item_cube') output = [] - for composite_function in [cube_parts.composite_function]: - item_datacube = get_cube_id(datacube, composite_function) - - item_id = get_item_id(item_datacube, version, tile_id, period) + for composite_function in [cube.composite_function.alias]: + item_id = get_item_id(datacube, version, tile_id, period, fmt=format_item_cube) cube_bands = cube.bands - quick_look_file = build_cube_path(item_datacube, period, tile_id, version=version, suffix=None) + quick_look_file = build_cube_path(datacube, period, tile_id, version=version, suffix=None, composed=True, **kwargs) ql_files = [] for band in bands: @@ -1085,12 +986,13 @@ def publish_datacube(cube, bands, tile_id, period, scenes, cloudratio, reuse_dat quick_look_file = generate_quick_look(str(quick_look_file), ql_files) if kwargs.get('with_rgb'): - rgb_file = build_cube_path(item_datacube, period, tile_id, version=version, band='RGB') + rgb_file = build_cube_path(datacube, period, tile_id, version=version, band='RGB', composed=True, **kwargs) generate_rgb(rgb_file, ql_files) map_band_scene = {name: composite_map[composite_function] for name, composite_map in scenes.items()} - custom_bands = generate_band_indexes(cube, map_band_scene, period, tile_id, reuse_data_cube=reuse_data_cube) + custom_bands = generate_band_indexes(cube, map_band_scene, period, tile_id, reuse_data_cube=reuse_data_cube, + composed=True, **kwargs) for name, file in custom_bands.items(): scenes[name] = {composite_function: str(file)} @@ -1185,7 +1087,8 @@ def publish_datacube(cube, bands, tile_id, period, scenes, cloudratio, reuse_dat return output -def publish_merge(bands, datacube, tile_id, date, scenes, reuse_data_cube=None, srid=SRID_ALBERS_EQUAL_AREA, data_dir=None): +def publish_merge(bands, datacube, tile_id, date, scenes, reuse_data_cube=None, srid=SRID_ALBERS_EQUAL_AREA, + data_dir=None, **kwargs): """Generate quicklook and catalog warped datacube on database. 
TODO: Review it with publish_datacube @@ -1193,14 +1096,16 @@ def publish_merge(bands, datacube, tile_id, date, scenes, reuse_data_cube=None, data_dir = data_dir or Config.DATA_DIR cube_name = datacube.name cube_version = datacube.version + if reuse_data_cube: - cube_name = get_cube_id(reuse_data_cube['name']) + cube_name = reuse_data_cube['name'] cube_version = reuse_data_cube['version'] reuse_data_cube['name'] = cube_name - item_id = get_item_id(cube_name, cube_version, tile_id, date) + format_item_cube = kwargs.get('format_item_cube') + item_id = get_item_id(cube_name, cube_version, tile_id, date, fmt=format_item_cube) - quick_look_file = build_cube_path(cube_name, date, tile_id, version=cube_version, suffix=None) + quick_look_file = build_cube_path(cube_name, date, tile_id, version=cube_version, composed=False, suffix=None, **kwargs) cube_bands = datacube.bands output = [] @@ -1213,7 +1118,8 @@ def publish_merge(bands, datacube, tile_id, date, scenes, reuse_data_cube=None, quick_look_file = generate_quick_look(str(quick_look_file), ql_files) # Generate VI - custom_bands = generate_band_indexes(datacube, scenes['ARDfiles'], date, tile_id, reuse_data_cube=reuse_data_cube) + custom_bands = generate_band_indexes(datacube, scenes['ARDfiles'], date, tile_id, reuse_data_cube=reuse_data_cube, + composed=False, **kwargs) scenes['ARDfiles'].update(custom_bands) tile = Tile.query().filter(Tile.name == tile_id, Tile.grid_ref_sys_id == datacube.grid_ref_sys_id).first() @@ -1456,18 +1362,74 @@ def _qa_statistics(raster, mask: dict, compute: bool = False) -> Tuple[float, fl return efficacy, not_clear_ratio -def build_cube_path(datacube: str, period: str, tile_id: str, version: int, band: str = None, suffix: Union[str, None] = '.tif', prefix=None) -> Path: - """Retrieve the path to the Data cube file in Brazil Data Cube Cluster.""" +def build_cube_path(datacube: str, period: str, tile_id: str, version: int, band: str = None, + suffix: Union[str, None] = '.tif', prefix=None, + format_path_cube: str = None, + format_item_cube: str = None, + composed: bool = False, + **kwargs) -> Path: + """Retrieve the path to the Data cube file in Brazil Data Cube Cluster. + + The following values are available for ``format_path_cube``: + + - ``datacube``: Data cube name + - ``prefix``: Prefix for cubes. + - ``path``: Orbit path from tile id (for native BDC Grids). + - ``row``: Orbit row from tile id (for native BDC Grids). + - ``tile_id``: Same value in variable. + - ``year``: String representation of Start Date Year + - ``month``: String representation of Start Date Month + - ``day``: String representation of Start Date Day + - ``version``: Version string using ``V``. + - ``version_legacy``: Legacy string version using the structure ``v{0:03d}`` -> ``v001``. + - ``period=period``: String period (Start/End) date. + - ``filename``: Entire Item id using value from ``format_item_cube``. + + Args: + datacube (str): The data cube base name + period (str): String representation for Data Period. It may be ``start_date`` for + Identity Data cubes or ``start_date_end_date`` for temporal composing data cube. + tile_id (str): The tile identifier as string. + version (str): String representation for Collection version. + band (Union[str, None]): Attach a band value into path. Defaults to ``None``. + suffix (Union[str, None]): Path suffix representing file extension. Defaults to ``.tif``. + prefix (str): Path prefix for cubes. Defaults to ``Config.WORK_DIR``. 
+ format_path_cube (Optional[str]): Custom format while building data cube path. Defaults + to ``{prefix}/{folder}/{datacube:lower}/{version}/{path}/{row}/{year}/{month}/{day}/{filename}``. + format_item_cube (Optional[str]): Custom format while building data cube item name. Defaults + to ``{datacube:upper}_V{version}_{tile_id}_{start_date}``. + composed (bool): Flag to identify cube context (identity or composed). Defaults to ``False``. + """ # Default prefix path is WORK_DIR prefix = prefix or Config.WORK_DIR folder = 'identity' - date = period - - fragments = DataCubeFragments(datacube) + if composed: + folder = 'composed' version_str = 'v{0:03d}'.format(version) + path, row = tile_id[:3], tile_id[-3:] + # Manual start date reference + year = str(period[:4]) + month = '{0:02d}'.format(int(period[5:7])) + day = '{0:02d}'.format(int(period[8:10])) + + fmt_kwargs = dict( + datacube=datacube, + prefix=prefix, + path=path, row=row, + tile_id=tile_id, + year=year, + month=month, + day=day, + version=f'v{version}', # New version format + version_legacy=version_str, + period=period, + ) + + if format_path_cube is None: + format_path_cube = '{prefix}/{folder}/{datacube:lower}/{version}/{path}/{row}/{year}/{month}/{day}/{filename}' - file_name = get_item_id(datacube, version, tile_id, period) + file_name = get_item_id(datacube, version, tile_id, period, fmt=format_item_cube) if band is not None: file_name = f'{file_name}_{band}' @@ -1477,10 +1439,10 @@ def build_cube_path(datacube: str, period: str, tile_id: str, version: int, band file_name = f'{file_name}{suffix}' - if fragments.composite_function != 'IDT': # For cube with temporal composition - folder = 'composed' + fmt_kwargs['filename'] = file_name + fmt_kwargs['folder'] = folder - return Path(prefix) / folder / datacube / version_str / tile_id / date / file_name + return Path(FORMATTER.format(format_path_cube, **fmt_kwargs)) def create_asset_definition(href: str, mime_type: str, role: List[str], absolute_path: str, @@ -1532,74 +1494,6 @@ def create_asset_definition(href: str, mime_type: str, role: List[str], absolute return asset -def save_as_cog(destination: str, raster, mode='w', tags=None, block_size=None, **profile): - """Save the raster file as Cloud Optimized GeoTIFF. - - See Also: - Cloud Optimized GeoTiff https://gdal.org/drivers/raster/cog.html - - Args: - destination: Path to store the data set. - raster: Numpy raster values to persist in disk - mode: Default rasterio mode. Default is 'w' but you also can set 'r+'. - tags: Tag values (Dict[str, str]) to write on dataset. - **profile: Rasterio profile values to add in dataset. - """ - with rasterio.open(str(destination), mode, **profile) as dataset: - if profile.get('nodata'): - dataset.nodata = profile['nodata'] - - dataset.write_band(1, raster) - - if tags: - dataset.update_tags(**tags) - - generate_cogs(str(destination), str(destination), block_size=block_size) - - -def generate_cogs(input_data_set_path, file_path, profile='deflate', block_size=None, profile_options=None, **options): - """Generate Cloud Optimized GeoTIFF files (COG). - - Args: - input_data_set_path (str) - Path to the input data set - file_path (str) - Target data set filename - profile (str) - A COG profile based in `rio_cogeo.profiles`. - profile_options (dict) - Custom options to the profile. - block_size (int) - Custom block size. - - Returns: - Path to COG. 
- """ - if profile_options is None: - profile_options = dict() - - output_profile = cog_profiles.get(profile) - output_profile.update(dict(BIGTIFF="IF_SAFER")) - output_profile.update(profile_options) - - if block_size: - output_profile["blockxsize"] = block_size - output_profile["blockysize"] = block_size - - # Dataset Open option (see gdalwarp `-oo` option) - config = dict( - GDAL_NUM_THREADS="ALL_CPUS", - GDAL_TIFF_INTERNAL_MASK=True, - GDAL_TIFF_OVR_BLOCKSIZE="128", - ) - - cog_translate( - str(input_data_set_path), - str(file_path), - output_profile, - config=config, - in_memory=False, - quiet=True, - **options, - ) - return str(file_path) - - @contextmanager def rasterio_access_token(access_token=None): """Retrieve a context manager that wraps a temporary file containing the access token to be passed to STAC.""" diff --git a/cube_builder/utils/strings.py b/cube_builder/utils/strings.py new file mode 100644 index 0000000..a069b9a --- /dev/null +++ b/cube_builder/utils/strings.py @@ -0,0 +1,50 @@ +# +# This file is part of Cube Builder. +# Copyright (C) 2022 INPE. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +"""Define a module to deal with string functionalities. + +This module extends the builtin Python string module. +""" + +import string + + +class StringFormatter(string.Formatter): + """Implement a string formatter and supports pipes to format string. 
+ + Examples: + >> from cube_builder.utils.strings import StringFormatter + >> formatter = StringFormatter() + >> template = '{name:upper}, {name:lower}, {name:capitalize}' + >> name = 'CustomNameTiler' + >> assert formatter.format(template, name=name) == f'{name.upper()}, {name.lower()}, {name.capitalize()}' + """ + + DEFAULT_STRING_FUNCTIONS = ['upper', 'lower', 'capitalize'] + """List of supported generic string functions.""" + + def format_field(self, value, format_spec): + """Format a string according PEP3101.""" + if isinstance(value, str): + if format_spec in self.DEFAULT_STRING_FUNCTIONS: + handler = self.DEFAULT_STRING_FUNCTIONS[self.DEFAULT_STRING_FUNCTIONS.index(format_spec)] + value = getattr(value, handler)() + format_spec = format_spec[:-1] + elif isinstance(value, int) or isinstance(value, float): + value = str(value) + return super(StringFormatter, self).format(value, format_spec) diff --git a/cube_builder/version.py b/cube_builder/version.py index b0325a1..72aceaf 100644 --- a/cube_builder/version.py +++ b/cube_builder/version.py @@ -23,4 +23,4 @@ """ -__version__ = '0.8.2' +__version__ = '0.8.3' diff --git a/tests/data/json/lc8-16d-stk.json b/tests/data/json/lc8-16d-stk.json index 1468906..85cb5ea 100644 --- a/tests/data/json/lc8-16d-stk.json +++ b/tests/data/json/lc8-16d-stk.json @@ -1,5 +1,6 @@ { - "datacube": "LC8-TESTE", + "datacube": "LC8-16D", + "datacube_identity": "LC8", "grs": "BDC_MD_TESTE", "title": "Landsat-8 OLI Data Cube", "resolution": 30, diff --git a/tests/test_cube_creation.py b/tests/test_cube_creation.py index cab2081..fbbe318 100644 --- a/tests/test_cube_creation.py +++ b/tests/test_cube_creation.py @@ -32,7 +32,7 @@ from cube_builder.utils.image import check_file_integrity CUBE_PARAMS = dict( - datacube='LC8-TESTE_30_16D_LCF', + datacube='LC8-16D', tiles=['007011'], collections=['LC8_SR-1'], start_date=datetime.date(year=2020, month=1, day=1), diff --git a/tests/test_processing.py b/tests/test_processing.py index 3ae4ead..fb30ced 100644 --- a/tests/test_processing.py +++ b/tests/test_processing.py @@ -19,18 +19,37 @@ """Define the unittests for data cube timeline.""" import os +from pathlib import Path from cube_builder.config import Config from cube_builder.utils.processing import build_cube_path -def assert_data_cube_path(datacube, period, tile_id, version, expected_base_path, band=None, prefix=Config.DATA_DIR): +def assert_data_cube_path(datacube, period, tile_id, version, expected_base_path, band=None, prefix=Config.DATA_DIR, + legacy=False): """Assert directive to validate data cube paths.""" - absolute_datacube_path = build_cube_path(datacube, period, tile_id, version, band=band, prefix=prefix) - version_str = 'v{0:03d}'.format(version) + format_path_cube = None + format_item_cube = None + folder = expected_base_path + expected_base_path = os.path.join(prefix, expected_base_path) + expected_path = os.path.join(expected_base_path, datacube.lower(), f'v{version}', + tile_id[:3], tile_id[-3:], + period[:4], + '{0:02d}'.format(int(period[5:7])), + '{0:02d}'.format(int(period[8:10]))) + expected_file_name = f'{datacube.upper()}_V{version}_{tile_id}_{period[:10].replace("-", "")}_{band}.tif' - expected_path = os.path.join(prefix, expected_base_path, datacube, version_str, tile_id, period) - expected_file_name = f'{datacube}_{version_str}_{tile_id}_{period}_{band}.tif' + if legacy: + version_str = 'v{0:03d}'.format(version) + format_path_cube = '{prefix}/{folder}/{datacube}/{version_legacy}/{tile_id}/{period}/{filename}' + format_item_cube = 
'{datacube}_v{version_legacy}_{tile_id}_{date}' + expected_path = os.path.join(expected_base_path, datacube, version_str, tile_id, period) + expected_file_name = f'{datacube}_{version_str}_{tile_id}_{period}_{band}.tif' + + absolute_datacube_path = build_cube_path(datacube, period, tile_id, version, band=band, prefix=prefix, + format_path_cube=format_path_cube, + format_item_cube=format_item_cube, + composed=folder == 'composed') assert str(absolute_datacube_path.parent) == expected_path assert absolute_datacube_path.name == expected_file_name @@ -42,11 +61,12 @@ def test_datacube_paths(): period = f'{date}_2017-01-31' tile_id = '000000' version = 2 - band = 'b001' + band = 'B1' datacube = 'MyCube_10' for prefix in [Config.DATA_DIR, Config.WORK_DIR]: - # Identity + # Identity New / Legacy assert_data_cube_path(datacube, date, tile_id, version, 'identity', band=band, prefix=prefix) + assert_data_cube_path(datacube, date, tile_id, version, 'identity', band=band, prefix=prefix, legacy=True) # Composed - assert_data_cube_path(f'{datacube}_1M_MED', period, tile_id, version, 'composed', band=band, prefix=prefix) + assert_data_cube_path(f'{datacube}-1M', period, tile_id, version, 'composed', band=band, prefix=prefix)
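
The behaviour of the new templates can be previewed without a running ``cube-builder`` instance. The sketch below is a simplified stand-in for the ``StringFormatter`` added in ``cube_builder/utils/strings.py`` (behaviour as documented in its docstring example) and shows how the default templates introduced in ``get_item_id`` and ``build_cube_path`` resolve; the cube name, tile, date, band and prefix are hypothetical values, not taken from this patch::

    import string


    class StringFormatter(string.Formatter):
        """Simplified stand-in for the formatter added by this change."""

        DEFAULT_STRING_FUNCTIONS = ['upper', 'lower', 'capitalize']

        def format_field(self, value, format_spec):
            # Treat ``upper``/``lower``/``capitalize`` specs as string functions.
            if isinstance(value, str) and format_spec in self.DEFAULT_STRING_FUNCTIONS:
                value = getattr(value, format_spec)()
                format_spec = ''
            return super().format_field(value, format_spec)


    formatter = StringFormatter()

    # Default item-id template introduced in get_item_id()
    item_fmt = '{datacube:upper}_V{version}_{tile_id}_{start_date}'
    item_id = formatter.format(item_fmt, datacube='LC8-16D', version='1',
                               tile_id='007011', start_date='20200101')
    assert item_id == 'LC8-16D_V1_007011_20200101'

    # Default path template introduced in build_cube_path()
    path_fmt = ('{prefix}/{folder}/{datacube:lower}/{version}/{path}/{row}/'
                '{year}/{month}/{day}/{filename}')
    path = formatter.format(path_fmt, prefix='/data', folder='composed', datacube='LC8-16D',
                            version='v1', path='007', row='011', year='2020', month='01',
                            day='01', filename=f'{item_id}_B04.tif')
    # -> /data/composed/lc8-16d/v1/007/011/2020/01/01/LC8-16D_V1_007011_20200101_B04.tif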
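
To keep the directory layout used by Cube-Builder 0.8.x, the legacy templates can be passed explicitly, mirroring the ``legacy=True`` branch of ``tests/test_processing.py`` above. The call below assumes the patched ``cube_builder`` package is importable; the cube name, tile and prefix are illustrative::

    from cube_builder.utils.processing import build_cube_path

    legacy_path = build_cube_path(
        'MyCube_10', '2017-01-01', '000000', version=2, band='B1', prefix='/data',
        format_path_cube='{prefix}/{folder}/{datacube}/{version_legacy}/{tile_id}/{period}/{filename}',
        format_item_cube='{datacube}_v{version_legacy}_{tile_id}_{date}',
        composed=False,
    )
    # -> /data/identity/MyCube_10/v002/000000/2017-01-01/MyCube_10_v002_000000_2017-01-01_B1.tif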