Commit 67bc819

Fix conflict in nemo2cmor.py when merging master into clim branch: In "def initialize()" merge clim_dir_ & test_mode_ additions #262.

treerink committed Aug 27, 2020
2 parents 7b4e49d + 0e9c8f0 commit 67bc819
Showing 11 changed files with 247 additions and 56 deletions.
64 changes: 64 additions & 0 deletions ece2cmor3/__load_nemo_vertices__.py
@@ -0,0 +1,64 @@
import logging
import os
import requests

import netCDF4
import numpy

# Logger object
log = logging.getLogger(__name__)

orca1_grid_shape = (292, 362)
orca025_grid_shape = (1050, 1442)

cached_vertices = {}


def load_vertices_from_file(gridtype, shape):
global cached_vertices
gridchar = gridtype
if shape == orca1_grid_shape:
mesh = "ORCA1"
elif shape == orca025_grid_shape:
mesh = "ORCA025"
else:
log.fatal("Unsupported grid resolution for NEMO: %s" % str(shape))
return None, None
if (mesh, gridchar) in cached_vertices.keys():
return cached_vertices[(mesh, gridchar)][0], cached_vertices[(mesh, gridchar)][1]
file_name = '-'.join(["nemo", "vertices", mesh, gridchar, "grid"]) + ".nc"
fullpath = os.path.join(os.path.dirname(__file__), "resources", "nemo-vertices", file_name)
if not os.path.isfile(fullpath):
if not get_from_b2share(file_name, fullpath):
log.fatal("The file %s could not be downloaded, please install manually at %s" % (file_name, fullpath))
return None, None
nemo_vertices_netcdf_file = netCDF4.Dataset(fullpath, 'r')
lon_vertices_raw = numpy.array(nemo_vertices_netcdf_file.variables["vertices_longitude"][...], copy=True)
lat_vertices = numpy.array(nemo_vertices_netcdf_file.variables["vertices_latitude"][...], copy=True)
nemo_vertices_netcdf_file.close()
lon_vertices = numpy.where(lon_vertices_raw < 0, lon_vertices_raw + 360., lon_vertices_raw)
cached_vertices[(mesh, gridchar)] = (lon_vertices, lat_vertices)
return lon_vertices, lat_vertices


def get_from_b2share(fname, fullpath):
site = "https://b2share.eudat.eu/api"
record = "3ad7d5c5f1ab419297c1e02bded8d70f"
resp = requests.get('/'.join([site, "records", record]))
if not resp:
log.error("Problem getting record data from b2share server: %d" % resp.status_code)
return False
d = resp.json()
for f in d["files"]:
if f["key"] == fname:
url = '/'.join([site, "files", f["bucket"], f["key"]])
log.info("Downloading file %s from b2share archive..." % fname)
fresp = requests.get(url)
if not fresp:
log.error("Problem getting file %s from b2share server: %d" % (fname, resp.status_code))
return False
with open(fullpath, 'wb') as fd:
fd.write(fresp.content)
log.info("...success, file %s created" % fullpath)
return True
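
For orientation, a minimal usage sketch of the new module (a hypothetical call site, not part of the commit; the "T" grid letter is an assumption matching the vertices file naming):

# Hypothetical usage sketch: fetch cell-corner coordinates for a field
# on the ORCA1 mesh.
from ece2cmor3.__load_nemo_vertices__ import load_vertices_from_file, orca1_grid_shape

lon_vertices, lat_vertices = load_vertices_from_file("T", orca1_grid_shape)
if lon_vertices is not None:
    # One set of corner points per grid cell; negative longitudes have
    # been wrapped into the [0, 360) range.
    print(lon_vertices.shape, lat_vertices.shape)
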
2 changes: 1 addition & 1 deletion ece2cmor3/__version__.py
@@ -1 +1 @@
version = 'v1.4.0'
version = 'v1.5.0'
1 change: 0 additions & 1 deletion ece2cmor3/ece2cmorlib.py
@@ -194,7 +194,6 @@ def perform_ifs_tasks(datadir, expname,
get_cmor_target("areacella", "fx"))
ifs2cmor.execute(ifs_tasks + [area_task], nthreads=taskthreads)


# Performs the NEMO cmorization processing:
def perform_nemo_tasks(datadir, expname, refdate):
global log, tasks, table_dir, prefix
7 changes: 6 additions & 1 deletion ece2cmor3/grib_file.py
@@ -16,8 +16,9 @@
# Key names
date_key = "dataDate"
time_key = "dataTime"
param_key = "paramId"
param_key = "indicatorOfParameter"
levtype_key = "indicatorOfTypeOfLevel"
table_key = "table2Version"
level_key = "level"

test_mode = False
@@ -111,9 +112,13 @@ def write(self, file_object_):
writer.writerow(self.row)

def set_field(self, name, value):
if name == table_key:
pass
self.row[csv_grib_mock.columns.index(name)] = value

def get_field(self, name):
if name == table_key:
return 128
return int(self.row[csv_grib_mock.columns.index(name)])

def release(self):
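
The get_field/set_field hunk above pins the CSV test mock to GRIB table 128, the default ECMWF parameter table, because the CSV test format stores no table column. A standalone sketch of that pattern (the class and names are illustrative, not the module's API):

# Standalone sketch: a record mock that ignores writes to the table key
# and always reports table 128 on reads.
TABLE_KEY = "table2Version"

class RecordMock(object):
    def __init__(self):
        self.fields = {}

    def set_field(self, name, value):
        if name == TABLE_KEY:
            return  # the CSV format has no table column: drop silently
        self.fields[name] = value

    def get_field(self, name):
        if name == TABLE_KEY:
            return 128  # every mocked record claims the default ECMWF table
        return int(self.fields[name])

mock = RecordMock()
mock.set_field(TABLE_KEY, 200)
assert mock.get_field(TABLE_KEY) == 128
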
133 changes: 93 additions & 40 deletions ece2cmor3/grib_filter.py
@@ -23,26 +23,15 @@
varsfreq = {}
spvar = None
fxvars = []

record_keys = {}
starttimes = {}


# Initializes the module, looks up previous month files and inspects the first
# day in the input files to build an inventory of the fields.
def update_sp_key(fname):
global spvar
for key in varsfreq:
freq = varsfreq[key]
if key[0] == 154:
if spvar is None or spvar[1] >= freq:
spvar = (154, freq, fname)
if key[0] == 134:
if spvar is None or spvar[1] > freq:
spvar = (134, freq, fname)


def initialize(gpfiles, shfiles, tmpdir, ini_gpfile=None, ini_shfile=None):
global gridpoint_files, spectral_files, ini_gridpoint_file, ini_spectral_file, temp_dir, varsfreq, accum_codes
global gridpoint_files, spectral_files, ini_gridpoint_file, ini_spectral_file, temp_dir, varsfreq, accum_codes, \
record_keys
grib_file.initialize()
gridpoint_files = {d: (get_prev_file(gpfiles[d]), gpfiles[d]) for d in gpfiles.keys()}
spectral_files = {d: (get_prev_file(shfiles[d]), shfiles[d]) for d in shfiles.keys()}
@@ -56,11 +45,15 @@ def initialize(gpfiles, shfiles, tmpdir, ini_gpfile=None, ini_shfile=None):
shfile = spectral_files[shdate][1] if any(spectral_files) else None
if gpfile is not None:
with open(gpfile) as gpf:
varsfreq.update(inspect_day(grib_file.create_grib_file(gpf), grid=cmor_source.ifs_grid.point))
freqs, records = inspect_day(grib_file.create_grib_file(gpf), grid=cmor_source.ifs_grid.point)
varsfreq.update(freqs)
record_keys[cmor_source.ifs_grid.point] = records
update_sp_key(gpfile)
if shfile is not None:
with open(shfile) as shf:
varsfreq.update(inspect_day(grib_file.create_grib_file(shf), grid=cmor_source.ifs_grid.spec))
freqs, records = inspect_day(grib_file.create_grib_file(shf), grid=cmor_source.ifs_grid.spec)
varsfreq.update(freqs)
record_keys[cmor_source.ifs_grid.spec] = records
update_sp_key(shfile)
if ini_gpfile is not None:
with open(ini_gpfile) as gpf:
@@ -70,6 +63,19 @@
fxvars.extend(inspect_hr(grib_file.create_grib_file(shf), grid=cmor_source.ifs_grid.spec))


# Fix for finding the surface pressure, which is needed to store 3D model-level fields
def update_sp_key(fname):
global spvar
for key in varsfreq:
freq = varsfreq[key]
if key[0] == 154:
if spvar is None or spvar[1] >= freq:
spvar = (154, freq, fname)
if key[0] == 134:
if spvar is None or spvar[1] > freq:
spvar = (134, freq, fname)


# Function reading the file with grib-codes of accumulated fields
def load_accum_codes(path):
global accum_key
@@ -87,9 +93,9 @@ def grib_tuple_from_string(s):


# Utility to make a grib tuple of codes from integers
def grib_tuple_from_int(i):
if i < 256:
return i, 128
def grib_tuple_from_ints(i, j):
if i < 10 ** 3:
return i, j
return i % 10 ** 3, i / 10 ** 3
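
The renamed helper splits ECMWF-style packed parameter identifiers: ids of 1000 and above encode table * 1000 + parameter, while smaller ids are paired with the separately read table version. A worked illustration (standalone, with explicit floor division so it runs under Python 2 and 3):

# Illustrative only: the split performed by grib_tuple_from_ints.
def split_param_id(param, table):
    if param < 10 ** 3:
        return param, table                       # plain code plus explicit table
    return param % 10 ** 3, param // 10 ** 3      # packed: table * 1000 + code

print(split_param_id(134, 128))     # -> (134, 128), surface pressure
print(split_param_id(228129, 128))  # -> (129, 228), packed identifier
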


@@ -105,16 +111,22 @@ def inspect_hr(gribfile, grid):
def inspect_day(gribfile, grid):
inidate, initime = -99, -1
records = {}
keylist = []
while gribfile.read_next(headers_only=True):
date = gribfile.get_field(grib_file.date_key)
time = gribfile.get_field(grib_file.time_key) / 100
if date == inidate + 1 and time == initime:
gribfile.release()
break
if inidate < 0:
inidate = date
if initime < 0:
initime = time
key = get_record_key(gribfile, grid) + (grid,)
short_key = get_record_key(gribfile, grid)
if short_key[1] == 0:
log.error("Invalid key at first day inspection: %s" % str(short_key))
keylist.append((time,) + short_key)
key = short_key + (grid,)
if key in records:
if time not in records[key]:
records[key].append(time)
@@ -137,14 +149,15 @@ def inspect_day(gribfile, grid):
"intervals in first day in file %s" % (key[0], key[1], key[3], key[2], gribfile.file_object.name))
else:
result[key] = frq
return result
return result, keylist


# TODO: Merge the 2 functions below into one matching function:

# Creates a key (code + table + level type + level) for a grib message iterator
def get_record_key(gribfile, gridtype):
codevar, codetab = grib_tuple_from_int(gribfile.get_field(grib_file.param_key))
codevar, codetab = grib_tuple_from_ints(gribfile.get_field(grib_file.param_key),
gribfile.get_field(grib_file.table_key))
levtype, level = gribfile.get_field(grib_file.levtype_key), gribfile.get_field(grib_file.level_key)
if levtype == grib_file.pressure_level_hPa_code:
level *= 100
@@ -166,8 +179,7 @@ def get_record_key(gribfile, gridtype):
levtype = grib_file.surface_level_code
# Fix for spectral height level fields in gridpoint file:
if cmor_source.grib_code(codevar) in cmor_source.ifs_source.grib_codes_sh and \
gridtype != cmor_source.ifs_grid.spec and \
levtype == grib_file.hybrid_level_code:
gridtype != cmor_source.ifs_grid.spec and levtype == grib_file.hybrid_level_code:
levtype = grib_file.height_level_code
return codevar, codetab, levtype, level

@@ -336,12 +348,14 @@ def filter_fx_variables(gribfile, keys2files, gridtype, startdate, handles=None)
keys = set()
timestamp = t
keys.add(key)
write_record(gribfile, key, keys2files, shift=0, handles=handles, once=True, setdate=startdate)
write_record(gribfile, key + (gridtype,), keys2files, shift=0, handles=handles, once=True, setdate=startdate)
gribfile.release()


def execute_tasks(tasks, filter_files=True, multi_threaded=False, once=False):
valid_tasks, varstasks = validate_tasks(tasks)
if not any(valid_tasks):
return []
task2files, task2freqs, fxkeys, keys2files = cluster_files(valid_tasks, varstasks)
grids = [cmor_source.ifs_grid.point, cmor_source.ifs_grid.spec]
if filter_files:
@@ -470,9 +484,33 @@ def open_files(vars2files):
return {f: open(os.path.join(temp_dir, f), 'w') for f in files}


def build_fast_forward_cache(keys2files, grid):
result = {}
i = 0
prev_key = (-1, -1, -1, -1, -1)
if grid not in record_keys:
return {}
for key in record_keys[grid]:
if key[:4] != prev_key[:4]: # flush
if i > 1:
result[prev_key] = i
prev_key = key
i = 0
if key[3] == grib_file.hybrid_level_code:
comp_key = key[1:4] + (-1, grid,)
if comp_key not in keys2files:
i += 1
else:
i = 0
else:
i = 0
return result
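
A hedged illustration of what build_fast_forward_cache produces: for each uninterrupted run of hybrid-level records with the same timestamp, code, table and level type that no task requested, the full key of the run's first record maps to the run length, and proc_grib_file below drains that count with header-only reads:

# Illustrative values only. Suppose variable (130, 128) sits on ten model
# levels at time 0 and no task needs it; the cache then holds one entry
# keyed on the run's first record:
hybrid_level_code = 109  # assumed GRIB-1 indicator for hybrid model levels
cache = {(0, 130, 128, hybrid_level_code, 1): 10}

# proc_grib_file looks up (time,) + record key; a hit of 10 means this
# record and the nine that follow are released after header-only reads.
time, key = 0, (130, 128, hybrid_level_code, 1)
print(cache.get((time,) + key, 0))  # -> 10
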


# Processes a month of grib data, including 0-hour fields in the previous month file.
def filter_grib_files(file_list, keys2files, grid, handles=None, month=0, year=0, once=False):
dates = sorted(file_list.keys())
cache = None if once else build_fast_forward_cache(keys2files, grid)
for i in range(len(dates)):
date = dates[i]
if month != 0 and year != 0 and (date.month, date.year) != (month, year):
@@ -486,7 +524,7 @@ def filter_grib_files(file_list, keys2files, grid, handles=None, month=0, year=0
with open(cur_grib_file, 'r') as fin:
log.info("Filtering grib file %s..." % cur_grib_file)
if next_chained:
proc_grib_file(grib_file.create_grib_file(fin), keys2files, grid, handles, once)
proc_grib_file(grib_file.create_grib_file(fin), keys2files, grid, handles, once, cache)
else:
proc_final_month(date.month, grib_file.create_grib_file(fin), keys2files, grid, handles, once)

@@ -501,30 +539,42 @@ def proc_initial_month(month, gribfile, keys2files, gridtype, handles, once=Fals
t = gribfile.get_field(grib_file.time_key)
key = get_record_key(gribfile, gridtype)
if t == timestamp and key in keys:
gribfile.release()
continue # Prevent double grib messages
if t != timestamp:
keys = set()
timestamp = t
keys.add(key)
if (key[0], key[1]) not in accum_codes:
write_record(gribfile, key, keys2files, handles=handles, once=once, setdate=None)
write_record(gribfile, key + (gridtype,), keys2files, handles=handles, once=once, setdate=None)
gribfile.release()


# Function processing a regular monthly grib file, writing all records required by the tasks
def proc_grib_file(gribfile, keys2files, gridtype, handles, once=False):
def proc_grib_file(gribfile, keys2files, gridtype, handles, once=False, ff_cache=None):
timestamp = -1
keys = set()
while gribfile.read_next() and (handles is None or any(handles.keys())):
t = gribfile.get_field(grib_file.time_key)
fast_forward_count = 0
while gribfile.read_next(headers_only=(fast_forward_count > 0)) and (handles is None or any(handles.keys())):
if fast_forward_count > 0:
fast_forward_count -= 1
gribfile.release()
continue
key = get_record_key(gribfile, gridtype)
t = gribfile.get_field(grib_file.time_key)
fast_forward_count = ff_cache.get((t,) + key, 0) if ff_cache is not None else 0
if fast_forward_count > 0:
fast_forward_count -= 1
gribfile.release()
continue
if t == timestamp and key in keys:
gribfile.release()
continue # Prevent double grib messages
if t != timestamp:
keys = set()
timestamp = t
keys.add(key)
write_record(gribfile, key, keys2files, shift=-1 if (key[0], key[1]) in accum_codes else 0,
write_record(gribfile, key + (gridtype,), keys2files, shift=-1 if (key[0], key[1]) in accum_codes else 0,
handles=handles, once=once, setdate=None)
gribfile.release()

@@ -540,46 +590,49 @@ def proc_final_month(month, gribfile, keys2files, gridtype, handles, once=False)
t = gribfile.get_field(grib_file.time_key)
key = get_record_key(gribfile, gridtype)
if t == timestamp and key in keys:
gribfile.release()
continue # Prevent double grib messages
if t != timestamp:
keys = set()
timestamp = t
keys.add(key)
write_record(gribfile, key, keys2files, shift=-1 if (key[0], key[1]) in accum_codes else 0,
write_record(gribfile, key + (gridtype,), keys2files, shift=-1 if (key[0], key[1]) in accum_codes else 0,
handles=handles, once=once, setdate=None)
elif mon == month % 12 + 1:
t = gribfile.get_field(grib_file.time_key)
key = get_record_key(gribfile, gridtype)
if t == timestamp and key in keys:
gribfile.release()
continue # Prevent double grib messages
if t != timestamp:
keys = set()
timestamp = t
keys.add(key)
if (key[0], key[1]) in accum_codes:
write_record(gribfile, key, keys2files, shift=-1, handles=handles, once=once, setdate=None)
write_record(gribfile, key + (gridtype,), keys2files, shift=-1, handles=handles, once=once, setdate=None)
gribfile.release()


# Writes the grib messages
def write_record(gribfile, key, keys2files, shift=0, handles=None, once=False, setdate=None):
global starttimes
var_infos = set()
if key[2] == grib_file.hybrid_level_code:
matches = [keys2files[k] for k in keys2files if k[:3] == key[:3]]
for k, v in keys2files.items():
if k[:3] == key[:3]:
var_infos.update(v)
else:
matches = [keys2files[k] for k in keys2files if k[:4] == key[:4]]
var_infos = set()
for match in matches:
var_infos.update(match)
f = keys2files.get(key, None)
if f is not None:
var_infos.update(f)
if not any(var_infos):
return
if setdate is not None:
gribfile.set_field(grib_file.date_key, int(setdate.strftime("%Y%m%d")))
gribfile.set_field(grib_file.time_key, 0)
timestamp = gribfile.get_field(grib_file.time_key)
if shift != 0 and setdate is None:
matches = [k for k in varsfreq.keys() if k[:-1] == key]
freq = varsfreq[matches[0]] if any(matches) else 0
freq = varsfreq.get(key, 0)
shifttime = timestamp + shift * freq * 100
if shifttime < 0 or shifttime >= 2400:
newdate, hours = fix_date_time(gribfile.get_field(grib_file.date_key), shifttime / 100)
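
The reworked matching in write_record reduces to one rule: hybrid-level records match every task with the same code, table and level type, since all model levels travel together, while any other record requires an exact key lookup. A minimal sketch of that rule (the function name is illustrative):

# Sketch only: the variable-matching rule write_record applies after this
# change, with key = (code, table, levtype, level, grid).
def matching_var_infos(key, keys2files, hybrid_level_code=109):
    var_infos = set()
    if key[2] == hybrid_level_code:
        # Model-level fields: match on (code, table, levtype), any level.
        for k, v in keys2files.items():
            if k[:3] == key[:3]:
                var_infos.update(v)
    else:
        # Everything else: exact five-part key match.
        var_infos.update(keys2files.get(key, set()))
    return var_infos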