Skip to content

Commit

Permalink
Added GoodJob::DiscreteExecution#duration column (#1374)
Browse files Browse the repository at this point in the history
* create GoodJob::DiscreteExecution#duration_ms

* set duration_ms based on monotonic time

* add GoodJob::DiscreteExecution#monotonic_runtime_latency

* replace GoodJob::DiscreteExecution#runtime_latency value by monotonic_runtime_latency

* add migration check for duration_ms column

* change GoodJob.migrated?

* fix namespace and set duration after interrupt error

* added migration to example app

* added tests

* changed from bigint duration_ms to interval duration

---------

Co-authored-by: Arnaud Levy <[email protected]>
  • Loading branch information
SebouChu and arnaudlevy authored Jul 5, 2024
1 parent 58a6605 commit 2b17bb7
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 8 deletions.
15 changes: 13 additions & 2 deletions app/models/good_job/discrete_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ def self.backtrace_migrated?
false
end

# Reports whether the +duration+ column is present on this model's table.
#
# When the column is missing, +migration_pending_warning!+ is invoked
# before returning.
#
# @return [Boolean] true when +columns_hash+ has a "duration" entry
def self.monotonic_duration_migrated?
  duration_column_present = columns_hash["duration"].present?
  migration_pending_warning! unless duration_column_present
  duration_column_present
end

# One-based ordinal of this execution: the 'executions' counter stored in
# +serialized_params+ (treated as 0 when the key is absent) plus one.
#
# @return [Integer]
def number
  prior_executions = serialized_params.fetch('executions', 0)
  prior_executions + 1
end
Expand All @@ -37,9 +44,13 @@ def queue_latency
created_at - scheduled_at
end

# Time between when this job started and finished.
#
# The stored +duration+ is preferred; it is only read when
# +monotonic_duration_migrated?+ reports that the column exists. When no
# duration is available (column not migrated, or the stored value is nil,
# e.g. an execution whose duration has not been written yet), the
# wall-clock difference between +performed_at+ and +finished_at+ is used
# instead, with the current time standing in for a missing +finished_at+.
#
# @return [Object, nil] the stored duration, otherwise the wall-clock
#   difference; nil when neither a duration nor +performed_at+ is set
def runtime_latency
  # Guard the attribute read: +duration+ is only touched once the column exists.
  recorded_duration = duration if self.class.monotonic_duration_migrated?
  return recorded_duration if recorded_duration
  return unless performed_at

  (finished_at || Time.current) - performed_at
end

def last_status_at
Expand Down
9 changes: 8 additions & 1 deletion app/models/good_job/execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ def perform(lock_id:)
raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at

job_performed_at = Time.current
monotonic_start = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
discrete_execution = nil
result = GoodJob::CurrentThread.within do |current_thread|
current_thread.reset
Expand All @@ -385,12 +386,14 @@ def perform(lock_id:)
interrupt_error_string = self.class.format_error(GoodJob::InterruptError.new("Interrupted after starting perform at '#{existing_performed_at}'"))
self.error = interrupt_error_string
self.error_event = ERROR_EVENT_INTERRUPTED if self.class.error_event_migrated?
monotonic_duration = (::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - monotonic_start).seconds

discrete_execution_attrs = {
error: interrupt_error_string,
finished_at: job_performed_at,
}
discrete_execution_attrs[:error_event] = GoodJob::ErrorEvents::ERROR_EVENT_ENUMS[GoodJob::ErrorEvents::ERROR_EVENT_INTERRUPTED] if self.class.error_event_migrated?
discrete_execution_attrs[:duration] = monotonic_duration if GoodJob::DiscreteExecution.monotonic_duration_migrated?
discrete_executions.where(finished_at: nil).where.not(performed_at: nil).update_all(discrete_execution_attrs) # rubocop:disable Rails/SkipsModelValidations
end
end
Expand Down Expand Up @@ -494,8 +497,12 @@ def perform(lock_id:)
job_attributes.delete(:error_event) unless self.class.error_event_migrated?

job_finished_at = Time.current
monotonic_duration = (::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - monotonic_start).seconds
job_attributes[:finished_at] = job_finished_at
discrete_execution.finished_at = job_finished_at if discrete_execution
if discrete_execution
discrete_execution.finished_at = job_finished_at
discrete_execution.duration = monotonic_duration if GoodJob::DiscreteExecution.monotonic_duration_migrated?
end

retry_unhandled_error = result.unhandled_error && GoodJob.retry_on_unhandled_error
reenqueued = result.retried? || retried_good_job_id.present? || retry_unhandled_error
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# frozen_string_literal: true

# Adds an interval-typed +duration+ column to the +good_job_executions+ table.
class CreateGoodJobExecutionDuration < ActiveRecord::Migration[7.1]
# Adds the column unless it is already present when migrating up.
def change
reversible do |dir|
dir.up do
# Ensure this incremental update migration is idempotent
# with monolithic install migration.
# The +return+ leaves +change+ entirely, so +add_column+ below is skipped
# when the column already exists.
return if connection.column_exists?(:good_job_executions, :duration)
end
end

add_column :good_job_executions, :duration, :interval
end
end
3 changes: 2 additions & 1 deletion demo/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema.define(version: 2024_05_18_175058) do
ActiveRecord::Schema.define(version: 2024_06_13_151310) do
# These are extensions that must be enabled in order to support this database
enable_extension "pgcrypto"
enable_extension "plpgsql"
Expand Down Expand Up @@ -43,6 +43,7 @@
t.integer "error_event", limit: 2
t.text "error_backtrace", array: true
t.uuid "process_id"
t.interval "duration"
t.index ["active_job_id", "created_at"], name: "index_good_job_executions_on_active_job_id_and_created_at"
t.index ["process_id", "created_at"], name: "index_good_job_executions_on_process_id_and_created_at"
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %>
t.integer :error_event, limit: 2
t.text :error_backtrace, array: true
t.uuid :process_id
t.interval :duration
end

create_table :good_job_processes, id: :uuid do |t|
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# frozen_string_literal: true

# Adds an interval-typed +duration+ column to the +good_job_executions+ table.
# The migration superclass version suffix is filled in by the template's
# +migration_version+ value.
class CreateGoodJobExecutionDuration < ActiveRecord::Migration<%= migration_version %>
# Adds the column unless it is already present when migrating up.
def change
reversible do |dir|
dir.up do
# Ensure this incremental update migration is idempotent
# with monolithic install migration.
# The +return+ leaves +change+ entirely, so +add_column+ below is skipped
# when the column already exists.
return if connection.column_exists?(:good_job_executions, :duration)
end
end

add_column :good_job_executions, :duration, :interval
end
end
4 changes: 2 additions & 2 deletions lib/good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,8 @@ def self.deprecator
# @return [Boolean]
# Reports whether the most recent migration check passes. The value
# returned is the result of the final check in the body.
def self.migrated?
# Always update with the most recent migration check
# Column information is reset before each check, presumably so the check
# reads the current schema rather than cached metadata.
# NOTE(review): both a GoodJob::Execution pair and a
# GoodJob::DiscreteExecution pair appear here; the first pair looks like
# the pre-change side of this diff hunk — confirm which pair belongs.
GoodJob::Execution.reset_column_information
GoodJob::Execution.process_lock_migrated?
GoodJob::DiscreteExecution.reset_column_information
GoodJob::DiscreteExecution.monotonic_duration_migrated?
end

ActiveSupport.run_load_hooks(:good_job, self)
Expand Down
7 changes: 5 additions & 2 deletions spec/app/models/good_job/execution_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,7 @@ def job_params
created_at: within(0.001).of(good_job.performed_at),
scheduled_at: within(0.001).of(good_job.created_at),
finished_at: within(1.second).of(Time.current),
duration: be_present,
error: nil,
serialized_params: good_job.serialized_params
)
Expand Down Expand Up @@ -739,7 +740,8 @@ def job_params
error: "TestJob::ExpectedError: Raised expected error",
created_at: within(1.second).of(Time.current),
scheduled_at: within(1.second).of(Time.current),
finished_at: within(1.second).of(Time.current)
finished_at: within(1.second).of(Time.current),
duration: be_present
)
end
end
Expand All @@ -763,7 +765,8 @@ def job_params
expect(good_job.discrete_executions.size).to eq(1)
expect(good_job.discrete_executions.first).to have_attributes(
performed_at: within(1.second).of(Time.current),
finished_at: within(1.second).of(Time.current)
finished_at: within(1.second).of(Time.current),
duration: be_present
)
end
end
Expand Down
1 change: 1 addition & 0 deletions spec/app/models/good_job/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
scheduled_at: 1.minute.ago,
created_at: 1.minute.ago,
finished_at: 1.minute.ago,
duration: 60.seconds,
error: "TestJob::Error: TestJob::Error"
)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def perform
expect(initial_discrete_execution).to have_attributes(
performed_at: be_present,
finished_at: be_present,
duration: be_present,
error: start_with('GoodJob::InterruptError: Interrupted after starting perform at'),
error_event: GoodJob::Job::ERROR_EVENT_INTERRUPTED
)
Expand All @@ -91,6 +92,7 @@ def perform
expect(retried_discrete_execution).to have_attributes(
performed_at: be_present,
finished_at: be_present,
duration: be_present,
error: start_with('GoodJob::InterruptError: Interrupted after starting perform at'),
error_event: GoodJob::Job::ERROR_EVENT_RETRIED
)
Expand Down

0 comments on commit 2b17bb7

Please sign in to comment.