Skip to content
26 changes: 26 additions & 0 deletions lib/shoryuken.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,32 @@ def self.healthy?
Shoryuken::Runner.instance.healthy?
end

# Returns the global instrumentation monitor.
# Use this to subscribe to Shoryuken lifecycle events.
#
# @return [Shoryuken::Instrumentation::Notifications] the monitor instance
#
# @example Subscribe to message processing events
# Shoryuken.monitor.subscribe('message.processed') do |event|
# StatsD.timing('shoryuken.process_time', event.duration * 1000)
# end
#
# @example Subscribe to all events
# Shoryuken.monitor.subscribe do |event|
# logger.info("Event: #{event.name}")
# end
def self.monitor
@_monitor ||= Instrumentation::Notifications.new
end

# Resets the monitor instance (useful for testing)
#
# @return [void]
# @api private
def self.reset_monitor!
@_monitor = nil
end

def_delegators(
:shoryuken_options,
:active_job?,
Expand Down
18 changes: 18 additions & 0 deletions lib/shoryuken/instrumentation.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true

require_relative 'instrumentation/event'
require_relative 'instrumentation/notifications'
require_relative 'instrumentation/logger_listener'

module Shoryuken
# Instrumentation module providing pub/sub event notifications.
# Inspired by Karafka's instrumentation architecture.
#
# @example Subscribing to events
# Shoryuken.monitor.subscribe('message.processed') do |event|
# StatsD.timing('shoryuken.process_time', event.duration * 1000)
# end
#
module Instrumentation
end
end
62 changes: 62 additions & 0 deletions lib/shoryuken/instrumentation/event.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# frozen_string_literal: true

module Shoryuken
module Instrumentation
# Represents an instrumentation event with metadata.
# Events are published through the Notifications system and contain
# information about what happened, when, and relevant context.
#
# @example Creating an event
# event = Event.new('message.processed', queue: 'default', duration: 0.5)
# event.name # => 'message.processed'
# event[:queue] # => 'default'
# event.duration # => 0.5
#
class Event
# @return [String] the event name (e.g., 'message.processed')
attr_reader :name

# @return [Hash] the event payload containing contextual data
attr_reader :payload

# @return [Time] when the event was created
attr_reader :time

# Creates a new Event instance
#
# @param name [String] the event name using dot notation (e.g., 'message.processed')
# @param payload [Hash] contextual data for the event
def initialize(name, payload = {})
@name = name
@payload = payload
@time = Time.now
end

# Accesses a value from the payload by key
#
# @param key [Symbol, String] the payload key
# @return [Object, nil] the value or nil if not found
def [](key)
payload[key]
end

# Returns the duration from the payload if present
#
# @return [Float, nil] the duration in seconds or nil
def duration
payload[:duration]
end

# Returns a hash representation of the event
#
# @return [Hash] the event as a hash
def to_h
{
name: name,
payload: payload,
time: time
}
end
end
end
end
95 changes: 95 additions & 0 deletions lib/shoryuken/instrumentation/logger_listener.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# frozen_string_literal: true

module Shoryuken
module Instrumentation
# Default listener that logs instrumentation events.
# This provides human-readable log output for key Shoryuken events.
#
# @example Subscribing the logger listener
# Shoryuken.monitor.subscribe(&LoggerListener.new.method(:call))
#
class LoggerListener
# Creates a new LoggerListener
#
# @param logger [Logger] the logger to use (defaults to Shoryuken.logger)
def initialize(logger = nil)
@logger = logger
end

# Returns the logger instance
#
# @return [Logger] the logger
def logger
@logger || Shoryuken.logger
end

# Handles an instrumentation event by logging it appropriately
#
# @param event [Event] the event to handle
# @return [void]
def call(event)
case event.name
when 'app.started'
log_app_started(event)
when 'app.stopping'
log_app_stopping(event)
when 'app.stopped'
log_app_stopped(event)
when 'message.processed'
log_message_processed(event)
when 'message.failed'
log_message_failed(event)
when 'error.occurred'
log_error_occurred(event)
when 'queue.polling'
log_queue_polling(event)
end
end

private

def log_app_started(event)
groups = event[:groups] || []
logger.info { "Shoryuken started with #{groups.size} group(s)" }
end

def log_app_stopping(_event)
logger.info { 'Shoryuken shutting down...' }
end

def log_app_stopped(_event)
logger.info { 'Shoryuken stopped' }
end

def log_message_processed(event)
duration_ms = event.duration ? (event.duration * 1000).round(2) : 0
worker = event[:worker] || 'Unknown'
queue = event[:queue] || 'Unknown'

logger.info { "Processed #{worker}/#{queue} in #{duration_ms}ms" }
end

def log_message_failed(event)
worker = event[:worker] || 'Unknown'
queue = event[:queue] || 'Unknown'
error = event[:error]
error_message = error.respond_to?(:message) ? error.message : error.to_s

logger.error { "Failed #{worker}/#{queue}: #{error_message}" }
end

def log_error_occurred(event)
error = event[:error]
error_class = error.respond_to?(:class) ? error.class.name : 'Unknown'
error_message = error.respond_to?(:message) ? error.message : error.to_s

logger.error { "Error occurred: #{error_class} - #{error_message}" }
end

def log_queue_polling(event)
queue = event[:queue] || 'Unknown'
logger.debug { "Polling queue: #{queue}" }
end
end
end
end
Loading