diff --git a/lib/shoryuken.rb b/lib/shoryuken.rb index 02516a9e..43881a50 100644 --- a/lib/shoryuken.rb +++ b/lib/shoryuken.rb @@ -36,6 +36,32 @@ def self.healthy? Shoryuken::Runner.instance.healthy? end + # Returns the global instrumentation monitor. + # Use this to subscribe to Shoryuken lifecycle events. + # + # @return [Shoryuken::Instrumentation::Notifications] the monitor instance + # + # @example Subscribe to message processing events + # Shoryuken.monitor.subscribe('message.processed') do |event| + # StatsD.timing('shoryuken.process_time', event.duration * 1000) + # end + # + # @example Subscribe to all events + # Shoryuken.monitor.subscribe do |event| + # logger.info("Event: #{event.name}") + # end + def self.monitor + @_monitor ||= Instrumentation::Notifications.new + end + + # Resets the monitor instance (useful for testing) + # + # @return [void] + # @api private + def self.reset_monitor! + @_monitor = nil + end + def_delegators( :shoryuken_options, :active_job?, diff --git a/lib/shoryuken/fetcher.rb b/lib/shoryuken/fetcher.rb index f2623827..e827fd8c 100644 --- a/lib/shoryuken/fetcher.rb +++ b/lib/shoryuken/fetcher.rb @@ -25,12 +25,14 @@ def fetch(queue, limit) fetch_with_auto_retry(3) do started_at = Time.now - logger.debug { "Looking for new messages in #{queue}" } + Shoryuken.monitor.publish('fetcher.started', queue: queue.name, limit: limit) sqs_msgs = Array(receive_messages(queue, limit)) - logger.debug { "Found #{sqs_msgs.size} messages for #{queue.name}" } unless sqs_msgs.empty? - logger.debug { "Fetcher for #{queue} completed in #{elapsed(started_at)} ms" } + Shoryuken.monitor.publish('fetcher.completed', + queue: queue.name, + message_count: sqs_msgs.size, + duration_ms: elapsed(started_at)) sqs_msgs end @@ -54,7 +56,11 @@ def fetch_with_auto_retry(max_attempts) attempts += 1 - logger.debug { "Retrying fetch attempt #{attempts} for #{e.message}" } + Shoryuken.monitor.publish('fetcher.retry', + attempt: attempts, + max_attempts: max_attempts, + error_message: e.message, + error_class: e.class.name) sleep((1..5).to_a.sample) diff --git a/lib/shoryuken/instrumentation.rb b/lib/shoryuken/instrumentation.rb new file mode 100644 index 00000000..8450e347 --- /dev/null +++ b/lib/shoryuken/instrumentation.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +require_relative 'instrumentation/event' +require_relative 'instrumentation/notifications' +require_relative 'instrumentation/logger_listener' + +module Shoryuken + # Instrumentation module providing pub/sub event notifications. + # Inspired by Karafka's instrumentation architecture. + # + # @example Subscribing to events + # Shoryuken.monitor.subscribe('message.processed') do |event| + # StatsD.timing('shoryuken.process_time', event.duration * 1000) + # end + # + module Instrumentation + end +end diff --git a/lib/shoryuken/instrumentation/event.rb b/lib/shoryuken/instrumentation/event.rb new file mode 100644 index 00000000..b51a8f29 --- /dev/null +++ b/lib/shoryuken/instrumentation/event.rb @@ -0,0 +1,62 @@ +# frozen_string_literal: true + +module Shoryuken + module Instrumentation + # Represents an instrumentation event with metadata. + # Events are published through the Notifications system and contain + # information about what happened, when, and relevant context. + # + # @example Creating an event + # event = Event.new('message.processed', queue: 'default', duration: 0.5) + # event.name # => 'message.processed' + # event[:queue] # => 'default' + # event.duration # => 0.5 + # + class Event + # @return [String] the event name (e.g., 'message.processed') + attr_reader :name + + # @return [Hash] the event payload containing contextual data + attr_reader :payload + + # @return [Time] when the event was created + attr_reader :time + + # Creates a new Event instance + # + # @param name [String] the event name using dot notation (e.g., 'message.processed') + # @param payload [Hash] contextual data for the event + def initialize(name, payload = {}) + @name = name + @payload = payload + @time = Time.now + end + + # Accesses a value from the payload by key + # + # @param key [Symbol, String] the payload key + # @return [Object, nil] the value or nil if not found + def [](key) + payload[key] + end + + # Returns the duration from the payload if present + # + # @return [Float, nil] the duration in seconds or nil + def duration + payload[:duration] + end + + # Returns a hash representation of the event + # + # @return [Hash] the event as a hash + def to_h + { + name: name, + payload: payload, + time: time + } + end + end + end +end diff --git a/lib/shoryuken/instrumentation/logger_listener.rb b/lib/shoryuken/instrumentation/logger_listener.rb new file mode 100644 index 00000000..0111f6a7 --- /dev/null +++ b/lib/shoryuken/instrumentation/logger_listener.rb @@ -0,0 +1,143 @@ +# frozen_string_literal: true + +module Shoryuken + module Instrumentation + # Default listener that logs instrumentation events. + # This provides human-readable log output for key Shoryuken events. + # + # @example Subscribing the logger listener + # Shoryuken.monitor.subscribe(&LoggerListener.new.method(:call)) + # + class LoggerListener + # Creates a new LoggerListener + # + # @param logger [Logger] the logger to use (defaults to Shoryuken.logger) + def initialize(logger = nil) + @logger = logger + end + + # Returns the logger instance + # + # @return [Logger] the logger + def logger + @logger || Shoryuken.logger + end + + # Handles an instrumentation event by logging it appropriately + # + # @param event [Event] the event to handle + # @return [void] + def call(event) + method_name = "on_#{event.name.tr('.', '_')}" + send(method_name, event) if respond_to?(method_name, true) + end + + private + + # App lifecycle events + + def on_app_started(event) + groups = event[:groups] || [] + logger.info { "Shoryuken started with #{groups.size} group(s)" } + end + + def on_app_stopping(_event) + logger.info { 'Shoryuken shutting down...' } + end + + def on_app_stopped(_event) + logger.info { 'Shoryuken stopped' } + end + + def on_app_quiet(_event) + logger.info { 'Shoryuken is quiet' } + end + + # Fetcher events + + def on_fetcher_started(event) + logger.debug { "Looking for new messages in #{event[:queue]}" } + end + + def on_fetcher_completed(event) + queue = event[:queue] + message_count = event[:message_count] || 0 + duration_ms = event[:duration_ms] + + logger.debug { "Found #{message_count} messages for #{queue}" } if message_count.positive? + logger.debug { "Fetcher for #{queue} completed in #{duration_ms} ms" } + end + + def on_fetcher_retry(event) + logger.debug { "Retrying fetch attempt #{event[:attempt]} for #{event[:error_message]}" } + end + + # Manager events + + def on_manager_dispatch(event) + logger.debug do + "Ready: #{event[:ready]}, Busy: #{event[:busy]}, Active Queues: #{event[:active_queues]}" + end + end + + def on_manager_processor_assigned(event) + logger.debug { "Assigning #{event[:message_id]}" } + end + + def on_manager_failed(event) + logger.error { "Manager failed: #{event[:error_message]}" } + logger.error { event[:backtrace].join("\n") } if event[:backtrace] + end + + # Message processing events + + def on_message_processed(event) + # Skip logging if there was an exception - error.occurred handles that + return if event[:exception] + + duration_ms = event.duration ? (event.duration * 1000).round(2) : 0 + worker = event[:worker] || 'Unknown' + queue = event[:queue] || 'Unknown' + + logger.info { "Processed #{worker}/#{queue} in #{duration_ms}ms" } + end + + def on_message_failed(event) + worker = event[:worker] || 'Unknown' + queue = event[:queue] || 'Unknown' + error = event[:error] + error_message = error.respond_to?(:message) ? error.message : error.to_s + + logger.error { "Failed #{worker}/#{queue}: #{error_message}" } + end + + # Error events + + def on_error_occurred(event) + error = event[:error] + error_class = error.respond_to?(:class) ? error.class.name : 'Unknown' + error_message = error.respond_to?(:message) ? error.message : error.to_s + type = event[:type] + + if type + logger.error { "Error in #{type}: #{error_class} - #{error_message}" } + else + logger.error { "Error occurred: #{error_class} - #{error_message}" } + end + + logger.error { error.backtrace.join("\n") } if error.respond_to?(:backtrace) && error.backtrace + end + + # Queue events + + def on_queue_polling(event) + queue = event[:queue] || 'Unknown' + logger.debug { "Polling queue: #{queue}" } + end + + def on_queue_empty(event) + logger.debug { "Queue #{event[:queue]} is empty" } + end + end + end +end diff --git a/lib/shoryuken/instrumentation/notifications.rb b/lib/shoryuken/instrumentation/notifications.rb new file mode 100644 index 00000000..3e1f4c80 --- /dev/null +++ b/lib/shoryuken/instrumentation/notifications.rb @@ -0,0 +1,206 @@ +# frozen_string_literal: true + +module Shoryuken + module Instrumentation + # A thread-safe pub/sub notification system for instrumentation events. + # Inspired by Karafka's instrumentation architecture, this allows external + # systems (APM, logging, metrics) to subscribe to Shoryuken lifecycle events. + # + # @example Subscribing to specific events + # Shoryuken.monitor.subscribe('message.processed') do |event| + # StatsD.timing('shoryuken.process_time', event.duration * 1000) + # end + # + # @example Subscribing to all events + # Shoryuken.monitor.subscribe do |event| + # logger.info("Event: #{event.name}") + # end + # + # @example Instrumenting a block + # Shoryuken.monitor.instrument('message.processed', queue: 'default') do + # process_message + # end + # + class Notifications + # List of all supported events in the system + EVENTS = %w[ + app.started + app.stopping + app.stopped + app.quiet + + fetcher.started + fetcher.completed + fetcher.retry + + manager.dispatch + manager.processor_assigned + manager.processor_done + manager.utilization_changed + manager.failed + + message.received + message.processed + message.failed + message.deleted + + worker.started + worker.completed + worker.failed + + queue.polling + queue.empty + + error.occurred + ].freeze + + # Creates a new Notifications instance + def initialize + @subscribers = Hash.new { |h, k| h[k] = [] } + @mutex = Mutex.new + end + + # Subscribes to events + # + # @param event_name [String, nil] the event name to subscribe to, or nil for all events + # @yield [Event] block called when matching events are published + # @return [void] + # + # @example Subscribe to specific event + # subscribe('message.processed') { |event| puts event.name } + # + # @example Subscribe to all events + # subscribe { |event| puts event.name } + def subscribe(event_name = nil, &block) + @mutex.synchronize do + if event_name + @subscribers[event_name] << block + else + @subscribers[:all] << block + end + end + end + + # Unsubscribes a block from events + # + # @param event_name [String, nil] the event name to unsubscribe from, or nil for all events + # @param block [Proc] the block to unsubscribe + # @return [void] + def unsubscribe(event_name = nil, &block) + @mutex.synchronize do + key = event_name || :all + @subscribers[key].delete(block) + end + end + + # Instruments a block of code, measuring its duration and publishing an event. + # Compatible with ActiveSupport::Notifications - if an exception occurs, + # it adds :exception and :exception_object to the payload and re-raises. + # + # Additionally, on exception, publishes a separate 'error.occurred' event + # (Karafka-style) with a :type key indicating the original event name. + # + # @param event_name [String] the event name to publish + # @param payload [Hash] additional data to include in the event + # @yield [payload] the code block to instrument (payload is yielded for modification) + # @return [Object] the result of the block + # + # @example Basic usage + # monitor.instrument('message.processed', queue: 'default') do + # worker.perform(message) + # end + # + # @example Checking for exceptions in subscriber + # monitor.subscribe('message.processed') do |event| + # if event[:exception] + # # Handle error case + # Sentry.capture_exception(event[:exception_object]) + # else + # # Handle success case + # StatsD.timing('process_time', event.duration) + # end + # end + # + # @example Subscribing to all errors (Karafka-style) + # monitor.subscribe('error.occurred') do |event| + # Sentry.capture_exception(event[:error], extra: { type: event[:type] }) + # end + def instrument(event_name, payload = {}) + started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + exception_raised = nil + begin + result = yield payload if block_given? + rescue Exception => e + exception_raised = e + payload[:exception] = [e.class.name, e.message] + payload[:exception_object] = e + raise e + ensure + duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at + event = Event.new(event_name, payload.merge(duration: duration)) + publish(event) + + # Publish a separate error.occurred event (Karafka-style) for centralized error handling + if exception_raised + error_payload = payload.merge( + type: event_name, + error: exception_raised, + error_class: exception_raised.class.name, + error_message: exception_raised.message, + duration: duration + ) + publish('error.occurred', error_payload) + end + end + result + end + + # Publishes an event to all matching subscribers + # + # @param event_or_name [Event, String] an Event instance or event name + # @param payload [Hash] payload hash (only used if first arg is a String) + # @return [void] + # + # @example With Event instance + # publish(Event.new('message.processed', queue: 'default')) + # + # @example With name and payload + # publish('message.processed', queue: 'default') + def publish(event_or_name, payload = {}) + event = event_or_name.is_a?(Event) ? event_or_name : Event.new(event_or_name, payload) + + subscribers_for_event = @mutex.synchronize do + @subscribers[event.name] + @subscribers[:all] + end + + subscribers_for_event.each do |subscriber| + subscriber.call(event) + rescue StandardError => e + # Log but don't raise - subscribers should not break the main flow + Shoryuken.logger.error { "Instrumentation subscriber error: #{e.message}" } + Shoryuken.logger.error { e.backtrace.join("\n") } if e.backtrace + end + end + + # Clears all subscribers (useful for testing) + # + # @return [void] + def clear + @mutex.synchronize do + @subscribers.clear + end + end + + # Returns the number of subscribers for an event + # + # @param event_name [String, nil] the event name, or nil for global subscribers + # @return [Integer] the subscriber count + def subscriber_count(event_name = nil) + @mutex.synchronize do + key = event_name || :all + @subscribers[key].size + end + end + end + end +end diff --git a/lib/shoryuken/launcher.rb b/lib/shoryuken/launcher.rb index 28205b42..5ccf735f 100644 --- a/lib/shoryuken/launcher.rb +++ b/lib/shoryuken/launcher.rb @@ -27,8 +27,6 @@ def stopping? # # @return [void] def start - logger.info { 'Starting' } - start_callback start_managers end @@ -113,8 +111,6 @@ def start_managers # # @return [void] def initiate_stop - logger.info { 'Shutting down' } - stop_callback end @@ -122,11 +118,7 @@ def initiate_stop # # @return [void] def start_callback - if (callback = Shoryuken.start_callback) - logger.debug { 'Calling start_callback' } - callback.call - end - + Shoryuken.start_callback&.call fire_event(:startup) end @@ -134,11 +126,7 @@ def start_callback # # @return [void] def stop_callback - if (callback = Shoryuken.stop_callback) - logger.debug { 'Calling stop_callback' } - callback.call - end - + Shoryuken.stop_callback&.call fire_event(:shutdown, true) end diff --git a/lib/shoryuken/manager.rb b/lib/shoryuken/manager.rb index 54c2cc91..ee013bba 100644 --- a/lib/shoryuken/manager.rb +++ b/lib/shoryuken/manager.rb @@ -93,7 +93,12 @@ def dispatch fire_event(:dispatch, false, queue_name: queue.name) - logger.debug { "Ready: #{ready}, Busy: #{busy}, Active Queues: #{@polling_strategy.active_queues}" } + Shoryuken.monitor.publish('manager.dispatch', + group: @group, + queue: queue.name, + ready: ready, + busy: busy, + active_queues: @polling_strategy.active_queues) batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue) rescue => e @@ -139,7 +144,10 @@ def processor_done(queue) def assign(queue_name, sqs_msg) return unless running? - logger.debug { "Assigning #{sqs_msg.message_id}" } + Shoryuken.monitor.publish('manager.processor_assigned', + group: @group, + queue: queue_name, + message_id: sqs_msg.message_id) @busy_processors.increment fire_utilization_update_event @@ -205,8 +213,12 @@ def message_id # @param ex [Exception] the exception that occurred # @return [void] def handle_dispatch_error(ex) - logger.error { "Manager failed: #{ex.message}" } - logger.error { ex.backtrace.join("\n") } unless ex.backtrace.nil? + Shoryuken.monitor.publish('manager.failed', + group: @group, + error: ex, + error_message: ex.message, + error_class: ex.class.name, + backtrace: ex.backtrace) Process.kill('USR1', Process.pid) diff --git a/lib/shoryuken/processor.rb b/lib/shoryuken/processor.rb index ca3f0878..7f6b795f 100644 --- a/lib/shoryuken/processor.rb +++ b/lib/shoryuken/processor.rb @@ -38,8 +38,10 @@ def process return logger.error { "No worker found for #{queue}" } unless worker Shoryuken::Logging.with_context("#{worker_name(worker.class, sqs_msg, body)}/#{queue}/#{sqs_msg.message_id}") do - worker.class.server_middleware.invoke(worker, queue, sqs_msg, body) do - worker.perform(sqs_msg, body) + Shoryuken.monitor.instrument('message.processed', message_payload) do + worker.class.server_middleware.invoke(worker, queue, sqs_msg, body) do + worker.perform(sqs_msg, body) + end end end end @@ -52,6 +54,8 @@ def process worker_perform.call end rescue Exception => e + # Note: message.processed event is already published by instrument() with + # :exception and :exception_object in the payload (ActiveSupport-compatible) Array(Shoryuken.exception_handlers).each { |handler| handler.call(e, queue, sqs_msg) } raise @@ -59,6 +63,17 @@ def process private + # Returns payload hash for instrumentation events + # + # @return [Hash] the payload for instrumentation + def message_payload + { + queue: queue, + message_id: sqs_msg.is_a?(Array) ? sqs_msg.map(&:message_id) : sqs_msg.message_id, + worker: worker&.class&.name + } + end + # Fetches the worker instance for processing # # @return [Object, nil] the worker instance or nil if not found diff --git a/lib/shoryuken/util.rb b/lib/shoryuken/util.rb index d4a0741c..21ff9992 100644 --- a/lib/shoryuken/util.rb +++ b/lib/shoryuken/util.rb @@ -11,7 +11,16 @@ def logger Shoryuken.logger end - # Fires a lifecycle event to all registered handlers + # Maps legacy lifecycle events to new instrumentation event names + LEGACY_EVENT_MAP = { + startup: 'app.started', + quiet: 'app.quiet', + shutdown: 'app.stopping', + stopped: 'app.stopped' + }.freeze + + # Fires a lifecycle event to all registered handlers. + # Also publishes to the instrumentation monitor for subscribers. # # @param event [Symbol] the event name to fire # @param reverse [Boolean] whether to call handlers in reverse order @@ -19,6 +28,12 @@ def logger # @return [void] def fire_event(event, reverse = false, event_options = {}) logger.debug { "Firing '#{event}' lifecycle event" } + + # Publish to the new instrumentation system + new_event_name = LEGACY_EVENT_MAP[event] || "legacy.#{event}" + Shoryuken.monitor.publish(new_event_name, event_options.merge(legacy_event: event)) + + # Maintain backward compatibility with existing callback system arr = Shoryuken.options[:lifecycle_events][event] arr.reverse! if reverse arr.each do |block| diff --git a/spec/integration/instrumentation/instrumentation_spec.rb b/spec/integration/instrumentation/instrumentation_spec.rb new file mode 100644 index 00000000..cdf82fc9 --- /dev/null +++ b/spec/integration/instrumentation/instrumentation_spec.rb @@ -0,0 +1,59 @@ +# frozen_string_literal: true + +# This spec tests the instrumentation system integration. +# It verifies that events are published during message processing. + +setup_localstack + +queue_name = DT.queue +create_test_queue(queue_name) +Shoryuken.add_group('default', 1) +Shoryuken.add_queue(queue_name, 1, 'default') + +# Reset monitor to ensure clean state +Shoryuken.reset_monitor! + +# Collect events +events_received = [] +Shoryuken.monitor.subscribe do |event| + events_received << { name: event.name, payload: event.payload.dup, time: event.time } +end + +# Worker for testing +worker_class = Class.new do + include Shoryuken::Worker + + shoryuken_options auto_delete: true, batch: false + + def perform(sqs_msg, body) + DT[:processed] << { message_id: sqs_msg.message_id, body: body } + end +end + +worker_class.get_shoryuken_options['queue'] = queue_name +Shoryuken.register_worker(queue_name, worker_class) + +# Send test messages +2.times { |i| Shoryuken::Client.queues(queue_name).send_message(message_body: "instrumentation-test-#{i}") } + +sleep 1 + +poll_queues_until { DT[:processed].size >= 2 } + +# Verify messages were processed +assert_equal(2, DT[:processed].size) + +# Verify instrumentation events were captured +processed_events = events_received.select { |e| e[:name] == 'message.processed' } +assert(processed_events.size >= 2, "Should have at least 2 message.processed events, got #{processed_events.size}") + +# Verify event payloads contain expected data +processed_events.each do |event| + assert_equal(queue_name, event[:payload][:queue], 'Event should include queue name') + assert(event[:payload][:message_id], 'Event should include message_id') + assert(event[:payload][:duration], 'Event should include duration') + assert(event[:payload][:duration] >= 0, 'Duration should be non-negative') +end + +# Cleanup +Shoryuken.reset_monitor! diff --git a/spec/lib/shoryuken/fetcher_spec.rb b/spec/lib/shoryuken/fetcher_spec.rb index 61befe74..5c84e498 100644 --- a/spec/lib/shoryuken/fetcher_spec.rb +++ b/spec/lib/shoryuken/fetcher_spec.rb @@ -35,20 +35,22 @@ subject.fetch(queue_config, limit) end - it 'logs debug only' do + it 'publishes fetcher events' do # See https://github.com/ruby-shoryuken/shoryuken/issues/435 - logger = double 'logger' - - allow(subject).to receive(:logger).and_return(logger) + # Fetcher should publish events instead of direct logging + events = [] + Shoryuken.monitor.subscribe('fetcher.started') { |e| events << e } + Shoryuken.monitor.subscribe('fetcher.completed') { |e| events << e } expect(Shoryuken::Client).to receive(:queues).with(queue_name).and_return(queue) - expect(queue).to receive(:receive_messages).and_return([double('SQS Msg')]) - expect(logger).to receive(:debug).exactly(3).times - expect(logger).to_not receive(:info) - subject.fetch(queue_config, limit) + + expect(events.size).to eq(2) + expect(events.map(&:name)).to eq(%w[fetcher.started fetcher.completed]) + expect(events.first[:queue]).to eq(queue_name) + expect(events.last[:message_count]).to eq(1) end context 'when receive options per queue' do diff --git a/spec/lib/shoryuken/instrumentation/event_spec.rb b/spec/lib/shoryuken/instrumentation/event_spec.rb new file mode 100644 index 00000000..283c37f4 --- /dev/null +++ b/spec/lib/shoryuken/instrumentation/event_spec.rb @@ -0,0 +1,69 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Shoryuken::Instrumentation::Event do + describe '#initialize' do + it 'sets the name' do + event = described_class.new('message.processed') + expect(event.name).to eq('message.processed') + end + + it 'sets the payload' do + event = described_class.new('message.processed', queue: 'default', worker: 'TestWorker') + expect(event.payload).to eq(queue: 'default', worker: 'TestWorker') + end + + it 'defaults payload to empty hash' do + event = described_class.new('message.processed') + expect(event.payload).to eq({}) + end + + it 'sets the time' do + freeze_time = Time.now + allow(Time).to receive(:now).and_return(freeze_time) + + event = described_class.new('message.processed') + expect(event.time).to eq(freeze_time) + end + end + + describe '#[]' do + it 'returns payload value by key' do + event = described_class.new('message.processed', queue: 'default') + expect(event[:queue]).to eq('default') + end + + it 'returns nil for missing key' do + event = described_class.new('message.processed') + expect(event[:missing]).to be_nil + end + end + + describe '#duration' do + it 'returns duration from payload' do + event = described_class.new('message.processed', duration: 1.5) + expect(event.duration).to eq(1.5) + end + + it 'returns nil if duration not set' do + event = described_class.new('message.processed') + expect(event.duration).to be_nil + end + end + + describe '#to_h' do + it 'returns hash representation' do + freeze_time = Time.now + allow(Time).to receive(:now).and_return(freeze_time) + + event = described_class.new('message.processed', queue: 'default') + + expect(event.to_h).to eq( + name: 'message.processed', + payload: { queue: 'default' }, + time: freeze_time + ) + end + end +end diff --git a/spec/lib/shoryuken/instrumentation/logger_listener_spec.rb b/spec/lib/shoryuken/instrumentation/logger_listener_spec.rb new file mode 100644 index 00000000..97b9d2fc --- /dev/null +++ b/spec/lib/shoryuken/instrumentation/logger_listener_spec.rb @@ -0,0 +1,303 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Shoryuken::Instrumentation::LoggerListener do + let(:logger) { instance_double(Logger, info: nil, error: nil, debug: nil, warn: nil) } + let(:listener) { described_class.new(logger) } + + describe '#initialize' do + it 'accepts a custom logger' do + expect(listener.logger).to eq(logger) + end + + it 'defaults to Shoryuken.logger' do + listener_without_logger = described_class.new + expect(listener_without_logger.logger).to eq(Shoryuken.logger) + end + end + + describe '#call' do + context 'with app.started event' do + it 'logs info message' do + event = Shoryuken::Instrumentation::Event.new('app.started', groups: %w[default priority]) + + expect(logger).to receive(:info).and_yield.and_return('Shoryuken started with 2 group(s)') + + listener.call(event) + end + end + + context 'with app.stopping event' do + it 'logs info message' do + event = Shoryuken::Instrumentation::Event.new('app.stopping') + + expect(logger).to receive(:info).and_yield.and_return('Shoryuken shutting down...') + + listener.call(event) + end + end + + context 'with app.stopped event' do + it 'logs info message' do + event = Shoryuken::Instrumentation::Event.new('app.stopped') + + expect(logger).to receive(:info).and_yield.and_return('Shoryuken stopped') + + listener.call(event) + end + end + + context 'with app.quiet event' do + it 'logs info message' do + event = Shoryuken::Instrumentation::Event.new('app.quiet') + + expect(logger).to receive(:info).and_yield.and_return('Shoryuken is quiet') + + listener.call(event) + end + end + + context 'with fetcher.started event' do + it 'logs debug message' do + event = Shoryuken::Instrumentation::Event.new('fetcher.started', queue: 'default', limit: 10) + + expect(logger).to receive(:debug).and_yield.and_return('Looking for new messages in default') + + listener.call(event) + end + end + + context 'with fetcher.completed event' do + it 'logs debug message with message count' do + event = Shoryuken::Instrumentation::Event.new( + 'fetcher.completed', + queue: 'default', + message_count: 5, + duration_ms: 123.45 + ) + + expect(logger).to receive(:debug).twice + + listener.call(event) + end + + it 'does not log found messages when count is zero' do + event = Shoryuken::Instrumentation::Event.new( + 'fetcher.completed', + queue: 'default', + message_count: 0, + duration_ms: 50.0 + ) + + # Only one debug call for completion, not for "found messages" + expect(logger).to receive(:debug).once + + listener.call(event) + end + end + + context 'with fetcher.retry event' do + it 'logs debug message with attempt info' do + event = Shoryuken::Instrumentation::Event.new( + 'fetcher.retry', + attempt: 2, + max_attempts: 3, + error_message: 'Connection timeout' + ) + + expect(logger).to receive(:debug).and_yield.and_return('Retrying fetch attempt 2 for Connection timeout') + + listener.call(event) + end + end + + context 'with manager.dispatch event' do + it 'logs debug message with state info' do + event = Shoryuken::Instrumentation::Event.new( + 'manager.dispatch', + group: 'default', + queue: 'my_queue', + ready: 5, + busy: 3, + active_queues: %w[queue1 queue2] + ) + + expect(logger).to receive(:debug) + + listener.call(event) + end + end + + context 'with manager.processor_assigned event' do + it 'logs debug message with message id' do + event = Shoryuken::Instrumentation::Event.new( + 'manager.processor_assigned', + group: 'default', + queue: 'my_queue', + message_id: 'msg-123' + ) + + expect(logger).to receive(:debug).and_yield.and_return('Assigning msg-123') + + listener.call(event) + end + end + + context 'with manager.failed event' do + it 'logs error message and backtrace' do + event = Shoryuken::Instrumentation::Event.new( + 'manager.failed', + group: 'default', + error_message: 'Something went wrong', + backtrace: ['line1', 'line2'] + ) + + expect(logger).to receive(:error).twice + + listener.call(event) + end + + it 'handles missing backtrace' do + event = Shoryuken::Instrumentation::Event.new( + 'manager.failed', + group: 'default', + error_message: 'Something went wrong' + ) + + expect(logger).to receive(:error).once + + listener.call(event) + end + end + + context 'with message.processed event' do + it 'logs info message with duration' do + event = Shoryuken::Instrumentation::Event.new( + 'message.processed', + queue: 'default', + worker: 'TestWorker', + duration: 0.12345 + ) + + expect(logger).to receive(:info).and_yield.and_return('Processed TestWorker/default in 123.45ms') + + listener.call(event) + end + + it 'handles missing duration' do + event = Shoryuken::Instrumentation::Event.new( + 'message.processed', + queue: 'default', + worker: 'TestWorker' + ) + + expect(logger).to receive(:info) + + listener.call(event) + end + + it 'does not log when exception is present' do + event = Shoryuken::Instrumentation::Event.new( + 'message.processed', + queue: 'default', + worker: 'TestWorker', + duration: 0.12345, + exception: ['StandardError', 'test error'] + ) + + expect(logger).not_to receive(:info) + + listener.call(event) + end + end + + context 'with message.failed event' do + it 'logs error message' do + error = StandardError.new('Something went wrong') + event = Shoryuken::Instrumentation::Event.new( + 'message.failed', + queue: 'default', + worker: 'TestWorker', + error: error + ) + + expect(logger).to receive(:error).and_yield.and_return('Failed TestWorker/default: Something went wrong') + + listener.call(event) + end + end + + context 'with error.occurred event' do + it 'logs error message with class name' do + error = ArgumentError.new('Invalid argument') + event = Shoryuken::Instrumentation::Event.new( + 'error.occurred', + error: error + ) + + expect(logger).to receive(:error).at_least(:once) + + listener.call(event) + end + + it 'includes type when present' do + error = ArgumentError.new('Invalid argument') + event = Shoryuken::Instrumentation::Event.new( + 'error.occurred', + error: error, + type: 'message.processed' + ) + + expect(logger).to receive(:error).at_least(:once) + + listener.call(event) + end + + it 'logs backtrace when present' do + error = ArgumentError.new('Invalid argument') + error.set_backtrace(['line1', 'line2']) + event = Shoryuken::Instrumentation::Event.new( + 'error.occurred', + error: error + ) + + expect(logger).to receive(:error).twice + + listener.call(event) + end + end + + context 'with queue.polling event' do + it 'logs debug message' do + event = Shoryuken::Instrumentation::Event.new('queue.polling', queue: 'default') + + expect(logger).to receive(:debug).and_yield.and_return('Polling queue: default') + + listener.call(event) + end + end + + context 'with queue.empty event' do + it 'logs debug message' do + event = Shoryuken::Instrumentation::Event.new('queue.empty', queue: 'default') + + expect(logger).to receive(:debug).and_yield.and_return('Queue default is empty') + + listener.call(event) + end + end + + context 'with unknown event' do + it 'does not log anything' do + event = Shoryuken::Instrumentation::Event.new('unknown.event') + + expect(logger).not_to receive(:info) + expect(logger).not_to receive(:error) + expect(logger).not_to receive(:debug) + + listener.call(event) + end + end + end +end diff --git a/spec/lib/shoryuken/instrumentation/notifications_spec.rb b/spec/lib/shoryuken/instrumentation/notifications_spec.rb new file mode 100644 index 00000000..2f378d37 --- /dev/null +++ b/spec/lib/shoryuken/instrumentation/notifications_spec.rb @@ -0,0 +1,360 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Shoryuken::Instrumentation::Notifications do + let(:notifications) { described_class.new } + + after do + notifications.clear + end + + describe '#subscribe' do + it 'subscribes to specific event' do + events = [] + notifications.subscribe('message.processed') { |e| events << e } + + notifications.publish('message.processed', queue: 'default') + notifications.publish('message.failed', queue: 'default') + + expect(events.size).to eq(1) + expect(events.first.name).to eq('message.processed') + end + + it 'subscribes to all events when no event name given' do + events = [] + notifications.subscribe { |e| events << e } + + notifications.publish('message.processed', queue: 'default') + notifications.publish('message.failed', queue: 'default') + + expect(events.size).to eq(2) + end + + it 'allows multiple subscribers for same event' do + counter = { a: 0, b: 0 } + notifications.subscribe('message.processed') { counter[:a] += 1 } + notifications.subscribe('message.processed') { counter[:b] += 1 } + + notifications.publish('message.processed') + + expect(counter[:a]).to eq(1) + expect(counter[:b]).to eq(1) + end + end + + describe '#unsubscribe' do + it 'removes subscriber from specific event' do + events = [] + block = ->(e) { events << e } + notifications.subscribe('message.processed', &block) + + notifications.publish('message.processed') + expect(events.size).to eq(1) + + notifications.unsubscribe('message.processed', &block) + notifications.publish('message.processed') + expect(events.size).to eq(1) + end + + it 'removes global subscriber' do + events = [] + block = ->(e) { events << e } + notifications.subscribe(&block) + + notifications.publish('message.processed') + expect(events.size).to eq(1) + + notifications.unsubscribe(&block) + notifications.publish('message.processed') + expect(events.size).to eq(1) + end + end + + describe '#instrument' do + it 'executes the block' do + result = notifications.instrument('message.processed') { 'hello' } + expect(result).to eq('hello') + end + + it 'publishes event with duration' do + events = [] + notifications.subscribe('message.processed') { |e| events << e } + + notifications.instrument('message.processed', queue: 'default') { sleep 0.01 } + + expect(events.size).to eq(1) + expect(events.first.duration).to be >= 0.01 + expect(events.first[:queue]).to eq('default') + end + + it 'publishes event even without block' do + events = [] + notifications.subscribe('message.processed') { |e| events << e } + + notifications.instrument('message.processed', queue: 'default') + + expect(events.size).to eq(1) + expect(events.first.duration).to be_a(Float) + end + + it 'yields payload to allow modification' do + events = [] + notifications.subscribe('message.processed') { |e| events << e } + + notifications.instrument('message.processed', queue: 'default') do |payload| + payload[:custom_key] = 'custom_value' + end + + expect(events.first[:custom_key]).to eq('custom_value') + end + + context 'when exception occurs (ActiveSupport-compatible)' do + it 'adds :exception to payload with class name and message' do + events = [] + notifications.subscribe('message.processed') { |e| events << e } + + expect do + notifications.instrument('message.processed', queue: 'default') do + raise ArgumentError, 'invalid argument' + end + end.to raise_error(ArgumentError, 'invalid argument') + + expect(events.size).to eq(1) + expect(events.first[:exception]).to eq(['ArgumentError', 'invalid argument']) + end + + it 'adds :exception_object to payload' do + events = [] + notifications.subscribe('message.processed') { |e| events << e } + + expect do + notifications.instrument('message.processed', queue: 'default') do + raise StandardError, 'test error' + end + end.to raise_error(StandardError) + + expect(events.first[:exception_object]).to be_a(StandardError) + expect(events.first[:exception_object].message).to eq('test error') + end + + it 'still publishes event with duration' do + events = [] + notifications.subscribe('message.processed') { |e| events << e } + + expect do + notifications.instrument('message.processed', queue: 'default') do + sleep 0.01 + raise 'error' + end + end.to raise_error(RuntimeError) + + expect(events.first.duration).to be >= 0.01 + end + + it 're-raises the exception' do + expect do + notifications.instrument('message.processed') do + raise ArgumentError, 'test' + end + end.to raise_error(ArgumentError, 'test') + end + end + + context 'when exception occurs (Karafka-style error.occurred)' do + it 'publishes error.occurred event with type key' do + error_events = [] + notifications.subscribe('error.occurred') { |e| error_events << e } + + expect do + notifications.instrument('message.processed', queue: 'default') do + raise StandardError, 'test error' + end + end.to raise_error(StandardError) + + expect(error_events.size).to eq(1) + expect(error_events.first[:type]).to eq('message.processed') + end + + it 'includes error object in error.occurred event' do + error_events = [] + notifications.subscribe('error.occurred') { |e| error_events << e } + + expect do + notifications.instrument('message.processed', queue: 'default') do + raise ArgumentError, 'invalid argument' + end + end.to raise_error(ArgumentError) + + expect(error_events.first[:error]).to be_a(ArgumentError) + expect(error_events.first[:error].message).to eq('invalid argument') + end + + it 'includes error_class and error_message in error.occurred event' do + error_events = [] + notifications.subscribe('error.occurred') { |e| error_events << e } + + expect do + notifications.instrument('message.processed', queue: 'default') do + raise RuntimeError, 'something went wrong' + end + end.to raise_error(RuntimeError) + + expect(error_events.first[:error_class]).to eq('RuntimeError') + expect(error_events.first[:error_message]).to eq('something went wrong') + end + + it 'includes original payload in error.occurred event' do + error_events = [] + notifications.subscribe('error.occurred') { |e| error_events << e } + + expect do + notifications.instrument('message.processed', queue: 'default', worker: 'TestWorker') do + raise StandardError, 'test error' + end + end.to raise_error(StandardError) + + expect(error_events.first[:queue]).to eq('default') + expect(error_events.first[:worker]).to eq('TestWorker') + end + + it 'includes duration in error.occurred event' do + error_events = [] + notifications.subscribe('error.occurred') { |e| error_events << e } + + expect do + notifications.instrument('message.processed', queue: 'default') do + sleep 0.01 + raise StandardError, 'test error' + end + end.to raise_error(StandardError) + + expect(error_events.first[:duration]).to be >= 0.01 + end + + it 'does not publish error.occurred on success' do + error_events = [] + notifications.subscribe('error.occurred') { |e| error_events << e } + + notifications.instrument('message.processed', queue: 'default') { 'success' } + + expect(error_events).to be_empty + end + end + end + + describe '#publish' do + it 'publishes event by name and payload' do + events = [] + notifications.subscribe('message.processed') { |e| events << e } + + notifications.publish('message.processed', queue: 'default') + + expect(events.size).to eq(1) + expect(events.first.name).to eq('message.processed') + expect(events.first[:queue]).to eq('default') + end + + it 'publishes existing Event instance' do + events = [] + notifications.subscribe('message.processed') { |e| events << e } + + event = Shoryuken::Instrumentation::Event.new('message.processed', queue: 'default') + notifications.publish(event) + + expect(events.size).to eq(1) + expect(events.first).to eq(event) + end + + it 'does not raise when subscriber raises' do + notifications.subscribe('message.processed') { raise 'boom' } + + expect { notifications.publish('message.processed') }.not_to raise_error + end + + it 'logs error when subscriber raises' do + notifications.subscribe('message.processed') { raise 'boom' } + + expect(Shoryuken.logger).to receive(:error).at_least(:once) + notifications.publish('message.processed') + end + + it 'continues to other subscribers when one raises' do + events = [] + notifications.subscribe('message.processed') { raise 'boom' } + notifications.subscribe('message.processed') { |e| events << e } + + notifications.publish('message.processed') + + expect(events.size).to eq(1) + end + end + + describe '#clear' do + it 'removes all subscribers' do + notifications.subscribe('message.processed') { } + notifications.subscribe { } + + expect(notifications.subscriber_count('message.processed')).to eq(1) + expect(notifications.subscriber_count).to eq(1) + + notifications.clear + + expect(notifications.subscriber_count('message.processed')).to eq(0) + expect(notifications.subscriber_count).to eq(0) + end + end + + describe '#subscriber_count' do + it 'returns count for specific event' do + notifications.subscribe('message.processed') { } + notifications.subscribe('message.processed') { } + notifications.subscribe('message.failed') { } + + expect(notifications.subscriber_count('message.processed')).to eq(2) + expect(notifications.subscriber_count('message.failed')).to eq(1) + end + + it 'returns count for global subscribers' do + notifications.subscribe { } + notifications.subscribe { } + + expect(notifications.subscriber_count).to eq(2) + end + + it 'returns 0 for events with no subscribers' do + expect(notifications.subscriber_count('nonexistent')).to eq(0) + end + end + + describe 'thread safety' do + it 'handles concurrent subscriptions' do + threads = 10.times.map do + Thread.new do + 10.times do + notifications.subscribe('message.processed') { } + end + end + end + + threads.each(&:join) + + expect(notifications.subscriber_count('message.processed')).to eq(100) + end + + it 'handles concurrent publishing' do + counter = Concurrent::AtomicFixnum.new(0) + notifications.subscribe('message.processed') { counter.increment } + + threads = 10.times.map do + Thread.new do + 10.times { notifications.publish('message.processed') } + end + end + + threads.each(&:join) + + expect(counter.value).to eq(100) + end + end +end diff --git a/spec/lib/shoryuken/manager_spec.rb b/spec/lib/shoryuken/manager_spec.rb index bc3db4e5..5756bb41 100644 --- a/spec/lib/shoryuken/manager_spec.rb +++ b/spec/lib/shoryuken/manager_spec.rb @@ -14,6 +14,11 @@ let(:concurrency) { 1 } let(:executor) { Concurrent::ImmediateExecutor.new } + # Helper to create proper SQS message doubles + def sqs_message(id: SecureRandom.uuid, body: 'test') + double(Shoryuken::Message, message_id: id, body: body, receipt_handle: SecureRandom.uuid) + end + subject { Shoryuken::Manager.new('default', fetcher, polling_strategy, concurrency, executor) } before do @@ -64,7 +69,7 @@ end specify do - message = ['test1'] + message = sqs_message(id: 'msg-123') messages = [message] q = Shoryuken::Polling::QueueConfiguration.new(queue, {}) @@ -101,7 +106,7 @@ context 'when batch' do specify do - messages = %w[test1 test2 test3] + messages = [sqs_message(id: 'msg-1'), sqs_message(id: 'msg-2'), sqs_message(id: 'msg-3')] q = Shoryuken::Polling::QueueConfiguration.new(queue, {}) expect(fetcher).to receive(:fetch).with(q, described_class::BATCH_LIMIT).and_return(messages) @@ -142,13 +147,16 @@ describe '#dispatch_single_messages' do let(:concurrency) { 3 } - it 'assings messages from batch one by one' do + it 'assigns messages from batch one by one' do q = polling_strategy.next_queue - messages = [1, 2, 3] + msg1 = sqs_message(id: 'msg-1') + msg2 = sqs_message(id: 'msg-2') + msg3 = sqs_message(id: 'msg-3') + messages = [msg1, msg2, msg3] expect(fetcher).to receive(:fetch).with(q, concurrency).and_return(messages) - expect_any_instance_of(described_class).to receive(:assign).with(q.name, 1) - expect_any_instance_of(described_class).to receive(:assign).with(q.name, 2) - expect_any_instance_of(described_class).to receive(:assign).with(q.name, 3) + expect_any_instance_of(described_class).to receive(:assign).with(q.name, msg1) + expect_any_instance_of(described_class).to receive(:assign).with(q.name, msg2) + expect_any_instance_of(described_class).to receive(:assign).with(q.name, msg3) subject.send(:dispatch_single_messages, q) end end diff --git a/spec/lib/shoryuken/monitor_spec.rb b/spec/lib/shoryuken/monitor_spec.rb new file mode 100644 index 00000000..7abffc67 --- /dev/null +++ b/spec/lib/shoryuken/monitor_spec.rb @@ -0,0 +1,49 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe 'Shoryuken.monitor' do + after do + Shoryuken.reset_monitor! + end + + describe '.monitor' do + it 'returns a Notifications instance' do + expect(Shoryuken.monitor).to be_a(Shoryuken::Instrumentation::Notifications) + end + + it 'returns the same instance on multiple calls' do + monitor1 = Shoryuken.monitor + monitor2 = Shoryuken.monitor + expect(monitor1).to be(monitor2) + end + + it 'allows subscribing to events' do + events = [] + Shoryuken.monitor.subscribe('test.event') { |e| events << e } + + Shoryuken.monitor.publish('test.event', key: 'value') + + expect(events.size).to eq(1) + expect(events.first[:key]).to eq('value') + end + end + + describe '.reset_monitor!' do + it 'creates a new monitor instance' do + original = Shoryuken.monitor + Shoryuken.reset_monitor! + expect(Shoryuken.monitor).not_to be(original) + end + + it 'clears subscribers' do + events = [] + Shoryuken.monitor.subscribe('test.event') { |e| events << e } + + Shoryuken.reset_monitor! + Shoryuken.monitor.publish('test.event') + + expect(events).to be_empty + end + end +end diff --git a/spec/lib/shoryuken/processor_spec.rb b/spec/lib/shoryuken/processor_spec.rb index b131efcf..c32091d2 100644 --- a/spec/lib/shoryuken/processor_spec.rb +++ b/spec/lib/shoryuken/processor_spec.rb @@ -34,6 +34,63 @@ subject.process end + context 'instrumentation' do + before do + Shoryuken.reset_monitor! + end + + after do + Shoryuken.reset_monitor! + end + + it 'publishes message.processed event on success' do + events = [] + Shoryuken.monitor.subscribe('message.processed') { |e| events << e } + + allow_any_instance_of(TestWorker).to receive(:perform) + + subject.process + + expect(events.size).to eq(1) + expect(events.first[:queue]).to eq(queue) + expect(events.first[:message_id]).to eq(sqs_msg.message_id) + expect(events.first[:worker]).to eq('TestWorker') + expect(events.first.duration).to be_a(Float) + end + + it 'includes exception info in message.processed event on error (ActiveSupport-compatible)' do + events = [] + Shoryuken.monitor.subscribe('message.processed') { |e| events << e } + + allow_any_instance_of(TestWorker).to receive(:perform).and_raise(StandardError, 'test error') + + expect { subject.process }.to raise_error(StandardError, 'test error') + + expect(events.size).to eq(1) + expect(events.first[:queue]).to eq(queue) + expect(events.first[:message_id]).to eq(sqs_msg.message_id) + expect(events.first[:exception]).to eq(['StandardError', 'test error']) + expect(events.first[:exception_object]).to be_a(StandardError) + expect(events.first[:exception_object].message).to eq('test error') + end + + it 'publishes error.occurred event on error (Karafka-style)' do + error_events = [] + Shoryuken.monitor.subscribe('error.occurred') { |e| error_events << e } + + allow_any_instance_of(TestWorker).to receive(:perform).and_raise(StandardError, 'test error') + + expect { subject.process }.to raise_error(StandardError, 'test error') + + expect(error_events.size).to eq(1) + expect(error_events.first[:type]).to eq('message.processed') + expect(error_events.first[:queue]).to eq(queue) + expect(error_events.first[:error]).to be_a(StandardError) + expect(error_events.first[:error_class]).to eq('StandardError') + expect(error_events.first[:error_message]).to eq('test error') + end + end + context 'when custom middleware' do let(:queue) { 'worker_called_middleware' } diff --git a/spec/lib/shoryuken/util_spec.rb b/spec/lib/shoryuken/util_spec.rb index 0a081e25..b9fb2adb 100644 --- a/spec/lib/shoryuken/util_spec.rb +++ b/spec/lib/shoryuken/util_spec.rb @@ -52,8 +52,13 @@ let(:callback_without_options) { proc { value_holder.value = :without_options } } let(:callback_with_options) { proc { |options| value_holder.value = [:with_options, options] } } + before do + Shoryuken.reset_monitor! + end + after :all do Shoryuken.options[:lifecycle_events].delete(:some_event) + Shoryuken.reset_monitor! end it 'triggers callbacks that do not accept arguments' do @@ -69,5 +74,40 @@ expect(value_holder).to receive(:value=).with([:with_options, { my_option: :some_option }]) subject.fire_event(:some_event, false, my_option: :some_option) end + + context 'instrumentation' do + it 'publishes mapped event for known lifecycle events' do + events = [] + Shoryuken.monitor.subscribe('app.started') { |e| events << e } + + Shoryuken.options[:lifecycle_events][:startup] = [] + subject.fire_event(:startup) + + expect(events.size).to eq(1) + expect(events.first.name).to eq('app.started') + expect(events.first[:legacy_event]).to eq(:startup) + end + + it 'publishes legacy.* event for unknown lifecycle events' do + events = [] + Shoryuken.monitor.subscribe('legacy.dispatch') { |e| events << e } + + Shoryuken.options[:lifecycle_events][:dispatch] = [] + subject.fire_event(:dispatch) + + expect(events.size).to eq(1) + expect(events.first.name).to eq('legacy.dispatch') + end + + it 'includes event_options in the payload' do + events = [] + Shoryuken.monitor.subscribe('app.stopped') { |e| events << e } + + Shoryuken.options[:lifecycle_events][:stopped] = [] + subject.fire_event(:stopped, false, custom_key: 'custom_value') + + expect(events.first[:custom_key]).to eq('custom_value') + end + end end end