Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update fix/clean up #3615

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 34 additions & 29 deletions src/workflows/airqo_etl_utils/airqo_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import simplejson
import urllib3
from urllib3.util.retry import Retry
from typing import List, Dict, Any
from typing import List, Dict, Any, Union, Generator, Tuple

from .config import configuration
from .constants import DeviceCategory, Tenant
Expand Down Expand Up @@ -172,7 +172,7 @@ def get_devices(
"previous_sites": List[Dict[str, Any]],
"cohorts": List,
"site": Dict[str, Any],
device_number
"device_number": int
},
]
"""
Expand All @@ -188,64 +188,69 @@ def get_devices(
response = self.__request("devices/summary", params)
devices = [
{
**device,
"device_id": device.pop("name"),
"device_codes": device.pop("device_codes", []),
"mongo_id": device.pop("_id"),
"site_id": device.get("site", {}).get("_id"),
"site_location": device.pop("site", {}).get("location_name", ""),
"site_id": device.get("site", {}).get("_id", None),
"site_location": device.pop("site", {}).get("location_name", None),
"device_category": str(
DeviceCategory.from_str(device.pop("category", ""))
DeviceCategory.from_str(device.pop("category", None))
),
"tenant": device.get("network"),
"device_manufacturer": Tenant.from_str(
device.pop("network")
).device_manufacturer(),
**device,
}
for device in response.get("devices", [])
]
return devices

def get_thingspeak_read_keys(self, devices: List) -> Dict[int, str]:
def get_thingspeak_read_keys(
self, devices: pd.DataFrame, return_type: str = "all"
) -> Union[Dict[int, str], Generator[Tuple[int, str], None, None]]:
"""
Retrieve read keys from the AirQo API given a list of devices.

Args:
- tenant (Tenant, optional): An Enum that represents site ownership. Defaults to `Tenant.ALL` if not supplied.
- device_category (DeviceCategory, optional): An Enum that represents device category. Defaults to `DeviceCategory.None` if not supplied.
devices (pd.DataFrame): A DataFrame indexed by device_number with a `readKey` column holding each device's encrypted read key.
return_type (str): Defines the return behavior. If 'all', returns a dictionary of all keys.
If 'yield', yields each key one by one as a generator. Defaults to 'all'.

Returns:
Dict[int, str]: A dictionary containing device decrypted keys. The dictionary has the following structure.
{
"device_number":str,
}
Union[Dict[int, str], Generator[Tuple[int, str], None, None]]:
- A dictionary containing device decrypted keys when `return_type='all'`. The dictionary has the structure {device_number: decrypted_key}.
- A generator yielding (device_number, decrypted_key) when `return_type='yield'`.
"""

body: List = []
decrypted_keys: List[Dict[str, str]] = []
decrypted_read_keys: Dict[int, str] = {}

for device in devices:
read_key = device.get("readKey", None)
device_number = device.get("device_number", None)
if read_key and device_number:
for device_number, row in devices.iterrows():
if pd.notna(row["readKey"]) and pd.notna(device_number):
body.append(
{
"encrypted_key": read_key,
"device_number": device_number,
}
{"encrypted_key": row["readKey"], "device_number": device_number}
)

response = self.__request("devices/decrypt/bulk", body=body, method="post")

if response:
decrypted_keys = response.get("decrypted_keys", [])
return {
int(entry["device_number"]): entry["decrypted_key"]
for entry in decrypted_keys
}
# TODO: Handle an empty response more explicitly instead of returning an empty object.
return decrypted_read_keys

if return_type == "all":
return {
int(entry["device_number"]): entry["decrypted_key"]
for entry in decrypted_keys
}
elif return_type == "yield":
for entry in decrypted_keys:
device_number = int(entry["device_number"])
decrypted_key = entry["decrypted_key"]
yield device_number, decrypted_key

if return_type == "all":
return decrypted_read_keys
elif return_type == "yield":
return

def get_forecast(self, frequency: str, site_id: str) -> List:
"""
Expand Down
44 changes: 22 additions & 22 deletions src/workflows/airqo_etl_utils/airqo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,13 +314,12 @@ def extract_devices_data(
Retrieves sensor data from Thingspeak API for devices belonging to the specified device category (BAM or low-cost sensors).
Optionally filters data by specific device numbers and removes outliers if requested.

Parameters:
- start_date_time (str): Start date and time (ISO 8601 format) for data extraction.
- end_date_time (str): End date and time (ISO 8601 format) for data extraction.
- device_category (DeviceCategory): Category of devices to extract data from (BAM or low-cost sensors).
- device_numbers (list, optional): List of device numbers whose data to extract. Defaults to None (all devices).
- remove_outliers (bool, optional): If True, removes outliers from the extracted data. Defaults to True.

Args:
start_date_time (str): Start date and time (ISO 8601 format) for data extraction.
end_date_time (str): End date and time (ISO 8601 format) for data extraction.
device_category (DeviceCategory): Category of devices to extract data from (BAM or low-cost sensors).
device_numbers (list, optional): List of device numbers whose data to extract. Defaults to None (all devices).
remove_outliers (bool, optional): If True, removes outliers from the extracted data. Defaults to True.
"""

airqo_api = AirQoApi()
Expand Down Expand Up @@ -368,19 +367,19 @@ def extract_devices_data(
]
data_columns = list(set(data_columns))

read_keys = airqo_api.get_thingspeak_read_keys(devices=devices)

devices_data = pd.DataFrame()
dates = Utils.query_dates_array(
start_date_time=start_date_time,
end_date_time=end_date_time,
data_source=DataSource.THINGSPEAK,
)

for device in devices:
device_number = device.get("device_number", None)
read_key = read_keys.get(device_number, None)
devices = pd.DataFrame(devices)
devices.set_index("device_number", inplace=True)

for device_number, read_key in airqo_api.get_thingspeak_read_keys(
devices[["readKey"]], return_type="yield"
):
if read_key is None or device_number is None:
logger.exception(f"{device_number} does not have a read key")
continue
Expand Down Expand Up @@ -411,12 +410,12 @@ def extract_devices_data(

meta_data = data.attrs.pop("meta_data", {})
data["device_number"] = device_number
data["device_id"] = device.get("device_id", None)
data["site_id"] = device.get("site_id", None)
data["device_id"] = devices.loc[device_number].device_id
data["site_id"] = devices.loc[device_number].site_id

if device_category in AirQoDataUtils.Device_Field_Mapping:
data["latitude"] = device.get("latitude", None)
data["longitude"] = device.get("longitude", None)
data["latitude"] = devices.loc[device_number].latitude
data["longitude"] = devices.loc[device_number].longitude
data.rename(
columns=AirQoDataUtils.Device_Field_Mapping[device_category],
inplace=True,
Expand Down Expand Up @@ -629,11 +628,13 @@ def process_data_for_api(data: pd.DataFrame, frequency: Frequency) -> list:
"""
Formats device measurements into a format required by the events endpoint.

:param data: device measurements
:param frequency: frequency of the measurements.
:return: a list of measurements
"""
Args:
data: device measurements
frequency: frequency of the measurements.

Returns:
A list of measurements
"""
restructured_data = []

data["timestamp"] = pd.to_datetime(data["timestamp"])
Expand Down Expand Up @@ -667,7 +668,6 @@ def process_data_for_api(data: pd.DataFrame, frequency: Frequency) -> list:
"site_id": row["site_id"],
"device_number": device_number,
"tenant": str(Tenant.AIRQO),
"tenant": str(Tenant.AIRQO),
"location": {
"latitude": {"value": row["latitude"]},
"longitude": {"value": row["longitude"]},
Expand Down Expand Up @@ -810,7 +810,7 @@ def extract_devices_deployment_logs() -> pd.DataFrame:
try:
maintenance_logs = airqo_api.get_maintenance_logs(
tenant="airqo",
device=dict(device).get("name", None),
device=device.get("name", None),
activity_type="deployment",
)

Expand Down
8 changes: 5 additions & 3 deletions src/workflows/airqo_etl_utils/kcca_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import traceback

import numpy as np
import pandas as pd
import requests
Expand All @@ -12,6 +10,10 @@
from .date import date_to_str
from .utils import Utils

import logging

logger = logging.getLogger(__name__)


class KccaUtils:
@staticmethod
Expand Down Expand Up @@ -64,7 +66,7 @@ def add_site_and_device_details(devices, device_id):
}
)
except Exception as ex:
print(ex)
logger.exception(ex)
return pd.Series({"site_id": None, "device_number": None})

@staticmethod
Expand Down
Loading
Loading