Skip to content

Commit

Permalink
Cache scheduled jobs in memory so they can be executed without polling
Browse files Browse the repository at this point in the history
  • Loading branch information
bensheldon committed Jan 25, 2021
1 parent b2f9218 commit fc548e7
Show file tree
Hide file tree
Showing 32 changed files with 489 additions and 35 deletions.
3 changes: 3 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ Naming/MemoizedInstanceVariableName:
Rails/RakeEnvironment:
Enabled: false

Rails/ApplicationJob:
Enabled: false

Rails/ApplicationRecord:
Enabled: false

Expand Down
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ gemspec

gem 'activerecord-jdbcpostgresql-adapter', platforms: [:jruby]
gem 'pg', platforms: [:mri, :mingw, :x64_mingw]
gem 'rails'

platforms :ruby do
gem "memory_profiler"
Expand Down
65 changes: 65 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,38 @@ PATH
GEM
remote: https://rubygems.org/
specs:
actioncable (6.1.1)
actionpack (= 6.1.1)
activesupport (= 6.1.1)
nio4r (~> 2.0)
websocket-driver (>= 0.6.1)
actionmailbox (6.1.1)
actionpack (= 6.1.1)
activejob (= 6.1.1)
activerecord (= 6.1.1)
activestorage (= 6.1.1)
activesupport (= 6.1.1)
mail (>= 2.7.1)
actionmailer (6.1.1)
actionpack (= 6.1.1)
actionview (= 6.1.1)
activejob (= 6.1.1)
activesupport (= 6.1.1)
mail (~> 2.5, >= 2.5.4)
rails-dom-testing (~> 2.0)
actionpack (6.1.1)
actionview (= 6.1.1)
activesupport (= 6.1.1)
rack (~> 2.0, >= 2.0.9)
rack-test (>= 0.6.3)
rails-dom-testing (~> 2.0)
rails-html-sanitizer (~> 1.0, >= 1.2.0)
actiontext (6.1.1)
actionpack (= 6.1.1)
activerecord (= 6.1.1)
activestorage (= 6.1.1)
activesupport (= 6.1.1)
nokogiri (>= 1.8.5)
actionview (6.1.1)
activesupport (= 6.1.1)
builder (~> 3.1)
Expand All @@ -38,6 +63,13 @@ GEM
activerecord-jdbcpostgresql-adapter (61.0-java)
activerecord-jdbc-adapter (= 61.0)
jdbc-postgres (>= 9.4, < 43)
activestorage (6.1.1)
actionpack (= 6.1.1)
activejob (= 6.1.1)
activerecord (= 6.1.1)
activesupport (= 6.1.1)
marcel (~> 0.3.1)
mimemagic (~> 0.3.2)
activesupport (6.1.1)
concurrent-ruby (~> 1.0, >= 1.0.2)
i18n (>= 1.6, < 2)
Expand Down Expand Up @@ -118,6 +150,10 @@ GEM
loofah (2.9.0)
crass (~> 1.0.2)
nokogiri (>= 1.5.9)
mail (2.7.1)
mini_mime (>= 0.1.1)
marcel (0.3.3)
mimemagic (~> 0.3.2)
mdl (0.11.0)
kramdown (~> 2.3)
kramdown-parser-gfm (~> 1.1)
Expand All @@ -126,6 +162,7 @@ GEM
mixlib-shellout
memory_profiler (1.0.0)
method_source (1.0.0)
mimemagic (0.3.5)
mini_mime (1.0.2)
mini_portile2 (2.5.0)
minitest (5.14.3)
Expand Down Expand Up @@ -174,6 +211,21 @@ GEM
rack (2.2.3)
rack-test (1.1.0)
rack (>= 1.0, < 3)
rails (6.1.1)
actioncable (= 6.1.1)
actionmailbox (= 6.1.1)
actionmailer (= 6.1.1)
actionpack (= 6.1.1)
actiontext (= 6.1.1)
actionview (= 6.1.1)
activejob (= 6.1.1)
activemodel (= 6.1.1)
activerecord (= 6.1.1)
activestorage (= 6.1.1)
activesupport (= 6.1.1)
bundler (>= 1.15.0)
railties (= 6.1.1)
sprockets-rails (>= 2.0.0)
rails-dom-testing (2.0.3)
activesupport (>= 4.2.0)
nokogiri (>= 1.6)
Expand Down Expand Up @@ -245,11 +297,23 @@ GEM
smart_properties (1.15.0)
spoon (0.0.6)
ffi
sprockets (4.0.2)
concurrent-ruby (~> 1.0)
rack (> 1, < 3)
sprockets-rails (3.2.2)
actionpack (>= 4.0)
activesupport (>= 4.0)
sprockets (>= 3.0.0)
thor (1.1.0)
tomlrb (2.0.1)
tzinfo (2.0.4)
concurrent-ruby (~> 1.0)
unicode-display_width (2.0.0)
websocket-driver (0.7.3)
websocket-extensions (>= 0.1.0)
websocket-driver (0.7.3-java)
websocket-extensions (>= 0.1.0)
websocket-extensions (0.1.5)
xpath (3.2.0)
nokogiri (~> 1.8)
yard (0.9.26)
Expand Down Expand Up @@ -281,6 +345,7 @@ DEPENDENCIES
pry-byebug
pry-rails
puma
rails
rbtrace
rspec-rails
rubocop
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ Options:
[--max-threads=COUNT] # Maximum number of threads to use for working jobs. (env var: GOOD_JOB_MAX_THREADS, default: 5)
[--queues=QUEUE_LIST] # Queues to work from. (env var: GOOD_JOB_QUEUES, default: *)
[--poll-interval=SECONDS] # Interval between polls for available jobs in seconds (env var: GOOD_JOB_POLL_INTERVAL, default: 1)
[--max-cache=COUNT] # Maximum number of scheduled jobs to cache in memory (env var: GOOD_JOB_MAX_CACHE, default: 10000)
[--daemonize] # Run as a background daemon (default: false)
[--pidfile=PIDFILE] # Path to write daemonized Process ID (env var: GOOD_JOB_PIDFILE, default: tmp/pids/good_job.pid)
Expand Down Expand Up @@ -225,6 +226,7 @@ Available configuration options are:
- `max_threads` (integer) sets the maximum number of threads to use when `execution_mode` is set to `:async`. You can also set this with the environment variable `GOOD_JOB_MAX_THREADS`.
- `queues` (string) determines which queues to execute jobs from when `execution_mode` is set to `:async`. See the description of `good_job start` for more details on the format of this string. You can also set this with the environment variable `GOOD_JOB_QUEUES`.
- `poll_interval` (integer) sets the number of seconds between polls for jobs when `execution_mode` is set to `:async`. You can also set this with the environment variable `GOOD_JOB_POLL_INTERVAL`.
- `max_cache` (integer) sets the maximum number of scheduled jobs that will be stored in memory to reduce execution latency when also polling for scheduled jobs. Caching 10,000 scheduled jobs uses approximately 20MB of memory. You can also set this with the environment variable `GOOD_JOB_MAX_CACHE`.

By default, GoodJob configures the following execution modes per environment:

Expand Down
1 change: 1 addition & 0 deletions exe/good_job
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!/usr/bin/env ruby
require 'good_job/cli'
GOOD_JOB_WITHIN_CLI = true
GOOD_JOB_LOG_TO_STDOUT = true
GoodJob::CLI.start(ARGV)
2 changes: 2 additions & 0 deletions gemfiles/rails_5.2.gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ gem "pg", platforms: [:mri, :mingw, :x64_mingw]
gem "rails", "~> 5.2.0"

platforms :ruby do
gem "memory_profiler"
gem "pry-byebug"
gem "rbtrace"

group :lint do
Expand Down
2 changes: 2 additions & 0 deletions gemfiles/rails_6.0.gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ gem "pg", platforms: [:mri, :mingw, :x64_mingw]
gem "rails", "~> 6.0.0"

platforms :ruby do
gem "memory_profiler"
gem "pry-byebug"
gem "rbtrace"

group :lint do
Expand Down
2 changes: 2 additions & 0 deletions gemfiles/rails_6.1.gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ gem "pg", platforms: [:mri, :mingw, :x64_mingw]
gem "rails", "~> 6.1"

platforms :ruby do
gem "memory_profiler"
gem "pry-byebug"
gem "rbtrace"

group :lint do
Expand Down
2 changes: 2 additions & 0 deletions gemfiles/rails_head.gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ gem "pg", platforms: [:mri, :mingw, :x64_mingw]
gem "rails", branch: "main", git: "https://github.com/rails/rails.git"

platforms :ruby do
gem "memory_profiler"
gem "pry-byebug"
gem "rbtrace"

group :lint do
Expand Down
13 changes: 8 additions & 5 deletions lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ def initialize(execution_mode: nil, queues: nil, max_threads: nil, poll_interval
@execution_mode = configuration.execution_mode
raise ArgumentError, "execution_mode: must be one of #{EXECUTION_MODES.join(', ')}." unless EXECUTION_MODES.include?(@execution_mode)

if @execution_mode == :async # rubocop:disable Style/GuardClause
if execute_async? # rubocop:disable Style/GuardClause
@notifier = GoodJob::Notifier.new
@poller = GoodJob::Poller.new(poll_interval: configuration.poll_interval)
@scheduler = GoodJob::Scheduler.from_configuration(configuration)
@scheduler = GoodJob::Scheduler.from_configuration(configuration, warm_cache_on_initialize: Rails.application.initialized?)
@notifier.recipients << [@scheduler, :create_thread]
@poller.recipients << [@scheduler, :create_thread]
end
Expand Down Expand Up @@ -85,10 +85,13 @@ def enqueue_at(active_job, timestamp)
ensure
good_job.advisory_unlock
end
end
else
job_state = { queue_name: good_job.queue_name }
job_state[:scheduled_at] = good_job.scheduled_at if good_job.scheduled_at

executed_locally = execute_async? && @scheduler.create_thread(queue_name: good_job.queue_name)
Notifier.notify(queue_name: good_job.queue_name) unless executed_locally
executed_locally = execute_async? && @scheduler.create_thread(job_state)
Notifier.notify(job_state) unless executed_locally
end

good_job
end
Expand Down
4 changes: 4 additions & 0 deletions lib/good_job/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ def self.exit_on_failure?
type: :numeric,
banner: 'SECONDS',
desc: "Interval between polls for available jobs in seconds (env var: GOOD_JOB_POLL_INTERVAL, default: 5)"
method_option :max_cache,
type: :numeric,
banner: 'COUNT',
desc: "Maximum number of scheduled jobs to cache in memory (env var: GOOD_JOB_MAX_CACHE, default: 10000)"
method_option :daemonize,
type: :boolean,
desc: "Run as a background daemon (default: false)"
Expand Down
21 changes: 19 additions & 2 deletions lib/good_job/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ class Configuration
# Default number of threads to use per {Scheduler}
DEFAULT_MAX_THREADS = 5
# Default number of seconds between polls for jobs
DEFAULT_POLL_INTERVAL = 5
DEFAULT_POLL_INTERVAL = 10
# Default number of threads to use per {Scheduler}
DEFAULT_MAX_CACHE = 10000
# Default number of seconds to preserve jobs for {CLI#cleanup_preserved_jobs}
DEFAULT_CLEANUP_PRESERVED_JOBS_BEFORE_SECONDS_AGO = 24 * 60 * 60

Expand Down Expand Up @@ -42,7 +44,9 @@ def initialize(options, env: ENV)
# Value to use if none was specified in the configuration.
# @return [Symbol]
def execution_mode(default: :external)
if options[:execution_mode]
if defined?(GOOD_JOB_WITHIN_CLI) && GOOD_JOB_WITHIN_CLI
:external
elsif options[:execution_mode]
options[:execution_mode]
elsif rails_config[:execution_mode]
rails_config[:execution_mode]
Expand Down Expand Up @@ -105,6 +109,19 @@ def poll_interval
).to_i
end

# The maximum number of future-scheduled jobs to store in memory.
# Storing future-scheduled jobs in memory reduces execution latency
# at the cost of increased memory usage. 10,000 stored jobs = ~20MB.
# @return [Integer]
def max_cache
(
options[:max_cache] ||
rails_config[:max_cache] ||
env['GOOD_JOB_MAX_CACHE'] ||
DEFAULT_MAX_CACHE
).to_i
end

# Number of seconds to preserve jobs when using the +good_job cleanup_preserved_jobs+ CLI command.
# This configuration is only used when {GoodJob.preserve_job_records} is +true+.
# @return [Integer]
Expand Down
23 changes: 23 additions & 0 deletions lib/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ def self.queue_parser(string)
# @return [ActiveRecord::Relation]
scope :priority_ordered, -> { order('priority DESC NULLS LAST') }

# Order jobs by scheduled (unscheduled or soonest first).
# @!method schedule_ordered
# @!scope class
# @return [ActiveRecord::Relation]
scope :schedule_ordered, -> { order(Arel.sql('COALESCE(scheduled_at, created_at) ASC')) }

# Get Jobs were completed before the given timestamp. If no timestamp is
# provided, get all jobs that have been completed. By default, GoodJob
# deletes jobs after they are completed and this will find no jobs.
Expand Down Expand Up @@ -147,6 +153,23 @@ def self.perform_with_advisory_lock
[good_job, result, error] if good_job
end

# Fetches the scheduled execution time of the next eligible Job(s).
# @return [Array<(DateTime)>]
def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil)
query = advisory_unlocked.unfinished.schedule_ordered

after ||= Time.current
after_query = query.where('scheduled_at > ?', after).or query.where(scheduled_at: nil).where('created_at > ?', after)
after_at = after_query.limit(limit).pluck(:scheduled_at, :created_at).map { |timestamps| timestamps.compact.first }

if now_limit&.positive?
now_query = query.where('scheduled_at < ?', Time.current).or query.where(scheduled_at: nil)
now_at = now_query.limit(now_limit).pluck(:scheduled_at, :created_at).map { |timestamps| timestamps.compact.first }
end

Array(now_at) + after_at
end

# Places an ActiveJob job on a queue by creating a new {Job} record.
# @param active_job [ActiveJob::Base]
# The job to enqueue.
Expand Down
10 changes: 10 additions & 0 deletions lib/good_job/job_performer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ def next
# @return [Boolean] whether the performer's {#next} method should be
# called in the current state.
def next?(state = {})
return true unless state[:queue_name]

if parsed_queues[:exclude]
parsed_queues[:exclude].exclude?(state[:queue_name])
elsif parsed_queues[:include]
Expand All @@ -48,6 +50,14 @@ def next?(state = {})
end
end

# The Returns timestamps of when next tasks may be available.
# @param count [Integer] number of timestamps to return
# @param count [DateTime, Time, nil] jobs scheduled after this time
# @return [Array<(Time, Timestamp)>, nil]
def next_at(after: nil, limit: nil, now_limit: nil)
job_query.next_scheduled_at(after: after, limit: limit, now_limit: now_limit)
end

private

attr_reader :queue_string
Expand Down
4 changes: 4 additions & 0 deletions lib/good_job/railtie.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,9 @@ class Railtie < ::Rails::Railtie
GoodJob::CurrentExecution.error_on_discard = event.payload[:error]
end
end

config.after_initialize do
GoodJob::Scheduler.instances.each(&:warm_cache)
end
end
end
Loading

0 comments on commit fc548e7

Please sign in to comment.