-
Notifications
You must be signed in to change notification settings - Fork 152
/
queue_classic.rb
117 lines (99 loc) · 3.12 KB
/
queue_classic.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# frozen_string_literal: true
require_relative 'queue_classic/config'
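# QC is the top-level queue_classic module. It exposes the configuration
# helpers from QC::Config and forwards unknown method calls to the default queue.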
module QC
extend QC::Config
# Backwards-compatibility table for the old configuration constants.
# Each key is a deprecated constant name and each value is the QC method
# that replaces it. New code should call the method directly.
# See +QC::Config+ for more details.
DEPRECATED_CONSTANTS = {
  APP_NAME: :app_name,
  WAIT_TIME: :wait_time,
  TABLE_NAME: :table_name,
  QUEUE: :queue,
  QUEUES: :queues,
  TOP_BOUND: :top_bound,
  FORK_WORKER: :fork_worker?
}.freeze

# Resolves a deprecated constant (e.g. QC::QUEUE) on demand: emits a
# deprecation warning and returns the value of the replacement method.
# Constants that are not listed in DEPRECATED_CONSTANTS are handed to +super+.
def self.const_missing(const_name)
  return super unless DEPRECATED_CONSTANTS.key?(const_name)

  replacement = DEPRECATED_CONSTANTS[const_name]
  warn <<~MSG
    The constant QC::#{const_name} is deprecated and will be removed in the future.
    Please use the method QC.#{replacement} instead.
  MSG
  QC.public_send(replacement)
end
# Forward calls that the QC module does not define itself to the default
# queue, so that e.g. QC.enqueue() works. Anything the default queue does
# not respond to is passed on to +super+.
def self.method_missing(sym, ...)
  return super unless default_queue.respond_to?(sym)

  default_queue.public_send(sym, ...)
end
# Companion to method_missing: makes QC.respond_to?(:enqueue) return true
# for every method the default queue responds to.
#
# The include_private flag is deliberately not passed to the queue, because
# method_missing forwards with public_send and so only public queue methods
# are reachable. Names the queue does not handle fall back to +super+.
def self.respond_to_missing?(method_name, _include_private = false)
  default_queue.respond_to?(method_name) || super
end
# Reports whether default_conn_adapter returns a non-nil adapter.
# NOTE(review): default_conn_adapter builds an adapter when none is stored,
# so this looks like it is always true unless that call raises - confirm intent.
def self.has_connection? # rubocop:disable Naming/PredicateName
  adapter = default_conn_adapter
  !adapter.nil?
end
# Returns the connection adapter stored under :qc_conn_adapter in
# Thread.current. When none is stored yet, a ConnAdapter is created (sharing
# the ActiveRecord connection if rails_connection_sharing_enabled? says so),
# stored, and returned.
def self.default_conn_adapter
  Thread.current[:qc_conn_adapter] ||= begin
    share_with_rails = rails_connection_sharing_enabled?
    ConnAdapter.new(active_record_connection_share: share_with_rails)
  end
end
# Stores +conn+ under :qc_conn_adapter in Thread.current, replacing whatever
# adapter default_conn_adapter would otherwise return there.
def self.default_conn_adapter=(conn)
  current = Thread.current
  current[:qc_conn_adapter] = conn
end
# Runs the given block and logs +data+ around it.
#
# On success, +data+ is logged with an added :elapsed entry (milliseconds,
# as an Integer) and the block's value is returned. When the block raises a
# StandardError, an entry with at: 'error' and the inspected exception is
# logged (keys in +data+ take precedence), and the exception is re-raised.
def self.log_yield(data)
  started_at = Time.now
  failed = false
  begin
    yield
  rescue StandardError => error
    failed = true
    log({ at: 'error', error: error.inspect }.merge(data))
    raise
  ensure
    # The elapsed time is computed on every exit path, but only logged
    # when no StandardError was rescued above.
    elapsed_ms = Integer((Time.now - started_at) * 1000)
    log(data.merge(elapsed: elapsed_ms)) unless failed
  end
end
# Writes +data+ as a single line of space-separated key=value pairs to
# standard output, but only when the DEBUG environment variable is set.
# A lib=queue-classic pair is always included unless +data+ overrides :lib.
#
# When a block is given it is executed, its duration in milliseconds is
# added to the line as elapsed=..., and the block's value is returned.
# Without a block the method returns nil.
def self.log(data)
  result = nil
  data = { lib: 'queue-classic' }.merge(data)
  if block_given?
    # Capture the start time before yielding; the elapsed value must be
    # merged back into +data+ (Hash#merge does not mutate the receiver).
    t0 = Time.now
    result = yield
    data = data.merge(elapsed: Integer((Time.now - t0) * 1000))
  end
  if ENV['DEBUG']
    # Only build the output line when it is actually going to be printed.
    line = data.each_with_object(String.new) do |(key, value), buffer|
      buffer << [key, value].join('=') << ' '
    end
    puts(line)
  end
  result
end
# Prints a "measure#qc.<data>" line to $stdout, but only when the
# QC_MEASURE environment variable is set. Returns nil either way.
def self.measure(data)
  $stdout.puts("measure#qc.#{data}") if ENV['QC_MEASURE']
end
# Unlocks every job whose locked_by value is not among the pids currently
# listed in pg_stat_activity, so that jobs held by workers that no longer
# exist do not stay locked forever.
def self.unlock_jobs_of_dead_workers
  adapter = default_conn_adapter
  sql = "UPDATE #{QC.table_name} SET locked_at = NULL, locked_by = NULL WHERE locked_by NOT IN (SELECT pid FROM pg_stat_activity);"
  adapter.execute(sql)
end
# Private class-level helpers.
class << self
  private

  # Decides whether the connection adapter should share ActiveRecord's
  # connection. Sharing is switched off by setting QC_RAILS_DATABASE to
  # 'false'; otherwise it is enabled only when the ActiveRecord constant is
  # defined and ActiveRecord::Base responds to 'connection'.
  def rails_connection_sharing_enabled?
    return false if ENV.fetch('QC_RAILS_DATABASE', 'true') == 'false'

    Object.const_defined?('ActiveRecord') && ActiveRecord::Base.respond_to?('connection')
  end
end
end
require_relative 'queue_classic/queue'
require_relative 'queue_classic/worker'
require_relative 'queue_classic/setup'
require_relative 'queue_classic/railtie' if defined?(Rails)