diff --git a/cloud_aggregator/__main__.py b/cloud_aggregator/__main__.py
index 13d103b..5c19a09 100644
--- a/cloud_aggregator/__main__.py
+++ b/cloud_aggregator/__main__.py
@@ -25,7 +25,9 @@
 # Next we need the sns topic of the NODD service that we want to subscribe to
 # TODO: This should be a config value.
 nodd_nos_topic_arn = 'arn:aws:sns:us-east-1:123901341784:NewOFSObject'
+nodd_rtofs_topic_arn = 'arn:aws:sns:us-east-1:709902155096:NewRTOFSObject'
 
+# First nos ofs queue
 new_ofs_object_queue = MessageQueue(
     'nos-new-ofs-object-queue',
     visibility_timeout=360,
@@ -36,9 +38,21 @@
     sns_arn=nodd_nos_topic_arn,
 )
 
+# next, rtofs queue
+new_rtofs_object_queue = MessageQueue(
+    'new-rtofs-object-queue',
+    visibility_timeout=360,
+)
+
+new_rtofs_object_subscription = new_rtofs_object_queue.subscribe_to_sns(
+    subscription_name='new-rtofs-object-subscription',
+    sns_arn=nodd_rtofs_topic_arn,
+)
+
+# Create the lambda to ingest NODD data into the bucket
 # TODO: Decrease memory
 ingest_lambda = LocalDockerLambda(
-    name="ingest-nos-to-zarr",
+    name="ingest-nos-to-zarr",
     repo="nextgen-dmac-ingest",
     path='./ingest',
     timeout=60,
@@ -50,12 +64,20 @@
 # to the new object queue
 ingest_lambda.add_cloudwatch_log_access()
 ingest_lambda.add_s3_access('ingest-s3-lambda-policy', bucket.bucket)
+
+# Subscribe to the necessary queues
 ingest_lambda.subscribe_to_sqs(
     subscription_name='nos-sqs-lambda-mapping',
     queue=new_ofs_object_queue.queue,
     batch_size=1,
 )
 
+ingest_lambda.subscribe_to_sqs(
+    subscription_name='rtofs-sqs-lambda-mapping',
+    queue=new_rtofs_object_queue.queue,
+    batch_size=1,
+)
+
 # Okay now for the aggregation. This part of the infra will create an sqs queue that receives bucket notifications
 # from the ingest bucket. The queue will then trigger a lambda function that will aggregate the data into a single
 # zarr store. The zarr store will then be uploaded to the ingestion bucket.
@@ -71,7 +93,7 @@
 bucket.subscribe_sns_to_bucket_notifications(
     subscription_name='ingest-bucket-notifications-subscription',
     sns_topic=ingest_bucket_notifications_topic,
-    filter_prefix='nos/',
+    filter_prefix=['nos/', 'rtofs/'],
     filter_suffix='.zarr',
 )
 
diff --git a/cloud_aggregator/aggregation/aggregation.py b/cloud_aggregator/aggregation/aggregation.py
index 78bf571..388a0b2 100644
--- a/cloud_aggregator/aggregation/aggregation.py
+++ b/cloud_aggregator/aggregation/aggregation.py
@@ -1,9 +1,11 @@
 from ingest_tools.nos_ofs import generate_kerchunked_nos_roms_model_run, generate_kerchunked_nos_roms_best_time_series
+from ingest_tools.rtofs import generate_kerchunked_rtofs_best_time_series
 from ingest_tools.aws import parse_s3_sqs_payload
 from ingest_tools.filters import key_contains
 
 
 ROMS_FILTERS = ['cbofs', 'ciofs', 'dbofs', 'tbofs', 'wcofs']
+RTOFS_FILTERS = ['rtofs']
 
 
 def handler(event, context):
@@ -24,5 +26,7 @@ def handler(event, context):
         if key_contains(key, ROMS_FILTERS):
             generate_kerchunked_nos_roms_model_run(region, bucket, key)
             generate_kerchunked_nos_roms_best_time_series(region, bucket, key)
+        elif key_contains(key, RTOFS_FILTERS):
+            generate_kerchunked_rtofs_best_time_series(region, bucket, key)
         else:
             print(f'No aggregation available for key: {key}')
diff --git a/cloud_aggregator/infra/public_s3_bucket.py b/cloud_aggregator/infra/public_s3_bucket.py
index 768c3b1..8702843 100644
--- a/cloud_aggregator/infra/public_s3_bucket.py
+++ b/cloud_aggregator/infra/public_s3_bucket.py
@@ -1,4 +1,4 @@
-from typing import Optional
+from typing import List, Optional
 
 import pulumi
 from pulumi_aws import iam, s3, sns
@@ -101,7 +101,7 @@ def __init__(
         )
 
     def subscribe_sns_to_bucket_notifications(
-        self, subscription_name: str, sns_topic: sns.Topic, filter_prefix: Optional[str] = None, filter_suffix: Optional[str] = None
+        self, subscription_name: str, sns_topic: sns.Topic, filter_prefix: List[str] = [], filter_suffix: Optional[str] = None
     ):
         """
         Subscribes an SNS topic to bucket object notifications.
@@ -152,9 +152,9 @@ def subscribe_sns_to_bucket_notifications(
                         "s3:ObjectCreated:*",
                         "s3:ObjectRemoved:*",
                     ],
-                    filter_prefix=filter_prefix,
+                    filter_prefix=prefix,
                     filter_suffix=filter_suffix,
-                )
+                ) for prefix in filter_prefix
             ],
             opts=pulumi.ResourceOptions(
                 parent=self, depends_on=[self.bucket, bucket_topic_policy]
diff --git a/cloud_aggregator/ingest/ingest.py b/cloud_aggregator/ingest/ingest.py
index 9573d19..9ccc142 100644
--- a/cloud_aggregator/ingest/ingest.py
+++ b/cloud_aggregator/ingest/ingest.py
@@ -1,13 +1,15 @@
 from ingest_tools.nos_ofs import generate_kerchunked_nos_nc
+from ingest_tools.rtofs import generate_kerchunked_rtofs_nc
 from ingest_tools.aws import parse_s3_sqs_payload
 from ingest_tools.filters import key_contains
 
 
 # TODO: Make these configurable
 DESTINATION_BUCKET_NAME='nextgen-dmac-cloud-ingest'
-DESTINATION_PREFIX='nos'
-ROMS_FILTERS=['cbofs', 'ciofs', 'dbofs', 'tbofs', 'wcofs']
-
+NOS_DESTINATION_PREFIX='nos'
+RTOFS_DESTINATION_PREFIX='rtofs'
+NOS_ROMS_FILTERS = ['cbofs', 'ciofs', 'dbofs', 'tbofs', 'wcofs']
+RTOFS_FILTERS = ['rtofs']
 
 def handler(event, context):
     '''
@@ -22,7 +24,14 @@
 
         region, bucket, key = parse_s3_sqs_payload(payload)
 
-        if key_contains(key, ROMS_FILTERS):
-            generate_kerchunked_nos_nc(region, bucket, key, DESTINATION_BUCKET_NAME, DESTINATION_PREFIX)
+        # For now, we only care about nc files
+        if not key.endswith('.nc'):
+            print(f'No ingest available for key: {key}')
+            return
+
+        if key_contains(key, NOS_ROMS_FILTERS):
+            generate_kerchunked_nos_nc(region, bucket, key, DESTINATION_BUCKET_NAME, NOS_DESTINATION_PREFIX)
+        elif key_contains(key, RTOFS_FILTERS):
+            generate_kerchunked_rtofs_nc(region, bucket, key, DESTINATION_BUCKET_NAME, RTOFS_DESTINATION_PREFIX)
         else:
             print(f'No ingest available for key: {key}')
\ No newline at end of file
diff --git a/ingest_tools/ingest_tools/generic.py b/ingest_tools/ingest_tools/generic.py
new file mode 100644
index 0000000..64fe604
--- /dev/null
+++ b/ingest_tools/ingest_tools/generic.py
@@ -0,0 +1,34 @@
+import fsspec
+import ujson
+from kerchunk.hdf import SingleHdf5ToZarr
+
+
+def generate_kerchunked_nc(bucket: str, key: str, dest_key: str, dest_bucket: str, dest_prefix: str):
+    '''
+    Generate a kerchunked zarr file from a netcdf file in s3
+    '''
+    if not key.endswith('.nc'):
+        print(f'File {key} does not have a netcdf file postfix. Skipping...')
+        return
+
+    # For now SSL false is solving my cert issues **shrug**
+    fs_read = fsspec.filesystem('s3', anon=True, skip_instance_cache=True, use_ssl=False)
+    fs_write = fsspec.filesystem('s3', anon=False, skip_instance_cache=True, use_ssl=False)
+
+    url = f"s3://{bucket}/{key}"
+    outurl = f"s3://{dest_bucket}/{dest_prefix}/{dest_key}"
+
+    with fs_read.open(url) as ifile:
+        print(f"Kerchunking netcdf at {url}")
+        try:
+            chunks = SingleHdf5ToZarr(ifile, url)
+        except Exception as e:
+            print(f'Failed to kerchunk {url}: {e}')
+            return
+
+        print(f"Writing kerchunked json to {outurl}")
+        with fs_write.open(outurl, mode="w") as ofile:
+            data = ujson.dumps(chunks.translate())
+            ofile.write(data)
+
+    print(f'Successfully processed {url}')
\ No newline at end of file
diff --git a/ingest_tools/ingest_tools/nos_ofs.py b/ingest_tools/ingest_tools/nos_ofs.py
index f879fb6..f29fc63 100644
--- a/ingest_tools/ingest_tools/nos_ofs.py
+++ b/ingest_tools/ingest_tools/nos_ofs.py
@@ -5,7 +5,8 @@
 import fsspec
 import ujson
 from kerchunk.combine import MultiZarrToZarr
-from kerchunk.hdf import SingleHdf5ToZarr
+
+from .generic import generate_kerchunked_nc
 
 
 def parse_nos_model_run_datestamp(key: str) -> Tuple[str, str]:
@@ -61,37 +62,12 @@ def generate_nos_output_key(key: str) -> str:
     model_name = parts[0].split('.')[0]
     return f'{model_name}/{parts[1]}.zarr'
-
 
 def generate_kerchunked_nos_nc(region: str, bucket: str, key: str, dest_bucket: str, dest_prefix: str):
     '''
     Generate a kerchunked zarr file from a netcdf file in s3
     '''
-    if not key.endswith('.nc'):
-        print(f'File {key} does not have a netcdf file postfix. Skipping...')
-        return
-
-    # For now SSL false is solving my cert issues **shrug**
-    fs_read = fsspec.filesystem('s3', anon=True, skip_instance_cache=True, use_ssl=False)
-    fs_write = fsspec.filesystem('s3', anon=False, skip_instance_cache=True, use_ssl=False)
-
-    url = f"s3://{bucket}/{key}"
     filekey = generate_nos_output_key(key)
-    outurl = f"s3://{dest_bucket}/{dest_prefix}/{filekey}"
-
-    with fs_read.open(url) as ifile:
-        print(f"Kerchunking nos model at {url}")
-        try:
-            chunks = SingleHdf5ToZarr(ifile, url)
-        except Exception as e:
-            print(f'Failed to kerchunk {url}: {e}')
-            return
-
-        print(f"Writing kerchunked nos model to {outurl}")
-        with fs_write.open(outurl, mode="w") as ofile:
-            data = ujson.dumps(chunks.translate())
-            ofile.write(data)
-
-    print(f'Successfully processed {url}')
+    generate_kerchunked_nc(bucket, key, filekey, dest_bucket, dest_prefix)
 
 
 def generate_kerchunked_nos_roms_model_run(region: str, bucket: str, key: str):
diff --git a/ingest_tools/ingest_tools/rtofs.py b/ingest_tools/ingest_tools/rtofs.py
new file mode 100644
index 0000000..a0045d2
--- /dev/null
+++ b/ingest_tools/ingest_tools/rtofs.py
@@ -0,0 +1,121 @@
+import re
+import datetime
+from typing import Tuple
+
+import fsspec
+import ujson
+from kerchunk.combine import MultiZarrToZarr
+
+from .generic import generate_kerchunked_nc
+
+
+def generate_rtofs_output_key(key: str) -> str:
+    '''
+    Generate the output file key for a given input key:
+        'rtofs.20230922/rtofs_glo_2ds_f001_diag.nc'
+    The following output key will be generated: 'rtofs.20230922.rtofs_glo_2ds_f001_diag.nc.zarr'
+    '''
+    components = key.split('/')
+    model_date = components[-2]
+    filename = components[-1]
+    return f'{model_date}.{filename}.zarr'
+
+
+def generate_kerchunked_rtofs_nc(region: str, bucket: str, key: str, dest_bucket: str, dest_prefix: str):
+    '''
+    Generate a kerchunked zarr file from a netcdf file in s3
+    '''
+    filekey = generate_rtofs_output_key(key)
+    generate_kerchunked_nc(bucket, key, filekey, dest_bucket, dest_prefix)
+
+
+def generate_rtofs_best_time_series_glob_expression(key: str) -> str:
+    '''
+    Parse the glob prefix and postfix given the zarr single file key:
+        'rtofs/rtofs.20230922.rtofs_glo_2ds_f001_diag.nc.zarr'
+    The following expression will be created: 'rtofs/rtofs.*.rtofs_glo_2ds_f*_diag.nc.zarr'
+    '''
+    prefix, inner, postfix = re.search(r'(.*).\d{8}.(.*)_f\d{3}_(.*)', key).groups()
+    return f'{prefix}.*.{inner}_f*_{postfix}'
+
+
+def parse_rtofs_model_run_datestamp_offset(key: str) -> Tuple[str, int]:
+    '''
+    Parse the model run forecast time key from the key of the file in the RTOFS S3 bucket, given the RTOFS naming convention:
+        'rtofs/rtofs.20230922.rtofs_glo_2ds_f001_diag.nc.zarr'
+    where the model_date is 20230922 and the offset is 1, resulting in a key of 20230922T01
+    '''
+    model_date, offset = re.search(r'(\d{8}).*f(\d{3})', key).groups()
+    model_date = datetime.datetime.strptime(f'{model_date}T00', '%Y%m%dT%H') + datetime.timedelta(hours=int(offset))
+    model_date_key = model_date.strftime('%Y%m%dT%H')
+    return model_date_key, int(offset)
+
+
+def generate_rtofs_best_timeseries_key(best_timeseries_glob: str) -> str:
+    '''
+    Create the best time series key for a given glob expression:
+        'rtofs/rtofs.*.rtofs_glo_2ds_f*_diag.nc.zarr'
+    The following key will be generated: 'rtofs/rtofs.rtofs_glo_2ds_diag.best.nc.zarr'
+    '''
+    return best_timeseries_glob.replace('.*', '').replace('_f*', '').replace('.nc.zarr', '.best.nc.zarr')
+
+
+def generate_kerchunked_rtofs_best_time_series(region: str, bucket: str, key: str):
+    '''
+    Generate or update the best time series kerchunked aggregation for the model. If the specified file is not in the best time series,
+    then the best time series aggregation will not be updated
+    '''
+    print(f'Generating best time series multizarr aggregation for key: {key}')
+
+    try:
+        best_time_series_glob = generate_rtofs_best_time_series_glob_expression(key)
+    except Exception as e:
+        print(f'Failed to parse model run date and hour from key {key}: {e}. Skipping...')
+        return
+
+    # For now SSL false is solving my cert issues **shrug**
+    fs_read = fsspec.filesystem('s3', anon=True, skip_instance_cache=True, use_ssl=False)
+    fs_write = fsspec.filesystem('s3', anon=False, skip_instance_cache=True, use_ssl=False)
+
+    model_files = fs_read.glob(f's3://{bucket}/{best_time_series_glob}')
+    model_files = sorted(['s3://'+f for f in model_files])
+
+    indexes = {}
+
+    for f in model_files:
+        model_date_key, offset = parse_rtofs_model_run_datestamp_offset(f)
+        if model_date_key not in indexes:
+            indexes[model_date_key] = [offset, f]
+        else:
+            if offset < indexes[model_date_key][0]:
+                indexes[model_date_key] = [offset, f]
+
+    model_best_files = [x[1] for x in list(indexes.values())]
+
+    target_key = f's3://{bucket}/{key}'
+    if target_key not in model_best_files:
+        print(f'{key} is not a part of the current best time series for its model. Skipping...')
+        return
+
+    model_run_file_count = len(model_best_files)
+    print(f'Aggregating {model_run_file_count} model files for best time series aggregation...')
+
+    # TODO: Generalize this somehow?
+    mzz = MultiZarrToZarr(
+        model_best_files,
+        remote_protocol='s3',
+        remote_options={'anon': True, 'use_ssl': False},
+        concat_dims=['MT'],
+        identical_dims=['Y', 'X', 'Latitude', 'Longitude']
+    )
+
+    d = mzz.translate()
+
+    outkey = generate_rtofs_best_timeseries_key(best_time_series_glob)
+    outurl = f's3://{bucket}/{outkey}'
+
+    print(f'Writing zarr best time series aggregation to {outurl}')
+    with fs_write.open(outurl, 'w') as ofile:
+        ofile.write(ujson.dumps(d))
+
+    print(f'Successfully updated {outurl} RTOFS best time series aggregation')
\ No newline at end of file
diff --git a/ingest_tools/tests/test_rtofs.py b/ingest_tools/tests/test_rtofs.py
new file mode 100644
index 0000000..ac14703
--- /dev/null
+++ b/ingest_tools/tests/test_rtofs.py
@@ -0,0 +1,39 @@
+from ingest_tools.rtofs import generate_rtofs_output_key, generate_rtofs_best_time_series_glob_expression, parse_rtofs_model_run_datestamp_offset, generate_rtofs_best_timeseries_key
+
+
+def test_generate_rtofs_output_key():
+    key = 'rtofs.20230922/rtofs_glo_2ds_f001_diag.nc'
+    output_key = generate_rtofs_output_key(key)
+    assert output_key == 'rtofs.20230922.rtofs_glo_2ds_f001_diag.nc.zarr'
+
+
+def test_generate_rtofs_best_time_series_glob_expression():
+    key = 'rtofs/rtofs.20230922.rtofs_glo_2ds_f001_diag.nc.zarr'
+    glob_expression = generate_rtofs_best_time_series_glob_expression(key)
+    assert glob_expression == 'rtofs/rtofs.*.rtofs_glo_2ds_f*_diag.nc.zarr'
+
+    key = 'rtofs/rtofs.20230925.rtofs_glo_3dz_f006_6hrly_hvr_US_east.nc.zarr'
+    glob_expression = generate_rtofs_best_time_series_glob_expression(key)
+    assert glob_expression == 'rtofs/rtofs.*.rtofs_glo_3dz_f*_6hrly_hvr_US_east.nc.zarr'
+
+
+def test_parse_rtofs_model_run_datestamp_offset():
+    key = 'rtofs/rtofs.20230922.rtofs_glo_2ds_f001_diag.nc.zarr'
+    model_date, offset = parse_rtofs_model_run_datestamp_offset(key)
+    assert model_date == '20230922T01'
+    assert offset == 1
+
+    key = 'rtofs/rtofs.20230925.rtofs_glo_3dz_f006_6hrly_hvr_US_east.nc.zarr'
+    model_date, offset = parse_rtofs_model_run_datestamp_offset(key)
+    assert model_date == '20230925T06'
+    assert offset == 6
+
+
+def test_generate_best_timeseries_key():
+    glob = 'rtofs/rtofs.*.rtofs_glo_2ds_f*_diag.nc.zarr'
+    best_timeseries_key = generate_rtofs_best_timeseries_key(glob)
+    assert best_timeseries_key == 'rtofs/rtofs.rtofs_glo_2ds_diag.best.nc.zarr'
+
+    glob = 'rtofs/rtofs.*.rtofs_glo_3dz_f*_6hrly_hvr_US_east.nc.zarr'
+    best_timeseries_key = generate_rtofs_best_timeseries_key(glob)
+    assert best_timeseries_key == 'rtofs/rtofs.rtofs_glo_3dz_6hrly_hvr_US_east.best.nc.zarr'
\ No newline at end of file
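For reference (not part of the patch): the best-time-series logic in generate_kerchunked_rtofs_best_time_series keeps, for each valid time, the file with the smallest forecast offset. The sketch below exercises the helpers added in ingest_tools/ingest_tools/rtofs.py on hypothetical keys; the keys and expected output are illustrative only.

# Illustrative sketch only. Mirrors the `indexes` loop in
# generate_kerchunked_rtofs_best_time_series using hypothetical keys.
from ingest_tools.rtofs import (
    generate_rtofs_best_time_series_glob_expression,
    parse_rtofs_model_run_datestamp_offset,
)

# Hypothetical zarr reference keys spanning two model runs.
keys = [
    'rtofs/rtofs.20230921.rtofs_glo_2ds_f024_diag.nc.zarr',  # 20230921 run, valid 20230922T00
    'rtofs/rtofs.20230921.rtofs_glo_2ds_f025_diag.nc.zarr',  # 20230921 run, valid 20230922T01
    'rtofs/rtofs.20230922.rtofs_glo_2ds_f001_diag.nc.zarr',  # 20230922 run, valid 20230922T01
]

indexes = {}
for key in keys:
    valid_time, offset = parse_rtofs_model_run_datestamp_offset(key)
    # Keep the smallest forecast offset seen for each valid time.
    if valid_time not in indexes or offset < indexes[valid_time][0]:
        indexes[valid_time] = [offset, key]

best_files = [entry[1] for entry in indexes.values()]
print(best_files)
# Expected: the f024 file for 20230922T00 and the f001 file (not f025) for 20230922T01.

print(generate_rtofs_best_time_series_glob_expression(keys[0]))
# Expected: 'rtofs/rtofs.*.rtofs_glo_2ds_f*_diag.nc.zarr'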
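The kerchunk reference JSON these functions write is normally consumed through fsspec's reference filesystem. A minimal sketch of opening one of the aggregations with xarray follows; it assumes the best-time-series JSON has already been written and is anonymously readable, and the URL is a hypothetical placeholder built from DESTINATION_BUCKET_NAME and the best-time-series key format, not a confirmed path.

# Illustrative sketch only: open a kerchunk reference JSON as an xarray dataset.
# The bucket/key below are hypothetical placeholders.
import fsspec
import xarray as xr

ref_url = 's3://nextgen-dmac-cloud-ingest/rtofs/rtofs.rtofs_glo_2ds_diag.best.nc.zarr'

fs = fsspec.filesystem(
    'reference',
    fo=ref_url,                     # the kerchunk reference JSON
    remote_protocol='s3',           # where the referenced netcdf chunks live
    remote_options={'anon': True},
    target_options={'anon': True},  # options for reading the reference JSON itself
)
ds = xr.open_dataset(fs.get_mapper(''), engine='zarr', backend_kwargs={'consolidated': False})
print(ds)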