Bccaqv2 split bbox and grid point #60

Merged
merged 33 commits on Jan 16, 2020
Changes from 22 commits

Commits (33):
805f544
add start_date and end_date to bccaqv2 subset
Dec 19, 2019
8ef1ffa
split bccaqv2 bbox and grid point processes
Dec 19, 2019
96646f9
deprecate lon0 and lat0 for ...
Dec 20, 2019
614260e
add test for bccaqv2 boundingbox subset and ...
Dec 20, 2019
2ae93bc
add exception message when request fails in tests
Jan 6, 2020
f817ea4
read tests output in memory instead of writing to disk
Jan 6, 2020
e5253ef
change the point subset to accept a comma ...
Jan 6, 2020
107e6b5
add test for deprecation of lat0 and lon0
Jan 6, 2020
5d39de2
extract monkeypatching in a single fixture
Jan 6, 2020
81fe8e0
pin pywps~=4.2.3
Jan 6, 2020
5aa7c03
subset multiple grid cells
Jan 6, 2020
58b99e2
keep global_attributes when merging datasets
Jan 7, 2020
446f43f
fix dimensions for multiple grid points
Jan 10, 2020
1e3e91b
fix and test csv conversions
Jan 10, 2020
b5156d3
skip online heat_wave test
Jan 10, 2020
a924510
flake8
Jan 10, 2020
8acb667
typo and documentation
Jan 10, 2020
253a4f0
speed up csv creation
Jan 13, 2020
9071d14
docs
Jan 13, 2020
623bb59
explicitely close thread pool
Jan 13, 2020
63a132e
fix csv output dropna
Jan 13, 2020
85f531a
flake8
Jan 13, 2020
6ba2c34
fix multiple grid cells output and csv output
Jan 14, 2020
7cf9332
formatting (black) and fix tests assertions
Jan 14, 2020
5c1aacc
drop na values after dropping 'region' index
Jan 14, 2020
66c4a0e
bump xsubsetpoint version number
Jan 14, 2020
affab20
extract common inputs to wpsio.py
Jan 14, 2020
4980409
clarify unit test
Jan 14, 2020
3780ef7
heatwave inherits from point subset, so the input...
Jan 14, 2020
e00401c
remove unneeded optimization in heatwave process
Jan 14, 2020
4f7df80
extract lat and lon to wpsio.py
Jan 15, 2020
73617f3
flake8
Jan 15, 2020
ae632b5
Merge branch 'master' into bccaqv2-split-bbox-and-grid-point
Jan 16, 2020
41 changes: 20 additions & 21 deletions environment.yml
@@ -1,24 +1,23 @@
name: finch
channels:
- birdhouse
- conda-forge
- defaults
- birdhouse
- conda-forge
- defaults
dependencies:
- python>=3.6
- pip
- jinja2
- click
- psutil
- bottleneck
- netcdf4
- libnetcdf==4.6.2
- numpy
- unidecode
- dask
- xarray>=0.12
- scipy
- sentry-sdk
- siphon
- xclim>=0.12.2
- pywps>=4.2.3

- python>=3.6
- pip
- jinja2
- click
- psutil
- bottleneck
- netcdf4
- libnetcdf==4.6.2
- numpy
- unidecode
- dask
- xarray>=0.12
- scipy
- sentry-sdk
- siphon
- xclim>=0.12.2
- pywps~=4.2.3
6 changes: 4 additions & 2 deletions finch/processes/__init__.py
@@ -1,6 +1,7 @@
from .wps_xsubsetbbox import SubsetBboxProcess
from .wps_xsubsetpoint import SubsetGridPointProcess
from .wps_xsubset_bccaqv2 import SubsetBCCAQV2Process
from .wps_xsubsetpoint_bccaqv2 import SubsetGridPointBCCAQV2Process
from .wps_xsubsetbbox_bccaqv2 import SubsetBboxBCCAQV2Process
from .wps_xclim_indices import make_xclim_indicator_process
from .wps_bccaqv2_heatwave import BCCAQV2HeatWave
import xclim
@@ -29,7 +30,8 @@ def get_indicators(*args):
[
SubsetBboxProcess(),
SubsetGridPointProcess(),
SubsetBCCAQV2Process(),
SubsetGridPointBCCAQV2Process(),
SubsetBboxBCCAQV2Process(),
BCCAQV2HeatWave(),
]
)
2 changes: 2 additions & 0 deletions finch/processes/subset.py
@@ -44,6 +44,8 @@ def process_resource(resource):
if threads > 1:
pool = ThreadPool(processes=threads)
list(pool.imap_unordered(process_resource, resources))
pool.close()
pool.join()
else:
for r in resources:
process_resource(r)
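
The two added lines above make the worker pool shut down deterministically once all resources are processed. A minimal standalone sketch of the same pattern (illustrative only, not finch code):

# Illustrative sketch of the close()/join() pattern added above (not the PR's code).
# close() tells the pool no more work is coming; join() blocks until the worker
# threads have finished and exited, so nothing is left running in the background.
from multiprocessing.pool import ThreadPool

def process_resource(resource):
    return resource * 2  # stand-in for the real per-resource subsetting work

resources = [1, 2, 3, 4]
pool = ThreadPool(processes=2)
results = list(pool.imap_unordered(process_resource, resources))
pool.close()  # stop accepting new tasks
pool.join()   # wait for worker threads to terminate
print(sorted(results))  # [2, 4, 6, 8]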
15 changes: 11 additions & 4 deletions finch/processes/utils.py
Expand Up @@ -7,6 +7,7 @@
from typing import List, Tuple
from enum import Enum

import numpy as np
import pandas as pd
import xarray as xr
import requests
@@ -241,9 +242,14 @@ def get_attrs_fallback(ds, *args):

ds = ds.rename({variable: output_variable})

df = ds.to_dataframe()[["lat", "lon", output_variable]]
# most runs have timestamp with hour == 12 a few hour == 0 .. make uniform
df.index = df.index.map(lambda x: x.replace(hour=12))
# most runs have timestamp with hour == 12 a few hour == 0 ... make uniform
if not np.all(ds.time.dt.hour == 12):
attrs = ds.time.attrs
ds['time'] = [y.replace(hour=12) for y in ds.time.values]
ds.time.attrs = attrs

df = ds.to_dataframe()
df = df.reset_index().set_index('time')[["lat", "lon", output_variable]]

if calendar not in concat_by_calendar:
concat_by_calendar[calendar] = [df]
@@ -255,7 +261,8 @@
output_csv_list = []
for calendar_type, data in concat_by_calendar.items():
output_csv = output_folder / f"{filename_prefix}_{calendar_type}.csv"
pd.concat(data, axis=1).to_csv(output_csv)
dropna_threshold = 3 # lat + lon + at least one value
pd.concat(data, axis=1).dropna(axis=1, thresh=dropna_threshold).to_csv(output_csv)
output_csv_list.append(output_csv)

metadata_folder = output_folder / "metadata"
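
A small standalone illustration of the time normalization introduced above (toy example data and variable name, not the PR's code):

# Illustrative sketch (not finch code): force every timestamp to hour == 12 so
# dataframes built from different model runs align on the same time index.
import numpy as np
import pandas as pd
import xarray as xr

times = pd.to_datetime(["1950-01-01 00:00", "1950-01-02 12:00"])
ds = xr.Dataset({"tasmax": ("time", [1.0, 2.0])}, coords={"time": times})

if not np.all(ds.time.dt.hour == 12):
    attrs = ds.time.attrs  # keep the coordinate metadata when rewriting it
    ds["time"] = [t.replace(hour=12) for t in pd.to_datetime(ds.time.values)]
    ds.time.attrs = attrs

assert np.all(ds.time.dt.hour == 12)

# Note on the CSV change above: dropna(axis=1, thresh=3) drops any column that
# has fewer than 3 non-NaN values before the CSV is written.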
2 changes: 0 additions & 2 deletions finch/processes/wps_xsubsetbbox.py
@@ -111,8 +111,6 @@ def subset(
lat0 = wps_inputs["lat0"][0].data
lon1 = self.get_input_or_none(wps_inputs, "lon1")
lat1 = self.get_input_or_none(wps_inputs, "lat1")
# dt0 = wps_inputs['dt0'][0].data or None
# dt1 = wps_inputs['dt1'][0].data or None
start = self.get_input_or_none(wps_inputs, "start_date")
end = self.get_input_or_none(wps_inputs, "end_date")
variables = [r.data for r in wps_inputs.get("variable", [])]
@@ -2,22 +2,26 @@
from pywps.response.execute import ExecuteResponse
from pywps.app.exceptions import ProcessError
from pywps.app import WPSRequest
from .wpsio import start_date, end_date
from pywps import LiteralInput, ComplexOutput, FORMATS, configuration

from finch.processes import SubsetBboxProcess
from finch.processes.subset import SubsetProcess
from finch.processes.utils import get_bccaqv2_inputs, netcdf_to_csv, zip_files


class SubsetBCCAQV2Process(SubsetBboxProcess):
class SubsetBboxBCCAQV2Process(SubsetBboxProcess):
"""Subset a NetCDF file using bounding box geometry."""

def __init__(self):
inputs = [
LiteralInput(
"variable",
"NetCDF Variable",
abstract="Name of the variable in the NetCDF file.",
abstract=(
"Name of the variable in the NetCDF file."
"If not provided, all variables will be subsetted."
),
data_type="string",
default=None,
min_occurs=0,
@@ -49,49 +53,21 @@ def __init__(self):
LiteralInput(
"lat1",
"Maximum latitude",
abstract="Maximum latitude. Omit this coordinate to subset for a single grid cell.",
abstract="Maximum latitude",
data_type="float",
default=None,
min_occurs=0,
),
LiteralInput(
"lon1",
"Maximum longitude",
abstract="Maximum longitude. Omit this coordinate to subset for a single grid cell.",
abstract="Maximum longitude",
data_type="float",
default=None,
min_occurs=0,
),
# LiteralInput('dt0',
# 'Initial datetime',
# abstract='Initial datetime for temporal subsetting. Defaults to first date in file.',
# data_type='dateTime',
# default=None,
# min_occurs=0,
# max_occurs=1),
# LiteralInput('dt1',
# 'Final datetime',
# abstract='Final datetime for temporal subsetting. Defaults to last date in file.',
# data_type='dateTime',
# default=None,
# min_occurs=0,
# max_occurs=1),
LiteralInput(
"y0",
"Initial year",
abstract="Initial year for temporal subsetting. Defaults to first year in file.",
data_type="integer",
default=None,
min_occurs=0,
),
LiteralInput(
"y1",
"Final year",
abstract="Final year for temporal subsetting. Defaults to last year in file.",
data_type="integer",
default=None,
min_occurs=0,
),
start_date,
end_date,
LiteralInput(
"output_format",
"Output format choice",
@@ -116,8 +92,8 @@
SubsetProcess.__init__(
self,
self._handler,
identifier="subset_ensemble_BCCAQv2",
title="Subset of BCCAQv2 datasets",
identifier="subset_ensemble_bbox_BCCAQv2",
title="Subset of BCCAQv2 datasets, using a bounding box",
version="0.1",
abstract=(
"For the BCCAQv2 datasets, "
@@ -134,13 +110,13 @@ def __init__(self):
def _handler(self, request: WPSRequest, response: ExecuteResponse):
self.write_log("Processing started", response, 5)

# Build output filename
variable = self.get_input_or_none(request.inputs, "variable")
rcp = self.get_input_or_none(request.inputs, "rcp")
lat0 = self.get_input_or_none(request.inputs, "lat0")
lon0 = self.get_input_or_none(request.inputs, "lon0")
output_format = request.inputs["output_format"][0].data

output_filename = f"BCCAQv2_subset_{lat0}_{lon0}"
output_filename = f"BCCAQv2_subset_bbox_{lat0:.3f}_{lon0:.3f}"

self.write_log("Fetching BCCAQv2 datasets", response, 6)
request.inputs = get_bccaqv2_inputs(request.inputs, variable, rcp)
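
For context, a hypothetical client-side call to the renamed process. The identifier and input names come from this diff; the endpoint URL, the birdy client usage and the input values are assumptions, not part of the PR:

# Hypothetical usage sketch (not from the PR): calling the bbox ensemble subset
# process with the inputs visible above, via the birdy WPS client.
from birdy import WPSClient

wps = WPSClient("http://localhost:5000/wps")  # assumed local finch instance
resp = wps.subset_ensemble_bbox_bccaqv2(      # method name derived from the process identifier
    variable="tasmax",
    rcp="rcp85",
    lat0=45.0,
    lon0=-75.0,
    lat1=46.0,
    lon1=-74.0,
    start_date="1951-01-01",
    end_date="1960-12-31",
    output_format="netcdf",
)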
28 changes: 18 additions & 10 deletions finch/processes/wps_xsubsetpoint.py
@@ -1,5 +1,6 @@
from threading import Lock

import xarray as xr
from pywps import LiteralInput, ComplexInput, ComplexOutput, FORMATS
from pywps.inout.outputs import MetaLink4
from xclim.subset import subset_gridpoint
@@ -8,7 +9,7 @@


class SubsetGridPointProcess(SubsetProcess):
"""Subset a NetCDF file using bounding box geometry."""
"""Subset a NetCDF file grid cells using a list of coordinates."""

def __init__(self):
inputs = [
@@ -22,15 +23,15 @@ def __init__(self):
LiteralInput(
"lon",
"Longitude",
abstract="Longitude coordinate",
data_type="float",
abstract="Longitude coordinate. Accepts a comma separated list of floats for multiple grid cells.",
data_type="string",
min_occurs=1,
),
LiteralInput(
"lat",
"Latitude",
abstract="Latitude coordinate.",
data_type="float",
abstract="Latitude coordinate. Accepts a comma separated list of floats for multiple grid cells.",
data_type="string",
min_occurs=1,
),
start_date,
@@ -81,10 +82,8 @@
)

def subset(self, wps_inputs, response, start_percentage=10, end_percentage=85, threads=1) -> MetaLink4:
lon = wps_inputs["lon"][0].data
lat = wps_inputs["lat"][0].data
# dt0 = wps_inputs['dt0'][0].data or None
# dt1 = wps_inputs['dt1'][0].data or None
longitudes = [float(lon) for lon in wps_inputs["lon"][0].data.split(',')]
latitudes = [float(lat) for lat in wps_inputs["lat"][0].data.split(',')]
start = self.get_input_or_none(wps_inputs, "start_date")
end = self.get_input_or_none(wps_inputs, "end_date")
variables = [r.data for r in wps_inputs.get("variable", [])]
@@ -108,7 +107,16 @@ def _subset_function(resource):
self.write_log(f"Subsetting file {count} of {n_files}", response, percentage)

dataset = dataset[variables] if variables else dataset
return subset_gridpoint(dataset, lon=lon, lat=lat, start_date=start, end_date=end)

global_attributes = dataset.attrs
output_ds = None
for lon, lat in zip(longitudes, latitudes):
subset = subset_gridpoint(dataset, lon=lon, lat=lat, start_date=start, end_date=end)
subset = subset.expand_dims(["lat", "lon"])
output_ds = output_ds.combine_first(subset) if output_ds is not None else subset
Collaborator:
The different sites are ordered along which dimension?

Collaborator:
Does it make more sense to have individual points with a multi-index coordinate of lon & lat together?
e.g. something like this:

index1 = pd.MultiIndex.from_arrays([longitudes, latitudes], names=['lon', 'lat'])
output = xr.Dataset(coords={'point_id': index1, 'time': subset.time}, attrs=subset.attrs)

@davidcaron I sent you a bit of code via email last week with an example, not sure if you received it?


output_ds.attrs = global_attributes
return output_ds

metalink = self.subset_resources(wps_inputs["resource"], _subset_function, threads=threads)

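
To show what the new multi-point loop produces, a standalone sketch of the subset / expand_dims / combine_first pattern on toy data (not the PR's code; it uses a plain nearest-neighbour selection in place of xclim's subset_gridpoint and does not address the reviewer's point_id multi-index suggestion):

# Toy illustration (not finch code) of combining single grid-point selections with
# expand_dims + combine_first, as in the loop added above. The combined result is a
# regular lat/lon grid holding NaN wherever no requested point falls.
import numpy as np
import xarray as xr

lats = np.arange(40.0, 50.0)
lons = np.arange(-80.0, -70.0)
data = np.random.rand(lats.size, lons.size)
ds = xr.Dataset({"tasmax": (("lat", "lon"), data)}, coords={"lat": lats, "lon": lons})

points = [(45.2, -75.4), (47.9, -71.8)]  # (lat, lon) pairs, as parsed from the comma separated inputs
output_ds = None
for lat, lon in points:
    subset = ds.sel(lat=lat, lon=lon, method="nearest")  # stand-in for xclim's subset_gridpoint
    subset = subset.expand_dims(["lat", "lon"])          # promote the scalar coords back to dims
    output_ds = output_ds.combine_first(subset) if output_ds is not None else subset

print(output_ds.tasmax.shape)  # (2, 2): two lats x two lons, half of the cells NaN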