-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update/kafka implementations #3737
Update/kafka implementations #3737
Conversation
Updates from airqo staging
Caution Review failedThe pull request is closed. 📝 Walkthrough📝 WalkthroughWalkthroughThe changes in this pull request focus on the Changes
Possibly related PRs
Suggested reviewers
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (1)
src/workflows/airqo_etl_utils/airqo_utils.py (1)
1060-1074
: Remove redundant continue statement.The
continue
statement on line 1073 is redundant as it's at the end of the loop. This doesn't affect functionality but can be removed for cleaner code.except KafkaException as e: logger.exception(f"Error while consuming message: {e}") - continue
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
- src/workflows/airqo_etl_utils/airqo_utils.py (1 hunks)
🧰 Additional context used
🪛 Ruff
src/workflows/airqo_etl_utils/airqo_utils.py
1083-1083: Local variable
e
is assigned to but never usedRemove assignment to unused variable
e
(F841)
🔇 Additional comments (3)
src/workflows/airqo_etl_utils/airqo_utils.py (3)
1039-1046
: Well-documented method signature!The docstring clearly describes the purpose, parameters, and return value of the method, including the error case where an empty DataFrame is returned.
1049-1053
: Clean initialization with proper exception handling import!The code properly imports the specific KafkaException class and initializes the message broker and devices list.
1054-1059
: Appropriate Kafka consumer configuration!The consumer is configured to read all messages from the beginning, which is essential for maintaining a complete list of devices.
try: | ||
devices = pd.DataFrame(devices_list) | ||
# Will be removed in the future. Just here for initial tests. | ||
devices.drop( | ||
devices.columns[devices.columns.str.contains("^Unnamed")], | ||
axis=1, | ||
inplace=True, | ||
) | ||
except Exception as e: | ||
logger.exception("Failed to convert consumed messages to DataFrame") | ||
# Return empty DataFrame on failure | ||
devices = pd.DataFrame() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Address unused exception variable.
The exception variable 'e' is caught but not used in the error message. Either include it in the log message or use Exception
without capturing the variable.
- except Exception as e:
+ except Exception:
logger.exception("Failed to convert consumed messages to DataFrame")
# Return empty DataFrame on failure
devices = pd.DataFrame()
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
try: | |
devices = pd.DataFrame(devices_list) | |
# Will be removed in the future. Just here for initial tests. | |
devices.drop( | |
devices.columns[devices.columns.str.contains("^Unnamed")], | |
axis=1, | |
inplace=True, | |
) | |
except Exception as e: | |
logger.exception("Failed to convert consumed messages to DataFrame") | |
# Return empty DataFrame on failure | |
devices = pd.DataFrame() | |
try: | |
devices = pd.DataFrame(devices_list) | |
# Will be removed in the future. Just here for initial tests. | |
devices.drop( | |
devices.columns[devices.columns.str.contains("^Unnamed")], | |
axis=1, | |
inplace=True, | |
) | |
except Exception: | |
logger.exception("Failed to convert consumed messages to DataFrame") | |
# Return empty DataFrame on failure | |
devices = pd.DataFrame() |
🧰 Tools
🪛 Ruff
1083-1083: Local variable
e
is assigned to but never usedRemove assignment to unused variable
e
(F841)
Log error
Description
Improve error handling for the devices consumer.
Related Issues
Summary by CodeRabbit
New Features
get_devices
method.Bug Fixes
device_id
to ensure data integrity.