diff --git a/django_project/core/celery.py b/django_project/core/celery.py index f700a10c..e7547373 100644 --- a/django_project/core/celery.py +++ b/django_project/core/celery.py @@ -32,6 +32,9 @@ # should have a `CELERY_` prefix. app.config_from_object('django.conf:settings', namespace='CELERY') +# use max task = 1 to avoid memory leak from numpy/ingestor +app.conf.worker_max_tasks_per_child = 1 + # Load task modules from all registered Django app configs. app.autodiscover_tasks() @@ -44,9 +47,14 @@ app.conf.beat_schedule = { 'generate-crop-plan': { 'task': 'generate_crop_plan', - # Run everyday at 2am East Africa Time or 23:00 UTC + # Run everyday at 01:30 UTC or 04:30 EAT 'schedule': crontab(minute='30', hour='1'), }, + 'salient-collector-session': { + 'task': 'salient_collector_session', + # Run everyday at 6am East Africa Time or 02:00 UTC + 'schedule': crontab(minute='0', hour='2'), + }, } diff --git a/django_project/gap/admin/preferences.py b/django_project/gap/admin/preferences.py index ec9df14e..5779d9b0 100644 --- a/django_project/gap/admin/preferences.py +++ b/django_project/gap/admin/preferences.py @@ -19,6 +19,7 @@ class PreferencesAdmin(admin.ModelAdmin): None, { 'fields': ( 'area_of_interest', + 'salient_area' ) } ), diff --git a/django_project/gap/ingestor/salient.py b/django_project/gap/ingestor/salient.py index 620f8889..552668c6 100644 --- a/django_project/gap/ingestor/salient.py +++ b/django_project/gap/ingestor/salient.py @@ -84,11 +84,9 @@ def _get_variable_list_config(self): def _get_coords(self): """Retrieve polygon coordinates.""" - # TODO: we need to verify on dev whether - # it's possible to request for whole GAP AoI return self.session.additional_config.get( 'coords', - list(Preferences.load().area_of_interest.coords[0]) + list(Preferences.load().salient_area.coords[0]) ) def _convert_forecast_date(self, date_str: str): diff --git a/django_project/gap/migrations/0017_preferences_salient_area.py b/django_project/gap/migrations/0017_preferences_salient_area.py new file mode 100644 index 00000000..e9f50c95 --- /dev/null +++ b/django_project/gap/migrations/0017_preferences_salient_area.py @@ -0,0 +1,20 @@ +# Generated by Django 4.2.7 on 2024-09-06 12:29 + +import django.contrib.gis.db.models.fields +from django.db import migrations +import gap.models.preferences + + +class Migration(migrations.Migration): + + dependencies = [ + ('gap', '0016_datasourcefile_is_latest_datasourcefile_metadata'), + ] + + operations = [ + migrations.AddField( + model_name='preferences', + name='salient_area', + field=django.contrib.gis.db.models.fields.PolygonField(default=gap.models.preferences.area_of_salient_default, help_text='Area that Salient collector will use to pull the data', srid=4326), + ), + ] diff --git a/django_project/gap/models/ingestor.py b/django_project/gap/models/ingestor.py index bec4c617..4d61e186 100644 --- a/django_project/gap/models/ingestor.py +++ b/django_project/gap/models/ingestor.py @@ -147,12 +147,18 @@ class IngestorSession(BaseSession): collectors = models.ManyToManyField(CollectorSession, blank=True) + def __init__(self, *args, trigger_task=True, **kwargs): + """Initialize IngestorSession class.""" + super().__init__(*args, **kwargs) + # Set the temporary attribute + self._trigger_task = trigger_task + def save(self, *args, **kwargs): """Override ingestor save.""" from gap.tasks import run_ingestor_session # noqa created = self.pk is None super(IngestorSession, self).save(*args, **kwargs) - if created: + if created and self._trigger_task: run_ingestor_session.delay(self.id) def _run(self, working_dir): diff --git a/django_project/gap/models/preferences.py b/django_project/gap/models/preferences.py index 2468ea4e..380e8cac 100644 --- a/django_project/gap/models/preferences.py +++ b/django_project/gap/models/preferences.py @@ -33,11 +33,29 @@ def area_of_interest_default(): return Polygon(coordinates) +def area_of_salient_default(): + """Return polygon default for salient collector.""" + coordinates = [ + (41.89, 3.98), + (35.08, 4.87), + (30.92, 3.57), + (28.66, -2.48), + (31.13, -8.62), + (34.6, -11.74), + (40.65, -10.68), + (39.34, -4.73), + (41.56, -1.64), + (41.9, 3.98), + (41.89, 3.98) + ] + return Polygon(coordinates) + + def crop_plan_config_default() -> dict: """Return dictionary for crop plan config.""" return { 'lat_lon_decimal_digits': -1, - 'tz': '+02:00' # East Africa Time + 'tz': '+03:00' # East Africa Time } @@ -53,6 +71,12 @@ class Preferences(SingletonModel): blank=True ) + # salient config + salient_area = models.PolygonField( + srid=4326, default=area_of_salient_default, + help_text='Area that Salient collector will use to pull the data' + ) + # Documentations documentation_url = models.URLField( default='https://kartoza.github.io/tomorrownow_gap/', diff --git a/django_project/gap/tasks/ingestor.py b/django_project/gap/tasks/ingestor.py index 21afca4e..1c7df025 100644 --- a/django_project/gap/tasks/ingestor.py +++ b/django_project/gap/tasks/ingestor.py @@ -8,7 +8,16 @@ from celery.utils.log import get_task_logger from core.celery import app -from gap.models.ingestor import IngestorSession, CollectorSession, IngestorType +from gap.models.dataset import ( + Dataset, + DatasetStore, + DataSourceFile +) +from gap.models.ingestor import ( + IngestorType, + IngestorSession, + CollectorSession +) logger = get_task_logger(__name__) @@ -33,22 +42,37 @@ def run_collector_session(_id: int): logger.error('Collector Session {} does not exists'.format(_id)) -@app.task(name='cbam_collector_session') -def run_cbam_collector_session(): - """Run Collector for CBAM Dataset.""" - session = CollectorSession.objects.create( - ingestor_type=IngestorType.CBAM - ) - session.run() - if session.dataset_files.count() > 0: - # create ingestor session to convert into zarr - IngestorSession.objects.create( - ingestor_type=IngestorType.CBAM, - collector=session - ) - - @app.task(name='salient_collector_session') def run_salient_collector_session(): """Run Collector for Salient Dataset.""" - pass + dataset = Dataset.objects.get(name='Salient Seasonal Forecast') + # create the collector object + collector_session = CollectorSession.objects.create( + ingestor_type=IngestorType.SALIENT + ) + # run collector + collector_session.run() + + # if success, create ingestor session + collector_session.refresh_from_db() + total_file = collector_session.dataset_files.count() + if total_file > 0: + # find latest DataSourceFile + data_source = DataSourceFile.objects.filter( + dataset=dataset, + format=DatasetStore.ZARR, + is_latest=True + ).last() + additional_conf = {} + if data_source: + additional_conf = { + 'datasourcefile_id': data_source.id, + 'datasourcefile_zarr_exists': True + } + session = IngestorSession.objects.create( + ingestor_type=IngestorType.SALIENT, + trigger_task=False, + additional_config=additional_conf + ) + session.collectors.add(collector_session) + run_ingestor_session.delay(session.id) diff --git a/django_project/gap/tests/crop_insight/test_crop_insight_model.py b/django_project/gap/tests/crop_insight/test_crop_insight_model.py index c6e132f2..c128f03e 100644 --- a/django_project/gap/tests/crop_insight/test_crop_insight_model.py +++ b/django_project/gap/tests/crop_insight/test_crop_insight_model.py @@ -59,7 +59,7 @@ def test_title(self): self.assertEqual( obj.title, ( "GAP - Crop Plan Generator Results - " - "Monday-01-01-2024 (UTC+02:00)" + "Monday-01-01-2024 (UTC+03:00)" ) ) @@ -72,7 +72,7 @@ def test_title(self): self.assertEqual( obj.title, ( "GAP - Crop Plan Generator Results - " - "Tuesday-02-01-2024 (UTC+02:00)" + "Tuesday-02-01-2024 (UTC+03:00)" ) ) @@ -85,19 +85,19 @@ def test_title(self): self.assertEqual( obj.title, ( "GAP - Crop Plan Generator Results - " - "Tuesday-02-01-2024 (UTC+02:00)" + "Tuesday-02-01-2024 (UTC+03:00)" ) ) # Change to before 21, so east time should today obj.requested_at = datetime( - 2024, 1, 1, 21, 0, 0, + 2024, 1, 1, 20, 0, 0, tzinfo=timezone.utc ) obj.save() self.assertEqual( obj.title, ( "GAP - Crop Plan Generator Results - " - "Monday-01-01-2024 (UTC+02:00)" + "Monday-01-01-2024 (UTC+03:00)" ) ) diff --git a/django_project/gap/tests/ingestor/test_cbam.py b/django_project/gap/tests/ingestor/test_cbam.py index 1d129050..095fceed 100644 --- a/django_project/gap/tests/ingestor/test_cbam.py +++ b/django_project/gap/tests/ingestor/test_cbam.py @@ -20,7 +20,6 @@ IngestorSessionStatus ) from gap.ingestor.cbam import CBAMIngestor -from gap.tasks.ingestor import run_cbam_collector_session from gap.factories import DataSourceFileFactory from gap.utils.netcdf import find_start_latlng @@ -136,15 +135,6 @@ def test_cbam_collector_cancel( mock_fs.walk.assert_called_with('s3://test_bucket/cbam') self.assertEqual(collector.dataset_files.count(), 0) - @patch.object(CollectorSession, 'run') - def test_run_cbam_collector_session(self, mocked_run): - """Test task to run cbam collector session.""" - run_cbam_collector_session() - mocked_run.assert_called_once() - self.assertFalse(IngestorSession.objects.filter( - ingestor_type=IngestorType.CBAM - ).exists()) - class CBAMIngestorTest(CBAMIngestorBaseTest): """CBAM ingestor test case.""" diff --git a/django_project/gap/tests/ingestor/test_salient.py b/django_project/gap/tests/ingestor/test_salient.py index 3bb8c13e..5ebee812 100644 --- a/django_project/gap/tests/ingestor/test_salient.py +++ b/django_project/gap/tests/ingestor/test_salient.py @@ -20,6 +20,7 @@ ) from gap.ingestor.salient import SalientIngestor, SalientCollector from gap.factories import DataSourceFileFactory +from gap.tasks.ingestor import run_salient_collector_session class SalientIngestorBaseTest(TestCase): @@ -145,6 +146,34 @@ def test_run_with_exception(self, mock_sk, mock_logger): self.collector.run() self.assertEqual(mock_logger.error.call_count, 2) + @patch('gap.models.ingestor.CollectorSession.dataset_files') + @patch('gap.models.ingestor.CollectorSession.run') + @patch('gap.tasks.ingestor.run_ingestor_session.delay') + def test_run_salient_collector_session( + self, mock_ingestor, mock_collector, mock_count + ): + """Test run salient collector session.""" + mock_count.count.return_value = 0 + run_salient_collector_session() + # assert + mock_collector.assert_called_once() + mock_ingestor.assert_not_called() + + mock_collector.reset_mock() + mock_ingestor.reset_mock() + # test with collector result + mock_count.count.return_value = 1 + run_salient_collector_session() + + # assert + session = IngestorSession.objects.filter( + ingestor_type=IngestorType.SALIENT, + ).last() + self.assertTrue(session) + self.assertEqual(session.collectors.count(), 1) + mock_collector.assert_called_once() + mock_ingestor.assert_called_once_with(session.id) + class TestSalientIngestor(SalientIngestorBaseTest): """Salient ingestor test case."""