Skip to content

Commit

Permalink
Merge pull request #6 from Jimdo/client_metrics
Browse files Browse the repository at this point in the history
Client metrics
  • Loading branch information
scootklein committed Apr 24, 2014
2 parents 616a69d + 147ec79 commit 6732198
Show file tree
Hide file tree
Showing 9 changed files with 247 additions and 52 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
language: ruby
rvm:
- 2.0.0
- 2.1.0
1 change: 0 additions & 1 deletion Gemfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
source 'https://rubygems.org'
gemspec

1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Gems:
Compatibility (tested):

* Ruby 2.0.0
* Ruby 2.1.0

(if you can confirm another version of Ruby, email me at [email protected])

Expand Down
2 changes: 2 additions & 0 deletions lib/librato-sidekiq.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require 'librato-sidekiq/middleware'
require 'librato-sidekiq/client_middleware'

Librato::Sidekiq::Middleware.configure
Librato::Sidekiq::ClientMiddleware.configure
32 changes: 32 additions & 0 deletions lib/librato-sidekiq/client_middleware.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
module Librato
module Sidekiq
class ClientMiddleware < Middleware
def reconfigure
# puts "Reconfiguring with: #{options}"
::Sidekiq.configure_client do |config|
config.client_middleware do |chain|
chain.remove self.class
chain.add self.class, options
end
end
end

protected

def track(tracking_group, stats, worker_instance, msg, queue, elapsed)
tracking_group.increment 'queued'
return unless allowed_to_submit queue, worker_instance
# puts "doing Librato insert"
tracking_group.group queue.to_s do |q|
q.increment 'queued'

# using something like User.delay.send_email invokes
# a class name with slashes. remove them in favor of underscores
q.group msg['class'].underscore.gsub('/', '_') do |w|
w.increment 'queued'
end
end
end
end
end
end
123 changes: 72 additions & 51 deletions lib/librato-sidekiq/middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,31 @@ def initialize(options = {})
# hard dependency on one or the other being present
rails = !!defined?(Librato::Rails)
rack = !!defined?(Librato::Rack)
raise "librato-sidekiq depends on having one of librato-rails or librato-rack installed" unless rails || rack
fail 'librato-sidekiq depends on having one of librato-rails or librato-rack installed' unless rails || rack

# librato-rails >= 0.10 changes behavior of reporting agent
if File.basename($0) == 'sidekiq' && rails && Librato::Rails::VERSION.split('.')[1].to_i >= 10 && ENV['LIBRATO_AUTORUN'].nil?
puts "NOTICE: --------------------------------------------------------------------"
puts "NOTICE: THE REPORTING AGENT HAS NOT STARTED, AND NO METRICS WILL BE SENT"
puts "NOTICE: librato-rails >= 0.10 requires LIBRATO_AUTORUN=1 in your environment"
puts "NOTICE: --------------------------------------------------------------------"
if File.basename($PROGRAM_NAME) == 'sidekiq' && rails && Librato::Rails::VERSION.split('.')[1].to_i >= 10 && ENV['LIBRATO_AUTORUN'].nil?
puts 'NOTICE: --------------------------------------------------------------------'
puts 'NOTICE: THE REPORTING AGENT HAS NOT STARTED, AND NO METRICS WILL BE SENT'
puts 'NOTICE: librato-rails >= 0.10 requires LIBRATO_AUTORUN=1 in your environment'
puts 'NOTICE: --------------------------------------------------------------------'
end

self.reconfigure
reconfigure
end

def self.configure
yield(self) if block_given?
self.new # will call reconfigure
new # will call reconfigure
end

def options
{
:enabled => self.enabled,
:whitelist_queues => self.whitelist_queues,
:blacklist_queues => self.blacklist_queues,
:whitelist_classes => self.whitelist_classes,
:blacklist_classes => self.blacklist_classes
enabled: enabled,
whitelist_queues: whitelist_queues,
blacklist_queues: blacklist_queues,
whitelist_classes: whitelist_classes,
blacklist_classes: blacklist_classes
}
end

Expand All @@ -48,58 +48,79 @@ def reconfigure
::Sidekiq.configure_server do |config|
config.server_middleware do |chain|
chain.remove self.class
chain.add self.class, self.options
chain.add self.class, options
end
end
end

def call(worker_instance, msg, queue)
unless self.enabled
# puts "Gem not enabled"
yield
return
end

t = Time.now
yield
elapsed = (Time.now - t).to_f
# redis_pool is needed for the sidekiq 3 upgrade
# https://github.com/mperham/sidekiq/blob/master/3.0-Upgrade.md
def call(worker_instance, msg, queue, redis_pool = nil)
start_time = Time.now
result = yield
elapsed = (Time.now - start_time).to_f

queue_in_whitelist = self.whitelist_queues.nil? || self.whitelist_queues.empty? || self.whitelist_queues.include?(queue.to_s)
queue_in_blacklist = self.blacklist_queues.include?(queue.to_s)
class_in_whitelist = self.whitelist_classes.nil? || self.whitelist_classes.empty? || self.whitelist_classes.include?(worker_instance.class.to_s)
class_in_blacklist = self.blacklist_classes.include?(worker_instance.class.to_s)
return result unless enabled
# puts "#{worker_instance} #{queue}"

# puts "#{worker_instance} #{queue} qw:#{queue_in_whitelist} qb:#{queue_in_blacklist} cw:#{class_in_whitelist} cb:#{class_in_blacklist}"
stats = ::Sidekiq::Stats.new

Librato.group 'sidekiq' do |sidekiq|
stats = ::Sidekiq::Stats.new
track sidekiq, stats, worker_instance, msg, queue, elapsed
end

sidekiq.increment 'processed'
result
end

{
enqueued: nil,
failed: nil,
scheduled_size: 'scheduled'
}.each do |method, name|
sidekiq.measure (name || method).to_s, stats.send(method).to_i
private

def track(tracking_group, stats, worker_instance, msg, queue, elapsed)
submit_general_stats tracking_group, stats
return unless allowed_to_submit queue, worker_instance
# puts "doing Librato insert"
tracking_group.group queue.to_s do |q|
q.increment 'processed'
q.timing 'time', elapsed
q.measure 'enqueued', stats.queues[queue].to_i

# using something like User.delay.send_email invokes
# a class name with slashes. remove them in favor of underscores
q.group msg['class'].underscore.gsub('/', '_') do |w|
w.increment 'processed'
w.timing 'time', elapsed
end
end
end

def submit_general_stats(group, stats)
group.increment 'processed'
{
enqueued: nil,
failed: nil,
scheduled_size: 'scheduled'
}.each do |method, name|
group.measure((name || method).to_s, stats.send(method).to_i)
end
end

return unless class_in_whitelist && !class_in_blacklist && queue_in_whitelist && !queue_in_blacklist
# puts "doing Librato insert"
def queue_in_whitelist(queue)
whitelist_queues.nil? || whitelist_queues.empty? || whitelist_queues.include?(queue.to_s)
end

sidekiq.group queue.to_s do |q|
q.increment 'processed'
q.timing 'time', elapsed
q.measure 'enqueued', stats.queues[queue].to_i
def queue_in_blacklist(queue)
blacklist_queues.include?(queue.to_s)
end

# using something like User.delay.send_email invokes a class name with slashes
# remove them in favor of underscores
q.group msg["class"].underscore.gsub('/', '_') do |w|
w.increment 'processed'
w.timing 'time', elapsed
end
end
end
def class_in_whitelist(worker_instance)
whitelist_classes.nil? || whitelist_classes.empty? || whitelist_classes.include?(worker_instance.class.to_s)
end

def class_in_blacklist(worker_instance)
blacklist_classes.include?(worker_instance.class.to_s)
end

def allowed_to_submit(queue, worker_instance)
class_in_whitelist(worker_instance) && !class_in_blacklist(worker_instance) && queue_in_whitelist(queue) && !queue_in_blacklist(queue)
end
end
end
Expand Down
1 change: 1 addition & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'librato-sidekiq/middleware'
require 'librato-sidekiq/client_middleware'
require 'timecop'

# Fix time
Expand Down
137 changes: 137 additions & 0 deletions spec/unit/client_middleware_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
require 'spec_helper'

describe Librato::Sidekiq::ClientMiddleware do

before(:each) do
stub_const "Librato::Rails", Class.new
stub_const "Sidekiq", Module.new
stub_const "Sidekiq::Stats", Class.new
end

let(:middleware) do
allow(Sidekiq).to receive(:configure_client)
Librato::Sidekiq::ClientMiddleware.new
end

describe '#intialize' do
it 'should call reconfigure' do
expect(Sidekiq).to receive(:configure_client)
Librato::Sidekiq::ClientMiddleware.new
end
end

describe '#configure' do

before(:each) { Sidekiq.should_receive(:configure_client) }

it 'should yield with it self as argument' do
expect { |b| Librato::Sidekiq::ClientMiddleware.configure &b }.to yield_with_args(Librato::Sidekiq::ClientMiddleware)
end

it 'should return a new instance' do
expect(Librato::Sidekiq::ClientMiddleware.configure).to be_an_instance_of Librato::Sidekiq::ClientMiddleware
end

end

describe '#reconfigure' do

let(:chain) { double() }
let(:config) { double() }

it 'should add itself to the server middleware chain' do
expect(chain).to receive(:remove).with Librato::Sidekiq::ClientMiddleware
expect(chain).to receive(:add).with Librato::Sidekiq::ClientMiddleware, middleware.options

expect(config).to receive(:client_middleware).once.and_yield(chain)
expect(Sidekiq).to receive(:configure_client).once.and_yield(config)

middleware.reconfigure
end
end

describe '#call' do

let(:meter) { double(measure: nil, increment: nil, group: nil) }

let(:queue_name) { 'some_awesome_queue' }
let(:some_worker_instance) { nil }
let(:some_message) { Hash['class', double(underscore: queue_name)] }

let(:sidekiq_stats_instance_double) do
double("Sidekiq::Stats", :enqueued => 1, :failed => 2, :scheduled_size => 3)
end

context 'when middleware is not enabled' do

before(:each) { middleware.enabled = false }

it { expect { |b| middleware.call(1,2,3,&b) }.to yield_with_no_args }

it 'should not send any metrics' do
Librato.should_not_receive(:group)
end

end

context 'when middleware is enabled but queue is blacklisted' do

before(:each) do
allow(Sidekiq::Stats).to receive(:new).and_return(sidekiq_stats_instance_double)
allow(Librato).to receive(:group).with('sidekiq').and_yield meter
end

before(:each) do
middleware.enabled = true
middleware.blacklist_queues = []
middleware.blacklist_queues << queue_name
end

it { expect { |b| middleware.call(some_worker_instance, some_message, queue_name, &b) }.to yield_with_no_args }

it 'should measure increment queued metric' do
expect(meter).to receive(:increment).with 'queued'
middleware.call(some_worker_instance, some_message, queue_name) {}
end

end

context 'when middleware is enabled and everything is whitlisted' do

let(:sidekiq_group) { double(measure: nil, increment: nil, group: nil) }
let(:queue_group) { double(measure: nil, increment: nil, timing: nil, group: nil) }
let(:class_group) { double(measure: nil, increment: nil, timing: nil, group: nil) }

before(:each) do
middleware.enabled = true
middleware.blacklist_queues = []
end

before(:each) do
allow(Sidekiq::Stats).to receive(:new).and_return(sidekiq_stats_instance_double)
allow(Librato).to receive(:group).with('sidekiq').and_yield(sidekiq_group)
allow(sidekiq_stats_instance_double).to receive(:queues)
end

it 'should measure queue metrics' do
expect(sidekiq_group).to receive(:group).and_yield(queue_group)

expect(queue_group).to receive(:increment).with "queued"

middleware.call(some_worker_instance, some_message, queue_name) {}
end

it 'should measure class metrics' do
expect(sidekiq_group).to receive(:group).and_yield(queue_group)
expect(queue_group).to receive(:group).with(queue_name).and_yield(class_group)

expect(class_group).to receive(:increment).with "queued"

middleware.call(some_worker_instance, some_message, queue_name) {}
end

end

end

end
1 change: 1 addition & 0 deletions spec/unit/middleware_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@

before(:each) do
middleware.enabled = true
middleware.blacklist_queues = []
middleware.blacklist_queues << queue_name
end

Expand Down

0 comments on commit 6732198

Please sign in to comment.