diff --git a/CHANGELOG.md b/CHANGELOG.md index ae3c75cc..71d5684b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,10 @@ ## [7.0.0] - Unreleased +- Fix: Raise ArgumentError when using delay with FIFO queues + - FIFO queues do not support per-message DelaySeconds + - Previously caused confusing AWS errors with ActiveJob's `retry_on` (Rails 6.1+ defaults to `wait: 3.seconds`) + - Now raises clear ArgumentError with guidance to use `wait: 0` + - Fixes #924 + - Enhancement: Use fiber-local storage for logging context - Replaces thread-local storage with Fiber[] for proper isolation in async environments - Ensures logging context doesn't leak between fibers in the same thread diff --git a/lib/active_job/queue_adapters/shoryuken_adapter.rb b/lib/active_job/queue_adapters/shoryuken_adapter.rb index 3f5c673f..2c7cf25c 100644 --- a/lib/active_job/queue_adapters/shoryuken_adapter.rb +++ b/lib/active_job/queue_adapters/shoryuken_adapter.rb @@ -98,8 +98,25 @@ def enqueue(job, options = {}) # :nodoc: # @param job [ActiveJob::Base] the job to enqueue # @param timestamp [Float] Unix timestamp when the job should be processed # @return [Aws::SQS::Types::SendMessageResult] the send result + # @raise [ArgumentError] if delay is used with a FIFO queue def enqueue_at(job, timestamp) # :nodoc: - enqueue(job, delay_seconds: calculate_delay(timestamp)) + delay = calculate_delay(timestamp) + + # FIFO queues do not support per-message delays + # Check early to fail synchronously (before any async wrapping in subclasses) + # Note: negative delays (past timestamps) don't need handling here - + # SQS treats them as immediate delivery (delay_seconds=0) + # See https://github.com/ruby-shoryuken/shoryuken/issues/924 + if delay.positive? + queue = Shoryuken::Client.queues(job.queue_name) + if queue.fifo? + raise ArgumentError, + "FIFO queue '#{queue.name}' does not support per-message delays. " \ + 'When using ActiveJob retry_on with FIFO queues, set `wait: 0`.' + end + end + + enqueue(job, delay_seconds: delay) end # Bulk enqueue multiple jobs efficiently using SQS batch API. diff --git a/spec/shared_examples_for_active_job.rb b/spec/shared_examples_for_active_job.rb index c7c0ba56..cef92f19 100644 --- a/spec/shared_examples_for_active_job.rb +++ b/spec/shared_examples_for_active_job.rb @@ -281,6 +281,26 @@ class TestJob < ActiveJob::Base; end subject.enqueue_at(job, nil) end + + context 'when fifo' do + let(:fifo) { true } + + it 'raises ArgumentError when delay is positive' do + allow(subject).to receive(:calculate_delay).and_return(3) + allow(queue).to receive(:name).and_return('test.fifo') + expect(queue).not_to receive(:send_message) + + expect { subject.enqueue_at(job, nil) }.to raise_error( + ArgumentError, /FIFO queue.*does not support per-message delays/ + ) + end + + it 'does not raise when delay is zero' do + allow(subject).to receive(:calculate_delay).and_return(0) + expect(queue).to receive(:send_message).with(hash_including(delay_seconds: 0)) + expect { subject.enqueue_at(job, nil) }.not_to raise_error + end + end end end # rubocop:enable Metrics/BlockLength