diff --git a/lib/que/adapters/active_record_with_lock.rb b/lib/que/adapters/active_record_with_lock.rb index 4ff206f..0f43887 100644 --- a/lib/que/adapters/active_record_with_lock.rb +++ b/lib/que/adapters/active_record_with_lock.rb @@ -16,11 +16,14 @@ def checkout_activerecord_adapter(&block) end def lock_database_connection + if Thread.current[:db_connection] + return Thread.current[:db_connection] if Thread.current[:db_connection].active? + end # We are storing this in thread variable here to make sure # same connection is used to acquire and release the advisory locks. # Advisory lock will not be released if any other connection from the # pool tries to release the lock - Thread.current[:db_connection] ||= @lock_record.connection + Thread.current[:db_connection] = @lock_record.connection end def execute(command, params=[]) @@ -51,7 +54,7 @@ def lock_job_with_lock_database(queue, cursor) def cleanup! @job_connection_pool.release_connection - @lock_record.release_connection + @lock_record.remove_connection end def pg_try_advisory_lock?(job_id) @@ -59,6 +62,11 @@ def pg_try_advisory_lock?(job_id) end def unlock_job(job_id) + # If for any reason the connection that is used to get this advisory lock + # is corrupted, the lock on this job_id would already be released when the + # connection holding the lock goes bad. + # Now, if a new connection tries to release the non existing lock this would just no op + # by returning false and return a warning "WARNING: you don't own a lock of type ExclusiveLock" lock_database_connection.execute("SELECT pg_advisory_unlock(#{job_id})") end end