diff --git a/airflow/settings.py b/airflow/settings.py index 479694d92b0702..30850ed28d9577 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -158,12 +158,35 @@ def configure_orm(disable_connection_pool=False): log.debug("Setting up DB connection pool (PID %s)", os.getpid()) global engine global Session - engine_args = {} + engine_args = prepare_engine_args(disable_connection_pool) + + # Allow the user to specify an encoding for their DB otherwise default + # to utf-8 so jobs & users with non-latin1 characters can still use us. + engine_args['encoding'] = conf.get('core', 'SQL_ENGINE_ENCODING', fallback='utf-8') + + if conf.has_option('core', 'sql_alchemy_connect_args'): + connect_args = conf.getimport('core', 'sql_alchemy_connect_args') + else: + connect_args = {} + + engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args) + setup_event_handlers(engine) + + Session = scoped_session(sessionmaker( + autocommit=False, + autoflush=False, + bind=engine, + expire_on_commit=False, + )) + +def prepare_engine_args(disable_connection_pool=False): + """Prepare SQLAlchemy engine args""" + engine_args = {} pool_connections = conf.getboolean('core', 'SQL_ALCHEMY_POOL_ENABLED') if disable_connection_pool or not pool_connections: engine_args['poolclass'] = NullPool - log.debug("settings.configure_orm(): Using NullPool") + log.debug("settings.prepare_engine_args(): Using NullPool") elif 'sqlite' not in SQL_ALCHEMY_CONN: # Pool size engine args not supported by sqlite. # If no config value is defined for the pool size, select a reasonable value. @@ -195,30 +218,13 @@ def configure_orm(disable_connection_pool=False): # https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic pool_pre_ping = conf.getboolean('core', 'SQL_ALCHEMY_POOL_PRE_PING', fallback=True) - log.debug("settings.configure_orm(): Using pool settings. pool_size=%d, max_overflow=%d, " + log.debug("settings.prepare_engine_args(): Using pool settings. pool_size=%d, max_overflow=%d, " "pool_recycle=%d, pid=%d", pool_size, max_overflow, pool_recycle, os.getpid()) engine_args['pool_size'] = pool_size engine_args['pool_recycle'] = pool_recycle engine_args['pool_pre_ping'] = pool_pre_ping engine_args['max_overflow'] = max_overflow - - # Allow the user to specify an encoding for their DB otherwise default - # to utf-8 so jobs & users with non-latin1 characters can still use us. - engine_args['encoding'] = conf.get('core', 'SQL_ENGINE_ENCODING', fallback='utf-8') - - if conf.has_option('core', 'sql_alchemy_connect_args'): - connect_args = conf.getimport('core', 'sql_alchemy_connect_args') - else: - connect_args = {} - - engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args) - setup_event_handlers(engine) - - Session = scoped_session( - sessionmaker(autocommit=False, - autoflush=False, - bind=engine, - expire_on_commit=False)) + return engine_args def dispose_orm(): diff --git a/airflow/www/app.py b/airflow/www/app.py index 21d7d3b19e0ef6..5338dddcc17b52 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -82,6 +82,9 @@ def create_app(config=None, testing=False, app_name="Airflow"): if config: flask_app.config.from_mapping(config) + if 'SQLALCHEMY_ENGINE_OPTIONS' not in flask_app.config: + flask_app.config['SQLALCHEMY_ENGINE_OPTIONS'] = settings.prepare_engine_args() + # Configure the JSON encoder used by `|tojson` filter from Flask flask_app.json_encoder = AirflowJsonEncoder diff --git a/tests/www/test_app.py b/tests/www/test_app.py index 3e46abf82616e4..198a73ed1f8259 100644 --- a/tests/www/test_app.py +++ b/tests/www/test_app.py @@ -19,6 +19,7 @@ import unittest from unittest import mock +import pytest from werkzeug.routing import Rule from werkzeug.test import create_environ from werkzeug.wrappers import Response @@ -195,3 +196,22 @@ def debug_view(): self.assertEqual(b"success", response.get_data()) self.assertEqual(response.status_code, 200) + + @conf_vars({ + ('core', 'sql_alchemy_pool_enabled'): 'True', + ('core', 'sql_alchemy_pool_size'): '3', + ('core', 'sql_alchemy_max_overflow'): '5', + ('core', 'sql_alchemy_pool_recycle'): '120', + ('core', 'sql_alchemy_pool_pre_ping'): 'True', + }) + @mock.patch("airflow.www.app.app", None) + @pytest.mark.backend("mysql", "postgres") + def test_should_set_sqlalchemy_engine_options(self): + app = application.cached_app(testing=True) + engine_params = { + 'pool_size': 3, + 'pool_recycle': 120, + 'pool_pre_ping': True, + 'max_overflow': 5 + } + self.assertEqual(app.config['SQLALCHEMY_ENGINE_OPTIONS'], engine_params)