From 8f63d25b54b9b3a42254c0a61683ef93bab31b26 Mon Sep 17 00:00:00 2001 From: Igor Khrol Date: Fri, 6 Sep 2019 17:24:18 +0300 Subject: [PATCH] [AIRFLOW-5343] Remove legacy way of pessimistic disconnect handling Based on discussions in https://github.com/apache/airflow/pull/5949 it was concluded that SQLAlchemy already provides pessimistic disconnect handling. So instead of the hand-written implementation, the SQLAlchemy built-in mechanism should be used. 'sqlalchemy~=1.3' is in `setup.py` requirements and `pool_pre_ping` appeared in SQLAlchemy 1.2. --- airflow/config_templates/default_airflow.cfg | 6 +- airflow/settings.py | 5 +- airflow/utils/sqlalchemy.py | 72 +------------------- 3 files changed, 5 insertions(+), 78 deletions(-) diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 22b55c3ccf77c9..14995e7a3df0a2 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -125,11 +125,7 @@ sql_alchemy_pool_recycle = 1800 # Check connection at the start of each connection pool checkout. # Typically, this is a simple statement like “SELECT 1”. # More information here: https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic -sql_alchemy_pool_pre_ping = False - -# How many seconds to retry re-establishing a DB connection after -# disconnects. Setting this to 0 disables retries. -sql_alchemy_reconnect_timeout = 300 +sql_alchemy_pool_pre_ping = True # The schema to use for the metadata database # SqlAlchemy supports databases with the concept of multiple schemas. diff --git a/airflow/settings.py b/airflow/settings.py index aba548af0c77a4..ba6f4abd035d6b 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -169,7 +169,7 @@ def configure_orm(disable_connection_pool=False): # of some DBAPI-specific method to test the connection for liveness.
# More information here: # https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic - pool_pre_ping = conf.getboolean('core', 'SQL_ALCHEMY_POOL_PRE_PING', fallback=False) + pool_pre_ping = conf.getboolean('core', 'SQL_ALCHEMY_POOL_PRE_PING', fallback=True) log.info("settings.configure_orm(): Using pool settings. pool_size={}, max_overflow={}, " "pool_recycle={}, pid={}".format(pool_size, max_overflow, pool_recycle, os.getpid())) @@ -186,8 +186,7 @@ def configure_orm(disable_connection_pool=False): engine_args['encoding'] = engine_args['encoding'].__str__() engine = create_engine(SQL_ALCHEMY_CONN, **engine_args) - reconnect_timeout = conf.getint('core', 'SQL_ALCHEMY_RECONNECT_TIMEOUT') - setup_event_handlers(engine, reconnect_timeout) + setup_event_handlers(engine) Session = scoped_session( sessionmaker(autocommit=False, diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py index 55e018342191af..423cd283c39b27 100644 --- a/airflow/utils/sqlalchemy.py +++ b/airflow/utils/sqlalchemy.py @@ -21,11 +21,9 @@ import os import json import pendulum -import time -import random from dateutil import relativedelta -from sqlalchemy import event, exc, select +from sqlalchemy import event, exc from sqlalchemy.types import Text, DateTime, TypeDecorator from airflow.utils.log.logging_mixin import LoggingMixin @@ -34,73 +32,7 @@ utc = pendulum.timezone('UTC') -def setup_event_handlers(engine, - reconnect_timeout_seconds, - initial_backoff_seconds=0.2, - max_backoff_seconds=120): - @event.listens_for(engine, "engine_connect") - def ping_connection(connection, branch): # pylint: disable=unused-variable - """ - Pessimistic SQLAlchemy disconnect handling. Ensures that each - connection returned from the pool is properly connected to the database. 
- - http://docs.sqlalchemy.org/en/rel_1_1/core/pooling.html#disconnect-handling-pessimistic - """ - if branch: - # "branch" refers to a sub-connection of a connection, - # we don't want to bother pinging on these. - return - - start = time.time() - backoff = initial_backoff_seconds - - # turn off "close with result". This flag is only used with - # "connectionless" execution, otherwise will be False in any case - save_should_close_with_result = connection.should_close_with_result - - while True: - connection.should_close_with_result = False - - try: - connection.scalar(select([1])) - # If we made it here then the connection appears to be healthy - break - except exc.DBAPIError as err: - if time.time() - start >= reconnect_timeout_seconds: - log.error( - "Failed to re-establish DB connection within %s secs: %s", - reconnect_timeout_seconds, - err) - raise - if err.connection_invalidated: - # Don't log the first time -- this happens a lot and unless - # there is a problem reconnecting is not a sign of a - # problem - if backoff > initial_backoff_seconds: - log.warning("DB connection invalidated. Reconnecting...") - else: - log.debug("DB connection invalidated. Initial reconnect") - - # Use a truncated binary exponential backoff. Also includes - # a jitter to prevent the thundering herd problem of - # simultaneous client reconnects - backoff += backoff * random.random() - time.sleep(min(backoff, max_backoff_seconds)) - - # run the same SELECT again - the connection will re-validate - # itself and establish a new connection. The disconnect detection - # here also causes the whole connection pool to be invalidated - # so that all stale connections are discarded. - continue - else: - log.error( - "Unknown database connection error. 
Not retrying: %s", - err) - raise - finally: - # restore "close with result" - connection.should_close_with_result = save_should_close_with_result - +def setup_event_handlers(engine): @event.listens_for(engine, "connect") def connect(dbapi_connection, connection_record): # pylint: disable=unused-variable connection_record.info['pid'] = os.getpid()