Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .github/workflows/specs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,19 @@

- name: Run Rails specs
run: bundle exec rake spec:rails

ci-success:
name: CI Success
runs-on: ubuntu-latest
if: always()
needs:
- all_specs
- rails_specs
steps:
- name: Check all jobs passed
if: |
contains(needs.*.result, 'failure') ||
contains(needs.*.result, 'cancelled') ||
contains(needs.*.result, 'skipped')
run: exit 1
- run: echo "All CI checks passed!"
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
## [7.0.0] - Unreleased
- Enhancement: Add ActiveJob Continuations support (Rails 8.1+)
- Implements `stopping?` method in ActiveJob adapters to signal graceful shutdown
- Enables jobs to checkpoint progress and resume after interruption
- Handles past timestamps correctly (SQS treats negative delays as immediate delivery)
- Tracks shutdown state in Launcher via `stopping?` flag
- Leverages existing Shoryuken shutdown lifecycle (stop/stop! methods)
- Includes comprehensive integration tests with continuable jobs
- See Rails PR #55127 for more details on ActiveJob Continuations

- Breaking: Remove support for Rails versions older than 7.2
- Rails 7.0 and 7.1 have reached end-of-life and are no longer supported
- Supported versions: Rails 7.2, 8.0, and 8.1
Expand Down
13 changes: 13 additions & 0 deletions lib/shoryuken/extensions/active_job_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@ def enqueue_after_transaction_commit?
true
end

# Indicates whether Shoryuken is in the process of shutting down.
#
# This method is required for ActiveJob Continuations support (Rails 8.1+).
# When true, it signals to jobs that they should checkpoint their progress
# and gracefully interrupt execution to allow for resumption after restart.
#
# @return [Boolean] true if Shoryuken is shutting down, false otherwise
# @see https://github.com/rails/rails/pull/55127 Rails ActiveJob Continuations
def stopping?
launcher = Shoryuken::Runner.instance.launcher
launcher&.stopping? || false
end

def enqueue(job, options = {}) # :nodoc:
register_worker!(job)

Expand Down
14 changes: 14 additions & 0 deletions lib/shoryuken/launcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,18 @@ class Launcher

def initialize
@managers = create_managers
@stopping = false
end

# Indicates whether the launcher is in the process of stopping.
#
# This flag is set to true when either {#stop} or {#stop!} is called,
# and is used by ActiveJob adapters to signal jobs that they should
# checkpoint and prepare for graceful shutdown.
#
# @return [Boolean] true if stopping, false otherwise
def stopping?
@stopping
end

def start
Expand All @@ -16,6 +28,7 @@ def start
end

def stop!
@stopping = true
initiate_stop

# Don't await here so the timeout below is not delayed
Expand All @@ -28,6 +41,7 @@ def stop!
end

def stop
@stopping = true
fire_event(:quiet, true)

initiate_stop
Expand Down
3 changes: 3 additions & 0 deletions lib/shoryuken/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ class Runner
include Util
include Singleton

# @return [Shoryuken::Launcher, nil] the launcher instance, or nil if not yet initialized
attr_reader :launcher

def run(options)
self_read, self_write = IO.pipe

Expand Down
145 changes: 145 additions & 0 deletions spec/integration/active_job_continuation_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
# frozen_string_literal: true

require 'securerandom'
require 'active_job'
require 'shoryuken/extensions/active_job_adapter'
require 'shoryuken/extensions/active_job_extensions'

RSpec.describe 'ActiveJob Continuations Integration' do
# Skip all tests in this suite if ActiveJob::Continuable is not available (Rails < 8.0)
before(:all) do
skip 'ActiveJob::Continuable not available (Rails < 8.0)' unless defined?(ActiveJob::Continuable)
end

# Test job that uses ActiveJob Continuations
class ContinuableTestJob < ActiveJob::Base
include ActiveJob::Continuable if defined?(ActiveJob::Continuable)

queue_as :default

class_attribute :executions_log, default: []
class_attribute :checkpoints_reached, default: []

def perform(max_iterations: 10)
self.class.executions_log << { execution: executions, started_at: Time.current }

step :initialize_work do
self.class.checkpoints_reached << "initialize_work_#{executions}"
end

step :process_items, start: cursor || 0 do
(cursor..max_iterations).each do |i|
self.class.checkpoints_reached << "processing_item_#{i}"

# Check if we should stop (checkpoint)
checkpoint

# Simulate some work
sleep 0.01

# Advance cursor
cursor.advance!
end
end

step :finalize_work do
self.class.checkpoints_reached << 'finalize_work'
end

self.class.executions_log.last[:completed] = true
end
end

describe 'stopping? method (unit tests)' do
it 'returns false when launcher is not initialized' do
adapter = ActiveJob::QueueAdapters::ShoryukenAdapter.new
expect(adapter.stopping?).to be false
end

it 'returns true when launcher is stopping' do
launcher = Shoryuken::Launcher.new
runner = Shoryuken::Runner.instance
runner.instance_variable_set(:@launcher, launcher)

adapter = ActiveJob::QueueAdapters::ShoryukenAdapter.new
expect(adapter.stopping?).to be false

launcher.instance_variable_set(:@stopping, true)
expect(adapter.stopping?).to be true
end
end

describe 'timestamp handling for continuation retries' do
it 'handles past timestamps for continuation retries' do
adapter = ActiveJob::QueueAdapters::ShoryukenAdapter.new
job = ContinuableTestJob.new
job.sqs_send_message_parameters = {}

# Mock the queue
queue = instance_double(Shoryuken::Queue, fifo?: false)
allow(Shoryuken::Client).to receive(:queues).and_return(queue)
allow(Shoryuken).to receive(:register_worker)
allow(queue).to receive(:send_message) do |params|
# Verify past timestamp results in immediate delivery (delay_seconds <= 0)
expect(params[:delay_seconds]).to be <= 0
end

# Enqueue with past timestamp (simulating continuation retry)
past_timestamp = Time.current.to_f - 60
adapter.enqueue_at(job, past_timestamp)
end
end

describe 'enqueue_at with continuation timestamps (unit tests)' do
let(:adapter) { ActiveJob::QueueAdapters::ShoryukenAdapter.new }
let(:job) do
job = ContinuableTestJob.new
job.sqs_send_message_parameters = {}
job
end
let(:queue) { instance_double(Shoryuken::Queue, fifo?: false) }

before do
allow(Shoryuken::Client).to receive(:queues).and_return(queue)
allow(Shoryuken).to receive(:register_worker)
@sent_messages = []
allow(queue).to receive(:send_message) do |params|
@sent_messages << params
end
end

it 'accepts past timestamps without error' do
past_timestamp = Time.current.to_f - 30

expect {
adapter.enqueue_at(job, past_timestamp)
}.not_to raise_error

expect(@sent_messages.size).to eq(1)
expect(@sent_messages.first[:delay_seconds]).to be <= 0
end

it 'accepts current timestamp' do
current_timestamp = Time.current.to_f

expect {
adapter.enqueue_at(job, current_timestamp)
}.not_to raise_error

expect(@sent_messages.size).to eq(1)
expect(@sent_messages.first[:delay_seconds]).to be_between(-1, 1)
end

it 'accepts future timestamp' do
future_timestamp = Time.current.to_f + 30

expect {
adapter.enqueue_at(job, future_timestamp)
}.not_to raise_error

expect(@sent_messages.size).to eq(1)
expect(@sent_messages.first[:delay_seconds]).to be > 0
expect(@sent_messages.first[:delay_seconds]).to be <= 30
end
end
end
110 changes: 110 additions & 0 deletions spec/shoryuken/extensions/active_job_continuation_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# frozen_string_literal: true

require 'active_job'
require 'shared_examples_for_active_job'
require 'shoryuken/extensions/active_job_adapter'
require 'shoryuken/extensions/active_job_extensions'

RSpec.describe 'ActiveJob Continuation support' do
let(:adapter) { ActiveJob::QueueAdapters::ShoryukenAdapter.new }
let(:job) do
job = TestJob.new
job.sqs_send_message_parameters = {}
job
end
let(:queue) { double('Queue', fifo?: false) }

before do
allow(Shoryuken::Client).to receive(:queues).with(job.queue_name).and_return(queue)
allow(Shoryuken).to receive(:register_worker)
end

describe '#stopping?' do
context 'when Launcher is not initialized' do
it 'returns false' do
runner = instance_double(Shoryuken::Runner, launcher: nil)
allow(Shoryuken::Runner).to receive(:instance).and_return(runner)

expect(adapter.stopping?).to be false
end
end

context 'when Launcher is initialized' do
let(:runner) { instance_double(Shoryuken::Runner) }
let(:launcher) { instance_double(Shoryuken::Launcher) }

before do
allow(Shoryuken::Runner).to receive(:instance).and_return(runner)
allow(runner).to receive(:launcher).and_return(launcher)
end

it 'returns false when not stopping' do
allow(launcher).to receive(:stopping?).and_return(false)
expect(adapter.stopping?).to be false
end

it 'returns true when stopping' do
allow(launcher).to receive(:stopping?).and_return(true)
expect(adapter.stopping?).to be true
end
end
end

describe '#enqueue_at with past timestamps' do
let(:past_timestamp) { Time.current.to_f - 60 } # 60 seconds ago

it 'enqueues with negative delay_seconds when timestamp is in the past' do
expect(queue).to receive(:send_message) do |hash|
expect(hash[:delay_seconds]).to be <= 0
expect(hash[:delay_seconds]).to be >= -61 # Allow for rounding and timing
end

adapter.enqueue_at(job, past_timestamp)
end

it 'does not raise an error for past timestamps' do
allow(queue).to receive(:send_message)

expect { adapter.enqueue_at(job, past_timestamp) }.not_to raise_error
end
end

describe '#enqueue_at with future timestamps' do
let(:future_timestamp) { Time.current.to_f + 60 } # 60 seconds from now

it 'enqueues with delay_seconds when timestamp is in the future' do
expect(queue).to receive(:send_message) do |hash|
expect(hash[:delay_seconds]).to be > 0
expect(hash[:delay_seconds]).to be <= 60
end

adapter.enqueue_at(job, future_timestamp)
end
end

describe '#enqueue_at with current timestamp' do
let(:current_timestamp) { Time.current.to_f }

it 'enqueues with delay_seconds close to 0' do
expect(queue).to receive(:send_message) do |hash|
expect(hash[:delay_seconds]).to be_between(-1, 1) # Allow for timing/rounding
end

adapter.enqueue_at(job, current_timestamp)
end
end

describe 'retry_on with zero wait' do
it 'allows immediate retries through continuation mechanism' do
# Simulate a job with retry_on configuration that uses zero wait
past_timestamp = Time.current.to_f - 1

expect(queue).to receive(:send_message) do |hash|
# Negative delay for past timestamp - SQS will handle immediate delivery
expect(hash[:delay_seconds]).to be <= 0
end

adapter.enqueue_at(job, past_timestamp)
end
end
end
22 changes: 22 additions & 0 deletions spec/shoryuken/launcher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,26 @@
expect(second_group_manager).to have_received(:stop_new_dispatching)
end
end

describe '#stopping?' do
it 'returns false by default' do
expect(subject.stopping?).to be false
end

it 'returns true after stop is called' do
allow(first_group_manager).to receive(:stop_new_dispatching)
allow(first_group_manager).to receive(:await_dispatching_in_progress)
allow(second_group_manager).to receive(:stop_new_dispatching)
allow(second_group_manager).to receive(:await_dispatching_in_progress)

expect { subject.stop }.to change { subject.stopping? }.from(false).to(true)
end

it 'returns true after stop! is called' do
allow(first_group_manager).to receive(:stop_new_dispatching)
allow(second_group_manager).to receive(:stop_new_dispatching)

expect { subject.stop! }.to change { subject.stopping? }.from(false).to(true)
end
end
end