diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py index 373f933d4cfa52..dc38b87835c3d3 100644 --- a/airflow/cli/commands/task_command.py +++ b/airflow/cli/commands/task_command.py @@ -202,6 +202,8 @@ def task_run(args, dag=None): conf.read_dict(conf_dict, source=args.cfg_path) settings.configure_vars() + settings.MASK_SECRETS_IN_LOGS = True + # IMPORTANT, have to use the NullPool, otherwise, each "run" command may leave # behind multiple open sleeping connections while heartbeating, which could # easily exceed the database connection limit when @@ -357,6 +359,9 @@ def task_test(args, dag=None): # We want to log output from operators etc to show up here. Normally # airflow.task would redirect to a file, but here we want it to propagate # up to the normal airflow handler. + + settings.MASK_SECRETS_IN_LOGS = True + handlers = logging.getLogger('airflow.task').handlers already_has_stream_handler = False for handler in handlers: diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py index ad64078806fb46..3c8ffb844bea68 100644 --- a/airflow/config_templates/airflow_local_settings.py +++ b/airflow/config_templates/airflow_local_settings.py @@ -64,23 +64,31 @@ 'class': COLORED_FORMATTER_CLASS if COLORED_LOG else 'logging.Formatter', }, }, + 'filters': { + 'mask_secrets': { + '()': 'airflow.utils.log.secrets_masker.SecretsMasker', + }, + }, 'handlers': { 'console': { 'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler', 'formatter': 'airflow_coloured', 'stream': 'sys.stdout', + 'filters': ['mask_secrets'], }, 'task': { 'class': 'airflow.utils.log.file_task_handler.FileTaskHandler', 'formatter': 'airflow', 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), 'filename_template': FILENAME_TEMPLATE, + 'filters': ['mask_secrets'], }, 'processor': { 'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler', 'formatter': 'airflow', 'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER), 'filename_template': PROCESSOR_FILENAME_TEMPLATE, + 'filters': ['mask_secrets'], }, }, 'loggers': { @@ -93,6 +101,7 @@ 'handlers': ['task'], 'level': LOG_LEVEL, 'propagate': False, + 'filters': ['mask_secrets'], }, 'flask_appbuilder': { 'handler': ['console'], @@ -103,6 +112,7 @@ 'root': { 'handlers': ['console'], 'level': LOG_LEVEL, + 'filters': ['mask_secrets'], }, } diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 8f66df9676d4b0..4d40234810b9d0 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -414,6 +414,23 @@ type: integer example: ~ default: "3" + - name: hide_sensitive_var_conn_fields + description: | + Hide sensitive Variables or Connection extra json keys from UI and task logs when set to True + + (Connection passwords are always hidden in logs) + version_added: ~ + type: boolean + example: ~ + default: "True" + - name: sensitive_var_conn_names + description: | + A comma-separated list of extra sensitive keywords to look for in variables names or connection's + extra JSON. 
+ version_added: ~ + type: string + example: ~ + default: "" - name: logging description: ~ @@ -1911,23 +1928,6 @@ type: string example: ~ default: "v3" -- name: admin - description: ~ - options: - - name: hide_sensitive_variable_fields - description: | - UI to hide sensitive variable fields when set to True - version_added: ~ - type: string - example: ~ - default: "True" - - name: sensitive_variable_fields - description: | - A comma-separated list of sensitive keywords to look for in variables names. - version_added: ~ - type: string - example: ~ - default: "" - name: elasticsearch description: ~ options: diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 0f906063e6f52b..c72008a6061e35 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -237,6 +237,15 @@ lazy_discover_providers = True # Currently it is only used in ``DagFileProcessor.process_file`` to retry ``dagbag.sync_to_db``. max_db_retries = 3 +# Hide sensitive Variables or Connection extra json keys from UI and task logs when set to True +# +# (Connection passwords are always hidden in logs) +hide_sensitive_var_conn_fields = True + +# A comma-separated list of extra sensitive keywords to look for in variables names or connection's +# extra JSON. +sensitive_var_conn_names = + [logging] # The folder where airflow should store its log files # This path must be absolute @@ -949,13 +958,6 @@ keytab = airflow.keytab [github_enterprise] api_rev = v3 -[admin] -# UI to hide sensitive variable fields when set to True -hide_sensitive_variable_fields = True - -# A comma-separated list of sensitive keywords to look for in variables names. -sensitive_variable_fields = - [elasticsearch] # Elasticsearch host host = diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg index 5b647aa90d5a54..2cbdb16446cbc4 100644 --- a/airflow/config_templates/default_test.cfg +++ b/airflow/config_templates/default_test.cfg @@ -113,10 +113,6 @@ scheduler_zombie_task_threshold = 300 dag_dir_list_interval = 0 max_tis_per_query = 512 -[admin] -hide_sensitive_variable_fields = True -sensitive_variable_fields = - [elasticsearch] host = log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}} diff --git a/airflow/configuration.py b/airflow/configuration.py index 20a03a0cccc5bf..4420ddaf278e37 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -160,6 +160,8 @@ class AirflowConfigParser(ConfigParser): # pylint: disable=too-many-ancestors ('metrics', 'statsd_custom_client_path'): ('scheduler', 'statsd_custom_client_path', '2.0.0'), ('scheduler', 'parsing_processes'): ('scheduler', 'max_threads', '1.10.14'), ('operators', 'default_queue'): ('celery', 'default_queue', '2.1.0'), + ('core', 'hide_sensitive_var_conn_fields'): ('admin', 'hide_sensitive_variable_fields', '2.1.0'), + ('core', 'sensitive_var_conn_names'): ('admin', 'sensitive_variable_fields', '2.1.0'), } # A mapping of old default values that we want to change and warn the user diff --git a/airflow/hooks/base.py b/airflow/hooks/base.py index dee76dc70a90f7..f286a6f2ce3d37 100644 --- a/airflow/hooks/base.py +++ b/airflow/hooks/base.py @@ -74,8 +74,8 @@ def get_connection(cls, conn_id: str) -> "Connection": conn.port, conn.schema, conn.login, - "XXXXXXXX" if conn.password else None, - "XXXXXXXX" if conn.extra_dejson else None, + conn.password, + conn.extra_dejson, ) return conn diff --git a/airflow/models/connection.py 
b/airflow/models/connection.py index f631aaef2c6771..9021edb56a9e9f 100644 --- a/airflow/models/connection.py +++ b/airflow/models/connection.py @@ -24,7 +24,7 @@ from sqlalchemy import Boolean, Column, Integer, String, Text from sqlalchemy.ext.declarative import declared_attr -from sqlalchemy.orm import synonym +from sqlalchemy.orm import reconstructor, synonym from airflow.configuration import ensure_secrets_loaded from airflow.exceptions import AirflowException, AirflowNotFoundException @@ -32,6 +32,7 @@ from airflow.models.crypto import get_fernet from airflow.providers_manager import ProvidersManager from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.utils.log.secrets_masker import mask_secret from airflow.utils.module_loading import import_string @@ -141,6 +142,14 @@ def __init__( # pylint: disable=too-many-arguments self.port = port self.extra = extra + if self.password: + mask_secret(self.password) + + @reconstructor + def on_db_load(self): # pylint: disable=missing-function-docstring + if self.password: + mask_secret(self.password) + def parse_from_uri(self, **uri): """This method is deprecated. Please use uri parameter in constructor.""" warnings.warn( @@ -346,9 +355,13 @@ def extra_dejson(self) -> Dict: if self.extra: try: obj = json.loads(self.extra) + except JSONDecodeError: self.log.exception("Failed parsing the json for conn_id %s", self.conn_id) + # Mask sensitive keys from this list + mask_secret(obj) + return obj @classmethod diff --git a/airflow/models/dag.py b/airflow/models/dag.py index b55698eb2742f0..2c9088ff3013ea 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -65,6 +65,7 @@ from airflow.models.taskinstance import Context, TaskInstance, clear_task_instances from airflow.security import permissions from airflow.stats import Stats +from airflow.typing_compat import RePatternType from airflow.utils import timezone from airflow.utils.dates import cron_presets, date_range as utils_date_range from airflow.utils.file import correct_maybe_zipped @@ -79,13 +80,6 @@ from airflow.utils.task_group import TaskGroup -# Before Py 3.7, there is no re.Pattern class -try: - from re import Pattern as PatternType # type: ignore -except ImportError: - PatternType = type(re.compile('', 0)) - - log = logging.getLogger(__name__) ScheduleInterval = Union[str, timedelta, relativedelta] @@ -1444,7 +1438,7 @@ def sub_dag(self, *args, **kwargs): def partial_subset( self, - task_ids_or_regex: Union[str, PatternType, Iterable[str]], + task_ids_or_regex: Union[str, RePatternType, Iterable[str]], include_downstream=False, include_upstream=True, include_direct_upstream=False, @@ -1472,7 +1466,7 @@ def partial_subset( self.task_dict = task_dict self._task_group = task_group - if isinstance(task_ids_or_regex, (str, PatternType)): + if isinstance(task_ids_or_regex, (str, RePatternType)): matched_tasks = [t for t in self.tasks if re.findall(task_ids_or_regex, t.task_id)] else: matched_tasks = [t for t in self.tasks if t.task_id in task_ids_or_regex] diff --git a/airflow/models/renderedtifields.py b/airflow/models/renderedtifields.py index 91ec0278df9cff..0572c89acbcb3b 100644 --- a/airflow/models/renderedtifields.py +++ b/airflow/models/renderedtifields.py @@ -57,9 +57,20 @@ def __init__(self, ti: TaskInstance, render_templates=True): field: serialize_template_field(getattr(self.task, field)) for field in self.task.template_fields } + self._redact() + def __repr__(self): return f"<{self.__class__.__name__}: {self.dag_id}.{self.task_id} {self.execution_date}" + def 
_redact(self): + from airflow.utils.log.secrets_masker import redact + + if self.k8s_pod_yaml: + self.k8s_pod_yaml = redact(self.k8s_pod_yaml) + + for field, rendered in self.rendered_fields.items(): + self.rendered_fields[field] = redact(rendered, field) + @classmethod @provide_session def get_templated_fields(cls, ti: TaskInstance, session: Session = None) -> Optional[dict]: diff --git a/airflow/models/variable.py b/airflow/models/variable.py index 0341e7b2c65ddb..44627c0fdefa2f 100644 --- a/airflow/models/variable.py +++ b/airflow/models/variable.py @@ -24,12 +24,13 @@ from cryptography.fernet import InvalidToken as InvalidFernetToken from sqlalchemy import Boolean, Column, Integer, String, Text from sqlalchemy.ext.declarative import declared_attr -from sqlalchemy.orm import Session, synonym +from sqlalchemy.orm import Session, reconstructor, synonym from airflow.configuration import ensure_secrets_loaded from airflow.models.base import ID_LEN, Base from airflow.models.crypto import get_fernet from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.utils.log.secrets_masker import mask_secret from airflow.utils.session import provide_session log = logging.getLogger() @@ -56,6 +57,11 @@ def __init__(self, key=None, val=None, description=None): self.val = val self.description = description + @reconstructor + def on_db_load(self): # pylint: disable=missing-function-docstring + if self._val: + mask_secret(self.val, self.key) + def __repr__(self): # Hiding the value return f'{self.key} : {self._val}' @@ -134,8 +140,11 @@ def get( raise KeyError(f'Variable {key} does not exist') else: if deserialize_json: - return json.loads(var_val) + obj = json.loads(var_val) + mask_secret(var_val, key) + return obj else: + mask_secret(var_val, key) return var_val @classmethod diff --git a/airflow/sensors/smart_sensor.py b/airflow/sensors/smart_sensor.py index 3df7313c05e6eb..4e7a6070027450 100644 --- a/airflow/sensors/smart_sensor.py +++ b/airflow/sensors/smart_sensor.py @@ -106,11 +106,18 @@ def create_new_task_handler(): Create task log handler for a sensor work. 
:return: log handler """ + from airflow.utils.log.secrets_masker import _secrets_masker # noqa + handler_config_copy = {k: handler_config[k] for k in handler_config} + del handler_config_copy['filters'] + formatter_config_copy = {k: formatter_config[k] for k in formatter_config} handler = dictConfigurator.configure_handler(handler_config_copy) formatter = dictConfigurator.configure_formatter(formatter_config_copy) handler.setFormatter(formatter) + + # We want to share the _global_ filterer instance, not create a new one + handler.addFilter(_secrets_masker()) return handler def _get_sensor_logger(self, si): diff --git a/airflow/settings.py b/airflow/settings.py index bf68020e4199a7..c80a218f82c089 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -205,6 +205,8 @@ def configure_vars(): def configure_orm(disable_connection_pool=False): """Configure ORM using SQLAlchemy""" + from airflow.utils.log.secrets_masker import mask_secret + log.debug("Setting up DB connection pool (PID %s)", os.getpid()) global engine global Session @@ -220,6 +222,9 @@ def configure_orm(disable_connection_pool=False): connect_args = {} engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args) + + mask_secret(engine.url.password) + setup_event_handlers(engine) Session = scoped_session( @@ -497,3 +502,9 @@ def initialize(): executor_constants.KUBERNETES_EXECUTOR, executor_constants.CELERY_KUBERNETES_EXECUTOR, } + +HIDE_SENSITIVE_VAR_CONN_FIELDS = conf.getboolean('core', 'hide_sensitive_var_conn_fields') + +# By default this is off, but is automatically configured on when running task +# instances +MASK_SECRETS_IN_LOGS = False diff --git a/airflow/typing_compat.py b/airflow/typing_compat.py index 6fd6d8c8252f2e..d98eb7b5562a00 100644 --- a/airflow/typing_compat.py +++ b/airflow/typing_compat.py @@ -32,3 +32,12 @@ ) except ImportError: from typing_extensions import Protocol, TypedDict, runtime_checkable # type: ignore # noqa + + +# Before Py 3.7, there is no re.Pattern class +try: + from re import Pattern as RePatternType # type: ignore # pylint: disable=unused-import +except ImportError: + import re + + RePatternType = type(re.compile('', 0)) diff --git a/airflow/utils/log/secrets_masker.py b/airflow/utils/log/secrets_masker.py new file mode 100644 index 00000000000000..5d279697f008b8 --- /dev/null +++ b/airflow/utils/log/secrets_masker.py @@ -0,0 +1,223 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""Mask sensitive information from logs""" +import collections +import logging +import re +from typing import TYPE_CHECKING, Iterable, Optional, Set, TypeVar, Union + +try: + # 3.8+ + from functools import cached_property +except ImportError: + from cached_property import cached_property + +try: + # 3.9+ + from functools import cache +except ImportError: + from functools import lru_cache + + cache = lru_cache(maxsize=None) + + +if TYPE_CHECKING: + from airflow.typing_compat import RePatternType + + RedactableItem = TypeVar('RedctableItem') + +DEFAULT_SENSITIVE_FIELDS = frozenset( + { + 'password', + 'secret', + 'passwd', + 'authorization', + 'api_key', + 'apikey', + 'access_token', + } +) +"""Names of fields (Connection extra, Variable key name etc.) that are deemed sensitive""" + + +@cache +def get_sensitive_variables_fields(): + """Get comma-separated sensitive Variable Fields from airflow.cfg.""" + from airflow.configuration import conf + + sensitive_fields = DEFAULT_SENSITIVE_FIELDS.copy() + sensitive_variable_fields = conf.get('core', 'sensitive_var_conn_names') + if sensitive_variable_fields: + sensitive_fields |= frozenset({field.strip() for field in sensitive_variable_fields.split(',')}) + return sensitive_fields + + +def should_hide_value_for_key(name): + """Should the value for this given name (Variable name, or key in conn.extra_dejson) be hidden""" + from airflow import settings + + if name and settings.HIDE_SENSITIVE_VAR_CONN_FIELDS: + name = name.strip().lower() + return any(s in name for s in get_sensitive_variables_fields()) + return False + + +def mask_secret(secret: Union[str, dict, Iterable], name: str = None) -> None: + """ + Mask a secret from appearing in the task logs. + + If ``name`` is provided, then it will only be masked if the name matches + one of the configured "sensitive" names. + + If ``secret`` is a dict or a iterable (excluding str) then it will be + recursively walked and keys with sensitive names will be hidden. + """ + # Delay import + from airflow import settings + + # Filtering all log messages is not a free process, so we only do it when + # running tasks + if not settings.MASK_SECRETS_IN_LOGS or not secret: + return + + _secrets_masker().add_mask(secret, name) + + +def redact(value: "RedactableItem", name: str = None) -> "RedactableItem": + """Redact any secrets found in ``value``.""" + return _secrets_masker().redact(value, name) + + +@cache +def _secrets_masker() -> "SecretsMasker": + + for flt in logging.getLogger('airflow.task').filters: + if isinstance(flt, SecretsMasker): + return flt + raise RuntimeError("No SecretsMasker found!") + + +class SecretsMasker(logging.Filter): + """Redact secrets from logs""" + + replacer: Optional["RePatternType"] = None + patterns: Set[str] + + ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered" + + def __init__(self): + super().__init__() + self.patterns = set() + + @cached_property + def _record_attrs_to_ignore(self) -> Iterable[str]: + # Doing log.info(..., extra={'foo': 2}) sets extra properties on + # record, i.e. record.foo. And we need to filter those too. Fun + # + # Create a record, and look at what attributes are on it, and ignore + # all the default ones! 
+ + record = logging.getLogRecordFactory()( + # name, level, pathname, lineno, msg, args, exc_info, func=None, sinfo=None, + "x", + logging.INFO, + __file__, + 1, + "", + tuple(), + exc_info=None, + func="funcname", + ) + return frozenset(record.__dict__).difference({'msg', 'args'}) + + def filter(self, record) -> bool: + if self.ALREADY_FILTERED_FLAG in record.__dict__: + # Filters are attached to multiple handlers and logs, keep a + # "private" flag that stops us needing to process it more than once + return True + + if self.replacer: + for k, v in record.__dict__.items(): + if k in self._record_attrs_to_ignore: + continue + record.__dict__[k] = self.redact(v) + if record.exc_info: + exc = record.exc_info[1] + # I'm not sure if this is a good idea! + exc.args = (self.redact(v) for v in exc.args) + record.__dict__[self.ALREADY_FILTERED_FLAG] = True + + return True + + def _redact_all(self, item: "RedactableItem") -> "RedactableItem": + if isinstance(item, dict): + return {dict_key: self._redact_all(subval) for dict_key, subval in item.items()} + elif isinstance(item, str): + return '***' + elif isinstance(item, (tuple, set)): + # Turn set in to tuple! + return tuple(self._redact_all(subval) for subval in item) + elif isinstance(item, Iterable): + return list(self._redact_all(subval) for subval in item) + else: + return item + + # pylint: disable=too-many-return-statements + def redact(self, item: "RedactableItem", name: str = None) -> "RedactableItem": + """ + Redact an any secrets found in ``item``, if it is a string. + + If ``name`` is given, and it's a "sensitve" name (see + :func:`should_hide_value_for_key`) then all string values in the item + is redacted. + + """ + if name and should_hide_value_for_key(name): + return self._redact_all(item) + + if isinstance(item, dict): + return {dict_key: self.redact(subval, dict_key) for dict_key, subval in item.items()} + elif isinstance(item, str): + if self.replacer: + # We can't replace specific values, but the key-based redacting + # can still happen, so we can't short-circuit, we need to walk + # the strucutre. + return self.replacer.sub('***', item) + return item + elif isinstance(item, (tuple, set)): + # Turn set in to tuple! 
+ return tuple(self.redact(subval) for subval in item) + elif isinstance(item, Iterable): + return list(self.redact(subval) for subval in item) + else: + return item + + # pylint: enable=too-many-return-statements + + def add_mask(self, secret: Union[str, dict, Iterable], name: str = None): + """Add a new secret to be masked to this filter instance.""" + if isinstance(secret, dict): + for k, v in secret.items(): + self.add_mask(v, k) + elif isinstance(secret, str): + pattern = re.escape(secret) + if pattern not in self.patterns and (not name or should_hide_value_for_key(name)): + self.patterns.add(pattern) + self.replacer = re.compile('|'.join(self.patterns)) + elif isinstance(secret, collections.abc.Iterable): + for v in secret: + self.add_mask(v, name) diff --git a/airflow/www/utils.py b/airflow/www/utils.py index af34536c4f84f8..d4fa8cdd1e64e2 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -28,7 +28,6 @@ from pygments import highlight, lexers from pygments.formatters import HtmlFormatter # noqa pylint: disable=no-name-in-module -from airflow.configuration import conf from airflow.utils import timezone from airflow.utils.code_utils import get_python_source from airflow.utils.json import AirflowJsonEncoder @@ -36,35 +35,33 @@ from airflow.www.forms import DateTimeWithTimezoneField from airflow.www.widgets import AirflowDateTimePickerWidget -DEFAULT_SENSITIVE_VARIABLE_FIELDS = [ - 'password', - 'secret', - 'passwd', - 'authorization', - 'api_key', - 'apikey', - 'access_token', -] - - -def get_sensitive_variables_fields(): - """Get comma-separated sensitive Variable Fields from airflow.cfg.""" - sensitive_fields = set(DEFAULT_SENSITIVE_VARIABLE_FIELDS) - sensitive_variable_fields = conf.get('admin', 'sensitive_variable_fields') - if sensitive_variable_fields: - sensitive_fields.update({field.strip() for field in sensitive_variable_fields.split(',')}) - return sensitive_fields - - -def should_hide_value_for_key(key_name): - """Returns True if hide_sensitive_variable_fields is True, else False""" - # It is possible via importing variables from file that a key is empty. - if key_name: - config_set = conf.getboolean('admin', 'hide_sensitive_variable_fields') - - field_comp = any(s in key_name.strip().lower() for s in get_sensitive_variables_fields()) - return config_set and field_comp - return False + +def get_sensitive_variables_fields(): # noqa: D103 + import warnings + + from airflow.utils.log.secrets_masker import get_sensitive_variables_fields + + warnings.warn( + "This function is deprecated. Please use " + "`airflow.utils.log.secrets_masker.get_sensitive_variables_fields`", + DeprecationWarning, + stacklevel=2, + ) + return get_sensitive_variables_fields() + + +def should_hide_value_for_key(key_name): # noqa: D103 + import warnings + + from airflow.utils.log.secrets_masker import should_hide_value_for_key + + warnings.warn( + "This function is deprecated. 
Please use " + "`airflow.utils.log.secrets_masker.should_hide_value_for_key`", + DeprecationWarning, + stacklevel=2, + ) + return should_hide_value_for_key(key_name) def get_params(**kwargs): diff --git a/airflow/www/views.py b/airflow/www/views.py index 21d4a820489e15..6e38cd67e2c31b 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -106,6 +106,7 @@ from airflow.utils.dates import infer_time_unit, scale_time_units from airflow.utils.docs import get_docs_url from airflow.utils.helpers import alchemy_to_dict +from airflow.utils.log import secrets_masker from airflow.utils.log.log_reader import TaskLogReader from airflow.utils.session import create_session, provide_session from airflow.utils.state import State @@ -3229,7 +3230,7 @@ def hidden_field_formatter(self): """Formats hidden fields""" key = self.get('key') # noqa pylint: disable=no-member val = self.get('val') # noqa pylint: disable=no-member - if wwwutils.should_hide_value_for_key(key): + if secrets_masker.should_hide_value_for_key(key): return Markup('*' * 8) if val: return val @@ -3243,7 +3244,7 @@ def hidden_field_formatter(self): validators_columns = {'key': [validators.DataRequired()]} def prefill_form(self, form, request_id): # pylint: disable=unused-argument - if wwwutils.should_hide_value_for_key(form.key.data): + if secrets_masker.should_hide_value_for_key(form.key.data): form.val.data = '*' * 8 @action('muldelete', 'Delete', 'Are you sure you want to delete selected records?', single=False) diff --git a/docs/apache-airflow/security/secrets/index.rst b/docs/apache-airflow/security/secrets/index.rst index 5cb0594fb76b66..fd5bbd0ce4f4ed 100644 --- a/docs/apache-airflow/security/secrets/index.rst +++ b/docs/apache-airflow/security/secrets/index.rst @@ -30,6 +30,69 @@ The following are particularly protected: .. toctree:: :maxdepth: 1 :glob: + :caption: Further reading: - fernet - secrets-backend/index + Encryption at rest + Using external Secret stores + +.. _security:mask-sensitive-values: + +Masking sensitive data +---------------------- + +Airflow will by default mask Connection passwords and sensitive Variables and keys from a Connection's +extra (JSON) field when they appear in Task logs, in the Variable and in the Rendered fields views of the UI. + +It does this by looking for the specific *value* appearing anywhere in your output. This means that if you +have a connection with a password of ``a``, then every instance of the letter a in your logs will be replaced +with ``***``. + +To disable masking you can setting :ref:`config:core__hide_sensitive_var_conn_fields` to false. + +The automatic masking is triggered by Connection or Variable access. This means that if you pass a sensitive +value via XCom or any other side-channel it will not be masked when printed in the downstream task. + +Sensitive field names +""""""""""""""""""""" + +When masking is enabled, Airflow will always mask the password field of every Connection that is accessed by a +task. + +It will also mask the value of a Variable, or the field of a Connection's extra JSON blob if the name contains +any words in ('password', 'secret', 'passwd', 'authorization', 'api_key', 'apikey', 'access_token'). This list +can also be extended: + +.. 
code-block:: ini + + [core] + sensitive_var_conn_names = comma,separated,sensitive,names + +Adding your own masks +""""""""""""""""""""" + +If you want to mask an additional secret that is already masked by one of the above methods, you can do it in +your DAG file or operator's ``execute`` function using the ``mask_secret`` function. For example: + +.. code-block:: python + + @task + def my_func(): + from airflow.utils.log.secrets_masker import mask_secret + mask_secret("custom_value") + + ... + +or + +.. code-block:: python + + + class MyOperator(BaseOperator): + + def execute(self, context): + from airflow.utils.log.secrets_masker import mask_secret + mask_secret("custom_value") + + ... + +The mask must be set before any log/output is produced to have any effect. diff --git a/docs/apache-airflow/security/webserver.rst b/docs/apache-airflow/security/webserver.rst index 249cc01e93690a..627da13b57864c 100644 --- a/docs/apache-airflow/security/webserver.rst +++ b/docs/apache-airflow/security/webserver.rst @@ -38,14 +38,8 @@ set the below: Sensitive Variable fields ------------------------- -By default, Airflow Value of a variable will be hidden if the key contains any words in -(‘password’, ‘secret’, ‘passwd’, ‘authorization’, ‘api_key’, ‘apikey’, ‘access_token’), but can be configured -to extend this list by using the following configurations option: - -.. code-block:: ini - - [admin] - hide_sensitive_variable_fields = comma_separated_sensitive_variable_fields_list +Variable values that are deemed "sensitive" based on the variable name will be masked in the UI automatically. +See :ref:`security:mask-sensitive-values` for more details. .. _web-authentication: diff --git a/docs/apache-airflow/ui.rst b/docs/apache-airflow/ui.rst index 879d4d793d4643..728c2aed46ef81 100644 --- a/docs/apache-airflow/ui.rst +++ b/docs/apache-airflow/ui.rst @@ -86,15 +86,7 @@ Variable View The variable view allows you to list, create, edit or delete the key-value pair of a variable used during jobs. Value of a variable will be hidden if the key contains any words in ('password', 'secret', 'passwd', 'authorization', 'api_key', 'apikey', 'access_token') -by default, but can be configured to show in clear-text (by configuration option -``hide_sensitive_variable_fields``). - -Users can also extend this list by using the following configurations option: - -.. code-block:: ini - - [admin] - sensitive_variable_fields = comma_separated_sensitive_variable_fields_list +by default, but can be configured to show in clear-text. See :ref:`security:mask-sensitive-values`. ------------ diff --git a/tests/core/test_config_templates.py b/tests/core/test_config_templates.py index 2efa838741abd3..aa98fa93bf4eff 100644 --- a/tests/core/test_config_templates.py +++ b/tests/core/test_config_templates.py @@ -47,7 +47,6 @@ 'scheduler', 'kerberos', 'github_enterprise', - 'admin', 'elasticsearch', 'elasticsearch_configs', 'kubernetes', @@ -66,7 +65,6 @@ 'smtp', 'celery', 'scheduler', - 'admin', 'elasticsearch', 'elasticsearch_configs', 'kubernetes', diff --git a/tests/models/test_connection.py b/tests/models/test_connection.py index 21fb7aad618b2a..29eed1c5be0eb3 100644 --- a/tests/models/test_connection.py +++ b/tests/models/test_connection.py @@ -16,6 +16,7 @@ # specific language governing permissions and limitations # under the License. 
import json +import os import re import unittest from collections import namedtuple @@ -61,6 +62,10 @@ def uri_test_name(func, num, param): class TestConnection(unittest.TestCase): def setUp(self): crypto._fernet = None + patcher = mock.patch('airflow.models.connection.mask_secret', autospec=True) + self.mask_secret = patcher.start() + + self.addCleanup(patcher.stop) def tearDown(self): crypto._fernet = None @@ -359,6 +364,15 @@ def test_connection_from_uri(self, test_config: UriTestCaseConfig): else: assert expected_val == actual_val + expected_calls = [] + if test_config.test_conn_attributes.get('password'): + expected_calls.append(mock.call(test_config.test_conn_attributes['password'])) + + if test_config.test_conn_attributes.get('extra_dejson'): + expected_calls.append(mock.call(test_config.test_conn_attributes['extra_dejson'])) + + self.mask_secret.assert_has_calls(expected_calls) + # pylint: disable=undefined-variable @parameterized.expand([(x,) for x in test_from_uri_params], UriTestCaseConfig.uri_test_name) def test_connection_get_uri_from_uri(self, test_config: UriTestCaseConfig): @@ -501,6 +515,8 @@ def test_using_env_var(self): assert 'password' == conn.password assert 5432 == conn.port + self.mask_secret.assert_called_once_with('password') + @mock.patch.dict( 'os.environ', { @@ -601,3 +617,35 @@ def test_connection_mixed(self): ), ): Connection(conn_id="TEST_ID", uri="mysql://", schema="AAA") + + def test_masking_from_db(self): + """Test secrets are masked when loaded directly from the DB""" + from airflow.settings import Session + + session = Session() + + try: + conn = Connection( + conn_id=f"test-{os.getpid()}", + conn_type="http", + password="s3cr3t", + extra='{"apikey":"masked too"}', + ) + session.add(conn) + session.flush() + + # Make sure we re-load it, not just get the cached object back + session.expunge(conn) + + self.mask_secret.reset_mock() + + from_db = session.query(Connection).get(conn.id) + from_db.extra_dejson + + assert self.mask_secret.mock_calls == [ + # We should have called it _again_ when loading from the DB + mock.call("s3cr3t"), + mock.call({"apikey": "masked too"}), + ] + finally: + session.rollback() diff --git a/tests/models/test_renderedtifields.py b/tests/models/test_renderedtifields.py index f7547535a030be..d83f542f875b9a 100644 --- a/tests/models/test_renderedtifields.py +++ b/tests/models/test_renderedtifields.py @@ -229,8 +229,9 @@ def test_write(self): ) == result_updated @mock.patch.dict(os.environ, {"AIRFLOW_IS_K8S_EXECUTOR_POD": "True"}) + @mock.patch('airflow.utils.log.secrets_masker.redact', autospec=True, side_effect=lambda d, _=None: d) @mock.patch("airflow.settings.pod_mutation_hook") - def test_get_k8s_pod_yaml(self, mock_pod_mutation_hook): + def test_get_k8s_pod_yaml(self, mock_pod_mutation_hook, redact): """ Test that k8s_pod_yaml is rendered correctly, stored in the Database, and are correctly fetched using RTIF.get_k8s_pod_yaml @@ -289,6 +290,8 @@ def test_get_k8s_pod_yaml(self, mock_pod_mutation_hook): } assert expected_pod_yaml == rtif.k8s_pod_yaml + # K8s pod spec dict was passed to redact + redact.assert_any_call(rtif.k8s_pod_yaml) with create_session() as session: session.add(rtif) @@ -303,3 +306,26 @@ def test_get_k8s_pod_yaml(self, mock_pod_mutation_hook): ti2 = TI(task_2, EXECUTION_DATE) assert RTIF.get_k8s_pod_yaml(ti=ti2) is None + + @mock.patch.dict(os.environ, {"AIRFLOW_VAR_API_KEY": "secret"}) + @mock.patch('airflow.utils.log.secrets_masker.redact', autospec=True) + def test_redact(self, redact): + dag = 
DAG("test_ritf_redact", start_date=START_DATE) + with dag: + task = BashOperator( + task_id="test", + bash_command="echo {{ var.value.api_key }}", + env={'foo': 'secret', 'other_api_key': 'masked based on key name'}, + ) + + redact.side_effect = [ + 'val 1', + 'val 2', + ] + + ti = TI(task=task, execution_date=EXECUTION_DATE) + rtif = RTIF(ti=ti) + assert rtif.rendered_fields == { + 'bash_command': 'val 1', + 'env': 'val 2', + } diff --git a/tests/models/test_variable.py b/tests/models/test_variable.py index e5e1d7aeda605e..28aa0561a5f5f4 100644 --- a/tests/models/test_variable.py +++ b/tests/models/test_variable.py @@ -16,6 +16,7 @@ # specific language governing permissions and limitations # under the License. +import os import unittest from unittest import mock @@ -33,9 +34,16 @@ class TestVariable(unittest.TestCase): def setUp(self): crypto._fernet = None db.clear_db_variables() + patcher = mock.patch('airflow.models.variable.mask_secret', autospec=True) + self.mask_secret = patcher.start() + + self.addCleanup(patcher.stop) def tearDown(self): crypto._fernet = None + + @classmethod + def tearDownClass(cls): db.clear_db_variables() @conf_vars({('core', 'fernet_key'): ''}) @@ -48,6 +56,9 @@ def test_variable_no_encryption(self): test_var = session.query(Variable).filter(Variable.key == 'key').one() assert not test_var.is_encrypted assert test_var.val == 'value' + # We always call mask_secret for variables, and let the SecretsMasker decide based on the name if it + # should mask anything. That logic is tested in test_secrets_masker.py + self.mask_secret.assert_called_once_with('value', 'key') @conf_vars({('core', 'fernet_key'): Fernet.generate_key().decode()}) def test_variable_with_encryption(self): @@ -172,3 +183,32 @@ def test_variable_delete(self): Variable.delete(key) with pytest.raises(KeyError): Variable.get(key) + + def test_masking_from_db(self): + """Test secrets are masked when loaded directly from the DB""" + + # Normally people will use `Variable.get`, but just in case, catch direct DB access too + + session = settings.Session() + + try: + var = Variable( + key=f"password-{os.getpid()}", + val="s3cr3t", + ) + session.add(var) + session.flush() + + # Make sure we re-load it, not just get the cached object back + session.expunge(var) + + self.mask_secret.reset_mock() + + session.query(Variable).get(var.id) + + assert self.mask_secret.mock_calls == [ + # We should have called it _again_ when loading from the DB + mock.call("s3cr3t", var.key), + ] + finally: + session.rollback() diff --git a/tests/utils/log/test_secrets_masker.py b/tests/utils/log/test_secrets_masker.py new file mode 100644 index 00000000000000..f2a7dece291670 --- /dev/null +++ b/tests/utils/log/test_secrets_masker.py @@ -0,0 +1,236 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +import inspect +import logging +import logging.config +import os +import textwrap + +import pytest + +from airflow.utils.log.secrets_masker import SecretsMasker, should_hide_value_for_key +from tests.test_utils.config import conf_vars + + +@pytest.fixture +def logger(caplog): + logging.config.dictConfig( + { + 'version': 1, + 'handlers': { + __name__: { + # Reset later + 'class': 'logging.StreamHandler', + 'stream': 'ext://sys.stdout', + } + }, + 'loggers': { + __name__: { + 'handlers': [__name__], + 'level': logging.INFO, + 'propagate': False, + } + }, + 'disable_existing_loggers': False, + } + ) + formatter = ShortExcFormatter("%(levelname)s %(message)s") + logger = logging.getLogger(__name__) + + caplog.handler.setFormatter(formatter) + logger.handlers = [caplog.handler] + filt = SecretsMasker() + logger.addFilter(filt) + + filt.add_mask('password') + + return logger + + +class TestSecretsMasker: + def test_message(self, logger, caplog): + logger.info("XpasswordY") + + assert caplog.text == "INFO X***Y\n" + + def test_args(self, logger, caplog): + logger.info("Cannot connect to %s", "user:password") + + assert caplog.text == "INFO Cannot connect to user:***\n" + + def test_extra(self, logger, caplog): + logger.handlers[0].formatter = ShortExcFormatter("%(levelname)s %(message)s %(conn)s") + logger.info("Cannot connect", extra={'conn': "user:password"}) + + assert caplog.text == "INFO Cannot connect user:***\n" + + def test_exception(self, logger, caplog): + try: + conn = "user:password" + raise RuntimeError("Cannot connect to " + conn) + except RuntimeError: + logger.exception("Err") + + line = lineno() - 4 + + assert caplog.text == textwrap.dedent( + f"""\ + ERROR Err + Traceback (most recent call last): + File ".../test_secrets_masker.py", line {line}, in test_exception + raise RuntimeError("Cannot connect to " + conn) + RuntimeError: Cannot connect to user:*** + """ + ) + + @pytest.mark.xfail(reason="Cannot filter secrets in traceback source") + def test_exc_tb(self, logger, caplog): + """ + Show it is not possible to filter secrets in the source. + + It is not possible to (regularly/reliably) filter out secrets that + appear directly in the source code. This is because the formatting of + exc_info is not done in the filter, it is done after the filter is + called, and fixing this "properly" is hard/impossible. + + (It would likely need to construct a custom traceback that changed the + source. I have no idead if that is even possible) + + This test illustrates that, but ix marked xfail incase someone wants to + fix this later. 
+ """ + try: + raise RuntimeError("Cannot connect to user:password") + except RuntimeError: + logger.exception("Err") + + line = lineno() - 4 + + assert caplog.text == textwrap.dedent( + f"""\ + ERROR Err + Traceback (most recent call last): + File ".../test_secrets_masker.py", line {line}, in test_exc_tb + raise RuntimeError("Cannot connect to user:***) + RuntimeError: Cannot connect to user:*** + """ + ) + + @pytest.mark.parametrize( + ("name", "value", "expected_mask"), + [ + (None, "secret", {"secret"}), + ("apikey", "secret", {"secret"}), + # the value for "apikey", and "password" should end up masked + (None, {"apikey": "secret", "other": {"val": "innocent", "password": "foo"}}, {"secret", "foo"}), + (None, ["secret", "other"], {"secret", "other"}), + # When the "sensitive value" is a dict, don't mask anything + # (Or should this be mask _everything_ under it ? + ("api_key", {"other": "innoent"}, set()), + ], + ) + def test_mask_secret(self, name, value, expected_mask): + filt = SecretsMasker() + filt.add_mask(value, name) + + assert filt.patterns == expected_mask + + @pytest.mark.parametrize( + ("patterns", "name", "value", "expected"), + [ + ({"secret"}, None, "secret", "***"), + ( + {"secret", "foo"}, + None, + {"apikey": "secret", "other": {"val": "innocent", "password": "foo"}}, + {"apikey": "***", "other": {"val": "innocent", "password": "***"}}, + ), + ({"secret", "other"}, None, ["secret", "other"], ["***", "***"]), + # We don't mask dict _keys_. + ({"secret", "other"}, None, {"data": {"secret": "secret"}}, {"data": {"secret": "***"}}), + ( + # Since this is a sensitve name, all the values should be redacted! + {"secret"}, + "api_key", + {"other": "innoent", "nested": ["x", "y"]}, + {"other": "***", "nested": ["***", "***"]}, + ), + ( + # Test that masking still works based on name even when no patterns given + set(), + 'env', + {'api_key': 'masked based on key name', 'other': 'foo'}, + {'api_key': '***', 'other': 'foo'}, + ), + ], + ) + def test_redact(self, patterns, name, value, expected): + filt = SecretsMasker() + for val in patterns: + filt.add_mask(val) + + assert filt.redact(value, name) == expected + + +class TestShouldHideValueForKey: + @pytest.mark.parametrize( + ("key", "expected_result"), + [ + ('', False), + (None, False), + ("key", False), + ("google_api_key", True), + ("GOOGLE_API_KEY", True), + ("GOOGLE_APIKEY", True), + ], + ) + def test_hiding_defaults(self, key, expected_result): + assert expected_result == should_hide_value_for_key(key) + + @pytest.mark.parametrize( + ("sensitive_variable_fields", "key", "expected_result"), + [ + ('key', 'TRELLO_KEY', True), + ('key', 'TRELLO_API_KEY', True), + ('key', 'GITHUB_APIKEY', True), + ('key, token', 'TRELLO_TOKEN', True), + ('mysecretword, mysensitivekey', 'GITHUB_mysecretword', True), + (None, 'TRELLO_API', False), + ('token', 'TRELLO_KEY', False), + ('token, mysecretword', 'TRELLO_KEY', False), + ], + ) + def test_hiding_config(self, sensitive_variable_fields, key, expected_result): + from airflow.utils.log.secrets_masker import get_sensitive_variables_fields + + with conf_vars({('core', 'sensitive_var_conn_names'): str(sensitive_variable_fields)}): + get_sensitive_variables_fields.cache_clear() + assert expected_result == should_hide_value_for_key(key) + get_sensitive_variables_fields.cache_clear() + + +class ShortExcFormatter(logging.Formatter): + """Don't include full path in exc_info messages""" + + def formatException(self, exc_info): + formatted = super().formatException(exc_info) + return 
formatted.replace(__file__, ".../" + os.path.basename(__file__))
+
+
+def lineno():
+    """Returns the current line number in our program."""
+    return inspect.currentframe().f_back.f_lineno
diff --git a/tests/www/test_utils.py b/tests/www/test_utils.py
index f3f2db0f4ad823..8f381a54b8dfa1 100644
--- a/tests/www/test_utils.py
+++ b/tests/www/test_utils.py
@@ -22,55 +22,12 @@
 from urllib.parse import parse_qs
 
 from bs4 import BeautifulSoup
-from parameterized import parameterized
 
 from airflow.www import utils
 from airflow.www.utils import wrapped_markdown
-from tests.test_utils.config import conf_vars
 
 
 class TestUtils(unittest.TestCase):
-    def test_empty_variable_should_not_be_hidden(self):
-        assert not utils.should_hide_value_for_key("")
-        assert not utils.should_hide_value_for_key(None)
-
-    def test_normal_variable_should_not_be_hidden(self):
-        assert not utils.should_hide_value_for_key("key")
-
-    def test_sensitive_variable_should_be_hidden(self):
-        assert utils.should_hide_value_for_key("google_api_key")
-
-    def test_sensitive_variable_should_be_hidden_ic(self):
-        assert utils.should_hide_value_for_key("GOOGLE_API_KEY")
-
-    @parameterized.expand(
-        [
-            ('key', 'TRELLO_KEY', True),
-            ('key', 'TRELLO_API_KEY', True),
-            ('key', 'GITHUB_APIKEY', True),
-            ('key, token', 'TRELLO_TOKEN', True),
-            ('mysecretword, mysensitivekey', 'GITHUB_mysecretword', True),
-        ],
-    )
-    def test_sensitive_variable_fields_should_be_hidden(
-        self, sensitive_variable_fields, key, expected_result
-    ):
-        with conf_vars({('admin', 'sensitive_variable_fields'): str(sensitive_variable_fields)}):
-            assert expected_result == utils.should_hide_value_for_key(key)
-
-    @parameterized.expand(
-        [
-            (None, 'TRELLO_API', False),
-            ('token', 'TRELLO_KEY', False),
-            ('token, mysecretword', 'TRELLO_KEY', False),
-        ],
-    )
-    def test_normal_variable_fields_should_not_be_hidden(
-        self, sensitive_variable_fields, key, expected_result
-    ):
-        with conf_vars({('admin', 'sensitive_variable_fields'): str(sensitive_variable_fields)}):
-            assert expected_result == utils.should_hide_value_for_key(key)
-
     def check_generate_pages_html(self, current_page, total_pages, window=7, check_middle=False):
         extra_links = 4  # first, prev, next, last
         search = "'>\"/>"
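
End-to-end, the pieces of this patch fit together as follows: when a task runs, ``task_run``/``task_test`` set ``settings.MASK_SECRETS_IN_LOGS = True``; Connection and Variable access then call ``mask_secret()`` to register secret values with the shared ``SecretsMasker`` filter, which the new ``mask_secrets`` entry in ``airflow_local_settings.py`` attaches to the ``console``, ``task`` and ``processor`` handlers; the filter rewrites every log record so registered values appear as ``***``. The snippet below is a minimal illustrative sketch, not part of the patch: it assumes an Airflow installation with this change applied and the default ``hide_sensitive_var_conn_fields = True``, and the logger name and secret values are invented for the example.

import logging

from airflow.utils.log.secrets_masker import SecretsMasker

# Stand-alone demo of the filter; in Airflow proper, the 'mask_secrets' filter
# defined in airflow_local_settings.py attaches a shared instance to the
# logging handlers instead of wiring it up by hand like this.
logger = logging.getLogger("masking_demo")
logger.addHandler(logging.StreamHandler())

masker = SecretsMasker()
logger.addFilter(masker)

# Register a literal secret value, roughly what mask_secret() does with a
# connection password while a task is running.
masker.add_mask("s3cr3t")
logger.warning("Connecting with password s3cr3t")
# -> Connecting with password ***

# Name-based redaction needs no registered pattern: values under sensitive
# key names ('api_key', 'password', ...) are replaced wholesale.
print(masker.redact({"api_key": "abc123", "host": "example.com"}))
# -> {'api_key': '***', 'host': 'example.com'}

Note that ``mask_secret()`` itself is a no-op unless ``MASK_SECRETS_IN_LOGS`` is enabled, which is why the sketch calls ``add_mask()`` on the filter directly.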