From 293f7adb9edc6569f12fc4d36e2013bdd4d85597 Mon Sep 17 00:00:00 2001 From: Nils Hamerlinck Date: Mon, 5 Apr 2021 11:17:41 +0700 Subject: [PATCH] [FIX] idle_in_transaction_session_timeout --- queue_job/jobrunner/runner.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index b88a47f62d..1e5dcbe3e8 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -289,8 +289,8 @@ def __init__(self, db_name, lock_name): self.lock_ident = self.name_to_int64(lock_name) connection_info = _connection_info_for(db_name) self.conn = psycopg2.connect(**connection_info) - self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) self.acquired = False + self._keep_alive_cursor = None self.try_acquire() @staticmethod @@ -302,10 +302,22 @@ def name_to_int64(lock_name): def try_acquire(self): self.acquired = self._acquire() + if self.acquired: + # we open a transaction that we will never commit; + # at most every SHARED_LOCK_KEEP_ALIVE seconds we will + # keep it alive with a simple SELECT 1 query; + # if the process crashes or if the connection is cut, + # the pg server will terminate self.conn after + # 2*SHARED_LOCK_KEEP_ALIVE seconds, which will + # free the advisory lock and let another worker take over + self._keep_alive_cursor = self.conn.cursor() + self._keep_alive_cursor.execute( + "SET idle_in_transaction_session_timeout = %s;", + (SHARED_LOCK_KEEP_ALIVE * 1000 * 2,), + ) def _acquire(self): with closing(self.conn.cursor()) as cr: - cr.execute("SET idle_in_transaction_session_timeout = 60000;") # session level lock cr.execute("SELECT pg_try_advisory_lock(%s);", (self.lock_ident,)) acquired = cr.fetchone()[0] @@ -313,8 +325,7 @@ def _acquire(self): def keep_alive(self): query = "SELECT 1" - with closing(self.conn.cursor()) as cr: - cr.execute(query) + self._keep_alive_cursor.execute(query) def close(self): # pylint: disable=except-pass @@ -624,7 +635,7 @@ def run(self): ): last_keep_alive = time.time() # send a keepalive on the shared lock connection at - # most every 60 seconds + # most every SHARED_LOCK_KEEP_ALIVE seconds self.keep_alive_shared_lock() # TODO here, when we have no "db_name", we could list again # the databases and if the list changed, try to acquire a new