Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add armycorp as data source #157

Draft
wants to merge 16 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ jobs:
- uses: actions/checkout@main
- uses: actions/setup-python@main
with:
python-version: '3.x'
python-version: "3.x"
- uses: actions/cache@main
with:
path: ${{ env.pythonLocation }}
key: build-${{ runner.os }}-${{ env.pythonLocation }}-${{ hashFiles('pyproject.toml', 'setup.*') }}
- run: pip wheel . --no-deps -w dist
- uses: pypa/gh-action-pypi-publish@master
- uses: pypa/gh-action-pypi-publish@release/v1
with:
user: __token__
password: ${{ secrets.PYPI_TOKEN }}
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ repos:
- id: "shellcheck"

- repo: "https://github.com/python-jsonschema/check-jsonschema"
rev: "0.28.5"
rev: "0.28.6"
hooks:
- id: "check-github-workflows"
- id: "check-readthedocs"
Expand All @@ -60,7 +60,7 @@ repos:

- repo: "https://github.com/charliermarsh/ruff-pre-commit"
# Ruff version.
rev: 'v0.4.9'
rev: 'v0.4.10'
hooks:
- id: "ruff"

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Searvey aims to provide the following functionality:
- U.S. Center for Operational Oceanographic Products and Services (CO-OPS)
- Flanders Marine Institute (VLIZ); Intergovernmental Oceanographic Commission (IOC)
- U.S. Geological Survey (USGS)
- Army Corp WL

## Installation

Expand Down
15 changes: 15 additions & 0 deletions docs/source/usace.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
USACE RiverGages
==============
The U.S. Army Corps of Engineers RiverGages <https://rivergages.mvr.usace.army.mil/>_
system provides water level data for rivers and waterways across the United States.
searvey uses the RiverGages REST API to access this data. Currently, water level
data is exposed in searvey.

The data from an individual station can be retrieved with:
.. autofunction:: searvey.usace.get_usace_station

You can fetch data from multiple stations and multiple different dates with:
.. autofunction:: searvey.usace.fetch_usace

Note: The verify=False parameter in the httpx.Client() is used here to bypass
SSL verification, which is the only way to access the USACE RiverGages API.
125 changes: 125 additions & 0 deletions examples/USACE_data.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Set up for Army Corps WL data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import logging\n",
"import geopandas as gpd\n",
"import matplotlib.pyplot as plt\n",
"import pandas as pd\n",
"import httpx\n",
"from searvey._usace_api import fetch_usace_station\n",
"\n",
"logging.basicConfig(\n",
" level=20,\n",
" style=\"{\",\n",
" format=\"{asctime:s}; {levelname:8s}; {threadName:23s}; {name:<25s} {lineno:5d}; {message:s}\",\n",
")\n",
"\n",
"logging.getLogger(\"urllib3\").setLevel(30)\n",
"logging.getLogger(\"parso\").setLevel(30)\n",
"\n",
"logger = logging.getLogger(__name__)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Fetch WL data from a single station"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"\n",
"# Define start and end dates for data retrieval, you can use either datetime.date or string for the dates\n",
"import datetime\n",
"df = fetch_usace_station(\"01300\", datetime.date(2020, 4, 5), end_date=\"2020-04-10\",http_client=httpx.Client(verify=False))\n",
"\n",
"df"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Fetch Army Corps Water Level Data from multiple station"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from searvey._usace_api import _fetch_usace\n",
"import httpx\n",
"import pandas as pd\n",
"\n",
"df = _fetch_usace(\n",
" station_ids=[\"01300\"],\n",
" start_dates=[\"2020-04-05\"],\n",
" end_dates=[\"2020-04-10\"],\n",
" rate_limit=None,\n",
" http_client=httpx.Client(verify=False),\n",
" multiprocessing_executor=None,\n",
" multithreading_executor=None\n",
")\n",
"df['01300']\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Graph the data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import hvplot.pandas\n",
"\n",
"df[\"01300\"].hvplot(title=\"Army Corps WL values\", xlabel=\"Index\", ylabel=\"Value\")\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
2 changes: 2 additions & 0 deletions searvey/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from searvey._coops_api import fetch_coops_station
from searvey._ioc_api import fetch_ioc_station
from searvey._usace_api import fetch_usace_station
from searvey.coops import get_coops_stations
from searvey.ioc import get_ioc_data
from searvey.ioc import get_ioc_stations
Expand All @@ -24,4 +25,5 @@
"get_usgs_stations",
"Provider",
"__version__",
"fetch_usace_station",
]
190 changes: 190 additions & 0 deletions searvey/_usace_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
import logging
import xml.etree.ElementTree as ET
from collections import abc
from datetime import datetime
from typing import List
from typing import Union
from typing import Optional

import httpx
import multifutures
import pandas as pd

from ._common import _fetch_url
from ._common import _resolve_end_date
from ._common import _resolve_http_client
from ._common import _resolve_rate_limit
from ._common import _resolve_start_date
from .custom_types import DatetimeLike

logger = logging.getLogger(__name__)

BASE_URL = "https://rivergages.mvr.usace.army.mil/watercontrol/webservices/rest/webserviceWaterML.cfc?method=RGWML&meth=getValues&location={location}&site={site}&variable={variable}&beginDate={begin_date}&endDate={end_date}&authToken=RiverGages"


def _parse_xml_data(content: str, station_id: str) -> pd.DataFrame:
try:
namespace = {"wml": "http://www.cuahsi.org/waterML/1.0/"}
root = ET.fromstring(content)
values_element = root.find(".//wml:values", namespaces=namespace)

if values_element is None:
logger.warning(f"{station_id}: No 'values' element found in the XML.")
return pd.DataFrame()

data = []
for value_element in values_element.findall("wml:value", namespaces=namespace):
date_time = value_element.get("dateTime")
value = value_element.text
date_time_obj = datetime.strptime(date_time, "%Y-%m-%dT%H:%M:%S")
data.append({"time": date_time_obj, "value": float(value)})

df = pd.DataFrame(data)
df.set_index("time", inplace=True)
df.index = pd.to_datetime(df.index, utc=True)
df.attrs["station_id"] = f"USACE-{station_id}"
return df
except ET.ParseError:
logger.error(f"{station_id}: Failed to parse XML data.")
return pd.DataFrame()


def _generate_urls(
station_id: str,
start_date: pd.Timestamp,
end_date: pd.Timestamp,
) -> list[str]:
if end_date < start_date:
raise ValueError(f"'end_date' must be after 'start_date': {end_date} vs {start_date}")
if end_date == start_date:
return []

url = BASE_URL.format(
location=station_id,
site=station_id,
variable="HG",
begin_date=start_date.strftime("%Y-%m-%dT%H:%M"),
end_date=end_date.strftime("%Y-%m-%dT%H:%M"),
)
return [url]


def _retrieve_usace_data(
station_ids: abc.Collection[str],
start_dates: abc.Collection[pd.Timestamp],
end_dates: abc.Collection[pd.Timestamp],
rate_limit: multifutures.RateLimit,
http_client: httpx.Client,
executor: Optional[multifutures.ExecutorProtocol] = None,
) -> list[multifutures.FutureResult]:
kwargs = []
for station_id, start_date, end_date in zip(station_ids, start_dates, end_dates):
for url in _generate_urls(station_id=station_id, start_date=start_date, end_date=end_date):
logger.info("USACE-%s: Starting scraping: %s - %s", station_id, start_date, end_date)
if url:
kwargs.append(
dict(
station_id=station_id,
url=url,
client=http_client,
rate_limit=rate_limit,
),
)
with http_client:
logger.debug("Starting data retrieval")
results = multifutures.multithread(
func=_fetch_url, func_kwargs=kwargs, check=False, executor=executor
)
logger.debug("Finished data retrieval")
return results


def _fetch_usace(
station_ids: abc.Collection[str],
start_dates: Union[DatetimeLike, List[DatetimeLike]] = None,
end_dates: Union[DatetimeLike, List[DatetimeLike]] = None,
*,
rate_limit: Optional[multifutures.RateLimit] = None,
http_client: Optional[httpx.Client] = None,
multiprocessing_executor: Optional[multifutures.ExecutorProtocol] = None,
multithreading_executor: Optional[multifutures.ExecutorProtocol] = None,
) -> dict[str, pd.DataFrame]:
rate_limit = _resolve_rate_limit(rate_limit)
http_client = _resolve_http_client(http_client)

now = pd.Timestamp.now("utc")

start_dates = [start_dates] if not isinstance(start_dates, list) else start_dates
end_dates = [end_dates] if not isinstance(end_dates, list) else end_dates

# we get the first index because the output is (DatetimeIndex(['2020-04-05'], dtype='datetime64[ns]', freq=None)
start_dates = [_resolve_start_date(now, date)[0] for date in start_dates]
end_dates = [_resolve_end_date(now, date)[0] for date in end_dates]

usace_responses = _retrieve_usace_data(
station_ids=station_ids,
start_dates=start_dates,
end_dates=end_dates,
rate_limit=rate_limit,
http_client=http_client,
executor=multithreading_executor,
)

dataframes = {}
for response in usace_responses:
station_id = response.kwargs["station_id"]
if response.exception:
logger.error(f"USACE-{station_id}: Failed to retrieve data. Error: {response.exception}")
continue
df = _parse_xml_data(response.result, station_id)
if not df.empty:
dataframes[station_id] = df
else:
logger.warning(f"USACE-{station_id}: No data retrieved or parsed.")

return dataframes


def fetch_usace_station(
station_id: str,
start_date: Optional[DatetimeLike] = None,
end_date: Optional[DatetimeLike] = None,
*,
rate_limit: Optional[multifutures.RateLimit] = None,
http_client: Optional[httpx.Client] = None,
multiprocessing_executor: Optional[multifutures.ExecutorProtocol] = None,
multithreading_executor: Optional[multifutures.ExecutorProtocol] = None,
) -> pd.DataFrame:
"""
Make a query to the USACE API for river gauge data for ``station_id``
and return the results as a ``pandas.DataFrame``.

:param station_id: The station identifier.
:param start_date: The starting date of the query.
:param end_date: The finishing date of the query.
:param rate_limit: The rate limit for making requests to the USACE servers.
:param http_client: The ``httpx.Client``, this should have the parameter verify=False.
:param multiprocessing_executor
:param multithreading_executor
"""
logger.info("USACE-%s: Starting scraping: %s - %s", station_id, start_date, end_date)
try:
df = _fetch_usace(
station_ids=[station_id],
start_dates=start_date,
end_dates=end_date,
rate_limit=rate_limit,
http_client=http_client,
multiprocessing_executor=multiprocessing_executor,
multithreading_executor=multithreading_executor,
).get(station_id, pd.DataFrame())
except Exception as e:
logger.error(f"USACE-{station_id}: An error occurred while fetching data: {str(e)}")
df = pd.DataFrame()

if df.empty:
logger.warning(f"USACE-{station_id}: No data retrieved for the specified period.")
else:
logger.info("USACE-%s: Finished scraping: %s - %s", station_id, start_date, end_date)

return df
Loading
Loading