Skip to content

Commit

Permalink
[AIRFLOW-4428] error if exec_date before default_args.start_date in t… (
Browse files Browse the repository at this point in the history
#6948)

(cherry picked from commit 41a9bfd)
  • Loading branch information
tooptoop4 authored and potiuk committed Jan 21, 2020
1 parent 6bc841e commit aca7f27
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 0 deletions.
8 changes: 8 additions & 0 deletions airflow/api/common/experimental/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ def _trigger_dag(
if replace_microseconds:
execution_date = execution_date.replace(microsecond=0)

if dag.default_args and 'start_date' in dag.default_args:
min_dag_start_date = dag.default_args["start_date"]
if min_dag_start_date and execution_date < min_dag_start_date:
raise ValueError(
"The execution_date [{0}] should be >= start_date [{1}] from DAG's default_args".format(
execution_date.isoformat(),
min_dag_start_date.isoformat()))

if not run_id:
run_id = "manual__{0}".format(execution_date.isoformat())

Expand Down
41 changes: 41 additions & 0 deletions tests/api/common/experimental/test_trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import timezone

import mock
import unittest
Expand Down Expand Up @@ -108,6 +109,46 @@ def test_trigger_dag_with_str_conf(self, dag_bag_mock):

self.assertEqual(triggers[0].conf, json.loads(conf))

@mock.patch('airflow.models.DagBag')
def test_trigger_dag_with_too_early_start_date(self, dag_bag_mock):
dag_id = "trigger_dag_with_too_early_start_date"
dag = DAG(dag_id, default_args={'start_date': timezone.datetime(2016, 9, 5, 10, 10, 0)})
dag_bag_mock.dags = [dag_id]
dag_bag_mock.get_dag.return_value = dag
dag_run = DagRun()

self.assertRaises(
ValueError,
_trigger_dag,
dag_id,
dag_bag_mock,
dag_run,
run_id=None,
conf=None,
execution_date=timezone.datetime(2015, 7, 5, 10, 10, 0),
replace_microseconds=True,
)

@mock.patch('airflow.models.DagBag')
def test_trigger_dag_with_valid_start_date(self, dag_bag_mock):
dag_id = "trigger_dag_with_valid_start_date"
dag = DAG(dag_id, default_args={'start_date': timezone.datetime(2016, 9, 5, 10, 10, 0)})
dag_bag_mock.dags = [dag_id]
dag_bag_mock.get_dag.return_value = dag
dag_run = DagRun()

triggers = _trigger_dag(
dag_id,
dag_bag_mock,
dag_run,
run_id=None,
conf=None,
execution_date=timezone.datetime(2018, 7, 5, 10, 10, 0),
replace_microseconds=True,
)

assert len(triggers) == 1

@mock.patch('airflow.models.DagBag')
def test_trigger_dag_with_dict_conf(self, dag_bag_mock):
dag_id = "trigger_dag_with_dict_conf"
Expand Down

0 comments on commit aca7f27

Please sign in to comment.