Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract Job querying behavior out of Scheduler #31

Merged
merged 1 commit into from
Jul 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ def enqueue_at(active_job, timestamp)
)

if inline?
good_job.perform
good_job.advisory_unlock
begin
good_job.perform
ensure
good_job.advisory_unlock
end
end

good_job
Expand Down
4 changes: 3 additions & 1 deletion lib/good_job/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ def start
ActiveRecord::Base.connection_pool.size

$stdout.puts "GoodJob starting with max_threads=#{max_threads}"
scheduler = GoodJob::Scheduler.new(pool_options: { max_threads: max_threads })

job_performer = GoodJob::Job.only_scheduled.priority_ordered.to_performer
scheduler = GoodJob::Scheduler.new(job_performer, pool_options: { max_threads: max_threads })

%w[INT TERM].each do |signal|
trap(signal) { @stop_good_job_executable = true }
Expand Down
26 changes: 26 additions & 0 deletions lib/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,32 @@ class Job < ActiveRecord::Base

self.table_name = 'good_jobs'

scope :only_scheduled, -> { where("scheduled_at < ?", Time.current).or(where(scheduled_at: nil)) }
scope :priority_ordered, -> { order(priority: :desc) }

class Performer
def initialize(query)
@query = query
end

def next
good_job = nil

@query.limit(1).with_advisory_lock do |good_jobs|
good_job = good_jobs.first
break unless good_job

good_job.perform
end

good_job
end
end

def self.to_performer
Performer.new(self)
end

def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false)
good_job = nil
ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload|
Expand Down
32 changes: 10 additions & 22 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ class Scheduler
fallback_policy: :abort, # shouldn't matter -- 0 max queue
}.freeze

def initialize(query = GoodJob::Job.all, timer_options: {}, pool_options: {})
@query = query
def initialize(performer, timer_options: {}, pool_options: {})
raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)

@performer = performer
@pool = Concurrent::ThreadPoolExecutor.new(DEFAULT_POOL_OPTIONS.merge(pool_options))
@timer = Concurrent::TimerTask.new(DEFAULT_TIMER_OPTIONS.merge(timer_options)) do
idle_threads = @pool.max_length - @pool.length
Expand All @@ -32,10 +33,6 @@ def initialize(query = GoodJob::Job.all, timer_options: {}, pool_options: {})
@timer.execute
end

def ordered_query
@query.where("scheduled_at < ?", Time.current).or(@query.where(scheduled_at: nil)).order(priority: :desc)
end

def execute
end

Expand All @@ -61,19 +58,10 @@ def shutdown?
end

def create_thread
future = Concurrent::Future.new(args: [ordered_query], executor: @pool) do |query|
good_job = nil

Rails.application.executor.wrap do
query.limit(1).with_advisory_lock do |good_jobs|
good_job = good_jobs.first
break unless good_job

good_job.perform
end
end

good_job
future = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer|
result = nil
Rails.application.executor.wrap { result = performer.next }
result
end
future.add_observer(self, :task_observer)
future.execute
Expand All @@ -83,9 +71,9 @@ def timer_observer(time, executed_task, error)
ActiveSupport::Notifications.instrument("finished_timer_task.good_job", { result: executed_task, error: error, time: time })
end

def task_observer(time, performed_job, error)
ActiveSupport::Notifications.instrument("finished_job_task.good_job", { good_job: performed_job, error: error, time: time })
create_thread if performed_job
def task_observer(time, result, error)
ActiveSupport::Notifications.instrument("finished_job_task.good_job", { result: result, error: error, time: time })
create_thread if result
end
end
end
5 changes: 2 additions & 3 deletions spec/good_job/cli_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@

before do
stub_const 'GoodJob::CLI::RAILS_ENVIRONMENT_RB', File.expand_path("spec/dummy/config/environment.rb")

allow(GoodJob::Scheduler).to receive(:new).and_return scheduler_mock
end

describe '#start' do
it 'initializes a scheduler' do
allow(GoodJob::Scheduler).to receive(:new).and_call_original
allow(Kernel).to receive(:loop)

cli = described_class.new([], {}, {})
Expand All @@ -22,7 +22,6 @@
end.to output.to_stdout

expect(GoodJob::Scheduler).to have_received(:new)
expect(scheduler_mock).to have_received(:shutdown)
end

it 'can gracefully shut down on INT signal' do
Expand Down Expand Up @@ -56,7 +55,7 @@
cli.start
end.to output.to_stdout

expect(GoodJob::Scheduler).to have_received(:new).with(pool_options: { max_threads: 4 })
expect(GoodJob::Scheduler).to have_received(:new).with(a_kind_of(GoodJob::Job::Performer), pool_options: { max_threads: 4 })
end
end
end
Expand Down
15 changes: 15 additions & 0 deletions spec/good_job/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,21 @@ def perform(*_args, **_kwargs)
end)
end

describe '.to_performer' do
it 'performs a job' do
allow_any_instance_of(described_class).to receive(:perform), &:destroy!

job_1 = described_class.create!
job_2 = described_class.create!

performer = described_class.to_performer
5.times { performer.next }

expect { job_1.reload }.to raise_error ActiveRecord::RecordNotFound
expect { job_2.reload }.to raise_error ActiveRecord::RecordNotFound
end
end

describe 'lockable' do
describe '.advisory_lock' do
around do |example|
Expand Down
43 changes: 3 additions & 40 deletions spec/good_job/scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def perform(*args, **kwargs)
end

it 'pops items off of the queue and runs them' do
scheduler = described_class.new
scheduler = described_class.new(GoodJob::Job.all.to_performer)
sleep_until(max: 5, increments_of: 0.5) { GoodJob::Job.count == 0 }

if RUN_JOBS.size != number_of_jobs
Expand Down Expand Up @@ -81,48 +81,11 @@ def perform(*args, **kwargs)
let!(:jobs) { ErrorJob.perform_later }

it "handles and retries jobs with errors" do
scheduler = described_class.new
scheduler = described_class.new(GoodJob::Job.all.to_performer)

50.times do
sleep 0.1
break if GoodJob::Job.count == 0
end

scheduler.shutdown
end
end

describe 'queue ordering' do
include ActiveSupport::Testing::TimeHelpers

it 'orders by scheduled_at and priority' do
priority_10 = ExampleJob.set(priority: 10).perform_later
priority_5 = ExampleJob.set(priority: 5).perform_later

scheduled_10 = ExampleJob.set(priority: 10, wait: 1.hour).perform_later
scheduled_5 = ExampleJob.set(priority: 5, wait: 1.hour).perform_later
sleep_until(max: 5, increments_of: 0.5) { GoodJob::Job.count == 0 }

scheduler = described_class.new
sleep_until { GoodJob::Job.count == 2 }
scheduler.shutdown

expect(RUN_JOBS).to eq [
priority_10.provider_job_id,
priority_5.provider_job_id,
]

travel 2.hours do
scheduler = described_class.new
sleep_until { GoodJob::Job.count == 0 }
scheduler.shutdown
end

expect(RUN_JOBS).to eq [
priority_10.provider_job_id,
priority_5.provider_job_id,
scheduled_10.provider_job_id,
scheduled_5.provider_job_id,
]
end
end
end