Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mask passwords and sensitive info in task logs and UI #15599

Merged
merged 5 commits into from
May 5, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions airflow/cli/commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ def task_run(args, dag=None):
conf.read_dict(conf_dict, source=args.cfg_path)
settings.configure_vars()

settings.MASK_SECRETS_IN_LOGS = True

# IMPORTANT, have to use the NullPool, otherwise, each "run" command may leave
# behind multiple open sleeping connections while heartbeating, which could
# easily exceed the database connection limit when
Expand Down Expand Up @@ -357,6 +359,9 @@ def task_test(args, dag=None):
# We want to log output from operators etc to show up here. Normally
# airflow.task would redirect to a file, but here we want it to propagate
# up to the normal airflow handler.

settings.MASK_SECRETS_IN_LOGS = True

handlers = logging.getLogger('airflow.task').handlers
already_has_stream_handler = False
for handler in handlers:
Expand Down
10 changes: 10 additions & 0 deletions airflow/config_templates/airflow_local_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,23 +64,31 @@
'class': COLORED_FORMATTER_CLASS if COLORED_LOG else 'logging.Formatter',
},
},
'filters': {
'mask_secrets': {
'()': 'airflow.utils.log.secrets_masker.SecretsMasker',
},
},
'handlers': {
'console': {
'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler',
'formatter': 'airflow_coloured',
'stream': 'sys.stdout',
'filters': ['mask_secrets'],
},
'task': {
'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
'formatter': 'airflow',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
'filename_template': FILENAME_TEMPLATE,
'filters': ['mask_secrets'],
},
'processor': {
'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
'formatter': 'airflow',
'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
'filename_template': PROCESSOR_FILENAME_TEMPLATE,
'filters': ['mask_secrets'],
},
},
'loggers': {
Expand All @@ -93,6 +101,7 @@
'handlers': ['task'],
'level': LOG_LEVEL,
'propagate': False,
'filters': ['mask_secrets'],
},
'flask_appbuilder': {
'handler': ['console'],
Expand All @@ -103,6 +112,7 @@
'root': {
'handlers': ['console'],
'level': LOG_LEVEL,
'filters': ['mask_secrets'],
},
}

Expand Down
34 changes: 17 additions & 17 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,23 @@
type: integer
example: ~
default: "3"
- name: hide_sensitive_var_conn_fields
description: |
Hide sensitive Variables or Connection extra json keys from UI and task logs when set to True

(Connection passwords are always hidden in logs)
version_added: ~
type: boolean
example: ~
default: "True"
- name: sensitive_var_conn_names
description: |
A comma-separated list of extra sensitive keywords to look for in variables names or connection's
extra JSON.
version_added: ~
type: string
example: ~
default: ""

- name: logging
description: ~
Expand Down Expand Up @@ -1911,23 +1928,6 @@
type: string
example: ~
default: "v3"
- name: admin
description: ~
options:
- name: hide_sensitive_variable_fields
description: |
UI to hide sensitive variable fields when set to True
version_added: ~
type: string
example: ~
default: "True"
- name: sensitive_variable_fields
description: |
A comma-separated list of sensitive keywords to look for in variables names.
version_added: ~
type: string
example: ~
default: ""
- name: elasticsearch
description: ~
options:
Expand Down
16 changes: 9 additions & 7 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,15 @@ lazy_discover_providers = True
# Currently it is only used in ``DagFileProcessor.process_file`` to retry ``dagbag.sync_to_db``.
max_db_retries = 3

# Hide sensitive Variables or Connection extra json keys from UI and task logs when set to True
#
# (Connection passwords are always hidden in logs)
hide_sensitive_var_conn_fields = True

# A comma-separated list of extra sensitive keywords to look for in variables names or connection's
# extra JSON.
sensitive_var_conn_names =

[logging]
# The folder where airflow should store its log files
# This path must be absolute
Expand Down Expand Up @@ -949,13 +958,6 @@ keytab = airflow.keytab
[github_enterprise]
api_rev = v3

[admin]
# UI to hide sensitive variable fields when set to True
hide_sensitive_variable_fields = True

# A comma-separated list of sensitive keywords to look for in variables names.
sensitive_variable_fields =

[elasticsearch]
# Elasticsearch host
host =
Expand Down
4 changes: 0 additions & 4 deletions airflow/config_templates/default_test.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,6 @@ scheduler_zombie_task_threshold = 300
dag_dir_list_interval = 0
max_tis_per_query = 512

[admin]
hide_sensitive_variable_fields = True
sensitive_variable_fields =

[elasticsearch]
host =
log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
Expand Down
2 changes: 2 additions & 0 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ class AirflowConfigParser(ConfigParser): # pylint: disable=too-many-ancestors
('metrics', 'statsd_custom_client_path'): ('scheduler', 'statsd_custom_client_path', '2.0.0'),
('scheduler', 'parsing_processes'): ('scheduler', 'max_threads', '1.10.14'),
('operators', 'default_queue'): ('celery', 'default_queue', '2.1.0'),
('core', 'hide_sensitive_var_conn_fields'): ('admin', 'hide_sensitive_variable_fields', '2.1.0'),
ashb marked this conversation as resolved.
Show resolved Hide resolved
('core', 'sensitive_var_conn_names'): ('admin', 'sensitive_variable_fields', '2.1.0'),
}

# A mapping of old default values that we want to change and warn the user
Expand Down
4 changes: 2 additions & 2 deletions airflow/hooks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ def get_connection(cls, conn_id: str) -> "Connection":
conn.port,
conn.schema,
conn.login,
"XXXXXXXX" if conn.password else None,
"XXXXXXXX" if conn.extra_dejson else None,
conn.password,
conn.extra_dejson,
)
return conn

Expand Down
15 changes: 14 additions & 1 deletion airflow/models/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@

from sqlalchemy import Boolean, Column, Integer, String, Text
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import synonym
from sqlalchemy.orm import reconstructor, synonym

from airflow.configuration import ensure_secrets_loaded
from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.models.base import ID_LEN, Base
from airflow.models.crypto import get_fernet
from airflow.providers_manager import ProvidersManager
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.log.secrets_masker import mask_secret
from airflow.utils.module_loading import import_string


Expand Down Expand Up @@ -141,6 +142,14 @@ def __init__( # pylint: disable=too-many-arguments
self.port = port
self.extra = extra

if self.password:
mask_secret(self.password)

@reconstructor
def on_db_load(self): # pylint: disable=missing-function-docstring
if self.password:
mask_secret(self.password)

def parse_from_uri(self, **uri):
"""This method is deprecated. Please use uri parameter in constructor."""
warnings.warn(
Expand Down Expand Up @@ -346,9 +355,13 @@ def extra_dejson(self) -> Dict:
if self.extra:
try:
obj = json.loads(self.extra)

except JSONDecodeError:
self.log.exception("Failed parsing the json for conn_id %s", self.conn_id)

# Mask sensitive keys from this list
mask_secret(obj)

return obj

@classmethod
Expand Down
12 changes: 3 additions & 9 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
from airflow.models.taskinstance import Context, TaskInstance, clear_task_instances
from airflow.security import permissions
from airflow.stats import Stats
from airflow.typing_compat import RePatternType
from airflow.utils import timezone
from airflow.utils.dates import cron_presets, date_range as utils_date_range
from airflow.utils.file import correct_maybe_zipped
Expand All @@ -79,13 +80,6 @@
from airflow.utils.task_group import TaskGroup


# Before Py 3.7, there is no re.Pattern class
try:
from re import Pattern as PatternType # type: ignore
except ImportError:
PatternType = type(re.compile('', 0))


log = logging.getLogger(__name__)

ScheduleInterval = Union[str, timedelta, relativedelta]
Expand Down Expand Up @@ -1444,7 +1438,7 @@ def sub_dag(self, *args, **kwargs):

def partial_subset(
self,
task_ids_or_regex: Union[str, PatternType, Iterable[str]],
task_ids_or_regex: Union[str, RePatternType, Iterable[str]],
include_downstream=False,
include_upstream=True,
include_direct_upstream=False,
Expand Down Expand Up @@ -1472,7 +1466,7 @@ def partial_subset(
self.task_dict = task_dict
self._task_group = task_group

if isinstance(task_ids_or_regex, (str, PatternType)):
if isinstance(task_ids_or_regex, (str, RePatternType)):
matched_tasks = [t for t in self.tasks if re.findall(task_ids_or_regex, t.task_id)]
else:
matched_tasks = [t for t in self.tasks if t.task_id in task_ids_or_regex]
Expand Down
11 changes: 11 additions & 0 deletions airflow/models/renderedtifields.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,20 @@ def __init__(self, ti: TaskInstance, render_templates=True):
field: serialize_template_field(getattr(self.task, field)) for field in self.task.template_fields
}

self._redact()

def __repr__(self):
return f"<{self.__class__.__name__}: {self.dag_id}.{self.task_id} {self.execution_date}"

def _redact(self):
from airflow.utils.log.secrets_masker import redact

if self.k8s_pod_yaml:
self.k8s_pod_yaml = redact(self.k8s_pod_yaml)

for field, rendered in self.rendered_fields.items():
self.rendered_fields[field] = redact(rendered, field)

@classmethod
@provide_session
def get_templated_fields(cls, ti: TaskInstance, session: Session = None) -> Optional[dict]:
Expand Down
13 changes: 11 additions & 2 deletions airflow/models/variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@
from cryptography.fernet import InvalidToken as InvalidFernetToken
from sqlalchemy import Boolean, Column, Integer, String, Text
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import Session, synonym
from sqlalchemy.orm import Session, reconstructor, synonym

from airflow.configuration import ensure_secrets_loaded
from airflow.models.base import ID_LEN, Base
from airflow.models.crypto import get_fernet
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.log.secrets_masker import mask_secret
from airflow.utils.session import provide_session

log = logging.getLogger()
Expand All @@ -56,6 +57,11 @@ def __init__(self, key=None, val=None, description=None):
self.val = val
self.description = description

@reconstructor
def on_db_load(self): # pylint: disable=missing-function-docstring
if self._val:
mask_secret(self.val, self.key)

def __repr__(self):
# Hiding the value
return f'{self.key} : {self._val}'
Expand Down Expand Up @@ -134,8 +140,11 @@ def get(
raise KeyError(f'Variable {key} does not exist')
else:
if deserialize_json:
return json.loads(var_val)
obj = json.loads(var_val)
mask_secret(var_val, key)
return obj
else:
mask_secret(var_val, key)
return var_val

@classmethod
Expand Down
7 changes: 7 additions & 0 deletions airflow/sensors/smart_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,18 @@ def create_new_task_handler():
Create task log handler for a sensor work.
:return: log handler
"""
from airflow.utils.log.secrets_masker import _secrets_masker # noqa

handler_config_copy = {k: handler_config[k] for k in handler_config}
del handler_config_copy['filters']

formatter_config_copy = {k: formatter_config[k] for k in formatter_config}
handler = dictConfigurator.configure_handler(handler_config_copy)
formatter = dictConfigurator.configure_formatter(formatter_config_copy)
handler.setFormatter(formatter)

# We want to share the _global_ filterer instance, not create a new one
handler.addFilter(_secrets_masker())
return handler

def _get_sensor_logger(self, si):
Expand Down
11 changes: 11 additions & 0 deletions airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ def configure_vars():

def configure_orm(disable_connection_pool=False):
"""Configure ORM using SQLAlchemy"""
from airflow.utils.log.secrets_masker import mask_secret

log.debug("Setting up DB connection pool (PID %s)", os.getpid())
global engine
global Session
Expand All @@ -220,6 +222,9 @@ def configure_orm(disable_connection_pool=False):
connect_args = {}

engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args)

mask_secret(engine.url.password)

setup_event_handlers(engine)

Session = scoped_session(
Expand Down Expand Up @@ -497,3 +502,9 @@ def initialize():
executor_constants.KUBERNETES_EXECUTOR,
executor_constants.CELERY_KUBERNETES_EXECUTOR,
}

HIDE_SENSITIVE_VAR_CONN_FIELDS = conf.getboolean('core', 'hide_sensitive_var_conn_fields')

# By default this is off, but is automatically configured on when running task
# instances
MASK_SECRETS_IN_LOGS = False
9 changes: 9 additions & 0 deletions airflow/typing_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,12 @@
)
except ImportError:
from typing_extensions import Protocol, TypedDict, runtime_checkable # type: ignore # noqa


# Before Py 3.7, there is no re.Pattern class
try:
from re import Pattern as RePatternType # type: ignore # pylint: disable=unused-import
except ImportError:
import re

RePatternType = type(re.compile('', 0))
Loading