diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/experimental/trigger_dag.py index ae04f2f0a204e..e749a2b5559bc 100644 --- a/airflow/api/common/experimental/trigger_dag.py +++ b/airflow/api/common/experimental/trigger_dag.py @@ -60,6 +60,14 @@ def _trigger_dag( if replace_microseconds: execution_date = execution_date.replace(microsecond=0) + if dag.default_args and 'start_date' in dag.default_args: + min_dag_start_date = dag.default_args["start_date"] + if min_dag_start_date and execution_date < min_dag_start_date: + raise ValueError( + "The execution_date [{0}] should be >= start_date [{1}] from DAG's default_args".format( + execution_date.isoformat(), + min_dag_start_date.isoformat())) + if not run_id: run_id = "manual__{0}".format(execution_date.isoformat()) diff --git a/tests/api/common/experimental/test_trigger_dag.py b/tests/api/common/experimental/test_trigger_dag.py index b787f840ffbc3..fe0b6375c765e 100644 --- a/tests/api/common/experimental/test_trigger_dag.py +++ b/tests/api/common/experimental/test_trigger_dag.py @@ -24,6 +24,7 @@ from airflow.api.common.experimental.trigger_dag import _trigger_dag from airflow.exceptions import AirflowException from airflow.models import DAG, DagRun +from airflow.utils import timezone class TestTriggerDag(unittest.TestCase): @@ -108,6 +109,46 @@ def test_trigger_dag_with_str_conf(self, dag_bag_mock): self.assertEqual(triggers[0].conf, json.loads(conf)) + @mock.patch('airflow.models.DagBag') + def test_trigger_dag_with_too_early_start_date(self, dag_bag_mock): + dag_id = "trigger_dag_with_too_early_start_date" + dag = DAG(dag_id, default_args={'start_date': timezone.datetime(2016, 9, 5, 10, 10, 0)}) + dag_bag_mock.dags = [dag_id] + dag_bag_mock.get_dag.return_value = dag + dag_run = DagRun() + + self.assertRaises( + ValueError, + _trigger_dag, + dag_id, + dag_bag_mock, + dag_run, + run_id=None, + conf=None, + execution_date=timezone.datetime(2015, 7, 5, 10, 10, 0), + replace_microseconds=True, + ) + + @mock.patch('airflow.models.DagBag') + def test_trigger_dag_with_valid_start_date(self, dag_bag_mock): + dag_id = "trigger_dag_with_valid_start_date" + dag = DAG(dag_id, default_args={'start_date': timezone.datetime(2016, 9, 5, 10, 10, 0)}) + dag_bag_mock.dags = [dag_id] + dag_bag_mock.get_dag.return_value = dag + dag_run = DagRun() + + triggers = _trigger_dag( + dag_id, + dag_bag_mock, + dag_run, + run_id=None, + conf=None, + execution_date=timezone.datetime(2018, 7, 5, 10, 10, 0), + replace_microseconds=True, + ) + + assert len(triggers) == 1 + @mock.patch('airflow.models.DagBag') def test_trigger_dag_with_dict_conf(self, dag_bag_mock): dag_id = "trigger_dag_with_dict_conf"