Skip to content

Commit

Permalink
[AIRFLOW-3632] Only replace microseconds if execution_date is None in…
Browse files Browse the repository at this point in the history
… trigger_dag REST API (#6380)

No need to require a user to pass in replace_microseconds to the
request body; instead we should use exactly the date that is given.
We will still replace the microseconds on execution_date if none is
passed in (and the param is True, which is the default)

(cherry picked from commit a45a209)
  • Loading branch information
acroos authored and ashb committed Dec 19, 2019
1 parent a98dd06 commit 9b13046
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 18 deletions.
8 changes: 8 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ assists users migrating to a new version.

## Airflow 1.10.7

### Changes in experimental API execution_date microseconds replacement

The default behavior was to strip the microseconds (and milliseconds, etc) off of all dag runs triggered by
by the experimental REST API. The default behavior will change when an explicit execution_date is
passed in the request body. It will also now be possible to have the execution_date generated, but
keep the microseconds by sending `replace_microseconds=false` in the request body. The default
behavior can be overridden by sending `replace_microseconds=true` along with an explicit execution_date

### Infinite pool size and pool size query optimisation

Pool size can now be set to -1 to indicate infinite size (it also includes
Expand Down
7 changes: 7 additions & 0 deletions airflow/utils/strings.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,10 @@ def get_random_string(length=8, choices=string.ascii_letters + string.digits):
Generate random string
'''
return ''.join([choice(choices) for _ in range(length)])


def to_boolean(astring):
'''
Convert a string to a boolean
'''
return astring.lower() in ['true', 't', 'y', 'yes', '1']
7 changes: 6 additions & 1 deletion airflow/www/api/experimental/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from airflow.exceptions import AirflowException
from airflow.utils import timezone
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.strings import to_boolean
from airflow.www.app import csrf
from airflow import models

Expand Down Expand Up @@ -78,8 +79,12 @@ def trigger_dag(dag_id):

return response

replace_microseconds = (execution_date is None)
if 'replace_microseconds' in data:
replace_microseconds = to_boolean(data['replace_microseconds'])

try:
dr = trigger.trigger_dag(dag_id, run_id, conf, execution_date)
dr = trigger.trigger_dag(dag_id, run_id, conf, execution_date, replace_microseconds)
except AirflowException as err:
_log.error(err)
response = jsonify(error="{}".format(err))
Expand Down
7 changes: 6 additions & 1 deletion airflow/www_rbac/api/experimental/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from airflow.api.common.experimental.get_dag_run_state import get_dag_run_state
from airflow.exceptions import AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.strings import to_boolean
from airflow.utils import timezone
from airflow.www_rbac.app import csrf
from airflow import models
Expand Down Expand Up @@ -77,8 +78,12 @@ def trigger_dag(dag_id):

return response

replace_microseconds = (execution_date is None)
if 'replace_microseconds' in data:
replace_microseconds = to_boolean(data['replace_microseconds'])

try:
dr = trigger.trigger_dag(dag_id, run_id, conf, execution_date)
dr = trigger.trigger_dag(dag_id, run_id, conf, execution_date, replace_microseconds)
except AirflowException as err:
_log.error(err)
response = jsonify(error="{}".format(err))
Expand Down
33 changes: 25 additions & 8 deletions tests/www/api/experimental/test_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,17 @@ def test_trigger_dag(self):
)

self.assertEqual(200, response.status_code)
response_execution_date = parse_datetime(json.loads(response.data.decode('utf-8'))['execution_date'])
self.assertEqual(0, response_execution_date.microsecond)

# Check execution_date is correct
response = json.loads(response.data.decode('utf-8'))
dagbag = DagBag()
dag = dagbag.get_dag('example_bash_operator')
dag_run = dag.get_dagrun(parse_datetime(response['execution_date']))
dag_run = dag.get_dagrun(response_execution_date)
self.assertEqual(run_id, dag_run.run_id)

# Test error for nonexistent dag
response = self.app.post(
url_template.format('does_not_exist_dag'),
data=json.dumps({}),
Expand Down Expand Up @@ -158,14 +162,10 @@ def test_delete_dag(self):
def test_trigger_dag_for_date(self):
url_template = '/api/experimental/dags/{}/dag_runs'
dag_id = 'example_bash_operator'
hour_from_now = utcnow() + timedelta(hours=1)
execution_date = datetime(hour_from_now.year,
hour_from_now.month,
hour_from_now.day,
hour_from_now.hour)
execution_date = utcnow() + timedelta(hours=1)
datetime_string = execution_date.isoformat()

# Test Correct execution
# Test correct execution with execution date
response = self.app.post(
url_template.format(dag_id),
data=json.dumps({'execution_date': datetime_string}),
Expand All @@ -181,10 +181,27 @@ def test_trigger_dag_for_date(self):
'Dag Run not found for execution date {}'
.format(execution_date))

# Test correct execution with execution date and microseconds replaced
response = self.app.post(
url_template.format(dag_id),
data=json.dumps({'execution_date': datetime_string, 'replace_microseconds': 'true'}),
content_type="application/json"
)
self.assertEqual(200, response.status_code)
response_execution_date = parse_datetime(json.loads(response.data.decode('utf-8'))['execution_date'])
self.assertEqual(0, response_execution_date.microsecond)

dagbag = DagBag()
dag = dagbag.get_dag(dag_id)
dag_run = dag.get_dagrun(response_execution_date)
self.assertTrue(dag_run,
'Dag Run not found for execution date {}'
.format(execution_date))

# Test error for nonexistent dag
response = self.app.post(
url_template.format('does_not_exist_dag'),
data=json.dumps({'execution_date': execution_date.isoformat()}),
data=json.dumps({'execution_date': datetime_string}),
content_type="application/json"
)
self.assertEqual(404, response.status_code)
Expand Down
33 changes: 25 additions & 8 deletions tests/www_rbac/api/experimental/test_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,17 @@ def test_trigger_dag(self):
)

self.assertEqual(200, response.status_code)
response_execution_date = parse_datetime(json.loads(response.data.decode('utf-8'))['execution_date'])
self.assertEqual(0, response_execution_date.microsecond)

# Check execution_date is correct
response = json.loads(response.data.decode('utf-8'))
dagbag = DagBag()
dag = dagbag.get_dag('example_bash_operator')
dag_run = dag.get_dagrun(parse_datetime(response['execution_date']))
dag_run = dag.get_dagrun(response_execution_date)
self.assertEqual(run_id, dag_run.run_id)

# Test error for nonexistent dag
response = self.client.post(
url_template.format('does_not_exist_dag'),
data=json.dumps({}),
Expand All @@ -146,14 +150,10 @@ def test_trigger_dag(self):
def test_trigger_dag_for_date(self):
url_template = '/api/experimental/dags/{}/dag_runs'
dag_id = 'example_bash_operator'
hour_from_now = utcnow() + timedelta(hours=1)
execution_date = datetime(hour_from_now.year,
hour_from_now.month,
hour_from_now.day,
hour_from_now.hour)
execution_date = utcnow() + timedelta(hours=1)
datetime_string = execution_date.isoformat()

# Test Correct execution
# Test correct execution with execution date
response = self.client.post(
url_template.format(dag_id),
data=json.dumps({'execution_date': datetime_string}),
Expand All @@ -169,10 +169,27 @@ def test_trigger_dag_for_date(self):
'Dag Run not found for execution date {}'
.format(execution_date))

# Test correct execution with execution date and microseconds replaced
response = self.client.post(
url_template.format(dag_id),
data=json.dumps({'execution_date': datetime_string, 'replace_microseconds': 'true'}),
content_type="application/json"
)
self.assertEqual(200, response.status_code)
response_execution_date = parse_datetime(json.loads(response.data.decode('utf-8'))['execution_date'])
self.assertEqual(0, response_execution_date.microsecond)

dagbag = DagBag()
dag = dagbag.get_dag(dag_id)
dag_run = dag.get_dagrun(response_execution_date)
self.assertTrue(dag_run,
'Dag Run not found for execution date {}'
.format(execution_date))

# Test error for nonexistent dag
response = self.client.post(
url_template.format('does_not_exist_dag'),
data=json.dumps({'execution_date': execution_date.isoformat()}),
data=json.dumps({'execution_date': datetime_string}),
content_type="application/json"
)
self.assertEqual(404, response.status_code)
Expand Down

0 comments on commit 9b13046

Please sign in to comment.