Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tahmo ingestor #17

Merged
merged 11 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions deployment/docker-compose.override.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,6 @@ services:
links:
- db
- redis
- worker
- celery_beat
entrypoint: [ ]
2 changes: 1 addition & 1 deletion deployment/docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ services:
- DATABASE_PASSWORD=docker
- DATABASE_HOST=db
- RABBITMQ_HOST=rabbitmq
- DJANGO_SETTINGS_MODULE=core.settings.dev
- DJANGO_SETTINGS_MODULE=core.settings.test
- SECRET_KEY=SECRET_KEY

# Redis config
Expand Down
15 changes: 4 additions & 11 deletions deployment/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ volumes:
nginx-cache:
backups-data:
data-volume:
redis-data:

x-common-django:
&default-common-django
Expand Down Expand Up @@ -40,6 +41,8 @@ services:
image: bitnami/redis:7.0.2
environment:
- REDIS_PASSWORD=${REDIS_PASSWORD:-redis_password}
volumes:
- redis-data:/bitnami/redis/data

db:
image: kartoza/postgis:14-3.3
Expand Down Expand Up @@ -94,14 +97,4 @@ services:
- media-data:/home/web/media
- nginx-cache:/home/web/nginx_cache
links:
- django

dev:
<<: *default-common-django
entrypoint: [ ]
volumes:
- static-data:/home/web/static
- media-data:/home/web/media
links:
- db
- redis
- django
2 changes: 1 addition & 1 deletion django_project/core/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
f'@{os.environ.get("REDIS_HOST", "")}',
)

app = Celery('georepo')
app = Celery('GAP')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
Expand Down
18 changes: 18 additions & 0 deletions django_project/core/settings/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# coding=utf-8
"""
Tomorrow Now GAP.

.. note:: Project level settings.
"""

from .prod import * # noqa

# Run the test suite with the project's custom runner
# (core.tests.runner.CustomTestRunner).
TEST_RUNNER = 'core.tests.runner.CustomTestRunner'
DEBUG = True

# Use Django's dummy cache backend while running tests.
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.dummy.DummyCache',
}
}
33 changes: 33 additions & 0 deletions django_project/core/tests/runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# coding=utf-8
"""
Tomorrow Now GAP.

.. note:: Test runner.
"""

from core.celery import app as celery_app
from django.conf import settings
from django.test.runner import DiscoverRunner


class CustomTestRunner(DiscoverRunner):
    """Discover runner that puts Celery into eager mode before tests."""

    @staticmethod
    def __disable_celery():
        """Point Celery at a filesystem broker and run tasks eagerly."""
        broker_url = 'filesystem:///dev/null/'
        settings.CELERY_BROKER_URL = broker_url
        celery_app.conf.BROKER_URL = broker_url

        # Tasks execute in-process instead of being sent to a worker.
        celery_app.conf.task_always_eager = True

        # The same dict object is shared by the settings and the app config.
        transport_options = {
            'data_folder_in': '/tmp',
            'data_folder_out': '/tmp',
            'data_folder_processed': '/tmp',
        }
        settings.BROKER_TRANSPORT_OPTIONS = transport_options
        celery_app.conf.BROKER_TRANSPORT_OPTIONS = transport_options

    def setup_test_environment(self, **kwargs):
        """Disable Celery, then run the default environment setup."""
        CustomTestRunner.__disable_celery()
        super().setup_test_environment(**kwargs)
14 changes: 12 additions & 2 deletions django_project/gap/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from django.contrib import admin

from .models import (
Attribute, Country, Provider, Measurement, Station
Attribute, Country, Provider, Measurement, Station, IngestorSession
)


Expand Down Expand Up @@ -46,7 +46,7 @@ class MeasurementAdmin(admin.ModelAdmin):
"""Measurement admin."""

list_display = (
'station', 'attribute', 'date', 'value'
'station', 'attribute', 'date_time', 'value'
)
list_filter = ('station', 'attribute')
search_fields = ('name',)
Expand All @@ -61,3 +61,13 @@ class StationAdmin(admin.ModelAdmin):
)
list_filter = ('provider', 'country')
search_fields = ('code', 'name')


@admin.register(IngestorSession)
class IngestorSessionAdmin(admin.ModelAdmin):
    """Admin configuration for the IngestorSession model."""

    # Columns shown in the change list.
    list_display = ('run_at', 'status', 'end_at', 'ingestor_type')
    # Fields offered as change-list filters.
    list_filter = ('ingestor_type', 'status')
5 changes: 4 additions & 1 deletion django_project/gap/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ class Meta: # noqa
name = factory.Sequence(
lambda n: f'station-{n}'
)
code = factory.Sequence(
lambda n: f'code-{n}'
)
country = factory.SubFactory(CountryFactory)
geometry = factory.LazyFunction(lambda: Point(0, 0))
provider = factory.SubFactory(ProviderFactory)
Expand All @@ -104,5 +107,5 @@ class Meta: # noqa

station = factory.SubFactory(StationFactory)
attribute = factory.SubFactory(AttributeFactory)
date = factory.Faker('date')
date_time = factory.Faker('date_time')
value = factory.Faker('pyfloat')
Empty file.
14 changes: 14 additions & 0 deletions django_project/gap/ingestor/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# coding=utf-8
"""
Tomorrow Now GAP.

.. note:: Exceptions for ingestor.
"""


class FileNotFoundException(Exception):
    """Raised when the file required by an ingestor is missing."""

    def __init__(self):  # noqa
        text = 'File not found.'
        # Keep the text on the instance as well as in ``args``.
        self.message = text
        super().__init__(text)
178 changes: 178 additions & 0 deletions django_project/gap/ingestor/tahmo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
# coding=utf-8
"""
Tomorrow Now GAP.

.. note:: Tahmo ingestor.
"""

import csv
import json
import os
import shutil
from datetime import datetime, timezone
from zipfile import ZipFile

from django.contrib.gis.geos import Point

from gap.ingestor.exceptions import FileNotFoundException
from gap.models import (
Provider, Station, ObservationType, Country, IngestorSession,
Attribute, Measurement
)


class TahmoVariable:
    """Name and optional unit describing one Tahmo variable."""

    def __init__(self, name, unit=None):
        """Store the variable name and its unit (None when not given)."""
        self.name, self.unit = name, unit


# Maps a two-letter Tahmo column code to its variable description.
# Entries without a unit keep TahmoVariable's default (None).
TAHMO_VARIABLES = {
    'ap': TahmoVariable(name='Atmospheric pressure'),
    'pr': TahmoVariable(name='Precipitation', unit='mm'),
    'rh': TahmoVariable(name='Relative humidity'),
    'ra': TahmoVariable(name='Shortwave radiation', unit='W/m2'),
    'te': TahmoVariable(name='Surface air temperature', unit='°C'),
    'wd': TahmoVariable(name='Wind direction', unit='Degrees from North'),
    'wg': TahmoVariable(name='Wind gust', unit='m/s'),
    'ws': TahmoVariable(name='Wind speed', unit='m/s'),
}


class TahmoIngestor:
    """Ingestor for tahmo data.

    Extracts the zip archive attached to an ingestor session, creates the
    stations described in the station CSV files, then stores the
    measurements found in the per-station CSV files.
    """

    def __init__(self, session: IngestorSession):
        """Initialize the ingestor.

        :param session: Ingestor session whose ``file`` is the zip archive
            to ingest.
        """
        self.session = session

        self.provider, _ = Provider.objects.get_or_create(
            name='Tahmo'
        )
        self.obs_type, _ = ObservationType.objects.get_or_create(
            name='Ground Observations'
        )

    @staticmethod
    def _row_error(filename, data, error):
        """Build the exception reporting a row that could not be ingested."""
        return Exception(
            json.dumps({
                'filename': filename,
                'data': data,
                'error': f'{error}'
            })
        )

    def _ingest_station_row(self, filename, data):
        """Create the station described by one row of a station CSV."""
        try:
            point = Point(
                x=float(data['longitude']),
                y=float(data['latitude']),
                srid=4326
            )
            try:
                country = Country.get_countries_by_point(point)[0]
            except IndexError:
                # No country matched the point.
                country = None
            Station.objects.get_or_create(
                code=data['station code'],
                provider=self.provider,
                defaults={
                    'name': data['name'],
                    'geometry': point,
                    'country': country,
                    'observation_type': self.obs_type,
                }
            )
        except KeyError as e:
            raise self._row_error(filename, data, e) from e

    def _ingest_stations(self, dir_path):
        """Create stations from every file whose name contains 'station'.

        CSV headers used: longitude, latitude, station code, name.
        Files that cannot be decoded as text are skipped.
        """
        for dirpath, _dirnames, filenames in os.walk(dir_path):
            for filename in filenames:
                # Only station files are read here, so do not open others.
                if 'station' not in filename:
                    continue
                file_path = os.path.join(dirpath, filename)
                try:
                    # The context manager closes the handle even when a
                    # row fails; newline='' is what the csv module expects.
                    with open(file_path, 'r', newline='') as csv_file:
                        for data in csv.DictReader(csv_file):
                            self._ingest_station_row(filename, data)
                except UnicodeDecodeError:
                    continue

    def _ingest_measurement_row(self, station, filename, data):
        """Store the measurements held in one row of a station's CSV.

        The timestamp is read from the column with an empty header; rows
        without a timestamp are ignored. Only columns whose header is a
        key of ``TAHMO_VARIABLES`` are stored.
        """
        date = data['']  # noqa
        if not date:
            return
        # The format carries no seconds, so the parsed value already has
        # second == 0; only the timezone has to be attached.
        date_time = datetime.strptime(
            date, '%Y-%m-%d %H:%M'
        ).replace(tzinfo=timezone.utc)

        for key, value in data.items():
            attr_var = TAHMO_VARIABLES.get(key)
            if attr_var is None:
                continue
            try:
                attribute, _ = Attribute.objects.get_or_create(
                    name=attr_var.name
                )
                Measurement.objects.get_or_create(
                    station=station,
                    attribute=attribute,
                    date_time=date_time,
                    defaults={
                        'unit': attr_var.unit,
                        'value': float(value)
                    }
                )
            except (KeyError, ValueError) as e:
                raise self._row_error(filename, data, e) from e

    def _ingest_measurements(self, dir_path):
        """Store measurements from files named ``<station code>_...``.

        Files whose prefix does not match a station of this provider are
        ignored.
        """
        for dirpath, _dirnames, filenames in os.walk(dir_path):
            for filename in filenames:
                code = filename.split('_')[0]
                try:
                    station = Station.objects.get(
                        code=code, provider=self.provider
                    )
                except Station.DoesNotExist:
                    continue
                file_path = os.path.join(dirpath, filename)
                with open(file_path, 'r', newline='') as csv_file:
                    for data in csv.DictReader(csv_file):
                        self._ingest_measurement_row(
                            station, filename, data
                        )

    def _run(self, dir_path):
        """Run the ingestor on an extracted directory.

        Stations are ingested first because measurement files are matched
        to existing stations by code.
        """
        self._ingest_stations(dir_path)
        self._ingest_measurements(dir_path)

    def run(self):
        """Run the ingestor.

        :raises FileNotFoundException: when the session has no file.
        :raises Exception: wrapping any error raised during ingestion.
        """
        if not self.session.file:
            raise FileNotFoundException()

        # Extract file
        dir_path = os.path.splitext(self.session.file.path)[0]
        with ZipFile(self.session.file.path, 'r') as zip_ref:
            zip_ref.extractall(dir_path)

        # Run the ingestion; the extracted directory is always removed.
        try:
            self._run(dir_path)
        except Exception as e:
            raise Exception(e) from e
        finally:
            shutil.rmtree(dir_path)
5 changes: 4 additions & 1 deletion django_project/gap/management/commands/load_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from django.core.management import call_command
from django.core.management.base import BaseCommand

from gap.models import Country


class Command(BaseCommand):
"""Command to load fixtures."""
Expand All @@ -16,4 +18,5 @@ class Command(BaseCommand):

def handle(self, *args, **options):
"""Handle load fixtures."""
call_command('loaddata', 'gap/fixtures/1.country.json')
if Country.objects.count() == 0:
call_command('loaddata', 'gap/fixtures/1.country.json')
Loading
Loading