Skip to content

Commit

Permalink
Prepare for Sidekiq v7 (#707)
Browse files Browse the repository at this point in the history
  • Loading branch information
mhenrixon authored Apr 23, 2022
1 parent 23e7260 commit aa9fb43
Show file tree
Hide file tree
Showing 36 changed files with 306 additions and 283 deletions.
4 changes: 3 additions & 1 deletion .reek.yml
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,10 @@ detectors:
- SidekiqUniqueJobs::Orphans::RubyReaper#active?
- SidekiqUniqueJobs::Redis::Entity#exist?
- SidekiqUniqueJobs::Server#self.configure
- SidekiqUniqueJobs::LockArgs#default_job_options
- SidekiqUniqueJobs::SidekiqWorkerMethods#after_unlock_hook
- SidekiqUniqueJobs::SidekiqWorkerMethods#worker_method_defined?
- SidekiqUniqueJobs::SidekiqWorkerMethods#job_method_defined?
- SidekiqUniqueJobs::SidekiqWorkerMethods#default_job_options
- SidekiqUniqueJobs::Web::Helpers#redirect_to
MissingSafeMethod:
exclude:
Expand Down
6 changes: 3 additions & 3 deletions lib/sidekiq_unique_jobs/exceptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ def initialize(lock_config)
class InvalidUniqueArguments < UniqueJobsError
def initialize(options)
given = options[:given]
worker_class = options[:worker_class]
job_class = options[:job_class]
lock_args_method = options[:lock_args_method]
lock_args_meth = worker_class.method(lock_args_method)
lock_args_meth = job_class.method(lock_args_method)
num_args = lock_args_meth.arity
source_location = lock_args_meth.source_location

super(
"#{worker_class}##{lock_args_method} takes #{num_args} arguments, received #{given.inspect}" \
"#{job_class}##{lock_args_method} takes #{num_args} arguments, received #{given.inspect}" \
"\n\n" \
" #{source_location.join(':')}"
)
Expand Down
32 changes: 18 additions & 14 deletions lib/sidekiq_unique_jobs/lock_args.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ def self.call(item)

# @param [Hash] item a Sidekiq job hash
def initialize(item)
@item = item
@worker_class = item[CLASS]
@args = item[ARGS]
@item = item
@args = item[ARGS]
self.job_class = item[CLASS]
end

# The unique arguments to use for creating a lock
Expand Down Expand Up @@ -83,31 +83,31 @@ def filter_by_proc(args)

# Filters unique arguments by method configured in the sidekiq worker
# @param [Array] args the arguments passed to the sidekiq worker
# @return [Array] unfiltered unless {#worker_method_defined?}
# @return [Array] unfiltered unless {#job_method_defined?}
# @return [Array] with the filtered arguments
def filter_by_symbol(args)
return args unless worker_method_defined?(lock_args_method)
return args unless job_method_defined?(lock_args_method)

worker_class.send(lock_args_method, args)
job_class.send(lock_args_method, args)
rescue ArgumentError
raise SidekiqUniqueJobs::InvalidUniqueArguments,
given: args,
worker_class: worker_class,
job_class: job_class,
lock_args_method: lock_args_method
end

# The method to use for filtering unique arguments
def lock_args_method
@lock_args_method ||= worker_options.slice(LOCK_ARGS_METHOD, UNIQUE_ARGS_METHOD).values.first
@lock_args_method ||= :lock_args if worker_method_defined?(:lock_args)
@lock_args_method ||= :unique_args if worker_method_defined?(:unique_args)
@lock_args_method ||= job_options.slice(LOCK_ARGS_METHOD, UNIQUE_ARGS_METHOD).values.first
@lock_args_method ||= :lock_args if job_method_defined?(:lock_args)
@lock_args_method ||= :unique_args if job_method_defined?(:unique_args)
@lock_args_method ||= default_lock_args_method
end

# The global worker options defined in Sidekiq directly
def default_lock_args_method
default_worker_options[LOCK_ARGS_METHOD] ||
default_worker_options[UNIQUE_ARGS_METHOD]
default_job_options[LOCK_ARGS_METHOD] ||
default_job_options[UNIQUE_ARGS_METHOD]
end

#
Expand All @@ -116,8 +116,12 @@ def default_lock_args_method
#
# @return [Hash<String, Object>]
#
def default_worker_options
@default_worker_options ||= Sidekiq.default_worker_options.stringify_keys
def default_job_options
@default_job_options ||= if Sidekiq.respond_to?(:default_job_options)
Sidekiq.default_job_options.stringify_keys
else
Sidekiq.default_worker_options.stringify_keys
end
end
end
end
8 changes: 4 additions & 4 deletions lib/sidekiq_unique_jobs/lock_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ class LockConfig
# @return [Symbol] the type of lock
attr_reader :type
#
# @!attribute [r] worker
# @return [Symbol] the worker class
attr_reader :worker
# @!attribute [r] job
# @return [Symbol] the job class
attr_reader :job
#
# @!attribute [r] limit
# @return [Integer] the number of simultaneous locks
Expand Down Expand Up @@ -58,7 +58,7 @@ def self.from_worker(options)

def initialize(job_hash = {})
@type = job_hash[LOCK]&.to_sym
@worker = SidekiqUniqueJobs.safe_constantize(job_hash[CLASS])
@job = SidekiqUniqueJobs.safe_constantize(job_hash[CLASS])
@limit = job_hash.fetch(LOCK_LIMIT, 1)&.to_i
@timeout = job_hash.fetch(LOCK_TIMEOUT, 0)&.to_i
@ttl = job_hash.fetch(LOCK_TTL) { job_hash.fetch(LOCK_EXPIRATION, nil) }.to_i
Expand Down
12 changes: 6 additions & 6 deletions lib/sidekiq_unique_jobs/lock_digest.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ def self.call(item)

# @param [Hash] item a Sidekiq job hash
def initialize(item)
@item = item
@worker_class = item[CLASS]
@lock_args = item[LOCK_ARGS] || item[UNIQUE_ARGS] # TODO: Deprecate UNIQUE_ARGS
@lock_prefix = item[LOCK_PREFIX] || item[UNIQUE_PREFIX] # TODO: Deprecate UNIQUE_PREFIX
@item = item
@lock_args = item[LOCK_ARGS] || item[UNIQUE_ARGS] # TODO: Deprecate UNIQUE_ARGS
@lock_prefix = item[LOCK_PREFIX] || item[UNIQUE_PREFIX] # TODO: Deprecate UNIQUE_PREFIX
self.job_class = item[CLASS]
end

# Memoized lock_digest
Expand Down Expand Up @@ -67,13 +67,13 @@ def digestable_hash
# Checks if we should disregard the queue when creating the unique digest
# @return [true, false]
def unique_across_queues?
item[UNIQUE_ACROSS_QUEUES] || worker_options[UNIQUE_ACROSS_QUEUES]
item[UNIQUE_ACROSS_QUEUES] || job_options[UNIQUE_ACROSS_QUEUES]
end

# Checks if we should disregard the worker when creating the unique digest
# @return [true, false]
def unique_across_workers?
item[UNIQUE_ACROSS_WORKERS] || worker_options[UNIQUE_ACROSS_WORKERS]
item[UNIQUE_ACROSS_WORKERS] || job_options[UNIQUE_ACROSS_WORKERS]
end
end
end
8 changes: 4 additions & 4 deletions lib/sidekiq_unique_jobs/lock_timeout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ def self.calculate(item)
# @option item [String] :class the class of the sidekiq worker
# @option item [Float] :at the unix time the job is scheduled at
def initialize(item)
@item = item
@worker_class = item[CLASS]
@item = item
self.job_class = item[CLASS]
end

#
Expand All @@ -42,9 +42,9 @@ def initialize(item)
# @return [Integer, nil]
#
def calculate
timeout = default_worker_options[LOCK_TIMEOUT]
timeout = default_job_options[LOCK_TIMEOUT]
timeout = default_lock_timeout if default_lock_timeout
timeout = worker_options[LOCK_TIMEOUT] if worker_options.key?(LOCK_TIMEOUT)
timeout = job_options[LOCK_TIMEOUT] if job_options.key?(LOCK_TIMEOUT)
timeout
end

Expand Down
8 changes: 4 additions & 4 deletions lib/sidekiq_unique_jobs/lock_ttl.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ def self.calculate(item)
# @option item [String] :class the class of the sidekiq worker
# @option item [Float] :at the unix time the job is scheduled at
def initialize(item)
@item = item
@worker_class = item[CLASS]
@item = item
self.job_class = item[CLASS]
end

#
Expand Down Expand Up @@ -67,9 +67,9 @@ def scheduled_at
#
def calculate
ttl = item[LOCK_TTL]
ttl ||= worker_options[LOCK_TTL]
ttl ||= job_options[LOCK_TTL]
ttl ||= item[LOCK_EXPIRATION] # TODO: Deprecate at some point
ttl ||= worker_options[LOCK_EXPIRATION] # TODO: Deprecate at some point
ttl ||= job_options[LOCK_EXPIRATION] # TODO: Deprecate at some point
ttl ||= SidekiqUniqueJobs.config.lock_ttl
ttl && (ttl.to_i + time_until_scheduled)
end
Expand Down
8 changes: 4 additions & 4 deletions lib/sidekiq_unique_jobs/middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ module Middleware
# @yieldparam [<type>] if <description>
# @yieldreturn [<type>] <describe what yield should return>
def call(worker_class, item, queue, redis_pool = nil)
@worker_class = worker_class
@item = item
@queue = queue
@redis_pool = redis_pool
@item = item
@queue = queue
@redis_pool = redis_pool
self.job_class = item[CLASS]
return yield if unique_disabled?

SidekiqUniqueJobs::Job.prepare(item) unless item[LOCK_DIGEST]
Expand Down
6 changes: 3 additions & 3 deletions lib/sidekiq_unique_jobs/on_conflict/reschedule.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ class Reschedule < OnConflict::Strategy
# @param [Hash] item sidekiq job hash
def initialize(item, redis_pool = nil)
super(item, redis_pool)
@worker_class = item[CLASS]
self.job_class = item[CLASS]
end

# Create a new job from the current one.
# This will mess up sidekiq stats because a new job is created
def call
if sidekiq_worker_class?
if worker_class.set(queue: item["queue"].to_sym).perform_in(5, *item[ARGS])
if sidekiq_job_class?
if job_class.set(queue: item["queue"].to_sym).perform_in(5, *item[ARGS])
reflect(:rescheduled, item)
else
reflect(:reschedule_failed, item)
Expand Down
6 changes: 3 additions & 3 deletions lib/sidekiq_unique_jobs/options_with_fallback.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module SidekiqUniqueJobs
# Requires the following methods to be defined in the including class
# 1. item (required)
# 2. options (can be nil)
# 3. worker_class (required, can be anything)
# 3. job_class (required, can be anything)
# @author Mikael Henriksson <[email protected]>
module OptionsWithFallback
def self.included(base)
Expand Down Expand Up @@ -69,8 +69,8 @@ def lock_type
#
def options
@options ||= begin
opts = default_worker_options.dup
opts.merge!(worker_options) if sidekiq_worker_class?
opts = default_job_options.dup
opts.merge!(job_options) if sidekiq_job_class?
(opts || {}).stringify_keys
end
end
Expand Down
57 changes: 33 additions & 24 deletions lib/sidekiq_unique_jobs/sidekiq_worker_methods.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,50 +5,55 @@ module SidekiqUniqueJobs
#
# @author Mikael Henriksson <[email protected]>
module SidekiqWorkerMethods
#
# @!attribute [r] job_class
# @return [Sidekiq::Worker] The Sidekiq::Worker implementation
attr_reader :job_class

# Avoids duplicating worker_class.respond_to? in multiple places
# @return [true, false]
def worker_method_defined?(method_sym)
worker_class.respond_to?(method_sym)
def job_method_defined?(method_sym)
job_class.respond_to?(method_sym)
end

# Wraps #get_sidekiq_options to always work with a hash
# @return [Hash] of the worker class sidekiq options
def worker_options
return {} unless sidekiq_worker_class?
def job_options
return {} unless sidekiq_job_class?

worker_class.get_sidekiq_options.deep_stringify_keys
job_class.get_sidekiq_options.deep_stringify_keys
end

# Tests that the
# @return [true] if worker_class responds to get_sidekiq_options
# @return [false] if worker_class does not respond to get_sidekiq_options
def sidekiq_worker_class?
worker_method_defined?(:get_sidekiq_options)
# @return [true] if job_class responds to get_sidekiq_options
# @return [false] if job_class does not respond to get_sidekiq_options
def sidekiq_job_class?
job_method_defined?(:get_sidekiq_options)
end

# The Sidekiq::Worker implementation
# @return [Sidekiq::Worker]
def worker_class
@_worker_class ||= worker_class_constantize # rubocop:disable Naming/MemoizedInstanceVariableName
def job_class=(obj)
# this is what was originally passed in, it can be an instance or a class depending on sidekiq version
@original_job_class = obj
@job_class = job_class_constantize(obj)
end

# The hook to call after a successful unlock
# @return [Proc]
def after_unlock_hook # rubocop:disable Metrics/MethodLength
lambda do
if @worker_class.respond_to?(:after_unlock)
if @original_job_class.respond_to?(:after_unlock)
# instance method in sidekiq v6
if @worker_class.method(:after_unlock).arity.positive? # arity check to maintain backwards compatibility
@worker_class.after_unlock(item)
if @original_job_class.method(:after_unlock).arity.positive? # arity check to maintain backwards compatibility
@original_job_class.after_unlock(item)
else
@worker_class.after_unlock
@original_job_class.after_unlock
end
elsif worker_class.respond_to?(:after_unlock)
elsif job_class.respond_to?(:after_unlock)
# class method regardless of sidekiq version
if worker_class.method(:after_unlock).arity.positive? # arity check to maintain backwards compatibility
worker_class.after_unlock(item)
if job_class.method(:after_unlock).arity.positive? # arity check to maintain backwards compatibility
job_class.after_unlock(item)
else
worker_class.after_unlock
job_class.after_unlock
end
end
end
Expand All @@ -58,7 +63,7 @@ def after_unlock_hook # rubocop:disable Metrics/MethodLength
# failing back to the original argument when the constant can't be found
#
# @return [Sidekiq::Worker]
def worker_class_constantize(klazz = @worker_class)
def job_class_constantize(klazz = @job_class)
SidekiqUniqueJobs.safe_constantize(klazz)
end

Expand All @@ -68,8 +73,12 @@ def worker_class_constantize(klazz = @worker_class)
#
# @return [Hash<Symbol, Object>]
#
def default_worker_options
Sidekiq.default_worker_options
def default_job_options
if Sidekiq.respond_to?(:default_job_options)
Sidekiq.default_job_options
else
Sidekiq.default_worker_options
end
end
end
end
12 changes: 6 additions & 6 deletions spec/performance/lock_digest_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

RSpec.describe SidekiqUniqueJobs::LockDigest, perf: true do
let(:lock_digest) { described_class.new(item) }
let(:worker_class) { UntilExecutedJob }
let(:job_class) { UntilExecutedJob }
let(:class_name) { worker_class.to_s }
let(:queue) { "myqueue" }
let(:args) { [[1, 2]] }
Expand All @@ -23,7 +23,7 @@

context "when args are empty" do
let(:another_lock_digest) { described_class.new(item) }
let(:worker_class) { WithoutArgumentJob }
let(:job_class) { WithoutArgumentJob }
let(:args) { [] }

it "performs in under 0.1 ms" do
Expand All @@ -32,17 +32,17 @@
end

context "when unique_args is a proc" do
let(:worker_class) { MyUniqueJobWithFilterProc }
let(:args) { [1, 2, { "type" => "it" }] }
let(:job_class) { MyUniqueJobWithFilterProc }
let(:args) { [1, 2, { "type" => "it" }] }

it "performs in under 0.1 ms" do
expect { lock_digest }.to perform_under(0.1).ms
end
end

context "when unique_args is a symbol" do
let(:worker_class) { MyUniqueJobWithFilterMethod }
let(:args) { [1, 2, { "type" => "it" }] }
let(:job_class) { MyUniqueJobWithFilterMethod }
let(:args) { [1, 2, { "type" => "it" }] }

it "performs in under 0.1 ms" do
expect { lock_digest }.to perform_under(0.1).ms
Expand Down
Loading

0 comments on commit aa9fb43

Please sign in to comment.