Skip to content
Open
26 changes: 26 additions & 0 deletions lib/shoryuken.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,32 @@ def self.healthy?
Shoryuken::Runner.instance.healthy?
end

# Returns the global instrumentation monitor.
# Use this to subscribe to Shoryuken lifecycle events.
#
# @return [Shoryuken::Instrumentation::Notifications] the monitor instance
#
# @example Subscribe to message processing events
# Shoryuken.monitor.subscribe('message.processed') do |event|
# StatsD.timing('shoryuken.process_time', event.duration * 1000)
# end
#
# @example Subscribe to all events
# Shoryuken.monitor.subscribe do |event|
# logger.info("Event: #{event.name}")
# end
def self.monitor
@_monitor ||= Instrumentation::Notifications.new
end

# Resets the monitor instance (useful for testing)
#
# @return [void]
# @api private
def self.reset_monitor!
@_monitor = nil
end

def_delegators(
:shoryuken_options,
:active_job?,
Expand Down
14 changes: 10 additions & 4 deletions lib/shoryuken/fetcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ def fetch(queue, limit)
fetch_with_auto_retry(3) do
started_at = Time.now

logger.debug { "Looking for new messages in #{queue}" }
Shoryuken.monitor.publish('fetcher.started', queue: queue.name, limit: limit)

sqs_msgs = Array(receive_messages(queue, limit))

logger.debug { "Found #{sqs_msgs.size} messages for #{queue.name}" } unless sqs_msgs.empty?
logger.debug { "Fetcher for #{queue} completed in #{elapsed(started_at)} ms" }
Shoryuken.monitor.publish('fetcher.completed',
queue: queue.name,
message_count: sqs_msgs.size,
duration_ms: elapsed(started_at))

sqs_msgs
end
Expand All @@ -54,7 +56,11 @@ def fetch_with_auto_retry(max_attempts)

attempts += 1

logger.debug { "Retrying fetch attempt #{attempts} for #{e.message}" }
Shoryuken.monitor.publish('fetcher.retry',
attempt: attempts,
max_attempts: max_attempts,
error_message: e.message,
error_class: e.class.name)

sleep((1..5).to_a.sample)

Expand Down
18 changes: 18 additions & 0 deletions lib/shoryuken/instrumentation.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true

require_relative 'instrumentation/event'
require_relative 'instrumentation/notifications'
require_relative 'instrumentation/logger_listener'

module Shoryuken
# Instrumentation module providing pub/sub event notifications.
# Inspired by Karafka's instrumentation architecture.
#
# @example Subscribing to events
# Shoryuken.monitor.subscribe('message.processed') do |event|
# StatsD.timing('shoryuken.process_time', event.duration * 1000)
# end
#
module Instrumentation
end
end
62 changes: 62 additions & 0 deletions lib/shoryuken/instrumentation/event.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# frozen_string_literal: true

module Shoryuken
module Instrumentation
# Represents an instrumentation event with metadata.
# Events are published through the Notifications system and contain
# information about what happened, when, and relevant context.
#
# @example Creating an event
# event = Event.new('message.processed', queue: 'default', duration: 0.5)
# event.name # => 'message.processed'
# event[:queue] # => 'default'
# event.duration # => 0.5
#
class Event
# @return [String] the event name (e.g., 'message.processed')
attr_reader :name

# @return [Hash] the event payload containing contextual data
attr_reader :payload

# @return [Time] when the event was created
attr_reader :time

# Creates a new Event instance
#
# @param name [String] the event name using dot notation (e.g., 'message.processed')
# @param payload [Hash] contextual data for the event
def initialize(name, payload = {})
@name = name
@payload = payload
@time = Time.now
end

# Accesses a value from the payload by key
#
# @param key [Symbol, String] the payload key
# @return [Object, nil] the value or nil if not found
def [](key)
payload[key]
end

# Returns the duration from the payload if present
#
# @return [Float, nil] the duration in seconds or nil
def duration
payload[:duration]
end

# Returns a hash representation of the event
#
# @return [Hash] the event as a hash
def to_h
{
name: name,
payload: payload,
time: time
}
end
end
end
end
143 changes: 143 additions & 0 deletions lib/shoryuken/instrumentation/logger_listener.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
# frozen_string_literal: true

module Shoryuken
module Instrumentation
# Default listener that logs instrumentation events.
# This provides human-readable log output for key Shoryuken events.
#
# @example Subscribing the logger listener
# Shoryuken.monitor.subscribe(&LoggerListener.new.method(:call))
#
class LoggerListener
# Creates a new LoggerListener
#
# @param logger [Logger] the logger to use (defaults to Shoryuken.logger)
def initialize(logger = nil)
@logger = logger
end

# Returns the logger instance
#
# @return [Logger] the logger
def logger
@logger || Shoryuken.logger
end

# Handles an instrumentation event by logging it appropriately
#
# @param event [Event] the event to handle
# @return [void]
def call(event)
method_name = "on_#{event.name.tr('.', '_')}"
send(method_name, event) if respond_to?(method_name, true)
end

private

# App lifecycle events

def on_app_started(event)
groups = event[:groups] || []
logger.info { "Shoryuken started with #{groups.size} group(s)" }
end

def on_app_stopping(_event)
logger.info { 'Shoryuken shutting down...' }
end

def on_app_stopped(_event)
logger.info { 'Shoryuken stopped' }
end

def on_app_quiet(_event)
logger.info { 'Shoryuken is quiet' }
end

# Fetcher events

def on_fetcher_started(event)
logger.debug { "Looking for new messages in #{event[:queue]}" }
end

def on_fetcher_completed(event)
queue = event[:queue]
message_count = event[:message_count] || 0
duration_ms = event[:duration_ms]

logger.debug { "Found #{message_count} messages for #{queue}" } if message_count.positive?
logger.debug { "Fetcher for #{queue} completed in #{duration_ms} ms" }
end

def on_fetcher_retry(event)
logger.debug { "Retrying fetch attempt #{event[:attempt]} for #{event[:error_message]}" }
end

# Manager events

def on_manager_dispatch(event)
logger.debug do
"Ready: #{event[:ready]}, Busy: #{event[:busy]}, Active Queues: #{event[:active_queues]}"
end
end

def on_manager_processor_assigned(event)
logger.debug { "Assigning #{event[:message_id]}" }
end

def on_manager_failed(event)
logger.error { "Manager failed: #{event[:error_message]}" }
logger.error { event[:backtrace].join("\n") } if event[:backtrace]
end

# Message processing events

def on_message_processed(event)
# Skip logging if there was an exception - error.occurred handles that
return if event[:exception]

duration_ms = event.duration ? (event.duration * 1000).round(2) : 0
worker = event[:worker] || 'Unknown'
queue = event[:queue] || 'Unknown'

logger.info { "Processed #{worker}/#{queue} in #{duration_ms}ms" }
end

def on_message_failed(event)
worker = event[:worker] || 'Unknown'
queue = event[:queue] || 'Unknown'
error = event[:error]
error_message = error.respond_to?(:message) ? error.message : error.to_s

logger.error { "Failed #{worker}/#{queue}: #{error_message}" }
end

# Error events

def on_error_occurred(event)
error = event[:error]
error_class = error.respond_to?(:class) ? error.class.name : 'Unknown'
error_message = error.respond_to?(:message) ? error.message : error.to_s
type = event[:type]

if type
logger.error { "Error in #{type}: #{error_class} - #{error_message}" }
else
logger.error { "Error occurred: #{error_class} - #{error_message}" }
end

logger.error { error.backtrace.join("\n") } if error.respond_to?(:backtrace) && error.backtrace
end

# Queue events

def on_queue_polling(event)
queue = event[:queue] || 'Unknown'
logger.debug { "Polling queue: #{queue}" }
end

def on_queue_empty(event)
logger.debug { "Queue #{event[:queue]} is empty" }
end
end
end
end
Loading