Skip to content

Commit

Permalink
Run Scheduler#warm_cache operation in threadpool executor
Browse files Browse the repository at this point in the history
Reduces the number of created database connections by not creating a connection on the main thread.
  • Loading branch information
bensheldon committed May 7, 2021
1 parent 9e8c3b4 commit de8c892
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 19 deletions.
5 changes: 5 additions & 0 deletions lib/good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ def self.reperform_jobs_on_standard_error=(value)
# When forking processes you should shut down these background threads before forking, and restart them after forking.
# For example, you should use +shutdown+ and +restart+ when using async execution mode with Puma.
# See the {file:README.md#executing-jobs-async--in-process} for more explanation and examples.
# @param timeout [nil, Numeric] Seconds to wait for actively executing jobs to finish
# * +nil+, the scheduler will trigger a shutdown but not wait for it to complete.
# * +-1+, the scheduler will wait until the shutdown is complete.
# * +0+, the scheduler will immediately shutdown and stop any active tasks.
# * +1..+, the scheduler will wait that many seconds before stopping any remaining active tasks.
# @param wait [Boolean] whether to wait for shutdown
# @return [void]
def self.shutdown(timeout: -1, wait: nil)
Expand Down
2 changes: 1 addition & 1 deletion lib/good_job/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def start

notifier = GoodJob::Notifier.new
poller = GoodJob::Poller.new(poll_interval: configuration.poll_interval)
scheduler = GoodJob::Scheduler.from_configuration(configuration)
scheduler = GoodJob::Scheduler.from_configuration(configuration, warm_cache_on_initialize: true)
notifier.recipients << [scheduler, :create_thread]
poller.recipients << [scheduler, :create_thread]

Expand Down
7 changes: 4 additions & 3 deletions lib/good_job/job.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
module GoodJob
#
# Represents a request to perform an +ActiveJob+ job.
#
# ActiveRecord model that represents an +ActiveJob+ job.
# Parent class can be configured with +GoodJob.active_record_parent_class+.
# @!parse
# class Job < ActiveRecord::Base; end
class Job < Object.const_get(GoodJob.active_record_parent_class)
include Lockable

Expand Down
1 change: 0 additions & 1 deletion lib/good_job/notifier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ def listening?
# This stops the background LISTENing thread.
# Use {#shutdown?} to determine whether threads have stopped.
# @param timeout [nil, Numeric] Seconds to wait for active threads.
#
# * +nil+, the scheduler will trigger a shutdown but not wait for it to complete.
# * +-1+, the scheduler will wait until the shutdown is complete.
# * +0+, the scheduler will immediately shutdown and stop any threads.
Expand Down
1 change: 0 additions & 1 deletion lib/good_job/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ def initialize(*recipients, poll_interval: nil)
# Shut down the notifier.
# Use {#shutdown?} to determine whether threads have stopped.
# @param timeout [nil, Numeric] Seconds to wait for active threads.
#
# * +nil+, the scheduler will trigger a shutdown but not wait for it to complete.
# * +-1+, the scheduler will wait until the shutdown is complete.
# * +0+, the scheduler will immediately shutdown and stop any threads.
Expand Down
34 changes: 22 additions & 12 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class Scheduler
# @param configuration [GoodJob::Configuration]
# @param warm_cache_on_initialize [Boolean]
# @return [GoodJob::Scheduler, GoodJob::MultiScheduler]
def self.from_configuration(configuration, warm_cache_on_initialize: true)
def self.from_configuration(configuration, warm_cache_on_initialize: false)
schedulers = configuration.queue_string.split(';').map do |queue_string_and_max_threads|
queue_string, max_threads = queue_string_and_max_threads.split(':')
max_threads = (max_threads || configuration.max_threads).to_i
Expand All @@ -61,8 +61,8 @@ def self.from_configuration(configuration, warm_cache_on_initialize: true)
# @param performer [GoodJob::JobPerformer]
# @param max_threads [Numeric, nil] number of seconds between polls for jobs
# @param max_cache [Numeric, nil] maximum number of scheduled jobs to cache in memory
# @param warm_cache_on_initialize [Boolean] whether to warm the cache immediately
def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: true)
# @param warm_cache_on_initialize [Boolean] whether to warm the cache immediately, or manually by calling +warm_cache+
def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false)
raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)

self.class.instances << self
Expand Down Expand Up @@ -93,7 +93,6 @@ def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initia
# This stops all threads in the thread pool.
# Use {#shutdown?} to determine whether threads have stopped.
# @param timeout [nil, Numeric] Seconds to wait for actively executing jobs to finish
#
# * +nil+, the scheduler will trigger a shutdown but not wait for it to complete.
# * +-1+, the scheduler will wait until the shutdown is complete.
# * +0+, the scheduler will immediately shutdown and stop any active tasks.
Expand Down Expand Up @@ -192,12 +191,23 @@ def stats
def warm_cache
return if @max_cache.zero?

performer.next_at(
limit: @max_cache,
now_limit: @executor_options[:max_threads]
).each do |scheduled_at|
create_thread({ scheduled_at: scheduled_at })
future = Concurrent::Future.new(args: [self, @performer], executor: @pool) do |thr_scheduler, thr_performer|
Rails.application.executor.wrap do
thr_performer.next_at(
limit: @max_cache,
now_limit: @executor_options[:max_threads]
).each do |scheduled_at|
thr_scheduler.create_thread({ scheduled_at: scheduled_at })
end
end
end

observer = lambda do |_time, _output, thread_error|
GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call)
end
future.add_observer(observer, :call)

future.execute
end

private
Expand All @@ -213,9 +223,9 @@ def create_executor

def create_task(delay = 0)
future = Concurrent::ScheduledTask.new(delay, args: [performer], executor: executor, timer_set: timer_set) do |thr_performer|
output = nil
Rails.application.executor.wrap { output = thr_performer.next }
output
Rails.application.executor.wrap do
thr_performer.next
end
end
future.add_observer(self, :task_observer)
future.execute
Expand Down
2 changes: 1 addition & 1 deletion spec/support/reset_good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
config.use_transactional_fixtures = false

config.after do
GoodJob.shutdown(timeout: 0)
GoodJob.shutdown(timeout: -1)
GoodJob::Notifier.instances.clear
GoodJob::Poller.instances.clear
GoodJob::Scheduler.instances.clear
Expand Down

0 comments on commit de8c892

Please sign in to comment.