Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-1837] Respect task start_date when different from dag's #4010

Merged
merged 1 commit into from
Oct 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5254,6 +5254,8 @@ def verify_integrity(self, session=None):
for task in six.itervalues(dag.task_dict):
if task.adhoc:
continue
if task.start_date > self.execution_date and not self.is_backfill:
continue

if task.task_id not in task_ids:
Stats.incr(
Expand Down
33 changes: 17 additions & 16 deletions tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
from airflow.utils import timezone
from airflow.utils.timezone import datetime
from airflow.utils.state import State
from airflow.utils.dates import infer_time_unit, round_time, scale_time_units
from airflow.utils.dates import days_ago, infer_time_unit, round_time, scale_time_units
from lxml import html
from airflow.exceptions import AirflowException
from airflow.configuration import AirflowConfigException, run_command
Expand All @@ -80,6 +80,7 @@
DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
TEST_DAG_ID = 'unit_tests'
EXAMPLE_DAG_DEFAULT_DATE = days_ago(2)

try:
import cPickle as pickle
Expand Down Expand Up @@ -1805,21 +1806,21 @@ def setUp(self):

self.dagrun_python = self.dag_python.create_dagrun(
run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())),
execution_date=DEFAULT_DATE,
execution_date=EXAMPLE_DAG_DEFAULT_DATE,
start_date=timezone.utcnow(),
state=State.RUNNING
)

self.sub_dag.create_dagrun(
run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())),
execution_date=DEFAULT_DATE,
execution_date=EXAMPLE_DAG_DEFAULT_DATE,
start_date=timezone.utcnow(),
state=State.RUNNING
)

self.example_xcom.create_dagrun(
run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())),
execution_date=DEFAULT_DATE,
execution_date=EXAMPLE_DAG_DEFAULT_DATE,
start_date=timezone.utcnow(),
state=State.RUNNING
)
Expand Down Expand Up @@ -1912,7 +1913,7 @@ def test_dag_views(self):
response = self.app.get(
'/admin/airflow/task?'
'task_id=runme_0&dag_id=example_bash_operator&'
'execution_date={}'.format(DEFAULT_DATE_DS))
'execution_date={}'.format(EXAMPLE_DAG_DEFAULT_DATE))
self.assertIn("Attributes", response.data.decode('utf-8'))
response = self.app.get(
'/admin/airflow/dag_stats')
Expand All @@ -1924,22 +1925,21 @@ def test_dag_views(self):
"/admin/airflow/success?task_id=print_the_context&"
"dag_id=example_python_operator&upstream=false&downstream=false&"
"future=false&past=false&execution_date={}&"
"origin=/admin".format(DEFAULT_DATE_DS))
"origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE))
response = self.app.get(url)
self.assertIn("Wait a minute", response.data.decode('utf-8'))
response = self.app.get(url + "&confirmed=true")
response = self.app.get(
'/admin/airflow/clear?task_id=print_the_context&'
'dag_id=example_python_operator&future=true&past=false&'
'upstream=true&downstream=false&'
'execution_date={}&'
'origin=/admin'.format(DEFAULT_DATE_DS))
'origin=/admin'.format(EXAMPLE_DAG_DEFAULT_DATE))
self.assertIn("Wait a minute", response.data.decode('utf-8'))
url = (
"/admin/airflow/success?task_id=section-1&"
"dag_id=example_subdag_operator&upstream=true&downstream=true&"
"future=false&past=false&execution_date={}&"
"origin=/admin".format(DEFAULT_DATE_DS))
"origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE))
response = self.app.get(url)
self.assertIn("Wait a minute", response.data.decode('utf-8'))
self.assertIn("section-1-task-1", response.data.decode('utf-8'))
Expand All @@ -1953,7 +1953,7 @@ def test_dag_views(self):
"dag_id=example_python_operator&future=false&past=false&"
"upstream=false&downstream=true&"
"execution_date={}&"
"origin=/admin".format(DEFAULT_DATE_DS))
"origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE))
response = self.app.get(url)
self.assertIn("Wait a minute", response.data.decode('utf-8'))
response = self.app.get(url + "&confirmed=true")
Expand All @@ -1962,7 +1962,7 @@ def test_dag_views(self):
"dag_id=example_subdag_operator.section-1&future=false&past=false&"
"upstream=false&downstream=true&recursive=true&"
"execution_date={}&"
"origin=/admin".format(DEFAULT_DATE_DS))
"origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE))
response = self.app.get(url)
self.assertIn("Wait a minute", response.data.decode('utf-8'))
self.assertIn("example_subdag_operator.end",
Expand All @@ -1989,7 +1989,7 @@ def test_dag_views(self):
"/admin/airflow/run?task_id=runme_0&"
"dag_id=example_bash_operator&ignore_all_deps=false&ignore_ti_state=true&"
"ignore_task_deps=true&execution_date={}&"
"origin=/admin".format(DEFAULT_DATE_DS))
"origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE))
response = self.app.get(url)
response = self.app.get(
"/admin/airflow/refresh?dag_id=example_bash_operator")
Expand Down Expand Up @@ -2024,27 +2024,28 @@ def test_fetch_task_instance(self):
url = (
"/admin/airflow/object/task_instances?"
"dag_id=example_python_operator&"
"execution_date={}".format(DEFAULT_DATE_DS))
"execution_date={}".format(EXAMPLE_DAG_DEFAULT_DATE))
response = self.app.get(url)
self.assertIn("print_the_context", response.data.decode('utf-8'))

def test_dag_view_task_with_python_operator_using_partial(self):
response = self.app.get(
'/admin/airflow/task?'
'task_id=test_dagrun_functool_partial&dag_id=test_task_view_type_check&'
'execution_date={}'.format(DEFAULT_DATE_DS))
'execution_date={}'.format(EXAMPLE_DAG_DEFAULT_DATE))
self.assertIn("A function with two args", response.data.decode('utf-8'))

def test_dag_view_task_with_python_operator_using_instance(self):
response = self.app.get(
'/admin/airflow/task?'
'task_id=test_dagrun_instance&dag_id=test_task_view_type_check&'
'execution_date={}'.format(DEFAULT_DATE_DS))
'execution_date={}'.format(EXAMPLE_DAG_DEFAULT_DATE))
self.assertIn("A __call__ method", response.data.decode('utf-8'))

def tearDown(self):
configuration.conf.set("webserver", "expose_config", "False")
self.dag_bash.clear(start_date=DEFAULT_DATE, end_date=timezone.utcnow())
self.dag_bash.clear(start_date=EXAMPLE_DAG_DEFAULT_DATE,
end_date=timezone.utcnow())
session = Session()
session.query(models.DagRun).delete()
session.query(models.TaskInstance).delete()
Expand Down
22 changes: 19 additions & 3 deletions tests/dags/test_scheduler_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,34 @@
# specific language governing permissions and limitations
# under the License.

from datetime import datetime
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
DEFAULT_DATE = datetime(2100, 1, 1)
DEFAULT_DATE = datetime(2016, 1, 1)

# DAG tests backfill with pooled tasks
# Previously backfill would queue the task but never run it
dag1 = DAG(
dag_id='test_start_date_scheduling',
start_date=datetime(2100, 1, 1))
start_date=datetime.utcnow() + timedelta(days=1))
dag1_task1 = DummyOperator(
task_id='dummy',
dag=dag1,
owner='airflow')

dag2 = DAG(
dag_id='test_task_start_date_scheduling',
start_date=DEFAULT_DATE
)
dag2_task1 = DummyOperator(
task_id='dummy1',
dag=dag2,
owner='airflow',
start_date=DEFAULT_DATE + timedelta(days=3)
)
dag2_task2 = DummyOperator(
task_id='dummy2',
dag=dag2,
owner='airflow'
)
23 changes: 22 additions & 1 deletion tests/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2217,7 +2217,7 @@ def test_scheduler_start_date(self):
dag_id = 'test_start_date_scheduling'
dag = self.dagbag.get_dag(dag_id)
dag.clear()
self.assertTrue(dag.start_date > DEFAULT_DATE)
self.assertTrue(dag.start_date > datetime.datetime.utcnow())

scheduler = SchedulerJob(dag_id,
num_runs=2)
Expand Down Expand Up @@ -2252,6 +2252,27 @@ def test_scheduler_start_date(self):
self.assertEqual(
len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1)

def test_scheduler_task_start_date(self):
"""
Test that the scheduler respects task start dates that are different
from DAG start dates
"""
dag_id = 'test_task_start_date_scheduling'
dag = self.dagbag.get_dag(dag_id)
dag.clear()
scheduler = SchedulerJob(dag_id,
num_runs=2)
scheduler.run()

session = settings.Session()
tiq = session.query(TI).filter(TI.dag_id == dag_id)
ti1s = tiq.filter(TI.task_id == 'dummy1').all()
ti2s = tiq.filter(TI.task_id == 'dummy2').all()
self.assertEqual(len(ti1s), 0)
self.assertEqual(len(ti2s), 2)
for t in ti2s:
self.assertEqual(t.state, State.SUCCESS)

def test_scheduler_multiprocessing(self):
"""
Test that the scheduler can successfully queue multiple dags in parallel
Expand Down
26 changes: 13 additions & 13 deletions tests/www_rbac/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from airflow.models import DAG, DagRun, TaskInstance
from airflow.operators.dummy_operator import DummyOperator
from airflow.settings import Session
from airflow.utils import timezone
from airflow.utils import dates, timezone
from airflow.utils.state import State
from airflow.utils.timezone import datetime
from airflow.www_rbac import app as application
Expand Down Expand Up @@ -267,8 +267,8 @@ def test_mount(self):


class TestAirflowBaseViews(TestBase):
default_date = timezone.datetime(2018, 3, 1)
run_id = "test_{}".format(models.DagRun.id_for_date(default_date))
EXAMPLE_DAG_DEFAULT_DATE = dates.days_ago(2)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this sort of change needed? It looks like it's an unrelated change from the diff.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's needed.

This date gets called in e.g. https://github.com/apache/incubator-airflow/pull/4010/files/49ca7ad528cec88503bd011ca0c71218b168a05c#diff-8919d07682d67296863b9c62a96af963L298

That line creates a DagRun for previously 2018-03-01.
The example DAG that this test is testing has a start date of 2 day ago, e.g. 2018-10-07.

Before this change, task instances would get created regardless of the DAG start date. After this change, only task instances after the DAG start date get created. So if I kept the DagRun for 2018-03-01, the instances being tested wouldn't get created.

run_id = "test_{}".format(models.DagRun.id_for_date(EXAMPLE_DAG_DEFAULT_DATE))

def setUp(self):
super(TestAirflowBaseViews, self).setUp()
Expand Down Expand Up @@ -297,19 +297,19 @@ def prepare_dagruns(self):

self.bash_dagrun = self.bash_dag.create_dagrun(
run_id=self.run_id,
execution_date=self.default_date,
execution_date=self.EXAMPLE_DAG_DEFAULT_DATE,
start_date=timezone.utcnow(),
state=State.RUNNING)

self.sub_dagrun = self.sub_dag.create_dagrun(
run_id=self.run_id,
execution_date=self.default_date,
execution_date=self.EXAMPLE_DAG_DEFAULT_DATE,
start_date=timezone.utcnow(),
state=State.RUNNING)

self.xcom_dagrun = self.xcom_dag.create_dagrun(
run_id=self.run_id,
execution_date=self.default_date,
execution_date=self.EXAMPLE_DAG_DEFAULT_DATE,
start_date=timezone.utcnow(),
state=State.RUNNING)

Expand All @@ -327,19 +327,19 @@ def test_home(self):

def test_task(self):
url = ('task?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
.format(self.percent_encode(self.default_date)))
.format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
resp = self.client.get(url, follow_redirects=True)
self.check_content_in_response('Task Instance Details', resp)

def test_xcom(self):
url = ('xcom?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
.format(self.percent_encode(self.default_date)))
.format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
resp = self.client.get(url, follow_redirects=True)
self.check_content_in_response('XCom', resp)

def test_rendered(self):
url = ('rendered?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
.format(self.percent_encode(self.default_date)))
.format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
resp = self.client.get(url, follow_redirects=True)
self.check_content_in_response('Rendered Template', resp)

Expand Down Expand Up @@ -409,28 +409,28 @@ def test_paused(self):
def test_failed(self):
url = ('failed?task_id=run_this_last&dag_id=example_bash_operator&'
'execution_date={}&upstream=false&downstream=false&future=false&past=false'
.format(self.percent_encode(self.default_date)))
.format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
resp = self.client.get(url)
self.check_content_in_response('Wait a minute', resp)

def test_success(self):
url = ('success?task_id=run_this_last&dag_id=example_bash_operator&'
'execution_date={}&upstream=false&downstream=false&future=false&past=false'
.format(self.percent_encode(self.default_date)))
.format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
resp = self.client.get(url)
self.check_content_in_response('Wait a minute', resp)

def test_clear(self):
url = ('clear?task_id=runme_1&dag_id=example_bash_operator&'
'execution_date={}&upstream=false&downstream=false&future=false&past=false'
.format(self.percent_encode(self.default_date)))
.format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
resp = self.client.get(url)
self.check_content_in_response(['example_bash_operator', 'Wait a minute'], resp)

def test_run(self):
url = ('run?task_id=runme_0&dag_id=example_bash_operator&ignore_all_deps=false&'
'ignore_ti_state=true&execution_date={}'
.format(self.percent_encode(self.default_date)))
.format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
resp = self.client.get(url)
self.check_content_in_response('', resp, resp_code=302)

Expand Down