Skip to content

Commit

Permalink
Support multi-nodes with lock on jobrunner
Browse files Browse the repository at this point in the history
Starting several odoo (main) processes with "--load=web,queue_job"
was unsupported, as it would start several jobrunners, which would all
listen to PostgreSQL notifications and try to enqueue jobs in concurrent
workers.

This is an issue in several cases:

* it causes issues on odoo.sh, which uses a hybrid model for workers
and starts several jobrunners [0]
* it defeats any setup that would use several nodes to keep the service
available in case of failure of a node/host

The solution implemented here is using a PostgreSQL advisory lock,
at session level, to lock a database on a specific jobrunner.

At loading, the jobrunner tries to acquire the lock. If it can, it
initializes the connection and listens for jobs. Every 30 seconds,
it tries again to acquire all the databases that it couldn't acquire,
in case a concurrent node has stopped.
If all the databases have been acquired by a concurrent node, it does
nothing but wait and retry every 30 seconds.

Example when a jobrunner is started for databases queue1 and queue2 and
another node already works on queue2:

    INFO ? odoo.addons.queue_job.jobrunner.runner: starting
    INFO ? odoo.addons.queue_job.jobrunner.runner: initializing database connections
    INFO ? odoo.addons.queue_job.jobrunner.runner: queue job runner ready for db queue1
    DEBUG ? odoo.addons.queue_job.jobrunner.runner: queue job runner already started on another node for db queue2
    INFO ? odoo.addons.queue_job.jobrunner.runner: database connections ready for: queue1

Important: new databases need a restart of the jobrunner. This was
already the case; detecting new databases automatically would be a
great improvement, but it is out of scope for this change.

[0] OCA#169 (comment)
  • Loading branch information
guewen committed Sep 30, 2020
1 parent 3b49e06 commit fdad134
Showing 1 changed file with 77 additions and 21 deletions.
98 changes: 77 additions & 21 deletions queue_job/jobrunner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
...INFO...queue_job.jobrunner.runner: starting
...INFO...queue_job.jobrunner.runner: initializing database connections
...INFO...queue_job.jobrunner.runner: queue job runner ready for db <dbname>
...INFO...queue_job.jobrunner.runner: database connections ready
...INFO...queue_job.jobrunner.runner: database connections ready for: <dbname>
* Create jobs (eg using base_import_async) and observe they
start immediately and in parallel.
Expand Down Expand Up @@ -152,6 +152,7 @@
from .channels import ENQUEUED, NOT_DONE, PENDING, ChannelManager

SELECT_TIMEOUT = 60
TRY_ACQUIRE_INTERVAL = 30 # seconds
ERROR_RECOVERY_DELAY = 5

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -269,15 +270,31 @@ def urlopen():


class Database(object):

ADVISORY_LOCK_IDENT = 8373372352660922455

def __init__(self, db_name):
    """Connect to ``db_name`` and try to take ownership of its job queue.

    Opens an autocommit psycopg2 connection, records whether queue_job
    is available on the database and, only in that case, attempts to
    acquire the runner lock (``acquired`` stays ``False`` otherwise).
    """
    self.db_name = db_name
    self.conn = psycopg2.connect(**_connection_info_for(db_name))
    self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    self.has_queue_job = self._has_queue_job()
    # Not owned until try_acquire() succeeds.
    self.acquired = False
    if not self.has_queue_job:
        return
    self.try_acquire()

def try_acquire(self):
    """Attempt to take the runner lock; on success, mark and initialize.

    When the lock cannot be obtained nothing is changed, so the caller
    may simply retry later.
    """
    if not self._acquire():
        return
    self.acquired = True
    self._initialize()

def _acquire(self):
with closing(self.conn.cursor()) as cr:
# session level lock
cr.execute("SELECT pg_try_advisory_lock(%s);", (self.ADVISORY_LOCK_IDENT,))
acquired = cr.fetchone()[0]
return acquired

def close(self):
# pylint: disable=except-pass
# if close fail for any reason, it's either because it's already closed
Expand Down Expand Up @@ -351,7 +368,10 @@ def __init__(
if channel_config_string is None:
channel_config_string = _channels()
self.channel_manager.simple_configure(channel_config_string)
self.db_by_name = {}
# TODO: how to detect new databases or databases
# on which queue_job is installed after server start?
self.database_names = self.get_db_names()
self.started_db_by_name = {}
self._stop = False
self._stop_pipe = os.pipe()

Expand Down Expand Up @@ -393,34 +413,45 @@ def get_db_names(self):
return db_names

def close_databases(self, remove_jobs=True):
for db_name, db in self.db_by_name.items():
for db_name, db in self.started_db_by_name.items():
try:
if remove_jobs:
self.channel_manager.remove_db(db_name)
db.close()
except Exception:
_logger.warning("error closing database %s", db_name, exc_info=True)
self.db_by_name = {}
self.started_db_by_name = {}

def initialize_databases(self):
for db_name in self.get_db_names():
for db_name in self.database_names:
if db_name in self.started_db_by_name:
continue
db = Database(db_name)
if not db.has_queue_job:
_logger.debug("queue_job is not installed for db %s", db_name)
elif not db.acquired:
_logger.debug(
"queue job runner already started on another node for db %s",
db_name,
)
else:
self.db_by_name[db_name] = db
with db.select_jobs("state in %s", (NOT_DONE,)) as cr:
for job_data in cr:
self.channel_manager.notify(db_name, *job_data)
_logger.info("queue job runner ready for db %s", db_name)
self.started_db_by_name[db_name] = db
self._start_database(db_name)

def _start_database(self, db_name):
    """Feed the not-done jobs of an already started database to the channels.

    Looks up ``db_name`` in ``started_db_by_name`` (raising ``KeyError``
    when absent), passes every row selected with the ``NOT_DONE`` state
    filter to the channel manager, then logs that the runner is ready
    for that database.
    """
    database = self.started_db_by_name[db_name]
    with database.select_jobs("state in %s", (NOT_DONE,)) as cursor:
        for row in cursor:
            self.channel_manager.notify(db_name, *row)
    _logger.info("queue job runner ready for db %s", db_name)

def run_jobs(self):
now = _odoo_now()
for job in self.channel_manager.get_jobs_to_run(now):
if self._stop:
break
_logger.info("asking Odoo to run job %s on db %s", job.uuid, job.db_name)
self.db_by_name[job.db_name].set_job_enqueued(job.uuid)
self.started_db_by_name[job.db_name].set_job_enqueued(job.uuid)
_async_http_get(
self.scheme,
self.host,
Expand All @@ -432,7 +463,7 @@ def run_jobs(self):
)

def process_notifications(self):
for db in self.db_by_name.values():
for db in self.started_db_by_name.values():
while db.conn.notifies:
if self._stop:
break
Expand All @@ -446,13 +477,13 @@ def process_notifications(self):
self.channel_manager.remove_job(uuid)

def wait_notification(self):
for db in self.db_by_name.values():
for db in self.started_db_by_name.values():
if db.conn.notifies:
# something is going on in the queue, no need to wait
return
# wait for something to happen in the queue_job tables
# we'll select() on database connections and the stop pipe
conns = [db.conn for db in self.db_by_name.values()]
conns = [db.conn for db in self.started_db_by_name.values()]
conns.append(self._stop_pipe[0])
# look if the channels specify a wakeup time
wakeup_time = self.channel_manager.get_wakeup_time()
Expand Down Expand Up @@ -488,15 +519,40 @@ def run(self):
# outer loop does exception recovery
try:
_logger.info("initializing database connections")
# TODO: how to detect new databases or databases
# on which queue_job is installed after server start?
self.initialize_databases()
_logger.info("database connections ready")
started_dbs = set(self.started_db_by_name.keys())
# inner loop does the normal processing
last_check = None
while not self._stop:
self.process_notifications()
self.run_jobs()
self.wait_notification()

# After jobs have been executed or after max. the select
# timeout (max 60s), databases which are not handled by the
# current runner are retried for acquiring a lock, in case
# another node has stopped meanwhile.
# When we run jobs, the loop restarts, so we could try
# initialization of database every second, if we don't measure how
# much time passed.
if (
not last_check
or time.time() >= last_check + TRY_ACQUIRE_INTERVAL
):
last_check = time.time()
self.initialize_databases()
if set(self.started_db_by_name.keys()) != started_dbs:
_logger.info(
"database connections ready for: {}".format(
", ".join(self.started_db_by_name.keys())
)
)
elif not self.started_db_by_name:
_logger.info("no database to listen")
if self.started_db_by_name:
self.process_notifications()
self.run_jobs()
self.wait_notification()
else:
# no database to work with... retry later in case a concurrent
# node is stopped
time.sleep(TRY_ACQUIRE_INTERVAL)
except KeyboardInterrupt:
self.stop()
except Exception as e:
Expand Down

0 comments on commit fdad134

Please sign in to comment.