Skip to content

Commit

Permalink
Add salient scheduler (#143)
Browse files Browse the repository at this point in the history
* set worker_max_tasks_per_child to 1

* add scheduler to run salient ingestor

* fix lint

* fix test title on crop insight
  • Loading branch information
danangmassandy committed Sep 6, 2024
1 parent 83fa5a4 commit 2c1be89
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 38 deletions.
10 changes: 9 additions & 1 deletion django_project/core/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# use max task = 1 to avoid memory leak from numpy/ingestor
app.conf.worker_max_tasks_per_child = 1

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

Expand All @@ -44,9 +47,14 @@
app.conf.beat_schedule = {
'generate-crop-plan': {
'task': 'generate_crop_plan',
# Run everyday at 2am East Africa Time or 23:00 UTC
# Run everyday at 01:30 UTC or 04:30 EAT
'schedule': crontab(minute='30', hour='1'),
},
'salient-collector-session': {
'task': 'salient_collector_session',
# Run everyday at 6am East Africa Time or 02:00 UTC
'schedule': crontab(minute='0', hour='2'),
},
}


Expand Down
1 change: 1 addition & 0 deletions django_project/gap/admin/preferences.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class PreferencesAdmin(admin.ModelAdmin):
None, {
'fields': (
'area_of_interest',
'salient_area'
)
}
),
Expand Down
4 changes: 1 addition & 3 deletions django_project/gap/ingestor/salient.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,9 @@ def _get_variable_list_config(self):

def _get_coords(self):
"""Retrieve polygon coordinates."""
# TODO: we need to verify on dev whether
# it's possible to request for whole GAP AoI
return self.session.additional_config.get(
'coords',
list(Preferences.load().area_of_interest.coords[0])
list(Preferences.load().salient_area.coords[0])
)

def _convert_forecast_date(self, date_str: str):
Expand Down
20 changes: 20 additions & 0 deletions django_project/gap/migrations/0017_preferences_salient_area.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Generated by Django 4.2.7 on 2024-09-06 12:29

import django.contrib.gis.db.models.fields
from django.db import migrations
import gap.models.preferences


class Migration(migrations.Migration):

dependencies = [
('gap', '0016_datasourcefile_is_latest_datasourcefile_metadata'),
]

operations = [
migrations.AddField(
model_name='preferences',
name='salient_area',
field=django.contrib.gis.db.models.fields.PolygonField(default=gap.models.preferences.area_of_salient_default, help_text='Area that Salient collector will use to pull the data', srid=4326),
),
]
8 changes: 7 additions & 1 deletion django_project/gap/models/ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,18 @@ class IngestorSession(BaseSession):

collectors = models.ManyToManyField(CollectorSession, blank=True)

def __init__(self, *args, trigger_task=True, **kwargs):
"""Initialize IngestorSession class."""
super().__init__(*args, **kwargs)
# Set the temporary attribute
self._trigger_task = trigger_task

def save(self, *args, **kwargs):
"""Override ingestor save."""
from gap.tasks import run_ingestor_session # noqa
created = self.pk is None
super(IngestorSession, self).save(*args, **kwargs)
if created:
if created and self._trigger_task:
run_ingestor_session.delay(self.id)

def _run(self, working_dir):
Expand Down
26 changes: 25 additions & 1 deletion django_project/gap/models/preferences.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,29 @@ def area_of_interest_default():
return Polygon(coordinates)


def area_of_salient_default():
"""Return polygon default for salient collector."""
coordinates = [
(41.89, 3.98),
(35.08, 4.87),
(30.92, 3.57),
(28.66, -2.48),
(31.13, -8.62),
(34.6, -11.74),
(40.65, -10.68),
(39.34, -4.73),
(41.56, -1.64),
(41.9, 3.98),
(41.89, 3.98)
]
return Polygon(coordinates)


def crop_plan_config_default() -> dict:
"""Return dictionary for crop plan config."""
return {
'lat_lon_decimal_digits': -1,
'tz': '+02:00' # East Africa Time
'tz': '+03:00' # East Africa Time
}


Expand All @@ -53,6 +71,12 @@ class Preferences(SingletonModel):
blank=True
)

# salient config
salient_area = models.PolygonField(
srid=4326, default=area_of_salient_default,
help_text='Area that Salient collector will use to pull the data'
)

# Documentations
documentation_url = models.URLField(
default='https://kartoza.github.io/tomorrownow_gap/',
Expand Down
58 changes: 41 additions & 17 deletions django_project/gap/tasks/ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,16 @@
from celery.utils.log import get_task_logger

from core.celery import app
from gap.models.ingestor import IngestorSession, CollectorSession, IngestorType
from gap.models.dataset import (
Dataset,
DatasetStore,
DataSourceFile
)
from gap.models.ingestor import (
IngestorType,
IngestorSession,
CollectorSession
)

logger = get_task_logger(__name__)

Expand All @@ -33,22 +42,37 @@ def run_collector_session(_id: int):
logger.error('Collector Session {} does not exists'.format(_id))


@app.task(name='cbam_collector_session')
def run_cbam_collector_session():
"""Run Collector for CBAM Dataset."""
session = CollectorSession.objects.create(
ingestor_type=IngestorType.CBAM
)
session.run()
if session.dataset_files.count() > 0:
# create ingestor session to convert into zarr
IngestorSession.objects.create(
ingestor_type=IngestorType.CBAM,
collector=session
)


@app.task(name='salient_collector_session')
def run_salient_collector_session():
"""Run Collector for Salient Dataset."""
pass
dataset = Dataset.objects.get(name='Salient Seasonal Forecast')
# create the collector object
collector_session = CollectorSession.objects.create(
ingestor_type=IngestorType.SALIENT
)
# run collector
collector_session.run()

# if success, create ingestor session
collector_session.refresh_from_db()
total_file = collector_session.dataset_files.count()
if total_file > 0:
# find latest DataSourceFile
data_source = DataSourceFile.objects.filter(
dataset=dataset,
format=DatasetStore.ZARR,
is_latest=True
).last()
additional_conf = {}
if data_source:
additional_conf = {
'datasourcefile_id': data_source.id,
'datasourcefile_zarr_exists': True
}
session = IngestorSession.objects.create(
ingestor_type=IngestorType.SALIENT,
trigger_task=False,
additional_config=additional_conf
)
session.collectors.add(collector_session)
run_ingestor_session.delay(session.id)
10 changes: 5 additions & 5 deletions django_project/gap/tests/crop_insight/test_crop_insight_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def test_title(self):
self.assertEqual(
obj.title, (
"GAP - Crop Plan Generator Results - "
"Monday-01-01-2024 (UTC+02:00)"
"Monday-01-01-2024 (UTC+03:00)"
)
)

Expand All @@ -72,7 +72,7 @@ def test_title(self):
self.assertEqual(
obj.title, (
"GAP - Crop Plan Generator Results - "
"Tuesday-02-01-2024 (UTC+02:00)"
"Tuesday-02-01-2024 (UTC+03:00)"
)
)

Expand All @@ -85,19 +85,19 @@ def test_title(self):
self.assertEqual(
obj.title, (
"GAP - Crop Plan Generator Results - "
"Tuesday-02-01-2024 (UTC+02:00)"
"Tuesday-02-01-2024 (UTC+03:00)"
)
)

# Change to before 21, so east time should today
obj.requested_at = datetime(
2024, 1, 1, 21, 0, 0,
2024, 1, 1, 20, 0, 0,
tzinfo=timezone.utc
)
obj.save()
self.assertEqual(
obj.title, (
"GAP - Crop Plan Generator Results - "
"Monday-01-01-2024 (UTC+02:00)"
"Monday-01-01-2024 (UTC+03:00)"
)
)
10 changes: 0 additions & 10 deletions django_project/gap/tests/ingestor/test_cbam.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
IngestorSessionStatus
)
from gap.ingestor.cbam import CBAMIngestor
from gap.tasks.ingestor import run_cbam_collector_session
from gap.factories import DataSourceFileFactory
from gap.utils.netcdf import find_start_latlng

Expand Down Expand Up @@ -136,15 +135,6 @@ def test_cbam_collector_cancel(
mock_fs.walk.assert_called_with('s3://test_bucket/cbam')
self.assertEqual(collector.dataset_files.count(), 0)

@patch.object(CollectorSession, 'run')
def test_run_cbam_collector_session(self, mocked_run):
"""Test task to run cbam collector session."""
run_cbam_collector_session()
mocked_run.assert_called_once()
self.assertFalse(IngestorSession.objects.filter(
ingestor_type=IngestorType.CBAM
).exists())


class CBAMIngestorTest(CBAMIngestorBaseTest):
"""CBAM ingestor test case."""
Expand Down
29 changes: 29 additions & 0 deletions django_project/gap/tests/ingestor/test_salient.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
)
from gap.ingestor.salient import SalientIngestor, SalientCollector
from gap.factories import DataSourceFileFactory
from gap.tasks.ingestor import run_salient_collector_session


class SalientIngestorBaseTest(TestCase):
Expand Down Expand Up @@ -145,6 +146,34 @@ def test_run_with_exception(self, mock_sk, mock_logger):
self.collector.run()
self.assertEqual(mock_logger.error.call_count, 2)

@patch('gap.models.ingestor.CollectorSession.dataset_files')
@patch('gap.models.ingestor.CollectorSession.run')
@patch('gap.tasks.ingestor.run_ingestor_session.delay')
def test_run_salient_collector_session(
self, mock_ingestor, mock_collector, mock_count
):
"""Test run salient collector session."""
mock_count.count.return_value = 0
run_salient_collector_session()
# assert
mock_collector.assert_called_once()
mock_ingestor.assert_not_called()

mock_collector.reset_mock()
mock_ingestor.reset_mock()
# test with collector result
mock_count.count.return_value = 1
run_salient_collector_session()

# assert
session = IngestorSession.objects.filter(
ingestor_type=IngestorType.SALIENT,
).last()
self.assertTrue(session)
self.assertEqual(session.collectors.count(), 1)
mock_collector.assert_called_once()
mock_ingestor.assert_called_once_with(session.id)


class TestSalientIngestor(SalientIngestorBaseTest):
"""Salient ingestor test case."""
Expand Down

0 comments on commit 2c1be89

Please sign in to comment.