Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
## [7.0.0] - Unreleased
- Fix: Raise ArgumentError when using delay with FIFO queues
- FIFO queues do not support per-message DelaySeconds
- Previously caused confusing AWS errors with ActiveJob's `retry_on` (Rails 6.1+ defaults to `wait: 3.seconds`)
- Now raises clear ArgumentError with guidance to use `wait: 0`
- Fixes #924

- Enhancement: Use fiber-local storage for logging context
- Replaces thread-local storage with Fiber[] for proper isolation in async environments
- Ensures logging context doesn't leak between fibers in the same thread
Expand Down
19 changes: 18 additions & 1 deletion lib/active_job/queue_adapters/shoryuken_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,25 @@ def enqueue(job, options = {}) # :nodoc:
# @param job [ActiveJob::Base] the job to enqueue
# @param timestamp [Float] Unix timestamp when the job should be processed
# @return [Aws::SQS::Types::SendMessageResult] the send result
# @raise [ArgumentError] if delay is used with a FIFO queue
def enqueue_at(job, timestamp) # :nodoc:
enqueue(job, delay_seconds: calculate_delay(timestamp))
delay = calculate_delay(timestamp)

# FIFO queues do not support per-message delays
# Check early to fail synchronously (before any async wrapping in subclasses)
# Note: negative delays (past timestamps) don't need handling here -
# SQS treats them as immediate delivery (delay_seconds=0)
# See https://github.com/ruby-shoryuken/shoryuken/issues/924
if delay.positive?
queue = Shoryuken::Client.queues(job.queue_name)
if queue.fifo?
raise ArgumentError,
"FIFO queue '#{queue.name}' does not support per-message delays. " \
'When using ActiveJob retry_on with FIFO queues, set `wait: 0`.'
end
end

enqueue(job, delay_seconds: delay)
end

# Bulk enqueue multiple jobs efficiently using SQS batch API.
Expand Down
20 changes: 20 additions & 0 deletions spec/shared_examples_for_active_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,26 @@ class TestJob < ActiveJob::Base; end

subject.enqueue_at(job, nil)
end

context 'when fifo' do
let(:fifo) { true }

it 'raises ArgumentError when delay is positive' do
allow(subject).to receive(:calculate_delay).and_return(3)
allow(queue).to receive(:name).and_return('test.fifo')
expect(queue).not_to receive(:send_message)

expect { subject.enqueue_at(job, nil) }.to raise_error(
ArgumentError, /FIFO queue.*does not support per-message delays/
)
end

it 'does not raise when delay is zero' do
allow(subject).to receive(:calculate_delay).and_return(0)
expect(queue).to receive(:send_message).with(hash_including(delay_seconds: 0))
expect { subject.enqueue_at(job, nil) }.not_to raise_error
end
end
end
end
# rubocop:enable Metrics/BlockLength