Skip to content

Commit

Permalink
Rtofs ingest (#46)
Browse files Browse the repository at this point in the history
* Start building out rtofs routines

* Start adding rtofs to ingest workflow

* Workign ingest lambda with rtofs

* Add RTOFS aggregation to ingest tools

* Working rtofs aggregation workflow
  • Loading branch information
mpiannucci authored Sep 25, 2023
1 parent c4ef203 commit 99c10cb
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 38 deletions.
26 changes: 24 additions & 2 deletions cloud_aggregator/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
# Next we need the sns topic of the NODD service that we want to subscribe to
# TODO: This should be a config value.
nodd_nos_topic_arn = 'arn:aws:sns:us-east-1:123901341784:NewOFSObject'
nodd_rtofs_topic_arn = 'arn:aws:sns:us-east-1:709902155096:NewRTOFSObject'

# First nos ofs queue
new_ofs_object_queue = MessageQueue(
'nos-new-ofs-object-queue',
visibility_timeout=360,
Expand All @@ -36,9 +38,21 @@
sns_arn=nodd_nos_topic_arn,
)

# next, rtofs queue
new_rtofs_object_queue = MessageQueue(
'new-rtofs-object-queue',
visibility_timeout=360,
)

new_rtofs_object_subscription = new_rtofs_object_queue.subscribe_to_sns(
subscription_name='new-rtofs-object-subscription',
sns_arn=nodd_rtofs_topic_arn,
)

# Create the lambda to ingest NODD data into the bucket
# TODO: Decrease memory
ingest_lambda = LocalDockerLambda(
name="ingest-nos-to-zarr",
name="ingest-nos-to-zarr",
repo="nextgen-dmac-ingest",
path='./ingest',
timeout=60,
Expand All @@ -50,12 +64,20 @@
# to the new object queue
ingest_lambda.add_cloudwatch_log_access()
ingest_lambda.add_s3_access('ingest-s3-lambda-policy', bucket.bucket)

# Subscribe to the necessary queues
ingest_lambda.subscribe_to_sqs(
subscription_name='nos-sqs-lambda-mapping',
queue=new_ofs_object_queue.queue,
batch_size=1,
)

ingest_lambda.subscribe_to_sqs(
subscription_name='rtofs-sqs-lambda-mapping',
queue=new_rtofs_object_queue.queue,
batch_size=1,
)

# Okay now for the aggregation. This part of the infra will create an sqs queue that receives bucket notifications
# from the ingest bucket. The queue will then trigger a lambda function that will aggregate the data into a single
# zarr store. The zarr store will then be uploaded to the ingestion bucket.
Expand All @@ -71,7 +93,7 @@
bucket.subscribe_sns_to_bucket_notifications(
subscription_name='ingest-bucket-notifications-subscription',
sns_topic=ingest_bucket_notifications_topic,
filter_prefix='nos/',
filter_prefix=['nos/', 'rtofs/'],
filter_suffix='.zarr',
)

Expand Down
4 changes: 4 additions & 0 deletions cloud_aggregator/aggregation/aggregation.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from ingest_tools.nos_ofs import generate_kerchunked_nos_roms_model_run, generate_kerchunked_nos_roms_best_time_series
from ingest_tools.rtofs import generate_kerchunked_rtofs_best_time_series
from ingest_tools.aws import parse_s3_sqs_payload
from ingest_tools.filters import key_contains


ROMS_FILTERS = ['cbofs', 'ciofs', 'dbofs', 'tbofs', 'wcofs']
RTOFS_FILTERS = ['rtofs']


def handler(event, context):
Expand All @@ -24,5 +26,7 @@ def handler(event, context):
if key_contains(key, ROMS_FILTERS):
generate_kerchunked_nos_roms_model_run(region, bucket, key)
generate_kerchunked_nos_roms_best_time_series(region, bucket, key)
elif key_contains(key, RTOFS_FILTERS):
generate_kerchunked_rtofs_best_time_series(region, bucket, key)
else:
print(f'No aggregation available for key: {key}')
8 changes: 4 additions & 4 deletions cloud_aggregator/infra/public_s3_bucket.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Optional
from typing import List, Optional
import pulumi
from pulumi_aws import iam, s3, sns

Expand Down Expand Up @@ -101,7 +101,7 @@ def __init__(
)

def subscribe_sns_to_bucket_notifications(
self, subscription_name: str, sns_topic: sns.Topic, filter_prefix: Optional[str] = None, filter_suffix: Optional[str] = None
self, subscription_name: str, sns_topic: sns.Topic, filter_prefix: List[str] = [], filter_suffix: Optional[str] = None
):
"""
Subscribes an SNS topic to bucket object notifications.
Expand Down Expand Up @@ -152,9 +152,9 @@ def subscribe_sns_to_bucket_notifications(
"s3:ObjectCreated:*",
"s3:ObjectRemoved:*",
],
filter_prefix=filter_prefix,
filter_prefix=prefix,
filter_suffix=filter_suffix,
)
) for prefix in filter_prefix
],
opts=pulumi.ResourceOptions(
parent=self, depends_on=[self.bucket, bucket_topic_policy]
Expand Down
19 changes: 14 additions & 5 deletions cloud_aggregator/ingest/ingest.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
from ingest_tools.nos_ofs import generate_kerchunked_nos_nc
from ingest_tools.rtofs import generate_kerchunked_rtofs_nc
from ingest_tools.aws import parse_s3_sqs_payload
from ingest_tools.filters import key_contains


# TODO: Make these configurable
DESTINATION_BUCKET_NAME='nextgen-dmac-cloud-ingest'
DESTINATION_PREFIX='nos'
ROMS_FILTERS=['cbofs', 'ciofs', 'dbofs', 'tbofs', 'wcofs']

NOS_DESTINATION_PREFIX='nos'
RTOFS_DESTINATION_PREFIX='rtofs'
NOS_ROMS_FILTERS= ['cbofs', 'ciofs', 'dbofs', 'tbofs', 'wcofs']
RTOFS_FILTERS = ['rtofs']

def handler(event, context):
'''
Expand All @@ -22,7 +24,14 @@ def handler(event, context):

region, bucket, key = parse_s3_sqs_payload(payload)

if key_contains(key, ROMS_FILTERS):
generate_kerchunked_nos_nc(region, bucket, key, DESTINATION_BUCKET_NAME, DESTINATION_PREFIX)
# For now, we only care about nc files
if not key.endswith('.nc'):
print(f'No ingest available for key: {key}')
return

if key_contains(key, NOS_ROMS_FILTERS):
generate_kerchunked_nos_nc(region, bucket, key, DESTINATION_BUCKET_NAME, NOS_DESTINATION_PREFIX)
elif key_contains(key, RTOFS_FILTERS):
generate_kerchunked_rtofs_nc(region, bucket, key, DESTINATION_BUCKET_NAME, RTOFS_DESTINATION_PREFIX)
else:
print(f'No ingest available for key: {key}')
34 changes: 34 additions & 0 deletions ingest_tools/ingest_tools/generic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import fsspec
import ujson
from kerchunk.hdf import SingleHdf5ToZarr


def generate_kerchunked_nc(bucket: str, key: str, dest_key: str, dest_bucket: str, dest_prefix: str):
'''
Generate a kerchunked zarr file from a netcdf file in s3
'''
if not key.endswith('.nc'):
print(f'File {key} does not have a netcdf file postfix. Skipping...')
return

# For now SSL false is solving my cert issues **shrug**
fs_read = fsspec.filesystem('s3', anon=True, skip_instance_cache=True, use_ssl=False)
fs_write = fsspec.filesystem('s3', anon=False, skip_instance_cache=True, use_ssl=False)

url = f"s3://{bucket}/{key}"
outurl = f"s3://{dest_bucket}/{dest_prefix}/{dest_key}"

with fs_read.open(url) as ifile:
print(f"Kerchunking netcdf at {url}")
try:
chunks = SingleHdf5ToZarr(ifile, url)
except Exception as e:
print(f'Failed to kerchunk {url}: {e}')
return

print(f"Writing kerchunked json to {outurl}")
with fs_write.open(outurl, mode="w") as ofile:
data = ujson.dumps(chunks.translate())
ofile.write(data)

print(f'Successfully processed {url}')
30 changes: 3 additions & 27 deletions ingest_tools/ingest_tools/nos_ofs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import fsspec
import ujson
from kerchunk.combine import MultiZarrToZarr
from kerchunk.hdf import SingleHdf5ToZarr

from .generic import generate_kerchunked_nc


def parse_nos_model_run_datestamp(key: str) -> Tuple[str, str]:
Expand Down Expand Up @@ -61,37 +62,12 @@ def generate_nos_output_key(key: str) -> str:
model_name = parts[0].split('.')[0]
return f'{model_name}/{parts[1]}.zarr'


def generate_kerchunked_nos_nc(region: str, bucket: str, key: str, dest_bucket: str, dest_prefix: str):
'''
Generate a kerchunked zarr file from a netcdf file in s3
'''
if not key.endswith('.nc'):
print(f'File {key} does not have a netcdf file postfix. Skipping...')
return

# For now SSL false is solving my cert issues **shrug**
fs_read = fsspec.filesystem('s3', anon=True, skip_instance_cache=True, use_ssl=False)
fs_write = fsspec.filesystem('s3', anon=False, skip_instance_cache=True, use_ssl=False)

url = f"s3://{bucket}/{key}"
filekey = generate_nos_output_key(key)
outurl = f"s3://{dest_bucket}/{dest_prefix}/{filekey}"

with fs_read.open(url) as ifile:
print(f"Kerchunking nos model at {url}")
try:
chunks = SingleHdf5ToZarr(ifile, url)
except Exception as e:
print(f'Failed to kerchunk {url}: {e}')
return

print(f"Writing kerchunked nos model to {outurl}")
with fs_write.open(outurl, mode="w") as ofile:
data = ujson.dumps(chunks.translate())
ofile.write(data)

print(f'Successfully processed {url}')
generate_kerchunked_nc(bucket, key, filekey, dest_bucket, dest_prefix)


def generate_kerchunked_nos_roms_model_run(region: str, bucket: str, key: str):
Expand Down
121 changes: 121 additions & 0 deletions ingest_tools/ingest_tools/rtofs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import re
import datetime
from typing import Tuple

import fsspec
import ujson
from kerchunk.combine import MultiZarrToZarr

from .generic import generate_kerchunked_nc


def generate_rtofs_output_key(key: str) -> str:
'''
Generate the output file key for a given input key and destination bucket and prefix:
'rtofs.20230922/rtofs_glo_2ds_f001_diag.nc'
The following output key will be generated: rtofs.20230922.rtofs_glo_2ds_f001_diag.nc.zarr'
'''
components = key.split('/')
model_date = components[-2]
filename = components[-1]
return f'{model_date}.{filename}.zarr'


def generate_kerchunked_rtofs_nc(region: str, bucket: str, key: str, dest_bucket: str, dest_prefix: str):
'''
Generate a kerchunked zarr file from a netcdf file in s3
'''
filekey = generate_rtofs_output_key(key)
generate_kerchunked_nc(bucket, key, filekey, dest_bucket, dest_prefix)


def generate_rtofs_best_time_series_glob_expression(key: str) -> str:
'''
Parse the glob prefix and postfix given the zarr single file key:
'rtofs/rtofs.20230922.rtofs_glo_2ds_f001_diag.nc.zarr'
The following expression will be created: rtofs/rtofs.*.rtofs_glo_2ds_f*_diag.nc.zarr'
'''
prefix, inner, postfix = re.search(r'(.*).\d{8}.(.*)_f\d{3}_(.*)', key).groups()
return f'{prefix}.*.{inner}_f*_{postfix}'


def parse_rtofs_model_run_datestamp_offset(key: str) -> Tuple[str, int]:
'''
Parse the model run forecast time key from the key of the file in the RTOFS S3 bucket, given the RTOFS naming convention:
'rtofs/rtofs.20230922.rtofs_glo_2ds_f001_diag.nc.zarr'
where the model_date is 20230922 and the offset is 1, this would result in a key of 20230922T01
'''
model_date, offset = re.search(r'(\d{8}).*f(\d{3})', key).groups()
model_date = datetime.datetime.strptime(f'{model_date}T00', '%Y%m%dT%H') + datetime.timedelta(hours=int(offset))
model_date_key = model_date.strftime('%Y%m%dT%H')
return model_date_key, int(offset)


def generate_rtofs_best_timeseries_key(best_timeseries_glob: str) -> str:
'''
Create the best time series key for a given glob expression:
'rtofs/rtofs.*.rtofs_glo_2ds_f*_diag.nc.zarr'
The following key will be generated: rtofs/rtofs.rtofs_glo_2ds_diag.best.nc.zarr'
'''
return best_timeseries_glob.replace('.*', '').replace('_f*', '').replace('.nc.zarr', '.best.nc.zarr')


def generate_kerchunked_rtofs_best_time_series(region: str, bucket: str, key: str):
'''
Generate or update the best time series kerchunked aggregation for the model. If the specified file is not in the best time series,
then the best time series aggregation will not be updated
'''
print(f'Generating best time series multizarr aggregation for key: {key}')

try:
best_time_series_glob = generate_rtofs_best_time_series_glob_expression(key)
except Exception as e:
print(f'Failed to parse model run date and hour from key {key}: {e}. Skipping...')
return

# For now SSL false is solving my cert issues **shrug**
fs_read = fsspec.filesystem('s3', anon=True, skip_instance_cache=True, use_ssl=False)
fs_write = fsspec.filesystem('s3', anon=False, skip_instance_cache=True, use_ssl=False)

model_files = fs_read.glob(f's3://{bucket}/{best_time_series_glob}')
model_files = sorted(['s3://'+f for f in model_files])

indexes = {}

for f in model_files:
model_date_key, offset = parse_rtofs_model_run_datestamp_offset(f)
if model_date_key not in indexes:
indexes[model_date_key] = [offset, f]
else:
if offset < indexes[model_date_key][0]:
indexes[model_date_key] = [offset, f]

model_best_files = [x[1] for x in list(indexes.values())]

target_key = f's3://{bucket}/{key}'
if target_key not in model_best_files:
print(f'{key} is not a part of the current best time series for its model. Skipping...')
return

model_run_file_count = len(model_best_files)
print(f'Aggregating {model_run_file_count} model files for best time series aggregation...')

# TODO: Generalize this somehow?
mzz = MultiZarrToZarr(
model_best_files,
remote_protocol='s3',
remote_options={'anon': True, 'use_ssl': False},
concat_dims=['MT'],
identical_dims=['Y', 'X', 'Latitude', 'Longitude']
)

d = mzz.translate()

outkey = generate_rtofs_best_timeseries_key(best_time_series_glob)
outurl = f's3://{bucket}/{outkey}'

print(f'Writing zarr best time series aggregation to {outurl}')
with fs_write.open(outurl, 'w') as ofile:
ofile.write(ujson.dumps(d))

print(f'Successfully updated {outurl} RTOFS best time series aggregation')
39 changes: 39 additions & 0 deletions ingest_tools/tests/test_rtofs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from ingest_tools.rtofs import generate_rtofs_output_key, generate_rtofs_best_time_series_glob_expression, parse_rtofs_model_run_datestamp_offset, generate_rtofs_best_timeseries_key


def test_generate_rtofs_output_key():
key = 'rtofs.20230922/rtofs_glo_2ds_f001_diag.nc'
output_key = generate_rtofs_output_key(key)
assert output_key == 'rtofs.20230922.rtofs_glo_2ds_f001_diag.nc.zarr'


def test_generate_rtofs_best_time_series_glob_expression():
key = 'rtofs/rtofs.20230922.rtofs_glo_2ds_f001_diag.nc.zarr'
glob_expression = generate_rtofs_best_time_series_glob_expression(key)
assert glob_expression == 'rtofs/rtofs.*.rtofs_glo_2ds_f*_diag.nc.zarr'

key = 'rtofs/rtofs.20230925.rtofs_glo_3dz_f006_6hrly_hvr_US_east.nc.zarr'
glob_expression = generate_rtofs_best_time_series_glob_expression(key)
assert glob_expression == 'rtofs/rtofs.*.rtofs_glo_3dz_f*_6hrly_hvr_US_east.nc.zarr'


def test_parse_rtofs_model_run_datestamp_offset():
key = 'rtofs/rtofs.20230922.rtofs_glo_2ds_f001_diag.nc.zarr'
model_date, offset = parse_rtofs_model_run_datestamp_offset(key)
assert model_date == '20230922T01'
assert offset == 1

key = 'rtofs/rtofs.20230925.rtofs_glo_3dz_f006_6hrly_hvr_US_east.nc.zarr'
model_date, offset = parse_rtofs_model_run_datestamp_offset(key)
assert model_date == '20230925T06'
assert offset == 6


def test_generate_best_timeseries_key():
glob = 'rtofs/rtofs.*.rtofs_glo_2ds_f*_diag.nc.zarr'
best_timeseries_key = generate_rtofs_best_timeseries_key(glob)
assert best_timeseries_key == 'rtofs/rtofs.rtofs_glo_2ds_diag.best.nc.zarr'

glob = 'rtofs/rtofs.*.rtofs_glo_3dz_f*_6hrly_hvr_US_east.nc.zarr'
best_timeseries_key = generate_rtofs_best_timeseries_key(glob)
assert best_timeseries_key == 'rtofs/rtofs.rtofs_glo_3dz_6hrly_hvr_US_east.best.nc.zarr'

0 comments on commit 99c10cb

Please sign in to comment.