[AIRFLOW-5343] Remove legacy way of pessimistic disconnect handling #6034

Merged 1 commit on Sep 17, 2019
6 changes: 1 addition & 5 deletions airflow/config_templates/default_airflow.cfg
@@ -125,11 +125,7 @@ sql_alchemy_pool_recycle = 1800
# Check connection at the start of each connection pool checkout.
# Typically, this is a simple statement like "SELECT 1".
# More information here: https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic
-sql_alchemy_pool_pre_ping = False
-
-# How many seconds to retry re-establishing a DB connection after
-# disconnects. Setting this to 0 disables retries.
-sql_alchemy_reconnect_timeout = 300
Member:
What happens now with disconnects/errors?

Contributor (author):
If a connection checked out from the pool is not healthy, it is removed from the pool and a new one is created (with three attempts). The other connections in the pool are invalidated as well.

sqlalchemy/sqlalchemy@f881dae#diff-31816cdb15e64b0af1b862f51abe1226R920 - the test there visualizes this behavior.

That SQLAlchemy commit is quite a good explanation.

Contributor:
The pool_pre_ping argument should take care of it, and the reconnect code lives inside the SQLAlchemy library. We don't have to handle it explicitly with the new SQLAlchemy version.

+sql_alchemy_pool_pre_ping = True
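Not part of the diff — a hypothetical miniature of the pre-ping mechanism, written against the stdlib sqlite3 module, to illustrate what SQLAlchemy now does for us on every pool checkout. The names PrePingPool, checkout, and checkin are invented for this sketch; they are not Airflow or SQLAlchemy APIs.

```python
import sqlite3


class PrePingPool:
    """Toy connection pool that pings each connection on checkout and
    replaces it if the ping fails - a sketch of what SQLAlchemy's
    pool_pre_ping=True does internally (illustration only)."""

    def __init__(self, path=":memory:"):
        self._path = path
        self._pool = []

    def _connect(self):
        return sqlite3.connect(self._path)

    def checkout(self):
        conn = self._pool.pop() if self._pool else self._connect()
        try:
            conn.execute("SELECT 1")  # the pre-ping
        except sqlite3.Error:
            conn = self._connect()    # stale connection: discard, reconnect
        return conn

    def checkin(self, conn):
        self._pool.append(conn)


pool = PrePingPool()
conn = pool.checkout()
pool.checkin(conn)
conn.close()             # simulate the connection dying while pooled
conn2 = pool.checkout()  # pre-ping fails, a fresh connection is returned
print(conn2.execute("SELECT 1").fetchone())  # prints (1,)
```

The caller never sees the dead connection; that transparency is exactly why the hand-rolled retry loop below could be deleted.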

# The schema to use for the metadata database
# SqlAlchemy supports databases with the concept of multiple schemas.
5 changes: 2 additions & 3 deletions airflow/settings.py
@@ -169,7 +169,7 @@ def configure_orm(disable_connection_pool=False):
# of some DBAPI-specific method to test the connection for liveness.
# More information here:
# https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic
-pool_pre_ping = conf.getboolean('core', 'SQL_ALCHEMY_POOL_PRE_PING', fallback=False)
+pool_pre_ping = conf.getboolean('core', 'SQL_ALCHEMY_POOL_PRE_PING', fallback=True)

log.info("settings.configure_orm(): Using pool settings. pool_size={}, max_overflow={}, "
"pool_recycle={}, pid={}".format(pool_size, max_overflow, pool_recycle, os.getpid()))
@@ -186,8 +186,7 @@ def configure_orm(disable_connection_pool=False):
engine_args['encoding'] = engine_args['encoding'].__str__()

engine = create_engine(SQL_ALCHEMY_CONN, **engine_args)
-reconnect_timeout = conf.getint('core', 'SQL_ALCHEMY_RECONNECT_TIMEOUT')
-setup_event_handlers(engine, reconnect_timeout)
+setup_event_handlers(engine)

Session = scoped_session(
sessionmaker(autocommit=False,
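The fallback change above means deployments that never set the option now get pre-ping by default, while an explicit setting still wins. A minimal sketch of that fallback semantics, using the stdlib ConfigParser as a hypothetical stand-in for Airflow's conf object:

```python
from configparser import ConfigParser

# Hypothetical stand-in for airflow.configuration.conf: a [core]
# section with sql_alchemy_pool_pre_ping left unset.
cfg = ConfigParser()
cfg.read_string("[core]\nsql_alchemy_pool_recycle = 1800\n")

# Option absent: the fallback now yields True.
print(cfg.getboolean("core", "sql_alchemy_pool_pre_ping", fallback=True))   # True

# Option set explicitly: the configured value overrides the fallback.
cfg.set("core", "sql_alchemy_pool_pre_ping", "False")
print(cfg.getboolean("core", "sql_alchemy_pool_pre_ping", fallback=True))   # False
```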
72 changes: 2 additions & 70 deletions airflow/utils/sqlalchemy.py
@@ -21,11 +21,9 @@
import os
import json
import pendulum
-import time
-import random

from dateutil import relativedelta
-from sqlalchemy import event, exc, select
+from sqlalchemy import event, exc
from sqlalchemy.types import Text, DateTime, TypeDecorator

from airflow.utils.log.logging_mixin import LoggingMixin
@@ -34,73 +32,7 @@
utc = pendulum.timezone('UTC')


-def setup_event_handlers(engine,
-                         reconnect_timeout_seconds,
-                         initial_backoff_seconds=0.2,
-                         max_backoff_seconds=120):
-    @event.listens_for(engine, "engine_connect")
-    def ping_connection(connection, branch):  # pylint: disable=unused-variable
-        """
-        Pessimistic SQLAlchemy disconnect handling. Ensures that each
-        connection returned from the pool is properly connected to the database.
-
-        http://docs.sqlalchemy.org/en/rel_1_1/core/pooling.html#disconnect-handling-pessimistic
-        """
-        if branch:
-            # "branch" refers to a sub-connection of a connection,
-            # we don't want to bother pinging on these.
-            return
-
-        start = time.time()
-        backoff = initial_backoff_seconds
-
-        # turn off "close with result". This flag is only used with
-        # "connectionless" execution, otherwise will be False in any case
-        save_should_close_with_result = connection.should_close_with_result
-
-        while True:
-            connection.should_close_with_result = False
-
-            try:
-                connection.scalar(select([1]))
-                # If we made it here then the connection appears to be healthy
-                break
-            except exc.DBAPIError as err:
-                if time.time() - start >= reconnect_timeout_seconds:
-                    log.error(
-                        "Failed to re-establish DB connection within %s secs: %s",
-                        reconnect_timeout_seconds,
-                        err)
-                    raise
-                if err.connection_invalidated:
-                    # Don't log the first time -- this happens a lot and unless
-                    # there is a problem reconnecting is not a sign of a
-                    # problem
-                    if backoff > initial_backoff_seconds:
-                        log.warning("DB connection invalidated. Reconnecting...")
-                    else:
-                        log.debug("DB connection invalidated. Initial reconnect")
-
-                    # Use a truncated binary exponential backoff. Also includes
-                    # a jitter to prevent the thundering herd problem of
-                    # simultaneous client reconnects
-                    backoff += backoff * random.random()
-                    time.sleep(min(backoff, max_backoff_seconds))
-
-                    # run the same SELECT again - the connection will re-validate
-                    # itself and establish a new connection. The disconnect detection
-                    # here also causes the whole connection pool to be invalidated
-                    # so that all stale connections are discarded.
-                    continue
-                else:
-                    log.error(
-                        "Unknown database connection error. Not retrying: %s",
-                        err)
-                    raise
-            finally:
-                # restore "close with result"
-                connection.should_close_with_result = save_should_close_with_result
-
+def setup_event_handlers(engine):
@event.listens_for(engine, "connect")
def connect(dbapi_connection, connection_record): # pylint: disable=unused-variable
connection_record.info['pid'] = os.getpid()
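For reference, the retry schedule of the deleted handler (truncated binary exponential backoff with jitter) can be reproduced in isolation. backoff_delays is an invented helper for this sketch, not Airflow code; it only mirrors the arithmetic of the removed loop:

```python
import random


def backoff_delays(initial=0.2, max_backoff=120, attempts=10, seed=42):
    """Delays the removed ping_connection loop would sleep between retries:
    each step grows the backoff by a random factor in [1, 2) (the jitter),
    truncated at max_backoff seconds. Seeded here for reproducibility."""
    rng = random.Random(seed)
    backoff = initial
    delays = []
    for _ in range(attempts):
        backoff += backoff * rng.random()  # backoff *= (1 + U[0, 1))
        delays.append(min(backoff, max_backoff))
    return delays


print(backoff_delays())  # a non-decreasing list, every entry <= 120
```

The jitter spreads out reconnect attempts from many clients at once (the thundering-herd problem the original comment mentions); pool_pre_ping delegates all of this to SQLAlchemy.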