Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a Timer object that will wake up the Scheduler at specific times #155

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def self.reperform_jobs_on_standard_error=(value)
def self.shutdown(wait: true)
Notifier.instances.each { |notifier| notifier.shutdown(wait: wait) }
Scheduler.instances.each { |scheduler| scheduler.shutdown(wait: wait) }
Timer.instances.each { |timer| timer.shutdown(wait: wait) }
end

# Tests whether jobs have stopped executing.
Expand Down
8 changes: 6 additions & 2 deletions lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,12 @@ def enqueue_at(active_job, timestamp)
end
end

executed_locally = execute_async? && @scheduler.create_thread(queue_name: good_job.queue_name)
Notifier.notify(queue_name: good_job.queue_name) unless executed_locally
job_state = {
queue_name: good_job.queue_name,
scheduled_at: good_job.scheduled_at&.to_i,
}
executed_locally = execute_async? && @scheduler.create_thread(job_state)
Notifier.notify(job_state) unless executed_locally

good_job
end
Expand Down
12 changes: 12 additions & 0 deletions lib/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ def self.queue_parser(string)
# @return [ActiveRecord::Relation]
scope :only_scheduled, -> { where(arel_table['scheduled_at'].lteq(Time.current)).or(where(scheduled_at: nil)) }

# Order jobs by scheduled (unscheduled or soonest first).
# @!method schedule_ordered
# @!scope class
# @return [ActiveRecord::Relation]
scope :schedule_ordered, -> { order('scheduled_at ASC NULLS FIRST, created_at ASC') }

# Order jobs by priority (highest priority first).
# @!method priority_ordered
# @!scope class
Expand Down Expand Up @@ -147,6 +153,12 @@ def self.perform_with_advisory_lock
[good_job, result, error] if good_job
end

# Fetches the scheduled execution time of the next eligible Job(s).
# @return [Array<(DateTime)>]
def self.next_at(count = 1)
advisory_unlocked.unfinished.schedule_ordered.limit(count).pluck(:created_at, :scheduled_at).map { |timestamps| timestamps.compact.max }
end

# Places an ActiveJob job on a queue by creating a new {Job} record.
# @param active_job [ActiveJob::Base]
# The job to enqueue.
Expand Down
15 changes: 14 additions & 1 deletion lib/good_job/performer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,15 @@ class Performer
# Used to determine whether the performer should be used in GoodJob's
# current state. GoodJob state is a +Hash+ that will be passed as the
# first argument to +filter+ and includes info like the current queue.
def initialize(target, method_name, name: nil, filter: nil)
# @param next_at_method [Symbol]
# The name of a method on +target+ that returns timestamps of when next
# tasks may be available.
def initialize(target, method_name, name: nil, filter: nil, next_at_method: nil)
@target = target
@method_name = method_name
@name = name
@filter = filter
@next_at_method_name = next_at_method
end

# Find and perform any eligible jobs.
Expand All @@ -56,5 +60,14 @@ def next?(state = {})

@filter.call(state)
end

# The Returns timestamps of when next tasks may be available.
# @param count [Integer] number of timestamps to return.
# @return [Array<(Time, Timestamp)>, nil]
def next_at(count = 1)
return unless @next_at_method_name

@target.public_send(@next_at_method_name, count)
end
end
end
30 changes: 27 additions & 3 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,13 @@ def self.from_configuration(configuration)
true
end
end
job_performer = GoodJob::Performer.new(job_query, :perform_with_advisory_lock, name: queue_string, filter: job_filter)
job_performer = GoodJob::Performer.new(
job_query,
:perform_with_advisory_lock,
name: queue_string,
filter: job_filter,
next_at_method: :next_at
)

GoodJob::Scheduler.new(job_performer, max_threads: max_threads)
end
Expand All @@ -71,6 +77,7 @@ def initialize(performer, max_threads: nil)
self.class.instances << self

@performer = performer
@timer_wake = GoodJob::Timer.new([self, :create_thread])

@pool_options = DEFAULT_POOL_OPTIONS.dup
@pool_options[:max_threads] = max_threads if max_threads.present?
Expand Down Expand Up @@ -122,7 +129,19 @@ def restart(wait: true)
# Returns +false+ if the performer decides not to attempt to execute a task based on the +state+ that is passed to it.
def create_thread(state = nil)
return nil unless @pool.running? && @pool.ready_worker_count.positive?
return false if state && [email protected]?(state)

if state
return false unless @performer.next?(state)

if state[:scheduled_at]
scheduled_at = Time.zone.at(state[:scheduled_at])

if scheduled_at > Time.current
@timer_wake.push(scheduled_at)
return true
end
end
end

future = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer|
output = nil
Expand All @@ -141,7 +160,12 @@ def create_thread(state = nil)
def task_observer(time, output, thread_error)
GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call)
instrument("finished_job_task", { result: output, error: thread_error, time: time })
create_thread if output
if output
create_thread
elsif @performer.respond_to?(:next_at)
next_at = @performer.next_at(1).first
@timer_wake.push(next_at) if next_at
end
end

private
Expand Down
114 changes: 114 additions & 0 deletions lib/good_job/timer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
module GoodJob
#
# Timers will wake at the provided times to check for new work.
#
# Timers manage a discrete set of wake up times, sorted by soonest.
# New times can be pushed onto a Timer, and they will added if they are
# sooner than existing tracked times, or discarded if they are later than
# existing tracked times and the Timer's queue of tracked times is full.
#
# @todo Allow Timer to track an unbounded number of wake times.
#
# Timers are intended to be used with a {GoodJob::Scheduler} to provide
# reduced execution scheduling latency compared to a {GoodJob::Poller}.
#
class Timer
# Default number of wake times to track
DEFAULT_MAX_QUEUE = 5

# Defaults for instance of +Concurrent::ThreadPoolExecutor+.
EXECUTOR_OPTIONS = {
name: 'timer',
min_threads: 0,
max_threads: 1,
auto_terminate: true,
idletime: 60,
max_queue: 0,
fallback_policy: :discard, # shouldn't matter -- 0 max queue
}.freeze

# @!attribute [r] instances
# @!scope class
# List of all instantiated Timers in the current process.
# @return [array<GoodJob:Timer>]
cattr_reader :instances, default: [], instance_reader: false

# @!attribute [r] queue
# List of scheduled wakeups.
# @return [GoodJob::Timer::ScheduleTask]
attr_reader :queue

# @!attribute [r] queue
# Number of wake times to track.
# @return [Integer]
attr_reader :max_queue

# List of recipients that will receive wakeups.
# @return [Array<#call, Array(Object, Symbol)>]
attr_reader :recipients

# @param recipients [Array<#call, Array(Object, Symbol)>]
# @param max_queue [nil, Integer] Maximum number of times to track
def initialize(*recipients, max_queue: nil)
@recipients = Concurrent::Array.new(recipients)
@max_queue = max_queue || DEFAULT_MAX_QUEUE
@queue = Concurrent::Array.new
@mutex = Mutex.new

self.class.instances << self

create_executor
end

# Add a wake time to be tracked.
# The timestamp value be be discarded it is not sooner than the
# @param timestamp [Time, DateTime] the wake time
def push(timestamp)
@mutex.synchronize do
queue.select!(&:pending?)
return if queue.size == max_queue && timestamp > queue.last.scheduled_at

task = ScheduledTask.new(timestamp, args: [@recipients], executor: @executor) do |recipients|
recipients.each do |recipient|
target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
target.send(method_name)
end
end
task.execute

queue.unshift(task)
queue.sort_by!(&:scheduled_at)

removed_items = queue.slice!(max_queue..-1)
removed_items&.each(&:cancel)

task
end
end

# Shut down the timer.
def shutdown(wait: true)
return unless @executor&.running?

@executor.shutdown
@executor.wait_for_termination if wait
end

private

def create_executor
@executor = Concurrent::ThreadPoolExecutor.new(EXECUTOR_OPTIONS)
end

class ScheduledTask < Concurrent::ScheduledTask
attr_reader :scheduled_at

def initialize(timestamp, **args, &block)
@scheduled_at = timestamp

delay = [(timestamp - Time.current).to_f, 0].max
super(delay, **args, &block)
end
end
end
end
2 changes: 1 addition & 1 deletion spec/lib/good_job/adapter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
RSpec.describe GoodJob::Adapter do
let(:adapter) { described_class.new }
let(:active_job) { instance_double(ApplicationJob) }
let(:good_job) { instance_double(GoodJob::Job, queue_name: 'default') }
let(:good_job) { instance_double(GoodJob::Job, queue_name: 'default', scheduled_at: nil) }

describe '#initialize' do
it 'guards against improper execution modes' do
Expand Down
9 changes: 9 additions & 0 deletions spec/lib/good_job/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ def perform(result_value = nil, raise_error: false)
end
end

describe '.next_at' do
let(:active_job) { ExampleJob.new }
let!(:good_job) { described_class.enqueue(active_job) }

it 'returns an array of timestamps' do
expect(described_class.next_at).to eq [good_job.created_at]
end
end

describe '.queue_parser' do
it 'creates an intermediary hash' do
result = described_class.queue_parser('first,second')
Expand Down
65 changes: 65 additions & 0 deletions spec/lib/good_job/timer_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
require 'rails_helper'

RSpec.describe GoodJob::Timer do
describe '#initialize' do
it 'succeeds' do
described_class.new
end
end

describe '#push' do
let(:timer) { described_class.new(max_queue: 2) }

it 'adds a future to the queue' do
run_at = 1.minute.from_now
timer.push(run_at)

task = timer.queue.first
expect(task).to be_a Concurrent::ScheduledTask
expect(task.scheduled_at).to eq(run_at)
end

it 'maintains the appropriate queue size' do
one_minute = 1.minute.from_now
two_minutes = 2.minutes.from_now

timer.push(one_minute)
timer.push(two_minutes)

(3..5).to_a.each { |i| timer.push(i.minutes.from_now) }

expect(timer.queue.map(&:scheduled_at)).to eq [one_minute, two_minutes]
end
end

describe '#recipients' do
let(:recipient) { -> { RUNS << Time.current } }
let(:timer) { described_class.new(recipient, max_queue: 2) }

before do
stub_const "RUNS", Concurrent::Array.new
end

it 'triggers the recipient at the appropriate time' do
scheduled_at = 0.1.seconds.from_now
timer.push(scheduled_at)
sleep_until(max: 5) { RUNS.any? }

expect(RUNS.size).to eq(1)
expect(RUNS.first).to be_within(0.01.seconds).of(scheduled_at)
end

it 'only triggers scheduled items' do
one_tenth = 0.1.seconds.from_now
two_tenths = 0.2.seconds.from_now

timer.push(one_tenth)
timer.push(two_tenths)
(3..5).to_a.each { |i| timer.push((i * 0.1).minutes.from_now) }

sleep(1)

expect(RUNS.size).to eq 2
end
end
end
1 change: 1 addition & 0 deletions spec/support/reset_good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@
GoodJob.shutdown
GoodJob::Notifier.instances.clear
GoodJob::Scheduler.instances.clear
GoodJob::Timer.instances.clear
end
end