diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index adaaeff9..195d2f38 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -7,7 +7,7 @@ "../deployment/docker-compose.override.devcontainer.yml" ], "service": "dev", - "runServices": ["db", "minio", "redis", "celery_beat", "worker", "dev"], + "runServices": ["db", "minio", "plumber", "redis", "celery_beat", "worker", "dev"], "workspaceFolder": "/home/web/project", "runArgs": [ "--env-file", @@ -23,7 +23,7 @@ "shutdownAction": "stopCompose", "customizations": { "vscode": { - "extensions": ["ms-python.python", "ms-azuretools.vscode-docker"], + "extensions": ["ms-python.python", "ms-azuretools.vscode-docker", "njpwerner.autodocstring"], "settings": { "terminal.integrated.shell.linux": "/bin/bash", "python.pythonPath": "/usr/local/bin/python", diff --git a/codecov.yml b/codecov.yml index 052bd279..f35c9d2b 100644 --- a/codecov.yml +++ b/codecov.yml @@ -9,7 +9,7 @@ coverage: changes: false patch: default: - threshold: "5%" + threshold: "10%" ignore: - "**/migrations/*.py" - "django_project/core/settings/*.py" diff --git a/deployment/.template.env b/deployment/.template.env index 907e67ca..da4a1728 100644 --- a/deployment/.template.env +++ b/deployment/.template.env @@ -16,9 +16,9 @@ DATABASE_PORT=5432 REDIS_HOST=redis REDIS_PASSWORD=redis_password -RABBITMQ_HOST=rabbitmq SENTRY_DSN= INITIAL_FIXTURES= +PLUMBER_PORT=8282 # Minio Variables for django default storages MINIO_AWS_ACCESS_KEY_ID=minio_tomorrownow @@ -42,3 +42,6 @@ SALIENT_AWS_ENDPOINT_URL= SALIENT_AWS_BUCKET_NAME=tomorrownow SALIENT_AWS_DIR_PREFIX= SALIENT_AWS_REGION_NAME= + +# API keys +TOMORROW_IO_API_KEY= diff --git a/deployment/docker-compose.override.devcontainer.yml b/deployment/docker-compose.override.devcontainer.yml index 656941f1..bf122147 100644 --- a/deployment/docker-compose.override.devcontainer.yml +++ b/deployment/docker-compose.override.devcontainer.yml @@ -48,6 +48,7 @@ services: links: - db - worker + - plumber minio: image: quay.io/minio/minio:RELEASE.2024-03-30T09-41-56Z.fips @@ -62,3 +63,18 @@ services: volumes: - ./volumes/minio_data:/data restart: always + + plumber: + build: + context: ../ + dockerfile: deployment/plumber/Dockerfile + env_file: + - .env + volumes: + - ../:/home/web/project + - ../django_project:/home/web/django_project + ports: + - "8282:8282" + links: + - db + - redis diff --git a/deployment/docker-compose.override.template.yml b/deployment/docker-compose.override.template.yml index 09b9cd7c..eb421146 100644 --- a/deployment/docker-compose.override.template.yml +++ b/deployment/docker-compose.override.template.yml @@ -62,6 +62,18 @@ services: - celery_beat entrypoint: [ ] + plumber: + build: + context: ../ + dockerfile: deployment/plumber/Dockerfile + env_file: + - .env + volumes: + - ../django_project:/home/web/django_project + links: + - db + - redis + minio: image: quay.io/minio/minio:RELEASE.2024-03-30T09-41-56Z.fips command: minio server /data --console-address ":9001" diff --git a/deployment/docker-compose.test.yml b/deployment/docker-compose.test.yml index c87464cb..8e8e223d 100644 --- a/deployment/docker-compose.test.yml +++ b/deployment/docker-compose.test.yml @@ -28,7 +28,6 @@ services: - DATABASE_USERNAME=docker - DATABASE_PASSWORD=docker - DATABASE_HOST=db - - RABBITMQ_HOST=rabbitmq - DJANGO_SETTINGS_MODULE=core.settings.test - SECRET_KEY=SECRET_KEY diff --git a/deployment/docker-compose.yml b/deployment/docker-compose.yml index 5eaee272..ddd52c17 100644 --- 
a/deployment/docker-compose.yml +++ b/deployment/docker-compose.yml @@ -54,6 +54,7 @@ services: links: - db - worker + - plumber worker: <<: *default-common-django @@ -63,6 +64,7 @@ services: - db - redis - celery_beat + - plumber celery_beat: <<: *default-common-django @@ -81,3 +83,13 @@ services: - nginx-cache:/home/web/nginx_cache links: - django + + plumber: + build: + context: ../ + dockerfile: deployment/plumber/Dockerfile + env_file: + - .env + links: + - db + - redis diff --git a/deployment/docker/requirements-dev.txt b/deployment/docker/requirements-dev.txt index 07800d09..a0911799 100644 --- a/deployment/docker/requirements-dev.txt +++ b/deployment/docker/requirements-dev.txt @@ -16,4 +16,6 @@ pdbpp responses mock==4.0.3 -pytest-django \ No newline at end of file +pytest-django +# mock requests +requests-mock \ No newline at end of file diff --git a/deployment/plumber/Dockerfile b/deployment/plumber/Dockerfile new file mode 100644 index 00000000..0181ae58 --- /dev/null +++ b/deployment/plumber/Dockerfile @@ -0,0 +1,42 @@ +FROM rstudio/plumber:v1.2.0 AS prod + +# install python 3.10 +RUN apt-get update -y && apt-get upgrade -y + +RUN apt install software-properties-common -y + +RUN add-apt-repository ppa:deadsnakes/ppa + +RUN apt install Python3.10 + +RUN apt-get install -y --no-install-recommends \ + gcc gettext cron \ + spatialite-bin libsqlite3-mod-spatialite \ + python3-dev python3-gdal python3-psycopg2 python3-ldap \ + python3-pip python3-pil python3-lxml python3-pylibmc \ + uwsgi uwsgi-plugin-python3 \ + libfreetype6-dev libpng-dev libtiff5-dev libjpeg-dev + +# install R packages +RUN install2.r --error tidyverse tidygam mgcv ggpubr classInt zoo + +# Install pip packages +ADD deployment/docker/requirements.txt /requirements.txt +RUN pip3 install --upgrade pip && pip install --upgrade pip +# Fix uwsgi build failure missing cc1 +ARG CPUCOUNT=1 +RUN pip3 install -r /requirements.txt + +# add django project +ADD django_project /home/web/django_project + +# create directories +RUN mkdir -p /home/web/plumber_data/ + +# add entrypoint.sh +ADD django_project/plumber_entrypoint.sh /home/web/plumber_entrypoint.sh + +EXPOSE 8181 + +WORKDIR /home/web/django_project +ENTRYPOINT [ "/home/web/plumber_entrypoint.sh" ] diff --git a/django_project/core/settings/project.py b/django_project/core/settings/project.py index 3987b35f..5b146aae 100644 --- a/django_project/core/settings/project.py +++ b/django_project/core/settings/project.py @@ -32,7 +32,8 @@ 'core', 'frontend', 'gap', - 'gap_api' + 'gap_api', + 'spw' ) TEMPLATES[0]['DIRS'] += [ diff --git a/django_project/gap/admin.py b/django_project/gap/admin.py index 19f913d7..2177702d 100644 --- a/django_project/gap/admin.py +++ b/django_project/gap/admin.py @@ -67,7 +67,8 @@ class DatasetAdmin(admin.ModelAdmin): """Dataset admin.""" list_display = ( - 'name', 'provider', 'type', 'time_step', 'store_type', + 'name', 'provider', 'type', 'time_step', + 'store_type', 'is_internal_use' ) diff --git a/django_project/gap/providers/__init__.py b/django_project/gap/providers/__init__.py index 613ad831..1d61db45 100644 --- a/django_project/gap/providers/__init__.py +++ b/django_project/gap/providers/__init__.py @@ -10,6 +10,10 @@ from gap.providers.cbam import CBAMNetCDFReader from gap.providers.salient import SalientNetCDFReader from gap.providers.tahmo import TahmoDatasetReader +from gap.providers.tio import ( + TomorrowIODatasetReader, + PROVIDER_NAME as TIO_PROVIDER +) def get_reader_from_dataset(dataset: Dataset): @@ -27,6 +31,8 @@ def 
get_reader_from_dataset(dataset: Dataset): return SalientNetCDFReader elif dataset.provider.name == 'Tahmo': return TahmoDatasetReader + elif dataset.provider.name == TIO_PROVIDER: + return TomorrowIODatasetReader else: raise TypeError( f'Unsupported provider name: {dataset.provider.name}' diff --git a/django_project/gap/providers/cbam.py b/django_project/gap/providers/cbam.py index 5a389589..aa83dee1 100644 --- a/django_project/gap/providers/cbam.py +++ b/django_project/gap/providers/cbam.py @@ -30,7 +30,6 @@ ) - class CBAMNetCDFReader(BaseNetCDFReader): """Class to read NetCDF file from CBAM provider.""" diff --git a/django_project/gap/providers/tahmo.py b/django_project/gap/providers/tahmo.py index 95ea226e..b4f59521 100644 --- a/django_project/gap/providers/tahmo.py +++ b/django_project/gap/providers/tahmo.py @@ -2,7 +2,7 @@ """ Tomorrow Now GAP. -.. note:: CBAM Data Reader +.. note:: Tahmo Data Reader """ from typing import List diff --git a/django_project/gap/providers/tio.py b/django_project/gap/providers/tio.py new file mode 100644 index 00000000..67da6754 --- /dev/null +++ b/django_project/gap/providers/tio.py @@ -0,0 +1,397 @@ +# coding=utf-8 +""" +Tomorrow Now GAP. + +.. note:: Tomorrow.io Data Reader +""" + +import os +import json +import logging +from typing import List +from datetime import datetime, timedelta +import pytz +import requests +from django.contrib.gis.geos import Point + +from gap.models import ( + Provider, + CastType, + DatasetType, + Dataset, + DatasetAttribute, + DatasetTimeStep, + DatasetStore +) +from gap.utils.reader import ( + LocationInputType, + DatasetVariable, + DatasetReaderInput, + DatasetTimelineValue, + DatasetReaderValue, + BaseDatasetReader +) + +logger = logging.getLogger(__name__) +PROVIDER_NAME = 'Tomorrow.io' +TIO_VARIABLES = { + 'rainAccumulationSum': DatasetVariable( + 'Rain Accumulation', + 'The accumulated amount of liquid rain', + 'mm', 'total_rainfall' + ), + 'evapotranspirationSum': DatasetVariable( + 'Evapotranspiration', + 'The combined processes by which water moves from ' + 'the earth\'s surface into the atmosphere', + 'mm', 'total_evapotranspiration_flux' + ) +} + + +class TomorrowIODatasetReader(BaseDatasetReader): + """Class to read data from Tomorrow.io API.""" + + LONG_TERM_NORMALS_TYPE = 'Long Term Normals (20 years)' + BASE_URL = 'https://api.tomorrow.io/v4' + + def __init__( + self, dataset: Dataset, attributes: List[DatasetAttribute], + location_input: DatasetReaderInput, start_date: datetime, + end_date: datetime) -> None: + """Initialize Dataset Reader.""" + super().__init__( + dataset, attributes, location_input, start_date, end_date) + self.errors = None + self.warnings = None + self.results = [] + + @classmethod + def init_provider(cls): + """Init Tomorrow.io provider and variables.""" + provider, _ = Provider.objects.get_or_create(name='Tomorrow.io') + dt_historical, _ = DatasetType.objects.get_or_create( + name='Climate Reanalysis', + defaults={ + 'type': CastType.HISTORICAL + } + ) + ds_historical, _ = Dataset.objects.get_or_create( + name=f'{provider.name} {dt_historical.name}', + provider=provider, + type=dt_historical, + defaults={ + 'time_step': DatasetTimeStep.DAILY, + 'store_type': DatasetStore.EXT_API, + 'is_internal_use': True + } + ) + dt_shorttermforecast, _ = DatasetType.objects.get_or_create( + name='Short-term Forecast', + defaults={ + 'type': CastType.FORECAST + } + ) + ds_forecast, _ = Dataset.objects.get_or_create( + name=f'{provider.name} {dt_shorttermforecast.name}', + provider=provider, + 
type=dt_shorttermforecast, + defaults={ + 'time_step': DatasetTimeStep.DAILY, + 'store_type': DatasetStore.EXT_API, + 'is_internal_use': True + } + ) + dt_ltn, _ = DatasetType.objects.get_or_create( + name=cls.LONG_TERM_NORMALS_TYPE, + defaults={ + 'type': CastType.HISTORICAL + } + ) + ds_ltn, _ = Dataset.objects.get_or_create( + name=f'{provider.name} {dt_ltn.name}', + provider=provider, + type=dt_ltn, + defaults={ + 'time_step': DatasetTimeStep.DAILY, + 'store_type': DatasetStore.EXT_API, + 'is_internal_use': True + } + ) + + for key, val in TIO_VARIABLES.items(): + attr = val.get_gap_attribute() + # add to dataset attribute + DatasetAttribute.objects.get_or_create( + dataset=ds_historical, + attribute=attr, + source=key, + source_unit=attr.unit + ) + DatasetAttribute.objects.get_or_create( + dataset=ds_forecast, + attribute=attr, + source=key, + source_unit=attr.unit + ) + DatasetAttribute.objects.get_or_create( + dataset=ds_ltn, + attribute=attr, + source=key, + source_unit=attr.unit + ) + + def _is_ltn_request(self): + """Check if the request is for Long Term Normal (LTN) request.""" + return ( + self.dataset.type.type == CastType.HISTORICAL and + self.dataset.type.name == self.LONG_TERM_NORMALS_TYPE + ) + + def _get_api_key(self): + """Retrieve API Key for Tomorrow.io.""" + return os.environ.get('TOMORROW_IO_API_KEY', '') + + def _get_headers(self): + """Get request headers.""" + return { + 'Accept-Encoding': 'gzip', + 'accept': 'application/json', + 'content-type': 'application/json' + } + + def _get_payload( + self, start_date: datetime, end_date: datetime, + is_ltn: bool = False): + """Get request payload. + + This method will normalize the start_date if + start_date and date is less than 24H. + + :param start_date: _description_ + :type start_date: datetime + :param end_date: _description_ + :type end_date: datetime + :param is_ltn: _description_, defaults to False + :type is_ltn: bool, optional + :return: _description_ + :rtype: _type_ + """ + start_dt = start_date + if (end_date - start_dt).total_seconds() < 24 * 3600: + start_dt = start_dt - timedelta(days=1) + payload = { + 'location': ( + f'{self.location_input.point.y}, ' + f'{self.location_input.point.x}' + ), + 'fields': [attr.source for attr in self.attributes], + 'timesteps': ['1d'], + 'units': 'metric', + } + if is_ltn: + payload.update({ + 'startDate': start_dt.strftime('%m-%d'), + 'endDate': end_date.strftime('%m-%d') + }) + else: + payload.update({ + 'startTime': ( + start_dt.isoformat( + timespec='seconds').replace("+00:00", "Z") + ), + 'endTime': ( + end_date.isoformat( + timespec='seconds').replace("+00:00", "Z") + ), + }) + return payload + + def read(self): + """Read values from Tomorrow.io API.""" + self.results = [] + self.errors = None + self.warnings = None + today = datetime.now(tz=pytz.UTC) + if self.location_input.type != LocationInputType.POINT: + return + # handles: + # - start_date=end_date + # - d-7 should be using timelines API + # - historical/timelines may return the same day, + # choosing to use historical + if self._is_ltn_request(): + self._read_ltn_data( + self.start_date, + self.end_date + ) + elif self.dataset.type.type == CastType.HISTORICAL: + max_date = today - timedelta(days=7) + if self.start_date < max_date: + self.read_historical_data( + self.start_date, + self.end_date if self.end_date < max_date else + max_date + ) + if self.end_date >= max_date: + # read from forecast data + start_dt = self.start_date + if max_date > start_dt: + start_dt = max_date + timedelta(days=1) + 
self.read_forecast_data( + start_dt, + self.end_date + ) + else: + self.read_forecast_data( + self.start_date if self.start_date >= today else today, + self.end_date + ) + + def get_data_values(self) -> DatasetReaderValue: + """Fetch data values from dataset. + + :return: Data Value. + :rtype: DatasetReaderValue + """ + if self.location_input.type != LocationInputType.POINT: + return DatasetReaderValue(Point(x=0, y=0, srid=4326), []) + if not self.is_success(): + logger.error(f'Tomorrow.io API errors: {len(self.errors)}') + logger.error(json.dumps(self.errors)) + return DatasetReaderValue(self.location_input.point, []) + if self.warnings: + logger.warn(f'Tomorrow.io API warnings: {len(self.warnings)}') + logger.warn(json.dumps(self.warnings)) + return DatasetReaderValue(self.location_input.point, self.results) + + def read_historical_data(self, start_date: datetime, end_date: datetime): + """Read historical data from dataset. + + :param start_date: start date for reading historical data + :type start_date: datetime + :param end_date: end date for reading historical data + :type end_date: datetime + """ + url = f'{self.BASE_URL}/historical?apikey={self._get_api_key()}' + payload = self._get_payload(start_date, end_date) + response = requests.post( + url, json=payload, headers=self._get_headers()) + if response.status_code != 200: + self._get_error_from_response(response) + return + self.results.extend(self._parse_result(response.json())) + + def read_forecast_data(self, start_date: datetime, end_date: datetime): + """Read forecast data from dataset. + + :param start_date: start date for reading forecast data + :type start_date: datetime + :param end_date: end date for reading forecast data + :type end_date: datetime + """ + url = f'{self.BASE_URL}/timelines?apikey={self._get_api_key()}' + payload = self._get_payload(start_date, end_date) + response = requests.post( + url, json=payload, headers=self._get_headers()) + if response.status_code != 200: + self._get_error_from_response(response) + return + self.results.extend(self._parse_result(response.json())) + + def _read_ltn_data(self, start_date: datetime, end_date: datetime): + """Read Long Term Normals (LTN) data. + + :param start_date: start date for reading data + :type start_date: datetime + :param end_date: end date for reading data + :type end_date: datetime + """ + url = ( + f'{self.BASE_URL}/historical/normals?apikey={self._get_api_key()}' + ) + payload = self._get_payload(start_date, end_date, is_ltn=True) + response = requests.post( + url, json=payload, headers=self._get_headers()) + if response.status_code != 200: + self._get_error_from_response(response) + return + self.results = self._parse_result(response.json()) + + def _get_error_from_response(self, response): + """Get error detail from Tomorrow.io API response. + + :param response: API response + :type response: response object + """ + error = "Unknown error!" + try: + result = response.json() + error = f"{result.get('type', '')} {result.get('message', '')}" + except Exception: + pass + if self.errors is None: + self.errors = [error] + else: + self.errors.append(error) + + def _get_result_datetime(self, interval: dict) -> datetime: + """Parse datetime from API response. 
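+
+        For LTN responses the interval only carries a month-day startDate,
+        so the year of self.start_date is prepended before parsing; other
+        responses are parsed from the ISO startTime.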
+ + :param interval: interval dictionary + :type interval: dict + :return: datetime + :rtype: datetime + """ + dt_str = interval.get('startTime') + if self._is_ltn_request(): + dt_str = interval.get('startDate') + dt_str = f'{self.start_date.year}-{dt_str}' + return datetime.strptime( + dt_str, '%Y-%m-%d').replace(tzinfo=pytz.utc) + return datetime.fromisoformat(dt_str) + + def _parse_result(self, result: dict) -> List[DatasetTimelineValue]: + """Parse successful response from Tomorrow.io API. + + This method also checks for any warnings in the response. + :param result: response data + :type result: dict + :return: data values + :rtype: List[DatasetTimelineValue] + """ + value_list = [] + data = result.get('data', {}) + timelines = data.get('timelines', []) + intervals = ( + timelines[0].get('intervals', []) if len(timelines) > 0 else [] + ) + for interval in intervals: + start_dt = self._get_result_datetime(interval) + if start_dt < self.start_date or start_dt > self.end_date: + continue + values = interval.get('values') + value_data = {} + for attribute in self.attributes: + value_data[attribute.attribute.variable_name] = ( + values.get(attribute.source, None) + ) + value_list.append(DatasetTimelineValue( + start_dt, + value_data + )) + warnings = data.get('warnings', None) + if warnings: + if self.warnings is None: + self.warnings = warnings + else: + self.warnings.extend(warnings) + return value_list + + def is_success(self) -> bool: + """Check whether the API requests are successful. + + :return: True if there is no errors + :rtype: bool + """ + return self.errors is None diff --git a/django_project/gap/tests/providers/test_tio.py b/django_project/gap/tests/providers/test_tio.py new file mode 100644 index 00000000..710bdb45 --- /dev/null +++ b/django_project/gap/tests/providers/test_tio.py @@ -0,0 +1,285 @@ +# coding=utf-8 +""" +Tomorrow Now GAP. + +.. note:: Unit tests for Tomorrow.io Dataset Reader. 
+""" + +from django.test import TestCase +from unittest.mock import patch +from datetime import datetime, timedelta +import pytz +import requests_mock +from django.contrib.gis.geos import Point + +from gap.models import ( + Dataset, + DatasetAttribute, + DatasetType, + CastType +) +from gap.utils.reader import ( + DatasetReaderInput +) +from gap.providers.tio import TomorrowIODatasetReader + + +class TestTomorrowIODatasetReader(TestCase): + """Test class for Tomorrow io dataset reader.""" + + def setUp(self): + """Set test class.""" + TomorrowIODatasetReader.init_provider() + self.dataset = Dataset.objects.filter( + provider__name='Tomorrow.io' + ).first() + + attr = DatasetAttribute.objects.filter( + source='rainAccumulationSum', + dataset=self.dataset + ).first() + + self.attributes = [attr] + self.location_input = DatasetReaderInput.from_point( + Point(y=40.7128, x=-74.0060)) + self.start_date = datetime(2023, 7, 1, tzinfo=pytz.UTC) + self.start_date.replace(microsecond=0) + self.end_date = datetime(2023, 7, 10, tzinfo=pytz.UTC) + self.end_date.replace(microsecond=0) + + self.reader = TomorrowIODatasetReader( + self.dataset, self.attributes, self.location_input, + self.start_date, self.end_date + ) + + @requests_mock.Mocker() + @patch('os.environ.get', return_value='dummy_api_key') + def test_read_historical_data(self, mock_request, mock_env): + """Test read historical data.""" + mock_response = { + 'data': { + 'timelines': [{ + 'intervals': [ + { + 'startTime': '2023-07-01T00:00:00Z', + 'values': { + 'rainAccumulationSum': 5.0 + } + } + ] + }] + } + } + mock_request.post( + 'https://api.tomorrow.io/v4/historical?apikey=dummy_api_key', + json=mock_response, status_code=200 + ) + + self.reader.read_historical_data(self.start_date, self.end_date) + self.assertEqual(len(self.reader.results), 1) + self.assertEqual(self.reader.results[0].values['total_rainfall'], 5.0) + # mock 400 error + mock_response = { + 'type': 'Unknown', + 'message': 'Test error' + } + self.reader.errors = None + self.reader.results = [] + mock_request.post( + 'https://api.tomorrow.io/v4/historical?apikey=dummy_api_key', + json=mock_response, status_code=400 + ) + + self.reader.read_historical_data(self.start_date, self.end_date) + self.assertEqual(len(self.reader.results), 0) + self.assertEqual(len(self.reader.errors), 1) + + @requests_mock.Mocker() + @patch('os.environ.get', return_value='dummy_api_key') + def test_read_forecast_data(self, mock_request, mock_env): + """Test read forecast data.""" + mock_response = { + 'data': { + 'timelines': [{ + 'intervals': [ + { + 'startTime': '2023-07-01T00:00:00Z', + 'values': { + 'rainAccumulationSum': 10.0 + } + } + ] + }], + 'warnings': ['test-warnings'] + } + } + mock_request.post( + 'https://api.tomorrow.io/v4/timelines?apikey=dummy_api_key', + json=mock_response, status_code=200 + ) + + self.reader.read_forecast_data(self.start_date, self.end_date) + self.assertEqual(len(self.reader.results), 1) + self.assertEqual( + self.reader.results[0].values['total_rainfall'], 10.0) + self.assertEqual(len(self.reader.warnings), 1) + # test get_data_values + reader_value = self.reader.get_data_values() + self.assertEqual(reader_value.location, self.location_input.point) + self.assertEqual(len(reader_value.results), 1) + # mock 400 error + mock_response = { + 'type': 'Unknown', + 'message': 'Test error' + } + self.reader.errors = [] + self.reader.results = [] + mock_request.post( + 'https://api.tomorrow.io/v4/timelines?apikey=dummy_api_key', + json=mock_response, status_code=400 + ) + 
+ self.reader.read_forecast_data(self.start_date, self.end_date) + self.assertEqual(len(self.reader.results), 0) + self.assertEqual(len(self.reader.errors), 1) + # test get_data_values + reader_value = self.reader.get_data_values() + self.assertEqual(reader_value.location, self.location_input.point) + self.assertEqual(len(reader_value.results), 0) + + @requests_mock.Mocker() + @patch('os.environ.get', return_value='dummy_api_key') + def test_read_ltn_data(self, mock_request, mock_env): + """Test read LTN data.""" + self.reader.dataset = Dataset.objects.filter( + provider__name='Tomorrow.io', + type__name=TomorrowIODatasetReader.LONG_TERM_NORMALS_TYPE + ).first() + mock_response = { + 'data': { + 'timelines': [{ + 'intervals': [ + { + 'startDate': '07-01', + 'values': { + 'rainAccumulationSum': 2.0 + } + } + ] + }] + } + } + mock_request.post( + 'https://api.tomorrow.io/v4/historical/' + 'normals?apikey=dummy_api_key', + json=mock_response, status_code=200 + ) + + self.reader._read_ltn_data(self.start_date, self.end_date) + self.assertEqual(len(self.reader.results), 1) + self.assertEqual(self.reader.results[0].values['total_rainfall'], 2.0) + # mock 400 error + mock_response = { + 'type': 'Unknown', + 'message': 'Test error' + } + self.reader.errors = [] + self.reader.results = [] + mock_request.post( + 'https://api.tomorrow.io/v4/historical/' + 'normals?apikey=dummy_api_key', + json=mock_response, status_code=400 + ) + + self.reader._read_ltn_data(self.start_date, self.end_date) + self.assertEqual(len(self.reader.results), 0) + self.assertEqual(len(self.reader.errors), 1) + + @patch.object(TomorrowIODatasetReader, 'read_historical_data') + @patch.object(TomorrowIODatasetReader, 'read_forecast_data') + @patch.object(TomorrowIODatasetReader, '_read_ltn_data') + def test_read_ltn_request( + self, mock_read_ltn_data, mock_read_forecast_data, + mock_read_historical_data): + """Test read() method that calls ltn request.""" + dt_now = datetime.now(tz=pytz.UTC) + dt_now.replace(microsecond=0) + self.reader.start_date = dt_now - timedelta(days=10) + self.reader.end_date = dt_now + self.reader.dataset.type = DatasetType( + type=CastType.HISTORICAL, + name=TomorrowIODatasetReader.LONG_TERM_NORMALS_TYPE) + + # Call the read method + with patch('gap.providers.tio.datetime') as mock_datetime: + mock_datetime.now.return_value = dt_now + self.reader.read() + + # Check that the correct method is called for LTN + mock_read_ltn_data.assert_called_once() + self.assertEqual( + mock_read_ltn_data.call_args[0][0], self.reader.start_date) + self.assertEqual( + mock_read_ltn_data.call_args[0][1], self.reader.end_date) + mock_read_historical_data.assert_not_called() + mock_read_forecast_data.assert_not_called() + + @patch.object(TomorrowIODatasetReader, 'read_historical_data') + @patch.object(TomorrowIODatasetReader, 'read_forecast_data') + @patch.object(TomorrowIODatasetReader, '_read_ltn_data') + def test_read_historical_and_forecast_data( + self, mock_read_ltn_data, mock_read_forecast_data, + mock_read_historical_data): + """Test read() method that calls historical and forecast.""" + dt_now = datetime.now(tz=pytz.UTC) + dt_now.replace(microsecond=0) + self.reader.dataset.type = DatasetType( + type=CastType.HISTORICAL, name='TestHistorical') + self.reader.start_date = dt_now - timedelta(days=10) + self.reader.end_date = dt_now # Ensure end_date is after max_date + + # Call the read method + with patch('gap.providers.tio.datetime') as mock_datetime: + mock_datetime.now.return_value = dt_now + self.reader.read() + 
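+        # read() is expected to split the requested range at
+        # today - 7 days: the older part goes to read_historical_data and
+        # the remainder (from max_date + 1 day) to read_forecast_data.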
+ # Check that the correct methods are called + max_date = dt_now - timedelta(days=7) + mock_read_historical_data.assert_called_once() + self.assertEqual( + mock_read_historical_data.call_args[0][0], self.reader.start_date) + self.assertEqual(mock_read_historical_data.call_args[0][1], max_date) + mock_read_forecast_data.assert_called_once() + self.assertEqual( + mock_read_forecast_data.call_args[0][0], + max_date + timedelta(days=1)) + self.assertEqual( + mock_read_forecast_data.call_args[0][1], self.reader.end_date) + mock_read_ltn_data.assert_not_called() + + @patch.object(TomorrowIODatasetReader, 'read_historical_data') + @patch.object(TomorrowIODatasetReader, 'read_forecast_data') + @patch.object(TomorrowIODatasetReader, '_read_ltn_data') + def test_read_forecast_data_only( + self, mock_read_ltn_data, mock_read_forecast_data, + mock_read_historical_data): + """Test read() method that calls forecast only.""" + dt_now = datetime.now(tz=pytz.UTC) + dt_now.replace(microsecond=0) + self.reader.end_date = dt_now + self.reader.dataset.type = DatasetType( + type=CastType.FORECAST, name='TestForecast') + self.reader.start_date = dt_now - timedelta(days=1) + + # Call the read method + with patch('gap.providers.tio.datetime') as mock_datetime: + mock_datetime.now.return_value = dt_now + self.reader.read() + + # Check that the correct method is called + mock_read_forecast_data.assert_called_once() + self.assertEqual(mock_read_forecast_data.call_args[0][0], dt_now) + self.assertEqual( + mock_read_forecast_data.call_args[0][1], self.reader.end_date) + mock_read_historical_data.assert_not_called() + mock_read_ltn_data.assert_not_called() diff --git a/django_project/gap/utils/netcdf.py b/django_project/gap/utils/netcdf.py index 300e7861..5309b420 100644 --- a/django_project/gap/utils/netcdf.py +++ b/django_project/gap/utils/netcdf.py @@ -80,16 +80,6 @@ def get_s3_client_kwargs(cls, provider: Provider): return client_kwargs -class NetCDFVariable: - """Contains Variable from NetCDF File.""" - - def __init__(self, name, desc, unit=None) -> None: - """Initialize NetCDFVariable object.""" - self.name = name - self.desc = desc - self.unit = unit - - def daterange_inc(start_date: datetime, end_date: datetime): """Iterate through start_date and end_date (inclusive). diff --git a/django_project/gap/utils/reader.py b/django_project/gap/utils/reader.py index ae0abbec..eec8b7d6 100644 --- a/django_project/gap/utils/reader.py +++ b/django_project/gap/utils/reader.py @@ -15,11 +15,57 @@ ) from gap.models import ( + CastType, + Attribute, + Unit, Dataset, DatasetAttribute ) +class DatasetVariable: + """Contains Variable from a Dataset.""" + + def __init__( + self, name, desc, unit, attr_var_name=None) -> None: + """Initialize variable object. + + :param name: Name of the variable + :type name: str + :param desc: Description of the variable + :type desc: str + :param unit: Unit + :type unit: str, optional + :param attr_var_name: Mapping to attribute name, defaults to None + :type attr_var_name: str, optional + """ + self.name = name + self.desc = desc + self.unit = unit + self.attr_var_name = attr_var_name + + def get_gap_attribute(self) -> Attribute: + """Get or create a mapping attribute. 
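+
+        Returns None when attr_var_name is not set; otherwise the Unit and
+        Attribute records are fetched or created on first use.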
+ + :return: Gap Attribute + :rtype: Attribute + """ + if self.attr_var_name is None: + return None + unit, _ = Unit.objects.get_or_create( + name=self.unit + ) + attr, _ = Attribute.objects.get_or_create( + variable_name=self.attr_var_name, + defaults={ + 'description': self.desc, + 'name': self.name, + 'unit': unit, + } + ) + return attr + + class DatasetTimelineValue: """Class representing data value for given datetime.""" @@ -45,6 +91,23 @@ def _datetime_as_str(self): self.datetime, unit='s', timezone='UTC') return self.datetime.isoformat(timespec='seconds') + def get_datetime_repr(self, format: str) -> str: + """Return the representation of datetime in given format. + + :param format: Format like '%Y-%m-%d' + :type format: str + :return: String of datetime + :rtype: str + """ + dt = self.datetime + if isinstance(self.datetime, np.datetime64): + timestamp = ( + (dt - np.datetime64('1970-01-01T00:00:00')) / + np.timedelta64(1, 's') + ) + dt = datetime.fromtimestamp(timestamp, tz=pytz.UTC) + return dt.strftime(format) + def to_dict(self): """Convert into dict. @@ -238,18 +301,15 @@ def get_attributes_metadata(self) -> dict: def read(self): """Read values from dataset.""" today = datetime.now(tz=pytz.UTC) - if self.start_date < today: + if self.dataset.type.type == CastType.HISTORICAL: self.read_historical_data( self.start_date, self.end_date if self.end_date < today else today ) - if self.end_date > today: - self.read_forecast_data( - today, self.end_date - ) - else: + elif self.end_date >= today: self.read_forecast_data( - self.start_date, self.end_date + self.start_date if self.start_date >= today else today, + self.end_date ) def get_data_values(self) -> DatasetReaderValue: diff --git a/django_project/gap_api/api_views/measurement.py b/django_project/gap_api/api_views/measurement.py index 45fb87d8..cd0d2bac 100644 --- a/django_project/gap_api/api_views/measurement.py +++ b/django_project/gap_api/api_views/measurement.py @@ -193,12 +193,12 @@ def get_response_data(self): 'results': [] } for reader in dataset_dict.values(): - data['metadata']['dataset'].append({ - 'provider': reader.dataset.provider.name, - 'attributes': reader.get_attributes_metadata() - }) values = self._read_data(reader).to_dict() if values: + data['metadata']['dataset'].append({ + 'provider': reader.dataset.provider.name, + 'attributes': reader.get_attributes_metadata() + }) data['results'].append(values) return data @@ -242,13 +242,15 @@ def get(self, request, *args, **kwargs): ) @swagger_auto_schema( - operation_id='get-measurement-by-polygon', + operation_id='get-measurement-by-geom', tags=[ApiTag.Measurement], manual_parameters=[ *api_parameters ], request_body=openapi.Schema( - description='Polygon (SRID 4326) in geojson format', + description=( + 'MultiPolygon or MultiPoint (SRID 4326) in geojson format' + ), type=openapi.TYPE_STRING ), responses={ @@ -263,7 +265,7 @@ def get(self, request, *args, **kwargs): } ) def post(self, request, *args, **kwargs): - """Fetch measurement data by polygon.""" + """Fetch measurement data by polygon/points.""" return Response( status=200, data=self.get_response_data() diff --git a/django_project/plumber_entrypoint.sh b/django_project/plumber_entrypoint.sh new file mode 100755 index 00000000..bc2edb3a --- /dev/null +++ b/django_project/plumber_entrypoint.sh @@ -0,0 +1,21 @@ +#!/bin/sh + +# Exit script in case of error +set -e + +echo $"\n\n\n" +echo "-----------------------------------------------------" +echo "STARTING PLUMBER ENTRYPOINT $(date)" +echo 
"-----------------------------------------------------" + +# Run initialization +cd /home/web/django_project +echo 'Running plumber_initialize.py...' +python3 -u plumber_initialize.py + +echo "-----------------------------------------------------" +echo "FINISHED PLUMBER ENTRYPOINT --------------------------" +echo "-----------------------------------------------------" + +# start worker for Plumber +celery -A core worker -c 1 -Q plumber -l INFO -n plumberworker diff --git a/django_project/plumber_initialize.py b/django_project/plumber_initialize.py new file mode 100644 index 00000000..cc74f312 --- /dev/null +++ b/django_project/plumber_initialize.py @@ -0,0 +1,48 @@ +"""This script initializes plumber service.""" + +######################################################### +# Setting up the context +######################################################### + +######################################################### +# Imports +######################################################### +import django +from django.db import connection +from django.db.utils import OperationalError +import time + + +django.setup() + +######################################################### +# 1. Waiting for PostgreSQL +######################################################### + +print('-----------------------------------------------------') +print('1. Waiting for PostgreSQL') +for _ in range(60): + try: + connection.ensure_connection() + break + except OperationalError: + time.sleep(1) +else: + connection.ensure_connection() +connection.close() + +print('-----------------------------------------------------') +print('2. Generate plumber.R file') +from spw.utils.plumber import ( # noqa + spawn_r_plumber, + write_plumber_file +) +write_plumber_file() + +print('-----------------------------------------------------') +print('3. Spawn initial plumber process') +plumber_process = spawn_r_plumber() +if plumber_process: + print(f'plumber process pid {plumber_process.pid}') +else: + raise RuntimeError('Cannot execute plumber process!') diff --git a/django_project/spw/__init__.py b/django_project/spw/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/django_project/spw/admin.py b/django_project/spw/admin.py new file mode 100644 index 00000000..4c24eacf --- /dev/null +++ b/django_project/spw/admin.py @@ -0,0 +1,44 @@ +# coding=utf-8 +""" +Tomorrow Now GAP. + +.. 
note:: Admins +""" +from django.contrib import admin, messages + +from spw.models import RModel, RModelOutput, RModelExecutionLog +from spw.tasks import start_plumber_process + + +@admin.action(description='Restart plumber process') +def restart_plumber_process(modeladmin, request, queryset): + """Restart plumber process action.""" + start_plumber_process.apply_async(queue='plumber') + modeladmin.message_user( + request, + 'Plumber process will be started in background!', + messages.SUCCESS + ) + + +class RModelOutputInline(admin.TabularInline): + """Inline list for model output in RModel admin page.""" + + model = RModelOutput + extra = 1 + + +@admin.register(RModel) +class RModelAdmin(admin.ModelAdmin): + """Admin page for RModel.""" + + list_display = ('name', 'version', 'created_on') + inlines = [RModelOutputInline] + actions = [restart_plumber_process] + + +@admin.register(RModelExecutionLog) +class RModelExecutionLogAdmin(admin.ModelAdmin): + """Admin page for RModelExecutionLog.""" + + list_display = ('model', 'start_date_time', 'status') diff --git a/django_project/spw/apps.py b/django_project/spw/apps.py new file mode 100644 index 00000000..877a94e2 --- /dev/null +++ b/django_project/spw/apps.py @@ -0,0 +1,14 @@ +# coding=utf-8 +""" +Tomorrow Now GAP. + +.. note:: AppConfig for SPW +""" +from django.apps import AppConfig + + +class SpwConfig(AppConfig): + """SPW App Config.""" + + default_auto_field = 'django.db.models.BigAutoField' + name = 'spw' diff --git a/django_project/spw/factories.py b/django_project/spw/factories.py new file mode 100644 index 00000000..7682e12c --- /dev/null +++ b/django_project/spw/factories.py @@ -0,0 +1,64 @@ +# coding=utf-8 +""" +Tomorrow Now GAP. + +.. note:: Factory classes for Models +""" +import factory +from django.contrib.gis.geos import Point +from django.db.models.signals import post_save, post_delete + +from core.factories import BaseMetaFactory, BaseFactory, UserF +from spw.models import ( + RModel, + RModelOutput, + RModelExecutionLog, + RModelOutputType +) + + +@factory.django.mute_signals(post_save, post_delete) +class RModelFactory( + BaseFactory[RModel], metaclass=BaseMetaFactory[RModel] +): + """Factory class for RModel model.""" + + class Meta: # noqa + model = RModel + + name = factory.Faker('company') + version = 1.0 + code = 'd <- 100 + 2' + notes = factory.Faker('text') + created_on = factory.Faker('date_time') + updated_on = factory.Faker('date_time') + created_by = factory.SubFactory(UserF) + updated_by = factory.SubFactory(UserF) + + +class RModelOutputFactory( + BaseFactory[RModelOutput], metaclass=BaseMetaFactory[RModelOutput] +): + """Factory class for RModelOutput.""" + + class Meta: # noqa + model = RModelOutput + + model = factory.SubFactory(RModelFactory) + type = RModelOutputType.GO_NO_GO_STATUS + variable_name = RModelOutputType.GO_NO_GO_STATUS + + +class RModelExecutionLogFactory( + BaseFactory[RModelExecutionLog], + metaclass=BaseMetaFactory[RModelExecutionLog] +): + """Factory class for RModelExecutionLog.""" + + class Meta: # noqa + model = RModelExecutionLog + + model = factory.SubFactory(RModelFactory) + location_input = factory.LazyFunction(lambda: Point(0, 0)) + start_date_time = factory.Faker('date_time') + end_date_time = factory.Faker('date_time') diff --git a/django_project/spw/generator.py b/django_project/spw/generator.py new file mode 100644 index 00000000..519c31ea --- /dev/null +++ b/django_project/spw/generator.py @@ -0,0 +1,217 @@ +# coding=utf-8 +""" +Tomorrow Now GAP. + +.. 
note:: SPW Generator +""" + +import logging +import os +import pytz +from typing import List +from types import SimpleNamespace +from datetime import datetime, timedelta +from django.utils import timezone +from django.contrib.gis.geos import Point + +from gap.models import Dataset, DatasetAttribute, CastType +from gap.utils.reader import DatasetReaderInput +from gap.providers import TomorrowIODatasetReader, TIO_PROVIDER +from spw.models import RModel, RModelExecutionLog, RModelExecutionStatus +from spw.utils.plumber import ( + execute_spw_model, + write_plumber_data, + remove_plumber_data +) + + +logger = logging.getLogger(__name__) +ATTRIBUTES = [ + 'total_evapotranspiration_flux', + 'total_rainfall' +] +COLUMNS = [ + 'month_day', + 'date', + 'evapotranspirationSum', + 'rainAccumulationSum', + 'LTNPET', + 'LTNPrecip' +] +VAR_MAPPING = { + 'total_evapotranspiration_flux': 'evapotranspirationSum', + 'total_rainfall': 'rainAccumulationSum' +} +LTN_MAPPING = { + 'total_evapotranspiration_flux': 'LTNPET', + 'total_rainfall': 'LTNPrecip' +} + + +class SPWOutput: + """Class to store the output from SPW model.""" + + def __init__( + self, point: Point, input_data: dict) -> None: + """Initialize the SPWOutput class.""" + self.point = point + data = {} + for key, val in input_data.items(): + if key == 'metadata': + continue + if isinstance(val, list) and len(val) == 1: + data[key] = val[0] + else: + data[key] = val + self.data = SimpleNamespace(**data) + + +def calculate_from_point(point: Point) -> SPWOutput: + """Calculate SPW from given point. + + :param point: Location to be queried + :type point: Point + :return: Output with GoNoGo classification + :rtype: SPWOutput + """ + TomorrowIODatasetReader.init_provider() + location_input = DatasetReaderInput.from_point(point) + attrs = list(DatasetAttribute.objects.filter( + attribute__variable_name__in=ATTRIBUTES, + dataset__provider__name=TIO_PROVIDER + )) + today = datetime.now(tz=pytz.UTC) + start_dt = today - timedelta(days=37) + end_dt = today + timedelta(days=14) + print(f'Today: {today} - start_dt: {start_dt} - end_dt: {end_dt}') + historical_dict = _fetch_timelines_data( + location_input, attrs, start_dt, end_dt + ) + final_dict = _fetch_ltn_data( + location_input, attrs, start_dt, end_dt, historical_dict) + rows = [] + for month_day, val in final_dict.items(): + row = [month_day] + for c in COLUMNS: + if c == 'month_day': + continue + row.append(val.get(c, 0)) + rows.append(row) + return _execute_spw_model(rows, point) + + +def _execute_spw_model(rows: List, point: Point) -> SPWOutput: + """Execute SPW Model and return the output. 
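+
+    The latest RModel (highest version) is used; the rows are written to a
+    temporary CSV, attached to an RModelExecutionLog as its input file, and
+    the plumber endpoint is called with that file's URL.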
+ + :param rows: Data rows + :type rows: List + :param point: location input + :type point: Point + :return: SPW Model output + :rtype: SPWOutput + """ + model = RModel.objects.order_by('-version').first() + data_file_path = write_plumber_data(COLUMNS, rows, dir_path='/tmp') + filename = os.path.basename(data_file_path) + execution_log = RModelExecutionLog.objects.create( + model=model, + location_input=point, + start_date_time=timezone.now() + ) + with open(data_file_path, 'rb') as output_file: + execution_log.input_file.save(filename, output_file) + remove_plumber_data(data_file_path) + success, data = execute_spw_model( + execution_log.input_file.url, filename, point.y, point.x, 'gap_place') + if isinstance(data, dict): + execution_log.output = data + else: + execution_log.errors = data + output = None + if success: + output = SPWOutput(point, data) + execution_log.status = ( + RModelExecutionStatus.SUCCESS if success else + RModelExecutionStatus.FAILED + ) + execution_log.end_date_time = timezone.now() + execution_log.save() + return output + + +def _fetch_timelines_data( + location_input: DatasetReaderInput, attrs: List[DatasetAttribute], + start_dt: datetime, end_dt: datetime) -> dict: + """Fetch historical and forecast data for given location. + + :param location_input: Location for the query + :type location_input: DatasetReaderInput + :param attrs: List of attributes + :type attrs: List[DatasetAttribute] + :param start_dt: Start date time + :type start_dt: datetime + :param end_dt: End date time + :type end_dt: datetime + :return: Dictionary of month_day and results + :rtype: dict + """ + dataset = Dataset.objects.filter( + provider__name=TIO_PROVIDER, + type__type=CastType.HISTORICAL + ).exclude( + type__name=TomorrowIODatasetReader.LONG_TERM_NORMALS_TYPE + ).first() + reader = TomorrowIODatasetReader( + dataset, attrs, location_input, start_dt, end_dt) + reader.read() + values = reader.get_data_values() + results = {} + for val in values.results: + month_day = val.get_datetime_repr('%m-%d') + val_dict = val.to_dict()['values'] + data = { + 'date': val.get_datetime_repr('%Y-%m-%d') + } + for k, v in VAR_MAPPING.items(): + data[v] = val_dict.get(k, 0) + results[month_day] = data + return results + + +def _fetch_ltn_data( + location_input: DatasetReaderInput, attrs: List[DatasetAttribute], + start_dt: datetime, end_dt: datetime, + historical_dict: dict) -> dict: + """Fetch Long Term Normals data for given location. + + The resulting data will be merged into historical_dict. 
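+    Only month_day keys already present in historical_dict are updated; LTN
+    values without a matching historical day are skipped.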
+ + :param location_input: Location for the query + :type location_input: DatasetReaderInput + :param attrs: List of attributes + :type attrs: List[DatasetAttribute] + :param start_dt: Start date time + :type start_dt: datetime + :param end_dt: End date time + :type end_dt: datetime + :param historical_dict: Dictionary from historical data + :type historical_dict: dict + :return: Merged dictinoary with LTN data + :rtype: dict + """ + dataset = Dataset.objects.filter( + provider__name=TIO_PROVIDER, + type__type=CastType.HISTORICAL, + type__name=TomorrowIODatasetReader.LONG_TERM_NORMALS_TYPE + ).first() + reader = TomorrowIODatasetReader( + dataset, attrs, location_input, start_dt, end_dt) + reader.read() + values = reader.get_data_values() + for val in values.results: + month_day = val.get_datetime_repr('%m-%d') + if month_day in historical_dict: + data = historical_dict[month_day] + for k, v in LTN_MAPPING.items(): + data[v] = val.values.get(k, '') + return historical_dict diff --git a/django_project/spw/migrations/0001_initial.py b/django_project/spw/migrations/0001_initial.py new file mode 100644 index 00000000..ec013cfb --- /dev/null +++ b/django_project/spw/migrations/0001_initial.py @@ -0,0 +1,56 @@ +# Generated by Django 4.2.7 on 2024-07-20 12:39 + +from django.conf import settings +import django.contrib.gis.db.models.fields +from django.db import migrations, models +import django.db.models.deletion +import spw.models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [ + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ] + + operations = [ + migrations.CreateModel( + name='RModel', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('name', models.CharField(max_length=256)), + ('version', models.FloatField()), + ('code', models.TextField()), + ('notes', models.TextField(blank=True, null=True)), + ('created_on', models.DateTimeField()), + ('updated_on', models.DateTimeField()), + ('created_by', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)), + ('updated_by', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='rmodel_updater', to=settings.AUTH_USER_MODEL)), + ], + ), + migrations.CreateModel( + name='RModelOutput', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('type', models.CharField(choices=[('goNoGo', 'goNoGo'), ('days_h2to_f2', 'days_h2to_f2'), ('days_f3to_f5', 'days_f3to_f5'), ('days_f6to_f13', 'days_f6to_f13'), ('nearDaysLTNPercent', 'nearDaysLTNPercent'), ('nearDaysCurPercent', 'nearDaysCurPercent')], max_length=100)), + ('variable_name', models.CharField(max_length=100)), + ('model', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='spw.rmodel')), + ], + ), + migrations.CreateModel( + name='RModelExecutionLog', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('location_input', django.contrib.gis.db.models.fields.GeometryField(blank=True, null=True, srid=4326)), + ('input_file', models.FileField(blank=True, null=True, upload_to=spw.models.r_model_input_file_path)), + ('output', models.JSONField(blank=True, default=dict, null=True)), + ('start_date_time', models.DateTimeField(blank=True, null=True)), + ('end_date_time', models.DateTimeField(blank=True, null=True)), + ('status', models.CharField(choices=[('RUNNING', 'RUNNING'), ('SUCCESS', 
'SUCCESS'), ('FAILED', 'FAILED')], default='RUNNING', max_length=512)), + ('errors', models.TextField(blank=True, null=True)), + ('model', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='spw.rmodel')), + ], + ), + ] diff --git a/django_project/spw/migrations/__init__.py b/django_project/spw/migrations/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/django_project/spw/models.py b/django_project/spw/models.py new file mode 100644 index 00000000..c1b0bbac --- /dev/null +++ b/django_project/spw/models.py @@ -0,0 +1,137 @@ +# coding=utf-8 +""" +Tomorrow Now GAP. + +.. note:: Models for SPW R code +""" +from django.db import models +from django.contrib.auth import get_user_model +from django.dispatch import receiver +from django.db.models.signals import post_save, post_delete +from django.contrib.gis.db import models as gis_models +from django.conf import settings + + +User = get_user_model() + + +def r_model_input_file_path(instance, filename): + """Return upload path for R Model input files.""" + return f'{settings.STORAGE_DIR_PREFIX}r_input/{filename}' + + +class RModel(models.Model): + """Model that stores R code.""" + + name = models.CharField(max_length=256) + version = models.FloatField() + code = models.TextField() + notes = models.TextField( + null=True, + blank=True + ) + created_on = models.DateTimeField() + created_by = models.ForeignKey(User, on_delete=models.CASCADE) + updated_on = models.DateTimeField() + updated_by = models.ForeignKey( + User, on_delete=models.CASCADE, related_name='rmodel_updater') + + +class RModelOutputType: + """R model output type.""" + + GO_NO_GO_STATUS = 'goNoGo' + DAYS_h2TO_F2 = 'days_h2to_f2' + DAYS_f3TO_F5 = 'days_f3to_f5' + DAYS_f6TO_F13 = 'days_f6to_f13' + NEAR_DAYS_LTN_PERCENT = 'nearDaysLTNPercent' + NEAR_DAYS_CUR_PERCENT = 'nearDaysCurPercent' + + +class RModelOutput(models.Model): + """Model that stores relationship between R Model and its outputs.""" + + model = models.ForeignKey(RModel, on_delete=models.CASCADE) + type = models.CharField( + max_length=100, + choices=( + (RModelOutputType.GO_NO_GO_STATUS, + RModelOutputType.GO_NO_GO_STATUS), + (RModelOutputType.DAYS_h2TO_F2, + RModelOutputType.DAYS_h2TO_F2), + (RModelOutputType.DAYS_f3TO_F5, + RModelOutputType.DAYS_f3TO_F5), + (RModelOutputType.DAYS_f6TO_F13, + RModelOutputType.DAYS_f6TO_F13), + (RModelOutputType.NEAR_DAYS_LTN_PERCENT, + RModelOutputType.NEAR_DAYS_LTN_PERCENT), + (RModelOutputType.NEAR_DAYS_CUR_PERCENT, + RModelOutputType.NEAR_DAYS_CUR_PERCENT), + ) + ) + variable_name = models.CharField(max_length=100) + + +@receiver(post_save, sender=RModel) +def rmodel_post_create(sender, instance: RModel, + created, *args, **kwargs): + """Restart plumber process when a RModel is created.""" + from spw.tasks import ( + start_plumber_process + ) + if instance.code and instance.id: + start_plumber_process.apply_async(queue='plumber') + + +@receiver(post_delete, sender=RModel) +def rmodel_post_delete(sender, instance: RModel, + *args, **kwargs): + """Restart plumber process when a RModel is deleted.""" + from spw.tasks import ( + start_plumber_process + ) + # respawn Plumber API + start_plumber_process.apply_async(queue='plumber') + + +class RModelExecutionStatus: + """Status of R Model execution.""" + + RUNNING = 'RUNNING' + SUCCESS = 'SUCCESS' + FAILED = 'FAILED' + + +class RModelExecutionLog(models.Model): + """Model that stores the execution log.""" + + model = models.ForeignKey(RModel, on_delete=models.CASCADE) + location_input = 
gis_models.GeometryField( + srid=4326, null=True, blank=True + ) + input_file = models.FileField( + upload_to=r_model_input_file_path, + null=True, blank=True + ) + output = models.JSONField( + default=dict, + null=True, blank=True + ) + start_date_time = models.DateTimeField( + blank=True, null=True + ) + end_date_time = models.DateTimeField( + blank=True, null=True + ) + status = models.CharField( + default=RModelExecutionStatus.RUNNING, + choices=( + (RModelExecutionStatus.RUNNING, RModelExecutionStatus.RUNNING), + (RModelExecutionStatus.SUCCESS, RModelExecutionStatus.SUCCESS), + (RModelExecutionStatus.FAILED, RModelExecutionStatus.FAILED), + ), + max_length=512 + ) + errors = models.TextField( + blank=True, null=True + ) diff --git a/django_project/spw/tasks.py b/django_project/spw/tasks.py new file mode 100644 index 00000000..7e0e2432 --- /dev/null +++ b/django_project/spw/tasks.py @@ -0,0 +1,33 @@ +# coding=utf-8 +""" +Tomorrow Now GAP. + +.. note:: SPW Tasks +""" +from celery import shared_task +from celery.utils.log import get_task_logger + +from spw.utils.plumber import ( + kill_r_plumber_process, + spawn_r_plumber, + write_plumber_file +) + + +logger = get_task_logger(__name__) + + +@shared_task(name="start_plumber_process") +def start_plumber_process(): + """Start plumber process when there is R code change.""" + logger.info('Starting plumber process') + # kill existing process + kill_r_plumber_process() + # Generate plumber.R file + write_plumber_file() + # spawn the process + plumber_process = spawn_r_plumber() + if plumber_process: + logger.info(f'plumber process pid {plumber_process.pid}') + else: + raise RuntimeError('Cannot execute plumber process!') diff --git a/django_project/spw/tests/__init__.py b/django_project/spw/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/django_project/spw/tests/test_generator.py b/django_project/spw/tests/test_generator.py new file mode 100644 index 00000000..6ed0af57 --- /dev/null +++ b/django_project/spw/tests/test_generator.py @@ -0,0 +1,251 @@ +# coding=utf-8 +""" +Tomorrow Now GAP. + +.. note:: UnitTest for Plumber functions. 
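+
+These tests cover the SPW generator helpers (SPWOutput,
+_fetch_timelines_data, _fetch_ltn_data and calculate_from_point).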
+""" +from datetime import datetime, timedelta +import pytz +from unittest.mock import patch, MagicMock +from django.test import TestCase +from django.contrib.gis.geos import Point + +from gap.models import ( + DatasetAttribute, + Dataset +) +from gap.utils.reader import ( + DatasetReaderInput, + DatasetReaderValue, + DatasetTimelineValue +) +from gap.providers.tio import ( + TomorrowIODatasetReader +) +from spw.models import RModelExecutionLog, RModelExecutionStatus +from spw.generator import ( + SPWOutput, + _fetch_timelines_data, + _fetch_ltn_data, + calculate_from_point +) +from spw.factories import ( + RModelFactory +) + + +class TestSPWOutput(TestCase): + """Unit test for SPWOutput class.""" + + def setUp(self): + """Set the test class.""" + self.point = Point(y=1.0, x=1.0) + self.input_data = { + 'temperature': [20.5], + 'pressure': [101.3], + 'humidity': 45, + 'metadata': 'some metadata' + } + self.expected_data = { + 'temperature': 20.5, + 'pressure': 101.3, + 'humidity': 45 + } + + def test_initialization(self): + """Test initialization of SPWOutput class.""" + spw_output = SPWOutput(self.point, self.input_data) + + self.assertEqual(spw_output.point, self.point) + self.assertEqual( + spw_output.data.temperature, self.expected_data['temperature']) + self.assertEqual( + spw_output.data.pressure, self.expected_data['pressure']) + self.assertEqual( + spw_output.data.humidity, self.expected_data['humidity']) + + def test_input_data_without_metadata(self): + """Test initialization of SPWOutput class without metadata.""" + input_data = { + 'temperature': [20.5], + 'pressure': [101.3], + 'humidity': 45 + } + spw_output = SPWOutput(self.point, input_data) + + self.assertEqual( + spw_output.data.temperature, input_data['temperature'][0]) + self.assertEqual(spw_output.data.pressure, input_data['pressure'][0]) + self.assertEqual(spw_output.data.humidity, input_data['humidity']) + + def test_input_data_with_single_element_list(self): + """Test initialization of SPWOutput class for single element.""" + input_data = { + 'temperature': [20.5], + 'humidity': 45 + } + spw_output = SPWOutput(self.point, input_data) + + self.assertEqual( + spw_output.data.temperature, input_data['temperature'][0]) + self.assertEqual(spw_output.data.humidity, input_data['humidity']) + + def test_input_data_with_multiple_element_list(self): + """Test initialization of SPWOutput class for list.""" + input_data = { + 'temperature': [20.5, 21.0], + 'humidity': 45 + } + spw_output = SPWOutput(self.point, input_data) + + self.assertEqual( + spw_output.data.temperature, input_data['temperature']) + self.assertEqual(spw_output.data.humidity, input_data['humidity']) + + +class TestSPWFetchDataFunctions(TestCase): + """Test SPW fetch data functions.""" + + def setUp(self): + """Set test fetch data functions.""" + TomorrowIODatasetReader.init_provider() + self.dataset = Dataset.objects.filter( + provider__name='Tomorrow.io' + ).first() + self.location_input = DatasetReaderInput.from_point(Point(0, 0)) + attr1 = DatasetAttribute.objects.filter( + source='rainAccumulationSum', + dataset=self.dataset + ).first() + attr2 = DatasetAttribute.objects.filter( + source='evapotranspirationSum', + dataset=self.dataset + ).first() + self.attrs = [attr1, attr2] + self.dt_now = datetime.now(tz=pytz.UTC).replace(microsecond=0) + self.start_dt = self.dt_now - timedelta(days=10) + self.end_dt = self.dt_now + + @patch.object(TomorrowIODatasetReader, 'read') + @patch.object(TomorrowIODatasetReader, 'get_data_values') + def 
test_fetch_timelines_data(self, mocked_get_data_values, mocked_read): + """Test fetch timelines data for SPW.""" + mocked_read.side_effect = MagicMock() + mocked_get_data_values.return_value = ( + DatasetReaderValue(self.location_input.point, [ + DatasetTimelineValue( + datetime(2023, 7, 20), + { + 'total_evapotranspiration_flux': 10, + 'total_rainfall': 5 + } + ) + ]) + ) + result = _fetch_timelines_data( + self.location_input, self.attrs, self.start_dt, self.end_dt) + expected_result = { + '07-20': { + 'date': '2023-07-20', + 'evapotranspirationSum': 10, + 'rainAccumulationSum': 5 + } + } + self.assertEqual(result, expected_result) + + + @patch.object(TomorrowIODatasetReader, 'read') + @patch.object(TomorrowIODatasetReader, 'get_data_values') + def test_fetch_ltn_data(self, mocked_get_data_values, mocked_read): + """Test fetch ltn data for SPW.""" + mocked_read.side_effect = MagicMock() + mocked_get_data_values.return_value = ( + DatasetReaderValue(self.location_input.point, [ + DatasetTimelineValue( + datetime(2023, 7, 20), + {'total_evapotranspiration_flux': 8, 'total_rainfall': 3}) + ]) + ) + # Initial historical data + historical_dict = { + '07-20': { + 'date': '2023-07-20', + 'evapotranspirationSum': 10, + 'rainAccumulationSum': 5 + } + } + result = _fetch_ltn_data( + self.location_input, self.attrs, + self.start_dt, self.end_dt, historical_dict) + expected_result = { + '07-20': { + 'date': '2023-07-20', + 'evapotranspirationSum': 10, + 'rainAccumulationSum': 5, + 'LTNPET': 8, + 'LTNPrecip': 3 + } + } + self.assertEqual(result, expected_result) + + +class TestSPWGenerator(TestCase): + """Test SPW Generator functions.""" + + def setUp(self): + """Set the test class.""" + self.dt_now = datetime.now(tz=pytz.UTC).replace(microsecond=0) + self.location_input = DatasetReaderInput.from_point(Point(0, 0)) + self.r_model = RModelFactory.create(name='test') + + @patch('spw.generator.execute_spw_model') + @patch('spw.generator._fetch_timelines_data') + @patch('spw.generator._fetch_ltn_data') + def test_calculate_from_point( + self, mock_fetch_ltn_data, mock_fetch_timelines_data, + mock_execute_spw_model): + """Test calculate_from_point function.""" + mock_fetch_ltn_data.return_value = { + '07-20': { + 'date': '2023-07-20', + 'evapotranspirationSum': 10, + 'rainAccumulationSum': 5, + 'LTNPET': 8, + 'LTNPrecip': 3 + } + } + mock_fetch_timelines_data.return_value = { + '07-20': { + 'date': '2023-07-20', + 'evapotranspirationSum': 10, + 'rainAccumulationSum': 5 + } + } + r_data = { + 'metadata': { + 'test': 'abcdef' + }, + 'goNoGo': ['Do not plant Tier 1a'], + 'nearDaysLTNPercent': [10.0], + 'nearDaysCurPercent': [60.0], + } + mock_execute_spw_model.return_value = (True, r_data) + + output = calculate_from_point(self.location_input.point) + mock_fetch_ltn_data.assert_called_once() + mock_fetch_timelines_data.assert_called_once() + mock_execute_spw_model.assert_called_once() + self.assertEqual(output.data.goNoGo, r_data['goNoGo'][0]) + self.assertEqual( + output.data.nearDaysLTNPercent, r_data['nearDaysLTNPercent'][0]) + self.assertEqual( + output.data.nearDaysCurPercent, r_data['nearDaysCurPercent'][0]) + # find RModelExecutionLog + log = RModelExecutionLog.objects.filter( + model=self.r_model, + location_input=self.location_input.point + ).first() + self.assertTrue(log) + self.assertTrue(log.input_file) + self.assertTrue(log.output) + self.assertEqual(log.status, RModelExecutionStatus.SUCCESS) diff --git a/django_project/spw/tests/test_plumber.py b/django_project/spw/tests/test_plumber.py new 
file mode 100644
index 00000000..d7b9942f
--- /dev/null
+++ b/django_project/spw/tests/test_plumber.py
@@ -0,0 +1,168 @@
+# coding=utf-8
+"""
+Tomorrow Now GAP.
+
+.. note:: UnitTest for Plumber functions.
+"""
+import os
+import mock
+import requests_mock
+from django.test import TestCase
+
+from spw.utils.plumber import (
+    write_plumber_file,
+    write_plumber_data,
+    remove_plumber_data,
+    PLUMBER_PORT,
+    plumber_health_check,
+    kill_r_plumber_process,
+    spawn_r_plumber,
+    execute_spw_model
+)
+from spw.utils.process import write_pidfile
+from spw.factories import RModelFactory, RModelOutputFactory
+
+
+def mocked_os_kill(self, *args, **kwargs):
+    """Mock os.kill function."""
+    return 1
+
+
+def find_r_line_code(lines, code):
+    """Find code in lines."""
+    filtered = [line for line in lines if code in line]
+    return len(filtered) > 0
+
+
+class DummyProcess:
+    """Class for dummy process result."""
+
+    def __init__(self, pid):
+        """Initialize class with dummy pid."""
+        self.pid = pid
+
+
+def mocked_process(*args, **kwargs):
+    """Mock a subprocess."""
+    return DummyProcess(1)
+
+
+class TestPlumberUtils(TestCase):
+    """Test plumber utility functions."""
+
+    def test_plumber_health_check(self):
+        """Test plumber health check."""
+        with requests_mock.Mocker() as m:
+            json_response = {'echo': 'ok'}
+            m.get(
+                f'http://0.0.0.0:{PLUMBER_PORT}/statistical/echo',
+                json=json_response,
+                headers={'Content-Type': 'application/json'},
+                status_code=200
+            )
+            is_running = plumber_health_check(max_retry=1)
+            self.assertTrue(is_running)
+        with requests_mock.Mocker() as m:
+            json_response = {'echo': 'ok'}
+            m.get(
+                f'http://0.0.0.0:{PLUMBER_PORT}/statistical/echo',
+                json=json_response,
+                headers={'Content-Type': 'application/json'},
+                status_code=400
+            )
+            is_running = plumber_health_check(max_retry=1)
+            self.assertFalse(is_running)
+
+    @mock.patch('subprocess.Popen',
+                mock.Mock(side_effect=mocked_process))
+    def test_spawn_r_plumber(self):
+        """Test spawn new R plumber process."""
+        with requests_mock.Mocker() as m:
+            json_response = {'echo': 'ok'}
+            m.get(
+                f'http://0.0.0.0:{PLUMBER_PORT}/statistical/echo',
+                json=json_response,
+                headers={'Content-Type': 'application/json'},
+                status_code=200
+            )
+            process = spawn_r_plumber()
+            self.assertEqual(process.pid, 1)
+
+    @mock.patch('os.kill')
+    def test_kill_r_plumber_process(self, mocked_os):
+        """Test killing a running R Plumber process."""
+        mocked_os.side_effect = mocked_os_kill
+        pid_path = '/tmp/plumber.pid'
+        write_pidfile(26, pid_path)
+        kill_r_plumber_process()
+        self.assertEqual(mocked_os.call_count, 1)
+
+    def test_execute_spw_model(self):
+        """Test execute SPW R Model."""
+        data_filepath = '/tmp/test.csv'
+        with requests_mock.Mocker() as m:
+            json_response = {'national_trend': 'abcde'}
+            m.post(
+                f'http://plumber:{PLUMBER_PORT}/spw/generic',
+                json=json_response,
+                headers={'Content-Type': 'application/json'},
+                status_code=200
+            )
+            is_success, response = execute_spw_model(
+                data_filepath, 'test.csv', 0.0, 0.0)
+            self.assertTrue(is_success)
+            self.assertEqual(response, json_response)
+        with requests_mock.Mocker() as m:
+            json_response = {'error': 'Internal server error'}
+            m.post(
+                f'http://plumber:{PLUMBER_PORT}/spw/generic',
+                json=json_response,
+                headers={'Content-Type': 'application/json'},
+                status_code=500
+            )
+            is_success, response = execute_spw_model(
+                data_filepath, 'test.csv', 0.0, 0.0)
+            self.assertFalse(is_success)
+            self.assertEqual('Internal server error', response['error'])
+        with requests_mock.Mocker() as m:
+
data_response = 'Test' + m.post( + f'http://plumber:{PLUMBER_PORT}/spw/generic', + json=data_response, + headers={'Content-Type': 'text/plain'}, + status_code=500 + ) + is_success, response = execute_spw_model( + data_filepath, 'test.csv') + self.assertFalse(is_success) + self.assertEqual('Invalid response content type: text/plain', + response) + + def test_write_plumber_file(self): + """Test writing plumber R file.""" + r_model = RModelFactory.create() + RModelOutputFactory.create(model=r_model) + r_file_path = write_plumber_file( + os.path.join( + '/tmp', + 'plumber_test.R' + ) + ) + with open(r_file_path, 'r') as f: + lines = f.readlines() + self.assertTrue(find_r_line_code(lines, '@get /statistical/echo')) + if os.path.exists(r_file_path): + os.remove(r_file_path) + + def test_manage_plumber_data(self): + """Test manage plumber data files.""" + headers = ['data', 'count_total'] + csv_data = [ + ['abc', 10], + ['def', 20] + ] + file_path = write_plumber_data( + headers, csv_data, '/tmp') + self.assertTrue(os.path.exists(file_path)) + remove_plumber_data(file_path) + self.assertFalse(os.path.exists(file_path)) diff --git a/django_project/spw/tests/test_utils_process.py b/django_project/spw/tests/test_utils_process.py new file mode 100644 index 00000000..84fbc198 --- /dev/null +++ b/django_project/spw/tests/test_utils_process.py @@ -0,0 +1,33 @@ +# coding=utf-8 +""" +Tomorrow Now GAP. + +.. note:: UnitTest for Process functions. +""" +import os +from django.test import TestCase +import mock +from spw.utils.process import ( + write_pidfile, + kill_process_by_pid +) + + +class TestUtilsProcess(TestCase): + """Test for process utility functions.""" + + @staticmethod + def mocked_os_kill(self, *args, **kwargs): + """Mock os.kill method.""" + return 1 + + @mock.patch('os.kill') + def test_kill_process_by_pid(self, mocked_os): + """Test kill process by pid.""" + mocked_os.side_effect = TestUtilsProcess.mocked_os_kill + pid_path = '/tmp/test.pid' + write_pidfile(26, pid_path) + self.assertTrue(os.path.exists(pid_path)) + kill_process_by_pid(pid_path) + self.assertEqual(mocked_os.call_count, 1) + self.assertFalse(os.path.exists(pid_path)) diff --git a/django_project/spw/utils/__init__.py b/django_project/spw/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/django_project/spw/utils/plumber.py b/django_project/spw/utils/plumber.py new file mode 100644 index 00000000..1c34252c --- /dev/null +++ b/django_project/spw/utils/plumber.py @@ -0,0 +1,227 @@ +# coding=utf-8 +""" +Tomorrow Now GAP. + +.. note:: Plumber functions. +""" +import logging +import traceback +import os +import time +import csv +import subprocess +import requests +from uuid import uuid4 + +from core.settings.utils import absolute_path +from spw.models import RModel, RModelOutput +from spw.utils.process import ( + write_pidfile, + kill_process_by_pid +) + + +logger = logging.getLogger(__name__) +PLUMBER_PORT = os.getenv('PLUMBER_PORT', 8282) + + +def plumber_health_check(max_retry=5): + """ + Check whether API is up and running. + + This will be called from worker. 
+ :param max_retry: maximum retry of checking + :return: True if successful number of check is less than max_retry + """ + request_url = f'http://0.0.0.0:{PLUMBER_PORT}/statistical/echo' + retry = 0 + req = None + time.sleep(1) + while (req is None or req.status_code != 200) and retry < max_retry: + try: + req = requests.get(request_url) + if req.status_code != 200: + time.sleep(2) + else: + break + except Exception as ex: # noqa + logger.error(ex) + time.sleep(2) + retry += 1 + if retry < max_retry: + logger.info('Plumber API is up and running!') + return retry < max_retry + + +def spawn_r_plumber(): + """Run a Plumber API server.""" + command_list = ( + [ + 'R', + '-e', + ( + "pr <- plumber::plumb(" + f"'/home/web/plumber_data/plumber.R'); " + f"args <- list(host = '0.0.0.0', port = {PLUMBER_PORT}); " + "do.call(pr$run, args)" + ) + ] + ) + logger.info('Starting plumber API') + process = None + try: + # redirect stdout and stderr + with open('/proc/1/fd/1', 'w') as fd: + process = subprocess.Popen( + command_list, + stdout=fd, + stderr=fd + ) + # sleep for 10 seconds to wait the API is up + time.sleep(10) + # we can also use polling to echo endpoint for health check + plumber_health_check() + # write process pid to /tmp/ + write_pidfile(process.pid, '/tmp/plumber.pid') + except Exception as ex: # noqa + logger.error(ex) + logger.error(traceback.format_exc()) + if process: + process.terminate() + process = None + return process + + +def kill_r_plumber_process(): + """Kill plumber process by PID stored in file.""" + pid_path = os.path.join( + '/', + 'tmp', + 'plumber.pid' + ) + kill_process_by_pid(pid_path) + + +def execute_spw_model( + data_url: str, filename: str, lat: float = 0.0, lon: float = 0.0, + place_name: str = None): + """Execute SPW model given the data_filepath. + + :param data_url: CSV file URL containing the data. 
+    :type data_url: str
+    :param filename: CSV filename
+    :type filename: str
+    :param lat: Latitude of data query
+    :type lat: float
+    :param lon: Longitude of data query
+    :type lon: float
+    :param place_name: Place name (optional)
+    :type place_name: str
+    :return: tuple of (is_success, dict of model output or error message)
+    :rtype: tuple
+    """
+    request_url = f'http://plumber:{PLUMBER_PORT}/spw/generic'
+    data = {
+        'data_url': data_url,
+        'filename': filename,
+        'lat': lat,
+        'lon': lon,
+        'place_name': place_name if place_name else 'Default'
+    }
+    response = requests.post(request_url, data=data)
+    content_type = response.headers['Content-Type']
+    error = None
+    if content_type == 'application/json':
+        if response.status_code == 200:
+            return True, response.json()
+        else:
+            logger.error(
+                f'Plumber error response: {str(response.json())}')
+            error = response.json()
+    else:
+        logger.error(f'Invalid response content type: {content_type}')
+        error = f'Invalid response content type: {content_type}'
+    return False, error
+
+
+def write_plumber_file(file_path=None):
+    """Write R code to plumber.R."""
+    r_file_path = file_path if file_path else os.path.join(
+        '/home/web/plumber_data',
+        'plumber.R'
+    )
+    template_file = absolute_path(
+        'spw', 'utils', 'plumber_template.R'
+    )
+    with open(template_file, 'r') as f:
+        lines = f.readlines()
+    model = RModel.objects.order_by('-version').first()
+    if model:
+        lines.append('\n')
+        lines.append('#* Generic Model\n')
+        lines.append('#* @post /spw/generic\n')
+
+        lines.append('function(data_url, filename, lat, lon, place_name) {\n')
+        lines.append(f' metadata <- list(version={model.version}, '
+                     'lat=lat, lon=lon, place_name=place_name, '
+                     'generated_on=format(Sys.time(), '
+                     '"%Y-%m-%d %H:%M:%S %Z"))\n')
+        lines.append(' time_start <- Sys.time()\n')
+        lines.append(
+            ' data_filename <- paste(\'/tmp/\', filename, sep="")\n')
+        lines.append(' download.file(data_url, data_filename)\n')
+        code_lines = model.code.splitlines()
+        for code in code_lines:
+            lines.append(f' {code}\n')
+        lines.append(' metadata[\'total_execution_time\'] '
+                     '<- Sys.time() - time_start\n')
+        lines.append('unlink(data_filename)\n')
+        # add output
+        model_outputs = RModelOutput.objects.filter(
+            model=model
+        )
+        output_list = ['metadata=metadata']
+        for output in model_outputs:
+            output_list.append(f'{output.type}={output.variable_name}')
+        output_list_str = ','.join(output_list)
+        lines.append(
+            f' list({output_list_str})\n'
+        )
+        lines.append('}\n')
+    with open(r_file_path, 'w') as f:
+        for line in lines:
+            f.write(line)
+    return r_file_path
+
+
+def write_plumber_data(headers, csv_data, dir_path='/home/web/plumber_data'):
+    """
+    Write csv data to file in plumber_data.
+
+    :param headers: list of header name
+    :param csv_data: list of row
+    :return: file path of exported csv
+    """
+    r_data_path = os.path.join(
+        dir_path,
+        f'{str(uuid4())}.csv'
+    )
+    with open(r_data_path, 'w', encoding='UTF8') as f:
+        writer = csv.writer(f)
+        # write the header
+        writer.writerow(headers)
+        writer.writerows(csv_data)
+    return r_data_path
+
+
+def remove_plumber_data(data_filepath):
+    """
+    Remove csv data file.
+ + :param data_filepath: filepath to the csv file + """ + try: + if os.path.exists(data_filepath): + os.remove(data_filepath) + except Exception as ex: + logger.error(ex) diff --git a/django_project/spw/utils/plumber_template.R b/django_project/spw/utils/plumber_template.R new file mode 100644 index 00000000..ac634afa --- /dev/null +++ b/django_project/spw/utils/plumber_template.R @@ -0,0 +1,26 @@ +# plumber.R + +library(tidyverse) +library(tidygam) +library(mgcv) +library(ggpubr) +library(zoo) + +#* @plumber +function(pr) { + err_func <- function(req, res, err) { + print(err) + res$status <- 500 + return ( + list(error = paste('Unhandled exception: ', err$message), detail = as.character(err)) + ) + } + + plumber::pr_set_error(pr, err_func) +} + +#* Echo back the input +#* @get /statistical/echo +function() { + list(msg = paste0("Plumber is working!")) +} diff --git a/django_project/spw/utils/process.py b/django_project/spw/utils/process.py new file mode 100644 index 00000000..bb00285e --- /dev/null +++ b/django_project/spw/utils/process.py @@ -0,0 +1,65 @@ +# coding=utf-8 +""" +Tomorrow Now GAP. + +.. note:: Process management functions. +""" +import os +import logging +from signal import SIGKILL + + +logger = logging.getLogger(__name__) + + +def write_pidfile(pid, pidfile_path): + """Write pid to file.""" + with open(pidfile_path, 'w', encoding='utf-8') as f: + f.write(str(pid)) + + +def read_pid_from_pidfile(pidfile_path): + """Read the PID recorded in the named PID file. + + Read and return the numeric PID recorded as text in the named + PID file. If the PID file cannot be read, or if the content is + not a valid PID, return ``None``. + + """ + pid = None + try: + pidfile = open(pidfile_path, 'r') + except IOError as ex: + logger.error(ex) + else: + # According to the FHS 2.3 section on PID files in /var/run: + # + # The file must consist of the process identifier in + # ASCII-encoded decimal, followed by a newline character. + # + # Programs that read PID files should be somewhat flexible + # in what they accept; i.e., they should ignore extra + # whitespace, leading zeroes, absence of the trailing + # newline, or additional lines in the PID file. 
+
+        line = pidfile.readline().strip()
+        try:
+            pid = int(line)
+        except ValueError as ex:
+            logger.error(ex)
+        pidfile.close()
+
+    return pid
+
+
+def kill_process_by_pid(pidfile_path):
+    """Kill process by PID."""
+    plumber_pid = read_pid_from_pidfile(pidfile_path)
+    logger.info(f'Killing pid {plumber_pid}')
+    if plumber_pid:
+        # kill a process via pid
+        os.kill(plumber_pid, SIGKILL)
+    try:
+        os.remove(pidfile_path)
+    except IOError as ex:
+        logger.error(ex)
diff --git a/docs/src/developer/data-model.md b/docs/src/developer/data-model.md
index bb9bbd7d..b5d077f5 100644
--- a/docs/src/developer/data-model.md
+++ b/docs/src/developer/data-model.md
@@ -12,7 +12,11 @@ license: This program is free software; you can redistribute it and/or modify it
 
 # Data model diagram
 
-## Ground observations
+## GAP
 
-![database design](./diagram/ground-observations-database-design-1.png)
+![database design gap](./diagram/ground-observations-database-design-1.png)
+
+## SPW
+
+![database design spw](./diagram/ground-observations-database-design-2.png)
diff --git a/docs/src/developer/diagram/ground-observations-database-design-1.png b/docs/src/developer/diagram/ground-observations-database-design-1.png
index f5c15cd7..2fd0708d 100644
Binary files a/docs/src/developer/diagram/ground-observations-database-design-1.png and b/docs/src/developer/diagram/ground-observations-database-design-1.png differ
diff --git a/docs/src/developer/diagram/ground-observations-database-design-2.png b/docs/src/developer/diagram/ground-observations-database-design-2.png
new file mode 100644
index 00000000..fcb02d8f
Binary files /dev/null and b/docs/src/developer/diagram/ground-observations-database-design-2.png differ
diff --git a/docs/src/developer/diagram/solution-design-app-0018.drawio b/docs/src/developer/diagram/solution-design-app-0018.drawio
index ba5101d9..631c19fa 100644
--- a/docs/src/developer/diagram/solution-design-app-0018.drawio
+++ b/docs/src/developer/diagram/solution-design-app-0018.drawio
[diagram XML diff omitted: solution-design-app-0018.drawio was updated, but its markup was garbled during extraction and is not reproduced here]
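Reviewer note: a minimal sketch of how the plumber utilities introduced in this diff are meant to chain together on the Django side. The real orchestration lives in `spw.generator.calculate_from_point` (exercised by `test_generator.py`); the helper name, column names, and values below are placeholders, and it assumes the CSV path handed to `execute_spw_model` is reachable from inside the `plumber` container defined in the compose files.

```python
# Illustrative sketch only: run_spw_for_point, the column names, and the row
# values are placeholders, not the real inputs assembled by
# spw.generator.calculate_from_point.
from spw.utils.plumber import (
    execute_spw_model,
    remove_plumber_data,
    write_plumber_data,
)


def run_spw_for_point(lat: float, lon: float, place_name: str = 'Default'):
    """Write a CSV for the R model, POST it to plumber, then clean up."""
    headers = ['date', 'evapotranspirationSum', 'rainAccumulationSum']
    rows = [
        ['2023-07-20', 10, 5],  # placeholder values
    ]
    # write_plumber_data returns the path of the generated CSV file
    csv_path = write_plumber_data(headers, rows)
    try:
        # execute_spw_model POSTs to http://plumber:<PLUMBER_PORT>/spw/generic;
        # the R side downloads the file, so the path must be visible to it.
        is_success, output = execute_spw_model(
            csv_path, 'spw_input.csv', lat, lon, place_name)
        if not is_success:
            raise RuntimeError(f'SPW model failed: {output}')
        # output carries keys such as goNoGo and nearDaysLTNPercent,
        # as seen in test_generator.py
        return output
    finally:
        remove_plumber_data(csv_path)
```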