Skip to content

Commit

Permalink
[FIX] idle_in_transaction_session_timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
nilshamerlinck committed Apr 5, 2021
1 parent 4137c87 commit 293f7ad
Showing 1 changed file with 16 additions and 5 deletions.
21 changes: 16 additions & 5 deletions queue_job/jobrunner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,8 @@ def __init__(self, db_name, lock_name):
self.lock_ident = self.name_to_int64(lock_name)
connection_info = _connection_info_for(db_name)
self.conn = psycopg2.connect(**connection_info)
self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
self.acquired = False
self._keep_alive_cursor = None
self.try_acquire()

@staticmethod
Expand All @@ -302,19 +302,30 @@ def name_to_int64(lock_name):

def try_acquire(self):
self.acquired = self._acquire()
if self.acquired:
# we open a transaction that we will never commit;
# at most every SHARED_LOCK_KEEP_ALIVE seconds we will
# keep it alive with a simple SELECT 1 query;
# if the process crashes or if the connection is cut,
# the pg server will terminate self.conn after
# 2*SHARED_LOCK_KEEP_ALIVE seconds, which will
# free the advisory lock and let another worker take over
self._keep_alive_cursor = self.conn.cursor()
self._keep_alive_cursor.execute(
"SET idle_in_transaction_session_timeout = %s;",
(SHARED_LOCK_KEEP_ALIVE * 1000 * 2,),

This comment has been minimized.

Copy link
@guewen

guewen Apr 6, 2021

💯

)

def _acquire(self):
with closing(self.conn.cursor()) as cr:
cr.execute("SET idle_in_transaction_session_timeout = 60000;")
# session level lock
cr.execute("SELECT pg_try_advisory_lock(%s);", (self.lock_ident,))
acquired = cr.fetchone()[0]
return acquired

def keep_alive(self):
query = "SELECT 1"
with closing(self.conn.cursor()) as cr:
cr.execute(query)
self._keep_alive_cursor.execute(query)

def close(self):
# pylint: disable=except-pass
Expand Down Expand Up @@ -624,7 +635,7 @@ def run(self):
):
last_keep_alive = time.time()
# send a keepalive on the shared lock connection at
# most every 60 seconds
# most every SHARED_LOCK_KEEP_ALIVE seconds
self.keep_alive_shared_lock()
# TODO here, when we have no "db_name", we could list again
# the databases and if the list changed, try to acquire a new
Expand Down

0 comments on commit 293f7ad

Please sign in to comment.