Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-6821] - Success callback not called when task marked as succ… #7447

Merged
merged 1 commit into from
Feb 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ https://developers.google.com/style/inclusive-documentation

-->

### Success Callback will be called when a task in marked as success from UI

When a task is marked as success by a used from Airflow UI - on_success_callback will be called

### Added `airflow dags test` CLI command

A new command was added to the CLI for executing one full run of a DAG for a given execution date, similar to
Expand Down
3 changes: 3 additions & 0 deletions airflow/jobs/local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,5 +156,8 @@ def heartbeat_callback(self, session=None):
if ti.state == State.FAILED and ti.task.on_failure_callback:
context = ti.get_template_context()
ti.task.on_failure_callback(context)
if ti.state == State.SUCCESS and ti.task.on_success_callback:
context = ti.get_template_context()
ti.task.on_success_callback(context)
self.task_runner.terminate()
self.terminating = True
54 changes: 54 additions & 0 deletions tests/jobs/test_local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,3 +344,57 @@ def check_failure(context):
self.assertTrue(data['called'])
process.join(timeout=10)
self.assertFalse(process.is_alive())

def test_mark_success_on_success_callback(self):
"""
Test that ensures that where a task is marked suceess in the UI
on_success_callback gets executed
"""
data = {'called': False}

def success_callback(context):
self.assertEqual(context['dag_run'].dag_id,
'test_mark_success')
data['called'] = True

dag = DAG(dag_id='test_mark_success',
start_date=DEFAULT_DATE,
default_args={'owner': 'owner1'})

task = DummyOperator(
task_id='test_state_succeeded1',
dag=dag,
on_success_callback=success_callback)

session = settings.Session()

dag.clear()
dag.create_dagrun(run_id="test",
state=State.RUNNING,
execution_date=DEFAULT_DATE,
start_date=DEFAULT_DATE,
session=session)
ti = TI(task=task, execution_date=DEFAULT_DATE)
ti.refresh_from_db()
job1 = LocalTaskJob(task_instance=ti,
ignore_ti_state=True,
executor=SequentialExecutor())
from airflow.task.task_runner.standard_task_runner import StandardTaskRunner
job1.task_runner = StandardTaskRunner(job1)
process = multiprocessing.Process(target=job1.run)
process.start()
ti.refresh_from_db()
for _ in range(0, 50):
if ti.state == State.RUNNING:
break
time.sleep(0.1)
ti.refresh_from_db()
self.assertEqual(State.RUNNING, ti.state)
ti.state = State.SUCCESS
session.merge(ti)
session.commit()

job1.heartbeat_callback(session=None)
self.assertTrue(data['called'])
process.join(timeout=10)
self.assertFalse(process.is_alive())