Add workflow to run model with mean nudging tendency (#215)
* Add workflow runfile, configs, Makefile

* Update runtime module

* Fix fv3net runtime __init__

* Pin pandas in prognostic run to 1.0.1

* Update runfile and Makefile

* Update experiment names in configs

* Update Makefile

* Remove workflow submit_job.py

* Refactor nudge file handling to kube_jobs

* Add tests for nudge file handling

* Use common transfer_local_to_remote function

* Add type hints to nudge_to_obs.py

* Lint

* Update configurations and Makefile to enable remote runs

* Remove leftover debugging logging statement

* Use common unique_tag() function

* Change outdirs in Makefile

* Update rule name in Makefile

* Change run length to 91 days

* Make layout 2,2 for nudge_mean_T

* Make runfile work for multiple procs per tile

* Add prepare_config.py script to simplify submission

* Add get_fs and get_protocol to vcm.cloud.__init__.py

* Fix Makefile

* Make sure absolute paths are used for config_url

* Update runfile for n_proc>6

* Set layout=2,2 in config

* Rename dimensions as well

* Load all timesteps of nudging data

* Add submit_to_kubernetes script

* Refactor runfile

* Update scripts to allow local and remote runs

* Make run length 91 days

* Make flush_nc_files=True in namelist

* Change nudging tendency year to model year instead of reverse

* Update diagnostic to zarr pipeline

* Add post-processing script

* Lint

* Add GFS analysis data to catalog.yml

* Add back runtime get_runfile_config function

* Add docstring

* Add README.md

* Add get_runfile_config back to runtime __init__

* Update postprocessing script

* Address Jeremy PR comments

* Rename nudging_tendency to mean_nudging_tendency

* Update fv3config submodule to v0.3.1

* Use fv3config get_timestep and config_from_yaml

* Address Noah PR comments

* Update HISTORY.rst and workflow readme

* Fix typo

* Add quotes to filename_pattern in nudge config yamls

* Update length of runs
Oliver Watt-Meyer authored Apr 9, 2020
1 parent 712603c commit 541ae51
Showing 28 changed files with 665 additions and 95 deletions.
8 changes: 7 additions & 1 deletion HISTORY.rst
@@ -10,13 +10,19 @@ latest
* Fixed integration tests to use same version tags of the `fv3net` and `prognostic_run` images
* Added makefile targets to submit integration tests to cluster from local machine and to get docker image names
* Made simple step output directory names the default in the orchestrator
* Add `run_with_learned_nudging` workflow
* Update fv3config submodule to v0.3.1
* Add `get_config()` function to fv3net.runtime
* Change API of `diagnostics_to_zarr` workflow so that it saves output zarrs in the given run directory
* Add `nudge_to_obs` module to `kube_jobs`, which helps with the configuration of FV3GFS model runs that are nudged towards GFS analysis
* Add public function: vcm.convert_timestamps
* Add pipeline to load C384 restart data into a zarr


0.1.1 (2020-03-25)
------------------
* Updates to make end-to-end workflow work with fv3atm (fv3gfs-python:v0.3.1)
* Added bump2version for automated versioning of `fv3net` resources
* Add CircleCI build/push capabilities for `fv3net` images


17 changes: 17 additions & 0 deletions catalog.yml
@@ -106,6 +106,23 @@ sources:
access: read_only
urlpath: "gs://vcm-ml-data/2020-02-25-additional-november-C3072-simulation-C384-diagnostics/atmos_8xdaily_C3072_to_C384.zarr"

GFS_analysis_T85_2015_2016:
description: 4x daily GFS analysis data at approximately 2deg resolution, typically used for nudging FV3GFS. Spans 2015-01-01T00:00:00 to 2017-01-01T00:00:00.
driver: zarr
args:
storage_options:
project: 'vcm-ml'
access: read_only
urlpath: "gs://vcm-ml-data/2020-03-27-T85-GFS-nudging-data-as-zarr/nudging_data_2015-2016.zarr"

GFS_analysis_T85_2015_2016_1M_mean:
description: Monthly mean GFS analysis data at approximately 2deg resolution. Spans 2015-01 to 2016-12.
driver: zarr
args:
storage_options:
project: 'vcm-ml'
access: read_only
urlpath: "gs://vcm-ml-data/2020-03-27-T85-GFS-nudging-data-as-zarr/nudging_data_mean_1M.zarr"

## Local Data Intake ##
# TODO: Could this be replicated with intake caching? Or switch to an ignored file?
13 changes: 13 additions & 0 deletions docker/learned_nudging_run/Dockerfile
@@ -0,0 +1,13 @@
FROM us.gcr.io/vcm-ml/fv3gfs-python:v0.4.0


COPY docker/learned_nudging_run/requirements.txt /tmp/requirements.txt
RUN pip3 install -r /tmp/requirements.txt

# cache external package installation
COPY external/fv3config /fv3net/external/fv3config
COPY external/vcm /fv3net/external/vcm
RUN pip3 install -e /fv3net/external/vcm -e /fv3net/external/fv3config

COPY . /fv3net
RUN pip3 install --no-deps -e /fv3net
6 changes: 6 additions & 0 deletions docker/learned_nudging_run/requirements.txt
@@ -0,0 +1,6 @@
scikit-learn==0.22.1
dask
zarr
scikit-image
google-cloud-logging
pandas==1.0.1
2 changes: 1 addition & 1 deletion external/fv3config
2 changes: 1 addition & 1 deletion external/vcm/vcm/cloud/__init__.py
@@ -1,4 +1,4 @@
-from .fsspec import get_fs
+from .fsspec import get_fs, get_protocol


__all__ = [item for item in dir() if not item.startswith("_")]
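The diff shows only the re-export; for orientation, here is a plausible sketch of what `get_protocol` returns. This is an assumption — the real implementation lives in `vcm.cloud.fsspec` and is not part of this diff:

```python
def get_protocol(path: str) -> str:
    """Assumed behavior: infer the fsspec protocol from a path,
    e.g. "gs" for "gs://bucket/key" and "file" for local paths."""
    return path.split("://", 1)[0] if "://" in path else "file"

print(get_protocol("gs://vcm-ml-data/run1"))  # gs
print(get_protocol("/tmp/rundir"))            # file
```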
20 changes: 3 additions & 17 deletions fv3net/pipelines/diagnostics_to_zarr.py
@@ -16,12 +16,11 @@
INITIAL_CHUNKS = {"time": 192}
TILES = range(1, 7)
COMMON_SUFFIX = ".tile1.nc"
-DEFAULT_DIAGNOSTIC_DIR = "diagnostic_zarr"


def run(args, pipeline_args):
rundir = args.rundir
-diagnostic_dir = _parse_diagnostic_dir(args.diagnostic_dir, rundir)
+diagnostic_dir = rundir if args.diagnostic_dir is None else args.diagnostic_dir
diagnostic_categories = _parse_categories(args.diagnostic_categories, rundir)
logger.info(f"Diagnostic zarrs being written to {diagnostic_dir}")
logger.info(f"Diagnostic categories to convert are {diagnostic_categories}")
@@ -38,7 +38,7 @@ def run(args, pipeline_args):


def open_convert_save(diagnostic_category, rundir, diagnostic_dir):
-remote_zarr = os.path.join(diagnostic_dir, diagnostic_category)
+remote_zarr = os.path.join(diagnostic_dir, f"{diagnostic_category}.zarr")
with tempfile.TemporaryDirectory() as local_zarr:
for tile in TILES:
logger.info(f"Converting category {diagnostic_category} tile {tile}")
@@ -63,13 +62,6 @@ def _parse_categories(diagnostic_categories, rundir):
return diagnostic_categories


-def _parse_diagnostic_dir(diagnostic_dir, rundir):
-    if diagnostic_dir is None:
-        return os.path.join(_get_parent_dir(rundir), DEFAULT_DIAGNOSTIC_DIR)
-    else:
-        return diagnostic_dir


def _get_all_diagnostic_categories(rundir, fs):
""" get full paths for all files in rundir that end in COMMON_SUFFIX """
full_paths = fs.glob(os.path.join(rundir, f"*{COMMON_SUFFIX}"))
@@ -82,12 +74,6 @@ def _get_category_from_path(path):
return basename[: -len(COMMON_SUFFIX)]


-def _get_parent_dir(path):
-    if path[-1] == "/":
-        path = path[:-1]
-    return os.path.split(path)[0]


def _get_fs(path):
"""Return the fsspec filesystem required to handle a given path."""
if path.startswith("gs://"):
@@ -108,7 +94,7 @@ def _get_fs(path):
"--diagnostic-dir",
type=str,
default=None,
-help="Location to save zarr stores. Defaults to the parent of rundir.",
+help="Location to save zarr stores. Defaults to rundir.",
)
parser.add_argument(
"--diagnostic-categories",
1 change: 1 addition & 0 deletions fv3net/pipelines/kube_jobs/__init__.py
@@ -5,3 +5,4 @@
update_nested_dict,
get_base_fv3config,
)
from .nudge_to_obs import update_config_for_nudging
102 changes: 102 additions & 0 deletions fv3net/pipelines/kube_jobs/nudge_to_obs.py
@@ -0,0 +1,102 @@
from datetime import datetime, timedelta
import os
import numpy as np
from typing import List, Mapping

import fsspec
import fv3config


# this module assumes that analysis files are at 00Z, 06Z, 12Z and 18Z
SECONDS_IN_HOUR = 60 * 60
NUDGE_HOURS = np.array([0, 6, 12, 18]) # hours at which analysis data is available
NUDGE_FILE_TARGET = "INPUT" # where to put analysis files in rundir


def _most_recent_nudge_time(start_time: datetime) -> datetime:
"""Return datetime object for the last nudging time preceding or concurrent
with start_time"""
first_nudge_hour = _most_recent_hour(start_time.hour)
return datetime(start_time.year, start_time.month, start_time.day, first_nudge_hour)


def _most_recent_hour(current_hour, hour_array=NUDGE_HOURS) -> int:
"""Return latest hour in hour_array that precedes or is concurrent with
current_hour"""
first_nudge_hour = hour_array[np.argmax(hour_array > current_hour) - 1]
return first_nudge_hour


def _get_nudge_time_list(config: Mapping) -> List[datetime]:
"""Return list of datetime objects corresponding to times at which analysis files
are required for nudging for a given model run configuration"""
current_date = config["namelist"]["coupler_nml"]["current_date"]
start_time = datetime(*current_date)
first_nudge_time = _most_recent_nudge_time(start_time)
run_duration = fv3config.get_run_duration(config)
nudge_duration = run_duration + (start_time - first_nudge_time)
nudge_duration_hours = int(
np.ceil(nudge_duration.total_seconds() / SECONDS_IN_HOUR)
)
nudge_interval = NUDGE_HOURS[1] - NUDGE_HOURS[0]
nudging_hours = range(0, nudge_duration_hours + nudge_interval, nudge_interval)
return [first_nudge_time + timedelta(hours=hour) for hour in nudging_hours]


def _get_nudge_filename_list(config: Mapping) -> List[str]:
"""Return list of filenames of all nudging files required"""
nudge_filename_pattern = config["gfs_analysis_data"]["filename_pattern"]
time_list = _get_nudge_time_list(config)
return [time.strftime(nudge_filename_pattern) for time in time_list]


def _get_nudge_files_asset_list(config: Mapping) -> List[Mapping]:
"""Return list of fv3config assets for all nudging files required for a given
model run configuration"""
nudge_url = config["gfs_analysis_data"]["url"]
return [
fv3config.get_asset_dict(nudge_url, file, target_location=NUDGE_FILE_TARGET)
for file in _get_nudge_filename_list(config)
]


def _get_nudge_files_description_asset(config: Mapping, config_url: str) -> Mapping:
"""Return an fv3config asset pointing to the text file that the
model requires to describe the list of nudging files."""
fname_list_filename = config["namelist"]["fv_nwp_nudge_nml"]["input_fname_list"]
return fv3config.get_asset_dict(config_url, fname_list_filename)


def _write_nudge_files_description(config: Mapping, url: str):
"""Write a text file with list of all nudging files (which the
model requires to know what the nudging files are called)."""
fname_list_contents = "\n".join(_get_nudge_filename_list(config))
with fsspec.open(url, "w") as f:
f.write(fname_list_contents)


def update_config_for_nudging(config: Mapping, config_url: str) -> Mapping:
"""Add assets to config for all nudging files and for the text file listing
nudging files. This text file will be written to config_url.
Args:
config: an fv3config configuration dictionary
config_url: path where text file describing nudging files will be written.
File will be written to {config_url}/{input_fname_list} where
input_fname_list is a namelist parameter in the fv_nwp_nudge_nml namelist
of config.
Returns:
config dict updated to include all required nudging files
"""
nudge_files_description = _get_nudge_files_description_asset(config, config_url)
nudge_files_description_url = os.path.join(
nudge_files_description["source_location"],
nudge_files_description["source_name"],
)
_write_nudge_files_description(config, nudge_files_description_url)
if "patch_files" not in config:
config["patch_files"] = []
config["patch_files"].append(nudge_files_description)
config["patch_files"].extend(_get_nudge_files_asset_list(config))
return config
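The `np.argmax` indexing in `_most_recent_hour` above is the subtle part: `argmax` on a boolean array returns the first `True` index, and when no analysis hour exceeds `current_hour` it returns 0, so the `- 1` wraps around to the last entry. A minimal sketch of that rounding behavior in isolation:

```python
import numpy as np

NUDGE_HOURS = np.array([0, 6, 12, 18])  # hours with analysis data

def most_recent_hour(current_hour, hour_array=NUDGE_HOURS):
    # argmax of the boolean mask is the first hour strictly after
    # current_hour; subtracting 1 steps back to the hour at or before it.
    # If no hour exceeds current_hour, argmax is 0 and -1 wraps to 18.
    return int(hour_array[np.argmax(hour_array > current_hour) - 1])

print(most_recent_hour(7))   # 6: last analysis time at or before 07Z
print(most_recent_hour(23))  # 18: wrap-around case, nothing exceeds 23
```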
2 changes: 1 addition & 1 deletion fv3net/runtime/__init__.py
@@ -1,3 +1,3 @@
from . import sklearn_interface as sklearn
from .state_io import init_writers, append_to_writers, CF_TO_RESTART_MAP
-from .config import get_runfile_config, get_namelist
+from .config import get_runfile_config, get_config, get_namelist
10 changes: 9 additions & 1 deletion fv3net/runtime/config.py
@@ -1,5 +1,8 @@
import yaml
import f90nml
import fv3config

FV3CONFIG_FILENAME = "fv3config.yml"


class dotdict(dict):
@@ -11,10 +14,15 @@ class dotdict(dict):


def get_runfile_config():
-with open("fv3config.yml") as f:
+with open(FV3CONFIG_FILENAME) as f:
config = yaml.safe_load(f)
return dotdict(config["scikit_learn"])


def get_config():
"""Return fv3config dictionary"""
return fv3config.config_from_yaml(FV3CONFIG_FILENAME)


def get_namelist():
return f90nml.read("input.nml")
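The body of `dotdict` is collapsed in this diff; the usual pattern behind that name (a sketch of the common idiom, not necessarily the exact fv3net implementation) maps attribute access onto dictionary keys:

```python
class DotDict(dict):
    """Sketch of the dotdict pattern: cfg.key is equivalent to cfg["key"]."""
    __getattr__ = dict.get          # missing keys read as None
    __setattr__ = dict.__setitem__
    __delattr__ = dict.__delitem__

# Hypothetical scikit_learn config section, as get_runfile_config returns it
cfg = DotDict({"model": "sklearn_model.pkl", "zarr_output": "diags.zarr"})
print(cfg.model)  # sklearn_model.pkl
```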
57 changes: 57 additions & 0 deletions tests/test_kube_jobs_nudge_to_obs.py
@@ -0,0 +1,57 @@
import pytest
from datetime import datetime

from fv3net.pipelines.kube_jobs import nudge_to_obs


@pytest.mark.parametrize(
"start_time, expected",
[
(datetime(2016, 1, 1), datetime(2016, 1, 1, 0)),
(datetime(2016, 1, 1, 1), datetime(2016, 1, 1, 0)),
(datetime(2016, 1, 1, 7), datetime(2016, 1, 1, 6)),
(datetime(2016, 1, 1, 12), datetime(2016, 1, 1, 12)),
(datetime(2016, 1, 2, 18, 1), datetime(2016, 1, 2, 18)),
],
)
def test__get_first_nudge_file_time(start_time, expected):
assert nudge_to_obs._most_recent_nudge_time(start_time) == expected


@pytest.mark.parametrize(
"coupler_nml, expected_length, expected_first_datetime, expected_last_datetime",
[
(
{"current_date": [2016, 1, 1, 0, 0, 0], "days": 1},
4 + 1,
datetime(2016, 1, 1),
datetime(2016, 1, 2),
),
(
{"current_date": [2016, 1, 1, 0, 0, 0], "days": 1, "hours": 5},
4 + 1 + 1,
datetime(2016, 1, 1),
datetime(2016, 1, 2, 6),
),
(
{"current_date": [2016, 1, 1, 0, 0, 0], "days": 1, "hours": 7},
4 + 2 + 1,
datetime(2016, 1, 1),
datetime(2016, 1, 2, 12),
),
(
{"current_date": [2016, 1, 2, 1, 0, 0], "days": 1},
4 + 2,
datetime(2016, 1, 2),
datetime(2016, 1, 3, 6),
),
],
)
def test__get_nudge_time_list(
coupler_nml, expected_length, expected_first_datetime, expected_last_datetime
):
config = {"namelist": {"coupler_nml": coupler_nml}}
nudge_file_list = nudge_to_obs._get_nudge_time_list(config)
assert len(nudge_file_list) == expected_length
assert nudge_file_list[0] == expected_first_datetime
assert nudge_file_list[-1] == expected_last_datetime
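The expected lengths in the cases above (`4 + 1`, `4 + 2`, and so on) come from rounding the start time down to the previous 6-hourly analysis time and then counting 6-hour steps through one interval past the end of the run. A stdlib-only sketch of that arithmetic (a re-derivation for illustration, not the module's code):

```python
from datetime import datetime, timedelta
import math

NUDGE_HOURS = [0, 6, 12, 18]  # 6-hourly analysis times

def nudge_time_list(start, duration):
    # Round the start down to the most recent analysis hour...
    first = start.replace(
        hour=max(h for h in NUDGE_HOURS if h <= start.hour),
        minute=0, second=0, microsecond=0,
    )
    # ...then count 6-hour steps covering the run, plus one extra interval
    # so the analysis time just past the run's end is also included.
    total_hours = math.ceil((duration + (start - first)).total_seconds() / 3600)
    return [first + timedelta(hours=h) for h in range(0, total_hours + 6, 6)]

times = nudge_time_list(datetime(2016, 1, 2, 1), timedelta(days=1))
print(len(times), times[0], times[-1])
# 6 2016-01-02 00:00:00 2016-01-03 06:00:00  (the 4 + 2 case above)
```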
7 changes: 3 additions & 4 deletions workflows/diagnostics_to_zarr/README.md
@@ -1,7 +1,7 @@
## Diagnostics-to-zarr workflow
This workflow takes a path/url to a run directory as an input and saves zarr stores
-of the diagnostic model output to a specified location. This workflow requires a
-specific xarray version (0.14.0) and so to run locally, one must ensure your
+of the diagnostic model output to a specified location. This workflow requires a
+specific xarray version and so to run locally, one must ensure your
environment is using that version. For dataflow jobs, a custom setup.py is provided
which pins this exact version.

@@ -17,8 +17,7 @@ optional arguments:
--rundir RUNDIR Location of run directory. May be local or remote
path.
--diagnostic-dir DIAGNOSTIC_DIR
-Location to save zarr stores. Defaults to the parent
-of rundir.
+Location to save zarr stores. Defaults to rundir.
--diagnostic-categories DIAGNOSTIC_CATEGORIES [DIAGNOSTIC_CATEGORIES ...]
Optionally specify one or more categories of
diagnostic files. Provide part of filename before
4 changes: 2 additions & 2 deletions workflows/diagnostics_to_zarr/setup.py
@@ -11,15 +11,15 @@
"numba",
"scikit-image",
"netCDF4",
-"xarray==0.14.0",
+"xarray==0.15.0",
"partd",
"pyyaml>=5.0",
"xgcm",
"zarr",
]

setup(
-name="fv3net",
+name="diags-to-zarr",
packages=find_packages(),
install_requires=dependencies,
version="0.1.0",
28 changes: 28 additions & 0 deletions workflows/run_with_learned_nudging/Makefile
@@ -0,0 +1,28 @@
IMAGE = us.gcr.io/vcm-ml/learned_nudging_run:v0.1.1
RUNFILE = mean_nudging_runfile.py
TEMPLATE = fv3config_template.yml
LOCAL_OUTDIR = rundir/nudge_mean_$*
LOCAL_CONFIGDIR = $(LOCAL_OUTDIR)/config
LOCAL_FV3CONFIG = $(LOCAL_CONFIGDIR)/fv3config.yml
GCS_OUTDIR = gs://vcm-ml-data/2020-03-30-learned-nudging-FV3GFS-runs/nudge_mean_$*
GCS_CONFIGDIR = $(GCS_OUTDIR)/config
GCS_FV3CONFIG = $(GCS_CONFIGDIR)/fv3config.yml

run_all_remote: run_remote_T run_remote_T_ps run_remote_T_ps_u_v

run_remote_%: prepare_remote_%
python submit_job.py --dockerimage $(IMAGE) --runfile $(RUNFILE) $(GCS_FV3CONFIG) $(GCS_OUTDIR)

run_local_%: prepare_local_%
fv3run --dockerimage $(IMAGE) --runfile $(RUNFILE) $(LOCAL_FV3CONFIG) $(LOCAL_OUTDIR)

prepare_remote_%:
python prepare_config.py $(TEMPLATE) $* $(GCS_CONFIGDIR)

prepare_local_%:
python prepare_config.py $(TEMPLATE) $* $(LOCAL_CONFIGDIR)

clean:
rm -rf rundir configdir

.PHONY: run_all_remote