-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add option to wait for completion on the EmrCreateJobFlowOperator #28827
Add option to wait for completion on the EmrCreateJobFlowOperator #28827
Conversation
waiter( | ||
get_state_callable=self._emr_hook.get_conn().describe_cluster, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Glad to see someone using this already to create new customer waiters! 🤩
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, this isn't the waiter setup I thought it was originally (thanks @ferruzzi for pointing that out!). You can find details on the new custom waiters here. Though I'm actually happy to merge this PR with the waiter you used, and then move all of the EMR waiters to the new waiter system in another PR rather than scope creeping this one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I didn't realize you could actually implement custom waiters that way, hence my comment above: https://github.com/BasPH/airflow/blob/add-emrcreatejobflow-waitforcompletion/airflow/providers/amazon/aws/operators/emr.py#L595-L596.
I'll take a look.
@@ -169,6 +169,15 @@ def add_job_flow_steps( | |||
) | |||
return response["StepIds"] | |||
|
|||
def terminate_job_flow(self, job_flow_id: str) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We generally try to avoid functions in hooks which just wrap boto3 api. You can call the boto3 api directly from the operator
"""Terminate job flow.""" | ||
if self._job_flow_id: | ||
self.log.info("Terminating job flow %s", self._job_flow_id) | ||
self._emr_hook.terminate_job_flow(self._job_flow_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self._emr_hook.terminate_job_flow(self._job_flow_id) | |
self._emr_hook.get_conn().terminate_job_flows(JobFlowIds=[job_flow_id]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @BasPH, thoughts on this suggested change? Otherwise the PR looks good
There hasn't been much movement on this one. The remaining comments are minor and we can always circle back on them. I'm going to merge this as is. |
This PR adds the option to wait for completion with the EmrCreateJobFlowOperator. It includes:
waiter
to wait for WAITING or TERMINATED state.wait_for_completion=False
. This matches the current behavior and will start a job flow and immediately succeed the Airflow task.on_kill
method. Moved EMR hook creation & caching to acached_property
because the hook is called from multiple methods.None
for thewaiter.countdown
argument type to represent waiting for infinity (or until the Airflow task times out based onexecution_timeout
). Defaulted theEmrCreateJobFlowOperator.waiter_countdown
toNone
. Checked all other usages ofwaiter_countdown
and they all default to25 * 60
so no breaking changes.^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in newsfragments.