diff --git a/lib/good_job/notifier.rb b/lib/good_job/notifier.rb index adae5f3d3..115fa1819 100644 --- a/lib/good_job/notifier.rb +++ b/lib/good_job/notifier.rb @@ -40,6 +40,7 @@ class Notifier PG::UnableToSend PG::Error ].freeze + CONNECTION_ERRORS_REPORTING_THRESHOLD = 3 # @!attribute [r] instances # @!scope class @@ -70,6 +71,8 @@ def self.notify(message) def initialize(*recipients) @recipients = Concurrent::Array.new(recipients) @listening = Concurrent::AtomicBoolean.new(false) + @connection_errors_count = Concurrent::AtomicFixnum.new(0) + @connection_errors_reported = Concurrent::AtomicBoolean.new(false) self.class.instances << self @@ -128,7 +131,6 @@ def restart(timeout: -1) # @return [void] def listen_observer(_time, _result, thread_error) if thread_error - GoodJob._on_thread_error(thread_error) ActiveSupport::Notifications.instrument("notifier_notify_error.good_job", { error: thread_error }) connection_error = CONNECTION_ERRORS.any? do |error_string| @@ -137,6 +139,16 @@ def listen_observer(_time, _result, thread_error) thread_error.is_a? error_class end + + if connection_error + @connection_errors_count.increment + if @connection_errors_reported.false? && @connection_errors_count.value >= CONNECTION_ERRORS_REPORTING_THRESHOLD + GoodJob._on_thread_error(thread_error) + @connection_errors_reported.make_true + end + else + GoodJob._on_thread_error(thread_error) + end end return if shutdown? @@ -175,6 +187,8 @@ def listen(delay: 0) target.send(method_name, parsed_payload) end end + + reset_connection_errors end end end @@ -223,5 +237,10 @@ def wait_for_notify sleep WAIT_INTERVAL end end + + def reset_connection_errors + @connection_errors_count.value = 0 + @connection_errors_reported.make_false + end end end diff --git a/spec/lib/good_job/notifier_spec.rb b/spec/lib/good_job/notifier_spec.rb index 42d65855e..f820557b5 100644 --- a/spec/lib/good_job/notifier_spec.rb +++ b/spec/lib/good_job/notifier_spec.rb @@ -48,6 +48,22 @@ notifier.shutdown end + + it 'raises exception to GoodJob.on_thread_error when there is a connection error' do + stub_const('ExpectedError', Class.new(ActiveRecord::ConnectionNotEstablished)) + stub_const('GoodJob::Notifier::CONNECTION_ERRORS_REPORTING_THRESHOLD', 1) + on_thread_error = instance_double(Proc, call: nil) + allow(GoodJob).to receive(:on_thread_error).and_return(on_thread_error) + allow(JSON).to receive(:parse).and_raise ExpectedError + + notifier = described_class.new + sleep_until(max: 5, increments_of: 0.5) { notifier.listening? } + + described_class.notify(true) + wait_until(max: 5, increments_of: 0.5) { expect(on_thread_error).to have_received(:call).at_least(:once).with instance_of(ExpectedError) } + + notifier.shutdown + end end describe 'Process tracking' do