Skip to content

Commit

Permalink
[AIRFLOW-6047] Simplify the logging configuration template (#6644)
Browse files Browse the repository at this point in the history
  • Loading branch information
mik-laj authored Nov 27, 2019
1 parent ac2d0be commit e82008d
Showing 1 changed file with 84 additions and 76 deletions.
160 changes: 84 additions & 76 deletions airflow/config_templates/airflow_local_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import os
from typing import Any, Dict

from airflow import AirflowException
from airflow.configuration import conf
from airflow.utils.file import mkdirs

Expand Down Expand Up @@ -54,26 +55,6 @@

PROCESSOR_FILENAME_TEMPLATE = conf.get('core', 'LOG_PROCESSOR_FILENAME_TEMPLATE')

# Storage bucket url for remote logging
# s3 buckets should start with "s3://"
# gcs buckets should start with "gs://"
# wasb buckets should start with "wasb"
# just to help Airflow select correct handler
REMOTE_BASE_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')

ELASTICSEARCH_HOST = conf.get('elasticsearch', 'HOST')

ELASTICSEARCH_LOG_ID_TEMPLATE = conf.get('elasticsearch', 'LOG_ID_TEMPLATE')

ELASTICSEARCH_END_OF_LOG_MARK = conf.get('elasticsearch', 'END_OF_LOG_MARK')

ELASTICSEARCH_WRITE_STDOUT = conf.get('elasticsearch', 'WRITE_STDOUT')

ELASTICSEARCH_JSON_FORMAT = conf.get('elasticsearch', 'JSON_FORMAT')

ELASTICSEARCH_JSON_FIELDS = conf.get('elasticsearch', 'JSON_FIELDS')


DEFAULT_LOGGING_CONFIG = {
'version': 1,
'disable_existing_loggers': False,
Expand Down Expand Up @@ -148,54 +129,6 @@
}
}

REMOTE_HANDLERS = {
's3': {
'task': {
'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
'formatter': 'airflow',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
's3_log_folder': REMOTE_BASE_LOG_FOLDER,
'filename_template': FILENAME_TEMPLATE,
},
},
'gcs': {
'task': {
'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
'formatter': 'airflow',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
'gcs_log_folder': REMOTE_BASE_LOG_FOLDER,
'filename_template': FILENAME_TEMPLATE,
},
},
'wasb': {
'task': {
'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler',
'formatter': 'airflow',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
'wasb_log_folder': REMOTE_BASE_LOG_FOLDER,
'wasb_container': 'airflow-logs',
'filename_template': FILENAME_TEMPLATE,
'delete_local_copy': False,
},
},
'elasticsearch': {
'task': {
'class': 'airflow.utils.log.es_task_handler.ElasticsearchTaskHandler',
'formatter': 'airflow',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
'log_id_template': ELASTICSEARCH_LOG_ID_TEMPLATE,
'filename_template': FILENAME_TEMPLATE,
'end_of_log_mark': ELASTICSEARCH_END_OF_LOG_MARK,
'host': ELASTICSEARCH_HOST,
'write_stdout': ELASTICSEARCH_WRITE_STDOUT,
'json_format': ELASTICSEARCH_JSON_FORMAT,
'json_fields': ELASTICSEARCH_JSON_FIELDS
},
},
}

REMOTE_LOGGING = conf.getboolean('core', 'remote_logging')

# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set.
# This is to avoid exceptions when initializing RotatingFileHandler multiple times
# in multiple processes.
Expand All @@ -212,11 +145,86 @@
directory = os.path.dirname(processor_manager_handler_config['filename'])
mkdirs(directory, 0o755)

if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb'])
elif REMOTE_LOGGING and ELASTICSEARCH_HOST:
DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch'])
##################
# Remote logging #
##################

REMOTE_LOGGING = conf.getboolean('core', 'remote_logging')

if REMOTE_LOGGING:

ELASTICSEARCH_HOST = conf.get('elasticsearch', 'HOST')

# Storage bucket URL for remote logging
# S3 buckets should start with "s3://"
# GCS buckets should start with "gs://"
# WASB buckets should start with "wasb"
# just to help Airflow select correct handler
REMOTE_BASE_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')

if REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
S3_REMOTE_HANDLERS = {
'task': {
'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
'formatter': 'airflow',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
's3_log_folder': REMOTE_BASE_LOG_FOLDER,
'filename_template': FILENAME_TEMPLATE,
},
}

DEFAULT_LOGGING_CONFIG['handlers'].update(S3_REMOTE_HANDLERS)
elif REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
GCS_REMOTE_HANDLERS = {
'task': {
'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
'formatter': 'airflow',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
'gcs_log_folder': REMOTE_BASE_LOG_FOLDER,
'filename_template': FILENAME_TEMPLATE,
},
}

DEFAULT_LOGGING_CONFIG['handlers'].update(GCS_REMOTE_HANDLERS)
elif REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
WASB_REMOTE_HANDLERS = {
'task': {
'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler',
'formatter': 'airflow',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
'wasb_log_folder': REMOTE_BASE_LOG_FOLDER,
'wasb_container': 'airflow-logs',
'filename_template': FILENAME_TEMPLATE,
'delete_local_copy': False,
},
}

DEFAULT_LOGGING_CONFIG['handlers'].update(WASB_REMOTE_HANDLERS)
elif ELASTICSEARCH_HOST:
ELASTICSEARCH_LOG_ID_TEMPLATE = conf.get('elasticsearch', 'LOG_ID_TEMPLATE')
ELASTICSEARCH_END_OF_LOG_MARK = conf.get('elasticsearch', 'END_OF_LOG_MARK')
ELASTICSEARCH_WRITE_STDOUT = conf.get('elasticsearch', 'WRITE_STDOUT')
ELASTICSEARCH_JSON_FORMAT = conf.get('elasticsearch', 'JSON_FORMAT')
ELASTICSEARCH_JSON_FIELDS = conf.get('elasticsearch', 'JSON_FIELDS')

ELASTIC_REMOTE_HANDLERS = {
'task': {
'class': 'airflow.utils.log.es_task_handler.ElasticsearchTaskHandler',
'formatter': 'airflow',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
'log_id_template': ELASTICSEARCH_LOG_ID_TEMPLATE,
'filename_template': FILENAME_TEMPLATE,
'end_of_log_mark': ELASTICSEARCH_END_OF_LOG_MARK,
'host': ELASTICSEARCH_HOST,
'write_stdout': ELASTICSEARCH_WRITE_STDOUT,
'json_format': ELASTICSEARCH_JSON_FORMAT,
'json_fields': ELASTICSEARCH_JSON_FIELDS
},
}

DEFAULT_LOGGING_CONFIG['handlers'].update(ELASTIC_REMOTE_HANDLERS)
else:
raise AirflowException(
"Incorrect remote log configuration. Please check the configuration of option 'host' in "
"section 'elasticsearch' if you are using Elasticsearch. In the other case, "
"'remote_base_log_folder' option in 'core' section.")

0 comments on commit e82008d

Please sign in to comment.