diff --git a/airflow/utils/log/es_task_handler.py b/airflow/utils/log/es_task_handler.py index 4a20e7c7324697..064e1668a7cf1d 100644 --- a/airflow/utils/log/es_task_handler.py +++ b/airflow/utils/log/es_task_handler.py @@ -78,6 +78,7 @@ def __init__(self, base_log_folder, filename_template, self.json_format = json_format self.json_fields = [label.strip() for label in json_fields.split(",")] self.handler = None + self.context_set = False def _render_log_id(self, ti, try_number): if self.log_id_jinja_template: @@ -200,35 +201,28 @@ def set_context(self, ti): :param ti: task instance object """ - super().set_context(ti) self.mark_end_on_close = not ti.raw + if self.json_format: + self.formatter = JSONFormatter(self.formatter._fmt, json_fields=self.json_fields, extras={ + 'dag_id': str(ti.dag_id), + 'task_id': str(ti.task_id), + 'execution_date': self._clean_execution_date(ti.execution_date), + 'try_number': str(ti.try_number) + }) + if self.write_stdout: + if self.context_set: + # We don't want to re-set up the handler if this logger has + # already been initialized + return + self.handler = logging.StreamHandler(stream=sys.__stdout__) self.handler.setLevel(self.level) - if self.json_format and not ti.raw: - self.handler.setFormatter( - JSONFormatter(self.formatter._fmt, json_fields=self.json_fields, extras={ - 'dag_id': str(ti.dag_id), - 'task_id': str(ti.task_id), - 'execution_date': self._clean_execution_date(ti.execution_date), - 'try_number': str(ti.try_number)})) - else: - self.handler.setFormatter(self.formatter) + self.handler.setFormatter(self.formatter) else: super().set_context(ti) - - def emit(self, record): - if self.write_stdout: - self.formatter.format(record) - if self.handler is not None: - self.handler.emit(record) - else: - super().emit(record) - - def flush(self): - if self.handler is not None: - self.handler.flush() + self.context_set = True def close(self): # When application exit, system shuts down all handlers by