Skip to content

Commit

Permalink
Merge branch 'refactor-locks'
Browse files Browse the repository at this point in the history
  • Loading branch information
mhenrixon committed Oct 5, 2015
2 parents 1ab43a3 + 23b3350 commit 25c36a4
Show file tree
Hide file tree
Showing 73 changed files with 1,631 additions and 871 deletions.
14 changes: 14 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# This file is for unifying the coding style for different editors and IDEs
# editorconfig.org

root = true

[*]
charset = utf-8
trim_trailing_whitespace = true
insert_final_newline = true
indent_style = space
indent_size = 2

[*.md]
trim_trailing_whitespace = true
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ Gemfile.lock
gemfiles/*.lock
*.sw?
coverage/
tmp/
12 changes: 12 additions & 0 deletions .simplecov
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
require 'simplecov-json'

SimpleCov.refuse_coverage_drop
SimpleCov.formatters = [
SimpleCov::Formatter::HTMLFormatter,
SimpleCov::Formatter::JSONFormatter
]
SimpleCov.start do
add_filter '/spec/'
add_filter '/bin/'
add_filter '/gemfiles/'
end
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## v3.0.16 (Unreleased)

- Improved uniqueness handling (complete refactoring, upgrade with causion)
- 100% breaking changes

## v3.0.15
- Jobs only ever unlock themselves now (see #96 & #94 for info) thanks @pik
Expand Down
11 changes: 7 additions & 4 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ gemspec

gem 'appraisal', '~> 2.0.0'

gem 'rspec-its', require: false

platform :mri do
gem 'pry-suite', require: false
gem 'let_it_go', require: false
gem 'memory-profiler', require: false
gem 'simplecov-json', require: false
gem 'pry', require: false
gem 'pry-rescue', require: false
gem 'pry-byebug', require: false
gem 'simplecov-json', require: false
gem 'memory_profiler', require: false
gem 'codeclimate-test-reporter', require: false
end
43 changes: 39 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ The missing unique jobs for sidekiq

See https://github.com/mperham/sidekiq#requirements for what is required. Starting from 3.0.13 only sidekiq 3 is supported and support for MRI 1.9 is dropped (it might work but won't be worked on)

Version 4 requires redis 2.6.2!! Don't upgrade to version 4 unless you are on redis 2.6.2.

## Upgrade instructions

Easy path - Drop all your unique jobs before upgrading the gem!
Hard path - See above... Start with a clean slate :)

## Installation

Add this line to your application's Gemfile:
Expand All @@ -20,6 +27,34 @@ Or install it yourself as:

$ gem install sidekiq-unique-jobs

## A word on locking

Like @mperham mentions on (this wiki page)[https://github.com/mperham/sidekiq/wiki/Related-Projects#unique-jobs] it is hard to enforce uniqueness with redis in a distributed redis setting.

To make things worse there are many ways of wanting to enforce uniqueness.

### While Executing

Is to make sure that a job can be scheduled any number of times but only executed a single time per argument provided to the job we call this runtime uniqueness. This is probably most useful forbackground jobs that are fast to execute. (See mhenrixon/sidekiq-unique-jobs#111 for a great example of when this would be right.) While the job is executing/performing no other jobs can be executed at the same time.

### Until Executing

This means that a job can only be scheduled into redis once per whatever the configuration of unique arguments. Any jobs added until the first one of the same arguments has been unlocked will just be dropped. This is what was tripping many people up. They would schedule a job to run in the future and it would be impossible to schedule new jobs with those same arguments even immediately. There was some forth and back between also locking jobs on the scheduled queue and the regular queues but in the end I decided it was best to separate these two features out into different locking mechanisms. I think what most people are after is to be able to lock a job while executing or that seems to be what people are most missing at the moment.

### Until Executed

This is the combination of the two above. First we lock the job until it executes, then as the job begins executes we keep the lock so that no other jobs with the same arguments can execute at the same time.

### Until Timeout

The job won't be unlocked until the timeout/expiry runs out.

### Uniqueness Scope

- Queue specific locks
- Across all queues.
- Timed / Scheduled jobs

## Usage

All that is required is that you specifically set the sidekiq option for *unique* to true like below:
Expand All @@ -33,12 +68,12 @@ should be unique. The job will be unique for the number of seconds configured (d
or until the job has been completed. Thus, the job will be unique for the shorter of the two. Note that Sidekiq versions before 3.0 will remove job keys after an hour, which means jobs can remain unique for at most an hour.

*If you want the unique job to stick around even after it has been successfully
processed then just set the unique_unlock_order to anything except `:before_yield` or `:after_yield` (`unique_unlock_order = :never`)
processed then just set the unique_lock to anything except `:before_yield` or `:after_yield` (`unique_lock = :until_timeout`)

You can also control the expiration length of the uniqueness check. If you want to enforce uniqueness over a longer period than the default of 30 minutes then you can pass the number of seconds you want to use to the sidekiq options:

```ruby
sidekiq_options unique: true, unique_job_expiration: 120 * 60 # 2 hours
sidekiq_options unique: true, unique_expiration: 120 * 60 # 2 hours
```

Requiring the gem in your gemfile should be sufficient to enable unique jobs.
Expand Down Expand Up @@ -96,13 +131,13 @@ Note that objects passed into workers are converted to JSON *after* running thro

### Unlock Ordering

By default the server middleware will release the worker lock after yielding to the next middleware or worker. Alternatively, this can be changed by passing the `unique_unlock_order` option:
By default the server middleware will release the worker lock after yielding to the next middleware or worker. Alternatively, this can be changed by passing the `unique_lock` option:

```ruby
class UniqueJobWithFilterMethod
include Sidekiq::Worker
sidekiq_options unique: true,
unique_unlock_order: :before_yield
unique_locks: :until_executing

...

Expand Down
3 changes: 2 additions & 1 deletion Rakefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!/usr/bin/env rake

require 'rubygems'
require 'bundler/setup'
require 'bundler/gem_tasks'
Expand All @@ -8,4 +9,4 @@ require 'rubocop/rake_task'
RuboCop::RakeTask.new(:style)
RSpec::Core::RakeTask.new(:spec)

task default: [:spec, :style]
task default: [:style, :spec]
80 changes: 32 additions & 48 deletions lib/sidekiq-unique-jobs.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
require 'yaml' if RUBY_VERSION.include?('2.0.0') # rubocop:disable FileName
require 'sidekiq_unique_jobs/core_ext'
require 'sidekiq_unique_jobs/options_with_fallback'
require 'sidekiq_unique_jobs/scripts'
require 'sidekiq_unique_jobs/unique_args'
require 'sidekiq_unique_jobs/unlockable'
require 'sidekiq_unique_jobs/lock'
require 'sidekiq_unique_jobs/middleware'
require 'sidekiq_unique_jobs/version'
require 'sidekiq_unique_jobs/config'
Expand All @@ -11,11 +17,10 @@ module SidekiqUniqueJobs

def config
@config ||= Config.new(
unique_prefix: 'sidekiq_unique',
unique_args_enabled: false,
unique_prefix: 'uniquejobs',
unique_args_enabled: true,
default_expiration: 30 * 60,
default_unlock_order: :after_yield,
unique_storage_method: :new,
default_lock: :while_executing,
redis_test_mode: :redis # :mock
)
end
Expand All @@ -24,8 +29,22 @@ def unique_args_enabled?
config.unique_args_enabled
end

def configure
yield config
def default_lock
config.default_lock
end

def configure(options = {})
if block_given?
yield config
else
options.each do |key, val|
config[key] = val
end
end
end

def namespace
@namespace ||= Sidekiq.redis { |c| c.respond_to?(:namespace) ? c.namespace : nil }
end

# Attempt to constantize a string worker_class argument, always
Expand All @@ -37,55 +56,20 @@ def worker_class_constantize(worker_class)
worker_class
end

def get_payload(klass, queue, *args)
unique_on_all_queues = false
if config.unique_args_enabled
worker_class = worker_class_constantize(klass)
args = yield_unique_args(worker_class, *args)
unique_on_all_queues =
worker_class.get_sidekiq_options['unique_on_all_queues']
end
md5_arguments = { class: klass, args: args }
md5_arguments[:queue] = queue unless unique_on_all_queues
"#{config.unique_prefix}:" \
"#{Digest::MD5.hexdigest(Sidekiq.dump_json(md5_arguments))}"
end

def payload_hash(item)
get_payload(item['class'], item['queue'], item['args'])
end

def yield_unique_args(worker_class, args)
unique_args = worker_class.get_sidekiq_options['unique_args']
filtered_args(worker_class, unique_args, args)
rescue NameError
# fallback to not filtering args when class can't be instantiated
args
def redis_version
@redis_version ||= Sidekiq.redis { |c| c.info('server')['redis_version'] }
end

def connection(redis_pool = nil, &block)
return mock_redis if SidekiqUniqueJobs.config.mocking?
return mock_redis if config.mocking?
redis_pool ? redis_pool.with(&block) : Sidekiq.redis(&block)
end

def self.mock_redis
@redis_mock ||= MockRedis.new
end

def filtered_args(worker_class, unique_args, args)
case unique_args
when Proc
unique_args.call(args)
when Symbol
if worker_class.respond_to?(unique_args)
worker_class.send(unique_args, *args)
end
else
args
end
def mock_redis
@redis_mock ||= MockRedis.new if defined?(MockRedis)
end

def synchronize(key, redis, item = nil, &blk)
SidekiqUniqueJobs::RunLock.synchronize(key, redis, item, &blk)
def synchronize(item, redis_pool, &blk)
Lock::WhileExecuting.synchronize(item, redis_pool, &blk)
end
end
98 changes: 16 additions & 82 deletions lib/sidekiq_unique_jobs/client/middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,110 +3,44 @@
module SidekiqUniqueJobs
module Client
class Middleware
SCHEDULED = 'scheduled'.freeze
extend Forwardable
def_delegators :SidekiqUniqueJobs, :connection, :config, :payload_hash
def_delegators :config, :unique_storage_method
def_delegators :SidekiqUniqueJobs, :connection, :config
def_delegators :Sidekiq, :logger

include OptionsWithFallback

def call(worker_class, item, queue, redis_pool = nil)
@worker_class = SidekiqUniqueJobs.worker_class_constantize(worker_class)
@item = item
@queue = queue
@redis_pool = redis_pool

return yield unless unique_enabled?
item['unique_hash'] = payload_hash(item)
unless unique_for_connection?
logger.warn "payload is not unique #{item}" if log_duplicate_payload?
return
end

yield
yield if ordinary_or_locked?
end

private

attr_reader :item, :worker_class, :redis_pool, :queue

def unique_enabled?
worker_class.get_sidekiq_options['unique'] || item['unique']
end

def log_duplicate_payload?
worker_class.get_sidekiq_options['log_duplicate_payload'] || item['log_duplicate_payload']
end

def unique_for_connection?
send("#{unique_storage_method}_unique_for?")
end

def old_unique_for?
item['at'] ? unique_schedule_old : unique_enqueue_old
end

def unique_enqueue_old
connection do |conn|
conn.watch(payload_hash(item))
lock_val = conn.get(payload_hash(item))
if !lock_val || lock_val[0..8] == SCHEDULED
conn.multi do
conn.setex(payload_hash(item), expires_at, item['jid'])
end
else
conn.unwatch
false
end
end
end

def unique_schedule_old
connection do |conn|
conn.watch(payload_hash(item))
if expires_at < 0 || conn.get(payload_hash(item))
conn.unwatch
false
else
conn.setex(payload_hash(item), expires_at, "scheduled-#{item['jid']}")
end
end
end

def new_unique_for?
item['at'] ? unique_schedule : unique_enqueue
end

def unique_schedule
connection do |conn|
conn.set(payload_hash(item), "scheduled-#{item['jid']}", nx: true, ex: expires_at)
end
def ordinary_or_locked?
unique_disabled? || unlockable? || aquire_lock
end

def unique_enqueue
connection do |conn|
conn.eval(lock_queue_script, keys: [payload_hash(item)], argv: [expires_at, item['jid']])
end
def unlockable?
!lockable?
end

def expires_at
# if the job was previously scheduled and is now being queued,
# or we've never seen it before
ex = unique_job_expiration || config.default_expiration
ex = ((Time.at(item['at']) - Time.now.utc) + ex).to_i if item['at']
ex
def lockable?
lock.respond_to?(:lock)
end

def unique_job_expiration
worker_class.get_sidekiq_options['unique_job_expiration']
def aquire_lock
locked = lock.lock(:client)
warn_about_duplicate(item) unless locked
locked
end

def lock_queue_script
<<-LUA
local ret = redis.call('GET', KEYS[1])
if not ret or string.sub(ret, 1, 9) == 'scheduled' then
return redis.call('SETEX', KEYS[1], ARGV[1], ARGV[2])
end
LUA
def warn_about_duplicate(item)
logger.warn "payload is not unique #{item}" if log_duplicate_payload?
end
end
end
Expand Down
Loading

0 comments on commit 25c36a4

Please sign in to comment.