From d0b8e1f9a7578ed949fa379d3fd0141de7b2a068 Mon Sep 17 00:00:00 2001 From: Hao Liang Date: Wed, 4 Sep 2019 00:08:55 +0800 Subject: [PATCH] [AIRFLOW-4858] Deprecate "Historical convenience functions" in airflow.configuration (#5495) 1. Issue old conf method deprecation warnings properly and remove current old conf method usages. 2. Unify the way to use conf as `from airflow.configuration import conf` (cherry picked from commit f497d1d5aad9d88d6c1cd1dfa28546e7a8c5cb5f) --- airflow/api/__init__.py | 6 +- airflow/api/auth/backend/kerberos_auth.py | 2 +- airflow/bin/airflow | 8 +-- airflow/bin/cli.py | 4 +- .../airflow_local_settings.py | 2 +- airflow/config_templates/default_celery.py | 28 +++++----- airflow/configuration.py | 2 +- .../auth/backends/github_enterprise_auth.py | 6 +- airflow/contrib/auth/backends/google_auth.py | 5 +- .../contrib/auth/backends/kerberos_auth.py | 8 +-- airflow/contrib/auth/backends/ldap_auth.py | 50 ++++++++--------- .../contrib/executors/kubernetes_executor.py | 26 ++++----- airflow/contrib/executors/mesos_executor.py | 42 +++++++------- airflow/contrib/hooks/qubole_hook.py | 6 +- airflow/contrib/operators/ssh_operator.py | 4 +- airflow/contrib/operators/winrm_operator.py | 4 +- airflow/executors/__init__.py | 4 +- airflow/executors/base_executor.py | 4 +- airflow/executors/celery_executor.py | 10 ++-- airflow/executors/dask_executor.py | 10 ++-- airflow/hooks/hdfs_hook.py | 4 +- airflow/hooks/hive_hooks.py | 15 ++--- airflow/hooks/webhdfs_hook.py | 4 +- airflow/jobs/base_job.py | 2 +- airflow/jobs/local_task_job.py | 2 +- airflow/jobs/scheduler_job.py | 2 +- airflow/lineage/__init__.py | 2 +- airflow/lineage/backend/atlas/__init__.py | 2 +- airflow/logging_config.py | 2 +- airflow/models/base.py | 4 +- airflow/models/baseoperator.py | 7 ++- airflow/models/crypto.py | 4 +- airflow/models/dag.py | 19 ++++--- airflow/models/dagbag.py | 17 +++--- airflow/models/taskinstance.py | 19 ++++--- airflow/models/xcom.py | 8 +-- airflow/security/kerberos.py | 29 ++++------ airflow/task/task_runner/__init__.py | 4 +- airflow/task/task_runner/base_task_runner.py | 2 +- airflow/utils/configuration.py | 2 +- airflow/utils/dag_processing.py | 2 +- airflow/utils/email.py | 18 +++--- airflow/utils/helpers.py | 4 +- airflow/utils/log/file_task_handler.py | 2 +- airflow/utils/log/gcs_task_handler.py | 4 +- airflow/utils/log/s3_task_handler.py | 6 +- airflow/utils/log/wasb_task_handler.py | 4 +- airflow/utils/operator_resources.py | 10 ++-- airflow/www/app.py | 13 ++--- airflow/www/utils.py | 7 ++- airflow/www/views.py | 9 +-- airflow/www_rbac/app.py | 2 +- airflow/www_rbac/utils.py | 8 +-- airflow/www_rbac/views.py | 8 +-- scripts/perf/scheduler_ops_metrics.py | 5 +- .../common/experimental/test_mark_tasks.py | 5 +- .../operators/test_s3_to_sftp_operator.py | 4 +- tests/core.py | 55 ++++++++++--------- tests/dags/test_impersonation_custom.py | 2 +- tests/executors/test_celery_executor.py | 8 +-- tests/executors/test_dask_executor.py | 16 +++--- tests/jobs/test_backfill_job.py | 16 +++--- tests/jobs/test_local_task_job.py | 6 +- tests/jobs/test_scheduler_job.py | 23 ++++---- tests/models/test_cleartasks.py | 11 ++-- tests/models/test_dag.py | 5 +- tests/models/test_dagbag.py | 11 ++-- tests/models/test_taskinstance.py | 9 +-- tests/operators/test_hive_operator.py | 5 +- tests/operators/test_operators.py | 39 ++++++------- tests/security/test_kerberos.py | 12 ++-- tests/sensors/test_sql_sensor.py | 6 +- tests/test_configuration.py | 8 +++ tests/test_logging_config.py | 27 +++------ 
tests/utils/test_dag_processing.py | 3 +- .../experimental/test_kerberos_endpoints.py | 8 +-- tests/www/test_views.py | 6 +- .../experimental/test_kerberos_endpoints.py | 4 +- tests/www_rbac/test_views.py | 4 +- 79 files changed, 389 insertions(+), 387 deletions(-) diff --git a/airflow/api/__init__.py b/airflow/api/__init__.py index 3d470ff875c3de..2feb7f4ab3c8b6 100644 --- a/airflow/api/__init__.py +++ b/airflow/api/__init__.py @@ -26,8 +26,8 @@ import lazy_object_proxy from zope.deprecation import deprecated -from airflow.exceptions import AirflowException -from airflow import configuration as conf +from airflow.exceptions import AirflowException, AirflowConfigException +from airflow.configuration import conf from airflow.utils.log.logging_mixin import LoggingMixin @@ -49,7 +49,7 @@ def load_auth(): auth_backend = 'airflow.api.auth.backend.default' try: auth_backend = conf.get("api", "auth_backend") - except conf.AirflowConfigException: + except AirflowConfigException: pass try: diff --git a/airflow/api/auth/backend/kerberos_auth.py b/airflow/api/auth/backend/kerberos_auth.py index bf0b89710321d3..3e340f86330d01 100644 --- a/airflow/api/auth/backend/kerberos_auth.py +++ b/airflow/api/auth/backend/kerberos_auth.py @@ -60,7 +60,7 @@ from requests_kerberos import HTTPKerberosAuth -from airflow import configuration as conf +from airflow.configuration import conf from airflow.utils.log.logging_mixin import LoggingMixin diff --git a/airflow/bin/airflow b/airflow/bin/airflow index d0b7db3bf5c89b..010eeb37f8fcfe 100755 --- a/airflow/bin/airflow +++ b/airflow/bin/airflow @@ -18,14 +18,14 @@ # specific language governing permissions and limitations # under the License. import os -from airflow import configuration +from airflow.configuration import conf from airflow.bin.cli import CLIFactory if __name__ == '__main__': - if configuration.conf.get("core", "security") == 'kerberos': - os.environ['KRB5CCNAME'] = configuration.conf.get('kerberos', 'ccache') - os.environ['KRB5_KTNAME'] = configuration.conf.get('kerberos', 'keytab') + if conf.get("core", "security") == 'kerberos': + os.environ['KRB5CCNAME'] = conf.get('kerberos', 'ccache') + os.environ['KRB5_KTNAME'] = conf.get('kerberos', 'keytab') parser = CLIFactory.get_parser() args = parser.parse_args() diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 2044016cab9e30..920bde9aeb25af 100644 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -55,7 +55,7 @@ import airflow from airflow import api from airflow import jobs, settings -from airflow import configuration as conf +from airflow.configuration import conf from airflow.exceptions import AirflowException, AirflowWebServerTimeout from airflow.executors import get_default_executor from airflow.models import ( @@ -516,7 +516,7 @@ def run(args, dag=None): if os.path.exists(args.cfg_path): os.remove(args.cfg_path) - conf.conf.read_dict(conf_dict, source=args.cfg_path) + conf.read_dict(conf_dict, source=args.cfg_path) settings.configure_vars() # IMPORTANT, have to use the NullPool, otherwise, each "run" command may leave diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py index 295a2a211b53ab..4555b9822f7a68 100644 --- a/airflow/config_templates/airflow_local_settings.py +++ b/airflow/config_templates/airflow_local_settings.py @@ -22,7 +22,7 @@ import six -from airflow import configuration as conf +from airflow.configuration import conf from airflow.utils.file import mkdirs # TODO: Logging format and level should be configured 
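The hunks above all follow point 2 of the commit message: import the shared `conf` object directly and catch configuration errors from `airflow.exceptions`, instead of reaching through the `airflow.configuration` module (or, as in the old `load_auth()` code, looking the exception up as an attribute of `conf`). A minimal sketch of the resulting call-site style — `example_section` and `example_key` below are placeholder names for illustration, not real Airflow options:

```python
# Sketch of the unified access pattern this patch standardizes on.
# 'example_section' / 'example_key' are illustrative placeholders only.
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException


def read_optional_option():
    # conf is the shared AirflowConfigParser instance; a missing option raises
    # AirflowConfigException from airflow.exceptions, not an attribute of conf.
    try:
        return conf.get("example_section", "example_key")
    except AirflowConfigException:
        return None
```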
diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py index 3d8a767b0ea9c0..35a7c510ed810c 100644 --- a/airflow/config_templates/default_celery.py +++ b/airflow/config_templates/default_celery.py @@ -19,7 +19,7 @@ """Default celery configuration.""" import ssl -from airflow import configuration +from airflow.configuration import conf from airflow.exceptions import AirflowConfigException, AirflowException from airflow.utils.log.logging_mixin import LoggingMixin @@ -30,9 +30,9 @@ def _broker_supports_visibility_timeout(url): log = LoggingMixin().log -broker_url = configuration.conf.get('celery', 'BROKER_URL') +broker_url = conf.get('celery', 'BROKER_URL') -broker_transport_options = configuration.conf.getsection( +broker_transport_options = conf.getsection( 'celery_broker_transport_options' ) if 'visibility_timeout' not in broker_transport_options: @@ -44,31 +44,31 @@ def _broker_supports_visibility_timeout(url): 'event_serializer': 'json', 'worker_prefetch_multiplier': 1, 'task_acks_late': True, - 'task_default_queue': configuration.conf.get('celery', 'DEFAULT_QUEUE'), - 'task_default_exchange': configuration.conf.get('celery', 'DEFAULT_QUEUE'), + 'task_default_queue': conf.get('celery', 'DEFAULT_QUEUE'), + 'task_default_exchange': conf.get('celery', 'DEFAULT_QUEUE'), 'broker_url': broker_url, 'broker_transport_options': broker_transport_options, - 'result_backend': configuration.conf.get('celery', 'RESULT_BACKEND'), - 'worker_concurrency': configuration.conf.getint('celery', 'WORKER_CONCURRENCY'), + 'result_backend': conf.get('celery', 'RESULT_BACKEND'), + 'worker_concurrency': conf.getint('celery', 'WORKER_CONCURRENCY'), } celery_ssl_active = False try: - celery_ssl_active = configuration.conf.getboolean('celery', 'SSL_ACTIVE') + celery_ssl_active = conf.getboolean('celery', 'SSL_ACTIVE') except AirflowConfigException: log.warning("Celery Executor will run without SSL") try: if celery_ssl_active: if 'amqp://' in broker_url: - broker_use_ssl = {'keyfile': configuration.conf.get('celery', 'SSL_KEY'), - 'certfile': configuration.conf.get('celery', 'SSL_CERT'), - 'ca_certs': configuration.conf.get('celery', 'SSL_CACERT'), + broker_use_ssl = {'keyfile': conf.get('celery', 'SSL_KEY'), + 'certfile': conf.get('celery', 'SSL_CERT'), + 'ca_certs': conf.get('celery', 'SSL_CACERT'), 'cert_reqs': ssl.CERT_REQUIRED} elif 'redis://' in broker_url: - broker_use_ssl = {'ssl_keyfile': configuration.conf.get('celery', 'SSL_KEY'), - 'ssl_certfile': configuration.conf.get('celery', 'SSL_CERT'), - 'ssl_ca_certs': configuration.conf.get('celery', 'SSL_CACERT'), + broker_use_ssl = {'ssl_keyfile': conf.get('celery', 'SSL_KEY'), + 'ssl_certfile': conf.get('celery', 'SSL_CERT'), + 'ssl_ca_certs': conf.get('celery', 'SSL_CACERT'), 'ssl_cert_reqs': ssl.CERT_REQUIRED} else: raise AirflowException('The broker you configured does not support SSL_ACTIVE to be True. ' diff --git a/airflow/configuration.py b/airflow/configuration.py index 01d6ec65ca5d03..68149eb6a4416d 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -657,7 +657,7 @@ def get_airflow_test_config(airflow_home): for func in [load_test_config, get, getboolean, getfloat, getint, has_option, remove_option, as_dict, set]: deprecated( - func, + func.__name__, "Accessing configuration method '{f.__name__}' directly from " "the configuration module is deprecated. 
Please access the " "configuration from the 'configuration.conf' object via " diff --git a/airflow/contrib/auth/backends/github_enterprise_auth.py b/airflow/contrib/auth/backends/github_enterprise_auth.py index deb5e4aec5f0ed..28819222e59438 100644 --- a/airflow/contrib/auth/backends/github_enterprise_auth.py +++ b/airflow/contrib/auth/backends/github_enterprise_auth.py @@ -26,8 +26,8 @@ from flask_oauthlib.client import OAuth -from airflow import models, configuration -from airflow.configuration import AirflowConfigException +from airflow import models +from airflow.configuration import AirflowConfigException, conf from airflow.utils.db import provide_session from airflow.utils.log.logging_mixin import LoggingMixin @@ -35,7 +35,7 @@ def get_config_param(param): - return str(configuration.conf.get('github_enterprise', param)) + return str(conf.get('github_enterprise', param)) class GHEUser(models.User): diff --git a/airflow/contrib/auth/backends/google_auth.py b/airflow/contrib/auth/backends/google_auth.py index 10dc3a91407437..2ce23ae56cd567 100644 --- a/airflow/contrib/auth/backends/google_auth.py +++ b/airflow/contrib/auth/backends/google_auth.py @@ -26,7 +26,8 @@ from flask_oauthlib.client import OAuth -from airflow import models, configuration +from airflow import models +from airflow.configuration import conf from airflow.utils.db import provide_session from airflow.utils.log.logging_mixin import LoggingMixin @@ -34,7 +35,7 @@ def get_config_param(param): - return str(configuration.conf.get('google', param)) + return str(conf.get('google', param)) class GoogleUser(models.User): diff --git a/airflow/contrib/auth/backends/kerberos_auth.py b/airflow/contrib/auth/backends/kerberos_auth.py index 6aa61988709460..e84a0b2aeb9866 100644 --- a/airflow/contrib/auth/backends/kerberos_auth.py +++ b/airflow/contrib/auth/backends/kerberos_auth.py @@ -33,7 +33,7 @@ from flask import url_for, redirect from airflow import models -from airflow import configuration +from airflow.configuration import conf from airflow.utils.db import provide_session from airflow.utils.log.logging_mixin import LoggingMixin @@ -55,13 +55,13 @@ def __init__(self, user): @staticmethod def authenticate(username, password): service_principal = "%s/%s" % ( - configuration.conf.get('kerberos', 'principal'), + conf.get('kerberos', 'principal'), utils.get_fqdn() ) - realm = configuration.conf.get("kerberos", "default_realm") + realm = conf.get("kerberos", "default_realm") try: - user_realm = configuration.conf.get("security", "default_realm") + user_realm = conf.get("security", "default_realm") except AirflowConfigException: user_realm = realm diff --git a/airflow/contrib/auth/backends/ldap_auth.py b/airflow/contrib/auth/backends/ldap_auth.py index 4b17515bace742..7368deae88dd8c 100644 --- a/airflow/contrib/auth/backends/ldap_auth.py +++ b/airflow/contrib/auth/backends/ldap_auth.py @@ -30,7 +30,7 @@ from flask import url_for, redirect from airflow import models -from airflow import configuration +from airflow.configuration import conf from airflow.configuration import AirflowConfigException from airflow.utils.db import provide_session @@ -56,12 +56,12 @@ class LdapException(Exception): def get_ldap_connection(dn=None, password=None): try: - cacert = configuration.conf.get("ldap", "cacert") + cacert = conf.get("ldap", "cacert") except AirflowConfigException: pass try: - ignore_malformed_schema = configuration.conf.get("ldap", "ignore_malformed_schema") + ignore_malformed_schema = conf.get("ldap", "ignore_malformed_schema") except 
AirflowConfigException: pass @@ -71,7 +71,7 @@ def get_ldap_connection(dn=None, password=None): tls_configuration = Tls(validate=ssl.CERT_REQUIRED, ca_certs_file=cacert) - server = Server(configuration.conf.get("ldap", "uri"), + server = Server(conf.get("ldap", "uri"), use_ssl=True, tls=tls_configuration) @@ -102,7 +102,7 @@ def group_contains_user(conn, search_base, group_filter, user_name_attr, usernam def groups_user(conn, search_base, user_filter, user_name_att, username): search_filter = "(&({0})({1}={2}))".format(user_filter, user_name_att, username) try: - memberof_attr = configuration.conf.get("ldap", "group_member_attr") + memberof_attr = conf.get("ldap", "group_member_attr") except Exception: memberof_attr = "memberOf" res = conn.search(native(search_base), native(search_filter), @@ -138,13 +138,13 @@ def __init__(self, user): self.ldap_groups = [] # Load and cache superuser and data_profiler settings. - conn = get_ldap_connection(configuration.conf.get("ldap", "bind_user"), - configuration.conf.get("ldap", "bind_password")) + conn = get_ldap_connection(conf.get("ldap", "bind_user"), + conf.get("ldap", "bind_password")) superuser_filter = None data_profiler_filter = None try: - superuser_filter = configuration.conf.get("ldap", "superuser_filter") + superuser_filter = conf.get("ldap", "superuser_filter") except AirflowConfigException: pass @@ -153,14 +153,14 @@ def __init__(self, user): log.debug("Missing configuration for superuser settings or empty. Skipping.") else: self.superuser = group_contains_user(conn, - configuration.conf.get("ldap", "basedn"), + conf.get("ldap", "basedn"), superuser_filter, - configuration.conf.get("ldap", - "user_name_attr"), + conf.get("ldap", + "user_name_attr"), user.username) try: - data_profiler_filter = configuration.conf.get("ldap", "data_profiler_filter") + data_profiler_filter = conf.get("ldap", "data_profiler_filter") except AirflowConfigException: pass @@ -171,10 +171,10 @@ def __init__(self, user): else: self.data_profiler = group_contains_user( conn, - configuration.conf.get("ldap", "basedn"), + conf.get("ldap", "basedn"), data_profiler_filter, - configuration.conf.get("ldap", - "user_name_attr"), + conf.get("ldap", + "user_name_attr"), user.username ) @@ -182,9 +182,9 @@ def __init__(self, user): try: self.ldap_groups = groups_user( conn, - configuration.conf.get("ldap", "basedn"), - configuration.conf.get("ldap", "user_filter"), - configuration.conf.get("ldap", "user_name_attr"), + conf.get("ldap", "basedn"), + conf.get("ldap", "user_filter"), + conf.get("ldap", "user_name_attr"), user.username ) except AirflowConfigException: @@ -192,25 +192,25 @@ def __init__(self, user): @staticmethod def try_login(username, password): - conn = get_ldap_connection(configuration.conf.get("ldap", "bind_user"), - configuration.conf.get("ldap", "bind_password")) + conn = get_ldap_connection(conf.get("ldap", "bind_user"), + conf.get("ldap", "bind_password")) search_filter = "(&({0})({1}={2}))".format( - configuration.conf.get("ldap", "user_filter"), - configuration.conf.get("ldap", "user_name_attr"), + conf.get("ldap", "user_filter"), + conf.get("ldap", "user_name_attr"), username ) search_scope = LEVEL - if configuration.conf.has_option("ldap", "search_scope"): - if configuration.conf.get("ldap", "search_scope") == "SUBTREE": + if conf.has_option("ldap", "search_scope"): + if conf.get("ldap", "search_scope") == "SUBTREE": search_scope = SUBTREE else: search_scope = LEVEL # todo: BASE or ONELEVEL? 
- res = conn.search(native(configuration.conf.get("ldap", "basedn")), + res = conn.search(native(conf.get("ldap", "basedn")), native(search_filter), search_scope=native(search_scope)) diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index b0e45cac3fcf90..96e1c3265cdcb3 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -40,7 +40,7 @@ from airflow.models import KubeResourceVersion, KubeWorkerIdentifier, TaskInstance from airflow.utils.state import State from airflow.utils.db import provide_session, create_session -from airflow import configuration, settings +from airflow import settings from airflow.exceptions import AirflowConfigException, AirflowException from airflow.utils.log.logging_mixin import LoggingMixin @@ -135,24 +135,24 @@ class KubeConfig: kubernetes_section = 'kubernetes' def __init__(self): - configuration_dict = configuration.as_dict(display_sensitive=True) + configuration_dict = conf.as_dict(display_sensitive=True) self.core_configuration = configuration_dict['core'] self.kube_secrets = configuration_dict.get('kubernetes_secrets', {}) self.kube_env_vars = configuration_dict.get('kubernetes_environment_variables', {}) - self.env_from_configmap_ref = configuration.get(self.kubernetes_section, - 'env_from_configmap_ref') - self.env_from_secret_ref = configuration.get(self.kubernetes_section, - 'env_from_secret_ref') + self.env_from_configmap_ref = conf.get(self.kubernetes_section, + 'env_from_configmap_ref') + self.env_from_secret_ref = conf.get(self.kubernetes_section, + 'env_from_secret_ref') self.airflow_home = settings.AIRFLOW_HOME - self.dags_folder = configuration.get(self.core_section, 'dags_folder') - self.parallelism = configuration.getint(self.core_section, 'PARALLELISM') - self.worker_container_repository = configuration.get( + self.dags_folder = conf.get(self.core_section, 'dags_folder') + self.parallelism = conf.getint(self.core_section, 'parallelism') + self.worker_container_repository = conf.get( self.kubernetes_section, 'worker_container_repository') - self.worker_container_tag = configuration.get( + self.worker_container_tag = conf.get( self.kubernetes_section, 'worker_container_tag') self.kube_image = '{}:{}'.format( self.worker_container_repository, self.worker_container_tag) - self.kube_image_pull_policy = configuration.get( + self.kube_image_pull_policy = conf.get( self.kubernetes_section, "worker_container_image_pull_policy" ) self.kube_node_selectors = configuration_dict.get('kubernetes_node_selectors', {}) @@ -224,7 +224,7 @@ def __init__(self): self.logs_volume_host = conf.get(self.kubernetes_section, 'logs_volume_host') # This prop may optionally be set for PV Claims and is used to write logs - self.base_log_folder = configuration.get(self.core_section, 'base_log_folder') + self.base_log_folder = conf.get(self.core_section, 'base_log_folder') # The Kubernetes Namespace in which the Scheduler and Webserver reside. Note # that if your @@ -285,7 +285,7 @@ def __init__(self): # pod security context items should return integers # and only return a blank string if contexts are not set. 
def _get_security_context_val(self, scontext): - val = configuration.get(self.kubernetes_section, scontext) + val = conf.get(self.kubernetes_section, scontext) if not val: return 0 else: diff --git a/airflow/contrib/executors/mesos_executor.py b/airflow/contrib/executors/mesos_executor.py index 4748091916ec53..9c34ff2d8aa325 100644 --- a/airflow/contrib/executors/mesos_executor.py +++ b/airflow/contrib/executors/mesos_executor.py @@ -26,7 +26,7 @@ from mesos.interface import mesos_pb2 import mesos.native -from airflow import configuration +from airflow.configuration import conf from airflow.executors.base_executor import BaseExecutor from airflow.settings import Session from airflow.utils.state import State @@ -38,9 +38,9 @@ def get_framework_name(): - if not configuration.conf.get('mesos', 'FRAMEWORK_NAME'): + if not conf.get('mesos', 'FRAMEWORK_NAME'): return DEFAULT_FRAMEWORK_NAME - return configuration.conf.get('mesos', 'FRAMEWORK_NAME') + return conf.get('mesos', 'FRAMEWORK_NAME') # AirflowMesosScheduler, implements Mesos Scheduler interface @@ -64,8 +64,8 @@ def __init__(self, self.task_mem = task_mem self.task_counter = 0 self.task_key_map = {} - if configuration.get('mesos', 'DOCKER_IMAGE_SLAVE'): - self.mesos_slave_docker_image = configuration.get( + if conf.get('mesos', 'DOCKER_IMAGE_SLAVE'): + self.mesos_slave_docker_image = conf.get( 'mesos', 'DOCKER_IMAGE_SLAVE' ) @@ -73,8 +73,8 @@ def registered(self, driver, frameworkId, masterInfo): self.log.info("AirflowScheduler registered to Mesos with framework ID %s", frameworkId.value) - if configuration.conf.getboolean('mesos', 'CHECKPOINT') and \ - configuration.conf.get('mesos', 'FAILOVER_TIMEOUT'): + if conf.getboolean('mesos', 'CHECKPOINT') and \ + conf.get('mesos', 'FAILOVER_TIMEOUT'): # Import here to work around a circular import error from airflow.models import Connection @@ -226,28 +226,28 @@ def start(self): framework = mesos_pb2.FrameworkInfo() framework.user = '' - if not configuration.conf.get('mesos', 'MASTER'): + if not conf.get('mesos', 'MASTER'): self.log.error("Expecting mesos master URL for mesos executor") raise AirflowException("mesos.master not provided for mesos executor") - master = configuration.conf.get('mesos', 'MASTER') + master = conf.get('mesos', 'MASTER') framework.name = get_framework_name() - if not configuration.conf.get('mesos', 'TASK_CPU'): + if not conf.get('mesos', 'TASK_CPU'): task_cpu = 1 else: - task_cpu = configuration.conf.getint('mesos', 'TASK_CPU') + task_cpu = conf.getint('mesos', 'TASK_CPU') - if not configuration.conf.get('mesos', 'TASK_MEMORY'): + if not conf.get('mesos', 'TASK_MEMORY'): task_memory = 256 else: - task_memory = configuration.conf.getint('mesos', 'TASK_MEMORY') + task_memory = conf.getint('mesos', 'TASK_MEMORY') - if configuration.conf.getboolean('mesos', 'CHECKPOINT'): + if conf.getboolean('mesos', 'CHECKPOINT'): framework.checkpoint = True - if configuration.conf.get('mesos', 'FAILOVER_TIMEOUT'): + if conf.get('mesos', 'FAILOVER_TIMEOUT'): # Import here to work around a circular import error from airflow.models import Connection @@ -260,7 +260,7 @@ def start(self): # with running tasks. 
framework.id.value = connection.extra - framework.failover_timeout = configuration.conf.getint( + framework.failover_timeout = conf.getint( 'mesos', 'FAILOVER_TIMEOUT' ) else: @@ -274,19 +274,19 @@ def start(self): implicit_acknowledgements = 1 - if configuration.conf.getboolean('mesos', 'AUTHENTICATE'): - if not configuration.conf.get('mesos', 'DEFAULT_PRINCIPAL'): + if conf.getboolean('mesos', 'AUTHENTICATE'): + if not conf.get('mesos', 'DEFAULT_PRINCIPAL'): self.log.error("Expecting authentication principal in the environment") raise AirflowException( "mesos.default_principal not provided in authenticated mode") - if not configuration.conf.get('mesos', 'DEFAULT_SECRET'): + if not conf.get('mesos', 'DEFAULT_SECRET'): self.log.error("Expecting authentication secret in the environment") raise AirflowException( "mesos.default_secret not provided in authenticated mode") credential = mesos_pb2.Credential() - credential.principal = configuration.conf.get('mesos', 'DEFAULT_PRINCIPAL') - credential.secret = configuration.conf.get('mesos', 'DEFAULT_SECRET') + credential.principal = conf.get('mesos', 'DEFAULT_PRINCIPAL') + credential.secret = conf.get('mesos', 'DEFAULT_SECRET') framework.principal = credential.principal diff --git a/airflow/contrib/hooks/qubole_hook.py b/airflow/contrib/hooks/qubole_hook.py index ef3afb7527933a..1cc579e4ecf879 100644 --- a/airflow/contrib/hooks/qubole_hook.py +++ b/airflow/contrib/hooks/qubole_hook.py @@ -31,7 +31,7 @@ from airflow.exceptions import AirflowException from airflow.hooks.base_hook import BaseHook -from airflow import configuration +from airflow.configuration import conf, mkdir_p from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.state import State from airflow.models import TaskInstance @@ -181,10 +181,10 @@ def get_results(self, ti=None, fp=None, inline=True, delim=None, fetch=True): if fp is None: iso = datetime.datetime.utcnow().isoformat() logpath = os.path.expanduser( - configuration.conf.get('core', 'BASE_LOG_FOLDER') + conf.get('core', 'BASE_LOG_FOLDER') ) resultpath = logpath + '/' + self.dag_id + '/' + self.task_id + '/results' - configuration.mkdir_p(resultpath) + mkdir_p(resultpath) fp = open(resultpath + '/' + iso, 'wb') if self.cmd is None: diff --git a/airflow/contrib/operators/ssh_operator.py b/airflow/contrib/operators/ssh_operator.py index 28b77507e092bc..4e4a86cd5768e9 100644 --- a/airflow/contrib/operators/ssh_operator.py +++ b/airflow/contrib/operators/ssh_operator.py @@ -20,7 +20,7 @@ from base64 import b64encode from select import select -from airflow import configuration +from airflow.configuration import conf from airflow.contrib.hooks.ssh_hook import SSHHook from airflow.exceptions import AirflowException from airflow.models import BaseOperator @@ -156,7 +156,7 @@ def execute(self, context): if exit_status == 0: # returning output if do_xcom_push is set if self.do_xcom_push: - enable_pickling = configuration.conf.getboolean( + enable_pickling = conf.getboolean( 'core', 'enable_xcom_pickling' ) if enable_pickling: diff --git a/airflow/contrib/operators/winrm_operator.py b/airflow/contrib/operators/winrm_operator.py index 3f9dbe6360d2e5..e773ab07f1b8b6 100644 --- a/airflow/contrib/operators/winrm_operator.py +++ b/airflow/contrib/operators/winrm_operator.py @@ -22,7 +22,7 @@ from winrm.exceptions import WinRMOperationTimeoutError -from airflow import configuration +from airflow.configuration import conf from airflow.contrib.hooks.winrm_hook import WinRMHook from airflow.exceptions import 
AirflowException from airflow.models import BaseOperator @@ -129,7 +129,7 @@ def execute(self, context): if return_code == 0: # returning output if do_xcom_push is set if self.do_xcom_push: - enable_pickling = configuration.conf.getboolean( + enable_pickling = conf.getboolean( 'core', 'enable_xcom_pickling' ) if enable_pickling: diff --git a/airflow/executors/__init__.py b/airflow/executors/__init__.py index b9d8a020a70ae7..48020ee78a562e 100644 --- a/airflow/executors/__init__.py +++ b/airflow/executors/__init__.py @@ -19,7 +19,7 @@ import sys from airflow.utils.log.logging_mixin import LoggingMixin -from airflow import configuration +from airflow.configuration import conf from airflow.exceptions import AirflowException from airflow.executors.base_executor import BaseExecutor # noqa from airflow.executors.local_executor import LocalExecutor @@ -43,7 +43,7 @@ def get_default_executor(): if DEFAULT_EXECUTOR is not None: return DEFAULT_EXECUTOR - executor_name = configuration.conf.get('core', 'EXECUTOR') + executor_name = conf.get('core', 'EXECUTOR') DEFAULT_EXECUTOR = _get_executor(executor_name) diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 7c93be08a5a000..61522a24f6c36f 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -22,12 +22,12 @@ # To avoid circular imports import airflow.utils.dag_processing -from airflow import configuration +from airflow.configuration import conf from airflow.settings import Stats from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.state import State -PARALLELISM = configuration.conf.getint('core', 'PARALLELISM') +PARALLELISM = conf.getint('core', 'PARALLELISM') class BaseExecutor(LoggingMixin): diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 2c108bc7c76c3b..fa763f2a7d0352 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -27,7 +27,7 @@ from celery import Celery from celery import states as celery_states -from airflow import configuration +from airflow.configuration import conf from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG from airflow.exceptions import AirflowException from airflow.executors.base_executor import BaseExecutor @@ -45,15 +45,15 @@ airflow worker ''' -if configuration.conf.has_option('celery', 'celery_config_options'): +if conf.has_option('celery', 'celery_config_options'): celery_configuration = import_string( - configuration.conf.get('celery', 'celery_config_options') + conf.get('celery', 'celery_config_options') ) else: celery_configuration = DEFAULT_CELERY_CONFIG app = Celery( - configuration.conf.get('celery', 'CELERY_APP_NAME'), + conf.get('celery', 'CELERY_APP_NAME'), config_source=celery_configuration) @@ -141,7 +141,7 @@ def __init__(self): # (which can become a bottleneck on bigger clusters) so we use # a multiprocessing pool to speed this up. # How many worker processes are created for checking celery task state. 
- self._sync_parallelism = configuration.getint('celery', 'SYNC_PARALLELISM') + self._sync_parallelism = conf.getint('celery', 'SYNC_PARALLELISM') if self._sync_parallelism == 0: self._sync_parallelism = max(1, cpu_count() - 1) diff --git a/airflow/executors/dask_executor.py b/airflow/executors/dask_executor.py index 0dae25180600e4..d322f34bdc8294 100644 --- a/airflow/executors/dask_executor.py +++ b/airflow/executors/dask_executor.py @@ -21,7 +21,7 @@ import subprocess import warnings -from airflow import configuration +from airflow.configuration import conf from airflow.executors.base_executor import BaseExecutor @@ -31,15 +31,15 @@ class DaskExecutor(BaseExecutor): """ def __init__(self, cluster_address=None): if cluster_address is None: - cluster_address = configuration.conf.get('dask', 'cluster_address') + cluster_address = conf.get('dask', 'cluster_address') if not cluster_address: raise ValueError( 'Please provide a Dask cluster address in airflow.cfg') self.cluster_address = cluster_address # ssl / tls parameters - self.tls_ca = configuration.get('dask', 'tls_ca') - self.tls_key = configuration.get('dask', 'tls_key') - self.tls_cert = configuration.get('dask', 'tls_cert') + self.tls_ca = conf.get('dask', 'tls_ca') + self.tls_key = conf.get('dask', 'tls_key') + self.tls_cert = conf.get('dask', 'tls_cert') super(DaskExecutor, self).__init__(parallelism=0) def start(self): diff --git a/airflow/hooks/hdfs_hook.py b/airflow/hooks/hdfs_hook.py index 31b2f11501a012..0275efd11cbc41 100644 --- a/airflow/hooks/hdfs_hook.py +++ b/airflow/hooks/hdfs_hook.py @@ -16,7 +16,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -from airflow import configuration +from airflow.configuration import conf from airflow.exceptions import AirflowException from airflow.hooks.base_hook import BaseHook @@ -62,7 +62,7 @@ def get_conn(self): # take the first. effective_user = self.proxy_user autoconfig = self.autoconfig - use_sasl = configuration.conf.get('core', 'security') == 'kerberos' + use_sasl = conf.get('core', 'security') == 'kerberos' try: connections = self.get_connections(self.hdfs_conn_id) diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py index 81780be09421d3..e50508f19fd1d7 100644 --- a/airflow/hooks/hive_hooks.py +++ b/airflow/hooks/hive_hooks.py @@ -34,7 +34,7 @@ from six.moves import zip import airflow.security.utils as utils -from airflow import configuration +from airflow.configuration import conf from airflow.exceptions import AirflowException from airflow.hooks.base_hook import BaseHook from airflow.utils.file import TemporaryDirectory @@ -102,8 +102,8 @@ def __init__( "Invalid Mapred Queue Priority. 
Valid values are: " "{}".format(', '.join(HIVE_QUEUE_PRIORITIES))) - self.mapred_queue = mapred_queue or configuration.get('hive', - 'default_hive_mapred_queue') + self.mapred_queue = mapred_queue or conf.get('hive', + 'default_hive_mapred_queue') self.mapred_queue_priority = mapred_queue_priority self.mapred_job_name = mapred_job_name @@ -134,7 +134,7 @@ def _prepare_cli_cmd(self): hive_bin = 'beeline' jdbc_url = "jdbc:hive2://{host}:{port}/{schema}".format( host=conn.host, port=conn.port, schema=conn.schema) - if configuration.conf.get('core', 'security') == 'kerberos': + if conf.get('core', 'security') == 'kerberos': template = conn.extra_dejson.get( 'principal', "hive/_HOST@EXAMPLE.COM") if "_HOST" in template: @@ -512,12 +512,13 @@ def get_metastore_client(self): from thrift.protocol import TBinaryProtocol ms = self.metastore_conn auth_mechanism = ms.extra_dejson.get('authMechanism', 'NOSASL') - if configuration.conf.get('core', 'security') == 'kerberos': + + if conf.get('core', 'security') == 'kerberos': auth_mechanism = ms.extra_dejson.get('authMechanism', 'GSSAPI') kerberos_service_name = ms.extra_dejson.get('kerberos_service_name', 'hive') socket = TSocket.TSocket(ms.host, ms.port) - if configuration.conf.get('core', 'security') == 'kerberos' \ + if conf.get('core', 'security') == 'kerberos' \ and auth_mechanism == 'GSSAPI': try: import saslwrapper as sasl @@ -786,7 +787,7 @@ def get_conn(self, schema=None): # we need to give a username username = 'airflow' kerberos_service_name = None - if configuration.conf.get('core', 'security') == 'kerberos': + if conf.get('core', 'security') == 'kerberos': auth_mechanism = db.extra_dejson.get('authMechanism', 'KERBEROS') kerberos_service_name = db.extra_dejson.get('kerberos_service_name', 'hive') diff --git a/airflow/hooks/webhdfs_hook.py b/airflow/hooks/webhdfs_hook.py index b1514afe135473..6d260eeda46898 100644 --- a/airflow/hooks/webhdfs_hook.py +++ b/airflow/hooks/webhdfs_hook.py @@ -19,12 +19,12 @@ from hdfs import InsecureClient, HdfsError -from airflow import configuration +from airflow.configuration import conf from airflow.exceptions import AirflowException from airflow.hooks.base_hook import BaseHook from airflow.utils.log.logging_mixin import LoggingMixin -_kerberos_security_mode = configuration.conf.get("core", "security") == "kerberos" +_kerberos_security_mode = conf.get("core", "security") == "kerberos" if _kerberos_security_mode: try: from hdfs.ext.kerberos import KerberosClient diff --git a/airflow/jobs/base_job.py b/airflow/jobs/base_job.py index 9067443a960022..5be68acb414230 100644 --- a/airflow/jobs/base_job.py +++ b/airflow/jobs/base_job.py @@ -30,7 +30,7 @@ from sqlalchemy.orm.session import make_transient, Session from typing import Optional -from airflow import configuration as conf +from airflow.configuration import conf from airflow import executors, models from airflow.exceptions import ( AirflowException, diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py index e2d15a9f5c839b..fdcdcf7c008a9a 100644 --- a/airflow/jobs/local_task_job.py +++ b/airflow/jobs/local_task_job.py @@ -28,7 +28,7 @@ from sqlalchemy.exc import OperationalError -from airflow import configuration as conf +from airflow.configuration import conf from airflow.exceptions import AirflowException from airflow.settings import Stats from airflow.task.task_runner import get_task_runner diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 8ac1f38c681816..cb46533735e69b 100644 --- 
a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -39,7 +39,7 @@ from sqlalchemy import and_, func, not_, or_ from sqlalchemy.orm.session import make_transient -from airflow import configuration as conf +from airflow.configuration import conf from airflow import executors, models, settings from airflow.exceptions import AirflowException from airflow.models import DagRun, SlaMiss, errors diff --git a/airflow/lineage/__init__.py b/airflow/lineage/__init__.py index e3f7e8489d48ac..f444139f009794 100644 --- a/airflow/lineage/__init__.py +++ b/airflow/lineage/__init__.py @@ -18,7 +18,7 @@ # under the License. from functools import wraps -from airflow import configuration as conf +from airflow.configuration import conf from airflow.lineage.datasets import DataSet from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.module_loading import import_string diff --git a/airflow/lineage/backend/atlas/__init__.py b/airflow/lineage/backend/atlas/__init__.py index 0f626a08447c45..8e0dfc5dc3dfc1 100644 --- a/airflow/lineage/backend/atlas/__init__.py +++ b/airflow/lineage/backend/atlas/__init__.py @@ -17,7 +17,7 @@ # specific language governing permissions and limitations # under the License. # -from airflow import configuration as conf +from airflow.configuration import conf from airflow.lineage import datasets from airflow.lineage.backend import LineageBackend from airflow.lineage.backend.atlas.typedefs import operator_typedef diff --git a/airflow/logging_config.py b/airflow/logging_config.py index 0cf8f4db0d59bb..c8350982748815 100644 --- a/airflow/logging_config.py +++ b/airflow/logging_config.py @@ -21,7 +21,7 @@ import warnings from logging.config import dictConfig -from airflow import configuration as conf +from airflow.configuration import conf from airflow.exceptions import AirflowConfigException from airflow.utils.module_loading import import_string diff --git a/airflow/models/base.py b/airflow/models/base.py index 97c6b777984d86..3dacb8adcedb1e 100644 --- a/airflow/models/base.py +++ b/airflow/models/base.py @@ -21,9 +21,9 @@ from sqlalchemy import MetaData from sqlalchemy.ext.declarative import declarative_base -import airflow +from airflow.configuration import conf -SQL_ALCHEMY_SCHEMA = airflow.configuration.get("core", "SQL_ALCHEMY_SCHEMA") +SQL_ALCHEMY_SCHEMA = conf.get("core", "SQL_ALCHEMY_SCHEMA") metadata = ( None diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index d3e846fab0f2a8..ec60b0d8a3439b 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -33,7 +33,8 @@ import jinja2 import six -from airflow import configuration, settings +from airflow import settings +from airflow.configuration import conf from airflow.exceptions import AirflowException from airflow.lineage import prepare_lineage, apply_lineage, DataSet from airflow.models.dag import DAG @@ -265,7 +266,7 @@ class derived from this one results in the creation of a task object, def __init__( self, task_id, # type: str - owner=configuration.conf.get('operators', 'DEFAULT_OWNER'), # type: str + owner=conf.get('operators', 'DEFAULT_OWNER'), # type: str email=None, # type: Optional[str] email_on_retry=True, # type: bool email_on_failure=True, # type: bool @@ -283,7 +284,7 @@ def __init__( default_args=None, # type: Optional[Dict] priority_weight=1, # type: int weight_rule=WeightRule.DOWNSTREAM, # type: str - queue=configuration.conf.get('celery', 'default_queue'), # type: str + queue=conf.get('celery', 'default_queue'), # type: str 
pool=Pool.DEFAULT_POOL_NAME, # type: str sla=None, # type: Optional[timedelta] execution_timeout=None, # type: Optional[timedelta] diff --git a/airflow/models/crypto.py b/airflow/models/crypto.py index 07e754fe186eb0..abc7d2d24f481a 100644 --- a/airflow/models/crypto.py +++ b/airflow/models/crypto.py @@ -19,7 +19,7 @@ from builtins import ImportError as BuiltinImportError -from airflow import configuration +from airflow.configuration import conf from airflow.exceptions import AirflowException from airflow.utils.log.logging_mixin import LoggingMixin @@ -79,7 +79,7 @@ def get_fernet(): return _fernet try: - fernet_key = configuration.conf.get('core', 'FERNET_KEY') + fernet_key = conf.get('core', 'FERNET_KEY') if not fernet_key: log.warning( "empty cryptography key - values will not be stored encrypted." diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 42789d98490999..4568eb904d23b5 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -39,7 +39,8 @@ from future.standard_library import install_aliases from sqlalchemy import Column, String, Boolean, Integer, Text, func, or_ -from airflow import configuration, settings, utils +from airflow import settings, utils +from airflow.configuration import conf from airflow.dag.base_dag import BaseDag from airflow.exceptions import AirflowException, AirflowDagCycleException from airflow.executors import LocalExecutor, get_default_executor @@ -195,14 +196,14 @@ def __init__( user_defined_macros=None, # type: Optional[Dict] user_defined_filters=None, # type: Optional[Dict] default_args=None, # type: Optional[Dict] - concurrency=configuration.conf.getint('core', 'dag_concurrency'), # type: int - max_active_runs=configuration.conf.getint( + concurrency=conf.getint('core', 'dag_concurrency'), # type: int + max_active_runs=conf.getint( 'core', 'max_active_runs_per_dag'), # type: int dagrun_timeout=None, # type: Optional[timedelta] sla_miss_callback=None, # type: Optional[Callable] default_view=None, # type: Optional[str] - orientation=configuration.conf.get('webserver', 'dag_orientation'), # type: str - catchup=configuration.conf.getboolean('scheduler', 'catchup_by_default'), # type: bool + orientation=conf.get('webserver', 'dag_orientation'), # type: str + catchup=conf.getboolean('scheduler', 'catchup_by_default'), # type: bool on_success_callback=None, # type: Optional[Callable] on_failure_callback=None, # type: Optional[Callable] doc_md=None, # type: Optional[str] @@ -345,7 +346,7 @@ def __exit__(self, _type, _value, _tb): def get_default_view(self): """This is only there for backward compatible jinja2 templates""" if self._default_view is None: - return configuration.conf.get('webserver', 'dag_default_view').lower() + return conf.get('webserver', 'dag_default_view').lower() else: return self._default_view @@ -1170,7 +1171,7 @@ def run( mark_success=False, local=False, executor=None, - donot_pickle=configuration.conf.getboolean('core', 'donot_pickle'), + donot_pickle=conf.getboolean('core', 'donot_pickle'), ignore_task_deps=False, ignore_first_depends_on_past=False, pool=None, @@ -1461,7 +1462,7 @@ class DagModel(Base): dag_id = Column(String(ID_LEN), primary_key=True) # A DAG can be paused from the UI / DB # Set this default value of is_paused based on a configuration value! 
- is_paused_at_creation = configuration.conf\ + is_paused_at_creation = conf\ .getboolean('core', 'dags_are_paused_at_creation') is_paused = Column(Boolean, default=is_paused_at_creation) @@ -1513,7 +1514,7 @@ def get_current(cls, dag_id, session=None): def get_default_view(self): if self.default_view is None: - return configuration.conf.get('webserver', 'dag_default_view').lower() + return conf.get('webserver', 'dag_default_view').lower() else: return self.default_view diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index 36ef374961f427..ee7423089f81d2 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -34,7 +34,8 @@ from croniter import croniter, CroniterBadCronError, CroniterBadDateError, CroniterNotAlphaError from sqlalchemy import or_ -from airflow import configuration, settings +from airflow import settings +from airflow.configuration import conf from airflow.dag.base_dag import BaseDagBag from airflow.exceptions import AirflowDagCycleException from airflow.executors import get_default_executor @@ -80,8 +81,8 @@ def __init__( self, dag_folder=None, executor=None, - include_examples=configuration.conf.getboolean('core', 'LOAD_EXAMPLES'), - safe_mode=configuration.conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE')): + include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'), + safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE')): # do not use default arg in signature, to fix import cycle on plugin load if executor is None: @@ -197,7 +198,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): if mod_name in sys.modules: del sys.modules[mod_name] - with timeout(configuration.conf.getint('core', "DAGBAG_IMPORT_TIMEOUT")): + with timeout(conf.getint('core', "DAGBAG_IMPORT_TIMEOUT")): try: m = imp.load_source(mod_name, filepath) mods.append(m) @@ -287,7 +288,7 @@ def kill_zombies(self, session=None): # How many seconds do we wait for tasks to heartbeat before mark them as zombies. zombie_threshold_secs = ( - configuration.getint('scheduler', 'scheduler_zombie_task_threshold')) + conf.getint('scheduler', 'scheduler_zombie_task_threshold')) limit_dttm = timezone.utcnow() - timedelta( seconds=zombie_threshold_secs) self.log.debug("Failing jobs without heartbeat after %s", limit_dttm) @@ -307,7 +308,7 @@ def kill_zombies(self, session=None): for ti in tis: self.log.info("Detected zombie job with dag_id %s, task_id %s, and execution date %s", ti.dag_id, ti.task_id, ti.execution_date.isoformat()) - ti.test_mode = configuration.getboolean('core', 'unit_test_mode') + ti.test_mode = conf.getboolean('core', 'unit_test_mode') ti.task = self.dags[ti.dag_id].get_task(ti.task_id) ti.handle_failure("{} detected as zombie".format(ti), ti.test_mode, ti.get_template_context()) @@ -355,8 +356,8 @@ def collect_dags( self, dag_folder=None, only_if_updated=True, - include_examples=configuration.conf.getboolean('core', 'LOAD_EXAMPLES'), - safe_mode=configuration.conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE')): + include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'), + safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE')): """ Given a file path or a folder, this method looks for python modules, imports them and adds them to the dagbag collection. 
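Point 1 of the commit message is carried by the `airflow/configuration.py` hunk further up: `zope.deprecation.deprecated()` is now passed the attribute *name* (`func.__name__`) rather than the function object, so the module-level convenience globals themselves get wrapped and a `DeprecationWarning` is actually emitted when callers still use them. A small, self-contained sketch of that usage pattern — the module layout and message below are made up for illustration, not Airflow's code:

```python
# Hypothetical module 'mylib/settings.py' illustrating the same pattern the
# patch applies in airflow/configuration.py: deprecate module-level shortcuts
# by name so that accessing them emits a DeprecationWarning.
from zope.deprecation import deprecated


def get(section, key):
    # Placeholder lookup; a real implementation would consult the parser here.
    return None


# Passing the *name* (a string) tells zope.deprecation to wrap this module's
# global, so the warning fires when other modules access mylib.settings.get.
deprecated(
    get.__name__,
    "Accessing 'get' directly from this module is deprecated. "
    "Use the shared configuration object instead.",
)
```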
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index ed69796b7ad5f7..500ddfd0d4fd39 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -38,7 +38,8 @@ from sqlalchemy.orm import reconstructor from sqlalchemy.orm.session import Session -from airflow import configuration, settings +from airflow import settings +from airflow.configuration import conf from airflow.exceptions import ( AirflowException, AirflowTaskTimeout, AirflowSkipException, AirflowRescheduleException ) @@ -369,25 +370,25 @@ def generate_command(dag_id, @property def log_filepath(self): iso = self.execution_date.isoformat() - log = os.path.expanduser(configuration.conf.get('core', 'BASE_LOG_FOLDER')) + log = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER')) return ("{log}/{dag_id}/{task_id}/{iso}.log".format( log=log, dag_id=self.dag_id, task_id=self.task_id, iso=iso)) @property def log_url(self): iso = self.execution_date.isoformat() - base_url = configuration.conf.get('webserver', 'BASE_URL') + base_url = conf.get('webserver', 'BASE_URL') relative_url = '/log?execution_date={iso}&task_id={task_id}&dag_id={dag_id}'.format( iso=quote_plus(iso), task_id=quote_plus(self.task_id), dag_id=quote_plus(self.dag_id)) - if configuration.conf.getboolean('webserver', 'rbac'): + if conf.getboolean('webserver', 'rbac'): return '{base_url}{relative_url}'.format(base_url=base_url, relative_url=relative_url) return '{base_url}/admin/airflow{relative_url}'.format(base_url=base_url, relative_url=relative_url) @property def mark_success_url(self): iso = quote(self.execution_date.isoformat()) - base_url = configuration.conf.get('webserver', 'BASE_URL') + base_url = conf.get('webserver', 'BASE_URL') return base_url + ( "/success" "?task_id={task_id}" @@ -1172,7 +1173,7 @@ def get_template_context(self, session=None): if task.params: params.update(task.params) - if configuration.getboolean('core', 'dag_run_conf_overrides_params'): + if conf.getboolean('core', 'dag_run_conf_overrides_params'): self.overwrite_params_with_dag_run_conf(params=params, dag_run=dag_run) class VariableAccessor: @@ -1206,6 +1207,7 @@ def __repr__(self): return str(self.var) return { + 'conf': conf, 'dag': task.dag, 'ds': ds, 'next_ds': next_ds, @@ -1238,7 +1240,6 @@ def __repr__(self): 'task_instance': self, 'ti': self, 'task_instance_key_str': ti_key_str, - 'conf': configuration, 'test_mode': self.test_mode, 'var': { 'value': VariableAccessor(), @@ -1286,8 +1287,8 @@ def email_alert(self, exception): ) def render(key, content): - if configuration.has_option('email', key): - path = configuration.get('email', key) + if conf.has_option('email', key): + path = conf.get('email', key) with open(path) as f: content = f.read() diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py index 78ec7bee0f1ef5..434cb7151f5dc1 100644 --- a/airflow/models/xcom.py +++ b/airflow/models/xcom.py @@ -23,7 +23,7 @@ from sqlalchemy import Column, Integer, String, Index, LargeBinary, and_ from sqlalchemy.orm import reconstructor -from airflow import configuration +from airflow.configuration import conf from airflow.models.base import Base, ID_LEN from airflow.utils import timezone from airflow.utils.db import provide_session @@ -65,7 +65,7 @@ class XCom(Base, LoggingMixin): """ @reconstructor def init_on_load(self): - enable_pickling = configuration.getboolean('core', 'enable_xcom_pickling') + enable_pickling = conf.getboolean('core', 'enable_xcom_pickling') if enable_pickling: self.value = pickle.loads(self.value) else: @@ 
-155,7 +155,7 @@ def get_one(cls, result = query.first() if result: - enable_pickling = configuration.getboolean('core', 'enable_xcom_pickling') + enable_pickling = conf.getboolean('core', 'enable_xcom_pickling') if enable_pickling: return pickle.loads(result.value) else: @@ -220,7 +220,7 @@ def delete(cls, xcoms, session=None): def serialize_value(value): # TODO: "pickling" has been deprecated and JSON is preferred. # "pickling" will be removed in Airflow 2.0. - if configuration.getboolean('core', 'enable_xcom_pickling'): + if conf.getboolean('core', 'enable_xcom_pickling'): return pickle.dumps(value) try: diff --git a/airflow/security/kerberos.py b/airflow/security/kerberos.py index f072e193bbb653..35fb671d70195e 100644 --- a/airflow/security/kerberos.py +++ b/airflow/security/kerberos.py @@ -38,7 +38,8 @@ import sys import time -from airflow import configuration, LoggingMixin +from airflow import LoggingMixin +from airflow.configuration import conf NEED_KRB181_WORKAROUND = None @@ -56,18 +57,18 @@ def renew_from_kt(principal, keytab): # The config is specified in seconds. But we ask for that same amount in # minutes to give ourselves a large renewal buffer. - renewal_lifetime = "%sm" % configuration.conf.getint('kerberos', 'reinit_frequency') + renewal_lifetime = "%sm" % conf.getint('kerberos', 'reinit_frequency') - cmd_principal = principal or configuration.conf.get('kerberos', 'principal').replace( + cmd_principal = principal or conf.get('kerberos', 'principal').replace( "_HOST", socket.getfqdn() ) cmdv = [ - configuration.conf.get('kerberos', 'kinit_path'), + conf.get('kerberos', 'kinit_path'), "-r", renewal_lifetime, "-k", # host ticket "-t", keytab, # specify keytab - "-c", configuration.conf.get('kerberos', 'ccache'), # specify credentials cache + "-c", conf.get('kerberos', 'ccache'), # specify credentials cache cmd_principal ] log.info("Re-initialising kerberos from keytab: %s", " ".join(cmdv)) @@ -97,14 +98,8 @@ def renew_from_kt(principal, keytab): def perform_krb181_workaround(principal): - """ - Workaround for Kerberos 1.8.1. - - :param principal: principal name - :return: None - """ - cmdv = [configuration.conf.get('kerberos', 'kinit_path'), - "-c", configuration.conf.get('kerberos', 'ccache'), + cmdv = [conf.get('kerberos', 'kinit_path'), + "-c", conf.get('kerberos', 'ccache'), "-R"] # Renew ticket_cache log.info( @@ -114,10 +109,10 @@ def perform_krb181_workaround(principal): ret = subprocess.call(cmdv, close_fds=True) if ret != 0: - principal = "%s/%s" % (principal or configuration.conf.get('kerberos', 'principal'), + principal = "%s/%s" % (principal or conf.get('kerberos', 'principal'), socket.getfqdn()) princ = principal - ccache = configuration.conf.get('kerberos', 'principal') + ccache = conf.get('kerberos', 'principal') log.error( "Couldn't renew kerberos ticket in order to work around Kerberos 1.8.1 issue. Please check that " "the ticket for '%s' is still renewable:\n $ kinit -f -c %s\nIf the 'renew until' date is the " @@ -134,7 +129,7 @@ def detect_conf_var(): Sun Java Krb5LoginModule in Java6, so we need to take an action to work around it. """ - ticket_cache = configuration.conf.get('kerberos', 'ccache') + ticket_cache = conf.get('kerberos', 'ccache') with open(ticket_cache, 'rb') as f: # Note: this file is binary, so we check against a bytearray. 
@@ -155,4 +150,4 @@ def run(principal, keytab): while True: renew_from_kt(principal, keytab) - time.sleep(configuration.conf.getint('kerberos', 'reinit_frequency')) + time.sleep(conf.getint('kerberos', 'reinit_frequency')) diff --git a/airflow/task/task_runner/__init__.py b/airflow/task/task_runner/__init__.py index 960a9e9e8e4262..0c422013dbe192 100644 --- a/airflow/task/task_runner/__init__.py +++ b/airflow/task/task_runner/__init__.py @@ -17,11 +17,11 @@ # specific language governing permissions and limitations # under the License. -from airflow import configuration +from airflow.configuration import conf from airflow.task.task_runner.standard_task_runner import StandardTaskRunner from airflow.exceptions import AirflowException -_TASK_RUNNER = configuration.conf.get('core', 'TASK_RUNNER') +_TASK_RUNNER = conf.get('core', 'TASK_RUNNER') def get_task_runner(local_task_job): diff --git a/airflow/task/task_runner/base_task_runner.py b/airflow/task/task_runner/base_task_runner.py index a88f5765d682fa..efd53e05ed3b60 100644 --- a/airflow/task/task_runner/base_task_runner.py +++ b/airflow/task/task_runner/base_task_runner.py @@ -26,7 +26,7 @@ from airflow.utils.log.logging_mixin import LoggingMixin -from airflow import configuration as conf +from airflow.configuration import conf from airflow.utils.configuration import tmp_configuration_copy diff --git a/airflow/utils/configuration.py b/airflow/utils/configuration.py index 13594a01fef107..ca16f871a30932 100644 --- a/airflow/utils/configuration.py +++ b/airflow/utils/configuration.py @@ -23,7 +23,7 @@ import json from tempfile import mkstemp -from airflow import configuration as conf +from airflow.configuration import conf def tmp_configuration_copy(chmod=0o600, include_env=True, include_cmds=True): diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py index d4dcddc7c4ea1d..2c14022a647b9e 100644 --- a/airflow/utils/dag_processing.py +++ b/airflow/utils/dag_processing.py @@ -44,7 +44,7 @@ # To avoid circular imports import airflow.models -from airflow import configuration as conf +from airflow.configuration import conf from airflow.dag.base_dag import BaseDag, BaseDagBag from airflow.exceptions import AirflowException from airflow.settings import Stats diff --git a/airflow/utils/email.py b/airflow/utils/email.py index 532880161eaf9a..efd0def8440942 100644 --- a/airflow/utils/email.py +++ b/airflow/utils/email.py @@ -33,7 +33,7 @@ from email.mime.application import MIMEApplication from email.utils import formatdate -from airflow import configuration +from airflow.configuration import conf from airflow.exceptions import AirflowConfigException from airflow.utils.log.logging_mixin import LoggingMixin @@ -44,7 +44,7 @@ def send_email(to, subject, html_content, """ Send email using backend specified in EMAIL_BACKEND. 
""" - path, attr = configuration.conf.get('email', 'EMAIL_BACKEND').rsplit('.', 1) + path, attr = conf.get('email', 'EMAIL_BACKEND').rsplit('.', 1) module = importlib.import_module(path) backend = getattr(module, attr) to = get_email_address_list(to) @@ -64,7 +64,7 @@ def send_email_smtp(to, subject, html_content, files=None, >>> send_email('test@example.com', 'foo', 'Foo bar', ['/dev/null'], dryrun=True) """ - smtp_mail_from = configuration.conf.get('smtp', 'SMTP_MAIL_FROM') + smtp_mail_from = conf.get('smtp', 'SMTP_MAIL_FROM') to = get_email_address_list(to) @@ -104,16 +104,16 @@ def send_email_smtp(to, subject, html_content, files=None, def send_MIME_email(e_from, e_to, mime_msg, dryrun=False): log = LoggingMixin().log - SMTP_HOST = configuration.conf.get('smtp', 'SMTP_HOST') - SMTP_PORT = configuration.conf.getint('smtp', 'SMTP_PORT') - SMTP_STARTTLS = configuration.conf.getboolean('smtp', 'SMTP_STARTTLS') - SMTP_SSL = configuration.conf.getboolean('smtp', 'SMTP_SSL') + SMTP_HOST = conf.get('smtp', 'SMTP_HOST') + SMTP_PORT = conf.getint('smtp', 'SMTP_PORT') + SMTP_STARTTLS = conf.getboolean('smtp', 'SMTP_STARTTLS') + SMTP_SSL = conf.getboolean('smtp', 'SMTP_SSL') SMTP_USER = None SMTP_PASSWORD = None try: - SMTP_USER = configuration.conf.get('smtp', 'SMTP_USER') - SMTP_PASSWORD = configuration.conf.get('smtp', 'SMTP_PASSWORD') + SMTP_USER = conf.get('smtp', 'SMTP_USER') + SMTP_PASSWORD = conf.get('smtp', 'SMTP_PASSWORD') except AirflowConfigException: log.debug("No user/password found for SMTP, so logging in with no authentication.") diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py index ce535a7059cbe9..81e72b2257a9de 100644 --- a/airflow/utils/helpers.py +++ b/airflow/utils/helpers.py @@ -41,12 +41,12 @@ from jinja2 import Template -from airflow import configuration +from airflow.configuration import conf from airflow.exceptions import AirflowException # When killing processes, time to wait after issuing a SIGTERM before issuing a # SIGKILL. 
-DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM = configuration.conf.getint( +DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM = conf.getint( 'core', 'KILLED_TASK_CLEANUP_TIME' ) diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 041b3b291abe81..188234b1c4612c 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -23,7 +23,7 @@ import requests -from airflow import configuration as conf +from airflow.configuration import conf from airflow.configuration import AirflowConfigException from airflow.utils.file import mkdirs from airflow.utils.helpers import parse_template_string diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py index 1f2360c2d7db95..0eb621d33fe48d 100644 --- a/airflow/utils/log/gcs_task_handler.py +++ b/airflow/utils/log/gcs_task_handler.py @@ -20,7 +20,7 @@ from cached_property import cached_property -from airflow import configuration +from airflow.configuration import conf from airflow.exceptions import AirflowException from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.log.file_task_handler import FileTaskHandler @@ -43,7 +43,7 @@ def __init__(self, base_log_folder, gcs_log_folder, filename_template): @cached_property def hook(self): - remote_conn_id = configuration.conf.get('core', 'REMOTE_LOG_CONN_ID') + remote_conn_id = conf.get('core', 'REMOTE_LOG_CONN_ID') try: from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook return GoogleCloudStorageHook( diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py index da0d3bb111c276..0f5bf7c90facf0 100644 --- a/airflow/utils/log/s3_task_handler.py +++ b/airflow/utils/log/s3_task_handler.py @@ -20,7 +20,7 @@ from cached_property import cached_property -from airflow import configuration +from airflow.configuration import conf from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.log.file_task_handler import FileTaskHandler @@ -41,7 +41,7 @@ def __init__(self, base_log_folder, s3_log_folder, filename_template): @cached_property def hook(self): - remote_conn_id = configuration.conf.get('core', 'REMOTE_LOG_CONN_ID') + remote_conn_id = conf.get('core', 'REMOTE_LOG_CONN_ID') try: from airflow.hooks.S3_hook import S3Hook return S3Hook(remote_conn_id) @@ -164,7 +164,7 @@ def s3_write(self, log, remote_log_location, append=True): log, key=remote_log_location, replace=True, - encrypt=configuration.conf.getboolean('core', 'ENCRYPT_S3_LOGS'), + encrypt=conf.getboolean('core', 'ENCRYPT_S3_LOGS'), ) except Exception: self.log.exception('Could not write logs to %s', remote_log_location) diff --git a/airflow/utils/log/wasb_task_handler.py b/airflow/utils/log/wasb_task_handler.py index 5672eec93c34a3..c6a22099059f23 100644 --- a/airflow/utils/log/wasb_task_handler.py +++ b/airflow/utils/log/wasb_task_handler.py @@ -21,7 +21,7 @@ from cached_property import cached_property -from airflow import configuration +from airflow.configuration import conf from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.log.file_task_handler import FileTaskHandler from azure.common import AzureHttpError @@ -47,7 +47,7 @@ def __init__(self, base_log_folder, wasb_log_folder, wasb_container, @cached_property def hook(self): - remote_conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID') + remote_conn_id = conf.get('core', 'REMOTE_LOG_CONN_ID') try: from airflow.contrib.hooks.wasb_hook import WasbHook return WasbHook(remote_conn_id) diff --git 
a/airflow/utils/operator_resources.py b/airflow/utils/operator_resources.py index 44df83eafb5089..fbe66ed85cbdbb 100644 --- a/airflow/utils/operator_resources.py +++ b/airflow/utils/operator_resources.py @@ -17,7 +17,7 @@ # specific language governing permissions and limitations # under the License. -from airflow import configuration +from airflow.configuration import conf from airflow.exceptions import AirflowException # Constants for resources (megabytes are the base unit) @@ -105,10 +105,10 @@ class Resources(object): :type gpus: long """ def __init__(self, - cpus=configuration.conf.getint('operators', 'default_cpus'), - ram=configuration.conf.getint('operators', 'default_ram'), - disk=configuration.conf.getint('operators', 'default_disk'), - gpus=configuration.conf.getint('operators', 'default_gpus') + cpus=conf.getint('operators', 'default_cpus'), + ram=conf.getint('operators', 'default_ram'), + disk=conf.getint('operators', 'default_disk'), + gpus=conf.getint('operators', 'default_gpus') ): self.cpus = CpuResource(cpus) self.ram = RamResource(ram) diff --git a/airflow/www/app.py b/airflow/www/app.py index 5c1c24e11ee05a..05a216f0b57a3d 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -29,8 +29,8 @@ from werkzeug.middleware.dispatcher import DispatcherMiddleware import airflow -from airflow import configuration as conf from airflow import models, LoggingMixin +from airflow.configuration import conf from airflow.models.connection import Connection from airflow.settings import Session @@ -38,7 +38,6 @@ from airflow.logging_config import configure_logging from airflow import jobs from airflow import settings -from airflow import configuration from airflow.utils.net import get_hostname csrf = CSRFProtect() @@ -46,7 +45,7 @@ def create_app(config=None, testing=False): app = Flask(__name__) - if configuration.conf.getboolean('webserver', 'ENABLE_PROXY_FIX'): + if conf.getboolean('webserver', 'ENABLE_PROXY_FIX'): app.wsgi_app = ProxyFix( app.wsgi_app, num_proxies=None, @@ -56,8 +55,8 @@ def create_app(config=None, testing=False): x_port=1, x_prefix=1 ) - app.secret_key = configuration.conf.get('webserver', 'SECRET_KEY') - app.config['LOGIN_DISABLED'] = not configuration.conf.getboolean( + app.secret_key = conf.get('webserver', 'SECRET_KEY') + app.config['LOGIN_DISABLED'] = not conf.getboolean( 'webserver', 'AUTHENTICATE') app.config['SESSION_COOKIE_HTTPONLY'] = True @@ -172,7 +171,7 @@ def integrate_plugins(): def jinja_globals(): return { 'hostname': get_hostname(), - 'navbar_color': configuration.get('webserver', 'NAVBAR_COLOR'), + 'navbar_color': conf.get('webserver', 'NAVBAR_COLOR'), } @app.teardown_appcontext @@ -193,7 +192,7 @@ def root_app(env, resp): def cached_app(config=None, testing=False): global app if not app: - base_url = urlparse(configuration.conf.get('webserver', 'base_url'))[2] + base_url = urlparse(conf.get('webserver', 'base_url'))[2] if not base_url or base_url == '/': base_url = "" diff --git a/airflow/www/utils.py b/airflow/www/utils.py index 32cf5bd2d3016d..a9bcc3e1e59eb7 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -41,7 +41,8 @@ from flask_login import current_user from six.moves.urllib.parse import urlencode -from airflow import configuration, models, settings +from airflow import models, settings +from airflow.configuration import conf from airflow.utils.db import create_session from airflow.utils import timezone from airflow.utils.json import AirflowJsonEncoder @@ -53,7 +54,7 @@ # Use cgi.escape for Python 2 from cgi import escape # 
type: ignore -AUTHENTICATE = configuration.conf.getboolean('webserver', 'AUTHENTICATE') +AUTHENTICATE = conf.getboolean('webserver', 'AUTHENTICATE') DEFAULT_SENSITIVE_VARIABLE_FIELDS = ( 'password', @@ -69,7 +70,7 @@ def should_hide_value_for_key(key_name): # It is possible via importing variables from file that a key is empty. if key_name: - config_set = configuration.conf.getboolean('admin', + config_set = conf.getboolean('admin', 'hide_sensitive_variable_fields') field_comp = any(s in key_name.lower() for s in DEFAULT_SENSITIVE_VARIABLE_FIELDS) return config_set and field_comp diff --git a/airflow/www/views.py b/airflow/www/views.py index d6a4dc3b8eb12e..d13b536084b5a1 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -62,7 +62,8 @@ StringField, IntegerField, validators) import airflow -from airflow import configuration as conf, LoggingMixin, configuration +from airflow import LoggingMixin, configuration +from airflow.configuration import conf from airflow import models from airflow import settings from airflow import jobs @@ -2696,7 +2697,7 @@ class XComView(wwwutils.SuperUserMixin, AirflowModelView): form_overrides = dict(execution_date=DateTimeField) def on_model_change(self, form, model, is_created): - enable_pickling = configuration.getboolean('core', 'enable_xcom_pickling') + enable_pickling = conf.getboolean('core', 'enable_xcom_pickling') if enable_pickling: model.value = pickle.dumps(model.value) else: @@ -3166,9 +3167,9 @@ class ConfigurationView(wwwutils.SuperUserMixin, AirflowViewMixin, BaseView): def conf(self): raw = request.args.get('raw') == "true" title = "Airflow Configuration" - subtitle = conf.AIRFLOW_CONFIG + subtitle = configuration.AIRFLOW_CONFIG if conf.getboolean("webserver", "expose_config"): - with open(conf.AIRFLOW_CONFIG, 'r') as f: + with open(configuration.AIRFLOW_CONFIG, 'r') as f: config = f.read() table = [(section, key, value, source) for section, parameters in conf.as_dict(True, True).items() diff --git a/airflow/www_rbac/app.py b/airflow/www_rbac/app.py index a47e2ce92fd0c1..94b4e76c296b09 100644 --- a/airflow/www_rbac/app.py +++ b/airflow/www_rbac/app.py @@ -31,7 +31,7 @@ from werkzeug.middleware.dispatcher import DispatcherMiddleware from airflow import settings -from airflow import configuration as conf +from airflow.configuration import conf from airflow.logging_config import configure_logging from airflow.www_rbac.static_config import configure_manifest_files diff --git a/airflow/www_rbac/utils.py b/airflow/www_rbac/utils.py index ad0732fa4ddcee..5e1f83261395ca 100644 --- a/airflow/www_rbac/utils.py +++ b/airflow/www_rbac/utils.py @@ -40,14 +40,14 @@ import sqlalchemy as sqla from six.moves.urllib.parse import urlencode -from airflow import configuration +from airflow.configuration import conf from airflow.models import BaseOperator from airflow.operators.subdag_operator import SubDagOperator from airflow.utils import timezone from airflow.utils.json import AirflowJsonEncoder from airflow.utils.state import State -AUTHENTICATE = configuration.getboolean('webserver', 'AUTHENTICATE') +AUTHENTICATE = conf.getboolean('webserver', 'AUTHENTICATE') DEFAULT_SENSITIVE_VARIABLE_FIELDS = ( 'password', @@ -63,8 +63,8 @@ def should_hide_value_for_key(key_name): # It is possible via importing variables from file that a key is empty. 
if key_name: - config_set = configuration.conf.getboolean('admin', - 'hide_sensitive_variable_fields') + config_set = conf.getboolean('admin', + 'hide_sensitive_variable_fields') field_comp = any(s in key_name.lower() for s in DEFAULT_SENSITIVE_VARIABLE_FIELDS) return config_set and field_comp return False diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py index 1d301e163d6cd9..f2b2a912d7b403 100644 --- a/airflow/www_rbac/views.py +++ b/airflow/www_rbac/views.py @@ -49,9 +49,9 @@ from wtforms import SelectField, validators import airflow -from airflow import configuration as conf from airflow import models, jobs -from airflow import settings +from airflow import settings, configuration +from airflow.configuration import conf from airflow.api.common.experimental.mark_tasks import (set_dag_run_state_to_success, set_dag_run_state_to_failed) from airflow.models import Connection, DagModel, DagRun, errors, Log, SlaMiss, TaskFail, XCom @@ -1957,10 +1957,10 @@ class ConfigurationView(AirflowBaseView): def conf(self): raw = request.args.get('raw') == "true" title = "Airflow Configuration" - subtitle = conf.AIRFLOW_CONFIG + subtitle = configuration.AIRFLOW_CONFIG # Don't show config when expose_config variable is False in airflow config if conf.getboolean("webserver", "expose_config"): - with open(conf.AIRFLOW_CONFIG, 'r') as f: + with open(configuration.AIRFLOW_CONFIG, 'r') as f: config = f.read() table = [(section, key, value, source) for section, parameters in conf.as_dict(True, True).items() diff --git a/scripts/perf/scheduler_ops_metrics.py b/scripts/perf/scheduler_ops_metrics.py index ef274d6cd233e5..6000ce2e9a0c01 100644 --- a/scripts/perf/scheduler_ops_metrics.py +++ b/scripts/perf/scheduler_ops_metrics.py @@ -21,7 +21,8 @@ import pandas as pd import sys -from airflow import configuration, settings +from airflow import settings +from airflow.configuration import conf from airflow.jobs import SchedulerJob from airflow.models import DagBag, DagModel, DagRun, TaskInstance from airflow.utils import timezone @@ -191,7 +192,7 @@ def main(): logging.error('Specify a positive integer for timeout.') sys.exit(1) - configuration.load_test_config() + conf.load_test_config() set_dags_paused_state(False) clear_dag_runs() diff --git a/tests/api/common/experimental/test_mark_tasks.py b/tests/api/common/experimental/test_mark_tasks.py index 593634d50657b3..f94911a13daa3e 100644 --- a/tests/api/common/experimental/test_mark_tasks.py +++ b/tests/api/common/experimental/test_mark_tasks.py @@ -21,7 +21,8 @@ import time from datetime import datetime, timedelta -from airflow import configuration, models +from airflow import models +from airflow.configuration import conf from airflow.api.common.experimental.mark_tasks import ( set_state, _create_dagruns, set_dag_run_state_to_success, set_dag_run_state_to_failed, set_dag_run_state_to_running) @@ -251,7 +252,7 @@ def test_mark_tasks_multiple(self): # TODO: this skipIf should be removed once a fixing solution is found later # We skip it here because this test case is working with Postgres & SQLite # but not with MySQL - @unittest.skipIf('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'), "Flaky with MySQL") + @unittest.skipIf('mysql' in conf.get('core', 'sql_alchemy_conn'), "Flaky with MySQL") def test_mark_tasks_subdag(self): # set one task to success towards end of scheduled dag runs task = self.dag2.get_task("section-1") diff --git a/tests/contrib/operators/test_s3_to_sftp_operator.py b/tests/contrib/operators/test_s3_to_sftp_operator.py 
index 6139ca7073f9ae..cef922d3178ef3 100644 --- a/tests/contrib/operators/test_s3_to_sftp_operator.py +++ b/tests/contrib/operators/test_s3_to_sftp_operator.py @@ -19,7 +19,7 @@ import unittest -from airflow import configuration +from airflow.configuration import conf from airflow import models from airflow.contrib.operators.s3_to_sftp_operator import S3ToSFTPOperator from airflow.contrib.operators.ssh_operator import SSHOperator @@ -88,7 +88,7 @@ def setUp(self): @mock_s3 def test_s3_to_sftp_operation(self): # Setting - configuration.conf.set("core", "enable_xcom_pickling", "True") + conf.set("core", "enable_xcom_pickling", "True") test_remote_file_content = \ "This is remote file content \n which is also multiline " \ "another line here \n this is last line. EOF" diff --git a/tests/core.py b/tests/core.py index 7c74bbb3d12522..8c363c6f1cd01b 100644 --- a/tests/core.py +++ b/tests/core.py @@ -42,7 +42,6 @@ from bs4 import BeautifulSoup -from airflow import configuration from airflow.executors import SequentialExecutor from airflow.models import Variable, TaskInstance @@ -65,7 +64,9 @@ from airflow.utils.state import State from airflow.utils.dates import days_ago, infer_time_unit, round_time, scale_time_units from airflow.exceptions import AirflowException -from airflow.configuration import AirflowConfigException, run_command +from airflow.configuration import ( + AirflowConfigException, run_command, conf, parameterized_config, DEFAULT_CONFIG +) from jinja2.exceptions import SecurityError from jinja2 import UndefinedError from pendulum import utcnow @@ -342,7 +343,7 @@ def test_schedule_dag_no_end_date_up_to_today_only(self): self.assertIsNone(additional_dag_run) def test_confirm_unittest_mod(self): - self.assertTrue(configuration.conf.get('core', 'unit_test_mode')) + self.assertTrue(conf.get('core', 'unit_test_mode')) def test_pickling(self): dp = self.dag.pickle() @@ -750,7 +751,7 @@ def test_variable_setdefault_existing_json(self): def test_parameterized_config_gen(self): - cfg = configuration.parameterized_config(configuration.DEFAULT_CONFIG) + cfg = parameterized_config(DEFAULT_CONFIG) # making sure some basic building blocks are present: self.assertIn("[core]", cfg) @@ -763,13 +764,13 @@ def test_parameterized_config_gen(self): self.assertNotIn("{FERNET_KEY}", cfg) def test_config_use_original_when_original_and_fallback_are_present(self): - self.assertTrue(configuration.conf.has_option("core", "FERNET_KEY")) - self.assertFalse(configuration.conf.has_option("core", "FERNET_KEY_CMD")) + self.assertTrue(conf.has_option("core", "FERNET_KEY")) + self.assertFalse(conf.has_option("core", "FERNET_KEY_CMD")) - FERNET_KEY = configuration.conf.get('core', 'FERNET_KEY') + FERNET_KEY = conf.get('core', 'FERNET_KEY') with conf_vars({('core', 'FERNET_KEY_CMD'): 'printf HELLO'}): - FALLBACK_FERNET_KEY = configuration.conf.get( + FALLBACK_FERNET_KEY = conf.get( "core", "FERNET_KEY" ) @@ -777,12 +778,12 @@ def test_config_use_original_when_original_and_fallback_are_present(self): self.assertEqual(FERNET_KEY, FALLBACK_FERNET_KEY) def test_config_throw_error_when_original_and_fallback_is_absent(self): - self.assertTrue(configuration.conf.has_option("core", "FERNET_KEY")) - self.assertFalse(configuration.conf.has_option("core", "FERNET_KEY_CMD")) + self.assertTrue(conf.has_option("core", "FERNET_KEY")) + self.assertFalse(conf.has_option("core", "FERNET_KEY_CMD")) with conf_vars({('core', 'fernet_key'): None}): with self.assertRaises(AirflowConfigException) as cm: - configuration.conf.get("core", 
"FERNET_KEY") + conf.get("core", "FERNET_KEY") exception = str(cm.exception) message = "section/key [core/fernet_key] not found in config" @@ -794,7 +795,7 @@ def test_config_override_original_when_non_empty_envvar_is_provided(self): self.assertNotIn(key, os.environ) os.environ[key] = value - FERNET_KEY = configuration.conf.get('core', 'FERNET_KEY') + FERNET_KEY = conf.get('core', 'FERNET_KEY') self.assertEqual(value, FERNET_KEY) # restore the envvar back to the original state @@ -806,7 +807,7 @@ def test_config_override_original_when_empty_envvar_is_provided(self): self.assertNotIn(key, os.environ) os.environ[key] = value - FERNET_KEY = configuration.conf.get('core', 'FERNET_KEY') + FERNET_KEY = conf.get('core', 'FERNET_KEY') self.assertEqual(value, FERNET_KEY) # restore the envvar back to the original state @@ -2258,7 +2259,7 @@ def logout(self): return self.app.get('/admin/airflow/logout', follow_redirects=True) def test_login_logout_password_auth(self): - self.assertTrue(configuration.conf.getboolean('webserver', 'authenticate')) + self.assertTrue(conf.getboolean('webserver', 'authenticate')) response = self.login('user1', 'whatever') self.assertIn('Incorrect login details', response.data.decode('utf-8')) @@ -2317,7 +2318,7 @@ def logout(self): return self.app.get('/admin/airflow/logout', follow_redirects=True) def test_login_logout_ldap(self): - self.assertTrue(configuration.conf.getboolean('webserver', 'authenticate')) + self.assertTrue(conf.getboolean('webserver', 'authenticate')) response = self.login('user1', 'userx') self.assertIn('Incorrect login details', response.data.decode('utf-8')) @@ -2715,11 +2716,11 @@ def test_send_smtp(self, mock_send_mime): utils.email.send_email_smtp('to', 'subject', 'content', files=[attachment.name]) self.assertTrue(mock_send_mime.called) call_args = mock_send_mime.call_args[0] - self.assertEqual(configuration.conf.get('smtp', 'SMTP_MAIL_FROM'), call_args[0]) + self.assertEqual(conf.get('smtp', 'SMTP_MAIL_FROM'), call_args[0]) self.assertEqual(['to'], call_args[1]) msg = call_args[2] self.assertEqual('subject', msg['Subject']) - self.assertEqual(configuration.conf.get('smtp', 'SMTP_MAIL_FROM'), msg['From']) + self.assertEqual(conf.get('smtp', 'SMTP_MAIL_FROM'), msg['From']) self.assertEqual(2, len(msg.get_payload())) filename = u'attachment; filename="' + os.path.basename(attachment.name) + '"' self.assertEqual(filename, msg.get_payload()[-1].get(u'Content-Disposition')) @@ -2743,11 +2744,11 @@ def test_send_bcc_smtp(self, mock_send_mime): utils.email.send_email_smtp('to', 'subject', 'content', files=[attachment.name], cc='cc', bcc='bcc') self.assertTrue(mock_send_mime.called) call_args = mock_send_mime.call_args[0] - self.assertEqual(configuration.conf.get('smtp', 'SMTP_MAIL_FROM'), call_args[0]) + self.assertEqual(conf.get('smtp', 'SMTP_MAIL_FROM'), call_args[0]) self.assertEqual(['to', 'cc', 'bcc'], call_args[1]) msg = call_args[2] self.assertEqual('subject', msg['Subject']) - self.assertEqual(configuration.conf.get('smtp', 'SMTP_MAIL_FROM'), msg['From']) + self.assertEqual(conf.get('smtp', 'SMTP_MAIL_FROM'), msg['From']) self.assertEqual(2, len(msg.get_payload())) self.assertEqual(u'attachment; filename="' + os.path.basename(attachment.name) + '"', msg.get_payload()[-1].get(u'Content-Disposition')) @@ -2762,13 +2763,13 @@ def test_send_mime(self, mock_smtp, mock_smtp_ssl): msg = MIMEMultipart() utils.email.send_MIME_email('from', 'to', msg, dryrun=False) mock_smtp.assert_called_with( - configuration.conf.get('smtp', 'SMTP_HOST'), - 
configuration.conf.getint('smtp', 'SMTP_PORT'), + conf.get('smtp', 'SMTP_HOST'), + conf.getint('smtp', 'SMTP_PORT'), ) self.assertTrue(mock_smtp.return_value.starttls.called) mock_smtp.return_value.login.assert_called_with( - configuration.conf.get('smtp', 'SMTP_USER'), - configuration.conf.get('smtp', 'SMTP_PASSWORD'), + conf.get('smtp', 'SMTP_USER'), + conf.get('smtp', 'SMTP_PASSWORD'), ) mock_smtp.return_value.sendmail.assert_called_with('from', 'to', msg.as_string()) self.assertTrue(mock_smtp.return_value.quit.called) @@ -2782,8 +2783,8 @@ def test_send_mime_ssl(self, mock_smtp, mock_smtp_ssl): utils.email.send_MIME_email('from', 'to', MIMEMultipart(), dryrun=False) self.assertFalse(mock_smtp.called) mock_smtp_ssl.assert_called_with( - configuration.conf.get('smtp', 'SMTP_HOST'), - configuration.conf.getint('smtp', 'SMTP_PORT'), + conf.get('smtp', 'SMTP_HOST'), + conf.getint('smtp', 'SMTP_PORT'), ) @mock.patch('smtplib.SMTP_SSL') @@ -2798,8 +2799,8 @@ def test_send_mime_noauth(self, mock_smtp, mock_smtp_ssl): utils.email.send_MIME_email('from', 'to', MIMEMultipart(), dryrun=False) self.assertFalse(mock_smtp_ssl.called) mock_smtp.assert_called_with( - configuration.conf.get('smtp', 'SMTP_HOST'), - configuration.conf.getint('smtp', 'SMTP_PORT'), + conf.get('smtp', 'SMTP_HOST'), + conf.getint('smtp', 'SMTP_PORT'), ) self.assertFalse(mock_smtp.login.called) diff --git a/tests/dags/test_impersonation_custom.py b/tests/dags/test_impersonation_custom.py index 93045d789a81ad..416968c5607be8 100644 --- a/tests/dags/test_impersonation_custom.py +++ b/tests/dags/test_impersonation_custom.py @@ -47,7 +47,7 @@ def print_today(): def check_hive_conf(): - from airflow import configuration as conf + from airflow.configuration import conf assert conf.get('hive', 'default_hive_mapred_queue') == 'airflow' diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py index bde4e5b5379f53..fe575d0b803d0c 100644 --- a/tests/executors/test_celery_executor.py +++ b/tests/executors/test_celery_executor.py @@ -30,14 +30,14 @@ from celery import states as celery_states from airflow.utils.state import State -from airflow import configuration +from airflow.configuration import conf # leave this it is used by the test worker -import celery.contrib.testing.tasks # noqa: F401 +import celery.contrib.testing.tasks # noqa: F401 pylint: disable=ungrouped-imports class CeleryExecutorTest(unittest.TestCase): - @unittest.skipIf('sqlite' in configuration.conf.get('core', 'sql_alchemy_conn'), + @unittest.skipIf('sqlite' in conf.get('core', 'sql_alchemy_conn'), "sqlite is configured with SequentialExecutor") def test_celery_integration(self): executor = CeleryExecutor() @@ -88,7 +88,7 @@ def test_celery_integration(self): self.assertNotIn('success', executor.last_state) self.assertNotIn('fail', executor.last_state) - @unittest.skipIf('sqlite' in configuration.conf.get('core', 'sql_alchemy_conn'), + @unittest.skipIf('sqlite' in conf.get('core', 'sql_alchemy_conn'), "sqlite is configured with SequentialExecutor") def test_error_sending_task(self): @app.task diff --git a/tests/executors/test_dask_executor.py b/tests/executors/test_dask_executor.py index 22484ce61d5142..2658658cdd159a 100644 --- a/tests/executors/test_dask_executor.py +++ b/tests/executors/test_dask_executor.py @@ -20,7 +20,7 @@ import unittest from tests.compat import mock -from airflow import configuration +from airflow.configuration import conf from airflow.models import DagBag from airflow.jobs import BackfillJob from 
airflow.utils import timezone @@ -41,7 +41,7 @@ except ImportError: SKIP_DASK = True -if 'sqlite' in configuration.conf.get('core', 'sql_alchemy_conn'): +if 'sqlite' in conf.get('core', 'sql_alchemy_conn'): SKIP_DASK = True # Always skip due to issues on python 3 issues @@ -140,9 +140,9 @@ def test_tls(self): # These use test certs that ship with dask/distributed and should not be # used in production - configuration.set('dask', 'tls_ca', get_cert('tls-ca-cert.pem')) - configuration.set('dask', 'tls_cert', get_cert('tls-key-cert.pem')) - configuration.set('dask', 'tls_key', get_cert('tls-key.pem')) + conf.set('dask', 'tls_ca', get_cert('tls-ca-cert.pem')) + conf.set('dask', 'tls_cert', get_cert('tls-key-cert.pem')) + conf.set('dask', 'tls_key', get_cert('tls-key.pem')) try: executor = DaskExecutor(cluster_address=s['address']) @@ -153,9 +153,9 @@ def test_tls(self): # and tasks to have completed. executor.client.close() finally: - configuration.set('dask', 'tls_ca', '') - configuration.set('dask', 'tls_key', '') - configuration.set('dask', 'tls_cert', '') + conf.set('dask', 'tls_ca', '') + conf.set('dask', 'tls_key', '') + conf.set('dask', 'tls_cert', '') @unittest.skipIf(SKIP_DASK, 'Dask unsupported by this configuration') @mock.patch('airflow.executors.dask_executor.DaskExecutor.sync') diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py index 19d6723b502050..b9f28a3ef0593d 100644 --- a/tests/jobs/test_backfill_job.py +++ b/tests/jobs/test_backfill_job.py @@ -28,7 +28,7 @@ from parameterized import parameterized from airflow import AirflowException, settings -from airflow import configuration +from airflow.configuration import conf from airflow.bin import cli from airflow.exceptions import DagConcurrencyLimitReached, NoAvailablePoolSlot, \ TaskConcurrencyLimitReached @@ -128,7 +128,7 @@ def test_dag_run_with_finished_tasks_set_to_success(self): self.assertEquals(State.SUCCESS, dag_run.state) - @unittest.skipIf('sqlite' in configuration.conf.get('core', 'sql_alchemy_conn'), + @unittest.skipIf('sqlite' in conf.get('core', 'sql_alchemy_conn'), "concurrent access not supported in sqlite") def test_trigger_controller_dag(self): dag = self.dagbag.get_dag('example_trigger_controller_dag') @@ -152,7 +152,7 @@ def test_trigger_controller_dag(self): self.assertTrue(task_instances_list.append.called) - @unittest.skipIf('sqlite' in configuration.conf.get('core', 'sql_alchemy_conn'), + @unittest.skipIf('sqlite' in conf.get('core', 'sql_alchemy_conn'), "concurrent access not supported in sqlite") def test_backfill_multi_dates(self): dag = self.dagbag.get_dag('example_bash_operator') @@ -205,6 +205,10 @@ def test_backfill_multi_dates(self): dag.clear() session.close() + @unittest.skipIf( + "sqlite" in conf.get("core", "sql_alchemy_conn"), + "concurrent access not supported in sqlite", + ) @parameterized.expand( [ [ @@ -243,10 +247,6 @@ def test_backfill_multi_dates(self): ["latest_only", ("latest_only", "task1")], ] ) - @unittest.skipIf( - "sqlite" in configuration.conf.get("core", "sql_alchemy_conn"), - "concurrent access not supported in sqlite", - ) def test_backfill_examples(self, dag_id, expected_execution_order): """ Test backfilling example dags @@ -800,7 +800,7 @@ def test_backfill_depends_on_past(self): ti.refresh_from_db() self.assertEqual(ti.state, State.SUCCESS) - @unittest.skipIf('sqlite' in configuration.conf.get('core', 'sql_alchemy_conn'), + @unittest.skipIf('sqlite' in conf.get('core', 'sql_alchemy_conn'), "concurrent access not supported in sqlite") def 
test_run_ignores_all_dependencies(self): """ diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py index 742091e20b6d8c..06cc2f98904d1f 100644 --- a/tests/jobs/test_local_task_job.py +++ b/tests/jobs/test_local_task_job.py @@ -23,7 +23,7 @@ import unittest from airflow import AirflowException, models, settings -from airflow import configuration +from airflow.configuration import conf from airflow.executors import SequentialExecutor from airflow.jobs import LocalTaskJob from airflow.models import DAG, TaskInstance as TI @@ -116,9 +116,9 @@ def test_localtaskjob_heartbeat(self, mock_pid): mock_pid.return_value = 2 self.assertRaises(AirflowException, job1.heartbeat_callback) - @unittest.skipIf('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'), + @unittest.skipIf('mysql' in conf.get('core', 'sql_alchemy_conn'), "flaky when run on mysql") - @unittest.skipIf('postgresql' in configuration.conf.get('core', 'sql_alchemy_conn'), + @unittest.skipIf('postgresql' in conf.get('core', 'sql_alchemy_conn'), 'flaky when run on postgresql') def test_mark_success_no_kill(self): """ diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 7d0cc947b163ed..6ecda7504e1f4b 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -36,7 +36,7 @@ import airflow.example_dags from airflow import AirflowException, models, settings -from airflow import configuration +from airflow.configuration import conf from airflow.executors import BaseExecutor from airflow.jobs import BackfillJob, SchedulerJob from airflow.models import DAG, DagBag, DagModel, DagRun, Pool, SlaMiss, \ @@ -85,20 +85,17 @@ def setUp(self): @classmethod def setUpClass(cls): cls.dagbag = DagBag() - - def getboolean(section, key): - if section.lower() == 'core' and key.lower() == 'load_examples': - return False - else: - return configuration.conf.getboolean(section, key) - - cls.patcher = mock.patch('airflow.jobs.scheduler_job.conf.getboolean') - cls.mock_getboolean = cls.patcher.start() - cls.mock_getboolean.side_effect = getboolean + cls.old_val = None + if conf.has_option('core', 'load_examples'): + cls.old_val = conf.get('core', 'load_examples') + conf.set('core', 'load_examples', 'false') @classmethod def tearDownClass(cls): - cls.patcher.stop() + if cls.old_val is not None: + conf.set('core', 'load_examples', cls.old_val) + else: + conf.remove_option('core', 'load_examples') def test_is_alive(self): job = SchedulerJob(None, heartrate=10, state=State.RUNNING) @@ -2301,7 +2298,7 @@ def setup_dag(dag_id, schedule_interval, start_date, catchup): schedule_interval='* * * * *', start_date=six_hours_ago_to_the_hour, catchup=True) - default_catchup = configuration.conf.getboolean('scheduler', 'catchup_by_default') + default_catchup = conf.getboolean('scheduler', 'catchup_by_default') self.assertEqual(default_catchup, True) self.assertEqual(dag1.catchup, True) diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py index 93636765dc7f0c..c1d0dad598eaa0 100644 --- a/tests/models/test_cleartasks.py +++ b/tests/models/test_cleartasks.py @@ -21,7 +21,8 @@ import os import unittest -from airflow import settings, configuration +from airflow import settings +from airflow.configuration import conf from airflow.models import DAG, TaskInstance as TI, clear_task_instances, XCom from airflow.operators.dummy_operator import DummyOperator from airflow.utils import timezone @@ -239,7 +240,7 @@ def test_xcom_disable_pickle_type(self): dag_id = 
"test_dag1" task_id = "test_task1" - configuration.set("core", "enable_xcom_pickling", "False") + conf.set("core", "enable_xcom_pickling", "False") XCom.set(key=key, value=json_obj, @@ -269,7 +270,7 @@ def test_xcom_enable_pickle_type(self): dag_id = "test_dag2" task_id = "test_task2" - configuration.set("core", "enable_xcom_pickling", "True") + conf.set("core", "enable_xcom_pickling", "True") XCom.set(key=key, value=json_obj, @@ -297,7 +298,7 @@ class PickleRce(object): def __reduce__(self): return os.system, ("ls -alt",) - configuration.set("core", "xcom_enable_pickling", "False") + conf.set("core", "xcom_enable_pickling", "False") self.assertRaises(TypeError, XCom.set, key="xcom_test3", @@ -315,7 +316,7 @@ def test_xcom_get_many(self): dag_id2 = "test_dag5" task_id2 = "test_task5" - configuration.set("core", "xcom_enable_pickling", "True") + conf.set("core", "xcom_enable_pickling", "True") XCom.set(key=key, value=json_obj, diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 76c721d682fdd5..d6098612f134eb 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -28,7 +28,8 @@ import six from mock import patch -from airflow import models, settings, configuration +from airflow import models, settings +from airflow.configuration import conf from airflow.exceptions import AirflowException, AirflowDagCycleException from airflow.models import DAG, DagModel, TaskInstance as TI from airflow.operators.dummy_operator import DummyOperator @@ -675,7 +676,7 @@ def test_sync_to_db(self, mock_now): self.assertTrue(orm_dag.is_active) self.assertIsNone(orm_dag.default_view) self.assertEqual(orm_dag.get_default_view(), - configuration.conf.get('webserver', 'dag_default_view').lower()) + conf.get('webserver', 'dag_default_view').lower()) self.assertEqual(orm_dag.safe_dag_id, 'dag') orm_subdag = session.query(DagModel).filter( diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index 49c03ca2b27ff2..f717f617a1211a 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -27,7 +27,8 @@ from mock import patch, ANY -from airflow import models, configuration +from airflow import models +from airflow.configuration import conf from airflow.jobs import LocalTaskJob as LJ from airflow.models import DagModel, DagBag, TaskInstance as TI from airflow.utils.db import create_session @@ -629,8 +630,7 @@ def test_kill_zombies_when_job_state_is_not_running(self, mock_ti_handle_failure dagbag.kill_zombies() mock_ti_handle_failure \ .assert_called_with(ANY, - configuration.getboolean('core', - 'unit_test_mode'), + conf.getboolean('core', 'unit_test_mode'), ANY) @patch.object(TI, 'handle_failure') @@ -639,7 +639,7 @@ def test_kill_zombie_when_job_received_no_heartbeat(self, mock_ti_handle_failure Test that kill zombies calls TI's failure handler with proper context """ zombie_threshold_secs = ( - configuration.getint('scheduler', 'scheduler_zombie_task_threshold')) + conf.getint('scheduler', 'scheduler_zombie_task_threshold')) dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=True) with create_session() as session: session.query(TI).delete() @@ -661,8 +661,7 @@ def test_kill_zombie_when_job_received_no_heartbeat(self, mock_ti_handle_failure dagbag.kill_zombies() mock_ti_handle_failure \ .assert_called_with(ANY, - configuration.getboolean('core', - 'unit_test_mode'), + conf.getboolean('core', 'unit_test_mode'), ANY) @patch.object(TI, 'handle_failure') diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py 
index 2661a03a9bb3c1..8f93b0695809eb 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -27,7 +27,8 @@ from mock import patch, mock_open from parameterized import parameterized, param from sqlalchemy.orm.session import Session -from airflow import models, settings, configuration +from airflow import models, settings +from airflow.configuration import conf from airflow.contrib.sensors.python_sensor import PythonSensor from airflow.exceptions import AirflowException, AirflowSkipException from airflow.models import DAG, TaskFail, TaskInstance as TI, TaskReschedule, DagRun @@ -901,7 +902,7 @@ def test_log_url_rbac(self): dag = DAG('dag', start_date=DEFAULT_DATE) task = DummyOperator(task_id='op', dag=dag) ti = TI(task=task, execution_date=datetime.datetime(2018, 1, 1)) - configuration.conf.set('webserver', 'rbac', 'True') + conf.set('webserver', 'rbac', 'True') expected_url = ( 'http://localhost:8080/log?' @@ -989,8 +990,8 @@ def test_email_alert_with_config(self, mock_send_email): ti = TI( task=task, execution_date=datetime.datetime.now()) - configuration.set('email', 'subject_template', '/subject/path') - configuration.set('email', 'html_content_template', '/html_content/path') + conf.set('email', 'subject_template', '/subject/path') + conf.set('email', 'html_content_template', '/html_content/path') opener = mock_open(read_data='template: {{ti.task_id}}') with patch('airflow.models.taskinstance.open', opener, create=True): diff --git a/tests/operators/test_hive_operator.py b/tests/operators/test_hive_operator.py index 445e9fac45c827..0dea7a0cf0087e 100644 --- a/tests/operators/test_hive_operator.py +++ b/tests/operators/test_hive_operator.py @@ -25,7 +25,8 @@ from tests.compat import mock import nose -from airflow import DAG, configuration +from airflow import DAG +from airflow.configuration import conf import airflow.operators.hive_operator @@ -92,7 +93,7 @@ def test_hive_airflow_default_config_queue(self): dag=self.dag) # just check that the correct default value in test_default.cfg is used - test_config_hive_mapred_queue = configuration.conf.get( + test_config_hive_mapred_queue = conf.get( 'hive', 'default_hive_mapred_queue' ) diff --git a/tests/operators/test_operators.py b/tests/operators/test_operators.py index d7b8193be42608..d87cfc006a32d2 100644 --- a/tests/operators/test_operators.py +++ b/tests/operators/test_operators.py @@ -19,7 +19,8 @@ from __future__ import print_function -from airflow import DAG, configuration, operators +from airflow import DAG, operators +from airflow.configuration import conf from airflow.utils import timezone from collections import OrderedDict @@ -50,7 +51,7 @@ def tearDown(self): for table in drop_tables: conn.execute("DROP TABLE IF EXISTS {}".format(table)) - @unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'), + @unittest.skipUnless('mysql' in conf.get('core', 'sql_alchemy_conn'), "This is a MySQL test") def test_mysql_operator_test(self): sql = """ @@ -65,7 +66,7 @@ def test_mysql_operator_test(self): dag=self.dag) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - @unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'), + @unittest.skipUnless('mysql' in conf.get('core', 'sql_alchemy_conn'), "This is a MySQL test") def test_mysql_operator_test_multi(self): sql = [ @@ -81,7 +82,7 @@ def test_mysql_operator_test_multi(self): ) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - 
@unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'), + @unittest.skipUnless('mysql' in conf.get('core', 'sql_alchemy_conn'), "This is a MySQL test") def test_mysql_hook_test_bulk_load(self): records = ("foo", "bar", "baz") @@ -105,7 +106,7 @@ def test_mysql_hook_test_bulk_load(self): results = tuple(result[0] for result in c.fetchall()) self.assertEqual(sorted(results), sorted(records)) - @unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'), + @unittest.skipUnless('mysql' in conf.get('core', 'sql_alchemy_conn'), "This is a MySQL test") def test_mysql_hook_test_bulk_dump(self): from airflow.hooks.mysql_hook import MySqlHook @@ -118,7 +119,7 @@ def test_mysql_hook_test_bulk_dump(self): self.skipTest("Skip test_mysql_hook_test_bulk_load " "since file output is not permitted") - @unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'), + @unittest.skipUnless('mysql' in conf.get('core', 'sql_alchemy_conn'), "This is a MySQL test") @mock.patch('airflow.hooks.mysql_hook.MySqlHook.get_conn') def test_mysql_hook_test_bulk_dump_mock(self, mock_get_conn): @@ -139,7 +140,7 @@ def test_mysql_hook_test_bulk_dump_mock(self, mock_get_conn): """.format(tmp_file=tmp_file, table=table) assertEqualIgnoreMultipleSpaces(self, mock_execute.call_args[0][0], query) - @unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'), + @unittest.skipUnless('mysql' in conf.get('core', 'sql_alchemy_conn'), "This is a MySQL test") def test_mysql_to_mysql(self): sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;" @@ -158,7 +159,7 @@ def test_mysql_to_mysql(self): dag=self.dag) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - @unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'), + @unittest.skipUnless('mysql' in conf.get('core', 'sql_alchemy_conn'), "This is a MySQL test") def test_overwrite_schema(self): """ @@ -196,7 +197,7 @@ def tearDown(self): for t in tables_to_drop: cur.execute("DROP TABLE IF EXISTS {}".format(t)) - @unittest.skipUnless('postgres' in configuration.conf.get('core', 'sql_alchemy_conn'), + @unittest.skipUnless('postgres' in conf.get('core', 'sql_alchemy_conn'), "This is a Postgres test") def test_postgres_operator_test(self): sql = """ @@ -218,7 +219,7 @@ def test_postgres_operator_test(self): end_date=DEFAULT_DATE, ignore_ti_state=True) - @unittest.skipUnless('postgres' in configuration.conf.get('core', 'sql_alchemy_conn'), + @unittest.skipUnless('postgres' in conf.get('core', 'sql_alchemy_conn'), "This is a Postgres test") def test_postgres_operator_test_multi(self): sql = [ @@ -231,7 +232,7 @@ def test_postgres_operator_test_multi(self): task_id='postgres_operator_test_multi', sql=sql, dag=self.dag) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - @unittest.skipUnless('postgres' in configuration.conf.get('core', 'sql_alchemy_conn'), + @unittest.skipUnless('postgres' in conf.get('core', 'sql_alchemy_conn'), "This is a Postgres test") def test_postgres_to_postgres(self): sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;" @@ -250,7 +251,7 @@ def test_postgres_to_postgres(self): dag=self.dag) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - @unittest.skipUnless('postgres' in configuration.conf.get('core', 'sql_alchemy_conn'), + @unittest.skipUnless('postgres' in conf.get('core', 'sql_alchemy_conn'), "This is a Postgres test") def test_vacuum(self): """ @@ -266,7 +267,7 
@@ def test_vacuum(self): autocommit=True) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - @unittest.skipUnless('postgres' in configuration.conf.get('core', 'sql_alchemy_conn'), + @unittest.skipUnless('postgres' in conf.get('core', 'sql_alchemy_conn'), "This is a Postgres test") def test_overwrite_schema(self): """ @@ -372,14 +373,14 @@ def tearDown(self): with MySqlHook().get_conn() as cur: cur.execute("DROP TABLE IF EXISTS baby_names CASCADE;") - @unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'), + @unittest.skipUnless('mysql' in conf.get('core', 'sql_alchemy_conn'), "This is a MySQL test") def test_clear(self): self.dag.clear( start_date=DEFAULT_DATE, end_date=timezone.utcnow()) - @unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'), + @unittest.skipUnless('mysql' in conf.get('core', 'sql_alchemy_conn'), "This is a MySQL test") def test_mysql_to_hive(self): from airflow.operators.mysql_to_hive import MySqlToHiveTransfer @@ -394,7 +395,7 @@ def test_mysql_to_hive(self): dag=self.dag) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - @unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'), + @unittest.skipUnless('mysql' in conf.get('core', 'sql_alchemy_conn'), "This is a MySQL test") def test_mysql_to_hive_partition(self): from airflow.operators.mysql_to_hive import MySqlToHiveTransfer @@ -411,7 +412,7 @@ def test_mysql_to_hive_partition(self): dag=self.dag) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - @unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'), + @unittest.skipUnless('mysql' in conf.get('core', 'sql_alchemy_conn'), "This is a MySQL test") def test_mysql_to_hive_tblproperties(self): from airflow.operators.mysql_to_hive import MySqlToHiveTransfer @@ -427,7 +428,7 @@ def test_mysql_to_hive_tblproperties(self): dag=self.dag) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - @unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'), + @unittest.skipUnless('mysql' in conf.get('core', 'sql_alchemy_conn'), "This is a MySQL test") @mock.patch('airflow.hooks.hive_hooks.HiveCliHook.load_file') def test_mysql_to_hive_type_conversion(self, mock_load_file): @@ -472,7 +473,7 @@ def test_mysql_to_hive_type_conversion(self, mock_load_file): with m.get_conn() as c: c.execute("DROP TABLE IF EXISTS {}".format(mysql_table)) - @unittest.skipUnless('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'), + @unittest.skipUnless('mysql' in conf.get('core', 'sql_alchemy_conn'), "This is a MySQL test") def test_mysql_to_hive_verify_loaded_values(self): mysql_table = 'test_mysql_to_hive' diff --git a/tests/security/test_kerberos.py b/tests/security/test_kerberos.py index ee3541f2646917..cac73e912859ec 100644 --- a/tests/security/test_kerberos.py +++ b/tests/security/test_kerberos.py @@ -21,7 +21,7 @@ import unittest from argparse import Namespace -from airflow import configuration +from airflow.configuration import conf from airflow.security.kerberos import renew_from_kt from airflow import LoggingMixin from tests.test_utils.config import conf_vars @@ -31,11 +31,11 @@ 'Skipping Kerberos API tests due to missing KRB5_KTNAME') class KerberosTest(unittest.TestCase): def setUp(self): - if not configuration.conf.has_section("kerberos"): - configuration.conf.add_section("kerberos") - configuration.conf.set("kerberos", "keytab", - 
os.environ['KRB5_KTNAME']) - keytab_from_cfg = configuration.conf.get("kerberos", "keytab") + if not conf.has_section("kerberos"): + conf.add_section("kerberos") + conf.set("kerberos", "keytab", + os.environ['KRB5_KTNAME']) + keytab_from_cfg = conf.get("kerberos", "keytab") self.args = Namespace(keytab=keytab_from_cfg, principal=None, pid=None, daemon=None, stdout=None, stderr=None, log_file=None) diff --git a/tests/sensors/test_sql_sensor.py b/tests/sensors/test_sql_sensor.py index b47ea8f521d412..3c297c3e8ec6c1 100644 --- a/tests/sensors/test_sql_sensor.py +++ b/tests/sensors/test_sql_sensor.py @@ -20,7 +20,7 @@ import unittest from airflow import DAG -from airflow import configuration +from airflow.configuration import conf from airflow.exceptions import AirflowException from airflow.sensors.sql_sensor import SqlSensor from airflow.utils.timezone import datetime @@ -49,7 +49,7 @@ def test_unsupported_conn_type(self): t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) @unittest.skipUnless( - 'mysql' in configuration.conf.get('core', 'sql_alchemy_conn'), "this is a mysql test") + 'mysql' in conf.get('core', 'sql_alchemy_conn'), "this is a mysql test") def test_sql_sensor_mysql(self): t1 = SqlSensor( task_id='sql_sensor_check', @@ -69,7 +69,7 @@ def test_sql_sensor_mysql(self): t2.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) @unittest.skipUnless( - 'postgresql' in configuration.conf.get('core', 'sql_alchemy_conn'), "this is a postgres test") + 'postgresql' in conf.get('core', 'sql_alchemy_conn'), "this is a postgres test") def test_sql_sensor_postgres(self): t1 = SqlSensor( task_id='sql_sensor_check', diff --git a/tests/test_configuration.py b/tests/test_configuration.py index 6e10bafffdd092..f2ca9589104581 100644 --- a/tests/test_configuration.py +++ b/tests/test_configuration.py @@ -29,6 +29,7 @@ from airflow import configuration from airflow.configuration import conf, AirflowConfigParser, parameterized_config +from tests.compat import mock if six.PY2: # Need `assertWarns` back-ported from unittest2 @@ -438,3 +439,10 @@ def make_config(): self.assertEqual(test_conf.get('core', 'task_runner'), 'NotBashTaskRunner') self.assertListEqual([], w) + + def test_deprecated_funcs(self): + for func in ['load_test_config', 'get', 'getboolean', 'getfloat', 'getint', 'has_option', + 'remove_option', 'as_dict', 'set']: + with mock.patch('airflow.configuration.{}'.format(func)): + with self.assertWarns(DeprecationWarning): + getattr(configuration, func)() diff --git a/tests/test_logging_config.py b/tests/test_logging_config.py index 67667cb2a40acf..f0c7b44b3defb6 100644 --- a/tests/test_logging_config.py +++ b/tests/test_logging_config.py @@ -21,10 +21,8 @@ import sys import tempfile -from airflow import configuration as conf -from airflow.configuration import mkdir_p -from airflow.exceptions import AirflowConfigException -from tests.compat import mock, patch +from airflow.configuration import conf, mkdir_p +from tests.compat import patch from tests.test_utils.config import conf_vars @@ -217,21 +215,12 @@ def test_loading_no_local_settings(self): # When the key is not available in the configuration def test_when_the_config_key_does_not_exists(self): from airflow import logging_config - conf_get = conf.get - - def side_effect(*args): - if args[1] == 'logging_config_class': - raise AirflowConfigException - else: - return conf_get(*args) - - logging_config.conf.get = mock.Mock(side_effect=side_effect) - - with patch.object(logging_config.log, 'debug') 
as mock_debug: - logging_config.configure_logging() - mock_debug.assert_any_call( - 'Could not find key logging_config_class in config' - ) + with conf_vars({('core', 'logging_config_class'): None}): + with patch.object(logging_config.log, 'debug') as mock_debug: + logging_config.configure_logging() + mock_debug.assert_any_call( + 'Could not find key logging_config_class in config' + ) # Just default def test_loading_local_settings_without_logging_config(self): diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py index b8b0e057ee72f3..0bcc8d88180d14 100644 --- a/tests/utils/test_dag_processing.py +++ b/tests/utils/test_dag_processing.py @@ -26,8 +26,7 @@ from mock import MagicMock, PropertyMock -from airflow import configuration as conf -from airflow.configuration import mkdir_p +from airflow.configuration import conf, mkdir_p from airflow.jobs import DagFileProcessor from airflow.utils import timezone from airflow.utils.dag_processing import (DagFileProcessorAgent, DagFileProcessorManager, diff --git a/tests/www/api/experimental/test_kerberos_endpoints.py b/tests/www/api/experimental/test_kerberos_endpoints.py index 16337e2316e7c4..ae5c547d725255 100644 --- a/tests/www/api/experimental/test_kerberos_endpoints.py +++ b/tests/www/api/experimental/test_kerberos_endpoints.py @@ -24,7 +24,7 @@ from datetime import datetime -from airflow import configuration +from airflow.configuration import conf from airflow.api.auth.backend.kerberos_auth import CLIENT_AUTH from airflow.utils.net import get_hostname from airflow.www import app as application @@ -36,9 +36,9 @@ @conf_vars({('api', 'auth_backend'): 'airflow.contrib.auth.backends.kerberos_auth'}) class ApiKerberosTests(unittest.TestCase): def setUp(self): - configuration.conf.set("kerberos", - "keytab", - os.environ['KRB5_KTNAME']) + conf.set("kerberos", + "keytab", + os.environ['KRB5_KTNAME']) self.app = application.create_app(testing=True) diff --git a/tests/www/test_views.py b/tests/www/test_views.py index 0c518f727963cc..96a193a9417e2d 100644 --- a/tests/www/test_views.py +++ b/tests/www/test_views.py @@ -34,14 +34,14 @@ import airflow -from airflow import models, configuration +from airflow import models +from airflow.configuration import conf from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG from airflow.models import DAG, DagRun, TaskInstance from airflow.operators.dummy_operator import DummyOperator from airflow.settings import Session from airflow.utils.timezone import datetime from airflow.www import app as application -from airflow import configuration as conf from tests.test_utils.config import conf_vars @@ -673,7 +673,7 @@ def test_with_execution_date_parameter_only(self): data = response.data.decode('utf-8') self.assertBaseDateAndNumRuns( self.runs[1].execution_date, - configuration.getint('webserver', 'default_dag_run_display_number'), + conf.getint('webserver', 'default_dag_run_display_number'), data) self.assertRunIsNotInDropdown(self.runs[0], data) self.assertRunIsSelected(self.runs[1], data) diff --git a/tests/www_rbac/api/experimental/test_kerberos_endpoints.py b/tests/www_rbac/api/experimental/test_kerberos_endpoints.py index 6d2ca42e359a6c..149fcbc7461449 100644 --- a/tests/www_rbac/api/experimental/test_kerberos_endpoints.py +++ b/tests/www_rbac/api/experimental/test_kerberos_endpoints.py @@ -25,7 +25,7 @@ from datetime import datetime -from airflow import configuration +from airflow.configuration import conf from airflow.api.auth.backend.kerberos_auth import 
CLIENT_AUTH from airflow.www_rbac import app as application from tests.test_utils.config import conf_vars @@ -36,7 +36,7 @@ @conf_vars({('api', 'auth_backend'): 'airflow.contrib.auth.backends.kerberos_auth'}) class ApiKerberosTests(unittest.TestCase): def setUp(self): - configuration.conf.set("kerberos", + conf.set("kerberos", "keytab", os.environ['KRB5_KTNAME']) diff --git a/tests/www_rbac/test_views.py b/tests/www_rbac/test_views.py index a74d380c814ef4..f5f358982dfad8 100644 --- a/tests/www_rbac/test_views.py +++ b/tests/www_rbac/test_views.py @@ -37,8 +37,8 @@ from werkzeug.test import Client from werkzeug.wrappers import BaseResponse -from airflow import configuration as conf from airflow import models, settings +from airflow.configuration import conf from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG from airflow.jobs import BaseJob from airflow.models import BaseOperator, Connection, DAG, DagRun, TaskInstance @@ -1720,7 +1720,7 @@ def test_trigger_dag_button_normal_exist(self): self.assertIn('/trigger?dag_id=example_bash_operator', resp.data.decode('utf-8')) self.assertIn("return confirmDeleteDag(this, 'example_bash_operator')", resp.data.decode('utf-8')) - @unittest.skipIf('mysql' in conf.conf.get('core', 'sql_alchemy_conn'), + @unittest.skipIf('mysql' in conf.get('core', 'sql_alchemy_conn'), "flaky when run on mysql") def test_trigger_dag_button(self):
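---

A minimal sketch of the access pattern this change standardizes on. The first form is the
"historical convenience" access being removed from call sites above; the second is the single
import used throughout the patch. The section and key names are taken directly from the
airflow/utils/helpers.py hunk and are only illustrative here.

    # Deprecated "historical convenience" access being phased out:
    from airflow import configuration
    timeout = configuration.conf.getint('core', 'KILLED_TASK_CLEANUP_TIME')

    # Unified access used across every file touched by this patch:
    from airflow.configuration import conf
    timeout = conf.getint('core', 'KILLED_TASK_CLEANUP_TIME')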
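The new test_deprecated_funcs case in tests/test_configuration.py above expects module-level
names such as configuration.get and configuration.getboolean to emit a DeprecationWarning when
called. The following is a hedged sketch of how one such wrapper could delegate to conf while
warning; it is illustrative only and not the exact airflow/configuration.py source.

    import warnings
    from airflow.configuration import conf

    def get(*args, **kwargs):
        """Module-level helper kept for backwards compatibility; prefer conf.get()."""
        warnings.warn(
            "Accessing configuration method 'get' directly from the configuration "
            "module is deprecated; use 'from airflow.configuration import conf' "
            "and call conf.get() instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        # Delegate to the shared AirflowConfigParser instance.
        return conf.get(*args, **kwargs)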
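Several test hunks above (tests/test_logging_config.py, tests/core.py, the kerberos endpoint
tests) move from ad-hoc mocking of conf.get to the conf_vars helper from
tests/test_utils/config.py. A short usage sketch, assuming the helper restores the previous
value on exit, which is how the tests in this patch rely on it (None unsets the key):

    from tests.test_utils.config import conf_vars

    # Override [core] logging_config_class only inside the block; the prior
    # value is restored afterwards, keeping the test isolated.
    with conf_vars({('core', 'logging_config_class'): None}):
        from airflow import logging_config
        logging_config.configure_logging()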