Export missing data #26

Merged 2 commits on Dec 10, 2021
35 changes: 35 additions & 0 deletions README.md
@@ -1,5 +1,22 @@
# Surface

## Run Docker

docker-compose up

## Run and Stop the project

docker-compose -f docker-compose-dev.yml up

docker-compose -f docker-compose-dev.yml stop

## Production env

In the api directory, add a production.env file; an example is provided in "./api/production.env.example". Fill in values for the variables; for the host and port variables you can use 0.
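
A minimal sketch of what production.env can look like. The variable names below are illustrative placeholders only; use the names actually listed in "./api/production.env.example" (the dba user and surface_db database match the commands later in this README):

# Hypothetical sketch: copy ./api/production.env.example and fill in the real variable names and values.
# Host and port variables can be left as 0, as noted above.
POSTGRES_USER=dba
POSTGRES_DB=surface_db
POSTGRES_PASSWORD=change-me
SURFACE_API_HOST=0
SURFACE_API_PORT=0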

## Generate Docker Images
docker-compose -f docker-compose-dev.yml build

## Running with Docker

docker-compose -f docker-compose-dev.yml up postgres cache redis api
@@ -8,6 +25,14 @@ docker-compose -f docker-compose-dev.yml up postgres cache redis api

docker-compose -f docker-compose-dev.yml exec api bash load_initial_data.sh

### If you're using Windows

docker-compose -f docker-compose-dev.yml exec api bash

python manage.py migrate

python manage.py loaddata /surface/fixtures/*

## Create superuser

docker-compose -f docker-compose-dev.yml exec api python manage.py createsuperuser
@@ -17,3 +42,13 @@ docker-compose -f docker-compose-dev.yml exec api python manage.py createsuperus
docker-compose -f docker-compose-prd.yml -p surface_new exec api bash load_initial_data.sh

docker-compose -f docker-compose-prd.yml -p surface_new exec api python manage.py collectstatic --noinput

## Loading data

docker-compose -f docker-compose-dev.yml exec postgres pg_restore -U dba -d surface_db /data/shared/dump_surface_20211114.dump

docker-compose -f docker-compose-dev.yml exec postgres psql -U dba -d surface_db -c "\COPY raw_data FROM '/data/shared/dump_raw_data_20211130.csv' WITH DELIMITER ',' CSV HEADER;"
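
After the restore and the \COPY finish, a quick spot check confirms that raw_data was populated (an illustrative query, not part of the project's scripts):

docker-compose -f docker-compose-dev.yml exec postgres psql -U dba -d surface_db -c "SELECT count(*) FROM raw_data;"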

## Access DB manually

docker-compose -f docker-compose-dev.yml exec postgres psql -U dba -d surface_db
154 changes: 140 additions & 14 deletions api/wx/tasks.py
@@ -21,6 +21,7 @@
from django.core.cache import cache
from django.db import connection


from tempestas_api import settings
from wx.decoders.flash import read_data as read_data_flash
from wx.decoders.hobo import read_file as read_file_hobo
@@ -898,13 +899,13 @@ def export_data(station_id, source, start_date, end_date, variable_ids, file_id)
current_end_datetime = datetime_list[i + 1]

with connection.cursor() as cursor:

if source == 'raw_data':
cursor.execute(f'''

query_raw_data = '''
WITH processed_data AS (
SELECT datetime
,var.id as variable_id
,CASE WHEN var.variable_type ilike 'code' THEN data.code ELSE data.measured::varchar END AS value
,COALESCE(CASE WHEN var.variable_type ilike 'code' THEN data.code ELSE data.measured::varchar END, '-99.9') AS value
FROM raw_data data
JOIN wx_variable var ON data.variable_id = var.id AND var.id IN %(variable_ids)s
WHERE data.datetime >= %(start_datetime)s
@@ -913,22 +914,147 @@ def export_data(station_id, source, start_date, end_date, variable_ids, file_id)
)
SELECT (generated_time + interval '%(utc_offset)s minutes') at time zone 'utc' as datetime
,variable.id
,value
,COALESCE(value, '-99.9')
FROM generate_series(%(start_datetime)s, %(end_datetime)s - INTERVAL '1 seconds', INTERVAL '%(data_interval)s seconds') generated_time
JOIN wx_variable variable ON variable.id IN %(variable_ids)s
LEFT JOIN processed_data ON datetime = generated_time AND variable.id = variable_id
''', {'utc_offset': station.utc_offset_minutes, 'variable_ids': variable_ids,
'''

logging.info(query_raw_data, {'utc_offset': station.utc_offset_minutes, 'variable_ids': variable_ids,
'start_datetime': current_start_datetime, 'end_datetime': current_end_datetime,
'station_id': station_id, 'data_interval': current_datafile.interval_in_seconds})

cursor.execute(query_raw_data, {'utc_offset': station.utc_offset_minutes, 'variable_ids': variable_ids,
'start_datetime': current_start_datetime, 'end_datetime': current_end_datetime,
'station_id': station_id, 'data_interval': current_datafile.interval_in_seconds})
else:
cursor.execute(f'''
SELECT {date_source}, var.id, {measured_source}
FROM {source} data
JOIN wx_variable var ON data.variable_id = var.id AND var.id in %s
WHERE data.{datetime_variable} >= %s
AND data.{datetime_variable} < %s
AND data.station_id = %s
''', (variable_ids, current_start_datetime, current_end_datetime, station_id,))

elif source == 'hourly_summary':

query_hourly = '''
WITH processed_data AS (
SELECT datetime ,var.id as variable_id
,COALESCE(CASE WHEN var.sampling_operation_id in (1,2) THEN data.avg_value::real
WHEN var.sampling_operation_id = 3 THEN data.min_value
WHEN var.sampling_operation_id = 4 THEN data.max_value
WHEN var.sampling_operation_id = 6 THEN data.sum_value
ELSE data.sum_value END, '-99.9') as value
FROM hourly_summary data
JOIN wx_variable var ON data.variable_id = var.id AND var.id IN %(variable_ids)s
WHERE data.datetime >= %(start_datetime)s
AND data.datetime < %(end_datetime)s
AND data.station_id = %(station_id)s
)
SELECT (generated_time + interval '%(utc_offset)s minutes') at time zone 'utc' as datetime
,variable.id
,COALESCE(value, '-99.9')
FROM generate_series(%(start_datetime)s, %(end_datetime)s - INTERVAL '1 seconds', INTERVAL '1 hours') generated_time
JOIN wx_variable variable ON variable.id IN %(variable_ids)s
LEFT JOIN processed_data ON datetime = generated_time AND variable.id = variable_id
'''

logging.info(query_hourly,{'utc_offset': station.utc_offset_minutes, 'variable_ids': variable_ids,
'start_datetime': current_start_datetime, 'end_datetime': current_end_datetime,
'station_id': station_id})

cursor.execute(query_hourly,{'utc_offset': station.utc_offset_minutes, 'variable_ids': variable_ids,
'start_datetime': current_start_datetime, 'end_datetime': current_end_datetime,
'station_id': station_id})

elif source == 'daily_summary':

query_daily = '''
WITH processed_data AS (
SELECT day ,var.id as variable_id
,COALESCE(CASE WHEN var.sampling_operation_id in (1,2) THEN data.avg_value::real
WHEN var.sampling_operation_id = 3 THEN data.min_value
WHEN var.sampling_operation_id = 4 THEN data.max_value
WHEN var.sampling_operation_id = 6 THEN data.sum_value
ELSE data.sum_value END, '-99.9') as value
FROM daily_summary data
JOIN wx_variable var ON data.variable_id = var.id AND var.id IN %(variable_ids)s
WHERE data.day >= %(start_datetime)s
AND data.day < %(end_datetime)s
AND data.station_id = %(station_id)s
)
SELECT (generated_time) as datetime
,variable.id
,COALESCE(value, '-99.9')
FROM generate_series(%(start_datetime)s, %(end_datetime)s - INTERVAL '1 seconds', INTERVAL '1 days') generated_time
JOIN wx_variable variable ON variable.id IN %(variable_ids)s
LEFT JOIN processed_data ON day = generated_time AND variable.id = variable_id
'''

logging.info(query_daily, {'variable_ids': variable_ids,
'start_datetime': current_start_datetime, 'end_datetime': current_end_datetime,
'station_id': station_id})

cursor.execute(query_daily, {'variable_ids': variable_ids,
'start_datetime': current_start_datetime, 'end_datetime': current_end_datetime,
'station_id': station_id})

elif source == 'monthly_summary':

query_monthly = '''
WITH processed_data AS (
SELECT date ,var.id as variable_id
,COALESCE(CASE WHEN var.sampling_operation_id in (1,2) THEN data.avg_value::real
WHEN var.sampling_operation_id = 3 THEN data.min_value
WHEN var.sampling_operation_id = 4 THEN data.max_value
WHEN var.sampling_operation_id = 6 THEN data.sum_value
ELSE data.sum_value END, '-99.9') as value
FROM monthly_summary data
JOIN wx_variable var ON data.variable_id = var.id AND var.id IN %(variable_ids)s
WHERE data.date >= %(start_datetime)s
AND data.date < %(end_datetime)s
AND data.station_id = %(station_id)s
)
SELECT (generated_time) as datetime
,variable.id
,COALESCE(value, '-99.9')
FROM generate_series(%(start_datetime)s, %(end_datetime)s , INTERVAL '1 months') generated_time
JOIN wx_variable variable ON variable.id IN %(variable_ids)s
LEFT JOIN processed_data ON date = generated_time AND variable.id = variable_id
'''

logging.info(query_monthly, {'variable_ids': variable_ids,
'start_datetime': current_start_datetime, 'end_datetime': current_end_datetime,
'station_id': station_id})

cursor.execute(query_monthly, {'variable_ids': variable_ids,
'start_datetime': current_start_datetime, 'end_datetime': current_end_datetime,
'station_id': station_id})

elif source == 'yearly_summary':

query_yearly = '''
WITH processed_data AS (
SELECT date ,var.id as variable_id
,COALESCE(CASE WHEN var.sampling_operation_id in (1,2) THEN data.avg_value::real
WHEN var.sampling_operation_id = 3 THEN data.min_value
WHEN var.sampling_operation_id = 4 THEN data.max_value
WHEN var.sampling_operation_id = 6 THEN data.sum_value
ELSE data.sum_value END, '-99.9') as value
FROM yearly_summary data
JOIN wx_variable var ON data.variable_id = var.id AND var.id IN %(variable_ids)s
WHERE data.date >= %(start_datetime)s
AND data.date < %(end_datetime)s
AND data.station_id = %(station_id)s
)
SELECT (generated_time) as datetime
,variable.id
,COALESCE(value, '-99.9')
FROM generate_series(%(start_datetime)s, %(end_datetime)s , INTERVAL '1 years') generated_time
JOIN wx_variable variable ON variable.id IN %(variable_ids)s
LEFT JOIN processed_data ON date = generated_time AND variable.id = variable_id
'''

logging.info(query_yearly, {'variable_ids': variable_ids,
'start_datetime': current_start_datetime, 'end_datetime': current_end_datetime,
'station_id': station_id})

cursor.execute(query_yearly, {'variable_ids': variable_ids,
'start_datetime': current_start_datetime, 'end_datetime': current_end_datetime,
'station_id': station_id})

query_result = query_result + cursor.fetchall()
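
Every branch above follows the same gap-filling pattern: generate_series emits every expected timestamp in the export window, a LEFT JOIN attaches whatever observations exist, and COALESCE substitutes the missing-data flag -99.9 where no row was found. A minimal, self-contained PostgreSQL sketch of the pattern, using a hypothetical two-row table obs rather than anything from this schema:

-- Hypothetical illustration of the gap-filling pattern used in export_data
WITH obs(datetime, value) AS (
    VALUES ('2021-11-30 00:00:00+00'::timestamptz, 21.5),
           ('2021-11-30 02:00:00+00'::timestamptz, 22.1)
)
SELECT generated_time AS datetime,
       COALESCE(obs.value::varchar, '-99.9') AS value
FROM generate_series('2021-11-30 00:00:00+00'::timestamptz,
                     '2021-11-30 03:00:00+00'::timestamptz,
                     INTERVAL '1 hour') AS generated_time
LEFT JOIN obs ON obs.datetime = generated_time
ORDER BY generated_time;

The hours 01:00 and 03:00 have no observation, so they still appear in the output as -99.9 instead of being dropped. The queries above build on this same skeleton, adding the UTC-offset shift and the per-variable sampling-operation CASE where they apply.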

11 changes: 7 additions & 4 deletions docker-compose-dev.yml
@@ -15,6 +15,8 @@ services:
postgres:
container_name: surface-database
image: timescale/timescaledb-postgis:2.3.0-pg13
ports:
- "5432:5432"
restart: unless-stopped
env_file:
- api/production.env
@@ -41,6 +43,7 @@
- ./api:/surface
- ./data/media:/data/media
- ./data/shared:/data/shared
- ./data/exported_data:/data/exported_data
- ./data/documents:/data/documents
ports:
- "8080:8000"
@@ -59,14 +62,14 @@
dockerfile: Dockerfile
context: api
container_name: surface-celery-worker
command: celery -A tempestas_api worker -l info
command: /home/surface/.local/bin/celery -A tempestas_api worker -l info
env_file:
- api/production.env
restart: unless-stopped
volumes:
- ./api:/surface
- ./data/media/documents/ingest:/data/documents/ingest
- ./data/media/exported_data:/data/exported_data
- ./data/documents/ingest:/data/documents/ingest
- ./data/exported_data:/data/exported_data
- ./data/shared:/data/shared
depends_on:
- api
@@ -83,7 +86,7 @@ services:
context: api
container_name: surface-celery-beat
restart: unless-stopped
command: celery -A tempestas_api beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
command: /home/surface/.local/bin/celery -A tempestas_api beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
env_file:
- api/production.env
volumes: