-
Notifications
You must be signed in to change notification settings - Fork 415
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AWS Data Clients #3000
base: main
Are you sure you want to change the base?
AWS Data Clients #3000
Changes from all commits
bd44eac
7a1329b
d98faec
3d3ec2e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
# Copyright (c) 2023 MetPy Developers.
# Distributed under the terms of the BSD 3-Clause License.
# SPDX-License-Identifier: BSD-3-Clause
"""
==================
Remote Data Access
==================

Use MetPy to access data hosted in known AWS S3 buckets
"""
from datetime import datetime, timedelta

from metpy.remote import NEXRADLevel2Archive, NEXRADLevel3Archive, GOES16Archive, GOES18Archive

###################
# NEXRAD Level 2

# Get the product nearest in time to the requested datetime
prod = NEXRADLevel2Archive().get_product('KTLX', datetime(2013, 5, 22, 21, 53))

# Open using MetPy's Level2File class
l2 = prod.parse()

###################
# NEXRAD Level 3

# Request all products in the half-open interval [start, end)
start = datetime(2022, 10, 30, 15)
end = start + timedelta(hours=2)
products = NEXRADLevel3Archive().get_range('FTG', 'N0B', start, end)

# Get all the file names--could also get a file-like object or open with MetPy Level3File
print([prod.name for prod in products])

################
# GOES Archives

# NOTE(review): datetime.utcnow() is deprecated since Python 3.12. The archive classes
# appear to work with naive UTC datetimes (key timestamps are parsed naive), so switching
# to datetime.now(timezone.utc) here needs verification against get_product's arithmetic.
prod = GOES16Archive().get_product('ABI-L1b-RadC', datetime.utcnow(), channel=2)

# Retrieve using xarray + netcdf-c's S3 support
nc = prod.parse()
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
# Copyright (c) 2015,2016,2018,2021 MetPy Developers.
# Distributed under the terms of the BSD 3-Clause License.
# SPDX-License-Identifier: BSD-3-Clause
"""Tools for reading various file formats.

Classes supporting formats are written to take both file names (for local files) or file-like
objects; this allows reading files that are already in memory
(using :class:`python:io.StringIO`) or remote files
(using :func:`~python:urllib.request.urlopen`).

``station_info`` is an instance of `StationLookup` to find information about station locations
(e.g. latitude, longitude, altitude) from various sources.
"""
# NOTE(review): the ``station_info`` paragraph above looks copied from ``metpy.io`` and may
# not apply to this subpackage -- confirm and trim if so.

from .aws import *  # noqa: F403
from ..package_tools import set_module

# Re-export everything the aws submodule declares public. The bare ``aws`` name is in scope
# here because importing ``.aws`` sets it as an attribute on this package, whose namespace
# is this module's globals (hence the pylint disable).
__all__ = aws.__all__[:]  # pylint: disable=undefined-variable

# Make the re-exported names report this package as their home module.
set_module(globals())
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,232 @@ | ||
# Copyright (c) 2023 MetPy Developers. | ||
# Distributed under the terms of the BSD 3-Clause License. | ||
# SPDX-License-Identifier: BSD-3-Clause | ||
"""Tools for reading known collections of data that are hosted on Amazon Web Services (AWS). | ||
|
||
""" | ||
import bisect
from datetime import datetime, timedelta, timezone
import itertools
from pathlib import Path
import shutil

import boto3
import botocore
from botocore.client import Config
import xarray as xr

from ..io import Level2File, Level3File
from ..package_tools import Exporter
from ..package_tools import Exporter | ||
|
||
exporter = Exporter(globals()) | ||
|
||
|
||
class Product:
    """Represent a single data product stored in an S3 bucket.

    Wraps an S3 object and pairs it with a reader callable that knows how to
    parse the product into a useful Python object.
    """

    def __init__(self, obj, reader):
        """Initialize from an S3 object and a parsing callable.

        Parameters
        ----------
        obj : boto3 S3 object
            The S3 object backing this product; only its ``.key``, ``.Bucket()``,
            and ``.get()`` are used here.
        reader : callable
            Called with this `Product` by `parse` to produce the parsed data.
        """
        self.path = obj.key
        self._obj = obj
        self._reader = reader

    @property
    def url(self):
        """Return the HTTPS URL for downloading this product directly."""
        return f'https://{self._obj.Bucket().name}.s3.amazonaws.com/{self.path}'

    @property
    def name(self):
        """Return the file name (final component) of the product's key."""
        return Path(self.path).name

    @property
    def file(self):
        """Return a file-like object streaming the product's bytes."""
        return self._obj.get()['Body']

    def download(self, path=None):
        """Download the product to a local file.

        Parameters
        ----------
        path : str or pathlib.Path, optional
            Destination. If ``None``, the product's file name in the current
            directory is used; if an existing directory, the file is placed
            inside it with the product's file name.

        Returns
        -------
        pathlib.Path
            The path the product was written to.
        """
        if path is None:
            path = Path() / self.name
        else:
            # Original had a redundant else re-wrapping path; normalize once here.
            path = Path(path)
            if path.is_dir():
                path = path / self.name

        with open(path, 'wb') as outfile:
            shutil.copyfileobj(self.file, outfile)
        return path

    def parse(self):
        """Parse the product using its associated reader and return the result."""
        return self._reader(self)
|
||
|
||
def date_iterator(start, end, **step_kw):
    """Yield datetimes from *start* (inclusive) up to *end* (exclusive).

    ``step_kw`` is passed to `datetime.timedelta` to define the step
    size (e.g. ``days=1``).
    """
    step = timedelta(**step_kw)
    current = start
    while current < end:
        yield current
        current += step
|
||
|
||
class S3DataStore:
    """Base class for accessing products in public, unsigned AWS S3 buckets."""

    # Shared anonymous (UNSIGNED) S3 resource; created once at class-definition time
    # and reused by all subclasses/instances.
    s3 = boto3.resource('s3', config=Config(signature_version=botocore.UNSIGNED,
                                            user_agent_extra='Resource'))

    def __init__(self, bucket_name, delimiter):
        """Set up access to *bucket_name*, with *delimiter* separating key components."""
        self.bucket_name = bucket_name
        self.delimiter = delimiter
        self._bucket = self.s3.Bucket(bucket_name)

    def common_prefixes(self, prefix, delim=None):
        """Return the common prefixes directly under *prefix*.

        Returns a generator of prefix strings, or an empty list when nothing
        matches. The ``list_objects_v2`` call is the *outer* iterable of the
        generator expression, so it executes eagerly here and a missing
        ``'CommonPrefixes'`` key raises before return and is caught below --
        do not restructure this without preserving that eager evaluation.
        """
        delim = delim or self.delimiter
        try:
            return (p['Prefix'] for p in
                    self._bucket.meta.client.list_objects_v2(
                        Bucket=self.bucket_name, Prefix=prefix,
                        Delimiter=delim)['CommonPrefixes'])
        except KeyError:
            return []

    def objects(self, prefix):
        """Return the S3 objects whose keys begin with *prefix*."""
        return self._bucket.objects.filter(Prefix=prefix)

    def _build_result(self, obj):
        """Wrap *obj* in a `Product`; subclasses override to supply a real reader."""
        return Product(obj, lambda s: None)
|
||
|
||
@exporter.export
class NEXRADLevel3Archive(S3DataStore):
    """Access NEXRAD Level 3 data from the ``unidata-nexrad-level3`` AWS bucket."""

    def __init__(self):
        """Set up bucket access; key components are underscore-delimited."""
        super().__init__('unidata-nexrad-level3', '_')

    def sites(self):
        """Return sites available."""
        return (item.rstrip(self.delimiter) for item in self.common_prefixes(''))

    def product_ids(self, site='TLX'):
        """Return product_ids available.

        Takes a site, defaults to TLX.
        """
        return (item.split(self.delimiter)[-2] for item in
                self.common_prefixes(f'{site}{self.delimiter}'))

    def build_key(self, site, prod_id, dt, depth=None):
        """Build a bucket key for *site*/*prod_id* at time *dt*.

        ``depth`` limits how many leading key components are included
        (``None`` keeps all), yielding prefixes that cover e.g. a whole
        day or hour.
        """
        parts = [site, prod_id, f'{dt:%Y}', f'{dt:%m}', f'{dt:%d}', f'{dt:%H}', f'{dt:%M}',
                 f'{dt:%S}']
        # Plain slice syntax instead of an explicit slice() object (quiets CodeQL too).
        return self.delimiter.join(parts[:depth])

    def dt_from_key(self, key):
        """Parse the timestamp out of a product *key*."""
        return datetime.strptime(key.split(self.delimiter, maxsplit=2)[-1],
                                 '%Y_%m_%d_%H_%M_%S')

    def get_range(self, site, prod_id, start, end):
        """Yield products for *site*/*prod_id* in the half-open interval [start, end)."""
        for dt in date_iterator(start, end, days=1):
            for obj in self.objects(self.build_key(site, prod_id, dt, depth=5)):
                if start <= self.dt_from_key(obj.key) < end:
                    yield self._build_result(obj)

    def get_product(self, site, prod_id, dt):
        """Return the product for *site*/*prod_id* nearest in time to *dt*."""
        search_key = self.build_key(site, prod_id, dt)
        # Start from the site/product prefix and narrow one date component
        # (year, month, day, hour, minute) per iteration, keeping only the
        # prefixes that bracket the search key at each level.
        bounding_keys = [self.build_key(site, prod_id, dt, 2) + self.delimiter]
        for _ in range(3, 8):
            prefixes = list(itertools.chain(*(self.common_prefixes(b) for b in bounding_keys)))
            loc = bisect.bisect_left(prefixes, search_key)
            bounding_keys = prefixes[loc - 1:loc + 1] if loc else prefixes[:1]

        # Among the surviving candidates, pick the object closest in time.
        min_obj = min(itertools.chain(*(self.objects(p) for p in bounding_keys)),
                      key=lambda o: abs((self.dt_from_key(o.key) - dt).total_seconds()))

        return self._build_result(min_obj)

    def _build_result(self, obj):
        """Wrap *obj* in a `Product` that parses via MetPy's `Level3File`."""
        return Product(obj, lambda s: Level3File(s.file))
|
||
|
||
@exporter.export
class NEXRADLevel2Archive(S3DataStore):
    """Access NEXRAD Level 2 data from the ``noaa-nexrad-level2`` AWS bucket."""

    def __init__(self):
        """Set up bucket access; key components are '/'-delimited."""
        super().__init__('noaa-nexrad-level2', '/')

    def sites(self, dt=None):
        """Return sites available for a date (defaults to the current UTC date)."""
        if dt is None:
            # datetime.utcnow() is deprecated (Python 3.12+); only the date
            # components are formatted below, so an aware datetime is safe here.
            dt = datetime.now(timezone.utc)
        prefix = self.build_key('', dt, depth=3) + self.delimiter
        return (item.split('/')[-2] for item in self.common_prefixes(prefix))

    def build_key(self, site, dt, depth=None):
        """Build a bucket key for *site* at time *dt*.

        ``depth`` limits how many leading key components are included
        (``None`` keeps all).
        """
        parts = [f'{dt:%Y}', f'{dt:%m}', f'{dt:%d}', site, f'{site}{dt:%Y%m%d_%H%M%S}']
        # Plain slice syntax instead of an explicit slice() object (quiets CodeQL too).
        return self.delimiter.join(parts[:depth])

    def dt_from_key(self, key):
        """Parse the timestamp out of a product *key*.

        Assumes a 4-character site id prefixes the timestamp in the file name
        (matching how `build_key` constructs it).
        """
        return datetime.strptime(key.rsplit(self.delimiter, maxsplit=1)[-1][4:19],
                                 '%Y%m%d_%H%M%S')

    def get_range(self, site, start, end):
        """Yield products for *site* in the half-open interval [start, end).

        Keys without a parseable timestamp are silently skipped.
        """
        for dt in date_iterator(start, end, days=1):
            for obj in self.objects(self.build_key(site, dt, depth=4)):
                try:
                    if start <= self.dt_from_key(obj.key) < end:
                        yield self._build_result(obj)
                except ValueError:
                    continue

    def get_product(self, site, dt):
        """Return the product for *site* nearest in time to *dt*."""
        search_key = self.build_key(site, dt)
        # Search the site's directory for the requested day (prefix up
        # through the date portion of the file name).
        prefix = search_key.split('_')[0]
        min_obj = min(self.objects(prefix),
                      key=lambda o: abs((self.dt_from_key(o.key) - dt).total_seconds()))

        return self._build_result(min_obj)

    def _build_result(self, obj):
        """Wrap *obj* in a `Product` that parses via MetPy's `Level2File`."""
        return Product(obj, lambda s: Level2File(s.file))
|
||
|
||
@exporter.export
class GOESArchive(S3DataStore):
    """Access GOES satellite data from the ``noaa-goes*`` AWS buckets."""

    def __init__(self, satellite):
        """Set up bucket access for a particular *satellite* number (e.g. 16, 18)."""
        super().__init__(f'noaa-goes{satellite}', delimiter='/')

    def product_ids(self):
        """Return product_ids available."""
        return (item.rstrip(self.delimiter) for item in self.common_prefixes(''))

    def build_key(self, product, dt, depth=None):
        """Build a bucket key for *product* at time *dt*.

        ``depth`` limits how many leading key components are included
        (``None`` keeps all).
        """
        parts = [product, f'{dt:%Y}', f'{dt:%j}', f'{dt:%H}', f'OR_{product}']
        # Plain slice syntax instead of an explicit slice() object (quiets CodeQL too).
        return self.delimiter.join(parts[:depth])

    def _subprod_prefix(self, prefix, mode, channel):
        """Extend *prefix* with scan-mode/channel qualifiers (e.g. ``-M6C02``) as needed.

        Raises `ValueError` when multiple sub-products exist and *mode* or
        *channel* does not select one unambiguously.
        """
        # Distinct sub-product suffixes (e.g. 'M6C02') found directly under the prefix.
        subprods = set(item.rstrip('_').rsplit('-', maxsplit=1)[-1] for item in
                       self.common_prefixes(prefix + '-', '_'))
        if len(subprods) > 1:
            # item[1] is the digit following the leading 'M' of the mode qualifier.
            if modes := set(item[1] for item in subprods):
                if len(modes) == 1:
                    # Only one mode present: use it regardless of what was requested.
                    mode = next(iter(modes))
                if str(mode) in modes:
                    prefix += f'-M{mode}'
                else:
                    raise ValueError(
                        f'Need to specify a valid operating mode. Available options are '
                        f'{", ".join(sorted(modes))}')
            # The last two characters hold the zero-padded channel (band) number.
            if channels := set(item[-2:] for item in subprods):
                if len(channels) == 1:
                    channel = next(iter(channels))
                if str(channel) in channels:
                    prefix += f'C{channel}'
                elif isinstance(channel, int) and f'{channel:02d}' in channels:
                    prefix += f'C{channel:02d}'
                else:
                    raise ValueError(
                        f'Need to specify a valid channel. Available options are '
                        f'{", ".join(sorted(channels))}')
        return prefix

    def dt_from_key(self, key):
        """Parse the scan start time from the ``_s<time>`` portion of *key*."""
        start_time = key.split('_')[-3]
        return datetime.strptime(start_time[:-1], 's%Y%j%H%M%S')

    def get_product(self, product, dt, mode=None, channel=None):
        """Return the *product* file nearest in time to *dt*.

        Parameters
        ----------
        product : str
            GOES product id (e.g. ``'ABI-L1b-RadC'``).
        dt : datetime.datetime
            Desired time; naive, matching the naive times parsed from keys.
        mode : int or str, optional
            Scan mode; needed only when multiple modes are present.
        channel : int or str, optional
            Channel (band) number; needed only for multi-channel products.
        """
        prefix = self.build_key(product, dt)
        prefix = self._subprod_prefix(prefix, mode, channel)
        min_obj = min(self.objects(prefix),
                      key=lambda o: abs((self.dt_from_key(o.key) - dt).total_seconds()))

        return self._build_result(min_obj)

    def _build_result(self, obj):
        """Wrap *obj* in a `Product` parsed via xarray + netcdf-c byte-range access."""
        return Product(obj, lambda s: xr.open_dataset(s.url + '#mode=bytes', engine='netcdf4'))
Check notice
Code scanning / CodeQL
'import *' may pollute namespace