Skip to content

Commit

Permalink
[AIRFLOW-2359] Add set failed for DagRun and task in tree view
Browse files Browse the repository at this point in the history
Closes #3255 from
yrqls21/kevin_yang_add_set_failed
  • Loading branch information
KevinYang21 authored and saguziel committed Jun 28, 2018
1 parent b0061f1 commit 284dbdb
Show file tree
Hide file tree
Showing 11 changed files with 765 additions and 262 deletions.
113 changes: 99 additions & 14 deletions airflow/api/common/experimental/mark_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@
# specific language governing permissions and limitations
# under the License.

from sqlalchemy import or_

from airflow.jobs import BackfillJob
from airflow.models import DagRun, TaskInstance
from airflow.operators.subdag_operator import SubDagOperator
from airflow.settings import Session
from airflow.utils import timezone
from airflow.utils.db import provide_session
from airflow.utils.state import State

from sqlalchemy import or_


def _create_dagruns(dag, execution_dates, state, run_id_template):
"""
Expand Down Expand Up @@ -191,34 +192,118 @@ def set_state(task, execution_date, upstream=False, downstream=False,
return tis_altered


def set_dag_run_state(dag, execution_date, state=State.SUCCESS, commit=False):
def _set_dag_run_state(dag_id, execution_date, state, session=None):
"""
Helper method that set dag run state in the DB.
:param dag_id: dag_id of target dag run
:param execution_date: the execution date from which to start looking
:param state: target state
:param session: database session
"""
Set the state of a dag run and all task instances associated with the dag
run for a specific execution date.
DR = DagRun
dr = session.query(DR).filter(
DR.dag_id == dag_id,
DR.execution_date == execution_date
).one()
dr.state = state
dr.end_date = timezone.utcnow()
session.commit()


@provide_session
def set_dag_run_state_to_success(dag, execution_date, commit=False,
session=None):
"""
Set the dag run for a specific execution date and its task instances
to success.
:param dag: the DAG of which to alter state
:param execution_date: the execution date from which to start looking
:param state: the state to which the DAG need to be set
:param commit: commit DAG and tasks to be altered to the database
:return: list of tasks that have been created and updated
:param session: database session
:return: If commit is true, list of tasks that have been updated,
otherwise list of tasks that will be updated
:raises: AssertionError if dag or execution_date is invalid
"""
res = []

if not dag or not execution_date:
return res

# Mark all task instances in the dag run
# Mark the dag run to success.
if commit:
_set_dag_run_state(dag.dag_id, execution_date, State.SUCCESS, session)

# Mark all task instances of the dag run to success.
for task in dag.tasks:
task.dag = dag
new_state = set_state(task=task, execution_date=execution_date,
state=state, commit=commit)
state=State.SUCCESS, commit=commit)
res.extend(new_state)

# Mark the dag run
return res


@provide_session
def set_dag_run_state_to_failed(dag, execution_date, commit=False,
session=None):
"""
Set the dag run for a specific execution date and its running task instances
to failed.
:param dag: the DAG of which to alter state
:param execution_date: the execution date from which to start looking
:param commit: commit DAG and tasks to be altered to the database
:param session: database session
:return: If commit is true, list of tasks that have been updated,
otherwise list of tasks that will be updated
:raises: AssertionError if dag or execution_date is invalid
"""
res = []

if not dag or not execution_date:
return res

# Mark the dag run to failed.
if commit:
_set_dag_run_state(dag.dag_id, execution_date, State.FAILED, session)

# Mark only RUNNING task instances.
TI = TaskInstance
task_ids = [task.task_id for task in dag.tasks]
tis = session.query(TI).filter(
TI.dag_id == dag.dag_id,
TI.execution_date == execution_date,
TI.task_id.in_(task_ids)).filter(TI.state == State.RUNNING)
task_ids_of_running_tis = [ti.task_id for ti in tis]
for task in dag.tasks:
if task.task_id not in task_ids_of_running_tis:
continue
task.dag = dag
new_state = set_state(task=task, execution_date=execution_date,
state=State.FAILED, commit=commit)
res.extend(new_state)

return res


@provide_session
def set_dag_run_state_to_running(dag, execution_date, commit=False,
session=None):
"""
Set the dag run for a specific execution date to running.
:param dag: the DAG of which to alter state
:param execution_date: the execution date from which to start looking
:param commit: commit DAG and tasks to be altered to the database
:param session: database session
:return: If commit is true, list of tasks that have been updated,
otherwise list of tasks that will be updated
"""
res = []
if not dag or not execution_date:
return res

# Mark the dag run to running.
if commit:
drs = DagRun.find(dag.dag_id, execution_date=execution_date)
for dr in drs:
dr.dag = dag
dr.update_state()
_set_dag_run_state(dag.dag_id, execution_date, State.RUNNING, session)

# To keep the return type consistent with the other similar functions.
return res
3 changes: 2 additions & 1 deletion airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,8 @@ def _change_state_for_tis_without_dagrun(self,
models.TaskInstance.dag_id == subq.c.dag_id,
models.TaskInstance.task_id == subq.c.task_id,
models.TaskInstance.execution_date ==
subq.c.execution_date)) \
subq.c.execution_date,
models.TaskInstance.task_id == subq.c.task_id)) \
.update({models.TaskInstance.state: new_state},
synchronize_session=False)
session.commit()
Expand Down
4 changes: 2 additions & 2 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1678,9 +1678,9 @@ def signal_handler(signum, frame):
self.state = State.SKIPPED
except AirflowException as e:
self.refresh_from_db()
# for case when task is marked as success externally
# for case when task is marked as success/failed externally
# current behavior doesn't hit the success callback
if self.state == State.SUCCESS:
if self.state in {State.SUCCESS, State.FAILED}:
return
else:
self.handle_failure(e, test_mode, context)
Expand Down
43 changes: 43 additions & 0 deletions airflow/www/templates/airflow/dag.html
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,24 @@ <h4 class="modal-title" id="myModalLabel">
</button>
</span>
<hr/>
<button id="btn_failed" type="button" class="btn btn-primary">
Mark Failed
</button>
<span class="btn-group">
<button id="btn_failed_past"
type="button" class="btn" data-toggle="button">Past</button>
<button id="btn_failed_future"
type="button" class="btn" data-toggle="button">
Future
</button>
<button id="btn_failed_upstream"
type="button" class="btn" data-toggle="button">Upstream</button>
<button id="btn_failed_downstream"
type="button" class="btn" data-toggle="button">
Downstream
</button>
</span>
<hr/>
<button id="btn_success" type="button" class="btn btn-primary">
Mark Success
</button>
Expand Down Expand Up @@ -241,6 +259,9 @@ <h4 class="modal-title" id="dagModalLabel">
<button id="btn_dagrun_clear" type="button" class="btn btn-primary">
Clear
</button>
<button id="btn_dagrun_failed" type="button" class="btn btn-primary">
Mark Failed
</button>
<button id="btn_dagrun_success" type="button" class="btn btn-primary">
Mark Success
</button>
Expand Down Expand Up @@ -389,6 +410,20 @@ <h4 class="modal-title" id="dagModalLabel">
window.location = url;
});

$("#btn_failed").click(function(){
url = "{{ url_for('airflow.failed') }}" +
"?task_id=" + encodeURIComponent(task_id) +
"&dag_id=" + encodeURIComponent(dag_id) +
"&upstream=" + $('#btn_failed_upstream').hasClass('active') +
"&downstream=" + $('#btn_failed_downstream').hasClass('active') +
"&future=" + $('#btn_failed_future').hasClass('active') +
"&past=" + $('#btn_failed_past').hasClass('active') +
"&execution_date=" + encodeURIComponent(execution_date) +
"&origin=" + encodeURIComponent(window.location);

window.location = url;
});

$("#btn_success").click(function(){
url = "{{ url_for('airflow.success') }}" +
"?task_id=" + encodeURIComponent(task_id) +
Expand All @@ -403,6 +438,14 @@ <h4 class="modal-title" id="dagModalLabel">
window.location = url;
});

$('#btn_dagrun_failed').click(function(){
url = "{{ url_for('airflow.dagrun_failed') }}" +
"?dag_id=" + encodeURIComponent(dag_id) +
"&execution_date=" + encodeURIComponent(execution_date) +
"&origin=" + encodeURIComponent(window.location);
window.location = url;
});

$('#btn_dagrun_success').click(function(){
url = "{{ url_for('airflow.dagrun_success') }}" +
"?dag_id=" + encodeURIComponent(dag_id) +
Expand Down
Loading

0 comments on commit 284dbdb

Please sign in to comment.