diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 000000000..65d44abc7 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,14 @@ +# This file is for unifying the coding style for different editors and IDEs +# editorconfig.org + +root = true + +[*] +charset = utf-8 +trim_trailing_whitespace = true +insert_final_newline = true +indent_style = space +indent_size = 2 + +[*.md] +trim_trailing_whitespace = true diff --git a/.gitignore b/.gitignore index 23a977be6..9c84de440 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ Gemfile.lock gemfiles/*.lock *.sw? coverage/ +tmp/ diff --git a/.simplecov b/.simplecov new file mode 100644 index 000000000..dee8fdf40 --- /dev/null +++ b/.simplecov @@ -0,0 +1,12 @@ +require 'simplecov-json' + +SimpleCov.refuse_coverage_drop +SimpleCov.formatters = [ + SimpleCov::Formatter::HTMLFormatter, + SimpleCov::Formatter::JSONFormatter +] +SimpleCov.start do + add_filter '/spec/' + add_filter '/bin/' + add_filter '/gemfiles/' +end diff --git a/CHANGELOG.md b/CHANGELOG.md index 1aa3ff76e..631425d57 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ ## v3.0.16 (Unreleased) +- Improved uniqueness handling (complete refactoring, upgrade with causion) +- 100% breaking changes ## v3.0.15 - Jobs only ever unlock themselves now (see #96 & #94 for info) thanks @pik diff --git a/Gemfile b/Gemfile index 87cfb6874..b3fe5c6b4 100644 --- a/Gemfile +++ b/Gemfile @@ -3,10 +3,13 @@ gemspec gem 'appraisal', '~> 2.0.0' +gem 'rspec-its', require: false + platform :mri do - gem 'pry-suite', require: false - gem 'let_it_go', require: false - gem 'memory-profiler', require: false - gem 'simplecov-json', require: false + gem 'pry', require: false + gem 'pry-rescue', require: false + gem 'pry-byebug', require: false + gem 'simplecov-json', require: false + gem 'memory_profiler', require: false gem 'codeclimate-test-reporter', require: false end diff --git a/README.md b/README.md index 32509a2fc..525b7947a 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,13 @@ The missing unique jobs for sidekiq See https://github.com/mperham/sidekiq#requirements for what is required. Starting from 3.0.13 only sidekiq 3 is supported and support for MRI 1.9 is dropped (it might work but won't be worked on) +Version 4 requires redis 2.6.2!! Don't upgrade to version 4 unless you are on redis 2.6.2. + +## Upgrade instructions + +Easy path - Drop all your unique jobs before upgrading the gem! +Hard path - See above... Start with a clean slate :) + ## Installation Add this line to your application's Gemfile: @@ -20,6 +27,34 @@ Or install it yourself as: $ gem install sidekiq-unique-jobs +## A word on locking + +Like @mperham mentions on (this wiki page)[https://github.com/mperham/sidekiq/wiki/Related-Projects#unique-jobs] it is hard to enforce uniqueness with redis in a distributed redis setting. + +To make things worse there are many ways of wanting to enforce uniqueness. + +### While Executing + +Is to make sure that a job can be scheduled any number of times but only executed a single time per argument provided to the job we call this runtime uniqueness. This is probably most useful forbackground jobs that are fast to execute. (See mhenrixon/sidekiq-unique-jobs#111 for a great example of when this would be right.) While the job is executing/performing no other jobs can be executed at the same time. + +### Until Executing + +This means that a job can only be scheduled into redis once per whatever the configuration of unique arguments. Any jobs added until the first one of the same arguments has been unlocked will just be dropped. This is what was tripping many people up. They would schedule a job to run in the future and it would be impossible to schedule new jobs with those same arguments even immediately. There was some forth and back between also locking jobs on the scheduled queue and the regular queues but in the end I decided it was best to separate these two features out into different locking mechanisms. I think what most people are after is to be able to lock a job while executing or that seems to be what people are most missing at the moment. + +### Until Executed + +This is the combination of the two above. First we lock the job until it executes, then as the job begins executes we keep the lock so that no other jobs with the same arguments can execute at the same time. + +### Until Timeout + +The job won't be unlocked until the timeout/expiry runs out. + +### Uniqueness Scope + +- Queue specific locks +- Across all queues. +- Timed / Scheduled jobs + ## Usage All that is required is that you specifically set the sidekiq option for *unique* to true like below: @@ -33,12 +68,12 @@ should be unique. The job will be unique for the number of seconds configured (d or until the job has been completed. Thus, the job will be unique for the shorter of the two. Note that Sidekiq versions before 3.0 will remove job keys after an hour, which means jobs can remain unique for at most an hour. *If you want the unique job to stick around even after it has been successfully -processed then just set the unique_unlock_order to anything except `:before_yield` or `:after_yield` (`unique_unlock_order = :never`) +processed then just set the unique_lock to anything except `:before_yield` or `:after_yield` (`unique_lock = :until_timeout`) You can also control the expiration length of the uniqueness check. If you want to enforce uniqueness over a longer period than the default of 30 minutes then you can pass the number of seconds you want to use to the sidekiq options: ```ruby -sidekiq_options unique: true, unique_job_expiration: 120 * 60 # 2 hours +sidekiq_options unique: true, unique_expiration: 120 * 60 # 2 hours ``` Requiring the gem in your gemfile should be sufficient to enable unique jobs. @@ -96,13 +131,13 @@ Note that objects passed into workers are converted to JSON *after* running thro ### Unlock Ordering -By default the server middleware will release the worker lock after yielding to the next middleware or worker. Alternatively, this can be changed by passing the `unique_unlock_order` option: +By default the server middleware will release the worker lock after yielding to the next middleware or worker. Alternatively, this can be changed by passing the `unique_lock` option: ```ruby class UniqueJobWithFilterMethod include Sidekiq::Worker sidekiq_options unique: true, - unique_unlock_order: :before_yield + unique_locks: :until_executing ... diff --git a/Rakefile b/Rakefile index 113a2b8ee..90ec4f2ca 100644 --- a/Rakefile +++ b/Rakefile @@ -1,4 +1,5 @@ #!/usr/bin/env rake + require 'rubygems' require 'bundler/setup' require 'bundler/gem_tasks' @@ -8,4 +9,4 @@ require 'rubocop/rake_task' RuboCop::RakeTask.new(:style) RSpec::Core::RakeTask.new(:spec) -task default: [:spec, :style] +task default: [:style, :spec] diff --git a/lib/sidekiq-unique-jobs.rb b/lib/sidekiq-unique-jobs.rb index 3ab5c5aa2..6a1ab2bf1 100644 --- a/lib/sidekiq-unique-jobs.rb +++ b/lib/sidekiq-unique-jobs.rb @@ -1,4 +1,10 @@ require 'yaml' if RUBY_VERSION.include?('2.0.0') # rubocop:disable FileName +require 'sidekiq_unique_jobs/core_ext' +require 'sidekiq_unique_jobs/options_with_fallback' +require 'sidekiq_unique_jobs/scripts' +require 'sidekiq_unique_jobs/unique_args' +require 'sidekiq_unique_jobs/unlockable' +require 'sidekiq_unique_jobs/lock' require 'sidekiq_unique_jobs/middleware' require 'sidekiq_unique_jobs/version' require 'sidekiq_unique_jobs/config' @@ -11,11 +17,10 @@ module SidekiqUniqueJobs def config @config ||= Config.new( - unique_prefix: 'sidekiq_unique', - unique_args_enabled: false, + unique_prefix: 'uniquejobs', + unique_args_enabled: true, default_expiration: 30 * 60, - default_unlock_order: :after_yield, - unique_storage_method: :new, + default_lock: :while_executing, redis_test_mode: :redis # :mock ) end @@ -24,8 +29,22 @@ def unique_args_enabled? config.unique_args_enabled end - def configure - yield config + def default_lock + config.default_lock + end + + def configure(options = {}) + if block_given? + yield config + else + options.each do |key, val| + config[key] = val + end + end + end + + def namespace + @namespace ||= Sidekiq.redis { |c| c.respond_to?(:namespace) ? c.namespace : nil } end # Attempt to constantize a string worker_class argument, always @@ -37,55 +56,20 @@ def worker_class_constantize(worker_class) worker_class end - def get_payload(klass, queue, *args) - unique_on_all_queues = false - if config.unique_args_enabled - worker_class = worker_class_constantize(klass) - args = yield_unique_args(worker_class, *args) - unique_on_all_queues = - worker_class.get_sidekiq_options['unique_on_all_queues'] - end - md5_arguments = { class: klass, args: args } - md5_arguments[:queue] = queue unless unique_on_all_queues - "#{config.unique_prefix}:" \ - "#{Digest::MD5.hexdigest(Sidekiq.dump_json(md5_arguments))}" - end - - def payload_hash(item) - get_payload(item['class'], item['queue'], item['args']) - end - - def yield_unique_args(worker_class, args) - unique_args = worker_class.get_sidekiq_options['unique_args'] - filtered_args(worker_class, unique_args, args) - rescue NameError - # fallback to not filtering args when class can't be instantiated - args + def redis_version + @redis_version ||= Sidekiq.redis { |c| c.info('server')['redis_version'] } end def connection(redis_pool = nil, &block) - return mock_redis if SidekiqUniqueJobs.config.mocking? + return mock_redis if config.mocking? redis_pool ? redis_pool.with(&block) : Sidekiq.redis(&block) end - def self.mock_redis - @redis_mock ||= MockRedis.new - end - - def filtered_args(worker_class, unique_args, args) - case unique_args - when Proc - unique_args.call(args) - when Symbol - if worker_class.respond_to?(unique_args) - worker_class.send(unique_args, *args) - end - else - args - end + def mock_redis + @redis_mock ||= MockRedis.new if defined?(MockRedis) end - def synchronize(key, redis, item = nil, &blk) - SidekiqUniqueJobs::RunLock.synchronize(key, redis, item, &blk) + def synchronize(item, redis_pool, &blk) + Lock::WhileExecuting.synchronize(item, redis_pool, &blk) end end diff --git a/lib/sidekiq_unique_jobs/client/middleware.rb b/lib/sidekiq_unique_jobs/client/middleware.rb index 72801bdd7..672e6c92c 100644 --- a/lib/sidekiq_unique_jobs/client/middleware.rb +++ b/lib/sidekiq_unique_jobs/client/middleware.rb @@ -3,110 +3,44 @@ module SidekiqUniqueJobs module Client class Middleware - SCHEDULED = 'scheduled'.freeze extend Forwardable - def_delegators :SidekiqUniqueJobs, :connection, :config, :payload_hash - def_delegators :config, :unique_storage_method + def_delegators :SidekiqUniqueJobs, :connection, :config def_delegators :Sidekiq, :logger + include OptionsWithFallback + def call(worker_class, item, queue, redis_pool = nil) @worker_class = SidekiqUniqueJobs.worker_class_constantize(worker_class) @item = item @queue = queue @redis_pool = redis_pool - - return yield unless unique_enabled? - item['unique_hash'] = payload_hash(item) - unless unique_for_connection? - logger.warn "payload is not unique #{item}" if log_duplicate_payload? - return - end - - yield + yield if ordinary_or_locked? end private attr_reader :item, :worker_class, :redis_pool, :queue - def unique_enabled? - worker_class.get_sidekiq_options['unique'] || item['unique'] - end - - def log_duplicate_payload? - worker_class.get_sidekiq_options['log_duplicate_payload'] || item['log_duplicate_payload'] - end - - def unique_for_connection? - send("#{unique_storage_method}_unique_for?") - end - - def old_unique_for? - item['at'] ? unique_schedule_old : unique_enqueue_old - end - - def unique_enqueue_old - connection do |conn| - conn.watch(payload_hash(item)) - lock_val = conn.get(payload_hash(item)) - if !lock_val || lock_val[0..8] == SCHEDULED - conn.multi do - conn.setex(payload_hash(item), expires_at, item['jid']) - end - else - conn.unwatch - false - end - end - end - - def unique_schedule_old - connection do |conn| - conn.watch(payload_hash(item)) - if expires_at < 0 || conn.get(payload_hash(item)) - conn.unwatch - false - else - conn.setex(payload_hash(item), expires_at, "scheduled-#{item['jid']}") - end - end - end - - def new_unique_for? - item['at'] ? unique_schedule : unique_enqueue - end - - def unique_schedule - connection do |conn| - conn.set(payload_hash(item), "scheduled-#{item['jid']}", nx: true, ex: expires_at) - end + def ordinary_or_locked? + unique_disabled? || unlockable? || aquire_lock end - def unique_enqueue - connection do |conn| - conn.eval(lock_queue_script, keys: [payload_hash(item)], argv: [expires_at, item['jid']]) - end + def unlockable? + !lockable? end - def expires_at - # if the job was previously scheduled and is now being queued, - # or we've never seen it before - ex = unique_job_expiration || config.default_expiration - ex = ((Time.at(item['at']) - Time.now.utc) + ex).to_i if item['at'] - ex + def lockable? + lock.respond_to?(:lock) end - def unique_job_expiration - worker_class.get_sidekiq_options['unique_job_expiration'] + def aquire_lock + locked = lock.lock(:client) + warn_about_duplicate(item) unless locked + locked end - def lock_queue_script - <<-LUA - local ret = redis.call('GET', KEYS[1]) - if not ret or string.sub(ret, 1, 9) == 'scheduled' then - return redis.call('SETEX', KEYS[1], ARGV[1], ARGV[2]) - end - LUA + def warn_about_duplicate(item) + logger.warn "payload is not unique #{item}" if log_duplicate_payload? end end end diff --git a/lib/sidekiq_unique_jobs/config.rb b/lib/sidekiq_unique_jobs/config.rb index 6b8e90254..b692ed16d 100644 --- a/lib/sidekiq_unique_jobs/config.rb +++ b/lib/sidekiq_unique_jobs/config.rb @@ -1,41 +1,12 @@ module SidekiqUniqueJobs class Config < OpenStruct + TESTING_CONSTANT ||= 'Testing'.freeze CONFIG_ACCESSORS = [ :unique_prefix, - :unique_args_enabled, :default_expiration, - :default_unlock_order, - :unique_storage_method, - :redis_mode, - :default_run_lock, - :default_run_lock_retry_interval, - :default_run_lock_retries, - :default_reschedule_on_lock_fail, - :default_run_lock_expire - ] - - class << self - CONFIG_ACCESSORS.each do |method| - define_method(method) do - warn("#{method} has been deprecated. See readme for information") - config.send(method) - end - - define_method("#{method}=") do |obj| - warn("#{method} has been deprecated. See readme for information") - config.send("#{method}=", obj) - end - end - - def unique_args_enabled? - warn('unique_args_enabled has been deprecated. See readme for information') - config.unique_args_enabled - end - - def config - SidekiqUniqueJobs.config - end - end + :default_lock, + :redis_mode + ].freeze def inline_testing_enabled? testing_enabled? && Sidekiq::Testing.inline? @@ -46,7 +17,7 @@ def mocking? end def testing_enabled? - Sidekiq.const_defined?('Testing') && Sidekiq::Testing.enabled? + Sidekiq.const_defined?(TESTING_CONSTANT) && Sidekiq::Testing.enabled? end def unique_args_enabled? diff --git a/lib/sidekiq_unique_jobs/core_ext.rb b/lib/sidekiq_unique_jobs/core_ext.rb new file mode 100644 index 000000000..d8fa333c6 --- /dev/null +++ b/lib/sidekiq_unique_jobs/core_ext.rb @@ -0,0 +1,46 @@ +begin + require 'active_support/core_ext/hash/keys' + require 'active_support/core_ext/hash/deep_merge' +rescue LoadError + class Hash + def slice(*keys) + keys.map! { |key| convert_key(key) } if respond_to?(:convert_key, true) + keys.each_with_object(self.class.new) { |k, hash| hash[k] = self[k] if key?(k) } + end unless {}.respond_to?(:slice) + + def slice!(*keys) + keys.map! { |key| convert_key(key) } if respond_to?(:convert_key, true) + omit = slice(*self.keys - keys) + hash = slice(*keys) + hash.default = default + hash.default_proc = default_proc if default_proc + replace(hash) + omit + end unless {}.respond_to?(:slice!) + end +end +begin + require 'active_support/core_ext/string/inflections' +rescue LoadError + class String + # File activesupport/lib/active_support/inflector/methods.rb, line 178 + def classify + camelize(singularize(sub(/.*\./, ''))) + end unless ''.respond_to?(:classify) + + # File activesupport/lib/active_support/inflector/methods.rb, line 67 + def camelize(uppercase_first_letter = true) + string = self + if uppercase_first_letter + string = string.sub(/^[a-z\d]*/) { $&.capitalize } + else + string = string.sub(/^(?:(?=\b|[A-Z_])|\w)/) { $&.downcase } + end + string.gsub!(%r{(?:_|(\/))([a-z\d]*)}i) do + "#{Regexp.last_match(1)}#{Regexp.last_match(2).capitalize}" + end + string.gsub!(%r{/}, '::') + string + end unless ''.respond_to?(:camelize) + end +end diff --git a/lib/sidekiq_unique_jobs/lock.rb b/lib/sidekiq_unique_jobs/lock.rb new file mode 100644 index 000000000..4888764c3 --- /dev/null +++ b/lib/sidekiq_unique_jobs/lock.rb @@ -0,0 +1,10 @@ +require 'sidekiq_unique_jobs/lock/time_calculator' +require 'sidekiq_unique_jobs/lock/until_executed' +require 'sidekiq_unique_jobs/lock/until_executing' +require 'sidekiq_unique_jobs/lock/while_executing' +require 'sidekiq_unique_jobs/lock/until_timeout' + +module SidekiqUniqueJobs + module Lock + end +end diff --git a/lib/sidekiq_unique_jobs/lock/time_calculator.rb b/lib/sidekiq_unique_jobs/lock/time_calculator.rb new file mode 100644 index 000000000..50793805d --- /dev/null +++ b/lib/sidekiq_unique_jobs/lock/time_calculator.rb @@ -0,0 +1,44 @@ +module SidekiqUniqueJobs + module Lock + class TimeCalculator + def self.for_item(item) + new(item) + end + + def initialize(item) + @item = item + end + + def seconds + time_until_scheduled + unique_expiration + end + + def time_until_scheduled + scheduled = item['at'.freeze] + return 0 unless scheduled + (Time.at(scheduled) - Time.now.utc).to_i + end + + def unique_expiration + @unique_expiration ||= + ( + worker_class_unique_expiration || + SidekiqUniqueJobs.config.default_expiration + ).to_i + end + + def worker_class_unique_expiration + return unless worker_class.respond_to?(:get_sidekiq_options) + worker_class.get_sidekiq_options['unique_expiration'.freeze] + end + + def worker_class + @worker_class ||= SidekiqUniqueJobs.worker_class_constantize(item['class'.freeze]) + end + + private + + attr_reader :item + end + end +end diff --git a/lib/sidekiq_unique_jobs/lock/until_executed.rb b/lib/sidekiq_unique_jobs/lock/until_executed.rb new file mode 100644 index 000000000..70bafeb9b --- /dev/null +++ b/lib/sidekiq_unique_jobs/lock/until_executed.rb @@ -0,0 +1,56 @@ +module SidekiqUniqueJobs + module Lock + class UntilExecuted + OK ||= 'OK'.freeze + + extend Forwardable + def_delegators :Sidekiq, :logger + + def initialize(item, redis_pool = nil) + @item = item + @redis_pool = redis_pool + end + + def unlock(scope) + unless [:server, :api, :test].include?(scope) + fail ArgumentError, "#{scope} middleware can't #{__method__} #{unique_key}" + end + SidekiqUniqueJobs::Unlockable.unlock(unique_key, item['jid'.freeze], redis_pool) + end + + # rubocop:disable MethodLength + def lock(scope) + if scope.to_sym != :client + fail ArgumentError, "#{scope} middleware can't #{__method__} #{unique_key}" + end + + result = Scripts.call(:aquire_lock, redis_pool, + keys: [unique_key], + argv: [item['jid'.freeze], max_lock_time]) + case result + when 1 + logger.debug { "successfully locked #{unique_key} for #{max_lock_time} seconds" } + true + when 0 + logger.debug { "failed to aquire lock for #{unique_key}" } + false + else + fail "#{__method__} returned an unexpected value (#{result})" + end + end + # rubocop:enable MethodLength + + def unique_key + @unique_key ||= UniqueArgs.digest(item) + end + + def max_lock_time + @max_lock_time ||= TimeCalculator.for_item(item).seconds + end + + private + + attr_reader :item, :redis_pool + end + end +end diff --git a/lib/sidekiq_unique_jobs/lock/until_executing.rb b/lib/sidekiq_unique_jobs/lock/until_executing.rb new file mode 100644 index 000000000..8b5a4225e --- /dev/null +++ b/lib/sidekiq_unique_jobs/lock/until_executing.rb @@ -0,0 +1,6 @@ +module SidekiqUniqueJobs + module Lock + class UntilExecuting < UntilExecuted + end + end +end diff --git a/lib/sidekiq_unique_jobs/lock/until_timeout.rb b/lib/sidekiq_unique_jobs/lock/until_timeout.rb new file mode 100644 index 000000000..8ddef9b85 --- /dev/null +++ b/lib/sidekiq_unique_jobs/lock/until_timeout.rb @@ -0,0 +1,10 @@ +module SidekiqUniqueJobs + module Lock + class UntilTimeout < UntilExecuted + def unlock(scope) + return true if scope.to_sym == :server + fail ArgumentError, "#{scope} middleware can't #{__method__} #{unique_key}" + end + end + end +end diff --git a/lib/sidekiq_unique_jobs/lock/while_executing.rb b/lib/sidekiq_unique_jobs/lock/while_executing.rb new file mode 100644 index 000000000..046f3c582 --- /dev/null +++ b/lib/sidekiq_unique_jobs/lock/while_executing.rb @@ -0,0 +1,31 @@ +module SidekiqUniqueJobs + module Lock + class WhileExecuting + def self.synchronize(item, redis_pool = nil, &block) + new(item, redis_pool).synchronize(&block) + end + + def initialize(item, redis_pool = nil) + @unique_digest = item['unique_digest'.freeze] + @run_key = "#{@unique_digest}:run" + @mutex = Mutex.new + @redis_pool = redis_pool + end + + def synchronize(&_block) + @mutex.lock + sleep 0.001 until locked? + + yield + + ensure + SidekiqUniqueJobs.connection(@redis_pool) { |c| c.del @run_key } + @mutex.unlock + end + + def locked? + Scripts.call(:synchronize, @redis_pool, keys: [@run_key], argv: [Time.now.to_i]) == 1 + end + end + end +end diff --git a/lib/sidekiq_unique_jobs/normalizer.rb b/lib/sidekiq_unique_jobs/normalizer.rb new file mode 100644 index 000000000..da6416892 --- /dev/null +++ b/lib/sidekiq_unique_jobs/normalizer.rb @@ -0,0 +1,7 @@ +module SidekiqUniqueJobs + module Normalizer + def self.jsonify(args) + JSON.parse(args.to_json) + end + end +end diff --git a/lib/sidekiq_unique_jobs/options_with_fallback.rb b/lib/sidekiq_unique_jobs/options_with_fallback.rb new file mode 100644 index 000000000..f16f6992c --- /dev/null +++ b/lib/sidekiq_unique_jobs/options_with_fallback.rb @@ -0,0 +1,36 @@ +module SidekiqUniqueJobs + module OptionsWithFallback + UNIQUE_KEY ||= 'unique'.freeze + UNIQUE_LOCK_KEY ||= 'unique_lock'.freeze + LOG_DUPLICATE_KEY ||= 'log_duplicate_payload'.freeze + + def unique_enabled? + options[UNIQUE_KEY] || item[UNIQUE_KEY] + end + + def unique_disabled? + !unique_enabled? + end + + def log_duplicate_payload? + options[LOG_DUPLICATE_KEY] || item[LOG_DUPLICATE_KEY] + end + + def lock + @lock = lock_class.new(item) + end + + def lock_class + "SidekiqUniqueJobs::Lock::#{unique_lock.to_s.classify}".constantize + end + + def unique_lock + options[UNIQUE_LOCK_KEY] || item[UNIQUE_LOCK_KEY] || SidekiqUniqueJobs.default_lock + end + + def options + @options ||= worker_class.get_sidekiq_options if worker_class.respond_to?(:get_sidekiq_options) + @options ||= {} + end + end +end diff --git a/lib/sidekiq_unique_jobs/run_lock.rb b/lib/sidekiq_unique_jobs/run_lock.rb deleted file mode 100644 index 86ceb243e..000000000 --- a/lib/sidekiq_unique_jobs/run_lock.rb +++ /dev/null @@ -1,74 +0,0 @@ -# Cross-process locking using Redis. -require 'sidekiq_unique_jobs/run_lock_failed' - -module SidekiqUniqueJobs - class RunLock - extend Forwardable - - def self.synchronize(key, redis, options = {}, &blk) - new(key, redis, options).synchronize(&blk) - end - - def_delegators :'SidekiqUniqueJobs.config', :default_run_lock_retries, - :default_run_lock_retry_interval, :default_run_lock_expire - - attr_reader :options - - def initialize(key, redis, options = {}) - @key = "#{key}:run" - @redis = redis - @options = options - @mutex = Mutex.new - end - - # NOTE wrapped in mutex to maintain its semantics - def synchronize - @mutex.lock - sleep 0.001 until locked? - - yield - - ensure - @redis.del @key - @mutex.unlock - end - - private - - # rubocop:disable MethodLength - def locked? - got_lock = false - if @redis.setnx @key, Time.now.to_i + 60 - @redis.expire @key, 60 - got_lock = true - else - begin - @redis.watch @key - time = @redis.get @key - if time && time.to_i < Time.now.to_i - got_lock = @redis.multi do - @redis.set @key, Time.now.to_i + 60 - end - end - ensure - @redis.unwatch - end - end - - got_lock - end - # rubocop:enable MethodLength - - def run_lock_retries - options['run_lock_retries'] || default_run_lock_retries.to_i - end - - def run_lock_retry_interval - options['run_lock_retry_interval'] || default_run_lock_retry_interval.to_i - end - - def run_lock_expire - options['run_lock_expire'] || default_run_lock_expire.to_i - end - end -end diff --git a/lib/sidekiq_unique_jobs/scripts.rb b/lib/sidekiq_unique_jobs/scripts.rb new file mode 100644 index 000000000..9e1abe030 --- /dev/null +++ b/lib/sidekiq_unique_jobs/scripts.rb @@ -0,0 +1,50 @@ +require 'pathname' +require 'digest/sha1' + +module SidekiqUniqueJobs + ScriptError = Class.new(StandardError) + + module Scripts + extend Forwardable + LUA_PATHNAME ||= Pathname.new(__FILE__).dirname.join('../../redis').freeze + SOURCE_FILES ||= Dir[LUA_PATHNAME.join('**/*.lua')].compact.freeze + DEFINED_METHODS ||= [] + + module_function + + def script_shas + @script_shas ||= {} + end + + def logger + Sidekiq.logger + end + + def call(file_name, redis_pool, options = {}) + connection(redis_pool) do |redis| + script_shas[file_name] ||= redis.script(:load, script_source(file_name)) + redis.evalsha(script_shas[file_name], options) + end + rescue Redis::CommandError => ex + raise ScriptError, + "#{file_name}.lua\n\n" + + ex.message + "\n\n" + + script_source(file_name) + + ex.backtrace.join("\n") + end + + def connection(redis_pool, &_block) + SidekiqUniqueJobs.connection(redis_pool) do |conn| + yield conn + end + end + + def script_source(file_name) + script_path(file_name).read + end + + def script_path(file_name) + LUA_PATHNAME.join("#{file_name}.lua") + end + end +end diff --git a/lib/sidekiq_unique_jobs/server/extensions.rb b/lib/sidekiq_unique_jobs/server/extensions.rb deleted file mode 100644 index abf68e051..000000000 --- a/lib/sidekiq_unique_jobs/server/extensions.rb +++ /dev/null @@ -1,22 +0,0 @@ -module SidekiqUniqueJobs - module Server - module Extensions - def remove_on_match - <<-LUA - if redis.call('GET', KEYS[1]) == ARGV[1] then - redis.call('DEL', KEYS[1]) - end - LUA - end - - def remove_scheduled_on_match - <<-LUA - local ret = redis.call('GET', KEYS[1]) - if ret and string.sub(ret, 11, -1) == ARGV[1] then - redis.call('DEL', KEYS[1]) - end - LUA - end - end - end -end diff --git a/lib/sidekiq_unique_jobs/server/middleware.rb b/lib/sidekiq_unique_jobs/server/middleware.rb index f43c6e513..166f42887 100644 --- a/lib/sidekiq_unique_jobs/server/middleware.rb +++ b/lib/sidekiq_unique_jobs/server/middleware.rb @@ -1,89 +1,71 @@ require 'digest' require 'forwardable' -require 'sidekiq_unique_jobs/server/extensions' module SidekiqUniqueJobs module Server class Middleware extend Forwardable - def_delegators :SidekiqUniqueJobs, :connection, :payload_hash, :synchronize - def_delegators :'SidekiqUniqueJobs.config', :default_unlock_order, - :default_reschedule_on_lock_fail def_delegators :Sidekiq, :logger + def_instance_delegator :@worker, :class, :worker_class - include Extensions + include OptionsWithFallback - attr_reader :redis_pool, - :worker, - :options - - def call(worker, item, _queue, redis_pool = nil, &blk) + def call(worker, item, queue, redis_pool = nil, &blk) @worker = worker @redis_pool = redis_pool - setup_options(worker.class) - send("#{unlock_order}_call", item, &blk) - end + @queue = queue + @item = item - def setup_options(klass) - @options = {} - options.merge!(klass.get_sidekiq_options) if klass.respond_to?(:get_sidekiq_options) + send(unique_lock, &blk) end - def before_yield_call(item) - unlock(payload_hash(item), item) + private + + attr_reader :redis_pool, :worker, :item, :worker_class + + def until_executing + unlock yield end - def after_yield_call(item) + def until_executed(&block) operative = true - yield + after_yield_yield(&block) rescue Sidekiq::Shutdown operative = false raise ensure - unlock(payload_hash(item), item) if operative + unlock if operative end - def run_lock_call(item) - lock_key = payload_hash(item) - connection do |con| - synchronize(lock_key, con, item.dup.merge(options)) do - unlock(lock_key, item) - yield - end + def after_yield_yield + yield + end + + def while_executing + lock.synchronize do + yield end rescue SidekiqUniqueJobs::RunLockFailed - return reschedule(item) if reschedule_on_lock_fail + return reschedule if reschedule_on_lock_fail raise end - def unlock_order - return :never unless options['unique'] - options['unique_unlock_order'] || default_unlock_order - end - - def never_call(*) + def until_timeout yield if block_given? end - def reschedule_on_lock_fail - options['reschedule_on_lock_fail'] || default_reschedule_on_lock_fail - end - protected - def unlock(lock_key, item) - connection do |con| - con.eval(remove_on_match, keys: [lock_key], argv: [item['jid']]) - end - after_unlock_hook + def unlock + after_unlock_hook if lock.unlock(:server) end def after_unlock_hook worker.after_unlock if worker.respond_to?(:after_unlock) end - def reschedule(item) + def reschedule Sidekiq::Client.new(redis_pool).raw_push([item]) end end diff --git a/lib/sidekiq_unique_jobs/sidekiq_unique_ext.rb b/lib/sidekiq_unique_jobs/sidekiq_unique_ext.rb index 490a50d81..4fa0d00ab 100644 --- a/lib/sidekiq_unique_jobs/sidekiq_unique_ext.rb +++ b/lib/sidekiq_unique_jobs/sidekiq_unique_ext.rb @@ -1,13 +1,17 @@ require 'sidekiq/api' -require 'sidekiq_unique_jobs/server/extensions' module Sidekiq + module UnlockMethod + def unlock(item) + SidekiqUniqueJobs::Unlockable.unlock(item['unique_digest'.freeze], item['jid'.freeze]) + end + end + class SortedEntry module UniqueExtension - include SidekiqUniqueJobs::Server::Extensions - def self.included(base) base.class_eval do + include UnlockMethod alias_method :delete_orig, :delete alias_method :delete, :delete_ext alias_method :remove_job_orig, :remove_job @@ -16,13 +20,14 @@ def self.included(base) end def delete_ext - unlock(payload_hash(item), item) - delete_orig + unlock(item) if delete_orig end + private + def remove_job_ext remove_job_orig do |message| - unlock(payload_hash(Sidekiq.load_json(message)), item) + unlock(Sidekiq.load_json(message)) yield message end end @@ -31,38 +36,46 @@ def remove_job_ext include UniqueExtension if Gem::Version.new(Sidekiq::VERSION) >= Gem::Version.new('3.1') end - class Job + class ScheduledSet module UniqueExtension - SCHEDULED ||= 'schedule'.freeze - include SidekiqUniqueJobs::Server::Extensions - extend Forwardable - def_delegator :SidekiqUniqueJobs, :payload_hash - def self.included(base) base.class_eval do + include UnlockMethod alias_method :delete_orig, :delete alias_method :delete, :delete_ext end end def delete_ext - unlock(payload_hash(item), item) - delete_orig + unlock(item) if delete_orig end - protected + def remove_job_ext + remove_job_orig do |message| + unlock(Sidekiq.load_json(message)) + yield message + end + end + end + include UniqueExtension if Gem::Version.new(Sidekiq::VERSION) >= Gem::Version.new('3.1') + end - def unlock(lock_key, item) - Sidekiq.redis do |con| - con.eval(remove_on_match, keys: [lock_key], argv: [item['jid']]) - if defined?(@parent) && @parent && @parent.name == SCHEDULED - con.eval(remove_scheduled_on_match, keys: [lock_key], argv: [item['jid']]) - else - con.eval(remove_on_match, keys: [lock_key], argv: [item['jid']]) - end + class Job + module UniqueExtension + def self.included(base) + base.class_eval do + include UnlockMethod + alias_method :delete_orig, :delete + alias_method :delete, :delete_ext end end + + def delete_ext + unlock(item) + delete_orig + end end + include UniqueExtension end @@ -70,6 +83,7 @@ class Queue module UniqueExtension def self.included(base) base.class_eval do + include UnlockMethod alias_method :clear_orig, :clear alias_method :clear, :clear_ext end @@ -88,17 +102,27 @@ class JobSet module UniqueExtension def self.included(base) base.class_eval do + include UnlockMethod if base.method_defined?(:clear) alias_method :clear_orig, :clear alias_method :clear, :clear_ext end + + if base.method_defined?(:delete_by_value) + alias_method :delete_by_value_orig, :delete_by_value + alias_method :delete_by_value, :delete_by_value_ext + end end end def clear_ext - each(&:delete_ext) + each(&:delete) clear_orig end + + def delete_by_value_ext(name, value) + unlock(JSON.parse(value)) if delete_by_value_orig(name, value) + end end include UniqueExtension diff --git a/lib/sidekiq_unique_jobs/testing.rb b/lib/sidekiq_unique_jobs/testing.rb index 4c6158cb7..b21bdb027 100644 --- a/lib/sidekiq_unique_jobs/testing.rb +++ b/lib/sidekiq_unique_jobs/testing.rb @@ -4,8 +4,6 @@ module SidekiqUniqueJobs module Client class Middleware alias_method :call_real, :call - alias_method :unique_for_connection_real?, :unique_for_connection? - def call(worker_class, item, queue, redis_pool = nil) worker_class = SidekiqUniqueJobs.worker_class_constantize(worker_class) @@ -25,32 +23,6 @@ def call(worker_class, item, queue, redis_pool = nil) def _server SidekiqUniqueJobs::Server::Middleware.new end - - def unique_for_connection? - return unique_for_connection_real? unless Sidekiq::Testing.fake? - return true if worker_class.jobs.empty? - - worker_class.jobs.find do |job| - item['unique_hash'] == job['unique_hash'] - end.nil? - end - end - end - - module Server - class Middleware - alias_method :unlock_real, :unlock - - def unlock(lock_key, item) - return unlock_real(lock_key, item) unless SidekiqUniqueJobs.config.mocking? - - connection do |con| - con.watch(lock_key) - return con.unwatch unless con.get(lock_key) == item['jid'] - - con.multi { con.del(lock_key) } - end - end end end diff --git a/lib/sidekiq_unique_jobs/testing/sidekiq_overrides.rb b/lib/sidekiq_unique_jobs/testing/sidekiq_overrides.rb index 60ae1af9e..af5c3e9e0 100644 --- a/lib/sidekiq_unique_jobs/testing/sidekiq_overrides.rb +++ b/lib/sidekiq_unique_jobs/testing/sidekiq_overrides.rb @@ -2,10 +2,43 @@ module Sidekiq module Worker + module ClassMethods + include SidekiqUniqueJobs::Unlockable + + # Drain and run all jobs for this worker + def drain + while (job = jobs.shift) + worker = new + worker.jid = job['jid'] + worker.bid = job['bid'] if worker.respond_to?(:bid=) + execute_job(worker, job['args']) + unlock(job['unique_digest'], job['jid']) if Sidekiq::Testing.fake? + end + end + + # Pop out a single job and perform it + def perform_one + fail(EmptyQueueError, 'perform_one called with empty job queue') if jobs.empty? + job = jobs.shift + worker = new + worker.jid = job['jid'] + worker.bid = job['bid'] if worker.respond_to?(:bid=) + execute_job(worker, job['args']) + unlock(job['unique_digest'], job['jid']) if Sidekiq::Testing.fake? + end + + # Clear all jobs for this worker + def clear + jobs.each do |job| + unlock(job['unique_digest'], job['jid']) if Sidekiq::Testing.fake? + end + jobs.clear + end + end + module Overrides def self.included(base) - base.extend ClassMethods - + base.extend Sidekiq::Worker::Overrides::ClassMethods base.class_eval do class << self alias_method :clear_all_orig, :clear_all @@ -16,12 +49,11 @@ class << self module ClassMethods def clear_all_ext + Sidekiq.redis do |c| + unique_keys = c.keys("#{SidekiqUniqueJobs.config.unique_prefix}:*") + c.del(*unique_keys) unless unique_keys.empty? + end clear_all_orig - unique_prefix = SidekiqUniqueJobs.config.unique_prefix - unique_keys = Sidekiq.redis { |conn| conn.keys("#{unique_prefix}*") } - return if unique_keys.empty? - - Sidekiq.redis { |conn| conn.del(*unique_keys) } end end end diff --git a/lib/sidekiq_unique_jobs/unique_args.rb b/lib/sidekiq_unique_jobs/unique_args.rb new file mode 100644 index 000000000..76c9302d9 --- /dev/null +++ b/lib/sidekiq_unique_jobs/unique_args.rb @@ -0,0 +1,132 @@ +require 'sidekiq_unique_jobs/normalizer' + +module SidekiqUniqueJobs + # This class exists to be testable and the entire api should be considered private + # rubocop:disable ClassLength + class UniqueArgs + extend Forwardable + include Normalizer + + def_delegators :SidekiqUniqueJobs, :config, :worker_class_constantize + def_delegators :'Sidekiq.logger', :logger, :debug, :warn, :error, :fatal + + def self.digest(item) + new(item).unique_digest + end + + def initialize(job) + Sidekiq::Logging.with_context(self.class.name) do + @item = job + @worker_class ||= worker_class_constantize(@item['class'.freeze]) + @item['unique_prefix'.freeze] ||= unique_prefix + @item['unique_args'.freeze] ||= unique_args(@item['args'.freeze]) + @item['unique_digest'.freeze] ||= unique_digest + end + end + + def unique_digest + @unique_digest ||= begin + digest = Digest::MD5.hexdigest(Sidekiq.dump_json(digestable_hash)) + digest = "#{unique_prefix}:#{digest}" + debug { "#{__method__} : #{digestable_hash} into #{digest}" } + digest + end + end + + def unique_prefix + return config.unique_prefix unless sidekiq_worker_class? + @worker_class.get_sidekiq_options['unique_prefix'.freeze] || config.unique_prefix + end + + def digestable_hash + hash = @item.slice('class', 'queue', 'unique_args') + + if unique_on_all_queues? + debug { "uniqueness specified across all queues (deleting queue: #{@item['queue']} from hash)" } + hash.delete('queue') + end + hash + end + + def unique_args(args) + if unique_args_enabled? + filtered_args(args) + else + debug { "#{__method__} : unique arguments disabled" } + args + end + rescue NameError + # fallback to not filtering args when class can't be instantiated + return args + end + + def unique_on_all_queues? + return unless sidekiq_worker_class? + return unless unique_args_enabled? + @worker_class.get_sidekiq_options['unique_on_all_queues'.freeze] + end + + def unique_args_enabled? + unique_args_enabled_in_worker? || + config.unique_args_enabled + end + + def unique_args_enabled_in_worker? + return unless sidekiq_worker_class? + @worker_class.get_sidekiq_options['unique_args_enabled'.freeze] || + @worker_class.get_sidekiq_options['unique_args'.freeze] + end + + def sidekiq_worker_class? + if @worker_class.respond_to?(:get_sidekiq_options) + true + else + debug { "#{@worker_class} does not respond to :get_sidekiq_options" } + nil + end + end + + # Filters unique arguments by proc or symbol + # returns provided arguments for other configurations + def filtered_args(args) + return args if args.empty? + json_args = Normalizer.jsonify(args) + debug { "#filtered_args #{args} => #{json_args}" } + + case unique_args_method + when Proc + filter_by_proc(json_args) + when Symbol + filter_by_symbol(json_args) + else + debug { 'arguments not filtered (the combined arguments count towards uniqueness)' } + json_args + end + end + + def filter_by_proc(args) + filter_args = unique_args_method.call(args) + debug { "#{__method__} : #{args} -> #{filter_args}" } + filter_args + end + + def filter_by_symbol(args) + unless @worker_class.respond_to?(unique_args_method) + warn do + "#{__method__} : #{unique_args_method}) not defined in #{@worker_class} " \ + "returning #{args} unchanged" + end + return args + end + + filter_args = @worker_class.send(unique_args_method, args) + debug { "#{__method__} : #{unique_args_method}(#{args}) => #{filter_args}" } + filter_args + end + + def unique_args_method + @unique_args_method ||= + @worker_class.get_sidekiq_options['unique_args'.freeze] if sidekiq_worker_class? + end + end +end diff --git a/lib/sidekiq_unique_jobs/unlockable.rb b/lib/sidekiq_unique_jobs/unlockable.rb new file mode 100644 index 000000000..792bc3652 --- /dev/null +++ b/lib/sidekiq_unique_jobs/unlockable.rb @@ -0,0 +1,26 @@ +module SidekiqUniqueJobs + module Unlockable + module_function + + # rubocop:disable MethodLength + def unlock(unique_key, jid, redis_pool = nil) + result = Scripts.call(:release_lock, redis_pool, + keys: [unique_key], + argv: [jid]) + case result + when 1 + Sidekiq.logger.debug { "successfully unlocked #{unique_key}" } + true + when 0 + Sidekiq.logger.debug { "expiring lock #{unique_key} is not owned by #{jid}" } + false + when -1 + Sidekiq.logger.debug { "#{unique_key} is not a known key" } + false + else + fail "#{__method__} returned an unexpected value (#{result})" + end + end + # rubocop:enable MethodLength + end +end diff --git a/lib/sidekiq_unique_jobs/version.rb b/lib/sidekiq_unique_jobs/version.rb index effceb669..9fe02ece7 100644 --- a/lib/sidekiq_unique_jobs/version.rb +++ b/lib/sidekiq_unique_jobs/version.rb @@ -1,3 +1,3 @@ module SidekiqUniqueJobs - VERSION = '3.0.16' + VERSION = '4.0.0' end diff --git a/redis/aquire_lock.lua b/redis/aquire_lock.lua new file mode 100644 index 000000000..574670d46 --- /dev/null +++ b/redis/aquire_lock.lua @@ -0,0 +1,9 @@ +local unique_key = KEYS[1] +local job_id = ARGV[1] +local expires = ARGV[2] + +if redis.pcall('set', unique_key, job_id, 'nx', 'ex', expires) then + return 1 +else + return 0 +end diff --git a/redis/release_lock.lua b/redis/release_lock.lua new file mode 100644 index 000000000..b9201f8aa --- /dev/null +++ b/redis/release_lock.lua @@ -0,0 +1,14 @@ +local unique_key = KEYS[1] +local job_id = ARGV[1] +local stored_jid = redis.pcall('get', unique_key) + +if stored_jid then + if stored_jid == job_id then + return redis.pcall('del', unique_key) + else + return 0 + end +else + return -1 +end + diff --git a/redis/synchronize.lua b/redis/synchronize.lua new file mode 100644 index 000000000..31f75727a --- /dev/null +++ b/redis/synchronize.lua @@ -0,0 +1,15 @@ +local unique_key = KEYS[1] +local time = ARGV[1] + +if redis.pcall('set', unique_key, time + 60, 'nx', 'ex', 60) then + return 1 +end + +local stored_time = redis.pcall('get', unique_key) +if stored_time and stored_time < time then + if redis.call('set', unique_key, time + 60, 'nx', 'ex') then + return 1 + end +end + +return 0 diff --git a/sidekiq-unique-jobs.gemspec b/sidekiq-unique-jobs.gemspec index c0c17598b..ce6000472 100644 --- a/sidekiq-unique-jobs.gemspec +++ b/sidekiq-unique-jobs.gemspec @@ -13,11 +13,9 @@ Gem::Specification.new do |gem| gem.test_files = `git ls-files -- test/*`.split("\n") gem.name = 'sidekiq-unique-jobs' gem.require_paths = ['lib'] - gem.post_install_message = 'If you are relying on `mock_redis` you will now have to add' \ - 'gem "mock_redis" to your desired bundler group.' + gem.post_install_message = 'WARNING: VERSION 4.0.0 HAS BREAKING CHANGES! Please see Readme for info' gem.version = SidekiqUniqueJobs::VERSION gem.add_dependency 'sidekiq', '>= 2.6' - gem.add_development_dependency 'mock_redis' gem.add_development_dependency 'rspec', '~> 3.1.0' gem.add_development_dependency 'rake' gem.add_development_dependency 'rspec-sidekiq' diff --git a/spec/lib/client/middleware_spec.rb b/spec/lib/client/middleware_spec.rb deleted file mode 100644 index b60f86d78..000000000 --- a/spec/lib/client/middleware_spec.rb +++ /dev/null @@ -1,249 +0,0 @@ -require 'spec_helper' -require 'celluloid' -require 'sidekiq/worker' -require 'sidekiq-unique-jobs' -require 'sidekiq/scheduled' - -RSpec.describe SidekiqUniqueJobs::Client::Middleware do - describe 'with real redis' do - before do - Sidekiq.redis = REDIS - Sidekiq.redis(&:flushdb) - QueueWorker.sidekiq_options unique: nil, unique_job_expiration: nil - end - - class QueueWorker - include Sidekiq::Worker - sidekiq_options queue: :customqueue - def perform(_) - end - end - - class PlainClass - def run(_x) - end - end - - class MyUniqueWorker - include Sidekiq::Worker - sidekiq_options queue: :customqueue, retry: true, unique: true, - unique_job_expiration: 7200, retry_count: 10 - def perform(_) - end - end - - class MainJob - include Sidekiq::Worker - sidekiq_options queue: :customqueue, unique: true, log_duplicate_payload: true - - def perform(_) - end - end - - describe 'when a job is already scheduled' do - before 'schedule a job' do - MyUniqueWorker.perform_in(3600, 1) - end - - context '#old_unique_for' do - it 'rejects new scheduled jobs with the same argument' do - allow(SidekiqUniqueJobs.config).to receive(:unique_storage_method).and_return(:old) - expect(MyUniqueWorker.perform_in(1800, 1)).to eq(nil) - end - it 'will run a job in real time with the same arguments' do - allow(SidekiqUniqueJobs.config).to receive(:unique_storage_method).and_return(:old) - expect(MyUniqueWorker.perform_async(1)).not_to eq(nil) - end - it 'schedules new jobs when arguments differ' do - [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20].each do |x| - MainJob.perform_in(x.seconds.from_now, x) - end - result = Sidekiq.redis { |c| c.zcount('schedule', -1, Time.now.to_f + 2 * 60) } - expect(result).to eq(20) - end - end - context '#new_unique_for' do - it 'rejects new scheduled jobs with the same argument' do - allow(SidekiqUniqueJobs.config).to receive(:unique_storage_method).and_return(:new) - expect(MyUniqueWorker.perform_in(3600, 1)).to eq(nil) - end - it 'will run a job in real time with the same arguments' do - allow(SidekiqUniqueJobs.config).to receive(:unique_storage_method).and_return(:new) - expect(MyUniqueWorker.perform_async(1)).not_to eq(nil) - end - it 'schedules new jobs when arguments differ' do - [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20].each do |x| - MainJob.perform_in(x.seconds.from_now, x) - end - result = Sidekiq.redis { |c| c.zcount('schedule', -1, Time.now.to_f + 2 * 60) } - expect(result).to eq(20) - end - end - end - - it 'does not push duplicate messages when configured for unique only' do - QueueWorker.sidekiq_options unique: true - 10.times { Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2]) } - result = Sidekiq.redis { |c| c.llen('queue:customqueue') } - expect(result).to eq 1 - end - - it 'does push duplicate messages to different queues' do - QueueWorker.sidekiq_options unique: true - Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2]) - Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue2', 'args' => [1, 2]) - q1_length = Sidekiq.redis { |c| c.llen('queue:customqueue') } - q2_length = Sidekiq.redis { |c| c.llen('queue:customqueue2') } - expect(q1_length).to eq 1 - expect(q2_length).to eq 1 - end - - it 'does not queue duplicates when when calling delay' do - 10.times { PlainClass.delay(unique: true, queue: 'customqueue').run(1) } - result = Sidekiq.redis { |c| c.llen('queue:customqueue') } - expect(result).to eq 1 - end - - it 'does not schedule duplicates when calling perform_in' do - QueueWorker.sidekiq_options unique: true - 10.times { QueueWorker.perform_in(60, [1, 2]) } - result = Sidekiq.redis { |c| c.zcount('schedule', -1, Time.now.to_f + 2 * 60) } - expect(result).to eq 1 - end - - it 'enqueues previously scheduled job' do - QueueWorker.sidekiq_options unique: true - jid = QueueWorker.perform_in(60 * 60, 1, 2) - - # time passes and the job is pulled off the schedule: - Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2], 'jid' => jid) - - result = Sidekiq.redis { |c| c.llen('queue:customqueue') } - expect(result).to eq 1 - end - - it 'sets an expiration when provided by sidekiq options' do - one_hour_expiration = 60 * 60 - QueueWorker.sidekiq_options unique: true, unique_job_expiration: one_hour_expiration - Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2]) - - payload_hash = SidekiqUniqueJobs.get_payload('QueueWorker', 'customqueue', [1, 2]) - actual_expires_at = Sidekiq.redis { |c| c.ttl(payload_hash) } - - Sidekiq.redis { |c| c.llen('queue:customqueue') } - expect(actual_expires_at).to be_within(2).of(one_hour_expiration) - end - - it 'does push duplicate messages when not configured for unique only' do - QueueWorker.sidekiq_options unique: false - 10.times { Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2]) } - expect(Sidekiq.redis { |c| c.llen('queue:customqueue') }).to eq 10 - - result = Sidekiq.redis { |c| c.llen('queue:customqueue') } - expect(result).to eq 10 - end - - describe 'when unique_args is defined' do - before { SidekiqUniqueJobs.config.unique_args_enabled = true } - after { SidekiqUniqueJobs.config.unique_args_enabled = false } - - class QueueWorkerWithFilterMethod < QueueWorker - sidekiq_options unique: true, unique_args: :args_filter - - def self.args_filter(*args) - args.first - end - end - - class QueueWorkerWithFilterProc < QueueWorker - # slightly contrived example of munging args to the - # worker and removing a random bit. - sidekiq_options unique: true, unique_args: (lambda do |args| - a = args.last.dup - a.delete(:random) - [args.first, a] - end) - end - - it 'does not push duplicate messages based on args filter method' do - expect(QueueWorkerWithFilterMethod).to respond_to(:args_filter) - expect(QueueWorkerWithFilterMethod.get_sidekiq_options['unique_args']).to eq :args_filter - - (0..10).each do |i| - Sidekiq::Client.push( - 'class' => QueueWorkerWithFilterMethod, - 'queue' => 'customqueue', - 'args' => [1, i] - ) - end - result = Sidekiq.redis { |c| c.llen('queue:customqueue') } - expect(result).to eq 1 - end - - it 'does not push duplicate messages based on args filter proc' do - expect(QueueWorkerWithFilterProc.get_sidekiq_options['unique_args']).to be_a(Proc) - - 10.times do - Sidekiq::Client.push( - 'class' => QueueWorkerWithFilterProc, - 'queue' => 'customqueue', - 'args' => [1, { random: rand, name: 'foobar' }] - ) - end - result = Sidekiq.redis { |c| c.llen('queue:customqueue') } - expect(result).to eq 1 - end - - describe 'when unique_on_all_queues is set' do - before { QueueWorker.sidekiq_options unique: true, unique_on_all_queues: true } - before { QueueWorker.sidekiq_options unique: true } - it 'does not push duplicate messages on different queues' do - Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2]) - Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue2', 'args' => [1, 2]) - q1_length = Sidekiq.redis { |c| c.llen('queue:customqueue') } - q2_length = Sidekiq.redis { |c| c.llen('queue:customqueue2') } - expect(q1_length).to eq 1 - expect(q2_length).to eq 0 - end - end - end - - # TODO: If anyone know of a better way to check that the expiration for scheduled - # jobs are set around the same time as the scheduled job itself feel free to improve. - it 'expires the payload_hash when a scheduled job is scheduled at' do - require 'active_support/all' - QueueWorker.sidekiq_options unique: true - - at = 15.minutes.from_now - expected_expires_at = (Time.at(at) - Time.now.utc) + SidekiqUniqueJobs.config.default_expiration - - QueueWorker.perform_in(at, 'mike') - payload_hash = SidekiqUniqueJobs.get_payload('QueueWorker', 'customqueue', ['mike']) - - # deconstruct this into a time format we can use to get a decent delta for - actual_expires_at = Sidekiq.redis { |c| c.ttl(payload_hash) } - - expect(actual_expires_at).to be_within(2).of(expected_expires_at) - end - - it 'logs duplicate payload when config turned on' do - expect(Sidekiq.logger).to receive(:warn).with(/^payload is not unique/) - - QueueWorker.sidekiq_options unique: true, log_duplicate_payload: true - - 2.times { Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2]) } - result = Sidekiq.redis { |c| c.llen('queue:customqueue') } - expect(result).to eq 1 - end - - it 'does not log duplicate payload when config turned off' do - expect(Sidekiq.logger).to_not receive(:warn).with(/^payload is not unique/) - - QueueWorker.sidekiq_options unique: true, log_duplicate_payload: false - - 2.times { Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2]) } - result = Sidekiq.redis { |c| c.llen('queue:customqueue') } - expect(result).to eq 1 - end - end -end diff --git a/spec/lib/server/middleware_spec.rb b/spec/lib/server/middleware_spec.rb deleted file mode 100644 index 8ec9f43cc..000000000 --- a/spec/lib/server/middleware_spec.rb +++ /dev/null @@ -1,143 +0,0 @@ -require 'spec_helper' -require 'sidekiq/api' -require 'sidekiq/cli' -require 'sidekiq/worker' -require 'sidekiq_unique_jobs/server/middleware' - -RSpec.describe SidekiqUniqueJobs::Server::Middleware do - describe '#call' do - describe 'unlock order' do - QUEUE = 'unlock_ordering'.freeze unless defined?(QUEUE) - - def get_payload(item) - SidekiqUniqueJobs.get_payload( - item['class'], item['queue'], item['args']) - end - - class BeforeYieldOrderingWorker - include Sidekiq::Worker - - sidekiq_options unique: true, unique_unlock_order: :before_yield, queue: QUEUE - - def perform - end - end - - class AfterYieldOrderingWorker - include Sidekiq::Worker - - sidekiq_options unique: true, unique_unlock_order: :after_yield, queue: QUEUE - - def perform - end - end - - class RunLockOrderingWorker - include Sidekiq::Worker - - sidekiq_options unique: true, unique_unlock_order: :run_lock, queue: QUEUE - def perform - end - end - - class RunLockSpinningWorker - include Sidekiq::Worker - - sidekiq_options unique: true, - unique_unlock_order: :run_lock, - queue: QUEUE, - run_lock_retries: 10, - run_lock_retry_interval: 0, - reschedule_on_lock_fail: true - def perform - end - end - - before do - Sidekiq.redis = REDIS - Sidekiq.redis(&:flushdb) - end - - describe '#unlock' do - it 'does not unlock mutexes it does not own' do - jid = AfterYieldOrderingWorker.perform_async - item = Sidekiq::Queue.new(QUEUE).find_job(jid).item - Sidekiq.redis do |c| - c.set(get_payload(item), 'NOT_DELETED') - end - - result = subject.call(AfterYieldOrderingWorker.new, item, QUEUE) do - Sidekiq.redis do |c| - c.get(get_payload(item)) - end - end - expect(result).to eq 'NOT_DELETED' - end - end - - describe ':before_yield' do - it 'removes the lock before yielding to the worker' do - jid = BeforeYieldOrderingWorker.perform_async - item = Sidekiq::Queue.new(QUEUE).find_job(jid).item - - result = subject.call(BeforeYieldOrderingWorker.new, item, QUEUE) do - Sidekiq.redis do |c| - c.get(get_payload(item)) - end - end - - expect(result).to eq nil - end - end - - describe ':after_yield' do - it 'removes the lock after yielding to the worker' do - jid = AfterYieldOrderingWorker.perform_async - item = Sidekiq::Queue.new(QUEUE).find_job(jid).item - - result = subject.call(AfterYieldOrderingWorker.new, item, QUEUE) do - Sidekiq.redis do |c| - c.get(get_payload(item)) - end - end - - expect(result).to eq jid - end - end - end - - context 'unlock' do - let(:items) { [AfterYieldWorker.new, { 'class' => 'testClass' }, 'fudge'] } - - it 'should unlock after yield when call succeeds' do - expect(subject).to receive(:unlock) - - subject.call(*items) { true } - end - - it 'should unlock after yield when call errors' do - expect(subject).to receive(:unlock) - - expect { subject.call(*items) { fail } }.to raise_error(RuntimeError) - end - - it 'should not unlock after yield on shutdown, but still raise error' do - expect(subject).to_not receive(:unlock) - - expect { subject.call(*items) { fail Sidekiq::Shutdown } }.to raise_error(Sidekiq::Shutdown) - end - end - - context 'after unlock' do - let(:worker) { AfterUnlockWorker.new } - let(:items) { [worker, { 'class' => 'testClass' }, 'test'] } - it 'should call the after_unlock hook if defined' do - expect(subject).to receive(:unlock).and_call_original - # expect(subject).to receive(:after_unlock_hook) - expect(worker).to receive(:after_unlock) - - subject.call(*items) { true } - end - end - end -end diff --git a/spec/lib/sidekiq_unique_jobs/client/middleware_spec.rb b/spec/lib/sidekiq_unique_jobs/client/middleware_spec.rb new file mode 100644 index 000000000..f869100f7 --- /dev/null +++ b/spec/lib/sidekiq_unique_jobs/client/middleware_spec.rb @@ -0,0 +1,195 @@ +require 'spec_helper' +require 'celluloid/current' +require 'sidekiq/worker' +require 'sidekiq-unique-jobs' +require 'sidekiq/scheduled' + +RSpec.describe SidekiqUniqueJobs::Client::Middleware do + def digest_for(item) + SidekiqUniqueJobs::UniqueArgs.digest(item) + end + + describe 'with real redis' do + before do + Sidekiq.redis = REDIS + Sidekiq.redis(&:flushdb) + QueueWorker.sidekiq_options unique: nil, unique_expiration: nil + end + + describe 'when a job is already scheduled' do + context '#new_unique_for' do + it 'rejects new scheduled jobs with the same argument' do + MyUniqueWorker.perform_in(3600, 1) + expect(MyUniqueWorker.perform_in(3600, 1)).to eq(nil) + end + + it 'will run a job in real time with the same arguments' do + WhileExecutingWorker.perform_in(3600, 1) + expect(WhileExecutingWorker.perform_async(1)).not_to eq(nil) + end + + it 'schedules new jobs when arguments differ' do + [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20].each do |x| + MainJob.perform_in(x.seconds.from_now, x) + end + + Sidekiq.redis do |c| + count = c.zcount('schedule', -1, Time.now.to_f + 2 * 60) + expect(count).to eq(20) + end + end + end + end + + it 'does not push duplicate messages when configured for unique only' do + item = { 'class' => MyUniqueWorker, 'queue' => 'customqueue', 'args' => [1, 2] } + 10.times { Sidekiq::Client.push(item) } + Sidekiq.redis do |c| + expect(c.llen('queue:customqueue')).to eq(1) + end + end + + it 'does push duplicate messages to different queues' do + Sidekiq::Client.push('class' => MyUniqueWorker, 'queue' => 'customqueue', 'args' => [1, 2]) + Sidekiq::Client.push('class' => MyUniqueWorker, 'queue' => 'customqueue2', 'args' => [1, 2]) + Sidekiq.redis do |c| + expect(c.llen('queue:customqueue')).to eq 1 + expect(c.llen('queue:customqueue2')).to eq 1 + end + end + + it 'does not queue duplicates when when calling delay' do + 10.times { PlainClass.delay(unique_lock: :until_executed, unique: true, queue: 'customqueue').run(1) } + Sidekiq.redis do |c| + expect(c.llen('queue:customqueue')).to eq(1) + end + end + + it 'does not schedule duplicates when calling perform_in' do + 10.times { MyUniqueWorker.perform_in(60, [1, 2]) } + Sidekiq.redis do |c| + expect(c.zcount('schedule', -1, Time.now.to_f + 2 * 60)) + .to eq(1) + end + end + + it 'enqueues previously scheduled job' do + jid = WhileExecutingWorker.perform_in(60 * 60, 1, 2) + item = { 'class' => WhileExecutingWorker, 'queue' => 'customqueue', 'args' => [1, 2], 'jid' => jid } + + # time passes and the job is pulled off the schedule: + Sidekiq::Client.push(item) + + Sidekiq.redis do |c| + expect(c.llen('queue:customqueue')).to eq 1 + end + end + + it 'sets an expiration when provided by sidekiq options' do + item = { 'class' => ExpiringWorker, 'queue' => 'customqueue', 'args' => [1, 2] } + Sidekiq::Client.push(item) + + Sidekiq.redis do |c| + expect(c.llen('queue:customqueue')).to eq(1) + expect(c.ttl(digest_for(item))) + .to eq(ExpiringWorker.get_sidekiq_options['unique_expiration']) + end + end + + it 'does push duplicate messages when not configured for unique only' do + 10.times { Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2]) } + + Sidekiq.redis do |c| + expect(c.llen('queue:customqueue')).to eq(10) + end + end + + describe 'when unique_args is defined' do + before(:all) { SidekiqUniqueJobs.config.unique_args_enabled = true } + after(:all) { SidekiqUniqueJobs.config.unique_args_enabled = false } + + it 'does not push duplicate messages based on args filter method' do + expect(QueueWorkerWithFilterMethod).to respond_to(:args_filter) + expect(QueueWorkerWithFilterMethod.get_sidekiq_options['unique_args']).to eq :args_filter + + (0..10).each do |i| + Sidekiq::Client.push( + 'class' => QueueWorkerWithFilterMethod, + 'queue' => 'customqueue', + 'args' => [1, i] + ) + end + + Sidekiq.redis do |c| + expect(c.llen('queue:customqueue')).to eq(1) + end + end + + it 'does not push duplicate messages based on args filter proc' do + expect(QueueWorkerWithFilterProc.get_sidekiq_options['unique_args']).to be_a(Proc) + + 100.times do + Sidekiq::Client.push( + 'class' => QueueWorkerWithFilterProc, + 'queue' => 'customqueue', + 'args' => [1, { random: rand, name: 'foobar' }] + ) + end + + Sidekiq.redis do |c| + expect(c.llen('queue:customqueue')).to eq(1) + end + end + + describe 'when unique_on_all_queues is set' do + it 'does not push duplicate messages on different queues' do + item = { 'class' => UniqueOnAllQueuesWorker, 'args' => [1, 2] } + Sidekiq::Client.push(item.merge('queue' => 'customqueue')) + Sidekiq::Client.push(item.merge('queue' => 'customqueue2')) + Sidekiq.redis do |c| + expect(c.llen('queue:customqueue')).to eq(1) + expect(c.llen('queue:customqueue2')).to eq(0) + end + end + end + end + + # TODO: If anyone know of a better way to check that the expiration for scheduled + # jobs are set around the same time as the scheduled job itself feel free to improve. + it 'expires the digest when a scheduled job is scheduled at' do + expected_expires_at = + (Time.at(15.minutes.from_now) - Time.now.utc) + SidekiqUniqueJobs.config.default_expiration + jid = MyUniqueWorker.perform_in(expected_expires_at, 'mike') + item = { 'class' => MyUniqueWorker, + 'queue' => 'customqueue', + 'args' => ['mike'], + 'at' => expected_expires_at } + digest = digest_for(item.merge('jid' => jid)) + Sidekiq.redis do |c| + expect(c.ttl(digest)).to eq(9_899) + end + end + + it 'logs duplicate payload when config turned on' do + expect(Sidekiq.logger).to receive(:warn).with(/^payload is not unique/) + UniqueWorker.sidekiq_options log_duplicate_payload: true + 2.times { Sidekiq::Client.push('class' => UniqueWorker, 'queue' => 'customqueue', 'args' => [1, 2]) } + Sidekiq.redis do |c| + expect(c.llen('queue:customqueue')).to eq 1 + end + UniqueWorker.sidekiq_options log_duplicate_payload: true + end + + it 'does not log duplicate payload when config turned off' do + expect(Sidekiq.logger).to_not receive(:warn).with(/^payload is not unique/) + + UniqueWorker.sidekiq_options log_duplicate_payload: false + + 2.times { Sidekiq::Client.push('class' => UniqueWorker, 'queue' => 'customqueue', 'args' => [1, 2]) } + Sidekiq.redis do |c| + expect(c.llen('queue:customqueue')).to eq 1 + end + UniqueWorker.sidekiq_options log_duplicate_payload: true + end + end +end diff --git a/spec/lib/sidekiq_unique_jobs/core_ext_spec.rb b/spec/lib/sidekiq_unique_jobs/core_ext_spec.rb new file mode 100644 index 000000000..0ac8b5621 --- /dev/null +++ b/spec/lib/sidekiq_unique_jobs/core_ext_spec.rb @@ -0,0 +1,25 @@ +require 'spec_helper' + +RSpec.describe Hash do + subject { { test: :me, not: :me } } + + describe '#slice' do + specify { expect(subject.slice(:test)).to eq(test: :me) } + end + + describe '#slice!' do + specify { expect { subject.slice!(:test) }.to change { subject }.to(test: :me) } + end +end + +RSpec.describe String do + describe '#classify' do + subject { 'under_scored_string' } + its(:classify) { is_expected.to eq('UnderScoredString') } + end + + describe '#camelize' do + subject { 'under_scored_string' } + its(:camelize) { is_expected.to eq('UnderScoredString') } + end +end diff --git a/spec/lib/sidekiq_unique_jobs/lock/time_calculator_spec.rb b/spec/lib/sidekiq_unique_jobs/lock/time_calculator_spec.rb new file mode 100644 index 000000000..2ca102f8b --- /dev/null +++ b/spec/lib/sidekiq_unique_jobs/lock/time_calculator_spec.rb @@ -0,0 +1,81 @@ +require 'spec_helper' + +RSpec.describe SidekiqUniqueJobs::Lock::TimeCalculator do + include ActiveSupport::Testing::TimeHelpers + shared_context 'undefined worker class' do + subject { described_class.new('class' => 'test') } + end + + shared_context 'item not scheduled' do + subject { described_class.new('class' => 'MyUniqueWorker') } + end + + describe 'public api' do + subject { described_class.new(nil) } + it { is_expected.to respond_to(:time_until_scheduled) } + it { is_expected.to respond_to(:unique_expiration) } + it { is_expected.to respond_to(:worker_class_unique_expiration) } + it { is_expected.to respond_to(:worker_class) } + it { is_expected.to respond_to(:seconds) } + end + + describe '.for_item' do + it 'initializes a new calculator' do + expect(described_class).to receive(:new).with('WAT') + described_class.for_item('WAT') + end + end + + describe '#seconds' do + subject { described_class.new(nil) } + + before do + allow(subject).to receive(:time_until_scheduled).and_return(10) + allow(subject).to receive(:unique_expiration).and_return(9) + end + its(:seconds) { is_expected.to eq(19) } + end + + describe '#time_until_scheduled' do + it_behaves_like 'item not scheduled' do + its(:time_until_scheduled) { is_expected.to eq(0) } + end + + subject { described_class.new('class' => 'MyUniqueWorker', 'at' => schedule_time) } + let(:schedule_time) { 1.day.from_now.to_i } + let(:now_in_utc) { Time.now.utc.to_i } + + its(:time_until_scheduled) do + travel_to(Time.at(now_in_utc)) do + is_expected.to eq(schedule_time - now_in_utc) + end + end + end + + describe '#unique_expiration' do + it_behaves_like 'undefined worker class' do + its(:unique_expiration) { is_expected.to eq(SidekiqUniqueJobs.config.default_expiration) } + end + + subject { described_class.new('class' => 'MyUniqueWorker') } + its(:unique_expiration) { is_expected.to eq(7_200) } + end + + describe '#worker_class_unique_expiration' do + it_behaves_like 'undefined worker class' do + its(:worker_class_unique_expiration) { is_expected.to eq(nil) } + end + + subject { described_class.new('class' => 'MyUniqueWorker') } + its(:worker_class_unique_expiration) { is_expected.to eq(7_200) } + end + + describe '#worker_class' do + it_behaves_like 'undefined worker class' do + its(:worker_class) { is_expected.to eq('test') } + end + + subject { described_class.new('class' => 'MyWorker') } + its(:worker_class) { is_expected.to eq(MyWorker) } + end +end diff --git a/spec/lib/run_lock_spec.rb b/spec/lib/sidekiq_unique_jobs/lock/while_executing_spec.rb similarity index 60% rename from spec/lib/run_lock_spec.rb rename to spec/lib/sidekiq_unique_jobs/lock/while_executing_spec.rb index ad8f81185..a11bfc046 100644 --- a/spec/lib/run_lock_spec.rb +++ b/spec/lib/sidekiq_unique_jobs/lock/while_executing_spec.rb @@ -1,12 +1,9 @@ require 'spec_helper' -require 'sidekiq_unique_jobs/run_lock' -RSpec.describe SidekiqUniqueJobs::RunLock do +RSpec.describe SidekiqUniqueJobs::Lock::WhileExecuting do it 'allows only one mutex object to have the lock at a time' do mutexes = (1..10).map do - SidekiqUniqueJobs.connection do |conn| - SidekiqUniqueJobs::RunLock.new('test_mutex_key', conn) - end + described_class.new('test_mutex_key') end x = 0 @@ -24,12 +21,10 @@ end it 'handles auto cleanup correctly' do - m = SidekiqUniqueJobs.connection do |conn| - SidekiqUniqueJobs::RunLock.new('test_mutex_key', conn) - end + m = described_class.new('test_mutex_key') SidekiqUniqueJobs.connection do |conn| - conn.setnx 'test_mutex_key', Time.now.to_i - 1 + conn.set 'test_mutex_key', Time.now.to_i - 1, nx: true end start = Time.now.to_i @@ -42,9 +37,7 @@ end it 'maintains mutex semantics' do - m = SidekiqUniqueJobs.connection do |conn| - SidekiqUniqueJobs::RunLock.new('test_mutex_key', conn) - end + m = described_class.new('test_mutex_key') expect do m.synchronize do diff --git a/spec/lib/sidekiq_unique_jobs/normalizer_spec.rb b/spec/lib/sidekiq_unique_jobs/normalizer_spec.rb new file mode 100644 index 000000000..74e65328b --- /dev/null +++ b/spec/lib/sidekiq_unique_jobs/normalizer_spec.rb @@ -0,0 +1,21 @@ +require 'spec_helper' + +RSpec.describe SidekiqUniqueJobs::Normalizer do + def jsonify(args) + described_class.jsonify(args) + end + + describe '.jsonify' do + specify do + original = [1, :test, [test: :test]] + expected = [1, 'test', ['test' => 'test']] + expect(jsonify(original)).to eq(expected) + end + + specify do + original = [1, :test, [test: [test: :test]]] + expected = [1, 'test', ['test' => ['test' => 'test']]] + expect(jsonify(original)).to eq(expected) + end + end +end diff --git a/spec/lib/sidekiq_unique_jobs/scripts_spec.rb b/spec/lib/sidekiq_unique_jobs/scripts_spec.rb new file mode 100644 index 000000000..36fb6b9f2 --- /dev/null +++ b/spec/lib/sidekiq_unique_jobs/scripts_spec.rb @@ -0,0 +1,74 @@ +require 'spec_helper' +RSpec.describe SidekiqUniqueJobs::Scripts do + include ActiveSupport::Testing::TimeHelpers + MD5_DIGEST ||= 'unique'.freeze + UNIQUE_KEY ||= 'uniquejobs:unique'.freeze + JID ||= 'fuckit'.freeze + ANOTHER_JID ||= 'anotherjid'.freeze + + context 'class methods' do + subject { SidekiqUniqueJobs::Scripts } + + it { is_expected.to respond_to(:call).with(3).arguments } + it { is_expected.to respond_to(:logger) } + it { is_expected.to respond_to(:script_shas) } + it { is_expected.to respond_to(:connection).with(1).arguments } + it { is_expected.to respond_to(:script_source).with(1).arguments } + it { is_expected.to respond_to(:script_path).with(1).arguments } + + describe '.script_shas' do + its(:script_shas) { is_expected.to be_a(Hash) } + end + + describe '.logger' do + its(:logger) { is_expected.to eq(Sidekiq.logger) } + end + + def lock_for(seconds = 1, jid = JID, key = UNIQUE_KEY) + subject.call(:aquire_lock, nil, keys: [key], argv: [jid, seconds]) + end + + def unlock(key = UNIQUE_KEY, jid = JID) + subject.call(:release_lock, nil, keys: [key], argv: [jid]) + end + + describe '.aquire_lock' do + context 'when job is unique' do + specify { expect(lock_for).to eq(1) } + specify do + expect(lock_for(0.5.seconds)).to eq(1) + expect(Redis) + .to have_key(UNIQUE_KEY) + .for_seconds(1) + .with_value('fuckit') + sleep 0.5 + expect(lock_for).to eq(1) + end + + context 'when job is locked' do + before { expect(lock_for).to eq(1) } + specify { expect(lock_for).to eq(0) } + end + end + end + + describe '.release_lock' do + context 'when job is locked by another jid' do + before { expect(lock_for(10.seconds, 'anotherjid')).to eq(1) } + specify { expect(unlock).to eq(0) } + after { unlock(UNIQUE_KEY, ANOTHER_JID) } + end + + context 'when job is not locked at all' do + specify { expect(unlock).to eq(-1) } + end + + context 'when job is locked by the same jid' do + specify do + expect(lock_for(10.seconds)).to eq(1) + expect(unlock).to eq(1) + end + end + end + end +end diff --git a/spec/lib/sidekiq_unique_jobs/server/middleware_spec.rb b/spec/lib/sidekiq_unique_jobs/server/middleware_spec.rb new file mode 100644 index 000000000..8f3becbac --- /dev/null +++ b/spec/lib/sidekiq_unique_jobs/server/middleware_spec.rb @@ -0,0 +1,100 @@ +require 'spec_helper' +require 'sidekiq/api' +require 'sidekiq/cli' +require 'sidekiq/worker' +require 'sidekiq_unique_jobs/server/middleware' + +RSpec.describe SidekiqUniqueJobs::Server::Middleware do + QUEUE ||= 'unlock_ordering' + + def digest_for(item) + SidekiqUniqueJobs::UniqueArgs.digest(item) + end + + describe '#call' do + describe 'unlock order' do + before do + Sidekiq.redis = REDIS + Sidekiq.redis(&:flushdb) + end + + describe '#unlock' do + it 'does not unlock mutexes it does not own' do + jid = AfterYieldWorker.perform_async + item = Sidekiq::Queue.new(QUEUE).find_job(jid).item + Sidekiq.redis do |c| + c.set(digest_for(item), 'NOT_DELETED') + end + + subject.call(AfterYieldWorker.new, item, QUEUE) do + Sidekiq.redis do |c| + expect(c.get(digest_for(item))).to eq('NOT_DELETED') + end + end + end + end + + describe ':before_yield' do + it 'removes the lock before yielding to the worker' do + jid = BeforeYieldWorker.perform_async + item = Sidekiq::Queue.new(QUEUE).find_job(jid).item + worker = BeforeYieldWorker.new + subject.call(worker, item, QUEUE) do + Sidekiq.redis do |c| + expect(c.ttl(digest_for(item))).to eq(-2) # key does not exist + end + end + end + end + + describe ':after_yield' do + it 'removes the lock after yielding to the worker' do + jid = AfterYieldWorker.perform_async + item = Sidekiq::Queue.new(QUEUE).find_job(jid).item + + subject.call('AfterYieldWorker', item, QUEUE) do + Sidekiq.redis do |c| + expect(c.get(digest_for(item))).to eq jid + end + end + end + end + end + + context 'unlock' do + let(:worker) { AfterYieldWorker.new } + + before do + jid = AfterYieldWorker.perform_async + @item = Sidekiq::Queue.new('unlock_ordering').find_job(jid).item + end + + it 'unlocks after yield when call succeeds' do + expect(subject).to receive(:unlock) + subject.call(worker, @item, 'unlock_ordering') { true } + end + + it 'unlocks after yield when call errors' do + expect(subject).to receive(:unlock) + allow(subject).to receive(:after_yield_yield) { fail 'WAT!' } + expect { subject.call(worker, @item, 'unlock_ordering') } + .to raise_error + end + + it 'should not unlock after yield on shutdown, but still raise error' do + expect(subject).not_to receive(:unlock) + allow(subject).to receive(:after_yield_yield) { fail Sidekiq::Shutdown } + expect { subject.call(worker, @item, 'unlock_ordering') } + .to raise_error(Sidekiq::Shutdown) + end + + it 'calls after_unlock_hook if defined' do + allow(subject).to receive(:unlock).and_call_original + allow(subject).to receive(:after_unlock_hook).and_call_original + + expect(worker).to receive(:after_unlock) + subject.call(worker, @item, 'unlock_ordering') { true } + end + end + end +end diff --git a/spec/lib/sidekiq_testing_enabled_spec.rb b/spec/lib/sidekiq_unique_jobs/sidekiq_testing_enabled_spec.rb similarity index 85% rename from spec/lib/sidekiq_testing_enabled_spec.rb rename to spec/lib/sidekiq_unique_jobs/sidekiq_testing_enabled_spec.rb index 7f8772b15..855a586c1 100644 --- a/spec/lib/sidekiq_testing_enabled_spec.rb +++ b/spec/lib/sidekiq_unique_jobs/sidekiq_testing_enabled_spec.rb @@ -107,12 +107,13 @@ expect { Sidekiq::Worker.jobs.size }.not_to raise_error end - it 'adds the unique_hash to the message' do + it 'adds the unique_digest to the message' do param = 'hash' - hash = SidekiqUniqueJobs.get_payload(UniqueWorker, :working, [param]) + item = { 'class' => 'UniqueWorker', 'queue' => 'working', 'args' => [param] } + hash = SidekiqUniqueJobs::UniqueArgs.digest(item) expect(UniqueWorker.perform_async(param)).to_not be_nil expect(UniqueWorker.jobs.size).to eq(1) - expect(UniqueWorker.jobs.first['unique_hash']).to eq(hash) + expect(UniqueWorker.jobs.last['unique_digest']).to eq(hash) end end @@ -130,42 +131,9 @@ end describe 'when set to :inline!', sidekiq: :inline do - class InlineWorker - include Sidekiq::Worker - sidekiq_options unique: true - - def perform(x) - TestClass.run(x) - end - end - - class InlineUnlockOrderWorker - include Sidekiq::Worker - sidekiq_options unique: true, unique_unlock_order: :never - - def perform(x) - TestClass.run(x) - end - end - - class InlineExpirationWorker - include Sidekiq::Worker - sidekiq_options unique: true, unique_unlock_order: :never, - unique_job_expiration: 10 * 60 - def perform(x) - TestClass.run(x) - end - end - - class TestClass - def self.run(_) - end - end - it 'once the job is completed allows to run another one' do - expect(TestClass).to receive(:run).with('test') + expect(TestClass).to receive(:run).with('test').twice InlineWorker.perform_async('test') - expect(TestClass).to receive(:run).with('test') InlineWorker.perform_async('test') end diff --git a/spec/lib/sidekiq_unique_ext_spec.rb b/spec/lib/sidekiq_unique_jobs/sidekiq_unique_ext_spec.rb similarity index 66% rename from spec/lib/sidekiq_unique_ext_spec.rb rename to spec/lib/sidekiq_unique_jobs/sidekiq_unique_ext_spec.rb index afbbc13cf..9db7d0649 100644 --- a/spec/lib/sidekiq_unique_ext_spec.rb +++ b/spec/lib/sidekiq_unique_jobs/sidekiq_unique_ext_spec.rb @@ -6,25 +6,27 @@ require 'sidekiq_unique_jobs/sidekiq_unique_ext' RSpec.describe 'Sidekiq::Api' do - class JustAWorker - include Sidekiq::Worker - - sidekiq_options unique: true, queue: 'testqueue' - - def perform - end - end - before do Sidekiq.redis = REDIS Sidekiq.redis(&:flushdb) end - let(:params) { { foo: 'bar' } } - let(:payload_hash) { SidekiqUniqueJobs.get_payload('JustAWorker', 'testqueue', [params]) } + let(:item) do + { 'class' => JustAWorker, + 'queue' => 'testqueue', + 'args' => [foo: 'bar'] } + end + + def unique_key + SidekiqUniqueJobs::UniqueArgs.digest( + 'class' => JustAWorker, + 'queue' => 'testqueue', + 'args' => [foo: 'bar'], + 'at' => Time.now.tomorrow.to_i) + end def schedule_job - JustAWorker.perform_in(60 * 60 * 3, params) + JustAWorker.perform_in(60 * 60 * 3, foo: 'bar') end def perform_async @@ -33,14 +35,14 @@ def perform_async describe Sidekiq::SortedEntry::UniqueExtension, sidekiq_ver: '>= 3.1' do it 'deletes uniqueness lock on delete' do - schedule_job + expect(schedule_job).to be_truthy Sidekiq::ScheduledSet.new.each(&:delete) Sidekiq.redis do |c| - expect(c.exists(payload_hash)).to be_falsy + expect(c.exists(unique_key)).to be_falsy end - expect(schedule_job).not_to eq(nil) + expect(schedule_job).to be_truthy end end @@ -49,8 +51,9 @@ def perform_async jid = perform_async Sidekiq::Queue.new('testqueue').find_job(jid).delete Sidekiq.redis do |c| - expect(c.exists(payload_hash)).to be_falsy + expect(c.exists(unique_key)).to be_falsy end + expect(true).to be_truthy end end @@ -59,7 +62,7 @@ def perform_async perform_async Sidekiq::Queue.new('testqueue').clear Sidekiq.redis do |c| - expect(c.exists(payload_hash)).to be_falsy + expect(c.exists(unique_key)).to be_falsy end end end @@ -69,7 +72,7 @@ def perform_async schedule_job Sidekiq::JobSet.new('schedule').clear Sidekiq.redis do |c| - expect(c.exists(payload_hash)).to be_falsy + expect(c.exists(unique_key)).to be_falsy end end end diff --git a/spec/lib/sidekiq_unique_jobs_spec.rb b/spec/lib/sidekiq_unique_jobs/sidekiq_unique_jobs_spec.rb similarity index 100% rename from spec/lib/sidekiq_unique_jobs_spec.rb rename to spec/lib/sidekiq_unique_jobs/sidekiq_unique_jobs_spec.rb diff --git a/spec/lib/sidekiq_unique_jobs/unique_args_spec.rb b/spec/lib/sidekiq_unique_jobs/unique_args_spec.rb new file mode 100644 index 000000000..1eff08655 --- /dev/null +++ b/spec/lib/sidekiq_unique_jobs/unique_args_spec.rb @@ -0,0 +1,106 @@ +require 'spec_helper' + +RSpec.describe SidekiqUniqueJobs::UniqueArgs do + let(:item) { { 'class' => 'UniqueWorker', 'queue' => 'myqueue', 'args' => [[1, 2]] } } + subject { described_class.new(item) } + + describe '#unique_args_enabled_in_worker?' do + with_sidekiq_options_for(UniqueWorker, unique_args: :unique_args) do + its(:unique_args_enabled_in_worker?) { is_expected.to eq(:unique_args) } + end + + with_sidekiq_options_for(UniqueWorker, unique_args: false) do + its(:unique_args_enabled_in_worker?) { is_expected.to eq(false) } + end + + # For when a worker doesn't exist in the current context + with_sidekiq_options_for('NotAWorker') do + its(:unique_args_enabled_in_worker?) { is_expected.to eq(nil) } + end + end + + describe '#unique_args_enabled?' do + with_global_config(unique_args_enabled: true) do + with_sidekiq_options_for(UniqueWorker, unique_args: :unique_args) do + its(:unique_args_enabled?) { is_expected.to eq(:unique_args) } + end + + with_sidekiq_options_for(UniqueWorker, unique_args: false) do + its(:unique_args_enabled?) { is_expected.to eq(true) } + end + end + + with_global_config(unique_args_enabled: false) do + with_sidekiq_options_for(UniqueWorker, unique_args: :unique_args) do + its(:unique_args_enabled?) { is_expected.to eq(:unique_args) } + end + + with_sidekiq_options_for(UniqueWorker, unique_args: false) do + its(:unique_args_enabled?) { is_expected.to eq(false) } + end + + its(:unique_args_enabled?) { is_expected.to eq(false) } + end + end + + describe '#unique_on_all_queues?' do + with_global_config(unique_args_enabled: true) do + its(:unique_on_all_queues?) { is_expected.to eq(nil) } + + with_sidekiq_options_for(UniqueWorker, unique_args: :unique_args, unique_on_all_queues: true) do + its(:unique_on_all_queues?) { is_expected.to eq(true) } + end + + with_sidekiq_options_for(UniqueWorker, unique_args: :unique_args, unique_on_all_queues: false) do + its(:unique_on_all_queues?) { is_expected.to eq(false) } + end + + # For when a worker doesn't exist in the current context + with_sidekiq_options_for('NotAWorker', unique_args: :unique_args, unique_on_all_queues: true) do + its(:unique_args_enabled_in_worker?) { is_expected.to eq(nil) } + end + end + + with_global_config(unique_args_enabled: false) do + its(:unique_on_all_queues?) { is_expected.to eq(nil) } + + with_sidekiq_options_for(UniqueWorker, unique_args: :unique_args, unique_on_all_queues: false) do + its(:unique_on_all_queues?) { is_expected.to eq(false) } + end + + with_sidekiq_options_for(UniqueWorker, unique_args: :unique_args, unique_on_all_queues: true) do + its(:unique_on_all_queues?) { is_expected.to eq(true) } + end + + # For when a worker doesn't exist in the current context + with_sidekiq_options_for('NotAWorker', unique_args: :unique_args, unique_on_all_queues: true) do + its(:unique_args_enabled_in_worker?) { is_expected.to eq(nil) } + end + end + end + + describe '#filter_by_proc' do + let(:proc) { ->(args) { args[1]['test'] } } + before do + allow(subject).to receive(:unique_args_method).and_return(proc) + end + + it 'returns the value of the provided ' do + expect(subject.filter_by_proc([1, 'test' => 'it'])).to eq('it') + end + end + + describe '#filter_by_symbol' do + let(:item) do + { 'class' => 'UniqueJobWithFilterMethod', + 'queue' => 'myqueue', + 'args' => [[1, 2]] } + end + subject { described_class.new(item) } + + it 'returns the value of the provided class method' do + expect(subject.filter_by_symbol(['name', 2, 'whatever' => nil, 'type' => 'test'])) + .to eq(%w(name test)) + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 34d7e196e..dba6fdb4f 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -1,45 +1,40 @@ -if RUBY_ENGINE == 'ruby' - if ENV['CI'] - require 'codeclimate-test-reporter' - CodeClimate::TestReporter.start - else - require 'simplecov' - require 'simplecov-json' - SimpleCov.refuse_coverage_drop - SimpleCov.formatters = [ - SimpleCov::Formatter::HTMLFormatter, - SimpleCov::Formatter::JSONFormatter - ] - SimpleCov.start do - add_filter '/spec/' - add_filter '/bin/' - add_filter '/gemfiles/' - end - end +if ENV['CI'] && RUBY_ENGINE == 'ruby' + require 'codeclimate-test-reporter' + CodeClimate::TestReporter.start +else + require 'simplecov' end begin - require 'pry-suite' + require 'pry-byebug' rescue LoadError puts 'Pry unavailable' end require 'rspec' - +require 'rspec/its' +require 'celluloid/current' require 'celluloid/test' require 'sidekiq' require 'sidekiq/util' require 'sidekiq-unique-jobs' -Sidekiq.logger.level = Logger::ERROR require 'sidekiq_unique_jobs/testing' +require 'active_support/all' +require 'active_support/testing/time_helpers' require 'rspec-sidekiq' Sidekiq::Testing.disable! +Sidekiq.logger.level = "Logger::#{ENV.fetch('LOGLEVEL') { 'error' }.upcase}".constantize require 'sidekiq/redis_connection' -redis_url = ENV['REDIS_URL'] || 'redis://localhost/15' -REDIS = Sidekiq::RedisConnection.create(url: redis_url, namespace: 'sidekiq-unique-jobs-testing') +REDIS_URL ||= ENV['REDIS_URL'] || 'redis://localhost/15'.freeze +REDIS_NAMESPACE ||= 'unique-test'.freeze +REDIS ||= Sidekiq::RedisConnection.create(url: REDIS_URL, namespace: REDIS_NAMESPACE) + +Sidekiq.configure_client do |config| + config.redis = { url: REDIS_URL, namespace: REDIS_NAMESPACE } +end Dir[File.join(File.dirname(__FILE__), 'support', '**', '*.rb')].each { |f| require f } RSpec.configure do |config| @@ -68,3 +63,5 @@ # Warn when jobs are not enqueued to Redis but to a job array config.warn_when_jobs_not_processed_by_sidekiq = false end + +Dir[File.join(File.dirname(__FILE__), 'workers', '**', '*.rb')].each { |f| require f } diff --git a/spec/support/matchers/redis_matchers.rb b/spec/support/matchers/redis_matchers.rb new file mode 100644 index 000000000..e2ec064f2 --- /dev/null +++ b/spec/support/matchers/redis_matchers.rb @@ -0,0 +1,19 @@ +require 'rspec/expectations' + +RSpec::Matchers.define :have_key do |unique_key| + Sidekiq.redis do |redis| + match do |_actual| + with_value && for_seconds + end + + chain :with_value do |value = nil| + value.nil? || + redis.get(unique_key) == value + end + + chain :for_seconds do |ttl = nil| + ttl.nil? || + redis.ttl(unique_key) == ttl + end + end +end diff --git a/spec/support/sidekiq_meta.rb b/spec/support/sidekiq_meta.rb index 54d135265..29d30b1a0 100644 --- a/spec/support/sidekiq_meta.rb +++ b/spec/support/sidekiq_meta.rb @@ -1,6 +1,7 @@ RSpec.configure do |config| VERSION_REGEX = /(?[<>=]+)?\s?(?(\d+.?)+)/m.freeze config.before(:each) do |example| + Sidekiq.redis(&:flushdb) Sidekiq::Worker.clear_all if (sidekiq = example.metadata[:sidekiq]) sidekiq = :fake if sidekiq == true @@ -14,7 +15,7 @@ end unless Sidekiq::VERSION.send(operator, version) - skip("Skipped due to version check (requirement was that sidekiq version is " \ + skip('Skipped due to version check (requirement was that sidekiq version is ' \ "#{operator} #{version}; was #{Sidekiq::VERSION})") end if version && operator end diff --git a/spec/support/unique_macros.rb b/spec/support/unique_macros.rb new file mode 100644 index 000000000..e887aeb28 --- /dev/null +++ b/spec/support/unique_macros.rb @@ -0,0 +1,52 @@ +module SidekiqUniqueJobs + module RSpec + module InstanceMethods + # enable versioning for specific blocks (at instance-level) + def with_global_config(config) + was_config = SidekiqUniqueJobs.config + SidekiqUniqueJobs.configure(config) + yield + ensure + SidekiqUniqueJobs.configure(was_config) + end + + # enable versioning for specific blocks (at instance-level) + def with_sidekiq_options_for(worker_class, options) + worker_class = SidekiqUniqueJobs.worker_class_constantize(worker_class) + if worker_class.respond_to?(:sidekiq_options) + was_options = worker_class.get_sidekiq_options + worker_class.sidekiq_options(options) + end + yield + ensure + worker_class.sidekiq_options_hash = was_options if worker_class.respond_to?(:sidekiq_options_hash=) + end + end + + module ClassMethods + def with_sidekiq_options_for(worker_class, options = {}, &block) + context "with sidekiq options #{options}" do + around(:each) do |ex| + with_sidekiq_options_for(worker_class, options, &ex) + end + class_exec(&block) + end + end + + # enable versioning for specific blocks (at class-level) + def with_global_config(config = {}, &block) + context "with global configuration #{config}" do + around(:each) do |ex| + with_global_config(config, &ex) + end + class_exec(&block) + end + end + end + end +end + +RSpec.configure do |config| + config.include SidekiqUniqueJobs::RSpec::InstanceMethods + config.extend SidekiqUniqueJobs::RSpec::ClassMethods +end diff --git a/spec/workers/after_unlock_worker.rb b/spec/workers/after_unlock_worker.rb new file mode 100644 index 000000000..f538e3fdb --- /dev/null +++ b/spec/workers/after_unlock_worker.rb @@ -0,0 +1,13 @@ +class AfterUnlockWorker + include Sidekiq::Worker + sidekiq_options queue: :working, retry: 1, backtrace: 10 + sidekiq_options unique: true, unique_lock: :until_executed + + sidekiq_retries_exhausted do |msg| + Sidekiq.logger.warn "Failed #{msg['class']} with #{msg['args']}: #{msg['error_message']}" + end + + def perform(_) + fail 'HELL' + end +end diff --git a/spec/support/after_yield_worker.rb b/spec/workers/after_yield_worker.rb similarity index 57% rename from spec/support/after_yield_worker.rb rename to spec/workers/after_yield_worker.rb index 6ac0a49e4..6a3b39ced 100644 --- a/spec/support/after_yield_worker.rb +++ b/spec/workers/after_yield_worker.rb @@ -1,7 +1,7 @@ class AfterYieldWorker include Sidekiq::Worker - sidekiq_options queue: :working, retry: 1, backtrace: 10, unique_unlock_order: :after_yield, - unique: true + sidekiq_options queue: :unlock_ordering, retry: 1, backtrace: 10 + sidekiq_options unique: true, unique_lock: :until_executed sidekiq_retries_exhausted do |msg| Sidekiq.logger.warn "Failed #{msg['class']} with #{msg['args']}: #{msg['error_message']}" @@ -10,4 +10,8 @@ class AfterYieldWorker def perform(*) # NO-OP end + + def after_unlock + fail 'HELL' + end end diff --git a/spec/support/another_unique_worker.rb b/spec/workers/another_unique_worker.rb similarity index 82% rename from spec/support/another_unique_worker.rb rename to spec/workers/another_unique_worker.rb index e41401af9..be5ee512c 100644 --- a/spec/support/another_unique_worker.rb +++ b/spec/workers/another_unique_worker.rb @@ -1,7 +1,7 @@ class AnotherUniqueWorker include Sidekiq::Worker sidekiq_options queue: :working, retry: 1, backtrace: 10 - sidekiq_options unique: true + sidekiq_options unique: true, unique_lock: :until_executed sidekiq_retries_exhausted do |msg| Sidekiq.logger.warn "Failed #{msg['class']} with #{msg['args']}: #{msg['error_message']}" diff --git a/spec/workers/before_yield_worker.rb b/spec/workers/before_yield_worker.rb new file mode 100644 index 000000000..6773e7095 --- /dev/null +++ b/spec/workers/before_yield_worker.rb @@ -0,0 +1,9 @@ +class BeforeYieldWorker + include Sidekiq::Worker + + sidekiq_options queue: :unlock_ordering + sidekiq_options unique: true, unique_lock: :until_executing + + def perform + end +end diff --git a/spec/workers/expiring_worker.rb b/spec/workers/expiring_worker.rb new file mode 100644 index 000000000..2b858464d --- /dev/null +++ b/spec/workers/expiring_worker.rb @@ -0,0 +1,4 @@ +class ExpiringWorker + include Sidekiq::Worker + sidekiq_options unique: true, unique_expiration: 10 * 60, unique_lock: :until_executed +end diff --git a/spec/workers/inline_expiration_worker.rb b/spec/workers/inline_expiration_worker.rb new file mode 100644 index 000000000..086c2f389 --- /dev/null +++ b/spec/workers/inline_expiration_worker.rb @@ -0,0 +1,8 @@ +class InlineExpirationWorker + include Sidekiq::Worker + sidekiq_options unique: true, unique_lock: :until_timeout, + unique_expiration: 10 * 60 + def perform(x) + TestClass.run(x) + end +end diff --git a/spec/workers/inline_unlock_order_worker.rb b/spec/workers/inline_unlock_order_worker.rb new file mode 100644 index 000000000..81ae087fa --- /dev/null +++ b/spec/workers/inline_unlock_order_worker.rb @@ -0,0 +1,8 @@ +class InlineUnlockOrderWorker + include Sidekiq::Worker + sidekiq_options unique: true, unique_lock: :until_timeout + + def perform(x) + TestClass.run(x) + end +end diff --git a/spec/workers/inline_worker.rb b/spec/workers/inline_worker.rb new file mode 100644 index 000000000..38d402fd1 --- /dev/null +++ b/spec/workers/inline_worker.rb @@ -0,0 +1,8 @@ +class InlineWorker + include Sidekiq::Worker + sidekiq_options unique: true + + def perform(x) + TestClass.run(x) + end +end diff --git a/spec/workers/just_a_worker.rb b/spec/workers/just_a_worker.rb new file mode 100644 index 000000000..d9ec18239 --- /dev/null +++ b/spec/workers/just_a_worker.rb @@ -0,0 +1,8 @@ +class JustAWorker + include Sidekiq::Worker + + sidekiq_options unique: true, queue: 'testqueue', unique_lock: :until_executed + + def perform + end +end diff --git a/spec/workers/main_job.rb b/spec/workers/main_job.rb new file mode 100644 index 000000000..044e675e1 --- /dev/null +++ b/spec/workers/main_job.rb @@ -0,0 +1,8 @@ +class MainJob + include Sidekiq::Worker + sidekiq_options queue: :customqueue, unique: true, unique_lock: :until_executed + sidekiq_options log_duplicate_payload: true + + def perform(_) + end +end diff --git a/spec/workers/my_unique_worker.rb b/spec/workers/my_unique_worker.rb new file mode 100644 index 000000000..4e1af8a38 --- /dev/null +++ b/spec/workers/my_unique_worker.rb @@ -0,0 +1,8 @@ +class MyUniqueWorker + include Sidekiq::Worker + sidekiq_options queue: :customqueue, retry: true, unique: true, + unique_expiration: 7_200, retry_count: 10, + unique_lock: :until_executed + def perform(_) + end +end diff --git a/spec/support/my_worker.rb b/spec/workers/my_worker.rb similarity index 100% rename from spec/support/my_worker.rb rename to spec/workers/my_worker.rb diff --git a/spec/workers/plain_class.rb b/spec/workers/plain_class.rb new file mode 100644 index 000000000..013b9c7a0 --- /dev/null +++ b/spec/workers/plain_class.rb @@ -0,0 +1,4 @@ +class PlainClass + def run(_x) + end +end diff --git a/spec/workers/queue_worker.rb b/spec/workers/queue_worker.rb new file mode 100644 index 000000000..0268e26fe --- /dev/null +++ b/spec/workers/queue_worker.rb @@ -0,0 +1,6 @@ +class QueueWorker + include Sidekiq::Worker + sidekiq_options queue: :customqueue + def perform(_) + end +end diff --git a/spec/workers/queue_worker_with_filter_method.rb b/spec/workers/queue_worker_with_filter_method.rb new file mode 100644 index 000000000..5b0efff02 --- /dev/null +++ b/spec/workers/queue_worker_with_filter_method.rb @@ -0,0 +1,7 @@ +class QueueWorkerWithFilterMethod < QueueWorker + sidekiq_options unique: true, unique_args: :args_filter, unique_lock: :until_executed + + def self.args_filter(*args) + args.first + end +end diff --git a/spec/workers/queue_worker_with_filter_proc.rb b/spec/workers/queue_worker_with_filter_proc.rb new file mode 100644 index 000000000..a42b34421 --- /dev/null +++ b/spec/workers/queue_worker_with_filter_proc.rb @@ -0,0 +1,11 @@ +class QueueWorkerWithFilterProc < QueueWorker + # slightly contrived example of munging args to the + # worker and removing a random bit. + sidekiq_options unique: true, + unique_lock: :until_timeout, + unique_args: (lambda do |*args| + options = args.extract_options! + options.delete(:random) + [args, options] + end) +end diff --git a/spec/workers/run_lock_with_retries_worker.rb b/spec/workers/run_lock_with_retries_worker.rb new file mode 100644 index 000000000..618c66aa3 --- /dev/null +++ b/spec/workers/run_lock_with_retries_worker.rb @@ -0,0 +1,12 @@ +class RunLockWithRetriesWorker + include Sidekiq::Worker + + sidekiq_options unique: true, + unique_locks: :while_executing, + queue: :unlock_ordering, + run_lock_retries: 10, + run_lock_retry_interval: 0, + reschedule_on_lock_fail: true + def perform + end +end diff --git a/spec/workers/run_lock_worker.rb b/spec/workers/run_lock_worker.rb new file mode 100644 index 000000000..936cb1264 --- /dev/null +++ b/spec/workers/run_lock_worker.rb @@ -0,0 +1,7 @@ +class RunLockWorker + include Sidekiq::Worker + + sidekiq_options unique: true, unique_locks: :while_executing, queue: :unlock_ordering + def perform + end +end diff --git a/spec/workers/test_class.rb b/spec/workers/test_class.rb new file mode 100644 index 000000000..681e6bdd3 --- /dev/null +++ b/spec/workers/test_class.rb @@ -0,0 +1,4 @@ +class TestClass + def self.run(_) + end +end diff --git a/spec/workers/unique_job_with_filter_method.rb b/spec/workers/unique_job_with_filter_method.rb new file mode 100644 index 000000000..d8dd49d02 --- /dev/null +++ b/spec/workers/unique_job_with_filter_method.rb @@ -0,0 +1,18 @@ +class UniqueJobWithFilterMethod + include Sidekiq::Worker + sidekiq_options queue: :customqueue, retry: 1, backtrace: 10 + sidekiq_options unique: true, unique_args: :filtered_args + + sidekiq_retries_exhausted do |msg| + Sidekiq.logger.warn "Failed #{msg['class']} with #{msg['args']}: #{msg['error_message']}" + end + + def perform(*) + # NO-OP + end + + def self.filtered_args(args) + options = args.extract_options! + [args.first, options['type']] + end +end diff --git a/spec/support/after_unlock_worker.rb b/spec/workers/unique_on_all_queues_worker.rb similarity index 51% rename from spec/support/after_unlock_worker.rb rename to spec/workers/unique_on_all_queues_worker.rb index e69f3f8ae..593fae384 100644 --- a/spec/support/after_unlock_worker.rb +++ b/spec/workers/unique_on_all_queues_worker.rb @@ -1,7 +1,7 @@ -class AfterUnlockWorker +class UniqueOnAllQueuesWorker include Sidekiq::Worker - sidekiq_options queue: :working, retry: 1, backtrace: 10, unique_unlock_order: :after_yield, - unique: true + sidekiq_options queue: :working, retry: 1, backtrace: 10 + sidekiq_options unique: true, unique_on_all_queues: true, unique_lock: :until_executed sidekiq_retries_exhausted do |msg| Sidekiq.logger.warn "Failed #{msg['class']} with #{msg['args']}: #{msg['error_message']}" @@ -10,8 +10,4 @@ class AfterUnlockWorker def perform(*) # NO-OP end - - def after_unlock(*) - # NO-OP - end end diff --git a/spec/support/unique_worker.rb b/spec/workers/unique_worker.rb similarity index 82% rename from spec/support/unique_worker.rb rename to spec/workers/unique_worker.rb index 0dec83333..e6f35d677 100644 --- a/spec/support/unique_worker.rb +++ b/spec/workers/unique_worker.rb @@ -1,7 +1,7 @@ class UniqueWorker include Sidekiq::Worker sidekiq_options queue: :working, retry: 1, backtrace: 10 - sidekiq_options unique: true + sidekiq_options unique: true, unique_lock: :until_executed sidekiq_retries_exhausted do |msg| Sidekiq.logger.warn "Failed #{msg['class']} with #{msg['args']}: #{msg['error_message']}" diff --git a/spec/workers/while_executing_worker.rb b/spec/workers/while_executing_worker.rb new file mode 100644 index 000000000..3b81fa009 --- /dev/null +++ b/spec/workers/while_executing_worker.rb @@ -0,0 +1,13 @@ +class WhileExecutingWorker + include Sidekiq::Worker + sidekiq_options queue: :working, retry: 1, backtrace: 10 + sidekiq_options unique: true, unique_lock: :while_executing + + sidekiq_retries_exhausted do |msg| + Sidekiq.logger.warn "Failed #{msg['class']} with #{msg['args']}: #{msg['error_message']}" + end + + def perform(_) + fail 'HELL' + end +end