Update fix/pipeline task retries #3786
Conversation
Updates from airqo staging
📝 Walkthrough
The pull request introduces several modifications across multiple files to enhance the data processing and validation logic within the AirQo workflows. Key changes include stricter validation in the message-processing logic and standardized device identifier handling.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (5)
src/workflows/dags/airqo_kafka_workflows.py (1)
Line range hint 37-46: Consider adding error logging to broker task

The broker task has good retry configuration, but consider adding explicit error logging for failed publish attempts to aid in debugging.
```diff
 def send_device_data_to_broker(devices: pd.DataFrame) -> None:
     from airqo_etl_utils.message_broker_utils import MessageBrokerUtils
+    from airflow.utils.log.logging_mixin import LoggingMixin
+
+    logger = LoggingMixin().log

     if not devices.empty:
         broker = MessageBrokerUtils()
-        broker.publish_to_topic(
-            data=devices,
-            topic=configuration.DEVICES_TOPIC,
-            column_key="device_name",
-        )
+        try:
+            broker.publish_to_topic(
+                data=devices,
+                topic=configuration.DEVICES_TOPIC,
+                column_key="device_name",
+            )
+        except Exception as e:
+            logger.error(f"Failed to publish devices to broker: {str(e)}")
+            raise
```

src/workflows/airqo_etl_utils/date.py (1)
69-70: Excellent improvement for task idempotency.

Using `execution_date` from the DAG run instead of the current time is a significant improvement that aligns with the PR objectives. This change ensures that tasks maintain consistent timing even when retried, preventing potential data gaps and improving the reliability of the pipeline.

Consider adding a debug log statement here to track the calculated start_date_time for easier troubleshooting:

```diff
 execution_date = kwargs["dag_run"].execution_date
 start_date_time = execution_date - timedelta(**delta_kwargs)
+logger.debug("Calculated start_date_time: %s from execution_date: %s", start_date_time, execution_date)
```
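To illustrate the pattern being praised here, a minimal sketch of a helper that derives its window from the DAG run's logical date instead of the wall clock — the function name `compute_query_window` and the one-hour delta are hypothetical, not taken from the repository:

```python
from datetime import timedelta
from typing import Tuple


def compute_query_window(**kwargs) -> Tuple[str, str]:
    # Hypothetical helper: anchor the window to the scheduled execution date,
    # so a retry or backfill of the same DAG run computes the exact same range.
    execution_date = kwargs["dag_run"].execution_date
    start_date_time = execution_date - timedelta(hours=1)
    end_date_time = execution_date
    return start_date_time.isoformat(), end_date_time.isoformat()
```

Because the window depends only on the DAG run, rerunning the task yields identical start and end times, which is exactly the idempotency property this comment highlights.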
src/workflows/airqo_etl_utils/data_validator.py (1)
173-179: Consider documenting the device identifier convention.

The standardization of device identifier handling across the codebase is a positive architectural change. To ensure long-term maintainability and prevent future inconsistencies, consider:
- Adding docstring comments explaining the device identifier convention
- Creating a central configuration for column mapping constants
- Adding validation checks for required device fields
Example docstring addition:
```python
def transform_devices(devices: List[Dict[str, Any]], taskinstance) -> pd.DataFrame:
    """
    Transforms and processes the devices DataFrame.

    Device Identifier Convention:
    - MongoDB's '_id' is mapped to 'device_id' for internal processing
    - Original 'device_id' is mapped to 'device_name' for compatibility
    - Location fields are prefixed with 'device_' to avoid conflicts

    Args:
        devices (List[Dict[str, Any]]): Raw device data from MongoDB
        taskinstance: Airflow task instance for XCom operations

    Returns:
        pd.DataFrame: Transformed device data with standardized identifiers
    """
```

Also applies to: 295-298
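As a sketch of the "central configuration for column mapping constants" suggestion above, the rename map could live in a single module-level constant; `DEVICE_COLUMN_MAPPING` and `rename_device_columns` are hypothetical names, not existing identifiers in the codebase:

```python
import pandas as pd

# Hypothetical shared constant: one place to define the device identifier convention.
DEVICE_COLUMN_MAPPING = {
    "device_id": "device_name",
    "_id": "device_id",
    "latitude": "device_latitude",
    "longitude": "device_longitude",
}


def rename_device_columns(devices: pd.DataFrame) -> pd.DataFrame:
    # Return a copy with the standardized identifier names applied.
    return devices.rename(columns=DEVICE_COLUMN_MAPPING)
```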
src/workflows/dags/airqo_measurements.py (1)
287-287: Remove unused import.

The `timezone` import from `datetime` is not used in the code.

Apply this diff to remove the unused import:

```diff
-from datetime import datetime, timedelta, timezone
+from datetime import datetime, timedelta
```

🧰 Tools
🪛 Ruff
287-287: `datetime.timezone` imported but unused

Remove unused import: `datetime.timezone`

(F401)
src/workflows/airqo_etl_utils/airqo_utils.py (1)
1061-1061: Enhanced validation looks good, but the warning message could be more descriptive.

The additional validation for `device_id` improves data integrity. However, the warning message could be more informative by indicating all missing required fields.

Consider updating the warning message to be more specific:
```diff
-logger.warning(
-    f"Skipping message with key: {key}, missing 'device_id'."
-)
+missing_fields = []
+if not value.get("_id"):
+    missing_fields.append("_id")
+if not value.get("device_id"):
+    missing_fields.append("device_id")
+logger.warning(
+    f"Skipping message with key: {key}, missing required fields: {', '.join(missing_fields)}"
+)
```
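A small helper in this spirit could centralize the required-field check so the consumer logs one consolidated warning; the function and constant names here are illustrative only, not part of the reviewed code:

```python
from typing import Any, Dict, List

# Assumed required fields, based on the validation discussed in this review.
REQUIRED_MESSAGE_FIELDS = ("_id", "device_id")


def missing_message_fields(value: Dict[str, Any]) -> List[str]:
    # Return the names of required fields that are absent or empty.
    return [field for field in REQUIRED_MESSAGE_FIELDS if not value.get(field)]
```

The consumer loop could then skip a message and log the result of `missing_message_fields(value)` whenever the returned list is non-empty.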
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (6)
- src/workflows/airqo_etl_utils/airqo_utils.py (1 hunks)
- src/workflows/airqo_etl_utils/data_validator.py (2 hunks)
- src/workflows/airqo_etl_utils/date.py (3 hunks)
- src/workflows/airqo_etl_utils/workflows_custom_utils.py (0 hunks)
- src/workflows/dags/airqo_kafka_workflows.py (2 hunks)
- src/workflows/dags/airqo_measurements.py (22 hunks)
💤 Files with no reviewable changes (1)
- src/workflows/airqo_etl_utils/workflows_custom_utils.py
🧰 Additional context used
🪛 Ruff
src/workflows/dags/airqo_measurements.py
287-287: `datetime.timezone` imported but unused

Remove unused import: `datetime.timezone`

(F401)
🔇 Additional comments (10)
src/workflows/dags/airqo_kafka_workflows.py (4)
7-8: Excellent improvements to the DAG configuration!

The addition of type hints and centralized default arguments through `AirflowUtils` demonstrates thoughtful engineering practices. This promotes better code maintainability and consistency across DAGs.

Also applies to: 17-17
23-24: Well-structured retry configuration for extract_devices!

The retry configuration with 3 attempts and 5-minute delays provides robust error handling while maintaining task idempotency.
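For reference, this kind of retry policy is usually declared on the task decorator; the following is a generic sketch of that configuration, not the actual task definition from the DAG:

```python
from datetime import timedelta

from airflow.decorators import task


@task(retries=3, retry_delay=timedelta(minutes=5))
def extract_devices() -> None:
    # Transient failures (e.g. API timeouts) are retried up to three times,
    # five minutes apart, before the task is marked as failed.
    ...
```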
49-51: Excellent task dependency structure!

The task flow is clear and well organized, following a logical extract -> transform -> send pattern. The use of meaningful variable names makes the flow easy to understand.
29-35: Verify DataValidationUtils integration

While the transformation logic is well-structured, let's ensure the DataValidationUtils.transform_devices method properly handles the device identifiers mentioned in the summary (transition from "device_id" to "_id").
src/workflows/airqo_etl_utils/date.py (1)
4-6: LGTM: Proper logging setup following best practices.

The logging configuration follows Python's recommended practices by using `__name__` for the logger name, which ensures proper logger hierarchy.

src/workflows/airqo_etl_utils/data_validator.py (1)
173-173: LGTM! Device identifier handling looks consistent.

The changes standardize the device identifier mapping by correctly selecting and renaming the MongoDB `_id` field to `device_id`. This aligns with the broader changes in the codebase for consistent device identification.

Let's verify the consistency of this change across the codebase:
Also applies to: 176-179
✅ Verification successful
Let me gather more context about the device identifier handling by checking the actual implementation in the data_validator.py file.
Device identifier transformation is correctly implemented
The code follows a consistent pattern for device identifier handling:
- For hourly measurements: `device_id` → `device_name` and MongoDB `_id` → `device_id`
- The field selection `["tenant", "_id", "site_id", "latitude", "longitude"]` is part of this standardized transformation
- The renaming operations maintain data consistency across the pipeline
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# Search for any remaining direct references to device._id that might need updating
rg -g '!*.{json,md}' -l '_id.*device'

# Search for potential inconsistencies in device_id column renaming
rg -g '*.py' 'rename.*columns.*device_id'
```

Length of output: 3349
Script:
```bash
#!/bin/bash
# Check the implementation details in data_validator.py
rg -A 5 -B 5 '_id.*device' src/workflows/airqo_etl_utils/data_validator.py

# Also check for any device_id column usage
rg -A 3 -B 3 'device_id' src/workflows/airqo_etl_utils/data_validator.py
```

Length of output: 2373
src/workflows/dags/airqo_measurements.py (3)
26-26: LGTM! Consistent DAG configurations.

The standardization of DAG configurations using `AirflowUtils.dag_default_configs()` across all DAGs improves maintainability and ensures consistent retry behavior.

Also applies to: 133-133, 203-203, 281-281, 457-457, 521-521
299-303: Excellent improvement in task idempotency!

The switch from using `datetime.now()` to `execution_date` ensures consistent behavior during task retries and backfills. This change directly addresses the PR objective of enhancing idempotency by using the originally scheduled start_date_time.

Also applies to: 343-347, 474-478, 538-542
33-33: Well-structured retry configuration for external service interactions!

The consistent retry pattern (3 retries with 5-minute delays) has been appropriately added to tasks that interact with external services. This improves reliability by handling transient failures gracefully.
Also applies to: 44-44, 64-64, 68-68, 78-78, 93-93, 138-138, 162-162, 176-176, 339-339, 367-367, 379-379, 400-400, 412-412, 421-421, 464-466, 498-501, 528-530
src/workflows/airqo_etl_utils/airqo_utils.py (1)
1061-1061: Verify consistent validation across the codebase.

Let's ensure that similar validation patterns are consistently applied across the codebase.
✅ Verification successful
Validation pattern is appropriate and consistent with the codebase
The validation check for `key`, `_id`, and `device_id` is well implemented and aligns with the codebase's patterns. Our analysis shows:
- The validation occurs in a Kafka consumer context where message integrity is crucial
- The pattern is unique to this specific consumer's requirements, as other Kafka consumers in the codebase handle different types of messages with their own validation needs
- The validation is followed by appropriate error logging and graceful handling (skipping invalid messages)
- The `device_id` field is consistently used across the codebase for device identification

🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# Description: Check for similar validation patterns and device_id usage

# Search for similar validation patterns
echo "Checking for similar validation patterns..."
rg -A 2 'not.*get\("_id"\).*or.*not.*get\("device_id"\)'

# Search for device_id usage in data processing
echo "Checking device_id usage in data processing..."
rg -A 5 'device_id.*=.*value\.get\("device_id"\)'
```

Length of output: 645
Script:
```bash
#!/bin/bash
# Let's broaden our search to understand the validation context better

# Check for device_id validation patterns in different forms
echo "Checking for device_id validation patterns..."
rg -A 3 'value\.get\("device_id"\)'

# Look for message validation patterns in general
echo "Checking message validation patterns..."
rg -A 3 'if not.*value\.get\('

# Search for similar message processing functions
echo "Looking for similar message processing functions..."
ast-grep --pattern 'def process_message($$$):'
```

Length of output: 1226
Script:
```bash
#!/bin/bash
# Let's look for the broader message processing context

# Search for the class containing this message processing
echo "Looking for the class context..."
rg -B 10 -A 5 'if not key or not value\.get\("_id"\)'

# Check for other Kafka consumer implementations
echo "Checking for other Kafka consumers..."
rg -A 5 'KafkaConsumer'

# Look for similar message validation in other files
echo "Checking for message validation in other files..."
fd -e py -x rg -A 3 'value\.get\(' {}
```

Length of output: 9259
logger.info("KWARGS:", kwargs) | ||
print("IS MANUAL RUN:", is_manual_run) |
Fix logging implementation.
There are a couple of issues with the logging implementation:

- The `logger.info` call uses incorrect syntax: the extra positional argument is treated as a %-format argument rather than being appended to the message, so the output won't be what's expected.
- There's an inconsistent mix of `logger.info` and `print` statements.
Apply this diff to fix the issues:
```diff
- logger.info("KWARGS:", kwargs)
- print("IS MANUAL RUN:", is_manual_run)
+ logger.info("KWARGS: %s", kwargs)
+ logger.info("IS MANUAL RUN: %s", is_manual_run)
```
"_id": "device_id", | ||
"latitude": "device_latitude", | ||
"longitude": "device_longitude", | ||
}, |
🛠️ Refactor suggestion
Consider enhancing error handling for device identifier transformation.
The column renaming logic correctly maintains consistency with the standardized device identifier approach. However, the method could benefit from explicit error handling for cases where expected columns are missing.
Consider wrapping the rename operation in a try-except block:
```diff
-    devices.rename(
-        columns={
-            "device_id": "device_name",
-            "_id": "device_id",
-            "latitude": "device_latitude",
-            "longitude": "device_longitude",
-        },
-        inplace=True,
-    )
+    try:
+        devices.rename(
+            columns={
+                "device_id": "device_name",
+                "_id": "device_id",
+                "latitude": "device_latitude",
+                "longitude": "device_longitude",
+            },
+            inplace=True,
+            errors="raise",  # without errors="raise", rename silently ignores missing columns
+        )
+    except KeyError as e:
+        logger.error(f"Missing required column during device transformation: {e}")
+        raise ValueError(f"Device data is missing required column: {e}")
```
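An alternative to catching the exception is to validate the expected columns up front, in line with the earlier nitpick about "validation checks for required device fields"; this is a hedged sketch with an assumed helper name, not the reviewed implementation:

```python
import pandas as pd


def check_required_device_columns(devices: pd.DataFrame) -> None:
    # Fail fast with a clear message before any renaming is attempted.
    required = {"device_id", "_id", "latitude", "longitude"}
    missing = required - set(devices.columns)
    if missing:
        raise ValueError(f"Device data is missing required columns: {sorted(missing)}")
```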
thanks @NicholasTurner23
Description
This PR makes tasks idempotent by ensuring that they always run with the start_date_time they were scheduled for. It removes the use of datetime.datetime.now().
This will reduce data gaps caused by running with the wrong start_date_time when rerunning pipelines.
Related Issues
Summary by CodeRabbit
- Stricter validation ensures that both `_id` and `device_id` are present.
- Added a `transform_devices` task in the Airflow DAG for improved data processing.