From e8b19dee5bb0577023604f35fe399c1b2e1138e6 Mon Sep 17 00:00:00 2001 From: Benjamin Webb <40066515+webb-ben@users.noreply.github.com> Date: Tue, 10 Sep 2024 23:19:06 -0400 Subject: [PATCH 1/4] Add support for ObservedProperties OAF and custom expand of entities --- pygeoapi/provider/sensorthings.py | 205 +++++++++++++++++++++--------- 1 file changed, 142 insertions(+), 63 deletions(-) diff --git a/pygeoapi/provider/sensorthings.py b/pygeoapi/provider/sensorthings.py index b888a60f7..ff0219e81 100644 --- a/pygeoapi/provider/sensorthings.py +++ b/pygeoapi/provider/sensorthings.py @@ -51,10 +51,10 @@ _EXPAND = { 'Things': 'Locations,Datastreams', 'Observations': 'Datastream,FeatureOfInterest', + 'ObservedProperties': 'Datastreams/Thing/Locations', 'Datastreams': """ Sensor ,ObservedProperty - ,Thing ,Thing/Locations ,Observations( $select=@iot.id; @@ -71,6 +71,7 @@ class SensorThingsProvider(BaseProvider): """SensorThings API (STA) Provider""" + expand = EXPAND def __init__(self, provider_def): """ @@ -82,19 +83,10 @@ def __init__(self, provider_def): :returns: pygeoapi.provider.sensorthings.SensorThingsProvider """ LOGGER.debug('Setting SensorThings API (STA) provider') - + self.linked_entity = {} super().__init__(provider_def) - self.data.rstrip('/') - try: - self.entity = provider_def['entity'] - self._url = url_join(self.data, self.entity) - except KeyError: - LOGGER.debug('Attempting to parse Entity from provider data') - if not self._get_entity(self.data): - raise RuntimeError('Entity type required') - self.entity = self._get_entity(self.data) - self._url = self.data - self.data = self._url.rstrip(f'/{self.entity}') + + self._generate_mappings(provider_def) LOGGER.debug(f'STA endpoint: {self.data}, Entity: {self.entity}') # Default id @@ -277,17 +269,19 @@ def _load(self, offset=0, limit=10, resulttype='results', return fc - def _make_feature(self, entity, select_properties=[], skip_geometry=False): + def _make_feature(self, feature, select_properties=[], skip_geometry=False, + entity=None): """ Private function: Create feature from entity - :param entity: `dict` of STA entity + :param feature: `dict` of STA entity :param select_properties: list of property names :param skip_geometry: bool of whether to skip geometry (default False) + :param entity: SensorThings entity name :returns: dict of GeoJSON Feature """ - _ = entity.pop(self.id_field) + _ = feature.pop(self.id_field) id = f"'{_}'" if isinstance(_, str) else str(_) f = { 'type': 'Feature', 'id': id, 'properties': {}, 'geometry': None @@ -295,28 +289,35 @@ def _make_feature(self, entity, select_properties=[], skip_geometry=False): # Make geometry if not skip_geometry: - f['geometry'] = self._geometry(entity) + f['geometry'] = self._geometry(feature, entity) # Fill properties block try: f['properties'] = self._expand_properties( - entity, select_properties) + feature, select_properties, entity) except KeyError as err: LOGGER.error(err) raise ProviderQueryError(err) return f - def _get_response(self, url, params={}): + def _get_response(self, url, params={}, entity=None, expand=None): """ Private function: Get STA response :param url: request url :param params: query parameters + :param entity: SensorThings entity name + :param expand: SensorThings expand query + :returns: STA response """ - params.update({'$expand': EXPAND[self.entity]}) + if expand: + params.update({'$expand': expand}) + else: + entity_ = entity or self.entity + params.update({'$expand': self.expand[entity_]}) r = self.http.get(url, params=params) @@ -332,13 +333,15 @@ def _get_response(self, url, params={}): return response - def _make_filter(self, properties, bbox=[], datetime_=None): + def _make_filter(self, properties, bbox=[], datetime_=None, + entity=None): """ Private function: Make STA filter from query properties :param properties: list of tuples (name, value) :param bbox: bounding box [minx,miny,maxx,maxy] :param datetime_: temporal (datestamp or extent) + :param entity: SensorThings entity name :returns: STA $filter string of properties """ @@ -350,16 +353,8 @@ def _make_filter(self, properties, bbox=[], datetime_=None): ret.append(f'{name} eq {value}') if bbox: - minx, miny, maxx, maxy = bbox - bbox_ = f'POLYGON (({minx} {miny}, {maxx} {miny}, \ - {maxx} {maxy}, {minx} {maxy}, {minx} {miny}))' - if self.entity == 'Things': - loc = 'Locations/location' - elif self.entity == 'Datastreams': - loc = 'Thing/Locations/location' - elif self.entity == 'Observations': - loc = 'FeatureOfInterest/feature' - ret.append(f"st_within({loc}, geography'{bbox_}')") + entity_ = entity or self.entity + ret.append(self._make_bbox(bbox, entity_)) if datetime_ is not None: if self.time_field is None: @@ -378,6 +373,20 @@ def _make_filter(self, properties, bbox=[], datetime_=None): return ' and '.join(ret) + @staticmethod + def _make_bbox(bbox, entity): + minx, miny, maxx, maxy = bbox + bbox_ = f'POLYGON(({minx} {miny},{maxx} {miny},{maxx} {maxy},{minx} {maxy},{minx} {miny}))' # noqa + if entity == 'Things': + loc = 'Locations/location' + elif entity == 'Datastreams': + loc = 'Thing/Locations/location' + elif entity == 'Observations': + loc = 'FeatureOfInterest/feature' + elif entity == 'ObservedProperties': + loc = 'Datastreams/observedArea' + return f"st_within({loc},geography'{bbox_}')" + def _make_orderby(self, sortby): """ Private function: Make STA filter from query properties @@ -398,79 +407,85 @@ def _make_orderby(self, sortby): return ','.join(ret) - def _geometry(self, entity): + def _geometry(self, feature, entity=None): """ Private function: Retrieve STA geometry - :param entity: SensorThings entity + :param feature: SensorThings entity + :param entity: SensorThings entity name :returns: GeoJSON Geometry for feature """ + entity_ = entity or self.entity try: - if self.entity == 'Things': - return entity['Locations'][0]['location'] + if entity_ == 'Things': + return feature['Locations'][0]['location'] - elif self.entity == 'Observations': - return entity['FeatureOfInterest'].pop('feature') + elif entity_ == 'Observations': + return feature['FeatureOfInterest'].pop('feature') - elif self.entity == 'Datastreams': + elif entity_ == 'Datastreams': try: - return entity['Observations'][0]['FeatureOfInterest'].pop('feature') # noqa + return feature['Observations'][0]['FeatureOfInterest'].pop('feature') # noqa except (KeyError, IndexError): - return entity['Thing'].pop('Locations')[0]['location'] + return feature['Thing'].pop('Locations')[0]['location'] + + elif entity_ == 'ObservedProperties': + return feature['Datastreams'][0]['Thing']['Locations'][0]['location'] # noqa except (KeyError, IndexError): LOGGER.warning('No geometry found') return None - def _expand_properties(self, entity, keys=(), uri=''): + def _expand_properties(self, feature, keys=(), uri='', + entity=None): """ Private function: Parse STA entity into feature - :param entity: SensorThings entity + :param feature: `dict` of SensorThings entity :param keys: keys used in properties block :param uri: uri of STA entity + :param entity: SensorThings entity name :returns: dict of SensorThings feature properties """ - LOGGER.debug('Adding extra properties') - # Properties filter & display keys = (() if not self.properties and not keys else set(self.properties) | set(keys)) - if self.entity == 'Things': - self._expand_location(entity) - elif 'Thing' in entity.keys(): - self._expand_location(entity['Thing']) + entity = entity or self.entity + if entity == 'Things': + self._expand_location(feature) + elif 'Thing' in feature.keys(): + self._expand_location(feature['Thing']) # Retain URI if present - if entity.get('properties') and self.uri_field: - uri = entity['properties'] + if feature.get('properties') and self.uri_field: + uri = feature['properties'] # Create intra links - LOGGER.debug('Creating intralinks') - for k, v in entity.items(): - if k in self.links: - entity[k] = [self._get_uri(_v, **self.links[k]) for _v in v] + for k, v in feature.items(): + if k in self.linked_entity: + feature[k] = [self._get_uri(_v, **self.linked_entity[k]) + for _v in v] LOGGER.debug(f'Created link for {k}') - elif f'{k}s' in self.links: - entity[k] = self._get_uri(v, **self.links[f'{k}s']) + elif f'{k}s' in self.linked_entity: + feature[k] = \ + self._get_uri(v, **self.linked_entity[f'{k}s']) LOGGER.debug(f'Created link for {k}') # Make properties block - LOGGER.debug('Making properties block') - if entity.get('properties'): - entity.update(entity.pop('properties')) + if feature.get('properties'): + feature.update(feature.pop('properties')) if keys: - ret = {k: entity.pop(k) for k in keys} - entity = ret + ret = {k: feature.pop(k) for k in keys} + feature = ret if self.uri_field is not None and uri != '': - entity[self.uri_field] = uri + feature[self.uri_field] = uri - return entity + return feature @staticmethod def _expand_location(entity): @@ -522,5 +537,69 @@ def _get_entity(uri): else: return '' + def _generate_mappings(self, provider_def: dict): + """ + Generate mappings for the STA entity and set up intra-links. + + This function sets up the necessary mappings and configurations for + the STA entity based on the provided provider definition. + + :param provider_def: `dict` of provider definition containing + configuration details for the STA entity. + """ + self.data.rstrip('/') + try: + self.entity = provider_def['entity'] + self._url = url_join(self.data, self.entity) + except KeyError: + LOGGER.debug('Attempting to parse Entity from provider data') + if not self._get_entity(self.data): + raise RuntimeError('Entity type required') + self.entity = self._get_entity(self.data) + self._url = self.data + self.data = self._url.rstrip(f'/{self.entity}') + + # Default id + if self.id_field: + LOGGER.debug(f'Using id field: {self.id_field}') + else: + LOGGER.debug('Using default @iot.id for id field') + self.id_field = '@iot.id' + + # Create intra-links + self.intralink = provider_def.get('intralink', False) + if self.intralink and provider_def.get('rel_link'): + # For pytest + self.rel_link = provider_def['rel_link'] + + elif self.intralink: + # Read from pygeoapi config + with open(os.getenv('PYGEOAPI_CONFIG'), encoding='utf8') as fh: + CONFIG = yaml_load(fh) + self.rel_link = get_base_url(CONFIG) + + for (name, rs) in CONFIG['resources'].items(): + pvs = rs.get('providers') + p = get_provider_default(pvs) + e = p.get('entity') or self._get_entity(p['data']) + if any([ + not pvs, # No providers in resource + not p.get('intralink'), # No configuration for intralinks + not e, # No STA entity found + self.data not in p.get('data') # No common STA endpoint + ]): + continue + + if p.get('uri_field'): + LOGGER.debug(f'Linking {e} with field: {p["uri_field"]}') + else: + LOGGER.debug(f'Linking {e} with collection: {name}') + + self.linked_entity[e] = { + 'cnm': name, # OAPI collection name, + 'cid': p.get('id_field', '@iot.id'), # OAPI id_field + 'uri': p.get('uri_field') # STA uri_field + } + def __repr__(self): return f' {self.data}, {self.entity}' From 157792dcc55ecefc4d75d501ee816adba06d47a1 Mon Sep 17 00:00:00 2001 From: Benjamin Webb <40066515+webb-ben@users.noreply.github.com> Date: Tue, 10 Sep 2024 23:26:10 -0400 Subject: [PATCH 2/4] Update sensorthings.py --- pygeoapi/provider/sensorthings.py | 48 ------------------------------- 1 file changed, 48 deletions(-) diff --git a/pygeoapi/provider/sensorthings.py b/pygeoapi/provider/sensorthings.py index ff0219e81..f00b4bef2 100644 --- a/pygeoapi/provider/sensorthings.py +++ b/pygeoapi/provider/sensorthings.py @@ -89,54 +89,6 @@ def __init__(self, provider_def): self._generate_mappings(provider_def) LOGGER.debug(f'STA endpoint: {self.data}, Entity: {self.entity}') - # Default id - if self.id_field: - LOGGER.debug(f'Using id field: {self.id_field}') - else: - LOGGER.debug('Using default @iot.id for id field') - self.id_field = '@iot.id' - - # Create intra-links - self.links = {} - self.intralink = provider_def.get('intralink', False) - if self.intralink and provider_def.get('rel_link'): - # For pytest - self.rel_link = provider_def['rel_link'] - - elif self.intralink: - # Read from pygeoapi config - with open(os.getenv('PYGEOAPI_CONFIG'), encoding='utf8') as fh: - CONFIG = yaml_load(fh) - self.rel_link = get_base_url(CONFIG) - - for (name, rs) in CONFIG['resources'].items(): - pvs = rs.get('providers') - - if pvs is None: - LOGGER.debug(f'Skipping collection: {name}') - continue - - p = get_provider_default(pvs) - e = p.get('entity') or self._get_entity(p['data']) - if any([ - not pvs, # No providers in resource - not p.get('intralink'), # No configuration for intralinks - not e, # No STA entity found - self.data not in p.get('data') # No common STA endpoint - ]): - continue - - if p.get('uri_field'): - LOGGER.debug(f'Linking {e} with field: {p["uri_field"]}') - else: - LOGGER.debug(f'Linking {e} with collection: {name}') - - self.links[e] = { - 'cnm': name, # OAPI collection name, - 'cid': p.get('id_field', '@iot.id'), # OAPI id_field - 'uri': p.get('uri_field') # STA uri_field - } - # Start session self.http = Session() self.get_fields() From 58aa433cc6fdc83335c0eede6e7119d2991b3951 Mon Sep 17 00:00:00 2001 From: Benjamin Webb <40066515+webb-ben@users.noreply.github.com> Date: Wed, 11 Sep 2024 15:13:41 -0400 Subject: [PATCH 3/4] Respond to feedback --- pygeoapi/provider/sensorthings.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pygeoapi/provider/sensorthings.py b/pygeoapi/provider/sensorthings.py index f00b4bef2..5fb5c991e 100644 --- a/pygeoapi/provider/sensorthings.py +++ b/pygeoapi/provider/sensorthings.py @@ -30,7 +30,6 @@ # ================================================================= from json.decoder import JSONDecodeError -import os import logging from requests import Session @@ -530,7 +529,7 @@ def _generate_mappings(self, provider_def: dict): CONFIG = yaml_load(fh) self.rel_link = get_base_url(CONFIG) - for (name, rs) in CONFIG['resources'].items(): + for name, rs in CONFIG['resources'].items(): pvs = rs.get('providers') p = get_provider_default(pvs) e = p.get('entity') or self._get_entity(p['data']) From 5d426664fea85ccbfc496e28fc9190760341fbd5 Mon Sep 17 00:00:00 2001 From: Benjamin Webb <40066515+webb-ben@users.noreply.github.com> Date: Wed, 11 Sep 2024 15:14:10 -0400 Subject: [PATCH 4/4] Use `pygeoapi.get_config` for SensorThings Intralinking --- pygeoapi/provider/sensorthings.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pygeoapi/provider/sensorthings.py b/pygeoapi/provider/sensorthings.py index 5fb5c991e..defd48c43 100644 --- a/pygeoapi/provider/sensorthings.py +++ b/pygeoapi/provider/sensorthings.py @@ -33,10 +33,11 @@ import logging from requests import Session +from pygeoapi.config import get_config from pygeoapi.provider.base import ( BaseProvider, ProviderQueryError, ProviderConnectionError) from pygeoapi.util import ( - yaml_load, url_join, get_provider_default, crs_transform, get_base_url) + url_join, get_provider_default, crs_transform, get_base_url) LOGGER = logging.getLogger(__name__) @@ -525,9 +526,8 @@ def _generate_mappings(self, provider_def: dict): elif self.intralink: # Read from pygeoapi config - with open(os.getenv('PYGEOAPI_CONFIG'), encoding='utf8') as fh: - CONFIG = yaml_load(fh) - self.rel_link = get_base_url(CONFIG) + CONFIG = get_config() + self.rel_link = get_base_url(CONFIG) for name, rs in CONFIG['resources'].items(): pvs = rs.get('providers')