Skip to content

Commit

Permalink
Don't re-initialize JSON/stdout logging ElasticSearch inside forked p…
Browse files Browse the repository at this point in the history
…rocesses

Most of the time we will run the "raw" task in a forked subprocess (the
only time we don't is when we use impersonation) that will have the
logging already configured. So if the EsTaskHandler has already been
configured we don't want to "re"configure it -- otherwise it will
disable JSON output for the actual task!
  • Loading branch information
ashb committed Dec 9, 2019
1 parent f28e6c9 commit 1151395
Showing 1 changed file with 16 additions and 22 deletions.
38 changes: 16 additions & 22 deletions airflow/utils/log/es_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def __init__(self, base_log_folder, filename_template,
self.json_format = json_format
self.json_fields = [label.strip() for label in json_fields.split(",")]
self.handler = None
self.context_set = False

def _render_log_id(self, ti, try_number):
if self.log_id_jinja_template:
Expand Down Expand Up @@ -200,35 +201,28 @@ def set_context(self, ti):
:param ti: task instance object
"""
super().set_context(ti)
self.mark_end_on_close = not ti.raw

if self.json_format:
self.formatter = JSONFormatter(self.formatter._fmt, json_fields=self.json_fields, extras={
'dag_id': str(ti.dag_id),
'task_id': str(ti.task_id),
'execution_date': self._clean_execution_date(ti.execution_date),
'try_number': str(ti.try_number)
})

if self.write_stdout:
if self.context_set:
# We don't want to re-set up the handler if this logger has
# already been initialized
return

self.handler = logging.StreamHandler(stream=sys.__stdout__)
self.handler.setLevel(self.level)
if self.json_format and not ti.raw:
self.handler.setFormatter(
JSONFormatter(self.formatter._fmt, json_fields=self.json_fields, extras={
'dag_id': str(ti.dag_id),
'task_id': str(ti.task_id),
'execution_date': self._clean_execution_date(ti.execution_date),
'try_number': str(ti.try_number)}))
else:
self.handler.setFormatter(self.formatter)
self.handler.setFormatter(self.formatter)
else:
super().set_context(ti)

def emit(self, record):
if self.write_stdout:
self.formatter.format(record)
if self.handler is not None:
self.handler.emit(record)
else:
super().emit(record)

def flush(self):
if self.handler is not None:
self.handler.flush()
self.context_set = True

def close(self):
# When application exit, system shuts down all handlers by
Expand Down

0 comments on commit 1151395

Please sign in to comment.