Skip to content

Conversation

@mensfeld
Copy link
Collaborator

@mensfeld mensfeld commented Dec 11, 2025

Summary

Implement a pub/sub notification system inspired by Karafka's instrumentation architecture. This enables better monitoring, observability, and integration with APM tools like StatsD, NewRelic, Datadog, etc.

Key Components

  • Shoryuken::Instrumentation::Notifications - Thread-safe pub/sub event bus
  • Shoryuken::Instrumentation::Event - Event wrapper with metadata (name, payload, time, duration)
  • Shoryuken::Instrumentation::LoggerListener - Default logging subscriber
  • Shoryuken.monitor - Global accessor for the notifications instance

Supported Events

Event Description
app.started Fired when Shoryuken starts
app.stopping Fired when Shoryuken begins shutdown
app.stopped Fired when Shoryuken has stopped
app.quiet Fired when entering quiet mode
message.processed Fired after successful message processing (includes duration)
message.failed Fired when message processing fails
error.occurred Fired when an error occurs

Usage Examples

# Subscribe to specific events
Shoryuken.monitor.subscribe('message.processed') do |event|
  StatsD.timing('shoryuken.process_time', event.duration * 1000)
end

# Subscribe to all events (for APM integration)
Shoryuken.monitor.subscribe do |event|
  NewRelic::Agent.record_custom_event('Shoryuken', event.payload.merge(event: event.name))
end

# Error tracking
Shoryuken.monitor.subscribe('error.occurred') do |event|
  Sentry.capture_exception(event[:error])
end

# Instrument custom code
Shoryuken.monitor.instrument('custom.operation', key: 'value') do
  # your code here
end

Backward Compatibility

  • Existing fire_event callbacks continue to work
  • Shoryuken.on(:startup) etc. continue to work unchanged
  • Legacy events are also published to the new monitor with mapped names

Test plan

  • Added 39 unit tests for Event, Notifications, and LoggerListener classes
  • Added integration test verifying events are published during message processing
  • All existing tests pass (512 examples, 0 failures)
  • CI passes

References

Summary by CodeRabbit

  • New Features

    • Adds a global instrumentation namespace and thread-safe monitoring/pub‑sub system for events.
    • Introduces an Event type carrying name, payload, timestamp and duration; exposes monitor access and a reset mechanism.
    • Emits structured instrumentation events for lifecycle, fetcher, manager, and message processing (with legacy lifecycle mapping); logging routed via a readable instrumentation listener.
  • Tests

    • Comprehensive unit and integration tests covering publishing, subscriptions, error handling, durations, thread safety, and logging.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link

coderabbitai bot commented Dec 11, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds an instrumentation subsystem: Notifications pub/sub, Event value object, LoggerListener, monitor singleton accessors, and instrumentation hooks in Processor, Fetcher, Manager, Launcher, and Util, plus comprehensive unit and integration tests.

Changes

Cohort / File(s) Summary
Core entrypoints
lib/shoryuken.rb, lib/shoryuken/instrumentation.rb
Adds Shoryuken.monitor (lazy singleton Instrumentation::Notifications) and Shoryuken.reset_monitor!; introduces Shoryuken::Instrumentation namespace.
Instrumentation primitives
lib/shoryuken/instrumentation/event.rb
Adds Shoryuken::Instrumentation::Event value object (name, payload, time, duration, [], to_h).
Pub/Sub system
lib/shoryuken/instrumentation/notifications.rb
Adds Shoryuken::Instrumentation::Notifications with EVENTS and thread-safe subscribe/unsubscribe/publish/instrument/clear/subscriber_count; emits error.occurred on exceptions.
Logging listener
lib/shoryuken/instrumentation/logger_listener.rb
Adds Shoryuken::Instrumentation::LoggerListener implementing call(event) with handlers for app, fetcher, manager, message, error, and queue events to format/log contextual messages.
Processor integration
lib/shoryuken/processor.rb
Wraps worker invocation with Shoryuken.monitor.instrument('message.processed', message_payload) and adds message_payload helper.
Fetcher / Manager / Launcher
lib/shoryuken/fetcher.rb, lib/shoryuken/manager.rb, lib/shoryuken/launcher.rb
Replaces internal debug/error logs with monitoring events (e.g., fetcher.started/completed/retry, manager.dispatch/processor_assigned/failed); removes some local startup/shutdown logs and uses safe callback invocation.
Legacy lifecycle mapping
lib/shoryuken/util.rb
Adds LEGACY_EVENT_MAP and updates fire_event to publish mapped instrumentation events and include legacy_event in payload.
Tests
spec/integration/instrumentation/..., spec/lib/shoryuken/instrumentation/*, spec/lib/shoryuken/monitor_spec.rb, spec/lib/shoryuken/processor_spec.rb, spec/lib/shoryuken/util_spec.rb, spec/lib/shoryuken/fetcher_spec.rb
Adds unit and integration specs for Notifications, Event, LoggerListener, monitor singleton/reset, Processor instrumentation, Fetcher event changes, Util lifecycle mapping, and thread-safety behavior.
Tests - updates
spec/lib/shoryuken/fetcher_spec.rb, spec/lib/shoryuken/manager_spec.rb
Adapts tests to assert event publications (fetcher) and to use SQS message doubles for manager tests.

Sequence Diagram(s)

sequenceDiagram
    participant Queue as Queue / Consumer
    participant Processor
    participant Monitor as Shoryuken.monitor
    participant LoggerListener
    participant Logger

    Queue->>Processor: deliver message(s)
    Processor->>Monitor: instrument("message.processed", payload) { block => invoke middleware + worker.perform }
    Monitor->>Monitor: record start time
    Monitor->>Processor: yield to block
    alt success
        Processor-->>Monitor: block returns
        Monitor->>Monitor: compute duration, build Event
        Monitor->>LoggerListener: publish(Event)
        LoggerListener->>Logger: info/debug log
    else exception
        Processor->>Monitor: block raises
        Monitor->>Monitor: publish("error.occurred", payload with error)
        Monitor->>LoggerListener: publish(error Event)
        LoggerListener->>Logger: error log with backtrace/details
        Monitor->>Processor: re-raise exception
    end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~30 minutes

  • Review thread-safety and Mutex usage in lib/shoryuken/instrumentation/notifications.rb.
  • Verify that publishing continues when a subscriber raises and subscriber isolation behavior.
  • Confirm message_payload handles single vs batch messages correctly in processor.rb.
  • Check LoggerListener formatting for missing fields and backtrace handling.
  • Validate lifecycle mapping and that fire_event includes legacy_event as intended.

Poem

🐇 I timed a hop, I watched a queue,

Events now whisper what workers do.
Subscribers gather, logs hum along,
Little rabbit, big observability song. 🎩

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 77.27% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and accurately summarizes the main objective of the changeset: introducing a Karafka-style instrumentation/events system as the primary feature.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch instrumentation-system

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between acf9f40 and d8bd260.

📒 Files selected for processing (1)
  • spec/lib/shoryuken/manager_spec.rb (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
spec/lib/shoryuken/manager_spec.rb (2)
spec/integrations_helper.rb (2)
  • uuid (76-78)
  • double (198-200)
lib/shoryuken/options.rb (1)
  • polling_strategy (111-127)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Integrations (3.4)
  • GitHub Check: Integrations (3.2)
  • GitHub Check: Integrations (3.3)
  • GitHub Check: Integrations (3.2)
  • GitHub Check: Integrations (3.3)
  • GitHub Check: Integrations (3.4)
🔇 Additional comments (4)
spec/lib/shoryuken/manager_spec.rb (4)

17-20: LGTM! Well-designed test helper.

The sqs_message helper promotes consistency and DRY principles across the test suite. The default parameters and use of SecureRandom.uuid for IDs and receipt handles are appropriate for test scenarios, and the double properly mimics the Shoryuken::Message interface needed for the instrumentation system.


72-72: LGTM! Proper usage of the helper.

The helper is correctly used to generate a message double with a specific ID for this test case.


109-109: LGTM! Consistent batch message creation.

The helper is properly used to create three distinct message doubles for batch processing tests, with each message having a unique identifier.


150-159: LGTM! Important test updates for instrumentation compatibility.

The typo fix improves clarity, and the updated expectations now properly verify that assign receives message objects rather than primitives. This change is essential for compatibility with the new instrumentation system that operates on message objects.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Implement a pub/sub notification system inspired by Karafka's
instrumentation architecture. This enables better monitoring,
observability, and integration with APM tools.

Key components:
- Shoryuken::Instrumentation::Notifications - thread-safe pub/sub event bus
- Shoryuken::Instrumentation::Event - event wrapper with metadata
- Shoryuken::Instrumentation::LoggerListener - default logging subscriber
- Shoryuken.monitor - global accessor for the notifications instance

Supported events:
- app.started, app.stopping, app.stopped, app.quiet (lifecycle)
- message.processed, message.failed (message processing)
- error.occurred (errors)

Usage:
```ruby
# Subscribe to specific events
Shoryuken.monitor.subscribe('message.processed') do |event|
  StatsD.timing('shoryuken.process_time', event.duration * 1000)
end

# Subscribe to all events
Shoryuken.monitor.subscribe do |event|
  logger.info("Event: #{event.name}")
end
```

Backward compatibility:
- Existing fire_event callbacks continue to work
- Shoryuken.on(:startup) etc. continue to work
- Legacy events are also published to the new monitor
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (6)
spec/integration/instrumentation/instrumentation_spec.rb (1)

36-41: Consider if the sleep 1 is necessary.

The explicit sleep 1 before polling could introduce flakiness or slow down tests unnecessarily. If poll_queues_until already handles synchronization properly, the sleep might be redundant. If it's required for LocalStack queue propagation, consider adding a comment explaining the reason.

 # Send test messages
 2.times { |i| Shoryuken::Client.queues(queue_name).send_message(message_body: "instrumentation-test-#{i}") }
 
-sleep 1
+# Allow time for messages to be available in LocalStack queue
+sleep 1
 
 poll_queues_until { DT[:processed].size >= 2 }
spec/lib/shoryuken/instrumentation/logger_listener_spec.rb (1)

20-116: Specs only verify that logging is invoked, not the actual log message content

The examples use and_yield.and_return('...'), which only asserts that info/error/debug are called with a block; the literal strings passed to and_return are not actually validated anywhere. Given the example descriptions (“logs info message”, “logs error message”), consider either:

  • Asserting on the formatted message (e.g., capturing the block and evaluating it), or
  • Dropping the unused and_return('...') and renaming examples to focus on “invokes logger with a block”.

This would better align expectations with what the specs guarantee while keeping them simple.

spec/lib/shoryuken/instrumentation/notifications_spec.rb (2)

80-89: Tight duration assertion may become flaky under load

The instrument spec sleeps for 0.01 seconds and asserts events.first.duration is >= 0.01. On a contended or slow CI environment with coarse timer resolution, this can occasionally be brittle.

To make the test more robust, consider either:

  • Asserting duration is > 0 (or a much smaller threshold), or
  • Stubbing Process.clock_gettime to control the measured duration.

This preserves the intent (non-zero measured duration) with less risk of intermittent failures.


151-176: Address RuboCop SpaceInsideBlockBraces offenses in empty blocks

RuboCop is flagging Layout/SpaceInsideBlockBraces for several empty blocks like:

notifications.subscribe('message.processed') { }
notifications.subscribe { }

To satisfy the style guide and keep the tests lint‑clean, you can drop the inner spaces:

-      notifications.subscribe('message.processed') { }
-      notifications.subscribe { }
+      notifications.subscribe('message.processed') {}
+      notifications.subscribe {}

Apply the same change to other empty block usages in this file.

Also applies to: 191-191

lib/shoryuken/instrumentation/logger_listener.rb (1)

64-71: Minor polish for missing durations and error handling in logger listener

Two small improvements to consider:

  1. Missing duration for message.processed

Currently, a missing duration logs as 0ms:

duration_ms = event.duration ? (event.duration * 1000).round(2) : 0
logger.info { "Processed #{worker}/#{queue} in #{duration_ms}ms" }

If you want logs to distinguish “unknown / not measured” from genuinely fast executions, you could:

-      def log_message_processed(event)
-        duration_ms = event.duration ? (event.duration * 1000).round(2) : 0
+      def log_message_processed(event)
+        duration_ms = event.duration ? (event.duration * 1000).round(2) : nil
         worker = event[:worker] || 'Unknown'
         queue = event[:queue] || 'Unknown'
-
-        logger.info { "Processed #{worker}/#{queue} in #{duration_ms}ms" }
+        message =
+          if duration_ms
+            "Processed #{worker}/#{queue} in #{duration_ms}ms"
+          else
+            "Processed #{worker}/#{queue}"
+          end
+
+        logger.info { message }
       end
  1. Simplify log_error_occurred and better handle nil errors

You can simplify and make the “no error” case clearer:

-      def log_error_occurred(event)
-        error = event[:error]
-        error_class = error.respond_to?(:class) ? error.class.name : 'Unknown'
-        error_message = error.respond_to?(:message) ? error.message : error.to_s
-
-        logger.error { "Error occurred: #{error_class} - #{error_message}" }
-      end
+      def log_error_occurred(event)
+        error = event[:error]
+        error_class = error&.class&.name || 'Unknown'
+        error_message = error&.message || error.to_s
+
+        logger.error { "Error occurred: #{error_class} - #{error_message}" }
+      end

Both changes are optional but make logs slightly clearer and the code a bit more direct.

Also applies to: 81-87

lib/shoryuken/instrumentation/notifications.rb (1)

64-72: Guard subscribe against missing blocks to avoid nil subscribers

subscribe assumes a block is given, but doesn’t enforce it:

def subscribe(event_name = nil, &block)
  @mutex.synchronize do
    if event_name
      @subscribers[event_name] << block
    else
      @subscribers[:all] << block
    end
  end
end

If someone accidentally calls subscribe('message.processed') without a block, a nil subscriber is stored and publish will later hit subscriber.call(event) and raise.

Adding a simple guard makes the API safer and fails fast at the call site:

def subscribe(event_name = nil, &block)
+  raise ArgumentError, 'block is required' unless block
+
   @mutex.synchronize do
     if event_name
       @subscribers[event_name] << block
     else
       @subscribers[:all] << block
     end
   end
 end

This doesn’t change any existing tested behavior, but prevents hard‑to‑trace runtime errors from misuses.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bc3e913 and c0ae469.

📒 Files selected for processing (11)
  • lib/shoryuken.rb (1 hunks)
  • lib/shoryuken/instrumentation.rb (1 hunks)
  • lib/shoryuken/instrumentation/event.rb (1 hunks)
  • lib/shoryuken/instrumentation/logger_listener.rb (1 hunks)
  • lib/shoryuken/instrumentation/notifications.rb (1 hunks)
  • lib/shoryuken/processor.rb (2 hunks)
  • lib/shoryuken/util.rb (1 hunks)
  • spec/integration/instrumentation/instrumentation_spec.rb (1 hunks)
  • spec/lib/shoryuken/instrumentation/event_spec.rb (1 hunks)
  • spec/lib/shoryuken/instrumentation/logger_listener_spec.rb (1 hunks)
  • spec/lib/shoryuken/instrumentation/notifications_spec.rb (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
spec/lib/shoryuken/instrumentation/event_spec.rb (1)
lib/shoryuken/instrumentation/event.rb (2)
  • duration (46-48)
  • to_h (53-59)
lib/shoryuken/util.rb (5)
lib/shoryuken/instrumentation/notifications.rb (1)
  • publish (118-132)
lib/shoryuken/options.rb (1)
  • logger (163-165)
lib/shoryuken/instrumentation/logger_listener.rb (1)
  • logger (22-24)
lib/shoryuken/logging.rb (2)
  • logger (39-41)
  • logger (47-49)
lib/shoryuken.rb (1)
  • monitor (53-55)
lib/shoryuken/instrumentation/notifications.rb (3)
lib/shoryuken/instrumentation/event.rb (3)
  • initialize (29-33)
  • key (39-41)
  • duration (46-48)
lib/shoryuken/instrumentation/logger_listener.rb (4)
  • initialize (11-93)
  • initialize (15-17)
  • call (30-47)
  • logger (22-24)
lib/shoryuken/util.rb (1)
  • logger (10-12)
lib/shoryuken/processor.rb (3)
lib/shoryuken.rb (1)
  • monitor (53-55)
lib/shoryuken/instrumentation/notifications.rb (2)
  • instrument (97-105)
  • publish (118-132)
spec/integration/instrumentation/instrumentation_spec.rb (1)
  • perform (28-30)
lib/shoryuken/instrumentation/logger_listener.rb (2)
lib/shoryuken/instrumentation/event.rb (2)
  • initialize (29-33)
  • duration (46-48)
lib/shoryuken/util.rb (1)
  • logger (10-12)
🪛 RuboCop (1.81.7)
spec/lib/shoryuken/instrumentation/notifications_spec.rb

[convention] 151-151: Space inside empty braces detected.

(Layout/SpaceInsideBlockBraces)


[convention] 152-152: Space inside empty braces detected.

(Layout/SpaceInsideBlockBraces)


[convention] 166-166: Space inside empty braces detected.

(Layout/SpaceInsideBlockBraces)


[convention] 167-167: Space inside empty braces detected.

(Layout/SpaceInsideBlockBraces)


[convention] 168-168: Space inside empty braces detected.

(Layout/SpaceInsideBlockBraces)


[convention] 175-175: Space inside empty braces detected.

(Layout/SpaceInsideBlockBraces)


[convention] 176-176: Space inside empty braces detected.

(Layout/SpaceInsideBlockBraces)


[convention] 191-191: Space inside empty braces detected.

(Layout/SpaceInsideBlockBraces)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Integrations (3.2)
  • GitHub Check: Integrations (3.4)
  • GitHub Check: Integrations (3.3)
  • GitHub Check: Integrations (3.2)
  • GitHub Check: Integrations (3.4)
  • GitHub Check: Integrations (3.3)
🔇 Additional comments (9)
lib/shoryuken/util.rb (2)

14-20: LGTM! Clear mapping of legacy events to new instrumentation names.

The LEGACY_EVENT_MAP correctly translates the four lifecycle events (startup, quiet, shutdown, stopped) to the new naming convention (app.*). The fallback to "legacy.#{event}" for unmapped events provides forward compatibility.


29-44: Good integration of the new instrumentation system with legacy callbacks.

The approach of publishing to the new instrumentation system first, then invoking legacy handlers, ensures subscribers always receive events regardless of legacy callback behavior. Merging legacy_event into the payload provides useful context for subscribers that need to know the original event name.

One consideration: the event_options hash is mutated by merge (creating a new hash) before being passed to monitor.publish, but the original event_options is passed to legacy handlers. This is correct and intentional.

spec/lib/shoryuken/instrumentation/event_spec.rb (1)

5-68: Comprehensive test coverage for the Event class.

The spec effectively covers:

  • Initialization with name, payload, and automatic time capture
  • Default payload behavior (empty hash)
  • Payload access via #[] with nil for missing keys
  • Duration extraction from payload
  • Hash representation via #to_h

The time-freezing pattern is correctly implemented.

spec/integration/instrumentation/instrumentation_spec.rb (1)

46-56: Good verification of instrumentation event payloads.

The assertions properly verify that message.processed events contain the expected data (queue name, message_id, duration) and validate that duration is non-negative.

lib/shoryuken.rb (2)

39-55: Clean API for the global instrumentation monitor.

The lazy initialization pattern is consistent with shoryuken_options. The documentation examples are helpful for users.

Minor consideration: ||= is not atomic, so in a multi-threaded environment, two Notifications instances could theoretically be created if threads race on first access. However, since the Notifications class appears to be thread-safe internally and subsequent accesses would use the same instance, this is likely acceptable. If strict single-instance guarantee is needed, consider using a Mutex.


57-63: Appropriate private API marker for testing utility.

The @api private tag correctly signals that reset_monitor! is not part of the public contract and is intended for testing purposes.

lib/shoryuken/instrumentation.rb (1)

1-18: Well-structured namespace module.

Clean setup for the instrumentation subsystem with appropriate documentation. Using require_relative for loading the component files is the idiomatic approach.

lib/shoryuken/processor.rb (1)

40-45: Instrumentation integration around worker execution looks solid

Wrapping the server middleware + perform call in Shoryuken.monitor.instrument('message.processed', message_payload) and emitting 'message.failed' in the rescue provides a clear, consistent event surface (queue, message_id, worker, error). This is a good, minimal integration point that keeps the existing control flow intact.

Also applies to: 57-58

lib/shoryuken/instrumentation/event.rb (1)

15-60: Event data structure is clear and sufficient for the instrumentation needs

The Event class cleanly exposes name, payload, time, [], duration, and to_h, which is exactly what the notifications and listeners rely on. The implementation is minimal and idiomatic; no issues spotted here.

Comment on lines 57 to 58
Shoryuken.monitor.publish('message.failed', message_payload.merge(error: e))
Array(Shoryuken.exception_handlers).each { |handler| handler.call(e, queue, sqs_msg) }
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid re-invoking worker inside message_payload from the rescue path

message_payload currently calls worker&.class&.name. If Shoryuken.worker_registry.fetch_worker raises (e.g., misconfiguration), that first call happens in:

return logger.error { "No worker found for #{queue}" } unless worker

The exception is then rescued, but calling message_payload in the rescue will invoke worker again and re-raise, potentially:

  • Skipping Shoryuken.monitor.publish('message.failed', ...), and
  • Skipping configured exception_handlers.

You can avoid this by basing the worker name on the memoized ivar instead of the worker accessor, so the rescue path never tries to resolve the worker again.

-    def message_payload
-      {
-        queue: queue,
-        message_id: sqs_msg.is_a?(Array) ? sqs_msg.map(&:message_id) : sqs_msg.message_id,
-        worker: worker&.class&.name
-      }
-    end
+    def message_payload
+      current_worker = instance_variable_defined?(:@_worker) ? @_worker : nil
+
+      {
+        queue: queue,
+        message_id: sqs_msg.is_a?(Array) ? sqs_msg.map(&:message_id) : sqs_msg.message_id,
+        worker: current_worker&.class&.name
+      }
+    end

This keeps successful-path behavior the same while making the error path safer.

Also applies to: 65-74

🤖 Prompt for AI Agents
In lib/shoryuken/processor.rb around lines 57-58 (and similarly for 65-74), the
rescue path calls message_payload which invokes the worker accessor and may
re-raise if Shoryuken.worker_registry.fetch_worker fails; change message_payload
usage in the rescue to use the memoized instance variable (e.g., @worker) for
deriving the worker class/name instead of calling the worker method so the error
path does not attempt to resolve the worker again, ensuring
Shoryuken.monitor.publish('message.failed', ...) and exception_handlers still
run reliably.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR implements a Karafka-inspired instrumentation/events system to enable better monitoring and observability in Shoryuken. The implementation adds a thread-safe pub/sub notification system while maintaining full backward compatibility with existing lifecycle callbacks.

Key Changes:

  • Adds pub/sub event notification system for lifecycle events (app lifecycle, message processing, errors)
  • Provides structured instrumentation with automatic duration tracking for message processing
  • Maintains backward compatibility by publishing legacy events to the new system

Reviewed changes

Copilot reviewed 14 out of 14 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
lib/shoryuken/instrumentation/event.rb New Event wrapper class for instrumentation events with metadata
lib/shoryuken/instrumentation/notifications.rb Thread-safe pub/sub notification system with subscribe/publish/instrument methods
lib/shoryuken/instrumentation/logger_listener.rb Default logging subscriber for structured event logging
lib/shoryuken/instrumentation.rb Module definition and requires for instrumentation components
lib/shoryuken.rb Adds global Shoryuken.monitor accessor and reset_monitor! helper
lib/shoryuken/util.rb Maps legacy lifecycle events to new instrumentation event names
lib/shoryuken/processor.rb Instruments message processing with message.processed and message.failed events
spec/lib/shoryuken/instrumentation/event_spec.rb Unit tests for Event class
spec/lib/shoryuken/instrumentation/notifications_spec.rb Unit tests for Notifications including thread safety tests
spec/lib/shoryuken/instrumentation/logger_listener_spec.rb Unit tests for LoggerListener (contains test expectation issues)
spec/lib/shoryuken/monitor_spec.rb Integration tests for monitor accessor
spec/lib/shoryuken/util_spec.rb Tests for legacy event mapping
spec/lib/shoryuken/processor_spec.rb Tests for instrumentation in message processing
spec/integration/instrumentation/instrumentation_spec.rb End-to-end integration test verifying events during message processing

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

it 'logs info message' do
event = Shoryuken::Instrumentation::Event.new('app.stopped')

expect(logger).to receive(:info).and_yield.and_return('Shoryuken stopped')
Copy link

Copilot AI Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue as previous test cases - the test expectation uses .and_yield.and_return(...) which doesn't properly validate the log message. The .and_return() part is misleading.

Copilot uses AI. Check for mistakes.
duration: 0.12345
)

expect(logger).to receive(:info).and_yield.and_return('Processed TestWorker/default in 123.45ms')
Copy link

Copilot AI Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue as previous test cases - the test expectation uses .and_yield.and_return(...) which doesn't properly validate the log message. The .and_return() part is misleading.

Copilot uses AI. Check for mistakes.
error: error
)

expect(logger).to receive(:error).and_yield.and_return('Failed TestWorker/default: Something went wrong')
Copy link

Copilot AI Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue as previous test cases - the test expectation uses .and_yield.and_return(...) which doesn't properly validate the log message. The .and_return() part is misleading.

Copilot uses AI. Check for mistakes.
error: error
)

expect(logger).to receive(:error).and_yield.and_return('Error occurred: ArgumentError - Invalid argument')
Copy link

Copilot AI Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue as previous test cases - the test expectation uses .and_yield.and_return(...) which doesn't properly validate the log message. The .and_return() part is misleading.

Copilot uses AI. Check for mistakes.
it 'logs debug message' do
event = Shoryuken::Instrumentation::Event.new('queue.polling', queue: 'default')

expect(logger).to receive(:debug).and_yield.and_return('Polling queue: default')
Copy link

Copilot AI Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue as previous test cases - the test expectation uses .and_yield.and_return(...) which doesn't properly validate the log message. The .and_return() part is misleading.

Copilot uses AI. Check for mistakes.
it 'logs info message' do
event = Shoryuken::Instrumentation::Event.new('app.started', groups: %w[default priority])

expect(logger).to receive(:info).and_yield.and_return('Shoryuken started with 2 group(s)')
Copy link

Copilot AI Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test expectations are using .and_yield.and_return(...) which appears to be checking that the logger method returns the log message string. However, when logger methods like logger.info are called with a block, they don't return the block's result. The .and_return() part doesn't actually validate that the correct message is being logged.

These expectations should either:

  1. Use .and_yield without .and_return() if you just want to verify the method is called with a block, or
  2. Use a more explicit expectation that verifies the block yields the correct message string, such as:
    expect(logger).to receive(:info) do |&block|
      expect(block.call).to eq('Shoryuken started with 2 group(s)')
    end

Copilot uses AI. Check for mistakes.
it 'logs info message' do
event = Shoryuken::Instrumentation::Event.new('app.stopping')

expect(logger).to receive(:info).and_yield.and_return('Shoryuken shutting down...')
Copy link

Copilot AI Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue as line 25 - the test expectation uses .and_yield.and_return(...) which doesn't properly validate that the correct message is being logged. The .and_return() part is misleading since logger methods called with blocks don't return the block's result.

Copilot uses AI. Check for mistakes.
- Make instrument() ActiveSupport::Notifications compatible:
  - Add :exception [class_name, message] to payload on error
  - Add :exception_object to payload on error
  - Always publish event via ensure (even on error)
  - Re-raise the exception after publishing

- Add Karafka-style error.occurred event on exception:
  - Publishes separate error.occurred event with :type key
  - Includes :error, :error_class, :error_message
  - Preserves original payload for context
  - Allows centralized error handling via single subscription

- Add comprehensive tests for both behaviors
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (2)
lib/shoryuken/processor.rb (1)

37-45: Keep failure instrumentation resilient by avoiding worker re-resolution in message_payload

In the happy path this works fine, but if Shoryuken.worker_registry.fetch_worker(queue, sqs_msg) raises (e.g., misconfiguration), the first worker call in:

return logger.error { "No worker found for #{queue}" } unless worker

will raise, be caught by the outer rescue, and then message_payload will call worker again via worker&.class&.name. That re-raises and can prevent:

  • Publishing the message.failed event, and
  • Invoking exception_handlers.

You can make the error path safer by using the memoized ivar instead of the accessor inside message_payload:

-    def message_payload
-      {
-        queue: queue,
-        message_id: sqs_msg.is_a?(Array) ? sqs_msg.map(&:message_id) : sqs_msg.message_id,
-        worker: worker&.class&.name
-      }
-    end
+    def message_payload
+      current_worker = instance_variable_defined?(:@_worker) ? @_worker : nil
+
+      {
+        queue: queue,
+        message_id: sqs_msg.is_a?(Array) ? sqs_msg.map(&:message_id) : sqs_msg.message_id,
+        worker: current_worker&.class&.name
+      }
+    end

This preserves behavior when the worker is resolved successfully while ensuring the failure instrumentation path cannot be short-circuited by another registry lookup.

Also applies to: 56-74

lib/shoryuken/instrumentation/notifications.rb (1)

86-105: Ensure instrument emits events even when the block raises

Right now instrument only publishes after a successful block return; if the block raises, no duration is recorded and no event is published. For a generic instrumentation primitive this weakens observability, especially since Processor and user code will likely rely on it for timing—even on failures.

You can wrap the yield in begin...ensure so the event is always emitted while still bubbling up the original exception:

-      def instrument(event_name, payload = {})
-        started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
-        result = yield if block_given?
-        duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
-
-        event = Event.new(event_name, payload.merge(duration: duration))
-        publish(event)
-        result
-      end
+      def instrument(event_name, payload = {})
+        started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
+        result = nil
+
+        begin
+          result = yield if block_given?
+        ensure
+          duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
+          event = Event.new(event_name, payload.merge(duration: duration))
+          publish(event)
+        end
+
+        result
+      end

This keeps success behavior intact while guaranteeing events on error paths.

🧹 Nitpick comments (5)
spec/integration/instrumentation/instrumentation_spec.rb (1)

39-41: Consider relying solely on poll_queues_until instead of a fixed sleep

Given you already wait with poll_queues_until, the extra sleep 1 may be redundant and can slow the integration test. If LocalStack/queue propagation is stable without it, consider removing the sleep and letting the polling helper handle timing.

spec/lib/shoryuken/util_spec.rb (1)

55-62: Avoid leaking lifecycle event mutations across specs

The instrumentation examples mutate Shoryuken.options[:lifecycle_events] (e.g., setting :startup, :dispatch, :stopped) but the after :all cleanup only removes :some_event. That can leave global state changed for later specs.

You can snapshot and restore the lifecycle events around this group, for example:

-  describe '#fire_event' do
+  describe '#fire_event' do
+    let!(:original_lifecycle_events) { Shoryuken.options[:lifecycle_events].dup }
@@
-    after :all do
-      Shoryuken.options[:lifecycle_events].delete(:some_event)
-      Shoryuken.reset_monitor!
-    end
+    after :all do
+      Shoryuken.options[:lifecycle_events].replace(original_lifecycle_events)
+      Shoryuken.reset_monitor!
+    end

This keeps the tests isolated while still validating the instrumentation mapping.

Also applies to: 78-111

spec/lib/shoryuken/instrumentation/notifications_spec.rb (1)

149-191: Tighten empty block formatting to satisfy RuboCop

RuboCop is flagging spaces inside empty block braces on the subscribe { } lines (e.g., Lines 151–152, 166–168, 175–176, 191). Switching to {} will keep the suite lint‑clean:

-      notifications.subscribe('message.processed') { }
-      notifications.subscribe { }
+      notifications.subscribe('message.processed') {}
+      notifications.subscribe {}

(and similarly for the other occurrences).

lib/shoryuken/instrumentation/logger_listener.rb (2)

8-24: Clarify the usage example with a fully qualified constant

The YARD example uses LoggerListener.new without qualification. In user code outside the Shoryuken::Instrumentation namespace this would typically need the full constant:

Shoryuken.monitor.subscribe(
  &Shoryuken::Instrumentation::LoggerListener.new.method(:call)
)

Tweaking the example to be explicit will make copy‑paste usage work reliably.


72-87: Consider extracting shared error formatting helper (optional)

Both log_message_failed and log_error_occurred manually derive error_message (and, in the latter, error_class). Not required, but you could centralize this into a tiny helper (e.g., format_error(error) returning [error_class, error_message]) to keep these branches consistent and easier to extend if more error events are added later.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c0ae469 and 5f5a277.

📒 Files selected for processing (14)
  • lib/shoryuken.rb (1 hunks)
  • lib/shoryuken/instrumentation.rb (1 hunks)
  • lib/shoryuken/instrumentation/event.rb (1 hunks)
  • lib/shoryuken/instrumentation/logger_listener.rb (1 hunks)
  • lib/shoryuken/instrumentation/notifications.rb (1 hunks)
  • lib/shoryuken/processor.rb (2 hunks)
  • lib/shoryuken/util.rb (1 hunks)
  • spec/integration/instrumentation/instrumentation_spec.rb (1 hunks)
  • spec/lib/shoryuken/instrumentation/event_spec.rb (1 hunks)
  • spec/lib/shoryuken/instrumentation/logger_listener_spec.rb (1 hunks)
  • spec/lib/shoryuken/instrumentation/notifications_spec.rb (1 hunks)
  • spec/lib/shoryuken/monitor_spec.rb (1 hunks)
  • spec/lib/shoryuken/processor_spec.rb (1 hunks)
  • spec/lib/shoryuken/util_spec.rb (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • spec/lib/shoryuken/instrumentation/event_spec.rb
  • lib/shoryuken/instrumentation.rb
  • lib/shoryuken/instrumentation/event.rb
  • lib/shoryuken/util.rb
🧰 Additional context used
🧬 Code graph analysis (6)
lib/shoryuken/processor.rb (2)
lib/shoryuken.rb (1)
  • monitor (53-55)
lib/shoryuken/instrumentation/notifications.rb (2)
  • instrument (97-105)
  • publish (118-132)
spec/lib/shoryuken/processor_spec.rb (4)
lib/shoryuken.rb (2)
  • reset_monitor! (61-63)
  • monitor (53-55)
lib/shoryuken/instrumentation/notifications.rb (1)
  • subscribe (64-72)
lib/shoryuken/processor.rb (2)
  • process (20-22)
  • process (36-61)
lib/shoryuken/instrumentation/event.rb (1)
  • duration (46-48)
lib/shoryuken/instrumentation/logger_listener.rb (2)
lib/shoryuken/instrumentation/event.rb (2)
  • initialize (29-33)
  • duration (46-48)
lib/shoryuken/util.rb (1)
  • logger (10-12)
spec/lib/shoryuken/instrumentation/logger_listener_spec.rb (2)
lib/shoryuken/instrumentation/logger_listener.rb (2)
  • logger (22-24)
  • call (30-47)
lib/shoryuken/util.rb (1)
  • logger (10-12)
lib/shoryuken/instrumentation/notifications.rb (3)
lib/shoryuken/instrumentation/event.rb (3)
  • initialize (29-33)
  • key (39-41)
  • duration (46-48)
lib/shoryuken/instrumentation/logger_listener.rb (4)
  • initialize (11-93)
  • initialize (15-17)
  • call (30-47)
  • logger (22-24)
lib/shoryuken/util.rb (1)
  • logger (10-12)
spec/lib/shoryuken/instrumentation/notifications_spec.rb (5)
lib/shoryuken/instrumentation/notifications.rb (6)
  • clear (137-141)
  • subscribe (64-72)
  • publish (118-132)
  • unsubscribe (79-84)
  • instrument (97-105)
  • subscriber_count (147-152)
lib/shoryuken/instrumentation/event.rb (1)
  • duration (46-48)
lib/shoryuken/instrumentation/logger_listener.rb (1)
  • logger (22-24)
lib/shoryuken/util.rb (1)
  • logger (10-12)
lib/shoryuken/helpers/atomic_counter.rb (1)
  • increment (79-81)
🪛 RuboCop (1.81.7)
spec/lib/shoryuken/instrumentation/notifications_spec.rb

[convention] 151-151: Space inside empty braces detected.

(Layout/SpaceInsideBlockBraces)


[convention] 152-152: Space inside empty braces detected.

(Layout/SpaceInsideBlockBraces)


[convention] 166-166: Space inside empty braces detected.

(Layout/SpaceInsideBlockBraces)


[convention] 167-167: Space inside empty braces detected.

(Layout/SpaceInsideBlockBraces)


[convention] 168-168: Space inside empty braces detected.

(Layout/SpaceInsideBlockBraces)


[convention] 175-175: Space inside empty braces detected.

(Layout/SpaceInsideBlockBraces)


[convention] 176-176: Space inside empty braces detected.

(Layout/SpaceInsideBlockBraces)


[convention] 191-191: Space inside empty braces detected.

(Layout/SpaceInsideBlockBraces)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Integrations (3.3)
  • GitHub Check: Integrations (3.3)
  • GitHub Check: Integrations (3.2)
  • GitHub Check: Integrations (3.4)
🔇 Additional comments (6)
spec/lib/shoryuken/monitor_spec.rb (1)

10-48: Good coverage of monitor singleton and reset semantics

The specs exercise type, singleton reuse, event delivery, and reset_monitor! behavior, which should keep future changes to the monitor implementation honest.

lib/shoryuken.rb (1)

39-63: Monitor API is simple and fits the instrumentation design

The lazy monitor accessor plus reset_monitor! for tests is a clean way to expose instrumentation without leaking implementation details.

spec/lib/shoryuken/processor_spec.rb (1)

37-75: Instrumentation specs for Processor look solid

These examples validate both success (message.processed with payload and duration) and failure (message.failed with error) flows and properly isolate monitor state with reset_monitor!.

spec/lib/shoryuken/instrumentation/logger_listener_spec.rb (1)

5-129: LoggerListener behavior is well covered

The specs cover initialization and all supported event types (including the “no-op” branch for unknown events), which should make refactors to log formatting or event handling low-risk.

spec/lib/shoryuken/instrumentation/notifications_spec.rb (1)

12-215: Strong, comprehensive coverage of Notifications behavior and concurrency

The specs nicely exercise the full lifecycle (subscribe/unsubscribe, instrument, publish, clear, subscriber_count) including error resilience and concurrent subscribe/publish. The expectations line up with the provided implementation (monotonic duration, non-fatal subscriber errors, thread-safe subscriber access), and the concurrency tests use an atomic counter correctly. Nothing blocking here.

lib/shoryuken/instrumentation/logger_listener.rb (1)

3-93: Event-to-log mapping and fallbacks look solid

The listener cleanly routes each event type to a focused logger method, uses lazy logger resolution, and handles missing payload fields with sensible defaults ('Unknown' and simple error stringification). It aligns well with the Event API (name, duration, [] payload access) and should be safe to plug directly into Shoryuken.monitor.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
spec/lib/shoryuken/instrumentation/notifications_spec.rb (1)

74-243: Comprehensive coverage for Notifications; only minor RuboCop/style nits

The spec file does a good job exercising all key behaviors (per‑event/global subscriptions, exception handling, error.occurred semantics, publish robustness, and basic concurrency), and matches the implementation semantics.

Two minor cleanups to align with the reported RuboCop hints:

  1. Redundant RuntimeError in raise (Style/RedundantException, around Line 199)
    You can drop the explicit RuntimeError and let Ruby use it implicitly:
-          notifications.instrument('message.processed', queue: 'default') do
-            raise RuntimeError, 'something went wrong'
-          end
+          notifications.instrument('message.processed', queue: 'default') do
+            raise 'something went wrong'
+          end
  1. Spaces inside empty block braces (Layout/SpaceInsideBlockBraces, around Lines 295–320, 335)
    Tighten empty blocks to avoid inner spaces:
-      notifications.subscribe('message.processed') { }
-      notifications.subscribe { }
+      notifications.subscribe('message.processed') {}
+      notifications.subscribe {}

-      notifications.subscribe('message.processed') { }
-      notifications.subscribe('message.processed') { }
-      notifications.subscribe('message.failed') { }
+      notifications.subscribe('message.processed') {}
+      notifications.subscribe('message.processed') {}
+      notifications.subscribe('message.failed') {}

-      notifications.subscribe { }
-      notifications.subscribe { }
+      notifications.subscribe {}
+      notifications.subscribe {}

-            notifications.subscribe('message.processed') { }
+            notifications.subscribe('message.processed') {}

These are purely stylistic and won’t change behavior, but will satisfy the reported RuboCop cops.

Also applies to: 246-359, 193-205, 293-320, 330-357

lib/shoryuken/instrumentation/notifications.rb (1)

118-146: Consider narrowing rescue Exception in instrument and preserving backtrace on re‑raise

Functionally the instrumentation behavior is correct and matches the specs (duration always measured, message.processed emitted with optional exception fields, and error.occurred emitted on failures). Two small refinements to consider:

  1. Rescue scope (Lint/RescueException)
    instrument currently rescues Exception, which includes SystemExit, NoMemoryError, SignalException, etc.:
rescue Exception => e

Unless you explicitly want to treat these non‑StandardError throwables as regular failures for instrumentation, consider narrowing this to StandardError so truly fatal conditions are not routed through the instrumentation pipeline:

-        rescue Exception => e
+        rescue StandardError => e

You’ll still get full coverage for typical runtime errors (including worker failures) while matching common Ruby style and the RuboCop hint.

  1. Re‑raising while keeping the original backtrace
    Inside a rescue, raise e can reset the top of the backtrace to the instrumentation method. To preserve the original backtrace frames more faithfully, you can use a bare raise:
-          raise e
+          raise

This doesn’t change the observable control flow, but gives error subscribers (e.g., Sentry) a slightly cleaner stack trace centered on the original failure site.

Both are optional and can be adopted if they align with your error-handling intent for instrument.

Also applies to: 64-72

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5f5a277 and 442bd62.

📒 Files selected for processing (4)
  • lib/shoryuken/instrumentation/notifications.rb (1 hunks)
  • lib/shoryuken/processor.rb (2 hunks)
  • spec/lib/shoryuken/instrumentation/notifications_spec.rb (1 hunks)
  • spec/lib/shoryuken/processor_spec.rb (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
spec/lib/shoryuken/processor_spec.rb (4)
lib/shoryuken.rb (1)
  • monitor (53-55)
lib/shoryuken/instrumentation/notifications.rb (1)
  • subscribe (64-72)
lib/shoryuken/processor.rb (2)
  • process (20-22)
  • process (36-62)
lib/shoryuken/instrumentation/event.rb (1)
  • duration (46-48)
🪛 RuboCop (1.81.7)
lib/shoryuken/instrumentation/notifications.rb

[warning] 123-127: Avoid rescuing the Exception class. Perhaps you meant to rescue StandardError?

(Lint/RescueException)

spec/lib/shoryuken/instrumentation/notifications_spec.rb

[convention] 199-199: Redundant RuntimeError argument can be removed.

(Style/RedundantException)


[convention] 295-295: Space inside empty braces detected.

(Layout/SpaceInsideBlockBraces)


[convention] 296-296: Space inside empty braces detected.

(Layout/SpaceInsideBlockBraces)


[convention] 310-310: Space inside empty braces detected.

(Layout/SpaceInsideBlockBraces)


[convention] 311-311: Space inside empty braces detected.

(Layout/SpaceInsideBlockBraces)


[convention] 312-312: Space inside empty braces detected.

(Layout/SpaceInsideBlockBraces)


[convention] 319-319: Space inside empty braces detected.

(Layout/SpaceInsideBlockBraces)


[convention] 320-320: Space inside empty braces detected.

(Layout/SpaceInsideBlockBraces)


[convention] 335-335: Space inside empty braces detected.

(Layout/SpaceInsideBlockBraces)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Integrations (3.4)
  • GitHub Check: Integrations (3.2)
  • GitHub Check: Integrations (3.3)
  • GitHub Check: Integrations (3.2)
  • GitHub Check: Integrations (3.4)
  • GitHub Check: Integrations (3.3)
🔇 Additional comments (2)
spec/lib/shoryuken/processor_spec.rb (1)

37-91: Instrumentation specs thoroughly cover success and failure flows

The new examples validate message.processed payload (queue, message_id, worker, duration), ActiveSupport-style exception fields, and Karafka-style error.occurred events, with proper monitor reset in setup/teardown. This gives solid coverage of the new instrumentation behavior around Processor#process.

lib/shoryuken/processor.rb (1)

40-45: Processor#process instrumentation wiring looks correct

Wrapping the middleware/worker invocation in Shoryuken.monitor.instrument('message.processed', message_payload) and using message_payload for queue/message_id/worker gives the expected data to subscribers, and aligns with the behaviors asserted in the specs (including exception payloads handled inside Notifications#instrument). No functional issues spotted here.

Also applies to: 57-58, 66-75

- Add new events to Notifications:
  - fetcher.started, fetcher.completed, fetcher.retry
  - manager.dispatch, manager.processor_assigned, manager.failed
  - queue.empty, app.quiet

- Enhance LoggerListener to handle all events:
  - Use dynamic method dispatch (on_event_name pattern)
  - Handle fetcher events (started, completed, retry)
  - Handle manager events (dispatch, processor_assigned, failed)
  - Handle queue events (polling, empty)
  - Skip logging message.processed when exception present
  - Include backtrace in error.occurred logging

- Migrate direct logger calls to events:
  - Fetcher: publish fetcher.started/completed/retry events
  - Manager: publish manager.dispatch/processor_assigned/failed events
  - Launcher: remove direct logger calls (use events via fire_event)

- Update tests:
  - Add comprehensive LoggerListener tests for new events
  - Update fetcher_spec to verify events instead of logger calls

This follows the Karafka pattern where components publish events
and listeners handle logging, allowing users to customize logging
behavior or replace it entirely with their own listener.
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
spec/lib/shoryuken/instrumentation/logger_listener_spec.rb (1)

25-25: Fix test expectations to properly validate log messages.

Multiple test cases use .and_yield.and_return(...) which doesn't properly validate the logged message. Logger methods called with blocks don't return the block's result, so the .and_return() part is misleading and doesn't actually verify the correct message is logged.

Replace expectations like this:

expect(logger).to receive(:info).and_yield.and_return('Shoryuken started with 2 group(s)')

With proper validation:

expect(logger).to receive(:info) do |&block|
  expect(block.call).to eq('Shoryuken started with 2 group(s)')
end

Or if you only need to verify the method is called with a block:

expect(logger).to receive(:info).and_yield

This issue occurs on lines: 25, 35, 45, 55, 65, 109, 141, 183, 225, 275, 285.

Also applies to: 35-35, 45-45, 55-55, 65-65, 109-109, 141-141, 183-183, 225-225, 275-275, 285-285

🧹 Nitpick comments (2)
lib/shoryuken/instrumentation/notifications.rb (1)

128-156: Add a comment explaining the Exception rescue.

The instrument method rescues Exception (line 133) rather than StandardError, which is unusual. While this is intentional to ensure instrumentation captures all exceptions (including SystemExit, SignalException, etc.) before re-raising them, this pattern should be documented to avoid confusion during future maintenance.

Apply this diff to add a clarifying comment:

         begin
           result = yield payload if block_given?
-        rescue Exception => e
+        rescue Exception => e # rubocop:disable Lint/RescueException - intentionally capture all exceptions for instrumentation
           exception_raised = e
           payload[:exception] = [e.class.name, e.message]
           payload[:exception_object] = e
spec/lib/shoryuken/instrumentation/logger_listener_spec.rb (1)

153-153: Consider using %w for word arrays.

The backtrace array literals could use %w notation for cleaner syntax:

-          backtrace: ['line1', 'line2']
+          backtrace: %w[line1 line2]
-        error.set_backtrace(['line1', 'line2'])
+        error.set_backtrace(%w[line1 line2])

Also applies to: 259-259

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 442bd62 and acf9f40.

📒 Files selected for processing (7)
  • lib/shoryuken/fetcher.rb (2 hunks)
  • lib/shoryuken/instrumentation/logger_listener.rb (1 hunks)
  • lib/shoryuken/instrumentation/notifications.rb (1 hunks)
  • lib/shoryuken/launcher.rb (1 hunks)
  • lib/shoryuken/manager.rb (3 hunks)
  • spec/lib/shoryuken/fetcher_spec.rb (1 hunks)
  • spec/lib/shoryuken/instrumentation/logger_listener_spec.rb (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • lib/shoryuken/instrumentation/logger_listener.rb
🧰 Additional context used
🧬 Code graph analysis (4)
spec/lib/shoryuken/fetcher_spec.rb (5)
lib/shoryuken.rb (1)
  • monitor (53-55)
lib/shoryuken/instrumentation/notifications.rb (1)
  • subscribe (74-82)
spec/integrations_helper.rb (2)
  • double (198-200)
  • assert (89-213)
lib/shoryuken/fetcher.rb (1)
  • fetch (24-39)
spec/lib/shoryuken/polling/base_strategy_spec.rb (1)
  • messages_found (16-19)
lib/shoryuken/launcher.rb (1)
lib/shoryuken/util.rb (1)
  • fire_event (29-45)
lib/shoryuken/manager.rb (2)
lib/shoryuken.rb (1)
  • monitor (53-55)
lib/shoryuken/instrumentation/notifications.rb (1)
  • publish (169-183)
spec/lib/shoryuken/instrumentation/logger_listener_spec.rb (2)
lib/shoryuken/instrumentation/logger_listener.rb (2)
  • logger (22-24)
  • call (30-33)
lib/shoryuken/util.rb (1)
  • logger (10-12)
🪛 RuboCop (1.81.7)
lib/shoryuken/instrumentation/notifications.rb

[warning] 133-137: Avoid rescuing the Exception class. Perhaps you meant to rescue StandardError?

(Lint/RescueException)

spec/lib/shoryuken/instrumentation/logger_listener_spec.rb

[convention] 153-153: Use %w or %W for an array of words.

(Style/WordArray)


[convention] 259-259: Use %w or %W for an array of words.

(Style/WordArray)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Integrations (3.3)
  • GitHub Check: Integrations (3.4)
  • GitHub Check: Integrations (3.2)
  • GitHub Check: Integrations (3.2)
  • GitHub Check: Integrations (3.3)
  • GitHub Check: Integrations (3.4)
🔇 Additional comments (12)
lib/shoryuken/launcher.rb (2)

120-123: LGTM! Clean refactor using safe navigation.

The use of &.call is idiomatic Ruby and cleanly handles optional callbacks without explicit nil checks. The :startup event firing remains in place, ensuring integration with the new instrumentation system.


128-131: LGTM! Consistent with start_callback refactor.

The safe navigation pattern is applied consistently with start_callback, maintaining clean callback handling while preserving the :shutdown event firing for instrumentation.

spec/lib/shoryuken/fetcher_spec.rb (1)

38-54: LGTM!

The test correctly verifies the new event-based instrumentation by subscribing to events and asserting on their names and payloads. This is a clean replacement for the previous logger-based assertions.

lib/shoryuken/instrumentation/notifications.rb (3)

169-183: LGTM!

The publish method properly isolates subscriber failures with rescue StandardError, ensuring that misbehaving subscribers cannot break the main application flow. The thread-safe subscriber collection and error logging are well implemented.


57-82: LGTM!

The thread-safe initialization and subscription mechanism is well-designed. Supporting both named event subscriptions and global (:all) subscriptions provides flexibility for different monitoring use cases.


26-55: LGTM!

The EVENTS constant provides excellent documentation of all supported instrumentation events. The categorization (app, fetcher, manager, message, worker, queue, error) makes it easy to understand the system's observability surface.

lib/shoryuken/manager.rb (3)

96-101: LGTM!

The manager.dispatch event provides excellent visibility into the dispatch state, including processor availability (ready, busy) and active queue information. This enables effective monitoring of system utilization.


147-150: LGTM!

The manager.processor_assigned event enables tracking of message-to-processor assignments, which is valuable for debugging message flow and monitoring processing distribution.


216-221: LGTM!

The manager.failed event includes comprehensive error information (error object, message, class, backtrace), enabling effective error tracking and integration with monitoring tools like Sentry or Datadog.

lib/shoryuken/fetcher.rb (3)

28-28: LGTM!

The fetcher.started event provides visibility into fetch operation initiation with appropriate context (queue name and limit).


32-35: LGTM!

The fetcher.completed event includes valuable metrics (message_count, duration_ms), enabling performance monitoring and analysis of fetch operation effectiveness.


59-63: LGTM!

The fetcher.retry event provides excellent visibility into retry attempts, including error context. This is valuable for debugging connectivity issues and understanding retry patterns.

Handle case where sqs_msg doesn't respond to message_id (e.g., in
tests using plain arrays as message mocks). This prevents NoMethodError
during instrumentation event publishing.
Replace plain arrays/integers with proper message doubles that
respond to message_id, matching the real SQS message interface.
This ensures tests accurately reflect production behavior.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants