Skip to content
26 changes: 26 additions & 0 deletions lib/shoryuken.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,32 @@ def self.healthy?
Shoryuken::Runner.instance.healthy?
end

# Returns the global instrumentation monitor.
# Use this to subscribe to Shoryuken lifecycle events.
#
# @return [Shoryuken::Instrumentation::Notifications] the monitor instance
#
# @example Subscribe to message processing events
# Shoryuken.monitor.subscribe('message.processed') do |event|
# StatsD.timing('shoryuken.process_time', event.duration * 1000)
# end
#
# @example Subscribe to all events
# Shoryuken.monitor.subscribe do |event|
# logger.info("Event: #{event.name}")
# end
def self.monitor
@_monitor ||= Instrumentation::Notifications.new
end

# Resets the monitor instance (useful for testing)
#
# @return [void]
# @api private
def self.reset_monitor!
@_monitor = nil
end

def_delegators(
:shoryuken_options,
:active_job?,
Expand Down
18 changes: 18 additions & 0 deletions lib/shoryuken/instrumentation.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true

require_relative 'instrumentation/event'
require_relative 'instrumentation/notifications'
require_relative 'instrumentation/logger_listener'

module Shoryuken
# Instrumentation module providing pub/sub event notifications.
# Inspired by Karafka's instrumentation architecture.
#
# @example Subscribing to events
# Shoryuken.monitor.subscribe('message.processed') do |event|
# StatsD.timing('shoryuken.process_time', event.duration * 1000)
# end
#
module Instrumentation
end
end
62 changes: 62 additions & 0 deletions lib/shoryuken/instrumentation/event.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# frozen_string_literal: true

module Shoryuken
module Instrumentation
# Represents an instrumentation event with metadata.
# Events are published through the Notifications system and contain
# information about what happened, when, and relevant context.
#
# @example Creating an event
# event = Event.new('message.processed', queue: 'default', duration: 0.5)
# event.name # => 'message.processed'
# event[:queue] # => 'default'
# event.duration # => 0.5
#
class Event
# @return [String] the event name (e.g., 'message.processed')
attr_reader :name

# @return [Hash] the event payload containing contextual data
attr_reader :payload

# @return [Time] when the event was created
attr_reader :time

# Creates a new Event instance
#
# @param name [String] the event name using dot notation (e.g., 'message.processed')
# @param payload [Hash] contextual data for the event
def initialize(name, payload = {})
@name = name
@payload = payload
@time = Time.now
end

# Accesses a value from the payload by key
#
# @param key [Symbol, String] the payload key
# @return [Object, nil] the value or nil if not found
def [](key)
payload[key]
end

# Returns the duration from the payload if present
#
# @return [Float, nil] the duration in seconds or nil
def duration
payload[:duration]
end

# Returns a hash representation of the event
#
# @return [Hash] the event as a hash
def to_h
{
name: name,
payload: payload,
time: time
}
end
end
end
end
95 changes: 95 additions & 0 deletions lib/shoryuken/instrumentation/logger_listener.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# frozen_string_literal: true

module Shoryuken
module Instrumentation
# Default listener that logs instrumentation events.
# This provides human-readable log output for key Shoryuken events.
#
# @example Subscribing the logger listener
# Shoryuken.monitor.subscribe(&LoggerListener.new.method(:call))
#
class LoggerListener
# Creates a new LoggerListener
#
# @param logger [Logger] the logger to use (defaults to Shoryuken.logger)
def initialize(logger = nil)
@logger = logger
end

# Returns the logger instance
#
# @return [Logger] the logger
def logger
@logger || Shoryuken.logger
end

# Handles an instrumentation event by logging it appropriately
#
# @param event [Event] the event to handle
# @return [void]
def call(event)
case event.name
when 'app.started'
log_app_started(event)
when 'app.stopping'
log_app_stopping(event)
when 'app.stopped'
log_app_stopped(event)
when 'message.processed'
log_message_processed(event)
when 'message.failed'
log_message_failed(event)
when 'error.occurred'
log_error_occurred(event)
when 'queue.polling'
log_queue_polling(event)
end
end

private

def log_app_started(event)
groups = event[:groups] || []
logger.info { "Shoryuken started with #{groups.size} group(s)" }
end

def log_app_stopping(_event)
logger.info { 'Shoryuken shutting down...' }
end

def log_app_stopped(_event)
logger.info { 'Shoryuken stopped' }
end

def log_message_processed(event)
duration_ms = event.duration ? (event.duration * 1000).round(2) : 0
worker = event[:worker] || 'Unknown'
queue = event[:queue] || 'Unknown'

logger.info { "Processed #{worker}/#{queue} in #{duration_ms}ms" }
end

def log_message_failed(event)
worker = event[:worker] || 'Unknown'
queue = event[:queue] || 'Unknown'
error = event[:error]
error_message = error.respond_to?(:message) ? error.message : error.to_s

logger.error { "Failed #{worker}/#{queue}: #{error_message}" }
end

def log_error_occurred(event)
error = event[:error]
error_class = error.respond_to?(:class) ? error.class.name : 'Unknown'
error_message = error.respond_to?(:message) ? error.message : error.to_s

logger.error { "Error occurred: #{error_class} - #{error_message}" }
end

def log_queue_polling(event)
queue = event[:queue] || 'Unknown'
logger.debug { "Polling queue: #{queue}" }
end
end
end
end
155 changes: 155 additions & 0 deletions lib/shoryuken/instrumentation/notifications.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# frozen_string_literal: true

module Shoryuken
module Instrumentation
# A thread-safe pub/sub notification system for instrumentation events.
# Inspired by Karafka's instrumentation architecture, this allows external
# systems (APM, logging, metrics) to subscribe to Shoryuken lifecycle events.
#
# @example Subscribing to specific events
# Shoryuken.monitor.subscribe('message.processed') do |event|
# StatsD.timing('shoryuken.process_time', event.duration * 1000)
# end
#
# @example Subscribing to all events
# Shoryuken.monitor.subscribe do |event|
# logger.info("Event: #{event.name}")
# end
#
# @example Instrumenting a block
# Shoryuken.monitor.instrument('message.processed', queue: 'default') do
# process_message
# end
#
class Notifications
# List of all supported events in the system
EVENTS = %w[
app.started
app.stopping
app.stopped
app.quiet

message.received
message.processed
message.failed
message.deleted

worker.started
worker.completed
worker.failed

queue.polling
queue.empty

error.occurred
].freeze

# Creates a new Notifications instance
def initialize
@subscribers = Hash.new { |h, k| h[k] = [] }
@mutex = Mutex.new
end

# Subscribes to events
#
# @param event_name [String, nil] the event name to subscribe to, or nil for all events
# @yield [Event] block called when matching events are published
# @return [void]
#
# @example Subscribe to specific event
# subscribe('message.processed') { |event| puts event.name }
#
# @example Subscribe to all events
# subscribe { |event| puts event.name }
def subscribe(event_name = nil, &block)
@mutex.synchronize do
if event_name
@subscribers[event_name] << block
else
@subscribers[:all] << block
end
end
end

# Unsubscribes a block from events
#
# @param event_name [String, nil] the event name to unsubscribe from, or nil for all events
# @param block [Proc] the block to unsubscribe
# @return [void]
def unsubscribe(event_name = nil, &block)
@mutex.synchronize do
key = event_name || :all
@subscribers[key].delete(block)
end
end

# Instruments a block of code, measuring its duration and publishing an event
#
# @param event_name [String] the event name to publish
# @param payload [Hash] additional data to include in the event
# @yield the code block to instrument
# @return [Object] the result of the block
#
# @example
# monitor.instrument('message.processed', queue: 'default') do
# worker.perform(message)
# end
def instrument(event_name, payload = {})
started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
result = yield if block_given?
duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at

event = Event.new(event_name, payload.merge(duration: duration))
publish(event)
result
end

# Publishes an event to all matching subscribers
#
# @param event_or_name [Event, String] an Event instance or event name
# @param payload [Hash] payload hash (only used if first arg is a String)
# @return [void]
#
# @example With Event instance
# publish(Event.new('message.processed', queue: 'default'))
#
# @example With name and payload
# publish('message.processed', queue: 'default')
def publish(event_or_name, payload = {})
event = event_or_name.is_a?(Event) ? event_or_name : Event.new(event_or_name, payload)

subscribers_for_event = @mutex.synchronize do
@subscribers[event.name] + @subscribers[:all]
end

subscribers_for_event.each do |subscriber|
subscriber.call(event)
rescue StandardError => e
# Log but don't raise - subscribers should not break the main flow
Shoryuken.logger.error { "Instrumentation subscriber error: #{e.message}" }
Shoryuken.logger.error { e.backtrace.join("\n") } if e.backtrace
end
end

# Clears all subscribers (useful for testing)
#
# @return [void]
def clear
@mutex.synchronize do
@subscribers.clear
end
end

# Returns the number of subscribers for an event
#
# @param event_name [String, nil] the event name, or nil for global subscribers
# @return [Integer] the subscriber count
def subscriber_count(event_name = nil)
@mutex.synchronize do
key = event_name || :all
@subscribers[key].size
end
end
end
end
end
Loading