Allow ExternalTaskSensor to wait for taskgroup #14640

Closed
wants to merge 6 commits into from

Conversation

xinbinhuang
Contributor

@xinbinhuang xinbinhuang commented Mar 6, 2021

closes: #14563

This PR enables ExternalTaskSensor to also wait for the external task_group.

The implementation is to retrieve the external DAG from the DagBag and then check if the TaskGroup exists. If so, query and wait for the states of all tasks within that TaskGroup during the poking cycle.
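
A minimal sketch of the poking logic described above, using plain Python stand-ins rather than Airflow's models. The names `poke_task_group`, `group_task_ids`, and `ti_states` are hypothetical; the actual sensor queries the metadata database instead of reading a dict.

```python
from typing import Dict, Iterable, List, Tuple


def poke_task_group(
    group_task_ids: List[str],
    execution_dates: Iterable[str],
    ti_states: Dict[Tuple[str, str], str],
    allowed_states: Iterable[str] = ("success",),
) -> bool:
    """Return True once every task in the group is in an allowed state
    for every execution date being waited on."""
    allowed = set(allowed_states)
    for execution_date in execution_dates:
        for task_id in group_task_ids:
            # A missing task instance counts as "not done yet".
            if ti_states.get((task_id, execution_date)) not in allowed:
                return False
    return True
```

The sensor would keep returning False from each poke until the last task in the group reaches an allowed state.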


^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.

@xinbinhuang xinbinhuang marked this pull request as draft March 6, 2021 09:29
@xinbinhuang xinbinhuang changed the title from "Allow ExternalSensor to wait for taskgroup" to "Allow ExternalTaskSensor to wait for taskgroup" Mar 6, 2021
airflow/sensors/external_task.py Outdated Show resolved Hide resolved
.scalar()
)
) / len(external_task_group_task_ids)
Contributor Author

@xinbinhuang xinbinhuang Mar 6, 2021

This keeps the existing poke check, return count_allowed == len(dttm_filter), working: dividing by the number of tasks in the group brings the count back to one per execution date.
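
To illustrate the division in the snippet above with concrete numbers, here is a small sketch. The helper name `normalized_count` and the example values are assumptions for illustration, not code from the PR.

```python
def normalized_count(matching_task_instances: int, tasks_in_group: int) -> float:
    """Scale a raw task-instance count down to a per-execution-date count."""
    return matching_task_instances / tasks_in_group


# Hypothetical scenario: a group of 3 tasks, waiting on 2 execution dates.
dttm_filter = ["2021-03-06", "2021-03-07"]
tasks_in_group = 3

# All 6 task instances are in an allowed state, so the check passes.
all_done = normalized_count(6, tasks_in_group) == len(dttm_filter)

# Only 5 of 6 are done, so the check does not pass yet.
partly_done = normalized_count(5, tasks_in_group) == len(dttm_filter)
```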

@@ -134,20 +146,23 @@ def __init__(
self.execution_delta = execution_delta
self.execution_date_fn = execution_date_fn
self.external_dag_id = external_dag_id
self.external_task_group_id = external_task_group_id
self.external_task_id = external_task_id
self.check_existence = check_existence
Contributor Author

@xinbinhuang xinbinhuang Mar 6, 2021

check_existence is False by default, which may make sense for external_dag_id or external_task_id. But external_task_group_id has to look up an existing DAG in order to get the list of task_ids.

https://github.com/apache/airflow/blob/fce49402461ee4e7a5f6ffd18cee3121f3496a39/airflow/sensors/external_task.py#L174-L180

I wonder if we can change the default to True, or even make check_existence mandatory? That would give more useful errors when the external task/DAG does not exist, and would make the behavior consistent with external_task_group_id. Also, what would be the use case for a sensor that waits on an object that doesn't exist until it times out?

@github-actions

github-actions bot commented Mar 6, 2021

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

airflow/models/dag.py Outdated Show resolved Hide resolved
@xinbinhuang xinbinhuang marked this pull request as ready for review March 30, 2021 07:09
@@ -164,18 +184,23 @@ def poke(self, context, session=None):
         if self.failed_states:
             count_failed = self.get_count(dttm_filter, session, self.failed_states)

-        if count_failed == len(dttm_filter):
+        if count_failed > 0:
Contributor Author

@xinbinhuang xinbinhuang Mar 30, 2021

Here I am assuming that as long as at least one external task has failed, we want to fail the sensor. Though this changes the original behavior, I think it is the better behavior.

Member

Should we add this comment in code?

Contributor Author

@xinbinhuang xinbinhuang Apr 7, 2021

I don't think anything more than an entry in UPDATING.md is necessary. I think the only situation where you will have multiple counts is when execution_date_fn returns more than one execution date to wait for. However, the original behavior gets you into a weird state when only some of the TIs fail, e.g. one fails and one succeeds, which results in a timeout. IMHO this is more of a bug than intended behavior. WDYT?
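
The partial-failure case can be made concrete with a small sketch. The function names `should_fail_old` and `should_fail_new` are hypothetical; they only mirror the two conditions shown in the diff, not the full poke method.

```python
def should_fail_old(count_failed: int, num_execution_dates: int) -> bool:
    """Original condition: fail only when every awaited date has failed."""
    return count_failed == num_execution_dates


def should_fail_new(count_failed: int) -> bool:
    """Proposed condition: fail as soon as anything has failed."""
    return count_failed > 0


# Hypothetical scenario: two execution dates, one failed and one succeeded.
count_failed, count_allowed, num_dates = 1, 1, 2

# Old behavior: the sensor neither fails nor succeeds, so it pokes
# until its timeout is reached.
old_fails = should_fail_old(count_failed, num_dates)
old_succeeds = count_allowed == num_dates

# New behavior: the sensor fails right away.
new_fails = should_fail_new(count_failed)
```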

Comment on lines +263 to +301
    def get_external_task_group_task_ids(self, session):
        """Return task ids for the external TaskGroup"""
        refreshed_dag_info = DagBag(read_dags_from_db=True).get_dag(self.external_dag_id, session)
        task_group: Optional["TaskGroup"] = refreshed_dag_info.task_group_dict.get(
            self.external_task_group_id
        )
        if not task_group:
            raise AirflowException(
                f"The external task group {self.external_task_group_id} in "
                f"DAG {self.external_dag_id} does not exist."
            )
        task_ids = [task.task_id for task in task_group]
        return task_ids

Contributor Author

@xinbinhuang xinbinhuang Mar 30, 2021

This is the main piece: it retrieves the list of tasks for a TaskGroup. I believe read_dags_from_db=True is safe to use here because DAG serialization is enabled by default in 2.0.

Contributor

The existing task execution code creates its own DagBag instead of reading serialized DAGs from the DB; for example, the line linked below creates a DagBag. I think we should do the same here. It's important for tasks to get the latest view of the DAG during execution.

https://github.com/apache/airflow/blob/f1edc220d3f9cb050016d23246a682276bd09eee/airflow/sensors/external_task.py#L213
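
Whichever DagBag source is used, the group lookup itself is small. Below is a minimal sketch that runs without Airflow installed: `task_group_dict` is a plain dict standing in for the DAG attribute of the same name, and `get_group_task_ids` and `MissingTaskGroupError` are hypothetical names, not part of Airflow.

```python
from typing import Dict, List


class MissingTaskGroupError(Exception):
    """Stand-in for AirflowException so the sketch has no Airflow dependency."""


def get_group_task_ids(
    task_group_dict: Dict[str, List[str]], dag_id: str, group_id: str
) -> List[str]:
    """Return the task ids of a group, or raise if the group is unknown."""
    task_ids = task_group_dict.get(group_id)
    if task_ids is None:
        raise MissingTaskGroupError(
            f"The external task group {group_id} in DAG {dag_id} does not exist."
        )
    # Copy so callers cannot mutate the stand-in DAG structure.
    return list(task_ids)
```

Raising on a missing group is what makes an existence check effectively mandatory for the task group case: without the DAG there is no way to know which task ids to wait for.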

@github-actions

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

@xinbinhuang xinbinhuang force-pushed the taskgroup-sensor branch 2 times, most recently from 4e72ea9 to fce4940 Compare April 3, 2021 16:19
@xinbinhuang xinbinhuang requested a review from kaxil April 3, 2021 16:25
@xinbinhuang
Contributor Author

The tests fail on the K8S image build job, which I don't think is related to this PR.

@potiuk
Member

potiuk commented Apr 4, 2021

Just fixed the K8S problem in #15182. Can you please rebase?

airflow/models/dag.py Outdated Show resolved Hide resolved
@xinbinhuang xinbinhuang removed the stale Stale PRs per the .github/workflows/stale.yml policy file label Sep 3, 2021
@xinbinhuang xinbinhuang reopened this Sep 3, 2021
xinbinhuang and others added 5 commits September 3, 2021 05:20
Co-authored-by: Kaxil Naik <[email protected]>
fixup! Test external task group sensor

fixup! fixup! Test external task group sensor

fixup! fixup! fixup! Test external task group sensor

fixup! fixup! fixup! fixup! Test external task group sensor
Co-authored-by: Tomek Urbaszek <[email protected]>
@kaxil kaxil modified the milestones: Airflow 2.2, Airflow 2.3 Sep 14, 2021
@kaxil
Member

kaxil commented Sep 14, 2021

Can you fix the conflicts please @xinbinhuang

@github-actions

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Nov 28, 2021
@github-actions github-actions bot closed this Dec 9, 2021
@eladkal
Contributor

eladkal commented Dec 19, 2021

@xinbinhuang will you have time to complete it?

@xinbinhuang
Contributor Author

@xinbinhuang will you have time to complete it?

Thanks for the nudge! Will try to wrap it up before the holidays hit.

@eladkal
Contributor

eladkal commented Dec 19, 2021

Thanks for the nudge! Will try to wrap it up before the holidays hit.

Great :) re-opening so it won't be missed

@eladkal eladkal reopened this Dec 19, 2021
@eladkal eladkal removed the stale Stale PRs per the .github/workflows/stale.yml policy file label Dec 19, 2021
@github-actions

github-actions bot commented Feb 3, 2022

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Feb 3, 2022
@github-actions github-actions bot closed this Feb 8, 2022
@russellpierce
Contributor

Darn, it's a shame this didn't get through - it is exactly what I was looking for!

@potiuk
Member

potiuk commented Jun 1, 2022

Feel free to open a PR and contribute it on your own, @russellpierce. You will just have to make sure to follow it up and implement it to the quality that we expect. Airflow is created by > 2000 contributors like you, so if you need something, implementing it yourself is the fastest way to get things done.

Labels
area:core-operators Operators, Sensors and hooks within Core Airflow stale Stale PRs per the .github/workflows/stale.yml policy file
Development

Successfully merging this pull request may close these issues.

TaskGroup Sensor
9 participants