Skip to content

Commit

Permalink
Pass SQLAlchemy engine options to FAB based UI (apache#11395)
Browse files Browse the repository at this point in the history
Co-authored-by: Tomek Urbaszek <[email protected]>
  • Loading branch information
michalmisiewicz and turbaszek authored Oct 16, 2020
1 parent 0823d46 commit 91484b9
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 21 deletions.
48 changes: 27 additions & 21 deletions airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,35 @@ def configure_orm(disable_connection_pool=False):
log.debug("Setting up DB connection pool (PID %s)", os.getpid())
global engine
global Session
engine_args = {}
engine_args = prepare_engine_args(disable_connection_pool)

# Allow the user to specify an encoding for their DB otherwise default
# to utf-8 so jobs & users with non-latin1 characters can still use us.
engine_args['encoding'] = conf.get('core', 'SQL_ENGINE_ENCODING', fallback='utf-8')

if conf.has_option('core', 'sql_alchemy_connect_args'):
connect_args = conf.getimport('core', 'sql_alchemy_connect_args')
else:
connect_args = {}

engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args)
setup_event_handlers(engine)

Session = scoped_session(sessionmaker(
autocommit=False,
autoflush=False,
bind=engine,
expire_on_commit=False,
))


def prepare_engine_args(disable_connection_pool=False):
"""Prepare SQLAlchemy engine args"""
engine_args = {}
pool_connections = conf.getboolean('core', 'SQL_ALCHEMY_POOL_ENABLED')
if disable_connection_pool or not pool_connections:
engine_args['poolclass'] = NullPool
log.debug("settings.configure_orm(): Using NullPool")
log.debug("settings.prepare_engine_args(): Using NullPool")
elif 'sqlite' not in SQL_ALCHEMY_CONN:
# Pool size engine args not supported by sqlite.
# If no config value is defined for the pool size, select a reasonable value.
Expand Down Expand Up @@ -195,30 +218,13 @@ def configure_orm(disable_connection_pool=False):
# https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic
pool_pre_ping = conf.getboolean('core', 'SQL_ALCHEMY_POOL_PRE_PING', fallback=True)

log.debug("settings.configure_orm(): Using pool settings. pool_size=%d, max_overflow=%d, "
log.debug("settings.prepare_engine_args(): Using pool settings. pool_size=%d, max_overflow=%d, "
"pool_recycle=%d, pid=%d", pool_size, max_overflow, pool_recycle, os.getpid())
engine_args['pool_size'] = pool_size
engine_args['pool_recycle'] = pool_recycle
engine_args['pool_pre_ping'] = pool_pre_ping
engine_args['max_overflow'] = max_overflow

# Allow the user to specify an encoding for their DB otherwise default
# to utf-8 so jobs & users with non-latin1 characters can still use us.
engine_args['encoding'] = conf.get('core', 'SQL_ENGINE_ENCODING', fallback='utf-8')

if conf.has_option('core', 'sql_alchemy_connect_args'):
connect_args = conf.getimport('core', 'sql_alchemy_connect_args')
else:
connect_args = {}

engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args)
setup_event_handlers(engine)

Session = scoped_session(
sessionmaker(autocommit=False,
autoflush=False,
bind=engine,
expire_on_commit=False))
return engine_args


def dispose_orm():
Expand Down
3 changes: 3 additions & 0 deletions airflow/www/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ def create_app(config=None, testing=False, app_name="Airflow"):
if config:
flask_app.config.from_mapping(config)

if 'SQLALCHEMY_ENGINE_OPTIONS' not in flask_app.config:
flask_app.config['SQLALCHEMY_ENGINE_OPTIONS'] = settings.prepare_engine_args()

# Configure the JSON encoder used by `|tojson` filter from Flask
flask_app.json_encoder = AirflowJsonEncoder

Expand Down
20 changes: 20 additions & 0 deletions tests/www/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import unittest
from unittest import mock

import pytest
from werkzeug.routing import Rule
from werkzeug.test import create_environ
from werkzeug.wrappers import Response
Expand Down Expand Up @@ -195,3 +196,22 @@ def debug_view():

self.assertEqual(b"success", response.get_data())
self.assertEqual(response.status_code, 200)

@conf_vars({
('core', 'sql_alchemy_pool_enabled'): 'True',
('core', 'sql_alchemy_pool_size'): '3',
('core', 'sql_alchemy_max_overflow'): '5',
('core', 'sql_alchemy_pool_recycle'): '120',
('core', 'sql_alchemy_pool_pre_ping'): 'True',
})
@mock.patch("airflow.www.app.app", None)
@pytest.mark.backend("mysql", "postgres")
def test_should_set_sqlalchemy_engine_options(self):
app = application.cached_app(testing=True)
engine_params = {
'pool_size': 3,
'pool_recycle': 120,
'pool_pre_ping': True,
'max_overflow': 5
}
self.assertEqual(app.config['SQLALCHEMY_ENGINE_OPTIONS'], engine_params)

0 comments on commit 91484b9

Please sign in to comment.