Skip to content

Commit

Permalink
Run Scheduler#warm_cache operation in threadpool executor (#242)
Browse files Browse the repository at this point in the history
Reduces the number of created database connections by not creating a connection on the main thread.
  • Loading branch information
bensheldon authored May 10, 2021
1 parent 9e8c3b4 commit 680ff1b
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 20 deletions.
5 changes: 5 additions & 0 deletions lib/good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ def self.reperform_jobs_on_standard_error=(value)
# When forking processes you should shut down these background threads before forking, and restart them after forking.
# For example, you should use +shutdown+ and +restart+ when using async execution mode with Puma.
# See the {file:README.md#executing-jobs-async--in-process} for more explanation and examples.
# @param timeout [nil, Numeric] Seconds to wait for actively executing jobs to finish
# * +nil+, the scheduler will trigger a shutdown but not wait for it to complete.
# * +-1+, the scheduler will wait until the shutdown is complete.
# * +0+, the scheduler will immediately shutdown and stop any active tasks.
# * +1..+, the scheduler will wait that many seconds before stopping any remaining active tasks.
# @param wait [Boolean] whether to wait for shutdown
# @return [void]
def self.shutdown(timeout: -1, wait: nil)
Expand Down
2 changes: 1 addition & 1 deletion lib/good_job/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def start

notifier = GoodJob::Notifier.new
poller = GoodJob::Poller.new(poll_interval: configuration.poll_interval)
scheduler = GoodJob::Scheduler.from_configuration(configuration)
scheduler = GoodJob::Scheduler.from_configuration(configuration, warm_cache_on_initialize: true)
notifier.recipients << [scheduler, :create_thread]
poller.recipients << [scheduler, :create_thread]

Expand Down
7 changes: 4 additions & 3 deletions lib/good_job/job.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
module GoodJob
#
# Represents a request to perform an +ActiveJob+ job.
#
# ActiveRecord model that represents an +ActiveJob+ job.
# Parent class can be configured with +GoodJob.active_record_parent_class+.
# @!parse
# class Job < ActiveRecord::Base; end
class Job < Object.const_get(GoodJob.active_record_parent_class)
include Lockable

Expand Down
1 change: 0 additions & 1 deletion lib/good_job/notifier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ def listening?
# This stops the background LISTENing thread.
# Use {#shutdown?} to determine whether threads have stopped.
# @param timeout [nil, Numeric] Seconds to wait for active threads.
#
# * +nil+, the scheduler will trigger a shutdown but not wait for it to complete.
# * +-1+, the scheduler will wait until the shutdown is complete.
# * +0+, the scheduler will immediately shutdown and stop any threads.
Expand Down
1 change: 0 additions & 1 deletion lib/good_job/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ def initialize(*recipients, poll_interval: nil)
# Shut down the notifier.
# Use {#shutdown?} to determine whether threads have stopped.
# @param timeout [nil, Numeric] Seconds to wait for active threads.
#
# * +nil+, the scheduler will trigger a shutdown but not wait for it to complete.
# * +-1+, the scheduler will wait until the shutdown is complete.
# * +0+, the scheduler will immediately shutdown and stop any threads.
Expand Down
35 changes: 23 additions & 12 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class Scheduler
# @param configuration [GoodJob::Configuration]
# @param warm_cache_on_initialize [Boolean]
# @return [GoodJob::Scheduler, GoodJob::MultiScheduler]
def self.from_configuration(configuration, warm_cache_on_initialize: true)
def self.from_configuration(configuration, warm_cache_on_initialize: false)
schedulers = configuration.queue_string.split(';').map do |queue_string_and_max_threads|
queue_string, max_threads = queue_string_and_max_threads.split(':')
max_threads = (max_threads || configuration.max_threads).to_i
Expand All @@ -61,8 +61,8 @@ def self.from_configuration(configuration, warm_cache_on_initialize: true)
# @param performer [GoodJob::JobPerformer]
# @param max_threads [Numeric, nil] number of seconds between polls for jobs
# @param max_cache [Numeric, nil] maximum number of scheduled jobs to cache in memory
# @param warm_cache_on_initialize [Boolean] whether to warm the cache immediately
def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: true)
# @param warm_cache_on_initialize [Boolean] whether to warm the cache immediately, or manually by calling +warm_cache+
def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false)
raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)

self.class.instances << self
Expand Down Expand Up @@ -93,7 +93,6 @@ def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initia
# This stops all threads in the thread pool.
# Use {#shutdown?} to determine whether threads have stopped.
# @param timeout [nil, Numeric] Seconds to wait for actively executing jobs to finish
#
# * +nil+, the scheduler will trigger a shutdown but not wait for it to complete.
# * +-1+, the scheduler will wait until the shutdown is complete.
# * +0+, the scheduler will immediately shutdown and stop any active tasks.
Expand Down Expand Up @@ -192,12 +191,24 @@ def stats
def warm_cache
return if @max_cache.zero?

performer.next_at(
limit: @max_cache,
now_limit: @executor_options[:max_threads]
).each do |scheduled_at|
create_thread({ scheduled_at: scheduled_at })
future = Concurrent::Future.new(args: [self, @performer], executor: executor) do |thr_scheduler, thr_performer|
Rails.application.executor.wrap do
thr_performer.next_at(
limit: @max_cache,
now_limit: @executor_options[:max_threads]
).each do |scheduled_at|
thr_scheduler.create_thread({ scheduled_at: scheduled_at })
end
end
end

observer = lambda do |_time, _output, thread_error|
GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call)
create_task # If cache-warming exhausts the threads, ensure there isn't an executable task remaining
end
future.add_observer(observer, :call)

future.execute
end

private
Expand All @@ -213,9 +224,9 @@ def create_executor

def create_task(delay = 0)
future = Concurrent::ScheduledTask.new(delay, args: [performer], executor: executor, timer_set: timer_set) do |thr_performer|
output = nil
Rails.application.executor.wrap { output = thr_performer.next }
output
Rails.application.executor.wrap do
thr_performer.next
end
end
future.add_observer(self, :task_observer)
future.execute
Expand Down
2 changes: 1 addition & 1 deletion spec/integration/adapter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def perform(*_args, **_kwargs)
let(:adapter) { GoodJob::Adapter.new execution_mode: :async, queues: 'mice:1', poll_interval: -1 }

it 'Jobs are directly handed to the performer, if they match the queues' do
elephant_ajob = ExampleJob.set(queue: 'elepehants').perform_later
elephant_ajob = ExampleJob.set(queue: 'elephants').perform_later
mice_ajob = ExampleJob.set(queue: 'mice').perform_later

sleep_until { RUN_JOBS.include? mice_ajob.provider_job_id }
Expand Down
2 changes: 1 addition & 1 deletion spec/support/reset_good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
config.use_transactional_fixtures = false

config.after do
GoodJob.shutdown(timeout: 0)
GoodJob.shutdown(timeout: -1)
GoodJob::Notifier.instances.clear
GoodJob::Poller.instances.clear
GoodJob::Scheduler.instances.clear
Expand Down

0 comments on commit 680ff1b

Please sign in to comment.