Skip to content

Commit

Permalink
add yugabyte adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
ankithads committed Aug 8, 2024
1 parent 78f109b commit d738345
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 35 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,6 @@ jobs:
ruby-version: "${{ matrix.ruby-version }}"
- name: Run specs
run: |
bundle exec rspec
bundle exec rspec
- name: Run Specs With ActiveRecordWithLock Adapter
run: ADAPTER="ActiveRecordWithLock" bundle exec rspec
1 change: 1 addition & 0 deletions lib/que.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def connection=(connection)
Adapters::ActiveRecord.new
else
case connection.class.to_s
when "Que::Adapters::ActiveRecordWithLock" then connection
when "Sequel::Postgres::Database" then Adapters::Sequel.new(connection)
when "ConnectionPool" then Adapters::ConnectionPool.new(connection)
when "PG::Connection" then Adapters::PG.new(connection)
Expand Down
68 changes: 68 additions & 0 deletions lib/que/adapters/active_record_with_lock.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# frozen_string_literal: true

module Que
module Adapters
class ActiveRecordWithLock < Que::Adapters::ActiveRecord
def initialize(job_connection_pool:, lock_connection_pool:)
@job_connection_pool = job_connection_pool
@lock_connection_pool = lock_connection_pool
super
end

def checkout_activerecord_adapter(&block)
checkout_lock_database_connection do
@job_connection_pool.with_connection(&block)
end
end

def checkout_lock_database_connection(&block)
@lock_connection_pool.with_connection(&block)
end

def execute(command, params = [])
case command
when :lock_job
queue, cursor = params
lock_job_with_lock_database(queue, cursor)
when :unlock_job
job_id = params[0]
unlock_job(job_id)
else
super
end
end

def lock_job_with_lock_database(queue, cursor)
result = []
loop do
result = Que.execute(:find_job_to_lock, [queue, cursor])

break if result.empty?

cursor = result.first["job_id"]
break if pg_try_advisory_lock?(cursor)
end
result
end

def pg_try_advisory_lock?(job_id)
checkout_lock_database_connection do |conn|
conn.execute(
"SELECT pg_try_advisory_lock(#{job_id})",
).try(:first)&.fetch("pg_try_advisory_lock")
end
end

def unlock_job(job_id)
# If for any reason the connection that is used to get this advisory lock
# is corrupted, the lock on this job_id would already be released when the
# connection holding the lock goes bad.
# Now, if a new connection tries to release the non existing lock this would just no op
# by returning false and return a warning "WARNING: you don't own a lock of type ExclusiveLock"
checkout_lock_database_connection do |conn|
conn.execute("SELECT pg_advisory_unlock(#{job_id})")
end
end
end
end
end
1 change: 1 addition & 0 deletions lib/que/adapters/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module Adapters
autoload :PG, "que/adapters/pg"
autoload :Pond, "que/adapters/pond"
autoload :Sequel, "que/adapters/sequel"
autoload :ActiveRecordWithLock, "que/adapters/active_record_with_lock"

class UnavailableConnection < StandardError; end

Expand Down
9 changes: 3 additions & 6 deletions lib/que/locker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ def initialize(queue:, cursor_expiry:, window: nil, budget: nil, secondary_queue
@queue_expires_at = {}
@secondary_queues = secondary_queues
@consolidated_queues = Array.wrap(queue).concat(secondary_queues)

# Create a bucket that has 100% capacity, so even when we don't apply a limit we
# have a valid bucket that we can use everywhere
@leaky_bucket = LeakyBucket.new(window: window || 1.0, budget: budget || 1.0)
Expand Down Expand Up @@ -121,7 +120,7 @@ def with_locked_job
ensure
if job
observe(UnlockTotal, UnlockSecondsTotal, worked_queue: job[:queue]) do
Que.execute("SELECT pg_advisory_unlock($1)", [job[:job_id]])
Que.execute(:unlock_job, [job[:job_id]])
end
end
end
Expand Down Expand Up @@ -154,10 +153,8 @@ def lock_job_query(queue, cursor)

def handle_expired_cursors!
@consolidated_queues.each do |queue|
queue_cursor_expires_at = @queue_expires_at.fetch(queue, monotonic_now)


reset_cursor_for!(queue) if queue_cursor_expires_at <= monotonic_now
queue_cursor_expires_at = @queue_expires_at.fetch(queue, monotonic_now)
reset_cursor_for!(queue) if queue_cursor_expires_at < monotonic_now
end
end

Expand Down
24 changes: 24 additions & 0 deletions lib/que/sql.rb
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,30 @@ module Que
WHERE locktype = 'advisory'
) pg USING (job_id)
},

unlock_job: %{
SELECT pg_advisory_unlock($1)
},

find_job_to_lock: %{
SELECT
queue,
priority,
run_at,
job_id,
job_class,
retryable,
args,
error_count,
extract(epoch from (now() - run_at)) as latency
FROM que_jobs
WHERE queue = $1::text
AND run_at <= now()
AND retryable = true
AND job_id >= $2
ORDER BY priority, run_at, job_id
LIMIT 1
},
}
# rubocop:enable Style/MutableConstant
end
30 changes: 30 additions & 0 deletions spec/active_record_with_lock_spec_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# frozen_string_literal: true

class LockDatabaseRecord < ActiveRecord::Base
establish_connection(
adapter: "postgresql",
host: ENV.fetch("LOCK_PGHOST", "localhost"),
user: ENV.fetch("LOCK_PGUSER", "postgres"),
password: ENV.fetch("LOCK_PGPASSWORD", "password"),
database: ENV.fetch("LOCK_PGDATABASE", "lock-test"),
port: ENV.fetch("LOCK_PGPORT", 5434),
pool: 5,
)
end

class JobRecord < ActiveRecord::Base
establish_connection(
adapter: "postgresql",
host: ENV.fetch("PGHOST", "localhost"),
user: ENV.fetch("PGUSER", "ubuntu"),
password: ENV.fetch("PGPASSWORD", "password"),
database: ENV.fetch("PGDATABASE", "que-test"),
)
end

def active_record_with_lock_adapter_connection
Que::Adapters::ActiveRecordWithLock.new(
job_connection_pool: JobRecord.connection_pool,
lock_connection_pool: LockDatabaseRecord.connection_pool,
)
end
8 changes: 2 additions & 6 deletions spec/lib/que/locker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def expect_to_work(job)
with_locked_job do |actual_job|
expect(actual_job[:job_id]).to eql(job[:job_id])
expect(Que).to receive(:execute).
with("SELECT pg_advisory_unlock($1)", [job[:job_id]])
with(:unlock_job, [job[:job_id]])

# Destroy the job to simulate the behaviour of the queue, and allow our lock query
# to discover new jobs.
Expand All @@ -58,9 +58,6 @@ def expect_to_lock_with(cursor:)
end

context "with just one job to lock" do
before do
described_class.instance_variable_set(:@queue_cursors, [0])
end
let!(:job_1) { FakeJob.enqueue(1, queue: queue, priority: 1).attrs }
let(:cursor_expiry) { 60 }

Expand Down Expand Up @@ -122,8 +119,7 @@ def expect_to_lock_with(cursor:)
expect_to_lock_with(cursor: job_1[:job_id])
expect_to_work(job_2)

@epoch += (cursor_expiry) # our cursor should now expire
# puts @epoch
@epoch += cursor_expiry # our cursor should now expire
expect_to_lock_with(cursor: 0)
expect_to_work(job_3)
end
Expand Down
4 changes: 2 additions & 2 deletions spec/lib/que/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@
FakeJob.enqueue(1)

expect(Que).
to receive(:execute).with(:lock_job, include("default", 0)).
to receive(:execute).with(:lock_job, ["default", 0]).
and_raise(ActiveRecord::ConnectionTimeoutError)
expect(work).to eq(:postgres_error)
end
Expand All @@ -232,7 +232,7 @@
FakeJob.enqueue(1)

expect(Que).
to receive(:execute).with(:lock_job, include("default", 0)).
to receive(:execute).with(:lock_job, ["default", 0]).
and_raise(ActiveRecord::ConnectionNotEstablished)
expect(work).to eq(:postgres_error)
end
Expand Down
29 changes: 9 additions & 20 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
require_relative "helpers/sleep_job"
require_relative "helpers/interruptible_sleep_job"
require_relative "helpers/user"
require_relative "active_record_with_lock_spec_helper"

def postgres_now
ActiveRecord::Base.connection.execute("SELECT NOW();")[0]["now"]
Expand All @@ -22,34 +23,22 @@ def establish_database_connection
ActiveRecord::Base.establish_connection(
adapter: "postgresql",
host: ENV.fetch("PGHOST", "localhost"),
user: ENV.fetch("PGUSER", "postgres"),
password: ENV.fetch("PGPASSWORD", ""),
user: ENV.fetch("PGUSER", "ubuntu"),
password: ENV.fetch("PGPASSWORD", "password"),
database: ENV.fetch("PGDATABASE", "que-test"),
)
end

establish_database_connection

# Make sure our test database is prepared to run Que
Que.connection = ActiveRecord
Que.migrate!


class LockDataBaseRecord < ActiveRecord::Base
def self.establish_lock_database_connection
establish_connection(
adapter: "postgresql",
host: ENV.fetch("LOCK_PGHOST", "localhost"),
user: ENV.fetch("LOCK_PGUSER", "ubuntu"),
password: ENV.fetch("LOCK_PGPASSWORD", "password"),
database: ENV.fetch("LOCK_PGDATABASE", "lock-test"),
port: ENV.fetch("LOCK_PGPORT", 5434),
)
end
def self.connection
establish_lock_database_connection.connection
Que.connection =
case ENV["ADAPTER"]
when "ActiveRecordWithLock" then active_record_with_lock_adapter_connection
else ActiveRecord
end
end

Que.migrate!

# Ensure we have a logger, so that we can test the code paths that log
Que.logger = Logger.new("/dev/null")
Expand Down

0 comments on commit d738345

Please sign in to comment.