Skip to content

Commit

Permalink
[AIRFLOW-3632] Only replace microseconds if execution_date is None in…
Browse files Browse the repository at this point in the history
… trigger_dag REST API (#6380)

No need to require a user to pass in replace_microseconds to the
request body; instead we should use exactly the date that is given.
We will still replace the microseconds on execution_date if none is
passed in (and the param is True, which is the default)
  • Loading branch information
acroos authored and ashb committed Nov 20, 2019
1 parent 9ed4895 commit a45a209
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 9 deletions.
8 changes: 8 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ delete this option.
The TriggerDagRunOperator now takes a `conf` argument to which a dict can be provided as conf for the DagRun.
As a result, the `python_callable` argument was removed. PR: https://github.com/apache/airflow/pull/6317.

### Changes in experimental API execution_date microseconds replacement

The default behavior was to strip the microseconds (and milliseconds, etc) off of all dag runs triggered by
by the experimental REST API. The default behavior will change when an explicit execution_date is
passed in the request body. It will also now be possible to have the execution_date generated, but
keep the microseconds by sending `replace_microseconds=false` in the request body. The default
behavior can be overridden by sending `replace_microseconds=true` along with an explicit execution_date

### Changes in Google Cloud Platform related hooks

The change in GCP operators implies that GCP Hooks for those operators require now keyword parameters rather
Expand Down
7 changes: 7 additions & 0 deletions airflow/utils/strings.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,10 @@ def get_random_string(length=8, choices=string.ascii_letters + string.digits):
Generate random string
'''
return ''.join([choice(choices) for _ in range(length)])


def to_boolean(astring):
'''
Convert a string to a boolean
'''
return astring.lower() in ['true', 't', 'y', 'yes', '1']
7 changes: 6 additions & 1 deletion airflow/www/api/experimental/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from airflow.exceptions import AirflowException
from airflow.utils import timezone
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.strings import to_boolean
from airflow.www.app import csrf

_log = LoggingMixin().log
Expand Down Expand Up @@ -74,8 +75,12 @@ def trigger_dag(dag_id):

return response

replace_microseconds = (execution_date is None)
if 'replace_microseconds' in data:
replace_microseconds = to_boolean(data['replace_microseconds'])

try:
dr = trigger.trigger_dag(dag_id, run_id, conf, execution_date)
dr = trigger.trigger_dag(dag_id, run_id, conf, execution_date, replace_microseconds)
except AirflowException as err:
_log.error(err)
response = jsonify(error="{}".format(err))
Expand Down
33 changes: 25 additions & 8 deletions tests/www/api/experimental/test_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,17 @@ def test_trigger_dag(self):
)

self.assertEqual(200, response.status_code)
response_execution_date = parse_datetime(json.loads(response.data.decode('utf-8'))['execution_date'])
self.assertEqual(0, response_execution_date.microsecond)

# Check execution_date is correct
response = json.loads(response.data.decode('utf-8'))
dagbag = DagBag()
dag = dagbag.get_dag('example_bash_operator')
dag_run = dag.get_dagrun(parse_datetime(response['execution_date']))
dag_run = dag.get_dagrun(response_execution_date)
self.assertEqual(run_id, dag_run.run_id)

# Test error for nonexistent dag
response = self.client.post(
url_template.format('does_not_exist_dag'),
data=json.dumps({}),
Expand All @@ -146,14 +150,10 @@ def test_trigger_dag(self):
def test_trigger_dag_for_date(self):
url_template = '/api/experimental/dags/{}/dag_runs'
dag_id = 'example_bash_operator'
hour_from_now = utcnow() + timedelta(hours=1)
execution_date = datetime(hour_from_now.year,
hour_from_now.month,
hour_from_now.day,
hour_from_now.hour)
execution_date = utcnow() + timedelta(hours=1)
datetime_string = execution_date.isoformat()

# Test Correct execution
# Test correct execution with execution date
response = self.client.post(
url_template.format(dag_id),
data=json.dumps({'execution_date': datetime_string}),
Expand All @@ -169,10 +169,27 @@ def test_trigger_dag_for_date(self):
'Dag Run not found for execution date {}'
.format(execution_date))

# Test correct execution with execution date and microseconds replaced
response = self.client.post(
url_template.format(dag_id),
data=json.dumps({'execution_date': datetime_string, 'replace_microseconds': 'true'}),
content_type="application/json"
)
self.assertEqual(200, response.status_code)
response_execution_date = parse_datetime(json.loads(response.data.decode('utf-8'))['execution_date'])
self.assertEqual(0, response_execution_date.microsecond)

dagbag = DagBag()
dag = dagbag.get_dag(dag_id)
dag_run = dag.get_dagrun(response_execution_date)
self.assertTrue(dag_run,
'Dag Run not found for execution date {}'
.format(execution_date))

# Test error for nonexistent dag
response = self.client.post(
url_template.format('does_not_exist_dag'),
data=json.dumps({'execution_date': execution_date.isoformat()}),
data=json.dumps({'execution_date': datetime_string}),
content_type="application/json"
)
self.assertEqual(404, response.status_code)
Expand Down

0 comments on commit a45a209

Please sign in to comment.