Skip to content

Commit

Permalink
[AIRFLOW-4858] Deprecate "Historical convenience functions" in airflo…
Browse files Browse the repository at this point in the history
…w.configuration (#5495)

1. Issue old conf method deprecation warnings properly and remove current old conf method usages.
2. Unify the way to use conf as `from airflow.configuration import conf`

(cherry picked from commit f497d1d)
  • Loading branch information
haoliang7 authored and ashb committed Sep 24, 2019
1 parent 914bb92 commit d0b8e1f
Show file tree
Hide file tree
Showing 79 changed files with 389 additions and 387 deletions.
6 changes: 3 additions & 3 deletions airflow/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import lazy_object_proxy
from zope.deprecation import deprecated

from airflow.exceptions import AirflowException
from airflow import configuration as conf
from airflow.exceptions import AirflowException, AirflowConfigException
from airflow.configuration import conf

from airflow.utils.log.logging_mixin import LoggingMixin

Expand All @@ -49,7 +49,7 @@ def load_auth():
auth_backend = 'airflow.api.auth.backend.default'
try:
auth_backend = conf.get("api", "auth_backend")
except conf.AirflowConfigException:
except AirflowConfigException:
pass

try:
Expand Down
2 changes: 1 addition & 1 deletion airflow/api/auth/backend/kerberos_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@

from requests_kerberos import HTTPKerberosAuth

from airflow import configuration as conf
from airflow.configuration import conf
from airflow.utils.log.logging_mixin import LoggingMixin


Expand Down
8 changes: 4 additions & 4 deletions airflow/bin/airflow
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
# specific language governing permissions and limitations
# under the License.
import os
from airflow import configuration
from airflow.configuration import conf
from airflow.bin.cli import CLIFactory

if __name__ == '__main__':

if configuration.conf.get("core", "security") == 'kerberos':
os.environ['KRB5CCNAME'] = configuration.conf.get('kerberos', 'ccache')
os.environ['KRB5_KTNAME'] = configuration.conf.get('kerberos', 'keytab')
if conf.get("core", "security") == 'kerberos':
os.environ['KRB5CCNAME'] = conf.get('kerberos', 'ccache')
os.environ['KRB5_KTNAME'] = conf.get('kerberos', 'keytab')

parser = CLIFactory.get_parser()
args = parser.parse_args()
Expand Down
4 changes: 2 additions & 2 deletions airflow/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
import airflow
from airflow import api
from airflow import jobs, settings
from airflow import configuration as conf
from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowWebServerTimeout
from airflow.executors import get_default_executor
from airflow.models import (
Expand Down Expand Up @@ -516,7 +516,7 @@ def run(args, dag=None):
if os.path.exists(args.cfg_path):
os.remove(args.cfg_path)

conf.conf.read_dict(conf_dict, source=args.cfg_path)
conf.read_dict(conf_dict, source=args.cfg_path)
settings.configure_vars()

# IMPORTANT, have to use the NullPool, otherwise, each "run" command may leave
Expand Down
2 changes: 1 addition & 1 deletion airflow/config_templates/airflow_local_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import six

from airflow import configuration as conf
from airflow.configuration import conf
from airflow.utils.file import mkdirs

# TODO: Logging format and level should be configured
Expand Down
28 changes: 14 additions & 14 deletions airflow/config_templates/default_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"""Default celery configuration."""
import ssl

from airflow import configuration
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin

Expand All @@ -30,9 +30,9 @@ def _broker_supports_visibility_timeout(url):

log = LoggingMixin().log

broker_url = configuration.conf.get('celery', 'BROKER_URL')
broker_url = conf.get('celery', 'BROKER_URL')

broker_transport_options = configuration.conf.getsection(
broker_transport_options = conf.getsection(
'celery_broker_transport_options'
)
if 'visibility_timeout' not in broker_transport_options:
Expand All @@ -44,31 +44,31 @@ def _broker_supports_visibility_timeout(url):
'event_serializer': 'json',
'worker_prefetch_multiplier': 1,
'task_acks_late': True,
'task_default_queue': configuration.conf.get('celery', 'DEFAULT_QUEUE'),
'task_default_exchange': configuration.conf.get('celery', 'DEFAULT_QUEUE'),
'task_default_queue': conf.get('celery', 'DEFAULT_QUEUE'),
'task_default_exchange': conf.get('celery', 'DEFAULT_QUEUE'),
'broker_url': broker_url,
'broker_transport_options': broker_transport_options,
'result_backend': configuration.conf.get('celery', 'RESULT_BACKEND'),
'worker_concurrency': configuration.conf.getint('celery', 'WORKER_CONCURRENCY'),
'result_backend': conf.get('celery', 'RESULT_BACKEND'),
'worker_concurrency': conf.getint('celery', 'WORKER_CONCURRENCY'),
}

celery_ssl_active = False
try:
celery_ssl_active = configuration.conf.getboolean('celery', 'SSL_ACTIVE')
celery_ssl_active = conf.getboolean('celery', 'SSL_ACTIVE')
except AirflowConfigException:
log.warning("Celery Executor will run without SSL")

try:
if celery_ssl_active:
if 'amqp://' in broker_url:
broker_use_ssl = {'keyfile': configuration.conf.get('celery', 'SSL_KEY'),
'certfile': configuration.conf.get('celery', 'SSL_CERT'),
'ca_certs': configuration.conf.get('celery', 'SSL_CACERT'),
broker_use_ssl = {'keyfile': conf.get('celery', 'SSL_KEY'),
'certfile': conf.get('celery', 'SSL_CERT'),
'ca_certs': conf.get('celery', 'SSL_CACERT'),
'cert_reqs': ssl.CERT_REQUIRED}
elif 'redis://' in broker_url:
broker_use_ssl = {'ssl_keyfile': configuration.conf.get('celery', 'SSL_KEY'),
'ssl_certfile': configuration.conf.get('celery', 'SSL_CERT'),
'ssl_ca_certs': configuration.conf.get('celery', 'SSL_CACERT'),
broker_use_ssl = {'ssl_keyfile': conf.get('celery', 'SSL_KEY'),
'ssl_certfile': conf.get('celery', 'SSL_CERT'),
'ssl_ca_certs': conf.get('celery', 'SSL_CACERT'),
'ssl_cert_reqs': ssl.CERT_REQUIRED}
else:
raise AirflowException('The broker you configured does not support SSL_ACTIVE to be True. '
Expand Down
2 changes: 1 addition & 1 deletion airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ def get_airflow_test_config(airflow_home):
for func in [load_test_config, get, getboolean, getfloat, getint, has_option,
remove_option, as_dict, set]:
deprecated(
func,
func.__name__,
"Accessing configuration method '{f.__name__}' directly from "
"the configuration module is deprecated. Please access the "
"configuration from the 'configuration.conf' object via "
Expand Down
6 changes: 3 additions & 3 deletions airflow/contrib/auth/backends/github_enterprise_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@

from flask_oauthlib.client import OAuth

from airflow import models, configuration
from airflow.configuration import AirflowConfigException
from airflow import models
from airflow.configuration import AirflowConfigException, conf
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin

log = LoggingMixin().log


def get_config_param(param):
return str(configuration.conf.get('github_enterprise', param))
return str(conf.get('github_enterprise', param))


class GHEUser(models.User):
Expand Down
5 changes: 3 additions & 2 deletions airflow/contrib/auth/backends/google_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,16 @@

from flask_oauthlib.client import OAuth

from airflow import models, configuration
from airflow import models
from airflow.configuration import conf
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin

log = LoggingMixin().log


def get_config_param(param):
return str(configuration.conf.get('google', param))
return str(conf.get('google', param))


class GoogleUser(models.User):
Expand Down
8 changes: 4 additions & 4 deletions airflow/contrib/auth/backends/kerberos_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from flask import url_for, redirect

from airflow import models
from airflow import configuration
from airflow.configuration import conf
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin

Expand All @@ -55,13 +55,13 @@ def __init__(self, user):
@staticmethod
def authenticate(username, password):
service_principal = "%s/%s" % (
configuration.conf.get('kerberos', 'principal'),
conf.get('kerberos', 'principal'),
utils.get_fqdn()
)
realm = configuration.conf.get("kerberos", "default_realm")
realm = conf.get("kerberos", "default_realm")

try:
user_realm = configuration.conf.get("security", "default_realm")
user_realm = conf.get("security", "default_realm")
except AirflowConfigException:
user_realm = realm

Expand Down
50 changes: 25 additions & 25 deletions airflow/contrib/auth/backends/ldap_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from flask import url_for, redirect

from airflow import models
from airflow import configuration
from airflow.configuration import conf
from airflow.configuration import AirflowConfigException
from airflow.utils.db import provide_session

Expand All @@ -56,12 +56,12 @@ class LdapException(Exception):

def get_ldap_connection(dn=None, password=None):
try:
cacert = configuration.conf.get("ldap", "cacert")
cacert = conf.get("ldap", "cacert")
except AirflowConfigException:
pass

try:
ignore_malformed_schema = configuration.conf.get("ldap", "ignore_malformed_schema")
ignore_malformed_schema = conf.get("ldap", "ignore_malformed_schema")
except AirflowConfigException:
pass

Expand All @@ -71,7 +71,7 @@ def get_ldap_connection(dn=None, password=None):
tls_configuration = Tls(validate=ssl.CERT_REQUIRED,
ca_certs_file=cacert)

server = Server(configuration.conf.get("ldap", "uri"),
server = Server(conf.get("ldap", "uri"),
use_ssl=True,
tls=tls_configuration)

Expand Down Expand Up @@ -102,7 +102,7 @@ def group_contains_user(conn, search_base, group_filter, user_name_attr, usernam
def groups_user(conn, search_base, user_filter, user_name_att, username):
search_filter = "(&({0})({1}={2}))".format(user_filter, user_name_att, username)
try:
memberof_attr = configuration.conf.get("ldap", "group_member_attr")
memberof_attr = conf.get("ldap", "group_member_attr")
except Exception:
memberof_attr = "memberOf"
res = conn.search(native(search_base), native(search_filter),
Expand Down Expand Up @@ -138,13 +138,13 @@ def __init__(self, user):
self.ldap_groups = []

# Load and cache superuser and data_profiler settings.
conn = get_ldap_connection(configuration.conf.get("ldap", "bind_user"),
configuration.conf.get("ldap", "bind_password"))
conn = get_ldap_connection(conf.get("ldap", "bind_user"),
conf.get("ldap", "bind_password"))

superuser_filter = None
data_profiler_filter = None
try:
superuser_filter = configuration.conf.get("ldap", "superuser_filter")
superuser_filter = conf.get("ldap", "superuser_filter")
except AirflowConfigException:
pass

Expand All @@ -153,14 +153,14 @@ def __init__(self, user):
log.debug("Missing configuration for superuser settings or empty. Skipping.")
else:
self.superuser = group_contains_user(conn,
configuration.conf.get("ldap", "basedn"),
conf.get("ldap", "basedn"),
superuser_filter,
configuration.conf.get("ldap",
"user_name_attr"),
conf.get("ldap",
"user_name_attr"),
user.username)

try:
data_profiler_filter = configuration.conf.get("ldap", "data_profiler_filter")
data_profiler_filter = conf.get("ldap", "data_profiler_filter")
except AirflowConfigException:
pass

Expand All @@ -171,46 +171,46 @@ def __init__(self, user):
else:
self.data_profiler = group_contains_user(
conn,
configuration.conf.get("ldap", "basedn"),
conf.get("ldap", "basedn"),
data_profiler_filter,
configuration.conf.get("ldap",
"user_name_attr"),
conf.get("ldap",
"user_name_attr"),
user.username
)

# Load the ldap group(s) a user belongs to
try:
self.ldap_groups = groups_user(
conn,
configuration.conf.get("ldap", "basedn"),
configuration.conf.get("ldap", "user_filter"),
configuration.conf.get("ldap", "user_name_attr"),
conf.get("ldap", "basedn"),
conf.get("ldap", "user_filter"),
conf.get("ldap", "user_name_attr"),
user.username
)
except AirflowConfigException:
log.debug("Missing configuration for ldap settings. Skipping")

@staticmethod
def try_login(username, password):
conn = get_ldap_connection(configuration.conf.get("ldap", "bind_user"),
configuration.conf.get("ldap", "bind_password"))
conn = get_ldap_connection(conf.get("ldap", "bind_user"),
conf.get("ldap", "bind_password"))

search_filter = "(&({0})({1}={2}))".format(
configuration.conf.get("ldap", "user_filter"),
configuration.conf.get("ldap", "user_name_attr"),
conf.get("ldap", "user_filter"),
conf.get("ldap", "user_name_attr"),
username
)

search_scope = LEVEL
if configuration.conf.has_option("ldap", "search_scope"):
if configuration.conf.get("ldap", "search_scope") == "SUBTREE":
if conf.has_option("ldap", "search_scope"):
if conf.get("ldap", "search_scope") == "SUBTREE":
search_scope = SUBTREE
else:
search_scope = LEVEL

# todo: BASE or ONELEVEL?

res = conn.search(native(configuration.conf.get("ldap", "basedn")),
res = conn.search(native(conf.get("ldap", "basedn")),
native(search_filter),
search_scope=native(search_scope))

Expand Down
Loading

0 comments on commit d0b8e1f

Please sign in to comment.