Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that anytime the Notifier uses autoloaded constants (ActiveRecord), they are wrapped with a Rails Executor #797

Merged
merged 1 commit into from
Jan 31, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 27 additions & 23 deletions lib/good_job/notifier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -168,35 +168,37 @@ def listen(delay: 0)
future = Concurrent::ScheduledTask.new(delay, args: [@recipients, executor, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_listening|
with_connection do
begin
run_callbacks :listen do
ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
connection.execute("LISTEN #{CHANNEL}")
Rails.application.executor.wrap do
run_callbacks :listen do
ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
connection.execute("LISTEN #{CHANNEL}")
end
thr_listening.make_true
end
thr_listening.make_true
end

ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
while thr_executor.running?
wait_for_notify do |channel, payload|
next unless channel == CHANNEL

ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload })
parsed_payload = JSON.parse(payload, symbolize_names: true)
thr_recipients.each do |recipient|
target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
target.send(method_name, parsed_payload)
end
end
while thr_executor.running?
wait_for_notify do |channel, payload|
next unless channel == CHANNEL

reset_connection_errors
ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload })
parsed_payload = JSON.parse(payload, symbolize_names: true)
thr_recipients.each do |recipient|
target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
target.send(method_name, parsed_payload)
end
end

reset_connection_errors
end
end
ensure
run_callbacks :unlisten do
thr_listening.make_false
ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
connection.execute("UNLISTEN *")
Rails.application.executor.wrap do
run_callbacks :unlisten do
thr_listening.make_false
ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
connection.execute("UNLISTEN *")
end
end
end
end
Expand All @@ -207,8 +209,10 @@ def listen(delay: 0)
end

def with_connection
self.connection = Execution.connection_pool.checkout.tap do |conn|
Execution.connection_pool.remove(conn)
Rails.application.executor.wrap do
self.connection = Execution.connection_pool.checkout.tap do |conn|
Execution.connection_pool.remove(conn)
end
end
connection.execute("SET application_name = #{connection.quote(self.class.name)}")

Expand Down