diff --git a/UPDATING.md b/UPDATING.md index 7da20adb306ac2..cb27425b148946 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -106,6 +106,14 @@ delete this option. The TriggerDagRunOperator now takes a `conf` argument to which a dict can be provided as conf for the DagRun. As a result, the `python_callable` argument was removed. PR: https://github.com/apache/airflow/pull/6317. +### Changes in experimental API execution_date microseconds replacement + +The default behavior was to strip the microseconds (and milliseconds, etc) off of all dag runs triggered by +by the experimental REST API. The default behavior will change when an explicit execution_date is +passed in the request body. It will also now be possible to have the execution_date generated, but +keep the microseconds by sending `replace_microseconds=false` in the request body. The default +behavior can be overridden by sending `replace_microseconds=true` along with an explicit execution_date + ### Changes in Google Cloud Platform related hooks The change in GCP operators implies that GCP Hooks for those operators require now keyword parameters rather diff --git a/airflow/utils/strings.py b/airflow/utils/strings.py index 63c44b821259bc..d8a52ff47809e5 100644 --- a/airflow/utils/strings.py +++ b/airflow/utils/strings.py @@ -28,3 +28,10 @@ def get_random_string(length=8, choices=string.ascii_letters + string.digits): Generate random string ''' return ''.join([choice(choices) for _ in range(length)]) + + +def to_boolean(astring): + ''' + Convert a string to a boolean + ''' + return astring.lower() in ['true', 't', 'y', 'yes', '1'] diff --git a/airflow/www/api/experimental/endpoints.py b/airflow/www/api/experimental/endpoints.py index 1aa0662abfacac..67204f72d2c344 100644 --- a/airflow/www/api/experimental/endpoints.py +++ b/airflow/www/api/experimental/endpoints.py @@ -29,6 +29,7 @@ from airflow.exceptions import AirflowException from airflow.utils import timezone from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.utils.strings import to_boolean from airflow.www.app import csrf _log = LoggingMixin().log @@ -74,8 +75,12 @@ def trigger_dag(dag_id): return response + replace_microseconds = (execution_date is None) + if 'replace_microseconds' in data: + replace_microseconds = to_boolean(data['replace_microseconds']) + try: - dr = trigger.trigger_dag(dag_id, run_id, conf, execution_date) + dr = trigger.trigger_dag(dag_id, run_id, conf, execution_date, replace_microseconds) except AirflowException as err: _log.error(err) response = jsonify(error="{}".format(err)) diff --git a/tests/www/api/experimental/test_endpoints.py b/tests/www/api/experimental/test_endpoints.py index 8a6da70131e3e8..e2de9974529075 100644 --- a/tests/www/api/experimental/test_endpoints.py +++ b/tests/www/api/experimental/test_endpoints.py @@ -129,13 +129,17 @@ def test_trigger_dag(self): ) self.assertEqual(200, response.status_code) + response_execution_date = parse_datetime(json.loads(response.data.decode('utf-8'))['execution_date']) + self.assertEqual(0, response_execution_date.microsecond) + # Check execution_date is correct response = json.loads(response.data.decode('utf-8')) dagbag = DagBag() dag = dagbag.get_dag('example_bash_operator') - dag_run = dag.get_dagrun(parse_datetime(response['execution_date'])) + dag_run = dag.get_dagrun(response_execution_date) self.assertEqual(run_id, dag_run.run_id) + # Test error for nonexistent dag response = self.client.post( url_template.format('does_not_exist_dag'), data=json.dumps({}), @@ -146,14 +150,10 @@ def test_trigger_dag(self): def test_trigger_dag_for_date(self): url_template = '/api/experimental/dags/{}/dag_runs' dag_id = 'example_bash_operator' - hour_from_now = utcnow() + timedelta(hours=1) - execution_date = datetime(hour_from_now.year, - hour_from_now.month, - hour_from_now.day, - hour_from_now.hour) + execution_date = utcnow() + timedelta(hours=1) datetime_string = execution_date.isoformat() - # Test Correct execution + # Test correct execution with execution date response = self.client.post( url_template.format(dag_id), data=json.dumps({'execution_date': datetime_string}), @@ -169,10 +169,27 @@ def test_trigger_dag_for_date(self): 'Dag Run not found for execution date {}' .format(execution_date)) + # Test correct execution with execution date and microseconds replaced + response = self.client.post( + url_template.format(dag_id), + data=json.dumps({'execution_date': datetime_string, 'replace_microseconds': 'true'}), + content_type="application/json" + ) + self.assertEqual(200, response.status_code) + response_execution_date = parse_datetime(json.loads(response.data.decode('utf-8'))['execution_date']) + self.assertEqual(0, response_execution_date.microsecond) + + dagbag = DagBag() + dag = dagbag.get_dag(dag_id) + dag_run = dag.get_dagrun(response_execution_date) + self.assertTrue(dag_run, + 'Dag Run not found for execution date {}' + .format(execution_date)) + # Test error for nonexistent dag response = self.client.post( url_template.format('does_not_exist_dag'), - data=json.dumps({'execution_date': execution_date.isoformat()}), + data=json.dumps({'execution_date': datetime_string}), content_type="application/json" ) self.assertEqual(404, response.status_code)