Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update fix/reduce redundant device data called[wip] #3526

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 36 additions & 12 deletions src/workflows/airqo_etl_utils/airnow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@

import json

import logging

logger = logging.getLogger(__name__)


class AirnowDataUtils:
@staticmethod
Expand Down Expand Up @@ -74,26 +78,48 @@ def extract_bam_data(start_date_time: str, end_date_time: str) -> pd.DataFrame:

@staticmethod
def process_bam_data(data: pd.DataFrame) -> pd.DataFrame:
"""
Processes BAM data by matching it to device details and constructing a list of air quality measurements.

Args:
data (pd.DataFrame): A DataFrame containing raw BAM device data.

Returns:
pd.DataFrame: A DataFrame containing processed air quality data, with relevant device information and pollutant values.
"""
air_now_data = []

devices = AirQoApi().get_devices(tenant=Tenant.ALL)

# Precompute device mapping for faster lookup
device_mapping = {}
for device in devices:
for device_code in device["device_codes"]:
device_mapping[device_code] = device
Comment on lines +94 to +98
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Excellent performance optimization with device mapping!

The introduction of a precomputed device mapping is a smart move. This change significantly improves the efficiency of device lookups, reducing the time complexity from O(n) to O(1) for each iteration of the main loop. Well done!

A small optimization to consider:

You could potentially further optimize memory usage by using a generator expression instead of a list comprehension when creating the devices list. This would be beneficial if the list of devices is large. Here's how you could modify line 93:

devices = AirQoApi().get_devices(tenant=Tenant.ALL, category=DeviceCategory.BAM)

This change would fetch only the BAM devices, reducing the amount of data processed and stored in memory.


for _, row in data.iterrows():
try:
device_id = row["FullAQSCode"]
device_details = list(
filter(lambda y: str(device_id) in y["device_codes"], devices)
)[0]
device_id = str(row["FullAQSCode"])

pollutant_value = dict({"pm2_5": None, "pm10": None, "no2": None})
# Lookup device details based on FullAQSCode
device_details = device_mapping.get(device_id)
if not device_details:
logger.exception(f"Device with ID {device_id} not found")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Robust error handling and logging improvements!

The addition of specific error logging for various scenarios (device not found, tenant mismatch, and general exceptions) greatly enhances the debugging capabilities of this method. The use of logger.exception is particularly appropriate as it includes the stack trace in the log, which will be invaluable for troubleshooting.

A suggestion to consider:

To make the logs even more informative, consider including more context in the log messages. For example, you could modify the tenant mismatch log as follows:

logger.exception(f"Tenant mismatch for device ID {device_id}. Expected: {device_details.get('tenant')}, Got: {row['tenant']}")

This additional information could help quickly identify the source of mismatches without needing to dig through the data.

Also applies to: 121-121, 148-148

continue

# Initialize pollutant values (note: pm10 and no2 are not always present)
pollutant_value = {"pm2_5": None, "pm10": None, "no2": None}

# Get the corresponding pollutant value for the current parameter
parameter_col_name = AirnowDataUtils.parameter_column_name(
row["Parameter"]
)

pollutant_value[parameter_col_name] = row["Value"]
if parameter_col_name in pollutant_value:
pollutant_value[parameter_col_name] = row["Value"]

if row["tenant"] != device_details.get("tenant"):
raise Exception("tenants dont match")
logger.exception(f"Tenant mismatch for device ID {device_id}")
continue

air_now_data.append(
{
Expand All @@ -119,8 +145,6 @@ def process_bam_data(data: pd.DataFrame) -> pd.DataFrame:
}
)
except Exception as ex:
print(ex)
traceback.print_exc()
logger.exception(f"Error processing row: {ex}")

air_now_data = pd.DataFrame(air_now_data)
return DataValidationUtils.remove_outliers(air_now_data)
return pd.DataFrame(air_now_data)
34 changes: 12 additions & 22 deletions src/workflows/airqo_etl_utils/airqo_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,49 +176,39 @@ def get_devices(
},
]
"""
params = {"tenant": str(Tenant.AIRQO)}
params = {"tenant": str(Tenant.AIRQO), "category": str(device_category)}
if configuration.ENVIRONMENT == "production":
# Query for active devices only when in production
params["active"] = "yes"

if tenant != Tenant.ALL:
params["network"] = str(tenant)

response = self.__request("devices", params)
# Note: There is an option of using <api/v2/devices> if more device details are required as shown in the doc string return payload.
response = self.__request("devices/summary", params)
devices = [
{
**device,
"device_number": device.get("device_number"),
"latitude": device.get("latitude")
or device.get("approximate_latitude"),
"longitude": device.get("longitude")
or device.get("approximate_longitude"),
"device_id": device.get("name"),
"device_codes": [str(code) for code in device.get("device_codes", [])],
"mongo_id": device.get("_id"),
"device_id": device.pop("name"),
"device_codes": device.pop("device_codes", []),
"mongo_id": device.pop("_id"),
"site_id": device.get("site", {}).get("_id"),
"site_location": device.get("site", {}).get("location_name"),
"site_location": device.pop("site", {}).get("location_name", ""),
"device_category": str(
DeviceCategory.from_str(device.get("category", ""))
DeviceCategory.from_str(device.pop("category", ""))
),
"tenant": device.get("network"),
"device_manufacturer": device.get("device_manufacturer")
or Tenant.from_str(device.get("network")).device_manufacturer(),
"device_manufacturer": Tenant.from_str(
device.pop("network")
).device_manufacturer(),
}
for device in response.get("devices", [])
]

if device_category != DeviceCategory.NONE:
devices = [
device
for device in devices
if device["device_category"] == str(device_category)
]
return devices

def get_thingspeak_read_keys(self, devices: List) -> Dict[int, str]:
"""
Retrieve read keys from thingspeak given a list of devices.
Retrieve read keys from the AirQo API given a list of devices.

Args:
- tenant (Tenant, optional): An Enum that represents site ownership. Defaults to `Tenant.ALL` if not supplied.
Expand Down
2 changes: 1 addition & 1 deletion src/workflows/airqo_etl_utils/airqo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def flatten_field_8(device_category: DeviceCategory, field_8: str = None):
case DeviceCategory.LOW_COST:
mappings = configuration.AIRQO_LOW_COST_CONFIG
case _:
raise ValueError("A valid device category must be provided")
logger.exception("A valid device category must be provided")

for key, value in mappings.items():
try:
Expand Down
37 changes: 18 additions & 19 deletions src/workflows/dags/dag_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,16 @@
Streams and calibrates measurements for AirQo low cost sensors on an hourly frequency.
#### Notes
Data sources:
API(devices/):
ThingSpeak(device measurements):
# TODO Soon to change to OpenWeatherApi
Tahmo(Weather data):
- API(devices/):
- ThingSpeak(device measurements):
- Tahmo(Weather data):
Data Destinations:
Bigquery(stage):averaged_data_stage.hourly_device_measurements
Bigquery(prod):averaged_data.hourly_device_measurements
Bigquery(stage):raw_data_stage.device_measurements
Bigquery(prod):raw_data.device_measurements
API(devices/events):
Kafka(hourly-measurements-topic):
- Bigquery(stage):averaged_data_stage.hourly_device_measurements
- Bigquery(prod):averaged_data.hourly_device_measurements
- Bigquery(stage):raw_data_stage.device_measurements
- Bigquery(prod):raw_data.device_measurements
- API(devices/events):
- Kafka(hourly-measurements-topic):
- <a href="https://airqo.africa/" target="_blank">AirQo</a>
"""

Expand All @@ -25,13 +24,13 @@
Re-calibrates measurements for AirQo sensors once a day if for any reason this did not happen.
#### Notes
Data sources:
Bigquery:raw_data.device_measurements
Bigquery:averaged_data.hourly_weather_data
- Bigquery:raw_data.device_measurements
- Bigquery:averaged_data.hourly_weather_data
Data Destinations:
Bigquery(stage):averaged_data_stage.hourly_device_measurements
Bigquery(prod):averaged_data.hourly_device_measurements
API(devices/events):
Kafka(hourly-measurements-topic):
- Bigquery(stage):averaged_data_stage.hourly_device_measurements
- Bigquery(prod):averaged_data.hourly_device_measurements
- API(devices/events):
- Kafka(hourly-measurements-topic):
- <a href="https://airqo.africa/" target="_blank">AirQo</a>
"""

Expand All @@ -41,9 +40,9 @@
Streams measurements for AirQo Gaseous low cost sensors on an hourly frequency.
#### Notes
Data sources:
API(devices/):
ThingSpeak:
- API(devices/):
- ThingSpeak:
Data Destinations:
Bigquery raw_data.gaseous_measurements
- Bigquery raw_data.gaseous_measurements
- <a href="https://airqo.africa/" target="_blank">AirQo</a>
"""
Loading