Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-4858] Deprecate "Historical convenience functions" in conf #5495

Merged
merged 2 commits into from
Sep 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions airflow/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

from importlib import import_module

from airflow.exceptions import AirflowException
from airflow import configuration as conf
from airflow.exceptions import AirflowException, AirflowConfigException
from airflow.configuration import conf

from airflow.utils.log.logging_mixin import LoggingMixin

Expand All @@ -42,7 +42,7 @@ def load_auth():
auth_backend = 'airflow.api.auth.backend.default'
try:
auth_backend = conf.get("api", "auth_backend")
except conf.AirflowConfigException:
except AirflowConfigException:
pass

try:
Expand Down
2 changes: 1 addition & 1 deletion airflow/api/auth/backend/kerberos_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@

from requests_kerberos import HTTPKerberosAuth

from airflow import configuration as conf
from airflow.configuration import conf
from airflow.utils.log.logging_mixin import LoggingMixin


Expand Down
8 changes: 4 additions & 4 deletions airflow/bin/airflow
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
import os
import argcomplete

from airflow import configuration
from airflow.configuration import conf
from airflow.bin.cli import CLIFactory

if __name__ == '__main__':

if configuration.conf.get("core", "security") == 'kerberos':
os.environ['KRB5CCNAME'] = configuration.conf.get('kerberos', 'ccache')
os.environ['KRB5_KTNAME'] = configuration.conf.get('kerberos', 'keytab')
if conf.get("core", "security") == 'kerberos':
os.environ['KRB5CCNAME'] = conf.get('kerberos', 'ccache')
os.environ['KRB5_KTNAME'] = conf.get('kerberos', 'keytab')

parser = CLIFactory.get_parser()
argcomplete.autocomplete(parser)
Expand Down
4 changes: 2 additions & 2 deletions airflow/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import airflow
from airflow import api
from airflow import jobs, settings
from airflow import configuration as conf
from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowWebServerTimeout
from airflow.executors import get_default_executor
from airflow.models import (
Expand Down Expand Up @@ -507,7 +507,7 @@ def run(args, dag=None):
if os.path.exists(args.cfg_path):
os.remove(args.cfg_path)

conf.conf.read_dict(conf_dict, source=args.cfg_path)
conf.read_dict(conf_dict, source=args.cfg_path)
settings.configure_vars()

# IMPORTANT, have to use the NullPool, otherwise, each "run" command may leave
Expand Down
2 changes: 1 addition & 1 deletion airflow/config_templates/airflow_local_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import os
from typing import Dict, Any

from airflow import configuration as conf
from airflow.configuration import conf
from airflow.utils.file import mkdirs

# TODO: Logging format and level should be configured
Expand Down
28 changes: 14 additions & 14 deletions airflow/config_templates/default_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import ssl

from airflow import configuration
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin

Expand All @@ -30,9 +30,9 @@ def _broker_supports_visibility_timeout(url):

log = LoggingMixin().log

broker_url = configuration.conf.get('celery', 'BROKER_URL')
broker_url = conf.get('celery', 'BROKER_URL')

broker_transport_options = configuration.conf.getsection(
broker_transport_options = conf.getsection(
'celery_broker_transport_options'
)
if 'visibility_timeout' not in broker_transport_options:
Expand All @@ -44,31 +44,31 @@ def _broker_supports_visibility_timeout(url):
'event_serializer': 'json',
'worker_prefetch_multiplier': 1,
'task_acks_late': True,
'task_default_queue': configuration.conf.get('celery', 'DEFAULT_QUEUE'),
'task_default_exchange': configuration.conf.get('celery', 'DEFAULT_QUEUE'),
'task_default_queue': conf.get('celery', 'DEFAULT_QUEUE'),
'task_default_exchange': conf.get('celery', 'DEFAULT_QUEUE'),
'broker_url': broker_url,
'broker_transport_options': broker_transport_options,
'result_backend': configuration.conf.get('celery', 'RESULT_BACKEND'),
'worker_concurrency': configuration.conf.getint('celery', 'WORKER_CONCURRENCY'),
'result_backend': conf.get('celery', 'RESULT_BACKEND'),
'worker_concurrency': conf.getint('celery', 'WORKER_CONCURRENCY'),
}

celery_ssl_active = False
try:
celery_ssl_active = configuration.conf.getboolean('celery', 'SSL_ACTIVE')
celery_ssl_active = conf.getboolean('celery', 'SSL_ACTIVE')
except AirflowConfigException:
log.warning("Celery Executor will run without SSL")

try:
if celery_ssl_active:
if 'amqp://' in broker_url:
broker_use_ssl = {'keyfile': configuration.conf.get('celery', 'SSL_KEY'),
'certfile': configuration.conf.get('celery', 'SSL_CERT'),
'ca_certs': configuration.conf.get('celery', 'SSL_CACERT'),
broker_use_ssl = {'keyfile': conf.get('celery', 'SSL_KEY'),
'certfile': conf.get('celery', 'SSL_CERT'),
'ca_certs': conf.get('celery', 'SSL_CACERT'),
'cert_reqs': ssl.CERT_REQUIRED}
elif 'redis://' in broker_url:
broker_use_ssl = {'ssl_keyfile': configuration.conf.get('celery', 'SSL_KEY'),
'ssl_certfile': configuration.conf.get('celery', 'SSL_CERT'),
'ssl_ca_certs': configuration.conf.get('celery', 'SSL_CACERT'),
broker_use_ssl = {'ssl_keyfile': conf.get('celery', 'SSL_KEY'),
'ssl_certfile': conf.get('celery', 'SSL_CERT'),
'ssl_ca_certs': conf.get('celery', 'SSL_CACERT'),
'ssl_cert_reqs': ssl.CERT_REQUIRED}
else:
raise AirflowException('The broker you configured does not support SSL_ACTIVE to be True. '
Expand Down
2 changes: 1 addition & 1 deletion airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ def get_airflow_test_config(airflow_home):
for func in [load_test_config, get, getboolean, getfloat, getint, has_option,
remove_option, as_dict, set]:
deprecated(
func,
func.__name__,
"Accessing configuration method '{f.__name__}' directly from "
"the configuration module is deprecated. Please access the "
"configuration from the 'configuration.conf' object via "
Expand Down
6 changes: 3 additions & 3 deletions airflow/contrib/auth/backends/github_enterprise_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@

from flask_oauthlib.client import OAuth

from airflow import models, configuration
from airflow.configuration import AirflowConfigException
from airflow import models
from airflow.configuration import AirflowConfigException, conf
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin

log = LoggingMixin().log


def get_config_param(param):
return str(configuration.conf.get('github_enterprise', param))
return str(conf.get('github_enterprise', param))


class GHEUser(models.User):
Expand Down
5 changes: 3 additions & 2 deletions airflow/contrib/auth/backends/google_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,16 @@

from flask_oauthlib.client import OAuth

from airflow import models, configuration
from airflow import models
from airflow.configuration import conf
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin

log = LoggingMixin().log


def get_config_param(param):
return str(configuration.conf.get('google', param))
return str(conf.get('google', param))


class GoogleUser(models.User):
Expand Down
8 changes: 4 additions & 4 deletions airflow/contrib/auth/backends/kerberos_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from flask import url_for, redirect

from airflow import models
from airflow import configuration
from airflow.configuration import conf
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin

Expand All @@ -55,13 +55,13 @@ def __init__(self, user):
@staticmethod
def authenticate(username, password):
service_principal = "%s/%s" % (
configuration.conf.get('kerberos', 'principal'),
conf.get('kerberos', 'principal'),
utils.get_fqdn()
)
realm = configuration.conf.get("kerberos", "default_realm")
realm = conf.get("kerberos", "default_realm")

try:
user_realm = configuration.conf.get("security", "default_realm")
user_realm = conf.get("security", "default_realm")
except AirflowConfigException:
user_realm = realm

Expand Down
50 changes: 25 additions & 25 deletions airflow/contrib/auth/backends/ldap_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from flask import url_for, redirect

from airflow import models
from airflow import configuration
from airflow.configuration import conf
from airflow.configuration import AirflowConfigException
from airflow.utils.db import provide_session

Expand All @@ -55,12 +55,12 @@ class LdapException(Exception):

def get_ldap_connection(dn=None, password=None):
try:
cacert = configuration.conf.get("ldap", "cacert")
cacert = conf.get("ldap", "cacert")
except AirflowConfigException:
pass

try:
ignore_malformed_schema = configuration.conf.get("ldap", "ignore_malformed_schema")
ignore_malformed_schema = conf.get("ldap", "ignore_malformed_schema")
except AirflowConfigException:
pass

Expand All @@ -70,7 +70,7 @@ def get_ldap_connection(dn=None, password=None):
tls_configuration = Tls(validate=ssl.CERT_REQUIRED,
ca_certs_file=cacert)

server = Server(configuration.conf.get("ldap", "uri"),
server = Server(conf.get("ldap", "uri"),
use_ssl=True,
tls=tls_configuration)

Expand Down Expand Up @@ -100,7 +100,7 @@ def group_contains_user(conn, search_base, group_filter, user_name_attr, usernam
def groups_user(conn, search_base, user_filter, user_name_att, username):
search_filter = "(&({0})({1}={2}))".format(user_filter, user_name_att, username)
try:
memberof_attr = configuration.conf.get("ldap", "group_member_attr")
memberof_attr = conf.get("ldap", "group_member_attr")
except Exception:
memberof_attr = "memberOf"
res = conn.search(search_base, search_filter, attributes=[memberof_attr])
Expand Down Expand Up @@ -135,13 +135,13 @@ def __init__(self, user):
self.ldap_groups = []

# Load and cache superuser and data_profiler settings.
conn = get_ldap_connection(configuration.conf.get("ldap", "bind_user"),
configuration.conf.get("ldap", "bind_password"))
conn = get_ldap_connection(conf.get("ldap", "bind_user"),
conf.get("ldap", "bind_password"))

superuser_filter = None
data_profiler_filter = None
try:
superuser_filter = configuration.conf.get("ldap", "superuser_filter")
superuser_filter = conf.get("ldap", "superuser_filter")
except AirflowConfigException:
pass

Expand All @@ -150,14 +150,14 @@ def __init__(self, user):
log.debug("Missing configuration for superuser settings or empty. Skipping.")
else:
self.superuser = group_contains_user(conn,
configuration.conf.get("ldap", "basedn"),
conf.get("ldap", "basedn"),
superuser_filter,
configuration.conf.get("ldap",
"user_name_attr"),
conf.get("ldap",
"user_name_attr"),
user.username)

try:
data_profiler_filter = configuration.conf.get("ldap", "data_profiler_filter")
data_profiler_filter = conf.get("ldap", "data_profiler_filter")
except AirflowConfigException:
pass

Expand All @@ -168,46 +168,46 @@ def __init__(self, user):
else:
self.data_profiler = group_contains_user(
conn,
configuration.conf.get("ldap", "basedn"),
conf.get("ldap", "basedn"),
data_profiler_filter,
configuration.conf.get("ldap",
"user_name_attr"),
conf.get("ldap",
"user_name_attr"),
user.username
)

# Load the ldap group(s) a user belongs to
try:
self.ldap_groups = groups_user(
conn,
configuration.conf.get("ldap", "basedn"),
configuration.conf.get("ldap", "user_filter"),
configuration.conf.get("ldap", "user_name_attr"),
conf.get("ldap", "basedn"),
conf.get("ldap", "user_filter"),
conf.get("ldap", "user_name_attr"),
user.username
)
except AirflowConfigException:
log.debug("Missing configuration for ldap settings. Skipping")

@staticmethod
def try_login(username, password):
conn = get_ldap_connection(configuration.conf.get("ldap", "bind_user"),
configuration.conf.get("ldap", "bind_password"))
conn = get_ldap_connection(conf.get("ldap", "bind_user"),
conf.get("ldap", "bind_password"))

search_filter = "(&({0})({1}={2}))".format(
configuration.conf.get("ldap", "user_filter"),
configuration.conf.get("ldap", "user_name_attr"),
conf.get("ldap", "user_filter"),
conf.get("ldap", "user_name_attr"),
username
)

search_scope = LEVEL
if configuration.conf.has_option("ldap", "search_scope"):
if configuration.conf.get("ldap", "search_scope") == "SUBTREE":
if conf.has_option("ldap", "search_scope"):
if conf.get("ldap", "search_scope") == "SUBTREE":
search_scope = SUBTREE
else:
search_scope = LEVEL

# todo: BASE or ONELEVEL?

res = conn.search(configuration.conf.get("ldap", "basedn"), search_filter, search_scope=search_scope)
res = conn.search(conf.get("ldap", "basedn"), search_filter, search_scope=search_scope)

# todo: use list or result?
if not res:
Expand Down
4 changes: 2 additions & 2 deletions airflow/contrib/hooks/qubole_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow import configuration
from airflow.configuration import conf
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
from airflow.models import TaskInstance
Expand Down Expand Up @@ -174,7 +174,7 @@ def get_results(self, ti=None, fp=None, inline=True, delim=None, fetch=True):
if fp is None:
iso = datetime.datetime.utcnow().isoformat()
logpath = os.path.expanduser(
configuration.conf.get('core', 'BASE_LOG_FOLDER')
conf.get('core', 'BASE_LOG_FOLDER')
)
resultpath = logpath + '/' + self.dag_id + '/' + self.task_id + '/results'
pathlib.Path(resultpath).mkdir(parents=True, exist_ok=True)
Expand Down
4 changes: 2 additions & 2 deletions airflow/contrib/operators/ssh_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from base64 import b64encode
from select import select

from airflow import configuration
from airflow.configuration import conf
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
Expand Down Expand Up @@ -152,7 +152,7 @@ def execute(self, context):

exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
enable_pickling = configuration.conf.getboolean(
enable_pickling = conf.getboolean(
'core', 'enable_xcom_pickling'
)
if enable_pickling:
Expand Down
Loading