From bd44eac6e9e5d298711112f8de138a245d74028c Mon Sep 17 00:00:00 2001 From: Ryan May Date: Fri, 7 Apr 2023 17:03:44 -0600 Subject: [PATCH 1/4] ENH: Add clients for accessing AWS data --- src/metpy/remote/__init__.py | 20 +++ src/metpy/remote/aws.py | 245 +++++++++++++++++++++++++++++++++++ 2 files changed, 265 insertions(+) create mode 100644 src/metpy/remote/__init__.py create mode 100644 src/metpy/remote/aws.py diff --git a/src/metpy/remote/__init__.py b/src/metpy/remote/__init__.py new file mode 100644 index 00000000000..ffd41b234cd --- /dev/null +++ b/src/metpy/remote/__init__.py @@ -0,0 +1,20 @@ +# Copyright (c) 2015,2016,2018,2021 MetPy Developers. +# Distributed under the terms of the BSD 3-Clause License. +# SPDX-License-Identifier: BSD-3-Clause +"""Tools for reading various file formats. + +Classes supporting formats are written to take both file names (for local files) or file-like +objects; this allows reading files that are already in memory +(using :class:`python:io.StringIO`) or remote files +(using :func:`~python:urllib.request.urlopen`). + +``station_info`` is an instance of `StationLookup` to find information about station locations +(e.g. latitude, longitude, altitude) from various sources. +""" + +from .aws import * # noqa: F403 +from ..package_tools import set_module + +__all__ = aws.__all__[:] # pylint: disable=undefined-variable + +set_module(globals()) diff --git a/src/metpy/remote/aws.py b/src/metpy/remote/aws.py new file mode 100644 index 00000000000..793b11de1f9 --- /dev/null +++ b/src/metpy/remote/aws.py @@ -0,0 +1,245 @@ +# Copyright (c) 2023 MetPy Developers. +# Distributed under the terms of the BSD 3-Clause License. +# SPDX-License-Identifier: BSD-3-Clause +"""Tools for reading known collections of data that are hosted on Amazon Web Services (AWS). + +""" +import bisect +from datetime import datetime, timedelta +from functools import cached_property +import itertools +from pathlib import Path +import shutil + +import boto3 +import botocore +from botocore.client import Config +import xarray as xr + +from ..io import Level2File, Level3File +from ..package_tools import Exporter + +exporter = Exporter(globals()) + + +class Product: + def __init__(self, obj, reader): + self.path = obj.key + self._obj = obj + self._reader = reader + + @property + def url(self): + return f'https://{self._obj.Bucket().name}.s3.amazonaws.com/{self.path}' + + @property + def name(self): + return Path(self.path).name + + @cached_property + def file(self): + return self._obj.get()['Body'] + + def download(self, path=None): + if path is None: + path = Path() / self.name + elif (path := Path(path)).is_dir(): + path = path / self.name + else: + path = Path(path) + + with open(path, 'wb') as outfile: + shutil.copyfileobj(self.file, outfile) + + def parse(self): + return self._reader(self) + + +def date_iterator(start, end, **step_kw): + while start < end: + yield start + start = start + timedelta(**step_kw) + + +class S3DataStore: + s3 = boto3.resource('s3', config=Config(signature_version=botocore.UNSIGNED, + user_agent_extra='Resource')) + + def __init__(self, bucket_name, delimiter): + self.bucket_name = bucket_name + self.delimiter = delimiter + self._bucket = self.s3.Bucket(bucket_name) + + def common_prefixes(self, prefix, delim=None): + delim = delim or self.delimiter + try: + return (p['Prefix'] for p in + self._bucket.meta.client.list_objects_v2( + Bucket=self.bucket_name, Prefix=prefix, + Delimiter=delim)['CommonPrefixes']) + except KeyError: + return [] + + def objects(self, prefix): + return self._bucket.objects.filter(Prefix=prefix) + + def _build_result(self, obj): + return Product(obj, lambda s: None) + + +@exporter.export +class NEXRADLevel3Archive(S3DataStore): + def __init__(self): + super().__init__('unidata-nexrad-level3', '_') + + def sites(self): + """Return sites available.""" + return (item.rstrip(self.delimiter) for item in self.common_prefixes('')) + + def product_ids(self, site='TLX'): + """Return product_ids available. + + Takes a site, defaults to TLX. + """ + return (item.split(self.delimiter)[-2] for item in + self.common_prefixes(f'{site}{self.delimiter}')) + + def build_key(self, site, prod_id, dt, depth=None): + parts = [site, prod_id, f'{dt:%Y}', f'{dt:%m}', f'{dt:%d}', f'{dt:%H}', f'{dt:%M}', + f'{dt:%S}'] + return self.delimiter.join(parts[slice(0, depth)]) + + def dt_from_key(self, key): + return datetime.strptime(key.split(self.delimiter, maxsplit=2)[-1], + '%Y_%m_%d_%H_%M_%S') + + def get_range(self, site, prod_id, start, end): + for dt in date_iterator(start, end, days=1): + for obj in self.objects(self.build_key(site, prod_id, dt, depth=5)): + if start <= self.dt_from_key(obj.key) < end: + yield self._build_result(obj) + + def get_product(self, site, prod_id, dt): + search_key = self.build_key(site, prod_id, dt) + bounding_keys = [self.build_key(site, prod_id, dt, 2) + self.delimiter] + for depth in range(3, 8): + prefixes = list(itertools.chain(*(self.common_prefixes(b) for b in bounding_keys))) + loc = bisect.bisect_left(prefixes, search_key) + rng = slice(loc - 1, loc + 1) if loc else slice(0, 1) + bounding_keys = prefixes[rng] + + min_obj = min(itertools.chain(*(self.objects(p) for p in bounding_keys)), + key=lambda o: abs((self.dt_from_key(o.key) - dt).total_seconds())) + + return self._build_result(min_obj) + + def _build_result(self, obj): + return Product(obj, lambda s: Level3File(s.file)) + + +@exporter.export +class NEXRADLevel2Archive(S3DataStore): + def __init__(self): + super().__init__('noaa-nexrad-level2', '/') + + def sites(self, dt=None): + """Return sites available for a date.""" + if dt is None: + dt = datetime.utcnow() + prefix = self.build_key('', dt, depth=3) + self.delimiter + return (item.split('/')[-2] for item in self.common_prefixes(prefix)) + + def build_key(self, site, dt, depth=None): + parts = [f'{dt:%Y}', f'{dt:%m}', f'{dt:%d}', site, f'{site}{dt:%Y%m%d_%H%M%S}'] + return self.delimiter.join(parts[slice(0, depth)]) + + def dt_from_key(self, key): + return datetime.strptime(key.rsplit(self.delimiter, maxsplit=1)[-1][4:19], + '%Y%m%d_%H%M%S') + + def get_range(self, site, start, end): + for dt in date_iterator(start, end, days=1): + for obj in self.objects(self.build_key(site, dt, depth=4)): + try: + if start <= self.dt_from_key(obj.key) < end: + yield self._build_result(obj) + except ValueError: + continue + + def get_product(self, site, dt): + search_key = self.build_key(site, dt) + prefix = search_key.split('_')[0] + min_obj = min(self.objects(prefix), + key=lambda o: abs((self.dt_from_key(o.key) - dt).total_seconds())) + + return self._build_result(min_obj) + + def _build_result(self, obj): + return Product(obj, lambda s: Level2File(s.file)) + + +@exporter.export +class GOES16Archive(S3DataStore): + def __init__(self, bucket_name='noaa-goes16'): + super().__init__(bucket_name, delimiter='/') + + def product_ids(self): + """Return product_ids available.""" + return (item.rstrip(self.delimiter) for item in self.common_prefixes('')) + + def build_key(self, product, dt, depth=None): + parts = [product, f'{dt:%Y}', f'{dt:%j}', f'{dt:%H}', f'OR_{product}'] + return self.delimiter.join(parts[slice(0, depth)]) + + def _subprod_prefix(self, prefix, mode, channel): + subprods = set(item.rstrip('_').rsplit('-', maxsplit=1)[-1] for item in + self.common_prefixes(prefix + '-', '_')) + if len(subprods) > 1: + if modes := set(item[1] for item in subprods): + if len(modes) == 1: + mode = next(iter(modes)) + if str(mode) in modes: + prefix += f'-M{mode}' + else: + raise ValueError( + f'Need to specify a valid operating mode. Available options are ' + f'{", ".join(sorted(modes))}') + if channels := set(item[-2:] for item in subprods): + if len(channels) == 1: + channel = next(iter(channels)) + if str(channel) in channels: + prefix += f'C{channel}' + elif isinstance(channel, int) and f'{channel:02d}' in channels: + prefix += f'C{channel:02d}' + else: + raise ValueError( + f'Need to specify a valid channel. Available options are ' + f'{", ".join(sorted(channels))}') + return prefix + + def dt_from_key(self, key): + start_time = key.split('_')[-3] + return datetime.strptime(start_time[:-1], 's%Y%j%H%M%S') + + def get_product(self, product, dt, mode=None, channel=None): + prefix = self.build_key(product, dt) + prefix = self._subprod_prefix(prefix, mode, channel) + min_obj = min(self.objects(prefix), + key=lambda o: abs((self.dt_from_key(o.key) - dt).total_seconds())) + + return self._build_result(min_obj) + + def _build_result(self, obj): + return Product(obj, lambda s: xr.open_dataset(s.url + '#mode=bytes', engine='netcdf4')) + + +@exporter.export +class GOES17Archive(GOES16Archive): + def __init__(self): + super().__init__('noaa-goes17') + + +@exporter.export +class GOES18Archive(GOES16Archive): + def __init__(self): + super().__init__('noaa-goes18') From 7a1329b743fb4ae61b824202e6757861e8fe98e4 Mon Sep 17 00:00:00 2001 From: Ryan May Date: Fri, 7 Apr 2023 17:04:01 -0600 Subject: [PATCH 2/4] DOC: Add basic example showing some of the client usage --- examples/remote/basic.py | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 examples/remote/basic.py diff --git a/examples/remote/basic.py b/examples/remote/basic.py new file mode 100644 index 00000000000..2a68df21568 --- /dev/null +++ b/examples/remote/basic.py @@ -0,0 +1,38 @@ +# Copyright (c) 2023 MetPy Developers. +# Distributed under the terms of the BSD 3-Clause License. +# SPDX-License-Identifier: BSD-3-Clause +""" +================== +Remote Data Access +================== + +Use MetPy to access data hosted in known AWS S3 buckets +""" +from datetime import datetime, timedelta + +from metpy.remote import NEXRADLevel2Archive, NEXRADLevel3Archive, GOES16Archive, GOES18Archive + +################### +# NEXRAD Level 2 + +# Get the nearest product to a time +prod = NEXRADLevel2Archive().get_product('KTLX', datetime(2013, 5, 22, 21, 53)) + +# Open using MetPy's Level2File class +l2 = prod.parse() + +################### +# NEXRAD Level 3 +start = datetime(2022, 10, 30, 15) +end = start + timedelta(hours=2) +products = NEXRADLevel3Archive().get_range('FTG', 'N0B', start, end) + +# Get all the file names--could also get a file-like object or open with MetPy Level3File +print([prod.name for prod in products]) + +################ +#GOES Archives +prod = GOES16Archive().get_product('ABI-L1b-RadC', datetime.utcnow(), channel=2) + +# Retrieve using xarray + netcdf-c's S3 support +nc = prod.parse() From d98faec7c18dca2eae4be73bb4b0d3bd08ff3ec7 Mon Sep 17 00:00:00 2001 From: Ryan May Date: Thu, 27 Apr 2023 16:09:01 -0600 Subject: [PATCH 3/4] Don't cache the file object The file object will be left at the end of the file after reading, leading to other things that use it (like parse) trying to read from EOF. --- src/metpy/remote/aws.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/metpy/remote/aws.py b/src/metpy/remote/aws.py index 793b11de1f9..b00027de000 100644 --- a/src/metpy/remote/aws.py +++ b/src/metpy/remote/aws.py @@ -6,7 +6,6 @@ """ import bisect from datetime import datetime, timedelta -from functools import cached_property import itertools from pathlib import Path import shutil @@ -36,7 +35,7 @@ def url(self): def name(self): return Path(self.path).name - @cached_property + @property def file(self): return self._obj.get()['Body'] From 3d3ec2e1b17d093b4c91a6bdbde4037db2e39dc1 Mon Sep 17 00:00:00 2001 From: Ryan May Date: Thu, 27 Apr 2023 16:11:48 -0600 Subject: [PATCH 4/4] Just have a single client for the GOES archives --- src/metpy/remote/aws.py | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/src/metpy/remote/aws.py b/src/metpy/remote/aws.py index b00027de000..79df9319733 100644 --- a/src/metpy/remote/aws.py +++ b/src/metpy/remote/aws.py @@ -178,9 +178,9 @@ def _build_result(self, obj): @exporter.export -class GOES16Archive(S3DataStore): - def __init__(self, bucket_name='noaa-goes16'): - super().__init__(bucket_name, delimiter='/') +class GOESArchive(S3DataStore): + def __init__(self, satellite): + super().__init__(f'noaa-goes{satellite}', delimiter='/') def product_ids(self): """Return product_ids available.""" @@ -230,15 +230,3 @@ def get_product(self, product, dt, mode=None, channel=None): def _build_result(self, obj): return Product(obj, lambda s: xr.open_dataset(s.url + '#mode=bytes', engine='netcdf4')) - - -@exporter.export -class GOES17Archive(GOES16Archive): - def __init__(self): - super().__init__('noaa-goes17') - - -@exporter.export -class GOES18Archive(GOES16Archive): - def __init__(self): - super().__init__('noaa-goes18')