Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query updates #149

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 24 additions & 16 deletions lib/delayed/backend/active_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class Job < ::ActiveRecord::Base
:failed_at, :locked_at, :locked_by, :handler
end

scope :by_priority, lambda { order("priority ASC, run_at ASC") }
scope :by_priority, lambda { order(:priority, :run_at) }
scope :min_priority, lambda { where("priority >= ?", Worker.min_priority) if Worker.min_priority }
scope :max_priority, lambda { where("priority <= ?", Worker.max_priority) if Worker.max_priority }
scope :for_queues, lambda { |queues = Worker.queues| where(queue: queues) if Array(queues).any? }
Expand All @@ -52,13 +52,14 @@ def self.set_delayed_job_table_name

set_delayed_job_table_name

def self.ready_to_run(worker_name, max_run_time)
where(
"((run_at <= ? AND (locked_at IS NULL OR locked_at < ?)) OR locked_by = ?) AND failed_at IS NULL",
db_time_now,
db_time_now - max_run_time,
worker_name
)
def self.ready_to_run(worker_name, max_run_time) # rubocop:disable Metrics/AbcSize
not_failed = arel_table[:failed_at].eq(nil)
time_to_run = arel_table[:run_at].lteq(db_time_now)
not_locked = arel_table[:locked_at].eq(nil)
lock_expired = arel_table[:locked_at].lt(db_time_now - max_run_time)
locked_by_me = arel_table[:locked_by].eq(worker_name)

where(not_failed.and(time_to_run).and(not_locked.or(lock_expired).or(locked_by_me)))
end

def self.before_fork
Expand Down Expand Up @@ -128,11 +129,14 @@ def self.reserve_with_scope_using_optimized_postgres(ready_scope, worker, now)
# http://www.postgresql.org/docs/9.0/static/sql-select.html#SQL-FOR-UPDATE-SHARE
# Note: active_record would attempt to generate UPDATE...LIMIT like
# SQL for Postgres if we use a .limit() filter, but it would not
# use 'FOR UPDATE' and we would have many locking conflicts
quoted_name = connection.quote_table_name(table_name)
subquery = ready_scope.limit(1).lock(true).select("id").to_sql
sql = "UPDATE #{quoted_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{subquery}) RETURNING *"
reserved = find_by_sql([sql, now, worker.name])
# use 'FOR UPDATE' and we would have many locking conflicts.
# This was further updated for some edge cases around locking multiple records
# with solutions discussed at length here
# https://dba.stackexchange.com/questions/69471/postgres-update-limit-1
quoted_name = Delayed::Job.arel_table.alias(:dj).to_sql
subquery = ready_scope.where("pg_try_advisory_xact_lock(id)").limit(1).lock(true).select(:id).to_sql
sql = "UPDATE #{quoted_name} SET locked_at = ?, locked_by = ? WHERE dj.id = (#{subquery}) RETURNING dj.*"
reserved = transaction { find_by_sql([sql, now, worker.name]) }
reserved[0]
end

Expand All @@ -143,12 +147,16 @@ def self.reserve_with_scope_using_optimized_mysql(ready_scope, worker, now)
# while updating. But during the where clause, for mysql(>=5.6.4),
# it queries with precision as well. So removing the precision
now = now.change(usec: 0)

# This works on MySQL and possibly some other DBs that support
# UPDATE...LIMIT. It uses separate queries to lock and return the job
count = ready_scope.limit(1).update_all(locked_at: now, locked_by: worker.name)
return nil if count == 0
sets = "locked_at = :now, locked_by = :name, id = (SELECT @dj_update_id := id)"
transaction do
count = ready_scope.limit(1).update_all([sets, now: now, name: worker.name])
return nil if count == 0

where(locked_at: now, locked_by: worker.name, failed_at: nil).first
where("id = @dj_update_id").first
end
end

def self.reserve_with_scope_using_optimized_mssql(ready_scope, worker, now)
Expand Down