Skip to content

Commit

Permalink
add lock database
Browse files Browse the repository at this point in the history
  • Loading branch information
ankithads committed Jun 13, 2024
1 parent 0ea7359 commit fcaaed1
Show file tree
Hide file tree
Showing 7 changed files with 367 additions and 6 deletions.
8 changes: 8 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,19 @@ jobs:
PGUSER: ubuntu
PGPASSWORD: password
PGHOST: localhost
LOCK_PGDATABASE: lock-test
LOCK_PGUSER: ubuntu
LOCK_PGPASSWORD: password
- image: postgres:11.2
environment:
POSTGRES_DB: que-test
POSTGRES_USER: ubuntu
POSTGRES_PASSWORD: password
- image: postgres:11.2
environment:
POSTGRES_DB: lock-test
POSTGRES_USER: ubuntu
POSTGRES_PASSWORD: password
steps:
- add_ssh_keys
- checkout
Expand Down
6 changes: 6 additions & 0 deletions bin/que
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ OptionParser.new do |opts|
$stdout.puts opts
exit 0
end

# Optional flag controlling whether advisory locks are taken against a
# separate lock database (FalseClass coercion: "false"/"no" → false).
# Fixes typos in the user-visible metavar ("DATABSE") and description
# ("seperate").
opts.on("--using-lock-database [USING_LOCK_DATABASE]", FalseClass, "sets if we want to use separate database for locking") do |using_lock_database|
  options.using_lock_database = using_lock_database
end
end.parse!(ARGV)
# rubocop:enable Metrics/LineLength

Expand All @@ -93,6 +97,7 @@ cursor_expiry = options.cursor_expiry || wake_interval
worker_count = options.worker_count || 1
timeout = options.timeout
secondary_queues = options.secondary_queues || []
using_lock_database = options.using_lock_database || false

Que.logger ||= Logger.new(STDOUT)

Expand All @@ -118,6 +123,7 @@ worker_group = Que::WorkerGroup.start(
lock_window: options.lock_window,
lock_budget: options.lock_budget,
secondary_queues: secondary_queues,
using_lock_database: using_lock_database,
)

if options.metrics_port
Expand Down
39 changes: 36 additions & 3 deletions lib/que/locker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,18 @@ class Locker
),
].freeze

def initialize(queue:, cursor_expiry:, window: nil, budget: nil, secondary_queues: [])
def initialize(queue:, cursor_expiry:, window: nil, budget: nil, secondary_queues: [], using_lock_database: false)
@queue = queue
@cursor_expiry = cursor_expiry
@queue_cursors = {}
@queue_expires_at = {}
@secondary_queues = secondary_queues
@consolidated_queues = Array.wrap(queue).concat(secondary_queues)
@using_lock_database = using_lock_database

if @using_lock_database
@lock_database_connection = LockDataBaseRecord.connection
end

# Create a bucket that has 100% capacity, so even when we don't apply a limit we
# have a valid bucket that we can use everywhere
Expand Down Expand Up @@ -121,7 +126,11 @@ def with_locked_job
ensure
if job
observe(UnlockTotal, UnlockSecondsTotal, worked_queue: job[:queue]) do
Que.execute("SELECT pg_advisory_unlock($1)", [job[:job_id]])
if @using_lock_database
@lock_database_connection.execute("SELECT pg_advisory_unlock(#{job["job_id"]})")
else
Que.execute("SELECT pg_advisory_unlock($1)", [job[:job_id]])
end
end
end
end
Expand Down Expand Up @@ -149,7 +158,31 @@ def exists?(job)
end

# Fetch the next candidate job for `queue` at or beyond `cursor`.
#
# Dispatches to the lock-database strategy when this locker was built with
# using_lock_database: true; otherwise runs the canonical :lock_job query
# through Que and returns its first row (nil when no job is available).
def lock_job_query(queue, cursor)
  return lock_job_with_lock_database(queue, cursor) if @using_lock_database

  Que.execute(:lock_job, [queue, cursor]).first
end

# Finds and advisory-locks the next workable job on `queue`, taking the
# lock through the separate lock database instead of Que's :lock_job query.
#
# Selects the highest-priority retryable job with job_id >= cursor that is
# due to run, then attempts pg_try_advisory_lock on its job_id. Jobs whose
# lock is already held by another worker are skipped.
#
# Returns the locked job's row hash, or nil when no candidate remains.
#
# NOTE(review): the original implementation recursed here, passing the
# skipped job's id as the new cursor. Because the query filters with
# `job_id >= ?`, the same row would be selected again on the next call,
# recursing forever (and growing the stack) while another worker held the
# lock. Rewritten as a loop that advances the cursor PAST the locked job.
def lock_job_with_lock_database(queue, cursor)
  loop do
    query = QueJob.select(:job_id, :queue, :priority, :run_at, :job_class, :retryable, :args, :error_count)
      .select("extract(epoch from (now() - run_at)) as latency")
      .where("queue = ? AND job_id >= ? AND run_at <= ?", queue, cursor, Time.now)
      .where(retryable: true)
      .order(:priority, :run_at, :job_id)
      .limit(1).to_sql
    result = Que.execute(query).first

    # No candidate at or beyond the cursor: nothing to work this pass.
    return result if result.nil?

    # Lock acquired: this job is ours.
    return result if locked?(result["job_id"])

    # Another worker holds the lock; skip past this job and try the next.
    cursor = result["job_id"].to_i + 1
  end
end

# Attempts to take a session-level advisory lock on `job_id` via the lock
# database connection; true when the lock was acquired.
#
# Integer() guards the raw SQL interpolation (this connection's #execute
# takes no bind parameters), raising ArgumentError on a non-numeric id
# instead of splicing it into the statement.
#
# NOTE(review): depending on the adapter's type mapping, raw #execute may
# return the Postgres literals "t"/"f" rather than Ruby booleans — and the
# string "f" is truthy, which would report every job as locked. Normalize
# both representations to a real boolean; confirm against the adapter in use.
def locked?(job_id)
  row = @lock_database_connection.execute(
    "SELECT pg_try_advisory_lock(#{Integer(job_id)})",
  ).first
  [true, "t"].include?(row["pg_try_advisory_lock"])
end

def handle_expired_cursors!
Expand Down
4 changes: 3 additions & 1 deletion lib/que/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ def initialize(
lock_cursor_expiry: DEFAULT_WAKE_INTERVAL,
lock_window: nil,
lock_budget: nil,
secondary_queues: []
secondary_queues: [],
using_lock_database: false
)
@queue = queue
@wake_interval = wake_interval
Expand All @@ -135,6 +136,7 @@ def initialize(
window: lock_window,
budget: lock_budget,
secondary_queues: secondary_queues,
using_lock_database: using_lock_database,
)
end

Expand Down
154 changes: 154 additions & 0 deletions spec/integration/integration_with_lock_database_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
# frozen_string_literal: true

require "spec_helper"
require "que/worker" # required to prevent autoload races

RSpec.describe "multiple workers" do
# Boots `num` workers (polling every 10ms, locking via the separate lock
# database), yields the running group to the block, then stops it with the
# given timeout.
def with_workers(num, stop_timeout: 5, secondary_queues: [], &block)
  group = Que::WorkerGroup.start(
    num,
    wake_interval: 0.01,
    secondary_queues: secondary_queues,
    using_lock_database: true,
  )
  block.call(group)
  group.stop(stop_timeout)
end

# Poll every 100ms until the que_jobs table is empty, giving up after
# `timeout` seconds.
def wait_for_jobs_to_be_worked(timeout: 10)
  deadline = Time.now + timeout
  sleep 0.1 until QueJob.count == 0 || Time.now > deadline
end

# Baseline behaviour: a single worker drains the queue, working each of the
# ten enqueued jobs exactly once (job rows deleted, one log entry per job).
context "with one worker and many jobs" do
  it "works each job exactly once" do
    QueJob.delete_all
    10.times.each { |i| FakeJob.enqueue(i) }

    expect(QueJob.count).to eq(10)

    with_workers(1) { wait_for_jobs_to_be_worked }

    expect(QueJob.count).to eq(0)
    expect(FakeJob.log.count).to eq(10)
  end
end

# Queue-scoping behaviour: workers only pick up their own queue unless
# extra queues are opted into via secondary_queues.
context "with a job on a non default queue" do
  context "with exclusive workers" do
    it "does not work the job on the non-default queue" do
      FakeJob.enqueue(1, queue: "default")
      FakeJob.enqueue(2, queue: "non-default")

      expect(QueJob.count).to eq(2)

      # Short timeout: the non-default job is expected to remain unworked.
      with_workers(1) { wait_for_jobs_to_be_worked(timeout: 1) }

      expect(QueJob.count).to eq(1)
      expect(FakeJob.log.count).to eq(1)
    end
  end

  context "with permissive workers" do
    it "works each job exactly once" do
      FakeJob.enqueue(1, queue: "default")
      FakeJob.enqueue(2, queue: "non-default")

      expect(QueJob.count).to eq(2)

      with_workers(1, secondary_queues: ["non-default"]) do
        wait_for_jobs_to_be_worked(timeout: 1)
      end

      expect(QueJob.count).to eq(0)
      expect(FakeJob.log.count).to eq(2)
    end

    it "works jobs for defined secondary_queues only" do
      FakeJob.enqueue(1, queue: "default")
      FakeJob.enqueue(2, queue: "non-default")
      FakeJob.enqueue(3, queue: "not-worked")

      expect(QueJob.count).to eq(3)

      # "not-worked" is not in secondary_queues, so its job must survive.
      with_workers(1, secondary_queues: ["non-default"]) do
        wait_for_jobs_to_be_worked(timeout: 1)
      end

      expect(QueJob.count).to eq(1)
      expect(FakeJob.log.count).to eq(2)
    end
  end
end

# Mutual exclusion via the lock database: five workers race for one job,
# but the advisory lock must ensure it is worked exactly once.
context "with multiple workers contending over the same job" do
  it "works that job exactly once" do
    FakeJob.enqueue(1)

    expect(QueJob.count).to eq(1)

    with_workers(5) { wait_for_jobs_to_be_worked }

    expect(QueJob.count).to eq(0)
    expect(FakeJob.log.count).to eq(1)
  end
end

# End-to-end check with jobs that write to the database: each CreateUser
# job must run exactly once, producing exactly one row per enqueued name.
context "with multiple jobs" do
  # Provision a throwaway users table for the duration of each example.
  around do |example|
    ActiveRecord::Base.connection.execute(
      "CREATE TABLE IF NOT EXISTS users ( name text )",
    )
    User.delete_all
    example.run
    ActiveRecord::Base.connection.execute("DROP TABLE users")
  end

  it "works them all exactly once" do
    CreateUser.enqueue("alice")
    CreateUser.enqueue("bob")
    CreateUser.enqueue("charlie")

    expect(QueJob.count).to eq(3)

    with_workers(5) { wait_for_jobs_to_be_worked }

    expect(QueJob.count).to eq(0)
    expect(User.count).to eq(3)
    expect(User.all.map(&:name).sort).to eq(%w[alice bob charlie])
  end
end

# Shutdown behaviour: jobs that outlive the stop timeout fail with a
# timeout error, while interruptible jobs are allowed to finish cleanly.
context "with jobs that exceed stop timeout" do
  it "raises Que::JobTimeoutError" do
    SleepJob.enqueue(5) # sleep 5s

    # Sleep to let the worker pick-up the SleepJob, then stop the worker with an
    # aggressive timeout. This should cause JobTimeout to be raised in the worker
    # thread.
    with_workers(1, stop_timeout: 0.01) { sleep 0.1 }

    sleep_job = QueJob.last

    expect(sleep_job).to_not be_nil
    expect(sleep_job.last_error).to match(/Job exceeded timeout when requested to stop/)
  end

  context "but is interruptible" do
    it "terminates gracefully" do
      # Sleep for 0.2s before checking if it should continue
      InterruptibleSleepJob.enqueue(0.2)

      # Sleep 0.1s to let the worker pick-up the SleepJob, then stop the worker
      # with a long enough timeout to let an iteration of sleep complete.
      with_workers(1, stop_timeout: 0.3) { sleep 0.1 }

      expect(QueJob.count).to eq(0)
      expect(InterruptibleSleepJob.log.count).to eq(1)
    end
  end
end
end
Loading

0 comments on commit fcaaed1

Please sign in to comment.