diff --git a/.github/workflows/specs.yml b/.github/workflows/specs.yml index 2a6eeb7a..d859f648 100644 --- a/.github/workflows/specs.yml +++ b/.github/workflows/specs.yml @@ -70,3 +70,19 @@ jobs: - name: Run Rails specs run: bundle exec rake spec:rails + + ci-success: + name: CI Success + runs-on: ubuntu-latest + if: always() + needs: + - all_specs + - rails_specs + steps: + - name: Check all jobs passed + if: | + contains(needs.*.result, 'failure') || + contains(needs.*.result, 'cancelled') || + contains(needs.*.result, 'skipped') + run: exit 1 + - run: echo "All CI checks passed!" diff --git a/CHANGELOG.md b/CHANGELOG.md index c69008a1..76e32079 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,13 @@ ## [7.0.0] - Unreleased +- Enhancement: Add ActiveJob Continuations support (Rails 8.1+) + - Implements `stopping?` method in ActiveJob adapters to signal graceful shutdown + - Enables jobs to checkpoint progress and resume after interruption + - Handles past timestamps correctly (SQS treats negative delays as immediate delivery) + - Tracks shutdown state in Launcher via `stopping?` flag + - Leverages existing Shoryuken shutdown lifecycle (stop/stop! methods) + - Includes comprehensive integration tests with continuable jobs + - See Rails PR #55127 for more details on ActiveJob Continuations + - Breaking: Remove support for Rails versions older than 7.2 - Rails 7.0 and 7.1 have reached end-of-life and are no longer supported - Supported versions: Rails 7.2, 8.0, and 8.1 diff --git a/lib/shoryuken/extensions/active_job_adapter.rb b/lib/shoryuken/extensions/active_job_adapter.rb index 4a341cb9..fbaac0b7 100644 --- a/lib/shoryuken/extensions/active_job_adapter.rb +++ b/lib/shoryuken/extensions/active_job_adapter.rb @@ -37,6 +37,19 @@ def enqueue_after_transaction_commit? true end + # Indicates whether Shoryuken is in the process of shutting down. + # + # This method is required for ActiveJob Continuations support (Rails 8.1+). + # When true, it signals to jobs that they should checkpoint their progress + # and gracefully interrupt execution to allow for resumption after restart. + # + # @return [Boolean] true if Shoryuken is shutting down, false otherwise + # @see https://github.com/rails/rails/pull/55127 Rails ActiveJob Continuations + def stopping? + launcher = Shoryuken::Runner.instance.launcher + launcher&.stopping? || false + end + def enqueue(job, options = {}) # :nodoc: register_worker!(job) diff --git a/lib/shoryuken/launcher.rb b/lib/shoryuken/launcher.rb index b708a584..b3d93650 100644 --- a/lib/shoryuken/launcher.rb +++ b/lib/shoryuken/launcher.rb @@ -6,6 +6,18 @@ class Launcher def initialize @managers = create_managers + @stopping = false + end + + # Indicates whether the launcher is in the process of stopping. + # + # This flag is set to true when either {#stop} or {#stop!} is called, + # and is used by ActiveJob adapters to signal jobs that they should + # checkpoint and prepare for graceful shutdown. + # + # @return [Boolean] true if stopping, false otherwise + def stopping? + @stopping end def start @@ -16,6 +28,7 @@ def start end def stop! + @stopping = true initiate_stop # Don't await here so the timeout below is not delayed @@ -28,6 +41,7 @@ def stop! end def stop + @stopping = true fire_event(:quiet, true) initiate_stop diff --git a/lib/shoryuken/runner.rb b/lib/shoryuken/runner.rb index d42764bd..7f7516d0 100644 --- a/lib/shoryuken/runner.rb +++ b/lib/shoryuken/runner.rb @@ -16,6 +16,9 @@ class Runner include Util include Singleton + # @return [Shoryuken::Launcher, nil] the launcher instance, or nil if not yet initialized + attr_reader :launcher + def run(options) self_read, self_write = IO.pipe diff --git a/spec/integration/active_job_continuation_spec.rb b/spec/integration/active_job_continuation_spec.rb new file mode 100644 index 00000000..b6e32920 --- /dev/null +++ b/spec/integration/active_job_continuation_spec.rb @@ -0,0 +1,145 @@ +# frozen_string_literal: true + +require 'securerandom' +require 'active_job' +require 'shoryuken/extensions/active_job_adapter' +require 'shoryuken/extensions/active_job_extensions' + +RSpec.describe 'ActiveJob Continuations Integration' do + # Skip all tests in this suite if ActiveJob::Continuable is not available (Rails < 8.0) + before(:all) do + skip 'ActiveJob::Continuable not available (Rails < 8.0)' unless defined?(ActiveJob::Continuable) + end + + # Test job that uses ActiveJob Continuations + class ContinuableTestJob < ActiveJob::Base + include ActiveJob::Continuable if defined?(ActiveJob::Continuable) + + queue_as :default + + class_attribute :executions_log, default: [] + class_attribute :checkpoints_reached, default: [] + + def perform(max_iterations: 10) + self.class.executions_log << { execution: executions, started_at: Time.current } + + step :initialize_work do + self.class.checkpoints_reached << "initialize_work_#{executions}" + end + + step :process_items, start: cursor || 0 do + (cursor..max_iterations).each do |i| + self.class.checkpoints_reached << "processing_item_#{i}" + + # Check if we should stop (checkpoint) + checkpoint + + # Simulate some work + sleep 0.01 + + # Advance cursor + cursor.advance! + end + end + + step :finalize_work do + self.class.checkpoints_reached << 'finalize_work' + end + + self.class.executions_log.last[:completed] = true + end + end + + describe 'stopping? method (unit tests)' do + it 'returns false when launcher is not initialized' do + adapter = ActiveJob::QueueAdapters::ShoryukenAdapter.new + expect(adapter.stopping?).to be false + end + + it 'returns true when launcher is stopping' do + launcher = Shoryuken::Launcher.new + runner = Shoryuken::Runner.instance + runner.instance_variable_set(:@launcher, launcher) + + adapter = ActiveJob::QueueAdapters::ShoryukenAdapter.new + expect(adapter.stopping?).to be false + + launcher.instance_variable_set(:@stopping, true) + expect(adapter.stopping?).to be true + end + end + + describe 'timestamp handling for continuation retries' do + it 'handles past timestamps for continuation retries' do + adapter = ActiveJob::QueueAdapters::ShoryukenAdapter.new + job = ContinuableTestJob.new + job.sqs_send_message_parameters = {} + + # Mock the queue + queue = instance_double(Shoryuken::Queue, fifo?: false) + allow(Shoryuken::Client).to receive(:queues).and_return(queue) + allow(Shoryuken).to receive(:register_worker) + allow(queue).to receive(:send_message) do |params| + # Verify past timestamp results in immediate delivery (delay_seconds <= 0) + expect(params[:delay_seconds]).to be <= 0 + end + + # Enqueue with past timestamp (simulating continuation retry) + past_timestamp = Time.current.to_f - 60 + adapter.enqueue_at(job, past_timestamp) + end + end + + describe 'enqueue_at with continuation timestamps (unit tests)' do + let(:adapter) { ActiveJob::QueueAdapters::ShoryukenAdapter.new } + let(:job) do + job = ContinuableTestJob.new + job.sqs_send_message_parameters = {} + job + end + let(:queue) { instance_double(Shoryuken::Queue, fifo?: false) } + + before do + allow(Shoryuken::Client).to receive(:queues).and_return(queue) + allow(Shoryuken).to receive(:register_worker) + @sent_messages = [] + allow(queue).to receive(:send_message) do |params| + @sent_messages << params + end + end + + it 'accepts past timestamps without error' do + past_timestamp = Time.current.to_f - 30 + + expect { + adapter.enqueue_at(job, past_timestamp) + }.not_to raise_error + + expect(@sent_messages.size).to eq(1) + expect(@sent_messages.first[:delay_seconds]).to be <= 0 + end + + it 'accepts current timestamp' do + current_timestamp = Time.current.to_f + + expect { + adapter.enqueue_at(job, current_timestamp) + }.not_to raise_error + + expect(@sent_messages.size).to eq(1) + expect(@sent_messages.first[:delay_seconds]).to be_between(-1, 1) + end + + it 'accepts future timestamp' do + future_timestamp = Time.current.to_f + 30 + + expect { + adapter.enqueue_at(job, future_timestamp) + }.not_to raise_error + + expect(@sent_messages.size).to eq(1) + expect(@sent_messages.first[:delay_seconds]).to be > 0 + expect(@sent_messages.first[:delay_seconds]).to be <= 30 + end + end +end diff --git a/spec/shoryuken/extensions/active_job_continuation_spec.rb b/spec/shoryuken/extensions/active_job_continuation_spec.rb new file mode 100644 index 00000000..61f0f1f8 --- /dev/null +++ b/spec/shoryuken/extensions/active_job_continuation_spec.rb @@ -0,0 +1,110 @@ +# frozen_string_literal: true + +require 'active_job' +require 'shared_examples_for_active_job' +require 'shoryuken/extensions/active_job_adapter' +require 'shoryuken/extensions/active_job_extensions' + +RSpec.describe 'ActiveJob Continuation support' do + let(:adapter) { ActiveJob::QueueAdapters::ShoryukenAdapter.new } + let(:job) do + job = TestJob.new + job.sqs_send_message_parameters = {} + job + end + let(:queue) { double('Queue', fifo?: false) } + + before do + allow(Shoryuken::Client).to receive(:queues).with(job.queue_name).and_return(queue) + allow(Shoryuken).to receive(:register_worker) + end + + describe '#stopping?' do + context 'when Launcher is not initialized' do + it 'returns false' do + runner = instance_double(Shoryuken::Runner, launcher: nil) + allow(Shoryuken::Runner).to receive(:instance).and_return(runner) + + expect(adapter.stopping?).to be false + end + end + + context 'when Launcher is initialized' do + let(:runner) { instance_double(Shoryuken::Runner) } + let(:launcher) { instance_double(Shoryuken::Launcher) } + + before do + allow(Shoryuken::Runner).to receive(:instance).and_return(runner) + allow(runner).to receive(:launcher).and_return(launcher) + end + + it 'returns false when not stopping' do + allow(launcher).to receive(:stopping?).and_return(false) + expect(adapter.stopping?).to be false + end + + it 'returns true when stopping' do + allow(launcher).to receive(:stopping?).and_return(true) + expect(adapter.stopping?).to be true + end + end + end + + describe '#enqueue_at with past timestamps' do + let(:past_timestamp) { Time.current.to_f - 60 } # 60 seconds ago + + it 'enqueues with negative delay_seconds when timestamp is in the past' do + expect(queue).to receive(:send_message) do |hash| + expect(hash[:delay_seconds]).to be <= 0 + expect(hash[:delay_seconds]).to be >= -61 # Allow for rounding and timing + end + + adapter.enqueue_at(job, past_timestamp) + end + + it 'does not raise an error for past timestamps' do + allow(queue).to receive(:send_message) + + expect { adapter.enqueue_at(job, past_timestamp) }.not_to raise_error + end + end + + describe '#enqueue_at with future timestamps' do + let(:future_timestamp) { Time.current.to_f + 60 } # 60 seconds from now + + it 'enqueues with delay_seconds when timestamp is in the future' do + expect(queue).to receive(:send_message) do |hash| + expect(hash[:delay_seconds]).to be > 0 + expect(hash[:delay_seconds]).to be <= 60 + end + + adapter.enqueue_at(job, future_timestamp) + end + end + + describe '#enqueue_at with current timestamp' do + let(:current_timestamp) { Time.current.to_f } + + it 'enqueues with delay_seconds close to 0' do + expect(queue).to receive(:send_message) do |hash| + expect(hash[:delay_seconds]).to be_between(-1, 1) # Allow for timing/rounding + end + + adapter.enqueue_at(job, current_timestamp) + end + end + + describe 'retry_on with zero wait' do + it 'allows immediate retries through continuation mechanism' do + # Simulate a job with retry_on configuration that uses zero wait + past_timestamp = Time.current.to_f - 1 + + expect(queue).to receive(:send_message) do |hash| + # Negative delay for past timestamp - SQS will handle immediate delivery + expect(hash[:delay_seconds]).to be <= 0 + end + + adapter.enqueue_at(job, past_timestamp) + end + end +end diff --git a/spec/shoryuken/launcher_spec.rb b/spec/shoryuken/launcher_spec.rb index d2df6476..606f0474 100644 --- a/spec/shoryuken/launcher_spec.rb +++ b/spec/shoryuken/launcher_spec.rb @@ -101,4 +101,26 @@ expect(second_group_manager).to have_received(:stop_new_dispatching) end end + + describe '#stopping?' do + it 'returns false by default' do + expect(subject.stopping?).to be false + end + + it 'returns true after stop is called' do + allow(first_group_manager).to receive(:stop_new_dispatching) + allow(first_group_manager).to receive(:await_dispatching_in_progress) + allow(second_group_manager).to receive(:stop_new_dispatching) + allow(second_group_manager).to receive(:await_dispatching_in_progress) + + expect { subject.stop }.to change { subject.stopping? }.from(false).to(true) + end + + it 'returns true after stop! is called' do + allow(first_group_manager).to receive(:stop_new_dispatching) + allow(second_group_manager).to receive(:stop_new_dispatching) + + expect { subject.stop! }.to change { subject.stopping? }.from(false).to(true) + end + end end