From c8c4a12f440cb0a3222e547fc1cde1ea65db5b10 Mon Sep 17 00:00:00 2001 From: Qian Yu Date: Mon, 9 Sep 2019 11:29:45 +0800 Subject: [PATCH] [AIRFLOW-5444] Fix action_logging so that request.form for POST is logged - Log request.values so both GET and POST are properly logged - Add a test for action_logging --- airflow/www/decorators.py | 10 ++--- tests/www/test_views.py | 95 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 5 deletions(-) diff --git a/airflow/www/decorators.py b/airflow/www/decorators.py index 91ef51e4f0383a..5b3361ed309539 100644 --- a/airflow/www/decorators.py +++ b/airflow/www/decorators.py @@ -43,13 +43,13 @@ def wrapper(*args, **kwargs): event=f.__name__, task_instance=None, owner=user, - extra=str(list(request.args.items())), - task_id=request.args.get('task_id'), - dag_id=request.args.get('dag_id')) + extra=str(list(request.values.items())), + task_id=request.values.get('task_id'), + dag_id=request.values.get('dag_id')) - if 'execution_date' in request.args: + if 'execution_date' in request.values: log.execution_date = pendulum.parse( - request.args.get('execution_date')) + request.values.get('execution_date')) session.add(log) diff --git a/tests/www/test_views.py b/tests/www/test_views.py index 021d9fe8ac0f84..42de45c7d21651 100644 --- a/tests/www/test_views.py +++ b/tests/www/test_views.py @@ -1882,5 +1882,100 @@ def test_create_dagrun(self): self.assertEqual(dr.execution_date, timezone.convert_to_utc(datetime(2018, 7, 6, 5, 4, 3))) +class TestDecorators(TestBase): + EXAMPLE_DAG_DEFAULT_DATE = dates.days_ago(2) + run_id = "test_{}".format(models.DagRun.id_for_date(EXAMPLE_DAG_DEFAULT_DATE)) + + @classmethod + def setUpClass(cls): + super().setUpClass() + dagbag = models.DagBag(include_examples=True) + for dag in dagbag.dags.values(): + dag.sync_to_db() + + def setUp(self): + super().setUp() + self.logout() + self.login() + self.cleanup_dagruns() + self.prepare_dagruns() + + def cleanup_dagruns(self): + DR = models.DagRun + dag_ids = ['example_bash_operator', + 'example_subdag_operator', + 'example_xcom'] + (self.session + .query(DR) + .filter(DR.dag_id.in_(dag_ids)) + .filter(DR.run_id == self.run_id) + .delete(synchronize_session='fetch')) + self.session.commit() + + def prepare_dagruns(self): + dagbag = models.DagBag(include_examples=True) + self.bash_dag = dagbag.dags['example_bash_operator'] + self.sub_dag = dagbag.dags['example_subdag_operator'] + self.xcom_dag = dagbag.dags['example_xcom'] + + self.bash_dagrun = self.bash_dag.create_dagrun( + run_id=self.run_id, + execution_date=self.EXAMPLE_DAG_DEFAULT_DATE, + start_date=timezone.utcnow(), + state=State.RUNNING) + + self.sub_dagrun = self.sub_dag.create_dagrun( + run_id=self.run_id, + execution_date=self.EXAMPLE_DAG_DEFAULT_DATE, + start_date=timezone.utcnow(), + state=State.RUNNING) + + self.xcom_dagrun = self.xcom_dag.create_dagrun( + run_id=self.run_id, + execution_date=self.EXAMPLE_DAG_DEFAULT_DATE, + start_date=timezone.utcnow(), + state=State.RUNNING) + + def check_last_log(self, dag_id, event, execution_date=None): + from airflow.models import Log + qry = self.session.query(Log.dag_id, Log.task_id, Log.event, Log.execution_date, + Log.owner, Log.extra) + qry = qry.filter(Log.dag_id == dag_id, Log.event == event) + if execution_date: + qry = qry.filter(Log.execution_date == execution_date) + logs = qry.order_by(Log.dttm.desc()).limit(5).all() + self.assertGreaterEqual(len(logs), 1) + self.assertTrue(logs[0].extra) + + def test_action_logging_get(self): + url = 'graph?dag_id=example_bash_operator&execution_date={}'.format( + self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)) + resp = self.client.get(url, follow_redirects=True) + self.check_content_in_response('runme_1', resp) + + # In mysql backend, this commit() is needed to write down the logs + self.session.commit() + self.check_last_log("example_bash_operator", event="graph", + execution_date=self.EXAMPLE_DAG_DEFAULT_DATE) + + def test_action_logging_post(self): + form = dict( + task_id="runme_1", + dag_id="example_bash_operator", + execution_date=self.EXAMPLE_DAG_DEFAULT_DATE, + upstream="false", + downstream="false", + future="false", + past="false", + only_failed="false", + ) + resp = self.client.post("clear", data=form) + self.check_content_in_response(['example_bash_operator', 'Wait a minute'], resp) + # In mysql backend, this commit() is needed to write down the logs + self.session.commit() + self.check_last_log("example_bash_operator", event="clear", + execution_date=self.EXAMPLE_DAG_DEFAULT_DATE) + + if __name__ == '__main__': unittest.main()