From 491f54a2cf11d86cad144dad78fddcf39c8642f0 Mon Sep 17 00:00:00 2001 From: benk-gc Date: Fri, 5 Jul 2024 13:07:47 +0100 Subject: [PATCH] Instrument adapter queries for external programs. When executing queries via the adapter, we may not recieve the instrumentation we expect. The ActiveRecord adapter uses a raw connection which bypasses the ActiveRecord instrumentation layer and instead talks directly to PG, making it impossible to know what queries are being executed. This change cribs heavily from the implementation of the `log` method on the `AbstractAdapter` class in ActiveRecord to make it easy to add instrumentation to an adapter. By default the instrumenter is set to `nil`, and instrumentation will be a no-op; otherwise the query will be instrumented via the `instrument` method and passed the relevant payload to handle. The structure of this is deliberately similar to that of the event emitted by ActiveRecord's `sql.active_record` notification. --- lib/que/adapters/active_record.rb | 5 +++++ lib/que/adapters/base.rb | 33 +++++++++++++++++++++++++++++-- spec/lib/que/job_spec.rb | 33 +++++++++++++++++++++++++++++-- 3 files changed, 67 insertions(+), 4 deletions(-) diff --git a/lib/que/adapters/active_record.rb b/lib/que/adapters/active_record.rb index 1a0edfd..6a4b02f 100644 --- a/lib/que/adapters/active_record.rb +++ b/lib/que/adapters/active_record.rb @@ -13,6 +13,11 @@ class ActiveRecord < Base ::PG::UnableToSend, ].freeze + def initialize(_thing = nil) + super + @instrumenter = ActiveSupport::Notifications.instrumenter + end + def checkout checkout_activerecord_adapter { |conn| yield conn.raw_connection } rescue *AR_UNAVAILABLE_CONNECTION_ERRORS => e diff --git a/lib/que/adapters/base.rb b/lib/que/adapters/base.rb index edf0b92..b1eb31b 100644 --- a/lib/que/adapters/base.rb +++ b/lib/que/adapters/base.rb @@ -13,8 +13,11 @@ module Adapters class UnavailableConnection < StandardError; end class Base + attr_reader :instrumenter + def initialize(_thing = nil) @prepared_statements = {} + @instrumenter = nil end # The only method that adapters really need to implement. Should lock a @@ -62,7 +65,14 @@ def in_transaction? def execute_sql(sql, params) args = params.empty? ? [sql] : [sql, params] - checkout { |conn| conn.async_exec(*args) } + + checkout do |conn| + log(sql, params, conn, async: true) do |notification_payload| + conn.async_exec(*args).tap do |result| + notification_payload[:row_count] = result.count + end + end + end end def execute_prepared(name, params) @@ -81,7 +91,11 @@ def execute_prepared(name, params) prepared_just_now = statements[name] = true end - conn.exec_prepared("que_#{name}", params) + log(SQL[name], params, conn, async: false) do |notification_payload| + conn.exec_prepared("que_#{name}", params).tap do |result| + notification_payload[:row_count] = result.count + end + end rescue ::PG::InvalidSqlStatementName => error # Reconnections on ActiveRecord can cause the same connection # objects to refer to new backends, so recover as well as we can. @@ -97,6 +111,21 @@ def execute_prepared(name, params) end end + def log(sql, binds, conn, statement_name: nil, async: false, &block) + return yield({}) if instrumenter.nil? + + instrumenter.instrument( + "que.execute", + sql: sql, + binds: binds, + type_casted_binds: [], + async: async, + statement_name: statement_name, + connection: conn, + &block + ) + end + CAST_PROCS = { # booleans 16 => ->(value) { diff --git a/spec/lib/que/job_spec.rb b/spec/lib/que/job_spec.rb index 09a7112..0b5f989 100644 --- a/spec/lib/que/job_spec.rb +++ b/spec/lib/que/job_spec.rb @@ -5,6 +5,11 @@ RSpec.describe Que::Job do describe ".enqueue" do let(:run_at) { postgres_now } + let(:job_class) do + Class.new(described_class).tap do |klass| + stub_const("FooBarJob", klass) + end + end it "adds job to que_jobs table" do expect { described_class.enqueue(:hello, run_at: run_at) }. @@ -41,7 +46,7 @@ que_job_id: an_instance_of(Integer), queue: "default", priority: 100, - job_class: a_string_including("Class"), + job_class: "FooBarJob", retryable: true, run_at: run_at, args: [500, "gbp", "testing"], @@ -49,7 +54,6 @@ custom_log_2: "test-log", ) - job_class = Class.new(described_class) job_class.custom_log_context ->(attrs) { { custom_log_1: attrs[:args][0], @@ -59,6 +63,31 @@ job_class.enqueue(500, :gbp, :testing, run_at: run_at) end + it "instruments queries using ActiveSupport::Notifications" do + events = [] + + ActiveSupport::Notifications.subscribe("que.execute") do |event| + events << event + end + + job_class.enqueue("foobar") + + expect(events).to include( + having_attributes( + name: "que.execute", + payload: hash_including( + sql: /INSERT INTO que_jobs/, + binds: [nil, nil, nil, "FooBarJob", true, "[\"foobar\"]"], + type_casted_binds: [], + async: false, + statement_name: nil, + connection: instance_of(PG::Connection), + row_count: 1, + ), + ), + ) + end + context "with a custom adapter specified" do let(:custom_adapter) { Que.adapter.dup } let(:job_with_adapter) { Class.new(described_class) }