Skip to content

Commit

Permalink
[AIRFLOW-5444] Fix action_logging so that request.form for POST is lo…
Browse files Browse the repository at this point in the history
…gged (apache#6064) (apache#379)

Log request.values so both GET and POST are properly logged
  • Loading branch information
Ping Zhang authored and GitHub Enterprise committed Nov 22, 2019
1 parent 4074144 commit 780ef23
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 5 deletions.
10 changes: 5 additions & 5 deletions airflow/www_rbac/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ def wrapper(*args, **kwargs):
event=f.__name__,
task_instance=None,
owner=user,
extra=str(list(request.args.items())),
task_id=request.args.get('task_id'),
dag_id=request.args.get('dag_id'))
extra=str(list(request.values.items())),
task_id=request.values.get('task_id'),
dag_id=request.values.get('dag_id'))

if 'execution_date' in request.args:
if 'execution_date' in request.values:
log.execution_date = pendulum.parse(
request.args.get('execution_date'))
request.values.get('execution_date'))

session.add(log)

Expand Down
93 changes: 93 additions & 0 deletions tests/www_rbac/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1981,6 +1981,99 @@ def test_create_dagrun(self):

self.assertEqual(dr.execution_date, timezone.convert_to_utc(datetime(2018, 7, 6, 5, 4, 3)))

class TestDecorators(TestBase):
EXAMPLE_DAG_DEFAULT_DATE = dates.days_ago(2)
run_id = "test_{}".format(models.DagRun.id_for_date(EXAMPLE_DAG_DEFAULT_DATE))

@classmethod
def setUpClass(cls):
super(TestDecorators, cls).setUpClass()
dagbag = models.DagBag(include_examples=True)
for dag in dagbag.dags.values():
dag.sync_to_db()

def setUp(self):
super(TestDecorators, self).setUp()
self.logout()
self.login()
self.cleanup_dagruns()
self.prepare_dagruns()

def cleanup_dagruns(self):
DR = models.DagRun
dag_ids = ['example_bash_operator',
'example_subdag_operator',
'example_xcom']
(self.session
.query(DR)
.filter(DR.dag_id.in_(dag_ids))
.filter(DR.run_id == self.run_id)
.delete(synchronize_session='fetch'))
self.session.commit()

def prepare_dagruns(self):
dagbag = models.DagBag(include_examples=True)
self.bash_dag = dagbag.dags['example_bash_operator']
self.sub_dag = dagbag.dags['example_subdag_operator']
self.xcom_dag = dagbag.dags['example_xcom']

self.bash_dagrun = self.bash_dag.create_dagrun(
run_id=self.run_id,
execution_date=self.EXAMPLE_DAG_DEFAULT_DATE,
start_date=timezone.utcnow(),
state=State.RUNNING)

self.sub_dagrun = self.sub_dag.create_dagrun(
run_id=self.run_id,
execution_date=self.EXAMPLE_DAG_DEFAULT_DATE,
start_date=timezone.utcnow(),
state=State.RUNNING)

self.xcom_dagrun = self.xcom_dag.create_dagrun(
run_id=self.run_id,
execution_date=self.EXAMPLE_DAG_DEFAULT_DATE,
start_date=timezone.utcnow(),
state=State.RUNNING)

def check_last_log(self, dag_id, event, execution_date=None):
from airflow.models import Log
qry = self.session.query(Log.dag_id, Log.task_id, Log.event, Log.execution_date,
Log.owner, Log.extra)
qry = qry.filter(Log.dag_id == dag_id, Log.event == event)
if execution_date:
qry = qry.filter(Log.execution_date == execution_date)
logs = qry.order_by(Log.dttm.desc()).limit(5).all()
self.assertGreaterEqual(len(logs), 1)
self.assertTrue(logs[0].extra)

def test_action_logging_get(self):
url = 'graph?dag_id=example_bash_operator&execution_date={}'.format(
self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE))
resp = self.client.get(url, follow_redirects=True)
self.check_content_in_response('runme_1', resp)

# In mysql backend, this commit() is needed to write down the logs
self.session.commit()
self.check_last_log("example_bash_operator", event="graph",
execution_date=self.EXAMPLE_DAG_DEFAULT_DATE)

def test_action_logging_post(self):
form = dict(
task_id="runme_1",
dag_id="example_bash_operator",
execution_date=self.EXAMPLE_DAG_DEFAULT_DATE,
upstream="false",
downstream="false",
future="false",
past="false",
only_failed="false",
)
resp = self.client.post("clear", data=form)
self.check_content_in_response(['example_bash_operator', 'Wait a minute'], resp)
# In mysql backend, this commit() is needed to write down the logs
self.session.commit()
self.check_last_log("example_bash_operator", event="clear",
execution_date=self.EXAMPLE_DAG_DEFAULT_DATE)

if __name__ == '__main__':
unittest.main()

0 comments on commit 780ef23

Please sign in to comment.