Skip to content

Commit

Permalink
[EWT-7] a93d550: (HEAD, twitter/1.10+twtr) [TWTR][AIRFLOW-4939] Add…
Browse files Browse the repository at this point in the history
… Default Retries and fix a small DAG refresh bug (apache#8)

* fb64f2e: [TWTR][AIRFLOW-XXX] Twitter Airflow Customizations + Fixup job scheduling without explicit_defaults_for_timestamp

* reformat

* 6607e48(airflow:master): [AIRFLOW-3160] Load latest_dagruns asynchronously, speed up front page load time apache#4005

* a93d550:

* a93d550: (HEAD, twitter/1.10+twtr) [TWTR][AIRFLOW-4939] Add Default Retries and fix a small DAG refresh bug (apache#3) (2 weeks ago)

* flake8 fix
  • Loading branch information
vshshjn7 authored and aoen committed Sep 11, 2019
1 parent 63bae92 commit 02293ad
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 4 deletions.
3 changes: 3 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ worker_precheck = False
# When discovering DAGs, ignore any files that don't contain the strings `DAG` and `airflow`.
dag_discovery_safe_mode = True

# The number of retries each task is going to have by default. Can be overridden at dag or task level.
default_task_retries = 0


[cli]
# In what way should the cli access the API. The LocalClient will use the
Expand Down
1 change: 1 addition & 0 deletions airflow/config_templates/default_test.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ killed_task_cleanup_time = 5
secure_mode = False
hostname_callable = socket:getfqdn
worker_precheck = False
default_task_retries = 0

[cli]
api_client = airflow.api.client.local_client
Expand Down
6 changes: 4 additions & 2 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ def __init__(
email=None, # type: Optional[str]
email_on_retry=True, # type: bool
email_on_failure=True, # type: bool
retries=0, # type: int
retries=None, # type: int
retry_delay=timedelta(seconds=300), # type: timedelta
retry_exponential_backoff=False, # type: bool
max_retry_delay=None, # type: Optional[datetime]
Expand Down Expand Up @@ -348,14 +348,16 @@ def __init__(
self
)
self._schedule_interval = schedule_interval
self.retries = retries
self.retries = retries if retries is not None else \
configuration.conf.getint('core', 'default_task_retries', fallback=0)
self.queue = queue
self.pool = pool
self.sla = sla
self.execution_timeout = execution_timeout
self.on_failure_callback = on_failure_callback
self.on_success_callback = on_success_callback
self.on_retry_callback = on_retry_callback

if isinstance(retry_delay, timedelta):
self.retry_delay = retry_delay
else:
Expand Down
4 changes: 3 additions & 1 deletion airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,10 @@ def get_dag(self, dag_id):
)
):
# Reprocess source file
# TODO: remove the below hack to find relative dag location in webserver
filepath = dag.fileloc if dag else orm_dag.fileloc
found_dags = self.process_file(
filepath=correct_maybe_zipped(orm_dag.fileloc), only_if_updated=False)
filepath=correct_maybe_zipped(filepath), only_if_updated=False)

# If the source file no longer exports `dag_id`, delete it from self.dags
if found_dags and dag_id in [found_dag.dag_id for found_dag in found_dags]:
Expand Down
34 changes: 33 additions & 1 deletion tests/operators/test_bash_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import os
import unittest
from datetime import datetime, timedelta
from tempfile import NamedTemporaryFile
from tests.compat import mock

from airflow import DAG
from airflow import DAG, configuration
from airflow.operators.bash_operator import BashOperator
from airflow.utils import timezone
from airflow.utils.state import State
Expand Down Expand Up @@ -88,3 +90,33 @@ def test_echo_env_variables(self):
self.assertIn('manual__' + DEFAULT_DATE.isoformat(), output)

os.environ['AIRFLOW_HOME'] = original_AIRFLOW_HOME

def test_return_value(self):
bash_operator = BashOperator(
bash_command='echo "stdout"',
task_id='test_return_value',
dag=None
)
return_value = bash_operator.execute(context={})

self.assertEqual(return_value, 'stdout')

def test_task_retries(self):
bash_operator = BashOperator(
bash_command='echo "stdout"',
task_id='test_task_retries',
retries=2,
dag=None
)

self.assertEqual(bash_operator.retries, 2)

@mock.patch.object(configuration.conf, 'getint', return_value=3)
def test_default_retries(self, mock_config):
bash_operator = BashOperator(
bash_command='echo "stdout"',
task_id='test_default_retries',
dag=None
)

self.assertEqual(bash_operator.retries, 3)

0 comments on commit 02293ad

Please sign in to comment.