diff --git a/app/models/good_job/base_execution.rb b/app/models/good_job/base_execution.rb
index 0cc113c37..03dcfa856 100644
--- a/app/models/good_job/base_execution.rb
+++ b/app/models/good_job/base_execution.rb
@@ -33,6 +33,15 @@ def params_execution_count
def coalesce_scheduled_at_created_at
arel_table.coalesce(arel_table['scheduled_at'], arel_table['created_at'])
end
+
+ def discrete_support?
+ if connection.table_exists?('good_job_executions')
+ true
+ else
+ migration_pending_warning!
+ false
+ end
+ end
end
# The ActiveJob job class, as a string
@@ -40,5 +49,9 @@ def coalesce_scheduled_at_created_at
def job_class
serialized_params['job_class']
end
+
+ def discrete?
+ self.class.discrete_support? && is_discrete?
+ end
end
end
diff --git a/app/models/good_job/discrete_execution.rb b/app/models/good_job/discrete_execution.rb
new file mode 100644
index 000000000..5264d040c
--- /dev/null
+++ b/app/models/good_job/discrete_execution.rb
@@ -0,0 +1,63 @@
+# frozen_string_literal: true
+
+module GoodJob # :nodoc:
+ class DiscreteExecution < BaseRecord
+ self.table_name = 'good_job_executions'
+
+ belongs_to :execution, class_name: 'GoodJob::Execution', foreign_key: 'active_job_id', primary_key: 'active_job_id', inverse_of: :discrete_executions, optional: true
+ belongs_to :job, class_name: 'GoodJob::Job', foreign_key: 'active_job_id', primary_key: 'active_job_id', inverse_of: :discrete_executions, optional: true
+
+ scope :finished, -> { where.not(finished_at: nil) }
+
+ alias_attribute :performed_at, :created_at
+
+ def number
+ serialized_params.fetch('executions', 0) + 1
+ end
+
+ # Time between when this job was expected to run and when it started running
+ def queue_latency
+ now = Time.zone.now
+ actual_start = performed_at || finished_at || now
+
+ actual_start - perform_expected_at unless perform_expected_at >= now
+ end
+
+ # Time between when this job started and finished
+ def runtime_latency
+ (finished_at || Time.zone.now) - performed_at if performed_at
+ end
+
+ def last_status_at
+ finished_at || performed_at || perform_expected_at || created_at
+ end
+
+ def status
+ if finished_at.present?
+ if error.present?
+ :retried
+ elsif error.present? && job.finished_at.present?
+ :discarded
+ else
+ :succeeded
+ end
+ elsif (perform_expected_at || created_at) > DateTime.current
+ if serialized_params.fetch('executions', 0) > 1
+ :retried
+ else
+ :scheduled
+ end
+ elsif performed_at.present?
+ :running
+ else
+ :queued
+ end
+ end
+
+ def display_serialized_params
+ serialized_params.merge({
+ _good_job_execution: attributes.except('serialized_params'),
+ })
+ end
+ end
+end
diff --git a/app/models/good_job/execution.rb b/app/models/good_job/execution.rb
index b845aeabf..8726aa2dd 100644
--- a/app/models/good_job/execution.rb
+++ b/app/models/good_job/execution.rb
@@ -70,9 +70,13 @@ def self.queue_parser(string)
end
belongs_to :batch, class_name: 'GoodJob::BatchRecord', optional: true, inverse_of: :executions
-
belongs_to :job, class_name: 'GoodJob::Job', foreign_key: 'active_job_id', primary_key: 'active_job_id', optional: true, inverse_of: :executions
- after_destroy -> { self.class.active_job_id(active_job_id).delete_all }, if: -> { @_destroy_job }
+ has_many :discrete_executions, class_name: 'GoodJob::DiscreteExecution', foreign_key: 'active_job_id', primary_key: 'active_job_id', inverse_of: :execution # rubocop:disable Rails/HasManyOrHasOneDependent
+
+ after_destroy lambda {
+ discrete_executions.delete_all if discrete?
+ self.class.active_job_id(active_job_id).delete_all
+ }, if: -> { @_destroy_job }
# Get executions with given ActiveJob ID
# @!method active_job_id
@@ -201,8 +205,12 @@ def self.queue_parser(string)
end
end)
- # Construct a GoodJob::Execution from an ActiveJob instance.
def self.build_for_enqueue(active_job, overrides = {})
+ new(**enqueue_args(active_job, overrides))
+ end
+
+ # Construct arguments for GoodJob::Execution from an ActiveJob instance.
+ def self.enqueue_args(active_job, overrides = {})
if active_job.priority && GoodJob.configuration.smaller_number_is_higher_priority.nil?
ActiveSupport::Deprecation.warn(<<~DEPRECATION)
The next major version of GoodJob (v4.0) will change job `priority` to give smaller numbers higher priority (default: `0`), in accordance with Active Job's definition of priority.
@@ -218,6 +226,10 @@ def self.build_for_enqueue(active_job, overrides = {})
serialized_params: active_job.serialize,
scheduled_at: active_job.scheduled_at,
}
+ if discrete_support?
+ execution_args[:is_discrete] = true
+ execution_args[:executions_count] = 0
+ end
execution_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key)
reenqueued_current_execution = CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id
@@ -238,7 +250,7 @@ def self.build_for_enqueue(active_job, overrides = {})
execution_args[:cron_at] = CurrentThread.cron_at
end
- new(**execution_args.merge(overrides))
+ execution_args.merge(overrides)
end
# Finds the next eligible Execution, acquire an advisory lock related to it, and
@@ -298,19 +310,36 @@ def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil)
# The new {Execution} instance representing the queued ActiveJob job.
def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false)
ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload|
- execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at })
+ current_execution = CurrentThread.execution
+
+ retried = current_execution && current_execution.active_job_id == active_job.job_id
+ if retried && current_execution.discrete?
+ current_execution.assign_attributes(enqueue_args(active_job, { scheduled_at: scheduled_at }))
+ current_execution.finished_at = nil
+ current_execution.advisory_lock if create_with_advisory_lock
+ execution = current_execution
+ else
+ execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at })
+ execution.is_discrete = nil if retried && discrete_support? && !current_execution.is_discrete?
+ execution.create_with_advisory_lock = create_with_advisory_lock
- execution.create_with_advisory_lock = create_with_advisory_lock
- instrument_payload[:execution] = execution
+ instrument_payload[:execution] = execution
- execution.save!
- active_job.provider_job_id = execution.id
- CurrentThread.execution.retried_good_job_id = execution.id if CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id
+ execution.save!
+ CurrentThread.execution.retried_good_job_id = execution.id if retried
+ end
+ active_job.provider_job_id = execution.id
execution
end
end
+ def self.format_error(error)
+ raise ArgumentError unless error.is_a?(Exception)
+
+ [error.class.to_s, ERROR_MESSAGE_SEPARATOR, error.message].join
+ end
+
# Execute the ActiveJob job this {Execution} represents.
# @return [ExecutionResult]
# An array of the return value of the job's +#perform+ method and the
@@ -320,12 +349,33 @@ def perform
run_callbacks(:perform) do
raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at
+ discrete_execution = nil
result = GoodJob::CurrentThread.within do |current_thread|
current_thread.reset
current_thread.execution = self
- current_thread.execution_interrupted = performed_at if performed_at
- update!(performed_at: Time.current)
+ if performed_at
+ current_thread.execution_interrupted = performed_at
+
+ if discrete?
+ interrupt_error_string = self.class.format_error(GoodJob::InterruptError.new("Interrupted after starting perform at '#{performed_at}'"))
+ self.error = interrupt_error_string
+ discrete_executions.where(finished_at: nil).where.not(performed_at: nil).update_all( # rubocop:disable Rails/SkipsModelValidations
+ error: interrupt_error_string,
+ finished_at: Time.current
+ )
+ end
+ end
+
+ if discrete?
+ transaction do
+ now = Time.current
+ discrete_execution = discrete_executions.create!(serialized_params: serialized_params, perform_expected_at: scheduled_at || created_at, created_at: now)
+ update!(performed_at: now, executions_count: (executions_count || 0) + 1)
+ end
+ else
+ update!(performed_at: Time.current)
+ end
ActiveSupport::Notifications.instrument("perform_job.good_job", { execution: self, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload|
value = ActiveJob::Base.execute(active_job_data)
@@ -349,14 +399,43 @@ def perform
end
job_error = result.handled_error || result.unhandled_error
- self.error = [job_error.class, ERROR_MESSAGE_SEPARATOR, job_error.message].join if job_error
+
+ if job_error
+ error_string = self.class.format_error(job_error)
+ self.error = error_string
+ discrete_execution.error = error_string if discrete_execution
+ else
+ self.error = nil
+ end
reenqueued = result.retried? || retried_good_job_id.present?
if result.unhandled_error && GoodJob.retry_on_unhandled_error
- save!
+ if discrete_execution
+ transaction do
+ discrete_execution.update!(finished_at: Time.current)
+ self.performed_at = nil # TODO: will this break something to unset this?
+ save!
+ end
+ else
+ save!
+ end
elsif GoodJob.preserve_job_records == true || reenqueued || (result.unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error) || cron_key.present?
- self.finished_at = Time.current
- save!
+ now = Time.current
+ if discrete_execution
+ if reenqueued
+ self.performed_at = nil
+ else
+ self.finished_at = now
+ end
+ discrete_execution.finished_at = now
+ transaction do
+ discrete_execution.save!
+ save!
+ end
+ else
+ self.finished_at = now
+ save!
+ end
else
destroy_job
end
diff --git a/app/models/good_job/job.rb b/app/models/good_job/job.rb
index 0761db07a..e490e13e7 100644
--- a/app/models/good_job/job.rb
+++ b/app/models/good_job/job.rb
@@ -30,6 +30,7 @@ def table_name=(_value)
belongs_to :batch, class_name: 'GoodJob::BatchRecord', inverse_of: :jobs, optional: true
has_many :executions, -> { order(created_at: :asc) }, class_name: 'GoodJob::Execution', foreign_key: 'active_job_id', inverse_of: :job # rubocop:disable Rails/HasManyOrHasOneDependent
+ has_many :discrete_executions, -> { order(created_at: :asc) }, class_name: 'GoodJob::DiscreteExecution', foreign_key: 'active_job_id', inverse_of: :job, dependent: :delete_all
# Only the most-recent unretried execution represents a "Job"
default_scope { where(retried_good_job_id: nil) }
@@ -191,9 +192,10 @@ def retry_job
execution.class.transaction(joinable: false, requires_new: true) do
new_active_job = active_job.retry_job(wait: 0, error: execution.error)
- execution.save
+ execution.save!
end
end
+
new_active_job
end
end
@@ -213,7 +215,7 @@ def discard_job(message)
update_execution = proc do
execution.update(
finished_at: Time.current,
- error: [job_error.class, GoodJob::Execution::ERROR_MESSAGE_SEPARATOR, job_error.message].join
+ error: GoodJob::Execution.format_error(job_error)
)
end
diff --git a/app/views/good_job/jobs/show.html.erb b/app/views/good_job/jobs/show.html.erb
index 01e20e306..bd361b22a 100644
--- a/app/views/good_job/jobs/show.html.erb
+++ b/app/views/good_job/jobs/show.html.erb
@@ -3,7 +3,12 @@
@@ -63,4 +68,8 @@
<%= tag.pre @job.serialized_params["arguments"].map(&:inspect).join(', '), class: 'text-wrap text-break' %>
-<%= render 'executions', executions: @job.executions.includes_advisory_locks.reverse %>
+<% if @job.discrete? %>
+ <%= render 'executions', executions: @job.discrete_executions.reverse %>
+<% else %>
+ <%= render 'executions', executions: @job.executions.includes_advisory_locks.reverse %>
+<% end %>
diff --git a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb
index 73cc1dba3..ad0004063 100644
--- a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb
+++ b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb
@@ -22,6 +22,9 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %>
t.uuid :batch_id
t.uuid :batch_callback_id
+
+ t.boolean :is_discrete
+ t.integer :executions_count
end
create_table :good_job_batches, id: :uuid do |t|
@@ -38,6 +41,16 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %>
t.datetime :finished_at
end
+ create_table :good_job_executions, id: :uuid do |t|
+ t.timestamps
+
+ t.uuid :active_job_id
+ t.jsonb :serialized_params
+ t.datetime :perform_expected_at
+ t.datetime :finished_at
+ t.text :error
+ end
+
create_table :good_job_processes, id: :uuid do |t|
t.timestamps
t.jsonb :state
@@ -62,5 +75,7 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %>
where: "finished_at IS NULL", name: :index_good_jobs_jobs_on_priority_created_at_when_unfinished
add_index :good_jobs, [:batch_id], where: "batch_id IS NOT NULL"
add_index :good_jobs, [:batch_callback_id], where: "batch_callback_id IS NOT NULL"
+
+ add_index :good_job_executions, [:active_job_id, :created_at], name: :index_good_job_executions_on_active_job_id_and_created_at
end
end
diff --git a/lib/generators/good_job/templates/update/migrations/05_create_good_job_executions.rb.erb b/lib/generators/good_job/templates/update/migrations/05_create_good_job_executions.rb.erb
new file mode 100644
index 000000000..e4f3a48d8
--- /dev/null
+++ b/lib/generators/good_job/templates/update/migrations/05_create_good_job_executions.rb.erb
@@ -0,0 +1,29 @@
+# frozen_string_literal: true
+class CreateGoodJobExecutions < ActiveRecord::Migration<%= migration_version %>
+ def change
+ reversible do |dir|
+ dir.up do
+ # Ensure this incremental update migration is idempotent
+ # with monolithic install migration.
+ return if connection.table_exists?(:good_job_executions)
+ end
+ end
+
+ create_table :good_job_executions, id: :uuid do |t|
+ t.timestamps
+
+ t.uuid :active_job_id
+ t.jsonb :serialized_params
+ t.datetime :perform_expected_at
+ t.datetime :finished_at
+ t.text :error
+
+ t.index [:active_job_id, :created_at], name: :index_good_job_executions_on_active_job_id_and_created_at
+ end
+
+ change_table :good_jobs do |t|
+ t.boolean :is_discrete
+ t.integer :executions_count
+ end
+ end
+end
diff --git a/lib/good_job.rb b/lib/good_job.rb
index 1995888c3..8baec4190 100644
--- a/lib/good_job.rb
+++ b/lib/good_job.rb
@@ -170,6 +170,13 @@ def self.cleanup_preserved_jobs(older_than: nil)
ActiveSupport::Notifications.instrument("cleanup_preserved_jobs.good_job", { older_than: older_than, timestamp: timestamp }) do |payload|
old_jobs = GoodJob::Job.where('finished_at <= ?', timestamp)
old_jobs = old_jobs.succeeded unless include_discarded
+
+ deleted_discrete_executions_count = if GoodJob::DiscreteExecution.migrated?
+ GoodJob::DiscreteExecution.where(job: old_jobs).delete_all
+ else
+ 0
+ end
+
deleted_executions_count = GoodJob::Execution.where(job: old_jobs).delete_all
if GoodJob::BatchRecord.migrated?
@@ -180,9 +187,10 @@ def self.cleanup_preserved_jobs(older_than: nil)
deleted_batches_count = 0
end
- payload[:destroyed_executions_count] = deleted_executions_count
payload[:destroyed_batches_count] = deleted_batches_count
- payload[:destroyed_records_count] = deleted_executions_count + deleted_batches_count
+ payload[:destroyed_discrete_executions_count] = deleted_discrete_executions_count
+ payload[:destroyed_executions_count] = deleted_executions_count
+ payload[:destroyed_records_count] = deleted_executions_count + deleted_discrete_executions_count + deleted_batches_count
end
end
diff --git a/spec/app/jobs/example_job_spec.rb b/spec/app/jobs/example_job_spec.rb
index e0dea7faf..327b531e4 100644
--- a/spec/app/jobs/example_job_spec.rb
+++ b/spec/app/jobs/example_job_spec.rb
@@ -25,9 +25,9 @@
end
travel_back
- executions = GoodJob::Execution.where(active_job_id: active_job.job_id).order(created_at: :asc)
- expect(executions.size).to eq 2
- expect(executions.last.error).to be_nil
+ good_job = GoodJob::Job.find_by(active_job_id: active_job.job_id)
+ expect(good_job.discrete_executions.count).to eq 2
+ expect(good_job.discrete_executions.last.error).to be_nil
end
end
@@ -40,26 +40,25 @@
end
travel_back
- executions = GoodJob::Execution.where(active_job_id: active_job.job_id).order(created_at: :asc)
- expect(executions.size).to eq 6
- expect(executions.last.error).to be_nil
+ good_job = GoodJob::Job.find_by(active_job_id: active_job.job_id)
+
+ expect(good_job.discrete_executions.count).to eq 6
+ expect(good_job.discrete_executions.last.error).to be_nil
end
end
describe "DEAD_TYPE" do
it 'errors but does not retry' do
- described_class.perform_later(described_class::DEAD_TYPE)
+ active_job = described_class.perform_later(described_class::DEAD_TYPE)
10.times do
GoodJob.perform_inline
travel(5.minutes)
end
travel_back
- active_job_id = GoodJob::Execution.last.active_job_id
-
- executions = GoodJob::Execution.where(active_job_id: active_job_id).order(created_at: :asc)
- expect(executions.size).to eq 3
- expect(executions.last.error).to be_present
+ good_job = GoodJob::Job.find_by(active_job_id: active_job.job_id)
+ expect(good_job.discrete_executions.count).to eq 3
+ expect(good_job.discrete_executions.last.error).to be_present
end
end
diff --git a/spec/app/models/good_job/execution_spec.rb b/spec/app/models/good_job/execution_spec.rb
index 258b4544f..dd2b10f3f 100644
--- a/spec/app/models/good_job/execution_spec.rb
+++ b/spec/app/models/good_job/execution_spec.rb
@@ -3,6 +3,8 @@
RSpec.describe GoodJob::Execution do
before do
+ allow(described_class).to receive(:discrete_support?).and_return(false)
+
stub_const "RUN_JOBS", Concurrent::Array.new
stub_const 'TestJob', (Class.new(ActiveJob::Base) do
self.queue_name = 'test'
@@ -621,6 +623,66 @@ def job_params
end
end
end
+
+ context 'when Discrete' do
+ before do
+ ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :inline)
+ allow(described_class).to receive(:discrete_support?).and_return(true)
+ good_job.update!(is_discrete: true)
+ end
+
+ it 'creates a DiscreteExecution record' do
+ good_job.perform
+
+ dexecution = good_job.discrete_executions.first
+ expect(dexecution).to be_present
+ expect(dexecution).to have_attributes(
+ active_job_id: good_job.active_job_id,
+ created_at: good_job.performed_at,
+ perform_expected_at: good_job.scheduled_at || good_job.created_at,
+ finished_at: within(1.second).of(Time.current),
+ error: nil,
+ serialized_params: good_job.serialized_params
+ )
+ end
+
+ context 'when ActiveJob rescues an error' do
+ let(:active_job) { TestJob.new("a string", raise_error: true) }
+ let!(:good_job) { described_class.enqueue(active_job) }
+
+ before do
+ allow(described_class).to receive(:discrete_support?).and_return(true)
+ allow(GoodJob).to receive(:preserve_job_records).and_return(true)
+ TestJob.retry_on(StandardError, wait: 1.hour, attempts: 2) { nil }
+ good_job.update!(is_discrete: true)
+ end
+
+ it 'updates the existing Execution/Job record instead of creating a new one' do
+ expect { good_job.perform }
+ .to not_change(described_class, :count)
+ .and change { good_job.reload.serialized_params["executions"] }.by(1)
+ .and not_change { good_job.reload.id }
+ .and not_change { described_class.count }
+
+ expect(good_job).to have_attributes(
+ error: "TestJob::ExpectedError: Raised expected error",
+ created_at: within(1.second).of(Time.current),
+ performed_at: nil,
+ finished_at: nil,
+ scheduled_at: within(10.minutes).of(1.hour.from_now) # interval because of retry jitter
+ )
+ expect(GoodJob::DiscreteExecution.count).to eq(1)
+ discrete_execution = good_job.discrete_executions.first
+ expect(discrete_execution).to have_attributes(
+ active_job_id: good_job.active_job_id,
+ error: "TestJob::ExpectedError: Raised expected error",
+ created_at: within(1.second).of(Time.current),
+ perform_expected_at: within(1.second).of(Time.current),
+ finished_at: within(1.second).of(Time.current)
+ )
+ end
+ end
+ end
end
describe '#destroy_job' do
diff --git a/spec/app/models/good_job/job_spec.rb b/spec/app/models/good_job/job_spec.rb
index 18a47eb1d..ffc9e92a2 100644
--- a/spec/app/models/good_job/job_spec.rb
+++ b/spec/app/models/good_job/job_spec.rb
@@ -2,23 +2,38 @@
require 'rails_helper'
RSpec.describe GoodJob::Job do
- subject(:job) { described_class.find(head_execution.active_job_id) }
+ let(:active_job_id) { SecureRandom.uuid }
- before do
- allow(GoodJob).to receive(:preserve_job_records).and_return(true)
- ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :external)
-
- stub_const 'TestJob', (Class.new(ActiveJob::Base) do
- def perform(feline = nil, canine: nil)
- end
- end)
- stub_const 'TestJob::Error', Class.new(StandardError)
+ let(:job) do
+ described_class.create!(
+ is_discrete: true,
+ active_job_id: active_job_id,
+ scheduled_at: 10.minutes.from_now,
+ queue_name: 'mice',
+ priority: 10,
+ serialized_params: {
+ 'job_id' => active_job_id,
+ 'job_class' => 'TestJob',
+ 'executions' => 1,
+ 'exception_executions' => { 'TestJob::Error' => 1 },
+ 'queue_name' => 'mice',
+ 'priority' => 10,
+ 'arguments' => ['cat', { 'canine' => 'dog' }],
+ }
+ ).tap do |job|
+ job.discrete_executions.create!(
+ perform_expected_at: 1.minute.ago,
+ created_at: 1.minute.ago,
+ finished_at: 1.minute.ago,
+ error: "TestJob::Error: TestJob::Error"
+ )
+ end
end
+ let(:undiscrete_job) { described_class.find(head_execution.active_job_id) }
- let!(:tail_execution) do
- active_job_id = SecureRandom.uuid
+ let(:tail_execution) do
GoodJob::Execution.create!(
- active_job_id: SecureRandom.uuid,
+ active_job_id: active_job_id,
created_at: 1.minute.ago,
queue_name: 'mice',
priority: 10,
@@ -33,7 +48,7 @@ def perform(feline = nil, canine: nil)
)
end
- let!(:head_execution) do
+ let(:head_execution) do
GoodJob::Execution.create!(
active_job_id: tail_execution.active_job_id,
scheduled_at: 10.minutes.from_now,
@@ -57,6 +72,18 @@ def perform(feline = nil, canine: nil)
end
end
+ before do
+ allow(GoodJob).to receive(:preserve_job_records).and_return(true)
+ ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :external)
+
+ stub_const 'TestJob', (Class.new(ActiveJob::Base) do
+ def perform
+ raise "Didn't expect to perform this job"
+ end
+ end)
+ stub_const 'TestJob::Error', Class.new(StandardError)
+ end
+
describe '.find' do
it 'returns a record that is the same as the head execution' do
job = described_class.find(head_execution.active_job_id)
@@ -66,7 +93,7 @@ def perform(feline = nil, canine: nil)
describe '#id' do
it 'is the ActiveJob ID' do
- expect(job.id).to eq head_execution.active_job_id
+ expect(job.id).to eq job.active_job_id
end
end
@@ -78,6 +105,7 @@ def perform(feline = nil, canine: nil)
describe '#head_execution' do
it 'is the head execution (which should be the same record)' do
+ job = undiscrete_job
expect(job.head_execution).to eq head_execution
expect(job._execution_id).to eq head_execution.id
end
@@ -85,6 +113,7 @@ def perform(feline = nil, canine: nil)
describe '#tail_execution' do
it 'is the tail execution' do
+ job = undiscrete_job
expect(job.tail_execution).to eq tail_execution
end
end
@@ -160,29 +189,50 @@ def perform(feline = nil, canine: nil)
describe '#retry_job' do
context 'when job is retried' do
before do
- head_execution.update!(
+ job.update!(
finished_at: Time.current,
error: "TestJob::Error: TestJob::Error"
)
end
- it 'enqueues another execution and updates the original job' do
- original_head_execution = job.head_execution
+ it 'updates the original job' do
+ expect(job).to be_discrete
expect do
job.retry_job
- end.to change { job.executions.reload.size }.by(1)
-
- new_head_execution = job.head_execution(reload: true)
- expect(new_head_execution.serialized_params).to include(
- "executions" => 2,
- "queue_name" => "mice",
- "priority" => 10,
- "arguments" => ['cat', hash_including('canine' => 'dog')]
- )
+ end.to change { job.reload.finished? }.from(true).to(false)
+ expect(job.executions.count).to eq 1
+ end
+
+ context 'when job is not discrete' do
+ let(:job) { undiscrete_job }
- original_head_execution.reload
- expect(original_head_execution.retried_good_job_id).to eq new_head_execution.id
+ before do
+ head_execution.update!(
+ finished_at: Time.current,
+ error: "TestJob::Error: TestJob::Error"
+ )
+ end
+
+ it 'enqueues another execution and updates the original job' do
+ original_head_execution = undiscrete_job.head_execution
+
+ expect do
+ job.retry_job
+ end.to change { job.executions.reload.size }.by(1)
+ expect(job.reload).not_to be_finished
+
+ new_head_execution = job.head_execution(reload: true)
+ expect(new_head_execution.serialized_params).to include(
+ "executions" => 2,
+ "queue_name" => "mice",
+ "priority" => 10,
+ "arguments" => ['cat', hash_including('canine' => 'dog')]
+ )
+
+ original_head_execution.reload
+ expect(original_head_execution.retried_good_job_id).to eq new_head_execution.id
+ end
end
end
@@ -279,18 +329,12 @@ def perform(feline = nil, canine: nil)
end
describe '#destroy_job' do
- context 'when a job is finished' do
- before do
- job.head_execution.update! finished_at: Time.current
- end
-
- it 'destroys all the job executions' do
- job.destroy_job
+ it 'destroys job and executions' do
+ job.update! finished_at: Time.current
+ job.destroy_job
- expect { head_execution.reload }.to raise_error ActiveRecord::RecordNotFound
- expect { tail_execution.reload }.to raise_error ActiveRecord::RecordNotFound
- expect { job.reload }.to raise_error ActiveRecord::RecordNotFound
- end
+ expect { job.reload }.to raise_error ActiveRecord::RecordNotFound
+ expect(GoodJob::DiscreteExecution.count).to eq 0
end
context 'when a job is not finished' do
@@ -298,5 +342,29 @@ def perform(feline = nil, canine: nil)
expect { job.destroy_job }.to raise_error GoodJob::Job::ActionForStateMismatchError
end
end
+
+ context "when undiscrete job" do
+ let(:job) { undiscrete_job }
+
+ context 'when a job is finished' do
+ before do
+ job.head_execution.update! finished_at: Time.current
+ end
+
+ it 'destroys all the job executions' do
+ job.destroy_job
+
+ expect { head_execution.reload }.to raise_error ActiveRecord::RecordNotFound
+ expect { tail_execution.reload }.to raise_error ActiveRecord::RecordNotFound
+ expect { job.reload }.to raise_error ActiveRecord::RecordNotFound
+ end
+ end
+
+ context 'when a job is not finished' do
+ it 'raises an ActionForStateMismatchError' do
+ expect { job.destroy_job }.to raise_error GoodJob::Job::ActionForStateMismatchError
+ end
+ end
+ end
end
end
diff --git a/spec/integration/batch_spec.rb b/spec/integration/batch_spec.rb
index 9a00f042b..bb4396fc7 100644
--- a/spec/integration/batch_spec.rb
+++ b/spec/integration/batch_spec.rb
@@ -215,9 +215,10 @@ def perform(*_args, **_kwargs)
GoodJob.perform_inline
expect(GoodJob::Job.count).to eq 3
- expect(GoodJob::Execution.count).to eq 5
- expect(GoodJob::Execution.where(batch_id: batch.id).count).to eq 1
- expect(GoodJob::Execution.where(batch_callback_id: batch.id).count).to eq 4
+ expect(GoodJob::Execution.count).to eq 3
+ expect(GoodJob::DiscreteExecution.count).to eq 5
+ expect(GoodJob::Job.where(batch_id: batch.id).count).to eq 1
+ expect(GoodJob::Job.where(batch_callback_id: batch.id).count).to eq 2
callback_arguments = GoodJob::Job.where(batch_callback_id: batch.id).map(&:head_execution).map(&:active_job).map(&:arguments).map(&:second)
expect(callback_arguments).to contain_exactly({ event: :discard }, { event: :finish })
@@ -230,9 +231,10 @@ def perform(*_args, **_kwargs)
GoodJob.perform_inline
expect(GoodJob::Job.count).to eq 3
- expect(GoodJob::Execution.count).to eq 5
- expect(GoodJob::Execution.where(batch_id: batch.id).count).to eq 1
- expect(GoodJob::Execution.where(batch_callback_id: batch.id).count).to eq 4
+ expect(GoodJob::Execution.count).to eq 3
+ expect(GoodJob::DiscreteExecution.count).to eq 5
+ expect(GoodJob::Job.where(batch_id: batch.id).count).to eq 1
+ expect(GoodJob::Job.where(batch_callback_id: batch.id).count).to eq 2
callback_arguments = GoodJob::Job.where(batch_callback_id: batch.id).map(&:head_execution).map(&:active_job).map(&:arguments).map(&:second)
expect(callback_arguments).to contain_exactly({ event: :success }, { event: :finish })
diff --git a/spec/lib/good_job/active_job_extensions/concurrency_spec.rb b/spec/lib/good_job/active_job_extensions/concurrency_spec.rb
index 3526c7b27..f580bd147 100644
--- a/spec/lib/good_job/active_job_extensions/concurrency_spec.rb
+++ b/spec/lib/good_job/active_job_extensions/concurrency_spec.rb
@@ -113,19 +113,21 @@ def perform(name:)
end
it "will error and retry jobs if concurrency is exceeded" do
- TestJob.perform_later(name: "Alice")
+ active_job = TestJob.perform_later(name: "Alice")
performer = GoodJob::JobPerformer.new('*')
scheduler = GoodJob::Scheduler.new(performer, max_threads: 5)
5.times { scheduler.create_thread }
sleep_until(max: 10, increments_of: 0.5) do
- GoodJob::Execution.where(concurrency_key: "Alice").finished.count >= 1
+ GoodJob::DiscreteExecution.where(active_job_id: active_job.job_id).finished.count >= 1
end
scheduler.shutdown
- expect(GoodJob::Execution.count).to be >= 1
- expect(GoodJob::Execution.where("error LIKE '%GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError%'")).to be_present
+ expect(GoodJob::Job.find_by(active_job_id: active_job.job_id).concurrency_key).to eq "Alice"
+
+ expect(GoodJob::DiscreteExecution.count).to be >= 1
+ expect(GoodJob::DiscreteExecution.where("error LIKE '%GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError%'")).to be_present
end
it 'is ignored with the job is executed via perform_now' do
@@ -153,17 +155,37 @@ def perform
end)
end
- it 'preserves the key value across retries' do
- TestJob.set(wait_until: 5.minutes.ago).perform_later(name: "Alice")
- begin
- GoodJob.perform_inline
- rescue StandardError
- nil
+ describe 'retries' do
+ it 'preserves the value' do
+ TestJob.set(wait_until: 5.minutes.ago).perform_later(name: "Alice")
+
+ begin
+ GoodJob.perform_inline
+ rescue StandardError
+ nil
+ end
+
+ expect(GoodJob::Execution.count).to eq 1
+ expect(GoodJob::Execution.first.concurrency_key).to be_present
+ expect(GoodJob::Job.first).not_to be_finished
end
- expect(GoodJob::Execution.count).to eq 2
- first_execution, retried_execution = GoodJob::Execution.order(created_at: :asc).to_a
- expect(retried_execution.concurrency_key).to eq first_execution.concurrency_key
+ context 'when not discrete' do
+ it 'preserves the key value across retries' do
+ TestJob.set(wait_until: 5.minutes.ago).perform_later(name: "Alice")
+ GoodJob::Job.first.update!(is_discrete: false)
+
+ begin
+ GoodJob.perform_inline
+ rescue StandardError
+ nil
+ end
+
+ expect(GoodJob::Execution.count).to eq 2
+ first_execution, retried_execution = GoodJob::Execution.order(created_at: :asc).to_a
+ expect(retried_execution.concurrency_key).to eq first_execution.concurrency_key
+ end
+ end
end
end
diff --git a/spec/lib/good_job/active_job_extensions/interrupt_errors_spec.rb b/spec/lib/good_job/active_job_extensions/interrupt_errors_spec.rb
index 35da102f3..9454e042d 100644
--- a/spec/lib/good_job/active_job_extensions/interrupt_errors_spec.rb
+++ b/spec/lib/good_job/active_job_extensions/interrupt_errors_spec.rb
@@ -16,21 +16,63 @@ def perform
context 'when a dequeued job has a performed_at but no finished_at' do
before do
active_job = TestJob.perform_later
- GoodJob::Execution.find_by(active_job_id: active_job.job_id).update!(performed_at: Time.current)
+ good_job = GoodJob::Job.find_by(active_job_id: active_job.job_id)
+ good_job.update!(performed_at: Time.current, finished_at: nil)
+ good_job.discrete_executions.create!(performed_at: Time.current, finished_at: nil)
end
it 'raises a GoodJob::InterruptError' do
expect { GoodJob.perform_inline }.to raise_error(GoodJob::InterruptError)
end
- it 'is rescuable' do
- TestJob.retry_on GoodJob::InterruptError
+ context 'when discrete execution is NOT enabled' do
+ before do
+ allow(GoodJob::Execution).to receive(:discrete_support?).and_return(false)
+ end
- expect { GoodJob.perform_inline }.not_to raise_error
- expect(GoodJob::Execution.count).to eq(2)
+ it 'is rescuable' do
+ TestJob.retry_on GoodJob::InterruptError
- job = GoodJob::Job.first
- expect(job.executions.first.error).to start_with 'GoodJob::InterruptError: Interrupted after starting perform at'
+ expect { GoodJob.perform_inline }.not_to raise_error
+ expect(GoodJob::Execution.count).to eq(2)
+
+ job = GoodJob::Job.first
+ expect(job.executions.first.error).to start_with 'GoodJob::InterruptError: Interrupted after starting perform at'
+ end
+ end
+
+ context 'when discrete execution is enabled' do
+ before do
+ allow(GoodJob::Execution).to receive(:discrete_support?).and_return(true)
+ end
+
+ it 'does not create a new execution' do
+ TestJob.retry_on GoodJob::InterruptError
+
+ expect { GoodJob.perform_inline }.not_to raise_error
+ expect(GoodJob::Job.count).to eq(1)
+ expect(GoodJob::Execution.count).to eq(1)
+ expect(GoodJob::DiscreteExecution.count).to eq(2)
+
+ job = GoodJob::Job.first
+ expect(job.executions.count).to eq(1)
+ expect(job.finished?).to be false
+ expect(job.error).to start_with 'GoodJob::InterruptError: Interrupted after starting perform at'
+
+ initial_discrete_execution = job.discrete_executions.first
+ expect(initial_discrete_execution).to have_attributes(
+ performed_at: be_present,
+ finished_at: be_present,
+ error: start_with('GoodJob::InterruptError: Interrupted after starting perform at')
+ )
+
+ retried_discrete_execution = job.discrete_executions.last
+ expect(retried_discrete_execution).to have_attributes(
+ performed_at: be_present,
+ finished_at: be_present,
+ error: start_with('GoodJob::InterruptError: Interrupted after starting perform at')
+ )
+ end
end
end
diff --git a/spec/lib/good_job/active_job_extensions/notify_options_spec.rb b/spec/lib/good_job/active_job_extensions/notify_options_spec.rb
index 4394e0b39..bd8f4a032 100644
--- a/spec/lib/good_job/active_job_extensions/notify_options_spec.rb
+++ b/spec/lib/good_job/active_job_extensions/notify_options_spec.rb
@@ -85,7 +85,7 @@ def perform
scheduler = GoodJob::Scheduler.new(performer, max_threads: 5)
scheduler.create_thread
- sleep_until(max: 5, increments_of: 0.5) { GoodJob::Execution.count >= 2 }
+ sleep_until(max: 5, increments_of: 0.5) { GoodJob::DiscreteExecution.count >= 2 }
scheduler.shutdown
expect(GoodJob::Notifier).not_to have_received(:notify)
diff --git a/spec/lib/good_job_spec.rb b/spec/lib/good_job_spec.rb
index 65bc2f191..14f6fe81e 100644
--- a/spec/lib/good_job_spec.rb
+++ b/spec/lib/good_job_spec.rb
@@ -42,17 +42,20 @@
let!(:old_unfinished_job) { GoodJob::Execution.create!(active_job_id: SecureRandom.uuid, scheduled_at: 15.days.ago, finished_at: nil) }
let!(:old_finished_job) { GoodJob::Execution.create!(active_job_id: SecureRandom.uuid, finished_at: 15.days.ago) }
let!(:old_finished_job_execution) { GoodJob::Execution.create!(active_job_id: old_finished_job.active_job_id, retried_good_job_id: old_finished_job.id, finished_at: 16.days.ago) }
+ let!(:old_finished_job_discrete_execution) { GoodJob::DiscreteExecution.create!(active_job_id: old_finished_job.active_job_id, finished_at: 16.days.ago) }
let!(:old_discarded_job) { GoodJob::Execution.create!(active_job_id: SecureRandom.uuid, finished_at: 15.days.ago, error: "Error") }
let!(:old_batch) { GoodJob::BatchRecord.create!(finished_at: 15.days.ago) }
it 'deletes finished jobs' do
destroyed_records_count = described_class.cleanup_preserved_jobs
- expect(destroyed_records_count).to eq 4
+ expect(destroyed_records_count).to eq 5
expect { recent_job.reload }.not_to raise_error
expect { old_unfinished_job.reload }.not_to raise_error
expect { old_finished_job.reload }.to raise_error ActiveRecord::RecordNotFound
+ expect { old_finished_job_execution.reload }.to raise_error ActiveRecord::RecordNotFound
+ expect { old_finished_job_discrete_execution.reload }.to raise_error ActiveRecord::RecordNotFound
expect { old_discarded_job.reload }.to raise_error ActiveRecord::RecordNotFound
expect { old_batch.reload }.to raise_error ActiveRecord::RecordNotFound
end
diff --git a/spec/requests/good_job/jobs_controller_spec.rb b/spec/requests/good_job/jobs_controller_spec.rb
index ac57b23ce..8e247fa71 100644
--- a/spec/requests/good_job/jobs_controller_spec.rb
+++ b/spec/requests/good_job/jobs_controller_spec.rb
@@ -92,19 +92,22 @@
describe 'mass_action=retry' do
before do
job.update(error: "Error message")
+ job.discrete_executions.first.update(error: "Error message")
end
it 'retries the job' do
- put good_job.mass_update_jobs_path, params: {
- mass_action: 'retry',
- job_ids: [job.id],
- }
+ expect do
+ put good_job.mass_update_jobs_path, params: {
+ mass_action: 'retry',
+ job_ids: [job.id],
+ }
+ end.to change { job.reload.status }.from(:discarded).to(:succeeded)
expect(response).to have_http_status(:found)
expect(flash[:notice]).to eq('Successfully retried 1 job')
job.reload
- expect(job.executions.count).to eq 2
+ expect(job.discrete_executions.count).to eq 2
end
end
diff --git a/spec/support/example_app_helper.rb b/spec/support/example_app_helper.rb
index 30b46317c..70ab9af75 100644
--- a/spec/support/example_app_helper.rb
+++ b/spec/support/example_app_helper.rb
@@ -48,6 +48,7 @@ def within_example_app
tables = %i[
good_jobs
good_job_batches
+ good_job_executions
good_job_processes
good_job_settings
]
diff --git a/spec/support/rspec_not_change.rb b/spec/support/rspec_not_change.rb
new file mode 100644
index 000000000..8f0ffae77
--- /dev/null
+++ b/spec/support/rspec_not_change.rb
@@ -0,0 +1,2 @@
+# frozen_string_literal: true
+RSpec::Matchers.define_negated_matcher :not_change, :change
diff --git a/spec/system/jobs_spec.rb b/spec/system/jobs_spec.rb
index 4aa2f2177..d1f14370f 100644
--- a/spec/system/jobs_spec.rb
+++ b/spec/system/jobs_spec.rb
@@ -123,7 +123,7 @@
accept_confirm { click_on 'Retry job' }
end
expect(page).to have_content "Job has been retried"
- end.to change { discarded_job.executions.reload.size }.by(1)
+ end.to change { discarded_job.reload.status }.from(:discarded).to(:queued)
end
it 'can discard jobs' do
diff --git a/spec/test_app/app/jobs/example_job.rb b/spec/test_app/app/jobs/example_job.rb
index 491eff8bf..27842b80f 100644
--- a/spec/test_app/app/jobs/example_job.rb
+++ b/spec/test_app/app/jobs/example_job.rb
@@ -4,7 +4,7 @@ class ExampleJob < ApplicationJob
TYPES = [
SUCCESS_TYPE = 'success',
- ERROR_ONCE_TYPE = 'error_once',
+ ERROR_ONCE_TYPE = 'error_once',
ERROR_FIVE_TIMES_TYPE = 'error_five_times',
DEAD_TYPE = 'dead',
SLOW_TYPE = 'slow',
diff --git a/spec/test_app/db/migrate/20230412144442_create_good_job_executions.rb b/spec/test_app/db/migrate/20230412144442_create_good_job_executions.rb
new file mode 100644
index 000000000..a98c24915
--- /dev/null
+++ b/spec/test_app/db/migrate/20230412144442_create_good_job_executions.rb
@@ -0,0 +1,29 @@
+# frozen_string_literal: true
+class CreateGoodJobExecutions < ActiveRecord::Migration[7.0]
+ def change
+ reversible do |dir|
+ dir.up do
+ # Ensure this incremental update migration is idempotent
+ # with monolithic install migration.
+ return if connection.table_exists?(:good_job_executions)
+ end
+ end
+
+ create_table :good_job_executions, id: :uuid do |t|
+ t.timestamps
+
+ t.uuid :active_job_id
+ t.jsonb :serialized_params
+ t.datetime :perform_expected_at
+ t.datetime :finished_at
+ t.text :error
+
+ t.index [:active_job_id, :created_at], name: :index_good_job_executions_on_active_job_id_and_created_at
+ end
+
+ change_table :good_jobs do |t|
+ t.boolean :is_discrete
+ t.integer :executions_count
+ end
+ end
+end
diff --git a/spec/test_app/db/schema.rb b/spec/test_app/db/schema.rb
index 9b68e84ab..e3e0c90ee 100644
--- a/spec/test_app/db/schema.rb
+++ b/spec/test_app/db/schema.rb
@@ -10,15 +10,14 @@
#
# It's strongly recommended that you check this file into your version control system.
-ActiveRecord::Schema.define(version: 2023_01_31_214927) do
-
+ActiveRecord::Schema[7.0].define(version: 2023_04_12_144442) do
# These are extensions that must be enabled in order to support this database
enable_extension "pgcrypto"
enable_extension "plpgsql"
create_table "good_job_batches", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t|
- t.datetime "created_at", precision: 6, null: false
- t.datetime "updated_at", precision: 6, null: false
+ t.datetime "created_at", null: false
+ t.datetime "updated_at", null: false
t.text "description"
t.jsonb "serialized_properties"
t.text "on_finish"
@@ -26,20 +25,31 @@
t.text "on_discard"
t.text "callback_queue_name"
t.integer "callback_priority"
- t.datetime "enqueued_at"
- t.datetime "discarded_at"
+ t.datetime "enqueued_at", precision: nil
+ t.datetime "discarded_at", precision: nil
+ t.datetime "finished_at", precision: nil
+ end
+
+ create_table "good_job_executions", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t|
+ t.datetime "created_at", null: false
+ t.datetime "updated_at", null: false
+ t.uuid "active_job_id"
+ t.jsonb "serialized_params"
+ t.datetime "perform_expected_at"
t.datetime "finished_at"
+ t.text "error"
+ t.index ["active_job_id", "created_at"], name: "index_good_job_executions_on_active_job_id_and_created_at"
end
create_table "good_job_processes", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t|
- t.datetime "created_at", precision: 6, null: false
- t.datetime "updated_at", precision: 6, null: false
+ t.datetime "created_at", null: false
+ t.datetime "updated_at", null: false
t.jsonb "state"
end
create_table "good_job_settings", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t|
- t.datetime "created_at", precision: 6, null: false
- t.datetime "updated_at", precision: 6, null: false
+ t.datetime "created_at", null: false
+ t.datetime "updated_at", null: false
t.text "key"
t.jsonb "value"
t.index ["key"], name: "index_good_job_settings_on_key", unique: true
@@ -49,19 +59,21 @@
t.text "queue_name"
t.integer "priority"
t.jsonb "serialized_params"
- t.datetime "scheduled_at"
- t.datetime "performed_at"
- t.datetime "finished_at"
+ t.datetime "scheduled_at", precision: nil
+ t.datetime "performed_at", precision: nil
+ t.datetime "finished_at", precision: nil
t.text "error"
- t.datetime "created_at", null: false
- t.datetime "updated_at", null: false
+ t.datetime "created_at", precision: nil, null: false
+ t.datetime "updated_at", precision: nil, null: false
t.uuid "active_job_id"
t.text "concurrency_key"
t.text "cron_key"
t.uuid "retried_good_job_id"
- t.datetime "cron_at"
+ t.datetime "cron_at", precision: nil
t.uuid "batch_id"
t.uuid "batch_callback_id"
+ t.boolean "is_discrete"
+ t.integer "executions_count"
t.index ["active_job_id", "created_at"], name: "index_good_jobs_on_active_job_id_and_created_at"
t.index ["active_job_id"], name: "index_good_jobs_on_active_job_id"
t.index ["batch_callback_id"], name: "index_good_jobs_on_batch_callback_id", where: "(batch_callback_id IS NOT NULL)"