Skip to content

Commit

Permalink
Ensure that anytime the Notifier uses autoloaded constants (ActiveRec…
Browse files Browse the repository at this point in the history
…ord), they are wrapped with a Rails Executor (#797)
  • Loading branch information
bensheldon authored Jan 31, 2023
1 parent 3452ac0 commit a15bb46
Showing 1 changed file with 27 additions and 23 deletions.
50 changes: 27 additions & 23 deletions lib/good_job/notifier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -168,35 +168,37 @@ def listen(delay: 0)
future = Concurrent::ScheduledTask.new(delay, args: [@recipients, executor, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_listening|
with_connection do
begin
run_callbacks :listen do
ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
connection.execute("LISTEN #{CHANNEL}")
Rails.application.executor.wrap do
run_callbacks :listen do
ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
connection.execute("LISTEN #{CHANNEL}")
end
thr_listening.make_true
end
thr_listening.make_true
end

ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
while thr_executor.running?
wait_for_notify do |channel, payload|
next unless channel == CHANNEL

ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload })
parsed_payload = JSON.parse(payload, symbolize_names: true)
thr_recipients.each do |recipient|
target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
target.send(method_name, parsed_payload)
end
end
while thr_executor.running?
wait_for_notify do |channel, payload|
next unless channel == CHANNEL

reset_connection_errors
ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload })
parsed_payload = JSON.parse(payload, symbolize_names: true)
thr_recipients.each do |recipient|
target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
target.send(method_name, parsed_payload)
end
end

reset_connection_errors
end
end
ensure
run_callbacks :unlisten do
thr_listening.make_false
ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
connection.execute("UNLISTEN *")
Rails.application.executor.wrap do
run_callbacks :unlisten do
thr_listening.make_false
ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
connection.execute("UNLISTEN *")
end
end
end
end
Expand All @@ -207,8 +209,10 @@ def listen(delay: 0)
end

def with_connection
self.connection = Execution.connection_pool.checkout.tap do |conn|
Execution.connection_pool.remove(conn)
Rails.application.executor.wrap do
self.connection = Execution.connection_pool.checkout.tap do |conn|
Execution.connection_pool.remove(conn)
end
end
connection.execute("SET application_name = #{connection.quote(self.class.name)}")

Expand Down

0 comments on commit a15bb46

Please sign in to comment.