Skip to content

Commit

Permalink
Release lock for Sidekiq adapter
Browse files Browse the repository at this point in the history
When:
- all Sidekiq attempts were unsuccessful
- job is deleted manually from Sidekiq::Web
  • Loading branch information
vbyno committed Jul 27, 2020
1 parent 6e6f61e commit 703c136
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 0 deletions.
4 changes: 4 additions & 0 deletions lib/active_job/uniqueness/patch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ def lock_key_generator

ActiveSupport.on_load(:active_job) do
ActiveJob::Base.include ActiveJob::Uniqueness::Patch

if ActiveJob::Railtie.config.active_job.queue_adapter.to_sym == :sidekiq
require_relative 'patch/sidekiq'
end
end
end
end
93 changes: 93 additions & 0 deletions lib/active_job/uniqueness/patch/sidekiq.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# frozen_string_literal: true

module ActiveJob
module Uniqueness
module Patch
def self.delete_sidekiq_job!(job)
ActiveJob::Uniqueness.unlock!(
job_class_name: job.fetch('wrapped'),
arguments: job.fetch('args').first.fetch('arguments')
)
end
end
end
end

require 'sidekiq/api'

Sidekiq.configure_server do |config|
config.death_handlers << ->(job, _ex) do
ActiveJob::Uniqueness::Patch.delete_sidekiq_job!(job)
end
end

module Sidekiq
class SortedEntry
module UniqueExtension
def delete
ActiveJob::Uniqueness::Patch.delete_sidekiq_job!(item) if super
item
end

private

def remove_job
super do |message|
ActiveJob::Uniqueness::Patch.delete_sidekiq_job!(Sidekiq.load_json(message))
yield message
end
end
end

prepend UniqueExtension
end

class ScheduledSet
module UniqueExtension
def delete(score, job_id)
entry = find_job(job_id)
ActiveJob::Uniqueness::Patch.delete_sidekiq_job!(entry.item) if super(score, job_id)
entry
end
end

prepend UniqueExtension
end

class Job
module UniqueExtension
def delete
ActiveJob::Uniqueness::Patch.delete_sidekiq_job!(item)
super
end
end

prepend UniqueExtension
end

class Queue
module UniqueExtension
def clear
each(&:delete)
super
end
end

prepend UniqueExtension
end

class JobSet
module UniqueExtension
def clear
each(&:delete)
super
end

def delete_by_value(name, value)
ActiveJob::Uniqueness::Patch.delete_sidekiq_job!(Sidekiq.load_json(value)) if super(name, value)
end
end

prepend UniqueExtension
end
end

0 comments on commit 703c136

Please sign in to comment.