Skip to content

Commit

Permalink
Extract Job querying behavior out of Scheduler (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
bensheldon authored Jul 15, 2020
1 parent de4f971 commit 6367588
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 68 deletions.
7 changes: 5 additions & 2 deletions lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ def enqueue_at(active_job, timestamp)
)

if inline?
good_job.perform
good_job.advisory_unlock
begin
good_job.perform
ensure
good_job.advisory_unlock
end
end

good_job
Expand Down
4 changes: 3 additions & 1 deletion lib/good_job/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ def start
ActiveRecord::Base.connection_pool.size

$stdout.puts "GoodJob starting with max_threads=#{max_threads}"
scheduler = GoodJob::Scheduler.new(pool_options: { max_threads: max_threads })

job_performer = GoodJob::Job.only_scheduled.priority_ordered.to_performer
scheduler = GoodJob::Scheduler.new(job_performer, pool_options: { max_threads: max_threads })

%w[INT TERM].each do |signal|
trap(signal) { @stop_good_job_executable = true }
Expand Down
26 changes: 26 additions & 0 deletions lib/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,32 @@ class Job < ActiveRecord::Base

self.table_name = 'good_jobs'

scope :only_scheduled, -> { where("scheduled_at < ?", Time.current).or(where(scheduled_at: nil)) }
scope :priority_ordered, -> { order(priority: :desc) }

class Performer
def initialize(query)
@query = query
end

def next
good_job = nil

@query.limit(1).with_advisory_lock do |good_jobs|
good_job = good_jobs.first
break unless good_job

good_job.perform
end

good_job
end
end

def self.to_performer
Performer.new(self)
end

def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false)
good_job = nil
ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload|
Expand Down
32 changes: 10 additions & 22 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ class Scheduler
fallback_policy: :abort, # shouldn't matter -- 0 max queue
}.freeze

def initialize(query = GoodJob::Job.all, timer_options: {}, pool_options: {})
@query = query
def initialize(performer, timer_options: {}, pool_options: {})
raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)

@performer = performer
@pool = Concurrent::ThreadPoolExecutor.new(DEFAULT_POOL_OPTIONS.merge(pool_options))
@timer = Concurrent::TimerTask.new(DEFAULT_TIMER_OPTIONS.merge(timer_options)) do
idle_threads = @pool.max_length - @pool.length
Expand All @@ -32,10 +33,6 @@ def initialize(query = GoodJob::Job.all, timer_options: {}, pool_options: {})
@timer.execute
end

def ordered_query
@query.where("scheduled_at < ?", Time.current).or(@query.where(scheduled_at: nil)).order(priority: :desc)
end

def execute
end

Expand All @@ -61,19 +58,10 @@ def shutdown?
end

def create_thread
future = Concurrent::Future.new(args: [ordered_query], executor: @pool) do |query|
good_job = nil

Rails.application.executor.wrap do
query.limit(1).with_advisory_lock do |good_jobs|
good_job = good_jobs.first
break unless good_job

good_job.perform
end
end

good_job
future = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer|
result = nil
Rails.application.executor.wrap { result = performer.next }
result
end
future.add_observer(self, :task_observer)
future.execute
Expand All @@ -83,9 +71,9 @@ def timer_observer(time, executed_task, error)
ActiveSupport::Notifications.instrument("finished_timer_task.good_job", { result: executed_task, error: error, time: time })
end

def task_observer(time, performed_job, error)
ActiveSupport::Notifications.instrument("finished_job_task.good_job", { good_job: performed_job, error: error, time: time })
create_thread if performed_job
def task_observer(time, result, error)
ActiveSupport::Notifications.instrument("finished_job_task.good_job", { result: result, error: error, time: time })
create_thread if result
end
end
end
5 changes: 2 additions & 3 deletions spec/good_job/cli_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@

before do
stub_const 'GoodJob::CLI::RAILS_ENVIRONMENT_RB', File.expand_path("spec/dummy/config/environment.rb")

allow(GoodJob::Scheduler).to receive(:new).and_return scheduler_mock
end

describe '#start' do
it 'initializes a scheduler' do
allow(GoodJob::Scheduler).to receive(:new).and_call_original
allow(Kernel).to receive(:loop)

cli = described_class.new([], {}, {})
Expand All @@ -22,7 +22,6 @@
end.to output.to_stdout

expect(GoodJob::Scheduler).to have_received(:new)
expect(scheduler_mock).to have_received(:shutdown)
end

it 'can gracefully shut down on INT signal' do
Expand Down Expand Up @@ -56,7 +55,7 @@
cli.start
end.to output.to_stdout

expect(GoodJob::Scheduler).to have_received(:new).with(pool_options: { max_threads: 4 })
expect(GoodJob::Scheduler).to have_received(:new).with(a_kind_of(GoodJob::Job::Performer), pool_options: { max_threads: 4 })
end
end
end
Expand Down
15 changes: 15 additions & 0 deletions spec/good_job/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,21 @@ def perform(*_args, **_kwargs)
end)
end

describe '.to_performer' do
it 'performs a job' do
allow_any_instance_of(described_class).to receive(:perform), &:destroy!

job_1 = described_class.create!
job_2 = described_class.create!

performer = described_class.to_performer
5.times { performer.next }

expect { job_1.reload }.to raise_error ActiveRecord::RecordNotFound
expect { job_2.reload }.to raise_error ActiveRecord::RecordNotFound
end
end

describe 'lockable' do
describe '.advisory_lock' do
around do |example|
Expand Down
43 changes: 3 additions & 40 deletions spec/good_job/scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def perform(*args, **kwargs)
end

it 'pops items off of the queue and runs them' do
scheduler = described_class.new
scheduler = described_class.new(GoodJob::Job.all.to_performer)
sleep_until(max: 5, increments_of: 0.5) { GoodJob::Job.count == 0 }

if RUN_JOBS.size != number_of_jobs
Expand Down Expand Up @@ -81,48 +81,11 @@ def perform(*args, **kwargs)
let!(:jobs) { ErrorJob.perform_later }

it "handles and retries jobs with errors" do
scheduler = described_class.new
scheduler = described_class.new(GoodJob::Job.all.to_performer)

50.times do
sleep 0.1
break if GoodJob::Job.count == 0
end

scheduler.shutdown
end
end

describe 'queue ordering' do
include ActiveSupport::Testing::TimeHelpers

it 'orders by scheduled_at and priority' do
priority_10 = ExampleJob.set(priority: 10).perform_later
priority_5 = ExampleJob.set(priority: 5).perform_later

scheduled_10 = ExampleJob.set(priority: 10, wait: 1.hour).perform_later
scheduled_5 = ExampleJob.set(priority: 5, wait: 1.hour).perform_later
sleep_until(max: 5, increments_of: 0.5) { GoodJob::Job.count == 0 }

scheduler = described_class.new
sleep_until { GoodJob::Job.count == 2 }
scheduler.shutdown

expect(RUN_JOBS).to eq [
priority_10.provider_job_id,
priority_5.provider_job_id,
]

travel 2.hours do
scheduler = described_class.new
sleep_until { GoodJob::Job.count == 0 }
scheduler.shutdown
end

expect(RUN_JOBS).to eq [
priority_10.provider_job_id,
priority_5.provider_job_id,
scheduled_10.provider_job_id,
scheduled_5.provider_job_id,
]
end
end
end

0 comments on commit 6367588

Please sign in to comment.